    Keir Lawson
    @keirlawson
    @bplommer in terms of such a contribution, which branch would it make sense to target?
    or does everything just go into master and then get ported?
    Ben Plommer
    @bplommer
    Yeah, you can just target series/1.x assuming you're not making any breaking changes
    Keir Lawson
    @keirlawson
    coolio
    Keir Lawson
    @keirlawson
    is there a reason why metals files/folders aren't gitignored in the fs2-kafka repo?
    Ben Plommer
    @bplommer
    Not really, they should probably be in your global .gitignore but feel free to open a PR to gitignore them
    Keir Lawson
    @keirlawson
    ah I didn't know global gitignore was a thing! I'll set one up
    Timothy Bess
    @tdbgamer
    So I'm trying to stream something from a database onto Kafka with a transactional producer, but I don't want to pull all the rows into memory. I also don't want to commit my offsets until I'm done streaming. Problem is, CommittableProducerRecord seems to only take chunks. Is there a good way to do this?
    6 replies
    Ben Plommer
    @bplommer
    fs2-kafka v1.5.0 and v2.0.0-RC1 are on their way to Maven Central.
    Mauro González
    @maurogonzalez

    Hi! I'm having some trouble with consumers due to connection issues. The Kafka cluster lives inside Kubernetes, but for some reason some nodes of the cluster are being moved, causing this issue (the infra team is looking into it).
    In the meantime, is there a strategy to reconnect, or in the worst case to shut the consumer down gracefully so the application can exit?

    The current behavior shows logs like these:

     2021-04-22 08:56:18.029 INFO [shutdownHook1]  Consumer from SomeConsumer unsubscribing from [Topic] // Consumer resource release initiates
    ...
    2021-04-22 09:01:18.132 INFO [kafka-coordinator-heartbeat-thread | SomeConsumer] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=SomeConsumer-cb44c10a-9b45-4a51-922f-0ed015c88efc, groupId=AeneidCore-QryConsumer] Member SomeConsumer-cb44c10a-9b45-4a51-922f-0ed015c88efc-abcf89b6-adad-4fc6-8951-9dfc4bf29e8f sending LeaveGroup request to coordinator kafka-2.kafka-headless.namespace.svc.cluster.local:9092 (id: 2147483645 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages.

    And then it hangs.

    Mauro González
    @maurogonzalez

    Looking at the docs, max.poll.interval.ms is 5 minutes (exactly the time gap in the logs). I think that LeaveGroup message was sent to a host that resolved to an old IP, or there was some network issue while the cluster's host resolver was being updated. But it seems no further attempt is made, and then closing the consumer's resource doesn't release it.

    Any ideas on how to fix this from fs2-kafka, or is it better to look at the consumer settings?

    Damien O'Reilly
    @DamienOReilly
    Hi, if I stream from a topic that has many partitions using KafkaConsume.stream, will commitBatchWithin handle committing offsets correctly per partition? Or should I use KafkaConsume.partitionedStream and handle streams per partition separately, committing offsets via commitBatchWithin?
    Fabio Labella
    @SystemFw
    the former works
    Damien O'Reilly
    @DamienOReilly
    How does it keep track of that? I guess fs2-kafka explicitly tracks the last offset per partition for a given Chunk?
    Ok, I think I see it in fs2.kafka.CommittableOffsetBatch#fromFoldableMap
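
    (For reference, a minimal sketch of that bookkeeping from the outside, assuming cats-effect IO: folding offsets from many partitions into a CommittableOffsetBatch keeps only the highest offset per topic-partition, which is what commitBatchWithin relies on.)

    import cats.effect.IO
    import fs2.kafka.{CommittableOffset, CommittableOffsetBatch}

    // a sketch: offsets may come from any mix of partitions; the batch retains
    // only the latest offset seen for each topic-partition before committing
    def commitAll(offsets: List[CommittableOffset[IO]]): IO[Unit] =
      CommittableOffsetBatch.fromFoldable(offsets).commit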
    Sebastian Voss
    @sebastianvoss

    Hi All, I'm using Vulcan to consume Avro messages from Kafka:

    implicit val deserializer: RecordDeserializer[IO, A] = avroDeserializer[A].using(avroSettings)

    Is there a way to make Vulcan's avroDeserializer skip deserialization exceptions? Or should I implement a custom deserializer from scratch?

    Sebastian Voss
    @sebastianvoss
    I tried this, but the obvious problem is that it instantiates the deserializer for every message:
    implicit val deserializer: Deserializer[IO, Option[A]] = Deserializer.instance { (topic, headers, bytes) =>
      avroDeserializer[A]
        .using(avroSettings)
        .forValue
        .flatMap(
          _.deserialize(topic, headers, bytes)
            .map(Some(_))
            .handleErrorWith(e => logger.error(e.getMessage).as(none[A]))
        )
    }
    Sebastian Voss
    @sebastianvoss
    Or should I handle this at the stream level instead of the deserializer level?
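
    (One possible deserializer-level sketch, not necessarily the recommended approach: resolve the underlying Vulcan deserializer once and wrap it with attempt, so decoding failures become None without re-instantiating anything per message. The helper name is made up.)

    import cats.effect.IO
    import fs2.kafka.Deserializer
    import fs2.kafka.vulcan.{avroDeserializer, AvroSettings}
    import vulcan.Codec

    // a sketch: the RecordDeserializer is resolved to a plain Deserializer once,
    // and failures are mapped to None instead of failing the stream
    def lenientValueDeserializer[A](
      avroSettings: AvroSettings[IO]
    )(implicit codec: Codec[A]): IO[Deserializer[IO, Option[A]]] =
      avroDeserializer[A]
        .using(avroSettings)
        .forValue
        .map(underlying => underlying.attempt.map(_.toOption))

    The resulting IO[Deserializer[IO, Option[A]]] can be evaluated once, before the ConsumerSettings are built, so the deserializer is only set up a single time.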
    Greg Fisher
    @gnfisher
    How do you handle message versioning?
    Fabio Labella
    @SystemFw
    avro + topic up-versioning when needed
    we have a library called vulcan for avro, which is integrated with fs2-kafka
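
    (For anyone unfamiliar with it, a minimal sketch of a Vulcan codec; the Event type and its fields are made up for illustration.)

    import cats.syntax.all._
    import vulcan.Codec

    // a made-up event type, just to show the shape of a Vulcan record codec
    final case class Event(id: String, version: Int)

    object Event {
      implicit val eventCodec: Codec[Event] =
        Codec.record(name = "Event", namespace = "com.example") { field =>
          (field("id", _.id), field("version", _.version)).mapN(Event(_, _))
        }
    }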
    TheLastAngryMan
    @TheLastAngryMan
    Is there any reason compression wouldn’t be working? I’ve set the compression.type property when creating the ProducerSettings but I don’t think compression is being enabled
    TheLastAngryMan
    @TheLastAngryMan
    Nvm, I do actually think compression is working correctly
    corentin
    @corenti13711539_twitter
    Are KafkaProducer instances, as well as the underlying resources, thread-safe and allowed to be shared across threads?
    Any best practices on sharing producer instances?
    Ayush
    @ayushworks
    Hi, are there any examples of how to use the KafkaConsumer in a non-IOApp world, like, say, a Spring Boot API? How do I use the IO returned by drain so that the consumer runs forever?
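
    (Not an official recommendation, but one sketch, assuming cats-effect 3: start the drained IO unsafely from the framework's lifecycle hooks and keep the returned cancel action for shutdown. The startConsumer name is made up.)

    import scala.concurrent.Future
    import cats.effect.IO
    import cats.effect.unsafe.implicits.global

    // a sketch: runs the consumer stream outside IOApp (e.g. from a Spring bean's
    // init hook); calling the returned function cancels the stream on shutdown
    def startConsumer(consumerStream: fs2.Stream[IO, Unit]): () => Future[Unit] =
      consumerStream.compile.drain.unsafeRunCancelable()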
    Andrey
    @green-creeper

    Hello!
    I need to handle the error when Kafka is not available but the producer tries to produce a message.
    I looked through the documentation and tried to adjust REQUEST_TIMEOUT_MS_CONFIG and DELIVERY_TIMEOUT_MS_CONFIG, but it seems to have no effect, since the thread is blocked by connection retries.

    The code looks like this:

          def produce[T](key: String, record: T, topic: String): F[RecordMetadata] =
            producerResource[T]
              .use { producer =>
                val toProduce = ProducerRecord(topic, key, record)
                producer.produce(ProducerRecords.one(toProduce))
              }
              .flatten
              .map { result =>
                result.records
                  .collectFirst { case (_, metadata) =>
                    metadata
                  }
                  .toRight {
                    val msg = s"Can't produce record ${record.getClass} with key $key due to unknown reason"
                    log.error(msg)
                    KafkaProducerException(msg, new NullPointerException(msg))
                  }
              }
              .adaptError { throwable =>
                log.error(s"Failed to produce record ${record.getClass} to topic $topic. Action: $key.", throwable)
                throwable
              }
              .rethrow

    In this case it simply can't get to the adaptError or toRight sections.
    Looking at the logs, I see multiple connection retries from kafka-producer-network-thread.
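
    (Not a fix for the retry behaviour itself, but for reference, a sketch of bounding how long the underlying Java producer may block; these are standard kafka-clients properties and the values are only illustrative.)

    import cats.effect.IO
    import fs2.kafka.ProducerSettings
    import org.apache.kafka.clients.producer.ProducerConfig

    // a sketch: max.block.ms limits how long send()/metadata fetches may block when
    // the cluster is unreachable; delivery.timeout.ms bounds the total time spent
    // retrying and must be at least request.timeout.ms + linger.ms
    val boundedSettings: ProducerSettings[IO, String, String] =
      ProducerSettings[IO, String, String]
        .withBootstrapServers("localhost:9092")
        .withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000")
        .withProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")
        .withProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000")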

    Aaron Delaplane
    @AaronDelaplane

    During initial consumption of topics on application initialization, the application fails with the following error:

    Exception in thread "main" fs2.kafka.CommitTimeoutException: offset commit timeout after 15 seconds for offsets: metadata-manager-competition-collections-v1-0 -> 197, metadata-manager-navigation-items-v1-0 -> 445

    Right before the failure, the Java client logs the following:

    [Consumer clientId=consumer-e581040c-490f-42cc-bf02-6a006ad02cae-1, groupId=e581040c-490f-42cc-bf02-6a006ad02cae] Revoke previously assigned partitions metadata-manager-sports-v1-0, ...
    
    [Consumer clientId=consumer-e581040c-490f-42cc-bf02-6a006ad02cae-1, groupId=e581040c-490f-42cc-bf02-6a006ad02cae] Member consumer-e581040c-490f-42cc-bf02-6a006ad02cae-1-3c5f5551-5d88-4c82-9bba-2b55520ab881 sending LeaveGroup request to coordinator b-2.usd-dev-usd-aws-msk-u.m3atv0.c10.kafka.us-east-1.amazonaws.com:9092 (id: 2147483645 rack: null) due to the consumer is being closed

    This is the code I have for the consumer stream:

        fs2.kafka.KafkaConsumer
          .stream[IO, K, V](
            fs2.kafka
              .ConsumerSettings[IO, K, V](
                keyDeserializer = fs2.kafka.vulcan.avroDeserializer[K].using[IO](avroSettings),
                valueDeserializer = fs2.kafka.vulcan.avroDeserializer[V].using[IO](avroSettings)
              )
              .withBootstrapServers(config.bootstrapServers.value)
              .withAutoOffsetReset(fs2.kafka.AutoOffsetReset.Earliest)
              .withGroupId(config.groupId.show)
              .withAllowAutoCreateTopics(false)
          )
          .evalTap(_.subscribe(config.topics.map(_.entryName)))
          .flatMap(_.stream)
          .evalMap(committableRecord =>
            config.topics
              .find(_.entryName == committableRecord.record.topic)
              .fold[IO[fs2.kafka.CommittableOffset[IO]]] {
                val appError = _error.kafka.consumer.InvalidConsumerRecordTopic(committableRecord.record, config.topics)
                logger.error(appError.asLog) >> IO.raiseError(appError)
              }(topic => processConsumerRecord(ConsumerRecord(topic, committableRecord.record)).as(committableRecord.offset))
          )
          .through[IO, Unit](fs2.kafka.commitBatchWithin[IO](config.commitBatchMinCount.value, config.commitBatchMinDuration.value))

    Is anyone able to provide insight into what I might do to fix this? Thanks
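
    (Not necessarily the root cause, but for what it's worth: the 15 seconds in the error matches fs2-kafka's default commit timeout, which can be raised on the settings. A sketch with illustrative deserializers and values; raising it only buys time for slow commits, it doesn't by itself address the rebalance that revokes the partitions.)

    import scala.concurrent.duration._
    import cats.effect.IO
    import fs2.kafka.{AutoOffsetReset, ConsumerSettings}

    // a sketch: raise the offset-commit timeout above the 15-second default
    val settings: ConsumerSettings[IO, String, String] =
      ConsumerSettings[IO, String, String]
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withCommitTimeout(30.seconds)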

    Aaron Delaplane
    @AaronDelaplane
    Am using version 2.1.0
    Aaron Delaplane
    @AaronDelaplane
    It seems that another consumer is coming online and revoking the partition assignments of the former
    kamisama-rney
    @kamisama-rney

    Just did a migration from cats-effect 2 to 3 and I'm getting these errors on compile:

    [error] /Users/richard.ney/Projects/iq-platform/research-platform-aqs/src/main/scala/com/lookout/aqs/ingestion/streams/StreamProcessor.scala:71:65: type mismatch;
    [error]  found   : fs2.kafka.ProducerSettings[F,K,V]
    [error]  required: fs2.kafka.ProducerSettings[[_]F[_],Any,Any]
    [error] Note: K <: Any, but class ProducerSettings is invariant in type K.
    [error] You may wish to define K as +K instead. (SLS 4.5)
    [error] Note: V <: Any, but class ProducerSettings is invariant in type V.
    [error] You may wish to define V as +V instead. (SLS 4.5)
    [error]               .through(KafkaProducer.pipe(kafkaProducerSettings.producerSettings, producer))
    [error]                                                                 ^
    [error] /Users/richard.ney/Projects/iq-platform/research-platform-aqs/src/main/scala/com/lookout/aqs/ingestion/streams/StreamProcessor.scala:71:83: type mismatch;
    [error]  found   : fs2.kafka.KafkaProducer.Metrics[F,K,V]
    [error]  required: fs2.kafka.KafkaProducer[[_]F[_],Any,Any]
    [error] Note: K <: Any, but class KafkaProducer is invariant in type K.
    [error] You may wish to define K as +K instead. (SLS 4.5)
    [error] Note: V <: Any, but class KafkaProducer is invariant in type V.
    [error] You may wish to define V as +V instead. (SLS 4.5)
    [error]               .through(KafkaProducer.pipe(kafkaProducerSettings.producerSettings, producer))
    [error]                                                                                   ^
    [error] /Users/richard.ney/Projects/iq-platform/research-platform-aqs/src/main/scala/com/lookout/aqs/ingestion/streams/StreamProcessor.scala:129:13: type mismatch;
    [error]  found   : List[fs2.kafka.ProducerRecords[Unit,K,V]]
    [error]  required: List[fs2.kafka.ProducerRecords[K,V,Unit]]
    [error]     } yield records
    [error]             ^

    Given that most of the changes needed for the cats-effect migration are simple implicit or method-name changes, I'm hoping someone knows the simple fix. I've also cross-posted this in the fs2 channels on Discord.

    1 reply
    Keir Lawson
    @keirlawson
    Any plans to set up a discord channel for fs2-kafka?
    catostrophe
    @catostrophe
    With the new kafka-clients 3.0 release, what will be the version matrix for fs2-kafka?
    Dawid Godek
    @matematyk60

    I'm wondering, is it possible to restart processing of only one assigned partition?
    From my experience, this works:

    val failingAction: IO[Unit] = ???
    val consume: IO[Unit] =
      consumerStream
        .map { partition =>
          partition.evalMap(record =>
            failingAction *> record.offset.commit
          )
        }
        .parJoinUnbounded
        .handleErrorWith(ex =>
          Stream.eval(IO.println(s"Exception in partition [$ex]")) >>
            Stream.sleep[IO](10 seconds)
        )
        .repeat
        .compile
        .drain

    But this doesn't:

    val failingAction: IO[Unit] = ???
    val consume: IO[Unit] =
      consumerStream
        .map { partition =>
          val consumed = partition.evalMap(record => failingAction *> record.offset.commit)
    
          consumed
            .handleErrorWith(ex =>
              Stream.eval(IO.println(s"Exception in partition [$ex]")) >>
                Stream.sleep[IO](10 seconds) >> consumed
            )
        }
        .parJoinUnbounded
        .compile
        .drain

    Right now if processing of one partition fails and needs to be restarted, processing of all assigned partitions is stopped and restarted.

    Is this normal behavior for Kafka consumers, or is it a limitation of fs2-kafka? Thank you for your help :)

    corentin
    @corenti13711539_twitter
    We just had our Kafka broker upgraded and after the upgrade our consumers stopped receiving data from topics they were subscribed to.
    What's the correct way to handle these types of events with fs2-kafka v2.2.0, so that the client would just be able to continue receiving data from topics it has subscribed to?
    Currently the consumer code looks something like this:
    KafkaConsumer
      .stream(consumerSettings)
      .evalTap(_.subscribeTo(topicName))
      .flatMap(_.stream)
      .mapAsync(maxConcurrent) { committable => ??? }
      .compile.drain
    Ben Plommer
    @bplommer
    @keirlawson there’s a scalafix for that :D
    @catostrophe I'm thinking we'll update to Kafka 3.0 for fs2-kafka 3.0. I think you should be able to use Kafka 3.0 with the current releases, though?
    I’ve been thinking for a while we should migrate to discord. @vlovgr @LMnet what do you think?
    2 replies
    Ben Plommer
    @bplommer
    @catostrophe there’s one test I need to fix before we can release a milestone based on 3.0 - there’s a draft PR so feel free to take a look if you feel like doing some debugging :smile:
    Maatary
    @Maatary

    Hi, not having any luck on Stack Overflow, I would like to move my question here; maybe some of you can put me on the right path.

    I have a particular scenario in which I need to batch-load data coming from Kafka into a sink, with the following particularity:

    Every message coming from Kafka yields a varying number of messages from the perspective of the sink. Said differently, every message in Kafka corresponds to one entity from the perspective of the data source; however, when translated to the sink data model, it corresponds to a varying number of entities.
    We want first (a) to batch the incoming Kafka messages for dedup purposes and commit control, and then (b), most importantly, to control the number of messages sent to the sink according to the sink data model.
    Simulating the situation with a simple fs2 stream without Kafka, I came up with the following sub-streaming approach. It simulates starting from the moment you already have your Kafka messages grouped (as a list).

    val e1 = Stream(List(1,2,3,4), List(3,33,56,7))
      .evalMap {
        list => Stream.emits(list).evalMap(IO.pure).compile.last
      }.compile.toVector.unsafeRunSync()
    and indeed I get
    
    e1: Vector[Option[Int]] = Vector(Some(4), Some(7))

    The idea being that within that inner stream, I would flatMap (as opposed to the evalMap(IO.pure) I'm doing right now) over the emitted messages, given that each can produce multiple messages, then group and send.

    My issue is that it doesn't feel right to have to compile and drain in the middle of a stream. In all the examples I've seen online, I've never seen code doing that, so I figured I must be missing something.

    Maatary
    @Maatary
    Something closer to what I have in mind would look like this:
    val e3 = Stream(List(1,2,3,4), List(3,33,56,7))
      .flatMap { list =>
        Stream.eval(
          Stream.emits(list).covary[IO].groupWithin(2, 1.seconds)
          .evalMap(ch => IO.println(ch.toList)).compile.drain)
          .as(list.last)
      }.compile.toVector.unsafeRunSync()
    Maatary
    @Maatary

    Hi @bplommer, I'm trying to understand the following statement in the fs2-kafka documentation:

    withMaxPrefetchBatches adjusts the maximum number of record batches per topic-partition to prefetch before backpressure is applied. The default is 2, meaning there can be up to 2 record batches per topic-partition waiting to be processed.

    My understanding is that it relates to this KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records (Prefetching section)

    However, the KIP does not say anything about controlling the prefetching; that's completely internal to the consumer, if I understand correctly. So my question is: are we talking about the same thing as the KIP, but you have implemented your own logic for it, disabling/bypassing some part of the consumer logic, or do the two compound together? The reason I am asking is to get some control over memory, fully understand what is happening under the hood, and figure out how best to set this variable for our use case.

    Another question I have is: what controls the size of the chunks coming out of the records method, as in

    val stream =
          KafkaConsumer.stream(consumerSettings)
            .subscribeTo("topic")
            .records
            .mapAsync(25) { committable =>
              processRecord(committable.record)
            }

    Is the size of the internal chunking of records determined by some Kafka setting such as max.poll.records?
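
    (For what it's worth, a sketch of where the two knobs live: the prefetch buffer is an fs2-kafka setting, while the per-poll record count is a plain Kafka consumer property. Values are illustrative.)

    import cats.effect.IO
    import fs2.kafka.ConsumerSettings
    import org.apache.kafka.clients.consumer.ConsumerConfig

    // a sketch: withMaxPrefetchBatches bounds how many record batches per topic-partition
    // may sit in fs2-kafka's prefetch queue before backpressure is applied (default 2);
    // max.poll.records bounds how many records a single poll() returns, which in turn
    // bounds the size of each prefetched batch
    val tunedSettings: ConsumerSettings[IO, String, String] =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers("localhost:9092")
        .withGroupId("example-group")
        .withMaxPrefetchBatches(2)
        .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500")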

    Ben Plommer
    @bplommer
    @/all we now have a channel on Typelevel discord - https://discord.gg/rsCcwxtysH
    Ghost
    @ghost~58e2f7f3d73408ce4f55e372

    Do transactions offer exactly-once semantics?
    https://fd4s.github.io/fs2-kafka/docs/transactions

    E.g. if I read from a source topic, process the event, and write to a sink topic, and then the app dies before committing the offset:
    will the write to the sink topic be rolled back, so that duplicate events aren't written to the sink topic once the app comes back online?

    Kevin Meredith
    @kevinmeredith

    :question: Looking over your quickstart, I wrote the following:

      private val stream: Stream[IO, Unit] =
        KafkaConsumer.stream(consumerSettings)
          .subscribeTo("topic1")
          .records
          .evalMap {
            cr: CommittableConsumerRecord[IO, String, String] =>
              processRecord(cr.record) *> cr.offset.commit
          }

    Essentially, the code I wrote is only processing and committing a single record at a time? Thanks
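
    (For contrast, a sketch of the batched-commit variant from the same quickstart, with processRecord and consumerSettings passed in as in the snippet above: offsets are gathered and committed in batches rather than once per record.)

    import scala.concurrent.duration._
    import cats.effect.IO
    import fs2.Stream
    import fs2.kafka._

    // a sketch: processing stays per-record, but commits happen at most every
    // 500 offsets or every 15 seconds, whichever comes first
    def batched(
      consumerSettings: ConsumerSettings[IO, String, String],
      processRecord: ConsumerRecord[String, String] => IO[Unit]
    ): Stream[IO, Unit] =
      KafkaConsumer.stream(consumerSettings)
        .subscribeTo("topic1")
        .records
        .mapAsync(25) { committable =>
          processRecord(committable.record).as(committable.offset)
        }
        .through(commitBatchWithin(500, 15.seconds))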

    legopiraat
    @legopiraat

    Hi guys, I've been reading the producer documentation and I'm missing how you can create just a producer without a consumer.
    I have a REST API from which I want to produce one or more Kafka messages; does anyone have an example of how to do this?

    Thanks in advance!
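
    (A minimal sketch of a standalone producer, no consumer involved; the topic name, serializer types, and the publish helper are illustrative.)

    import cats.effect.IO
    import cats.syntax.all._
    import fs2.kafka._

    val producerSettings: ProducerSettings[IO, String, String] =
      ProducerSettings[IO, String, String].withBootstrapServers("localhost:9092")

    // acquire the producer once (e.g. at application startup) and share it across requests
    val producerResource = KafkaProducer.resource(producerSettings)

    // produce a single record: the outer IO enqueues the send, the inner IO
    // completes once the broker acknowledges it
    def publish(producer: KafkaProducer[IO, String, String], key: String, value: String): IO[Unit] =
      producer.produce(ProducerRecords.one(ProducerRecord("example-topic", key, value))).flatten.void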

    Ihor Zadyra
    @TomoHavvk
    Hello, is it possible to handle such a decoding error and avoid the consumer stream getting stuck? I've been trying to find a solution, but nothing so far. Thanks!
    vulcan.AvroException: Error decoding Event: Error decoding EventMeta: Error decoding UUID: java.lang.IllegalArgumentException: Invalid UUID string: {{idd}}
    Waldemar Wosiński
    @kastoestoramadus

    Hi,
    I can't locate commitBatchOption anywhere in the code:
    https://github.com/fd4s/fs2-kafka/search?q=commitBatchOption
    but this method is mentioned in the docs.

    What is going on?

    Maxi Biandratti
    @biandratti
    Hey guys! I'm starting to use fs2-kafka, but I'm wondering what the correct mechanism is to restart the consumer in case of an error. I mean, I expect some mechanism to handle error cases, like a supervisor over actors that restarts the consumers after any error. What is the recommendation for this case, any suggestions? Thanks for your help!
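
    (Not an fs2-kafka feature as such, but a common sketch with plain fs2: on any failure, log, sleep, and rebuild the whole consumer stream. Assuming the passed-in stream includes KafkaConsumer.stream, a fresh consumer is created on each restart.)

    import scala.concurrent.duration._
    import cats.effect.IO
    import fs2.Stream

    // a sketch of supervisor-style restarts: any error tears the stream down,
    // waits, and starts it again from scratch
    def supervised(consume: Stream[IO, Unit]): Stream[IO, Unit] =
      consume.handleErrorWith { error =>
        Stream.eval(IO.println(s"Consumer failed: $error, restarting in 10s")) >>
          Stream.sleep[IO](10.seconds) >>
          supervised(consume)
      }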
    Nakarin Hansawattana
    @nakarinh14
    Hi, is it still possible to backport some features from 2.x to the 1.x series? I have opened a PR to backport a rebalancing strategy (CooperativeStickyAssignor on the consumer) to 1.x, as the change looks relatively straightforward. fd4s/fs2-kafka#1063