Jaakko Pallari
The key serializer is whatever serializer you need to use. If you don't need to include keys in the messages, the byte array serializer from the Java client is probably a good default.
The GenericAvroSerializer for GenericRecord should work, since it just makes sure that only GenericRecord values pass through it to the KafkaAvroSerializer.
I'm not really sure what the problem is if those don't work.
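The setup described above, byte-array keys with Avro values, can be sketched as plain client properties. A minimal sketch: the broker and schema-registry addresses are placeholders, not values from this conversation, and it assumes Confluent's KafkaAvroSerializer is on the classpath at runtime.

```scala
import java.util.Properties

// Byte-array keys (no key content needed) and Avro values through
// Confluent's serializer. Addresses below are placeholders.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://localhost:8081")
```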
Ferhat Aydın
Actually it works; I can produce records with GenericAvroSerializer and consume them like in https://github.com/ferhtaydn/sack/tree/master/src/main/scala/com/ferhtaydn/sack/avro
but there is a Confluent vs. Kafka serialization problem: the magic byte 0x0 in the first byte
what do you use in general for POJO serializers? something like GenericAvroSerializer or direct KafkaAvroSerializer objects?
Jaakko Pallari
I've used a solution where the KafkaAvroSerializer is wrapped,
except that we do the POJO to GenericRecord conversion in the serializer,
which shouldn't really be that different from your case.
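The wrapping idea can be sketched without the Kafka libraries. In this self-contained sketch the local `Serializer` trait stands in for `org.apache.kafka.common.serialization.Serializer`, a `Map` stands in for an Avro `GenericRecord`, and all names (`User`, `RecordSerializer`, `PojoSerializer`) are illustrative, not the real API:

```scala
// Stand-in for org.apache.kafka.common.serialization.Serializer.
trait Serializer[T] { def serialize(topic: String, data: T): Array[Byte] }

case class User(name: String, age: Int) // example POJO

// Stand-in for Confluent's KafkaAvroSerializer: renders the "record" fields
// as bytes so the sketch runs standalone.
class RecordSerializer extends Serializer[Map[String, Any]] {
  def serialize(topic: String, data: Map[String, Any]): Array[Byte] =
    data.map { case (k, v) => s"$k=$v" }.mkString(",").getBytes("UTF-8")
}

// The wrapper: converts the POJO to a generic record *inside* serialize(),
// then delegates, which is the pattern described above.
class PojoSerializer(delegate: Serializer[Map[String, Any]]) extends Serializer[User] {
  def serialize(topic: String, user: User): Array[Byte] =
    delegate.serialize(topic, Map("name" -> user.name, "age" -> user.age))
}
```

The benefit of this shape is that producers stay typed against the POJO while the Avro plumbing is hidden behind one serializer.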
Ferhat Aydın
Jaakko Pallari
I haven't used the Cassandra sink before so I can't comment on how that's supposed to work.
Ferhat Aydın
thank you so much Jaakko, I got tired of those serialization problems :) let's see how it gets solved in the end.
Jaakko Pallari
No problem. In general, I think there's still quite a bit of stuff in serialization to figure out. We might put a bit more focus on that area once we've got the core client into a shape we're happy with. :smile:
Ferhat Aydın
:) I think so too. There's nothing too difficult in the tools for Kafka itself, the Scala client, etc. The main problem is (de)serialization :)
Ferhat Aydın

hey @jkpl, the problem is solved by using the raw KafkaAvroSerializer and giving the Object type to both the producer and the records. Strictly generically typed consumers and producers are a bit difficult to use at this level of the Kafka ecosystem stack, I think; when you pass everything with Properties and Object, it just works :) You may want to check the last comment on the issue: datamountaineer/stream-reactor#80

thanks for the help.
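A hypothetical reconstruction of the untyped setup described above: both key and value go through `KafkaAvroSerializer`, and everything is passed as `Object`. Addresses are placeholders; the producer construction itself needs kafka-clients on the classpath, so it is shown as a comment:

```scala
import java.util.Properties

// Untyped producer config: Confluent's Avro serializer for both key and
// value, typed as Object at the call sites. Addresses are placeholders.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("schema.registry.url", "http://localhost:8081")
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

// With kafka-clients on the classpath, construction would then be:
//   val producer = new org.apache.kafka.clients.producer.KafkaProducer[Object, Object](props)
```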

Jaakko Pallari
Hmm, that's interesting.
I definitely don't like the idea that magic makes it work... but then again it works. :)
It'd be helpful to know what exactly makes it work.
Jaakko Pallari
Anyone tried Kafka Streams? http://docs.confluent.io/3.0.0/streams/
The streams API seems to implement a few patterns that we've been thinking about.
The downside of the API is that it doesn't leave much room for doing asynchronous computations in the middle of the stream.
The API looks pretty neat though. It seems to be fairly easy to do stateful computations with persistent state.
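The stateful computation the Streams API makes easy, e.g. a running count per key, reduces to folding records into a keyed table. A pure-Scala sketch of the idea (the real Streams API keeps this state in a fault-tolerant, changelog-backed state store rather than an in-memory Map; the sample records are invented):

```scala
// Fold each (key, value) record into a table of per-key counts,
// the same shape as a Streams groupByKey().count() aggregation.
val records = List("user1" -> "click", "user2" -> "click", "user1" -> "view")

val countsByKey: Map[String, Int] =
  records.foldLeft(Map.empty[String, Int]) { case (counts, (key, _)) =>
    counts.updated(key, counts.getOrElse(key, 0) + 1)
  }
```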
Jaakko Pallari
Oh, and posting this here in case someone hasn't seen it yet: https://www.youtube.com/watch?v=lMlspFnfHM8 :)
how do I specify the Zookeeper IP in the Kafka consumer config?
Jaakko Pallari
@prassee Hi, sorry for the late response (Gitter stopped sending me email notifications for some reason). As of Kafka 0.9, you no longer specify Zookeeper IP in the consumer config. You only point your client to a Kafka cluster, and it'll figure out the Zookeeper communication for you.
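For reference, a new-style (0.9+) consumer configuration might look like the sketch below; note there is no `zookeeper.connect` entry at all. The broker addresses and group name are placeholders:

```scala
import java.util.Properties

// 0.9+ consumer config: point the client at the brokers only; the client
// discovers the rest of the cluster itself. No "zookeeper.connect" entry.
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "broker1:9092,broker2:9092")
consumerProps.put("group.id", "my-group")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
```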
I am using it, but it did not work until I explicitly specified the ZK IP
Harmeet Singh(Taara)
Hello Team
I am facing an ask timeout exception using the KafkaAkkaProducer. Please see this link for more details: http://stackoverflow.com/questions/42188622/apache-kafka-kafkaproduceractor-throws-exception-ask-timeout
Hi Team,
I am facing the issue below:
17/07/14 17:11:45 INFO AppInfoParser: Kafka version :
17/07/14 17:11:45 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
17/07/14 17:11:45 INFO AbstractCoordinator: Discovered coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) for group tr-consumer-group.
17/07/14 17:11:45 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group tr-consumer-group
17/07/14 17:11:45 INFO AbstractCoordinator: (Re-)joining group tr-consumer-group
17/07/14 17:11:46 INFO AbstractCoordinator: Marking the coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) dead for group tr-consumer-group
this issue occurs during Spark-Kafka streaming
please help with this
Jaakko Pallari
Are you using Spark with scala-kafka-client or Spark's own Kafka module?
First, I don't think any of us have experience using scala-kafka-client with Spark. Second, Spark uses its own, separate client for Kafka.
Michael Xing
Hi team, any thoughts on using cakesolutions vs http://doc.akka.io/docs/akka-stream-kafka/current/home.html?
Jaakko Pallari
The major difference is that Akka Streams Kafka is designed for Akka Streams, while Scala-Kafka-Client provides integration with Akka.
Akka Streams Kafka is backed by the Akka team, but Scala-Kafka-Client isn't.
I'd say Scala-Kafka-Client is also less opinionated on how you interact with the client compared to Akka Streams Kafka.
Michael Xing
Thanks! So, any recommendations on the scenarios in which I should use one over the other?
Jaakko Pallari
If you use Akka Streams, try Akka Streams Kafka before Scala Kafka Client. Other than that, I don't have anything specific to recommend either one for. I'd be interested to hear which one you end up using and why. :)
Mark de Jong
Since Kafka 0.11 there is support for idempotent producing, right? I've been reading the docs and protocols and also checked out the Java API, but I couldn't find how to supply a sequence number to make producing idempotent. Am I missing anything?
Simon Souter

You just need to set the "enable.idempotence" option on the producer. See the kafka docs here: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

The config helper classes in scala-kafka-client now have a parameter for it (KafkaProducer.Conf -> enableIdempotence: Boolean = false)
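A sketch of enabling the idempotent producer. Setting the flag is enough; the broker then deduplicates using producer-id and sequence numbers that the client manages internally, so there is no user-supplied sequence number. The broker address is a placeholder:

```scala
import java.util.Properties

// Enabling the idempotent producer via a raw property. The client manages
// the producer id and per-partition sequence numbers itself.
val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("enable.idempotence", "true")

// Equivalent via the scala-kafka-client helper mentioned above:
//   KafkaProducer.Conf(keySerializer, valueSerializer,
//                      bootstrapServers = "localhost:9092",
//                      enableIdempotence = true)
```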

Simon Souter
Development on this project has been a little quiet recently, but I've just kicked off a big project that will be using this client heavily, and I will likely be making some improvements over the coming months. If anyone has any suggestions, issues, or pain points, I'd be interested in hearing them...
Mark de Jong
I see, cool. But you don't have control over which identifier it uses (like a partition) to get these idempotent writes, right?
For example, if you want to stream all the records of a (SQL) database with an ID (an auto-incrementing integer column), the ID column would be a great candidate to use, such that you get exactly-once (idempotent) writes.
Yeshwanth Kumar
great library, and it works really well. I am just trying to understand the ConsumerRecords.extractor pattern that you are using. Is it type-safe casting?
just trying to understand what exactly is happening there :)
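The pattern is essentially a guarded match-and-cast: an extractor whose `unapply` only matches when the runtime value carries the expected types. A self-contained sketch of the mechanism (the `TypedExtractor` name is hypothetical, and this is not the library's actual implementation, just the same idea in miniature):

```scala
import scala.reflect.ClassTag

// An unapply that only matches when the runtime value is of the expected
// type, checked via the ClassTag's runtime class.
class TypedExtractor[T](implicit ct: ClassTag[T]) {
  def unapply(any: Any): Option[T] =
    if (ct.runtimeClass.isInstance(any)) Some(any.asInstanceOf[T]) else None
}

val stringExt = new TypedExtractor[String]

// A non-String value falls through to the default case instead of
// causing a ClassCastException.
val result = (42: Any) match {
  case stringExt(s) => s"string: $s"
  case other        => s"not a string: $other"
}
```

In the library's case the extractor is matched against messages delivered to an actor, so a consumer can safely recover its typed records from an untyped `Any` receive.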
Zacharias J. Beckman

I'm trying to set up a very simple log as part of a learning exercise. The exercise is evolving; basically simple transaction processing (think banking). It starts with using a Scala Stream. That rapidly stops working when you get to more than one user, so now I'm putting Kafka in. I wrote a super-simple (too simple) POC as "step 1": basically, just a log that transactions are written to. Long term it will introduce Akka, but baby steps. What I have right now, basically:

import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

// Transaction is the domain case class, defined elsewhere.
case class TransactionLog() {
    val random = new scala.util.Random // used to create random group IDs to avoid metrics collision
    val key = "transaction"
    val topic = "transactions"
    val host = "localhost:9092"
    val group = s"transaction_group-${random.nextInt}"

    val producer = KafkaProducer(KafkaProducer.Conf(new StringSerializer(), new StringSerializer(), bootstrapServers = host))

    val consumer = KafkaConsumer(KafkaConsumer.Conf(new StringDeserializer(),
        new StringDeserializer(),
        bootstrapServers = host,
        groupId = group,
        enableAutoCommit = false))

    def poll = consumer.poll(1000)

    def send(t: Transaction) =
        producer.send(KafkaProducerRecord(topic, Some(key), t.toString))
}

Now I know this is crude but at this stage we are just introducing the idea of some kind of persistent log. Problem is: I write tests in specs2, and those rapidly exposed a major flaw I hadn't thought of: consumers are not thread safe. Oops. Question is -- what can I do that is absolutely minimal, but still gets the job done? (Realistically, at this point, the only thing we do when reading back from the log is to read the entire log, to tally all transactions). New to scala-kafka-client and trying to figure it out...
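One minimal pattern for the thread-safety problem is thread confinement: funnel every call to the non-thread-safe consumer through a single-threaded executor, so calls can come from any test thread but never overlap (scala-kafka-client's KafkaConsumerActor achieves the same confinement with an actor). A self-contained sketch, where `fakePoll` is a stand-in for `consumer.poll(...)` so the structure, not the Kafka call, is the point:

```scala
import java.util.concurrent.{Callable, Executors}

// All interaction with a non-thread-safe client goes through one
// single-threaded executor, so its calls are serialized.
val consumerThread = Executors.newSingleThreadExecutor()

var received = List.empty[String] // only ever touched on consumerThread

// Stand-in for consumer.poll(...): runs on the confined thread and
// returns the records seen so far.
def fakePoll(n: Int): List[String] =
  consumerThread.submit(new Callable[List[String]] {
    def call(): List[String] = { received = s"record-$n" :: received; received }
  }).get()

val after = { fakePoll(1); fakePoll(2) }
consumerThread.shutdown()
```

For the "read the whole log and tally" use case, each test can then safely create one confined consumer, replay the topic from the beginning, and fold over the records.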