Hi everyone. I am new to Azure Event Hubs. I am trying to read data from Azure Event Hubs with Spark Structured Streaming, the way I would from Kafka, and I was able to get the data. The question I have is: how do I pass starting offsets? In Kafka we pass a JSON string of offsets per partition, e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}.
I saw in the documentation that we can specify an offset in the Azure Event Hubs Spark connector too, via .setStartingPosition("12345"). Don't we need to pass a list of offsets for the different partitions?
Any suggestions, please?
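For reference, a minimal sketch of per-partition starting positions with the connector's EventHubsConf, which plays the role of Kafka's per-partition JSON offsets (connection string, event hub name, and sequence numbers below are placeholders; untested):

```scala
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition, NameAndPartition}

val connectionString = "Endpoint=sb://...;EntityPath=myeventhub" // placeholder

// One EventPosition per (event hub, partition) pair, analogous to
// Kafka's {"topicA":{"0":23,"1":-1}} style offset map.
val positions = Map(
  NameAndPartition("myeventhub", 0) -> EventPosition.fromSequenceNumber(23L),
  NameAndPartition("myeventhub", 1) -> EventPosition.fromEndOfStream
)

val ehConf = EventHubsConf(connectionString)
  .setStartingPositions(positions)                       // per-partition positions
  .setStartingPosition(EventPosition.fromStartOfStream)  // fallback for partitions not listed
```

Partitions missing from the map fall back to the single setStartingPosition value, so you do not have to enumerate every partition.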
Eric Tsang
Hello team, our team has raised issue #534. We'd appreciate it if someone could take a look at it.
Manish Singh
Correction: the save location is dbfs:/Filestore/xnewevt.
@tomarv2: what was the solution for "it's not printing anything to the console, anyone any suggestion?"?
Manish Singh

Have azure_eventhubs_spark_2_11_2_3_15.jar installed in Cluster>Libraries
Cluster Configuration is: 7.1 (includes Apache Spark 3.0.0, Scala 2.12)

Getting Error: encConnString=sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)

TypeError: 'JavaPackage' object is not callable

TypeError Traceback (most recent call last)

<command-4482231998934855> in <module>
18 connectionString=f'Endpoint=sb://{NAMESPACE}.servicebus.windows.net/{EVENT_HUB_NAME};SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY};EntityPath={EVENT_HUB_NAME}'
---> 19 encConnString=sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)
21 print(connectionString)

TypeError: 'JavaPackage' object is not callable

Any help?
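For what it's worth, the package path in that call looks misspelled ("org.apche ... EventgHubsUtils"); the helper lives under org.apache.spark.eventhubs. A hedged sketch of the corrected call from the Scala side (the same path is what sc._jvm resolves in PySpark). Note that "'JavaPackage' object is not callable" also appears when the connector jar isn't actually loaded, and azure_eventhubs_spark_2_11 is a Scala 2.11 build while that runtime uses Scala 2.12:

```scala
// Corrected package path for the connector's connection-string encryption helper.
// Untested sketch; requires a Scala 2.12 build of azure-event-hubs-spark on the cluster.
import org.apache.spark.eventhubs.EventHubsUtils

val encConnString = EventHubsUtils.encrypt(connectionString)
// From PySpark: sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
```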

Georgi Slavov
Hello, does the library support OAuth authentication? As far as I see from examples it only uses SAS Key authentication/authorization.
Arvind Ravish
We are still getting the "Futures timed out" exception below with the latest version of the Event Hubs library (2.3.13+) and Spark Structured Streaming; any thoughts?
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:312)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
Issue is open here: Azure/azure-event-hubs-spark#536

Hi, I have an application where I trigger multiple Spark streaming queries, each reading from its own event hub in a single namespace. Each query has its own sink and checkpoint location, and I terminate them through spark.streams.awaitAnyTermination().
This is how the structure looks (using map rather than foreach, so the results are actually collected):

val dataStreams: Array[DataFrame] = eventhubs.map( eh => ... )
val queries: Array[StreamingQuery] = dataStreams.map { df =>
  df.map(....)
    .writeStream
    .option("checkpointLocation", "../eh")
    .start("../eh")
}

However, I am getting a ReceiverDisconnectedException in the above scenario. Is this scenario supported by the Event Hubs library?

Structured Streaming from Event Hubs is failing with: Caused by: java.lang.ClassNotFoundException: org.apache.qpid.proton.amqp.Symbol
@aravish Hi, structured streaming from Event Hubs: are you able to read it from Spark?
@tomarv2 could you please let me know the approach you tried to connect to Azure Event Hubs?
@brgao please share the template of how you are connecting to Azure Event Hubs
@marcusbb any luck with Spark Event Hubs reading?
Hi all,
I need some help.
I am trying to run a Spark application on HDInsight.
I am getting an error because it cannot find the spark-eventhubs jar files, which I have added to a storage account.
It runs perfectly fine in local mode.
I am using the Azure Toolkit for IntelliJ to deploy the Spark application to HDInsight, and I pass the jar location in the run configuration as: wasb://clustername@blobaccountname.blob.core.windows.net/foldername/jarfile.jar
Can anyone tell me what I am missing?
Scala: 2.11.8
sbt: 1.4.x
Spark cluster: Spark 2.3 (HDI 3.6)
azure-eventhubs-spark: 2.3.17
Hello, hope everyone is great! I saw there is a way to write data to a specific partition by providing a "partitionId" or "partitionKey" column, but I am not able to do so; the event hub still assigns partitions randomly. I raised the issue here (Azure/azure-event-hubs-spark#547). Could anyone kindly help with it? Thank you in advance!
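For reference, a hedged sketch of the write path using the column names the connector recognizes ("body", "partitionKey"); the source column names, df, and connection string are placeholders (untested):

```scala
import org.apache.spark.eventhubs.EventHubsConf
import org.apache.spark.sql.functions._

val ehConf = EventHubsConf("Endpoint=sb://...;EntityPath=myeventhub") // placeholder

// The connector routes events by the "partitionKey" (or "partitionId") column;
// the payload must be in a column named "body".
df.select(
    col("payload").cast("string").alias("body"),
    col("deviceId").cast("string").alias("partitionKey"))
  .write
  .format("eventhubs")
  .options(ehConf.toMap)
  .save()
```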
Alex Ott
Hi all
When reading from Event Hubs, does the connector create a one-to-one mapping of Event Hubs partitions to Spark partitions, similar to the Kafka source?
Hi all, when consuming events from Event Hubs using Structured Streaming, is checkpointing enough to avoid data loss (at-least-once)? Does the connector raise any warnings about possible data loss? Thank you
Hi all, when we combine the Trigger.Once() API with a larger setMaxEventsPerTrigger() value we can process more data, but is there a maximum value that setMaxEventsPerTrigger() accepts? Thank you.
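A hedged sketch combining the two settings (paths, connection string, and the limit value are placeholders); setMaxEventsPerTrigger takes a Long, so Long.MaxValue is the nominal ceiling, though real batches are still bounded by what the partitions hold:

```scala
import org.apache.spark.eventhubs.EventHubsConf
import org.apache.spark.sql.streaming.Trigger

val ehConf = EventHubsConf("Endpoint=sb://...;EntityPath=myeventhub") // placeholder
  .setMaxEventsPerTrigger(5000000L)  // cap on events pulled in the single batch

// spark: an active SparkSession (e.g. in a notebook)
val query = spark.readStream.format("eventhubs").options(ehConf.toMap).load()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/myeventhub")
  .trigger(Trigger.Once())           // process one batch, then stop
  .start("/tmp/output/myeventhub")
```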
Ivan Trusov
Hi everyone. We're using Azure EventHubs as a source for incoming data.
Via the default Event Hubs SDK one can easily get the last enqueued sequence number, but the Event Hubs connector for Spark does not expose it as a column. It's mainly needed to calculate the lag between the last message in the queue and the message currently being processed. Is it possible to calculate that lag using the connector?
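One hedged possibility, outside the connector itself, is to query a partition's runtime information with the underlying Java SDK and diff that against the sequenceNumber column the connector does expose on reads; the connection string and partition id below are placeholders (untested):

```scala
import java.util.concurrent.Executors
import com.microsoft.azure.eventhubs.EventHubClient

val executor = Executors.newScheduledThreadPool(1)
val client = EventHubClient.createFromConnectionStringSync(
  "Endpoint=sb://...;EntityPath=myeventhub", executor) // placeholder

// Last enqueued sequence number for partition "0" at this moment.
val info = client.getPartitionRuntimeInformation("0").get()
val lastEnqueued = info.getLastEnqueuedSequenceNumber
// lag = lastEnqueued - sequenceNumber of the event currently being processed

client.closeSync()
executor.shutdown()
```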
Nirabhra Tapaswi
Hi everyone. I am trying to integrate Event Hubs streaming with Spark on AKS (Azure Kubernetes Service). We use managed identity there for all purposes, and I could not find support in the azure-event-hubs-spark library for DefaultCredentials the way the other Azure SDKs (Java, Scala, .NET, Python, etc.) have it. Am I missing something, or is there a way to use managed identity to access Event Hubs with this library? Thanks in advance for any responses.
Ganesh Chand
I am curious to know: what aspect of a Spark Structured Streaming query determines the number of Event Hubs incoming requests we see in the Azure portal? I have 2 workers, each with 4 cores, so at most 8 tasks run in the cluster, but the portal shows a number of incoming requests several orders of magnitude higher, so I cannot correlate the two.
Hi, first-timer with Spark and com.microsoft.azure.eventhubs here. I'm trying to read static CSV data files and send each row as a message to Event Hubs. Basically, I use spark.read.format("csv") to create the DataFrame and then dataframe.foreach(sendEvent) to send the rows. Everything works fine if I use "collect" or "take" to force the operation onto a single node, but when I remove the collect/take it errors out with a NotSerializableException on the com.microsoft.azure.eventhubs.ConnectionStringBuilder object. Any idea how to fix this serialization issue?
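A common pattern for this kind of serialization error (a hedged sketch, not from the thread): construct the client inside foreachPartition, so it is created on each executor rather than serialized from the driver; df and the connection string are placeholders (untested):

```scala
import java.util.concurrent.Executors
import com.microsoft.azure.eventhubs.{EventData, EventHubClient}
import org.apache.spark.sql.Row

df.foreachPartition { rows: Iterator[Row] =>
  // Created on the executor, so nothing non-serializable crosses the wire.
  val executor = Executors.newScheduledThreadPool(1)
  val client = EventHubClient.createFromConnectionStringSync(
    "Endpoint=sb://...;EntityPath=myeventhub", executor) // placeholder

  rows.foreach { row =>
    client.sendSync(EventData.create(row.mkString(",").getBytes("UTF-8")))
  }

  client.closeSync()
  executor.shutdown()
}
```

The same idea applies to any per-record client: keep its construction inside the closure that runs on the executors.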
Quick question: the README says azure-event-hubs-spark 2.3.18 is compatible with Spark 3.0. Looking at the master branch, it uses Spark's internal logging (https://github.com/Azure/azure-event-hubs-spark/blob/master/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala#L37), but when I checked the Javadoc for Spark 3.0.1 (http://spark.apache.org/docs/3.0.0/api/java/index.html) I didn't even see this class. Is there anything I am missing? I am asking because I am seeing this error on Databricks with Scala 2.12 and Spark 3.0.1 (Databricks Runtime 7.6): "Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.<init>(EventHubsSourceProvider.scala:55)"
Sorry, I was looking inside the java folder.
Yunjie Ke
Hi all, I heard Spark 3.1.1 and DBR 8.0 were just released last week. Any plans to support them?
Hi, I am new to Scala and sbt. I need to use this library but am not sure how; can anyone help? I thought I could add it as an artifact (using IntelliJ), but it fails for some reason.

package com.example.eventhubconsu;

import java.io.IOException;
import java.util.function.Consumer;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;

public class Abc {

    private static final String connectionString = "Endpoint=sb://kafkaconsumer.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=0cwyAELee1DF0AaoKCDki5u28TXWEadTScSMcrULWgw=";
    private static final String eventHubName = "kafkaconsumer";
    private static final String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=cmsdstorage;AccountKey=gFVxyNgNJ+KbKG0tEBGdoiAwhxZm86XKbIY+VWRpl/h7EV99HXDb/X8vTHX1weJbV9QJlnAmvDcEotuYm7JeiQ==;EndpointSuffix=core.windows.net";
    private static final String storageContainerName = "eventcontainer";

    public static void main(String... args) throws IOException {
        consumeEvents();
    }

    public static void consumeEvents() throws IOException {

        // Create a blob container client that the checkpoint store uses
        // to persist checkpoints between runs
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(storageConnectionString)
                .containerName(storageContainerName)
                .buildAsyncClient();

        // Create a builder object used to build an event processor client
        // to receive and process events and errors
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                .connectionString(connectionString, eventHubName)
                .consumerGroup("$Default")
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

        System.out.println("Starting event processor");
        eventProcessorClient.start();

        System.out.println("Press enter to stop.");
        System.in.read();

        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");

        System.out.println("Exiting process");
    }

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();

        System.out.printf("Processing event with consumer group %s from partition %s with sequence number %d with body: %s%n",
                partitionContext.getConsumerGroup(), partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());

        // Every 10 events received, update the checkpoint stored in Azure Blob Storage
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };

    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
                errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable());
    };
}

Is there any way to set a partition ID so that EventProcessorClient reads events only from a specific partition, please?
Hello everyone,
I am new to the group. Is this the right place to ask questions about azure-event-hubs-spark?
We are seeing the issue mentioned in Azure/azure-event-hubs-spark#572. Are there any workarounds, or a timeline for when this might get merged into the official library?
Regarding the issue mentioned in Azure/azure-event-hubs-spark#582: is it closed based on the comments?
Ganesh Chand
When using Trigger.Once, we see the maximum number of rows processed is always under 5 million. Is this a limit enforced by the connector?
I have been trying for a while but keep getting empty display queries; any idea what might be the reason? I do get the schema and everything. Is any particular access needed beyond the connection string itself?
Sagar Motani
I have been looking for the TransportType setting but am unable to find it; I want to change it to AMQP_WEB_SOCKETS.
Can someone explain how I can change the TransportType?
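A hedged sketch, assuming the connector honors a TransportType property embedded in a connection string built with the Java SDK's ConnectionStringBuilder (namespace, hub, and key names are placeholders; untested):

```scala
import com.microsoft.azure.eventhubs.{ConnectionStringBuilder, TransportType}
import org.apache.spark.eventhubs.EventHubsConf

// Build a connection string that requests AMQP over web sockets (port 443).
val connStr = new ConnectionStringBuilder()
  .setNamespaceName("mynamespace")
  .setEventHubName("myeventhub")
  .setSasKeyName("RootManageSharedAccessKey")
  .setSasKey("<key>")
  .setTransportType(TransportType.AMQP_WEB_SOCKETS)
  .toString

val ehConf = EventHubsConf(connStr)
```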
Hi - we have a Spark job (on Databricks) that uses the connector to pull data from an event hub. We noticed the pull from the event hub has suddenly been taking longer and longer, even though this job has been running fine for a very long time. Where can we find logs that show which part of the process takes longest when pulling messages from the event hub, for example network calls?
Hi there. I have a task to read data from 9 event hubs, and I use a Databricks notebook. Can we parametrize the notebook and run it to fetch data from a selected event hub?
I'm trying to use the new pyspark documentation for schema registry integration. When I call the ...avro.schemaGuid(guid) I get 'JavaPackage' object is not callable. Is this documentation ready to be used yet? Or am I just missing some dependencies on my cluster?
Jude K
Is there a way to use Managed Identity to access eventhub with azure-event-hubs-spark
or are we forced to use SAS? Thanks for any responses in advance.
Jarod Belshaw
I have what is hopefully a quick question: can I assign specific partitions to a Spark job within the EventHubsConf? Meaning, I want one job to read only from the odd-numbered partitions and another instance of the same job to read from the even-numbered partitions.
Razvan Vacaru
Hi all! I'm trying out your library for a streaming use case: in a notebook I'm reading from Event Hubs and writing into a Delta table. From that Delta table I'd like to do some transformations in a SQL cell, reading only the new data, continuously. Do you know if such a thing is possible?