James Moore
@PhillHenry I ended up just using AWS EMR instead of having a local system. It's pretty inexpensive, so doing the docker thing just dropped down my priority list.
William Swaney
@EnricoMi sorry for the late response, but thank you for your reply.
Can dynamic allocation work with Spark Structured Streaming?
Alejandro Drago
Hello, is there someone here who knows HDFS and awk?

Hi Spark Users,

I want to evaluate, for each row, the expression held in one dataframe column against the other columns of the same dataframe. Please suggest the best approach to deal with this without impacting the performance of the job.


Sample code:

val sampleDF = Seq[(java.lang.Integer, Int, String, String)](  // num is nullable, so java.lang.Integer
  (8, 1, "bat", "NUM IS NOT NULL AND FLAG IS NOT 0"),
  (64, 0, "mouse", "NUM IS NOT NULL AND FLAG IS NOT 0"),
  (-27, 1, "horse", "NUM IS NOT NULL AND FLAG IS NOT 0"),
  (null, 0, "miki", "NUM IS NOT NULL AND FLAG IS NOT 1 AND WORD IS 'MIKI'")
).toDF("num", "flag", "word", "expression")

val derivedDF = sampleDF.withColumn("status", sampleDF.col("expression"))
Use a udf
toxicafunk this book seems like a good resource, it's from the author of the spark-fast-tests lib
I can get SQL from an expression object like this: new Column("colors").substr(0, 3).expr.sql — but how can I do it the other way around, i.e. get a Column or Expression from a SQL string?
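One way to go from a SQL string back to a Column is the public org.apache.spark.sql.functions.expr function, which parses a SQL expression; the underlying Catalyst Expression is then reachable via Column.expr. A minimal sketch (the expression string is just an illustration):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.expr

// Parse a SQL expression string into a Column
val col: Column = expr("substr(colors, 0, 3)")

// The underlying Catalyst Expression is available via .expr
val expression = col.expr
```

The resulting Column can be used anywhere a Column is expected, e.g. in select or withColumn.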
Godsgift ThankGod AKARI
scala> val f =(x: Int) => x + 10
f: Int => Int = <function1>

scala> val b=1 to 10
b: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> f(b)
<console>:14: error: type mismatch;
 found   : scala.collection.immutable.Range.Inclusive
 required: Int
Godsgift ThankGod AKARI

Hi Everyone,

Can someone please confirm why I am getting this error, or suggest an alternative solution?

@giftakari The type of b is Range, but the parameter of function f is Int. You can write it like this: b.map(f); then you get an IndexedSeq[Int].
Godsgift ThankGod AKARI
@fxinn thanks for your help
Meaning we are passing a function as a parameter?
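Exactly: map is a higher-order function, i.e. a function that takes another function as a parameter. The exchange above can be sketched end to end:

```scala
val f = (x: Int) => x + 10   // a function value of type Int => Int
val b = 1 to 10              // Range is a collection, not an Int

// map is a higher-order function: it takes f as a parameter
// and applies it to every element of b
val result = b.map(f)        // IndexedSeq[Int]: Vector(11, 12, ..., 20)
```

Calling f(b) fails because f expects a single Int; b.map(f) applies f element by element instead.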
siddhesh salvi
Hi everyone, how do I use the s3a connector to access an Amazon S3 file in Spark?
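A minimal sketch, assuming the hadoop-aws package matching your Hadoop version is on the classpath and that static access keys (rather than an instance profile) are being used; the bucket and path are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// hadoop-aws must be on the classpath,
// e.g. via --packages org.apache.hadoop:hadoop-aws:<your-hadoop-version>
val spark = SparkSession.builder()
  .appName("s3a-example")
  .getOrCreate()

// Static credentials for S3A; with IAM roles these two settings can be omitted
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "<ACCESS_KEY>")
hadoopConf.set("fs.s3a.secret.key", "<SECRET_KEY>")

// Read a file through the s3a:// scheme
val df = spark.read.text("s3a://my-bucket/path/to/file.txt")
```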
@PhillHenry I tried a UDF but it doesn't work. Would appreciate your input.
I am losing the timezone value with the format below; I tried a couple of formats but was not able to make it work. Can someone shed some light?
scala> val sampleDF = Seq("2020-04-11T20:40:00-0500").toDF("value")
sampleDF: org.apache.spark.sql.DataFrame = [value: string]
scala> sampleDF.select('value, to_timestamp('value, "yyyy-MM-dd\'T\'HH:mm:ss")).show(false)
|value                   |to_timestamp(`value`, 'yyyy-MM-dd\'T\'HH:mm:ss')|
|2020-04-11T20:40:00-0500|2020-04-11 20:40:00                             |
Jon Junk
I am new to Apache Spark and struggling with a simple use case. I am using Azure Databricks as my Spark deployment. All I would like to do is read my data in "eventhubs" format and then write it to a file. I am able to read my data as a stream using val inStream = spark.readStream.format("eventhubs").load(), and I am able to write the stream to the console with inStream.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination(). I am just missing the step on how to write it to a file.
My research has led me to believe that I need to use https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/ForeachWriter.html somehow and specify this in the writeStream.
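For writing a stream to files, ForeachWriter shouldn't be needed: Structured Streaming ships a built-in file sink. A sketch, reusing inStream from the message above; the output and checkpoint paths are hypothetical:

```scala
// Write the stream as parquet files; the file sink requires a checkpoint location
inStream.writeStream
  .outputMode("append")                                     // the file sink only supports append
  .format("parquet")                                        // or "json", "csv", "text"
  .option("path", "/mnt/output/events")                     // hypothetical output directory
  .option("checkpointLocation", "/mnt/checkpoints/events")  // hypothetical
  .start()
  .awaitTermination()
```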
Sajith Vijsekara
Hi, I am new to Spark and Scala. I have an issue with reading a JSON file.
Sajith Vijsekara
Is there anyone who can help with this issue?
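Hard to say without the exact error, but a common pitfall is that spark.read.json expects JSON Lines (one object per line) by default; a pretty-printed or array-style file needs the multiLine option. A sketch with a hypothetical path:

```scala
// Default: JSON Lines, one JSON object per line
val df = spark.read.json("/path/to/data.json")

// For a single multi-line JSON document or a top-level array, enable multiLine
val df2 = spark.read
  .option("multiLine", "true")
  .json("/path/to/data.json")
```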
  1. What do you mean exactly by "a UDF doesn't work"?
  2. I'm assuming you're losing information because to_timestamp gives you a java.sql.Timestamp, which in the RDBMS world doesn't carry timezone information.

I am constantly using "insert overwrite table table_name partition(partition_column) query" to write data into my table, but the problem is the number of files generated.
So I started using the spark.sql.shuffle.partitions property to fix the number of files.

Now the problem is that some partitions hold very little data while others hold huge amounts. When I choose my shuffle partitions based on the largest partition, unnecessary small files get created; if I choose them based on the partitions with little data, the job starts failing with memory issues.

Is there a good way to solve this?
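One common approach, sketched here under the assumption that the table is partitioned by partition_column and that source_view (hypothetical) provides the rows, is to cluster the data by the partition column before the insert, so each Hive partition is written by a small number of tasks instead of spark.sql.shuffle.partitions tasks:

```scala
// DISTRIBUTE BY routes rows for the same table partition into the same
// shuffle partition, so each Hive partition produces few output files
spark.sql("""
  INSERT OVERWRITE TABLE table_name PARTITION (partition_column)
  SELECT * FROM source_view
  DISTRIBUTE BY partition_column
""")
```

If a single huge partition then overwhelms one task, a salt column can be added to the DISTRIBUTE BY clause to spread it over a few tasks.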

Hi @har5havardhan
William Swaney

A quick question on the Spark UI, namely the SQL tab.

A bit of background: I have a single query running repeatedly. At the end of the query I will do a simple .count on the DataFrame that results from the query for scenario 'A'. In scenario 'B', I simply assign myDF.rdd to another val, and do a count on that.

When looking at the SQL tab, in each case I have just the one query, and in each case there's a single job associated with the query. All good. However, scenario 'A' shows an additional row, with no job IDs listed, but what appears to be a sum of the time taken (in this case the same as the single query). When I run scenario 'B', this extra row does not appear.

What is the purpose of the additional row appearing for each query in scenario 'A'?

Thanks in advance.

Hello, I'm trying to understand the times in the execution of an application through the Spark UI. If I understood well, before a shuffle operation (like groupBy(col1)) there should be a shuffle write time (in the initial stage) and a shuffle read time in the second stage, right?
If I'm right, I was wondering why in the Summary Metrics section (in the Spark UI) of the initial stage (the one before 'exchange') there is no row for 'Shuffle write time', while in the stage after 'exchange' there is a 'Shuffle read blocked time'.
sanjay sharma
Hi All,
We are running Spark Scala/Java jobs in Databricks. Some jobs occasionally fail with the issue below, yet succeed when we re-run them. Any idea why?
failed due to Caused by: com.amazon.support.exceptions.GeneralException: Amazon Error retrieving IAM credentials. "Rate exceeded (Service: AmazonRedshift; Status Code: 400; Error Code: Throttling;

I have this code :

val test1: RDD[(String, (Action, String))] = mappedAction.filter(_._2._1.transientErrors.isEmpty)
val test2: RDD[(String, Iterable[(Action, String)])] = test1.groupByKey()

I know that reduceByKey is more performant than groupByKey because it combines values on the map side before the shuffle.

But I'm a beginner and I can't manage to replace the groupByKey in the second line with a reduceByKey without compile errors.
Can anyone help?
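The compile errors are expected: reduceByKey requires the result type to equal the value type, so it cannot directly produce an Iterable[(Action, String)]. If the goal is to keep all values per key while still pre-combining on the map side, aggregateByKey is the usual substitute. A sketch, reusing the names from the snippet above:

```scala
import org.apache.spark.rdd.RDD

// aggregateByKey pre-combines on the map side, unlike groupByKey.
// Zero value: an empty per-key buffer.
val test2: RDD[(String, List[(Action, String)])] =
  test1.aggregateByKey(List.empty[(Action, String)])(
    (buf, v) => v :: buf,   // add one value to the partition-local buffer
    (b1, b2) => b1 ::: b2   // merge buffers coming from different partitions
  )
```

If the values can be truly reduced to a single one per key (e.g. picking a max), reduceByKey after that reduction shuffles even less data.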

sayantan ganguly
Hi, I want the file header's column names in the same order as in the file, but when I use -

val data = spark.read
.withColumn("id", monotonically_increasing_id())

val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0). Then the column headers come back in a different, random order, but I want them in the same order as in the file. Please help me with this.
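Two notes, assuming the file is a CSV with a header row (the path below is hypothetical): monotonically_increasing_id does not assign row numbers, so id==1 is not guaranteed to be the header row; and if you let Spark read the header itself, the column order from the file is preserved:

```scala
// Spark keeps the header columns in the order they appear in the file
val df = spark.read
  .option("header", "true")
  .csv("/path/to/file.csv")

// The header, in original file order
val header: Array[String] = df.columns
```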

@PhillHenry I meant that I wrote a UDF with .toString but I am still facing the same issue. Sorry if it is a dumb question.
Alejandro Drago
Hello, I am connecting to BigQuery from Spark and set the env variable to the JSON credentials, but it fails when the executors try to show the data,
because the file doesn't exist there (it's true, I only have it locally).
I tried an HDFS path but the same error appears.
Ankush Kankariya
Hi All, I need help with an error I am getting when running EXPLAIN on a SQL statement with a subquery: scala.MatchError: scalar-subquery#9201 [] (of class org.apache.spark.sql.catalyst.expressions.ScalarSubquery). Does anyone have suggestions on how to reproduce this in my local environment? I am only getting it on EMR.
Enrico Minack

Once parsed into a Timestamp, the timestamp is stored internally as UTC and printed in your local timezone (e.g. as defined by spark.sql.session.timeZone). Spark is good at hiding timezone information from you.

You can get the timezone information via date_format(column, format):

import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.functions._

val sampleDF = Seq("2020-04-11T20:40:00-05:00").toDF("value")
val timestampDF = sampleDF.select($"value".cast(TimestampType))
timestampDF.select(date_format($"value", "yyyy-MM-dd'T'HH:mm:ssZZZZ")).show(false)
|date_format(value, yyyy-MM-dd'T'HH:mm:ssZZZZ)|
|2020-04-12T03:40:00+0200                     |

If you want the timezone only, use timestampDF.select(date_format($"value", "ZZZZ")).show.

|date_format(value, ZZZZ)|
|                   +0200|

It all depends how you get the data "downstream". If you go through parquet or csv files, they will retain the timezone information. If you go through strings, you should format them as above. If you use Dataset.map you can access the timestamps as java.sql.Timestamp objects (but that might not be necessary):

import java.sql.Timestamp
case class Times(value: Timestamp)
timestampDF.as[Times].map(t => t.value.getTimezoneOffset).show
| -120|
Need to insert into a partitioned Hive table (Hive 3, ORC).

Hi all,
This is regarding spark structured file streaming.

Upstream applications will be writing to the input file streaming directory continuously. Whenever there is a new file Spark will pick up immediately and do the processing.

So, suppose my upstream app is writing a huge file that takes, say, 10 minutes. When the upstream app starts writing, a Spark micro-batch starts, picks up the incomplete file, and processes it. When the complete data has finally been written to the input directory, Spark will not read the same file again, because the file name is already recorded as processed in the checkpoint directory.

How to handle this situation ?

Enrico Minack
You should never write into that directory directly. Better to write into a different directory on the same filesystem and, on success, move the file into the streamed directory. Moving a file is quick and atomic: either the complete file is visible to Spark, or none of it is.
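That write-then-move pattern can be sketched with the Hadoop FileSystem API (both paths are hypothetical); rename is atomic as long as source and destination are on the same filesystem:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

val tmp    = new Path("/data/tmp/input-00001.json")          // file is fully written here first
val target = new Path("/data/streaming-in/input-00001.json") // directory watched by the stream

// Only after the file is completely written:
fs.rename(tmp, target)  // atomic on HDFS; the stream sees the whole file or nothing
```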

@EnricoMi Thanks for the reply. But won't moving the file also take some time before it is completely in the directory?

So, in the source system, once they have completed writing they need to write a touch file (success flag), and then we need to move the data file into the streaming input directory.

Hi, I am new to Spark and Scala.
What is the best way to get JSON data from an HTTP API in Spark?
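One simple pattern, assuming the payload fits comfortably on the driver (the URL is hypothetical), is to fetch the response on the driver and hand the string to spark.read.json, which since Spark 2.2 accepts a Dataset[String]:

```scala
import scala.io.Source

// Fetch the JSON on the driver
val body: String = Source.fromURL("https://api.example.com/data.json").mkString

// Parse the JSON string into a DataFrame
import spark.implicits._
val df = spark.read.json(Seq(body).toDS())
```

For many requests or large payloads, fetching inside mapPartitions on the executors scales better than fetching on the driver.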
Justin Pitts
@karthikeyan9475 atomic means no intermediate state is ever observable: readers see either the old state or the new one, nothing in between.
Sajith Vijsekara
When I try to install azure-eventhubs-spark_2.11 in Databricks it gives me this error. Has anyone faced this type of error?
Enrico Minack
@karthikeyan9475 Can't the source system move the file after successfully writing it?