    Alexander Ray
    @AlexanderRay

    Hello, is it possible to create a codec union at the root level, like this:

    [
      {
       "type": "record",
       "namespace": "io.confluent.examples.avro",
       "name": "Customer",
    
       "fields": [
           {"name": "customer_id", "type": "int"},
           {"name": "customer_name", "type": "string"},
           {"name": "customer_email", "type": "string"},
           {"name": "customer_address", "type": "string"}
       ]
      },
      {
       "type": "record",
       "namespace": "io.confluent.examples.avro",
       "name": "Product",
    
       "fields": [
           {"name": "product_id", "type": "int"},
           {"name": "product_name", "type": "string"},
           {"name": "product_price", "type": "double"}
       ]
      }
    ]

    If I create a codec in the following way:

    sealed trait Root
    final case class Product(..) extends Root
    final case class Customer(..) extends Root

    val rootCodec: Codec[Root] = Codec.union[Root] { union =>
      implicit val productCodec: Codec[Product] = ???
      implicit val customerCodec: Codec[Customer] = ???

      union[Product] |+| union[Customer]
    }

    then the rootCodec generates a schema only for a specific message (i.e. the Customer or Product "record" schema), which is not compatible with a union schema.

    See also https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/
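
    A quick way to verify what the codec actually generates is to inspect its schema (a minimal sketch, assuming the rootCodec above):

        // vulcan exposes the generated schema as Either[AvroError, Schema];
        // a proper union codec should print a top-level JSON array containing
        // both record schemas, not a single "record".
        rootCodec.schema.map(_.toString(true)).foreach(println)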

    Michael
    @grouzen

    Hello, guys.

    I'm trying to adopt fs2-kafka in my company. I'm currently writing my first consumer with it and I've run into a NoSuchMethodError:

    [error] (run-main-6) java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;)V
    [error] java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;)V
    [error]     at vulcan.generic.package$MagnoliaCodec$.$anonfun$combine$4(package.scala:138)
    [error]     at scala.util.Either.map(Either.scala:353)
    [error]     at vulcan.generic.package$MagnoliaCodec$.$anonfun$combine$3(package.scala:132)
    [error]     at cats.data.Chain$.$anonfun$traverseViaChain$3(Chain.scala:708)
    [error]     at cats.Eval$.loop$1(Eval.scala:317)
    [error]     at cats.Eval$.cats$Eval$$evaluate(Eval.scala:363)
    [error]     at cats.Eval$FlatMap.value(Eval.scala:284)
    [error]     at cats.data.Chain$.traverseViaChain(Chain.scala:730)
    [error]     at cats.instances.ListInstances$$anon$1.traverse(list.scala:96)
    [error]     at cats.instances.ListInstances$$anon$1.traverse(list.scala:17)
    [error]     at cats.Traverse$Ops.traverse(Traverse.scala:162)
    [error]     at cats.Traverse$Ops.traverse$(Traverse.scala:161)
    [error]     at cats.Traverse$ToTraverseOps$$anon$3.traverse(Traverse.scala:185)
    [error]     at vulcan.generic.package$MagnoliaCodec$.$anonfun$combine$2(package.scala:131)
    [error]     at vulcan.AvroError$.catchNonFatal(AvroError.scala:54)
    [error]     at vulcan.generic.package$MagnoliaCodec$.combine$extension(package.scala:129)
    .. confidential info here :) ..
    [error]     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    [error]     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    [error]     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    [error]     at java.lang.reflect.Method.invoke(Method.java:498)
    [error] stack trace is suppressed; run last service / Compile / bgRun for the full output
    [error] Nonzero exit code: 1
    [error] (service / Compile / run) Nonzero exit code: 1
    [error] Total time: 1 s, completed Sep 30, 2020 1:16:56 PM

    My current library versions are:
    fs2-kafka - 1.0.0
    vulcan - 1.0.1

    I chose these versions to avoid eviction of Apache Avro, which is at version 1.9.1 for this setup.
    show runtime:fullClasspath gives me the correct version of Avro - Attributed(/home/lamdness/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/avro/avro/1.9.1/avro-1.9.1.jar)
    In IntelliJ IDEA I can navigate to the constructor of Schema$Field and it is there.

    Not sure what I'm doing wrong; I'd appreciate any help.
    Thanks!

    Michael
    @grouzen
    Seems like the problem is in the generic module, because I can generate a codec using Codec.record
    Michael
    @grouzen
    Problem fixed. It was caused by one of our libraries, which depends on older versions of Kafka artifacts.
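    For anyone hitting the same thing, the usual fix is to pin Avro and find the artifact dragging in the old version. A minimal sbt sketch (the coordinates and plugin are assumptions about your build, not taken from this thread):

        // build.sbt: pin Avro so an older transitive dependency cannot evict
        // the 1.9.x version that vulcan 1.0.x compiles against.
        dependencyOverrides += "org.apache.avro" % "avro" % "1.9.1"

        // With the sbt-dependency-graph plugin, find what pulls in the old version:
        //   sbt "whatDependsOn org.apache.avro avro"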
    Nishant Vishwakarma
    @nishantv12
    I need to create a union type with two string types. As per the Avro specification, unions containing two array types or two map types are not permitted, but two types with different names are. However, I am not able to achieve this in vulcan. Can somebody help with this?
    Louis Forite
    @lforite
    Hello, I am using vulcan to publish events to Kafka. It works great, so thanks a lot for this very cool library 👌 I am currently wondering if there is an elegant way to share a Kafka producer for multiple unrelated events / topics. Is using a shapeless coproduct the way to go? Thanks
    Viktor Rudebeck
    @vlovgr
    @lforite if you can put all events into a union (e.g. using shapeless coproduct), that's the easiest solution.
    @nishantv12 you'll probably have to wrap the strings in records (with different names) to get that working
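    A minimal sketch of that wrapping approach (the names and namespace here are made up, using vulcan's Codec.record and Codec.union):

        import cats.syntax.all._
        import vulcan.Codec

        // Avro forbids a union with two plain "string" branches, but two
        // differently-named records wrapping a string are allowed.
        sealed trait Name
        final case class FirstName(value: String) extends Name
        final case class LastName(value: String) extends Name

        implicit val firstNameCodec: Codec[FirstName] =
          Codec.record(name = "FirstName", namespace = "com.example") { field =>
            field("value", _.value).map(FirstName(_))
          }

        implicit val lastNameCodec: Codec[LastName] =
          Codec.record(name = "LastName", namespace = "com.example") { field =>
            field("value", _.value).map(LastName(_))
          }

        // Generates the union schema ["com.example.FirstName", "com.example.LastName"].
        implicit val nameCodec: Codec[Name] =
          Codec.union(alt => alt[FirstName] |+| alt[LastName])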
    Louis Forite
    @lforite
    @vlovgr I will try it soon and keep you posted. I am afraid it will create a schema with a union type in the specific topics, though.
    Viktor Rudebeck
    @vlovgr
    @lforite if you use fs2-kafka-vulcan it shouldn't at least, as I've used this approach many times.
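    A sketch of that setup (fs2-kafka 1.x vulcan module; the event types and registry URL are placeholders):

        import cats.effect.IO
        import cats.syntax.all._
        import fs2.kafka.RecordSerializer
        import fs2.kafka.vulcan.{AvroSettings, SchemaRegistryClientSettings, avroSerializer}
        import vulcan.Codec

        // The sealed trait is the union of all events the shared producer publishes.
        sealed trait AppEvent
        final case class UserCreated(id: Int) extends AppEvent
        final case class UserDeleted(id: Int) extends AppEvent

        implicit val userCreatedCodec: Codec[UserCreated] =
          Codec.record(name = "UserCreated", namespace = "com.example") { field =>
            field("id", _.id).map(UserCreated(_))
          }

        implicit val userDeletedCodec: Codec[UserDeleted] =
          Codec.record(name = "UserDeleted", namespace = "com.example") { field =>
            field("id", _.id).map(UserDeleted(_))
          }

        implicit val appEventCodec: Codec[AppEvent] =
          Codec.union(alt => alt[UserCreated] |+| alt[UserDeleted])

        // One serializer built from the union codec backs a single producer
        // shared across topics.
        implicit val appEventSerializer: RecordSerializer[IO, AppEvent] =
          avroSerializer[AppEvent].using(
            AvroSettings(SchemaRegistryClientSettings[IO]("http://localhost:8081"))
          )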
    Louis Forite
    @lforite
    @vlovgr it seems to be working fine :) Awesome !
    Filippo De Luca
    @filosganga

    Hi there,
    I have a possibly silly question: I have looked into the Codec implementation, and it looks like it does a sort of adaptation from a source type to a target type.

    Does that mean that if I have a source GenericRecord and a codec with an Avro-compatible schema, it will be able to deserialise it?

    Viktor Rudebeck
    @vlovgr
    Yes, it provides a mapping between Scala types and types supported by the Java Avro library.
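    For example (a minimal sketch with a made-up Person type):

        import cats.syntax.all._
        import vulcan.Codec

        final case class Person(name: String, age: Int)

        implicit val personCodec: Codec[Person] =
          Codec.record(name = "Person", namespace = "com.example") { field =>
            (field("name", _.name), field("age", _.age)).mapN(Person(_, _))
          }

        // encode produces the Java Avro representation (a GenericRecord for
        // records); decode maps such a value back to the Scala type. So a
        // GenericRecord obtained elsewhere can be decoded, as long as its
        // schema is compatible with the codec's schema.
        val encoded = Codec.encode(Person("Jane", 42))
        val decoded = encoded.flatMap(value => Codec.decode[Person](value))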
    Ben Plommer
    @bplommer
    I've created https://gitter.im/fd4s/dev for discussion related to development of fd4s libraries - anyone interested should feel free to join
    Jacqueline Hubbard
    @JackieDev
    Hi, I have a problem: my codecs are throwing a scala.UninitializedFieldError, even though I can see they cover all nested models/data types. Has anyone else had this issue?
    Ben Plommer
    @bplommer
    (The problem Jacqueline had was the result of a codec defined in a val depending on another val defined further down in the same object - the answer was to replace vals with lazy vals)
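    A minimal sketch of that failure mode (hypothetical types):

        import vulcan.Codec

        final case class Inner(value: Int)
        final case class Outer(inner: Inner)

        object Codecs {
          // With plain vals, outerCodec is initialized first and reads
          // innerCodec before the object has assigned it, which surfaces as
          // scala.UninitializedFieldError (or a null) when the codec is used.
          // lazy vals defer initialization to first use, so definition order
          // in the object no longer matters.
          implicit lazy val outerCodec: Codec[Outer] =
            Codec.record(name = "Outer", namespace = "com.example") { field =>
              field("inner", _.inner).map(Outer(_))
            }

          implicit lazy val innerCodec: Codec[Inner] =
            Codec.record(name = "Inner", namespace = "com.example") { field =>
              field("value", _.value).map(Inner(_))
            }
        }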
    Filippo De Luca
    @filosganga

    @bplommer @vlovgr I am testing the vulcan evolution logic, and I have a question. It supports deserialising a ByteBuffer as a string; however, it does not support Array[Byte] to string.

    I believe the rationale is that the Avro SDK, when deserialising, uses ByteBuffer rather than Array[Byte].

    Is this correct? Would there be any benefit in also adding support for Array[Byte] (or IndexedSeq[Byte])?

    Ben Plommer
    @bplommer
    yeah, that's correct. There'd only be a benefit in supporting Array[Byte] if that's something the java lib outputs
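    A sketch of the supported direction, assuming vulcan 1.x's Codec#decode(value, writerSchema):

        import java.nio.ByteBuffer
        import org.apache.avro.Schema
        import vulcan.Codec

        // The Java library hands "bytes" values over as ByteBuffer, and the
        // string codec accepts that when decoding against a "bytes" writer schema.
        val bytesWriterSchema: Schema = new Schema.Parser().parse("\"bytes\"")
        val written: Any = ByteBuffer.wrap("hello".getBytes("UTF-8"))

        val decoded = Codec.string.decode(written, bytesWriterSchema) // Right("hello")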
    Filippo De Luca
    @filosganga
    Thanks, that makes sense to me
    Filippo De Luca
    @filosganga

    @bplommer As you can imagine, I am hacking on the vulcan evolution logic. There is one case that does not seem to be supported:

    If the reader schema is a union of A and B and the writer schema is just B, the reader should be able to read it.

    However, in vulcan it fails.

    There is a test in vulcan for it:

        it("should decode if schema is part of union") {
          assertDecodeIs[SealedTraitCaseClass](
            unsafeEncode[SealedTraitCaseClass](FirstInSealedTraitCaseClass(0)),
            Right(FirstInSealedTraitCaseClass(0)),
            Some(unsafeSchema[FirstInSealedTraitCaseClass])
          )
        }
    However, unsafeEncode[SealedTraitCaseClass] here, I believe, is using the schema for SealedTraitCaseClass rather than FirstInSealedTraitCaseClass
    Ben Plommer
    @bplommer
    Yeah, it would be. There's quite a confusing division of work between the Java lib and the Vulcan code with regard to schema resolution. With the Confluent deserializers used in fs2-kafka-vulcan, the decoded record should already be adapted to match the reader schema before vulcan sees it.
    So if you change that unsafeEncode[SealedTraitCaseClass] to unsafeEncode[FirstInSealedTraitCaseClass], it fails?
    Filippo De Luca
    @filosganga

    I know. Fabio and I have developed a library that needs Vulcan to apply the evolution logic, which is why I have tested all the cases to find out whether any were not supported.

    So far I have found this one, and another one for enums, but I am not sure about the enum one.

    So if you change that unsafeEncode[SealedTraitCaseClass] to unsafeEncode[FirstInSealedTraitCaseClass], it fails?
    Will try it now
    If it fails, are you happy for me to open a PR?
    Ben Plommer
    @bplommer
    by all means
    I guess that's an internal OVO thing?
    Filippo De Luca
    @filosganga
    yes for the time being it is internal
    Ben Plommer
    @bplommer
    hmm, actually it shouldn't matter which schema you're encoding with - the encoding of an enum is just the encoding of the value type
    Filippo De Luca
    @filosganga
    In fact it does work as expected, my bad
    The failing case is another one :)
    if the writer's schema is a union, but the reader's is not
    If the reader's schema matches the selected writer's schema, it is recursively resolved against it. If they do not match, an error is signalled.
    Basically the reverse
    Ben Plommer
    @bplommer
    Ah right, yeah. So when we check the writer schema type, we need to always accept SchemaType.UNION
    Feel free to open a PR for that :)
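    For reference, the case in question sketched with the plain Java Avro parser (the schemas here are placeholders):

        import org.apache.avro.Schema

        // The writer wrote with a union schema; the reader's schema is a single
        // branch. Per the spec, the value is resolved against the branch that
        // was actually written, so the decode path must accept a UNION writer
        // schema and then recursively resolve against the selected branch.
        val writerSchema = new Schema.Parser().parse("""["int", "string"]""")
        val readerSchema = new Schema.Parser().parse("\"int\"")

        assert(writerSchema.getType == Schema.Type.UNION)
        assert(readerSchema.getType == Schema.Type.INT)
        // Decoding should succeed whenever the written branch matches the reader schema.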
    Filippo De Luca
    @filosganga
    cool thanks!
    Ben Plommer
    @bplommer
    Oh btw @filosganga I remember some time back we were talking about re-implementing functionality from the Java lib so Vulcan wouldn't depend on it - I have a couple of WIP/experimental PRs up for aspects of that (one for representing schemas, one for scodec-based codecs)
    Filippo De Luca
    @filosganga
    It would be great
    Ben Plommer
    @bplommer
    This one is a separate module that replaces the encoding/decoding functionality while keeping the existing vulcan API (so uses the java schema representation) - fd4s/vulcan#289
    I think even if it doesn't become the preferred implementation, it would be good to have it for cross-validating the implementations against each other and clarifying our understanding
    I finally made some proper progress by starting from what we had, rather than trying from the ground up to produce a fully statically validated implementation
    Fabio Labella
    @SystemFw
    oh, one quick win
    the schema registry client has a lot of synchronized blocks for what's essentially a concurrent map and a couple of REST calls that need to be cached
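    Something like this, perhaps (a cats-effect 3 style sketch, not the actual client code):

        import cats.effect.{Concurrent, Deferred, Ref}
        import cats.syntax.all._

        // A concurrent map of memoized fetches: each key triggers at most one
        // REST call, and concurrent readers wait on the same Deferred instead
        // of contending on synchronized blocks. (Error handling is elided; a
        // failed fetch should be evicted so it can be retried.)
        final class SchemaCache[F[_]: Concurrent, K, V] private (
          state: Ref[F, Map[K, Deferred[F, V]]],
          fetch: K => F[V]
        ) {
          def get(key: K): F[V] =
            Deferred[F, V].flatMap { fresh =>
              state.modify { cache =>
                cache.get(key) match {
                  case Some(existing) => (cache, existing.get)
                  case None =>
                    (cache + (key -> fresh), fetch(key).flatTap(v => fresh.complete(v).void))
                }
              }.flatten
            }
        }

        object SchemaCache {
          def apply[F[_]: Concurrent, K, V](fetch: K => F[V]): F[SchemaCache[F, K, V]] =
            Ref.of[F, Map[K, Deferred[F, V]]](Map.empty).map(new SchemaCache(_, fetch))
        }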
    Ben Plommer
    @bplommer
    yeah, we have a ticket in fs2-kafka to reimplement that
    I feel like some of that fs2-kafka code should move to vulcan though - the thing of wiring up the serdes with the codecs to get the right schema resolution seems much more an avro thing than a kafka thing
    Filippo De Luca
    @filosganga
    Uh, I am not sure about that. If vulcan's scope is just Avro, it has nothing to do with Kafka serdes and can be used in any other context. We use it with DynamoDB, for example