Colt Frederickson
@coltfred
ok.
Adam Chlupacek
@AdamChlupacek
@coltfred yeah, as I suspected, it was probably just some docker fluff. The tests are passing now, so I will merge it and do a release today
Colt Frederickson
@coltfred
@AdamChlupacek Sweet! Thanks!
Adam Chlupacek
@AdamChlupacek
@/all 0.2.0-RC1 released! It's built against fs2 0.10.0-RC2
Colt Frederickson
@coltfred
Neat! I'll take a look!
Andi Miller
@andimiller
I'm getting spinoco.fs2.kafka.failure.ClientTerminated$: Kafka client was terminated when trying to use fs2-kafka
but I may be using it wrong
Adam Chlupacek
@AdamChlupacek
could you show me how you are using it?
Andi Miller
@andimiller
  // imports assumed for this snippet (fs2 0.10-era API):
  import cats.data.ValidatedNel
  import cats.effect.IO
  import cats.syntax.validated._
  import fs2.{Stream, text}
  import scodec.bits.ByteVector
  import scala.concurrent.duration._
  import spinoco.fs2.kafka._

  override def send(t: KafkaClient[IO])(top: String)(bodies: List[String]): IO[ValidatedNel[String, Unit]] = {
    Stream.emits(bodies).covary[IO].evalMap { s =>
      Stream.emit(s).covary[IO].through(text.utf8Encode[IO]).compile.toVector.map(x => (ByteVector.empty, ByteVector(x)))
    }.chunkLimit(1024).evalMap { ms =>
      t.publishN(topic(top), partition(0), requireQuorum = true, serverAckTimeout = 10.seconds, compress = None)(ms)
    }.compile.toVector.map { r =>
      logger.error(s"Finished publishing kafka messages, result was $r")
      ().validNel[String]
    }
  }
currently like that
ideally I just want to send a batch of strings to a kafka topic
Adam Chlupacek
@AdamChlupacek
yeah, this looks fine. The problem seems to be that the stream where the kafka client is created has already run its finalisers, thus closing the connection
Andi Miller
@andimiller
it could be that I'm talking to a 1.0 server, but I'd expect that to work
Adam Chlupacek
@AdamChlupacek
we have tests on travis running against 1.0, and it works fine. Could you show me where you are creating the kafka client?
Andi Miller
@andimiller
      // imports assumed for this snippet:
      import java.nio.channels.AsynchronousChannelGroup
      import java.util.concurrent.Executors
      import scala.concurrent.ExecutionContext.Implicits.global

      val pool = Executors.newScheduledThreadPool(8)
      implicit val scheduler = fs2.Scheduler.fromScheduledExecutorService(pool)
      implicit val ag = AsynchronousChannelGroup.withThreadPool(pool)
      kafka.client[IO](
        ensemble = Set(broker("localhost", port = 9092))
        , protocol = ProtocolVersion.Kafka_0_10_2
        , clientName = "testclient"
      )
do scheduler and ag have to be separate pools?
Adam Chlupacek
@AdamChlupacek
I would suggest that in the future, but that certainly does not cause this issue. Are you perhaps compiling and running the stream where the kafka client is created before you pass it to that send function?
Andi Miller
@andimiller
I am, yes
is it bracketed to shut it down?
Adam Chlupacek
@AdamChlupacek
yeah, exactly. You should only run the stream at the "end of the world", since the kafka client is treated as a resource, so it is released as soon as the stream that created it ends.
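A minimal sketch of the pattern Adam is describing, assuming the 0.2-era spinoco/fs2-kafka API shown in the snippets above (kafka.client, broker, ProtocolVersion) and a hypothetical doSend standing in for Andi's send logic: the rest of the program is composed inside the client's Stream, so the client is only finalised once everything has run.

```scala
import cats.effect.IO
import fs2.Stream
import spinoco.fs2.kafka._
import spinoco.protocol.kafka.ProtocolVersion

// Hypothetical stand-in for the send logic from the snippet above.
def doSend(client: KafkaClient[IO]): IO[Unit] = ???

// Compose the program INSIDE the client stream: the client resource is
// released only when this whole stream finishes, not before.
// (Implicit AsynchronousChannelGroup / Scheduler as in the setup above.)
val program: Stream[IO, Unit] =
  kafka.client[IO](
    ensemble = Set(broker("localhost", port = 9092))
    , protocol = ProtocolVersion.Kafka_0_10_2
    , clientName = "testclient"
  ).flatMap { client =>
    Stream.eval(doSend(client))
  }

// Run exactly once, at the "end of the world" (e.g. from a StreamApp):
// program.compile.drain.unsafeRunSync()
```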
Andi Miller
@andimiller
so if I integrate it into my StreamApp it should work?
Adam Chlupacek
@AdamChlupacek
yeah exactly
Andi Miller
@andimiller
cool, thanks, I'll give that a go :)
Adam Chlupacek
@AdamChlupacek
no problem, feel free to ask if you have any other questions :)
Andi Miller
@andimiller
any of you who're using fs2-kafka for consuming, what do you store your offsets in?
Adam Chlupacek
@AdamChlupacek
We use cassandra for storing any and all persistent data.
Andi Miller
@andimiller
so you commit offsets to cassandra when you've processed a batch?
the normal kafka client keeps it in kafka, somehow
and used to keep it in zookeeper
I guess cassandra makes sense since it's very write-heavy
Adam Chlupacek
@AdamChlupacek
basically yeah, we keep it there; it's needed for recovery etc. No idea how the client can keep anything like that when you crash mid-batch or anything like that
Andi Miller
@andimiller
yeah of course
Adam Chlupacek
@AdamChlupacek
I may be misunderstanding here a bit though: what do you actually want to store the offset for? (since you are comparing to the kafka client, you cannot be using it for recovery etc., right?)
Andi Miller
@andimiller
I'm using the offset for crash recovery yeah
I'm currently using the official kafka consumer, and fs2-kafka's producer
so I don't have to implement my own offset management
Adam Chlupacek
@AdamChlupacek
What kind of crash recovery? complete (jvm exits)? or just an error in your stream?
Andi Miller
@andimiller
usually just a normal SIGTERM
the rpm being upgraded
Adam Chlupacek
@AdamChlupacek
And the kafka client can somehow recover from this? wow, it must store it in zookeeper or something like that, seems a bit arcane to me :D
if you need something short-term, you could maybe store the offsets in zookeeper manually? That would allow you to use the fs2-kafka consumer as well.
Pavel Chlupacek
@pchlupacek
Actually you may use kafka itself to store the offset, also with fs2-kafka. Just configure one of your topics to be compacted and you are good to go. That's actually how the original kafka client stores offsets nowadays; before, it was ZK.
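A hedged sketch of what Pavel suggests, not the library's documented API: write each processed offset to a topic configured with cleanup.policy=compact, keyed by consumer group / topic / partition, so Kafka retains only the latest value per key. The topic name "offsets", the key layout, and the publish1 call (modelled on the publishN call shown earlier) are all assumptions.

```scala
import cats.effect.IO
import scodec.bits.ByteVector
import scala.concurrent.duration._
import spinoco.fs2.kafka._

// Sketch: commit the latest processed offset to a compacted topic.
// With cleanup.policy=compact, Kafka keeps only the newest record per key,
// so on restart the last value stored under this key is the recovery point.
def commitOffset(client: KafkaClient[IO])(
  group: String, top: String, part: Int, offset: Long
): IO[Unit] = {
  // Hypothetical key layout: one key per (group, topic, partition).
  val key   = ByteVector.view(s"$group:$top:$part".getBytes("UTF-8"))
  val value = ByteVector.fromLong(offset)
  // publish1 is assumed to mirror the publishN parameters used earlier.
  client.publish1(topic("offsets"), partition(0), key, value,
    requireQuorum = true, serverAckTimeout = 10.seconds).map(_ => ())
}
```

On restart, a consumer would read this topic to its end and resume each partition from the last value seen for its key.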
Adam Chlupacek
@AdamChlupacek
didn't know that, that's cool.
Andi Miller
@andimiller
yeah I need to read up on how the official client stores it in kafka and do that
Pavel Chlupacek
@pchlupacek
for that you will need to read the kafka client's source. Not sure if they document that.
fwiw, if you go down this path I would be happy to have a PR with that :-)
Andi Miller
@andimiller
I guess you key on consumer id, and compact the topic
Pavel Chlupacek
@pchlupacek
yes, I would say so
and perhaps topic
Filippo De Luca
@filosganga
Hi Guys,
Am I correct to say that to use fs2-kafka I need to assign a specific partition to each consumer?