Manish Singh
Correction: The save location is dbfs:/Filestore/xnewevt.
@tomarv2 : what was the solution for "its not printing anything to console, anyone any suggestion?" ?
Manish Singh

Have azure_eventhubs_spark_2_11_2_3_15.jar installed in Cluster>Libraries
Cluster Configuration is: 7.1 (includes Apache Spark 3.0.0, Scala 2.12)

Getting Error: encConnString=sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)

TypeError: 'JavaPackage' object is not callable

TypeError Traceback (most recent call last)

<command-4482231998934855> in <module>
18 connectionString=f'Endpoint=sb://{NAMESPACE}.servicebus.windows.net/{EVENT_HUB_NAME};SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY};EntityPath={EVENT_HUB_NAME}'
---> 19 encConnString=sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)
21 print(connectionString)

TypeError: 'JavaPackage' object is not callable

Any help?
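
One thing worth checking for the error above: the failing line spells the path as org.apche.spark.EventgHubsUtils, while the connector's PySpark documentation calls org.apache.spark.eventhubs.EventHubsUtils.encrypt. Py4J hands back a JavaPackage placeholder for any path it cannot resolve to a class, so a misspelled path (or a jar that is not attached to the cluster) produces this TypeError. The jar name also suggests a Scala 2.11 build while the cluster runs Scala 2.12, which may cause separate problems. A minimal sketch, assuming `sc` is the notebook's SparkContext; the two helper names are hypothetical:

```python
def build_connection_string(namespace, event_hub, key_name, key):
    """Assemble an Event Hubs connection string from its parts."""
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key};"
        f"EntityPath={event_hub}"
    )


def encrypted_conf(sc, connection_string):
    """Return connector options holding the encrypted connection string.

    Assumes `sc` is a live SparkContext with the connector jar attached.
    Note the package path: org.apache.spark.eventhubs.EventHubsUtils.
    """
    utils = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils
    return {"eventhubs.connectionString": utils.encrypt(connection_string)}
```

In a notebook this would be used as ehConf = encrypted_conf(sc, build_connection_string(NAMESPACE, EVENT_HUB_NAME, ACCESS_KEY_NAME, ACCESS_KEY)).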

Georgi Slavov
Hello, does the library support OAuth authentication? As far as I see from examples it only uses SAS Key authentication/authorization.
Arvind Ravish
We are still getting the Futures timed out exception below with the latest version of the Event Hubs library (2.3.13+) and Spark Structured Streaming. Any thoughts?
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:312)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
Issue is open here: Azure/azure-event-hubs-spark#536

Hi, I have an application where I trigger multiple spark streaming queries each reading from its own eventhub from a single namespace. I am ensuring that each query has its own sink and checkpoint location. I am terminating them through spark.streams.awaitAnyTermination().
This is how the structure looks:

val dataStreams: Array[DataFrame] = eventhubs.map(eh => ...)
val queries: Array[StreamingQuery] = dataStreams.map { df =>
  df.map(....).writeStream(..).option("path", "../eh").option("checkpointLocation", "../eh").start()
}

However, I am getting a ReceiverDisconnected exception in the above scenario. Is this scenario supported by the Event Hubs library?

Caused by: java.lang.ClassNotFoundException: org.apache.qpid.proton.amqp.Symbol
Structured Streaming with Event Hubs is failing with: Caused by: java.lang.ClassNotFoundException: org.apache.qpid.proton.amqp.Symbol
@aravish Hi, regarding Structured Streaming with Event Hubs: are you able to read from it with Spark?
@tomarv2 could you please let me know the approach you tried for connecting to Azure Event Hubs?
@brgao please share the template of how you are connecting to Azure Event Hubs.
@marcusbb any luck reading from Event Hubs with Spark?
Hi All,
I need help.
I am trying to run a Spark application on HDInsight.
I am getting an error because it cannot find the spark-eventhub jar files that I added to the storage account,
while it runs perfectly fine in local mode.
I am using the Azure Toolkit for IntelliJ to deploy the Spark application on HDInsight, passing the jar location in the run configuration as: wasb://clustername@blobaccountname.blob.core.windows.net/foldername/jarfile.jar
Can anyone tell me what I am missing?
Scala: 2.11.8
sbt : 1.4.x
Spark Cluster : Spark 2.3 (HDI 3.6)
azure-eventhubs-spark : "2.3.17"
Hello, hope everyone is doing great! I saw there is a way to write data to a specific partition by providing the "partitionId" or "partitionKey" column, but I am not able to get it to work: the event hub still assigns partitions randomly. I raised the issue here (Azure/azure-event-hubs-spark#547). Could anyone kindly help with it? Thank you in advance!
Alex Ott
Hi all
When reading from Event Hubs, does the connector create a one-to-one mapping of EH partitions to Spark partitions, similar to Kafka?
Hi all, when consuming events from Event Hubs using Structured Streaming, is checkpointing enough to avoid data loss (at-least-once)? Does the connector raise any warnings about possible data loss? Thank you.
Hi all, when we combine Trigger.Once() with a larger setMaxEventsPerTrigger() value, we can process more data per run, but is there a maximum value that setMaxEventsPerTrigger() can be set to? Thank you.
Ivan Trusov
Hi everyone. We're using Azure EventHubs as a source for incoming data.
Via the default EH SDK one can easily get the Last Enqueued Sequence Number, but the EH connector for Spark is not providing this information as a column. It's mainly needed to calculate the lag between the last message in the queue and the current message being processed. Is it possible to calculate the lag between the last and current sequence number using the connector?
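
The lag described above comes down to a per-partition subtraction once both numbers are available. The sketch below is only one way it might be wired up: it assumes the last enqueued sequence numbers are fetched separately (for example with the EH SDK, as mentioned above) and that the current position is taken from the sequenceNumber and partition columns of the rows being processed. The function name and the sample values are hypothetical.

```python
def partition_lag(last_enqueued, current):
    """Return per-partition lag: last enqueued minus last processed sequence number.

    Both arguments map partition id -> sequence number. A partition with no
    processed events yet is reported with a lag of None.
    """
    lag = {}
    for partition, newest in last_enqueued.items():
        processed = current.get(partition)
        lag[partition] = None if processed is None else max(newest - processed, 0)
    return lag


last_enqueued = {"0": 1500, "1": 980, "2": 40}  # e.g. from the EH SDK
current = {"0": 1450, "1": 980}                 # e.g. max(sequenceNumber) per partition
print(partition_lag(last_enqueued, current))    # {'0': 50, '1': 0, '2': None}
```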
Nirabhra Tapaswi
Hi everyone. I am trying to integrate Event Hubs streaming with Spark on AKS (Azure Kubernetes Service). We use managed identity there for everything, and I could not find support in the azure-event-hubs-spark library for DefaultCredentials, as the other Azure SDKs for Java, Scala, .NET, Python, etc. have. Am I missing something? Or is there a way to use Managed Identity to access Event Hubs with this library? Thanks in advance for any responses.
Ganesh Chand
I am curious to know what aspect of a Spark structured streaming query determines the number of Event Hubs incoming requests we see in the Azure portal. I have 2 workers, each with 4 cores, so I'll have a maximum of 8 tasks running in the cluster. But the Event Hubs portal shows a number of incoming requests several orders of magnitude higher, so I am not able to correlate the two.
Hi, first-timer with Spark and com.microsoft.azure.eventhubs. I'm trying to read static CSV data files and send each row as a message to Event Hubs. Basically, I use spark.read.format("csv") to create the dataframe and then dataframe.foreach(sendEvent) to send the rows. Everything works fine if I use "collect" or "take" to force the operation onto a single node. But when I remove the collect/take, it errors out with a NotSerializableException on the com.microsoft.azure.eventhubs.ConnectionStringBuilder object. Any idea how to fix this serialization issue?
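
A common pattern for this kind of serialization error is to build the client inside the function that runs on the executors, once per partition, instead of capturing a driver-side object in the closure. The sketch below shows the shape of that pattern in plain Python so it runs without a cluster. FakeClient and make_sender are hypothetical stand-ins, not part of any Event Hubs SDK; with Spark, the returned function would be passed to foreachPartition.

```python
class FakeClient:
    """Hypothetical stand-in for a non-serializable Event Hubs client."""

    def __init__(self):
        self.sent = []
        self.closed = False

    def send(self, body):
        self.sent.append(body)

    def close(self):
        self.closed = True


def make_sender(make_client):
    """Return a per-partition function that creates its own client.

    Only `make_client` (a plain factory) is captured by the closure, so no
    client object has to be serialized and shipped from the driver.
    """
    def send_partition(rows):
        client = make_client()  # created where the partition is processed
        try:
            count = 0
            for row in rows:
                client.send(str(row))
                count += 1
            return count
        finally:
            client.close()  # always release the connection
    return send_partition


send_partition = make_sender(FakeClient)
# With Spark this would be: dataframe.foreachPartition(send_partition)
print(send_partition(iter([("a", 1), ("b", 2)])))
```

Creating one client per partition rather than one per row also keeps the number of connections bounded.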
Quick question: the README says azure-event-hubs-spark 2.3.18 is compatible with Spark 3.0. Looking at the master branch, it uses Spark's internal logging (https://github.com/Azure/azure-event-hubs-spark/blob/master/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala#L37 ). I checked the Javadoc for Spark 3.0.1 (http://spark.apache.org/docs/3.0.0/api/java/index.html ) and I don't even see this class. Is there anything I am missing? I am asking because I am seeing this error in Databricks with Scala 2.12 and Spark 3.0.1 (Databricks runtime 7.6): "Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.<init>(EventHubsSourceProvider.scala:55)"
Sorry, I was looking inside the Java folder.
Yunjie Ke
Hi all, I heard Spark 3.1.1 and DBR 8.0 were released last week. Any plans to support them?
Hi, I am new to Scala and SBT. I need to use this library, but I am not sure how. Can anyone help me? I thought I could add it as an artifact (using IntelliJ), but it fails for some reason.

package com.example.eventhubconsu;

import java.io.IOException;
import java.util.function.Consumer;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;

public class Abc {

    // Access keys redacted.
    private static final String connectionString = "Endpoint=sb://kafkaconsumer.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<redacted>";
    private static final String eventHubName = "kafkaconsumer";
    private static final String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=cmsdstorage;AccountKey=<redacted>;EndpointSuffix=core.windows.net";
    private static final String storageContainerName = "eventcontainer";

    public static void main(String... args) throws IOException {
        consumeEvents();
    }

    public static void consumeEvents() throws IOException {

        // Create a blob container client that you use later to build an event processor
        // client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(storageConnectionString)
                .containerName(storageContainerName)
                .buildAsyncClient();

        // Create a builder object that you will use later to build an event processor
        // client to receive and process events and errors.
        // The consumer group is assumed to be the default one.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                .connectionString(connectionString, eventHubName)
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

        System.out.println("Starting event processor");
        eventProcessorClient.start();

        System.out.println("Press enter to stop.");
        System.in.read();

        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");

        System.out.println("Exiting process");
    }

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();

        System.out.printf("Processing event with consumer group %s from partition %s with sequence number %d with body: %s%n",
                partitionContext.getConsumerGroup(), partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());

        // Every 10 events received, it will update the checkpoint stored in Azure Blob
        // Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };

    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
                errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable());
    };

    public static final Consumer<InitializationContext> initializePartition = initializationContext -> {
        // Can I set the partition ID to read events from here?
    };
}



Is there any way to set a partition ID so that EventProcessorClient reads events only from that specific partition, please?
Hello everyone,
I am new to the group. Is this the right place to ask questions about azure event hub spark?
We are seeing the issue mentioned here Azure/azure-event-hubs-spark#572. Are there any workarounds? Or, a timeline for when this might get merged into the official library?
The issue mentioned in Azure/azure-event-hubs-spark#582 - is this closed based on the comments ?
Ganesh Chand
When using Trigger.Once, we are seeing the maximum number of rows processed always stay under 5 million. Is this a limit enforced by the connector?
I have been trying for a while, but seem to get empty display queries, any idea what might be the reason? I do get the schema and everything, is there any particular access needed except the connection string itself?
Sagar Motani
I have been looking for the TransportType setting but am unable to find it; I want to change it to AMQP_WEB_SOCKETS.
Can someone help with how I can change the TransportType?
Hi - We have a Spark job (on Databricks) that uses the connector to pull data from Event Hubs. We noticed that pulling from the event hub has suddenly started taking longer and longer, even though this job has been running fine for a very long time. Where can we find the logs that will tell us which part of the process takes the longest when pulling messages from the event hub (for example, network calls)?
Hi there. I have a task to read data from 9 event hubs using a Databricks notebook. Can we parametrize the notebook and run it to fetch data from a selected event hub?
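
One possible shape for this is to keep a lookup from event hub name to connection string and let a notebook parameter pick the entry. The sketch below covers only the lookup part in plain Python; conf_for and the mapping are hypothetical, and the connection strings are assumed to be stored somewhere safe (and encrypted with EventHubsUtils.encrypt where the connector version requires it).

```python
def conf_for(hub_name, connection_strings):
    """Return connector options for one named event hub.

    `connection_strings` maps hub name -> connection string (a hypothetical
    lookup; the values might come from a secret store in practice).
    """
    if hub_name not in connection_strings:
        raise ValueError(f"unknown event hub: {hub_name!r}")
    return {"eventhubs.connectionString": connection_strings[hub_name]}


hubs = {"orders": "<connection string 1>", "payments": "<connection string 2>"}
print(conf_for("orders", hubs))
```

In a Databricks notebook the name could come from a widget, for example hub_name = dbutils.widgets.get("eventhub"), and the result would be passed to spark.readStream.format("eventhubs").options(**conf).load().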
I'm trying to use the new pyspark documentation for schema registry integration. When I call the ...avro.schemaGuid(guid) I get 'JavaPackage' object is not callable. Is this documentation ready to be used yet? Or am I just missing some dependencies on my cluster?
Jude K
Is there a way to use Managed Identity to access eventhub with azure-event-hubs-spark
or are we forced to use SAS? Thanks for any responses in advance.
Jarod Belshaw
I have what is hopefully a quick question... Can I assign specific partitions to a Spark job within the EventHubsConf? Meaning, I want one job to read only from the odd-numbered partitions, and another instance of the same job to read from the even-numbered partitions.
Razvan Vacaru
Hi all! I'm trying out your library for a streaming use case: in a notebook I'm reading from Event Hubs and writing into a Delta table. Now I'd like to do some transformations in a SQL cell that continuously reads only the new data from this Delta table. Do you know if such a thing is possible?
Can someone point me to previous performance runs or benchmarks done using this connector?
Hello everyone, I'm experiencing this error when using library version 2.3.21. It is very similar to Azure/azure-event-hubs-spark#546, but it cannot be the same issue since that should already be fixed as of version 2.3.20. Unfortunately, I am not able to produce a small example to replicate this, so I'm just throwing it out there in case it's immediately recognizable to someone who has experienced it before. Any help is very welcome!
  [ERROR, stream execution thread for <stream-name> [id = ***, runId = ***], org.apache.spark.sql.execution.streaming.MicroBatchExecution, stream execution thread for *** [id = ***, runId = ***]hread, Query *** [id = ***, runId = ***] terminated with error]
  java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
  at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
  at scala.concurrent.Await$.result(package.scala:146)
  at org.apache.spark.eventhubs.client.ClientConnectionPool.org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient(ClientConnectionPool.scala:79)
  at org.apache.spark.eventhubs.client.ClientConnectionPool$.borrowClient(ClientConnectionPool.scala:170)
  at org.apache.spark.eventhubs.client.EventHubsClient.client(EventHubsClient.scala:62)
  at org.apache.spark.eventhubs.client.EventHubsClient.liftedTree1$1(EventHubsClient.scala:187)
  at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal$lzycompute(EventHubsClient.scala:184)
  at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal(EventHubsClient.scala:183)
  at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:176)
  at org.apache.spark.sql.eventhubs.EventHubsSource.partitionCount(EventHubsSource.scala:81)
  at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:106)
  at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:326)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:98)
Li Jun Huang

Hi, I am currently facing the following issue and would like some advice:

The Spark job I am running generates a DataFrame of a million-plus records. If I do df.write() to EH, it causes throttling issues even with the EH scaled up very high.
The throttling causes retries, which rewrite the million-plus records to EH again, and in the end the operation fails with a timeout.

What can I do to avoid this issue while still sending all the records from the DataFrame to EH? I am fine if it takes a long time to send all the records (an hour or more), as long as all records reach EH.
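
One way to approach this is to send the data in bounded chunks and retry only the chunk that was throttled, with a growing delay, instead of retrying the whole write. The sketch below shows that idea in plain Python. send_chunk is a hypothetical callable standing in for whatever actually sends a batch to EH, and the chunk size, retry count, and delays are placeholder values, not recommendations.

```python
import time
from itertools import islice


def chunks(records, size):
    """Yield successive lists of at most `size` records."""
    it = iter(records)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def send_with_backoff(records, send_chunk, size=1000, max_retries=5,
                      base_delay=1.0, sleep=time.sleep):
    """Send records chunk by chunk, retrying a failed chunk with exponential backoff.

    Returns the number of records sent. Re-raises the last error if a chunk
    still fails after `max_retries` retries.
    """
    sent = 0
    for batch in chunks(records, size):
        for attempt in range(max_retries + 1):
            try:
                send_chunk(batch)
                break
            except Exception:
                if attempt == max_retries:
                    raise
                sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
        sent += len(batch)
    return sent
```

A retried chunk may be partly delivered twice, so this gives at-least-once delivery; consumers that need exactly-once semantics would have to deduplicate.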


Hello everyone, I am trying to consume data from Azure Event Hubs with Databricks PySpark and write it to an ADLS sink. Somehow, the Spark job is not able to finish and gets aborted after running for 2 hours. The error is: Caused by: java.util.concurrent.RejectedExecutionException: ReactorDispatcher instance is closed. Here is the full error https://gist.github.com/kingindanord/a5f585c6ee7053c275c714d1b07c6538#file-spark_error-log and here is my Python script https://gist.github.com/kingindanord/a5f585c6ee7053c275c714d1b07c6538#file-script-py

I am using a separate consumer group for this application. The Event hub has 3 partitions, Auto-inflate throughput units are enabled and it is set to 21 units.
Databricks Runtime Version: 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12). Worker type and driver type are Standard_E16_v3 (128 GB memory, 16 cores). Min workers: 1, max workers: 3.

As you can see in the code, startingEventPosition and endingEventPosition are only one hour apart, so the size of the data should be around 3 GB. I don't know why I am not able to consume it. Can you please help me with this issue?

kiran kalyanam
Hi, I am using v2.3.21 with Structured Streaming and trying to read 3 million events per trigger, in 2-minute micro-batches, from an event hub. When the job starts I am able to read the 3 million events within 60-70 seconds, but as time progresses the reads become slower, taking about 4-5 minutes for the same 3 million events. When I restart the job, it performs well again. Can someone help with a solution to this problem? Why does it take longer to read after a few runs and perform better after a restart?