    Alessandro D'Armiento
    @adarmiento
    Oops, never mind, I just saw that CommittableMessage is not a Committable, but the CommittableOffset inside is. I just missed a .map()
    Roman Schader
    @romanskie

    Hey everybody, I've got a question regarding Alpakka Kafka and at-least-once processing. Currently I am using something like this:

    committableKafkaSource
        .filter(someFilter)
        .mapAsync(parallelism = 5)(doSomeHttpReq())
        .via(producerFlow)
        .map(_.passThrough)
        .via(committerFlow)
        .toMat(Sink.ignore)(Keep.both)
        .mapMaterializedValue(DrainingControl.apply)
        .run()

    If an abrupt Kafka broker outage occurs, the producer inside Producer.flexiFlow runs into a timeout after 1 min and we also get a commit warning from the CommittableSink.
    To overcome this problem I wanted to wrap the producer and committer flows into RestartFlows like this:

    val producerFlow = {
      Flow[ProducerMessage.Envelope[Vin, CmaEnvelope, CommittableOffset]]
        .via(RestartFlow.onFailuresWithBackoff(200.millis, 5.seconds, 0.2, -1) { () =>
          Producer.flexiFlow(producerSettings)
        })
    }

    val committerFlow = {
      val committerSettings: CommitterSettings = CommitterSettings(system)
      Flow[CommittableOffset]
        .via(RestartFlow.onFailuresWithBackoff(200.millis, 5.seconds, 0.2, -1) { () =>
          Committer.flow(committerSettings)
        })
    }

    I am not sure if this does the trick, since I assume that the currently processed offset will be gone after a restart, right?
    What is the preferred way to handle an abrupt Kafka broker outage in such a stream without losing any offsets?

    Thank you in advance :)
    Best regards, Roman

    Enno
    @ennru
    You should wrap the whole stream (source to committer flow) in a restart source. When a broker failure kills the stream it will restart.
    You might find the connection checker relevant: https://github.com/akka/alpakka-kafka/blob/master/core/src/main/resources/reference.conf#L129
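    A minimal sketch of Enno's suggestion, wrapping the whole stream in a RestartSource (consumerSettings, someFilter, doSomeHttpReq and producerSettings are placeholders reused from Roman's snippet; the topic name and backoff values are illustrative):

    import akka.kafka.{CommitterSettings, Subscriptions}
    import akka.kafka.scaladsl.{Committer, Consumer, Producer}
    import akka.stream.scaladsl.{RestartSource, Sink}
    import scala.concurrent.duration._

    // Restart the complete source-to-committer pipeline on failure, so a fresh
    // consumer resumes from the last committed offsets after a broker outage.
    RestartSource
      .onFailuresWithBackoff(minBackoff = 200.millis, maxBackoff = 5.seconds, randomFactor = 0.2) { () =>
        Consumer
          .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
          .filter(someFilter)
          .mapAsync(parallelism = 5)(doSomeHttpReq())
          .via(Producer.flexiFlow(producerSettings))
          .map(_.passThrough)
          .via(Committer.flow(CommitterSettings(system)))
      }
      .runWith(Sink.ignore)
      // Note: the Consumer.Control is created inside the restart block, so capturing
      // it for a DrainingControl (e.g. via an AtomicReference) is left out of this sketch.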
    Patrick Skjennum
    @Habitats
    Is this the right channel to ask for Alpakka + Pub/Sub help as well?
    Patrick Skjennum
    @Habitats

    I've recently changed to Alpakka for our Pub/Sub integration, and it's working great(!), but I am getting a ton of

    ow] - Restarting graph due to failure. stack_trace: java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it. POST /v1/projects/<project>/<subscription>:pull Strict(45 bytes) -> 200 OK Chunked at

    I've tried setting

    stream {
      materializer {
        subscription-timeout {
          mode = warn
          timeout = 5s
        }
      }
    }

    but it still complains about 1 second 🤔

    Patrick Skjennum
    @Habitats
    ^ This doesn't seem to affect whether or not I can read from Pub/Sub -- my system is otherwise working perfectly, but the logs are super spammy, and some of them are labeled "error", so I'm assuming something's not right.
    Enno
    @ennru
    What Alpakka version is that? We just released 2.0.0. I'm not sure if that might have improved, though. https://doc.akka.io/docs/alpakka/current/release-notes/2.0.x/google-cloud-pub-sub.html
    Patrick Skjennum
    @Habitats
    right! I'm using 2.0.0-RC2
    Patrick Skjennum
    @Habitats
    I will upgrade to 2.0.0 and report back :) gz on the release! Looks like I joined the party at just the right time.
    Enno
    @ennru
    The Pub/Sub connector doesn't seem to contain any changes in 2.0.0, so I doubt you'll see a difference. Report an issue with more details; maybe someone has the time to look into it.
    Patrick Skjennum
    @Habitats

    I'm having another issue with my subscription going silent after ~2-3 hours of processing ~20 events/s. It's a pretty busy stream, and should definitely not go silent. I've initiated it using:

    Source
      .tick(initialDelay = 0.seconds, interval = 10.seconds, tick = Done)
      .via(
        RestartFlow.withBackoff(1.second, 30.seconds, randomFactor = 0.2)(
          () => GooglePubSub.subscribeFlow(topic, credentials)
        )
      )

    Could this be related to akka/alpakka#2296 or am I missing something?

    Patrick Skjennum
    @Habitats
    image.png
    BTW, by "silent" I mean it ends up looking like this:
    image.png
    No deadlock or anything either. It seems like the flow itself just dies off.
    Enno
    @ennru
    I believe this might show up when downstream can't process messages fast enough.
    You may try to add a buffer(1000).
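    For example, a minimal sketch of that applied to the tick-driven flow above (the buffer placement and the backpressure overflow strategy are assumptions; GooglePubSub.subscribeFlow and the other names are reused from the snippet above):

    import akka.stream.OverflowStrategy

    Source
      .tick(initialDelay = 0.seconds, interval = 10.seconds, tick = Done)
      .via(
        RestartFlow.withBackoff(1.second, 30.seconds, randomFactor = 0.2)(
          () => GooglePubSub.subscribeFlow(topic, credentials)
        )
      )
      // Absorb bursts from Pub/Sub so a temporarily slow downstream
      // does not stall the subscription itself.
      .buffer(1000, OverflowStrategy.backpressure)
      // ... existing downstream processing and sink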
    mwkohout
    @mwkohout
    Hi; I'm looking for a way to create a Transactional sink for messages that come from another source system. I'm publishing multiple messages into multiple topics and partitions and, because of specific application requirements, I want them to be committed as a group. Is there a good way to achieve my goal?
    Patrick Skjennum
    @Habitats
    @ennru This is the down(est)stream process, though. But I'll try it!
    Aleksandar Skrbic
    @aleksandarskrbic
            Consumer
              .committableSource(consumerSettings[BookRatingkAvro], Subscriptions.topics("book-rating"))
              .map(msg => msg.record.value)
              .map(book => Rating(book.item.itemId, book.userId, book.rating))
              .via(
                CassandraFlow.create(
                  CassandraWriteSettings.defaults,
                  "INSERT INTO recommender.ratings (item_id, user_id, rating) VALUES (?, ?, ?)",
                  statementBinder
                )
              )
              .runWith(Sink.ignore)
    Can someone help me with this code snippet? I'm reading messages from Kafka, transforming them, and writing them to a Cassandra table. Messages are read, but nothing is persisted to Cassandra. The keyspace and table are already created.
    Here is the code for statementBinder:
      case class Rating(itemId: Long, userId: String, rating: Int)
      val statementBinder: (Rating, PreparedStatement) => BoundStatement =
        (rating, preparedStatement) => preparedStatement.bind(rating.itemId, rating.userId, rating.rating)
    Enno
    @ennru
    Are you sure your Cassandra session works correctly? You may need to configure the contact-points https://doc.akka.io/docs/alpakka/current/cassandra.html#custom-session-creation
    For the Kafka part, you'll want a committer sink after writing to Cassandra.
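    A minimal sketch of the contact-point configuration Enno mentions, using the DataStax driver config that Alpakka Cassandra 2.x reads (host, port and datacenter values are placeholder assumptions):

    datastax-java-driver {
      basic {
        contact-points = ["127.0.0.1:9042"]
        load-balancing-policy.local-datacenter = "datacenter1"
      }
    }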
    Aleksandar Skrbic
    @aleksandarskrbic
    The Cassandra session works correctly; I tried to read keyspaces from Cassandra and that works.
    Aleksandar Skrbic
    @aleksandarskrbic
    This is how I managed to write data to Cassandra:
            Consumer
              .plainSource(consumerSettings[BookRatingkAvro], Subscriptions.topics("book-rating"))
              .map(msg => msg.value)
              .map(book => {
                println(book)
                Rating(book.item.itemId, book.userId, book.rating)
              })
              .mapAsync(1)(r =>
                for {
                  stmt <- cassandraSession.prepare("INSERT INTO recommender.ratings (item_id, user_id, rating) VALUES (?, ?, ?)")
                } yield(cassandraSession.executeWrite(stmt.bind(r.itemId, r.userId, r.rating)))
              )
              .runForeach(println)
    Aleksandar Skrbic
    @aleksandarskrbic
    So if I want to use committableSource, I guess SourceWithContext should be used.
    Aleksandar Skrbic
    @aleksandarskrbic

    I managed to solve the problem I had. Here is the solution in case someone has a similar problem.

    Consumer
      .sourceWithOffsetContext(consumerSettings[BookRatingkAvro], Subscriptions.topics("book-rating"))
      .map(_.value)
      .map(book => Rating(book.item.itemId, book.userId.toLong, book.rating))
      .via(
        CassandraFlow.withContext(
          CassandraWriteSettings.defaults,
          "INSERT INTO recommender.ratings (item_id, user_id, rating) VALUES (?, ?, ?)",
          statementBinder
        )
      )
      .toMat(Committer.sinkWithOffsetContext(committerSettings))(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()

    Just to be sure, can someone confirm that this flow guarantees at-least-once delivery?

    Rikard Andersson
    @rikardev

    Hi, I'm using Java on version 2.0.2 and can't seem to get the following code to work:

    Consumer.plainPartitionedManualOffsetSource(
                consumerSettings,
                Subscriptions.topics(topic),
                topicPartitions -> CompletableFuture.completedStage(
                    topicPartitions.stream()
                        .collect(
                            Collectors.toMap(
                                topicPartition -> topicPartition,
                                topicPartition -> 0
                            )
                        )
                )
            )

    I suspect it's the offset on assign that doesn't work, since changing to plainPartitionedSource works but committablePartitionedManualOffsetSource doesn't. Has anyone else seen similar issues?

    Enno
    @ennru
    I haven't heard anything in general; it could be something with the Java API. But there is a more involved problem with plainPartitionedManualOffsetSource being discussed in akka/alpakka-kafka#1086
    Rikard Andersson
    @rikardev
    Thanks, I just found the issue. Above, topicPartition -> 0 should be topicPartition -> 0L. There was an int -> long conversion error causing the seek to fail, and the error wasn't logged.
    Enno
    @ennru
    Tricky. Yes, failing for all errors there sounds like a good idea. Please suggest it in a Pull Request.
    Arjun KN
    @arjunknemc
    Hello, I am using the code below to enable backpressure for my downstream. But when there are too many messages to consume from Kafka (around 25L), I observe that a few messages are skipped. Throttling is around 1000 messages per minute. Please help me with this.
    SourceShape<Done> source = b.add(
        Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
            .throttle(throttleLimit, Duration.ofMinutes(throttleTimeInMinutes))
            .mapAsync(1, msg -> processDataToActorClusterProducer(msg.record(), topic)
                .thenApply(done -> msg.committableOffset()))
            .groupedWithin(1000, Duration.ofSeconds(10))
            .map(group -> foldLeft(group))
            .mapAsync(3, c -> {
                c.commitJavadsl();
                return CompletableFuture.completedFuture(Done.getInstance());
            }));
    return source;
    Yair Halberstadt
    @YairHalberstadt

    Hi
    I noticed that by default an AutoSubscription has no RebalanceListener, and an empty PartitionAssignmentHandler.
    https://github.com/akka/alpakka-kafka/blob/37005de2ad4354765f941034ddf21ca5b3f6a93a/core/src/main/scala/akka/kafka/Subscriptions.scala#L150

    Does this mean that Alpakka by default won't handle changes to assignment?
    So if a consumer is killed, nothing will be reading from its partitions?

    But this is not connected to when a consumer gets killed. It only lets you hook into Kafka's rebalancing for this particular consumer.
    Yair Halberstadt
    @YairHalberstadt
    Will things automatically work if I don't add those?

    I.e., my scenario:

    I have 5 instances of a server, all consuming from Kafka.
    One of the servers goes down, and Kafka rebalances the remaining 4.
    Will those 4 automatically rebalance? Or do I need to add a RebalanceListener?

    Enno
    @ennru
    Kafka will rebalance the partitions to the remaining consumers within the same consumer group.
    The listeners are available in case you need to take other actions when Kafka rebalances.
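    A minimal sketch of attaching such a listener to an AutoSubscription (the actor, topic name and logging are illustrative assumptions; none of this is required for plain rebalancing to work):

    import akka.actor.{Actor, ActorLogging, ActorRef, Props}
    import akka.kafka.{Subscriptions, TopicPartitionsAssigned, TopicPartitionsRevoked}

    // Gets notified whenever Kafka assigns or revokes partitions for this
    // consumer during a rebalance.
    class RebalanceListener extends Actor with ActorLogging {
      def receive: Receive = {
        case TopicPartitionsAssigned(subscription, assigned) =>
          log.info("Assigned: {}", assigned)
        case TopicPartitionsRevoked(subscription, revoked) =>
          log.info("Revoked: {}", revoked)
      }
    }

    val rebalanceListener: ActorRef = system.actorOf(Props[RebalanceListener]())
    val subscription = Subscriptions
      .topics("my-topic")
      .withRebalanceListener(rebalanceListener)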
    Yair Halberstadt
    @YairHalberstadt
    Ok, great
    Patrick Skjennum
    @Habitats
    Any particular reason the Google Cloud Storage integration is not available for Scala 2.11? Is it compatible? https://doc.akka.io/docs/alpakka/current/google-cloud-storage.html
    Enno
    @ennru
    We don't enable Scala 2.11 for new connectors. It might well compile just fine; I haven't tried.
    Dhaval Kolapkar
    @kolapkardhaval
    I am running an Alpakka Kafka stream in my project and getting the error below: Error registering AppInfo mbean
    javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-fts-events-18f7d56cd-d511-47ba-a3fc-ca9c9de9fd9c
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
    at akka.kafka.ProducerSettings$.createKafkaProducer(ProducerSettings.scala:160)
    at akka.kafka.ProducerSettings.$anonfun$createKafkaProducerAsync$1(ProducerSettings.scala:383)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:92)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:92)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:47)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:47)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    2020-05-12 18:38:44.737 3970482 [application-akka.kafka.default-dispatcher-12] INFO o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-fts-events-18f7d56cd-d511-47ba-a3fc-ca9c9de9fd9c, transactionalId=fts-events-18f7d56cd-d511-47ba-a3fc-ca9c9de9fd9c] ProducerId set to -1 with epoch -1
    2020-05-12 18:38:44.886 3970631 [kafka-producer-network-thread | producer-fts-events-18f7d56cd-d511-47ba-a3fc-ca9c9de9fd9c] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-fts-events-18f7d56cd-d511-47ba-a3fc-ca9c9de9fd9c, transactionalId=fts-events-18f7d56cd-d511-47ba-a3fc-ca9c9de9fd9c] Cluster ID: DJ2VuIxMRlWsF9dlWKbvNQ
    2020-05-12 18:38:44.886 3970631 [kafka-producer-network-thread | producer-fts-events-18f7d56cd-d511-47ba-a3fc-ca9c9de9fd9c] INFO o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-fts-events-18f7d56cd-d511-47ba-a3fc-ca9c9de9fd9c, transactionalId=fts-events-18f7d56cd-d511-47ba-a3fc-ca9c9de9fd9c] ProducerId set to 6005 with epoch 7
    Enno
    @ennru
    Alpakka Kafka 2.0.3 is now available https://discuss.lightbend.com/t/alpakka-kafka-2-0-3/6501
    Dhaval Kolapkar
    @kolapkardhaval
    I read this article on map vs mapAsync, which says there is no difference if the code inside the map is already async: https://stackoverflow.com/questions/35146418/difference-between-map-and-mapasync Is this true?
    Dhaval Kolapkar
    @kolapkardhaval
    @ennru @channel Does anyone know how to skip a message in a transactional Alpakka stream and move on to the next one?
    Tech Pro
    @jtechpro
    How do we achieve per-message parallelism in Alpakka Kafka, i.e., each message key processed with its own rate limit?
    brabo-hi
    @brabo-hi
    Is CassandraSink removed from alpakka-cassandra? It has been removed from the documentation page.