    Enno
    @ennru
    That's what Coordinated Shutdown is meant to support. Alpakka connectors are not integrated with Coordinated Shutdown for the time being, but in many cases you can solve that in your code.
    Akka HTTP has built-in support since 10.2 https://doc.akka.io/docs/akka-http/current/server-side/graceful-termination.html which is an example of what you're looking for.
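    For the Alpakka Kafka case, a minimal sketch of wiring a stream's DrainingControl into Coordinated Shutdown could look like the following (this assumes an existing consumerSettings, an implicit ActorSystem named system, and placeholder topic and task names):

    import akka.Done
    import akka.actor.CoordinatedShutdown
    import akka.kafka.{CommitterSettings, Subscriptions}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl

    // materialize the stream to a DrainingControl so it can be drained on shutdown
    val control: DrainingControl[Done] =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("events"))
        .map(_.committableOffset) // real code would process the record before committing
        .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply)
        .run()

    // register draining as a Coordinated Shutdown task so in-flight offsets get committed
    CoordinatedShutdown(system).addTask(
      CoordinatedShutdown.PhaseServiceRequestsDone, "drain-kafka-consumer") { () =>
      control.drainAndShutdown()(system.dispatcher)
    }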
    Daniele Sassoli
    @DanieleSassoli
    Thanks @ennru, I'll be looking into it more then.
    itsmesrds
    @itsmesrds

    Hello everyone, I'm pretty new to the Akka framework. I'm trying to consume data in parallel from two different Kafka topics as two different streams, and I'm using akka.kafka.default-dispatcher. I just wanted to understand the execution model if I create two different consumers in one application, like below.

    Can you please clarify this for me?

    1. Will these two consumers run truly in parallel without depending on one another (like multiprocessing), or will they run in an asynchronous fashion?
    2. How does the dispatcher configuration (akka.kafka.default-dispatcher), i.e. the thread pool and its threads, help to achieve this?

    Any recommendations are also welcome.

    import akka.actor.ActorSystem
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.common.serialization.StringDeserializer

    import scala.concurrent.ExecutionContextExecutor
    import scala.util.{Failure, Success}

    object EventConsumer extends App {
      // actor system and materializer for streams
      implicit val system = ActorSystem.create("octopus")
      implicit val materializer = ActorMaterializer()
    
      // consumer setting
      val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("192.168.1.8:29092")
        .withGroupId("execer-group")
    
      // event  consumer
      val event_consumer = Consumer
        .committableSource(consumerSettings, Subscriptions.topics("event"))
        .map(msg => {
          // got the message, do whatever you need with it
          println(s"Got message - event -  ${msg.record.value()}")
        })
        .runWith(Sink.ignore)
    
      // event  retry consumer
      val event_retry_consumer = Consumer
        .committableSource(consumerSettings, Subscriptions.topics("event_retry"))
        .map(msg => {
          // got the message, do whatever you need with it
          println(s"Got message - event_retry - ${msg.record.value()}")
        })
        .runWith(Sink.ignore)
    
      // handle shutdown
      implicit val ec: ExecutionContextExecutor = system.dispatcher
      event_consumer onComplete {
        case Success(_) =>
          println("Successfully terminated consumer")
          system.terminate()
        case Failure(err) =>
          println(err.getMessage)
          system.terminate()
      }
    
      event_retry_consumer onComplete {
        case Success(_) =>
          println("Successfully terminated consumer")
          system.terminate()
        case Failure(err) =>
          println(err.getMessage)
          system.terminate()
      }
    }

    Thank you so much.

    itsmesrds
    @itsmesrds
    Can anyone please comment on this? Thanks.
    Andrii Ilin
    @AndreyIlin
    @itsmesrds
    AFAIK both consumers will run in parallel, utilizing threads from the default-dispatcher pool.
    Each consumer has an underlying consumer actor that polls Kafka, so they work independently.
    itsmesrds
    @itsmesrds
    Thanks @AndreyIlin
    itsmesrds
    @itsmesrds

    Hello everyone, is it possible to add a callback function for each future emitted in a flow stage?

    I'm trying to read messages from a Kafka source and call a REST API asynchronously for every message, which I wrapped as a future. On success of the future it needs to write to a success topic, and on failure of the future it needs to write to a failure topic (this happens in the flow stage).

    Thanks

    here is the sample code:

    def call_api(myCode: String): Future[String] = {
      val responseFuture = Http().singleRequest(
        HttpRequest(
          method = HttpMethods.POST,
          uri = "http://localhost:5050/frame/10",
          entity = HttpEntity(ContentTypes.`application/json`, myCode)))
      responseFuture
        .flatMap(_.entity.toStrict(2.seconds))
        .map(_.data.utf8String)
    }

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("192.168.1.8:29092")
      .withGroupId("execer-group")

    val event_consumer = Consumer
      .committableSource(consumerSettings, Subscriptions.topics("event_beat"))
      .mapAsync(1)(msg => call_api(msg.record.value()))
      .mapAsync(1)(response =>
        Future.successful(println(s"Got message - event_beat-response - ${response}", Instant.now())))
      .runWith(Sink.ignore)
    Sven Ludwig
    @iosven
    @itsmesrds You might want to design a plain vanilla Scala case class for the elements flowing through your stream, so that each element carries meta-information about itself, including whether the corresponding HTTP request was a success or a failure, perhaps as a success: Option[Boolean] (an Option because it is not known at the start and is set based on the outcome of the HTTP request). Note that within the mapAsync you would then need to wrap a failed responseFuture in a successful Future, so that your failure cases also proceed downstream. Then, downstream, you could use https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/divertTo.html to send the elements that contain failures to your failure sink, while the successful elements continue.
    @itsmesrds PS: I would definitely not use future callbacks. Make a stream recipe instead, for example like the one I just outlined.
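    A minimal sketch of that recipe (the ApiResult case class and failureSink are hypothetical; call_api and consumerSettings are taken from the snippet above, and an implicit ExecutionContext and materializer/ActorSystem are assumed):

    import akka.kafka.ConsumerMessage.CommittableMessage
    import akka.stream.scaladsl.Sink

    // hypothetical envelope: the original message plus the outcome of the HTTP call
    case class ApiResult(msg: CommittableMessage[String, String],
                         success: Option[Boolean],
                         body: Option[String])

    // sink receiving the failed elements, e.g. a producer writing to a failure topic
    val failureSink = Sink.foreach[ApiResult](r => println(s"failed: ${r.msg.record.value()}"))

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("event_beat"))
      .mapAsync(1) { msg =>
        call_api(msg.record.value())
          .map(body => ApiResult(msg, success = Some(true), body = Some(body)))
          .recover { case _ => ApiResult(msg, success = Some(false), body = None) } // keep failures flowing
      }
      .divertTo(failureSink, r => r.success.contains(false)) // failed elements leave the main stream here
      .runWith(Sink.ignore) // successes continue; real code would write them to the success topic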
    Erik Bakker
    @eamelink
    I have a testing use case where I create a test stream and subscribe to a topic, starting at the ‘latest’ offset. After I am subscribed, I do some action in the system that causes a Kafka event to be created. I want to observe that event with my test stream.
    The problem is, there’s a significant amount of time between running the stream and the KafkaConsumer really being subscribed, so if I trigger my event too early, it will not be seen, because by the time the stream subscribes, the ‘latest’ offset is beyond my event.
    I had two solution approaches in mind:
    1) somehow being notified when we are subscribed, but I don’t know how (a sketch of this follows below). These are low-volume streams, so I can’t just wait for any event to be seen
    2) looking up ‘latest’ offsets explicitly and subscribing with those offsets. But that would require creating a second KafkaConsumer outside Alpakka, since I can’t seem to find how to either get these offsets out of Alpakka or how to supply Alpakka with a previously created KafkaConsumer
    Any ideas on how to achieve this?
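    One possible sketch of approach (1), assuming Alpakka Kafka 2.0+ where subscriptions accept a PartitionAssignmentHandler; the topic name, the Promise, and the plain sink are placeholders, and consumerSettings plus an implicit ActorSystem/materializer are assumed:

    import akka.kafka.{RestrictedConsumer, Subscriptions}
    import akka.kafka.scaladsl.{Consumer, PartitionAssignmentHandler}
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.common.TopicPartition
    import scala.concurrent.Promise

    val assigned = Promise[Set[TopicPartition]]()

    // complete the promise once the rebalance has handed partitions to this consumer
    val subscription = Subscriptions
      .topics("my-test-topic")
      .withPartitionAssignmentHandler(new PartitionAssignmentHandler {
        override def onAssign(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
          assigned.trySuccess(tps)
        override def onRevoke(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
        override def onLost(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
        override def onStop(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
      })

    val control = Consumer
      .plainSource(consumerSettings, subscription)
      .to(Sink.foreach(record => println(record.value())))
      .run()

    // in the test: await assigned.future, then trigger the action that produces the Kafka event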
    Chetan Kumar
    @chetankm-cs

    Hi all, I was testing out the performance of Consumer.atMostOnceSource.

    Even with very small messages and no processing I am getting ~10 msg/s throughput on my local machine.
    akka/alpakka-kafka#147

    Is there any way to tune this for better performance?

    Enno
    @ennru
    Chetan Kumar
    @chetankm-cs
    @ennru I checked the documentation but was wondering why it is so slow compared to the underlying kafka-consumer.
    We are trying to upgrade our infrastructure and there are lots of consumers which are not idempotent, so we want to ensure that a minimal number of messages is replayed in case of failure.
    Our throughput requirement is around 100 msg/s but I am getting around 10 msg/s.
    Chetan Kumar
    @chetankm-cs
    @ennru Is there a design doc that you can point me to which explains the workings of KafkaConsumerActor?
    Enno
    @ennru
    @chetankm-cs It is slow as it commits every message and waits until the commit is accepted by the broker.
    Your service is not idempotent, but you can accept losing messages?
    You can read up on Alpakka Kafka design and internals in https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations
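    For comparison, a rough sketch of the at-least-once alternative, where offsets are committed in batches instead of per message (consumerSettings, the topic name, the batch size, and an implicit ActorSystem named system are assumptions here):

    import akka.kafka.{CommitterSettings, Subscriptions}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl

    val committerSettings = CommitterSettings(system).withMaxBatch(100) // commit at most once per 100 records

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("events"))
      .map { msg =>
        // process the record here; only afterwards hand its offset on for committing
        msg.committableOffset
      }
      .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
      .run()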
    Enno
    @ennru
    We've started to work on things for Alpakka Kafka 2.1.0 akka/alpakka-kafka#1211 -- please chime in.
    Elmar Schug
    @kKdH
    Hey guys, can someone point me to an example of how to use the consumer with plain authentication (username and password)?
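    A minimal sketch, assuming SASL/PLAIN over SSL; the broker address, group id, username, and password are placeholders, and the properties are the standard Kafka client settings rather than anything Alpakka-specific (an ActorSystem named system is assumed):

    import akka.kafka.ConsumerSettings
    import org.apache.kafka.common.serialization.StringDeserializer

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("broker:9093")
      .withGroupId("my-group")
      .withProperty("security.protocol", "SASL_SSL") // or SASL_PLAINTEXT without TLS
      .withProperty("sasl.mechanism", "PLAIN")
      .withProperty("sasl.jaas.config",
        """org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="secret";""")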
    Nikhil Arora
    @nikhilaroratgo_gitlab
    @ennru I have a design question and need some suggestions. I use Akka Streams/Graphs to read messages from Kafka (the Source is created by Alpakka), send them to another service, and based on the response I mark the offset manually in the Sink. The graph/stream of course has many flows between Source and Sink, and these flows have some logic inside them. How can I handle the case where a message fails in any flow due to an unhandled exception, a user implementation mistake, or an erroneous message, so that my message processing doesn't get stuck repeatedly re-reading that erroneous message even without a try/catch block? I don't want a try/catch block in each flow, and I also don't want to attach .divertTo (another way to handle this) to each flow. What do you think about this? I am sure this is a common problem and there should be a better way to handle it. Thank you in advance.
    Enno
    @ennru
    As Kafka commits offsets, you need to make sure message offsets are considered for committing in order. If Kafka's auto-commit is enough for you (at-most-once), you should be fine wrapping your processing in a RestartSource.
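    A rough sketch of that suggestion (businessFlow is a hypothetical stand-in for the flows between Source and Sink, consumerSettings is assumed to have auto-commit enabled, and an implicit ActorSystem/materializer is in scope):

    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.{RestartSource, Sink}
    import scala.concurrent.duration._

    // an unhandled exception anywhere inside the wrapped stream restarts it with backoff
    RestartSource
      .onFailuresWithBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
        Consumer
          .plainSource(consumerSettings, Subscriptions.topics("events")) // offsets auto-committed by the Kafka client
          .via(businessFlow) // hypothetical: the processing stages that may throw
      }
      .runWith(Sink.ignore)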
    Arsene
    @Tochemey
    Hello folks. I would like to know whether Alpakka offers some retry mechanism for a producer in case an attempt to push messages to Kafka fails.
    Arsene
    @Tochemey
    Nvm, I can make use of retry.backoff.ms in the Kafka producer config.
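    A minimal sketch of that: retries and retry.backoff.ms are standard Kafka producer properties handled by the underlying KafkaProducer and passed through Alpakka Kafka's ProducerSettings (the broker address, the values, and the ActorSystem named system are placeholders):

    import akka.kafka.ProducerSettings
    import org.apache.kafka.common.serialization.StringSerializer

    val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("broker:9092")
      .withProperty("retries", "5")             // how many times a failed send is retried
      .withProperty("retry.backoff.ms", "500")  // wait between retry attempts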
    Sven Ludwig
    @iosven
    I am trying to figure out when Akka 2.6.10 will be released. The backoff-logic maxRestarts improvement is something I also need with Akka Streams. Is 2.6.10 coming within, say, the next 2 weeks or so?
    Enno
    @ennru
    Yes, Akka 2.6.10 will most likely be released next week.
    Odd Möller
    @odd
    @ennru Is there a general Alpakka gitter room? Or should I post my MQTT related questions here?
    Dawood
    @muhammeddawood
    Hi all, I am new to akka-streams and also Alpakka... I want to group events by key and process each event in its substream in order. I have written the code below and it is not working... can anyone please help me figure out what mistake I am making?
        CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
                .withMaxBatch(8)
                .withMaxInterval(Duration.ofSeconds(30));
        Consumer.committableSource(consumerSettings, Subscriptions.topics("events"))
                .groupBy(100, msg -> msg.toString())
                .async()
                .mapAsync(1, msg ->
                        CompletableFuture.supplyAsync(() -> {
                            try {
                                Thread.sleep(200);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println(Thread.currentThread() + " " + msg);
                            return CompletableFuture.completedStage(msg);
                        }, actorSystem.dispatcher())
                                .thenApply(param -> msg.committableOffset()))
                .mergeSubstreams()
                .toMat(Committer.sink(committerSettings), Keep.both())
                .withAttributes(ActorAttributes.supervisionStrategy(
                        ex -> Supervision.restart()))
                .mapMaterializedValue(Consumer::createDrainingControl)
                .run(actorSystem);
    Jack Pines
    @grimlor
    I have a micro-service in K8s running Akka Clustering/Management with liveness and readiness checks. I'd like to add health checks for my service's dependencies, including Kafka. Any suggestion as to how this can be done? I could use my admin client to touch Kafka but I'd rather hook into the same pathways the service is using, specifically the Source or similar Alpakka-Kafka component.
    Aleksandar Skrbic
    @aleksandarskrbic
    Hi, I'm using committableSource and I need at-least-once processing guarantees. I need to filter some messages from Kafka, so I was wondering how to commit offsets for the messages that are filtered out?
    My pipeline looks something like this:
        source
          .throttle(25, 1.second)
          .filter(predicate)
          .groupedWithin(25, 5.seconds)
          .mapAsync(1) { batch =>
              processAsync(batch)
          }
          .toMat(Committer.sink(CommitterSettings(actorSystem)))(DrainingControl.apply)
          .run()
    Odd Möller
    @odd
    @ennru What is the current status of the higher-level api for Alpakka MQTT-streaming?
    Enno
    @ennru
    I refreshed the PR recently but haven't spent more time with it. Suggestions welcome...
    Odd Möller
    @odd
    @ennru Thanks, I'll give it a try and report back on the experience.
    Enno
    @ennru
    @/all A first milestone for 2.1: Alpakka Kafka 2.1.0-M1 (Kafka 2.6.0 client, Akka 2.6.10+) is now available https://doc.akka.io/docs/alpakka-kafka/2.1/release-notes/2.1.x.html
    Sven Ludwig
    @iosven
    Sorry wrong channel, posted now in akka/akka
    Aaron Delaplane
    @AaronDelaplane
    If a consumer group only has a single consumer and that one consumer fails, is the application now unable to consume new records? I.e. would the app need to be restarted and a new connection to the cluster created?
    Enno
    @ennru
    You can re-start just the consumer with the same consumer group without restarting the app. That will create a new connection to the Kafka broker.
    Aaron Delaplane
    @AaronDelaplane
    Thank you @ennru
    Carlos De Los Angeles
    @cdelosangeles
    Hello! I am using Akka Streams with Alpakka Kafka and I have to connect to two different Kafka clusters from the same application. What is the right way to do this? Thanks!
    Enno
    @ennru
    There is nothing special to it. Just pass in the different bootstrap server addresses.
    Carlos De Los Angeles
    @cdelosangeles
    Thanks @ennru. What if the events being consumed are different? Also, one cluster is using Aiven, which requires more security settings.
    Carlos De Los Angeles
    @cdelosangeles
    Right now I have two configs that inherit from akka.kafka.consumer. Each config points to one cluster and I create a consumer for each. The Aiven Kafka cluster gets consumed properly but the regular one doesn't. With the same configurations, if I only use one of them (either the Aiven or the regular one) they work without issues; only when I create both consumers do I have issues. I thought maybe some configuration is overwritten? I debugged it and the consumer configs seem to be correct, with the Aiven one having the extra security information.
    Carlos De Los Angeles
    @cdelosangeles
    Here are the configs:
    final Config consumerConfig = system.settings().config().getConfig("akka.kafka.consumer");
    
    ConsumerSettings<String, byte[]> aivenConsumerSettings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new ByteArrayDeserializer())
            .withBootstrapServers("a")
            .withGroupId("test")
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            .withProperty("security.protocol", "SASL_SSL")
            .withProperty("sasl.mechanism", "PLAIN")
            .withProperty("sasl.jaas.config", "...;")
            .withProperty("ssl.enabled.protocols", "TLSv1.2,TLSv1.1,TLSv1")
            .withProperty("ssl.truststore.location", "...");
    
    ActorRef consumerActor1 = system.actorOf(KafkaConsumerActor.props(aivenConsumerSettings));
    
    ConsumerSettings<String, byte[]> consumerSettings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new ByteArrayDeserializer())
            .withBootstrapServers("b")
            .withGroupId("test2")
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
    ActorRef consumerActor = system.actorOf(KafkaConsumerActor.props(consumerSettings));