Jaakko Pallari
@jkpl
What's the GenericAvroSerializer like in your example?
Ah, never mind, it's in the issue you posted :)
Ferhat Aydın
@ferhtaydn
It's a wrapper around KafkaAvroSerializer.
Jaakko Pallari
@jkpl
yeah
Ferhat Aydın
@ferhtaydn
I think when I send messages with GenericAvroSerializer, the Kafka library uses that serializer's class name instead of KafkaAvroSerializer.
Then it becomes a problem when trying to deserialize in AvroConverter: the magic byte problem.
Jaakko Pallari
@jkpl
Hmm, I'm not sure if that makes any difference to the Kafka library. The library should use whatever serializer instance you provide. The class names in the configs are only used for creating an instance of the serializer if the serializers are provided as strings instead of as instances.
I would recommend against using key.serializer and value.serializer in your config. Use the manually created instances instead.
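For illustration, a minimal sketch of passing serializer instances directly with the plain Java client; the broker address and key/value types here are placeholder assumptions, not from the conversation:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

object InstanceSerializerExample extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder address
  // Note: no key.serializer / value.serializer entries in the config.

  // Because the serializers are passed as instances, the class-name
  // config keys are never consulted by the client.
  val producer = new KafkaProducer[Array[Byte], String](
    props,
    new ByteArraySerializer(),
    new StringSerializer()
  )

  producer.close()
}
```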
Ferhat Aydın
@ferhtaydn
should I remove them from config?
Jaakko Pallari
@jkpl
Yeah, they should be unnecessary.
scala-kafka-client always feeds in the serializers/deserializers as object instances for type safety. I guess we should add a filter for the config-specified serializers/deserializers.
Ferhat Aydın
@ferhtaydn
Hmm, so what are your key and value serializer objects for Foo in general?
Jaakko Pallari
@jkpl
The key serializer is whatever serializer you need to use. If you don't need to include keys in the messages, the byte array serializer from the Java client is probably a good default.
The GenericAvroSerializer for GenericRecord should work, since it just makes sure that only GenericRecord values pass through it to the KafkaAvroSerializer.
I'm not really sure what the problem is if those don't work.
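A minimal sketch of what such a wrapper can look like; this is an assumption about its shape, not the actual class from the linked repo:

```scala
import java.util.{Map => JMap}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Serializer

// Restricts values to GenericRecord at the type level and delegates the
// actual encoding (magic byte, schema-registry ID, Avro payload) to
// Confluent's KafkaAvroSerializer.
class GenericAvroSerializer extends Serializer[GenericRecord] {
  private val inner = new KafkaAvroSerializer()

  override def configure(configs: JMap[String, _], isKey: Boolean): Unit =
    inner.configure(configs, isKey)

  override def serialize(topic: String, record: GenericRecord): Array[Byte] =
    inner.serialize(topic, record)

  override def close(): Unit = inner.close()
}
```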
Ferhat Aydın
@ferhtaydn
Actually it works; I can produce records with GenericAvroSerializer and consume them like in https://github.com/ferhtaydn/sack/tree/master/src/main/scala/com/ferhtaydn/sack/avro
but there is a Confluent vs. Kafka serialization problem: the magic byte 0x0 in the first byte.
What do you use in general for POJO serializers? Something like GenericAvroSerializer, or direct KafkaAvroSerializer objects?
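For context, Confluent-serialized messages carry a 5-byte header before the Avro payload; a hypothetical helper just to illustrate the framing:

```scala
import java.nio.ByteBuffer

object ConfluentFrame {
  // Confluent's wire format prefixes every Avro payload with one magic byte
  // (0x0) and a 4-byte schema-registry ID. A plain Avro deserializer that
  // doesn't skip this header fails on the first byte: the "magic byte
  // problem" mentioned above.
  def split(bytes: Array[Byte]): (Byte, Int, Array[Byte]) = {
    val buf = ByteBuffer.wrap(bytes)
    val magicByte = buf.get()   // expected to be 0x0
    val schemaId = buf.getInt() // ID of the writer schema in the registry
    val payload = new Array[Byte](buf.remaining())
    buf.get(payload)
    (magicByte, schemaId, payload)
  }
}
```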
Jaakko Pallari
@jkpl
I've used a solution where the KafkaAvroSerializer is wrapped,
except that we do the POJO-to-GenericRecord conversion in the serializer,
which shouldn't really be that different from your case.
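For illustration, a hypothetical POJO serializer in that spirit; the User case class and its schema are invented here, not taken from any repo mentioned in the chat:

```scala
import java.util.{Map => JMap}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.common.serialization.Serializer

// Hypothetical POJO, for illustration only.
case class User(name: String, age: Int)

class UserAvroSerializer extends Serializer[User] {
  private val inner = new KafkaAvroSerializer()
  private val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
  )

  override def configure(configs: JMap[String, _], isKey: Boolean): Unit =
    inner.configure(configs, isKey)

  // Convert the POJO to a GenericRecord, then let KafkaAvroSerializer
  // handle the wire format (magic byte, schema ID, Avro payload).
  override def serialize(topic: String, user: User): Array[Byte] = {
    val record: GenericRecord = new GenericData.Record(schema)
    record.put("name", user.name)
    record.put("age", user.age)
    inner.serialize(topic, record)
  }

  override def close(): Unit = inner.close()
}
```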
Ferhat Aydın
@ferhtaydn
Yep.
Jaakko Pallari
@jkpl
I haven't used the Cassandra sink before so I can't comment on how that's supposed to work.
Ferhat Aydın
@ferhtaydn
Thank you so much, Jaakko; I got tired of these serialization problems :) Let's see how it can be solved in the end.
Jaakko Pallari
@jkpl
No problem. In general, I think there's still quite a bit of stuff in serialization to figure out. We might put a bit more focus on that area once we've got the core client into a shape we're happy with. :smile:
Ferhat Aydın
@ferhtaydn
:) I think so too. There's nothing difficult in the tooling for Kafka itself, the Scala client, etc.; the main problem is (de)serialization :)
Ferhat Aydın
@ferhtaydn

Hey @jkpl, the problem is solved by using the raw KafkaAvroSerializer and giving the Object type to both the producer and the records. Strictly generically typed consumers and producers are a bit difficult to use at this level of the Kafka ecosystem stack, I think; when you pass everything with Properties and Object, it just works :) You may want to check the last comment on the issue: datamountaineer/stream-reactor#80

Thanks for the help.
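A sketch of that workaround, with placeholder broker, registry, topic, and schema values (none of these are from the issue itself):

```scala
import java.util.Properties
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ObjectTypedProducerExample extends App {
  // Placeholder schema and record.
  val schema = new Schema.Parser().parse(
    """{"type":"record","name":"Foo","fields":[{"name":"bar","type":"string"}]}"""
  )
  val record: GenericRecord = new GenericData.Record(schema)
  record.put("bar", "baz")

  // The serializer is configured by class name, and the producer is typed
  // as Object, so KafkaAvroSerializer decides at runtime how to encode
  // each value.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

  val producer = new KafkaProducer[Object, Object](props)
  producer.send(new ProducerRecord[Object, Object]("my-topic", record))
  producer.close()
}
```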

Jaakko Pallari
@jkpl
Hmm, that's interesting.
I definitely don't like the idea that magic makes it work... but then again it works. :)
It'd be helpful to know what exactly makes it work.
Jaakko Pallari
@jkpl
Anyone tried Kafka Streams? http://docs.confluent.io/3.0.0/streams/
The streams API seems to implement a few patterns that we've been thinking about.
The downside of the API is that it doesn't leave much room for doing asynchronous computations in the middle of the stream.
The API looks pretty neat though. It seems to be fairly easy to do stateful computations with persistent state.
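For reference, a minimal stateful count in the Streams DSL; this sketch uses a later StreamsBuilder API than the 0.10.0 docs linked above, and the topic names are placeholders:

```scala
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

object StreamsCountExample extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-example")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

  val builder = new StreamsBuilder()

  // Count events per key; the state behind count() lives in a local,
  // fault-tolerant store backed by a Kafka changelog topic.
  builder
    .stream[String, String]("events")
    .groupByKey()
    .count()
    .toStream()
    .to("event-counts", Produced.`with`(Serdes.String(), Serdes.Long()))

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
}
```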
Jaakko Pallari
@jkpl
Oh, and posting this here in case someone hasn't seen it yet: https://www.youtube.com/watch?v=lMlspFnfHM8 :)
prassee
@prassee
How do I specify the Zookeeper IP in the Kafka consumer config?
Jaakko Pallari
@jkpl
@prassee Hi, sorry for the late response (Gitter stopped sending me email notifications for some reason). As of Kafka 0.9, you no longer specify Zookeeper IP in the consumer config. You only point your client to a Kafka cluster, and it'll figure out the Zookeeper communication for you.
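For example, a minimal new-consumer setup (broker addresses, group, and topic are placeholders); note there is no Zookeeper property anywhere:

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object NewConsumerExample extends App {
  // Since the new consumer (Kafka 0.9+), the client only needs the broker
  // list; there is no zookeeper.connect property, and the brokers handle
  // the Zookeeper coordination themselves.
  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092,broker2:9092")
  props.put("group.id", "my-group")

  val consumer = new KafkaConsumer[String, String](
    props, new StringDeserializer(), new StringDeserializer())
  consumer.subscribe(Collections.singletonList("my-topic"))
}
```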
prassee
@prassee
I'm using 0.9.0.0, but it didn't work until I explicitly specified the ZK IP.
Harmeet Singh(Taara)
@harmeetsingh0013
Hello Team
I am facing an ask timeout exception using KafkaProducerActor. Please go to this link for more details: http://stackoverflow.com/questions/42188622/apache-kafka-kafkaproduceractor-throws-exception-ask-timeout
Srinath
@srinathji
Hi Team,
I am facing the issue below:
17/07/14 17:11:45 INFO AppInfoParser: Kafka version : 0.10.0.0
17/07/14 17:11:45 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
17/07/14 17:11:45 INFO AbstractCoordinator: Discovered coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) for group tr-consumer-group.
17/07/14 17:11:45 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group tr-consumer-group
17/07/14 17:11:45 INFO AbstractCoordinator: (Re-)joining group tr-consumer-group
17/07/14 17:11:46 INFO AbstractCoordinator: Marking the coordinator node1.dfz.com:6667 (id: 2147482646 rack: null) dead for group tr-consumer-group
This issue occurs during Spark-Kafka streaming.
Please help with this.
Jaakko Pallari
@jkpl
Are you using Spark with scala-kafka-client or Spark's own Kafka module?
First, I don't think any of us have experience using scala-kafka-client with Spark. Second, Spark uses its own, separate client for Kafka.
Michael Xing
@mikexing2010
Hi team, any thoughts on using cakesolutions vs http://doc.akka.io/docs/akka-stream-kafka/current/home.html?
Jaakko Pallari
@jkpl
The major difference is that Akka Streams Kafka is designed for Akka Streams, while Scala-Kafka-Client provides integration with Akka actors.
Akka Streams Kafka is backed by the Akka team, but Scala-Kafka-Client isn't.
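For comparison, a minimal Akka Streams Kafka consumer; this sketch assumes a recent Alpakka Kafka API (where an implicit ActorSystem provides the materializer) and placeholder broker/topic names:

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object StreamConsumerExample extends App {
  implicit val system: ActorSystem = ActorSystem("example")

  val settings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder address
      .withGroupId("my-group")

  // Each Kafka record becomes a stream element, so it composes with any
  // other Akka Streams stages (throttling, batching, fan-out, ...).
  Consumer
    .plainSource(settings, Subscriptions.topics("events"))
    .map(record => record.value)
    .runWith(Sink.foreach(println))
}
```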