prassee
@prassee
How do I specify the ZooKeeper IP in the Kafka consumer config?
Jaakko Pallari
@jkpl
@prassee Hi, sorry for the late response (Gitter stopped sending me email notifications for some reason). As of Kafka 0.9, you no longer specify Zookeeper IP in the consumer config. You only point your client to a Kafka cluster, and it'll figure out the Zookeeper communication for you.
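For illustration, a minimal sketch of a 0.9+ consumer config with scala-kafka-client's Conf helper: only the broker addresses are supplied, no ZooKeeper IP (the address and group id here are illustrative):

import cakesolutions.kafka.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

// Point the client at the Kafka brokers only; no zookeeper.connect is needed.
val consumer = KafkaConsumer(
  KafkaConsumer.Conf(
    new StringDeserializer(),
    new StringDeserializer(),
    bootstrapServers = "localhost:9092", // broker list, not ZooKeeper
    groupId = "my-group"
  )
)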
prassee
@prassee
I'm using 0.9.0.0, but it didn't work until I explicitly specified the ZK IP.
Harmeet Singh(Taara)
@harmeetsingh0013
Hello Team
I am facing an ask timeout exception using KafkaProducerActor. Please see this link for more details: http://stackoverflow.com/questions/42188622/apache-kafka-kafkaproduceractor-throws-exception-ask-timeout
SrinathJi
@srinathji
Hi Team,
I am facing the issue below:
17/07/14 17:11:45 INFO AppInfoParser: Kafka version : 0.10.0.0
17/07/14 17:11:45 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
17/07/14 17:11:45 INFO AbstractCoordinator: Discovered coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) for group tr-consumer-group.
17/07/14 17:11:45 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group tr-consumer-group
17/07/14 17:11:45 INFO AbstractCoordinator: (Re-)joining group tr-consumer-group
17/07/14 17:11:46 INFO AbstractCoordinator: Marking the coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) dead for group tr-consumer-group
This issue occurs during Spark-Kafka streaming.
Please help with this.
Jaakko Pallari
@jkpl
Are you using Spark with scala-kafka-client or Spark's own Kafka module?
First, I don't think any of us have experience using scala-kafka-client with Spark. Second, Spark uses its own, separate client for Kafka.
Michael Xing
@mikexing2010
Hi team, any thoughts on using cakesolutions vs http://doc.akka.io/docs/akka-stream-kafka/current/home.html?
Jaakko Pallari
@jkpl
The major difference is that Akka Streams Kafka is designed for Akka Streams, while Scala-Kafka-Client provides integration with Akka.
Akka Streams Kafka is backed by the Akka team, but Scala-Kafka-Client isn't.
I'd say Scala-Kafka-Client is also less opinionated on how you interact with the client compared to Akka Streams Kafka.
Michael Xing
@mikexing2010
Thanks! Any recommendations on which scenarios call for one over the other?
Jaakko Pallari
@jkpl
If you use Akka Streams, try Akka Streams Kafka before Scala Kafka Client. Other than that, I don't have anything specific to recommend either one for. I'd be interested to hear which one you end up using and why. :)
Mark de Jong
@Fristi
Since Kafka 0.11 there is support for idempotent producing, right? I've been reading the docs and protocols and also checked out the Java API, but I couldn't find how to supply a sequenceNr to make producing idempotent. Am I missing anything?
Simon Souter
@simonsouter

You just need to set the "enable.idempotence" option on the producer. See the kafka docs here: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

The config helper classes in scala-kafka-client now have a parameter (KafkaProducer.Conf -> enableIdempotence: Boolean = false)
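A minimal sketch of that parameter in use (serializers and broker address are illustrative):

import cakesolutions.kafka.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer

// enable.idempotence is set through the Conf helper's enableIdempotence flag.
val producer = KafkaProducer(
  KafkaProducer.Conf(
    new StringSerializer(),
    new StringSerializer(),
    bootstrapServers = "localhost:9092",
    enableIdempotence = true
  )
)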

Simon Souter
@simonsouter
Development on this project has been a little quiet recently, but I've just kicked off a big project that will be using this client heavily, and I will likely be making some improvements over the coming months. If anyone has any suggestions, issues, or pain points, I'd be interested in hearing them...
Mark de Jong
@Fristi
I see, cool. But you don't have control over which identifier it uses (like a partition) to get these idempotent writes, right?
For example, if you want to stream all the records of a SQL database with an ID (auto-incrementing integer column), the ID column would be a great candidate, such that you can have exactly-once (idempotent) writes.
Yeshwanth Kumar
@morpheyesh
Great library, and it works really well. I am just trying to understand the ConsumerRecords.extractor pattern that you are using. Is it a type-safe cast?
Just trying to understand what exactly is happening there :)
Zacharias J. Beckman
@zbeckman

I'm trying to set up a very simple log as part of a learning exercise. The exercise is evolving, basically simple transaction processing (think, banking). It starts with using a Scala Stream. Rapidly that stops working when you get to more than one user, so now I'm putting Kafka in. Wrote a super simple (too simple) POC as "step 1." Basically, just a log that transactions are written to. Long term it will introduce Akka, but baby steps. What I have right now, basically:

import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import scala.collection.JavaConverters._

case class TransactionLog() {
    val random = new scala.util.Random // used to create random group IDs to avoid metrics collision
    val key = "transaction"
    val topic = "transactions"
    var host = "localhost:9092"
    var group = s"transaction_group-${random.nextInt}"

    val producer = KafkaProducer(KafkaProducer.Conf(new StringSerializer(), new StringSerializer(), bootstrapServers = host))

    val consumer = KafkaConsumer(KafkaConsumer.Conf(new StringDeserializer(),
        new StringDeserializer(),
        bootstrapServers = host,
        groupId = group,
        enableAutoCommit = false))

    consumer.subscribe(List(topic).asJava)

    def poll = consumer.poll(1000)

    // Transaction is the surrounding domain type; its definition is omitted here.
    def send(t: Transaction) = {
        val m = producer.send(KafkaProducerRecord(topic, Some(key), t.toString))
        producer.flush()
        m
    }
}

Now I know this is crude but at this stage we are just introducing the idea of some kind of persistent log. Problem is: I write tests in specs2, and those rapidly exposed a major flaw I hadn't thought of: consumers are not thread safe. Oops. Question is -- what can I do that is absolutely minimal, but still gets the job done? (Realistically, at this point, the only thing we do when reading back from the log is to read the entire log, to tally all transactions). New to scala-kafka-client and trying to figure it out...

Yeshwanth Kumar
@morpheyesh
@simonsouter Yes, I understood extractors (case match and unapply), but I am trying to understand why you did it that way? The user can still provide ConsumerRecord[String, String] in the actor receive, right? Also, you cast it. :D
Just the rationale behind the design. Not sure if I am missing something basic.
Simon Souter
@simonsouter
It simply provides compile-time type safety. You can indeed match a ConsumerRecord[String, String] yourself, but you could also match against ConsumerRecord[Int, Int] by accident and get a ClassCastException.
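A minimal sketch of the extractor in a receiving actor (the types and the processing step are illustrative):

import akka.actor.Actor
import cakesolutions.kafka.akka.ConsumerRecords
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm

class Receiver extends Actor {
  // Matches only batches whose key/value types are [String, String]; anything
  // else falls through to another case instead of throwing ClassCastException.
  val recordsExt = ConsumerRecords.extractor[String, String]

  override def receive: Receive = {
    case recordsExt(records) =>
      // process the typed records, then confirm the offsets back to the consumer actor
      sender() ! Confirm(records.offsets)
  }
}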
Yeshwanth Kumar
@morpheyesh
ah! yes, got it! @simonsouter
Simon Souter
@simonsouter
@Fristi I don't think you need to worry about an ID to support idempotent producing with Kafka.
Simon Souter
@simonsouter
Hi @zbeckman, I'm not clear on what your use case is for multiple consumers. Do you want multiple threads/users to each consume the same messages?
Zacharias J. Beckman
@zbeckman
@simonsouter I think I've solved it. For this simple example I can linearize the specs2 tests (using sequential). That prevents the multithreaded behavior but still allows the introduction of a basic Kafka setup. In the next lesson I can expand on it with reactive Kafka -- which works well: it introduces the idea of a topic, but also gives students the problem of multithreading to chew on.
meatware
@meatware

Hi, I'm still pretty new to Scala and am trying to get the cakesolutions akka client up and running.
I think I have got my (very) rough implementation connected to kafka, but am puzzled as to how to pass messages between consumer <--> receiver. My code is below.

Could anyone hopefully give me any pointers or examples on how to get consumer and receiver actors talking?

import akka.actor.{Actor, ActorSystem, Props}
import akka.util.ByteString
import cakesolutions.kafka.akka.{ConsumerRecords, KafkaConsumerActor}
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object eatKafka extends App {

  println("\n\n\n")
  // Create an Akka system
  println("\n\ncreating system...")
  val system = ActorSystem("VySystem")

  // create receiver
  println("\n\ncreating receiver...")
  val receiver = system.actorOf(Props[ReceiverActor], name = "receiver")

  // create cake kafka consumer
  println("\n\ncreating consumer...")
  val conf = ConfigFactory.load()

  //Construct the KafkaConsumerActor with Typesafe config in resources
  val cakeKConsumer = system.actorOf(
    KafkaConsumerActor.props(conf, new ByteArrayDeserializer(), new ByteArrayDeserializer(), receiver)
  )

  println("pass subscription details to cakeKConsumer")
  cakeKConsumer ! Subscribe.AutoPartition(List("impression")) //Subscribe.ManualOffset(Offsets(Map((new TopicPartition("impression", 0), 1))))

}


/////////////////////////////////////////////////////////////////////
// kafka consumer actor
class kConsumeCakeActor extends Actor { // note: not actually created anywhere in eatKafka above
  // https://github.com/cakesolutions/scala-kafka-client/blob/master/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala
  println("\n\nIn consumer!!")
  override def receive = {
    case default => println("I am kConsumeCakeActor")
  }
}


/////////////////////////////////////////////////////////////////////
// receiver actor
class ReceiverActor extends Actor {
  // Extractor for ensuring type safe cast of records
  val recordsExt = ConsumerRecords.extractor[Int, ByteString]
  println("In receiver")
  override def receive: Receive = {
    case recordsExt(records) =>
      println("do something with records")
      //for (record <- records) println(record)
      sender() ! Confirm(records.offsets)

    case default => println("I am receiver!!")
  }
}

Sample output:

[info] Running eatKafka 
[info] 
[info] 
[info] 
[info] 
[info] 
[info] 
[info] creating system...
[info] 
[info] 
[info] creating receiver...
[info] 
[info] 
[info] creating consumer...
[info] In receiver
[info] pass subscription details to cakeKConsumer

[info] [INFO] [08/24/2017 09:33:20.853] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] Subscribing in auto partition assignment mode to topics [impression].
[info] I am receiver!!
[info] [INFO] [08/24/2017 09:33:31.493] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] In bufferFull: records timed out while waiting for a confirmation.
[info] [INFO] [08/24/2017 09:33:31.493] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] In bufferFull: redelivering.
[info] I am receiver!!
[info] I am receiver!!
Yeshwanth Kumar
@morpheyesh
@simonsouter when I am using AutoPartitionWithManualOffset with an assignedListener, does it subscribe and distribute all partitions to the consumers, or only the partitions provided?
Meaning, if I have 100 partitions and only 10 that I need to seek back on (i.e. provide the partition:offset map through the assignedListener callback), will the AutoPartitionWithManualOffset mode subscribe to all partitions and only seek back on the 10 provided?
Simon Souter
@simonsouter
Hi Yeshwanth, Kafka may assign all or a subset of partitions, depending on whether there are other nodes subscribing as part of the group, i.e. it may assign all 100 partitions as in your example. Once those partitions are assigned, the assignedListener callback will be invoked. If you only provide 10 partition offsets, those 10 partitions will seek to the supplied positions. I imagine the other 90 will default to either the beginning or the end of the partition (depending on the KafkaConsumer OffsetResetStrategy), although I don't recall testing that case specifically...
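A minimal sketch of that mode (the listener signatures are assumed from the Subscribe API; consumerActor stands for a KafkaConsumerActor ref, and the topic and saved-offsets lookup are illustrative):

import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe
import cakesolutions.kafka.akka.Offsets
import org.apache.kafka.common.TopicPartition

// Kafka decides the assignment; the callback then returns offsets only for the
// subset of partitions we want to seek (here a hypothetical saved-offsets map).
val savedOffsets: Map[TopicPartition, Long] = Map.empty

def assignedListener(assigned: List[TopicPartition]): Offsets =
  Offsets(assigned.flatMap(tp => savedOffsets.get(tp).map(tp -> _)).toMap)

def revokedListener(revoked: List[TopicPartition]): Unit = ()

consumerActor ! Subscribe.AutoPartitionWithManualOffset(
  List("my-topic"), assignedListener, revokedListener)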
Yeshwanth Kumar
@morpheyesh
@simonsouter makes sense.
Jimin Hsieh
@jiminhsieh
I found Kafka has MockProducer, MockConsumer, etc. I am curious: is it possible to write a small Scala wrapper around those mocks? I'm willing to send the PR too. Thanks! :smile:
Jimin Hsieh
@jiminhsieh

@simonsouter Do you have any thoughts about this idea? Thanks! :smile:

Is it possible to write a small Scala wrapper around those mocks?
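For reference, the Java mocks are usable from Scala directly even without a wrapper; a minimal sketch with MockProducer (autoComplete = true, so sends complete immediately; topic and values are illustrative):

import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.collection.JavaConverters._

val producer = new MockProducer[String, String](true, new StringSerializer(), new StringSerializer())
producer.send(new ProducerRecord("test-topic", "key", "value"))

// history() remembers every record "sent", which is handy for test assertions.
assert(producer.history().asScala.map(_.value()) == Seq("value"))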

ashlesh0001
@ashlesh0001
Hi All,
I have a very vague question. I have a Spark Scala project built with Maven, but now I want to add a Kafka subscriber to it so it can be invoked when needed. What would be the best approach to making these changes, and how can I use cakesolutions in my project?
Thanks
rabzu
@rabzu

So the blog is very confusing:

So, you'll need to detect this [Failure] via some form of supervision and resend a subscription command to the newly started instance. For brevity I've omitted such code from this example, but it's something you should include in any long-running production system using KafkaConsumerActor.

But in the GitHub examples the parent actor never "resends a subscription command".
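For illustration, one way the resend might look in a supervising parent. This is an assumption-laden sketch, not the blog's omitted code; the config, deserializers, and topic are illustrative, and ConsumerException is assumed to be what the consumer actor throws on failure:

import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, SupervisorStrategy}
import cakesolutions.kafka.akka.KafkaConsumerActor
import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer

class ConsumerSupervisor extends Actor with ActorLogging {
  val consumer: ActorRef = context.actorOf(
    KafkaConsumerActor.props(ConfigFactory.load(), new StringDeserializer(), new StringDeserializer(), self))

  private def subscribe(): Unit = consumer ! Subscribe.AutoPartition(List("my-topic"))

  subscribe()

  // On consumer failure, queue a fresh Subscribe and restart the child; a
  // restarted actor keeps its ActorRef and mailbox, so the new instance
  // receives the subscription once it is back up.
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: KafkaConsumerActor.ConsumerException =>
      log.warning("Consumer failed; restarting and resubscribing.")
      subscribe()
      SupervisorStrategy.Restart
  }

  override def receive: Receive = {
    case _ => // handle delivered records here, e.g. via ConsumerRecords.extractor
  }
}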

Tal Ben Shabtay
@talbenshabtay
Hey all, I was wondering if anybody has tried to override the consumer's supervisorStrategy in v0.9.0.0? It looks like it's using the default Decider even with:
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2) {
  case _: KafkaConsumerActor.ConsumerException =>
    log.error("Consumer exception caught. Restarting consumer.")
    SupervisorStrategy.Restart
  case ex =>
    log.warning(s"Failed processing ${ex.getMessage}")
    SupervisorStrategy.Escalate
}
sheryl11
@sheryl11
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object kafkaproducer extends App {

  val props = new Properties()
  props.put("bootstrap.servers", "brokerid")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val TOPIC = "simulatordata"

  // I don't know the limit; how do I get all the data from the simulator?
  for (i <- 1 to 5000) {
    // What should I write here to get the value of each record?
    val record = new ProducerRecord(TOPIC, "key", "???")
    producer.send(record)
  }

  val record = new ProducerRecord(TOPIC, "key", "should I write here for getting value of each record " + new java.util.Date)
  producer.send(record)
  producer.close()
}
I have simulator data and I really don't know the attributes... some data is there. I want all the data inside it to be sent to a Kafka topic... this code needs modification.
sheryl11
@sheryl11
Note: the simulator data is on another machine, let's say localhost1:9092, and I want to create the topic on my machine and store all the simulator data on my machine, localhost2:9092.
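For illustration, a minimal sketch under those assumptions: read the simulator's records from whatever interface it exposes (a hypothetical readRecords iterator here) and produce each one to the target cluster, with no hard-coded 1-to-5000 limit:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SimulatorProducer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost2:9092") // the target cluster from the question
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // Hypothetical source: replace with however the simulator exposes its data.
  def readRecords: Iterator[String] = scala.io.Source.stdin.getLines()

  for (value <- readRecords) // iterate until the source is exhausted, no fixed limit
    producer.send(new ProducerRecord("simulatordata", "key", value))

  producer.close()
}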
prassee
@prassee
Newbie here: does the Scala Kafka client support Kafka Streams?
Simon Souter
@simonsouter
No, but Kafka Streams already has a Scala API, I think.
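For context, a minimal sketch with the kafka-streams-scala module that ships with Kafka 2.0+ (the imports and implicit serdes are assumed from that release; topics are illustrative):

import java.util.Properties
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object PassThrough extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val builder = new StreamsBuilder()
  // Copy records from one topic to another; serdes come from the implicits above.
  builder.stream[String, String]("input-topic").to("output-topic")

  new KafkaStreams(builder.build(), props).start()
}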
Ivan
@advancedwebdeveloper
Hi there, people. Is anyone interested in speaking about the API and its usage at a Ukrainian conference?