    Lior Shapsa
    @liorshapsa_twitter
    Hi, Is there a way to read each partition from offset 0 to X and make sure the stream ends once all partitions reach the last offset?
    1 reply
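    A rough sketch of one way to do this (consumerSettings and the per-partition end offsets are assumptions, e.g. captured up front via KafkaConsumer.endOffsets; it also assumes every partition has at least one record): assign each partition explicitly at offset 0, stop each per-partition stream at its captured end offset, and the merged stream completes once every partition is done.

    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.{Sink, Source}
    import org.apache.kafka.common.TopicPartition

    // partitionsWithEnds: Seq[(TopicPartition, Long)] captured before starting (assumption)
    def readToEnd(tp: TopicPartition, endOffset: Long) =
      Consumer
        .plainSource(consumerSettings, Subscriptions.assignmentWithOffset(tp -> 0L))
        // emit records up to and including endOffset - 1, then complete this partition's stream
        .takeWhile(_.offset() < endOffset - 1, inclusive = true)

    val allPartitionsDone =
      Source(partitionsWithEnds.toList)
        .flatMapMerge(partitionsWithEnds.size, { case (tp, end) => readToEnd(tp, end) })
        .runWith(Sink.ignore) // completes once every partition has reached its end offset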
    Sean Glover
    @seglo
    hi. i'm planning to release 2.1.0-RC1 by next Monday. hopefully a final release a week or so after that. thanks for everyone's patience.
    Maksym Besida
    @mbesida
    is it expected behavior for the Consumer to reconnect forever if, for example, the brokers were misconfigured?
    2 replies
    Nikhil Arora
    @nikhilaroratgo_gitlab
    I have a use case where I want to commit the offset only when I get a success reply from the server. The reply from the server is async and I don't know the order. How can I handle this? This blog https://quarkus.io/blog/kafka-commit-strategies/#the-throttled-strategy talks about my use case. I am wondering if there is anything in alpakka kafka for this.
    3 replies
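    A hedged sketch of the usual Alpakka pattern for this (callServer, consumerSettings and the topic name are assumptions): keep the CommittableOffset attached to the in-flight request with mapAsync, which emits results in upstream order even if the server replies out of order, and only then feed the offsets to Committer.sink:

    import akka.kafka.{CommitterSettings, Subscriptions}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl

    // an ActorSystem `system` and an implicit ExecutionContext are assumed in scope
    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("topic"))
        .mapAsync(parallelism = 10) { msg =>
          // callServer: assumed async call returning a Future of the server reply
          callServer(msg.record.value()).map(_ => msg.committableOffset)
        }
        .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply)
        .run()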
    Sean Glover
    @seglo
    Alpakka Kafka 2.1.0-RC1 has been released to sonatype https://discuss.lightbend.com/t/alpakka-kafka-2-1-0-rc1-released/8144
    snwlnx
    @snwlnx
    Hello.
    Why does the API of partitioned sources accept only AutoSubscription?
    I need manual partition assignment and I wanted to use a partitioned source
    to consume and commit partitions independently.
    Could anyone suggest a good solution for this?
    3 replies
    snwlnx
    @snwlnx
    I realized that I could write a separate consumer for each partition
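    A minimal sketch of that idea (handle, topic, partitionCount and consumerSettings are assumptions): one manually assigned committable source per partition, each committing independently:

    import akka.kafka.{CommitterSettings, Subscriptions}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl
    import org.apache.kafka.common.TopicPartition

    // an ActorSystem `system` and an implicit ExecutionContext are assumed in scope
    val controls =
      (0 until partitionCount).map { p =>
        Consumer
          .committableSource(consumerSettings, Subscriptions.assignment(new TopicPartition(topic, p)))
          .mapAsync(1) { msg => handle(msg).map(_ => msg.committableOffset) } // handle: assumed Future-returning processor
          .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply)
          .run()
      }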
    Tejas Somani
    @tejassomani

    Hello,
    I am trying to use Consumer.committableSource with DrainingControl & RestartSource with backoff for my use case based on this https://github.com/akka/alpakka-kafka/blob/master/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala#L501

    val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

    val result = RestartSource
      .onFailuresWithBackoff(RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)) { () =>
        Consumer
          .committableSource(consumerSettings, Subscriptions.topics(topicName))
          .mapMaterializedValue(c => control.set(c))
          .via(processMessage)
          .map(_.committableOffset)
          .via(Committer.flow(committerSettings))
      }
      .runWith(Sink.ignore)

    val drainingControl = DrainingControl.apply(control.get(), result)
    drainingControl.drainAndShutdown()

    drainingControl.drainAndShutdown() keeps running into the RuntimeException "The correct Consumer.Control has not been assigned, yet."
    I need some help figuring this out. Am I missing something?

    2 replies
    Matthew Smedberg
    @msmedberg:matrix.org
    [m]
    How would I go about asking my consumer which partitions it is currently assigned (for purposes of an application healthcheck)? I've looked at KafkaConsumerActor and MetadataClient, but it looks like the response to a Metadata.GetPartitionsFor(topic) message/method call is all partitions of the topic, not just the ones that my node is assigned. (Also, it looks like the only consumer strategies that support KafkaConsumerActor require a ManualSubscription, where my use-case requires something like Consumer.committablePartitionedSource.)
    2 replies
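    One hedged option (not from the replies; names are assumptions): track the assignments yourself with a PartitionAssignmentHandler attached to the subscription, and let the healthcheck read the resulting set:

    import java.util.concurrent.atomic.AtomicReference
    import akka.kafka.{RestrictedConsumer, Subscriptions}
    import akka.kafka.scaladsl.PartitionAssignmentHandler
    import org.apache.kafka.common.TopicPartition

    val assigned = new AtomicReference(Set.empty[TopicPartition])

    val handler = new PartitionAssignmentHandler {
      override def onAssign(tps: Set[TopicPartition], c: RestrictedConsumer): Unit = { assigned.updateAndGet(_ ++ tps); () }
      override def onRevoke(tps: Set[TopicPartition], c: RestrictedConsumer): Unit = { assigned.updateAndGet(_ -- tps); () }
      override def onLost(tps: Set[TopicPartition], c: RestrictedConsumer): Unit = { assigned.updateAndGet(_ -- tps); () }
      override def onStop(tps: Set[TopicPartition], c: RestrictedConsumer): Unit = ()
    }

    val subscription = Subscriptions.topics("topic").withPartitionAssignmentHandler(handler)
    // use `subscription` with Consumer.committablePartitionedSource; the healthcheck reads assigned.get()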
    snwlnx
    @snwlnx
    Hello
    I know about the async boundary issue (akka/alpakka-kafka#1038) and, as I understand it, this is the reason why messages in my app are duplicated and handled in parallel when a rebalance starts and a partition moves to another consumer (I also use manual offset committing, which may be part of the problem, but with eager rebalancing there is a stop-processing phase during which no consumer can process events). I know there is no good solution to guarantee that consumers don't start processing in parallel, but what about the following solution? see thread
    My goal is at-least-once semantics while excluding simultaneous parallel handling on different consumers
    7 replies
    Sean Glover
    @seglo
    Alpakka Kafka 2.1.0 final will be released by the end of the week
    Enno Runne
    @ennru
    The 2.1 release is out. Akka 2.6 is now required. https://doc.akka.io/docs/alpakka-kafka/2.1/
    Yuliban
    @yupegom_gitlab

    Hello everyone. A little bit of context: I'm consuming an event from an SQS queue. After some further processing, the output of that stream is going to be a file uploaded to an S3 bucket. I'm facing the issue of removing the message from the queue once the file is uploaded.
    This is the sink uploading the file:

    val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
        S3.multipartUpload("test", "test.json", ContentTypes.`application/json`)

    This is the flow deleting the message:

    val deleteFlow: Flow[Message, Unit, NotUsed] = {
        Flow[Message]
          .map(MessageAction.Delete(_))
          .via(SqsAckFlow(queueUrl))
          .map(_ => ())
      }

    Currently I'm able to save the file in the bucket like this:

    _.asSourceWithContext({ m => m })
          .via(sqsMessageDecoder.flowWithContext[Message])
          .collect({ case Right(account) => account })
          // some more processing
          .asSource
          .map(_._1)
          .to(s3Sink) // this is the sink previously shown
          .run()

    And I could also delete the message:

     _.asSourceWithContext({ m => m })
          .via(decodeMessage)
          .collect({ case Right(account) => account })
          // some more processing
          .asSource
          .map(_._2)
          .via(deleteFlow) // this is the flow previously shown

    I cannot figure out how to connect both; ideally, the removal of the message should happen only after the upload.
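    A hedged sketch of one way to sequence this (batchSize, toByteString and the decoding step are assumptions; implicit Materializer, SqsAsyncClient and ExecutionContext are assumed in scope): remember the raw messages, upload their payloads as one file, and delete from the queue only once the upload future has completed:

    import akka.Done
    import akka.stream.alpakka.sqs.MessageAction
    import akka.stream.alpakka.sqs.scaladsl.SqsAckFlow
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.Future

    val done: Future[Done] =
      for {
        msgs <- sqsSource.take(batchSize).runWith(Sink.seq)            // 1. read a batch of messages
        _    <- Source(msgs.toList).map(toByteString).runWith(s3Sink)  // 2. upload their payloads as one file
        _    <- Source(msgs.toList)                                    // 3. delete only after the upload succeeded
                  .map(MessageAction.Delete(_))
                  .via(SqsAckFlow(queueUrl))
                  .runWith(Sink.ignore)
      } yield Done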

    Yuliban
    @yupegom_gitlab

    Hello all!
    So, I'm trying to read a message from an sqs queue:

    val source: Source[Message, NotUsed] = {
        SqsSource("url",
          SqsSourceSettings()
            .withCloseOnEmptyReceive(true)
        )
      }

    And then save it into an S3 bucket:

    source
      // ... some processing
      .to(s3Sink)
      .run()

    This is the s3 sink:

     val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
        S3.multipartUpload("bucket", "file.txt", ContentTypes.`text/xml(UTF-8)`)

    If I keep .withCloseOnEmptyReceive(true) in the source config it works. However, I want the source to keep listening to the queue all the time. If I remove it, the future never completes and the file is never uploaded. Any tip on what approach I could use? Is it possible to restart the source?

    BTW, I tried using an actor, once the stream finishes I send a message to the actor to start the stream again. That comes with unexpected behavior.
    Any feedback is appreciated. Thanks!
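    A hedged sketch of one alternative (chunk size, keyFor and toByteString are assumptions; the usual implicits are assumed in scope): keep the SqsSource open and cut the stream into chunks, uploading one S3 object per chunk instead of waiting for the whole stream to complete:

    import akka.http.scaladsl.model.ContentTypes
    import akka.stream.alpakka.s3.scaladsl.S3
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.duration._

    SqsSource("url", SqsSourceSettings())            // no withCloseOnEmptyReceive: stays open
      .groupedWithin(1000, 30.seconds)               // close a "file" every 1000 messages or 30 seconds
      .mapAsync(1) { batch =>
        Source(batch.toList)
          .map(toByteString)
          .runWith(S3.multipartUpload("bucket", keyFor(batch), ContentTypes.`text/xml(UTF-8)`))
      }
      .runWith(Sink.ignore)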

    Yuliban
    @yupegom_gitlab
    Do you know what issues one could face when running a graph inside another graph? Something like:
    Source
      .single(value)
      .map { v =>
        Source
          .single(byteStringRepr)
          .to(s3Sink)
          .run()
      }
      .to(s3Sink)
      .run()
    2 replies
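    A hedged sketch of the usual way to keep the inner run visible to the outer graph (toByteString is an assumption): mapAsync on the inner stream's materialized Future, so the outer stream is backpressured by it instead of firing and forgetting inside map:

    Source
      .single(value)
      .mapAsync(1) { v =>
        Source.single(toByteString(v)).runWith(s3Sink)   // Future[MultipartUploadResult]
      }
      .runWith(Sink.ignore)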
    jmw847
    @jmw847

    Hello all,

    we use alpakka kafka in connection with redpanda.
    When we run an integration test with an external dockerized redpanda to test a consumer, we get the following error:

    Producer

    o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
    javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1
            at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
            at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
        at akka.kafka.ProducerSettings$.createKafkaProducer(ProducerSettings.scala:215)
        at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:434)
        ...

    Consumer

    16:14:35.012 [JobsQueueSpecSystem-akka.kafka.default-dispatcher-16] WARN  o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
    javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-datapool-1
            at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
            at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
        at akka.kafka.ConsumerSettings$.createKafkaConsumer(ConsumerSettings.scala:237)
        at akka.kafka.ConsumerSettings$.$anonfun$apply$3(ConsumerSettings.scala:111)
        at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$applySettings(KafkaConsumerActor.scala:461)
        at akka.kafka.internal.KafkaConsumerActor.preStart(KafkaConsumerActor.scala:438)
        at akka.actor.Actor.aroundPreStart(Actor.scala:543)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:543)
        at akka.kafka.internal.KafkaConsumerActor.aroundPreStart(KafkaConsumerActor.scala:212)
        at akka.actor.ActorCell.create(ActorCell.scala:637)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:509)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:531)

    This is the consumer configuration:

    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(s"$brokerHost:$brokerPort")
    .withGroupId(brokerGroup)
    .withProperties(
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG  -> "earliest"
    )

    Dependencies:
    Redpanda v21.6.1
    Akka 2.6.0
    Alpakka Kafka 2.0.6
    Specs2 4.6.0

    Do you have any idea why the consumer and producer are not closed/shut down after running the integration test? Thanks!

    Yuliban
    @yupegom_gitlab

    Hello everyone.
    So, something I still don't get when using streams is whether this makes sense:

    someSource
      .mapAsync(1)(aFunctionThatCallsAnExternalService)
      .map(_.sequence) // doesn't compile, just an example
      // ...

    So the type of the aFunctionThatCallsAnExternalService function is:
    aFunctionThatCallsAnExternalService: Future[Either[Failure, List[ImportantThing]]]
    And say this function returns a huge amount of ImportantThings.
    What should I do there, should I put this list back into a stream again? Does it make sense to use a
    stream and then have this huge list on which I run this (_.sequence) call?

    9 replies
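    A hedged sketch of one option (error handling deliberately simplified): flatten the result back into the stream with mapConcat, so downstream stages see individual ImportantThings instead of one huge in-memory list:

    someSource
      .mapAsync(1)(aFunctionThatCallsAnExternalService)
      .mapConcat {
        case Right(things) => things          // re-enter the stream element by element
        case Left(_)       => Nil             // or route failures somewhere else
      }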
    olgakabirova
    @olgakabirova

    Hello everyone.
    I am quite new to alpakka-kafka and I have a question.
    I need to skip one message from the Kafka topic, but before that I need to know the current offset.
    For skipping I do:

        Consumer
          .committablePartitionedManualOffsetSource(
            defConsumerSettings,
            Subscriptions.topics(config.topic),
            _ => Future.successful(Map(new TopicPartition(config.topic, config.partition) -> config.offset)))
          .flatMapMerge(1, _._2)
          .take(1)
      .map { message =>
        message.committableOffset
      }
          .toMat(Committer.sink(CommitterSettings(actorSystem)))(Keep.right)
          .run()

    How can I know the current offset before skipping?

    Sven Ludwig
    @iosven
    @olgakabirova Did you figure it out? Just switch the order of the take and the map? Also, you seem to keep only the offset in your map; consider passing more than that downstream.
    @olgakabirova You could also consider using a map followed by a filter, where, based on some predicate of your choice, the filter would prevent only the first message from passing downstream.
    @olgakabirova Yet another alternative could be https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/drop.html depending on what you want to do.
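    A rough sketch along the lines of those suggestions (log is an assumed logger): capture the record's offset with a wireTap before it is mapped away and committed:

    Consumer
      .committablePartitionedManualOffsetSource(
        defConsumerSettings,
        Subscriptions.topics(config.topic),
        _ => Future.successful(Map(new TopicPartition(config.topic, config.partition) -> config.offset)))
      .flatMapMerge(1, _._2)
      .take(1)
      .wireTap(m => log.info(s"skipping record at offset ${m.record.offset()}"))
      .map(_.committableOffset)
      .toMat(Committer.sink(CommitterSettings(actorSystem)))(Keep.right)
      .run()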
    brightinnovator
    @brightinnovator
    What open source tools are available to monitor / go through Kafka messages?
    Ignasi Marimon-Clos
    @ignasi35
    🚀 We're pleased to announce the PATCH release of Alpakka Kafka: https://github.com/akka/alpakka-kafka/releases/tag/v2.1.1
    brightinnovator
    @brightinnovator
    What open source tools are available to monitor / go through Kafka messages?
    Milo
    @MiloVentimiglia
    Lenses.io I think
    But it is indeed not free. You can also monitor Kafka with Grafana and Prometheus.
    And those two have a community edition.
    brightinnovator
    @brightinnovator
    What open source tools are available to monitor / go through Kafka messages?
    mwkohout
    @mwkohout

    Hi - I've got an existing application that uses Alpakka Kafka and reads from a Kafka cluster using a committable source (with offsets committed back to Kafka). Now I'm being asked to read from a second cluster.

    I don't see how it's handled in the Alpakka Kafka 2.1.0 source code, but I don't see anything in the docs against mixing sources from different brokers.

    Is it possible to merge committable sources from different clusters?

    mwkohout
    @mwkohout

    I think I answered my own question: The source cluster seems to be retained as part of the Committer and a KafkaAsyncConsumerCommitterRef.

    sorry for the chatter

    Lieven
    @vortex314
    Hi, I am new to the channel. We are using Alpakka Kafka in production and monitor it with the Cinnamon and Elastic APM integration. It gives us E2E traceability, which is very useful for handling financial messages (payments). However, sometimes the transaction duration shows a spike > 60 sec. This is not what really happens, as we see the transaction handled in +/- 1 sec, so apparently the reporting is incorrect somewhere. Is anybody aware of a kind of 60 sec timeout on reporting metrics? The abnormal spikes are each time above 60 sec.
    Alessandro D'Armiento
    @adarmiento

    Hello, I am using Alpakka-HDFS to save a stream of data into an ORC file on HDFS.
    To do so I first wait for there to be enough messages upstream, then I batch all of them, serialize them into an ORC byte array and flush the single blob as a file on HDFS.
    This works.

    Now we have decided to drop HDFS in favor of Alluxio, which, long story short, exposes a Hadoop FileSystem interface but is backed by an object store.
    After this change, I no longer want to use the built-in rename mechanism, which (as makes sense with HDFS) writes to a temp directory and then renames the file to place it in the output directory.
    Is that possible?

    Levi Ramsey
    @leviramsey
    Perhaps it makes sense to use Alpakka-S3 to deal with Alluxio? That should be less likely to assume filesystem semantics for an object store.
    Felipe Oliveira
    @feliperazeek
    hi everyone, I need to consume messages that are older than a few seconds. My current idea is to do offsetsForTimes() + seek(). With alpakka, would I have to manage my own offsets? Would you suggest any other idea? Thank you very much!
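    A hedged sketch (topic, partition and consumerSettings are assumptions): Alpakka can do the offsetsForTimes() + seek() part for you via a manual subscription, so you don't have to drive the KafkaConsumer yourself; whether you also manage commits depends on which source you pick:

    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.Consumer
    import org.apache.kafka.common.TopicPartition

    val startFrom = System.currentTimeMillis() - 5000   // "a few seconds" ago
    val subscription =
      Subscriptions.assignmentOffsetsForTimes(new TopicPartition("topic", 0) -> startFrom)

    Consumer.plainSource(consumerSettings, subscription)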
    Chetan Kumar
    @chetankm-cs
    Hi, I want to understand what happens when a substream fails in committablePartitionedSource. Will a new substream automatically start for the failed topic partition, or will it trigger a rebalance?
    Oleg
    @OlegGipsy_twitter

    Hi guys, I need some advice on how akka/kafka plays with websockets.
    There are an akka-http server, a kafka topic and websocket clients.
    WS clients from the UI connect to the server in order to get messages from the same topic. They should all get the same messages.
    I implemented this using a broadcast hub:

    def kafkaBroadcastHubSource(topic: String): Source[String, NotUsed] = {
        val consumerSettings =
          ConsumerSettings(actorSystem, new IntegerDeserializer, new StringDeserializer)
            .withBootstrapServers(kafkaBootstrapServers)
            .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
            .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
            .withGroupId("group1")

        // the BroadcastHub sink materializes the Source that is handed out to every client
        Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
          .map(_.value())
          .toMat(BroadcastHub.sink)(Keep.right)
          .run()
      }

    Then used this source in a route:

     lazy val source = kafkaBroadcastHubSource(topic)
    
      val routes: Route = get {
        path("ws") {
    
          extractWebSocketUpgrade { upgrade =>
            val msgSource = source
              .map(msg => TextMessage(msg))
            complete {
              upgrade.handleMessagesWithSinkSource(Sink.ignore, msgSource)
            }
          }
        }
      }

    As far as I know the broadcast hub is aligned with the slowest consumer. Do I understand correctly that if the hub buffer is full because messages keep arriving and consumption is slow, then backpressure will be triggered and no data can be processed anymore?
    Will the direct kafka consumer (which is a producer to the broadcast hub) be affected as well, so that no data will be fetched from the topic?
    What else should I add to make this architecture production ready?
    Thanks in advance!

    6 replies
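    A hedged sketch of one common mitigation (not from the replies): give every websocket client its own bounded buffer that drops old elements, so a single slow client cannot stall the hub and, through it, the Kafka consumer:

    import akka.stream.OverflowStrategy

    val msgSource = source
      .buffer(256, OverflowStrategy.dropHead)   // per-client buffer, drops the oldest on overload
      .map(msg => TextMessage(msg))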
    Chetan Kumar
    @chetankm-cs
    Is there a better way to handle errors in Consumer.committableSource sources, other than restarting the whole consumer?
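    A hedged sketch of one approach (process is an assumed, possibly-throwing handler; consumerSettings and committerSettings are assumed): keep failures inside the processing stage with a resuming supervision strategy (or model them as values), so the consumer stage itself never fails and never needs a restart:

    import akka.stream.{ActorAttributes, Supervision}

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("topic"))
      .map { msg => process(msg.record); msg.committableOffset }
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
      // note: a resumed (dropped) element is not committed here, but later offsets still are
      .toMat(Committer.sink(committerSettings))(Consumer.DrainingControl.apply)
      .run()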
    varunvats9
    @varunvats9:matrix.org
    [m]
    How do I pass custom context to a supervision strategy? I need the context/payload for logging in the supervision strategy.
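    A hedged sketch of a common workaround (log is an assumed logger): the decider only ever sees the Throwable, so carry the context inside a custom exception thrown by the failing stage:

    import akka.stream.Supervision

    final case class ProcessingFailed(payload: String, cause: Throwable)
        extends RuntimeException(s"processing failed for $payload", cause)

    val decider: Supervision.Decider = {
      case ProcessingFailed(payload, cause) =>
        log.error(s"dropping element with payload $payload", cause)   // context available here
        Supervision.Resume
      case _ => Supervision.Stop
    }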
    Jacob Bogers
    @Jacob_Bogers_twitter
    Hi
    Oleg
    @OlegGipsy_twitter

    Hi, I am new to akka streams and trying to understand how the akka streams .buffer() operator works

    There is a source with buffer:

    val src = Source(1 to 259)
            .buffer(size = 256, OverflowStrategy.dropHead)

    If I connect it to a regular sink:

    val sink = Sink.foreach[Int] { x =>
            println("Element: " + x)
          }

    then it seems like the elements don't get into the buffer, so all elements are printed:

    Element: 1
    Element: 2
    Element: 3
    Element: 4
    ...

    If I put this source into a websocket directive and use it in the test:

    val route = extractWebSocketUpgrade { _ =>
        val msgSource = src
              .buffer(256, OverflowStrategy.dropHead)
              .map { x =>
                println("Element: " + x)
                x
              }
            handleWebSocketMessagesForProtocol(Flow.fromSinkAndSourceCoupled(Sink.ignore, msgSource), jwt)
          }
    
     val wsClient = WSProbe()
      WS("ws", wsClient.flow) ~> route ~> check {
        isWebSocketUpgrade shouldBe true
        ...
    }

    then each element from the source first goes into the buffer, and when the buffer is full (contains all 256 elements) the last 3 elements go into the buffer and the 3 oldest are removed (by the dropHead strategy):

    Element: 3
    Element: 4
    Element: 5
    Element: 6
    ....

    So, the question is - what is the expected behaviour in the first and the second cases?
    Should the elements be enqueued into the buffer in the first case?
    I initially thought that the buffer comes into play only when backpressure is triggered.
    Then why is it triggered in the second case?
    Thanks!

    Levi Ramsey
    @leviramsey

    Remember that backpressure is basically lack of demand downstream (demand flows from the sink up to the source, generally). A buffer is always signalling demand (with the sole exception of when it's full and the OverflowStrategy is backpressure) to the upstream. Sink.foreach signals demand after processing every element, so it can only backpressure if the processing takes a long time. For the websocket case, demand is ultimately driven by the computer on the other end of the websocket, so it will backpressure more.

    Operator fusion in Akka Streams may also interact here (I forget whether buffer introduces an implied async boundary). But you should definitely be able (assuming you have a dispatcher with multiple threads) to observe the backpressure and dropping behavior with:

    Source(1 to 259).buffer(size = 256, OverflowStrategy.dropHead).async.runWith(Sink.foreach[Int] { x => Thread.sleep(1000); println("Element: " + x) })

    You might also observe the dropping behavior without having the .async there (I haven't experimented).

    Siyavash Habashi
    @janjaali
    Hi there, is it planned to release a new akka-stream-kafka version soonish with support for akka 2.6.16? (https://github.com/akka/akka/releases/tag/v2.6.16)
    5 replies
    Oleg
    @OlegGipsy_twitter
    @leviramsey Thank you, that makes sense. I tried to run your example with .buffer().async and dropping was indeed observed. It didn't work without async though; I guess that is because the whole stream pipeline is executed in a single thread.
    Dominik Kunz
    @domkun
    Hi, I have a question about DiscoverySupport using the akka-dns method. I did not find a way to specify a port name for the lookup. Is there a way to set one? Using only the service name, only an IP lookup is done, which of course does not include the port of the service.
    Maatary
    @Maatary
    Hi, I have a quick question concerning alpakka support for the underlying idempotent producer of Kafka.
    I can see that transactions are handled at the alpakka level, but I do not see whether I can use the idempotent producer with alpakka, i.e. enable.idempotence=true. Does it have to be either transactional or not, with no in-between where only the idempotent producer is enabled at the Kafka level?
    5 replies
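    A hedged sketch (serializers, bootstrap servers, records and the topic name are assumptions): enable.idempotence is a plain Kafka producer property, so it can be set on ProducerSettings and used with the regular, non-transactional Alpakka producer sinks:

    import akka.kafka.ProducerSettings
    import akka.kafka.scaladsl.Producer
    import akka.stream.scaladsl.Source
    import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)
        .withProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")   // idempotent, not transactional

    Source(records.toList)
      .map(value => new ProducerRecord[String, String]("topic", value))
      .runWith(Producer.plainSink(producerSettings))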
    Sven Ludwig
    @iosven
    Sitting in the sun with my Laptop, looking at https://doc.akka.io/docs/alpakka-kafka/current/home.html#matching-kafka-versions and https://doc.akka.io/docs/alpakka-kafka/current/home.html#dependencies I have the following question. So far I kept alpakka Kafka 2.0.5 as I was unsure if I could just upgrade, or if I should upgrade Kafka Brokers first, because the headline reads "Matching Kafka Versions". Therefore now I want to ask: Can I safely use alpakka Kafka 2.1.1 with Kafka 2.4.1 ? Are there any known problems or disadvantages with this combination?
    1 reply
    mwkohout
    @mwkohout

    I've got a system that's bumping into akka/alpakka-kafka#1386, and I've been thinking if there was a way I could contribute to a solution.

    Looking at the source code, it seems like this could be addressed if the internal KafkaConsumerActor could be signaled to reset the offset for a particular TopicPartition to the last committed offset as part of the prestart of SubSourceStageLogic.

    Does this seem like a good approach? Is there a better approach?

    brightinnovator
    @brightinnovator
    Can someone help me with a recent working microservices demo or sample application to deploy in AWS and test? Please kindly help me, I need to learn it as quickly as possible.
    snwlnx
    @snwlnx

    Hello, I have a situation where I lose some messages after a node restarts.
    The last offset of a handled message in the logs is 117246;
    after the consumer restarted it is 117293,
    and there are no log entries for the messages in between.

    Here is my code.
    Please give me a hint where I am wrong.
    I need an at-least-once guarantee.

    def start(...) = 
     for {
          streamCompletionRef   = new AtomicReference[Option[Future[Done]]](None)
          assignedPartitionsRef = new AtomicReference(Set[TopicPartition]())
          consumer = makePartitionedConsumer(config, settings, controlRef, assignedPartitionsRef, handleMessage)
          _ <- Resource.make(startConsumerF(config, consumer, streamCompletionRef))(
                _ => shutdownConsumerF(name, controlRef, streamCompletionRef)
              )
        } yield ()
    
    
      private def startConsumerF(
          config: CommittableConsumerConfig,
          consumer: Source[Object, Unit],
          streamCompletionRef: AtomicReference[Option[Future[Done]]]
      )(implicit mat: Materializer): F[Unit] =
        F.delay {
          import config._
          RestartSource
            .onFailuresWithBackoff(minBackoff = backOffSettings.map(_.minBackoff).getOrElse(3).seconds,
                                   maxBackoff = backOffSettings.map(_.maxBackoff).getOrElse(30).seconds,
                                   randomFactor = backOffSettings.map(_.randomFactor).getOrElse(0.2))(() => consumer)
            .toMat(Sink.ignore) { case (_, streamCompletion) => streamCompletionRef.set(Some(streamCompletion)) }
            .run()
        }
    
      private def makePartitionedConsumer[K, V](
          config: CommittableConsumerConfig,
          consumerSettings: ConsumerSettings[K, V],
          controlRef: AtomicReference[Option[Consumer.Control]],
          assignedPartitionsRef: AtomicReference[Set[TopicPartition]],
          handleMessage: CommittableMessage[K, V] => F[Unit]
      )(implicit mat: Materializer, scheduler: Scheduler, logging: Logging[F]): Source[Done, Unit] = {
        import config._
    
        def handlePartitionSource(source: Source[CommittableMessage[K, V], NotUsed]): Future[Done] =
          source
            .mapAsync(maxPartitionParallelism.getOrElse(1)) { msg =>
              (logCommittableMessage(msg) >> handleMessage(msg))
                .as(msg.committableOffset)
                .runWithEmptyContext
                .runToFuture
            }
            .runWith(Committer.sink(CommitterSettings(config.committerConfig)))
    
        val subscriptions = Subscriptions
          .topics(topics.toSortedSet.toList: _*)
          .withPartitionAssignmentHandler(makePartitionAssignmentHandler(assignedPartitionsRef))
    
        Consumer
          .committablePartitionedSource(consumerSettings, subscriptions)
          .mapMaterializedValue(control => controlRef.set(Some(control)))
          .mapAsyncUnordered[Done](maxParallelism) { case (_, source) => handlePartitionSource(source) }
      }
    
      private def shutdownConsumerF(
          name: String,
          controlRef: AtomicReference[Option[Consumer.Control]],
          streamCompletionRef: AtomicReference[Option[Future[Done]]]
      )(implicit logger: Logging[F], exec: ExecutionContext): F[Unit] =
        for {
          _                     <- logger.info(s"Stopping Kafka consumer $name")
          maybeControl          <- F.delay(controlRef.get())
          maybeStreamCompletion <- F.delay(streamCompletionRef.get())
          _ <- maybeControl
                .map2(maybeStreamCompletion) { (control, streamCompletion) =>
                  F.deferFuture(DrainingControl(control, streamCompletion).drainAndShutdown()).void
                }
                .getOrElse(F.unit)
                .flatMap(_ => logger.error(s"Successfully stopped Kafka consumer $name"))
                .onError[Unit, Throwable](e => logger.errorCause(s"Failed to stop Kafka consumer $name", e))
        } yield ()
    3 replies
    dr3s
    @dr3s
    assertAllStagesStopped on the Kafka base test keeps failing, even though it gets the materializer from the actor system. assertAllStagesStopped is called @AfterEach, but the docs show the actor system being static and the Lifecycle being per class. Something doesn't make sense
    3 replies