CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
    .withMaxBatch(8)
    .withMaxInterval(Duration.ofSeconds(30));

Consumer.committableSource(consumerSettings, Subscriptions.topics("events"))
    .groupBy(100, msg -> msg.toString())
    .async()
    .mapAsync(1, msg ->
        CompletableFuture.supplyAsync(() -> {
              try {
                Thread.sleep(200); // simulate slow per-message processing
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
              System.out.println(Thread.currentThread() + " " + msg);
              return msg; // return the message itself, not a nested CompletionStage
            }, actorSystem.dispatcher())
            .thenApply(param -> msg.committableOffset()))
    .mergeSubstreams()
    .toMat(Committer.sink(committerSettings), Keep.both())
    .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
    .mapMaterializedValue(Consumer::createDrainingControl)
    .run(actorSystem);
source
.throttle(25, 1.second)
.filter(predicate)
.groupedWithin(25, 5.seconds)
.mapAsync(1) { batch =>
processAsync(batch)
}
.toMat(Committer.sink(CommitterSettings(actorSystem)))(DrainingControl.apply)
.run()
final Config consumerConfig = system.settings().config().getConfig("akka.kafka.consumer");
ConsumerSettings<String, byte[]> aivenConsumerSettings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers("a")
.withGroupId("test")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty("security.protocol", "SASL_SSL")
.withProperty("sasl.mechanism","PLAIN")
.withProperty("sasl.jaas.config","...;")
.withProperty("ssl.enabled.protocols","TLSv1.2,TLSv1.1,TLSv1")
.withProperty("ssl.truststore.location","...");
ActorRef consumerActor1 = system.actorOf(KafkaConsumerActor.props(aivenConsumerSettings));
ConsumerSettings<String, byte[]> consumerSettings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers("b")
.withGroupId("test2")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ActorRef consumerActor = system.actorOf(KafkaConsumerActor.props(consumerSettings));
You don't need the KafkaConsumerActor. Pass the different ConsumerSettings directly to the source factory methods you want to use. You can also keep the two configurations as separate sections in application.conf by inheriting akka.kafka.consumer directly in the file (e.g. as used in https://doc.akka.io/docs/alpakka-kafka/current/discovery.html#configure-consumer-settings).
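A minimal sketch of that approach, assuming hypothetical config sections aiven-consumer and local-consumer that inherit akka.kafka.consumer (section names, servers, group ids and the topic are placeholders, not taken from the snippets above):

// application.conf (hypothetical section names inheriting the defaults):
//
//   aiven-consumer: ${akka.kafka.consumer} {
//     kafka-clients {
//       bootstrap.servers = "a"
//       security.protocol = "SASL_SSL"
//     }
//   }
//
//   local-consumer: ${akka.kafka.consumer} {
//     kafka-clients {
//       bootstrap.servers = "b"
//     }
//   }

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

val system = ActorSystem("two-clusters")

// Build one ConsumerSettings per config section and hand each one directly
// to the source factory method -- no KafkaConsumerActor needed.
val aivenSettings =
  ConsumerSettings(system.settings.config.getConfig("aiven-consumer"),
    new StringDeserializer, new ByteArrayDeserializer)
    .withGroupId("test")

val localSettings =
  ConsumerSettings(system.settings.config.getConfig("local-consumer"),
    new StringDeserializer, new ByteArrayDeserializer)
    .withGroupId("test2")

val aivenSource = Consumer.committableSource(aivenSettings, Subscriptions.topics("events"))
val localSource = Consumer.committableSource(localSettings, Subscriptions.topics("events"))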
Hi everyone!
What happens if I have two alpakka-kafka Committer.sinkWithOffsetContexts which try to commit the same message offset?
In our setup, due to the complexity of the stream graph, a single offset could be committed multiple times (e.g. when messages get multiplied with mapConcat and routed to different sinks).
Will there be any hiccups or errors if the same offset is committed multiple times?
Many thanks!
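For illustration, a minimal sketch of the kind of topology described (topic name, settings and the factor-of-two duplication are made up here): mapConcat duplicates each element together with its CommittableOffset context, and the copies end up at two separate committing sinks.

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system: ActorSystem = ActorSystem("duplicate-commits")

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")

val committerSettings = CommitterSettings(system)

Consumer
  .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("events"))
  .mapConcat(record => List(record, record)) // one record becomes two elements sharing one offset
  .asSource
  // the first sink commits the offset...
  .alsoTo(Committer.sinkWithOffsetContext[ConsumerRecord[String, String]](committerSettings))
  // ...and the second sink commits the very same offset again
  .runWith(Committer.sinkWithOffsetContext[ConsumerRecord[String, String]](committerSettings))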
Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1003: {}. org.apache.kafka.common.errors.DisconnectException: null
While executing the Kafka consumer group describe command, I get the following error:
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:331)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:251)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
Any pointers?
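For reference, a rough Scala sketch of what the describe command does under the hood via the Kafka AdminClient (the bootstrap server, group name and timeout below are assumptions); the same "Timed out waiting to send the call" typically surfaces when the AdminClient cannot reach a broker within the request timeout:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000")

val admin = AdminClient.create(props)
try {
  // The describe command collects group metadata and committed offsets.
  val groups  = admin.describeConsumerGroups(Collections.singletonList("test")).all().get()
  val offsets = admin.listConsumerGroupOffsets("test").partitionsToOffsetAndMetadata().get()
  println(groups)
  println(offsets)
} finally {
  admin.close()
}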
val committableSource = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic).withRebalanceListener(rebalanceListener)).idleTimeout(5 minutes)
RestartSource.withBackoff(
minBackoff = minBackoff,
maxBackoff = maxBackoff,
randomFactor = randomBackoffFactor
) { () =>
committableSource.groupedWithin(maxBatchSize, batchDelay)
.watchTermination() {
case (consumerControl, streamComplete) =>
logger.info(s" Started Watching Kafka consumer termination $metricPrefix")
consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer $metricPrefix"))
streamComplete
.flatMap { _ =>
consumerControl.shutdown().map(_ -> logger.info(s"Consumer $metricPrefix SHUTDOWN at ${Instant.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
}
.recoverWith {
case e =>
consumerControl.shutdown().map(_ -> logger.error(s"Consumer $metricPrefix SHUTDOWN at ${Instant.now} ERROR:CLOSED FROM UPSTREAM", e))
}
}
.mapAsync(parallelism) { messages =>
metricService.increment(s"$metricPrefix.received", messages.size.toLong)
metricService.timeAndReportFailure(s"$metricPrefix.timing", s"error.$metricPrefix.process")(process(messages))
}
.map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
.log(s"$metricPrefix-precommit")
.via(Committer.flow(committerSettings))
}.runWith(Sink.ignore)
val consumerSettings: ConsumerSettings[Array[Byte], String] =
ConsumerSettings(
actorSystem, new ByteArrayDeserializer, new StringDeserializer
)
.withBootstrapServers(kafka_bootstrap_servers)
.withGroupId(group)
.withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(MAX_PARTITION_FETCH_BYTES_CONFIG, maxRequestSize.toString)
.withProperty(ENABLE_AUTO_COMMIT_CONFIG, "false")
.withCommitTimeout(commitTimeOut)
.withConnectionChecker(ConnectionCheckerSettings(3, 15.seconds, 2d))
.withProperties(additionalPropertiesAsMap)
I'm using mergePrioritized of two Consumer.committableSources. Periodically one of the streams, the one with higher volume and priority, jumps back a number of offsets (in the range of hundreds). I commit the offsets to Kafka directly.
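A minimal sketch of that kind of setup, with made-up topics, priorities and settings (the real stream graph is more involved):

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system: ActorSystem = ActorSystem("prioritized")

val settings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("prioritized-group")

val highVolume = Consumer.committableSource(settings, Subscriptions.topics("high-priority"))
val lowVolume  = Consumer.committableSource(settings, Subscriptions.topics("low-priority"))

// mergePrioritized favours the left (high-volume) source 9:1 when both
// sources have elements available; the offsets of whatever comes through
// are committed directly.
highVolume
  .mergePrioritized(lowVolume, 9, 1)
  .map(_.committableOffset)
  .runWith(Committer.sink(CommitterSettings(system)))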
Hi, I'm having occasional CommitTimeoutExceptions. We're committing offsets manually using the Committer sink with default settings. The exception is:
Kafka commit took longer than: 15 seconds (Ask timed out on [Actor[akka://holmes/system/kafka-consumer-4#-979292278]] after [15000 ms]. Message of type [akka.kafka.internal.KafkaConsumerActor$Internal$Commit]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.)
We only see this happening when our stream is processing a lot of messages at once. I'm quite confident it has something to do with our internal processing and not with Kafka failing to keep up with the number of commits. When I simply consume and commit offsets for the entire topic, it doesn't fail with the timeout. If I enable our entire flow that processes each message, then it starts occurring now and then. I've looked at memory and CPU consumption, but that all seems to be fine. Any clue on how to figure out what internal processing is causing the commit actor to not be able to respond within 15 seconds?
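For reference, a sketch of the settings involved in that message, assuming the defaults mentioned above; the 15 second limit comes from the consumer's commit timeout, and the batching behaviour of Committer.sink comes from the CommitterSettings (all values and names below are illustrative, not a recommendation):

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings}
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system: ActorSystem = ActorSystem("commit-tuning")

// The 15 second limit in the AskTimeoutException is the consumer's commit
// timeout; it can be set explicitly on the ConsumerSettings.
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // assumed address
    .withGroupId("example-group")           // assumed group id
    .withCommitTimeout(30.seconds)

// Committer.sink batches commits according to these settings; when built with
// CommitterSettings(system) they come from the akka.kafka.committer section.
val committerSettings =
  CommitterSettings(system)
    .withMaxBatch(1000L)
    .withMaxInterval(10.seconds)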