Where communities thrive

  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
  • Jun 17 18:49
    sjkwak synchronize #572
  • Jun 17 18:44

    sjkwak on master

    Update documentation for the cu… (compare)

  • Jun 17 18:44
    sjkwak closed #603
  • Jun 17 18:43

    sjkwak on v2.3.19


  • Jun 17 18:37
    sjkwak review_requested #603
  • Jun 17 18:37
    sjkwak opened #603
  • Jun 16 18:37

    sjkwak on master

    update the version and java sdk… (compare)

  • Jun 16 18:37
    sjkwak closed #602
  • Jun 16 17:32
    nyaghma review_requested #602
  • Jun 16 17:32
    nyaghma opened #602
  • Jun 16 17:29

    nyaghma on updateVersion

    update the version and java sdk (compare)

  • Jun 11 17:06
    mainaksarcar commented #571
  • Jun 11 16:04
    mainaksarcar commented #571
  • Jun 08 13:20
    tahaum commented #594
  • Jun 07 20:28
    spzSource edited #597
  • Jun 03 22:09

    sjkwak on master

    Fix removing partitions in reco… (compare)

  • Jun 03 22:09
    sjkwak closed #584
  • Jun 03 22:04
    sjkwak synchronize #584
  • Jun 03 22:04

    sjkwak on master

    put cache vars in the class ins… (compare)

  • Jun 03 22:04
    sjkwak closed #577


I have a databricks job that executes some spark code that, on a timer, reads from event hub and saves it into a table.
Since the 1st of June, the job started to give similar errors as the one below:

"In partition 12 of hpims-prod-eh-feed-canonic-positions, with consumer group hpims-prod-canonic-positions-to-spark, request seqNo 132674937 is less than the received seqNo 132896403. The earliest seqNo is 132814862 and the last seqNo is 135489734"

I don't believe there are any other readers to the event hub consumer group used by the job and also, if I clean the checkpoint folder, the job works for some time and then it fails again with the same error.
This could mean that the job is not able to keep up with the amount of data, but i also don't believe that is the issue since, in two hours, it was able to process multiple days of data, meaning it is able to more than handle the load.

I don't have much experience with databricks or sparks, so I could not find much by looking at the logs.

Any ideas of what it might be? how can I solve this?

Job Configuration
Cluster: Driver: Standard_F4s, Workers: Standard_F8s, 1 worker, 5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Library: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.13

Event hub has 20 partitions

Think i found the solution to my problem. After upgrading com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.15 i haven't had the issue for some time.
An exception occurred while reading the eventhub. Most of the time, it is good. This happens occasionally. Is this normal? Or is there a solution?
: org.apache.spark.sql.streaming.StreamingQueryException: Futures timed out after [5 minutes]
Hey, could you help me? It looks like maxEventsPerTrigger is not being honored in my SSS dbr job. I have the event hub configured for 10 TUs and there is 1 partition and there are no add'l consumers and the producers are light. I'm trying to reprocess data from the start of stream. I am setting maxEventsPerTrigger to 5000 but when I start the streaming job I can see in the output that numInputRows is always 1000. the cluster is using com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.13 . Here's the code:
val genericPassThruEH0 = spark.readStream.format("eventhubs")
.option("eventhubs.connectionString", eh_connstring_generic)
.option("eventhubs.startingPosition", startingEventPosition)
.selectExpr("CAST(body AS STRING) AS body")
just fyi for anyone hitting the above problem via a google search...the solution is
ie, remove "eventhubs." from the option statement.
or use the ehConf connstring builder...I was at the mercy of existing code using the .option syntax that was copy/pasted everywhere.
Thanks to the guys on the EH team for helping me out!!
hello all. im new and want to use this with databricks
how do i attach this event hub to my subscription?
Parag M
Does azure-eventhubs-spark_2.11 work with Spark 3.0. We upgraded to DataBricks Runtime 7.0, which uses Spark 3.0. I am seeing following error after the upgrade though, when I run a streaming job (which works with DataBricks Runtime 6.6 / Spark 2.1) - java.lang.NoSuchMethodError: org.apache.spark.sql.eventhubs.EventHubsSource$$anon$1.parseVersion(Ljava/lang/String;I)
Hi Everyone. I am new to Azure Event hub . I am trying to read data from azure event hub to Kafka using spark structred streaming. I was able to get the data. Question I have is , how to pass starting offsets . In Kafka we pass Json string of offsets with partiton .. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} "" ..
I saw the documentation that we can specifiy offset in azure eventhub spark too , .setStartingPosition("12345") . dont we need to pass list of all offsets from different partitions?
any suggestions please?
Hi Everyone. I am new to Azure Event hub . I am trying to read data from azure event hub to Kafka using spark structred streaming. I was able to get the data. Question I have is , how to pass starting offsets . In Kafka we pass Json string of offsets with partiton .. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} "" ..
I saw the documentation that we can specifiy offset in azure eventhub spark too , .setStartingPosition("12345") . dont we need to pass list of all offsets from different partitions?
Eric Tsang
Hello Team, our team has raised an issue #534. Appreciate if someone could take a look for it
Manish Singh
Correction: The save location is dbfs:/Filestore/xnewevt.
@tomarv2 : what was the solution for "its not printing anything to console, anyone any suggestion?" ?
Manish Singh

