    Enno
    @ennru
    Yes, that works. The committer batching is independent of how you group your messages. It controls how many messages the committer waits until it actually passes the offsets to the other internals for committing.
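    A minimal Scala sketch of those two knobs (the values are illustrative, and an implicit ActorSystem named system is assumed in scope):

        import akka.kafka.CommitterSettings
        import scala.concurrent.duration._

        // Offsets are committed once either maxBatch offsets have been collected
        // or maxInterval has elapsed since the last commit, whichever comes first.
        val committerSettings =
          CommitterSettings(system) // starts from the akka.kafka.committer config section
            .withMaxBatch(500L)
            .withMaxInterval(5.seconds)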
    Irisha Mur
    @irishaMur_gitlab
    @ennru thank you very much!
    I asked before I did the investigation on my own.
    Now I see: I can play with these two parameters. Moreover, CommitterSettings has defaults of maxBatch = 1000 and maxInterval = 10 seconds. So, if I have fewer than 1000 messages in my topic, I need to wait at least 10 seconds for the commit. I can manage that with properties. But the grouped method will wait for messages from Kafka and I'm not sure how long it will wait. groupedWithin could be helpful, but sourceWithOffsetContext doesn't have it in its API.
    Irisha Mur
    @irishaMur_gitlab
    My simple work-around on it:
    Consumer.committableSource(consumerSettings(actorSystem, kafkaConfig), Subscriptions.topics(kafkaConfig.kafkaTopic()))
                            .groupedWithin(maxGroup, Duration.ofSeconds(10))
                            .asSourceWithContext(messages -> messages.stream().map(ConsumerMessage.CommittableMessage::committableOffset)
                                    .collect(Collectors.toList()))
                            .map(messages -> messages.stream().map(ConsumerMessage.CommittableMessage::record).collect(Collectors.toList()))
                             ...
                            .mapContext(ConsumerMessage::createCommittableOffsetBatch)
                            .toMat(Committer.sinkWithOffsetContext(committerSettings), Keep.both())
                            .mapMaterializedValue(Consumer::createDrainingControl)
                            .run(actorSystem);
    nathanccleung
    @nathanccleung
    Hi All, I use Transactional.source for my application and it works fine with the lenses.io sandbox, but it doesn't seem to work on AWS MSK. More specifically, I sent data to the topic from another app and my app got the message from the topic (so I assumed the connection was fine). When I sink the ProducerMessage to the target topic, it shows no error, but the target topic receives nothing and later I get
    kafka timeout expired after 60000 milliseconds while awaiting initproducerid
    in the log. Below is my code:
    val producerSettings =
        ProducerSettings(akkaKafkaProducerConfig, new StringSerializer, new ByteArraySerializer)
          .withBootstrapServers(akkaKafkaBootstrapUrl)
    
      val innerControl = new AtomicReference[Control](Consumer.NoopControl)
    val stream = RestartSource.onFailuresWithBackoff(
        minBackoff = 1.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      ) { () =>
        Transactional
          .source(consumerSettings, Subscriptions.topics(appSvcKafkaInboundTopic))
          .mapAsync(1000) {
            msg =>
              try {
                inboundMessageProcess(msg.record.value).collect {
                  case Some(sinkMsgs) =>
                    makeProducerRecords(sinkMsgs) match {
                      case Some(pms) =>
                         ProducerMessage.single(pms.head, msg.partitionOffset)
    
                      case e@_ => ProducerMessage.passThrough[String, Array[Byte], PartitionOffset](msg.partitionOffset)
                    }
                  case e @ _ => ProducerMessage.passThrough[String, Array[Byte], PartitionOffset](msg.partitionOffset)
                }
              } catch {
                case _ => Future(ProducerMessage.passThrough[String, Array[Byte], PartitionOffset](msg.partitionOffset))
              }
          }
          .mapMaterializedValue(c => innerControl.set(c))
          .via(Transactional.flow(producerSettings, s"tId-${appSvcName}-${appNodeId}"))
      }
      stream.runWith(Sink.ignore)
    nathanccleung
    @nathanccleung

    Solved it myself. Leaving the solution here, in case anyone encounters the same issue one day.
    The problem was a careless mistake: I set up only one broker per AZ in AWS MSK, so the transaction state log could not meet its default replication factor. I solved it by setting the broker properties below:

    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    Enno
    @ennru
    @/all Alpakka Kafka 2.0.5 is now available https://discuss.lightbend.com/t/alpakka-kafka-2-0-5-released/7147
    Srepfler Srdan
    @schrepfler
    :clap:
    Marzell Camenzind
    @mcamenzind

    I have data in Kafka and need to stream it to Elasticsearch. Since the data is usually high volume, I'm using batching on the ES sink side. Consider the following snippet, which reads from Kafka, sets the ES document ID based on the partition and the offset, sets the ES index, and batch-inserts into ES with batch size 10 (the default according to the docs), before eventually committing to Kafka.

      val committerSettings = CommitterSettings.apply(actorSystem)
        .withMaxBatch(10000)
        .withMaxInterval(60 seconds)
    
      Consumer.committablePartitionedSource(
        kafkaConsumerSettings,
        Subscriptions.topics("mytopic")
      )
        .map { case (tp, source) =>
          source.map { message =>
            val partitionId = tp.partition()
            val offsetId = message.committableOffset.partitionOffset.offset
            val msgId = "%07d-%32d".format(partitionId, offsetId)
            WriteMessage.createUpsertMessage(msgId, source)
              .withIndexName(s"index-$partitionId")
          }
          .via(
            ElasticsearchFlow.createWithPassThrough[GenericRecord, ConsumerMessage.CommittableMessage[Integer, String]](
              indexName = "index-",
              typeName = "_doc",
              settings = baseWriteSettings
            )
          )
            .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        }

    There are a few questions:

    • While records are streaming in, are these records buffered and held back in the ElasticsearchFlow.createWithPassThrough inside the via, so that they do not reach the commit sink until the ES request has been executed, thus ensuring at-least-once delivery?
    • Assuming that the data is high volume, I also have quite a large buffer, but from time to time the speed decreases significantly (e.g. during the night). This leads to the buffer not filling up quickly enough. How do I ensure that the data still arrives in a timely manner? I could not find a way to tell the Elasticsearch flow/sink to flush its buffer at least once per minute, independently of its size.
    • According to the documentation, Committer.sink batches the commits to Kafka, providing the configs maxInterval, the maximum time between two commits, and maxBatch, the maximum number of records before starting the commit. Is this somehow linked to the buffer flush of the ES flow?

    Thank you!

    Sven Ludwig
    @iosven

    @ennru Do you have experience or hints regarding running alpakka-kafka 2.0.3 and higher against Kafka 2.1.1 ?

    I read https://doc.akka.io/docs/alpakka-kafka/current/home.html#matching-kafka-versions and I am wondering if I can just upgrade our codebase to newest Akka and alpakka, without having to upgrade the Kafka Brokers immediately.

    I think I might just give it a try in a local integration test very soonish, but I would be interested in any heads-up.
    By the way, I have a larger code base upgrade ahead of me: I am about to migrate us from Scala 2.12.10 to 2.13.x and from Akka 2.5.30 to Akka 2.6.x as well. We are mostly using Akka Streams and Alpakka.
    Enno
    @ennru
    The Kafka client is very good at adapting to any Kafka broker version. I wouldn't expect any problems there.
    Daniele Sassoli
    @DanieleSassoli
    Hi, I believe this is a bit more of a generic Alpakka question than an alpakka-kafka one, but I couldn't find a more generic room.
    We've got an application that integrates with several different systems (each integration is done via an Alpakka connector, be it GCS, S3, Kafka, SFTP and so on).
    When we release a new version of our app, we'd like the application (which runs in a Docker container) to be stopped gracefully: stop accepting new load, finish processing the in-flight messages (maybe using a custom metric or something similar to decide whether anything is still in flight), and then proceed with shutdown.
    Does anybody in here have experience with this kind of scenario?
    Enno
    @ennru
    That's what Coordinated Shutdown is meant to support. Alpakka connectors are not integrated with Coordinated Shutdown for the time being, but for many cases you can solve that in your code.
    Akka HTTP has built-in support since 10.2 https://doc.akka.io/docs/akka-http/current/server-side/graceful-termination.html which is an example of what you're looking for.
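    For an Alpakka Kafka stream specifically, a rough Scala sketch of doing that wiring yourself (the phase choice and the names are assumptions; control is the DrainingControl you get from materializing the stream):

        import akka.Done
        import akka.actor.{ActorSystem, CoordinatedShutdown}
        import akka.kafka.scaladsl.Consumer.DrainingControl

        def drainOnShutdown(control: DrainingControl[Done])(implicit system: ActorSystem): Unit = {
          import system.dispatcher
          CoordinatedShutdown(system).addTask(
            CoordinatedShutdown.PhaseServiceRequestsDone, "drain-kafka-stream") { () =>
            // stop polling Kafka, let in-flight elements be processed and committed,
            // then complete the stream
            control.drainAndShutdown()
          }
        }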
    Daniele Sassoli
    @DanieleSassoli
    Thanks @ennru, I'll be looking into it more then.
    itsmesrds
    @itsmesrds

    Hello everyone, I'm pretty new to the Akka framework. I'm trying to consume data in parallel from two different Kafka topics as two different streams, and I'm using akka.kafka.default-dispatcher. I just want to understand the execution model if I create two different consumers in one application, like below.

    Can you please clarify this for me:

    1. Will these two consumers run truly in parallel without depending on one another (like multiprocessing), or will they run in an asynchronous fashion?
    2. How does the dispatcher configuration (akka.kafka.default-dispatcher), i.e. its thread pool and threads, help achieve this?

    Any recommendations are also welcome.

    object EventConsumer extends App {
      // actor system and materializer for streams
      implicit val system = ActorSystem.create("octopus")
      implicit val materializer = ActorMaterializer()
    
      // consumer setting
      val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("192.168.1.8:29092")
        .withGroupId("execer-group")
    
      // event  consumer
      val event_consumer = Consumer
        .committableSource(consumerSettings, Subscriptions.topics("event"))
        .map(msg => {
          // got the message, do whatever you need with it
          println(s"Got message - event -  ${msg.record.value()}")
        })
        .runWith(Sink.ignore)
    
      // event  retry consumer
      val event_retry_consumer = Consumer
        .committableSource(consumerSettings, Subscriptions.topics("event_retry"))
        .map(msg => {
          // got the message, do whatever you need with it
          println(s"Got message - event_retry - ${msg.record.value()}")
        })
        .runWith(Sink.ignore)
    
      // handle shutdown
      implicit val ec: ExecutionContextExecutor = system.dispatcher
      event_consumer onComplete {
        case Success(_) =>
          println("Successfully terminated consumer")
          system.terminate()
        case Failure(err) =>
          println(err.getMessage)
          system.terminate()
      }
    
      event_retry_consumer onComplete {
        case Success(_) =>
          println("Successfully terminated consumer")
          system.terminate()
        case Failure(err) =>
          println(err.getMessage)
          system.terminate()
      }
    }

    Thank you so much.

    itsmesrds
    @itsmesrds
    Can anyone please comment on this? Thanks.
    Andrii Ilin
    @AndreyIlin
    @itsmesrds
    AFAIK both consumers will run in parallel utilizing threads from default-dispatcher pool.
    Each consumer has underlying consumer actor that polls Kafka so they work independently.
    itsmesrds
    @itsmesrds
    Thanks @AndreyIlin
    itsmesrds
    @itsmesrds

    Hello everyone, is it possible to add a callback function for each future that is emitted in a flow stage?

    I'm trying to read messages from a Kafka source and call a REST API asynchronously for every message, which I wrapped as a future; on success of the future it needs to write to a success topic, on failure of the future it needs to write to a failure topic (this happens in the flow stage).

    Thanks

    here is the sample code:

    def call_api(myCode: String): Future[String] = {
    
        val responseFuture = Http().singleRequest(
            HttpRequest(
              method = HttpMethods.POST,
              uri = "http://localhost:5050/frame/10",
              entity = HttpEntity(ContentTypes.`application/json`,myCode)))
        responseFuture
          .flatMap(_.entity.toStrict(2 seconds))
          .map(_.data.utf8String)
      }
    
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("192.168.1.8:29092")
        .withGroupId("execer-group")
    
      val event_consumer = Consumer
        .committableSource(consumerSettings, Subscriptions.topics("event_beat"))
        .mapAsync(1)(msg => {
          call_api(msg.record.value())
        }).mapAsync(1)(response => {
        Future.successful(println(s"Got message - event_beat-response -  ${response}", Instant.now()))
      }).runWith(Sink.ignore)
    Sven Ludwig
    @iosven
    @itsmesrds You might want to design a plain vanilla Scala case class for the elements flowing through your stream, so that you can carry meta-information about each element, including whether the corresponding HTTP request was a success or a failure, perhaps as a success: Option[Boolean] (an Option because it is not known at the start and is set based on the outcome of the HTTP request). Note that within the mapAsync you would therefore need to wrap a failed responseFuture in a successful Future, so that your failure cases also proceed downstream. Then, downstream, you could use https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/divertTo.html to send the elements that contain failures to your sink, while the successful elements continue.
    @itsmesrds PS: I would definitely not use future callbacks. Build a stream recipe instead, for example like I just outlined.
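    A Scala sketch of that recipe (ApiResult, the failure handling and the parallelism are made up for illustration; call_api is the function from the snippet above, and an implicit ActorSystem system plus consumerSettings are assumed in scope):

        import akka.kafka.ConsumerMessage.CommittableMessage
        import akka.kafka.Subscriptions
        import akka.kafka.scaladsl.Consumer
        import akka.stream.scaladsl.Sink
        import scala.util.{Failure, Success, Try}

        import system.dispatcher

        final case class ApiResult(msg: CommittableMessage[String, String], response: Try[String])

        // stand-in for e.g. a producer writing to a failure topic followed by a committer
        val failureSink = Sink.foreach[ApiResult](r => println(s"failed: ${r.response}"))

        Consumer
          .committableSource(consumerSettings, Subscriptions.topics("event_beat"))
          .mapAsync(4) { msg =>
            call_api(msg.record.value())
              .map(resp => ApiResult(msg, Success(resp)))
              .recover { case e => ApiResult(msg, Failure(e)) } // failed requests keep flowing downstream
          }
          .divertTo(failureSink, _.response.isFailure) // failures are diverted, successes continue
          .runWith(Sink.ignore) // stand-in for the success-topic producer + committer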
    Erik Bakker
    @eamelink
    I have a testing use case where I create a test stream and subscribe to a topic, starting at the ‘latest’ offset. After I am subscribed, I do some action in the system that causes a Kafka event to be created. I want to observe that event with my test stream.
    The problem is that there's a significant amount of time between running the stream and the KafkaConsumer really being subscribed, so if I trigger my event too early it will not be seen, because by the time the stream subscribes, the ‘latest’ offset is already beyond my event.
    I had two solution approaches in mind:
    1) somehow being notified when we are subscribed, but I don't know how. These are low-volume streams; I can't just wait for any event to be seen.
    2) looking up the ‘latest’ offsets explicitly and subscribing with those offsets. But that would require creating a second KafkaConsumer outside Alpakka, since I can't seem to find how to either get these offsets out of Alpakka or how to supply Alpakka with a previously created KafkaConsumer.
    Any ideas on how to achieve this?
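    A Scala sketch of approach 2: look up the end offsets with a short-lived plain KafkaConsumer and hand them to Alpakka as a manual subscription (Scala 2.13 converters; consumerProps and the String deserializers are placeholders):

        import java.util.Properties
        import org.apache.kafka.clients.consumer.KafkaConsumer
        import org.apache.kafka.common.TopicPartition
        import org.apache.kafka.common.serialization.StringDeserializer
        import akka.kafka.{Subscription, Subscriptions}
        import scala.jdk.CollectionConverters._

        def subscribeAtCurrentEnd(topic: String, consumerProps: Properties): Subscription = {
          val probe = new KafkaConsumer(consumerProps, new StringDeserializer, new StringDeserializer)
          try {
            val partitions = probe.partitionsFor(topic).asScala
              .map(info => new TopicPartition(topic, info.partition))
            val endOffsets = probe.endOffsets(partitions.asJava).asScala
              .map { case (tp, offset) => tp -> offset.longValue }
              .toMap
            // start the test stream exactly at these offsets
            Subscriptions.assignmentWithOffset(endOffsets)
          } finally probe.close()
        }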
    Chetan Kumar
    @chetankm-cs

    Hi all, I was testing out the performance of Consumer.atMostOnceSource.

    Even with very small messages and no processing I am getting about 10 msg/s throughput on my local machine.
    akka/alpakka-kafka#147

    Is there any way to tune this for better performance?

    Chetan Kumar
    @chetankm-cs
    @ennru I checked the documentation but was wondering why it is so slow compared to the underlying kafka-consumer.
    We are trying to upgrade our infrastructure and there are lots of consumers which are not idempotent, so we want to ensure that as few messages as possible are replayed in case of failure.
    Our throughput requirement is around 100 msg/s but I am getting around 10 msg/s.
    Chetan Kumar
    @chetankm-cs
    @ennru Is there a design doc you can point me to that explains how the KafkaConsumerActor works?
    Enno
    @ennru
    @chetankm-cs It is slow as it commits every message and waits until the commit is accepted by the broker.
    Your service is not idempotent, but you can accept losing messages?
    You can read up on Alpakka Kafka design and internals in https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations
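    For comparison, a sketch of the usual batched-commit variant, which is typically much faster at the cost of possible replays after a failure (at-least-once instead of at-most-once); consumerSettings, a process function and an implicit ActorSystem system are assumed in scope:

        import akka.kafka.scaladsl.{Committer, Consumer}
        import akka.kafka.{CommitterSettings, Subscriptions}

        import system.dispatcher

        val control =
          Consumer
            .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
            .mapAsync(4) { msg =>
              // do the real work, then emit the offset so it can be committed in batches
              process(msg.record.value()).map(_ => msg.committableOffset)
            }
            .toMat(Committer.sink(CommitterSettings(system)))(Consumer.DrainingControl.apply)
            .run()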
    Enno
    @ennru
    We've started to work on things for Alpakka Kafka 2.1.0 akka/alpakka-kafka#1211 -- please chime in.
    Elmar Schug
    @kKdH
    Hey guys, can someone point me to an example of how to use the consumer with plain authentication (username and password)?
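    A sketch of what that usually looks like: SASL/PLAIN is configured through plain Kafka client properties on the settings (broker address and credentials are placeholders, an implicit ActorSystem system is assumed, and SASL_PLAINTEXT would replace SASL_SSL if the listener has no TLS):

        import akka.kafka.ConsumerSettings
        import org.apache.kafka.clients.CommonClientConfigs
        import org.apache.kafka.common.config.SaslConfigs
        import org.apache.kafka.common.serialization.StringDeserializer

        val consumerSettings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("broker:9093")
            .withGroupId("my-group")
            .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL")
            .withProperty(SaslConfigs.SASL_MECHANISM, "PLAIN")
            .withProperty(
              SaslConfigs.SASL_JAAS_CONFIG,
              """org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="secret";"""
            )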
    Nikhil Arora
    @nikhilaroratgo_gitlab
    @ennru I have a design question and need some suggestions. I use Akka Streams/graphs to read messages from Kafka (the source is created by Alpakka), send them to another service, and based on the response I mark the offset manually in the sink. The graph/stream of course has many flows between the source and the sink, and these flows contain some logic. How can I handle the case where a message fails in some flow due to an unhandled exception, a user implementation mistake, or an erroneous message, so that my processing doesn't get stuck repeatedly re-reading this erroneous message when I don't have a try/catch block? I don't want a try/catch block in each flow, and I also don't want to attach .divertTo (another way to handle this) to each flow. What do you think? I am sure this is a common problem and there should be a better way to handle it. Thank you in advance.
    Enno
    @ennru
    As Kafka commits offsets you need to make sure message offsets are considered for committing in order. If Kafka's auto-commit is enough for you (at-most-once) you should be fine with wrapping your processing in a RestartSource.
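    A rough sketch of that wrapping (topic name, backoff values and callOtherService are placeholders; committing is left to the consumer's auto-commit, and an implicit ActorSystem system plus consumerSettings are assumed in scope):

        import akka.kafka.Subscriptions
        import akka.kafka.scaladsl.Consumer
        import akka.stream.scaladsl.{RestartSource, Sink}
        import scala.concurrent.duration._

        RestartSource
          .onFailuresWithBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
            // the whole consume-and-process pipeline is recreated with backoff after a failure
            Consumer
              .plainSource(consumerSettings, Subscriptions.topics("my-topic"))
              .mapAsync(4)(record => callOtherService(record.value()))
          }
          .runWith(Sink.ignore)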
    Arsene
    @Tochemey
    Hello folks. I would like to know whether Alpakka offers a retry mechanism for a producer in case an attempt to push messages to Kafka fails.
    Arsene
    @Tochemey
    Never mind, I can make use of retry.backoff.ms in the Kafka producer config.
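    For reference, a sketch of passing those plain Kafka producer settings through ProducerSettings (values are illustrative; an implicit ActorSystem system is assumed):

        import akka.kafka.ProducerSettings
        import org.apache.kafka.clients.producer.ProducerConfig
        import org.apache.kafka.common.serialization.StringSerializer

        val producerSettings =
          ProducerSettings(system, new StringSerializer, new StringSerializer)
            .withBootstrapServers("broker:9092")
            .withProperty(ProducerConfig.RETRIES_CONFIG, "10")           // how many times a failed send is retried
            .withProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500") // retry.backoff.ms: wait between retries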
    Sven Ludwig
    @iosven
    I am trying to work out when Akka 2.6.10 will be released. The maxRestarts improvement to the backoff logic is something I also need for Akka Streams. Is 2.6.10 coming within, say, the next 2 weeks or so?
    Enno
    @ennru
    Yes, Akka 2.6.10 will most likely be released next week.
    Odd Möller
    @odd
    @ennru Is there a general Alpakka gitter room? Or should I post my MQTT related questions here?
    Dawood
    @muhammeddawood
    Hi All, I am new to akka-streams and also Alpakka... I want to group events by key and process the events of each substream in order. I have written the code below and it is not working... can anyone please help me see what mistake I am making?
        CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
                .withMaxBatch(8)
                .withMaxInterval(Duration.ofSeconds(30));
        Consumer.committableSource(consumerSettings, Subscriptions.topics("events"))
                .groupBy(100, msg -> msg.toString())
                .async()
                .mapAsync(1, msg ->
                        CompletableFuture.supplyAsync(() -> {
                            try {
                                Thread.sleep(200);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println(Thread.currentThread() + " " + msg);
                            return CompletableFuture.completedStage(msg);
                        }, actorSystem.dispatcher())
                                .thenApply(param -> msg.committableOffset()))
                .mergeSubstreams()
                .toMat(Committer.sink(committerSettings), Keep.both())
                .withAttributes(ActorAttributes.supervisionStrategy(
                        ex -> Supervision.restart()))
                .mapMaterializedValue(Consumer::createDrainingControl)
                .run(actorSystem);
    Jack Pines
    @grimlor
    I have a micro-service in K8s running Akka Clustering/Management with liveness and readiness checks. I'd like to add health checks for my service's dependencies, including Kafka. Any suggestion as to how this can be done? I could use my admin client to touch Kafka but I'd rather hook into the same pathways the service is using, specifically the Source or similar Alpakka-Kafka component.
    Aleksandar Skrbic
    @aleksandarskrbic
    Hi, I'm using a committableSource and I need at-least-once processing guarantees. I need to filter out some messages from Kafka, so I was wondering how to commit the offsets of the messages that are filtered out?
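    One common approach, sketched (isRelevant and handle are placeholder functions; consumerSettings and an implicit ActorSystem system are assumed in scope): don't drop the filtered messages before the committer, pass every offset downstream and only do the real work for the relevant ones.

        import akka.kafka.scaladsl.{Committer, Consumer}
        import akka.kafka.{CommitterSettings, Subscriptions}
        import scala.concurrent.Future

        import system.dispatcher

        Consumer
          .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
          .mapAsync(4) { msg =>
            if (isRelevant(msg.record.value()))
              handle(msg.record.value()).map(_ => msg.committableOffset) // real processing
            else
              Future.successful(msg.committableOffset) // filtered out, but its offset still gets committed
          }
          .toMat(Committer.sink(CommitterSettings(system)))(Consumer.DrainingControl.apply)
          .run()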