Jaakko Pallari
The key serializer is whatever serializer you need to use. If you don't need to include keys in the messages, the byte array serializer from the Java client is probably a good default.
The GenericAvroSerializer for GenericRecord should work, since it just makes sure that only GenericRecord values pass through it to the KafkaAvroSerializer.
I'm not really sure what the problem is if those don't work.
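The setup described above, byte-array keys with Avro values, can be sketched as plain client properties. A minimal sketch: the broker and schema-registry addresses are placeholders, not values from this conversation, and it assumes Confluent's KafkaAvroSerializer is on the classpath at runtime.

```scala
import java.util.Properties

// Byte-array keys (no key content needed) and Avro values through
// Confluent's serializer. Addresses below are placeholders.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://localhost:8081")
```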
Ferhat Aydın
Actually it works; I can produce records with GenericAvroSerializer and consume them like in https://github.com/ferhtaydn/sack/tree/master/src/main/scala/com/ferhtaydn/sack/avro
but there is a Confluent vs. Kafka serialization problem: the magic byte 0x0 in the first byte
what do you use in general for POJO serializers? something like GenericAvroSerializer or direct KafkaAvroSerializer objects?
Jaakko Pallari
I've used a solution where the KafkaAvroSerializer is wrapped,
except that we do the POJO to GenericRecord conversion in the serializer,
which shouldn't really be that different from your case.
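The wrapping idea can be sketched without the Kafka libraries. In this self-contained sketch the local `Serializer` trait stands in for `org.apache.kafka.common.serialization.Serializer`, a `Map` stands in for an Avro `GenericRecord`, and all names (`User`, `RecordSerializer`, `PojoSerializer`) are illustrative, not the real API:

```scala
// Stand-in for org.apache.kafka.common.serialization.Serializer.
trait Serializer[T] { def serialize(topic: String, data: T): Array[Byte] }

case class User(name: String, age: Int) // example POJO

// Stand-in for Confluent's KafkaAvroSerializer: renders the "record" fields
// as bytes so the sketch runs standalone.
class RecordSerializer extends Serializer[Map[String, Any]] {
  def serialize(topic: String, data: Map[String, Any]): Array[Byte] =
    data.map { case (k, v) => s"$k=$v" }.mkString(",").getBytes("UTF-8")
}

// The wrapper: converts the POJO to a generic record *inside* serialize(),
// then delegates, which is the pattern described above.
class PojoSerializer(delegate: Serializer[Map[String, Any]]) extends Serializer[User] {
  def serialize(topic: String, user: User): Array[Byte] =
    delegate.serialize(topic, Map("name" -> user.name, "age" -> user.age))
}
```

The benefit of this shape is that producers stay typed against the POJO while the Avro plumbing is hidden behind one serializer.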
Ferhat Aydın
Jaakko Pallari
I haven't used the Cassandra sink before so I can't comment on how that's supposed to work.
Ferhat Aydın
thank you so much Jaakko, I got tired of those serialization problems :) let's see how it gets solved in the end.
Jaakko Pallari
No problem. In general, I think there's still quite a bit of stuff in serialization to figure out. We might put a bit more focus on that area once we've got the core client into a shape we're happy with. :smile:
Ferhat Aydın
:) I think so too. There's nothing too difficult in the tools for Kafka itself, the Scala client, etc. The main problem is (de)serialization :)
Ferhat Aydın

hey @jkpl, the problem is solved by using the raw KafkaAvroSerializer and giving the Object type to both the producer and the records. Strictly generically typed consumers and producers are a bit difficult to use at this level of the Kafka ecosystem stack, I think; when you pass everything with Properties and Object, it just works :) You may want to check the last comment on the issue: datamountaineer/stream-reactor#80

thanks for the help.
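A hypothetical reconstruction of the untyped setup described above: both key and value go through `KafkaAvroSerializer`, and everything is passed as `Object`. Addresses are placeholders; the producer construction itself needs kafka-clients on the classpath, so it is shown as a comment:

```scala
import java.util.Properties

// Untyped producer config: Confluent's Avro serializer for both key and
// value, typed as Object at the call sites. Addresses are placeholders.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("schema.registry.url", "http://localhost:8081")
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

// With kafka-clients on the classpath, construction would then be:
//   val producer = new org.apache.kafka.clients.producer.KafkaProducer[Object, Object](props)
```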

Jaakko Pallari
Hmm, that's interesting.
I definitely don't like the idea that magic makes it work... but then again it works. :)
It'd be helpful to know what exactly makes it work.
Jaakko Pallari
Anyone tried Kafka Streams? http://docs.confluent.io/3.0.0/streams/
The streams API seems to implement a few patterns that we've been thinking about.
The downside of the API is that it doesn't leave much room for doing asynchronous computations in the middle of the stream.
The API looks pretty neat though. It seems to be fairly easy to do stateful computations with persistent state.
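The stateful computation the Streams API makes easy, e.g. a running count per key, reduces to folding records into a keyed table. A pure-Scala sketch of the idea (the real Streams API keeps this state in a fault-tolerant, changelog-backed state store rather than an in-memory Map; the sample records are invented):

```scala
// Fold each (key, value) record into a table of per-key counts,
// the same shape as a Streams groupByKey().count() aggregation.
val records = List("user1" -> "click", "user2" -> "click", "user1" -> "view")

val countsByKey: Map[String, Int] =
  records.foldLeft(Map.empty[String, Int]) { case (counts, (key, _)) =>
    counts.updated(key, counts.getOrElse(key, 0) + 1)
  }
```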
Jaakko Pallari
Oh, and posting this here in case someone hasn't seen it yet: https://www.youtube.com/watch?v=lMlspFnfHM8 :)
how do I specify the Zookeeper IP in the Kafka consumer config?
Jaakko Pallari
@prassee Hi, sorry for the late response (Gitter stopped sending me email notifications for some reason). As of Kafka 0.9, you no longer specify Zookeeper IP in the consumer config. You only point your client to a Kafka cluster, and it'll figure out the Zookeeper communication for you.
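For reference, a new-style (0.9+) consumer configuration might look like the sketch below; note there is no `zookeeper.connect` entry at all. The broker addresses and group name are placeholders:

```scala
import java.util.Properties

// 0.9+ consumer config: point the client at the brokers only; the client
// discovers the rest of the cluster itself. No "zookeeper.connect" entry.
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "broker1:9092,broker2:9092")
consumerProps.put("group.id", "my-group")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
```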
I am using it, but it did not work until I explicitly specified the ZK IP
Harmeet Singh(Taara)
Hello Team
I am facing an ask timeout exception using the KafkaAkkaProducer. Please see this link for more details: http://stackoverflow.com/questions/42188622/apache-kafka-kafkaproduceractor-throws-exception-ask-timeout
Hi Team,
I am facing the issue below:
17/07/14 17:11:45 INFO AppInfoParser: Kafka version :
17/07/14 17:11:45 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
17/07/14 17:11:45 INFO AbstractCoordinator: Discovered coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) for group tr-consumer-group.
17/07/14 17:11:45 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group tr-consumer-group
17/07/14 17:11:45 INFO AbstractCoordinator: (Re-)joining group tr-consumer-group
17/07/14 17:11:46 INFO AbstractCoordinator: Marking the coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) dead for group tr-consumer-group
this issue occurs during Spark-Kafka streaming
please help with this
Jaakko Pallari
Are you using Spark with scala-kafka-client or Spark's own Kafka module?
First, I don't think any of us have experience using scala-kafka-client with Spark. Second, Spark uses its own, separate client for Kafka.
Michael Xing
Hi team, any thoughts on using cakesolutions vs http://doc.akka.io/docs/akka-stream-kafka/current/home.html?
Jaakko Pallari
The major difference is that Akka Streams Kafka is designed for Akka Streams, while Scala-Kafka-Client provides integration with Akka.
Akka Streams Kafka is backed by the Akka team, but Scala-Kafka-Client isn't.
I'd say Scala-Kafka-Client is also less opinionated on how you interact with the client compared to Akka Streams Kafka.
Michael Xing
Thanks! So, any recommendations on the scenarios in which I should use one over the other?
Jaakko Pallari
If you use Akka Streams, try Akka Streams Kafka before Scala Kafka Client. Other than that, I don't have anything specific to recommend either one for. I'd be interested to hear which one you end up using and why. :)
Mark de Jong
Since Kafka 0.11 there is support for idempotent producing, right? I've been reading the docs and protocols and also checked out the Java API, but I couldn't find how to supply a sequence number to make producing idempotent. Am I missing anything?
Simon Souter

You just need to set the "enable.idempotence" option on the producer. See the kafka docs here: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

The config helper classes in scala-kafka-client now have a parameter for it (KafkaProducer.Conf -> enableIdempotence: Boolean = false)
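A sketch of enabling the idempotent producer. Setting the flag is enough; the broker then deduplicates using producer-id and sequence numbers that the client manages internally, so there is no user-supplied sequence number. The broker address is a placeholder:

```scala
import java.util.Properties

// Enabling the idempotent producer via a raw property. The client manages
// the producer id and per-partition sequence numbers itself.
val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("enable.idempotence", "true")

// Equivalent via the scala-kafka-client helper mentioned above:
//   KafkaProducer.Conf(keySerializer, valueSerializer,
//                      bootstrapServers = "localhost:9092",
//                      enableIdempotence = true)
```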

Simon Souter
Development on this project has been a little quiet recently, but I've just kicked off a big project that will be using this client heavily, and I will likely be making some improvements over the coming months. If anyone has any suggestions, issues, or pain points, I'd be interested in hearing them...
Mark de Jong
I see, cool. But you don't have control over which identifier it uses (like a partition) to get these idempotent writes, right?
For example, if you want to stream all the records of a (SQL) database with an ID (an auto-incrementing integer column), the ID column would be a great candidate to use, such that you get exactly-once (idempotent) writes.
Yeshwanth Kumar
great library, and it works really well. I am just trying to understand the ConsumerRecords.extractor pattern that you are using. Is it type-safe casting?
just trying to understand what exactly is happening there :)
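The pattern is essentially a guarded match-and-cast: an extractor whose `unapply` only matches when the runtime value carries the expected types. A self-contained sketch of the mechanism (the `TypedExtractor` name is hypothetical, and this is not the library's actual implementation, just the same idea in miniature):

```scala
import scala.reflect.ClassTag

// An unapply that only matches when the runtime value is of the expected
// type, checked via the ClassTag's runtime class.
class TypedExtractor[T](implicit ct: ClassTag[T]) {
  def unapply(any: Any): Option[T] =
    if (ct.runtimeClass.isInstance(any)) Some(any.asInstanceOf[T]) else None
}

val stringExt = new TypedExtractor[String]

// A non-String value falls through to the default case instead of
// causing a ClassCastException.
val result = (42: Any) match {
  case stringExt(s) => s"string: $s"
  case other        => s"not a string: $other"
}
```

In the library's case the extractor is matched against messages delivered to an actor, so a consumer can safely recover its typed records from an untyped `Any` receive.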
Zacharias J. Beckman

I'm trying to set up a very simple log as part of a learning exercise. The exercise is evolving; basically simple transaction processing (think banking). It starts with using a Scala Stream. That rapidly stops working when you get to more than one user, so now I'm putting Kafka in. I wrote a super-simple (too simple) POC as "step 1": basically, just a log that transactions are written to. Long term it will introduce Akka, but baby steps. What I have right now, basically:

import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

// Transaction is the domain case class, defined elsewhere.
case class TransactionLog() {
    val random = new scala.util.Random // used to create random group IDs to avoid metrics collision
    val key = "transaction"
    val topic = "transactions"
    val host = "localhost:9092"
    val group = s"transaction_group-${random.nextInt}"

    val producer = KafkaProducer(KafkaProducer.Conf(new StringSerializer(), new StringSerializer(), bootstrapServers = host))

    val consumer = KafkaConsumer(KafkaConsumer.Conf(new StringDeserializer(),
        new StringDeserializer(),
        bootstrapServers = host,
        groupId = group,
        enableAutoCommit = false))

    def poll = consumer.poll(1000)

    def send(t: Transaction) =
        producer.send(KafkaProducerRecord(topic, Some(key), t.toString))
}

Now I know this is crude but at this stage we are just introducing the idea of some kind of persistent log. Problem is: I write tests in specs2, and those rapidly exposed a major flaw I hadn't thought of: consumers are not thread safe. Oops. Question is -- what can I do that is absolutely minimal, but still gets the job done? (Realistically, at this point, the only thing we do when reading back from the log is to read the entire log, to tally all transactions). New to scala-kafka-client and trying to figure it out...
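One minimal pattern for the thread-safety problem is thread confinement: funnel every call to the non-thread-safe consumer through a single-threaded executor, so calls can come from any test thread but never overlap (scala-kafka-client's KafkaConsumerActor achieves the same confinement with an actor). A self-contained sketch, where `fakePoll` is a stand-in for `consumer.poll(...)` so the structure, not the Kafka call, is the point:

```scala
import java.util.concurrent.{Callable, Executors}

// All interaction with a non-thread-safe client goes through one
// single-threaded executor, so its calls are serialized.
val consumerThread = Executors.newSingleThreadExecutor()

var received = List.empty[String] // only ever touched on consumerThread

// Stand-in for consumer.poll(...): runs on the confined thread and
// returns the records seen so far.
def fakePoll(n: Int): List[String] =
  consumerThread.submit(new Callable[List[String]] {
    def call(): List[String] = { received = s"record-$n" :: received; received }
  }).get()

val after = { fakePoll(1); fakePoll(2) }
consumerThread.shutdown()
```

For the "read the whole log and tally" use case, each test can then safely create one confined consumer, replay the topic from the beginning, and fold over the records.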