I have azure_eventhubs_spark_2_11_2_3_15.jar installed under Cluster > Libraries.
Cluster configuration is 7.1 (includes Apache Spark 3.0.0, Scala 2.12).
Getting an error on: encConnString = sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)
TypeError Traceback (most recent call last)
<command-4482231998934855> in <module>
17
18 connectionString=f'Endpoint=sb://{NAMESPACE}.servicebus.windows.net/{EVENT_HUB_NAME};SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY};EntityPath={EVENT_HUB_NAME}'
---> 19 encConnString=sc._jvm.org.apche.spark.EventgHubsUtils.encrypt(connectionString)
20
21 print(connectionString)
TypeError: 'JavaPackage' object is not callable
Any help?
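For reference, a minimal PySpark sketch of how the encrypt helper is usually invoked (the placeholder variables NAMESPACE, EVENT_HUB_NAME, ACCESS_KEY_NAME, and ACCESS_KEY are taken from the snippet above; note the package path org.apache.spark.eventhubs and the class name EventHubsUtils):

# Minimal sketch: encrypt an Event Hubs connection string with the connector's
# EventHubsUtils before putting it into the ehConf dictionary.
connection_string = (
    f"Endpoint=sb://{NAMESPACE}.servicebus.windows.net/{EVENT_HUB_NAME};"
    f"SharedAccessKeyName={ACCESS_KEY_NAME};"
    f"SharedAccessKey={ACCESS_KEY};"
    f"EntityPath={EVENT_HUB_NAME}"
)
# Package path as documented: org.apache.spark.eventhubs.EventHubsUtils
enc_conn_string = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
eh_conf = {"eventhubs.connectionString": enc_conn_string}

A "'JavaPackage' object is not callable" error generally means the class could not be found on the driver's JVM classpath, which can happen when the installed jar's Scala version does not match the cluster's (2.12 on runtime 7.1).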
Hi, I have an application where I trigger multiple Spark Structured Streaming queries, each reading from its own event hub in a single namespace. I am ensuring that each query has its own sink and checkpoint location, and I am terminating them through spark.streams.awaitAnyTermination().
This is how the structure looks:
val dataStreams: Array[DataFrame] = eventhubs.map { eh =>
  spark.readStream.format("eventhubs").options(/* ...eh... */).load()
}
val queries: Array[StreamingQuery] = dataStreams.map { df =>
  df.map(/* ... */)
    .writeStream
    .option("path", ".../eh")
    .option("checkpointLocation", ".../eh")
    .start()
}
However, I am getting a ReceiverDisconnectedException for the above scenario. Is this scenario supported by the Event Hubs library?
package com.example.eventhubconsu;
import java.io.IOException;
import java.util.function.Consumer;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
public class Abc {
private static final String connectionString = "Endpoint=sb://kafkaconsumer.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=0cwyAELee1DF0AaoKCDki5u28TXWEadTScSMcrULWgw=";
private static final String eventHubName = "kafkaconsumer";
private static final String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=cmsdstorage;AccountKey=gFVxyNgNJ+KbKG0tEBGdoiAwhxZm86XKbIY+VWRpl/h7EV99HXDb/X8vTHX1weJbV9QJlnAmvDcEotuYm7JeiQ==;EndpointSuffix=core.windows.net";
private static final String storageContainerName = "eventcontainer";
public static void main(String... args) throws IOException {
consumeEvents();
}
public static void consumeEvents() throws IOException {
// Create a blob container client that you use later to build an event processor
// client to receive and process events
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString(storageConnectionString).containerName(storageContainerName).buildAsyncClient();
// Create a builder object that you will use later to build an event processor
// client to receive and process events and errors.
EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
.connectionString(connectionString, eventHubName)
.consumerGroup("group1").processPartitionInitialization(initializePartition).processEvent(PARTITION_PROCESSOR)
.processError(ERROR_HANDLER).checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
// Use the builder object to create an event processor client
EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
System.out.println("Starting event processor");
eventProcessorClient.start();
System.out.println("Press enter to stop.");
System.in.read();
System.out.println("Stopping event processor");
eventProcessorClient.stop();
System.out.println("Event processor stopped.");
System.out.println("Exiting process");
}
public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
PartitionContext partitionContext = eventContext.getPartitionContext();
EventData eventData = eventContext.getEventData();
System.out.printf("Processing event with consumer group %s from partition %s with sequence number %d with body: %s%n",
partitionContext.getConsumerGroup(),partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
// Every 10 events received, it will update the checkpoint stored in Azure Blob
// Storage.
if (eventData.getSequenceNumber() % 10 == 0) {
eventContext.updateCheckpoint();
}
};
public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable());
};
public static final Consumer<InitializationContext> initializePartition = initializationContext -> {
// Can I set the partition ID to read events from here?
System.out.printf("Initializing partition %s%n", initializationContext.getPartitionContext().getPartitionId());
};
}
Can I set the partition ID to read events from?
I am on 2.3.21. Very similar to Azure/azure-event-hubs-spark#546, but it cannot be the same issue since that should already be fixed in version 2.3.20. Unfortunately, I am not able to produce a small example to replicate this, so I'm just throwing this out there in case it's immediately recognizable by someone who has experienced it before. Any help is very welcome!
[ERROR, stream execution thread for <stream-name> [id = ***, runId = ***], org.apache.spark.sql.execution.streaming.MicroBatchExecution, stream execution thread for *** [id = ***, runId = ***] thread, Query *** [id = ***, runId = ***] terminated with error]
java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at org.apache.spark.eventhubs.client.ClientConnectionPool.org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient(ClientConnectionPool.scala:79)
at org.apache.spark.eventhubs.client.ClientConnectionPool$.borrowClient(ClientConnectionPool.scala:170)
at org.apache.spark.eventhubs.client.EventHubsClient.client(EventHubsClient.scala:62)
at org.apache.spark.eventhubs.client.EventHubsClient.liftedTree1$1(EventHubsClient.scala:187)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal$lzycompute(EventHubsClient.scala:184)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal(EventHubsClient.scala:183)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:176)
at org.apache.spark.sql.eventhubs.EventHubsSource.partitionCount(EventHubsSource.scala:81)
at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:106)
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:326)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:98)
Hi, I am currently facing the following issue and would like some advice:
The Spark job I am running generates a DataFrame of a million+ records. If I do df.write() to Event Hubs, it causes throttling issues even with the Event Hubs scale tuned up very high.
The throttling causes retries, which rewrite the million+ records to Event Hubs again, and in the end the operation fails with a timeout.
What can I do to avoid this issue while still sending all the records from the DataFrame to Event Hubs? I am fine if it takes time to send all the records (an hour or more), as long as all records reach Event Hubs.
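One workaround sometimes used for this (a minimal sketch, not necessarily the connector's recommended approach; df, eh_conf, and the chunk count below are assumptions) is to split the DataFrame and write it to Event Hubs in smaller batches, so a retry only replays one chunk rather than the whole dataset:

# Hypothetical sketch: write a large DataFrame to Event Hubs in smaller chunks.
# Assumes `df` has a string/binary 'body' column and `eh_conf` holds an
# encrypted connection string, as the batch writer expects.
num_chunks = 20  # arbitrary; tune so each chunk stays under throttling limits
chunks = df.randomSplit([1.0] * num_chunks, seed=42)

for i, chunk in enumerate(chunks):
    (chunk
        .select("body")
        .write
        .format("eventhubs")
        .options(**eh_conf)
        .save())
    print(f"Finished chunk {i + 1}/{num_chunks}")

Writing sequentially like this trades throughput for predictability: each chunk is retried independently, so a throttled batch does not force the entire million+ records to be resent.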
Hello everyone, I am trying to consume data from Azure Event Hubs with Databricks PySpark and write it to an ADLS sink. Somehow, the Spark job is not able to finish and gets aborted after running for 2 hours. The error is: Caused by: java.util.concurrent.RejectedExecutionException: ReactorDispatcher instance is closed.
Here is the full error: https://gist.github.com/kingindanord/a5f585c6ee7053c275c714d1b07c6538#file-spark_error-log and here is my Python script: https://gist.github.com/kingindanord/a5f585c6ee7053c275c714d1b07c6538#file-script-py
I am using a separate consumer group for this application. The event hub has 3 partitions; auto-inflate throughput units are enabled and set to 21 units.
Databricks Runtime version: 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12). Worker and driver type are Standard_E16_v3 (128 GB memory, 16 cores). Min workers: 1, max workers: 3.
As you can see in the code, startingEventPosition and endingEventPosition are only one hour apart, so the size of the data should be around 3 GB. I don't know why I am not able to consume it. Can you please help me with this issue?
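For comparison, a minimal sketch of how a bounded one-hour window is typically expressed through the connector's options (the connection string, consumer group, and timestamps below are placeholders, not taken from your script):

import json

# Placeholder values, assumed for illustration only.
connection_string = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<event-hub>"

eh_conf = {
    # The connector expects an encrypted connection string.
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
    "eventhubs.consumerGroup": "<consumer-group>",
    # Positions are passed as JSON-serialized EventPosition dictionaries;
    # here they bound a one-hour window by enqueued time.
    "eventhubs.startingPosition": json.dumps({
        "offset": None, "seqNo": -1,
        "enqueuedTime": "2021-11-01T00:00:00.000Z", "isInclusive": True}),
    "eventhubs.endingPosition": json.dumps({
        "offset": None, "seqNo": -1,
        "enqueuedTime": "2021-11-01T01:00:00.000Z", "isInclusive": True}),
}

# Ending positions bound batch reads; a fixed window is usually read with spark.read
# rather than readStream.
df = spark.read.format("eventhubs").options(**eh_conf).load()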