Scala helper modules for operating the Apache Kafka client library (0.9.x - 0.10.x)
hey @jkpl, the problem is solved by using the raw `KafkaAvroSerializer` and giving the `Object` type to both the producer and the records. Strictly generically typed consumers and producers are a bit difficult to use at this level of the Kafka ecosystem stack, I think; when you pass everything via `Properties` and `Object` it just works :) You may want to check the last comment on the issue: datamountaineer/stream-reactor#80. Thanks for the help.
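For anyone landing here later, a minimal sketch of that approach, assuming the Confluent Avro serializer is on the classpath (the bootstrap server, schema registry URL, topic, and payloads below are all illustrative):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch: a raw Kafka producer typed as [Object, Object], configured
// entirely through Properties, with the Confluent Avro serializer for
// both keys and values. The schema registry URL is an assumption.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://localhost:8081")

val producer = new KafkaProducer[Object, Object](props)
// Any Avro-serializable payload (primitives or GenericRecord) goes through untyped.
producer.send(new ProducerRecord[Object, Object]("avro-topic", "some-key", "some-value"))
producer.close()
```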
You just need to set the "enable.idempotence" option on the producer; see the Kafka docs here: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
The config helper classes in scala-kafka-client now expose this as a parameter (KafkaProducer.Conf -> enableIdempotence: Boolean = false).
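So enabling it looks roughly like this (a sketch; the serializers and bootstrap server are placeholders):

```scala
import cakesolutions.kafka.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer

// Sketch: turn on idempotence through the config helper. The flag maps
// to the producer's "enable.idempotence" property (default false).
val producer = KafkaProducer(
  KafkaProducer.Conf(
    new StringSerializer(),
    new StringSerializer(),
    bootstrapServers = "localhost:9092",
    enableIdempotence = true
  )
)
```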
I'm trying to set up a very simple log as part of a learning exercise. The exercise is evolving: basically simple transaction processing (think banking). It starts with a Scala Stream, which rapidly stops working once there's more than one user, so now I'm putting Kafka in. I wrote a super simple (too simple) POC as "step 1": basically just a log that transactions are written to. Long term it will introduce Akka, but baby steps. What I have right now, basically:
import scala.collection.JavaConverters._
import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

case class TransactionLog() {
  // Random group ID per instance to avoid metrics collisions between runs.
  val random = new scala.util.Random
  val key = "transaction"
  val topic = "transactions"
  val host = "localhost:9092"
  val group = s"transaction_group-${random.nextInt}"

  val producer = KafkaProducer(
    KafkaProducer.Conf(new StringSerializer(), new StringSerializer(), bootstrapServers = host))

  val consumer = KafkaConsumer(
    KafkaConsumer.Conf(new StringDeserializer(),
      new StringDeserializer(),
      bootstrapServers = host,
      groupId = group,
      enableAutoCommit = false))

  consumer.subscribe(List(topic).asJava)

  def poll() = consumer.poll(1000)

  def send(t: Transaction) = {
    val m = producer.send(KafkaProducerRecord(topic, Some(key), t.toString))
    producer.flush() // block until the record has actually reached the broker
    m
  }
}
Now I know this is crude, but at this stage we are just introducing the idea of some kind of persistent log. The problem is that I write tests in specs2, and those rapidly exposed a major flaw I hadn't thought of: consumers are not thread-safe. Oops. The question is: what can I do that is absolutely minimal but still gets the job done? (Realistically, at this point, the only thing we do when reading back from the log is read the entire log to tally all transactions.) I'm new to scala-kafka-client and trying to figure it out...
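For reference, the "read the entire log" part could look roughly like this inside `TransactionLog` (a sketch: it assumes the consumer's auto.offset.reset is set to earliest so the fresh group ID starts from the beginning, and the poll timeout is arbitrary):

```scala
import scala.collection.JavaConverters._

// Sketch: drain the topic and collect every value. Polls until an
// empty batch comes back, which is crude but fine for a demo; assumes
// auto.offset.reset=earliest so a new group ID reads from offset 0.
def readAll(): Seq[String] = {
  val out = Seq.newBuilder[String]
  var batch = consumer.poll(1000)
  while (!batch.isEmpty) {
    batch.asScala.foreach(record => out += record.value())
    batch = consumer.poll(1000)
  }
  out.result()
}
```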
I ended up marking the specs2 specification as sequential (adding `sequential` at the top of the spec). That prevents the multithreading behavior but still allows introducing a basic Kafka. In the next lesson I can expand on it with reactive Kafka, which works well: it introduces the idea of a topic, but also gives students the problem of multithreading to chew on.
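For completeness, a sketch of what that looks like in specs2 (class and example names below are illustrative):

```scala
import org.specs2.mutable.Specification

// Sketch: `sequential` tells specs2 to run the examples one at a time,
// so the single (non-thread-safe) KafkaConsumer is never touched from
// multiple threads at once.
class TransactionLogSpec extends Specification {
  sequential

  "TransactionLog" should {
    "persist a transaction to the topic" in {
      // exercise the log here; examples in this spec no longer run concurrently
      ok
    }
  }
}
```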