Hi, I'm using Alpakka Kafka to consume data, process it, and produce it back to Kafka. I want reliable software that processes every piece of data, and I thought the committable consumer source of Alpakka would give me that, since I can commit once I have processed the data. As my flow is a dynamically created graph, I need to split the data and merge it back in the same sequence. I'm currently using the Partition and MergeSequence concepts of the Graph DSL, which solves that problem. But the materialized value provided by the committable source cannot be sent or captured as part of my flow creation. Can someone help me with that? Please find the code below; I use Java as my programming language:
`
Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> kafkaMessages = Consumer.committableSource( consumerSettings, Subscriptions.topics( topic ) );
Flow<Pair, Pair, NotUsed> someflow1 = constructFlow1();
Flow<Pair, Pair, NotUsed> someflow2 = constructFlow2();
Flow<Pair, Pair, NotUsed> someflow3 = constructFlow3();
Flow<Pair, Pair, NotUsed> combinedFlow = someflow1.async()
.via( someflow2.async() )
.via( someflow3.async() );
Sink<ConsumerMessage.Committable, CompletionStage<Done>> sink = Committer.sink(committerSettings);
final RunnableGraph<NotUsed> result = // this is the problem: I cannot get a Consumer.Control out of here
RunnableGraph.fromGraph(
GraphDSL.create(
builder -> {
UniformFanOutShape<Pair, Pair> partitions =
builder.add(
Partition.create( Pair.class, concurrency, element -> getPartitionNumber() ) );
UniformFanInShape<Pair, Pair> output =
builder.add( MergeSequence.create( concurrency, Pair::second ) );
builder
.from( builder.add( kafkaMessages.zipWithIndex() ) )
.viaFanOut( partitions );
for( int i = 0; i < concurrency; i++ ) {
builder
.from( partitions.out( i ) )
.via( builder.add( combinedFlow.async() ) )
.viaFanIn( output );
}
// Filter the data by removing null elements, extract the producer record from the pair,
// produce, and commit the pass-through offset. Wired like this, the materialized values
// of the producer/committer stages stay inside the graph, which is exactly what I cannot capture.
builder
    .from( output.out() )
    .via( builder.add( createProducerRecords ) )
    .via( builder.add( Producer.flexiFlow( producerSettings )
        .map( m -> m.passThrough() ) ) )
    .to( builder.add( sink ) );
return ClosedShape.getInstance();
} ) );
result.run( materializer );
`
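For reference, here is a minimal Scala sketch of one way to capture the materialized value (the javadsl GraphDSL has corresponding create overloads that take the graphs plus a combine function): pass the committable source and the committer sink into GraphDSL.create as arguments, combine their materialized values with Keep.both, and map the pair into a DrainingControl outside the builder block. The settings, topic and placeholder wiring are assumptions; the Partition/MergeSequence/Producer stages from the question would replace the placeholder.
import akka.Done
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RunnableGraph}

// Sketch only: consumerSettings, committerSettings and topic are placeholders.
def committedGraph(consumerSettings: ConsumerSettings[String, String],
                   committerSettings: CommitterSettings,
                   topic: String): RunnableGraph[Consumer.DrainingControl[Done]] = {
  val kafkaMessages = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
  val commitSink    = Committer.sink(committerSettings)

  RunnableGraph
    .fromGraph(GraphDSL.create(kafkaMessages, commitSink)(Keep.both) { implicit builder => (src, snk) =>
      import GraphDSL.Implicits._
      // The Partition / MergeSequence / Producer.flexiFlow wiring goes here; the
      // placeholder below just commits every message so the sketch stands alone.
      val toOffset = builder.add(Flow[CommittableMessage[String, String]].map(_.committableOffset))
      src ~> toOffset ~> snk
      ClosedShape
    })
    // The source's Control and the sink's completion are combined outside the builder block.
    .mapMaterializedValue { case (control, done) => Consumer.DrainingControl((control, done)) }
}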
Hello, I would like to continue the stream after passing through the committer, something like the snippet below.
I would like a pass-through similar to what the Producer offers:
Consumer
.committableSource[String, String](???, ???)
.mapAsync(2) { msg: CommittableMessage[String, String] =>
  process(msg.record.value()).map(person => (msg, person)) // keep both values
}
.via(Committer.flow(???)) //ideally this should take the Commitable but passthrough the Person
.map { resultOfProcess: Person => furtherProcessing(resultOfProcess) }
def process(v: String): Future[Person] = ???
def furtherProcessing(p: Person) = ???
Can this be done without Broadcast -> Merge?
Is it safe to do it with Broadcast(2), one output flow being the Committer flow, the other a simple pass-through (._2 to get the Person), and then merge the (Done, Person) => Person?
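Zipping the output of Committer.flow one-to-one against a pass-through branch is risky, because the committer batches offsets (max-batch / max-interval) and emits one Done per committed batch, not per message. One option that avoids an explicit Broadcast/Merge pair, sketched under the assumption that committing concurrently with the further processing is acceptable for your at-least-once needs: keep the offset next to the Person and send a copy of every element to a committing sink with alsoTo. Names such as Person, process and furtherProcessing are the placeholders from the question.
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffset
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{Flow, Sink}
import scala.concurrent.Future

object CommitAndContinueSketch {
  final case class Person(name: String)
  def process(v: String): Future[Person] = ???
  def furtherProcessing(p: Person): Person = ???

  def run(consumerSettings: ConsumerSettings[String, String],
          committerSettings: CommitterSettings,
          topic: String)(implicit system: ActorSystem): Future[Done] = {
    import system.dispatcher
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(2)(msg => process(msg.record.value()).map(person => (msg.committableOffset, person)))
      // Feed a copy of every element to a committing sink, keep going downstream with the Person.
      .alsoTo(
        Flow[(CommittableOffset, Person)]
          .map { case (offset, _) => offset }
          .to(Committer.sink(committerSettings)))
      .map { case (_, person) => furtherProcessing(person) }
      .runWith(Sink.ignore)
  }
}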
I have something like the snippet below, which does not seem to print and does not finish:
val done = consumer.via(Committer.flow(committerSettings))
  .take(3)
  .map(_ => println("xxxxxx"))
  .toMat(Sink.ignore)(Keep.right)
  .run() // without run() there is no Future to await
Await.ready(done, 5.seconds) // times out
I'm wondering what it means to take() in this case, since the Kafka consumer receives batches of messages.
The consumer is:
RestartSource.onFailuresWithBackoff(
config.restartSource.toAkkaRestartSettings
)(() =>
Consumer
.committableSource(config.toKafkaSettings, Subscriptions.topics(id.value))
.mapAsync(4) { msg => println(msg); Future.successful(msg) } // these are always printed
I would expect it to finish due to the take(), but maybe not? Do I need to force completion via a Consumer.Control?
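If it helps: Committer.flow aggregates offsets according to CommitterSettings (max-batch / max-interval) and emits one Done per committed batch, which would explain why take(3) never sees three elements and the 5-second await times out; with default settings the first commit may only be flushed after max-interval. Also note the snippet above needs a .run() to produce a Future at all (added there). For debugging, a sketch that forces one commit, and hence one emitted Done, per message:
import akka.actor.ActorSystem
import akka.kafka.CommitterSettings
import scala.concurrent.duration._

// `system` is assumed to be the ActorSystem already running the consumer.
def perMessageCommitterSettings(system: ActorSystem): CommitterSettings =
  CommitterSettings(system)
    .withMaxBatch(1L)            // emit one commit (and one Done) per offset
    .withMaxInterval(100.millis) // flush quickly even when traffic is slow
For production you would normally leave the batching on and not rely on a one-to-one Done-per-message correspondence.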
Hi,
I am facing an issue with the Alpakka consumer with a plain source, where my consumer stops consuming messages without any errors printed by the Akka stream. I see the messages below being printed:
2021-03-11 16:50:57,349 INFO [akka.actor.default-dispatcher-45] org.apache.kafka.clients.consumer.internals.AbstractCoordinator
[Consumer clientId=consumer-CONSUMER-GRP-2, groupId=CONSUMER-GRP] Attempt to heartbeat failed since group is rebalancing
2021-03-11 16:52:41,710 INFO [akka.kafka.default-dispatcher-95] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
[Consumer clientId=consumer-CONSUMER_GRP-1, groupId=CONSUMER-GRP] Revoke previously assigned partitions topic-5, topic-4
2021-03-11 16:52:41,934 INFO [akka.kafka.default-dispatcher-93] org.apache.kafka.clients.consumer.internals.AbstractCoordinator
[Consumer clientId=consumer-CONSUMER-GRP-1, groupId=CONSUMER-GRP] Member consumer-CONSUMER-GRP-1-cdfb1446-3781-4605-9e85-739c2fe83439 sending LeaveGroup request to coordinator <IP>:9092 (id: 2147482644 rack: null) due to the consumer is being closed
I use Akka Streams with the Graph DSL for my data flow to obtain parallelism. I have an Akka cluster with my actors running as a ClusterSingleton, which has a timer to invoke actors remotely. I have multiple topics sharing the same consumer group name; all of those consumers get unassigned from their consumer client ids and then the stream never consumes any messages. But in the same process I have a plain Java consumer thread, without the Graph DSL, which does get reassigned its consumer client id.
Can someone help me on identifying the issue?
I am trying to use Consumer.committablePartitionedSource() and create a stream per partition as shown below:
public void setup() {
control = Consumer.committablePartitionedSource(consumerSettings,
Subscriptions.topics("chat").withPartitionAssignmentHandler(new PartitionAssignmentListener()))
.mapAsyncUnordered(Integer.MAX_VALUE, pair -> setupSource(pair, committerSettings))
.toMat(Sink.ignore(), Consumer::createDrainingControl)
.run(Materializer.matFromSystem(actorSystem));
}
private CompletionStage<Done> setupSource(
    Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed>> pair,
    CommitterSettings committerSettings) {
  LOGGER.info("SETTING UP PARTITION-{} SOURCE", pair.first().partition());
  return pair.second()
      .mapAsync(1, msg ->
          CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
              .thenApply(param -> msg.committableOffset()))
      .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
      .runWith(Committer.sink(committerSettings), Materializer.matFromSystem(actorSystem));
}
While setting up the source per partition I use a parallelism value that I want to change based on the number of partitions assigned to the node. I can do that on the first assignment of partitions to the node, but as new nodes join the cluster, assigned partitions are revoked and reassigned. At that point the stream does not emit the already existing partitions again, so I cannot reconfigure their parallelism.
What are the options I have to control parallelism on each partitioned source on every rebalancing operation?
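One option, sketched rather than battle-tested: track the currently assigned partitions inside the PartitionAssignmentHandler you already register, and let each setupSource read that count when it picks its parallelism (only sources materialized after a rebalance will see the new value, which matches the behaviour you describe). Scala shown; the javadsl handler has the same callbacks.
import java.util.concurrent.ConcurrentHashMap
import akka.kafka.RestrictedConsumer
import akka.kafka.scaladsl.PartitionAssignmentHandler
import org.apache.kafka.common.TopicPartition

// Shared, thread-safe view of the partitions currently assigned to this node.
class AssignmentTracker extends PartitionAssignmentHandler {
  private val assigned = ConcurrentHashMap.newKeySet[TopicPartition]()

  def assignedCount: Int = assigned.size()

  override def onAssign(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    tps.foreach(assigned.add)
  override def onRevoke(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    tps.foreach(assigned.remove)
  override def onLost(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    tps.foreach(assigned.remove)
  override def onStop(tps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
}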
Hello,
I am trying to use Consumer.committableSource
with DrainingControl & RestartSource with backoff for my use case based on this https://github.com/akka/alpakka-kafka/blob/master/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala#L501
val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)
val result = RestartSource
.onFailuresWithBackoff(RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)) {
() =>
Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topicName))
.mapMaterializedValue(c => control.set(c))
.via(processMessage)
.map(_.committableOffset)
.via(Committer.flow(committerSettings))
}
.runWith(Sink.ignore)
val drainingControl = DrainingControl.apply(control.get(), result)
drainingControl.drainAndShutdown()
drainingControl.drainAndShutdown() keeps running into an RTE: "The correct Consumer.Control has not been assigned, yet."
Need some help figuring this out. Am I missing something?
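One thing worth checking: DrainingControl.apply(control.get(), result) reads the AtomicReference once, right after the stream is started, when it may still hold the initial Consumer.NoopControl (or a control from an already-restarted incarnation). The linked example only dereferences the control when it actually shuts down; a sketch of that, assuming control, result and system as defined above:
import akka.Done
import scala.concurrent.Future

// Assumption: `control: AtomicReference[Consumer.Control]`, `result: Future[Done]`
// and `system` are the values from the snippet above.
def shutdownStream(): Future[Done] = {
  import system.dispatcher
  control.get().drainAndShutdown(result) // read the reference now, not at startup
}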
I have been looking at KafkaConsumerActor and MetadataClient, but it looks like the response to a Metadata.GetPartitionsFor(topic) message/method call is all partitions of the topic, not just the ones that my node is assigned. (Also, it looks like the only consumer strategies that support a KafkaConsumerActor require a ManualSubscription, whereas my use case requires something like Consumer.committablePartitionedSource.)
Hello everyone. A little bit of context: I'm consuming an event from an SQS queue. After some further processing, the output of that stream is going to be a file uploaded to an S3 bucket. I'm facing the issue of removing the message from the queue once the file is uploaded.
This is the sink uploading the file:
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload("test", "test.json", ContentTypes.`application/json`)
This is the flow deleting the message:
val deleteFlow: Flow[Message, Unit, NotUsed] = {
Flow[Message]
.map(MessageAction.Delete(_))
.via(SqsAckFlow(queueUrl))
.map(_ => ())
}
Currently I'm able to save the file in the bucket like this:
_.asSourceWithContext({ m => m })
.via(sqsMessageDecoder.flowWithContext[Message])
.collect({ case Right(account) => account })
// some more processing
.asSource
.map(_._1)
.to(s3Sink) // this is the sink previously shown
.run()
And I could also delete the message:
_.asSourceWithContext({ m => m })
.via(decodeMessage)
.collect({ case Right(account) => account })
// some more processing
.asSource
.map(_._2)
.via(deleteMessageFlow.flow) // this is the flow previously shown
I cannot figure out how to connect both; ideally, the removal of the message should happen only after the upload.
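A sketch of one way to stitch the two together, assuming one uploaded object per SQS message and that the queue URL, bucket, object key and payload rendering below are placeholders: run the upload as an inner stream inside mapAsync, keep the original Message alongside, and only turn it into a MessageAction.Delete once the upload's future has completed. The same idea works with your context-propagating variant by carrying the Message in the context.
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.sqs.MessageAction
import akka.stream.alpakka.sqs.scaladsl.{SqsAckSink, SqsSource}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import scala.concurrent.Future

object UploadThenDeleteSketch {
  def run(queueUrl: String, bucket: String)(implicit system: ActorSystem,
                                            sqs: SqsAsyncClient): Future[Done] = {
    import system.dispatcher
    SqsSource(queueUrl)
      .mapAsync(1) { msg =>
        Source
          .single(ByteString(msg.body()))      // placeholder: the decoded/processed payload
          .runWith(S3.multipartUpload(bucket, s"${msg.messageId()}.json", ContentTypes.`application/json`))
          .map(_ => msg)                       // keep the original Message for the ack
      }
      .map(MessageAction.Delete(_))            // delete only after the upload completed
      .runWith(SqsAckSink(queueUrl))
  }
}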
Hello all!
So, I'm trying to read a message from an SQS queue:
val source: Source[Message, NotUsed] = {
SqsSource("url",
SqsSourceSettings()
.withCloseOnEmptyReceive(true)
)
}
And then save it into an S3 bucket:
source
  // ... some processing
  .to(s3Sink)
  .run()
This is the s3 sink:
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload("bucket", "file.txt", ContentTypes.`text/xml(UTF-8)`)
If I keep .withCloseOnEmptyReceive(true) in the source config it works. However, I want the source to keep listening to the queue at all times. If I remove it, the future never completes and the file is never uploaded. Any tip on what approach I could use? Is it possible to restart the source?
BTW, I tried using an actor: once the stream finishes, I send a message to the actor to start the stream again, but that comes with unexpected behavior.
Any feedback is appreciated. Thanks!
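One approach worth considering, sketched under the assumption that several smaller S3 objects are acceptable instead of a single ever-growing one: keep the SQS source running forever (no closeOnEmptyReceive) and complete one multipart upload per batch, so each upload's future can finish even though the stream never completes. bucket, keyFor and toBytes are placeholders.
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import scala.concurrent.Future
import scala.concurrent.duration._

object BatchedUploadSketch {
  def toBytes(m: Message): ByteString = ByteString(m.body())                               // placeholder serialization
  def keyFor(batch: Seq[Message]): String = s"batch-${System.currentTimeMillis()}.json"    // placeholder naming

  def run(queueUrl: String, bucket: String)(implicit system: ActorSystem,
                                            sqs: SqsAsyncClient): Future[Done] =
    SqsSource(queueUrl)                 // no closeOnEmptyReceive: keeps listening
      .groupedWithin(1000, 30.seconds)  // one S3 object per size/time window
      .mapAsync(1) { batch =>
        Source(batch)
          .map(toBytes)
          .runWith(S3.multipartUpload(bucket, keyFor(batch), ContentTypes.`application/json`))
      }
      .runWith(Sink.ignore)
}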
Hello all,
We use Alpakka Kafka in connection with Redpanda.
When we run an integration test against an external dockerized Redpanda to test a consumer, we get the following error:
Producer
o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1
at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
at akka.kafka.ProducerSettings$.createKafkaProducer(ProducerSettings.scala:215)
at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:434)
...
Consumer
16:14:35.012 [JobsQueueSpecSystem-akka.kafka.default-dispatcher-16] WARN o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-datapool-1
at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at akka.kafka.ConsumerSettings$.createKafkaConsumer(ConsumerSettings.scala:237)
at akka.kafka.ConsumerSettings$.$anonfun$apply$3(ConsumerSettings.scala:111)
at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$applySettings(KafkaConsumerActor.scala:461)
at akka.kafka.internal.KafkaConsumerActor.preStart(KafkaConsumerActor.scala:438)
at akka.actor.Actor.aroundPreStart(Actor.scala:543)
at akka.actor.Actor.aroundPreStart$(Actor.scala:543)
at akka.kafka.internal.KafkaConsumerActor.aroundPreStart(KafkaConsumerActor.scala:212)
at akka.actor.ActorCell.create(ActorCell.scala:637)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:509)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:531)
This is the consumer configuration:
ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(s"$brokerHost:$brokerPort")
.withGroupId(brokerGroup)
.withProperties(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
)
Dependencies:
Redpanda v21.6.1
Akka 2.6.0
Alpakka Kafka 2.0.6
Specs2 4.6.0
Do you have any idea why the consumer and producer are not closed/shut down after the integration test has run? Thanks!
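The InstanceAlreadyExistsException itself is just the client failing to register its JMX app-info MBean because another KafkaConsumer/KafkaProducer with the same client.id is still registered, so it usually points at a client from a previous test (or a second stream in the same test) that was never closed. One thing worth trying, as a sketch: keep the Consumer.Control of the stream the spec starts and shut it down in the Specs2 teardown so the underlying clients are closed between examples (or give each client an explicit, distinct client.id). Settings and topic below are the values from your spec.
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{Keep, Sink}
import scala.concurrent.Await
import scala.concurrent.duration._

class ConsumerTeardownSketch(system: ActorSystem,
                             consumerSettings: ConsumerSettings[String, String],
                             topic: String) {
  implicit private val sys: ActorSystem = system

  // Keep the control instead of discarding the materialized value.
  private val control: Consumer.Control =
    Consumer
      .plainSource(consumerSettings, Subscriptions.topics(topic))
      .toMat(Sink.ignore)(Keep.left)
      .run()

  // Call this from the spec's afterAll/step so the KafkaConsumer is closed
  // (and its MBean unregistered) before the next test starts another client.
  def shutdown(): Done = Await.result(control.shutdown(), 10.seconds)
}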
Hello everyone.
So, something I still don't get when using streams is whether this makes sense:
someSource
.mapAsync(1)(aFunctionThatCallsAnExternalService())
.map(_.sequence) // doesn't compile just an example
  // ... more stages
So the type of aFunctionThatCallsAnExternalService is: Future[Either[Failure, List[ImportantThing]]].
And say this function returns a huge number of ImportantThings.
What should I do there? Should I put this list back into a stream again? Does it make sense to use a stream and then end up with this huge list on which I run this (_.sequence) call?
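If the downstream work is per ImportantThing, one option is to flatten the list back into the stream with mapConcat, so backpressure applies per element and you never hold more than one decoded list plus the stream's buffers; a sketch with placeholder types standing in for the ones in the question:
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.Future

object FlattenSketch {
  // Placeholder types/functions standing in for the ones in the question.
  final case class Failure(reason: String)
  final case class ImportantThing(id: Int)
  def aFunctionThatCallsAnExternalService(page: Int): Future[Either[Failure, List[ImportantThing]]] = ???
  def handleOne(t: ImportantThing): ImportantThing = ???

  val someSource: Source[Int, NotUsed] = Source(1 to 100)

  val flattened: Source[ImportantThing, NotUsed] =
    someSource
      .mapAsync(1)(aFunctionThatCallsAnExternalService)
      .mapConcat {
        case Right(importantThings) => importantThings // re-emit the list element by element
        case Left(_)                => Nil             // or log/route the failure as needed
      }
      .map(handleOne)
}
If producing the elements is itself asynchronous or very large, the Right side can also be turned into its own Source via flatMapConcat.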
Hello everyone.
I am quite new to alpakka-kafka and I have a question.
I need to skip one message from the Kafka topic, but before that I need to know the current offset.
For skipping I do:
Consumer
.committablePartitionedManualOffsetSource(
defConsumerSettings,
Subscriptions.topics(config.topic),
_ => Future.successful(Map(new TopicPartition(config.topic, config.partition) -> config.offset)))
.flatMapMerge(1, _._2)
.take(1)
.map { message =>
  message.committableOffset // Committer.sink expects a Committable, not the raw offset
}
.toMat(Committer.sink(CommitterSettings(actorSystem)))(Keep.right)
.run()
How can I know the current offset before skipping?
A map followed by a filter, where due to some predicate of your choice the filter would happen to only prevent the first message from passing downstream.
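On the "current offset" part: the record you are about to skip already carries its offset and partition, so it can be read (or logged) in that map stage before anything is committed; a sketch based on the snippet above, with the fix that Committer.sink needs the committableOffset rather than the raw offset. defConsumerSettings and the config values are the ones from the question.
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Keep
import org.apache.kafka.common.TopicPartition
import scala.concurrent.Future

object SkipOneMessageSketch {
  def skipOne(defConsumerSettings: ConsumerSettings[String, String],
              topic: String,
              partition: Int,
              offset: Long)(implicit actorSystem: ActorSystem): Future[Done] =
    Consumer
      .committablePartitionedManualOffsetSource(
        defConsumerSettings,
        Subscriptions.topics(topic),
        _ => Future.successful(Map(new TopicPartition(topic, partition) -> offset)))
      .flatMapMerge(1, _._2)
      .take(1)
      .map { message =>
        // The "current offset" is available right here, before the commit.
        println(s"skipping offset ${message.record.offset()} on partition ${message.record.partition()}")
        message.committableOffset
      }
      .toMat(Committer.sink(CommitterSettings(actorSystem)))(Keep.right)
      .run()
}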
Hi, I've got an existing application that uses Alpakka Kafka and reads from a Kafka cluster using a committable source (with offsets committed back to Kafka). Now I'm being asked to read from a second cluster as well.
I don't see how this is handled in the Alpakka Kafka 2.1.0 source code, but I also don't see anything in the docs against mixing sources from different brokers.
Is it possible to merge committable sources from different clusters?
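I can't say authoritatively whether mixing offsets from different clusters in one committer batch is supported, but the wiring itself is straightforward: each ConsumerSettings carries its own bootstrap servers and each CommittableOffset references the consumer it came from, so the sources can simply be merged. A sketch follows (settings, topic and the processing flow are placeholders); a more conservative variant commits each cluster's offsets through its own Committer.sink instead of sharing one.
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{Flow, Keep}
import scala.concurrent.Future

object TwoClustersSketch {
  def run(settingsClusterA: ConsumerSettings[String, String],   // e.g. .withBootstrapServers for cluster A
          settingsClusterB: ConsumerSettings[String, String],   // e.g. .withBootstrapServers for cluster B
          committerSettings: CommitterSettings,
          topic: String,
          processMessage: Flow[CommittableMessage[String, String], CommittableMessage[String, String], _])
         (implicit system: ActorSystem): Future[Done] = {

    val clusterA = Consumer.committableSource(settingsClusterA, Subscriptions.topics(topic))
    val clusterB = Consumer.committableSource(settingsClusterB, Subscriptions.topics(topic))

    clusterA
      .merge(clusterB)           // one merged stream of messages from both clusters
      .via(processMessage)       // placeholder business logic
      .map(_.committableOffset)  // each offset still references its own consumer
      .toMat(Committer.sink(committerSettings))(Keep.right)
      .run()
  }
}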
Hello, I am using Alpakka-HDFS to save a stream of data in an Orc file on HDFS.
To do so, I first wait for there to be enough messages upstream, then I batch all of them, serialize them into an ORC byte array, and flush the single blob as a file on HDFS.
This works.
Now we have decided to drop HDFS in favor of Alluxio, which, long story short, exposes a Hadoop FileSystem interface but is backed by an object store.
After this change, I no longer want to use the built-in rename mechanism, which (as makes sense for HDFS) writes to a temp directory and then renames the file into the output directory.
Is it possible?