Neville Li
@nevillelyh
@sfines-clgx file an issue if you can repro with a small snippet & a public dataset?
Artur Fejklowicz
@arturroo
@nevillelyh That's right :)
What exactly is scio-sql?
Is there any documentation for scio-sql besides the source code?
Is it something like Dataflow SQL?
Is it possible to use it with streaming?
Neville Li
@nevillelyh
it's super experimental, not recommended for anything serious
Artur Fejklowicz
@arturroo
ok, thx
Steven Fines
@sfines-clgx
@nevillelyh I've actually worked through that part- it seems to throw that exception if it cannot find the table
Steven Fines
@sfines-clgx
Ok, so what is the best approach to handling this use case: I have a list A that is generated from a query, and I need to take the output of that list and run query B, which is built dynamically from A.
Currently what I've done is to create a Tap and then open it and iterate over it, but I don't know if that's the best way to run bigQuerySelect inside the loop
I suspect that it will fail because it will try to serialize the context
Steven Fines
@sfines-clgx
N/M I had a case of the dumb
Peng Yu
@yupbank
Can I load my fitted Keras model in Scio?
and predict?
Artur Fejklowicz
@arturroo
Hi!
What is idAttribute in sc.pubsubSubscription?
How can I filter messages in a Pub/Sub subscription by an attribute aaa=XXX in the message metadata with Scio?
Neville Li
@nevillelyh
@arturroo most IO options translate directly to beam and can be found in their API site https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withIdAttribute-java.lang.String-
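A minimal sketch of both pieces, assuming sc is an existing ScioContext and the subscription path and the aaa attribute are placeholders (pubsubSubscriptionWithAttributes exposes the message attributes if you need to filter on them):

val sub = "projects/my-project/subscriptions/my-sub"

// idAttribute maps to Beam's PubsubIO.Read#withIdAttribute and is used for deduplication
val payloads = sc.pubsubSubscription[String](sub, idAttribute = "myUniqueId")

// to filter on an attribute, read attributes alongside the payload and keep only matching payloads
val filtered = sc
  .pubsubSubscriptionWithAttributes[String](sub)
  .filter { case (_, attrs) => attrs.get("aaa").contains("XXX") }
  .keys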
Steven Fines
@sfines-clgx
question
well scenario
I have an event that arrives from Pub/Sub - it needs to read BigQuery dynamically based on that input and then process the results
If it were a static set I would use a SideInput
what is the best practice here
I can't really find an example
Artur Fejklowicz
@arturroo

@nevillelyh thx!
I have another question. I get the schema from a table:

@BigQueryType.fromTable("my-project:mydataset.my_nested_table")
class NestedFromTable

and I traverse over NestedFromTable.schema like this:

import com.google.api.services.bigquery.model.TableFieldSchema

NestedFromTable.schema.getFields.forEach(tfs => printTableFieldSchema(tfs))

def printTableFieldSchema(tfs: TableFieldSchema, parent: String = ""): Unit = {
  val fullFieldName = parent match {
    case "" => tfs.getName
    case p  => s"$p.${tfs.getName}"
  }
  logger.info(s"TableFieldSchema Name: $fullFieldName Type: ${tfs.getType}, Mode: ${tfs.getMode}")
  // RECORD fields are nested, so recurse into their sub-fields
  if (tfs.getType == "RECORD") {
    tfs.getFields.forEach(field => printTableFieldSchema(field, fullFieldName))
  }
}

It works well, but when I

  • stop the streaming DirectRunner job
  • change the schema (delete the table, add an INT64 field in the middle of schema.json, add the table again with the new schema)
  • and run the program again

then I still see the old schema. How can I make the new schema visible in Scio typed BigQuery?

Neville Li
@nevillelyh
@sfines-clgx https://spotify.github.io/scio/examples/index.html see the RefreshingSideInputExample - not quite the same but close; you can do the update query in a side input, .map(fn).asSingletonSideInput? (see the sketch below)
@arturroo updating compiled typed schema = recompiling code = redeploying, so you're better off with a JSON-ish unsafe TableRow?
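A rough sketch of the refreshing side input suggestion above, assuming sc and events already exist and runMyQuery() is a hypothetical helper that issues the dynamic BigQuery lookup:

import org.apache.beam.sdk.io.GenerateSequence
import org.joda.time.Duration

def runMyQuery(): Seq[String] = ???  // hypothetical: run the dynamic query, return its rows

// periodically re-run the query and turn the result into a singleton side input
val lookup = sc
  .customInput("refresh", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
  .withFixedWindows(Duration.standardMinutes(5))
  .map(_ => runMyQuery())
  .asSingletonSideInput

// pair each incoming event with the latest query result
events
  .withFixedWindows(Duration.standardMinutes(5))
  .withSideInputs(lookup)
  .map { (event, ctx) => (event, ctx(lookup)) }
  .toSCollection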
Steven Fines
@sfines-clgx
@nevillelyh I couldn't see how to make that take arguments
Alexander
@XLPI
Hi guys, please help me with a pretty simple question: how do I actually see the output in the REPL console from this example https://github.com/spotify/scio/blob/master/site/src/paradox/Scio-REPL.md#local-pipeline ? At the end I saw an Iterator[String], but I can't print it; whenever I try to do something I always get the error: java.lang.IllegalArgumentException: requirement failed: ScioContext has already been executed, use :newScio <[context-name] | sc> to create new context
Alexander
@XLPI
I got it!
val result = yourSCollection.materialize
result.debug()
sc.run()
Artur Fejklowicz
@arturroo

@arturroo updating compiled typed schema = recompiling code = redeploying, so you're better off with a JSON-ish unsafe TableRow?

Yes, because I get nested JSON from PubSub and I need to store it in the same structure in BigQuery. But in the future I would like to use the schema, to be able to parse fields that do not fully comply with BigQuery types. For example, from a different source I might get a DATE in a strange format like "m/d/Y", which I need to parse manually, so I need the schema from the destination table (BigQueryType.fromTable.schema) to traverse the schema, find the paths of DATE fields, and change those TableRow fields according to my known rules into a String parsable by BigQuery, like "Y-m-d".
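A small sketch of the value rewrite itself (hypothetical field name; assuming "m/d/Y" really means month/day/four-digit-year, and row is the TableRow being fixed):

import java.time.LocalDate
import java.time.format.DateTimeFormatter

val inFmt  = DateTimeFormatter.ofPattern("M/d/yyyy")
val outFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

// "m/d/Y" -> "Y-m-d", i.e. a string BigQuery can parse as a DATE
def fixDate(v: String): String = LocalDate.parse(v, inFmt).format(outFmt)

// applied to a TableRow field at a path found while traversing the schema
row.set("my_date_field", fixDate(row.get("my_date_field").toString))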

Artur Fejklowicz
@arturroo
I have another question: to save data in BigQuery I use saveAsBigQueryTable. Is it possible for this function to get the failed entries and their errors, and save them in a different table so I could check them later?
Additionally, how can I make the pipeline not stop on an error but simply skip it, so I would not need to manually restart the pipeline after every single unparsed message?
Artur Fejklowicz
@arturroo

OK, so I've managed to read the nested JSON string as a TableRow with the Google Jackson implementation (code from the Google Json2BigQuery template), then convert that TableRow to a case class with @BigQueryType.fromTable's fromTableRow, and then save it with saveAsTypedBigQueryTable.
Now I see that Scio has a wrapper for Beam's writeResult.getFailedInsertsWithErr():
https://github.com/spotify/scio/blob/b278c289800783469275f17ee751cd3b3f101194/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala#L61
How can I use it to save those failed inserts into another table?
I'm not so fluent yet in Scio/Beam/Scala/Java.

Filipe Regadas
@regadas
@arturroo to use it you need to use one of the underlying BigQueryIO* and use WriteParam to set the function that takes an SCollection[Error] https://github.com/spotify/scio/blob/4ca965e0c1d82f8aafb2628eaa92edb5b5f64337/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala#L230
i.e. write(BigQueryTable(table))(WriteParam(...))
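Not the WriteParam route @regadas points to, but a rough sketch of the same idea by dropping to the underlying Beam transform directly; rows is assumed to be an SCollection[TableRow], the table names and the error-row layout are placeholders, and the error table is assumed to already exist:

import com.spotify.scio.bigquery._
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, InsertRetryPolicy}
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}

// apply the Beam write on the underlying PCollection so we get the WriteResult back
val writeResult = rows.internal.apply(
  "WriteToBQ",
  BigQueryIO
    .writeTableRows()
    .to("my-project:mydataset.my_table")
    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
    .withExtendedErrorInfo()
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
)

// wrap the failed inserts (row + error) back into an SCollection and store them elsewhere
sc.wrap(writeResult.getFailedInsertsWithErr)
  .map(e => new TableRow().set("row", e.getRow.toString).set("error", e.getError.toString))
  .saveAsBigQueryTable(Table.Spec("my-project:mydataset.my_table_errors"))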
Deepak Telkar
@d66pak

Hello everyone,

I'm seeing an exception while trying to run the Scio WordCount example, in a project created using:

sbt new spotify/scio-template.g8

When I try to run the WordCount example generated by the template, I see the following error:

[error] (run-main-8) java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
[error] java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
[error]     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
[error]     at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
[error]     at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
[error]     at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
[error]     at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
[error]     at com.sun.proxy.$Proxy482.getGcsUtil(Unknown Source)
[error]     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchNonGlobs(GcsFileSystem.java:241)
[error]     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:98)
[error]     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:120)
[error]     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:141)
[error]     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:153)
[error]     at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:223)
[error]     at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:212)
[error]     at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:89)
[error]     at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:76)
[error]     at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:155)
[error]     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
[error]     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
[error]     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
[error]     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
[error]     at com.spotify.scio.ScioContext.execute(ScioContext.scala:592)
[error]     at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:580)
[error]     at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:568)
[error]     at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:657)
[error]     at com.spotify.scio.ScioContext.run(ScioContext.scala:568)
[error]     at com.myproj.scio.explore.WordCountWithMetrics$.main(WordCountWithMetrics.scala:27)
[error]     at com.myproj.scio.explore.WordCountWithMetrics.main(WordCountWithMetrics.scala)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]     at java.lang.reflect.Method.invoke(Method.java:498)

Here's the sbt command I used to run this example:

sbt "runMain com.myproj.scio.explore.WordCount --runner=DirectRunner --input=gs://apache-beam-samples/shakespeare/kinglear.txt --output=/tmp/wordCount"

Note: I'm using DirectRunner

To solve this problem, I had to add the following dependency to build.sbt:

"com.google.guava" % "guava" % "28.2-jre"
My question is: am I doing the right thing to solve the problem or is there something I'm missing?
Thanks for your help!
Filipe Regadas
@regadas
Hi @d66pak That's a known issue in 0.9.0 and should be fixed in the next release spotify/scio#2945. You can keep that override for now and remove it with the new version
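For reference, one way to express that temporary pin in build.sbt is an explicit override (version as in the message above), e.g.:

dependencyOverrides += "com.google.guava" % "guava" % "28.2-jre"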
Deepak Telkar
@d66pak
Hi @regadas, thanks for your prompt response. Will keep an eye out for the next release.
championj-foxtel
@championj-foxtel
Hi, sorry if this is the wrong place for this type of Q: a few of our pipelines run into OOM errors in Dataflow and don't seem to want to scale out; we have tried introducing a groupBy transform on a random key to force a shuffle and still no dice - are there any common pitfalls or a reliable way to force scaling?
jmendesky
@jmendesky
Autoscaling is based on resources, throughput and data freshness. What do they look like? Is Dataflow building up a backlog of data? Also, depending on what engine you are using, you might have to enable it explicitly with --autoscalingAlgorithm=THROUGHPUT_BASED
championj-foxtel
@championj-foxtel

The input (PubSub) is file triggers that we then load, creating an SCollection of the contents, so there isn't a large backlog so to speak; however, due to the lack of memory in the operation, the node OOMs before it can output, so it doesn't ack the messages and keeps reprocessing (leading to rebooted workers OOMing, etc.)

What I was hoping to do was coerce it to split the intermediate SCollections across more nodes to prevent the memory issues

jmendesky
@jmendesky
worth checking what engine you are using - autoscaling is only enabled by default for streaming engine, but not all streaming jobs use streaming engine by default. If your system has never autoscaled before, it is very likely that you haven't enabled it: https://cloud.google.com/dataflow/docs/guides/specifying-exec-params
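For example, the relevant pipeline options look roughly like this (job class, worker cap and the streaming-engine flag are illustrative; Streaming Engine can also be enabled in other ways):

sbt "runMain com.example.MyStreamingJob --runner=DataflowRunner --project=my-project --region=us-central1 --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=20 --enableStreamingEngine=true"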
Steven Fines
@sfines-clgx
anyone ever run into this? java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field magnolia.CallByNeed.eval of type scala.Function0 in instance of magnolia.CallByNeed
Filipe Regadas
@regadas
@sfines-clgx how are you running your pipeline? from Intellij? Also, is your project depending on magnolia? (make sure to have only one version)
Steven Fines
@sfines-clgx
@regadas yes, from IntelliJ, and I don't use magnolia at all
Filipe Regadas
@regadas
@sfines-clgx try launching the job from SBT for now... I think you are affected by an ongoing issue spotify/scio#2778
Steven Fines
@sfines-clgx
that'll
suck ;)
lots of command line options
Steven Fines
@sfines-clgx
Another question: what is Scio doing that continually generates methods that are too large for the JVM?
for example, this code snippet generates a "Method too large" compile error:
 def joinRecords(
    subject: SCollection[(String, SubjectKeying)],
    candidateComps: SCollection[(String, List[CandidateCompKeying])]
  ): SCollection[(String, (SubjectKeying, CandidateCompKeying, Double, SubjectProperty, SubjectProperty))] = {
    subject.join(candidateComps).flatMapValues { case (subject, candidateList) =>
      candidateList.map { comp =>
        val distance = Distance.haversine(
          Distance.makePoint(subject.latitude, subject.longitude),
          Distance.makePoint(comp.latitude, comp.longitude)
        )
        (subject, comp, distance, subject.property, comp.property)
      }
    }
  }
Filipe Regadas
@regadas
@sfines-clgx interesting, I'll take a look