    Enno
    @ennru
    We've started to work on things for Alpakka Kafka 2.1.0 akka/alpakka-kafka#1211 -- please chime in.
    Elmar Schug
    @kKdH
    Hey guys, can someone point me to an example of how to use the consumer with plain authentication (username and password)?
    Nikhil Arora
    @nikhilaroratgo_gitlab
    @ennru I have a design question and need some suggestions. I use Akka Streams/graphs to read messages from Kafka (the source is created by Alpakka), send them to another service, and, based on the response, mark the offset manually in the sink. The graph of course has many flows between the source and the sink, and these flows contain some logic. How can I handle the case where a message fails in any flow, due to an unhandled exception, a user implementation mistake, or an erroneous message, so that my message processing doesn't get stuck repeatedly re-reading the erroneous message even without a try/catch block? I don't want a try/catch block in each flow, and I also don't want to attach .divertTo (another way to handle this) to each flow. What do you think about this? I am sure this is a common problem and there should be a better way to handle it. Thank you in advance.
    Enno
    @ennru
    As Kafka commits offsets you need to make sure message offsets are considered for committing in order. If Kafka's auto-commit is enough for you (at-most-once) you should be fine with wrapping your processing in a RestartSource.
    3 replies
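    A minimal sketch of that suggestion (the settings, topic name, and businessFlow placeholder are illustrative, assuming at-most-once with the Kafka client's auto-commit is acceptable): if any stage throws, the whole wrapped source, including the Kafka consumer, is recreated with backoff instead of guarding every flow with try/catch.
        import scala.concurrent.duration._
        import akka.actor.ActorSystem
        import akka.kafka.{ConsumerSettings, Subscriptions}
        import akka.kafka.scaladsl.Consumer
        import akka.stream.scaladsl.{Flow, RestartSource, Sink}
        import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
        import org.apache.kafka.common.serialization.StringDeserializer

        object RestartedConsumerSketch extends App {
          implicit val system: ActorSystem = ActorSystem("example")

          // hypothetical consumer settings; auto-commit enabled for at-most-once
          val consumerSettings =
            ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
              .withBootstrapServers("localhost:9092")
              .withGroupId("example-group")
              .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

          // stands in for the many flows between source and sink
          val businessFlow = Flow[ConsumerRecord[String, String]].map(identity)

          // on a failure anywhere in the wrapped stream, consumer and processing restart with backoff
          RestartSource
            .onFailuresWithBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
              Consumer
                .plainSource(consumerSettings, Subscriptions.topics("events"))
                .via(businessFlow)
            }
            .runWith(Sink.ignore)
        }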
    Arsene
    @Tochemey
    Hello folks. I would like to know whether Alpakka offers some retry mechanism for a producer, in case an attempt to push messages to Kafka fails.
    Arsene
    @Tochemey
    Never mind, I can make use of retry.backoff.ms in the Kafka producer config.
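    For reference, a sketch of setting this through ProducerSettings (the values are illustrative): the Kafka producer client then retries failed sends itself, waiting retry.backoff.ms between attempts, before the failure surfaces to the stream.
        import akka.actor.ActorSystem
        import akka.kafka.ProducerSettings
        import org.apache.kafka.clients.producer.ProducerConfig
        import org.apache.kafka.common.serialization.StringSerializer

        implicit val system: ActorSystem = ActorSystem("example")

        val producerSettings =
          ProducerSettings(system, new StringSerializer, new StringSerializer)
            .withBootstrapServers("localhost:9092")
            .withProperty(ProducerConfig.RETRIES_CONFIG, "10")            // client-side retries
            .withProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500")  // wait 500 ms between attempts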
    Sven Ludwig
    @iosven
    I am trying to figure out when Akka 2.6.10 gets released. The maxRestarts improvement to the backoff logic is something I also need with Akka Streams. Is 2.6.10 coming within, say, the next 2 weeks or so?
    Enno
    @ennru
    Yes, Akka 2.6.10 will most likely be released next week.
    Odd Möller
    @odd
    @ennru Is there a general Alpakka gitter room? Or should I post my MQTT related questions here?
    1 reply
    Dawood
    @muhammeddawood
    Hi all, I am new to akka-streams and also Alpakka... I want to group events by key and process the events of each substream in order. I have written the code below and it is not working... can anyone please help me see what mistake I am making?
        CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
                .withMaxBatch(8)
                .withMaxInterval(Duration.ofSeconds(30));
        Consumer.committableSource(consumerSettings, Subscriptions.topics("events"))
                .groupBy(100, msg -> msg.toString())
                .async()
                .mapAsync(1, msg ->
                        CompletableFuture.supplyAsync(() -> {
                            try {
                                Thread.sleep(200);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println(Thread.currentThread() + " " + msg);
                            return CompletableFuture.completedStage(msg);
                        }, actorSystem.dispatcher())
                                .thenApply(param -> msg.committableOffset()))
                .mergeSubstreams()
                .toMat(Committer.sink(committerSettings), Keep.both())
                .withAttributes(ActorAttributes.supervisionStrategy(
                        ex -> Supervision.restart()))
                .mapMaterializedValue(Consumer::createDrainingControl)
                .run(actorSystem);
    5 replies
    Jack Pines
    @grimlor
    I have a micro-service in K8s running Akka Clustering/Management with liveness and readiness checks. I'd like to add health checks for my service's dependencies, including Kafka. Any suggestion as to how this can be done? I could use my admin client to touch Kafka but I'd rather hook into the same pathways the service is using, specifically the Source or similar Alpakka-Kafka component.
    3 replies
    Aleksandar Skrbic
    @aleksandarskrbic
    Hi, I'm using a committableSource and I need at-least-once processing guarantees. I need to filter out some messages from Kafka, so I was wondering how to commit the offsets of the messages that are filtered out?
    5 replies
    My pipeline looks something like this:
        source
          .throttle(25, 1.second)
          .filter(predicate)
          .groupedWithin(25, 5.seconds)
          .mapAsync(1) { batch =>
              processAsync(batch)
          }
          .toMat(Committer.sink(CommitterSettings(actorSystem)))(DrainingControl.apply)
          .run()
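    One way to handle this (a sketch under assumptions, not taken from the replies above; it assumes processAsync returns a Future and that committing the offsets of filtered-out messages is desired): keep every message flowing on a single path to the committer, and only run the real processing for the ones that pass the predicate, so offsets are committed in order.
        source
          .throttle(25, 1.second)
          .groupedWithin(25, 5.seconds)
          .mapAsync(1) { batch =>
            // process only the messages that pass the predicate,
            // but keep every message's offset so it is still committed
            processAsync(batch.filter(predicate))
              .map(_ => batch.map(_.committableOffset))(actorSystem.dispatcher)
          }
          .mapConcat(identity)
          .toMat(Committer.sink(CommitterSettings(actorSystem)))(DrainingControl.apply)
          .run()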
    Odd Möller
    @odd
    @ennru What is the current status of the higher-level api for Alpakka MQTT-streaming?
    Enno
    @ennru
    I refreshed the PR recently but haven't spent more time with it. Suggestions welcome...
    Odd Möller
    @odd
    @ennru Thanks, I'll give it a try and report back on the experience.
    Enno
    @ennru
    @/all A first milestone for 2.1: Alpakka Kafka 2.1.0-M1 (Kafka 2.6.0 client, Akka 2.6.10+) is now available https://doc.akka.io/docs/alpakka-kafka/2.1/release-notes/2.1.x.html
    Sven Ludwig
    @iosven
    Sorry wrong channel, posted now in akka/akka
    Aaron Delaplane
    @AaronDelaplane
    If a consumer group only has a single consumer and that one consumer fails, is the application now unable to consume new records? I.e., would the app need to be restarted and a new connection to the cluster created?
    Enno
    @ennru
    You can restart just the consumer with the same consumer group without restarting the app. That will create a new connection to the Kafka broker.
    Aaron Delaplane
    @AaronDelaplane
    Thank you @ennru
    Carlos De Los Angeles
    @cdelosangeles
    Hello! I am using Akka Streams with Alpakka Kafka and I have to connect to two different Kafka clusters from the same application. What is the right way to do this? Thanks!
    Enno
    @ennru
    There is nothing special to it. Just pass in the different bootstrap server addresses.
    Carlos De Los Angeles
    @cdelosangeles
    Thanks @ennru. What if the events being consumed are different? Also, one cluster is using Aiven, which requires more security settings.
    Carlos De Los Angeles
    @cdelosangeles
    Right now, I have two configs that inherit from akka.kafka.consumer. Each config points to one of the clusters and I create a consumer for each. The Aiven Kafka cluster gets properly consumed but not the regular one. With the same configurations, if I only use one (either the Aiven or the regular one), it works without issues. Only when I create both consumers do I have issues. I thought maybe some configuration was being overwritten? I debugged it and the consumer configs seem to be correct, with the Aiven one having the extra security information.
    Carlos De Los Angeles
    @cdelosangeles
    Here are the configs:
    final Config consumerConfig = system.settings().config().getConfig("akka.kafka.consumer");
    
    ConsumerSettings<String, byte[]> aivenConsumerSettings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new ByteArrayDeserializer())              
                    .withBootstrapServers("a")
                    .withGroupId("test")
                    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                    .withProperty("security.protocol", "SASL_SSL")
                    .withProperty("sasl.mechanism","PLAIN")
                    .withProperty("sasl.jaas.config","...;")
                    .withProperty("ssl.enabled.protocols","TLSv1.2,TLSv1.1,TLSv1")
                    .withProperty("ssl.truststore.location","...");
    
    ActorRef consumerActor1 = system.actorOf(KafkaConsumerActor.props(aivenConsumerSettings));
    
    ConsumerSettings<String, byte[]> consumerSettings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new ByteArrayDeserializer())
            .withBootstrapServers("b")
            .withGroupId("test2")
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
    ActorRef consumerActor = system.actorOf(KafkaConsumerActor.props(consumerSettings));
    Enno
    @ennru
    In most cases you don't need to create the KafkaConsumerActor. Pass the different ConsumerSettings directly to the source factory methods you want to use.
    You may even want to define the settings in application.conf by inheriting from akka.kafka.consumer directly in the file (e.g. as used in https://doc.akka.io/docs/alpakka-kafka/current/discovery.html#configure-consumer-settings ).
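    A sketch of what that could look like in application.conf (the section names and addresses are illustrative): each consumer config inherits the Alpakka Kafka defaults and only overrides what differs per cluster, and is then loaded with something like ConsumerSettings.create(system.settings().config().getConfig("our-app.aiven-consumer"), ...).
        # hypothetical config sections inheriting the Alpakka Kafka consumer defaults
        our-app.aiven-consumer: ${akka.kafka.consumer} {
          kafka-clients {
            bootstrap.servers = "a"
            security.protocol = SASL_SSL
            sasl.mechanism = PLAIN
            # plus sasl.jaas.config, ssl.truststore.location, etc.
          }
        }

        our-app.regular-consumer: ${akka.kafka.consumer} {
          kafka-clients {
            bootstrap.servers = "b"
          }
        }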
    Carlos De Los Angeles
    @cdelosangeles
    Thanks @ennru. I was playing around with akka.kafka.javadsl.Consumer.plainExternalSource and akka.kafka.javadsl.Consumer.plainSource, but I am thinking about going with the second. As you mentioned, the second one does not require an ActorRef. Yes, I had it through configuration at first; I will revert to it. By the way, I just got it to work. The way the test was executing, it was not kicking off the regular consumer, so it was not related to config at all. Thank you again!
    CorruptOffset
    @CorruptOffset
    Hi, is there a channel to ask Alpakka JMS related questions?
    Sathish
    @Sathis
    Is there a way to test a flow which uses a committableSource and Committer.sink? I don't want to rewrite my graph in the spec class and replace the Committer.sink with a TestProbe. I.e., I already have a method which takes a committableSource and a Committer.sink as arguments. I just want to call this method and pass in a mock Committer.sink, or is there a better way to test this? I'm using EmbeddedKafka for testing, as my local environment does not support Testcontainers.
    Thomas
    @thomasschoeftner

    Hi everyone!

    What happens if I have 2 alpakka-kafka Committer.sinkWithOffsetContexts which try to commit the same message offset?
    In our setup, due to the complexity of the stream graph, it could happen that a single offset is committed multiple times (e.g. when messages get multiplied with mapConcat and routed to different sinks).

    Will there be any hiccups or errors if the same offset is committed multiple times?

    Many thanks!

    Enno
    @ennru
    No, that shouldn't be a problem.
    1 reply
    Aleksandar Skrbic
    @aleksandarskrbic
    I'm getting
    Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1003: {}. org.apache.kafka.common.errors.DisconnectException: null
    for huge topics that have ~30 million records, but for smaller topics the consumers are OK. I've checked that the consumer group is registered but inactive.
    Does anyone know what the problem could be?
    Aleksandar Skrbic
    @aleksandarskrbic
    After 2 minutes of throwing Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1003: {}. org.apache.kafka.common.errors.DisconnectException: null, the consumers started to work, so I guess this problem is related to the Kafka cluster and not to the consumers.
    Sathish
    @Sathis
    I have an Alpakka Kafka consumer using Consumer.committableSource and Committer.sink. I use these in a simple flow to do some processing; one of the steps uses a filter, i.e. source.filter(x => condition).map(.....).to(sink).run(). Using a filter in one of the steps means that if the condition is false the message will not pass through, and hence it will not reach the Committer sink for committing. What is the best way to handle this filtering, so that the message still falls through and its offset is committed?
    3 replies
    Oleksandr Bondarenko
    @bondarenko
    Hi guys. I'm using the committablePartitionedSource consumer. AFAIK, when one substream stops, the whole consumer is stopped. Can I somehow have the rest of the substreams continue their processing after this happens?
    Prithvi
    @iprithv

    While executing the Kafka consumer group describe command, I am getting the following error:
    Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:331)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:251)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.

    Any pointers?

    Naggappan
    @naggappan
    Hi team, I am using Kafka 2.6 and MirrorMaker 2.0, i.e. the Connect MirrorMaker binary in the Kafka 2.6 bundle. The issue is that the source is plaintext and the destination has mTLS enabled, and when I use some SSL config it fails for the source connection as well. Can someone point me to the right config to run MirrorMaker with different SSL settings for source and destination?
    ColeG24
    @ColeG24
    I am seeing that my committableSource consumer is silently failing. I can't find any logs on it. I ended up adding an idleTimeout stage to restart the stream if it stops receiving events from the source, which helps somewhat. Certain nodes seem to have this issue a lot (we are running 3 pods), whereas for others it is rare or doesn't happen at all. Any idea why this could be happening?
    Here is the code:
    val rebalanceListener = actorSystem.actorOf(Props(new RebalanceListener))
    val committableSource = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic).withRebalanceListener(rebalanceListener)).idleTimeout(5 minutes)
    
    
    RestartSource.withBackoff(
      minBackoff = minBackoff,
      maxBackoff = maxBackoff,
      randomFactor = randomBackoffFactor
    ) { () =>
      committableSource.groupedWithin(maxBatchSize, batchDelay)
        .watchTermination() {
          case (consumerControl, streamComplete) =>
            logger.info(s" Started Watching Kafka consumer termination $metricPrefix")
            consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer $metricPrefix"))
            streamComplete
              .flatMap { _ =>
                consumerControl.shutdown().map(_ -> logger.info(s"Consumer $metricPrefix SHUTDOWN at ${Instant.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
              }
              .recoverWith {
                case e =>
                  consumerControl.shutdown().map(_ -> logger.error(s"Consumer $metricPrefix SHUTDOWN at ${Instant.now} ERROR:CLOSED FROM UPSTREAM", e))
              }
        }
        .mapAsync(parallelism) { messages =>
          metricService.increment(s"$metricPrefix.received", messages.size.toLong)
          metricService.timeAndReportFailure(s"$metricPrefix.timing", s"error.$metricPrefix.process")(process(messages))
        }
        .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
        .log(s"$metricPrefix-precommit")
        .via(Committer.flow(committerSettings))
    }.runWith(Sink.ignore)
    Enno
    @ennru
    Could the connection from those pods to Kafka be unstable? You may want to try the connection checker https://github.com/akka/alpakka-kafka/blob/491c324f3d678f607bb7474ba303c7ba00a134b8/core/src/main/resources/reference.conf#L127
    ColeG24
    @ColeG24
    I actually tried that, I have the consumer configured to this:
    val consumerSettings: ConsumerSettings[Array[Byte], String] =
      ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
        .withBootstrapServers(kafka_bootstrap_servers)
        .withGroupId(group)
        .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withProperty(MAX_PARTITION_FETCH_BYTES_CONFIG, maxRequestSize.toString)
        .withProperty(ENABLE_AUTO_COMMIT_CONFIG, "false")
        .withCommitTimeout(commitTimeOut)
        .withConnectionChecker(ConnectionCheckerSettings(3, 15.seconds, 2d))
        .withProperties(additionalPropertiesAsMap)
    [screenshot attachment: Screen Shot 2020-11-13 at 10.32.59 AM.png]
    ColeG24
    @ColeG24
    I am also seeing, according to the rebalance listener logs, that certain partitions are sometimes not even assigned.
    I tried to link a screenshot above for it, but if you look at the last log for user_app_state-0, the last event for that partition is it being revoked.
    ColeG24
    @ColeG24
    So basically I have 1 partition with 150,000 lag, 1 with 60,000, and the rest are at 0. Could there be a problem with that particular partition? Is there a good way to debug this?
    Antonio F. Lancho
    @onioman_twitter
    Hi there! I am running into issues when doing mergePrioritized of two Consumer.committableSources. Periodically one of the streams, the one with higher volume and priority, jumps back a number of offsets (in the range of hundreds). I commit the offsets to Kafka directly.
    5 replies
    The same code behaves correctly when there is only a single source.
    Marc Rooding
    @mrooding

    Hi, I'm having occasional CommitTimeoutExceptions. We're using manual offset committing with the Committer sink and default settings. The exception is:

    Kafka commit took longer than: 15 seconds (Ask timed out on [Actor[akka://holmes/system/kafka-consumer-4#-979292278]] after [15000 ms]. Message of type [akka.kafka.internal.KafkaConsumerActor$Internal$Commit]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.)

    We only see this happening when our stream is processing a lot of messages at once. I'm quite confident it has something to do with our internal processing and not with Kafka not keeping up with the number of commits. When I simply consume and commit offsets for the entire topic, it doesn't fail with the timeout. If I enable our entire flow that processes each message, then it starts occurring now and then. I've looked at memory and CPU consumption, but that all seems to be OK. Any clue on how to figure out what internal processing is causing the commit actor to not be able to respond within 15 seconds?

    10 replies