I am using it, but that did not work until I explicitly specified the ZK IP.
Harmeet Singh(Taara)
Hello Team
I am facing an ask timeout exception using KafkaAkkaProducer. Please see this link for more details: http://stackoverflow.com/questions/42188622/apache-kafka-kafkaproduceractor-throws-exception-ask-timeout
Hi Team,
I am facing the issue below:
17/07/14 17:11:45 INFO AppInfoParser: Kafka version :
17/07/14 17:11:45 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
17/07/14 17:11:45 INFO AbstractCoordinator: Discovered coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) for group tr-consumer-group.
17/07/14 17:11:45 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group tr-consumer-group
17/07/14 17:11:45 INFO AbstractCoordinator: (Re-)joining group tr-consumer-group
17/07/14 17:11:46 INFO AbstractCoordinator: Marking the coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) dead for group tr-consumer-group
This issue occurs during Spark-Kafka streaming.
Please help with this.
Jaakko Pallari
Are you using Spark with scala-kafka-client or Spark's own Kafka module?
First, I don't think any of us have experience using scala-kafka-client with Spark. Second, Spark uses its own, separate client for Kafka.
Michael Xing
Hi team, any thoughts on using cakesolutions vs http://doc.akka.io/docs/akka-stream-kafka/current/home.html?
Jaakko Pallari
The major difference is that Akka Streams Kafka is designed for Akka Streams while Scala-Kafka-Client provides integration to Akka.
Akka Streams Kafka is backed by the Akka team, but Scala-Kafka-Client isn't.
I'd say Scala-Kafka-Client is also less opinionated on how you interact with the client compared to Akka Streams Kafka.
Michael Xing
Thanks! So, any recommendations on which scenarios call for one over the other?
Jaakko Pallari
If you use Akka Streams, try Akka Streams Kafka before Scala Kafka Client. Other than that, I don't have anything specific to recommend either one for. I'd be interested to hear which one you end up using and why. :)
Mark de Jong
Since Kafka 0.11 there is support for idempotent producing, right? I've been reading docs and protocols and also checked out the Java API, but I couldn't find how to supply a sequenceNr to make producing idempotent. Am I missing anything?
Simon Souter

You just need to set the "enable.idempotence" option on the producer. See the kafka docs here: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

The config helper classes in scala-kafka-client now have a parameter (KafkaProducer.Conf -> enableIdempotence: Boolean = false)
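
As a rough illustration of the setting mentioned above, a producer configuration built from plain java.util.Properties might look like the sketch below. The object name IdempotentProducerProps and the String serializers are assumptions made for the example; only the "enable.idempotence" key comes from the discussion. With this option the producer client tracks sequence numbers itself, so the application does not supply one.

```scala
import java.util.Properties

// Hypothetical helper, not part of scala-kafka-client or Kafka itself.
object IdempotentProducerProps {
  def build(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // String serializers are an assumption for this sketch.
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // The option from the Kafka producer docs referenced above.
    props.setProperty("enable.idempotence", "true")
    // Idempotence requires acknowledgement from all in-sync replicas.
    props.setProperty("acks", "all")
    props
  }
}
```

The resulting Properties object could then be passed to the Java KafkaProducer constructor; with scala-kafka-client the enableIdempotence parameter on KafkaProducer.Conf plays the same role.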

Simon Souter
Development on this project has been a little quiet recently, but I've just kicked off a big project that will be using this client heavily, and I will likely be making some improvements over the coming months. If anyone has any suggestions, issues or pain points, I'd be interested in hearing them...
Mark de Jong
I see, cool. But you don't have control over which identifier it might use (like a partition) to get these idempotent writes, right?
For example, say you want to stream all the records of a SQL database table that has an ID (an auto-incrementing integer column). The ID column would be a great candidate to use, so that you can have exactly-once (idempotent) writes.
Yeshwanth Kumar
Great library, and it works really well. I am just trying to understand the ConsumerRecord.extractor pattern that you are using. Is it a type-safe cast?
just trying to understand what exactly is happening there :)
Zacharias J. Beckman

I'm trying to set up a very simple log as part of a learning exercise. The exercise is evolving, basically simple transaction processing (think, banking). It starts with using a Scala Stream. Rapidly that stops working when you get to more than one user, so now I'm putting Kafka in. Wrote a super simple (too simple) POC as "step 1." Basically, just a log that transactions are written to. Long term it will introduce Akka, but baby steps. What I have right now, basically:

import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

case class TransactionLog() {
    val random = new scala.util.Random // used to create random group IDs to avoid metrics collision
    val key = "transaction"
    val topic = "transactions"
    var host = "localhost:9092"
    var group = s"transaction_group-${random.nextInt}"

    val producer = KafkaProducer(KafkaProducer.Conf(new StringSerializer(), new StringSerializer(), bootstrapServers = s"$host"))

    val consumer = KafkaConsumer(KafkaConsumer.Conf(new StringDeserializer(),
        new StringDeserializer(),
        bootstrapServers = s"$host",
        groupId = s"$group",
        enableAutoCommit = false))

    // Transaction is the poster's own type; its definition is not shown here.
    def poll = consumer.poll(1000)

    def send(t: Transaction) = {
        val m = producer.send(KafkaProducerRecord(topic, Some(key), t.toString))
    }
}

Now I know this is crude but at this stage we are just introducing the idea of some kind of persistent log. Problem is: I write tests in specs2, and those rapidly exposed a major flaw I hadn't thought of: consumers are not thread safe. Oops. Question is -- what can I do that is absolutely minimal, but still gets the job done? (Realistically, at this point, the only thing we do when reading back from the log is to read the entire log, to tally all transactions). New to scala-kafka-client and trying to figure it out...

Yeshwanth Kumar
@simonsouter Yes, I understood extractors (case match and unapply), but I am trying to understand why you did it that way. The user can still provide ConsumerRecord[String, String] in the actor's receive, right? Also, you cast it. :D
I'm just after the rationale behind the design. Not sure if I am missing something basic.
Simon Souter
It simply provides compile-time type safety. You can indeed match a ConsumerRecord[String, String] yourself, but you could also match against ConsumerRecord[Int, Int] by accident and get a ClassCastException.
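
To make the point above concrete, here is a small self-contained sketch of the general idea, not the library's actual implementation. Batch, Batch.extractor and ExtractorDemo are hypothetical names invented for this example. The batch remembers its key and value types in ClassTags, and the extractor only matches when those tags agree with the types requested, so a mismatched pattern falls through instead of failing later with a ClassCastException.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for a record batch that remembers its element types.
final class Batch[K, V](val records: List[(K, V)])(
    implicit val keyTag: ClassTag[K], val valueTag: ClassTag[V])

object Batch {
  // Hypothetical extractor: matches only batches whose stored tags equal K and V.
  final class Extractor[K, V](implicit k: ClassTag[K], v: ClassTag[V]) {
    def unapply(msg: Any): Option[Batch[K, V]] = msg match {
      case b: Batch[_, _] if b.keyTag == k && b.valueTag == v =>
        // The cast is safe here because the tags were checked first.
        Some(b.asInstanceOf[Batch[K, V]])
      case _ => None
    }
  }

  def extractor[K, V](implicit k: ClassTag[K], v: ClassTag[V]): Extractor[K, V] =
    new Extractor[K, V]
}

object ExtractorDemo {
  val stringBatch = Batch.extractor[String, String]

  def describe(msg: Any): String = msg match {
    case stringBatch(batch) => s"matched ${batch.records.size} String records"
    case _                  => "not a Batch[String, String]"
  }
}
```

Calling ExtractorDemo.describe with a Batch of String pairs takes the first case, while a Batch of Int pairs falls through to the second, which is the behaviour a plain erased pattern match cannot guarantee.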
Yeshwanth Kumar
ah! yes, got it! @simonsouter
Simon Souter
@Fristi I don't think you need to worry about an ID to support idempotent producing with Kafka.
Simon Souter
Hi @zbeckman, I'm not clear on what your use case is for multiple consumers. Do you want multiple threads/users to each consume the same messages?
Zacharias J. Beckman
@simonsouter I think I've solved it. For this simple example I can linearize the specs2 tests (using sequential). That prevents the multithreading behavior but allows introduction of a basic Kafka. In the next lesson I can expand on it with reactive Kafka -- which works well, it introduces the idea of a topic, but also gives students the problem of multithreading to chew on.

Hi, I'm still pretty new to Scala and am trying to get the cakesolutions akka client up and running.
I think I have got my (very) rough implementation connected to kafka, but am puzzled as to how to pass messages between consumer <--> receiver. My code is below.

Could anyone hopefully give me any pointers or examples on how to get consumer and receiver actors talking?

import akka.actor.{Actor, ActorSystem, Props}
import akka.util.ByteString
import cakesolutions.kafka.akka.{ConsumerRecords, KafkaConsumerActor}
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object eatKafka extends App {

  // Create an Akka system
  println("\n\ncreating system...")
  val system = ActorSystem("VySystem")

  // create receiver
  println("\n\ncreating receiver...")
  val receiver = system.actorOf(Props[ReceiverActor], name = "receiver")

  // create cake kafka consumer
  println("\n\ncreating consumer...")
  val conf = ConfigFactory.load()

  // Construct the KafkaConsumerActor with Typesafe config in resources
  val cakeKConsumer = system.actorOf(
    KafkaConsumerActor.props(conf, new ByteArrayDeserializer(), new ByteArrayDeserializer(), receiver)
  )

  println("pass subscription details to cakeKConsumer")
  cakeKConsumer ! Subscribe.AutoPartition(List("impression")) //Subscribe.ManualOffset(Offsets(Map((new TopicPartition("impression", 0), 1))))
}

// kafka consumer actor
class kConsumeCakeActor extends Actor {
  // https://github.com/cakesolutions/scala-kafka-client/blob/master/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala
  println("\n\nIn consumer!!")
  override def receive = {
    case default => println("I am kConsumeCakeActor")
  }
}

// receiver actor
class ReceiverActor extends Actor {
  // Extractor for ensuring type safe cast of records
  val recordsExt = ConsumerRecords.extractor[Int, ByteString]
  println("In receiver")
  override def receive: Receive = {
    case recordsExt(records) =>
      println("do something with records")
      //for (record <- records) println(record)
      sender() ! Confirm(records.offsets)

    case default => println("I am receiver!!")
  }
}

Sample output:

[info] Running eatKafka 
[info] creating system...
[info] creating receiver...
[info] creating consumer...
[info] In receiver
[info] pass subscription details to cakeKConsumer

[info] [INFO] [08/24/2017 09:33:20.853] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] Subscribing in auto partition assignment mode to topics [impression].
[info] I am receiver!!
[info] [INFO] [08/24/2017 09:33:31.493] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] In bufferFull: records timed out while waiting for a confirmation.
[info] [INFO] [08/24/2017 09:33:31.493] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] In bufferFull: redelivering.
[info] I am receiver!!
[info] I am receiver!!
Yeshwanth Kumar
@simonsouter When I am using AutoPartitionWithManualOffset with an assignedListener, does it subscribe and distribute all partitions to the consumers, or only the partitions provided?
Meaning, if I have 100 partitions and only need to seek back on 10 (i.e., provide the partition:offset map through the assignedListener callback), will the AutoPartitionWithManualOffset mode subscribe to all partitions and only seek back on the 10 provided?
Simon Souter
Hi Yeshwanth, Kafka may assign all or a subset of partitions, depending on whether there are other nodes subscribing as part of the group, i.e., it may assign all 100 partitions as in your example. Once those partitions are assigned, the assignedListener callback will be invoked. If you only provide 10 partition offsets, these 10 partitions will seek to the supplied position. I imagine the other 90 will default to either the beginning or the end of the partition (depending on the KafkaConsumer OffsetResetStrategy), although I don't recall testing that case specifically...
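
The split described above can be modelled in a few lines of plain Scala. This is only a sketch of the reasoning, not code from scala-kafka-client: SeekPlan, Partition and plan are hypothetical names, and the real client works with org.apache.kafka.common.TopicPartition. Given the partitions Kafka assigned and the offsets supplied from the callback, it separates the partitions that would seek to a supplied offset from those left to the consumer's reset policy.

```scala
object SeekPlan {
  // Hypothetical model of a topic partition, used only for this sketch.
  final case class Partition(topic: String, number: Int)

  // Returns (offsets to seek to, partitions left to the reset policy).
  def plan(
      assigned: Set[Partition],
      supplied: Map[Partition, Long]
  ): (Map[Partition, Long], Set[Partition]) = {
    // Only offsets for partitions that were actually assigned are usable.
    val toSeek = supplied.filter { case (p, _) => assigned.contains(p) }
    // Everything else falls back to the configured reset behaviour.
    val leftToResetPolicy = assigned -- toSeek.keySet
    (toSeek, leftToResetPolicy)
  }
}
```

With 100 assigned partitions and offsets supplied for 10 of them, plan returns 10 seek targets and 90 partitions left to the reset policy, matching the case discussed here.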
Yeshwanth Kumar
@simonsouter makes sense.
Jimin Hsieh
I found that Kafka has MockProducer, MockConsumer, etc. I am curious: is it possible to write a small Scala wrapper around those mocks? I'm willing to send a PR too. Thanks! :smile:
Jimin Hsieh

@simonsouter Do you have any thoughts about this idea? Thanks! :smile:

is it possible to wrap a small Scala wrapper on those Mocks?

Hi All,
I have a fairly vague question. I have a Spark Scala project built with Maven. Now I want to add a Kafka subscriber to it so it can be invoked when needed. What would be the best approach to make these changes, and how can I use cakesolutions in my project?

So the blog is very confusing:

So, you'll need to detect this [Failure] via some form of supervision and resend a subscription command to the newly started instance. For brevity I've omitted such code from this example, but it's something you should include in any long-running production system using KafkaConsumerActor.

But in the GitHub examples the parent actor never "resends a subscription command".

Tal Ben Shabtay
Hey all, I was wondering if anybody has tried to override the consumer's supervisorStrategy in v0.9.0.0? It looks like it's using the default Decider even with:
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2) {
            case _: KafkaConsumerActor.ConsumerException =>
                log.error("Consumer exception caught. Restarting consumer.")
            case ex =>
                log.warning(s"Failed processing ${ex.getMessage}")
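import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object kafkaproducer extends App {

  val props = new Properties()
  props.put("bootstrap.servers", "brokerid")
  // KafkaProducer requires serializers; String is assumed here.
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val TOPIC = "simulatordata"

  // I don't know the limit; how do I get all the data from the simulator?
  for (i <- 1 to 5000) {
    // What should I write here to get the value of each record?
    val record = new ProducerRecord(TOPIC, "key", "value of each record " + new java.util.Date)
    producer.send(record)
  }

  producer.close()
}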
I have simulator data and I really don't know its attributes; some data is there. I want all of that data to be sent to a Kafka topic, so this code needs modification.
Note: the simulator data is on another machine, let's say localhost1:9092, and I want to create the topic on my machine and store all the simulator data on my machine, localhost2:9092.
Newbie here: does the Scala Kafka client support Kafka Streams?
Simon Souter
No, but Kafka Streams already has a Scala API, I think.
Hi there, people. Is anyone interested in speaking about the API and its usage at a Ukrainian conference?
Prudhvi Tej
Hi, does the Kafka client implement any backpressure techniques?
Salim Fadhley
Hi, is the Cake Solutions Kafka client under active development? Is it recommended for new projects?