    ennru
    @ennru:matrix.org
    [m]
    We're about to release Alpakka Kafka 2.0.6, which contains some bug fixes and the latest Testcontainers updates akka/alpakka-kafka#1294
    ennru
    @ennru:matrix.org
    [m]
    Alpakka Kafka 2.0.6 is now available from Maven Central https://doc.akka.io/docs/alpakka-kafka/current/release-notes/2.0.x.html#2-0-6
    Daniel Sebban
    @dsebban_twitter
    Can someone explain this sentence to me: "When a topic-partition is revoked, the corresponding source completes."
    Is there a way to do the opposite: when the source completes, will Kafka revoke the partition?
    1 reply
    I am using a KillSwitch to kill a substream but the partition is not being revoked; is there a way to do it?
    Akshay jha
    @akshayjh
    My consumer is a graphDB and it has a schema defined. My concern is: do I need to create separate topics for each vertexType and EdgeType?
    1 reply
    Thomas
    @thomasschoeftner

    Hi everyone.
    I have a little SNAFU with akka-stream-kafka-testkit; hopefully somebody has an idea.

    We are using akka-stream-kafka and -testkit in version 2.0.5 combined with akka 2.6.10.
    Our Kafka-broker is provided via org.testcontainers : kafka.
    In the integration tests, we use akka.kafka.testkit.scaladsl.TestcontainersKafkaLike to add Kafka-broker support to our specs.

    And here comes the problem:
    When using org.testcontainers : kafka : 1.14.3 everything works as expected and the tests run successfully.
    Unfortunately, that version is incompatible with current Docker Desktop versions (at least on Mac), so we tried to move to 1.15.0 or 1.15.1.
    After the upgrade, the container image version configured via akka.kafka.testkit.testcontainers.confluent-platform-version is ignored and it:test always tries to download 'cp-kafka:latest'.

    Following this, I'd presume that this is an error of org.testcontainers : kafka, but then I saw that the Cloudflow folks are very much using version 1.15.0 and still manage to make the test download and run container image cp-kafka:5.4.3 - see here.
    They are directly setting a container image name and tag, instead of using the akka.kafka.testkit.testcontainers config keys.

    Does akka-stream-kafka-testkit have a compatibility issue with org.testcontainers : kafka : 1.15.x?

    Many thanks (and sorry for the lengthy explanation)!

    2 replies
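
    As a workaround, the image tag can be pinned directly with the Testcontainers API instead of the akka.kafka.testkit.testcontainers config keys. A minimal sketch, assuming testcontainers-java 1.15.x where the DockerImageName-based constructor is available:

    import org.testcontainers.containers.KafkaContainer
    import org.testcontainers.utility.DockerImageName

    // Pin the broker image explicitly rather than relying on
    // akka.kafka.testkit.testcontainers.confluent-platform-version
    val kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
    kafka.start()
    val bootstrapServers = kafka.getBootstrapServers
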
    itshan
    @MadeInChina
    Hi, is there an example for ElasticsearchFlow.createBulk?
    1 reply
    Sean Glover
    @seglo
    Alpakka Kafka 2.0.7 was released (testkit updates only) https://discuss.lightbend.com/t/alpakka-kafka-2-0-7-released/7824
    Harish
    @harishmca_twitter
    Hi team, I am new to Akka and looking for a starting point for a Kafka consumer and producer.
    1 reply
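
    For a starting point, a minimal consume-transform-produce stream looks roughly like the sketch below. Topic names, bootstrap servers and the copy step are placeholders; the complete, authoritative examples are in the Alpakka Kafka documentation at https://doc.akka.io/docs/alpakka-kafka/current/:

    import akka.actor.ActorSystem
    import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerSettings, Subscriptions}
    import akka.kafka.ProducerMessage
    import akka.kafka.scaladsl.{Committer, Consumer, Producer}
    import akka.kafka.scaladsl.Consumer.DrainingControl
    import akka.stream.scaladsl.Keep
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

    implicit val system: ActorSystem = ActorSystem("starter")

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("starter-group")
    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers("localhost:9092")

    // Read from one topic, copy every record to another topic, then commit the offset.
    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
        .map { msg =>
          ProducerMessage.single(
            new ProducerRecord[String, String]("target-topic", msg.record.key, msg.record.value),
            msg.committableOffset)
        }
        .via(Producer.flexiFlow(producerSettings))
        .map(_.passThrough)
        .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
        .mapMaterializedValue(DrainingControl.apply)
        .run()
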
    VarunVats9
    @VarunVats9
    Hi, I'm using CommittablePartitionedSource in my code and wanted to use "group.instance.id" to counter a rebalancing issue.
    But since group.instance.id has to be unique, and CommittablePartitionedSource uses the ConsumerSettings only once, the same group.instance.id would be used every time. Is there any other way?
    1 reply
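
    If the id only needs to be unique per application instance (one CommittablePartitionedSource per node), it can be derived from something instance-specific such as the host name. A minimal sketch under that assumption, relying only on ConsumerSettings.withProperty and Kafka's ConsumerConfig.GROUP_INSTANCE_ID_CONFIG (static membership needs brokers on Kafka 2.3+):

    import java.net.InetAddress

    import akka.actor.ActorSystem
    import akka.kafka.ConsumerSettings
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer

    implicit val system: ActorSystem = ActorSystem("example")

    // Hypothetical scheme: one static membership id per node, stable across restarts.
    val instanceId = s"my-service-${InetAddress.getLocalHost.getHostName}"

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("my-group")
        .withProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId)
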
    VarunVats9
    @VarunVats9
    @seglo : Please can you look at the above query ?
    VarunVats9
    @VarunVats9
    Hi, I'm using CommittablePartitionedSource and facing rebalancing issues. Has anyone faced the same? Can someone suggest a solution?
    Sean Glover
    @seglo
    I replied to your question earlier as a thread ^
    varunvats9
    @varunvats9:matrix.org
    [m]
    seglo (Sean Glover): Actually it looks like the brokers are on an older version, hence group.instance.id didn't work. I'm still figuring out why the rebalancing happens so often; has anyone faced something similar when using CommittablePartitionedSource?
    rbr1589
    @rbr1589

    Hi, I'm using Alpakka for consuming data, processing it and producing it back to Kafka. I want reliable software which processes every piece of data. I thought the committable consumer source of Alpakka would provide me that, where I can commit once I have processed the data. As my flow is a dynamically created graph, I need to split the data and merge it back in the same sequence. I am currently using the Partition and MergeSequence concepts of the graph DSL, which solves my problem. But the materialized value that is provided by the committable source cannot be sent or captured as part of my flow creation. Can someone help me with that? Please find the code below; I use Java as my programming language:
    Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> kafkaMessages =
            Consumer.committableSource( consumerSettings, Subscriptions.topics( topic ) );
    Flow<Pair, Pair, NotUsed> someflow1 = constructFlow1();
    Flow<Pair, Pair, NotUsed> someflow2 = constructFlow2();
    Flow<Pair, Pair, NotUsed> someflow3 = constructFlow3();
    Flow<Pair, Pair, NotUsed> combinedFlow = someflow1.async()
            .via( someflow2.async() )
            .via( someflow3.async() );
    Sink<ConsumerMessage.Committable, CompletionStage<Done>> sink = Committer.sink( committerSettings );
    final RunnableGraph<Consumer.Control> result =
            RunnableGraph.<Consumer.Control>fromGraph(
                    GraphDSL.create(
                            builder -> {
                                UniformFanOutShape<Pair, Pair> partitions =
                                        builder.add(
                                                Partition.create( Pair.class, concurrency, element -> getPartitionNumber() ) );
                                UniformFanInShape<Pair, Pair> output =
                                        builder.add( MergeSequence.create( concurrency, Pair::second ) );

                                builder
                                        .from( builder.add( kafkaMessages.zipWithIndex() ) )
                                        .viaFanOut( partitions );
                                for( int i = 0; i < concurrency; i++ ) {
                                    builder
                                            .from( partitions.out( i ) )
                                            .via( builder.add( combinedFlow.async() ) )
                                            .viaFanIn( output );
                                }

                                // Filter the data by removing null elements, and extract the producer record from the pair
                                builder
                                        .from( output.out() )
                                        .to( builder.add( createProducerRecords ) )
                                        .via( Producer.flexiFlow( producerSettings ) )
                                        .map( m -> m.passThrough() )
                                        .toMat( Committer.sink( committerSettings ), Keep.both() )
                                        .mapMaterializedValue( Consumer::createDrainingControl );
                                return ClosedShape.getInstance();
                            } ) );
    result.run( materializer );

    3 replies
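
    One way to capture the materialized values is to import the Kafka source and the committer sink into the graph itself and combine their materialized values there, instead of calling toMat inside the builder. A minimal sketch in Scala (the Java GraphDSL.create overloads that accept graphs plus a materialized-value combiner work the same way); it assumes a recent Akka 2.6 where GraphDSL.createGraph is available, and it wires the source straight to the committer, leaving out the Partition/MergeSequence stages:

    import akka.Done
    import akka.actor.ActorSystem
    import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
    import akka.kafka.ConsumerMessage.{Committable, CommittableMessage}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl
    import akka.stream.ClosedShape
    import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RunnableGraph}
    import org.apache.kafka.common.serialization.StringDeserializer

    implicit val system: ActorSystem = ActorSystem("example")

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("group")
    val committerSettings = CommitterSettings(system)

    val graph: RunnableGraph[DrainingControl[Done]] =
      RunnableGraph
        .fromGraph(
          GraphDSL.createGraph(
            Consumer.committableSource(consumerSettings, Subscriptions.topics("topic")),
            Committer.sink(committerSettings)
          )(Keep.both) { implicit b => (source, sink) =>
            import GraphDSL.Implicits._
            // The Partition/MergeSequence stages would be wired in between here;
            // this sketch only extracts the offset and commits it.
            source.out ~> Flow[CommittableMessage[String, String]]
              .map(msg => msg.committableOffset: Committable) ~> sink.in
            ClosedShape
          }
        )
        .mapMaterializedValue(DrainingControl.apply)

    val control = graph.run()
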
    Adrian
    @adrian-salajan

    Hello, I would like to continue the stream after passing through the committer, something like the code below.
    I would like a passthrough similar to the Producer.

      Consumer
        .committableSource[String, String](???, ???)
        .mapAsync(2) { msg: CommittableMessage[String, String] =>
          process(msg.record.value()).map(person => (msg, person)) // keep both values
        }
        .via(Committer.flow(???)) // ideally this should take the Committable but pass through the Person
        .map((resultOfProcess: Person) => furtherProcessing(resultOfProcess))

      def process(v: String): Future[Person] = ???
      def furtherProcessing(p: Person) = ???

    Can this be done without Broadcast -> Merge?
    Is it safe to do it with Broadcast(2), one output flow being the Committer flow, the other a simple passthrough (_._2 to get the Person), and then merge (Done, Person) => Person?

    7 replies
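
    The Broadcast(2) idea can be written compactly with alsoTo, which is a Broadcast under the hood. A sketch using the placeholders from the snippet above (an implicit ExecutionContext and the process/furtherProcessing/Person definitions are assumed); note the commit batch may complete before or after the downstream processing of the same element, so this gives at-least-once semantics only:

    import akka.kafka.ConsumerMessage.CommittableOffset
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.stream.scaladsl.Flow

    Consumer
      .committableSource[String, String](???, ???)
      .mapAsync(2)(msg => process(msg.record.value()).map(person => (msg.committableOffset, person)))
      // side channel: commit every offset via a Committer sink (Broadcast under the hood)
      .alsoTo(
        Flow[(CommittableOffset, Person)]
          .map(_._1)
          .to(Committer.sink(???)))
      // the Person keeps flowing downstream
      .map { case (_, person) => furtherProcessing(person) }
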
    Adrian
    @adrian-salajan

    I have something like below, which does not seem to print and does not finish:

    val done = consumer
      .via(Committer.flow(committerSettings))
      .take(3)
      .map(_ => println("xxxxxx"))
      .toMat(Sink.ignore)(Keep.right)
      .run()

    Await.ready(done, 5.seconds) // times out

    I'm wondering what take() means in this case, since the Kafka consumer receives batches of messages.

    The consumer is:

    RestartSource.onFailuresWithBackoff(
      config.restartSource.toAkkaRestartSettings
    )(() =>
      Consumer
        .committableSource(config.toKafkaSettings, Subscriptions.topics(id.value))
        .mapAsync(4) { msg => println(msg); Future.successful(msg) } // these are always printed
    )

    I would expect it to finish due to the take(), but maybe not? Do I need to force completion via a Consumer.Control?

    4 replies
    Pedro Silva
    @gygabyte
    Hello. As per my understanding, 2.0.7 was the last release supporting Scala 2.11, though I have found a bug there with committablePartitionedManualOffsetSource subscriptions. I assume that a fix for a bug raised against 2.0.7 will only be available for Scala 2.12 with the next release, 2.1.x? Thank you.
    5 replies
    elorchane
    @elorchane
    Hello everyone,
    I have a stream that consumes from multiple sources and merges them using Source.combine and MergePrioritized. Each source is a committableSource. I have logic for processing each message and then I use Committer.sink to commit the messages.
    When trying to commit I get the following error:
    java.lang.IllegalArgumentException: requirement failed: CommittableOffset [CommittableOffsetImpl(PartitionOffset(GroupTopicPartition(consumer_1,topic_1,1),29826492),)] committer for groupId [group_id_test] must be same as the other with this groupId.
    Has anyone encountered this problem?
    3 replies
    rbr1589
    @rbr1589

    Hi,
    I am facing an issue with the Alpakka consumer with a plain source, where my consumer stops consuming messages without any errors printed by the Akka stream. I see the below messages being printed:

    2021-03-11 16:50:57,349 INFO  [akka.actor.default-dispatcher-45] org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    [Consumer clientId=consumer-CONSUMER-GRP-2, groupId=CONSUMER-GRP] Attempt to heartbeat failed since group is rebalancing
    
    2021-03-11 16:52:41,710 INFO  [akka.kafka.default-dispatcher-95] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
    [Consumer clientId=consumer-CONSUMER_GRP-1, groupId=CONSUMER-GRP] Revoke previously assigned partitions topic-5, topic-4
    
    2021-03-11 16:52:41,934 INFO  [akka.kafka.default-dispatcher-93] org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    [Consumer clientId=consumer-CONSUMER-GRP-1, groupId=CONSUMER-GRP] Member consumer-CONSUMER-GRP-1-cdfb1446-3781-4605-9e85-739c2fe83439 sending LeaveGroup request to coordinator <IP>:9092 (id: 2147482644 rack: null) due to the consumer is being closed

    I use Akka Streams with the Graph DSL for my data flow to obtain parallelism. I have an Akka cluster with my actors running as a cluster singleton to have a timer that invokes actors remotely. I have multiple topics with the same consumer group name, where all the consumers get unassigned from their consumer ids, and then the stream never consumes any messages. But in the same process I have a plain Java consumer thread without the Graph DSL which does get re-assigned its consumer client id.
    Can someone help me on identifying the issue?

    Jesse Yates
    @jyates
    Hiya! Any word on when the next branch will be cut? There are some changes we would love to integrate into our system.
    3 replies
    pokuri999
    @pokuri999

    I am trying to use Consumer.committablePartitionedSource() and create a stream per partition as shown below:

    public void setup() {
        control = Consumer.committablePartitionedSource(consumerSettings,
                Subscriptions.topics("chat").withPartitionAssignmentHandler(new PartitionAssignmentListener()))
                .mapAsyncUnordered(Integer.MAX_VALUE, pair -> setupSource(pair, committerSettings))
                .toMat(Sink.ignore(), Consumer::createDrainingControl)
                .run(Materializer.matFromSystem(actorSystem));
    }

    private CompletionStage<Done> setupSource(Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed>> pair, CommitterSettings committerSettings) {
        LOGGER.info("SETTING UP PARTITION-{} SOURCE", pair.first().partition());
        return pair.second().mapAsync(1, msg -> CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
                .thenApply(param -> msg.committableOffset()))
                .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
                .runWith(Committer.sink(committerSettings), Materializer.matFromSystem(actorSystem));
    }

    While setting up the source per partition I am using a parallelism value which I want to change based on the number of partitions assigned to the node. I can do that on the first assignment of partitions to the node. But as new nodes join the cluster, assigned partitions are revoked and re-assigned, and this time the stream does not emit the already existing partitions, so I cannot reconfigure the parallelism.

    What are the options I have to control parallelism on each partitioned source on every rebalancing operation?

    Lior Shapsa
    @liorshapsa_twitter
    Hi, is there a way to read each partition from offset 0 to X and make sure the stream ends once all partitions reach the last offset?
    1 reply
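
    A sketch of one way to do that, assuming the end offsets are fetched up front (for example with the plain KafkaConsumer#endOffsets API) and that no partition is empty (an empty partition would never reach its end offset, so the stream would not complete); only Subscriptions.assignmentWithOffset and takeWhile are relied on:

    import akka.actor.ActorSystem
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.{Sink, Source}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import scala.concurrent.Future

    implicit val system: ActorSystem = ActorSystem("replay")

    val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("replay")

    // Read one partition from offset 0 and complete once its last existing offset is reached.
    def readPartition(tp: TopicPartition, endOffset: Long) =
      Consumer
        .plainSource(settings, Subscriptions.assignmentWithOffset(tp -> 0L))
        .takeWhile(_.offset() < endOffset - 1, inclusive = true)

    // Merge all partitions; the whole stream completes when every partition has caught up.
    def replay(endOffsets: Map[TopicPartition, Long]): Future[akka.Done] =
      Source(endOffsets)
        .flatMapMerge(endOffsets.size.max(1), { case (tp, end) => readPartition(tp, end) })
        .runWith(Sink.ignore)
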
    Sean Glover
    @seglo
    Hi. I'm planning to release 2.1.0-RC1 by next Monday and hopefully a final release a week or so after that. Thanks for everyone's patience.
    Maksym Besida
    @mbesida
    Is it expected behavior for the Consumer to reconnect forever if, for example, the brokers were misconfigured?
    2 replies
    Nikhil Arora
    @nikhilaroratgo_gitlab
    I have a use case where I want to commit the offset when I get a success reply from the server. The reply from the server is async and I don't know the order. How can I handle this? This blog https://quarkus.io/blog/kafka-commit-strategies/#the-throttled-strategy talks about my use case. I am wondering if there is anything in Alpakka Kafka for this.
    3 replies
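
    The throttled strategy in that blog maps fairly directly onto mapAsync plus Committer.sink: mapAsync lets several requests be in flight and completes them in any order, but it emits results downstream in the original order, so offsets reach the committer in order. A minimal sketch, with callServer as a hypothetical async call:

    import akka.actor.ActorSystem
    import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl
    import akka.stream.scaladsl.Keep
    import org.apache.kafka.common.serialization.StringDeserializer
    import scala.concurrent.Future

    implicit val system: ActorSystem = ActorSystem("example")
    import system.dispatcher

    def callServer(value: String): Future[Unit] = ??? // hypothetical async call to the server

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group")

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("topic"))
        // Up to 10 requests in flight; replies may arrive out of order,
        // but mapAsync emits downstream in the original order.
        .mapAsync(10)(msg => callServer(msg.record.value()).map(_ => msg.committableOffset))
        .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
        .mapMaterializedValue(DrainingControl.apply)
        .run()
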
    Sean Glover
    @seglo
    Alpakka Kafka 2.1.0-RC1 has been released to sonatype https://discuss.lightbend.com/t/alpakka-kafka-2-1-0-rc1-released/8144
    snwlnx
    @snwlnx
    Hello.
    Why does the API of the partitioned sources accept only AutoSubscription?
    I need manual partition assignment and wanted to use a partitioned source
    to consume and commit partitions independently.
    Could anyone suggest a good solution for this?
    3 replies
    snwlnx
    @snwlnx
    I realized that I could write a separate consumer for each partition.
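
    That approach looks roughly like the sketch below: one committableSource with a manual Subscriptions.assignment per partition, each with its own committer, so partitions are consumed and committed independently. The handle function, topic name and partition count are placeholders:

    import akka.Done
    import akka.actor.ActorSystem
    import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl
    import akka.stream.scaladsl.Keep
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import scala.concurrent.Future

    implicit val system: ActorSystem = ActorSystem("example")
    import system.dispatcher

    def handle(value: String): Future[Done] = ??? // placeholder business logic

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group")
    val committerSettings = CommitterSettings(system)

    // One independently consuming and committing stream per manually assigned partition.
    def runPartition(partition: Int): DrainingControl[Done] =
      Consumer
        .committableSource(consumerSettings, Subscriptions.assignment(new TopicPartition("topic", partition)))
        .mapAsync(1)(msg => handle(msg.record.value()).map(_ => msg.committableOffset))
        .toMat(Committer.sink(committerSettings))(Keep.both)
        .mapMaterializedValue(DrainingControl.apply)
        .run()

    val controls = (0 until 3).map(runPartition)
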
    Tejas Somani
    @tejassomani

    Hello,
    I am trying to use Consumer.committableSource with DrainingControl & RestartSource with backoff for my use case based on this https://github.com/akka/alpakka-kafka/blob/master/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala#L501

    val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)
    val result = RestartSource
      .onFailuresWithBackoff(RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)) {
        () =>
          Consumer
            .committableSource(consumerSettings, Subscriptions.topics(topicName))
            .mapMaterializedValue(c => control.set(c))
            .via(processMessage)
            .map(_.committableOffset)
            .via(Committer.flow(committerSettings))
      }
      .runWith(Sink.ignore)

    val drainingControl = DrainingControl.apply(control.get(), result)
    drainingControl.drainAndShutdown()

    drainingControl.drainAndShutdown() keeps running into the RTE "The correct Consumer.Control has not been assigned, yet".
    Need some help figuring this out. Am I missing something?

    2 replies
    Matthew Smedberg
    @msmedberg:matrix.org
    [m]
    How would I go about asking my consumer which partitions it is currently assigned (for purposes of an application healthcheck)? I've looked at KafkaConsumerActor and MetadataClient, but it looks like the response to a Metadata.GetPartitionsFor(topic) message/method call is all partitions of the topic, not just the ones that my node is assigned. (Also, it looks like the only consumer strategies that support KafkaConsumerActor require a ManualSubscription, where my use-case requires something like Consumer.committablePartitionedSource.)
    2 replies
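
    One option that works with the AutoSubscription used by Consumer.committablePartitionedSource is to track the assignments yourself via a PartitionAssignmentHandler on the subscription. A minimal sketch, assuming an AtomicReference is an acceptable place for a healthcheck to read the state from:

    import java.util.concurrent.atomic.AtomicReference

    import akka.kafka.{RestrictedConsumer, Subscriptions}
    import akka.kafka.scaladsl.PartitionAssignmentHandler
    import org.apache.kafka.common.TopicPartition

    // Holds the partitions currently assigned to this node, for a healthcheck to read.
    val assigned = new AtomicReference[Set[TopicPartition]](Set.empty)

    val trackingHandler = new PartitionAssignmentHandler {
      override def onAssign(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
        assigned.updateAndGet(_ ++ tps)
      override def onRevoke(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
        assigned.updateAndGet(_ -- tps)
      override def onLost(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
        assigned.updateAndGet(_ -- tps)
      override def onStop(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
    }

    val subscription = Subscriptions
      .topics("topic")
      .withPartitionAssignmentHandler(trackingHandler)
    // pass `subscription` to Consumer.committablePartitionedSource as usual
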
    snwlnx
    @snwlnx
    Hello
    I know about the issue with async boundaries akka/alpakka-kafka#1038 and, as I understand it, this is the reason why in my app messages get duplicated and handled in parallel when a rebalance starts and a partition moves to another consumer (I also use manual offset committing, which may be a problem too, but in an eager rebalance there is a stop-processing phase in which no consumer can process events). I know there is no good solution to guarantee that consumers never process in parallel, but what about the solution described in the thread? see thread
    My goal is at-least-once semantics, but excluding simultaneous parallel handling on different consumers.
    7 replies
    Sean Glover
    @seglo
    Alpakka Kafka 2.1.0 final will be released by the end of the week
    Enno Runne
    @ennru
    The 2.1 release is out. Akka 2.6 is now required. https://doc.akka.io/docs/alpakka-kafka/2.1/
    Yuliban
    @yupegom_gitlab

    Hello everyone. A little bit of context: I'm consuming an event from an SQS queue. After some further processing, the output of that stream is going to be a file uploaded to an S3 bucket. I'm facing the issue of removing the message from the queue once the file is uploaded.
    This is the sink uploading the file:

    val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
        S3.multipartUpload("test", "test.json", ContentTypes.`application/json`)

    This is the flow deleting the message:

    val deleteFlow: Flow[Message, Unit, NotUsed] = {
        Flow[Message]
          .map(MessageAction.Delete(_))
          .via(SqsAckFlow(queueUrl))
          .map(_ => ())
      }

    Currently I'm able to save the file in the bucket like this:

    _.asSourceWithContext({ m => m })
          .via(sqsMessageDecoder.flowWithContext[Message])
          .collect({ case Right(account) => account })
          // some more processing
          .asSource
          .map(_._1)
      .to(s3Sink) // this is the sink previously shown
          .run()

    And I could also delete the message:

     _.asSourceWithContext({ m => m })
          .via(decodeMessage)
          .collect({ case Right(account) => account })
          // some more processing
          .asSource
          .map(_._2)
          .via(deleteMessageFlow.flow) // this is the flow previously shown

    I cannot figure out how to connect both; ideally, the removal of the message should be done after the upload.
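
    A minimal sketch of one way to chain them, assuming each message yields exactly one file: run the S3 upload per element inside mapAsync so its Future gates the element, and only then hand the original Message to the delete flow. The render function and the object key naming are placeholders; queueUrl, an implicit ActorSystem (with its dispatcher as ExecutionContext) and an implicit SqsAsyncClient are assumed to be in scope, as they must be for the snippets above:

    import akka.stream.alpakka.s3.scaladsl.S3
    import akka.stream.alpakka.sqs.{MessageAction, SqsSourceSettings}
    import akka.stream.alpakka.sqs.scaladsl.{SqsAckFlow, SqsSource}
    import akka.stream.scaladsl.{Sink, Source}
    import akka.http.scaladsl.model.ContentTypes
    import akka.util.ByteString
    import software.amazon.awssdk.services.sqs.model.Message

    def render(message: Message): ByteString = ??? // placeholder: decode + transform the message body

    SqsSource(queueUrl, SqsSourceSettings())
      .mapAsync(1) { message =>
        // Upload first; the message only continues downstream once the upload future completes.
        Source
          .single(render(message))
          .runWith(S3.multipartUpload("test", s"${message.messageId}.json", ContentTypes.`application/json`))
          .map(_ => message)
      }
      .map(MessageAction.Delete(_))
      .via(SqsAckFlow(queueUrl))
      .runWith(Sink.ignore)
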

    Yuliban
    @yupegom_gitlab

    Hello all!
    So, I'm trying to read a message from an sqs queue:

    val source: Source[Message, NotUsed] = {
        SqsSource("url",
          SqsSourceSettings()
            .withCloseOnEmptyReceive(true)
        )
      }

    And then save it into an S3 bucket:

    source
    .
    ./// some processing
    .to(s3Sink)
    .run()

    This is the s3 sink:

     val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
        S3.multipartUpload("bucket", "file.txt", ContentTypes.`text/xml(UTF-8)`)

    If I keep .withCloseOnEmptyReceive(true) in the source config it works. However, I want the source to keep listening to the queue. If I remove it, the future never completes and the file is never uploaded. Any tip on which approach I could use? Is it possible to restart the source?

    BTW, I tried using an actor: once the stream finishes I send a message to the actor to start the stream again. That comes with unexpected behavior.
    Any feedback is appreciated. Thanks!
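
    One pattern worth trying, since the multipartUpload future only completes when its source completes: keep .withCloseOnEmptyReceive(true) so each pass finishes (and the upload finalises), and wrap the pass in RestartSource.withBackoff, which restarts after completion as well as after failure. A sketch only, assuming an implicit ActorSystem; uploadOnePass stands for whatever SQS-to-S3 pipeline is used above:

    import akka.NotUsed
    import akka.stream.RestartSettings
    import akka.stream.alpakka.s3.MultipartUploadResult
    import akka.stream.scaladsl.{RestartSource, Sink, Source}
    import scala.concurrent.duration._

    def uploadOnePass(): Source[MultipartUploadResult, NotUsed] = ??? // placeholder: SqsSource(...) ... S3 upload per pass

    val restarting =
      RestartSource
        .withBackoff(RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)) { () =>
          // Each pass drains the queue until an empty receive, then completes;
          // withBackoff starts the next pass after the backoff.
          uploadOnePass()
        }
        .runWith(Sink.ignore)
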

    Yuliban
    @yupegom_gitlab
    Do you know what might be the issues one could face when running a graph inside another graph? Something like:
    Source
      .single(value)
      .map { v =>
        Source
          .single(byteStringRepr)
          .to(s3Sink)
          .run()
      }
      .to(s3Sink)
      .run()
    2 replies
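
    The main issues with the snippet above: the inner .run() is fire-and-forget, so there is no backpressure and failures of the inner stream never reach the outer one, and its materialized value is dropped. A sketch of the usual alternative, letting mapAsync wait for the inner stream's materialized Future (names taken from the snippet above; an implicit ActorSystem is assumed):

    import akka.stream.scaladsl.{Sink, Source}

    Source
      .single(value)
      .mapAsync(1) { v =>
        // The outer stream only proceeds once the inner upload has completed,
        // and an inner failure fails the outer stream.
        Source
          .single(byteStringRepr)
          .runWith(s3Sink)
      }
      .runWith(Sink.ignore)
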
    jmw847
    @jmw847

    Hello all,

    We use Alpakka Kafka in connection with Redpanda.
    When we run an integration test with an external dockerized Redpanda to test a consumer, we get the following error:

    Producer

    o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
    javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1
            at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
        at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
        at akka.kafka.ProducerSettings$.createKafkaProducer(ProducerSettings.scala:215)
        at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:434)
        ...

    Consumer

    16:14:35.012 [JobsQueueSpecSystem-akka.kafka.default-dispatcher-16] WARN  o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
    javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-datapool-1
            at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
            at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
        at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
        at akka.kafka.ConsumerSettings$.createKafkaConsumer(ConsumerSettings.scala:237)
        at akka.kafka.ConsumerSettings$.$anonfun$apply$3(ConsumerSettings.scala:111)
        at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$applySettings(KafkaConsumerActor.scala:461)
        at akka.kafka.internal.KafkaConsumerActor.preStart(KafkaConsumerActor.scala:438)
        at akka.actor.Actor.aroundPreStart(Actor.scala:543)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:543)
        at akka.kafka.internal.KafkaConsumerActor.aroundPreStart(KafkaConsumerActor.scala:212)
        at akka.actor.ActorCell.create(ActorCell.scala:637)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:509)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:531)

    This is the consumer configuration:

    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(s"$brokerHost:$brokerPort")
    .withGroupId(brokerGroup)
    .withProperties(
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG  -> "earliest"
    )

    Dependencies:
    Redpanda v21.6.1
    Akka 2.6.0
    Alpakka Kafka 2.0.6
    Specs2 4.6.0

    Do you have any idea why the consumer and producer are not closed/shut down after running the integration test? Thanks!

    Yuliban
    @yupegom_gitlab

    Hello everyone.
    So, something I still don't get when using streams is whether this makes sense:

    someSource
    .mapAsync(1)(aFunctionThatCallsAnExternalService())
    .map(_.sequence) // doesn't compile just an example
    .
    .
    .

    So the type of the aFunctionThatCallsAnExternalService function is:
    aFunctionThatCallsAnExternalService: Future[Either[Failure, List[ImportantThing]]]
    And say this function returns a huge number of ImportantThings.
    What should I do there, should I put this list back into a stream again? Does it make sense to use a
    stream and then have this huge list on which I run this (_.sequence) call?

    9 replies
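
    One common answer is to flatten the list straight back into individual stream elements with mapConcat, so downstream stages see one ImportantThing at a time instead of one huge list. A sketch with the names from the question, with error handling reduced to dropping the Left:

    someSource
      .mapAsync(1)(_ => aFunctionThatCallsAnExternalService())
      .mapConcat {
        // Each element of the list becomes its own stream element,
        // so backpressure applies per ImportantThing rather than per list.
        case Right(importantThings) => importantThings
        case Left(_)                => Nil
      }
      // ...continue processing single ImportantThings here
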
    olgakabirova
    @olgakabirova

    Hello everyone.
    I am quite new to alpakka-kafka and I have a question.
    I need to skip one message from the Kafka topic, but before that I need to know the current offset.
    For skipping I do:

        Consumer
          .committablePartitionedManualOffsetSource(
            defConsumerSettings,
            Subscriptions.topics(config.topic),
            _ => Future.successful(Map(new TopicPartition(config.topic, config.partition) -> config.offset)))
          .flatMapMerge(1, _._2)
          .take(1)
      .map { message =>
        message.committableOffset
      }
          .toMat(Committer.sink(CommitterSettings(actorSystem)))(Keep.right)
          .run()

    How can I know the current offset before skipping?

    Sven
    @iosven
    @olgakabirova Did you figure it out? Just switch the order of the take and the map? Also, you seem to only keep the offset in your map, consider passing downstream more than that.
    @olgakabirova You could also consider using a map followed by a filter, where due to some predicate of your choice the filter would happen to only prevent the first message from passing downstream.
    @olgakabirova Yet another alternative could be https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/drop.html depending on what you want to do.
    Ignasi Marimon-Clos
    @ignasi35
    🚀 We're pleased to announce the PATCH release of Alpakka Kafka: https://github.com/akka/alpakka-kafka/releases/tag/v2.1.1
    brightinnovator
    @brightinnovator
    What open source tools are available to monitor / browse Kafka messages?
    Milo
    @MiloVentimiglia
    Lenses.io, I think.
    But indeed it is not free. You can also monitor Kafka with Grafana and Prometheus.