
    Hello all,

    We use Alpakka Kafka together with Redpanda.
    When we run an integration test against an external, Dockerized Redpanda instance to test a consumer, we get the following error:


    o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
    javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1
            at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
            at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
        at akka.kafka.ProducerSettings$.createKafkaProducer(ProducerSettings.scala:215)
        at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:434)


    16:14:35.012 [JobsQueueSpecSystem-akka.kafka.default-dispatcher-16] WARN  o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
    javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-datapool-1
            at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
            at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
        at akka.kafka.ConsumerSettings$.createKafkaConsumer(ConsumerSettings.scala:237)
        at akka.kafka.ConsumerSettings$.$anonfun$apply$3(ConsumerSettings.scala:111)
        at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$applySettings(KafkaConsumerActor.scala:461)
        at akka.kafka.internal.KafkaConsumerActor.preStart(KafkaConsumerActor.scala:438)
        at akka.actor.Actor.aroundPreStart(Actor.scala:543)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:543)
        at akka.kafka.internal.KafkaConsumerActor.aroundPreStart(KafkaConsumerActor.scala:212)
        at akka.actor.ActorCell.create(ActorCell.scala:637)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:509)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:531)

    This is the consumer configuration:

    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withProperties(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG  -> "earliest"
      )

    Redpanda v21.6.1
    Akka 2.6.0
    Alpakka Kafka 2.0.6
    Specs2 4.6.0

    Do you have any idea why the consumer and producer are not closed/shut down after running the integration test? Thanks!
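    For anyone hitting the same warning: it typically means a second Kafka client registered the same JMX `client.id` because an earlier one was never closed, i.e. the stream from a previous test was still running. A minimal sketch (not the asker's code; topic, group id, and bootstrap address are assumptions) of keeping the materialized control so the test teardown can drain and stop the consumer:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Await
import scala.concurrent.duration._

object ConsumerShutdownSketch {
  implicit val system: ActorSystem = ActorSystem("it-test")

  val settings: ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // assumed Redpanda address
      .withGroupId("it-group")               // assumed group id

  // Keep the DrainingControl so the teardown can stop the stream.
  val control: Consumer.DrainingControl[Done] =
    Consumer
      .plainSource(settings, Subscriptions.topics("test-topic"))
      .toMat(Sink.ignore)(Keep.both)
      .mapMaterializedValue(Consumer.DrainingControl.apply)
      .run()

  // Call from the spec's afterAll/teardown; once this completes, the
  // underlying KafkaConsumer (and its app-info MBean) is gone.
  def teardown(): Unit = {
    import system.dispatcher
    Await.result(control.drainAndShutdown(), 10.seconds)
  }
}
```

    A producer created via `ProducerSettings.createKafkaProducer()` similarly needs an explicit `close()` in teardown.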


    Hello everyone.
    So, something I still don't get when using streams is whether this makes sense:

    .map(_.sequence) // doesn't compile, just an example

    So the type of the aFunctionThatCallsAnExternalService function is:
    aFunctionThatCallsAnExternalService: Future[Either[Failure, List[ImportantThing]]]
    And say this function returns a huge amount of ImportantThing
    What should I do there? Should I put this list back into a stream again? Does it make sense to use a
    stream and then have this huge list on which I run this (_.sequence) call?

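    One pattern (a sketch, not from the thread; `aFunctionThatCallsAnExternalService`, `Failure` and `ImportantThing` are the question's own names, stubbed here) is to not keep the huge `List` around at all: feed the `Either` result back into the stream with `mapConcat`, which re-streams the list element by element under backpressure:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object ReStreamSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  final case class Failure(msg: String)
  final case class ImportantThing(id: Int)

  // Stand-in for the external call from the question.
  def aFunctionThatCallsAnExternalService(): Future[Either[Failure, List[ImportantThing]]] =
    Future.successful(Right(List.tabulate(1000)(ImportantThing)))

  Source
    .single(())
    .mapAsync(parallelism = 1)(_ => aFunctionThatCallsAnExternalService())
    .mapConcat {
      case Right(things) => things // each ImportantThing flows downstream individually
      case Left(failure) =>
        system.log.warning(s"external call failed: $failure")
        Nil
    }
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}
```

    That way the huge list only exists transiently and downstream stages pull from it at their own pace.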

    Hello everyone.
    I am quite new with alpakka-kafka and I have a question
    I need to skip one message from the Kafka topic, but before that I need to know the current offset.
    For skipping I do :

        Consumer
          .plainPartitionedManualOffsetSource(consumerSettings, Subscriptions.topics(config.topic),
            _ => Future.successful(Map(new TopicPartition(config.topic, config.partition) -> config.offset)))
          .flatMapMerge(1, _._2)
          .map { message =>

    How can I know the current offset before skipping?

    @olgakabirova Did you figure it out? Just switch the order of the take and the map? Also, you seem to only keep the offset in your map, consider passing downstream more than that.
    @olgakabirova You could also consider using a map followed by a filter, where due to some predicate of your choice the filter would happen to only prevent the first message from passing downstream.
    @olgakabirova Yet another alternative could be https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/drop.html depending on what you want to do.
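    To make the filter suggestion concrete, a sketch (assumed settings, topic, and addresses; not @olgakabirova's code) that logs the offset of the record being skipped and drops only the first one. Every `ConsumerRecord` carries its own offset, so it is known before the record is discarded:

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object SkipFirstSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  val settings: ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // assumed
      .withGroupId("skip-demo")               // assumed

  Consumer
    .plainSource(settings, Subscriptions.topics("topic"))
    .zipWithIndex
    .filter { case (record, idx) =>
      // the record's offset is available here, before we decide to drop it
      if (idx == 0L) system.log.info(s"skipping record at offset ${record.offset}")
      idx != 0L
    }
    .map { case (record, _) => record.value }
    .runWith(Sink.foreach(println))
}
```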
    what open source tools are available to monitor/browse Kafka messages?
    Ignasi Marimon-Clos
    🚀 We're pleased to announce the PATCH release of Alpakka Kafka: https://github.com/akka/alpakka-kafka/releases/tag/v2.1.1
    Lenses.io I think
    But indeed it is not free. You can also monitor Kafka with Grafana and Prometheus.
    And those two have a community edition.

    Hi - I've got an existing application that uses Alpakka Kafka and reads from a Kafka cluster using a committable source (with offsets committed back to Kafka). Now I'm being asked to read from a second cluster.

    I don't see how this is handled in the Alpakka Kafka 2.1.0 source code, but I also don't see anything in the docs against mixing sources from different brokers.

    Is it possible to merge committable sources from different clusters?


    I think I answered my own question: The source cluster seems to be retained as part of the Committer and a KafkaAsyncConsumerCommitterRef.

    sorry for the chatter

    Hi, I am new to the channel. We are using Alpakka Kafka in production and monitor it with Cinnamon and Elastic APM integration. That gives us E2E traceability, which is very useful for handling financial messages (payments). However, sometimes the transaction duration shows a spike of > 60 sec. This is not what really happens, as we see the transaction handled in about 1 sec, so apparently the reporting is incorrect somewhere. Is anybody aware of some kind of 60 sec timeout on reporting metrics? The abnormal spikes are each time above 60 sec.
    Alessandro D'Armiento

    Hello, I am using Alpakka-HDFS to save a stream of data in an ORC file on HDFS.
    To do so, I first wait for there to be enough messages upstream, then I batch all of them, serialize them into an ORC byte array, and flush the single blob as a file on HDFS.
    This works.

    Now we have decided to drop HDFS in favor of Alluxio which, long story short, exposes a Hadoop FileSystem interface but is backed by an object store.
    After this update, I no longer want to use the built-in rename mechanism, which (as makes sense with HDFS) writes to a temp directory and then renames the file into the output directory.
    Is that possible?

    Levi Ramsey
    Perhaps it makes sense to use Alpakka-S3 to deal with Alluxio? That should be less likely to assume filesystem semantics for an object store.
    Felipe Oliveira
    hi everyone, I need to consume messages that are older than a few seconds. My current idea is to do offsetsForTimes() + seek(). With Alpakka, would I have to manage my own offsets? Would you suggest any other approach? Thank you very much!
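    As far as I can tell, Alpakka can do the offsetsForTimes + seek dance itself at assignment time via Subscriptions.assignmentOffsetsForTimes, so no manual offset bookkeeping should be needed. A sketch (topic, partition, and addresses are assumptions):

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object SeekByTimeSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  val settings: ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // assumed
      .withGroupId("seek-demo")               // assumed

  // Start at the first offset whose timestamp is >= "a few seconds ago".
  val aFewSecondsAgo = System.currentTimeMillis() - 5000L
  val subscription =
    Subscriptions.assignmentOffsetsForTimes(new TopicPartition("topic", 0) -> aFewSecondsAgo)

  Consumer
    .plainSource(settings, subscription)
    .runWith(Sink.foreach(r => println(s"${r.offset}: ${r.value}")))
}
```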
    Chetan Kumar
    Hi, I want to understand what happens when a substream fails in a CommittablePartitionedSource: will a new substream automatically start for the failed topic partition, or will it trigger a rebalance?

    Hi guys, I need some advice on how akka/kafka play with websockets.
    There are akka-http server, kafka topic and websocket clients.
    WS clients from the UI connect to the server in order to get messages from the same topic; they should all receive the same messages.
    I implemented this using a BroadcastHub:

    def kafkaBroadcastHubSource(topic: String): Source[String, NotUsed] = {
        val consumerSettings =
          ConsumerSettings(actorSystem, new IntegerDeserializer, new StringDeserializer)
            .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
            .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
        val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        // presumably completed along these lines (the paste was truncated):
        liveSource.map(_.value).runWith(BroadcastHub.sink(bufferSize = 256))
    }

    Then used this source in a route:

     lazy val source = kafkaBroadcastHubSource(topic)
      val routes: Route = get {
        path("ws") {
          extractWebSocketUpgrade { upgrade =>
            val msgSource = source
              .map(msg => TextMessage(msg))
            complete {
              upgrade.handleMessagesWithSinkSource(Sink.ignore, msgSource)
            }
          }
        }
      }
    As far as I know, a BroadcastHub is aligned to the slowest consumer. Do I understand correctly that if the hub buffer is full because messages keep arriving while consumption is slow, then backpressure will be triggered and no data can be processed anymore?
    Will the direct Kafka consumer (which is a producer to the BroadcastHub) be affected as well, so that no data is fetched from the topic?
    What else should I add to make this architecture production ready?
    Thanks in advance!

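    On the backpressure point: yes, when the hub's buffer fills up because of its slowest consumer, the single upstream Kafka consumer is back-pressured, so one stalled client can stop the fetch for everyone. One common mitigation (a sketch, not the asker's code; buffer size is an assumption) is to give each websocket client its own bounded buffer that drops old messages instead of propagating backpressure:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

object PerClientBufferSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")

  // `kafkaBroadcastHubSource` is the asker's function; stubbed here.
  def kafkaBroadcastHubSource(topic: String): Source[String, NotUsed] =
    Source.repeat("message")

  // Each client gets its own buffer: a slow browser only loses its own
  // oldest messages instead of back-pressuring the hub and the consumer.
  def perClientSource(topic: String): Source[TextMessage, NotUsed] =
    kafkaBroadcastHubSource(topic)
      .buffer(128, OverflowStrategy.dropHead)
      .map(msg => TextMessage(msg))
}
```

    The trade-off is that slow clients silently miss messages, which is usually acceptable for live UI feeds.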
    Chetan Kumar
    Is there a better way to handle errors in Consumer.committableSource sources, other than restarting the whole consumer?
    How can I pass custom context to a supervision strategy? I need the context/payload for logging in the supervision strategy.
    Jacob Bogers

    Hi, I am new to Akka Streams and trying to understand how the akka-stream .buffer() operator works.

    There is a source with buffer:

    val src = Source(1 to 259)
            .buffer(size = 256, OverflowStrategy.dropHead)

    If I connect it to a regular sink:

    val sink = Sink.foreach[Int] { x =>
            println("Element: " + x)
    }

    then it seems like the elements don't go through the buffer, so all elements are printed:

    Element: 1
    Element: 2
    Element: 3
    Element: 4

    If I put this source into websocket directive and use it in the test:

    val route = extractWebSocketUpgrade { _ =>
            val msgSource = source
              .buffer(256, OverflowStrategy.dropHead)
              .map { x =>
                println("Element: " + x)
                TextMessage(x.toString) // websocket messages must be Message instances
              }
            handleWebSocketMessagesForProtocol(Flow.fromSinkAndSourceCoupled(Sink.ignore, msgSource), jwt)
    }
    val wsClient = WSProbe()
    WS("ws", wsClient.flow) ~> route ~> check {
      isWebSocketUpgrade shouldBe true
    }

    then each element from the source first goes into the buffer, and once the buffer is full (contains all 256 elements), the last 3 elements enter the buffer while the 3 oldest are removed (by the dropHead strategy):

    Element: 3
    Element: 4
    Element: 5
    Element: 6

    So, the question is: what is the expected behaviour in the first and the second cases?
    Should the elements be enqueued into the buffer in the first case?
    I initially thought that the buffer only comes into play when backpressure is triggered.
    Why, then, is it triggered in the second case?

    Levi Ramsey

    Remember that backpressure is basically lack of demand downstream (demand flows from the sink up to the source, generally). A buffer is always signalling demand (with the sole exception of when it's full and the OverflowStrategy is backpressure) to the upstream. Sink.foreach signals demand after processing every element, so it can only backpressure if the processing takes a long time. For the websocket case, demand is ultimately driven by the computer on the other end of the websocket, so it will backpressure more.

    Operator fusion in Akka Streams may also interact here (I forget whether buffer introduces an implied async boundary). But you should definitely be able (assuming you have a dispatcher with multiple threads) to observe the backpressure and dropping behavior with:

    Source(1 to 259).buffer(size = 256, OverflowStrategy.dropHead).async.runWith(Sink.foreach[Int] { x => Thread.sleep(1000); println("Element: " + x) })

    You might also observe the dropping behavior without having the .async there (I haven't experimented).

    Siyavash Habashi
    Hi there, is it planned to release a new akka-stream-kafka version soonish with support for akka 2.6.16? (https://github.com/akka/akka/releases/tag/v2.6.16)
    @leviramsey Thank you, that makes sense. I tried running your example with .buffer().async, and the dropping was indeed observed. It didn't work without async, though; I guess that's because the whole stream pipeline is then fused and executed on a single thread.
    Dominik Kunz
    Hi, I have a question about DiscoverySupport using the akka-dns method. I did not find a way to specify a port name for the lookup. Is there a way to set one? Using only the service name, only an IP lookup is done, which of course does not include the port of the service.
    Hi, I have a quick question concerning Alpakka support for Kafka's underlying idempotent producer.
    I can see that transactions are handled at the Alpakka level, but I do not see whether I can use the idempotent producer with Alpakka, i.e. enable.idempotence=true. Does it require being transactional, or is there an in-between with the idempotent producer enabled at the Kafka level only?
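    For what it's worth, enable.idempotence is plain Kafka producer client configuration, so (as far as I can tell; serializers and address below are assumptions) it can be set on ProducerSettings without using Alpakka's Transactional API at all:

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

object IdempotentProducerSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")

  val producerSettings: ProducerSettings[String, String] =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092") // assumed
      // Idempotence is independent of transactions; on the broker side it
      // implies acks=all, retries > 0 and a small max.in.flight limit.
      .withProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

  // Then use it with e.g. Producer.plainSink(producerSettings) as usual.
}
```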
    Sitting in the sun with my laptop, looking at https://doc.akka.io/docs/alpakka-kafka/current/home.html#matching-kafka-versions and https://doc.akka.io/docs/alpakka-kafka/current/home.html#dependencies, I have the following question. So far I have kept Alpakka Kafka 2.0.5, as I was unsure whether I could just upgrade or should upgrade the Kafka brokers first, because the headline reads "Matching Kafka Versions". So now I want to ask: can I safely use Alpakka Kafka 2.1.1 with Kafka 2.4.1? Are there any known problems or disadvantages with this combination?

    I've got a system that's bumping into akka/alpakka-kafka#1386, and I've been wondering whether there is a way I could contribute to a solution.

    Looking at the source code, it seems like this could be addressed if the internal KafkaConsumerActor could be signaled to reset the offset for a particular TopicPartition to the last committed offset as part of the prestart of SubSourceStageLogic.

    Does this seem like a good approach? Is there a better approach?

    Can someone help me with a recent working microservices demo or sample application to deploy and test in AWS? Please kindly help me. I need to learn it as quickly as possible.

    Hello, I have a situation where I lose some messages after a node restarts.
    The last offset of a handled message in the logs is 117246;
    after the consumer restarted, the next handled offset is 117293,
    and there are no log entries for the messages in between.

    Here is my code.
    Please give me a hint as to where I am wrong.
    I need an at-least-once guarantee.

    def start(...) =
      for {
        streamCompletionRef   = new AtomicReference[Option[Future[Done]]](None)
        assignedPartitionsRef = new AtomicReference(Set[TopicPartition]())
        consumer = makePartitionedConsumer(config, settings, controlRef, assignedPartitionsRef, handleMessage)
        _ <- Resource.make(startConsumerF(config, consumer, streamCompletionRef))(
              _ => shutdownConsumerF(name, controlRef, streamCompletionRef))
      } yield ()

      private def startConsumerF(
          config: CommittableConsumerConfig,
          consumer: Source[Done, Unit],
          streamCompletionRef: AtomicReference[Option[Future[Done]]]
      )(implicit mat: Materializer): F[Unit] =
        F.delay {
          import config._
          RestartSource
            .onFailuresWithBackoff(minBackoff = backOffSettings.map(_.minBackoff).getOrElse(3).seconds,
                                   maxBackoff = backOffSettings.map(_.maxBackoff).getOrElse(30).seconds,
                                   randomFactor = backOffSettings.map(_.randomFactor).getOrElse(0.2))(() => consumer)
            .toMat(Sink.ignore) { case (_, streamCompletion) => streamCompletionRef.set(Some(streamCompletion)) }
            .run()
        }

      private def makePartitionedConsumer[K, V](
          config: CommittableConsumerConfig,
          consumerSettings: ConsumerSettings[K, V],
          controlRef: AtomicReference[Option[Consumer.Control]],
          assignedPartitionsRef: AtomicReference[Set[TopicPartition]],
          handleMessage: CommittableMessage[K, V] => F[Unit]
      )(implicit mat: Materializer, scheduler: Scheduler, logging: Logging[F]): Source[Done, Unit] = {
        import config._
        def handlePartitionSource(source: Source[CommittableMessage[K, V], NotUsed]): Future[Done] =
          source
            .mapAsync(maxPartitionParallelism.getOrElse(1)) { msg =>
              (logCommittableMessage(msg) >> handleMessage(msg))
            }
            .runWith(Sink.ignore)
        val subscriptions = Subscriptions.topics(topics.toSortedSet.toList: _*)
        Consumer
          .committablePartitionedSource(consumerSettings, subscriptions)
          .mapMaterializedValue(control => controlRef.set(Some(control)))
          .mapAsyncUnordered[Done](maxParallelism) { case (_, source) => handlePartitionSource(source) }
      }

      private def shutdownConsumerF(
          name: String,
          controlRef: AtomicReference[Option[Consumer.Control]],
          streamCompletionRef: AtomicReference[Option[Future[Done]]]
      )(implicit logger: Logging[F], exec: ExecutionContext): F[Unit] =
        for {
          _                     <- logger.info(s"Stopping Kafka consumer $name")
          maybeControl          <- F.delay(controlRef.get())
          maybeStreamCompletion <- F.delay(streamCompletionRef.get())
          _ <- maybeControl
                .map2(maybeStreamCompletion) { (control, streamCompletion) =>
                  F.deferFuture(DrainingControl(control, streamCompletion).drainAndShutdown()).void
                }
                .flatMap(_ => logger.info(s"Successfully stopped Kafka consumer $name"))
                .onError[Unit, Throwable](e => logger.errorCause(s"Failed to stop Kafka consumer $name", e))
        } yield ()
    assertAllStagesStopped on the Kafka base test is continuing to error even though it gets the materializer from the actor system. assertAllStagesStopped is called in @AfterEach, but the docs show the actor system being static and the lifecycle being per class. Something doesn't make sense.
    The Kafka producer sends data to the consumer; must the broker store a copy of the data on disk as it passes through? Can it be forwarded directly to the consumer over the network without being written to disk?
    Niklas Uhrberg

    This is about Kafka messages being processed twice when rebalancing happens because an application instance is shut down.
    I have a simple application: two instances, A0 and A1, consuming messages from one Kafka topic with P partitions. The application uses val control = Consumer.committablePartitionedSource to obtain the sources when partition assignment happens. The shutdown sequence appears to work as intended, where all streams report to be completed without exceptions. The following code is executed when I shut an instance down:

    CoordinatedShutdown
      .get(system)
      .addTask(
        CoordinatedShutdown.PhaseActorSystemTerminate, // PhaseBeforeActorSystemTerminate
        "Drain streams and close databases"
      )(() => {
        logger.info("Enter coordinated shutdown")
        control
          .drainAndShutdown()
          .andThen(_ => {
            logger.info("Executing custom shutdown task to close databases.")
            db.close()
            dbRead.close()
            ()
          })
      })

    When the application instances process messages under a load of e.g. 20-100 messages per second, with a partition count of 2 or 20 (the examples I have tried), I can consistently reproduce the following scenario:

    1. Both application instances run
    2. One instance, A1, is shut down using kill <pid>
    3. In A0, one or a few messages are processed that have already been processed. I have the impression that it is never more than one message per partition. When running 20 partitions, the highest number of reprocessed messages observed is 3 (with 100 messages per second published to the topic)
    4. One part of the message processing is to create a persistent actor and when the messages are reprocessed (in 3) the state of this persistent actor that was obtained in the first processing of the command is loaded from persistent storage. This is quite important because what I really wanted to assure is that there is no "split brains" situation where the same events get persisted twice.

    What puzzles me is that I think committing the Kafka message offset should really prevent the message from being processed twice.
    These are my Committer settings:

    committer {
      max-batch = 1
      max-interval = 2.seconds
      parallelism = 1
      delivery = "WaitForAck"
      when = "OffsetFirstObserved"
    }
    The flow ends with a Committer.sink.

    I am not an expert in the Committer settings, this is where I think I should look next.

    I do realize there is more to specify to describe everything in detail, but this may perhaps suffice to sketch the background to my question:

    Can I hope to eliminate the race condition that makes one command sometimes be processed twice when a rebalancing happens?

    If not, I would like to understand why, since I think a graceful shutdown should give this guarantee.

    Levi Ramsey
    It's worth noting that Kafka's commit protocol is fundamentally at-least-once and cannot guarantee to never process a message twice
    Levi Ramsey
    Have you considered using cluster sharding (and the Akka Cluster split-brain resolver) to ensure that only one instance of the persistent actor exists?
    Niklas Uhrberg
    Yes, I'm totally aware of cluster sharding, but I wanted to explore keeping the application as simple as possible (in total) to begin with. Note, though, that at least I don't get the problem of multiple instances of the same persistent actor coexisting simultaneously in the current solution. Also, since the same Kafka message gets processed twice, I would have exactly the same problem with cluster sharding and would still have to dedupe the Kafka messages.
    Niklas Uhrberg
    @leviramsey Since you wrote "It's worth noting that Kafka's commit protocol is fundamentally at-least-once and cannot guarantee to never process a message twice" I'd like to know more in detail the context where you state this is true. Do you really mean that if the consumer commits the message offset (I use Kafka managed offset) back to Kafka there is still the possibility that it can be consumed again? I perfectly realize that the commit operation can fail, but this is most probably not what I'm seeing. In that case the commit should fail each time I run the experiment.
    Levi Ramsey
    I've seen situations where the rebalance happens and the rebalanced-to consumer picks up old offsets. I don't particularly care about that because I default to at-least-once, so I don't investigate. I suspect it's because offset commit might not qualify as a poll for max.poll.interval.ms purposes, but idk. There are also broker failure modes that can lead to Kafka losing committed offsets (just like Kafka in general makes fairly weak guarantees about message durability).
    Michael Goytia
    Question: when moving to MSK from an on-prem version of Kafka, we are getting the following error: failed authentication due to: [a3c21a3d-fcdb-47c1-8754-e46c0e14bd12]: Too many connects. We are using a plainSink and calling this method:
    class KafkaProducerHelper @Inject()(appConfig: AppConfig, implicit val actorSystem: ActorSystem) {
      val pLogger: Logger = LoggerFactory.getLogger("application")
      val producerSettings: ProducerSettings[Array[Byte], Array[Byte]] = ProducerSettings(appConfig.KAFKA_PRODUCER_CONF, new ByteArraySerializer, new ByteArraySerializer)
      val plainSinkConnection: Sink[ProducerRecord[Array[Byte], Array[Byte]], Future[Done]] = Producer.plainSink(producerSettings)
      def sendAlertsMessages(messages: List[ProducerRecord[Array[Byte], Array[Byte]]]): Future[Option[Done]] = {
        pLogger.info("Sending alert messages")
        Source(messages).runWith(plainSinkConnection) map {
          case Done => Some(Done)
          case _ => None
        }
      }
    }
    We call sendAlertsMessages in a recursive fashion so as to split up the List of ProducerRecords. When we pass the full list into the method in one go, it significantly increases the time it takes to produce messages, but the error messages go away. I was wondering if anyone else has encountered this error with MSK and how they approached it.
    @HurricaneGoytia_twitter Tried lowering the parallelism as well; it got a little better, but we're still seeing errors.
    Michael Goytia
    Adi Gerber
    We're working on a data processing application and we use Alpakka Kafka to do our stream processing.
    I've written a test case that uses Kafka's MockConsumer and MockProducer and injects them into the ConsumerSettings and ProducerSettings (via withConsumerFactory & withProducer), with the goal of testing the code that creates the Kafka flow.
    The test succeeds most of the time, but we're seeing occasional failures in our build system (Jenkins on GCE machines with few resources).
    I can reliably reproduce the failure on my own laptop by lowering the CPU frequency to the lowest possible (400 MHz) and running the test; the test case succeeds when running at normal CPU frequency.
    What could be the issue?
    I've written a minimal test case here: https://github.com/adigerber/akka-kafka-mock-bug
    Adi Gerber
    I found out why it happens.
    Basically, on a slow/overloaded machine with the default configuration, the poll task timer gets 2 hits, and the second poll (which emits data) does so before the sub-source for the newly assigned partition gets created. This can be detected by checking the contents of mockConsumer.paused, and the solution is to reschedule the same poll task until the consumer is no longer paused on the topic-partition.
    Joe Zulli

    Hi all! I have a newbie question about working with Sinks in conjunction with ProducerMessage.multi. Essentially, I have a source that is giving me multi-line strings, and I want to send each line as a separate ProducerRecord into Kafka. Something like this:

        val done: Future[Done] =
          Source(1 to 100)
            .map(num => (0 to num).toList)
            .map(list => ProducerMessage.multi[Nothing, String](list.map(new ProducerRecord("topic", _)).toSeq))
            .runWith(Producer.plainSink(producerSettings)) // this line produces the type mismatch below

    The error I get is:

    type mismatch;
     found   : akka.stream.scaladsl.Sink[org.apache.kafka.clients.producer.ProducerRecord[String,String],scala.concurrent.Future[akka.Done]]
     required: akka.stream.Graph[akka.stream.SinkShape[akka.kafka.ProducerMessage.Envelope[Nothing,String,akka.NotUsed]],scala.concurrent.Future[akka.Done]]

    Not sure how to make all of the types happy. If anyone can point me in the right direction, it would be much appreciated!

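    In case it helps later readers, here are two ways the types can line up (a sketch with assumed producerSettings, topic, and serializers; not necessarily the thread's resolution). Producer.plainSink accepts plain ProducerRecords, so the list can be flattened with mapConcat; a ProducerMessage.multi envelope instead needs an envelope-aware stage such as Producer.flexiFlow:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Producer
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future

object MultiSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")

  val producerSettings: ProducerSettings[String, String] =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092") // assumed

  // Option 1: flatten, so Producer.plainSink sees one ProducerRecord per element.
  val done1: Future[Done] =
    Source(1 to 100)
      .mapConcat(num => (0 to num).toList)
      .map(n => new ProducerRecord[String, String]("topic", n.toString))
      .runWith(Producer.plainSink(producerSettings))

  // Option 2: keep ProducerMessage.multi and use the envelope-aware flexiFlow.
  val done2: Future[Done] =
    Source(1 to 100)
      .map(num => ProducerMessage.multi(
        (0 to num).map(n => new ProducerRecord[String, String]("topic", n.toString)).toList))
      .via(Producer.flexiFlow(producerSettings))
      .runWith(Sink.ignore)
}
```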
    Hi, when using producer multi-messages, is it normal that the committed offsets are incremented by the number of elements within the multi message? I'm sending two things to two separate topics with the help of a multi message, and the commit offset increases by 2 on every topic.