There's no need for a shared KafkaConsumerActor here. Pass the different ConsumerSettings directly to the source factory methods you want to use. You can also define additional consumer configurations in application.conf by inheriting akka.kafka.consumer directly in the file (e.g. as used in https://doc.akka.io/docs/alpakka-kafka/current/discovery.html#configure-consumer-settings ).
Hi everyone!
What happens if I have two alpakka-kafka Committer.sinkWithOffsetContexts which try to commit the same message offset?
In our setup, due to the complexity of the stream graph, a single offset can end up being committed multiple times (e.g. when messages get multiplied with mapConcat and routed to different sinks).
Will there be any hiccups or errors if the same offset is committed multiple times?
Many thanks!
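To make the scenario concrete, here is a minimal sketch of the kind of graph I mean (the topic name, settings stubs and splitIntoParts are made up for illustration): mapConcat duplicates each record's offset, and two separate commit sinks both receive it.
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerMessage, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}

implicit val system: ActorSystem = ???                      // assumed to exist in the real service
val consumerSettings: ConsumerSettings[String, String] = ???
val committerSettings: CommitterSettings = ???
def splitIntoParts(value: String): List[String] = ???       // hypothetical fan-out function

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("events"))
  .mapConcat { msg =>
    // every part produced from one record carries the same CommittableOffset
    splitIntoParts(msg.record.value())
      .map(part => part -> (msg.committableOffset: ConsumerMessage.Committable))
  }
  .alsoTo(Committer.sinkWithOffsetContext[String](committerSettings))  // first commit of the offset
  .runWith(Committer.sinkWithOffsetContext[String](committerSettings)) // same offset committed again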
Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1003: {}. org.apache.kafka.common.errors.DisconnectException: null
While executing the Kafka consumer group describe command I get the following error:
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:331)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:251)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
Any pointers?
val committableSource =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic).withRebalanceListener(rebalanceListener))
    .idleTimeout(5.minutes)

RestartSource
  .withBackoff(
    minBackoff = minBackoff,
    maxBackoff = maxBackoff,
    randomFactor = randomBackoffFactor
  ) { () =>
    committableSource
      .groupedWithin(maxBatchSize, batchDelay)
      .watchTermination() { (consumerControl, streamComplete) =>
        logger.info(s"Started watching Kafka consumer termination $metricPrefix")
        consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer $metricPrefix"))
        streamComplete
          .flatMap { _ =>
            consumerControl
              .shutdown()
              .map(_ => logger.info(s"Consumer $metricPrefix SHUTDOWN at ${Instant.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
          }
          .recoverWith {
            case e =>
              consumerControl
                .shutdown()
                .map(_ => logger.error(s"Consumer $metricPrefix SHUTDOWN at ${Instant.now} ERROR:CLOSED FROM UPSTREAM", e))
          }
      }
      .mapAsync(parallelism) { messages =>
        metricService.increment(s"$metricPrefix.received", messages.size.toLong)
        metricService.timeAndReportFailure(s"$metricPrefix.timing", s"error.$metricPrefix.process")(process(messages))
      }
      .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
      .log(s"$metricPrefix-precommit")
      .via(Committer.flow(committerSettings))
  }
  .runWith(Sink.ignore)
val consumerSettings: ConsumerSettings[Array[Byte], String] =
ConsumerSettings(
actorSystem, new ByteArrayDeserializer, new StringDeserializer
)
.withBootstrapServers(kafka_bootstrap_servers)
.withGroupId(group)
.withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(MAX_PARTITION_FETCH_BYTES_CONFIG, maxRequestSize.toString)
.withProperty(ENABLE_AUTO_COMMIT_CONFIG, "false")
.withCommitTimeout(commitTimeOut)
.withConnectionChecker(ConnectionCheckerSettings(3, 15.seconds, 2d))
.withProperties(additionalPropertiesAsMap)
I'm using mergePrioritized over two Consumer.committableSources. Periodically one of the streams, the one with higher volume and priority, jumps back a number of offsets (in the range of hundreds). I commit the offsets to Kafka directly.
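For reference, a minimal sketch of the shape I mean (topic names and the settings stubs are placeholders; the real streams do more processing before committing):
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}

implicit val system: ActorSystem = ???                      // assumed to exist in the real service
val consumerSettings: ConsumerSettings[String, String] = ???
val committerSettings: CommitterSettings = ???

val highVolume = Consumer.committableSource(consumerSettings, Subscriptions.topics("high-priority-topic"))
val lowVolume  = Consumer.committableSource(consumerSettings, Subscriptions.topics("low-priority-topic"))

highVolume
  .mergePrioritized(lowVolume, 9, 1)   // left (this) side gets priority 9, the right side 1
  .map(_.committableOffset)
  .runWith(Committer.sink(committerSettings))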
Hi, I'm having occasional CommitTimeoutExceptions. We're committing offsets manually using the Committer sink with default settings. The exception is:
Kafka commit took longer than: 15 seconds (Ask timed out on [Actor[akka://holmes/system/kafka-consumer-4#-979292278]] after [15000 ms]. Message of type [akka.kafka.internal.KafkaConsumerActor$Internal$Commit]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.)
We only see this happening when our stream is processing a lot of messages at once. I'm quite confident it has something to do with our internal processing and not with Kafka not keeping up with the number of commits. When I simply consume and commit offsets for the entire topic it doesn't fail with the timeout. If I enable our entire flow that processes each message, it starts occurring now and then. I've looked at memory and CPU consumption but that all seems to be OK. Any clue on how to figure out what internal processing is causing the commit actor to not be able to respond within 15 seconds?
Hi, not sure if this is the right place to ask, but I'm wondering if I could get some feedback on whether alpakka-kafka could be a solution for a service I am architecting.
Basically, the goal of the service (Micronaut framework) is to reactively handle incoming requests (via POST) to connect and deliver messages from a Kafka topic based on a set of options and filters contained in the request body. The response would be a Kotlin Flow that emits the messages from the user-defined Kafka topic that match whatever filters are contained in the request.
I was thinking alpakka-kafka could be a solution for concurrently polling multiple Kafka topics for the multiple users.
Am I anywhere close in thinking this could be a use case? Thank you very much.
Hi everyone.
I have a little SNAFU with akka-stream-kafka-testkit; hopefully somebody has an idea.
We are using akka-stream-kafka and -testkit in version 2.0.5, combined with Akka 2.6.10.
Our Kafka broker is provided via org.testcontainers : kafka.
In the integration tests, we use akka.kafka.testkit.scaladsl.TestcontainersKafkaLike to add Kafka broker support to our specs.
And here comes the problem:
When using org.testcontainers : kafka : 1.14.3 everything works as expected and the tests run successfully.
Unfortunately, that version is incompatible with current Docker Desktop versions (at least on Mac), so we tried to move to 1.15.0 or 1.15.1.
After the upgrade, the container image version configured via akka.kafka.testkit.testcontainers.confluent-platform-version is ignored and it:test always tries to download 'cp-kafka:latest'.
Following this, I'd presume that this is an error in org.testcontainers : kafka, but then I saw that the Cloudflow folks are very much using version 1.15.0 and still manage to make the tests download and run container image cp-kafka:5.4.3 - see here.
They are setting a container image name and tag directly, instead of using the akka.kafka.testkit.testcontainers config keys.
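For comparison, a rough sketch of that "set the image directly" approach, assuming testcontainers-java 1.15.x and a hand-started container (the image tag is just an example); this bypasses the akka.kafka.testkit.testcontainers config keys entirely:
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

// pin the image explicitly instead of relying on the testkit's confluent-platform-version setting
val kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
kafka.start()

// wire the broker into the test's ConsumerSettings/ProducerSettings
val bootstrapServers: String = kafka.getBootstrapServers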
Does akka-stream-kafka-testkit have a compatibility issue with org.testcontainers : kafka : 1.15.x?
Many thanks (and sorry for the lengthy explanation)!
Hi, I'm using Alpakka for consuming data, processing it and producing it back to Kafka. I want reliable software which processes every piece of data, and I thought the committable consumer source of Alpakka would give me that, since I can commit once I have processed the data. As my flow is created as a dynamic graph, I need to split the data and merge it back in the same sequence; I currently use the Partition and MergeSequence concepts of the Graph DSL, which solves that problem. But the materialized value provided by the committable source cannot be passed on or captured as part of my flow creation. Can someone help me with that? Please find the code below; I use Java as my programming language:
Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> kafkaMessages =
    Consumer.committableSource( consumerSettings, Subscriptions.topics( topic ) );

Flow<Pair, Pair, NotUsed> someflow1 = constructFlow1();
Flow<Pair, Pair, NotUsed> someflow2 = constructFlow2();
Flow<Pair, Pair, NotUsed> someflow3 = constructFlow3();
Flow<Pair, Pair, NotUsed> combinedFlow = someflow1.async()
    .via( someflow2.async() )
    .via( someflow3.async() );

Sink<ConsumerMessage.Committable, CompletionStage<Done>> sink = Committer.sink( committerSettings );

// a plain GraphDSL.create( builder -> ... ) materializes NotUsed, so the source's Control is not available here
final RunnableGraph<NotUsed> result =
    RunnableGraph.fromGraph(
        GraphDSL.create(
            builder -> {
              UniformFanOutShape<Pair, Pair> partitions =
                  builder.add(
                      Partition.create( Pair.class, concurrency, element -> getPartitionNumber() ) );
              UniformFanInShape<Pair, Pair> output =
                  builder.add( MergeSequence.create( concurrency, Pair::second ) );

              builder
                  .from( builder.add( kafkaMessages.zipWithIndex() ) )
                  .viaFanOut( partitions );

              for( int i = 0; i < concurrency; i++ ) {
                builder
                    .from( partitions.out( i ) )
                    .via( builder.add( combinedFlow.async() ) )
                    .viaFanIn( output );
              }

              // extract the producer record from the pair, produce it, then commit the passed-through offset;
              // builder.add(...) discards materialized values, so the committable source's Control is not captured here
              builder
                  .from( output.out() )
                  .via( builder.add( createProducerRecords ) )
                  .via( builder.add( Producer.flexiFlow( producerSettings ) ) )
                  .via( builder.add(
                      Flow.<ProducerMessage.Results<String, String, ConsumerMessage.Committable>>create()
                          .map( m -> m.passThrough() ) ) )
                  .to( builder.add( sink ) );

              return ClosedShape.getInstance();
            } ) );

result.run( materializer );
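A minimal Scala sketch (Scala just for brevity; consumerSettings, committerSettings and topic as in the snippet above, the processing section reduced to a stand-in) of one way to keep the source's materialized Control: pass the committable source itself into GraphDSL.create, so its materialized value becomes the graph's materialized value.
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph}

val graph: RunnableGraph[Consumer.Control] =
  RunnableGraph.fromGraph(
    GraphDSL.create(Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))) {
      implicit builder => kafkaMessages =>
        import GraphDSL.Implicits._
        // stand-in for the Partition / MergeSequence / Producer.flexiFlow section of the real graph
        val processing = Flow[CommittableMessage[String, String]].map(identity)
        kafkaMessages ~> processing.map(_.committableOffset) ~> Committer.sink(committerSettings)
        ClosedShape
    }
  )

// the Control now survives graph construction (needs an ActorSystem/Materializer in scope)
val control: Consumer.Control = graph.run()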
Hello, I would like to continue the stream after passing through the committer, something like the below.
I would like a passthrough similar to the Producer's:
Consumer
  .committableSource[String, String](???, ???)
  .mapAsync(2) { msg: CommittableMessage[String, String] =>
    process(msg.record.value()).map(person => (msg, person)) // keep both values
  }
  .via(Committer.flow(???)) // ideally this would take the Committable but pass through the Person
  .map((person: Person) => furtherProcessing(person))

def process(v: String): Future[Person] = ???
def furtherProcessing(p: Person) = ???

Can this be done without Broadcast -> Merge?
Is it safe to do it with Broadcast(2), one output flow being the Committer.flow, the other a simple passthrough (._2 to get the Person), and then merge the (Done, Person) => Person?
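Just to have something concrete to compare against: a sketch of the shape without Broadcast/Merge, assuming it is acceptable to commit only after furtherProcessing has run (process, furtherProcessing, topic and the settings standing in for the ??? placeholders above; the Future combinators need an implicit ExecutionContext in scope):
Consumer
  .committableSource[String, String](consumerSettings, Subscriptions.topics(topic))
  .mapAsync(2) { msg =>
    process(msg.record.value()).map { person =>
      furtherProcessing(person)   // do the follow-up work first...
      msg.committableOffset       // ...and only then hand the offset to the committer
    }
  }
  .runWith(Committer.sink(committerSettings))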
I have something like the below, which does not seem to print and does not finish:
val done = consumer
  .map(_.committableOffset)            // Committer.flow expects Committables
  .via(Committer.flow(committerSettings))
  .take(3)
  .map(_ => println("xxxxxx"))
  .toMat(Sink.ignore)(Keep.right)
  .run()

Await.ready(done, 5.seconds) // times out
I'm wondering what it means to take() in this case, since the Kafka consumer receives batches of messages.
The consumer is:
RestartSource.onFailuresWithBackoff(
  config.restartSource.toAkkaRestartSettings
)(() =>
  Consumer
    .committableSource(config.toKafkaSettings, Subscriptions.topics(id.value))
    .mapAsync(4)(msg => Future { println(msg); msg }) // these are always printed
)
I would expect it to finish due to the take(), but maybe not? Do I need to force completion via a Consumer.Control?
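In case it helps to compare, a sketch (same config and settings names as above) of shutting the consumer down explicitly via its materialized Control rather than relying on take(); note that, as far as I understand, Committer.flow emits one element per committed batch, so take(3) counts commit batches rather than records:
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.scaladsl.{Keep, Sink}

val control =
  Consumer
    .committableSource(config.toKafkaSettings, Subscriptions.topics(id.value))
    .map(_.committableOffset)
    .via(Committer.flow(committerSettings))
    .toMat(Sink.ignore)(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

// later, e.g. at the end of the test:
// control.drainAndShutdown()   // needs an implicit ExecutionContext; stops polling, finishes in-flight commits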
Hi,
I am facing an issue with the Alpakka consumer with a plain source, where my consumer stops consuming messages without any errors printed on the akka-stream side. I see the below messages being printed:
2021-03-11 16:50:57,349 INFO [akka.actor.default-dispatcher-45] org.apache.kafka.clients.consumer.internals.AbstractCoordinator
[Consumer clientId=consumer-CONSUMER-GRP-2, groupId=CONSUMER-GRP] Attempt to heartbeat failed since group is rebalancing
2021-03-11 16:52:41,710 INFO [akka.kafka.default-dispatcher-95] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
[Consumer clientId=consumer-CONSUMER_GRP-1, groupId=CONSUMER-GRP] Revoke previously assigned partitions topic-5, topic-4
2021-03-11 16:52:41,934 INFO [akka.kafka.default-dispatcher-93] org.apache.kafka.clients.consumer.internals.AbstractCoordinator
[Consumer clientId=consumer-CONSUMER-GRP-1, groupId=CONSUMER-GRP] Member consumer-CONSUMER-GRP-1-cdfb1446-3781-4605-9e85-739c2fe83439 sending LeaveGroup request to coordinator <IP>:9092 (id: 2147482644 rack: null) due to the consumer is being closed
I use akka-streams with the Graph DSL for my data flow and to obtain parallelism. I have an Akka cluster with my actors running as a ClusterSingleton to have a timer that invokes actors remotely. I have multiple topics with the same consumer group name; all of the consumer groups get unassigned from their consumer ids, and then the stream never consumes any messages. But in the same process I have a plain Java consumer thread without the Graph DSL which does get re-assigned its consumer client id.
Can someone help me identify the issue?
I am trying to use Consumer.committablePartitionedSource() and to create a stream per partition, as shown below:
public void setup() {
    control = Consumer.committablePartitionedSource(consumerSettings,
            Subscriptions.topics("chat").withPartitionAssignmentHandler(new PartitionAssignmentListener()))
        .mapAsyncUnordered(Integer.MAX_VALUE, pair -> setupSource(pair, committerSettings))
        .toMat(Sink.ignore(), Consumer::createDrainingControl)
        .run(Materializer.matFromSystem(actorSystem));
}

private CompletionStage<Done> setupSource(Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed>> pair, CommitterSettings committerSettings) {
    LOGGER.info("SETTING UP PARTITION-{} SOURCE", pair.first().partition());
    return pair.second()
        .mapAsync(1, msg -> CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
            .thenApply(param -> msg.committableOffset()))
        .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
        .runWith(Committer.sink(committerSettings), Materializer.matFromSystem(actorSystem));
}
While setting up the source per partition I use a parallelism value which I want to adjust based on the number of partitions assigned to the node. I can do that on the first assignment of partitions to the node. But as new nodes join the cluster, assigned partitions are revoked and re-assigned, and this time the stream does not emit the already existing partitions again, so I cannot reconfigure their parallelism.
What options do I have to control the parallelism of each partitioned source on every rebalance?
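One option, sketched in Scala for brevity (the counter and handler names are mine, not part of any existing API): keep a count of the currently assigned partitions up to date from a PartitionAssignmentHandler and read it whenever a per-partition sub-source is set up, instead of fixing the parallelism once at startup.
import java.util.concurrent.atomic.AtomicInteger

import akka.kafka.RestrictedConsumer
import akka.kafka.scaladsl.PartitionAssignmentHandler
import org.apache.kafka.common.TopicPartition

// updated by the consumer on every rebalance
val assignedPartitions = new AtomicInteger(0)

val countingHandler = new PartitionAssignmentHandler {
  override def onAssign(assigned: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    assignedPartitions.addAndGet(assigned.size)
  override def onRevoke(revoked: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    assignedPartitions.addAndGet(-revoked.size)
  override def onLost(lost: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    assignedPartitions.addAndGet(-lost.size)
  override def onStop(currentTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
}

// register it with the subscription and derive the per-source parallelism
// from assignedPartitions.get() inside setupSource:
// Subscriptions.topics("chat").withPartitionAssignmentHandler(countingHandler)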