Have azure_eventhubs_spark_2_11_2_3_15.jar installed in Cluster>Libraries
Cluster Configuration is: 7.1 (includes Apache Spark 3.0.0, Scala 2.12)

Getting Error: encConnString=sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)

TypeError: 'JavaPackage' object is not callable

TypeError Traceback (most recent call last)

<command-4482231998934855> in <module>
18 connectionString=f'Endpoint=sb://{NAMESPACE}.servicebus.windows.net/{EVENT_HUB_NAME};SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY};EntityPath={EVENT_HUB_NAME}'
---> 19 encConnString=sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)
21 print(connectionString)

TypeError: 'JavaPackage' object is not callable

Any help?

Georgi Slavov
Hello, does the library support OAuth authentication? As far as I see from examples it only uses SAS Key authentication/authorization.
Arvind Ravish
We are still getting below Futures Timed Out exception with latest version of the event hub library, 2.3.13+ with Spark structured streaming, any thoughts?
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecutionrunStream(StreamExecution.scala:312)atorg.apache.spark.sql.execution.streaming.StreamExecutionrunStream(StreamExecution.scala:312) at org.apache.spark.sql.execution.streaming.StreamExecutionanon$1.run(StreamExecution.scala:208)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
Issue is open here: Azure/azure-event-hubs-spark#536

Hi, I have an application where I trigger multiple spark streaming queries each reading from its own eventhub from a single namespace. I am ensuring that each query has its own sink and checkpoint location. I am terminating them through spark.streams.awaitAnyTermination().
This is how the structure looks:

