    Enno
    @ennru
    You seem to be mixing versions of the Kafka broker and client that do not fit together.
    mvillafuertem
    @mvillafuertem

    I don't know why this test fails. Maybe I'm doing something wrong.

    This test used to work with EmbeddedKafka.

    Environment

    scala 2.13.1
    sbt 1.3.2
    java 11.0.4
    "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "1.0.5"
    "org.testcontainers" % "kafka" % "1.12.2"

    https://github.com/mvillafuertem/scala/blob/master/alpakka/src/it/scala/io/github/mvillafuertem/alpakka/kafka/NumbersKafkaIT.scala

    assertion failed: timeout (10 seconds) during expectMsg while waiting for OnNext(1)
    java.lang.AssertionError: assertion failed: timeout (10 seconds) during expectMsg while waiting for OnNext(1)
        at scala.Predef$.assert(Predef.scala:282)
        at akka.testkit.TestKitBase.expectMsg_internal(TestKit.scala:418)
        at akka.testkit.TestKitBase.expectMsg(TestKit.scala:395)
        at akka.testkit.TestKitBase.expectMsg$(TestKit.scala:395)
        at akka.testkit.TestKit.expectMsg(TestKit.scala:923)
        at akka.stream.testkit.TestSubscriber$ManualProbe.$anonfun$expectNextN$1(StreamTestKit.scala:411)
        at scala.collection.immutable.List.foreach(List.scala:305)
        at akka.stream.testkit.TestSubscriber$ManualProbe.expectNextN(StreamTestKit.scala:411)
        at io.github.mvillafuertem.alpakka.kafka.NumbersKafkaIT.$anonfun$new$2(NumbersKafkaIT.scala:142)
    Enno
    @ennru
    @/all Alpakka Kafka 1.1.0 hit Maven Central after a tough fight getting it there: https://doc.akka.io/docs/alpakka-kafka/current/release-notes/1.1.x.html
    Chetan Mehrotra
    @chetanmeh
    @ennru Is the Kafka client version also updated? We could then possibly update https://doc.akka.io/docs/alpakka-kafka/current/home.html#matching-kafka-versions
    Enno
    @ennru
    No, the Kafka client version is still 2.1.1. We plan to release Alpakka Kafka 2.0 just after Apache Kafka 2.4.0 is out (planned for end of October) and will depend on that client version.
    You may pull in later clients yourself.
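    For example, something like this in build.sbt lets a newer client win dependency resolution (a sketch; the 2.3.0 version is only illustrative, check the compatibility table first):

      libraryDependencies ++= Seq(
        "com.typesafe.akka" %% "akka-stream-kafka" % "1.1.0",
        // a direct dependency overrides the transitive kafka-clients 2.1.1
        "org.apache.kafka" % "kafka-clients" % "2.3.0"
      )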
    Srepfler Srdan
    @schrepfler
    👏👏👏
    Nik Gushchin
    @Dr-Nikson
    Guys, is it safe to use KafkaAvroDeserializer in parallel?
    Dave Holtzhouser
    @dholtz
    Looking for some guidance to help with my obvious confusion.
      val elasticSettings = ElasticsearchWriteSettings()
        .withBufferSize(10)
        .withVersionType("internal")
        .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.second))
    
      val elasticSink = ElasticsearchSink.create[WebhookEvent]("webhookEvents", "_doc", elasticSettings)
    
      implicit val jsonStreamSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    
      val routes: Route =
        path("streams" / "webhook-events") {
          post {
            entity(asSourceOf[List[WebhookEvent]]) { source =>
              WriteMessage.createIndexMessage(java.util.UUID.randomUUID(), source)
            }
    
          }
          complete(StatusCodes.OK)
        }
    
      Http().bindAndHandle(routes, "localhost", 9999)
      println(s"Server online..")
    }
    What’s the right way to accept webhook events from a 3rd-party API and sink them into Elasticsearch?
    I’m kind of confused whether the Route is the actual source, or if it should just be a definition of the possible endpoints … among other obvious confusions.
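    The shape I imagine is something like this (a rough sketch I haven’t verified; error handling is hand-waved and it needs scala.util.{Success, Failure} in scope):

      entity(asSourceOf[WebhookEvent]) { source =>
        // run the streamed request entity into the Elasticsearch sink
        val done: Future[Done] = source
          .map(event => WriteMessage.createIndexMessage(java.util.UUID.randomUUID().toString, event))
          .runWith(elasticSink)
        onComplete(done) {
          case Success(_) => complete(StatusCodes.OK)
          case Failure(_) => complete(StatusCodes.InternalServerError)
        }
      }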
    mvillafuertem
    @mvillafuertem

    This test does not work with TestcontainersKafkaLike; I get the same result. Do I need any configuration?

    https://github.com/akka/alpakka-kafka/blob/master/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala

    assertion failed: timeout (10 seconds) during expectMsg while waiting for OnNext(1)
    java.lang.AssertionError: assertion failed: timeout (10 seconds) during expectMsg while waiting for OnNext(1)
        at scala.Predef$.assert(Predef.scala:282)
        at akka.testkit.TestKitBase.expectMsg_internal(TestKit.scala:418)
        at akka.testkit.TestKitBase.expectMsg(TestKit.scala:395)
        at akka.testkit.TestKitBase.expectMsg$(TestKit.scala:395)
        at akka.testkit.TestKit.expectMsg(TestKit.scala:923)
        at akka.stream.testkit.TestSubscriber$ManualProbe.$anonfun$expectNextN$1(StreamTestKit.scala:411)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:921)
        at akka.stream.testkit.TestSubscriber$ManualProbe.expectNextN(StreamTestKit.scala:411)
        at io.github.mvillafuertem.alpakka.kafka.TransactionsSpec.$anonfun$new$3(TransactionsSpec.scala:61)

    I would appreciate it if someone could help me.

    Aλexander Zafirov
    @alexanderzafirov
    Hello everyone, I would like to ask for some input on the following question - I'm using a Consumer.committableSource in my application. During tests I have discovered that instead of going round-robin among the partitions of the Kafka topic, the application drains a given partition until it consumes the latest entry before switching to the next partition. This is not ideal for my application, as it cares about the temporal order in which the events are put on Kafka. This exhaustive way of reading partitions is like going back and forth in time.
    Any ideas on how I can tune the consumer to favor round-robin on partition consumption instead?
    Thank you!
    Enno
    @ennru
    If you care about the order, why do you put them on different partitions?
    Aλexander Zafirov
    @alexanderzafirov
    Hi @ennru. Thank you for your reply. The question you pose makes sense, so I will elaborate. I care about the order of events within what I call a conversation. I can have many of those, and they are not necessarily related to each other. The events on Kafka track state changes in a conversation, and what I care about are the events within a conversation. To maximize throughput I spread the conversations across multiple partitions.
    Enno
    @ennru
    Kafka itself does not provide any guarantees about order across partitions. The consumer always fetches a batch of messages, which most often belong to the same partition. It might be possible to tweak that with settings on the Kafka consumer: http://kafka.apache.org/documentation/#consumerconfigs
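    For example, smaller fetches might make the consumer cycle through its assigned partitions more often (a sketch, not verified to improve the interleaving; names and numbers are made up):

      val consumerSettings =
        ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
          .withBootstrapServers("localhost:9092")
          .withGroupId("conversation-reader")
          // cap how much data one partition can contribute per fetch
          .withProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "16384")
          // cap how many records a single poll returns
          .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50")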
    Aλexander Zafirov
    @alexanderzafirov
    I don't want order across partitions; my partitioning scheme gives me what I need. I'm just asking whether it is possible to make the consumer read messages round-robin across its assigned partitions, as opposed to draining each partition up to its high-watermark offset before moving to the next. I realize that this is something related to the actual Kafka consumer.
    I will post my findings regardless of whether I succeed.
    Thank you for your help!
    Enno
    @ennru
    You might be able to get the behaviour you need with Alpakka Kafka's partitionedSource by controlling demand per partition.
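    Roughly like this (a sketch; the topic name and rates are made up, imports are omitted, and I haven't verified it gives the interleaving you want):

      Consumer
        .committablePartitionedSource(consumerSettings, Subscriptions.topics("conversations"))
        .flatMapMerge(breadth = 64, {
          case (_, partitionSource) =>
            // bounding demand per sub-source keeps any one partition from
            // being drained exhaustively before the others get a turn
            partitionSource.throttle(100, per = 1.second)
        })
        .runWith(Sink.ignore) // replace with your processing and committing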
    Rakesh
    @rakeshkr00
    Hi All, I have a Consumer.committableSource and during message processing I am encountering an org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id error. What is surprising to me is that the error is not caught by the Supervision.Decider; instead it terminates the stream and the exception is then caught by onComplete. I am using Akka 2.5.23, akka-stream-kafka 1.0.5, Kafka client 2.1.1, Scala 2.11.12. Any help would be appreciated. The Supervision.Decider looks like this:
      val getDecider1: Supervision.Decider = {
        case _: org.apache.kafka.clients.consumer.RetriableCommitFailedException =>
          Supervision.resume
        case _: org.apache.kafka.common.errors.SerializationException =>
          Supervision.resume
        // a Decider is a total function; fall back to stopping for anything else
        case _ =>
          Supervision.stop
      }
    Rakesh
    @rakeshkr00
    For the below exception, the above Supervision.Decider definition successfully intercepts the exception:
    val test = Source.fromFuture(Future.successful(1))
              .mapAsync(1){ _ => Future(throw new org.apache.kafka.common.errors.SerializationException("This is testing")) }
              // the decider only applies when attached to the stream
              .withAttributes(ActorAttributes.supervisionStrategy(getDecider1))
              .runWith(Sink.ignore)
    Chetan Mehrotra
    @chetanmeh

    I am seeing a long stop time for a Consumer source when using DrainingControl.drainAndShutdown, like here

    This causes our tests to take quite a bit of time in the close phase (> 30 sec). Checking the stop flow, it appears that while shutting down there is a delay of 30 seconds (https://github.com/akka/alpakka-kafka/blob/master/core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala#L97) which uses stopTimeout. Per the reference configuration:

      # The stage will delay stopping the internal actor to allow processing of
      # messages already in the stream (required for successful committing).
      # This can be set to 0 for streams using `DrainingControl`.
      stop-timeout = 30s

    So I wanted to confirm that with our approach we can safely set this timeout to 0, since we explicitly use DrainingControl and that takes care of stopping in the proper sequence.

    Enno
    @ennru
    Yes, when using DrainingControl you may set the stop-timeout to 0. The docs note it here https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#draining-control
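    For example (a sketch; assumes a consumerSettings value and scala.concurrent.duration._ in scope):

      // equivalent to setting akka.kafka.consumer.stop-timeout = 0s in config
      val settings = consumerSettings.withStopTimeout(Duration.Zero)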
    Chetan Mehrotra
    @chetanmeh
    Thanks @ennru for confirming. Missed the docs
    Enno
    @ennru
    You're welcome.
    Rakesh
    @rakeshkr00

    Reposting my question from above about the SerializationException that is not caught by the Supervision.Decider. Any help, please?

    Enno
    @ennru
    @rakeshkr00 Errors from deserializing directly in the consumer can't be handled by supervision. It only works if you do explicit deserialisation in a map operator, as shown for JSON here: https://doc.akka.io/docs/alpakka-kafka/current/serialization.html#spray-json
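    The idea: consume raw bytes and deserialize in your own map stage so the decider sees the failure. A sketch (deserializeAvro is a hypothetical helper, and the settings values are made up):

      val byteSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("group1")

      Consumer
        .committableSource(byteSettings, Subscriptions.topics("topic1"))
        .map { msg =>
          // may throw SerializationException, which the decider can now handle
          (deserializeAvro(msg.record.value), msg.committableOffset)
        }
        .withAttributes(ActorAttributes.supervisionStrategy(getDecider1))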
    Rakesh
    @rakeshkr00
    Thanks @ennru for pointing that out. I was missing that part of information.
    Brendan McAdams
    @bwmcadams
    I’ve been suddenly running afoul of this error section: https://github.com/akka/alpakka-kafka/blob/master/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala#L207-L217 but for the life of me I can’t figure out why. There’s only a single stream instance running through at a time, so I can’t figure out what’s clashing. Anything stupid/obvious around this that I’m likely to have missed?
    From the Expected/Got values they are clearly different instances of KafkaAsyncConsumerCommitterRef, I’m just not sure how or why.
    Enno
    @ennru
    Do you use a RestartSource around the consumer?
    Brendan McAdams
    @bwmcadams
    ya
      val luceneIndexingSource = RestartSource.withBackoff(
        minBackoff = 3.seconds,  // todo - config file me
        maxBackoff = 30.seconds,
        randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
      ) { () =>
    …but the toMat(Committer.sink) happens outside that withBackoff block. Which is what’s causing the issue, isn’t it?
    Brendan McAdams
    @bwmcadams
    wait, it can’t be that, since RestartSource has to return a source; the toMat must run outside it.
    Enno
    @ennru
    Haven't thought of this before, but I guess we have a problem here. Every new incarnation of the source will have a new actor internally, so when offsets from those different incarnations get batched together it fails.
    Brendan McAdams
    @bwmcadams
    doh.
    any best-way-to-stopgap?
    Enno
    @ennru
    RestartSource is not as important for Alpakka Kafka as it used to be, though.
    Brendan McAdams
    @bwmcadams
    oh? How so?
    Enno
    @ennru
    Since 1.0, we do not have the WakeupException stuff as the Kafka library improved on that.
    Brendan McAdams
    @bwmcadams
    I just updated the code from 1.0.5 to 1.1.0 … should I remove the RestartSource for now? Any additional error handling I should add instead?
    Enno
    @ennru
    What do you wrap in the RestartSource? If it is just the Consumer source, you can do without it.
    Before Alpakka Kafka 1.0, the Kafka client could get stuck when the Kafka broker became unreachable.
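    If you do keep the restart wrapper, one possible stopgap is to commit inside the restarted block, so offsets never cross incarnations (an untested sketch; businessFlow is a placeholder that must pass the messages through):

      RestartSource
        .withBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
          Consumer
            .committableSource(consumerSettings, Subscriptions.topics("topic1"))
            .via(businessFlow)
            .map(_.committableOffset)
            // committing happens inside this incarnation of the source
            .via(Committer.flow(CommitterSettings(system)))
        }
        .runWith(Sink.ignore)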
    Brendan McAdams
    @bwmcadams
    it’s the consumer source and some flow I tack on
    Enno
    @ennru
    But something has triggered a restart in your code, so there must be something that fails...
    Brendan McAdams
    @bwmcadams
    yup
    but I think we can probably handle that better outside of the restartsource
    Enno
    @ennru
    @bwmcadams please comment on akka/alpakka-kafka#942