    Chetan Kumar
    @chetankm-cs

    Hi all, I was testing out the performance of Consumer.atMostOnceSource.

    Even with very small messages and no processing I am getting 10 msg/s throughput on my local machine.
    akka/alpakka-kafka#147

    Is there any way to tune this for better performance?

    Enno
    @ennru
    Chetan Kumar
    @chetankm-cs
    @ennru I checked the documentation but was wondering why it is so slow compared to the underlying kafka-consumer.
    We are trying to upgrade our infrastructure, and many of our consumers are not idempotent, so we want to ensure that a minimal number of messages is replayed in case of failure.
    Our throughput requirement is around 100 msg/s but I am getting around 10 msg/s.
    Chetan Kumar
    @chetankm-cs
    @ennru Is there a design doc that you can point me to that explains the workings of KafkaConsumerActor?
    Enno
    @ennru
    @chetankm-cs It is slow as it commits every message and waits until the commit is accepted by the broker.
    Your service is not idempotent, but you can accept losing messages?
    You can read up on Alpakka Kafka design and internals in https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations
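    For comparison, a minimal sketch of the batched committing approach (at-least-once rather than at-most-once; `process` stands in for the application logic, and the broker address and topic are placeholders):
        import akka.actor.ActorSystem
        import akka.kafka.scaladsl.Consumer.DrainingControl
        import akka.kafka.scaladsl.{Committer, Consumer}
        import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
        import org.apache.kafka.clients.consumer.ConsumerRecord
        import org.apache.kafka.common.serialization.StringDeserializer
        import scala.concurrent.Future

        implicit val system: ActorSystem = ActorSystem("example")
        import system.dispatcher

        val consumerSettings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("localhost:9092")
            .withGroupId("example-group")

        // stand-in for the real per-record logic
        def process(record: ConsumerRecord[String, String]): Future[Unit] = Future.unit

        val control =
          Consumer
            .committableSource(consumerSettings, Subscriptions.topics("topic"))
            .mapAsync(1)(msg => process(msg.record).map(_ => msg.committableOffset))
            // offsets are committed in batches instead of one blocking commit per message
            .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply)
            .run()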
    Enno
    @ennru
    We've started to work on things for Alpakka Kafka 2.1.0 akka/alpakka-kafka#1211 -- please chime in.
    Elmar Schug
    @kKdH
    Hey guys, can someone point me to an example of how to use the consumer with plain authentication (username and password)?
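    Not an official example, but a sketch of what SASL/PLAIN settings usually look like (the property names are standard kafka-clients ones; the broker address and credentials are placeholders):
        import akka.actor.ActorSystem
        import akka.kafka.ConsumerSettings
        import org.apache.kafka.common.serialization.StringDeserializer

        implicit val system: ActorSystem = ActorSystem("example")

        val settings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("broker:9092") // placeholder
            .withGroupId("group")
            // SASL_SSL if the listener uses TLS, SASL_PLAINTEXT otherwise
            .withProperty("security.protocol", "SASL_SSL")
            .withProperty("sasl.mechanism", "PLAIN")
            .withProperty(
              "sasl.jaas.config",
              """org.apache.kafka.common.security.plain.PlainLoginModule required username="myuser" password="mypassword";""")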
    Nikhil Arora
    @nikhilaroratgo_gitlab
    @ennru I have a design question and need some suggestions. I use Akka Streams/graphs to read messages from Kafka (the Source is created by Alpakka), send them to another service, and based on the response I mark the offset manually in the Sink. The graph of course has many flows between Source and Sink, and these flows contain some logic. How can I handle the case where a message fails due to an unhandled exception, a user implementation mistake, or an erroneous message in some flow, so that processing doesn't get stuck repeatedly re-reading the erroneous message even without try-catch blocks? I don't want a try-catch block in each flow, and I also don't want to attach .divertTo (another way to handle this) to each flow. What do you think? I am sure this is a common problem and there should be a better way to handle it. Thank you in advance.
    Enno
    @ennru
    As Kafka commits offsets, you need to make sure message offsets are considered for committing in order. If Kafka's auto-commit is enough for you (at-most-once), you should be fine with wrapping your processing in a RestartSource.
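    A sketch of that shape (with a hypothetical `businessLogic` step; auto-commit enabled, so the Kafka client commits offsets on its own):
        import akka.actor.ActorSystem
        import akka.kafka.scaladsl.Consumer
        import akka.kafka.{ConsumerSettings, Subscriptions}
        import akka.stream.scaladsl.{RestartSource, Sink}
        import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
        import org.apache.kafka.common.serialization.StringDeserializer
        import scala.concurrent.Future
        import scala.concurrent.duration._

        implicit val system: ActorSystem = ActorSystem("example")

        val settings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("broker:9092")
            .withGroupId("group")
            .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") // at-most-once

        def businessLogic(record: ConsumerRecord[String, String]): Future[Unit] = Future.unit

        RestartSource
          .onFailuresWithBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
            // a fresh consumer is materialized after every failure
            Consumer
              .plainSource(settings, Subscriptions.topics("topic"))
              .mapAsync(1)(businessLogic)
          }
          .runWith(Sink.ignore)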
    Arsene
    @Tochemey
    Hello folks. I would like to know whether Alpakka offers a retry mechanism for a producer, in case an attempt to push messages to Kafka fails.
    Arsene
    @Tochemey
    Never mind, I can make use of retry.backoff.ms in the Kafka producer config.
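    For reference, a sketch of setting that on ProducerSettings (the values are examples only):
        import akka.actor.ActorSystem
        import akka.kafka.ProducerSettings
        import org.apache.kafka.clients.producer.ProducerConfig
        import org.apache.kafka.common.serialization.StringSerializer

        implicit val system: ActorSystem = ActorSystem("example")

        val producerSettings =
          ProducerSettings(system, new StringSerializer, new StringSerializer)
            .withBootstrapServers("broker:9092")
            // the kafka-clients producer retries internally, waiting retry.backoff.ms between attempts
            .withProperty(ProducerConfig.RETRIES_CONFIG, "5")
            .withProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500")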
    Sven Ludwig
    @iosven
    I am trying to figure out when Akka 2.6.10 gets released. The backoff-logic maxRestarts improvement is something I also need for Akka Streams. Is 2.6.10 coming within, say, the next 2 weeks or so?
    Enno
    @ennru
    Yes, Akka 2.6.10 will most likely be released next week.
    Odd Möller
    @odd
    @ennru Is there a general Alpakka gitter room? Or should I post my MQTT related questions here?
    Dawood
    @muhammeddawood
    Hi all, I am new to Akka Streams and also Alpakka... I want to group events by key and process the events of each substream in order. I have written the code below and it is not working... can anyone please help me spot the mistake I am making?
        CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
                .withMaxBatch(8)
                .withMaxInterval(Duration.ofSeconds(30));
        Consumer.committableSource(consumerSettings, Subscriptions.topics("events"))
                // group by the record key (msg.toString() would give every message its own substream)
                .groupBy(100, msg -> msg.record().key())
                .async()
                .mapAsync(1, msg ->
                        CompletableFuture.supplyAsync(() -> {
                            try {
                                Thread.sleep(200); // simulated processing
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println(Thread.currentThread() + " " + msg);
                            return msg; // return the message itself, not a nested CompletionStage
                        }, actorSystem.dispatcher())
                                .thenApply(m -> m.committableOffset()))
                .mergeSubstreams()
                .toMat(Committer.sink(committerSettings), Keep.both())
                .withAttributes(ActorAttributes.supervisionStrategy(
                        ex -> Supervision.restart()))
                .mapMaterializedValue(Consumer::createDrainingControl)
                .run(actorSystem);
    Jack Pines
    @grimlor
    I have a micro-service in K8s running Akka Clustering/Management with liveness and readiness checks. I'd like to add health checks for my service's dependencies, including Kafka. Any suggestion as to how this can be done? I could use my admin client to touch Kafka but I'd rather hook into the same pathways the service is using, specifically the Source or similar Alpakka-Kafka component.
    Aleksandar Skrbic
    @aleksandarskrbic
    Hi, I'm using a committableSource and I need at-least-once processing guarantees. I need to filter some messages from Kafka, so I was wondering how to commit offsets for messages that are filtered out?
    My pipeline looks something like this:
        source
          .throttle(25, 1.second)
          .filter(predicate)
          .groupedWithin(25, 5.seconds)
          .mapAsync(1) { batch =>
              processAsync(batch)
          }
          .toMat(Committer.sink(CommitterSettings(actorSystem)))(DrainingControl.apply)
          .run()
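    One way to keep at-least-once semantics while filtering (a sketch reusing `source` and `predicate` from above; `processOne` is a hypothetical single-message variant of `processAsync`, and the batching is elided) is to keep filtered-out messages in the stream and commit their offsets without processing them, so commit order is preserved:
        source
          .throttle(25, 1.second)
          .mapAsync(1) { msg =>
            if (predicate(msg)) processOne(msg).map(_ => msg.committableOffset)
            else Future.successful(msg.committableOffset) // filtered out: commit, skip processing
          }
          .toMat(Committer.sink(CommitterSettings(actorSystem)))(DrainingControl.apply)
          .run()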
    Odd Möller
    @odd
    @ennru What is the current status of the higher-level api for Alpakka MQTT-streaming?
    Enno
    @ennru
    I refreshed the PR recently but haven't spent more time with it. Suggestions welcome...
    Odd Möller
    @odd
    @ennru Thanks, I'll give it a try and report back on the experience.
    Enno
    @ennru
    @/all A first milestone for 2.1: Alpakka Kafka 2.1.0-M1 (Kafka 2.6.0 client, Akka 2.6.10+) is now available https://doc.akka.io/docs/alpakka-kafka/2.1/release-notes/2.1.x.html
    Sven Ludwig
    @iosven
    Sorry wrong channel, posted now in akka/akka
    Aaron Delaplane
    @AaronDelaplane
    If a consumer group only has a single consumer and that one consumer fails, is the application now unable to consume new records? I.e., would the app need to be restarted and a new connection to the cluster created?
    Enno
    @ennru
    You can re-start just the consumer with the same consumer group without restarting the app. That will create a new connection to the Kafka broker.
    Aaron Delaplane
    @AaronDelaplane
    Thank you @ennru
    Carlos De Los Angeles
    @cdelosangeles
    Hello! I am using Akka Streams with Alpakka Kafka and I have to connect to two different Kafka clusters from the same application. What is the right way to do this? Thanks!
    Enno
    @ennru
    There is nothing special to it. Just pass in the different bootstrap server addresses.
    Carlos De Los Angeles
    @cdelosangeles
    Thanks @ennru. What if the events being consumed are different? Also, one cluster is using Aiven, which requires more security settings.
    Carlos De Los Angeles
    @cdelosangeles
    Right now, I have two configs that inherit from akka.kafka.consumer. Each config points to one cluster and I create a consumer for each. The Aiven Kafka cluster gets consumed properly, but the regular one does not. With the same configurations, if I use only one (either Aiven or regular) they work without issues; only when I create both consumers do I have issues. I thought maybe some configuration was overwritten? I debugged it, and the consumer configs seem correct, with the Aiven one having the extra security information.
    Carlos De Los Angeles
    @cdelosangeles
    Here are the configs:
    final Config consumerConfig = system.settings().config().getConfig("akka.kafka.consumer");
    
    ConsumerSettings<String, byte[]> aivenConsumerSettings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new ByteArrayDeserializer())              
                    .withBootstrapServers("a")
                    .withGroupId("test")
                    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                    .withProperty("security.protocol", "SASL_SSL")
                    .withProperty("sasl.mechanism","PLAIN")
                    .withProperty("sasl.jaas.config","...;")
                    .withProperty("ssl.enabled.protocols","TLSv1.2,TLSv1.1,TLSv1")
                    .withProperty("ssl.truststore.location","...");
    
    ActorRef consumerActor1 = system.actorOf(KafkaConsumerActor.props(aivenConsumerSettings));
    
    ConsumerSettings<String, byte[]> consumerSettings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new ByteArrayDeserializer())
            .withBootstrapServers("b")
            .withGroupId("test2")
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
    ActorRef consumerActor = system.actorOf(KafkaConsumerActor.props(consumerSettings));
    Enno
    @ennru
    In most cases you don't need to create the KafkaConsumerActor. Pass the different ConsumerSettings directly to the source factory methods you want to use.
    You may even want to pass the settings in application.conf by inheriting from akka.kafka.consumer directly in the file (e.g. as used in https://doc.akka.io/docs/alpakka-kafka/current/discovery.html#configure-consumer-settings ).
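    For illustration, a sketch of that configuration-based approach (the section names aiven-consumer and plain-consumer are made up; Scala shown, ConsumerSettings.create is the javadsl equivalent):
        // application.conf:
        //   aiven-consumer: ${akka.kafka.consumer} {
        //     kafka-clients {
        //       bootstrap.servers = "a"
        //       security.protocol = "SASL_SSL"
        //       // ... the other SASL/SSL properties ...
        //     }
        //   }
        //   plain-consumer: ${akka.kafka.consumer} {
        //     kafka-clients.bootstrap.servers = "b"
        //   }
        val aivenSettings = ConsumerSettings(
          system.settings.config.getConfig("aiven-consumer"),
          new StringDeserializer, new ByteArrayDeserializer)
        val plainSettings = ConsumerSettings(
          system.settings.config.getConfig("plain-consumer"),
          new StringDeserializer, new ByteArrayDeserializer)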
    Carlos De Los Angeles
    @cdelosangeles
    Thanks @ennru. I was playing around with akka.kafka.javadsl.Consumer.plainExternalSource and akka.kafka.javadsl.Consumer.plainSource, and I am going to set up the second one; as you mentioned, it does not require an ActorRef. Yes, I had it through configuration at first; I will revert to that. By the way, I just got it to work: the way the test was executing, it was not kicking off the regular consumer, so it was not related to config at all. Thank you again!
    CorruptOffset
    @CorruptOffset
    Hi, is there a channel to ask Alpakka JMS related questions?
    Sathish
    @Sathis
    Is there a way to test a flow which uses a committableSource and Committer.sink? I don't want to rewrite my graph in the spec class and replace the Committer.sink with a TestProbe; i.e. I already have a method which takes the committableSource and Committer.sink as arguments. I just want to call this method and pass a mock Committer.sink. Or is there a better way to test? I'm using EmbeddedKafka for testing, as my local environment does not support Testcontainers.
    Thomas
    @thomasschoeftner

    Hi everyone!

    What happens if I have 2 alpakka-kafka Committer.sinkWithOffsetContexts which try to commit the same message offset?
    In our setup - due to the complexity of the stream-graph - it could happen that a single offset could be committed multiple times (e.g. when messages get multiplied with mapConcat and routed to different sinks).

    Will there be any hiccups or errors if the same offset is committed multiple times?

    Many thanks!

    Enno
    @ennru
    No, that shouldn't be a problem.
    Aleksandar Skrbic
    @aleksandarskrbic
    I'm getting
    Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1003: {}. org.apache.kafka.common.errors.DisconnectException: null
    for huge topics that have ~30 million records, but for smaller topics the consumers are OK. I've checked that the consumer group is registered but inactive.
    Does anyone know what the problem could be?
    Aleksandar Skrbic
    @aleksandarskrbic
    After 2 minutes of throwing Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1003: {}. org.apache.kafka.common.errors.DisconnectException: null, the consumers started to work, so I guess this problem is related to the Kafka cluster and not the consumers.
    Sathish
    @Sathis
    I have an Alpakka Kafka consumer using Consumer.committableSource and Committer.sink, with some simple flows doing processing. One of the flows uses a filter, i.e. source.filter(x => condition).map(.....).to(sink).run(). Using a filter in one of the steps means that if the condition is false the message does not pass through, and hence never reaches the Committer.sink for committing. What is the best way to handle this filtering so that the message still falls through and is committed?
    Oleksandr Bondarenko
    @bondarenko
    Hi guys. I'm using the committablePartitionedSource consumer. AFAIK when one substream stops, the whole consumer is stopped. Can I somehow have the rest of the substreams continue their processing after this has happened?
    Prithvi
    @iprithv

    While executing the Kafka consumer group describe command I am getting the following error:
    Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:331)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:251)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.

    Any pointers?

    Naggappan
    @naggappan
    Hi team, I am using Kafka 2.6 and MirrorMaker 2.0, i.e. the Connect MirrorMaker binary in the Kafka 2.6 bundle. The issue is that the source is plaintext while the destination has mTLS enabled, and when I add some SSL config it fails for the source connection as well. Can someone point me to the right config to run MirrorMaker with different source and destination SSL settings?
    ColeG24
    @ColeG24
    I am seeing that my committableSource consumer is silently failing; I can't find any logs on it. I ended up adding an idleTimeout stage to restart the stream if it stops receiving events from the source, which helps somewhat. Certain nodes seem to have this issue a lot (we are running 3 pods), whereas on others it is rare or doesn't happen at all. Any idea why this could be happening?
    Here is the code:
    val rebalanceListener = actorSystem.actorOf(Props(new RebalanceListener))
    val committableSource = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic).withRebalanceListener(rebalanceListener)).idleTimeout(5.minutes)

    RestartSource.withBackoff(
      minBackoff = minBackoff,
      maxBackoff = maxBackoff,
      randomFactor = randomBackoffFactor
    ) { () =>
      committableSource.groupedWithin(maxBatchSize, batchDelay)
        .watchTermination() {
          case (consumerControl, streamComplete) =>
            logger.info(s"Started watching Kafka consumer termination $metricPrefix")
            consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer $metricPrefix"))
            streamComplete
              .flatMap { _ =>
                consumerControl.shutdown().map(_ => logger.info(s"Consumer $metricPrefix SHUTDOWN at ${Instant.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
              }
              .recoverWith {
                case e =>
                  consumerControl.shutdown().map(_ => logger.error(s"Consumer $metricPrefix SHUTDOWN at ${Instant.now} ERROR:CLOSED FROM UPSTREAM", e))
              }
        }
        .mapAsync(parallelism) { messages =>
          metricService.increment(s"$metricPrefix.received", messages.size.toLong)
          metricService.timeAndReportFailure(s"$metricPrefix.timing", s"error.$metricPrefix.process")(process(messages))
        }
        .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
        .log(s"$metricPrefix-precommit")
        .via(Committer.flow(committerSettings))
    }.runWith(Sink.ignore)
    Enno
    @ennru
    Could the connection from those pods to Kafka be unstable? You may want to try the connection checker https://github.com/akka/alpakka-kafka/blob/491c324f3d678f607bb7474ba303c7ba00a134b8/core/src/main/resources/reference.conf#L127
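    For illustration, a sketch of enabling the connection checker through configuration (keys as in the linked reference.conf; it is off by default, and the values shown are examples):
        // application.conf:
        //   akka.kafka.consumer.connection-checker {
        //     enable = true
        //     max-retries = 3
        //     check-interval = 15s
        //     backoff-factor = 2.0
        //   }
        // ConsumerSettings created from the default section pick this up, so the
        // source fails when the broker stays unreachable and a wrapping
        // RestartSource can then recover:
        val settings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("broker:9092")
            .withGroupId("group")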