val dataStreams: Array[DataFrame] = eventhubs.foreach( eh =>
val queries: Array[StreamingQuery] = dataStreams.foreach{ 
 df = df.map(....). writeStream(..).path("../eh").checkpointLocation("../eh").start()

However, I am getting ReceieverDisconnected exception for the above scenario. Is the above scenario supported by the eventhub library?

Caused by: java.lang.ClassNotFoundException: org.apache.qpid.proton.amqp.Symbol
structured streaming eventhubs failing with Caused by: java.lang.ClassNotFoundException: org.apache.qpid.proton.amqp.Symbol
@aravish Hi ,structured streaming eventhub. Are you able to read ir from spark
@tomarv2 could you pls let meknow the approach you have tried to connect to azure event hubs
@brgao pls share the teplate of how you are connecting to azure event hub
@marcusbb any luck with spark eventhub reading
Hi All,
Need Help
I am trying to run a spark application on HDInsight.
I am getting an error as it is not able to find spark-eventhub jar files which I have added in storage account.
While it runs perfectly fie in local mode.
I am using Azure toolkit for IntelliJ to deploy spark application on HDInsight. And passing jar location in run configuration as : wasb://clustername@blobaccountname.blob.core.windows.net/foldername/jarfile.jar
Can anyone tell me what am I missing ?
Scala: 2.11.8
sbt : 1.4.x
Spark Cluster : Spark 2.3 (HDI 3.6)
azure-eventhubs-spark : "2.3.17"
Hello, hope everyone is great! I saw there is a way to write data to a specific partition by providing the "partitionId" or "partitionKey" column, but I am not able to do so. The event hub will still randomly assign partition. I raised the issue here (Azure/azure-event-hubs-spark#547). Could anyone be so kind to help with the issue? Thank you in advance!
Alex Ott
Hi all
When reading from the EventHubs, does it creates one-to-one mapping of EH partition into Spark partition - similar to the Kafka?
Hi all, when consuming events from Event Hubs using Structured Streaming is checkpointing enough to avoid data loss (at-least-once) ? Does the connector raise any warnings for possible data loss? Thank you
Hi all, when we combine using trigger.once() API and set setMaxEventsPerTrigger() to a bigger value, we can process more data, but is there a maximum value of this setMaxEventsPerTrigger() that can be set? Thank you.
Ivan Trusov
Hi everyone. We're using Azure EventHubs as a source for incoming data.
Via the default EH SDK one can easily get the Last Enqueued Sequence Number, but the EH connector for Spark is not providing this information as a column. It's mainly needed to calculate the lag between the last message in the queue and the current message being processed. Is it possible to calculate the lag between the last and current sequence number using the connector?
Nirabhra Tapaswi
Hi everyone. I am trying to integrate eventhub streaming with spark on aks(Azure Kubernetes Service). We are using managed identity there for all purposes, and I could not find a compatibility of azure-event-hubs-spark library with the DefaultCredentials like other azure sdks for java, scala, .Net, python etc have. Am I missing something? Or is there a way to use Managed Identity to access eventhub in this library? Thanks for any responses in advance.
Ganesh Chand
I am curious to know - what aspect of Spark structured query determines the # of event hub incoming requests we see in the azure portal. I have 2 workers each w/ 4 cores so I'll have a max of 8 tasks running in the clusters. But, eventhub portal shows number of incoming requests of several magnitude so I am not able to corelate.
Hi, first timer for spark and com.microsoft.azure.eventhubs. I'm trying to read static csv data files and sent each row as message to EventHub. Basically, using spark.read.format("csv") to create the dataframe and the use dataframe.foreach(sendEvent) to send the rows. All working fine if I use "collect" or "take" to force operation in a single node. But when I remove the collect/take, it will error out with NotSerializableException on the com.microsoft.azure.eventhubs.ConnectionStringBuilder object. Any idea how to fix this serialization issue?
Quick question, the Readme says, Azure event hub spark 2.3.18 is compatible with Spark 3.0. Looking at the master branch of Azure event hub spark, it is using (sparks internal logging) https://github.com/Azure/azure-event-hubs-spark/blob/master/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala#L37 , I checked the java doc for spark 3.0.1. I don't even see this class http://spark.apache.org/docs/3.0.0/api/java/index.html. Is there anything I am missing. I was asking because I am seeing this error in databricks with scala 2.12 and spark 3.0.1 Databricks rntim 7.6 "Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.<init>(EventHubsSourceProvider.scala:55)"
sorry was looking inside java folder
Yunjie Ke
Hi all, I heard spark 3.1.1 and dbr 8.0 just released last week. Any plan to support them?
Hi, I am new to Scala and SBT, I need to use this library, but I am not sure how ? Can anyone help me ? I thought I can add it as an artifact (using Intellij), but it fails for some reason

package com.example.eventhubconsu;

import java.io.IOException;
import java.util.function.Consumer;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;

public class Abc {

private static final String connectionString = "Endpoint=sb://kafkaconsumer.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=0cwyAELee1DF0AaoKCDki5u28TXWEadTScSMcrULWgw=";
private static final String eventHubName = "kafkaconsumer";
private static final String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=cmsdstorage;AccountKey=gFVxyNgNJ+KbKG0tEBGdoiAwhxZm86XKbIY+VWRpl/h7EV99HXDb/X8vTHX1weJbV9QJlnAmvDcEotuYm7JeiQ==;EndpointSuffix=core.windows.net";
private static final String storageContainerName = "eventcontainer";

public static void main(String... args) throws IOException {

public static void consumeEvents() throws IOException {

    // Create a blob container client that you use later to build an event processor
    // client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()

    // Create a builder object that you will use later to build an event processor
    // client to receive and process events and errors.
    EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(connectionString, eventHubName)
            .processError(ERROR_HANDLER).checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

    // Use the builder object to create an event processor client
    EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

    System.out.println("Starting event processor");

    System.out.println("Press enter to stop.");


    System.out.println("Stopping event processor");
    System.out.println("Event processor stopped.");

    System.out.println("Exiting process");


public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
    PartitionContext partitionContext = eventContext.getPartitionContext();
    EventData eventData = eventContext.getEventData();

    System.out.printf("Processing event with consumer group %s from partition %s with sequence number %d with body: %s%n",
            partitionContext.getConsumerGroup(),partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());

    // Every 10 events received, it will update the checkpoint stored in Azure Blob
    // Storage.
    if (eventData.getSequenceNumber() % 10 == 0) {

public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
    System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable());
public static final Consumer<InitializationContext> initializePartition = initiaizepartition -> {

    "Can i set partion id to read event from "



is thr any way to set partion id to read events from specific partion id using eventprocessorclients ,please
Hello everyone,
I am new to the group. Is the right place to ask questions about azure event hub spark?
We are seeing the issue mentioned here Azure/azure-event-hubs-spark#572. Are there any workarounds? Or, a timeline for when this might get merged into the official library?
The issue mentioned in Azure/azure-event-hubs-spark#582 - is this closed based on the comments ?
Ganesh Chand
when using Trigger.Once, we are seeing max number of rows processed to be always under 5 million. Is this a limit enforced by the connector?