@sabeegrewal Have you tried setting setStartingPositions in ehConf, and is it working as expected? I don't have a checkpoint ...
I am trying to stream from Event Hubs using Python:
read_df = (


read_df.writeStream.outputMode("append").format("console").option("truncate", False).start().awaitTermination()
It's not printing anything to the console. Anyone have any suggestions, or can you share an example?
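For anyone comparing notes, a minimal console-sink sketch along the same lines — not the poster's actual code; eh_conf is a hypothetical helper, the connection-string value is a placeholder, and the cluster is assumed to have the azure-eventhubs-spark package installed:

```python
def eh_conf(encrypted_conn_str: str) -> dict:
    # Minimal options: just the (encrypted) connection string.
    # If your conf's starting position points at the end of the stream,
    # a console sink stays silent until new events arrive -- a common
    # reason for seeing no output at all.
    return {"eventhubs.connectionString": encrypted_conn_str}

# On a cluster with the connector installed (not runnable locally):
# read_df = spark.readStream.format("eventhubs").options(**eh_conf(conn)).load()
# (read_df.writeStream.outputMode("append").format("console")
#     .option("truncate", False).start().awaitTermination())
```

If nothing prints, it is worth checking the starting position and whether the producer is actually sending events while the query runs.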
thanks. I found the solution
Is anyone familiar with ReceiverDisconnectedException? I'm using a single connection policy and chaining multiple dataframes from that stream. Would switching to multiple streams/connection policies solve these errors?
I am trying to deploy via Python to an Azure Databricks cluster:
  File "/opt/anaconda3/envs/bidh-adls/lib/python3.7/site-packages/pyspark/sql/streaming.py", line 400, in load
    return self._df(self._jreader.load())
  File "/opt/anaconda3/envs/bidh-adls/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/anaconda3/envs/bidh-adls/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/anaconda3/envs/bidh-adls/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o24.load.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.eventhubs.EventHubsSourceProvider. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
I am getting the above error.
Also, how do I use pip install for the eventhub-spark connector?
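Not an official answer, but this ClassNotFoundException usually means the connector JAR is not on the cluster classpath. The connector is published to Maven Central rather than PyPI, so instead of pip it is typically pulled in with Spark's --packages flag; the coordinates below are one plausible version, not necessarily the right one for your cluster:

```shell
# The connector is a Maven artifact, not a pip package; let Spark fetch it.
# Pick the artifact suffix matching your Scala/Spark build (2.11 vs 2.12).
pyspark --packages com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17

# Or with spark-submit:
spark-submit --packages com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17 my_job.py
```

On Databricks, the equivalent is installing the Maven coordinate as a cluster library.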
Giving offset "-1" is not working as expected: when I give this, it only takes the latest data. Anyone know the reason?
Hi, I wonder whether Event Hubs supports streaming anything other than a body-only object; it looks like the topic needs all JSON rows to form a body. Any ideas? We have a target system where we don't want to parse the Avro stream during ingestion, so in principle it looks like Event Hubs can only send columns in its stream via the body, but I just want to double-check with the group here.
Marcus Simonsen
Hello, I'm attempting to get higher throughput from the connector; with default settings it can only process about 10K msg/minute. Would adjusting maxRatePerPartition help in this regard? What is the current default setting?
Hello all. We are having problems when sending messages to Event Hubs using version 2.3.15. Apparently writeStream is working, as you can see in the chart, but nothing is being received in Event Hubs.
We recently upgraded the library from version 2.3.13 to 2.3.15.
Is there a way to see the connection log and investigate this issue in more detail?


I have a databricks job that executes some spark code that, on a timer, reads from event hub and saves it into a table.
Since the 1st of June, the job has started to give errors similar to the one below:

"In partition 12 of hpims-prod-eh-feed-canonic-positions, with consumer group hpims-prod-canonic-positions-to-spark, request seqNo 132674937 is less than the received seqNo 132896403. The earliest seqNo is 132814862 and the last seqNo is 135489734"

I don't believe there are any other readers to the event hub consumer group used by the job and also, if I clean the checkpoint folder, the job works for some time and then it fails again with the same error.
This could mean that the job is not able to keep up with the amount of data, but I also don't believe that is the issue since, in two hours, it was able to process multiple days of data, meaning it can more than handle the load.

I don't have much experience with Databricks or Spark, so I could not find much by looking at the logs.

Any ideas of what it might be? How can I solve this?

Job Configuration
Cluster: Driver: Standard_F4s, Workers: Standard_F8s, 1 worker, 5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Library: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.13

Event hub has 20 partitions

Think I found the solution to my problem. After upgrading to com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.15 I haven't had the issue for some time.
An exception occurred while reading the event hub. Most of the time it is fine; this happens occasionally. Is this normal, or is there a solution?
: org.apache.spark.sql.streaming.StreamingQueryException: Futures timed out after [5 minutes]
Hey, could you help me? It looks like maxEventsPerTrigger is not being honored in my Spark Structured Streaming Databricks job. I have the event hub configured for 10 TUs, there is 1 partition, there are no additional consumers, and the producers are light. I'm trying to reprocess data from the start of the stream. I am setting maxEventsPerTrigger to 5000, but when I start the streaming job I can see in the output that numInputRows is always 1000. The cluster is using com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.13. Here's the code:
val genericPassThruEH0 = spark.readStream.format("eventhubs")
  .option("eventhubs.connectionString", eh_connstring_generic)
  .option("eventhubs.startingPosition", startingEventPosition)
  .load()
  .selectExpr("CAST(body AS STRING) AS body")
Just FYI for anyone hitting the above problem via a Google search: the solution is to remove "eventhubs." from the option statement, i.e. use .option("maxEventsPerTrigger", ...) instead of .option("eventhubs.maxEventsPerTrigger", ...).
Or use the ehConf connection-string builder; I was at the mercy of existing code using the .option syntax that was copy/pasted everywhere.
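To make that prefix rule concrete, here is a hedged PySpark sketch (eh_options is a hypothetical helper and the connection string is a placeholder): connector-specific settings such as maxEventsPerTrigger are passed without the "eventhubs." prefix, while the connection string keeps it.

```python
def eh_options(encrypted_conn_str: str, max_events_per_trigger: int) -> dict:
    # Note the asymmetry: the connection string keeps the "eventhubs." prefix,
    # but maxEventsPerTrigger must NOT have it -- with the prefixed key the
    # connector ignores the setting and falls back to its default batch size.
    return {
        "eventhubs.connectionString": encrypted_conn_str,
        "maxEventsPerTrigger": str(max_events_per_trigger),
    }

# On a cluster:
# df = spark.readStream.format("eventhubs").options(**eh_options(conn, 5000)).load()
```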
Thanks to the guys on the EH team for helping me out!!
Hello all. I'm new and want to use this with Databricks.
How do I attach this event hub to my subscription?
Parag M
Does azure-eventhubs-spark_2.11 work with Spark 3.0? We upgraded to Databricks Runtime 7.0, which uses Spark 3.0. I am seeing the following error after the upgrade when I run a streaming job (which works with Databricks Runtime 6.6 / Spark 2.1): java.lang.NoSuchMethodError: org.apache.spark.sql.eventhubs.EventHubsSource$$anon$1.parseVersion(Ljava/lang/String;I)
Hi everyone. I am new to Azure Event Hubs. I am trying to read data from Azure Event Hubs using Spark Structured Streaming, as I would from Kafka, and I was able to get the data. The question I have is: how do I pass starting offsets? In Kafka we pass a JSON string of offsets per partition, e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}.
I saw in the documentation that we can specify an offset in azure-eventhubs-spark too, via .setStartingPosition("12345"). Don't we need to pass a list of all offsets from the different partitions?
Any suggestions please?
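Not authoritative, but for comparison with the Kafka JSON: in PySpark the connector takes its starting position as a JSON document under the "eventhubs.startingPosition" option, and the Scala API additionally offers setStartingPositions for a per-partition map. A sketch of the single-position JSON (the offset value here is a placeholder):

```python
import json

def starting_position_json(offset: str = "-1") -> str:
    # The connector's JSON shape for a position; offset "-1" conventionally
    # means the start of the stream. seqNo and enqueuedTime are left unset.
    return json.dumps({
        "offset": offset,
        "seqNo": -1,
        "enqueuedTime": None,
        "isInclusive": True,
    })

# ehConf["eventhubs.startingPosition"] = starting_position_json("12345")
```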
Eric Tsang
Hello team, our team has raised issue #534. I would appreciate it if someone could take a look at it.
Manish Singh
Correction: The save location is dbfs:/Filestore/xnewevt.
@tomarv2 : what was the solution for "its not printing anything to console, anyone any suggestion?" ?
Manish Singh

Have azure_eventhubs_spark_2_11_2_3_15.jar installed in Cluster>Libraries
Cluster Configuration is: 7.1 (includes Apache Spark 3.0.0, Scala 2.12)

Getting Error: encConnString=sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)

TypeError: 'JavaPackage' object is not callable

TypeError Traceback (most recent call last)

<command-4482231998934855> in <module>
18 connectionString=f'Endpoint=sb://{NAMESPACE}.servicebus.windows.net/{EVENT_HUB_NAME};SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY};EntityPath={EVENT_HUB_NAME}'
---> 19 encConnString=sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)
21 print(connectionString)

TypeError: 'JavaPackage' object is not callable

Any help?
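For what it's worth, a hedged sketch: the "'JavaPackage' object is not callable" error typically means py4j could not resolve a JVM class at the given path. The snippet above spells the package as org.apche.spark.EventgHubsUtils, while the connector's helper (assuming the JAR is installed and matches the cluster's Scala version) lives at org.apache.spark.eventhubs.EventHubsUtils. The helper below only assembles the connection string locally; all argument names are placeholders:

```python
def build_connection_string(namespace: str, event_hub: str,
                            key_name: str, key: str) -> str:
    # Assembles an Event Hubs connection string of the same shape as the
    # one in the traceback above.
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key};"
        f"EntityPath={event_hub}"
    )

# On a cluster where the connector JAR is actually loaded (note the spelling
# of the package path):
# enc = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_str)
```

It is also worth checking that the JAR's Scala version matches the cluster runtime (a _2_11 jar on a Scala 2.12 runtime can leave the class unresolvable).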

Georgi Slavov
Hello, does the library support OAuth authentication? As far as I can see from the examples, it only uses SAS key authentication/authorization.
Arvind Ravish
We are still getting the "Futures timed out" exception below with the latest version of the Event Hubs library (2.3.13+) with Spark Structured Streaming; any thoughts?
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:312)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
Issue is open here: Azure/azure-event-hubs-spark#536

Hi, I have an application where I trigger multiple Spark streaming queries, each reading from its own event hub in a single namespace. I am ensuring that each query has its own sink and checkpoint location. I am terminating them through spark.streams.awaitAnyTermination().
This is how the structure looks:

val dataStreams: Array[DataFrame] = eventhubs.map { eh =>
  // build one streaming DataFrame per event hub (details elided)
  spark.readStream.format("eventhubs").options(ehConf(eh).toMap).load()
}
val queries: Array[StreamingQuery] = dataStreams.map { df =>
  df.map(....)
    .writeStream
    .option("path", "../eh")
    .option("checkpointLocation", "../eh")
    .start()
}

However, I am getting a ReceiverDisconnectedException for the above scenario. Is the above scenario supported by the event hub library?

Structured streaming from Event Hubs is failing with:
Caused by: java.lang.ClassNotFoundException: org.apache.qpid.proton.amqp.Symbol
@aravish Hi, regarding structured streaming from Event Hubs: are you able to read it from Spark?
@tomarv2 Could you please let me know the approach you tried to connect to Azure Event Hubs?
@brgao Please share the template of how you are connecting to Azure Event Hubs.
@marcusbb Any luck with reading Event Hubs from Spark?
Hi all,
Need help.
I am trying to run a Spark application on HDInsight.
I am getting an error because it is not able to find the spark-eventhubs jar files which I have added to the storage account.
It runs perfectly fine in local mode.
I am using the Azure Toolkit for IntelliJ to deploy the Spark application on HDInsight, and passing the jar location in the run configuration as: wasb://clustername@blobaccountname.blob.core.windows.net/foldername/jarfile.jar
Can anyone tell me what I am missing?
Scala: 2.11.8
sbt : 1.4.x
Spark Cluster : Spark 2.3 (HDI 3.6)
azure-eventhubs-spark : "2.3.17"
Hello, hope everyone is doing great! I saw there is a way to write data to a specific partition by providing the "partitionId" or "partitionKey" column, but I am not able to do so; the event hub still randomly assigns the partition. I raised the issue here (Azure/azure-event-hubs-spark#547). Could anyone be so kind as to help with it? Thank you in advance!
Alex Ott
Hi all.
When reading from Event Hubs, does it create a one-to-one mapping of Event Hubs partitions to Spark partitions, similar to the Kafka source?
Hi all, when consuming events from Event Hubs using Structured Streaming, is checkpointing enough to avoid data loss (at-least-once)? Does the connector raise any warnings about possible data loss? Thank you.
Hi all, when we combine the Trigger.Once() API with a bigger setMaxEventsPerTrigger() value, we can process more data per run, but is there a maximum value that setMaxEventsPerTrigger() can be set to? Thank you.