indeediansbrett
@indeediansbrett
You can still do the join; it's just that if it's a large table itself and not a tiny dim table, you should use a sort-merge join, or possibly a shuffle-hash join (not if it's too big), instead of a broadcast join. If the data is skewed on the join keys you can salt the join. It doesn't seem like it would be skewed, as I'm guessing this is for a model? Like the originals are inputs and the output is the predicted value, so you can join on some record input key? I've joined large datasets and it's not terrible.
RJ
@neontty
it is for feature engineering; I am modifying a dataset for a downstream process.
indeediansbrett
@indeediansbrett
what kind of join keys are you using?
it seems like you can't denormalize it
RJ
@neontty
I would be using an index as the join key?
indeediansbrett
@indeediansbrett
Like a record index? I don't know what you're joining on, but if it's not well distributed then you'd probably want to do a salted join... it sounds like a lot of data, so you'd want to bump up the shuffle partitions for the join.
RJ
@neontty
yeah it's a monotonically increasing record index
indeediansbrett
@indeediansbrett
I thought the input data had independent variables as the columns or something, so each record had an id... but yeah, the join key you're actually using should be well distributed and shouldn't have skew issues then. You should just disable broadcast joining; it will fall back to a sort-merge join by default.
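
A minimal sketch of the settings discussed above, assuming a spark-shell session; the table names, the join key row_id, and the partition count are hypothetical placeholders, not recommendations:

    // Disable broadcast joins so Spark falls back to sort-merge joins
    // for the large-table-to-large-table join.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Bump shuffle partitions for the big shuffle (placeholder value).
    spark.conf.set("spark.sql.shuffle.partitions", "2000")

    val inputs      = spark.table("inputs")       // hypothetical large table
    val predictions = spark.table("predictions")  // hypothetical large table

    // Plain sort-merge join on a well-distributed record index.
    val joined = inputs.join(predictions, Seq("row_id"))

If the key were skewed, the usual salting trick is to append a random suffix to the key on the large side and replicate the other side across the suffixes before joining.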
Faiz Halde
@fhalde

We have tasks that have slowed down tremendously and we are unable to figure out where the issue is. The thread dumps always show the following:

java.lang.Thread.State: RUNNABLE
    at org.apache.spark.unsafe.Platform.copyMemory(Platform.java:255)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.writeUnalignedBytes(UnsafeWriter.java:129)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:206)
    at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:122)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage18.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.window.WindowExec$$anon$1.fetchNextRow(WindowExec.scala:133)
    at org.apache.spark.sql.execution.window.WindowExec$$anon$1.fetchNextPartition(WindowExec.scala:163)
    at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:188)
    at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:122)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)

AQE is enabled. The tasks are not processing a lot of data though. How should I debug this?
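
The trace above is inside WindowExec, and one common cause of slow window stages is skew in the PARTITION BY key, since each window partition is buffered and processed by a single task. A minimal diagnostic sketch, where df and partCol are hypothetical stand-ins for the input DataFrame and the window partition column:

    import org.apache.spark.sql.functions._

    // Count rows per window-partition key; a few very large counts mean
    // a handful of tasks are doing most of the window work.
    df.groupBy(col("partCol"))
      .count()
      .orderBy(desc("count"))
      .show(20, truncate = false)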

Usman Yasin
@UsmanButtPTI_twitter
I have a question about Spark Structured Streaming integration with Kafka for batch processing. I can't find an automatic way for my Spark job to process data in batches of N messages. I don't want to run this as a streaming job for obvious cost reasons; I only want the job to run at defined intervals, process the available data in chunks of N messages, and terminate once all data up to that point has been processed. Effectively, I need the maxOffsetsPerTrigger functionality in batch mode, but apparently it is available for streaming queries only. I'd appreciate it if somebody could share a similar experience or the rationale behind such a design choice.
1 reply
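One workaround, sketched below under the assumption of Spark 3.3+ and a spark-shell session, is to keep it a streaming query but use Trigger.AvailableNow, which respects maxOffsetsPerTrigger, processes everything available in bounded micro-batches, and then stops on its own; the broker, topic, and paths are hypothetical:

    import org.apache.spark.sql.streaming.Trigger

    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // hypothetical broker
      .option("subscribe", "my-topic")                     // hypothetical topic
      .option("maxOffsetsPerTrigger", "10000")             // N messages per micro-batch
      .load()

    // AvailableNow processes all data available at start, in rate-limited
    // micro-batches, then terminates, so the job can be run on a schedule.
    val query = kafkaDF.writeStream
      .format("parquet")
      .option("path", "s3a://bucket/out")                  // hypothetical path
      .option("checkpointLocation", "s3a://bucket/ckpt")   // hypothetical path
      .trigger(Trigger.AvailableNow())
      .start()

    query.awaitTermination()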
Wei Wei Ryu
@srhth:matrix.org
[m]
Hello, I'm a complete newbie to Spark and I'm going over some code:
extractFields(overrodeDF).distinct().write.mode(SaveMode.Overwrite).parquet(params.outputPath)
The above line usually takes 3-4 minutes, but sometimes it goes over 20 minutes for the same job when scheduled multiple times. What could be the cause? This is just a simple Parquet write, so why would it take so long?
2 replies
md_abdul_hassan
@md_abdul_hassan:matrix.org
[m]

Hi,
Are there any open source projects built on Apache Spark?

Or please share a typical project structure/framework for a Spark application: driver classes, service classes, etc.

2). What are the different ways to test DataFrame equality through spark-shell? (See the sketch below.)
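
A minimal sketch of one simple equality check that can be pasted into spark-shell; df1 and df2 are hypothetical DataFrames with the same schema, and the check ignores ordering and duplicate counts:

    val df1 = spark.range(5).toDF("id")
    val df2 = spark.range(5).toDF("id")

    // Set difference in both directions: both empty means the same rows.
    val onlyInDf1 = df1.except(df2)
    val onlyInDf2 = df2.except(df1)
    val equal = onlyInDf1.isEmpty && onlyInDf2.isEmpty
    println(s"DataFrames equal: $equal")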

Eugene Wolfson
@yegeniy

Hi. I realize this is a Scala-specific channel, but I see a lot of PySpark questions, so I hope it's alright to post this here. I'm just curious whether anyone has had success running something like this example on an Amazon EMR cluster.

https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html#using-virtualenv

Were there any gotchas?

Basically, can you run the following without using a BootstrapAction when spinning up the cluster? I'd like to install arbitrary Python dependencies for each spark-submit task without having to create a new cluster, but following that example doesn't work on an EMR cluster.

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    # Grouped aggregate pandas UDF: returns the mean of each group.
    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    print(df.groupby("id").agg(mean_udf(df['v'])).collect())
ferrari6666
@ferrari6666
org.apache.spark.sql.AnalysisException: unresolved operator 'ReplaceData RelationV2[id#20, data#21] test_table;
CheckAnalysis.scala is throwing this error; how should I approach fixing it?
paulpaul1076
@paulpaul1076
Does Spark use kryo serialization when writing a file to disk?
14 replies
nirupreddy
@nirupreddy_twitter
Hi, I am facing the following error; could you help me with it?
Job aborted due to stage failure: Task 2 in stage 5.0 failed 4 times, most recent failure: Lost task 2.3 in stage 5.0 (TID 1529, internal, executor 712): java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.<init>(ParquetRowConverter.scala:615)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:355)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:214)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.<init>(ParquetRowConverter.scala:209)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:395)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:214)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.<init>(ParquetRowConverter.scala:209)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.<init>(ParquetRecordMaterializer.scala:54)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:135)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:222)
    at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:185)
    at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:143)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:352)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStag
2 replies
RJ
@neontty
Screenshot from 2022-07-12 14-23-29.png
Screenshot from 2022-07-12 14-21-52.png

Hi, I am wondering why the Spark UI shows 0 bytes spilled in the "SQL" tab, but under the details for a stage it shows a non-zero amount of data spilled to memory/disk.

reference pictures:

I just upgraded to Spark 3.2.0 and I want to use AQE. Previously I was setting spark.sql.shuffle.partitions, which worked well and made partitions small enough that my cluster did not throw an OOM error. However, AQE is coalescing into too few partitions, bringing back the original problem: partitions that are too large in memory, which throws an OOM.
It seems like "spark.sql.adaptive.advisoryPartitionSizeInBytes" can help prevent AQE from coalescing into too few partitions (not sure why "spark.sql.adaptive.coalescePartitions.minPartitionNum" was deprecated, because that is a more direct way to prevent too few partitions). But I wanted to know if anyone else has had this type of problem and whether "advisoryPartitionSizeInBytes" is the correct setting to alleviate it.
RJ
@neontty
any insight is appreciated, thank you!
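A minimal sketch of the AQE knobs mentioned above (Spark 3.2+); the byte size and partition count are placeholders rather than recommendations:

    spark.conf.set("spark.sql.adaptive.enabled", "true")

    // Target size AQE aims for when coalescing shuffle partitions; lowering
    // it keeps more, smaller post-shuffle partitions.
    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")

    // With parallelismFirst=false, coalescing respects the advisory size
    // instead of maximizing parallelism (the documentation's recommendation).
    spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")

    // Starting number of shuffle partitions before AQE coalesces them.
    spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "2000")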
paulpaul1076
@paulpaul1076
Is there a way to use custom serialization for encoders? I have objects that are larger than 2 gigabytes, and both Kryo and Java serialization fail because they use Array[Byte] for serialization and its max size is 2 GB.
Which brings me to my second question: can we create a serializer that uses Array[Array[Byte]] instead of Array[Byte] to overcome that 2 GB limit?
paulpaul1076
@paulpaul1076
Hey guys, I used a Kryo encoder in my code, and now all my data in the ORC files has just one column called "value" of type binary, instead of many columns. How do I read from such a table in Hive?
Is there a SerDe for that?
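I can't speak to a Hive SerDe, but here is a minimal sketch of reading such data back with Spark itself, assuming a hypothetical case class MyClass matching what was originally written with Encoders.kryo:

    import org.apache.spark.sql.Encoders

    case class MyClass(a: Int, b: String)   // hypothetical payload type

    // The ORC file has a single binary column "value" holding the Kryo bytes;
    // applying the same Kryo encoder deserializes it back into MyClass.
    implicit val kryoEnc = Encoders.kryo[MyClass]
    val ds = spark.read.orc("/path/to/orc").as[MyClass]   // hypothetical path

    ds.take(5).foreach(println)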
Maksym Melnychok
@mks-m
Hey folks, not really a Scala-related question, but maybe someone here is familiar with Spark internals: is it possible to launch only the Spark driver process and only the Spark executor process directly, without having to deal with any resource manager or Spark master?
3 replies
akshit034
@akshit034
Hey guys, I am using Spark Structured Streaming and want to understand the impact of changing (increasing or decreasing) the window size or the slide interval of these windows (in sliding mode).
Also, is there any way to read the delta files stored in the checkpoint dir? Which external jar is needed to read them?
4 replies
g k
@abc_spark_gitlab

I am migrating from on-premise to the AWS stack. I have a doubt and am often confused about how Apache Spark works in AWS and similar environments.

I will just share my current understanding of on-premise Spark running on YARN. When an application is submitted to the Spark cluster, an application master is created on one of the data nodes (as a container) and takes care of the application by spawning executor tasks on the data nodes. This means the Spark code is deployed to the nodes where the data resides, so there is less network transfer. Moreover, this is logical and easy to visualize (at least to me).

But suppose the same Spark application runs on AWS, fetching its data from S3 and running on top of EKS. Here, as I understand it, the Spark driver and the executor tasks will be spawned in Kubernetes pods.

- Does this mean the data has to be transferred over the network from S3 to the EKS node where the executor pod gets spawned?

I have seen some videos that use EMR on top of EKS, but I am a little confused here.
- Since EMR provides the Spark runtime, why do we use EKS here? Can't we run EMR alone for Spark applications in an actual production environment? (I know that EKS can be a replacement for YARN in the Spark world.)

- Can't we run Spark on top of EKS without using EMR? (I am thinking of EMR as a cluster in which Spark drivers and executors can run.)

Marriyam
@Marriyam
Hello everyone, I'm currently working on machine learning with Spark. Can anyone please guide me on how to convert a Spark streaming DataFrame to a regular DataFrame?
2 replies
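One common approach, sketched below under the assumption that streamingDF is a hypothetical streaming DataFrame (e.g. from spark.readStream), is foreachBatch, which hands every micro-batch to your function as a regular (static) DataFrame:

    import org.apache.spark.sql.DataFrame

    val query = streamingDF.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // batchDF is a plain, batch DataFrame for this micro-batch, so it can
        // be cached, scored with an ML model, written out, etc.
        batchDF.cache()
        println(s"Batch $batchId has ${batchDF.count()} rows")
        batchDF.unpersist()
      }
      .option("checkpointLocation", "/tmp/ckpt")   // hypothetical path
      .start()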
md_abdul_hassan
@md_abdul_hassan:matrix.org
[m]

How do I rewrite/split/organize a complex SQL query like the following into a Scala program?

Select * from
(
(((Select SelectExpr, Agg
(Select distinct SelectExpr
Where filter1)
Group by Cols)
union all
(Select SelectExpr, Agg
(Select distinct SelectExpr
Where filter2)
Group by Cols) --3, 4
) inner join
(Select SelectExprA, Agg
(Select distinct SelectExprA
Where filterA)
Group by ColsA)
)
INNER JOIN
(
Select SelectExprB, Agg
(Select distinct SelectExprB
Where filterB)
Group by ColsB
)
)

Any online code example, useful resource, or answer for how to rewrite/split/organize such a query into a Scala program would be appreciated (see the sketch after this message).

2 replies
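A minimal sketch of one way to decompose a nested query like the above into named DataFrame steps; the source table, columns, filters, and aggregate are hypothetical placeholders standing in for SelectExpr, Agg, the filters, and the Group by columns:

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions._

    val base = spark.table("source_table")   // hypothetical source

    // One reusable step per sub-select: filter -> distinct -> group -> aggregate.
    def aggregated(filter: Column, aggName: String): DataFrame =
      base.where(filter)
        .select("k", "amount")               // placeholder for SelectExpr
        .distinct()
        .groupBy("k")                        // placeholder for Group by Cols
        .agg(sum("amount").as(aggName))      // placeholder for Agg

    val part1 = aggregated(col("region") === "r1", "agg")    // filter1
    val part2 = aggregated(col("region") === "r2", "agg")    // filter2
    val partA = aggregated(col("status") === "A", "aggA")    // filterA
    val partB = aggregated(col("status") === "B", "aggB")    // filterB

    // Mirror the original shape: union all the first two, then the two inner joins.
    val result = part1.union(part2)
      .join(partA, Seq("k"))
      .join(partB, Seq("k"))

    result.show()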
Rishi Kaushal
@rishikaushal:matrix.org
[m]
Guys, can you tell me how to write to S3 from the master node of a Spark cluster?
Anyone here?
Rishi Kaushal
@rishikaushal:matrix.org
[m]
I want to write to S3 only from the master node of the cluster, using Scala in a Zeppelin notebook that runs on a Spark cluster. Can you share the Scala code for how to make that possible?
1 reply
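A minimal sketch of writing a file to S3 from the driver only (no executors involved), using the Hadoop FileSystem API that Spark already ships with; the bucket and key are hypothetical, and it assumes the cluster's Hadoop configuration can already authenticate to S3 via s3a:

    import java.net.URI
    import java.nio.charset.StandardCharsets
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Runs only on the driver (e.g. in a Zeppelin paragraph); no Spark job is launched.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val fs = FileSystem.get(new URI("s3a://my-bucket"), hadoopConf)   // hypothetical bucket

    val out = fs.create(new Path("s3a://my-bucket/reports/hello.txt"), true)  // overwrite = true
    out.write("hello from the driver".getBytes(StandardCharsets.UTF_8))
    out.close()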
RJ
@neontty

Why does spark.sql.adaptive.coalescePartitions.parallelismFirst default to true, when the documentation recommends setting it to false?

https://spark.apache.org/docs/3.3.0/sql-performance-tuning.html

Mahesh (Maara) Veerabathiran
@maaraoffl
Hi, I have to write a DataFrame in MessagePack format. I am thinking of creating a custom data source. Could someone share a guide on how to create one?
1 reply
Rishi Kaushal
@rishikaushal:matrix.org
[m]
Guys, can you point me to some active channels for Airflow cluster issues?
saritdev1989
@saritdev1989
While using Spark Structured Streaming (Spark 3.2.1) on Kafka messages in snappy-compressed format, CPU utilization on the Spark side increases by approximately 2x. Is there any tuning parameter to optimize CPU utilization on the Spark side?
1 reply
RJ
@neontty
image.png

Does anyone know why Spark uses the compressed file size on disk when determining partitions for spark.sql.files.maxPartitionBytes, instead of the partitions inside the files or the uncompressed size?

This is a problem for me because I have a dataset that is 213 GB on disk but inflates to 1.4 TB when uncompressed.

related:
https://stackoverflow.com/questions/61847278/why-does-spark-not-create-partitions-based-on-parquet-block-size-on-read-inste

Screenshot for reference also; you can see how much data spill I get just by reading the file with spark.sql.files.maxPartitionBytes set to 128 MB.
RJ
@neontty
We can see the data has a compression ratio of ~6.7x, so if I crank maxPartitionBytes down to 16 MB it is able to read the data in approximately half the time with much less spillage (see the sketch below).
image.png
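A minimal sketch of that workaround; the 16m value mirrors the message above rather than a general recommendation, and the path is hypothetical:

    // Shrink the target input split size so heavily compressed files do not
    // inflate into oversized in-memory partitions.
    spark.conf.set("spark.sql.files.maxPartitionBytes", "16m")

    val df = spark.read.parquet("s3a://bucket/compressed-dataset")   // hypothetical path
    println(df.rdd.getNumPartitions)   // more, smaller input partitions than with the 128m default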
saritdev1989
@saritdev1989
While using Spark Structured Streaming (Spark 3.2.1) in a K8s environment, for incoming Kafka messages in snappy-compressed format, CPU utilization on the Spark side increases by approximately 2x, whereas for uncompressed messages it is much lower. Is this normal behavior for Spark, where processing compressed messages requires more CPU, or is some tuning required to lower CPU utilization for compressed messages?
RJ
@neontty
Why does my DataFrame even spill to disk? It's not doing a shuffle. Is it just because it's being deserialized and exceeds the executor memory?
Khalid-Alqahtani
@Khalid-Alqahtani

Hi

I have a Spark standalone cluster.
I want to monitor it and send an alert when a worker goes down.
What is the best approach for this?