Hello I have situation where I lost some parts of messages after node restarts
last offset handling message in log 117246
after consumer restarted 117293
and there are no messages in range beetwen in logs
Here is my code
Please give me the hint where I am wrong
I need at least once guarantee
def start(...) =
for {
streamCompletionRef = new AtomicReference[Option[Future[Done]]](None)
assignedPartitionsRef = new AtomicReference(Set[TopicPartition]())
consumer = makePartitionedConsumer(config, settings, controlRef, assignedPartitionsRef, handleMessage)
_ <- Resource.make(startConsumerF(config, consumer, streamCompletionRef))(
_ => shutdownConsumerF(name, controlRef, streamCompletionRef)
)
} yield ()
private def startConsumerF(
config: CommittableConsumerConfig,
consumer: Source[Object, Unit],
streamCompletionRef: AtomicReference[Option[Future[Done]]]
)(implicit mat: Materializer): F[Unit] =
F.delay {
import config._
RestartSource
.onFailuresWithBackoff(minBackoff = backOffSettings.map(_.minBackoff).getOrElse(3).seconds,
maxBackoff = backOffSettings.map(_.maxBackoff).getOrElse(30).seconds,
randomFactor = backOffSettings.map(_.randomFactor).getOrElse(0.2))(() => consumer)
.toMat(Sink.ignore) { case (_, streamCompletion) => streamCompletionRef.set(Some(streamCompletion)) }
.run()
}
private def makePartitionedConsumer[K, V](
config: CommittableConsumerConfig,
consumerSettings: ConsumerSettings[K, V],
controlRef: AtomicReference[Option[Consumer.Control]],
assignedPartitionsRef: AtomicReference[Set[TopicPartition]],
handleMessage: CommittableMessage[K, V] => F[Unit]
)(implicit mat: Materializer, scheduler: Scheduler, logging: Logging[F]): Source[Done, Unit] = {
import config._
def handlePartitionSource(source: Source[CommittableMessage[K, V], NotUsed]): Future[Done] =
source
.mapAsync(maxPartitionParallelism.getOrElse(1)) { msg =>
(logCommittableMessage(msg) >> handleMessage(msg))
.as(msg.committableOffset)
.runWithEmptyContext
.runToFuture
}
.runWith(Committer.sink(CommitterSettings(config.committerConfig)))
val subscriptions = Subscriptions
.topics(topics.toSortedSet.toList: _*)
.withPartitionAssignmentHandler(makePartitionAssignmentHandler(assignedPartitionsRef))
Consumer
.committablePartitionedSource(consumerSettings, subscriptions)
.mapMaterializedValue(control => controlRef.set(Some(control)))
.mapAsyncUnordered[Done](maxParallelism) { case (_, source) => handlePartitionSource(source) }
}
private def shutdownConsumerF(
name: String,
controlRef: AtomicReference[Option[Consumer.Control]],
streamCompletionRef: AtomicReference[Option[Future[Done]]]
)(implicit logger: Logging[F], exec: ExecutionContext): F[Unit] =
for {
_ <- logger.info(s"Stopping Kafka consumer $name")
maybeControl <- F.delay(controlRef.get())
maybeStreamCompletion <- F.delay(streamCompletionRef.get())
_ <- maybeControl
.map2(maybeStreamCompletion) { (control, streamCompletion) =>
F.deferFuture(DrainingControl(control, streamCompletion).drainAndShutdown()).void
}
.getOrElse(F.unit)
.flatMap(_ => logger.error(s"Successfully stopped Kafka consumer $name"))
.onError[Unit, Throwable](e => logger.errorCause(s"Failed to stop Kafka consumer $name", e))
} yield ()
This is about Kafka messages being processed twice when rebalancing happens because an application instance is shut down.
I have a simple application, two instances A0 and A1 consuming messages from one Kafka topic with P partitions. The application uses val control = Consumer
.committablePartitionedSource
to obtain the sources when partition assignment happens. The shutdown sequence appears to work as intended where all streams report to be completed without exceptions. The following code is executed when I shut an instance down: CoordinatedShutdown
.get(system)
.addTask(
CoordinatedShutdown.PhaseActorSystemTerminate,
"Drain streams and close databases"
)(() => { //PhaseBeforeActorSystemTerminate
logger.info("Enter coordinated shutdown")
control
.drainAndShutdown()
.andThen(_ => {
logger.info("Executing custom shutdown task to close databases.")
db.close()
dbRead.close()
()
})
})
When the application instances process messages under the load of e.g. 20-100 messages per second with a partitions count of 2 or 20 (the the examples I have tried) I can consistently reproduce the following scenario:
kill pid
What puzzles me is that I think that the comitting of the Kafka message should really prevent the message from being processed twice.
This is my Committer settings ```committer {
max-batch = 1
max-interval = 2.seconds
parallelism = 1
delivery = "WaitForAck"
when = "OffsetFirstObserved"
}
The flow ends with a Committer.sink.
I am not an expert in the Committer settings, this is where I think I should look next.
I do realize there is more to specify to describe in detail everything, but this may perhaps suffice to sketch the background to my question:
Can I hope to eliminate the race condition making one command sometimes be processed twice when a rebalancing happens?
If not, I would like to understand why, since I think a graceful shutdown should give this guarantee.
max.poll.interval.ms
purposes, but idk. There are also broker failure modes that can lead to Kafka losing committed offsets (just like Kafka in general makes fairly weak guarantees about message durability).
failed authentication due to: [a3c21a3d-fcdb-47c1-8754-e46c0e14bd12]: Too many connects
. We are using a plainSink and calling this method @Singleton
class KafkaProducerHelper @Inject()(appConfig: AppConfig, implicit val actorSystem: ActorSystem) {
val pLogger: Logger = LoggerFactory.getLogger("application")
val producerSettings: ProducerSettings[Array[Byte], Array[Byte]] = ProducerSettings(appConfig.KAFKA_PRODUCER_CONF, new ByteArraySerializer, new ByteArraySerializer)
val plainSinkConnection: Sink[ProducerRecord[Array[Byte], Array[Byte]], Future[Done]] = Producer.plainSink(producerSettings)
def sendAlertsMessages(messages: List[ProducerRecord[Array[Byte], Array[Byte]]]): Future[Option[Done]] = {
pLogger.info("Sending alert messages")
Source(messages).runWith(plainSinkConnection) map {
case Done => Some(Done)
case _ => None
}
}
}
sendAlertMessages
in a recursive fashion as to split up the List
of ProducerRecords
. When we pass a full list into the method, it significantly increases the time it takes to produce
messages however the error messages go away. I was wondering if anyone else encountered this error with MSK and how they approached it
paralellism
down as well and got a little better but still seeing errors
MockConsumer
and MockProducer
and injects those to the ConsumerSettings
and ProducerSettings
(via withConsumerFactory
& withProducer
) with the goal of testing the code that creates the Kafka flow.mockConsumer.paused
, and the solution is to reschedule the same poll task until the consumer is not paused on the topic-partition.
Hi all! I have a newbie question about working with Sinks in conjunction with ProducerMessage.multi
. Essentially, I have a source that is giving me multi-line strings, and I want to send each line as a separate ProducerRecord into Kafka
. Something like this:
val done: Future[Done] =
Source(1 to 100)
.map(num => (0 to num).toList )
.map(_.map(_.toString))
.map(list => ProducerMessage.multi[Nothing, String](list.map(new ProducerRecord("topic", _)).toSeq))
.runWith(Producer.plainSink(producerSettings))
The error I get is:
type mismatch;
found : akka.stream.scaladsl.Sink[org.apache.kafka.clients.producer.ProducerRecord[String,String],scala.concurrent.Future[akka.Done]]
required: akka.stream.Graph[akka.stream.SinkShape[akka.kafka.ProducerMessage.Envelope[Nothing,String,akka.NotUsed]],scala.concurrent.Future[akka.Done]]
Not sure how to make all of the types happy. If anyone can point me in the right direction, it would be much appreciated!
PartitionOffset
per event produced. My events are being created from several events consumed over several partitions, so I need to commit a batch of PartitionOffset
per event. Is this possible?
committablePartitionedSource
so I have access to per-partition offset. How do I terminate the flow after a specific offset has been reached?Consumer.DrainingControl<Done> control =
Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
.mapAsyncUnordered(
maxPartitions,
pair -> {
Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source =
pair.second();
return source
.via(business())
.map(message -> message.committableOffset())
.runWith(Committer.sink(committerSettings), system);
})
.toMat(Sink.ignore(), Consumer::createDrainingControl)
.run(system);
preStart
Consumer.commitWithMetadataPartitionedSource
meant to be used? What does the param metadataFromRecord: Function[ConsumerRecord[K, V], String])
allow you to do? I can't find any examples. I want to stop processing when a message is greater than a given timestamp. I can see that the timestamp is available in the metadataFromRecord
, but how can I use it in the result of commitWithMetadataPartitionedSource
?
The result of metadataFromRecord
is only passed back to Kafka when committing an offset (see https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/OffsetAndMetadata.html). The message timestamp is available in any of the sources which give you a ConsumerRecord
or a CommittableMessage
without needing a commitWithMetadata
source.
For the other committable sources, you would call msg.record.timestamp
to get the timestamp. So given StopAfterTimestamp
, you could .takeWhile { msg => msg.record.timestamp <= StopAfterTimestamp }
LagomApplication
, but I can only see the "starting" log, but not "executing" log (topic has messages being produced). I do not see the consumer group created when checking from broker side. private val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
logger.info("Starting the subscriber")
Consumer
.plainSource(consumerSettings, Subscriptions.topics(ExternalTopic))
.mapAsync(1)(message => {
val request = Json.parse(message.value).as[ExternalRequest]
logger.info("Executing {}", request)
Future.successful(Done)
})
.run()
So I have an interesting problem where when I am subscribing to a Stream using Alpakka Kafka and right at the start of the stream I am using prefixAndTail(1).flatMapConcat
to get the first element however it returns None
even though topics are being sent to the Kafka topic. Interestingly I am not getting this problem with a local Kafka stream that I run with Docker.
Does anyone know in what cases this occurs and also if prefixAndTail(1)
is eager? i.e. will it wait for perpetuity until it happens to get an element or is there some kind of timeout?
Consumer.sourceWithOffsetContext
when the source is assigned multiple partitions, how consumption is managed between partitions. I was under the impression the partitions were consumed from using round-robin distribution but cannot find documentation to back that up (or contradict/refute).
msg
data in the code shown in the following link?msg.record.value
contains some string, then publish otherwise skip etc.Hi all, I'm trying to use a dependency that adds Kinesis KPL support to akka. It has a KPLFlow class to provide that support. I'm relativly new to akka and flows, but my objective would be to have a kafka source, that is already created and have some type of sink, to replace the "native" kinesis sink and use this flow to deliver the records. Is there a way to extract from the flow class this? Or is it possible with just the flow "consume" from the kafka source and deliver to a target stream ?
I've create a stackoverflow question regarding this https://stackoverflow.com/questions/73873966/akka-kafka-source-to-kinesis-sink-using-kpl
Hi everyone. I have an application that receives an api request and relays its to a Kafka API Producer. Each request calls the producer to send a message to Kafka. The producer exists throughout the application lifetime and is shared for all requests.
producer.send(new ProducerRecord[String, String](topic, requestBody))
This works OK. Now I want to use instead, an alpakka Producer for the job. The code looks like this:
val kafkaProducer = producerSettings.createKafkaProducer()
val settingsWithProducer = producerSettings.withProducer(kafkaProducer)
val done = Source.single(requestBody)
.map(value => new ProducerRecord[String, String](topic, value))
.runWith(Producer.plainSink(settingsWithProducer))
What are the advantages of the alpakka Producer over the plain, vanilla Producer? I don't know whether the new approach can help me handle a large number of API requests in order at the same time.
def atMostOnceSource[K, V]: Source[ConsumerRecord[K, V], NotUsed] = {
Consumer
.committableSource[K, V](consumerSettings, Subscriptions.topics(allTopics))
.groupedWithin(maxBatchSize, maxBatchDuration)
.mapAsync(1) { messages: Seq[CommittableMessage[K, V]] =>
val committableOffsetBatch =
CommittableOffsetBatch(messages.map(_.committableOffset))
Source
.single(committableOffsetBatch)
.toMat(Committer.sink(committerSettings))(Keep.right)
.run()
.map(_ => messages)
}
.mapConcat(identity)
}