Anirudh Vyas
I need this to present cluster/queue status to users, basically
Eloualid Belhaj

The flow was Server log adapter -> kafka -> flume -> hdfs for batch and long term. Kafka -> Spark Streaming -> Elasticsearch -> Grafana (for a nice dashboard)

Do you have a tutorial for building such a pipeline, please? I'd like to set up the same workflow at my company.

Eric K Richardson
Most of this is configuration and setting up tools. We used Gatling or Apache JMeter as a simulation to hit the servers so they would emit logs. None of it is hard; you just have to do it piece by piece. This was done for a commercial company, and I no longer work there, so there is not much that can be shared. We used Hortonworks, but I am not sure there is even an open-source distro today. Kafka and Elastic we had to set up ourselves.
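As a starting point for the streaming half of that pipeline, here is a minimal sketch (not a full tutorial) of reading a log topic from Kafka with Spark Structured Streaming. The broker address, topic name, and checkpoint path are assumptions:

```scala
import org.apache.spark.sql.SparkSession

object LogPipelineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("log-pipeline").getOrCreate()

    // Read raw log lines from Kafka (placeholder broker and topic names).
    val logs = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // assumed broker address
      .option("subscribe", "server-logs")               // assumed topic name
      .load()
      .selectExpr("CAST(value AS STRING) AS line")

    // Write to the console while prototyping; swap in the Elasticsearch sink
    // (elasticsearch-spark connector) to back a Grafana/Kibana dashboard.
    logs.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/log-pipeline-ckpt") // required for real sinks
      .start()
      .awaitTermination()
  }
}
```

The batch/long-term branch (Kafka -> Flume -> HDFS) is pure configuration of those tools, as Eric says.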
Hi, why do I get "Breakpoint does not belong to any class" when I try to debug?
Larissa Ramos

Hello! How do I assign a specific Spark node to a partition key?
I need to run a similarity-join algorithm on datasets in a heterogeneous cluster.
In the first step of the program, the records of the dataset are assigned signatures, and records with the same signature are sent to the same processing node.
Record pairs sharing no signature cannot be similar. Thus, an intuitive approach is to use the signatures as partition keys that determine which worker a record is sent to: only pairs with a signature in common end up on the same worker, avoiding unnecessary data transmission and similarity evaluations.

But I need to ensure that signatures that have many records are sent to a worker with better processing power, for example one with a GPU. So, how do I assign a specific Spark node to a partition key?
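Spark does not expose a direct "pin this key to that node" API (task placement is up to the scheduler), but you can control which *partition* each signature lands in with a custom `Partitioner`, and then arrange for the low-numbered, heavy partitions to run on the powerful workers via executor sizing/placement. A hedged sketch, assuming the set of heavy signatures is known up front (e.g. from a frequency count):

```scala
import org.apache.spark.Partitioner

// Sketch: reserve partitions 0..(heavy-1) for the heavy signatures,
// hash everything else into the remaining "light" partitions.
class SignaturePartitioner(numLight: Int, heavySignatures: Set[String])
    extends Partitioner {
  private val heavyIndex = heavySignatures.toSeq.sorted.zipWithIndex.toMap

  override def numPartitions: Int = heavySignatures.size + numLight

  override def getPartition(key: Any): Int = {
    val sig = key.toString
    heavyIndex.getOrElse(
      sig,
      heavySignatures.size + math.abs(sig.hashCode % numLight))
  }
}

// usage, on an RDD of (signature, record) pairs:
// val partitioned = sigRecords.partitionBy(new SignaturePartitioner(32, heavySigs))
```

Note the caveat: this controls the data layout, not the physical node; getting the heavy partitions onto the GPU machine still depends on where executors are launched.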

hey guys, am I able to saveAsTable(<an S3 path>)? or would I have to use .save(path) instead?
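Both can work; the difference is whether a metastore entry is created. `saveAsTable` takes a table name, not a path, but it can be pointed at S3 via the `path` option. A hedged sketch (bucket and table names are made up):

```scala
// 1) plain files at an S3 path, no metastore entry:
df.write.format("parquet").mode("overwrite").save("s3a://my-bucket/events/")

// 2) a metastore table backed by that S3 location (an external table):
df.write.format("parquet")
  .option("path", "s3a://my-bucket/events/")
  .saveAsTable("events")
```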
Hi, I'm a beginner. Can anyone suggest an example project that involves Kafka, Spark, Hadoop, and preferably ELK too?
Has anyone here worked with Delta tables? I was curious what the parameters in the offset location of a DeltaTableSource mean. I have a job that reads from Delta tables, and I wanted to somehow monitor how much backlog is yet to be processed by my job.
According to this link delta-io/delta#526, reservoirId is what I should look at, but I'm not able to figure out how to get the latest reservoirId from the Delta table's source location.
D Cameron Mauch

Upgrading our code from Spark 2.4.7 to 3.0.1, and getting this error: object typed in package scalalang is deprecated (since 3.0.0): please use untyped builtin aggregate functions in this code:

    val meanSores: Dataset[(Int, Double)] = classified
      .filter(e => e._2 >= 0.0 && e._2 <= 1.0)
      .groupByKey { case (typeId, _) => typeId }

But I can’t figure out what it’s supposed to look like. All the examples I could find online use that typed.someAggregateFunction pattern...
The type of classified in this example is Dataset[(Int, Double)]
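The deprecation points at `org.apache.spark.sql.expressions.scalalang.typed`; the suggested replacement is the untyped builtins from `org.apache.spark.sql.functions`, converted back into typed columns with `.as[...]`. A sketch, assuming the elided aggregation was a typed mean over the score (the second tuple element):

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.avg
import spark.implicits._ // assumes a SparkSession named `spark` is in scope

// Before (Spark 2.x, now deprecated):
//   .groupByKey { case (typeId, _) => typeId }.agg(typed.avg(_._2))
// After: untyped builtin aggregate, lifted back into the typed API.
val meanScores: Dataset[(Int, Double)] = classified
  .filter(e => e._2 >= 0.0 && e._2 <= 1.0)
  .groupByKey { case (typeId, _) => typeId }
  .agg(avg($"_2").as[Double])
```

`$"_2"` works because a `Dataset[(Int, Double)]` exposes its tuple fields as columns `_1` and `_2`.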

Eric K Richardson
Anyway, in that section there are quite a few ideas. I found many functions were added so we could remove custom ones.
hi all
I created a small cluster with Docker: one master node and 2 workers, with YARN, Hadoop 3.2 and Spark 3.0 installed. Inside Docker everything works fine: from the Spark shell I can run queries against Hive.
I tried to run my application from IntelliJ using the property -spark.master="docker:7070". The application runs, but it does not find HDFS and Hive properly.
How can I configure the run configuration correctly?
I don't understand why, when I run the Spark application from IntelliJ, it does not read the server configuration.
Spark is configured with YARN in Docker.
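A driver launched from the IDE won't see the cluster's `core-site.xml` / `hdfs-site.xml` / `hive-site.xml` unless they are on its classpath (e.g. copied into `src/main/resources`) or reachable via `HADOOP_CONF_DIR`; without them, HDFS and Hive fall back to local defaults. A hedged sketch (hostnames are placeholders, not your actual addresses):

```scala
import org.apache.spark.sql.SparkSession

// Sketch for an IDE-launched driver. "docker:7070" is not a valid master URL;
// for a YARN cluster the master is "yarn", resolved through the Hadoop config
// files on the classpath / HADOOP_CONF_DIR.
val spark = SparkSession.builder()
  .appName("from-intellij")
  .master("yarn")
  .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") // assumed address
  .enableHiveSupport() // requires hive-site.xml to locate the metastore
  .getOrCreate()
```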
Robby Kiskanyan
Anyone know how I can add JSON schema validation and autocomplete for the Spark operator CRD in either coc-yaml or vscode-yaml? I'm having a hard time finding any sort of schema.json anywhere to validate against.
Hey, good evening! For the purposes of a project I need to find where in the code 1) the master splits work across the other nodes, and 2) data partitioning is performed!
Cory Maklin
Hey everyone, unlike the Avro library, Spark doesn't use single object encoding when serializing/de-serializing records. Does anyone know how I can deserialize a record with Spark, and then convert it to one of the classes that we generated using the Avro plugin?
Alexandre M. Cechin
Hello, I was porting an application to Spark a couple of months ago and kind of let it go for the moment, but since I am on paternity leave I decided to take a look at it again while the baby is asleep. I would like to know if there is a way for workers to query a database to retrieve information needed inside a UDF. At the point where I left the code, I had to load the entire table contents and do joins inside Spark, trying to replicate the old logic that was done using for-loops with lookups, because trying to access the lookups on the workers was giving me null pointers, as the Spark context isn't available to workers.
Is there a way to query data while the computations are being made on the workers, or do I really have to load all the data I need into memory and join with it? There are a lot of big tables that need to be looked up for the data used inside the UDFs, and it seems kind of wasteful to load it all into memory.
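The null pointers are expected: the `SparkSession`/`SparkContext` only exist on the driver. If a lookup table fits in memory, the usual pattern is to read it once on the driver and broadcast it, then read the broadcast value inside the UDF. A hedged sketch (table and column names are made up):

```scala
import org.apache.spark.sql.functions.udf

// Driver-side read of the lookup table, shipped to every executor once.
val lookup: Map[String, Double] =
  spark.read.jdbc(jdbcUrl, "lookup_table", jdbcProps)
    .collect()
    .map(r => r.getString(0) -> r.getDouble(1))
    .toMap
val bLookup = spark.sparkContext.broadcast(lookup)

// The UDF reads the broadcast value, never the SparkSession.
val enrich = udf { key: String => bLookup.value.getOrElse(key, 0.0) }
// df.withColumn("factor", enrich($"key"))
```

For tables too big to broadcast, the alternatives are exactly the join you already have, or `mapPartitions` opening a plain JDBC connection per partition (no Spark context needed for that).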
Surendiran Balasubramanian

Need clarification on the roles of the Catalyst optimizer and Project Tungsten.

The Catalyst optimizer produces an optimized physical plan from the logical plan.

The optimized physical plan is then taken by the code generator to emit RDDs.

Is the code generator part of Project Tungsten or of the Catalyst optimizer?

Is this code generator what is called the whole-stage code generator?

A lot of the articles I have read say the code generator emits RDDs.

Who then generates bytecode from the RDD?

I am totally confused about the code generator and Project Tungsten. Please help me.
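Roughly: Catalyst does the analysis and optimization and picks the physical plan; whole-stage code generation belongs to the Tungsten execution engine. It emits Java *source* for each codegen stage, which is compiled to bytecode at runtime (by Janino), and the compiled plan then executes as an `RDD[InternalRow]`. You can see this for yourself with `explain`; a small sketch (assumes a `SparkSession` named `spark` with implicits imported):

```scala
// Stages marked with a * in the physical plan are fused by whole-stage
// codegen into a single generated Java function.
val df = spark.range(100).filter($"id" % 2 === 0).groupBy().count()

df.explain("codegen") // Spark 3.x: prints the generated Java source
df.explain(true)      // parsed / analyzed / optimized / physical plans
```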

Hello, can anyone please explain which component is responsible for sending partitions of an RDD to the workers for processing? For example, when I use sc.parallelize(Array(1,2,3,4,5,6), 2), the array splits into 2 partitions, and then what? How are they sent to individual workers?
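`parallelize` slices the local collection on the driver (see `ParallelCollectionRDD.slice`); each slice becomes a partition whose data rides along inside the task that the DAG scheduler serializes and sends to an executor. A pure-Scala sketch of the slicing logic (mirroring what Spark does positionally; not Spark's actual code):

```scala
// Partition i receives the elements in [i*len/n, (i+1)*len/n).
def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] =
  (0 until numSlices).map { i =>
    val start = (i * seq.length.toLong / numSlices).toInt
    val end   = ((i + 1) * seq.length.toLong / numSlices).toInt
    seq.slice(start, end)
  }

// slice(Seq(1, 2, 3, 4, 5, 6), 2) == Seq(Seq(1, 2, 3), Seq(4, 5, 6))
```

So for your example, partition 0 holds 1,2,3 and partition 1 holds 4,5,6; the TaskScheduler then hands one task per partition to the executors.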
Gustavo Sena Mafra
Does anyone have any hints on how I could count the number of values in column B that are lower than the row's A value? Maybe some trick with window functions? Specifics about this case: B is always >= A, and the A/B values are discrete.
A    B    expected
1    1    0
1    2    0
2    2    1
2    3    1
3    3    3
4    4    5
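One way without a full cross join, sketched under the assumption that (A, B) rows are distinct as in the sample: pre-aggregate the B counts, turn them into a running total with a window ordered by value, and look up the running count at the largest B strictly below each A.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// cumB = number of B values <= this B (running count over the value order).
val bCounts = df.groupBy($"B").count()
  .withColumn("cumB", sum($"count").over(Window.orderBy($"B")))

// For each row, the answer is the largest cumB among B values below its A
// (0 when no B value is below A).
val result = df
  .join(bCounts.select($"B".as("b2"), $"cumB"), $"b2" < $"A", "left")
  .groupBy($"A", $"B")
  .agg(coalesce(max($"cumB"), lit(0L)).as("expected"))
```

Checking against your table: cumB is 1→1, 2→3, 3→5, 4→6, which yields 0, 0, 1, 1, 3, 5. If duplicate (A, B) rows were possible, you would first tag rows with `monotonically_increasing_id` and group on that instead.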
Hi! seems like I'm getting deadlocks on spark 2.4.4. No clue why. Main thread is blocked on org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:200) which is waiting on bloomfilter-0 thread
okay, seems like this was related to spark.sql.adaptive.enabled being enabled
This may be a bit offtopic, but I'm trying to run a python library on top of scala/spark in jupyter and eventually I started getting "Java heap is out of memory" errors when rerunning operations I used to be able to do. I could fix these by allocating more memory with a new spark app, but is there any way to free the memory being taken up by parallel operations in a jupyter notebook without having to make a new notebook?
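You can usually release that memory without a new notebook: most of the heap held after repeated runs is cached blocks, which can be dropped explicitly. A hedged sketch (`df` and `spark` stand for your own Dataset and session):

```scala
// Free memory held by cached data without restarting the kernel:
df.unpersist()              // drop one cached Dataset/DataFrame
spark.catalog.clearCache()  // drop every cached table/Dataset in this session

// Unpersist any stray RDDs cached outside the Dataset API:
spark.sparkContext.getPersistentRDDs.values.foreach(_.unpersist())
```

Heap retained by the UI's job/stage history is bounded by `spark.ui.retainedJobs`/`spark.ui.retainedStages`, which you can lower when creating the session.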
Jaroslaw Nowosad
hi, not sure if this is the right channel, but I couldn't find anything on this: how do I speed up execution of Spark SQL for tables with 10K+ wide columns? I have a time-out issue in production and created a Stack Overflow question: https://stackoverflow.com/questions/67299179/spark-sql-tables-with-10k-wide-columns-how-to-speed-up-execution. If anyone has suggestions, please share...
Mark Hamilton
Does anyone know how the Spark documentation pipeline works?
In particular, I'm looking to replicate this doc build in my own repo.
Franco Cano Erazo
I am using spark-testing-base, and my dataframes have at least 25 to 30 columns, so this is very hard to do. Would you handle these dataframes as JSON or CSV, or what alternatives would you recommend to make it easy?
Eric K Richardson
You can use datasets with a case class.

TL;DR: How to write ds.withWatermark("timestamp", "10 minutes").groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word").count() in a type-safe way ?

Hi all. I'm looking at Spark Structured Streaming, and I wonder if there is a way to use the window operations while sticking to the type-safe API.
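There is no fully type-safe window operator, but one common workaround is to run the untyped window aggregation and hop back into the typed world with `as[...]`: the `window` struct maps onto a case class with `start`/`end` fields. A sketch (case class names are illustrative; assumes `spark.implicits._` in scope and `ds` with a `timestamp` and `word` column):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.window

case class TimeWindow(start: Timestamp, end: Timestamp)
case class WordCount(window: TimeWindow, word: String, count: Long)

// Untyped groupBy/window, then back to a typed Dataset via as[...].
val counts: Dataset[WordCount] = ds
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
  .count()
  .as[WordCount]
```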

Sven Ludwig

Hi, since I did not yet get a reply anywhere else, please allow me the following question in this room.

Is it possible to run a Spark program (with Spark Context and Spark Session) from within an Akka Streams recipe for each element that passes a certain stage, let's say a mapAsync stage with parallelism set to 1?

Simplified description of my intended use case: the Spark program must be executed semi-regularly for each "Workspace" in my system. So I would use Akka Streams to repeatedly, and in a throttled fashion, stream all Workspace IDs that exist in my database; for each Workspace ID flowing down that stream, I would start the Spark program in that mapAsync stage, process its outcome (wrapped in a Future) when it terminates (successfully or not), and then start it for the next Workspace ID that comes through the stage.
Sven Ludwig
I'd use Akka Streams here, because it is used throughout our solution in many services, and we have very good experience with it, so I would even use it for this super-simple "running a Spark Program" use case.
Dan Sokolsky

if anyone can try the following in a Scala worksheet and tell me what they get, that would be incredibly helpful --

import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.internal.SQLConf

val sqlParser = new CatalystSqlParser(SQLConf.get)

val query = "select col1 from table1;"

//import sqlParser.astBuilder
val parsed = sqlParser.parseExpression(query)

println(s"parsed: ${parsed.prettyJson}")

the error I get looks absurd --

mismatched input 'from' expecting {<EOF>, '-'}(line 1, pos 12)
== SQL ==
select col1 from table1;
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseExpression(ParseDriver.scala:49)
... 36 elided
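That error is actually expected: `parseExpression` only accepts a single expression (e.g. `col1 + 1`), which is why it chokes on `from`. A whole query goes through `parsePlan`. A sketch, reusing the `sqlParser` from the snippet above (note: no trailing semicolon):

```scala
// A full SELECT statement is a plan, not an expression.
val plan = sqlParser.parsePlan("select col1 from table1")
println(s"parsed: ${plan.prettyJson}")

// parseExpression works for expression fragments only:
val expr = sqlParser.parseExpression("col1 + 1")
```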
Hi all, collect in Spark is taking a huge amount of time. I want to get the values of one column into a Scala collection. How can I do this?
val newDynamicFieldTablesDF = cachedPhoenixAppMetaDataForCreateTableDF
logger.info(s"####### except with client-schema done " + LocalDateTime.now())
// newDynamicFieldTablesDF.cache()

val detailsForCreateTableDF = cachedPhoenixAppMetaDataForCreateTableDF
  .join(broadcast(newDynamicFieldTablesDF), Seq("reporting_table"), "inner")
logger.info(s"####### join with newDF done " + LocalDateTime.now())

// detailsForCreateTableDF.cache()

val newDynamicFieldTablesList = newDynamicFieldTablesDF.map(r => r.getString(0)).collect().toSet
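For what it's worth: collecting ~300 values is cheap by itself. When the Spark UI attributes 25 minutes to the collect job, it is usually the upstream lineage (the except/joins) being recomputed for that action, so materializing the DataFrame before the first action often helps. A hedged sketch against the code above:

```scala
// Materialize the expensive lineage once; the small collect is then cheap.
newDynamicFieldTablesDF.cache()
newDynamicFieldTablesDF.count() // forces the cache to fill

val tableNames: Set[String] =
  newDynamicFieldTablesDF.map(r => r.getString(0)).collect().toSet
```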
Eric K Richardson
What are you doing with the data that you want it in a Scala collection?
@ekrich thanks much for your response.
newDynamicFieldTablesList.foreach(table => {

// running here Create table DDL/SQL query
@ekrich What would be the best practice or ideal approach?
Eric K Richardson
Is the Schema dynamic or can you just create an object with the column names?
Also, if you put your data into a case class as a dataset then it will automatically use the case class field names.
@ekrich It is dynamic
newDynamicFieldTablesList.foreach(table => {

              // create string interpolation for create table goes here
              val tableInformationDF = detailsForCreateTableDF.filter(col("reporting_table") === table)

              tableInformationDF.rdd.collect().foreach(row => {

                val reportingTable = row.getAs("reporting_table").asInstanceOf[String]
                val dynamicFieldEntityId = row.getAs("dynamic_field_entity_id").asInstanceOf[String]
                val isMultiSet = row.getAs("is_multi_set").asInstanceOf[Boolean]
                val individualIdForMultiSet = row.getAs("individual_id_for_multiset").asInstanceOf[String]
                val dynamicFieldColumns = row.getAs("dynamic_field_columns").asInstanceOf[String]
                val textImageOn = row.getAs("text_image_on").asInstanceOf[String]

                val dynamicFieldColumnsForCreate = if (isMultiSet) s"$individualIdForMultiSet $multiSetColumns" else ""

                val entity = row.getAs("dynamic_field_entity").asInstanceOf[String]
                val entityTableName = if (entity.equalsIgnoreCase("financial_allocation"))
                  "ENTITY_COST_CENTER_ALLOCATION" else entity
                val entityId = row.getAs("dynamic_field_entity_id").asInstanceOf[String]

                if (!table.toLowerCase.contains("custodian")) {
                  val createTableQuery =
                    s"""|CREATE TABLE [dbo].[$reportingTable](
                        |  [${reportingTable}_id] [int] NOT NULL,
                        |  [${dynamicFieldEntityId.toLowerCase}] [int] NOT NULL,
                        |  $dynamicFieldColumnsForCreate
                        |  $dynamicFieldColumns
                        |  CONSTRAINT [${reportingTable}_1] PRIMARY KEY CLUSTERED
                        |  ([${reportingTable}_id] ASC) WITH (PAD_INDEX = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY],
                        |  CONSTRAINT [FK_$reportingTable] FOREIGN KEY ([$entityId])
                        |  REFERENCES $entityTableName ($entityId)
                        |  ON DELETE CASCADE
                        |) ON [PRIMARY]
                        |$textImageOn
                        |""".stripMargin

                  val dynamicFieldColForIndex = if (isMultiSet) " , [dynamic_field_data_set_id] ASC " else ""

                  val createIndexQuery =
                    s"""|CREATE NONCLUSTERED INDEX [${reportingTable}_2] ON [dbo].[$reportingTable]
                        |  ( [$dynamicFieldEntityId] ASC $dynamicFieldColForIndex )
                        |  WITH (PAD_INDEX = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, IGNORE_DUP_KEY = OFF,
                        |  ONLINE = OFF) ON [PRIMARY]
                        |""".stripMargin
But how will a case class help?
My Spark UI says 25 minutes are taken by collect, which is huge, and I don't have much data, only 300 records.
Eric K Richardson
You shouldn't need collect, the foreach on the dataframe forces a computation.
Where are you saving the data?
Eric K Richardson
Any way you can restructure your data so you don't need dynamic stuff?
Got to go now.
Ok, thanks
Saswata Dutta
any in-depth guides to understanding the query-planning optimizations in Spark Catalyst or Tungsten? I want to try to contribute some plan-generation optimizations.