    Hello, I have a situation where I lose some messages after a node restarts.
    The last offset handled according to the log: 117246
    The first offset after the consumer restarted: 117293
    There are no messages in the range between them in the logs.

    Here is my code.
    Please give me a hint as to where I went wrong.
    I need an at-least-once guarantee.

    def start(...) =
      for {
        streamCompletionRef   = new AtomicReference[Option[Future[Done]]](None)
        assignedPartitionsRef = new AtomicReference(Set[TopicPartition]())
        consumer = makePartitionedConsumer(config, settings, controlRef, assignedPartitionsRef, handleMessage)
        _ <- Resource.make(startConsumerF(config, consumer, streamCompletionRef))(
              _ => shutdownConsumerF(name, controlRef, streamCompletionRef)
            )
      } yield ()

    private def startConsumerF(
        config: CommittableConsumerConfig,
        consumer: Source[Object, Unit],
        streamCompletionRef: AtomicReference[Option[Future[Done]]]
    )(implicit mat: Materializer): F[Unit] =
      F.delay {
        import config._
        // Restart the consumer source with backoff whenever it fails.
        RestartSource
          .onFailuresWithBackoff(minBackoff = backOffSettings.map(_.minBackoff).getOrElse(3).seconds,
                                 maxBackoff = backOffSettings.map(_.maxBackoff).getOrElse(30).seconds,
                                 randomFactor = backOffSettings.map(_.randomFactor).getOrElse(0.2))(() => consumer)
          .toMat(Sink.ignore) { case (_, streamCompletion) => streamCompletionRef.set(Some(streamCompletion)) }
          .run()
      }

    private def makePartitionedConsumer[K, V](
        config: CommittableConsumerConfig,
        consumerSettings: ConsumerSettings[K, V],
        controlRef: AtomicReference[Option[Consumer.Control]],
        assignedPartitionsRef: AtomicReference[Set[TopicPartition]],
        handleMessage: CommittableMessage[K, V] => F[Unit]
    )(implicit mat: Materializer, scheduler: Scheduler, logging: Logging[F]): Source[Done, Unit] = {
      import config._

      // Runs one sub-stream per assigned partition.
      def handlePartitionSource(source: Source[CommittableMessage[K, V], NotUsed]): Future[Done] =
        source
          .mapAsync(maxPartitionParallelism.getOrElse(1)) { msg =>
            (logCommittableMessage(msg) >> handleMessage(msg))
            // ...
          }
          // ...

      val subscriptions = Subscriptions
        .topics(topics.toSortedSet.toList: _*)

      Consumer
        .committablePartitionedSource(consumerSettings, subscriptions)
        .mapMaterializedValue(control => controlRef.set(Some(control)))
        .mapAsyncUnordered[Done](maxParallelism) { case (_, source) => handlePartitionSource(source) }
    }

    private def shutdownConsumerF(
        name: String,
        controlRef: AtomicReference[Option[Consumer.Control]],
        streamCompletionRef: AtomicReference[Option[Future[Done]]]
    )(implicit logger: Logging[F], exec: ExecutionContext): F[Unit] =
      for {
        _                     <- logger.info(s"Stopping Kafka consumer $name")
        maybeControl          <- F.delay(controlRef.get())
        maybeStreamCompletion <- F.delay(streamCompletionRef.get())
        _ <- maybeControl
              .map2(maybeStreamCompletion) { (control, streamCompletion) =>
                F.deferFuture(DrainingControl(control, streamCompletion).drainAndShutdown()).void
              }
              // ...
              .flatMap(_ => logger.info(s"Successfully stopped Kafka consumer $name"))
              .onError[Unit, Throwable](e => logger.errorCause(s"Failed to stop Kafka consumer $name", e))
      } yield ()
    3 replies
    assertAllStagesStopped in the Kafka base test keeps failing, but it gets the materializer from the actor system. assertAllStagesStopped is called in @AfterEach, yet the docs show the actor system being static and the Lifecycle being per class. Something doesn't add up.
    3 replies
    When a Kafka producer sends data to a consumer, must a copy of the data be stored on the broker's disk as it passes through the brokers? Or can it be forwarded directly to the consumer over the network without being written to disk?
    6 replies
    Niklas Uhrberg

    This is about Kafka messages being processed twice when rebalancing happens because an application instance is shut down.
    I have a simple application with two instances, A0 and A1, consuming messages from one Kafka topic with P partitions. The application uses val control = Consumer.committablePartitionedSource to obtain the sources when partition assignment happens. The shutdown sequence appears to work as intended: all streams report that they completed without exceptions. The following code is executed when I shut an instance down:

        CoordinatedShutdown
          .get(system)
          .addTask(
            CoordinatedShutdown.PhaseActorSystemTerminate,
            "Drain streams and close databases"
          )(() => { // PhaseBeforeActorSystemTerminate
            logger.info("Enter coordinated shutdown")
            control
              .drainAndShutdown()
              .andThen(_ => {
                logger.info("Executing custom shutdown task to close databases.")
                db.close()
                dbRead.close()
                ()
              })
          })

    When the application instances process messages under a load of e.g. 20-100 messages per second with a partition count of 2 or 20 (the examples I have tried), I can consistently reproduce the following scenario:

    1. Both application instances run
    2. One instance, A1, is shut down using kill pid
    3. In A0, one or a few messages are processed that have already been processed. I have the impression that it is never more than one message per partition. When running 20 partitions, the highest number of reprocessed messages observed is 3 (with 100 messages per second published to the topic)
    4. One part of the message processing is to create a persistent actor, and when the messages are reprocessed (in step 3), the state of this persistent actor that was obtained in the first processing of the command is loaded from persistent storage. This is quite important, because what I really wanted to ensure is that there is no "split-brain" situation where the same events get persisted twice.

    What puzzles me is that I think that the committing of the Kafka message should really prevent the message from being processed twice.
    These are my Committer settings:

        committer {
          max-batch = 1
          max-interval = 2.seconds
          parallelism = 1
          delivery = "WaitForAck"
          when = "OffsetFirstObserved"
        }

    The flow ends with a Committer.sink.

    I am not an expert in the Committer settings, this is where I think I should look next.

    I realize there is more to specify to describe everything in detail, but this may suffice to sketch the background to my question:

    Can I hope to eliminate the race condition making one command sometimes be processed twice when a rebalancing happens?

    If not, I would like to understand why, since I think a graceful shutdown should give this guarantee.

    Levi Ramsey
    It's worth noting that Kafka's commit protocol is fundamentally at-least-once and cannot guarantee to never process a message twice
    Levi Ramsey
    Have you considered using cluster sharding (and the Akka Cluster split-brain resolver) to ensure that only one instance of the persistent actor exists?
    Niklas Uhrberg
    Yes, I'm totally aware of the cluster sharding but I wanted to explore the possibility to keep the application as simple as possible (in total) in the beginning. But note that at least I don't get the problem that there are multiple instances of the same persistent actor coexisting simultaneously in the current solution. Also, since the same Kafka message gets processed twice, I will have exactly the same problem in cluster sharding having to dedupe the Kafka messages.
    Niklas Uhrberg
    @leviramsey Since you wrote "It's worth noting that Kafka's commit protocol is fundamentally at-least-once and cannot guarantee to never process a message twice" I'd like to know more in detail the context where you state this is true. Do you really mean that if the consumer commits the message offset (I use Kafka managed offset) back to Kafka there is still the possibility that it can be consumed again? I perfectly realize that the commit operation can fail, but this is most probably not what I'm seeing. In that case the commit should fail each time I run the experiment.
    Levi Ramsey
    I've seen situations where the rebalance happens and the rebalanced-to consumer picks up old offsets. I don't particularly care about that because I default to at-least-once, so I don't investigate. I suspect it's because offset commit might not qualify as a poll for max.poll.interval.ms purposes, but idk. There are also broker failure modes that can lead to Kafka losing committed offsets (just like Kafka in general makes fairly weak guarantees about message durability).
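    Since redelivery after a rebalance cannot be ruled out, a common way to live with at-least-once is to make the handler idempotent. A minimal sketch, assuming a message can be identified by its topic, partition and offset. `Msg` and `IdempotentHandler` are hypothetical names, not Alpakka types, and the in-memory map stands in for whatever persistent store holds the state being protected (in the setup above, that could be the persistent actor itself):

```scala
// Hypothetical stand-in for a consumed Kafka record; not an Alpakka type.
final case class Msg(topic: String, partition: Int, offset: Long, value: String)

// Remembers the highest offset handled per (topic, partition) and skips
// anything at or below it, so a redelivered message has no effect.
// Not thread-safe: assumes one caller at a time, e.g. mapAsync(1) or an actor.
final class IdempotentHandler(process: Msg => Unit) {
  private var lastHandled = Map.empty[(String, Int), Long]

  /** Returns true if the message was processed, false if it was a duplicate. */
  def handle(msg: Msg): Boolean = {
    val key = (msg.topic, msg.partition)
    if (lastHandled.get(key).exists(_ >= msg.offset)) false
    else {
      process(msg)
      lastHandled = lastHandled.updated(key, msg.offset)
      true
    }
  }
}
```

    For the deduplication to survive a restart or a rebalance to another instance, the last handled offset would have to be stored together with the state it guards, ideally in the same write.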
    Michael Goytia
    Question: When moving to MSK from an on-prem version of Kafka, we are getting the following error: failed authentication due to: [a3c21a3d-fcdb-47c1-8754-e46c0e14bd12]: Too many connects. We are using a plainSink and calling this method:
    class KafkaProducerHelper @Inject()(appConfig: AppConfig, implicit val actorSystem: ActorSystem) {
      val pLogger: Logger = LoggerFactory.getLogger("application")
      val producerSettings: ProducerSettings[Array[Byte], Array[Byte]] = ProducerSettings(appConfig.KAFKA_PRODUCER_CONF, new ByteArraySerializer, new ByteArraySerializer)
      val plainSinkConnection: Sink[ProducerRecord[Array[Byte], Array[Byte]], Future[Done]] = Producer.plainSink(producerSettings)

      def sendAlertsMessages(messages: List[ProducerRecord[Array[Byte], Array[Byte]]]): Future[Option[Done]] = {
        pLogger.info("Sending alert messages")
        Source(messages).runWith(plainSinkConnection) map {
          case Done => Some(Done)
          case _ => None
        }
      }
    }
    We call sendAlertsMessages recursively so as to split up the List of ProducerRecords. When we pass the full list into the method instead, it significantly increases the time it takes to produce the messages, but the errors go away. I was wondering if anyone else has encountered this error with MSK and how they approached it.
    @HurricaneGoytia_twitter Tried lowering the parallelism as well; it got a little better, but still seeing errors
    Michael Goytia
    Adi Gerber
    we're working on creating a data processing application and we use Alpakka Kafka to do our stream processing.
    I've written a test case that uses Kafka's MockConsumer and MockProducer and injects those to the ConsumerSettings and ProducerSettings (via withConsumerFactory & withProducer) with the goal of testing the code that creates the Kafka flow.
    the test succeeds most of the time but we're seeing occasional failures in our build system (Jenkins on GCE machines with few resources).
    I can reliably reproduce the failure on my own laptop by lowering the CPU frequency to the lowest possible (400 MHz) and running the test, however the test case succeeds when running at normal CPU frequency.
    what could be the issue?
    I've written a minimal test case here: https://github.com/adigerber/akka-kafka-mock-bug
    Adi Gerber
    I found out why it happens.
    basically on a slow/overloaded machine and with the default configuration the poll task timer gets 2 hits, and the second poll which emits data does so before the sub source for the newly-assigned partition gets created. this can be detected by checking the contents of mockConsumer.paused, and the solution is to reschedule the same poll task until the consumer is not paused on the topic-partition.
    Joe Zulli

    Hi all! I have a newbie question about working with Sinks in conjunction with ProducerMessage.multi. Essentially, I have a source that is giving me multi-line strings, and I want to send each line as a separate ProducerRecord into Kafka. Something like this:

        val done: Future[Done] =
          Source(1 to 100)
            .map(num => (0 to num).toList )
            .map(list => ProducerMessage.multi[Nothing, String](list.map(new ProducerRecord("topic", _)).toSeq))

    The error I get is:

    type mismatch;
     found   : akka.stream.scaladsl.Sink[org.apache.kafka.clients.producer.ProducerRecord[String,String],scala.concurrent.Future[akka.Done]]
     required: akka.stream.Graph[akka.stream.SinkShape[akka.kafka.ProducerMessage.Envelope[Nothing,String,akka.NotUsed]],scala.concurrent.Future[akka.Done]]

    Not sure how to make all of the types happy. If anyone can point me in the right direction, it would be much appreciated!

    3 replies
    Hi, when using Producer-Multi-Messages, is it normal that the commit offsets are incremented by the number of elements within the multi message? I'm sending two things to two separate topics with the help of a multi message, and the commit offset increases by 2 on every topic.
    1 reply
    Using alpakka, is it possible to time out and release a consumer if no message was received for e.g. the last minute?
    4 replies
    Hi everyone!
    Recently I've been testing Kafka consumer reconnection after stopping and restarting the Kafka container. I'm using committableExternalSource as a consumer.
    Use case:
    1. Start Kafka in a Docker container.
    2. Start the Kafka consumer and producer (everything works fine, messages are consumed).
    3. Stop the Kafka container (the consumer logs "Kafka commit is to be retried", caused by DisconnectException).
      And then:
      4.1. Start Kafka again ~10 sec after stopping (everything goes back to normal, the consumer works properly)
      4.2. Wait a bit longer before starting the Kafka container:
      4.2.1. "Completing" logs appear from the BaseSingleSourceLogic class,
      4.2.2. "Kafka commit is to be retried" logs are still being emitted,
      4.2.3. after starting the Kafka container again there are logs from deadLetter and no more logs are shown.
    Roy Prager
    I am consuming from a topic with multiple partitions and using groupedWithin in my graph, which means I have to commit offsets for multiple partitions. What is the best way to do that?
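    For a batch that spans several partitions, only the highest offset per partition needs to be committed. To my understanding, this is the reduction that Alpakka Kafka's CommittableOffsetBatch and Committer.sink perform internally. A plain-Scala sketch of the idea, with `Offset` and `OffsetBatching` as hypothetical names and no Kafka dependency:

```scala
// Hypothetical stand-in for a committable offset; not an Alpakka type.
final case class Offset(topic: String, partition: Int, offset: Long)

object OffsetBatching {
  // Collapses a batch (e.g. one group emitted by groupedWithin) to the
  // highest offset seen for each (topic, partition).
  def highestPerPartition(batch: Seq[Offset]): Map[(String, Int), Long] =
    batch.foldLeft(Map.empty[(String, Int), Long]) { (acc, o) =>
      val key = (o.topic, o.partition)
      acc.updated(key, math.max(acc.getOrElse(key, o.offset), o.offset))
    }
}
```

    In a real stream this would probably not be hand-written: passing the committable offsets of each group on to a Committer.sink should be enough, since it batches across partitions.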
    Hi guys!
    Is there a way to suppress the "Kafka commit is to be retried" logs, through configuration or otherwise?
    Ryan Tomczik
    Hi everyone, I'm looking to commit offset batches with each event produced transactionally. The problem is it looks like you can only provide one PartitionOffset per event produced. My events are being created from several events consumed over several partitions, so I need to commit a batch of PartitionOffset per event. Is this possible?
    Vishal Bhavsar
    Hi, I'm looking to consume messages from the earliest offset up to a specific offset and then stop consuming (thus stopping the source from emitting more messages). What would be the best way to achieve this? I am using committablePartitionedSource, so I have access to the per-partition offset. How do I terminate the flow after a specific offset has been reached?
    Consumer.DrainingControl<Done> control =
        Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
                pair -> {
                  Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source =
                  return source
                      .map(message -> message.committableOffset())
                      .runWith(Committer.sink(committerSettings), system);
            .toMat(Sink.ignore(), Consumer::createDrainingControl)
    1 reply
    Hey, can someone help me? In my stream I have a combination of take and takeWithin, and I am wondering whether takeWithin starts its timer when the first message reaches the takeWithin operator or after it receives the last one.
    Levi Ramsey
    I'm pretty sure takeWithin starts its timer at materialization (i.e. before it has seen a stream element)
    Burak Helvacı
    Please use kafka, but leave kafka-streams as soon as possible.
    Vishal Bhavsar
    Hi, how is the metadata from Consumer.commitWithMetadataPartitionedSource meant to be used? What does the param metadataFromRecord: Function[ConsumerRecord[K, V], String] allow you to do? I can't find any examples. I want to stop processing when a message's timestamp is greater than a given timestamp. I can see that the timestamp is available in metadataFromRecord, but how can I use it in the result of commitWithMetadataPartitionedSource?
    3 replies
    Levi Ramsey

    The result of metadataFromRecord is only passed back to Kafka when committing an offset (see https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/OffsetAndMetadata.html). The message timestamp is available in any of the sources which give you a ConsumerRecord or a CommittableMessage without needing a commitWithMetadata source.

    For the other committable sources, you would call msg.record.timestamp to get the timestamp. So given StopAfterTimestamp, you could .takeWhile { msg => msg.record.timestamp <= StopAfterTimestamp }

    The only usecase for that metadata that I can see is if you have tooling which consumes the consumer-offset topic from Kafka (e.g. for observability) and you want to pass metadata like which hosts are committing offsets to that tooling
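    The takeWhile suggestion above behaves like takeWhile on a plain collection: it stops at the first element that fails the predicate and drops everything after it. A small sketch of that behaviour on an Iterator, where `Rec` and `StopAfter` are hypothetical stand-ins rather than Kafka or Alpakka types:

```scala
// Hypothetical stand-in for a record with a timestamp; not a Kafka type.
final case class Rec(offset: Long, timestamp: Long)

object StopAfter {
  // Keeps records until the first one whose timestamp exceeds the cutoff.
  // Everything after that record is dropped, even if its timestamp is lower.
  def untilTimestamp(records: Iterator[Rec], stopAfterTimestamp: Long): List[Rec] =
    records.takeWhile(_.timestamp <= stopAfterTimestamp).toList
}
```

    One consequence worth keeping in mind: if timestamps within a partition are not strictly increasing (for example, when producers set them), a late record with an earlier timestamp that arrives after the cutoff record is never seen.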
    Vishal Bhavsar
    That makes sense. Thank you for such a comprehensive response @leviramsey!
    Harry Tran
    Hello, does anyone have an example of using an Alpakka Kafka consumer in a Lagom application? I have this in a class and wire it into a LagomApplication, but I only see the "starting" log, not the "executing" log (the topic has messages being produced). I also do not see the consumer group being created when checking from the broker side.
      private val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
      logger.info("Starting the subscriber")
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics(ExternalTopic))
        .mapAsync(1)(message => {
          val request = Json.parse(message.value).as[ExternalRequest]
          logger.info("Executing {}", request)
    1 reply
    Hm, what are the CassandraWriteSettings for? What's their purpose?
    1 reply
    Matthew de Detrich

    So I have an interesting problem: I am subscribing to a stream using Alpakka Kafka, and right at the start of the stream I am using prefixAndTail(1).flatMapConcat to get the first element. However, it returns None even though messages are being sent to the Kafka topic. Interestingly, I am not getting this problem with a local Kafka that I run with Docker.

    Does anyone know in what cases this occurs, and also whether prefixAndTail(1) is eager? I.e., will it wait in perpetuity until it gets an element, or is there some kind of timeout?

    Matthew de Detrich
    So figured out the issue, turned out it was Main immediately terminating which was causing a shutdown.
    Is there any way to make sure that two messages aimed for two different topics either both end up in those topics or none of them does after sending them there using either Send Producer or any regular streaming producer?
    1 reply
    Dave Kichler
    Curious whether the consumption patterns for the Consumer sources are documented anywhere? I'm specifically curious about the semantics of Consumer.sourceWithOffsetContext when the source is assigned multiple partitions, how consumption is managed between partitions. I was under the impression the partitions were consumed from using round-robin distribution but cannot find documentation to back that up (or contradict/refute).
    3 replies
    Sean Kwak
    Can I ask how to do a conditional publish with msg data in the code shown in the following link?
    e.g. if msg.record.value contains some string, then publish otherwise skip etc.
    4 replies
    Ashish Sharma
    hey, what is the configuration for setting log.retention duration for a topic within client settings?
    Ashish Sharma
    I guess this has to be done at the time of topic creation from the client?
    Levi Ramsey
    Or done through the usual Kafka CLI tools (e.g. kafka-topics.sh)
    Koen Dejonghe
    Can I use HdfsFlow to write parquet files to hdfs? If so, how? Thank you.
    BTW, I have GenericRecords in my flow. I could use AvroParquetWriter, but that does not have the RotationStrategy and FilePathGenerator

    Hi all, I'm trying to use a dependency that adds Kinesis KPL support to Akka. It has a KPLFlow class to provide that support. I'm relatively new to Akka and flows, but my objective is to take a Kafka source that is already created and some type of sink, replace the "native" Kinesis sink, and use this flow to deliver the records. Is there a way to extract this from the flow class? Or is it possible to have just the flow "consume" from the Kafka source and deliver to a target stream?

    I've created a Stack Overflow question about this: https://stackoverflow.com/questions/73873966/akka-kafka-source-to-kinesis-sink-using-kpl


    Hi everyone. I have an application that receives an API request and relays it to a Kafka API Producer. Each request calls the producer to send a message to Kafka. The producer exists throughout the application lifetime and is shared by all requests.

    producer.send(new ProducerRecord[String, String](topic, requestBody))

    This works OK. Now I want to use instead, an alpakka Producer for the job. The code looks like this:

    val kafkaProducer = producerSettings.createKafkaProducer()
    val settingsWithProducer = producerSettings.withProducer(kafkaProducer)
    val done = Source.single(requestBody)
      .map(value => new ProducerRecord[String, String](topic, value))

    What are the advantages of the alpakka Producer over the plain, vanilla Producer? I don't know whether the new approach can help me handle a large number of API requests in order at the same time.

    Ladinu Chandrasinghe
    Hello all, what is the proper way to implement batched at-most-once processing using alpakka-kafka?
    I have the following sub-optimal solution, but is there a better way that doesn't involve materializing an inner stream?
        def atMostOnceSource[K, V]: Source[ConsumerRecord[K, V], NotUsed] = {
             .committableSource[K, V](consumerSettings, Subscriptions.topics(allTopics))
             .groupedWithin(maxBatchSize, maxBatchDuration)
             .mapAsync(1) { messages: Seq[CommittableMessage[K, V]] =>
               val committableOffsetBatch =
                 .map(_ => messages)