    Adamos8888
    @Adamos8888
    [image attachment: deadLetter.png]
    Roy Prager
    @roy-prager
    I am consuming from a topic with multiple partitions and using groupedWithin in my graph, which means I need to commit offsets to multiple partitions. What is the best way to do this?
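    One common way to handle this (a minimal Scala sketch; the topic name, group id and settings are illustrative) is to fold each group emitted by groupedWithin into a CommittableOffsetBatch, which can carry offsets for several partitions at once, and hand that batch to Committer.sink:

    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
    import akka.kafka.ConsumerMessage.CommittableOffsetBatch
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl
    import org.apache.kafka.common.serialization.StringDeserializer

    implicit val system: ActorSystem = ActorSystem("example")

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
    val committerSettings = CommitterSettings(system)

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
      .groupedWithin(100, 5.seconds) // batch size / window to match your graph
      .map(group =>
        // one batch may hold offsets from several partitions
        group.foldLeft(CommittableOffsetBatch.empty)((batch, msg) => batch.updated(msg.committableOffset)))
      .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
      .run()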
    Adamos8888
    @Adamos8888
    Hi guys!
    [image attachment: image.png]
    Is there any solution or configuration for suppressing the "Kafka commit is to be retried" logs, perhaps a config setting or something else?
    Ryan Tomczik
    @Tomczik76
    Hi everyone, I'm looking to commit offset batches with each event produced, transactionally. The problem is that it looks like you can only provide one PartitionOffset per produced event. My events are created from several events consumed across several partitions, so I need to commit a batch of PartitionOffsets per event. Is this possible?
    Vishal Bhavsar
    @vbhavsar
    Hi, I'm looking to consume messages from the earliest offset up to a specific offset and then stop consuming (thus stopping the source from emitting more messages). What would be the best way to achieve this? I am using committablePartitionedSource, so I have access to the per-partition offset. How do I terminate the stream after a specific offset has been reached?
    Consumer.DrainingControl<Done> control =
        Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
            .mapAsyncUnordered(
                maxPartitions,
                pair -> {
                  Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source =
                      pair.second();
                  return source
                      .via(business())
                      .map(message -> message.committableOffset())
                      .runWith(Committer.sink(committerSettings), system);
                })
            .toMat(Sink.ignore(), Consumer::createDrainingControl)
            .run(system);
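    One possible approach (a Scala sketch only; targetOffsetFor, maxPartitions, the topic and the settings are illustrative): bound each per-partition sub-source with takeWhile so it completes once the stop offset has been passed, and shut the consumer down afterwards via the DrainingControl:

    import akka.stream.scaladsl.Sink
    // imports, consumerSettings and committerSettings as in the earlier groupedWithin sketch

    def targetOffsetFor(tp: org.apache.kafka.common.TopicPartition): Long = ??? // hypothetical lookup of the stop offset
    val maxPartitions = 8 // illustrative

    val control =
      Consumer
        .committablePartitionedSource(consumerSettings, Subscriptions.topics("my-topic"))
        .mapAsyncUnordered(maxPartitions) { case (topicPartition, source) =>
          val stopOffset = targetOffsetFor(topicPartition)
          source
            // complete this partition's sub-stream once the stop offset has been emitted
            .takeWhile(msg => msg.record.offset <= stopOffset)
            .map(_.committableOffset)
            .runWith(Committer.sink(committerSettings))
        }
        .toMat(Sink.ignore)(DrainingControl.apply)
        .run()

    // The partitioned source itself stays subscribed even after the sub-streams
    // complete, so stop the consumer once all partitions are done, e.g. with
    // control.drainAndShutdown().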
    zyon1
    @zyon1
    Hey, can someone help me? In my stream I have a combination of take and takeWithin, and I am wondering whether takeWithin starts its timer when the first message reaches the takeWithin operator or when the last one does.
    Levi Ramsey
    @leviramsey
    I'm pretty sure takeWithin starts its timer at materialization (i.e. before it has seen a stream element)
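    A small Scala sketch to illustrate (values are arbitrary): if that is right, the combination below completes after roughly 5 seconds counted from materialization, or after 100 elements, whichever bound is reached first:

    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    implicit val system: ActorSystem = ActorSystem("example")

    Source
      .tick(1.second, 1.second, "msg")
      .take(100)             // element-count bound
      .takeWithin(5.seconds) // time bound, measured from stream start (materialization)
      .runWith(Sink.foreach(println))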
    Burak Helvacı
    @burakhelvaci
    Please use Kafka, but leave Kafka Streams as soon as possible.
    Vishal Bhavsar
    @vbhavsar
    Hi, how is the metadata from Consumer.commitWithMetadataPartitionedSource meant to be used? What does the param metadataFromRecord: Function[ConsumerRecord[K, V], String] allow you to do? I can't find any examples. I want to stop processing when a message's timestamp is greater than a given timestamp. I can see that the timestamp is available in metadataFromRecord, but how can I use it in the result of commitWithMetadataPartitionedSource?
    Levi Ramsey
    @leviramsey

    The result of metadataFromRecord is only passed back to Kafka when committing an offset (see https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/OffsetAndMetadata.html). The message timestamp is available in any of the sources which give you a ConsumerRecord or a CommittableMessage without needing a commitWithMetadata source.

    For the other committable sources, you would call msg.record.timestamp to get the timestamp. So given StopAfterTimestamp, you could .takeWhile { msg => msg.record.timestamp <= StopAfterTimestamp }

    The only use case for that metadata that I can see is if you have tooling which consumes the consumer-offsets topic from Kafka (e.g. for observability) and you want to pass metadata, like which hosts are committing offsets, to that tooling.
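    Putting that suggestion into a fuller Scala sketch (StopAfterTimestamp, the topic and the settings are illustrative; timestamps are epoch millis):

    // imports, consumerSettings and committerSettings as in the earlier groupedWithin sketch
    val StopAfterTimestamp: Long = 1620000000000L // hypothetical cut-off

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
      // complete the stream once a record newer than the cut-off arrives; no metadata source needed
      .takeWhile(msg => msg.record.timestamp <= StopAfterTimestamp)
      .map(_.committableOffset)
      .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
      .run()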
    Vishal Bhavsar
    @vbhavsar
    That makes sense. Thank you for such a comprehensive response @leviramsey!
    Harry Tran
    @pertsodian
    Hello, does anyone have an example of using an Alpakka Kafka consumer in a Lagom application? I have this in a class and wire it in a LagomApplication, but I can only see the "Starting" log, not the "Executing" log (the topic has messages being produced). I also do not see the consumer group created when checking from the broker side.
      private val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
    
      logger.info("Starting the subscriber")
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics(ExternalTopic))
        .mapAsync(1)(message => {
          val request = Json.parse(message.value).as[ExternalRequest]
          logger.info("Executing {}", request)
          Future.successful(Done)
        })
        .run()
    cheapsolutionarchitect
    @cheapsolutionarchitect
    Hm, what are the CassandraWriteSettings used for? What's their purpose?
    Matthew de Detrich
    @mdedetrich

    So I have an interesting problem: when I subscribe to a stream using Alpakka Kafka and, right at the start of the stream, use prefixAndTail(1).flatMapConcat to get the first element, it returns None even though messages are being sent to the Kafka topic. Interestingly, I am not getting this problem with a local Kafka instance that I run with Docker.

    Does anyone know in what cases this occurs, and also whether prefixAndTail(1) is eager? I.e. will it wait in perpetuity until it gets an element, or is there some kind of timeout?

    Matthew de Detrich
    @mdedetrich
    So I figured out the issue: it turned out Main was terminating immediately, which caused a shutdown.
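    For anyone hitting the same thing, a minimal Scala sketch of the kind of fix implied here (topic and settings are illustrative): keep Main alive until the stream completes, so the ActorSystem is not torn down before the source emits anything:

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration
    import akka.stream.scaladsl.{Sink, Source}
    // imports and consumerSettings as in the earlier groupedWithin sketch

    val done =
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics("my-topic"))
        .prefixAndTail(1)
        .flatMapConcat { case (head, tail) => Source(head).concat(tail) } // re-attach the first element
        .runWith(Sink.ignore)

    Await.result(done, Duration.Inf) // block Main while the stream runs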
    archieby
    @archieby
    Is there any way to make sure that two messages aimed at two different topics either both end up in those topics or neither does, when sending them with either the SendProducer or any regular streaming producer?
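    One way to get that atomicity (a hedged Scala sketch using the plain Kafka producer's transaction API rather than an Alpakka-specific one; the broker address, topics and transactional id are illustrative): send both records inside a single transaction, so consumers reading with isolation.level=read_committed see both records or neither:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "two-topic-tx") // hypothetical transactional id

    val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
    producer.initTransactions()
    producer.beginTransaction()
    try {
      producer.send(new ProducerRecord[String, String]("topic-a", "key", "value-a"))
      producer.send(new ProducerRecord[String, String]("topic-b", "key", "value-b"))
      producer.commitTransaction() // both records become visible together
    } catch {
      case e: Exception =>
        producer.abortTransaction() // neither record becomes visible
        throw e
    } finally {
      producer.close()
    }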
    Dave Kichler
    @dkichler
    Curious whether the consumption patterns for the Consumer sources are documented anywhere? I'm specifically curious about the semantics of Consumer.sourceWithOffsetContext when the source is assigned multiple partitions, and how consumption is balanced between the partitions. I was under the impression the partitions were consumed from in round-robin fashion, but I cannot find documentation to back that up (or contradict it).
    Sean Kwak
    @cosmir17
    Can I ask how to do a conditional publish based on msg data in the code shown in the following link?
    E.g. if msg.record.value contains some string, then publish; otherwise skip.
    https://github.com/akka/alpakka-kafka/blob/v3.0.0/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala#L241-L251
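    One way to express that (a Scala sketch along the lines of the linked example; topics and settings are illustrative): map to ProducerMessage.single when the predicate matches and to ProducerMessage.passThrough otherwise, so the offset is still committed for skipped records:

    import akka.kafka.{ConsumerMessage, ProducerMessage, ProducerSettings}
    import akka.kafka.scaladsl.Producer
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer
    // other imports, consumerSettings and committerSettings as in the earlier groupedWithin sketch

    val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
      .map { msg =>
        if (msg.record.value.contains("some string"))
          ProducerMessage.single(
            new ProducerRecord[String, String]("target-topic", msg.record.key, msg.record.value),
            msg.committableOffset)
        else
          // skip publishing but still commit the offset
          ProducerMessage.passThrough[String, String, ConsumerMessage.CommittableOffset](msg.committableOffset)
      }
      .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
      .run()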
    Ashish Sharma
    @ashish-sharma09
    Hey, what is the configuration for setting the log.retention duration for a topic within the client settings?
    Ashish Sharma
    @ashish-sharma09
    I guess this has to be done at the time of topic creation from the client?
    Levi Ramsey
    @leviramsey
    Or done through the usual Kafka CLI tools (e.g. kafka-topics.sh)
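    For completeness, a hedged Scala sketch of doing the same programmatically (broker address, topic name and retention value are illustrative): retention is a topic-level broker config (retention.ms), not an Alpakka client setting, so it can be altered with the Kafka AdminClient:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)

    val topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
    val setRetention = new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET) // 7 days

    val configs = new java.util.HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
    configs.put(topic, Collections.singletonList(setRetention))
    admin.incrementalAlterConfigs(configs).all().get()
    admin.close()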
    Koen Dejonghe
    @koen-dejonghe
    Can I use HdfsFlow to write Parquet files to HDFS? If so, how? Thank you.
    BTW, I have GenericRecords in my flow. I could use AvroParquetWriter, but that does not have a RotationStrategy or FilePathGenerator.