Selcuk Cabuk
In the background, does Spark perform a join operation between my new VertexRDD and the existing graph?
Is this running in a single JVM?
Just avoid all mutation. If you write mutable code, all bets are off.
Rather than mutating your stuff with foreach, create a new copy with the updated values using map.
That's the point of map. "Keep the same structure, but change the values this way."
(This is if RDDs require that the stuff you put in them is immutable. I actually don't know.)
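The map-over-mutation advice can be illustrated outside Spark; here's a minimal pure-Python sketch of the same idea, with plain lists standing in for RDDs:

```python
# Plain Python analogy: instead of mutating in place (foreach-style),
# build a new collection with the updated values (map-style).
orig = [1, 2, 3]

# map-style: same structure, new values; the original is untouched
updated = [x * 10 for x in orig]

print(orig)     # [1, 2, 3]  -- unchanged
print(updated)  # [10, 20, 30]  -- new copy with updated values
```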
Selcuk Cabuk
@PhillHenry yes, I'm running locally, so do you think that's the reason?

@Ichoran but if I use map, the map function will give me a new graph RDD, right? What if my graph is very big? Will every map operation double the size? I mean, if I run map on the graph for 10 different vertices, I'll get 10 different graph RDDs; if my graph is big, how will I manage them in RAM?

also i found something similar on stackoverflow: https://stackoverflow.com/questions/30913771/modifying-rdd-of-object-in-spark-scala/30914313

As long as you don't store the new one, the old one will just be discarded. If you can't access it, it will be garbage collected.
@xsenko When you call map, it's actually just building an execution plan. Eventually, when you call something like reduce or count, it will need to execute, but it won't actually store all the intermediate RDDs.
Selcuk Cabuk
@esuntag I got it, thanks. But I still can't understand one part: let's say I have a graph with 10 GB of data. When I convert it to a GraphX RDD, it will cost me 10 GB of memory, right? (Maybe a little more.) I'm going to use this graph in many operations, so I need to cache it.
Now, will every map operation return me a new graph (which is 10 GB)? Does that mean 10 different map operations (because I wanted to modify 10 different values in my actual graph) will cost me 100 GB of memory, or will GraphX modify only the changed data?

In here: https://spark.apache.org/docs/latest/graphx-programming-guide.html
It says

Note that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices) are reused in the new graph reducing the cost of this inherently functional data structure.

but I couldn't find more detailed information.
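The "reused in the new graph" line from the docs is the standard persistent-data-structure trick: the new version shares the unaffected parts with the old one. A pure-Python analogy (this is not GraphX's actual internals, just the idea):

```python
# A shallow copy shares the unaffected parts instead of duplicating them.
old = {"a": [1, 2], "b": [3, 4]}

new = dict(old)        # new top-level structure
new["a"] = [9, 9]      # only the changed entry gets new storage

print(old["a"])            # [1, 2]  -- original untouched
print(new["b"] is old["b"])  # True  -- unchanged part is shared, not copied
```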
Ah, GraphX is a bit of a different beast, and it depends on what you're doing with it. But I don't think there's any case where calling map a bunch of times will balloon the size. Calling map doesn't immediately create the new graph; it creates a plan that, when executed, gives you the new graph. When you call map 10 times, you basically just put 10 things in your plan. The plan is eventually executed when you actually need the value (something like reduce, collect, or any of the functions that save/cache).
If you're really not sure, you can try running in local mode and profiling it to watch memory consumption
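The "map just builds a plan" behaviour can be mimicked with Python generators, as a loose analogy for Spark's lazy evaluation (not Spark itself):

```python
import itertools

# Generators are lazy: chaining maps builds a pipeline, nothing runs yet.
data = range(1_000_000)

step1 = (x + 1 for x in data)   # no work done yet
step2 = (x * 2 for x in step1)  # still no work, just a bigger "plan"

# Only a terminal operation (like Spark's reduce/count) forces execution,
# and intermediate values are streamed, never all stored at once.
total = sum(itertools.islice(step2, 5))
print(total)  # 30
```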
Stanislav Palatnik

Hey guys. Stuck on a transformation step. I'm trying to convert a specific column to JSON and then compress it.

      .select($"id", to_json($"attributes"))
      .withColumn("compressedAttributes", callUDF(GZIPUDFKey, $"attributes"))


org.apache.spark.sql.AnalysisException: cannot resolve '`attributes`' given input columns: [id, structstojson(attributes)];;

I assume it needs to "flatten" the call to jsonify it?
Stanislav Palatnik
Nvm I got it. Aliasing it with a string name worked
Miguel Pérez Pasalodos
Hello! Anybody knows if in structured streaming there's a way to access batchTimestampMs from within a foreachBatch?
Edward Adjei
I have no idea how to pull this off, and I'm new to GraphFrames.
Liran Buanos
Hi, I would appreciate some help with https://stackoverflow.com/questions/64265444/error-loading-the-stages-stage-id-page-in-spark-ui
Mainly I would like to know how to find the logs for the Spark metrics REST API, as I'm getting an error when loading the stages page in the Spark UI.
Hi all, does anyone have any experience and tips on joining a Kafka source and a big static dataset in Structured Streaming? I can never make it work: my job eventually dies after it starts. A join between the Kafka source and a small static dataset works, though.
D Cameron Mauch
Having trouble with custom encoders. I think this should work, but doesn’t seem to find it...
Getting this error:
java.lang.UnsupportedOperationException: No Encoder found for DayOfWeek
@DCameronMauch Not sure why you get it, but with a case class and import spark.implicits._, you should not get that error.
D Cameron Mauch
It’s a case class that has a member which is an abstract data type.
That does not automatically get an encoder
Hi all, looking for some advice to improve a Spark 3 job. I am reprocessing historical logs (about 300-400 GB per day) which have been left in S3 as small files, repacking them into Parquet files partitioned by day/hour. The main code doing the lifting is spark.read.text to read the files, then dataframe.repartition(...).write.mode("append").insertInto("destTable") to save. I tried small executors, which didn't seem to work too well; now I am using large executors (8 cores / 20 GB). Aside from adding more executors, what more can I do? I have set these Spark confs: spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2, spark.speculation=false, spark.dynamicAllocation.enabled=true, spark.shuffle.service.enabled=true
@Data-drone how many days are you processing at once? What are you exactly doing with the repartition? Could that be replaced with a ‘coalesce’?
@DCameronMauch In this case, you need to create another encoder for the wrapper class which is your case class named Payload in your code example. See this https://stackoverflow.com/questions/64190037/how-to-store-nested-custom-objects-in-spark-dataset/64248200#64248200
btw, does anyone have some thoughts to contribute to this question? it's in bounty https://stackoverflow.com/questions/64200800/how-do-datatype-and-encoder-interact-with-spark-dataset-and-what-is-their-relati
@mkedwards Hello, I have the same error as you. Can you help me with a solution to get past the error? Thanks so much.
D Cameron Mauch
I got my encoder working now in a test, but only for Dataset[DayOfWeek]. So there is clearly an Encoder[DayOfWeek] in scope. But when I try to encode a Payload, which has a parameter of type DayOfWeek, I get message about no encoder found for DayOfWeek…. WTF?
Maybe I am confused about the scope. If an implicit is in scope in some method A, which then calls method B, within the context of B, is it still in scope?
Ankush Kankariya

I am noticing a difference in behaviour upon upgrading to Spark 3: the number of partitions changes on df.select, which causes my zip operations to fail on the mismatch. With Spark 2.4.4 it works fine. This does not happen with filter, only with select.

spark = SparkSession.builder.appName("local") \
    .master("local[2]") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.shuffle.partitions", 10) \
    .config("spark.default.parallelism", 10) \
    .getOrCreate()

With Spark 2.4.4:

df = spark.table("tableA")
print(df.rdd.getNumPartitions()) #10
new_df = df.filter("id is not null")
print(new_df.rdd.getNumPartitions()) #10
new_2_df = df.select("id")
print(new_2_df.rdd.getNumPartitions()) #10
With Spark 3.0.0:

df = spark.table("tableA")
print(df.rdd.getNumPartitions()) #10
new_df = df.filter("id is not null")
print(new_df.rdd.getNumPartitions()) #10
new_2_df = df.select("id")
print(new_2_df.rdd.getNumPartitions()) #1
See the last line where it changes to 1 partition from initial 10. Any thoughts?

Hi all! I'm having trouble streaming data from Azure Event Hub. The code was working 2 weeks ago and, with no modifications, it now throws ReceiverDisconnectedException. I have posted details about the code and the error in the GitHub issue Azure/azure-event-hubs-spark#545
I saw that lots of people have run into this issue before, but I can't seem to identify the fix.
Hi all, I'm new to Spark and looking for good Jupyter-style notebooks for Scala with some examples. Has anyone come across such work?
Hey, is there a way to drop columns whose names contain a specific string? For example, something like df.drop(*somestring*)?
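There's no built-in pattern-based drop, but you can filter the column names yourself and pass them all to drop at once. A sketch of the name-filtering logic in plain Python (column names and the substring are illustrative):

```python
# Filter column names containing a substring, then drop them all at once.
columns = ["id", "attr_a", "attr_b", "name"]  # stand-in for df.columns
needle = "attr"

to_drop = [c for c in columns if needle in c]
print(to_drop)  # ['attr_a', 'attr_b']

# In PySpark you would then do: df = df.drop(*to_drop)
```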
Selcuk Cabuk

I'm stuck in Spark.
I have a broadcast value that looks like

ListBuffer((16,12,4), (16,15,4))

and have a RDD like this

RDD(Map(12 -> (21,19), 15 -> (21,12)))

and I'm trying to join these two, matching the second item of each broadcast tuple to the key of the Map in the RDD,
so the result should be

(16, 12, 21, 19)
(16, 15, 21, 12)
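The desired join can be checked in plain Python first; this is just the matching logic, with the values copied from the example above:

```python
# Broadcast side: tuples (a, b, w); RDD side: one Map per RDD element.
broadcast_values = [(16, 12, 4), (16, 15, 4)]
rdd_elems = [{12: (21, 19), 15: (21, 12)}]

result = []
for mapping in rdd_elems:                  # like flatMap over the RDD
    for key, (v1, v2) in mapping.items():
        for a, b, _w in broadcast_values:
            if b == key:                   # join on broadcast._2 == map key
                result.append((a, key, v1, v2))

print(result)  # [(16, 12, 21, 19), (16, 15, 21, 12)]
```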
Selcuk Cabuk

I think this approach should be OK.
First I get rid of the Map inside the RDD with

      val BigRDD: RDD[(VertexId, VertexId, Int)] = BigRDDwithMap.flatMap(elem => {
        // each elem is a Map[VertexId, (Int, Int)]; flatten each entry to a triple
        elem.map(listItem => (listItem._1, listItem._2._1, listItem._2._2))
      })

      val result = BigRDD.flatMap {
        case (key, value1, value2) =>
          broadcastData.value.filter(item => item._2 == key).map { bcValues =>
            (bcValues._1, key, value1, value2)
          }
      }
Karan Pratap Singh
Hey, I know it's not a relevant place for this question, but if anyone can help I will be really grateful. I am trying to run Spark using spark-submit, with the dependencies installed in the container, but Spark is using the local Python to run the Python job. This is the command I am using:
spark-submit --master yarn --deploy-mode cluster --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME --num-executors 2 main.py -v
Guys, in Spark 2.4 is it possible to set the start offset when connecting to Kafka? I tried using the value earliest for auto.offset.reset, but the log prints the value set to none... and I do not get old messages.
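For what it's worth, if this is Structured Streaming: the Kafka source there does not honor auto.offset.reset; the equivalent option is startingOffsets. A minimal config sketch, assuming the spark-sql-kafka connector and an existing SparkSession named spark (the server address and topic name are placeholders):

```python
# Sketch: read from the beginning of the topic with Structured Streaming.
# "startingOffsets" is the option to use instead of Kafka's auto.offset.reset.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")  # placeholder address
      .option("subscribe", "my-topic")                 # placeholder topic
      .option("startingOffsets", "earliest")           # read old messages too
      .load())
```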
I'm using Databricks' Spark on Azure. When I .cache my DataFrame using the Delta format, it takes a long time to (apparently) pull the data out of Azure Gen 2. But when I replaceWhere, only the local copy changes; no changes are reflected in Azure.
Is this expected? It certainly came as a surprise to me and my colleagues.
Sorry if it doesn't apply here: I've been trying to migrate several jobs to Spark 3 and I'm seeing huge performance hits. It's a lot worse than Spark 2 in every single one of our jobs, and I can't find any reference to this, or other people experiencing it, other than the release notes claiming performance improvements. Has anybody had similar experiences, or the opposite?
Can we combine an RDD and a list into one RDD?
RDD[Person], List[States] => RDD[(State, Iterable[Person])]
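Assuming each Person carries a state and the list of states is small, the grouping logic looks like this in plain Python (in Spark you would typically broadcast the small list, or key the RDD by state and groupByKey). The names and data here are illustrative:

```python
# Stand-ins: a Person is (name, state); states is the small list to combine.
people = [("Alice", "CA"), ("Bob", "NY"), ("Carol", "CA")]
states = ["CA", "NY"]

# Group people under each state: the shape of RDD[(State, Iterable[Person])].
grouped = {s: [p for p in people if p[1] == s] for s in states}
print(grouped["CA"])  # [('Alice', 'CA'), ('Carol', 'CA')]
```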
Hello everyone, may I ask: I want to learn the Spark source code and architecture. What are the latest good books, documents, or projects? Thank you.