Hello. Let's say I declare a val like below:
val a: Any = Array(1,2,3)
When I then type the variable in the shell, the output is some nonsense and not the actual Array as expected. So how can I retrieve and print the value of a variable of type "Any"?
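For what it's worth, a pure-Scala sketch of one workaround (the variable name `a` mirrors the snippet above): JVM arrays don't override `toString`, so a value statically typed as `Any` prints as something like `[I@1b6d358`. Pattern matching the `Any` back to an array recovers the contents.

```scala
val a: Any = Array(1, 2, 3)

// println(a) would print the default array toString, e.g. "[I@1b6d358".
// Match the Any back to an array and format it explicitly:
val shown = a match {
  case arr: Array[Int] => arr.mkString("Array(", ", ", ")")
  case other           => other.toString
}
println(shown)  // Array(1, 2, 3)
```

The match works because primitive `Int` arrays are reified as `int[]` on the JVM, so `arr: Array[Int]` is checkable at runtime.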
Hello, I am a little confused about Spark dynamic allocation. In the private class Tracker of org.apache.spark.scheduler.dynalloc.ExecutorMonitor, the method updateTimeout takes the min of _cacheTimeout and _shuffleTimeout.
I think it would be more reasonable to take the max of _cacheTimeout and _shuffleTimeout, since we need to wait for both the cache and the shuffle to time out if they are both set.
Does anyone know about this? The code is at https://github.com/apache/spark/blob/e1a5d9411733437b5a18045bbd18b48f7aa40f46/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L567
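To make the question concrete, here is a toy pure-Scala sketch (not the real Tracker code, which also handles unset timeouts) of how the two policies differ when both timeouts are set:

```scala
// Toy model: deadlines are epoch millis at which cached blocks / shuffle
// data stop protecting the executor from removal.
def deadlineUsingMin(cacheDeadline: Long, shuffleDeadline: Long): Long =
  math.min(cacheDeadline, shuffleDeadline) // removable once the EARLIER protection lapses

def deadlineUsingMax(cacheDeadline: Long, shuffleDeadline: Long): Long =
  math.max(cacheDeadline, shuffleDeadline) // removable only when BOTH protections lapse

// If the cache times out at t=1000 but the shuffle data at t=5000,
// min makes the executor eligible at t=1000, max only at t=5000.
val withMin = deadlineUsingMin(1000L, 5000L) // 1000
val withMax = deadlineUsingMax(1000L, 5000L) // 5000
```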
Jian Xu
Hi, my Spark server's memory usage keeps growing while processing batch datasets of the same size.
It seems to me that the RAM keeps all historical operations and therefore grows over time.
Has anyone had a similar experience? I would appreciate any solution.
Milad Molaee
Hi everybody,
Good day. I have a request:
Could someone share code combining k-means and genetic algorithms, in Python or Scala, on the Spark platform?
I could not find anything, especially in Python.

Or code combining the Fuzzy C-means and Ant Colony algorithms,

or Fuzzy C-means + Genetic.

Milad Molaee
Thank you very much for helping 😃😃😃
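I don't know of a ready-made k-means + genetic-algorithm combination for Spark, but as a starting point, here is a minimal pure-Scala sketch of a single k-means iteration on 1-D points (all names and data here are made up for illustration). A genetic algorithm would typically wrap this by evolving candidate centroid sets and scoring them with within-cluster distance as the fitness; on Spark, the assignment step maps naturally over an RDD or DataFrame.

```scala
// One k-means iteration on 1-D points: assign each point to its nearest
// centroid, then move each centroid to the mean of its assigned points.
def kmeansStep(points: Seq[Double], centroids: Seq[Double]): Seq[Double] = {
  val assigned = points.groupBy(p => centroids.minBy(c => math.abs(p - c)))
  centroids.map { c =>
    assigned.get(c).map(ps => ps.sum / ps.size).getOrElse(c) // keep empty clusters in place
  }
}

val points  = Seq(1.0, 2.0, 10.0, 11.0)
val updated = kmeansStep(points, Seq(0.0, 12.0))
// updated == Seq(1.5, 10.5): each centroid moved to its cluster's mean
```

For production use, Spark MLlib already ships a distributed KMeans; the genetic part (selection, crossover, mutation over centroid sets) is what you would have to add yourself.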
Hi all, are there any other good Spark-related chat servers you would recommend on Matrix / Discord / Slack?
Eric K Richardson
There are Spark user and Spark dev mailing lists.
// Is it just me, or does this blow up for you too?
// I'm running this from the spark shell in both
// Spark 2.3.3 and 3.0.2; both give silly exceptions.

object MyTestHelper extends Serializable {
  def len(s: String): Int = s.length
}

class MyTest() extends Serializable {
  def len(s: String): Int = MyTestHelper.len(s)
}

spark.udf.register("my_test", (new MyTest()).len((_: String)))

spark.sql("select my_test('how long is this string?')").show()
The fact that the MyTest.len method calls a function in a singleton object seems to be what makes it break. Is there a good reason for that? As you can guess, this is not really the UDF I need, but it's the simplest distillation of a problem I'm seeing that I could come up with.
Eric K Richardson
I don't think that's it. For example, this works fine for me:
spark.udf.register("another_test", (_: String).length)
spark.sql("select another_test('how long is this string?')").show()
I should add that the code that breaks (with the MyTest class and MyTestHelper object) runs fine in a spark shell on a single node cluster, but breaks on our real cluster, which is of course multi-node.
Eric K Richardson
Then it is potentially a serialization error. What were the errors?
I get a java.lang.ExceptionInInitializerError, and it's Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration. But the master URL is set in my spark-defaults.conf uniformly across the cluster, and seems to work fine as evidenced by ordinary Spark SQL queries of ORC data running fine there. It makes me think that the exceptions are not really showing what's going on.

The top of the stack trace for the ExceptionInInitializerError looks like this:

        at $line130.$read$$iw.<init>(<console>:9)                                          
        at $line130.$read.<init>(<console>:47)                                                    
        at $line130.$read$.<init>(<console>:51)                                                   
        at $line130.$read$.<clinit>(<console>)                                                    
        at $line131.$read$$iw.<init>(<console>:9)                            
        at $line131.$read.<init>(<console>:21)                                                    
        at $line131.$read$.<init>(<console>:25)                                                   
        at $line131.$read$.<clinit>(<console>)                                                    
        at $line132.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res58$1(<console>:29)
        at $line132.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res58$1$adapted(<console>:29)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)

Not sure how to interpret that.


I use Spark 3.0.2.
After submitting an application with ./spark-submit to my Spark standalone cluster, I successfully got a DRIVER_ID in RUNNING state.
However, when I then try to get the status with:
./spark-submit --deploy-mode cluster --master spark://$MASTER_HOST:$MASTER_PORT --status $SPARK_DRIVER_ID
the following error is returned:

WARN RestSubmissionClient: Unable to connect to server spark://a.b.c.d:7077.
Exception in thread "main" org.apache.spark.deploy.rest.SubmitRestConnectionException: Unable to connect to server

Do you have any idea? Thanks

Eric K Richardson
@g1l3sp I always run Spark locally first to try things out, with the master set to local[*]. This is even easier than using spark-submit.
I did find a way to refactor and get it to work. I ended up nesting everything inside the singleton object. Not sure why it makes a difference, but it does.
// This works.  Yay!

object MyTestHelper extends Serializable {
  def len_helper(s: String): Int = s.length

  class MyTest extends Serializable {
    def len(s: String): Int = len_helper(s)
  }

  def len(s: String): Int = (new MyTest).len(s)
}

spark.udf.register("my_test", MyTestHelper.len((_: String)))

spark.sql("select my_test('how long is this string?')").show()
The broken example from above did work on an older version of Spark (2.2.x I think), so maybe they got more conservative about what they'll try to serialize up from the shell? Just a wild guess.
Eric K Richardson
Why do you need the intermediate class, can't the object just work? You are making a new object for every row in the RDD.
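A pure-Scala illustration of that per-row allocation point (`Helper` is a hypothetical stand-in for MyTest, with a construction counter added): creating the instance inside the function body means one allocation per call, while hoisting it out means one allocation in total.

```scala
// Count how many times Helper is constructed.
object Helper { var constructions = 0 }
class Helper extends Serializable {
  Helper.constructions += 1
  def len(s: String): Int = s.length
}

val rows = Seq("a", "bb", "ccc")

// Per-call construction, like `def len(s: String) = (new MyTest).len(s)`:
rows.map(s => new Helper().len(s))
val perCall = Helper.constructions // 3: one per element

// Hoisted: construct once and reuse the same instance:
val h = new Helper()
rows.map(h.len)
val hoisted = Helper.constructions - perCall // 1
```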
I think you didn't register the UDF before running your code —
your function.
I have a question; I hope someone can help me figure it out: is the DataFrame API better than Spark SQL?
Calvin Pietersen
Hi, I have an issue with Spark structured streaming using Kafka as a source. I'm trying to rate limit the source using maxOffsetsPerTrigger. However, it seems that the first trigger doesn't respect the rate limit. I've seen elsewhere that people claim this is the case, but cannot find any documentation on it.
@ekrich It's standing in for a much more complex bit of code. The example here is admittedly silly. In the real thing, the object creation is unavoidable.
Eric K Richardson
I see, thanks.
Ruslan Zubtsov
Hello everyone.
I have an idea for an open-source library/framework over Spark which would help with structuring and writing better Spark jobs (less boilerplate, more clarity). I'm looking for any help to design and implement it. Maybe you'll give a fresh view on the potential issues or suggest new ideas.
If anybody here is interested in that topic please reach out to me privately!
Thank you.
Charlie Stras
Hello everyone. Are there any recommended textbooks or tutorials for beginner? Thanks.
Eric K Richardson
@/all please note that most Scala gitters are moving to Discord - https://discord.gg/CnPjXp7Q
D Cameron Mauch
@CharlieStras I would recommend the RockTheJVM membership. Tons of good stuff in there. A bunch of us signed up through our company.
Charlie Stras
@DCameronMauch Thanks!
Mike Pfaffenberger
Hey all, might be kind of the wrong place to ask, but has anyone ever configured a vanilla Spark cluster to use Azure datalake file system with SAS tokens?
Doeun Kim

Hello everyone. I'd like to ask something about Spark performance.
If I run the code block below, does Spark read the data from the single S3 parquet file externally twice? And if I want it to be read only once, should I add "df.persist()"?

Python test code (it's just a meaningless sample):
df = spark.read.parquet('s3a://test/test.parquet')
df1 = df.join(df, df.col == df.col)

Thank you
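Spark itself can't run here, so treat this as a conceptual pure-Scala sketch of the distinction being asked about (`readSource` is a made-up stand-in for the parquet read): consuming a lazy definition twice triggers two reads, much as a self-join plan with two scan nodes can, while evaluating it once up front (the idea behind persist()) reads once.

```scala
var reads = 0
def readSource(): Seq[Int] = { reads += 1; Seq(1, 2, 3) } // stand-in for the S3 read

// A lazy definition is re-evaluated by each consumer (two "scans"):
def df = readSource()
val selfJoined = df.zip(df)
val lazyReads  = reads // 2

// Materialize once and reuse, the idea behind df.persist():
val cached       = readSource()
val cachedJoined = cached.zip(cached)
val cachedReads  = reads - lazyReads // 1
```

Whether the real plan actually scans twice depends on the Spark version and optimizer (exchange reuse can avoid it), so check the physical plan with df1.explain() before deciding to persist.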

@mpfaffenberger I had a helluva job doing that. Got it working with OAuth, though. Made some rough notes at https://javaagile.blogspot.com/2020/03/azure-docker-and-k8s.html that may or may not help.
Mike Pfaffenberger
@PhillHenry thanks!! Yeah I was looking at the OAuth possibility. The main thing I want to be able to do is configure read only access. But I don't think I can do that with straight key-based authentication. The key essentially gives you full admin access to a storage account. I don't want to accidentally blow away something in a workload...
(library not post sorry)
Very interested to try that.
Alejandro Drago
Hello everybody. I need to translate this simple query (Spark SQL) to the DataFrame API.
spark.sql(""" select field1,field2,field3 group by field2 """)
The query works, but when I try to change it to the DataFrame API I can't find a way to use groupBy without agg functions.