Peng Yu
Can I load my fitted Keras model in Scio and predict with it?
Artur Fejklowicz
What is idAttribute in sc.pubsubSubscription?
How can I filter Pub/Sub subscription messages on an attribute aaa=XXX in the message metadata with Scio?
Neville Li
@arturroo most IO options translate directly to beam and can be found in their API site https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withIdAttribute-java.lang.String-
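For the attribute-filtering half of the question, a hedged sketch (the subscription path and attribute names are placeholders): PubsubIO doesn't filter on arbitrary attributes at read time, so one approach is to read messages together with their attributes and filter in the pipeline.

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection

// Hedged sketch: read payload + attributes, then filter on the "aaa"
// attribute (subscription path is a placeholder).
def filtered(sc: ScioContext): SCollection[String] =
  sc.pubsubSubscriptionWithAttributes[String](
      "projects/my-project/subscriptions/my-sub")
    .filter { case (_, attrs) => attrs.get("aaa").contains("XXX") }
    .map { case (payload, _) => payload }
```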
Steven Fines
Here's my scenario:
I have an event that arrives from Pub/Sub; based on that input it needs to read BigQuery dynamically and then process the results.
If it were a static set I would use a SideInput.
What is the best practice here?
I can't really find an example.
Artur Fejklowicz

@nevillelyh thx!
I have another question. I get the schema from a table:

class NestedFromTable

and I traverse over NestedFromTable.schema like this:

def printTableFieldSchema(tfs: TableFieldSchema, parent: String = ""): Unit = {
    val fullFieldName = parent match {
        case "" => tfs.getName
        case p  => s"$p.${tfs.getName}"
    }
    logger.info(s"TableFieldSchema Name: $fullFieldName Type: ${tfs.getType}, Mode: ${tfs.getMode}")
    if (tfs.getType == "RECORD") {
        tfs.getFields.forEach(field => printTableFieldSchema(field, fullFieldName))
    }
}

NestedFromTable.schema.getFields.forEach(tfs => printTableFieldSchema(tfs))

It works well, but when I

  • stop the streaming DirectRunner job
  • change the schema (delete the table, add an INT64 field in the middle of schema.json, add the table again with the new schema)
  • and run the program again

then I still see the old schema. How can I make the new schema visible in Scio typed BigQuery?

Neville Li
@sfines-clgx https://spotify.github.io/scio/examples/index.html see the RefreshingSideInputExample not quite the same but close, u can do the update query in side input .map(fn).asSingletonSideInput?
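A hedged sketch of that pattern (queryBigQuery, process and events are hypothetical placeholders, not Scio API): compute the lookup as a singleton side input and consult it per element.

```scala
// Hedged sketch: build the BigQuery lookup as a singleton side input and
// read it while processing each event. queryBigQuery/process/events are
// hypothetical placeholders.
val lookup = sc
  .parallelize(Seq(()))
  .map(_ => queryBigQuery())   // e.g. returns a Map[String, TableRow]
  .asSingletonSideInput

val processed = events
  .withSideInputs(lookup)
  .map { (event, ctx) => process(event, ctx(lookup)) }
  .toSCollection
```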
@arturroo updating compiled typed schema = recompiling code = redeploying, so u r better off with JSON-ish unsafe TableRow?
Steven Fines
@nevillelyh I couldn't see how to make that take arguments
Hi guys, please help me with a pretty simple question: how do I actually see output in the REPL console from this example https://github.com/spotify/scio/blob/master/site/src/paradox/Scio-REPL.md#local-pipeline ? I saw Iterator[String] at the end, but I can't print it; whenever I try something I get the error: java.lang.IllegalArgumentException: requirement failed: ScioContext has already been executed, use :newScio <[context-name] | sc> to create new context
I got it!
val result = yourSCollection.materialize
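For completeness, a hedged sketch of reading the materialized values back (Scio 0.9.x-era API as I understand it; lines is a placeholder SCollection[String]):

```scala
// Hedged sketch: materialize, run the context, then read the Tap back.
val tap = lines.materialize                 // ClosedTap[String]
val result = sc.run().waitUntilFinish()     // ScioResult
result.tap(tap).value.foreach(println)      // .value is an Iterator[String]
```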
Artur Fejklowicz

@arturroo updating compiled typed schema = recompiling code = redeploying, so u r better off with JSON-ish unsafe TableRow?

Yes, because I get nested JSON from PubSub and I need to store it in the same structure in BigQuery. But in the future I would like to use the schema to parse fields that do not fully comply with BigQuery types. For example, a different source might send a DATE in a strange format like "m/d/Y", which I need to parse manually. So I need the schema of the destination table (BigQueryType.fromTable.schema) to traverse it, find the paths of the DATE fields, and rewrite those TableRow fields according to my known rules into a string BigQuery can parse, like "Y-m-d".
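The date fix-up described above could be as simple as this minimal sketch with java.time (the exact patterns are assumptions based on the "m/d/Y" example):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Minimal sketch of the manual DATE fix-up described above: parse an
// assumed "m/d/Y"-style value and re-emit it as ISO "Y-m-d" for BigQuery.
def normalizeDate(raw: String): String = {
  val source = DateTimeFormatter.ofPattern("M/d/yyyy")
  LocalDate.parse(raw, source).format(DateTimeFormatter.ISO_LOCAL_DATE)
}
```

e.g. normalizeDate("7/4/2020") yields "2020-07-04".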

Artur Fejklowicz
I have another question: to save data in BigQuery I use saveAsBigQueryTable. Is it possible with this function to get the failed inserts and their errors, and save them to a different table so I could check them later?
Additionally, how can I make the pipeline skip errors instead of stopping, so I don't have to manually restart it after every single unparsed message?
OK, so I've managed to read the nested JSON string as a TableRow with Google's Jackson implementation (code from Google's Json2BigQuery template), convert that TableRow to a case class with @BigQueryType.fromTable's fromTableRow, and then save it with saveAsTypedBigQueryTable.
Now I see that Scio has a wrapper for Beam's writeResult.getFailedInsertsWithErr().
How can I use it to save those failed inserts into another table?
I'm not so fluent yet in Scio/Beam/Scala/Java.

Filipe Regadas
@arturroo to use it you need to use one of the underlying BigQueryIO* and use WriteParam to set the function that takes an SCollection[Error] https://github.com/spotify/scio/blob/4ca965e0c1d82f8aafb2628eaa92edb5b5f64337/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala#L230
i.e. write(BigQueryTable(table))(WriteParam(...))
Deepak Telkar

Hello everyone,

I'm seeing an exception while trying to run scio WordCount example by creating a project using:

sbt new spotify/scio-template.g8

When I try to run the WordCount example that was created by the project, I see the following error:

[error] (run-main-8) java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
[error] java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
[error]     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
[error]     at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
[error]     at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
[error]     at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
[error]     at com.sun.proxy.$Proxy482.getGcsUtil(Unknown Source)
[error]     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchNonGlobs(GcsFileSystem.java:241)
[error]     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:98)
[error]     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:120)
[error]     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:141)
[error]     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:153)
[error]     at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:223)
[error]     at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:212)
[error]     at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:89)
[error]     at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:76)
[error]     at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:155)
[error]     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
[error]     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
[error]     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
[error]     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
[error]     at com.spotify.scio.ScioContext.execute(ScioContext.scala:592)
[error]     at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:580)
[error]     at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:568)
[error]     at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:657)
[error]     at com.spotify.scio.ScioContext.run(ScioContext.scala:568)
[error]     at com.myproj.scio.explore.WordCountWithMetrics$.main(WordCountWithMetrics.scala:27)
[error]     at com.myproj.scio.explore.WordCountWithMetrics.main(WordCountWithMetrics.scala)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]     at java.lang.reflect.Method.invoke(Method.java:498)

Here's the sbt command I used to run this example:

sbt "runMain com.myproj.scio.explore.WordCount --runner=DirectRunner --input=gs://apache-beam-samples/shakespeare/kinglear.txt --output=/tmp/wordCount"

Note: I'm using DirectRunner

To solve this problem, I had to add the following dependency in build.sbt

"com.google.guava" % "guava" % "28.2-jre"
My question is: am I doing the right thing to solve the problem or is there something I'm missing?
Thanks for your help!
Filipe Regadas
Hi @d66pak That's a known issue in 0.9.0 and should be fixed in the next release spotify/scio#2945. You can keep that override for now and remove it with the new version
Deepak Telkar
Hi @regadas thanks for your prompt response. Will keep an eye for the next release.
Hi, sorry if this is the wrong place for this type of question: a few of our pipelines run into OOM errors in Dataflow and don't seem to want to scale out. We have tried introducing a groupBy transform on a random key to force a shuffle, and still no dice. Are there any common pitfalls, or a reliable way to force scaling?
autoscaling is based on resources, throughput and data freshness. What do they look like? Is dataflow building up a backlog of data? Also, depending on what engine you are using, you might have to enable it explicitly with --autoscalingAlgorithm=THROUGHPUT_BASED

The inputs (Pub/Sub) are file triggers that we then load, creating an SCollection of the contents, so there isn't a large backlog so to speak. However, due to the lack of memory in the operation, the node OOMs before it can output, so it doesn't ack the messages and keeps reprocessing (leading to the rebooted worker OOMing again, etc.)

What I was hoping to do was coerce it to split the intermediate SCollections across more nodes to prevent the memory issues

worth checking what engine you are using - autoscaling is only enabled by default for streaming engine, but not all streaming jobs use streaming engine by default. If your system has never autoscaled before, it is very likely that you haven't enabled it: https://cloud.google.com/dataflow/docs/guides/specifying-exec-params
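The same options can also be set programmatically; a hedged sketch via Beam's Dataflow options classes (the values here are placeholders, not recommendations):

```scala
import com.spotify.scio.ContextAndArgs
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType

// Hedged sketch: set the autoscaling flags in code instead of on the
// command line (placeholder values).
def main(cmdlineArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  val workerOpts = sc.optionsAs[DataflowPipelineWorkerPoolOptions]
  workerOpts.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED)
  workerOpts.setMaxNumWorkers(50)
  // ... build the pipeline, then sc.run()
}
```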
Steven Fines
anyone ever run into this? java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field magnolia.CallByNeed.eval of type scala.Function0 in instance of magnolia.CallByNeed
Filipe Regadas
@sfines-clgx how are you running your pipeline? from Intellij? Also, is your project depending on magnolia? (make sure to have only one version)
Steven Fines
@regadas yes from intellij and I don't use magnolia at all
Filipe Regadas
@sfines-clgx try launching the job from SBT for now... I think you are affected by an ongoing issue spotify/scio#2778
Steven Fines
suck ;)
lots of command line options
Steven Fines
Another question: what is Scio doing that continually generates methods that are too large for the JDK?
For example, this code snippet generates a "Method too large" compile error:
 def joinRecords(subject: SCollection[(String, SubjectKeying)], candidateComps: SCollection[(String, List[CandidateCompKeying])]): SCollection[(String, (SubjectKeying, CandidateCompKeying, Double, SubjectProperty, SubjectProperty))] = {
    subject.join(candidateComps).flatMapValues { r =>
      val (subject, candidateList) = r
      candidateList.map { comp =>
        val distance = Distance.haversine(
          Distance.makePoint(subject.latitude, subject.longitude),
          Distance.makePoint(comp.latitude, comp.longitude))
        (subject, comp, distance, subject.property, comp.property)
      }
    }
  }
Filipe Regadas
@sfines-clgx interesting, I'll take a look
In other news, we just released v0.9.1 https://github.com/spotify/scio/releases/tag/v0.9.1, mostly improvements and bug fixes. Let us know what you think!
I know that some of the contributors are in this channel, thank you for all of your contributions! :heart:
Steven Fines
@regadas I've tried to make a test case to put in spotify/scio#2767, which was closed because the reporter gave up,
but I can't reliably reproduce it. It seems to be related to the size of the case classes... it only happens when I use a case class which is very large and deeply nested (the case class in this case has roughly 50 fields and a nested class with the same or more)
Another question: what is the best way to join three tables? For example, I have a mapping table that will limit the expansion of a cross join, and I want to join it to two other tables. Is it just .join() and then .join()?
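A hedged sketch of the chained-join approach (all type and field names hypothetical): join the mapping table first so its small keyspace limits the expansion, re-key, then join the second table.

```scala
// Hedged sketch (hypothetical types/fields): mapping: SCollection[(KeyA, Mapping)],
// tableA: SCollection[(KeyA, A)], tableB: SCollection[(KeyB, B)].
val step1: SCollection[(KeyB, (Mapping, A))] =
  mapping
    .join(tableA)                                  // (KeyA, (Mapping, A))
    .map { case (_, (m, a)) => (m.keyB, (m, a)) }  // re-key for the next join

val joined: SCollection[(KeyB, ((Mapping, A), B))] =
  step1.join(tableB)
```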
Neville Li
Hi all,
As Scio maintainers, lately we feel the usability & reliability of Gitter leaves a lot to be desired. We're considering switching to our Spotify FOSS Slack, based on the hypothesis that many here already use it for work, and that it's easier to manage than another app. Please fill in a one-question yes/no survey and respond by Jun 30 Tue 5PM ET. There's also an "other" option if you have a strong preference for a different chatroom service. Thanks.
You can get an invite at https://slackin.spotify.com/ and find us in #scio

Hi all; after updating to newer Scio versions we seem to be running into some dependency issues causing runtime failures. Just wondering if anyone else is hitting these, or if anyone has a good strategy for shading or otherwise side-stepping the conflicts.

example trace looks a bit like this:

PipelineError(ExceptionEncountered(Exception: java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
    <- org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
    <- org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
    <- org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
    <- org.apache.beam.sdk.Pipeline.create(Pipeline.java:149)
    <- com.spotify.scio.ScioContext.pipeline(ScioContext.scala:515)
    <- com.spotify.scio.coders.CoderMaterializer$.beam(CoderMaterializer.scala:35)
    <- com.spotify.scio.io.FallbackPubsubIOWithoutAttributes.read(PubsubIO.scala:277)
    <- com.spotify.scio.io.FallbackPubsubIOWithoutAttributes.read(PubsubIO.scala:271)
    <- com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:77)
    <- com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:74)
    <- com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:674)
    <- com.spotify.scio.io.ScioIO.readWithContext(ScioIO.scala:74)
    <- com.spotify.scio.io.ScioIO.readWithContext$(ScioIO.scala:70)
    <- com.spotify.scio.io.FallbackPubsubIOWithoutAttributes.readWithContext(PubsubIO.scala:271)
    <- com.spotify.scio.ScioContext.read(ScioContext.scala:811)
    <- com.spotify.scio.ScioContext.pubsubIn(ScioContext.scala:719)
    <- com.spotify.scio.ScioContext.pubsubSubscription(ScioContext.scala:731)
    <- com.my.package.df.somejob.job.PipelineClass$.$anonfun$pipeline$5(PipelineClass.scala:51)
    <- zio.internal.FiberContext.evaluateNow(FiberContext.scala:458)
    <- zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:687)
    <- java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    <- java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    <- java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    <- sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    <- sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    <- sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    <- java.lang.reflect.Method.invoke(Method.java:498)
    <- org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
    ... 22 more
Caused by: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
    <- com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
    <- org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    <- org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    <- org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
    <- com.sun.proxy.$Proxy24.getGcsUtil(Unknown Source)
    <- org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:83)
    <- org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.validateOutputFilePrefixSupported(GcsPathValidator.java:53)
    <- org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:256)
    ... 27 more))
Steven Fines
is there a more efficient way to handle finding the top by key than using topByKey? I need to process several hundred billion records, and topByKey has a nested groupByKey inside it, which is quite slow... should I just write my own that uses aggregateByKey and a bounded priority queue or some such?
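The aggregate-based alternative could be sketched like this (a minimal, hedged example, not Scio's actual topByKey implementation): keep a bounded min-heap per key so only n elements are ever held, and merge heaps across workers.

```scala
import scala.collection.mutable

// Minimal sketch of a bounded top-n accumulator. With Scio this could be
// plugged into aggregateByKey(zero)(seqOp, combOp) so the full value list
// per key is never materialized. (Serialization/coders are ignored here.)
final class BoundedTop[T](n: Int)(implicit ord: Ordering[T]) {
  // Min-heap under `ord`: reversing the ordering makes dequeue() remove
  // the current smallest element.
  private val heap = mutable.PriorityQueue.empty[T](ord.reverse)

  def add(x: T): BoundedTop[T] = {
    heap += x
    if (heap.size > n) heap.dequeue() // evict the smallest, keep top n
    this
  }

  def merge(that: BoundedTop[T]): BoundedTop[T] = {
    that.heap.foreach(add)
    this
  }

  def result: List[T] = heap.toList.sorted(ord.reverse) // descending
}
```

In a real pipeline the accumulator would also need a Beam-friendly coder, which this sketch leaves out.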