Scala helper modules for operating the Apache Kafka client library (0.9.x - 0.10.x)
hey @jkpl, the problem is solved by using the raw KafkaAvroSerializer and giving the Object type to both the producer and the records. Strictly generic typed consumers and producers are a bit difficult to use at this level of the Kafka ecosystem stack, I think; when you configure everything with Properties and Object, it just works :) You may want to check the last comment on the issue: datamountaineer/stream-reactor#80. Thanks for the help.
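To make the approach above concrete, here is a minimal sketch of wiring the Confluent KafkaAvroSerializer through plain Properties, with both keys and values typed as Object. The broker address, schema-registry URL, and topic name are placeholders, and the producer construction is left commented out because it needs a running schema registry:

```scala
import java.util.Properties

// Raw Properties-based configuration: everything is stringly typed, and the
// producer would be parameterised with Object for both key and value.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")          // placeholder broker
props.put("schema.registry.url", "http://localhost:8081") // placeholder registry
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

// With a broker and registry running, the untyped producer would be:
// val producer = new org.apache.kafka.clients.producer.KafkaProducer[Object, Object](props)
// producer.send(new ProducerRecord[Object, Object]("some-topic", keyRecord, valueRecord))
```

The trade-off is exactly the one described above: you give up compile-time type checking on the records in exchange for a configuration that the Avro serializer accepts without friction.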
You just need to set the "enable.idempotence" option on the producer. See the Kafka docs here: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
The config helper classes in scala-kafka-client now expose this as a parameter: KafkaProducer.Conf(..., enableIdempotence: Boolean = false).
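As a sketch of what that flag amounts to at the raw-client level: enable.idempotence is just a producer config key, so with plain Properties it looks like the following (broker address is a placeholder; with the scala-kafka-client helper the equivalent is passing enableIdempotence = true to KafkaProducer.Conf):

```scala
import java.util.Properties

// Minimal producer config with idempotence enabled. The broker deduplicates
// retried sends, giving exactly-once delivery per partition per producer session.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("enable.idempotence", "true") // the option discussed above
```

Note that enabling idempotence also constrains acks and retries (the client enforces acks=all), so a producer that previously ran with acks=0 or acks=1 will behave slightly differently.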
I'm trying to set up a very simple log as part of a learning exercise. The exercise is evolving, basically simple transaction processing (think, banking). It starts with using a Scala Stream. Rapidly that stops working when you get to more than one user, so now I'm putting Kafka in. Wrote a super simple (too simple) POC as "step 1." Basically, just a log that transactions are written to. Long term it will introduce Akka, but baby steps. What I have right now, basically:
import scala.collection.JavaConverters._
import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

case class TransactionLog() {
  val random = new scala.util.Random // used to create random group IDs to avoid metrics collision
  val key = "transaction"
  val topic = "transactions"
  val host = "localhost:9092"
  val group = s"transaction_group-${random.nextInt}"

  val producer = KafkaProducer(
    KafkaProducer.Conf(new StringSerializer(), new StringSerializer(), bootstrapServers = host))

  val consumer = KafkaConsumer(
    KafkaConsumer.Conf(new StringDeserializer(),
      new StringDeserializer(),
      bootstrapServers = host,
      groupId = group,
      enableAutoCommit = false))

  consumer.subscribe(List(topic).asJava)

  def poll() = consumer.poll(1000)

  // Transaction is defined elsewhere in the exercise
  def send(t: Transaction) = {
    val m = producer.send(KafkaProducerRecord(topic, Some(key), t.toString))
    producer.flush()
    m
  }
}
Now I know this is crude but at this stage we are just introducing the idea of some kind of persistent log. Problem is: I write tests in specs2, and those rapidly exposed a major flaw I hadn't thought of: consumers are not thread safe. Oops. Question is -- what can I do that is absolutely minimal, but still gets the job done? (Realistically, at this point, the only thing we do when reading back from the log is to read the entire log, to tally all transactions). New to scala-kafka-client and trying to figure it out...
For now I've solved it by adding sequential to the specs2 specification. That prevents the multithreading behavior but allows introduction of a basic Kafka. In the next lesson I can expand on it with reactive Kafka -- which works well, it introduces the idea of a topic, but also gives students the problem of multithreading to chew on.
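For readers unfamiliar with specs2: the minimal change is a single sequential call at the top of the specification, which forces examples to run one after another on one thread instead of concurrently, so they can safely share the one (non-thread-safe) consumer. A hypothetical sketch (class and example names are made up):

```scala
import org.specs2.mutable.Specification

// `sequential` disables specs2's default concurrent execution of examples,
// so all examples below share the single KafkaConsumer safely.
class TransactionLogSpec extends Specification {
  sequential

  "the transaction log" should {
    "record a transaction" in {
      // log.send(tx); then poll and check the record came back
      ok
    }
    "tally all transactions on replay" in {
      // read the whole log from the beginning and sum the amounts
      ok
    }
  }
}
```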
Hi, I'm still pretty new to Scala and am trying to get the cakesolutions akka client up and running.
I think I have got my (very) rough implementation connected to kafka, but am puzzled as to how to pass messages between consumer <--> receiver. My code is below.
Could anyone hopefully give me any pointers or examples on how to get consumer and receiver actors talking?
import akka.actor.{Actor, ActorSystem, Props}
import akka.util.ByteString
import cakesolutions.kafka.akka.{ConsumerRecords, KafkaConsumerActor}
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object eatKafka extends App {
  println("\n\n\n")

  // Create an Akka system
  println("\n\ncreating system...")
  val system = ActorSystem("VySystem")

  // create receiver
  println("\n\ncreating receiver...")
  val receiver = system.actorOf(Props[ReceiverActor], name = "receiver")

  // create cake kafka consumer
  println("\n\ncreating consumer...")
  val conf = ConfigFactory.load()

  // Construct the KafkaConsumerActor with Typesafe config in resources
  val cakeKConsumer = system.actorOf(
    KafkaConsumerActor.props(conf, new ByteArrayDeserializer(), new ByteArrayDeserializer(), receiver)
  )

  println("pass subscription details to cakeKConsumer")
  cakeKConsumer ! Subscribe.AutoPartition(List("impression")) //Subscribe.ManualOffset(Offsets(Map((new TopicPartition("impression", 0), 1))))
}

/////////////////////////////////////////////////////////////////////
// kafka consumer actor
class kConsumeCakeActor extends Actor {
  // https://github.com/cakesolutions/scala-kafka-client/blob/master/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala
  println("\n\nIn consumer!!")
  override def receive = {
    case default => println("I am kConsumeCakeActor")
  }
}

/////////////////////////////////////////////////////////////////////
// receiver actor
class ReceiverActor extends Actor {
  // Extractor for ensuring type safe cast of records
  val recordsExt = ConsumerRecords.extractor[Int, ByteString]
  println("In receiver")
  override def receive: Receive = {
    case recordsExt(records) =>
      println("do something with records")
      //for (record <- records) println(record)
      sender() ! Confirm(records.offsets)
    case default => println("I am receiver!!")
  }
}
Sample output:
[info] Running eatKafka
[info]
[info]
[info]
[info]
[info]
[info]
[info] creating system...
[info]
[info]
[info] creating receiver...
[info]
[info]
[info] creating consumer...
[info] In receiver
[info] pass subscription details to cakeKConsumer
[info] [INFO] [08/24/2017 09:33:20.853] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] Subscribing in auto partition assignment mode to topics [impression].
[info] I am receiver!!
[info] [INFO] [08/24/2017 09:33:31.493] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] In bufferFull: records timed out while waiting for a confirmation.
[info] [INFO] [08/24/2017 09:33:31.493] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] In bufferFull: redelivering.
[info] I am receiver!!
[info] I am receiver!!
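One thing worth checking in the code above (a guess from the log output, not a confirmed diagnosis): the consumer actor is constructed with two ByteArrayDeserializers, but the receiver's extractor is typed ConsumerRecords.extractor[Int, ByteString]. The incoming batches therefore never match the recordsExt pattern, fall through to the default case ("I am receiver!!"), and are never confirmed, which would explain the "records timed out while waiting for a confirmation" / "redelivering" lines. A sketch with the types aligned:

```scala
import akka.actor.Actor
import cakesolutions.kafka.akka.ConsumerRecords
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm

// If the consumer actor deserializes both key and value to Array[Byte]
// (ByteArrayDeserializer), the extractor must use the same types,
// otherwise the pattern match never fires and records are never confirmed.
class ReceiverActor extends Actor {
  val recordsExt = ConsumerRecords.extractor[Array[Byte], Array[Byte]]

  override def receive: Receive = {
    case recordsExt(records) =>
      // process the batch, then confirm so it is not redelivered
      println(s"received batch with offsets ${records.offsets}")
      sender() ! Confirm(records.offsets)
  }
}
```

The separate kConsumeCakeActor class is not needed in this setup: the library's KafkaConsumerActor plays that role, and messages flow KafkaConsumerActor -> receiver -> Confirm back to sender().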