    peterstorm
    @peterstorm:matrix.org
    [m]
    Hello, how do you specify multiple bootstrap servers in settings?
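    A minimal sketch (broker addresses are placeholders): withBootstrapServers takes the usual comma-separated host:port list that the underlying Kafka client expects.

    import cats.effect.IO
    import fs2.kafka.{AutoOffsetReset, ConsumerSettings}

    // comma-separated host:port pairs, as in the Kafka bootstrap.servers property
    val consumerSettings =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers("broker-1:9092,broker-2:9092,broker-3:9092")
        .withGroupId("my-group")
        .withAutoOffsetReset(AutoOffsetReset.Earliest)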
    Daniel Robert
    @drobert
    Does fs2-kafka support different "subject name strategies" (per https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy) with regard to the schema registry, or is it largely built around the original TopicNameStrategy approach?
    peterstorm
    @peterstorm:matrix.org
    [m]
    Any way to support deserializing circe Enumeratum enums? For some reason it works great with serializing, but it fails when I get to the enum field in my case class
    peterstorm
    @peterstorm:matrix.org
    [m]
    Nevermind, it's circe that's not working out for me
    Keir Lawson
    @keirlawson
    What's the best way to go from a raw Kafka ConsumerRecord to a fs2-kafka ConsumerRecord?
    We could consider adding a public version of that function.
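    In the meantime, a rough sketch of doing the conversion by hand via the public ConsumerRecord constructor (only topic, partition, offset, key, value and headers are copied here; timestamp and serialized sizes are left out):

    import org.apache.kafka.clients.consumer.{ConsumerRecord => JavaConsumerRecord}
    import fs2.kafka.{ConsumerRecord, Header, Headers}

    // copy the basic fields from the Java record into the fs2-kafka one
    def fromJava[K, V](r: JavaConsumerRecord[K, V]): ConsumerRecord[K, V] =
      ConsumerRecord(r.topic, r.partition, r.offset, r.key, r.value)
        .withHeaders(Headers(r.headers.toArray.map(h => Header(h.key, h.value)): _*))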
    rnd4222
    @rnd4222_gitlab
    What is the point of the Headers argument in a Serializer?
    How do I add a Header when serializing a value?
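    As far as I understand, headers are attached to the ProducerRecord itself, and the Headers argument is what lets a Serializer inspect the record's headers while serializing. A small sketch (topic, key, value and header names are made up):

    import fs2.kafka.{Header, Headers, ProducerRecord}

    // the serializer receives these Headers as its second argument
    val record =
      ProducerRecord("my-topic", "my-key", "my-value")
        .withHeaders(Headers(Header("correlation-id", "abc-123")))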
    federico cocco
    @federico.cocco_gitlab
    Hello! I would like to produce a record as a single IO operation, so I'm compiling the stream to an IO[Unit], which is then run from a different stream. What I observe is that the publisher stream seems to take longer to close than the timeout given to the IO operation. Also, when a huge number of publish requests come through, at some point it starts timing out. I've been wondering whether these two are related, whether I'm taking the wrong approach, and whether there is a way to close the publisher immediately after the record has been published. Thanks
    3 replies
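    One possible sketch for producing a single record without compiling a stream (fs2-kafka 1.x; settings and record are placeholders): KafkaProducer.resource acquires a producer, and produce returns a nested effect where the outer IO enqueues the record and the inner one completes on acknowledgement. For many records it is usually cheaper to acquire the producer once at startup and reuse it, rather than opening and closing it per publish.

    import cats.effect.IO
    import fs2.kafka.{KafkaProducer, ProducerRecord, ProducerRecords, ProducerSettings}

    // assumes the implicits normally provided by IOApp are in scope
    def produceOne(
      settings: ProducerSettings[IO, String, String],
      record: ProducerRecord[String, String]
    ): IO[Unit] =
      KafkaProducer.resource(settings).use { producer =>
        // outer IO enqueues the record, inner IO waits for the broker ack
        producer.produce(ProducerRecords.one(record)).flatten.void
      }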
    Christian Palmhøj Nielsen
    @cpnielsen

    Hi. I am trying to stop a stream, and have tried various approaches such as stream.interruptWhen(myDeferred.attempt) and calling consumer.stopConsuming directly, but they all result in a "java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access" error. I could not find any issues on Github regarding shutdown, so I worry that I am doing something wrong elsewhere.

    This is how I set up the stream:

    val consumerSettings = ConsumerSettings[IO, Option[String], String]
      .withBlocker(blocker)
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withGroupId(config.consumerGroupId)
      .withBootstrapServers(config.kafkaServers)
    
    KafkaConsumer
      .stream(consumerSettings)
      .evalTap(_.subscribeTo(config.topic.value))
      .flatMap(_.stream)

    I then have my interruptWhen(...) later downstream.

    Christian Palmhøj Nielsen
    @cpnielsen
    Okay, I tried removing the .withBlocker(blocker) and now it works fine. It's a pretty standard execution context with 100 threads, lifted into a Blocker. Looking through the logs, it seems this breaks unless it's a single-threaded blocking pool?
    3 replies
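    A sketch of a dedicated single-threaded pool lifted into a Blocker (cats-effect 2), which seems consistent with the single-threaded access the underlying Java consumer expects:

    import java.util.concurrent.Executors
    import cats.effect.{Blocker, IO, Resource}

    // one thread, used only for the consumer's blocking calls
    val consumerBlocker: Resource[IO, Blocker] =
      Blocker.fromExecutorService(IO(Executors.newSingleThreadExecutor()))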
    peterstorm
    @peterstorm:matrix.org
    [m]
    Does fs2-kafka support co-partitioning like Kafka Streams?
    1 reply
    peterstorm
    @peterstorm:matrix.org
    [m]
    It’s specifically the ‘co’ partitioning bit, as described here: https://medium.com/xebia-france/kafka-streams-co-partitioning-requirements-illustrated-2033f686b19c
    1 reply
    I guess specifically putting stuff from two different topics, but with the same key, on the same partition
    Ben Plommer
    @bplommer

    And java consumer is safe to use only from the single thread.

    I don't think this is correct - the consumer isn't thread-safe, but multi-threaded use is supposed to be possible if properly synchronized: "It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException." (https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded)

    Which the consumer actor is supposed to guarantee - this sounds like a bug in fs2-kafka
    ravetdavid
    @ravetdavid
    Hi, how do we handle Avro deserialisation exceptions? We want to log an error and continue consuming our partitioned stream. We use parEvalMap to process the partitions.
    Keir Lawson
    @keirlawson
    Hello, I'm writing a reusable munit fixture to check Vulcan Codec compatibility against a given schema registry. Would this make sense to contribute to fs2-kafka as a new submodule?
    λoλcat
    @catostrophe
    @bplommer just updated from 3.0.0-M2 to 3.0.0-M3 and the tests fail :( will try to investigate it soon
    Ben Plommer
    @bplommer
    @ravetdavid you can use attempt to deserialise to Either
    10 replies
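    A minimal sketch of that suggestion, assuming a vulcan Codec for some type A and AvroSettings in scope: attempt turns deserialization failures into Left values, so they can be logged and skipped downstream instead of failing the stream.

    import cats.effect.IO
    import fs2.kafka.RecordDeserializer
    import fs2.kafka.vulcan.{avroDeserializer, AvroSettings}
    import vulcan.Codec

    // failed records surface as Left(throwable) in the consumed values
    def attemptDeserializer[A: Codec](
      avroSettings: AvroSettings[IO]
    ): RecordDeserializer[IO, Either[Throwable, A]] =
      RecordDeserializer.instance(
        avroDeserializer[A].using(avroSettings).forKey.map(_.attempt),   // forKey
        avroDeserializer[A].using(avroSettings).forValue.map(_.attempt)  // forValue
      )

    Downstream, records with a Left value can be logged and filtered out before parEvalMap, while Right values are processed as usual.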
    Ben Plommer
    @bplommer
    @keirlawson yes, something like fs2-kafka-vulcan-testkit would be good to start creating
    @catostrophe oh no :( if you can put together a reproducible failing test that would be great
    I'm really surprised that the 3.0.0-M2 -> 3.0.0-M3 bump broke anything though
    λoλcat
    @catostrophe
    @bplommer well, I caught the issue only once and I couldn't reproduce it. It works just fine now, and I really hope it was some random thing with my test env. The test itself is as simple as possible: it sends some messages to embedded-kafka via the impure client and then reads them back as a stream via fs2-kafka. After the update to M3, the consumer hung waiting for messages. This never happened before (this particular test had successfully completed on CI thousands of times).
    Ben Plommer
    @bplommer
    Hmm, I've had some problems with embedded-kafka before relating to binary incompatibility. Client code needs to use the same patch version of Scala that kafka was compiled with, because kafka inlines the standard library. Could it be to do with that?
    I'd recommend switching to testcontainers-kafka
    Keir Lawson
    @keirlawson
    @bplommer in terms of such a contribution, which branch would it make sense to target?
    or does everything just go into master and then get ported?
    Ben Plommer
    @bplommer
    Yeah, you can just target series/1.x assuming you're not making any breaking changes
    Keir Lawson
    @keirlawson
    coolio
    Keir Lawson
    @keirlawson
    is there a reason why metals files/folders aren't gitignored in the fs2-kafka repo?
    Ben Plommer
    @bplommer
    Not really, they should probably be in your global .gitignore but feel free to open a PR to gitignore them
    Keir Lawson
    @keirlawson
    ah I didn't know global gitignore was a thing! I'll set one up
    Timothy Bess
    @tdbgamer
    So I'm trying to stream something from a database onto Kafka with a transactional producer, but I don't want to pull all the rows into memory. I also don't want to commit my offsets until I'm done streaming. Problem is, CommittableProducerRecord seems to only take chunks. Is there a good way to do this?
    6 replies
    Ben Plommer
    @bplommer
    fs2-kafka v1.5.0 and v2.0.0-RC1 are on their way to Maven Central.
    Mauro González
    @maurogonzalez

    Hi! I've been having some trouble with consumers due to connection issues. The Kafka cluster lives inside Kubernetes, but for some reason some nodes of the Kafka cluster are being moved, which causes this issue (the infra team is looking into it).
    In the meantime, is there a strategy to reconnect or, in the worst case, to shut the consumer down gracefully so the application can exit?

    The current behavior shows logs like these:

     2021-04-22 08:56:18.029 INFO [shutdownHook1]  Consumer from SomeConsumer unsubscribing from [Topic] // Consumer resource release initiates
    ...
    2021-04-22 09:01:18.132 INFO [kafka-coordinator-heartbeat-thread | SomeConsumer] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=SomeConsumer-cb44c10a-9b45-4a51-922f-0ed015c88efc, groupId=AeneidCore-QryConsumer] Member SomeConsumer-cb44c10a-9b45-4a51-922f-0ed015c88efc-abcf89b6-adad-4fc6-8951-9dfc4bf29e8f sending LeaveGroup request to coordinator kafka-2.kafka-headless.namespace.svc.cluster.local:9092 (id: 2147483645 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages.

    And then it hangs.

    Mauro González
    @maurogonzalez

    Looking at the docs, max.poll.interval.ms is 5 minutes (exactly the interval in the logs). I think the LeaveGroup message was sent to a host that resolved to an old IP, or there was some network issue while the cluster's host resolution was being updated. But it seems no further attempt is made, and then closing the consumer's resource doesn't release it.

    Any ideas on how to fix this from fs2-kafka, or is it better to look at the consumer settings?

    Damien O'Reilly
    @DamienOReilly
    Hi, if I stream from a topic that has many partitions using KafkaConsume.stream, will commitBatchWithin handle committing offsets correctly per partition? Or should I use KafkaConsume.partitionedStream and handle each partition's stream separately, committing offsets via commitBatchWithin?
    Fabio Labella
    @SystemFw
    the former works
    Damien O'Reilly
    @DamienOReilly
    How does it keep track of that? I guess fs2-kafka explicitly handles the last offset per partition for a given Chunk?
    OK, I think I see it in fs2.kafka.CommittableOffsetBatch#fromFoldableMap
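    A sketch of the single-stream variant being discussed (inside the usual IOApp context; bootstrap servers, group id, topic name and process are placeholders). commitBatchWithin folds the offsets of each batch into a CommittableOffsetBatch, which tracks one offset per topic-partition, so commits stay correct even when a chunk mixes partitions.

    import scala.concurrent.duration._
    import cats.effect.IO
    import fs2.kafka.{commitBatchWithin, CommittableConsumerRecord, ConsumerSettings, KafkaConsumer}

    val consumerSettings =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers("localhost:9092")
        .withGroupId("my-group")

    // placeholder processing function
    def process(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] = IO.unit

    val consumeAndCommit =
      KafkaConsumer
        .stream(consumerSettings)
        .evalTap(_.subscribeTo("my-topic"))
        .flatMap(_.stream)
        .evalTap(process)
        .map(_.offset)
        .through(commitBatchWithin[IO](500, 15.seconds))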
    Sebastian Voss
    @sebastianvoss

    Hi All, I'm using Vulcan to consume Avro messages from Kafka:

    implicit val deserializer: RecordDeserializer[IO, A] = avroDeserializer[A].using(avroSettings)

    Is there a way to make Vulcan's avroDeserializer skip deserialization exceptions? Or should I implement a custom deserializer from scratch?

    Sebastian Voss
    @sebastianvoss
    I tried this, but the obvious problem is that it instantiates the deserializer for every message:
    implicit val deserializer: Deserializer[IO, Option[A]] = Deserializer.instance { (topic, headers, bytes) =>
        avroDeserializer[A]
          .using(avroSettings)
          .forValue
          .flatMap(
            _.deserialize(topic, headers, bytes)
              .map(Some(_))
              .handleErrorWith(e => logger.error(e.getMessage).as(none[A]))
          )
      }
    Sebastian Voss
    @sebastianvoss
    Or should I handle this at the stream level instead of the deserializer level?
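    One way to stay at the deserializer level without rebuilding it per message (a sketch, assuming a vulcan Codec for A; logError stands in for whatever effectful logger is in scope): wrap the already-created Deserializer, and move the creation effect into a RecordDeserializer so it only runs when the consumer is set up.

    import cats.effect.IO
    import cats.syntax.all._
    import fs2.kafka.{Deserializer, RecordDeserializer}
    import fs2.kafka.vulcan.{avroDeserializer, AvroSettings}
    import vulcan.Codec

    // the avroDeserializer effect runs once per consumer, not once per message;
    // failures are logged through logError and become None instead of errors
    def skippingDeserializer[A: Codec](
      avroSettings: AvroSettings[IO],
      logError: String => IO[Unit]
    ): RecordDeserializer[IO, Option[A]] = {
      def wrap(underlying: Deserializer[IO, A]): Deserializer[IO, Option[A]] =
        Deserializer.instance { (topic, headers, bytes) =>
          underlying
            .deserialize(topic, headers, bytes)
            .map(_.some)
            .handleErrorWith(e => logError(e.getMessage).as(none[A]))
        }

      RecordDeserializer.instance(
        avroDeserializer[A].using(avroSettings).forKey.map(wrap),   // forKey
        avroDeserializer[A].using(avroSettings).forValue.map(wrap)  // forValue
      )
    }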
    Greg Fisher
    @gnfisher
    How do you handle message versioning?
    Fabio Labella
    @SystemFw
    avro + topic up versioning when needed
    we have a library called vulcan for avro, which is integrated with fs2-kafka
    TheLastAngryMan
    @TheLastAngryMan
    Is there any reason compression wouldn’t be working? I’ve set the compression.type property when creating the ProducerSettings but I don’t think compression is being enabled
    TheLastAngryMan
    @TheLastAngryMan
    Nvm, I do actually think compression is working correctly
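    For reference, a sketch of how it can be set (the compression value here is just an example); the property is passed through to the underlying Java producer:

    import cats.effect.IO
    import fs2.kafka.ProducerSettings
    import org.apache.kafka.clients.producer.ProducerConfig

    val producerSettings =
      ProducerSettings[IO, String, String]
        .withBootstrapServers("localhost:9092")
        // equivalent to .withProperty("compression.type", "gzip")
        .withProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")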
    corentin
    @corenti13711539_twitter
    Are KafkaProducer instances, as well as the underlying resources, thread-safe and allowed to be shared across threads?
    Any best practices on sharing producer instances?
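    The underlying Java producer is documented as thread-safe, and the common pattern is to acquire one fs2-kafka producer as a Resource and share it across the application. A sketch (fs2-kafka 1.x; pipelineA and pipelineB are made-up components):

    import cats.effect.{ExitCode, IO, IOApp}
    import cats.syntax.all._
    import fs2.kafka.{KafkaProducer, ProducerSettings}

    object Main extends IOApp {
      val producerSettings =
        ProducerSettings[IO, String, String].withBootstrapServers("localhost:9092")

      // made-up components that both publish through the shared producer
      def pipelineA(producer: KafkaProducer[IO, String, String]): IO[Unit] = IO.unit
      def pipelineB(producer: KafkaProducer[IO, String, String]): IO[Unit] = IO.unit

      def run(args: List[String]): IO[ExitCode] =
        // a single producer for the whole application, shared by both pipelines
        KafkaProducer.resource(producerSettings).use { producer =>
          (pipelineA(producer), pipelineB(producer)).parTupled.as(ExitCode.Success)
        }
    }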