Artur Fejklowicz
@arturroo

@arturroo updating a compiled typed schema = recompiling code = redeploying, so you are better off with the JSON-ish unsafe TableRow?

Yes, because I get nested JSON from Pub/Sub and I need to store it in the same structure in BigQuery. But in the future I would like to use a schema, so that I can parse fields that do not fully comply with BigQuery types. For example, a different source might send a DATE in a strange format like "m/d/Y"; to handle that I need the schema of the destination table (BigQueryType.fromTable.schema) so I can traverse it, find the paths of the DATE fields, and rewrite the corresponding TableRow fields according to my known rules into a string BigQuery can parse, like "Y-m-d".
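
(For concreteness, a tiny sketch of the kind of per-field rewrite I mean, for a single known field; the field name and the incoming date pattern are hypothetical.)

import com.google.api.services.bigquery.model.TableRow
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Rewrite one DATE-typed field from a hypothetical "M/d/yyyy" input format
// into BigQuery's expected "yyyy-MM-dd".
val inFmt  = DateTimeFormatter.ofPattern("M/d/yyyy")
val outFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

def fixDate(row: TableRow): TableRow = {
  Option(row.get("event_date")).map(_.toString).foreach { raw =>
    row.set("event_date", LocalDate.parse(raw, inFmt).format(outFmt))
  }
  row
}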

OK, so I've managed to read the nested JSON string as a TableRow with Google's Jackson implementation (code from Google's Json2BigQuery template), then convert that TableRow to a case class with the fromTableRow from @BigQueryType.fromTable, and then save it with saveAsTypedBigQueryTable.
Now I see that Scio has a wrapper for Beam's writeResult.getFailedInsertsWithErr():
https://github.com/spotify/scio/blob/b278c289800783469275f17ee751cd3b3f101194/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala#L61
How can I use it to save those failed inserts into another table?
I'm not so fluent yet in Scio/Beam/Scala/Java.

Filipe Regadas
@regadas
@arturroo to use it you need to go through one of the underlying BigQueryIO* types and use WriteParam to set the function that takes an SCollection[Error]: https://github.com/spotify/scio/blob/4ca965e0c1d82f8aafb2628eaa92edb5b5f64337/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala#L230
i.e. write(BigQueryTable(table))(WriteParam(...))
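(A rough sketch of what that could look like, following the line above; the failed-inserts callback field name varies between Scio versions and is illustrative here, as are the table specs and the destination for rejected rows.)

import com.spotify.scio.bigquery._
import com.spotify.scio.values.SCollection

// rows: SCollection[TableRow] produced earlier in the pipeline.
def writeWithFailedInserts(rows: SCollection[TableRow],
                           tableSpec: String,
                           failedTableSpec: String): Unit =
  rows.write(BigQueryTable(tableSpec))(
    WriteParam(
      // Illustrative parameter name: it receives the failed inserts as an
      // SCollection, which we simply redirect to another table.
      // (Depending on the Scio version you may need Table.Spec here.)
      insertErrorTransform = (failed: SCollection[TableRow]) =>
        failed.saveAsBigQueryTable(failedTableSpec)
    )
  )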
Deepak Telkar
@d66pak

Hello everyone,

I'm seeing an exception while trying to run the Scio WordCount example from a project created with:

sbt new spotify/scio-template.g8

When I try to run the WordCount example generated by the template, I see the following error:

[error] (run-main-8) java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
[error] java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
[error]     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
[error]     at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
[error]     at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
[error]     at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
[error]     at com.sun.proxy.$Proxy482.getGcsUtil(Unknown Source)
[error]     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchNonGlobs(GcsFileSystem.java:241)
[error]     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:98)
[error]     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:120)
[error]     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:141)
[error]     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:153)
[error]     at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:223)
[error]     at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:212)
[error]     at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:89)
[error]     at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:76)
[error]     at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:155)
[error]     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
[error]     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
[error]     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
[error]     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
[error]     at com.spotify.scio.ScioContext.execute(ScioContext.scala:592)
[error]     at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:580)
[error]     at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:568)
[error]     at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:657)
[error]     at com.spotify.scio.ScioContext.run(ScioContext.scala:568)
[error]     at com.myproj.scio.explore.WordCountWithMetrics$.main(WordCountWithMetrics.scala:27)
[error]     at com.myproj.scio.explore.WordCountWithMetrics.main(WordCountWithMetrics.scala)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]     at java.lang.reflect.Method.invoke(Method.java:498)

Here's the sbt command I used to run this example:

sbt "runMain com.myproj.scio.explore.WordCount --runner=DirectRunner --input=gs://apache-beam-samples/shakespeare/kinglear.txt --output=/tmp/wordCount"

Note: I'm using DirectRunner

To solve this problem, I had to add the following dependency to build.sbt:

"com.google.guava" % "guava" % "28.2-jre"
My question is: am I doing the right thing to solve the problem or is there something I'm missing?
Thanks for your help!
Filipe Regadas
@regadas
Hi @d66pak, that's a known issue in 0.9.0 and should be fixed in the next release: spotify/scio#2945. You can keep that override for now and remove it once the new version is out.
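For reference, one way to express that pin in build.sbt until then (a sketch; the exact Guava version needed may differ by Beam release):

// Force a Guava new enough to have CacheBuilder.expireAfterWrite(java.time.Duration);
// remove this override once the fixed Scio release is out.
dependencyOverrides += "com.google.guava" % "guava" % "28.2-jre"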
Deepak Telkar
@d66pak
Hi @regadas, thanks for your prompt response. Will keep an eye out for the next release.
championj-foxtel
@championj-foxtel
Hi, sorry if this is the wrong place for this type of question: a few of our pipelines run into OOM errors in Dataflow and don't seem to want to scale out. We have tried introducing a groupBy transform on a random key to force a shuffle, and still no dice. Are there any common pitfalls, or a reliable way to force scaling?
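(Roughly what we tried, for reference; the input name contents and the key space of 100 are illustrative.)

// Attach a random key, group, then flatten again, hoping to force a
// shuffle that breaks fusion and spreads the expensive step across workers.
val reshuffled = contents
  .keyBy(_ => scala.util.Random.nextInt(100))
  .groupByKey
  .flatMap { case (_, vs) => vs }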
jmendesky
@jmendesky
Autoscaling is based on resources, throughput and data freshness. What do those look like? Is Dataflow building up a backlog of data? Also, depending on which engine you are using, you might have to enable it explicitly with --autoscalingAlgorithm=THROUGHPUT_BASED
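For example (the job class, project, region and worker cap below are illustrative):

sbt "runMain com.example.MyJob --runner=DataflowRunner --project=my-project --region=us-central1 --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=20"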
championj-foxtel
@championj-foxtel

The input (Pub/Sub) is file triggers that we then load, creating an SCollection of the contents, so there isn't a large backlog so to speak. However, due to the lack of memory in the operation, the node OOMs before it can output, so it doesn't ack the messages and keeps reprocessing them (leading to the rebooted worker OOMing again, etc.)

What I was hoping to do was coerce it to split the intermediate SCollections across more nodes to prevent the memory issues.

jmendesky
@jmendesky
Worth checking which engine you are using: autoscaling is only enabled by default for Streaming Engine, but not all streaming jobs use Streaming Engine by default. If your job has never autoscaled before, it is very likely that you haven't enabled it: https://cloud.google.com/dataflow/docs/guides/specifying-exec-params
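(If Streaming Engine turns out to be off, enabling it together with autoscaling looks roughly like this; the worker cap is illustrative.)

--enableStreamingEngine --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=20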
Steven Fines
@sfines-clgx
anyone ever run into this? java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field magnolia.CallByNeed.eval of type scala.Function0 in instance of magnolia.CallByNeed
Filipe Regadas
@regadas
@sfines-clgx how are you running your pipeline? From IntelliJ? Also, does your project depend on magnolia? (Make sure to have only one version.)
Steven Fines
@sfines-clgx
@regadas yes, from IntelliJ, and I don't use magnolia at all
Filipe Regadas
@regadas
@sfines-clgx try launching the job from SBT for now... I think you are affected by an ongoing issue spotify/scio#2778
Steven Fines
@sfines-clgx
that'll suck ;)
lots of command line options
Steven Fines
@sfines-clgx
Another question: what is Scio doing that keeps generating methods that are too large for the JVM's method size limit?
For example, this code snippet generates a "Method too large" compile error:
def joinRecords(
  subject: SCollection[(String, SubjectKeying)],
  candidateComps: SCollection[(String, List[CandidateCompKeying])]
): SCollection[(String, (SubjectKeying, CandidateCompKeying, Double, SubjectProperty, SubjectProperty))] =
  subject.join(candidateComps).flatMapValues { case (subject, candidateList) =>
    candidateList.map { comp =>
      val dist = Distance.haversine(
        Distance.makePoint(subject.latitude, subject.longitude),
        Distance.makePoint(comp.latitude, comp.longitude)
      )
      (subject, comp, dist, subject.property, comp.property)
    }
  }
Filipe Regadas
@regadas
@sfines-clgx interesting, I'll take a look
In other news, we just released v0.9.1: https://github.com/spotify/scio/releases/tag/v0.9.1. Mostly improvements and bug fixes; let us know what you think!
I know that some of the contributors are in this channel, thank you for all of your contributions! :heart:
Steven Fines
@sfines-clgx
@regadas I've tried to make a test case to put in spotify/scio#2767, which is closed because the reporter gave up,
but I can't reliably reproduce it. It seems to be related to the size of the case classes: it only seems to happen when I use a case class which is very large and deeply nested (the case class in this case has roughly 50 fields plus a nested class with the same or more).
Another question: what is the best way to join three tables? For example, I have a mapping table that limits the expansion of a cross join, and I want to join it to two other tables. Is it just .join() and then .join() again?
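(For illustration, something like this chained join is what I mean; the names are made up.)

// Keyed inputs: the mapping table limits the cross-join expansion, then the
// result is joined with the third table on the same key.
val mapping: SCollection[(String, MapRow)]   = ???
val left:    SCollection[(String, LeftRow)]  = ???
val right:   SCollection[(String, RightRow)] = ???

val joined: SCollection[(String, ((MapRow, LeftRow), RightRow))] =
  mapping.join(left).join(right)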
Neville Li
@nevillelyh
Hi all,
As Scio maintainers, we feel lately that the usability and reliability of Gitter leave a lot to be desired. We're considering switching to our Spotify FOSS Slack, based on the hypothesis that many here already use it for work and that it's easier to manage than yet another app. Please fill in the one-question yes/no survey below and respond by Tue Jun 30, 5 PM ET. There's also an "other" option if you have a strong preference for a different chat service. Thanks.
https://www.surveymonkey.com/r/MT6LQPG
You can get an invite at https://slackin.spotify.com/ and find us in #scio
championj-foxtel
@championj-foxtel

Hi all, after updating to newer Scio versions we seem to be running into some dependency issues causing runtime failures. Just wondering if anyone else is hitting these, or if anyone has a good strategy for shading or otherwise side-stepping the conflicts.

An example trace looks a bit like this:

PipelineError(ExceptionEncountered(Exception: java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
    <- org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
    <- org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
    <- org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
    <- org.apache.beam.sdk.Pipeline.create(Pipeline.java:149)
    <- com.spotify.scio.ScioContext.pipeline(ScioContext.scala:515)
    <- com.spotify.scio.coders.CoderMaterializer$.beam(CoderMaterializer.scala:35)
    <- com.spotify.scio.io.FallbackPubsubIOWithoutAttributes.read(PubsubIO.scala:277)
    <- com.spotify.scio.io.FallbackPubsubIOWithoutAttributes.read(PubsubIO.scala:271)
    <- com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:77)
    <- com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:74)
    <- com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:674)
    <- com.spotify.scio.io.ScioIO.readWithContext(ScioIO.scala:74)
    <- com.spotify.scio.io.ScioIO.readWithContext$(ScioIO.scala:70)
    <- com.spotify.scio.io.FallbackPubsubIOWithoutAttributes.readWithContext(PubsubIO.scala:271)
    <- com.spotify.scio.ScioContext.read(ScioContext.scala:811)
    <- com.spotify.scio.ScioContext.pubsubIn(ScioContext.scala:719)
    <- com.spotify.scio.ScioContext.pubsubSubscription(ScioContext.scala:731)
    <- com.my.package.df.somejob.job.PipelineClass$.$anonfun$pipeline$5(PipelineClass.scala:51)
    <- zio.internal.FiberContext.evaluateNow(FiberContext.scala:458)
    <- zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:687)
    <- java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    <- java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    <- java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    <- sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    <- sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    <- sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    <- java.lang.reflect.Method.invoke(Method.java:498)
    <- org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
    ... 22 more
Caused by: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
    <- com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
    <- org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    <- org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    <- org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
    <- com.sun.proxy.$Proxy24.getGcsUtil(Unknown Source)
    <- org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:83)
    <- org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.validateOutputFilePrefixSupported(GcsPathValidator.java:53)
    <- org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:256)
    ... 27 more))
Steven Fines
@sfines-clgx
Is there a more efficient way to find the top N by key than using topByKey? I need to process several hundred billion records, and topByKey has a groupByKey nested inside it, which is quite slow... Should I just write my own that uses aggregateByKey and a bounded priority queue or some such?
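(For concreteness, the hand-rolled version I have in mind is roughly this, with aggregateByKey keeping only the n largest values per key in the accumulator; Record and score are made-up names, and a bounded priority queue would beat the re-sort shown here.)

// records: SCollection[(String, Record)]; keep only the n best per key so the
// accumulator never grows unbounded the way a full groupByKey would.
val n = 200
val top: SCollection[(String, List[Record])] =
  records.aggregateByKey(List.empty[Record])(
    (acc, r) => (r :: acc).sortBy(-_.score).take(n), // fold one record in
    (a, b)   => (a ++ b).sortBy(-_.score).take(n)    // merge partial tops
  )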
craneharry
@craneharry
Hi, was just wondering if anyone has used the GCP emulators for integration testing with Scio? I'm attempting to write to a dockerized Firestore using .saveAsDatastore, but it seems like there is no way of telling the DatastoreIO to use a local instance instead of hitting my GCP credentials? It looks like I can use the Scio test framework to mock out the inputs and outputs, but I was hoping to do something more like an integration test.
Steven Fines
@sfines-clgx
@nevillelyh here is a screenshot: Screen Shot 2020-06-10 at 9.53.01 AM.png
Neville Li
@nevillelyh
scio version?
Steven Fines
@sfines-clgx
latest
Neville Li
@nevillelyh
These (GroupByKey, Combine.GroupValues) are vanilla transform names and should be replaced by the runner with optimized versions. Does the estimated size not reflect those?
Neville Li
@nevillelyh
What's the n and the average record size of your topByKey?
Steven Fines
@sfines-clgx
200
each record is very roughly 1kb
it does not
Neville Li
@nevillelyh
interesting, it shouldn't happen, and has always worked for me
lemme verify
Neville Li
@nevillelyh
s.png
Steven Fines
@sfines-clgx
not surprised - it works but is just slow when you start trying to use it on 131B records
there are ~2M keys, so I'm sort of thinking it doesn't scale the way I would like it to
Neville Li
@nevillelyh
seems to work for me: 1B synthetic records with 1M unique keys, topByKey(200), output is ~1M records, and the output size of ~5% seems to add up
yeah, at least the Combine.perKey optimization works as expected
it's the 131B size that's causing the problem
Steven Fines
@sfines-clgx
you're tellin me ;)
Neville Li
@nevillelyh
what's the exact failure? GC or workers not responding?