    brightinnovator
    @brightinnovator
    what are the open source tools available to monitor/go through Kafka messages?
    Milo
    @MiloVentimiglia
    Lenses.io I think
    But indeed it is not free. You can also monitor Kafka with Grafana and Prometheus.
    And these two have a community edition.
    brightinnovator
    @brightinnovator
    what are the open source tools available to monitor/go through Kafka messages?
    mwkohout
    @mwkohout

    Hi - I've got an existing application that uses Alpakka Kafka and reads from a Kafka cluster using a committable source (with offsets committed back to Kafka). Now I'm being asked to read from a second cluster.

    I don't see how it's handled in the Alpakka Kafka 2.1.0 source code, but I don't see anything in the docs against mixing sources from different brokers.

    Is it possible to merge committable sources from different clusters?

    mwkohout
    @mwkohout

    I think I answered my own question: The source cluster seems to be retained as part of the Committer and a KafkaAsyncConsumerCommitterRef.

    sorry for the chatter

    Lieven
    @vortex314
    Hi, I am new to the channel. We are using Alpakka Kafka in production and monitor it with the Cinnamon and Elastic APM integration. It gives us end-to-end traceability, which is very useful for handling financial messages (payments). However, sometimes the reported transaction duration has a spike of > 60 sec. This is not what really happens, as we see the transaction handled in about 1 sec, so apparently the reporting is incorrect somewhere. Is anybody aware of some kind of 60 sec timeout on reporting metrics? The abnormal spikes are each time above 60 sec.
    Alessandro D'Armiento
    @adarmiento

    Hello, I am using Alpakka-HDFS to save a stream of data as an ORC file on HDFS.
    To do so, I first wait for there to be enough messages upstream, then I batch all of them, serialize them into an ORC byte array, and flush the single blob as a file on HDFS.
    This works.

    Now we have decided to drop HDFS in favor of Alluxio, which (long story short) exposes a Hadoop FileSystem interface but is backed by an object store.
    After this change, I no longer want to use the built-in rename mechanism, which (as makes sense for HDFS) writes to a temp directory and then renames the file into the output directory.
    Is it possible?

    Levi Ramsey
    @leviramsey
    Perhaps it makes sense to use Alpakka-S3 to deal with Alluxio? That should be less likely to assume filesystem semantics for an object store.
    Felipe Oliveira
    @feliperazeek
    hi everyone, I need to consume messages that are older than a few seconds. My current idea is to do offsetsForTimes() + seek(). With Alpakka, would I have to manage my own offsets? Would you suggest any other idea? Thank you very much!
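    A rough, untested sketch of that idea using Alpakka's manual subscriptions (topic name, group id and the 5-second cutoff are placeholders): Subscriptions.assignmentOffsetsForTimes does the offsetsForTimes()/seek() work, but because it is a manual assignment the application is then responsible for its own offset bookkeeping.

    import akka.actor.ActorSystem
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer

    implicit val system: ActorSystem = ActorSystem("seek-by-time")

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("seek-by-time-group")

    // only start from records whose timestamp is at least 5 seconds old
    val cutoff = System.currentTimeMillis() - 5000L

    Consumer
      .plainSource(
        consumerSettings,
        // seeks each assigned partition to the first offset at/after the given timestamp
        Subscriptions.assignmentOffsetsForTimes(new TopicPartition("my-topic", 0) -> cutoff)
      )
      .map(_.value())
      .runWith(Sink.foreach(println))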
    Chetan Kumar
    @chetankm-cs
    Hi, I want to understand what happens when a subStream fails in CommittablePartitionedSource, will a new substream automatically start with the failed topic partition? Or will it trigger a rebalance?
    Oleg
    @OlegGipsy_twitter

    Hi guys, I need some advice on how akka/kafka play with websockets.
    There are akka-http server, kafka topic and websocket clients.
    WS clients from UI connect to the server in order to get the messages from the same topic. They all should get the same messages.
    I implemented this using broadcast hub:

    def kafkaBroadcastHubSource(topic: String): Source[String, NotUsed] = {
      val consumerSettings =
        ConsumerSettings(actorSystem, new IntegerDeserializer, new StringDeserializer)
          .withBootstrapServers(kafkaBootstrapServers)
          .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
          .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
          .withGroupId("group1")

      // materialize the consumer once; the BroadcastHub sink gives back a Source
      // that can be attached to many downstream subscribers
      Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .map(_.value())
        .toMat(BroadcastHub.sink)(Keep.right)
        .run()
    }

    Then used this source in a route:

    lazy val source = kafkaBroadcastHubSource(topic)

    val routes: Route = get {
      path("ws") {
        extractWebSocketUpgrade { upgrade =>
          val msgSource = source
            .map(msg => TextMessage(msg))
          complete {
            upgrade.handleMessagesWithSinkSource(Sink.ignore, msgSource)
          }
        }
      }
    }

    As far as I know, BroadcastHub is aligned to the slowest consumer. Do I understand correctly that if the hub buffer is full because messages keep arriving and consumption is slow, then backpressure will be triggered and no more data can be processed?
    Will the direct Kafka consumer (which is the producer to the broadcast hub) be affected as well, so that no data is fetched from the topic?
    What else should I add to make this architecture production-ready?
    Thanks in advance!
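    One way to keep a single slow websocket client from stalling the shared hub (a sketch building on the code above; the buffer size of 256 is an arbitrary choice): give each subscriber its own dropping buffer, so every subscriber keeps signalling demand to the hub even when its client lags.

    val msgSource = source
      // per-subscriber buffer: a slow client drops its oldest messages instead of
      // back-pressuring the BroadcastHub (and, through it, the shared Kafka consumer)
      .buffer(256, OverflowStrategy.dropHead)
      .map(msg => TextMessage(msg))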

    6 replies
    Chetan Kumar
    @chetankm-cs
    Is there a better way to handle errors in Consumer.committableSource sources, other than restarting the whole consumer?
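    One option when the failures come from the message processing itself, rather than from the consumer (a sketch; consumerSettings, committerSettings, processRecord and sendToDeadLetters are placeholders): catch failures per element so the stream never fails, and keep restarts only for genuine consumer/Kafka failures.

    import scala.util.{Failure, Success, Try}
    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
        .map { msg =>
          // handle per-message failures here (log, dead-letter, ...) so they do not
          // fail the stream; note the offset is committed either way, so a failed
          // message is effectively dropped after being recorded
          Try(processRecord(msg.record)) match {
            case Success(_) => ()
            case Failure(e) => sendToDeadLetters(msg.record, e)
          }
          msg.committableOffset
        }
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()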
    varunvats9
    @varunvats9:matrix.org [m]
    How do I pass custom context to a supervision strategy? I need the context/payload for logging in the supervision strategy.
    Jacob Bogers
    @Jacob_Bogers_twitter
    Hi
    Oleg
    @OlegGipsy_twitter

    Hi, I am new to Akka Streams and am trying to understand how the .buffer() operator works.

    There is a source with buffer:

    val src = Source(1 to 259)
            .buffer(size = 256, OverflowStrategy.dropHead)

    If I connect it to a regular sink:

    val sink = Sink.foreach[Int] { x =>
            println("Element: " + x)
          }

    then it seems like the elements don't get into the buffer, so all elements are printed:

    Element: 1
    Element: 2
    Element: 3
    Element: 4
    ...

    If I put this source into a websocket directive and use it in a test:

    val route = extractWebSocketUpgrade { _ =>
      val msgSource = source
        .buffer(256, OverflowStrategy.dropHead)
        .map { x =>
          println("Element: " + x)
          x
        }
      handleWebSocketMessagesForProtocol(Flow.fromSinkAndSourceCoupled(Sink.ignore, msgSource), jwt)
    }

    val wsClient = WSProbe()
    WS("ws", wsClient.flow) ~> route ~> check {
      isWebSocketUpgrade shouldBe true
      ...
    }

    then each element from the source first goes into the buffer, and once the buffer is full (contains all 256 elements), the last 3 elements enter the buffer and the 3 oldest are dropped (by the dropHead strategy):

    Element: 3
    Element: 4
    Element: 5
    Element: 6
    ....

    So, the question is - what is the expected behaviour in the first and the second cases?
    Should the elements be enqueued into the buffer in the first case?
    I initially thought that the buffer comes into play only when backpressure is triggered.
    Then, why is it triggered in the second case?
    Thanks!

    Levi Ramsey
    @leviramsey

    Remember that backpressure is basically lack of demand downstream (demand flows from the sink up to the source, generally). A buffer is always signalling demand (with the sole exception of when it's full and the OverflowStrategy is backpressure) to the upstream. Sink.foreach signals demand after processing every element, so it can only backpressure if the processing takes a long time. For the websocket case, demand is ultimately driven by the computer on the other end of the websocket, so it will backpressure more.

    Operator fusion in Akka Streams may also interact here (I forget whether buffer introduces an implied async boundary). But you should definitely be able (assuming you have a dispatcher with multiple threads) to observe the backpressure and dropping behavior with:

    Source(1 to 259)
      .buffer(size = 256, OverflowStrategy.dropHead)
      .async
      .runWith(Sink.foreach[Int] { x => Thread.sleep(1000); println("Element: " + x) })

    You might also observe the dropping behavior without having the .async there (I haven't experimented).

    Siyavash Habashi
    @janjaali
    Hi there, is it planned to release a new akka-stream-kafka version soonish with support for akka 2.6.16? (https://github.com/akka/akka/releases/tag/v2.6.16)
    5 replies
    Oleg
    @OlegGipsy_twitter
    @leviramsey Thank you, that makes sense. I tried to run your example with .buffer().async and the dropping was indeed observed. It didn't work without async though; I guess that is because the whole stream pipeline is then executed in a single thread.
    Dominik Kunz
    @domkun
    Hi, I have a question about DiscoverySupport using the akka-dns method. I did not find a way to specify a port name for the lookup. Is there a way to set one? Using only the service name, only an IP lookup is done, which of course does not include the port of the service.
    Maatary
    @Maatary
    Hi, I have a quick question concerning Alpakka support for Kafka's underlying idempotent producer.
    I can see that transactions are handled at the Alpakka level, but I don't see whether I can use the idempotent producer with Alpakka, i.e. enable.idempotence=true. Does it have to be either transactional or not, with no in-between where only the idempotent producer is enabled at the Kafka level?
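    For what it's worth, enable.idempotence can at least be set like any other Kafka client property on the ProducerSettings (a sketch, assuming an implicit ActorSystem named system; whether this is enough without going fully transactional is exactly the open question above):

    import akka.kafka.ProducerSettings
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.common.serialization.StringSerializer

    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers("localhost:9092")
        // plain Kafka client setting: enables the idempotent producer without transactions
        .withProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
        // the Kafka client itself requires acks=all (and bounded in-flight requests) for idempotence
        .withProperty(ProducerConfig.ACKS_CONFIG, "all")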
    5 replies
    Sven
    @iosven
    Sitting in the sun with my laptop, looking at https://doc.akka.io/docs/alpakka-kafka/current/home.html#matching-kafka-versions and https://doc.akka.io/docs/alpakka-kafka/current/home.html#dependencies, I have the following question. So far I have stayed on Alpakka Kafka 2.0.5 because I was unsure whether I could just upgrade, or whether I should upgrade the Kafka brokers first, since the headline reads "Matching Kafka Versions". So now I want to ask: can I safely use Alpakka Kafka 2.1.1 with Kafka 2.4.1? Are there any known problems or disadvantages with this combination?
    1 reply
    mwkohout
    @mwkohout

    I've got a system that's bumping into akka/alpakka-kafka#1386, and I've been wondering whether there is a way I could contribute to a solution.

    Looking at the source code, it seems like this could be addressed if the internal KafkaConsumerActor could be signaled to reset the offset for a particular TopicPartition to the last committed offset as part of the prestart of SubSourceStageLogic.

    Does this seem like a good approach? Is there a better approach?

    brightinnovator
    @brightinnovator
    Can someone help me with a recent, working microservices demo or sample application to deploy in AWS and test? Please kindly help me.. I need to learn it as quickly as possible..
    snwlnx
    @snwlnx

    Hello, I have a situation where I lose some messages after a node restarts.
    The last offset handled before the restart (according to the logs) was 117246;
    after the consumer restarted, the next handled offset was 117293,
    and there are no log entries for the messages in between.

    Here is my code.
    Please give me a hint about where I am going wrong.
    I need an at-least-once guarantee.

    def start(...) =
      for {
        streamCompletionRef   = new AtomicReference[Option[Future[Done]]](None)
        assignedPartitionsRef = new AtomicReference(Set[TopicPartition]())
        consumer              = makePartitionedConsumer(config, settings, controlRef, assignedPartitionsRef, handleMessage)
        _ <- Resource.make(startConsumerF(config, consumer, streamCompletionRef))(
              _ => shutdownConsumerF(name, controlRef, streamCompletionRef)
            )
      } yield ()
    
    
      private def startConsumerF(
          config: CommittableConsumerConfig,
          consumer: Source[Object, Unit],
          streamCompletionRef: AtomicReference[Option[Future[Done]]]
      )(implicit mat: Materializer): F[Unit] =
        F.delay {
          import config._
          RestartSource
            .onFailuresWithBackoff(minBackoff = backOffSettings.map(_.minBackoff).getOrElse(3).seconds,
                                   maxBackoff = backOffSettings.map(_.maxBackoff).getOrElse(30).seconds,
                                   randomFactor = backOffSettings.map(_.randomFactor).getOrElse(0.2))(() => consumer)
            .toMat(Sink.ignore) { case (_, streamCompletion) => streamCompletionRef.set(Some(streamCompletion)) }
            .run()
        }
    
      private def makePartitionedConsumer[K, V](
          config: CommittableConsumerConfig,
          consumerSettings: ConsumerSettings[K, V],
          controlRef: AtomicReference[Option[Consumer.Control]],
          assignedPartitionsRef: AtomicReference[Set[TopicPartition]],
          handleMessage: CommittableMessage[K, V] => F[Unit]
      )(implicit mat: Materializer, scheduler: Scheduler, logging: Logging[F]): Source[Done, Unit] = {
        import config._
    
        def handlePartitionSource(source: Source[CommittableMessage[K, V], NotUsed]): Future[Done] =
          source
            .mapAsync(maxPartitionParallelism.getOrElse(1)) { msg =>
              (logCommittableMessage(msg) >> handleMessage(msg))
                .as(msg.committableOffset)
                .runWithEmptyContext
                .runToFuture
            }
            .runWith(Committer.sink(CommitterSettings(config.committerConfig)))
    
        val subscriptions = Subscriptions
          .topics(topics.toSortedSet.toList: _*)
          .withPartitionAssignmentHandler(makePartitionAssignmentHandler(assignedPartitionsRef))
    
        Consumer
          .committablePartitionedSource(consumerSettings, subscriptions)
          .mapMaterializedValue(control => controlRef.set(Some(control)))
          .mapAsyncUnordered[Done](maxParallelism) { case (_, source) => handlePartitionSource(source) }
      }
    
      private def shutdownConsumerF(
          name: String,
          controlRef: AtomicReference[Option[Consumer.Control]],
          streamCompletionRef: AtomicReference[Option[Future[Done]]]
      )(implicit logger: Logging[F], exec: ExecutionContext): F[Unit] =
        for {
          _                     <- logger.info(s"Stopping Kafka consumer $name")
          maybeControl          <- F.delay(controlRef.get())
          maybeStreamCompletion <- F.delay(streamCompletionRef.get())
          _ <- maybeControl
                .map2(maybeStreamCompletion) { (control, streamCompletion) =>
                  F.deferFuture(DrainingControl(control, streamCompletion).drainAndShutdown()).void
                }
                .getOrElse(F.unit)
                .flatMap(_ => logger.error(s"Successfully stopped Kafka consumer $name"))
                .onError[Unit, Throwable](e => logger.errorCause(s"Failed to stop Kafka consumer $name", e))
        } yield ()
    3 replies
    dr3s
    @dr3s
    assertAllStagesStopped on the Kafka base test keeps erroring, but it gets the materializer from the actor system. assertAllStagesStopped is called @AfterEach, while the docs show the actor system being static and the Lifecycle being per class. Something doesn't make sense.
    3 replies
    Cassiel-girl
    @Cassiel-girl
    When a Kafka producer sends data to a consumer, does a copy of the data have to be stored on the broker's disk as it passes through the brokers? Or can it be forwarded directly to the consumer over the network without being stored to disk?
    6 replies
    Niklas Uhrberg
    @niklasuhrberg

    This is about Kafka messages being processed twice when rebalancing happens because an application instance is shut down.
    I have a simple application: two instances, A0 and A1, consuming messages from one Kafka topic with P partitions. The application uses val control = Consumer.committablePartitionedSource to obtain the sources when partition assignment happens. The shutdown sequence appears to work as intended: all streams report that they completed without exceptions. The following code is executed when I shut an instance down:

    CoordinatedShutdown
      .get(system)
      .addTask(
        CoordinatedShutdown.PhaseActorSystemTerminate,
        "Drain streams and close databases"
      )(() => {
        // PhaseBeforeActorSystemTerminate
        logger.info("Enter coordinated shutdown")
        control
          .drainAndShutdown()
          .andThen(_ => {
            logger.info("Executing custom shutdown task to close databases.")
            db.close()
            dbRead.close()
            ()
          })
      })

    When the application instances process messages under a load of e.g. 20-100 messages per second with a partition count of 2 or 20 (the examples I have tried), I can consistently reproduce the following scenario:

    1. Both application instances run
    2. One instance , A1, is shut down using kill pid
    3. In A0, one or a few messages are processed that have already been processed. I have the impression that it is never more than one message per partition. When running 20 partitions the highest number of reprocessed messages observed is 3 (with 100 messages per second published to the topic)
    4. One part of the message processing is to create a persistent actor, and when the messages are reprocessed (in 3) the state of this persistent actor that was obtained in the first processing of the command is loaded from persistent storage. This is quite important, because what I really wanted to ensure is that there is no "split brain" situation where the same events get persisted twice.

    What puzzles me is that I would think that committing the Kafka message offset should prevent the message from being processed twice.
    These are my committer settings:

    committer {
      max-batch = 1
      max-interval = 2.seconds
      parallelism = 1
      delivery = "WaitForAck"
      when = "OffsetFirstObserved"
    }

    The flow ends with a Committer.sink.

    I am not an expert on the committer settings; this is where I think I should look next.

    I realize there is more I would need to specify to describe everything in detail, but this may suffice to sketch the background to my question:

    Can I hope to eliminate the race condition that sometimes makes one command be processed twice when a rebalancing happens?

    If not, I would like to understand why, since I think a graceful shutdown should give this guarantee.

    Levi Ramsey
    @leviramsey
    It's worth noting that Kafka's commit protocol is fundamentally at-least-once and cannot guarantee to never process a message twice
    Levi Ramsey
    @leviramsey
    Have you considered using cluster sharding (and the Akka Cluster split-brain resolver) to ensure that only one instance of the persistent actor exists?
    Niklas Uhrberg
    @niklasuhrberg
    Yes, I'm aware of cluster sharding, but I wanted to explore keeping the application as simple as possible (in total) to begin with. Note that in the current solution I at least don't have the problem of multiple instances of the same persistent actor coexisting simultaneously. Also, since the same Kafka message gets processed twice, I would have exactly the same problem with cluster sharding: having to dedupe the Kafka messages.
    Niklas Uhrberg
    @niklasuhrberg
    @leviramsey Since you wrote "It's worth noting that Kafka's commit protocol is fundamentally at-least-once and cannot guarantee to never process a message twice", I'd like to understand in more detail the context in which this is true. Do you really mean that if the consumer commits the message offset (I use Kafka-managed offsets) back to Kafka, there is still the possibility that it can be consumed again? I perfectly realize that the commit operation can fail, but that is most probably not what I'm seeing; in that case the commit should fail each time I run the experiment.
    Levi Ramsey
    @leviramsey
    I've seen situations where the rebalance happens and the rebalanced-to consumer picks up old offsets. I don't particularly care about that because I default to at-least-once, so I don't investigate. I suspect it's because offset commit might not qualify as a poll for max.poll.interval.ms purposes, but idk. There are also broker failure modes that can lead to Kafka losing committed offsets (just like Kafka in general makes fairly weak guarantees about message durability).
    Michael Goytia
    @HurricaneGoytia_twitter
    Question: When moving to MSK from an on-prem version of Kafka, we are getting the following error: failed authentication due to: [a3c21a3d-fcdb-47c1-8754-e46c0e14bd12]: Too many connects. We are using a plainSink and calling this method:
    @Singleton
    class KafkaProducerHelper @Inject()(appConfig: AppConfig, implicit val actorSystem: ActorSystem) {
    
      val pLogger: Logger = LoggerFactory.getLogger("application")
      val producerSettings: ProducerSettings[Array[Byte], Array[Byte]] = ProducerSettings(appConfig.KAFKA_PRODUCER_CONF, new ByteArraySerializer, new ByteArraySerializer)
      val plainSinkConnection: Sink[ProducerRecord[Array[Byte], Array[Byte]], Future[Done]] = Producer.plainSink(producerSettings)
    
      def sendAlertsMessages(messages: List[ProducerRecord[Array[Byte], Array[Byte]]]): Future[Option[Done]] = {
        pLogger.info("Sending alert messages")
        Source(messages).runWith(plainSinkConnection) map {
          case Done => Some(Done)
          case _ => None
        }
      }
    
    
    }
    We call sendAlertsMessages recursively so as to split up the List of ProducerRecords. When we pass the full list into the method, it significantly increases the time it takes to produce messages, but the error messages go away. I was wondering if anyone else has encountered this error with MSK and how they approached it.
    @HurricaneGoytia_twitter Tried lowering the parallelism as well and it got a little better, but we are still seeing errors.
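    One thing that may be relevant here (a sketch, not a confirmed fix for the MSK error; actorSystem is assumed to be in scope as in the code above): every materialization of Producer.plainSink creates its own KafkaProducer and therefore new connections, so many small runWith calls drive the connection rate up. Sharing a single producer instance across all materializations keeps it constant.

    import akka.kafka.ProducerSettings
    import akka.kafka.scaladsl.Producer
    import akka.stream.scaladsl.Source
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.ByteArraySerializer

    val baseSettings: ProducerSettings[Array[Byte], Array[Byte]] =
      ProducerSettings(actorSystem, new ByteArraySerializer, new ByteArraySerializer)

    // create the underlying KafkaProducer once and reuse it for every stream run
    val sharedProducer = baseSettings.createKafkaProducer()
    val producerSettings = baseSettings.withProducer(sharedProducer)

    def sendAlertsMessages(messages: List[ProducerRecord[Array[Byte], Array[Byte]]]) =
      Source(messages).runWith(Producer.plainSink(producerSettings))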
    Michael Goytia
    @HurricaneGoytia_twitter
    Adi Gerber
    @adigerber
    hi,
    we're working on creating a data processing application and we use Alpakka Kafka to do our stream processing.
    I've written a test case that uses Kafka's MockConsumer and MockProducer and injects them into the ConsumerSettings and ProducerSettings (via withConsumerFactory & withProducer), with the goal of testing the code that creates the Kafka flow.
    The test succeeds most of the time, but we're seeing occasional failures in our build system (Jenkins on GCE machines with few resources).
    I can reliably reproduce the failure on my own laptop by lowering the CPU frequency to the lowest possible (400 MHz) and running the test; it succeeds when running at normal CPU frequency.
    What could be the issue?
    I've written a minimal test case here: https://github.com/adigerber/akka-kafka-mock-bug
    Adi Gerber
    @adigerber
    I found out why it happens.
    Basically, on a slow/overloaded machine with the default configuration, the poll task timer fires twice, and the second poll, which emits data, does so before the sub-source for the newly assigned partition has been created. This can be detected by checking the contents of mockConsumer.paused, and the solution is to reschedule the same poll task until the consumer is no longer paused on the topic-partition.
    Joe Zulli
    @GitsMcGee

    Hi all! I have a newbie question about working with Sinks in conjunction with ProducerMessage.multi. Essentially, I have a source that is giving me multi-line strings, and I want to send each line as a separate ProducerRecord into Kafka. Something like this:

        val done: Future[Done] =
          Source(1 to 100)
            .map(num => (0 to num).toList )
            .map(_.map(_.toString))
            .map(list => ProducerMessage.multi[Nothing, String](list.map(new ProducerRecord("topic", _)).toSeq))
            .runWith(Producer.plainSink(producerSettings))

    The error I get is:

    type mismatch;
     found   : akka.stream.scaladsl.Sink[org.apache.kafka.clients.producer.ProducerRecord[String,String],scala.concurrent.Future[akka.Done]]
     required: akka.stream.Graph[akka.stream.SinkShape[akka.kafka.ProducerMessage.Envelope[Nothing,String,akka.NotUsed]],scala.concurrent.Future[akka.Done]]

    Not sure how to make all of the types happy. If anyone can point me in the right direction, it would be much appreciated!
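    One way that typechecks (a sketch, not necessarily the answer given in the replies; producerSettings is assumed to be ProducerSettings[String, String]): since each line becomes its own record anyway, flatten the lists with mapConcat and keep Producer.plainSink, which expects plain ProducerRecords. ProducerMessage.multi is only needed with the stages that accept Envelope, such as Producer.flexiFlow.

        val done: Future[Done] =
          Source(1 to 100)
            .map(num => (0 to num).toList.map(_.toString))
            // one ProducerRecord per line, so the element type matches what plainSink expects
            .mapConcat(lines => lines.map(line => new ProducerRecord[String, String]("topic", line)))
            .runWith(Producer.plainSink(producerSettings))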

    3 replies
    cheapsolutionarchitect
    @cheapsolutionarchitect
    Hi, when using Producer-Multi-Messages, is it normal that the commit offsets are incremented by the number of elements within the multi message? I'm sending two things to two separate topics with the help of a multi message, and the commit offset increases by 2 on every topic.
    1 reply
    cheapsolutionarchitect
    @cheapsolutionarchitect
    Using Alpakka, is it possible to time out and release a consumer if no message has been received for, e.g., the last minute?
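    One possibility (a sketch using the generic Akka Streams idleTimeout operator, nothing Kafka-specific; consumerSettings, committerSettings, processRecord and the implicit system are placeholders): fail the stream when nothing arrives for a minute, then drain and shut the consumer down, which makes it leave the group and release its partitions.

    import scala.concurrent.duration._
    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
        .idleTimeout(1.minute)       // fails the stream with a TimeoutException when idle
        .map { msg =>
          processRecord(msg.record)  // placeholder for the actual handling
          msg.committableOffset
        }
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()

    // when the idle timeout (or anything else) fails the stream, release the consumer
    control.streamCompletion.failed.foreach { _ =>
      control.drainAndShutdown()(system.dispatcher)
    }(system.dispatcher)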
    4 replies
    Adamos8888
    @Adamos8888
    Hi everyone!
    Recently I've been testing Kafka consumer reconnection after stopping and starting the Kafka container again. I'm using committableExternalSource as a consumer.
    Use case:
    1. Start Kafka in a Docker container.
    2. Start the Kafka consumer and producer (everything is working fine, messages are consumed).
    3. Stop the Kafka container (the consumer keeps logging "Kafka commit is to be retried", caused by DisconnectException).
      And then:
      4.1. Start Kafka again ~10 sec after stopping (everything goes back to normal, the consumer works properly).
      4.2. Wait a bit longer before starting the Kafka container again:
      4.2.1. the consumer logs Completing messages from the BaseSingleSourceLogic class,
      4.2.2. "Kafka commit is to be retried" logs are still being emitted,
      4.2.3. after starting the Kafka container again there are deadLetter logs and no further logs are shown.
    (attachments: Completing.png, deadLetter.png)
    Roy Prager
    @roy-prager
    I am consuming from a topic with multiple partitions and using groupedWithin in my graph, which means that I need to commit offsets for multiple partitions. What is the best way of doing this?
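    A common approach (a sketch; consumerSettings, committerSettings and processBatch are placeholders): fold each group into a CommittableOffsetBatch, which tracks the highest offset per topic-partition, and hand the batch to Committer.sink. Committer.sink does this batching itself as well, so streaming the individual committableOffsets into it after groupedWithin also works.

    import scala.concurrent.duration._
    import akka.kafka.Subscriptions
    import akka.kafka.ConsumerMessage.CommittableOffsetBatch
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
      .groupedWithin(100, 1.second)
      .map { group =>
        processBatch(group.map(_.record))   // placeholder for handling the whole group
        // one batch can safely carry offsets from several partitions
        group.foldLeft(CommittableOffsetBatch.empty)((batch, msg) => batch.updated(msg.committableOffset))
      }
      .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
      .run()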
    Adamos8888
    @Adamos8888
    Hi guys!
    (attachment: image.png)
    Is there any solution or configuration for suppressing the "Kafka commit is to be retried" logs, perhaps a config setting or something else?