Brian Gao
@brgao

Hi, can someone help me with writing data to an event hub with Spark Streaming? I've been following the tutorial here: https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md#creating-an-eventhubs-sink-for-streaming-queries. This is the code I have for the final step to write to the event hub.

df
  .select("body")
  .writeStream
  .format("eventhubs")
  .options(readyToCookEventhubParameters.toMap)
  .option("checkpointLocation", "abfss://checkpointpathnotincluded")
  .start()
  .awaitTermination()

I know the dataframe has the correct data inside, but I was wondering if anyone had ideas on what I might be missing that prevents the write from completing. I'm checking whether the writes are done with the Process Data tab in the portal.
I don't get any error messages but I did notice the sink information is very sparse:
"sink" : {
"description" : "EventHubsSink"
}

Thanks!
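
For anyone who hits the same thing later: per the linked integration doc, the EventHubs sink reads the event payload from a column named body (string or binary), so it can be worth casting that column explicitly before starting the query. A minimal sketch, reusing the parameter map name from the snippet above:

// Sketch only: make sure the payload ends up in a string column named "body".
df.selectExpr("CAST(body AS STRING) AS body")
  .writeStream
  .format("eventhubs")
  .options(readyToCookEventhubParameters.toMap)
  .option("checkpointLocation", "abfss://checkpointpathnotincluded")
  .start()
  .awaitTermination()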

Brian Gao
@brgao
Solved my problem, thanks
Devendra Modium
@dkm199
Hi all. Has anyone tried using sliding windows on top of an Event Hub stream? What I observe is that if I read from the stream directly, I can read events from the Event Hub. However, if I create a sliding window, e.g. 5MinDStream = originalDStream.window(5 minutes, 1 minute), then the Spark jobs are not able to read events from the Event Hub; eventHubStream.event.getBytes returns null. Any suggestions appreciated. Thanks
Devendra Modium
@dkm199
@sabeegrewal thoughts?

val ssc = new StreamingContext(spark.sparkContext, Seconds(60)) // Batch every 60 seconds
val eventHubStream = EventHubsUtils.createDirectStream(ssc, ehConf)
val FiveMiniteSlidingWindoStream = eventHubStream.window(Minutes(2), Minutes(1))

eventHubStream.foreachRDD { eventHubEvents =>
  eventHubEvents.foreach { r =>
    println(s"Event bytes inside for Before fcn are ${r.getBytes}")
  }
  Helper.processEventHubEvents(eventHubEvents, docDb.value, 1)
}

FiveMiniteSlidingWindoStream.foreachRDD { eventHubEvents =>
  Helper.processEventHubEvents(eventHubEvents, docDb.value, 5)
}
If I remove FiveMiniteSlidingWindoStream, then it works as expected: "Event bytes inside for Before fcn are VALID_BYTES". With the above code I see "Event bytes inside for Before fcn are null".
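
Not sure of the root cause, but one guess worth trying: window() makes Spark cache and replay the underlying RDDs, so if the EventData objects don't survive that, extracting the payload before windowing might help. A rough sketch under that assumption, reusing the names from the snippet above:

// Hypothetical workaround: pull the payload out of each EventData first,
// then window over plain strings instead of EventData references.
val payloadStream = eventHubStream.map(ed => new String(ed.getBytes, "UTF-8"))
val windowedPayloads = payloadStream.window(Minutes(2), Minutes(1))

windowedPayloads.foreachRDD { rdd =>
  rdd.foreach(s => println(s"Windowed event: $s"))
}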
belgacea
@belgacea
Hello @sjkwak, is this library still maintained? The last commit was 3 months ago and the build is broken :/
Arvind Ravish
@aravish
Does EventHubStream support windowing with Spark RDDs?
O. J. Ishtiaq
@agilityhawk
Hi folks.
Got a question regarding Structured Streaming with Event Hubs. I keep getting this error:
New receiver 'spark-0-196' with higher epoch of '0' is created hence current receiver 'spark-0-190' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.
I have created a separate consumer group for my Databricks notebook as well, but I'm still running into this issue.
Can somebody please help me out here?
Molotch
@Molotch
We got the same error/warning (don't remember which), but I believe it's just a logging issue; it's not actually a problem with the read.
O. J. Ishtiaq
@agilityhawk
In my case, it ends up stopping the execution of my command.
Is there a way to either fix this permanently or at least handle it gracefully such that the command continues to execute?
Lucas Yang
@tilumi
If I enable speculation, then two executors will read the same partition with the same consumer group.
What will the behavior be?
Will the new executor take precedence over the old executor and read the complete data between the specified start and end offsets?
Lucas Yang
@tilumi
Will only the new executor receive data, while the slow one doesn't receive any?
Jorge Docampo
@jdocampo
Hi, RBAC access to Event Hubs using the Microsoft Authentication Library (MSAL) for Java has recently become available (https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory) (https://github.com/Azure/azure-event-hubs/tree/master/samples/Java/Rbac).
Are there any plans to integrate this authentication method into the connector? What is the best way to raise this feature request?
Dilip Diwakar
@dlpkmr98
Hi, I'm facing an issue when building a jar with Maven:
[ERROR] error: bad symbolic reference to org.apache.http.annotation.Experimental encountered in class file 'EventHubsConf.class'.
[INFO] Cannot access type Experimental in package org.apache.http.annotation. The current classpath may be
[INFO] missing a definition for org.apache.http.annotation.Experimental, or EventHubsConf.class may have been compiled against a version that's
[INFO] incompatible with the one found on the current classpath.
[ERROR] one error found
Libraries used:

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spark.version>2.4.3</spark.version>
<scalatest.version>3.2.0-M1</scalatest.version>
<scala.version>2.11</scala.version>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-spark_${scala.version}</artifactId>
    <version>2.3.10</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.3.0</version>
</dependency>
I'm not able to understand why EventHubsConf.class may have been compiled against a different version.
Dilip Diwakar
@dlpkmr98
solved now
Marcus Simonsen
@marcusbb
Hello. I'm new to the Event Hubs library and trying to write output in Parquet format using the structured streaming approach.
I'm using Spark 2.4, with Event Hubs 2.3.14.1
Based on this documentation
https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md I can create a writeStream like so
val query =
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()
However, this results in a compile error:
Error:(42, 16) type mismatch;
 found   : org.apache.spark.sql.eventhubs.EventHubsForeachWriter
 required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
      .foreach(writer)
Marcus Simonsen
@marcusbb
Sorry I had to downcast the writer. Scala newbie!
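(For anyone hitting the same compile error: the cast Marcus mentions presumably looks something like the sketch below; this is a guess at the workaround, not necessarily the intended usage of the connector's writer.)

// Guess at the workaround: force the writer to the type the foreach sink expects.
val query =
  streamingSelectDF
    .writeStream
    .foreach(writer.asInstanceOf[org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]])
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()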
Marcus Simonsen
@marcusbb

Hello again.
I'm trying to use the Event Hubs connector to write JSON messages to the file system. It should eventually be Parquet format, but I'm just playing around right now.

val ehConf = EventHubsConf(connectionString)
  .setConsumerGroup("audit_spark_stream")
  .setStartingPosition(EventPosition.fromStartOfStream)

val df = spark
  .readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()

df.selectExpr("CAST(body AS STRING)")

val query = df.writeStream
  .format("json")
  .option("path", "data/audit/json")
  .option("checkpointLocation", "data/audit/jsoncp")
  .trigger(ProcessingTime("25 seconds"))
  .start()
  .awaitTermination()

The Spark job compiles and starts fine, but it's not writing data out. Does anyone see anything wrong with the approach?
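
One detail that jumps out, though it may not be the root cause: selectExpr returns a new DataFrame rather than modifying df in place, so the query being started still writes the raw columns instead of the casted body. A sketch of chaining the projection into the query (paths and trigger kept from the snippet above):

val query = df
  .selectExpr("CAST(body AS STRING) AS body")
  .writeStream
  .format("json")
  .option("path", "data/audit/json")
  .option("checkpointLocation", "data/audit/jsoncp")
  .trigger(ProcessingTime("25 seconds"))
  .start()

query.awaitTermination()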

Marcus Simonsen
@marcusbb
The checkpoint directory is getting created, and I see other information about partitioning and checkpoint activity, but no data...
Marcus Simonsen
@marcusbb
I have also tried a batch reading example with no success. I'm running the batch job on a local Spark 2.4.4 cluster.
val ehConf = EventHubsConf(connectionString)
  .setConsumerGroup("audit_spark_stream")
  .setStartingPosition(EventPosition.fromSequenceNumber(0L))
  .setEndingPosition(EventPosition.fromEnqueuedTime(Instant.now))

// Simple batch query
val df = spark.read
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()

import spark.implicits._
df.select($"body" cast "string").show()
b-navaneethakumar
@b-navaneethakumar
Guys, I need some help.
I'm implementing per-partition configuration:

val positions = Map(
  new NameAndPartition(name, 0) -> EventPosition.fromStartOfStream,
  new NameAndPartition(name, 1) -> EventPosition.fromSequenceNumber(100L)
)

val cs = "YOUR.CONNECTION.STRING"
val ehConf = EventHubsConf(cs)
  .setStartingPositions(positions)
  .setStartingPosition(EventPosition.fromEndOfStream)

Streaming starts from @latest for all partitions.

Even if I remove .setStartingPosition(EventPosition.fromEndOfStream), it still reads from the @latest position.

How do I pass only setStartingPositions (for multiple partitions)?
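
In case it helps, here is a minimal sketch of setting only the per-partition positions; "my-event-hub" is a placeholder, and one thing worth double-checking is that the name passed to NameAndPartition matches the event hub (EntityPath) in the connection string, since I believe the per-partition map is keyed on it and unmatched partitions fall back to the connector's default:

val cs = "YOUR.CONNECTION.STRING"
val name = "my-event-hub" // placeholder: the actual event hub name

val positions = Map(
  new NameAndPartition(name, 0) -> EventPosition.fromStartOfStream,
  new NameAndPartition(name, 1) -> EventPosition.fromSequenceNumber(100L)
)

// Per-partition positions only, with no global setStartingPosition.
val ehConf = EventHubsConf(cs)
  .setStartingPositions(positions)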
b-navaneethakumar
@b-navaneethakumar
@sabeegrewal Have you tried setting setStartingPositions in ehConf, and is it working as expected? I don't have a checkpoint...
tomarv2
@tomarv2
I am trying to stream from Event Hubs using Python:
read_df = (
  spark
    .readStream
    .format("eventhubs")
    .options(**conf)
    .load()
)

read_df.printSchema()

read_df.writeStream.outputMode("append").format("console").option("truncate", False).start().awaitTermination()
tomarv2
@tomarv2
It's not printing anything to the console. Does anyone have any suggestions, or an example you can share?
tomarv2
@tomarv2
Thanks, I found the solution.
gah112
@gah112
Is anyone familiar with ReceiverDisconnectedException? I'm using a single connection policy and chaining multiple dataframes from that stream. Would switching to multiple streams/connection policies solve these errors?
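Not a definitive answer, but that exception generally shows up when more than one receiver ends up on the same partition and consumer group, which is what happens when several queries fan out of a single stream. A sketch of the usual mitigation, one consumer group per running query (the connection string and group names below are placeholders):

// Hypothetical: give each concurrently running query its own consumer group.
val confA = EventHubsConf(connectionString).setConsumerGroup("query-a")
val confB = EventHubsConf(connectionString).setConsumerGroup("query-b")

val dfA = spark.readStream.format("eventhubs").options(confA.toMap).load()
val dfB = spark.readStream.format("eventhubs").options(confB.toMap).load()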
rvramanan0
@rvramanan0
I am trying to deploy via Python to an Azure Databricks cluster:

File "/opt/anaconda3/envs/bidh-adls/lib/python3.7/site-packages/pyspark/sql/streaming.py", line 400, in load
    return self._df(self._jreader.load())
File "/opt/anaconda3/envs/bidh-adls/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
File "/opt/anaconda3/envs/bidh-adls/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
File "/opt/anaconda3/envs/bidh-adls/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o24.load.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.eventhubs.EventHubsSourceProvider. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
I am getting the above error.
Also, how do I use pip install for the eventhub_spark connector?
Didhin
@didhin
Giving offset : "-1" is not working as expected. When i give this it is only taking the latest data. Anyone know the reason?
pauldx
@pauldx
Hi, I'm wondering whether Event Hubs supports streaming anything other than the body. It looks like a topic needs all the JSON rows to form a body... any ideas? We have a target system where we don't want to parse the Avro stream during ingestion, so in principle it looks like Event Hubs can't send columns in its stream, only a body. I just want to double-check with the group here.