championj-foxtel
@championj-foxtel
Hi, sorry if this is the wrong place for this type of Q; a few of our pipelines run into OOM errors in Dataflow and don't seem to want to scale out. We have tried introducing a groupBy transform on a random key to force a shuffle, and still no dice - are there any common pitfalls, or a reliable way to force scaling?
jmendesky
@jmendesky
autoscaling is based on resources, throughput and data freshness. What do those look like? Is Dataflow building up a backlog of data? Also, depending on what engine you are using, you might have to enable it explicitly with --autoscalingAlgorithm=THROUGHPUT_BASED
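A minimal sketch of setting this programmatically on the Dataflow worker pool options, assuming the Dataflow runner (MyJob is an illustrative name; the command-line flag form is equivalent):

    import com.spotify.scio._
    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions
    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType

    object MyJob {
      def main(cmdlineArgs: Array[String]): Unit = {
        val (sc, _) = ContextAndArgs(cmdlineArgs)
        val workerOpts = sc.optionsAs[DataflowPipelineWorkerPoolOptions]
        // Equivalent to --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=20
        workerOpts.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED)
        workerOpts.setMaxNumWorkers(20) // autoscaling needs a ceiling; 20 is arbitrary
        // ... build the pipeline on sc here ...
        sc.run()
      }
    }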
championj-foxtel
@championj-foxtel

The input (PubSub) is file triggers that we then load, creating SCollections of the contents, so there isn't a large backlog so to speak. However, due to the lack of memory in the operation, the node OOMs before it can output, so it doesn't ack the messages and continues to reprocess them (leading to rebooted workers OOMing, and so on).

What I was hoping to do was coerce it to split the intermediate SCollections across more nodes to prevent the memory issues
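A minimal sketch of one way to force that, assuming Beam's Reshuffle via Scio's applyTransform (breakFusion is an illustrative name, not Scio API):

    import com.spotify.scio.coders.Coder
    import com.spotify.scio.values.SCollection
    import org.apache.beam.sdk.transforms.Reshuffle

    // Reshuffle inserts a materialization point, breaking fusion so Dataflow
    // can redistribute the downstream stages across more workers.
    def breakFusion[T: Coder](in: SCollection[T]): SCollection[T] =
      in.applyTransform(Reshuffle.viaRandomKey[T]())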

jmendesky
@jmendesky
worth checking what engine you are using - autoscaling is only enabled by default for Streaming Engine, but not all streaming jobs use Streaming Engine by default. If your system has never autoscaled before, it is very likely that you haven't enabled it: https://cloud.google.com/dataflow/docs/guides/specifying-exec-params
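For reference, a hedged sketch of a launch command with those flags (the job and project names are placeholders):

    sbt "runMain com.example.MyJob --runner=DataflowRunner --project=my-gcp-project \
      --region=us-central1 --enableStreamingEngine \
      --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=20"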
Steven Fines
@sfines-clgx
anyone ever run into this? java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field magnolia.CallByNeed.eval of type scala.Function0 in instance of magnolia.CallByNeed
Filipe Regadas
@regadas
@sfines-clgx how are you running your pipeline? From IntelliJ? Also, does your project depend on magnolia? (make sure to have only one version)
Steven Fines
@sfines-clgx
@regadas yes, from IntelliJ, and I don't use magnolia at all
Filipe Regadas
@regadas
@sfines-clgx try launching the job from SBT for now... I think you are affected by an ongoing issue spotify/scio#2778
Steven Fines
@sfines-clgx
that'll suck ;)
lots of command line options
Steven Fines
@sfines-clgx
Another question: what is Scio doing that continually generates methods that are too large for the JDK?
For example, this code snippet produces a "Method too large" compile error:
    def joinRecords(
      subject: SCollection[(String, SubjectKeying)],
      candidateComps: SCollection[(String, List[CandidateCompKeying])]
    ): SCollection[(String, (SubjectKeying, CandidateCompKeying, Double, SubjectProperty, SubjectProperty))] =
      subject.join(candidateComps).flatMapValues { case (subject, candidateList) =>
        candidateList.map { comp =>
          val distance = Distance.haversine(
            Distance.makePoint(subject.latitude, subject.longitude),
            Distance.makePoint(comp.latitude, comp.longitude)
          )
          (subject, comp, distance, subject.property, comp.property)
        }
      }
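If the oversized method comes from Scio's implicit Coder derivation for the large nested case classes rather than from the join itself, one hedged workaround is to supply an explicit fallback coder so the derived one is never generated:

    import com.spotify.scio.coders.Coder

    // A sketch, assuming the derived Coder for the large nested case class is
    // what exceeds the JVM's 64KB method limit; trades the derived coder's
    // efficiency for Kryo serialization.
    implicit val subjectKeyingCoder: Coder[SubjectKeying] = Coder.kryo[SubjectKeying]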
Filipe Regadas
@regadas
@sfines-clgx interesting, I'll take a look
In other news, we just released v0.9.1 https://github.com/spotify/scio/releases/tag/v0.9.1 - mostly improvements and bug fixes. Let us know what you think!
I know that some of the contributors are in this channel, thank you for all of your contributions! :heart:
Steven Fines
@sfines-clgx
@regadas I've tried to make a test case to put in spotify/scio#2767, which was closed because the reporter gave up
but I can't reliably reproduce it. It seems to be related to the size of the case classes - it only seems to happen when I use a case class that is very large and deeply nested (the case class in this case has roughly 50 fields, and a nested class with the same or more)
Another question: what is the best way to join three tables? For example, I have a mapping table that will limit the expansion of a cross join, and I want to join it to two other tables - is it just to .join() and then .join()?
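A minimal sketch of that chained approach, with illustrative type names; starting from the mapping table keeps the intermediate result small:

    import com.spotify.scio.coders.Coder
    import com.spotify.scio.values.SCollection

    // Assumes all three tables are keyed by the same K; threeWayJoin and the
    // type parameters are illustrative names.
    def threeWayJoin[K: Coder, M: Coder, A: Coder, B: Coder](
      mapping: SCollection[(K, M)],
      left: SCollection[(K, A)],
      right: SCollection[(K, B)]
    ): SCollection[(K, (M, A, B))] =
      mapping
        .join(left)  // (K, (M, A))
        .join(right) // (K, ((M, A), B))
        .mapValues { case ((m, a), b) => (m, a, b) }

If the mapping table is small enough to fit in worker memory, Scio's hashJoin can avoid one of the two shuffles entirely.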
Neville Li
@nevillelyh
Hi all,
As Scio maintainers, lately we feel like the usability & reliability of Gitter leaves a lot to be desired. We're considering switching to our Spotify FOSS Slack, based on the hypothesis that many here already use it for work, and that it's easier to manage than another app. Please fill in a one-question yes/no survey and respond by Tue Jun 30, 5PM ET. There's also an "other" option if you have a strong preference for a different chatroom service. Thanks.
https://www.surveymonkey.com/r/MT6LQPG
You can get an invite at https://slackin.spotify.com/ and find us in #scio
championj-foxtel
@championj-foxtel

Hi all; after updating to newer Scio versions we seem to be running into some dependency issues causing runtime failures - just wondering if anyone else is hitting these, or if anyone has a good strategy for shading or otherwise side-stepping the conflicts.

An example trace looks a bit like this:

PipelineError(ExceptionEncountered(Exception: java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
    <- org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
    <- org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
    <- org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
    <- org.apache.beam.sdk.Pipeline.create(Pipeline.java:149)
    <- com.spotify.scio.ScioContext.pipeline(ScioContext.scala:515)
    <- com.spotify.scio.coders.CoderMaterializer$.beam(CoderMaterializer.scala:35)
    <- com.spotify.scio.io.FallbackPubsubIOWithoutAttributes.read(PubsubIO.scala:277)
    <- com.spotify.scio.io.FallbackPubsubIOWithoutAttributes.read(PubsubIO.scala:271)
    <- com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:77)
    <- com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:74)
    <- com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:674)
    <- com.spotify.scio.io.ScioIO.readWithContext(ScioIO.scala:74)
    <- com.spotify.scio.io.ScioIO.readWithContext$(ScioIO.scala:70)
    <- com.spotify.scio.io.FallbackPubsubIOWithoutAttributes.readWithContext(PubsubIO.scala:271)
    <- com.spotify.scio.ScioContext.read(ScioContext.scala:811)
    <- com.spotify.scio.ScioContext.pubsubIn(ScioContext.scala:719)
    <- com.spotify.scio.ScioContext.pubsubSubscription(ScioContext.scala:731)
    <- com.my.package.df.somejob.job.PipelineClass$.$anonfun$pipeline$5(PipelineClass.scala:51)
    <- zio.internal.FiberContext.evaluateNow(FiberContext.scala:458)
    <- zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:687)
    <- java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    <- java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    <- java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    <- sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    <- sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    <- sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    <- java.lang.reflect.Method.invoke(Method.java:498)
    <- org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
    ... 22 more
Caused by: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
    <- com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
    <- org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
    <- org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    <- org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    <- org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
    <- com.sun.proxy.$Proxy24.getGcsUtil(Unknown Source)
    <- org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:83)
    <- org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.validateOutputFilePrefixSupported(GcsPathValidator.java:53)
    <- org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:256)
    ... 27 more))
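That NoSuchMethodError is the classic symptom of an old Guava winning dependency resolution: CacheBuilder.expireAfterWrite(java.time.Duration) only exists in Guava 25.0 and later. A hedged sbt sketch of pinning it (the exact version is an assumption; check what the gcs-connector on your classpath requires):

    // build.sbt - 28.2-jre is an assumed version, align it with your Beam/GCS deps
    dependencyOverrides += "com.google.guava" % "guava" % "28.2-jre"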
Steven Fines
@sfines-clgx
Is there a more efficient way to handle finding the top N by key than using topByKey? I need to process several hundred billion records, and topByKey has a nested groupByKey inside it, which is quite slow... should I just write my own that uses aggregateByKey and a bounded priority queue or some such?
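A minimal sketch of that aggregateByKey approach, with a sorted buffer standing in for a real bounded priority queue (topNByKey is an illustrative name, not Scio API):

    import com.spotify.scio.coders.Coder
    import com.spotify.scio.values.SCollection

    // Keeps at most n elements per key, combining partial results without ever
    // materializing the full group; a real implementation would use a bounded
    // heap instead of re-sorting at every step.
    def topNByKey[K: Coder, V: Coder: Ordering](
      in: SCollection[(K, V)],
      n: Int
    ): SCollection[(K, Vector[V])] =
      in.aggregateByKey(Vector.empty[V])(
        (acc, v) => (acc :+ v).sorted.takeRight(n), // keep the n largest
        (a, b) => (a ++ b).sorted.takeRight(n)
      )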
craneharry
@craneharry
Hi, was just wondering if anyone has used the GCP emulators for integration testing with Scio? I'm attempting to write to a Dockerized Firestore using .saveAsDatastore, but there seems to be no way of telling DatastoreIO to use a local instance instead of hitting my real GCP project. It looks like I can use the Scio test framework to mock out the inputs and outputs, but I was hoping to do something more like an integration test.
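One hedged avenue: Beam's DatastoreIO exposes withLocalhost for pointing at an emulator, so bypassing .saveAsDatastore with a custom output may work (the host/port and names below are assumptions):

    import com.google.datastore.v1.Entity
    import com.spotify.scio.values.SCollection
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO

    // Writes entities to a local Datastore/Firestore emulator instead of GCP;
    // "localhost:8081" must match the emulator's exposed port.
    def saveToEmulator(entities: SCollection[Entity], projectId: String) =
      entities.saveAsCustomOutput(
        "WriteToDatastoreEmulator",
        DatastoreIO.v1().write().withProjectId(projectId).withLocalhost("localhost:8081")
      )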
Steven Fines
@sfines-clgx
@nevillelyh here is a screenshot: Screen Shot 2020-06-10 at 9.53.01 AM.png
Neville Li
@nevillelyh
Scio version?
Steven Fines
@sfines-clgx
latest
Neville Li
@nevillelyh
these (GroupByKey, Combine.GroupValues) are vanilla transform names and should be replaced by the runner with optimized versions; does the estimated size not reflect those?
Neville Li
@nevillelyh
what's the n & avg record size of your topByKey?
Steven Fines
@sfines-clgx
200
each record is very roughly 1kb
it does not
Neville Li
@nevillelyh
interesting, it shouldn't happen, and has always worked for me
lemme verify
Neville Li
@nevillelyh
s.png
Steven Fines
@sfines-clgx
not surprised - it works, it's just slow when you start trying to use it on 131B records
there are ~2M keys, so I'm sort of thinking it doesn't scale like I would like it to
Neville Li
@nevillelyh
seems to work for me: 1B synthetic records with 1M unique keys, topByKey(200), output is ~1M records, and the ~5% output size seems to add up
yeah, at least the Combine.perKey optimization works as expected
it's the 131B size that's causing the problem
Steven Fines
@sfines-clgx
you're tellin me ;)
Neville Li
@nevillelyh
what's the exact failure? GC, or workers not responding?
Steven Fines
@sfines-clgx
it doesn't fail, it's just much slower than using aggregateByKey, sort, and take
Neville Li
@nevillelyh
something to do with the Beam Top.perKey impl then
Steven Fines
@sfines-clgx
probably. My thought was that Top.perKey was what used groupByKey
so I will go with my implementation
Neville Li
@nevillelyh
no, the combiner optimization seems to work as expected, at least for me, and under the hood it's using TopCombineFn backed by a BoundedHeap, which should also be the most reasonable way
Steven Fines
@sfines-clgx
hrm.
BoundedHeap, is it public?