fs2-kafka
fs2-kafka 1.0.0 uses Kafka 2.5.0, FS2 2.3.0 and Vulcan 1.1.0 (master), which looks still unreleased. Is that correct?

The schema is Either[AvroError, Schema], as a Codec doesn't guarantee a Schema can be generated:

@ Codec[String].schema
res0: Either[AvroError, org.apache.avro.Schema] = Right("string")
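Since the schema is an Either, callers have to handle the failure case explicitly. A minimal sketch (assuming vulcan on the classpath; the rendering logic is illustrative):

```scala
import vulcan.Codec

// Codec.schema is Either[AvroError, Schema]: a Left means no valid
// Avro schema could be generated for this codec.
val rendered: String =
  Codec[String].schema.fold(
    error => s"no schema: ${error.message}",
    schema => schema.toString // Avro's JSON rendering of the schema
  )
```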
Does vulcan-generic intentionally not support maps with non-String keys? I understand that map keys must be strings in Avro, but my team was recently surprised that vulcan seemingly isn't able to use an existing Codec to encode/decode a string key for a map (e.g. to use a value class for type safety):

package vulcan.examples
import vulcan.Codec
import vulcan.examples.CaseClassTypedMapField.ValClass
import vulcan.generic._

final case class CaseClassTypedMapField(aMap: Map[ValClass, Int])

object CaseClassTypedMapField {
  final case class ValClass(value: String) extends AnyVal

  implicit val valClassCodec: Codec[ValClass] =
    Codec.derive

  implicit val codec: Codec[CaseClassTypedMapField] =
    Codec.derive
}
implicit val valClassCodec: vulcan.Codec[ValClass] =
vulcan.Codec.string.imap(ValClass.apply)(_.value)
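To make the suggestion above concrete, here is a self-contained sketch (names are illustrative; it assumes vulcan's Codec.map, which fixes map keys to String): since vulcan only provides codecs for Map[String, A], the whole map can be imapped to and from a ValClass-keyed map.

```scala
import vulcan.Codec

final case class ValClass(value: String) extends AnyVal

// vulcan's Codec.map yields Codec[Map[String, A]], so we imap the
// entire map, converting between String keys and ValClass keys.
implicit val aMapCodec: Codec[Map[ValClass, Int]] =
  Codec.map[Int].imap(
    _.map { case (k, v) => (ValClass(k), v) }
  )(
    _.map { case (k, v) => (k.value, v) }
  )
```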
Hello, is it possible to create a union Codec at the root level, like this?
[
{
"type": "record",
"namespace": "io.confluent.examples.avro",
"name": "Customer",
"fields": [
{"name": "customer_id", "type": "int"},
{"name": "customer_name", "type": "string"},
{"name": "customer_email", "type": "string"},
{"name": "customer_address", "type": "string"}
]
},
{
"type": "record",
"namespace": "io.confluent.examples.avro",
"name": "Product",
"fields": [
{"name": "product_id", "type": "int"},
{"name": "product_name", "type": "string"},
{"name": "product_price", "type": "double"}
]
}
]
If I create a codec in the following way:

sealed trait Root
final case class Product(..) extends Root
final case class Customer(..) extends Root

implicit val productCodec: Codec[Product] = ???
implicit val customerCodec: Codec[Customer] = ???

val rootCodec: Codec[Root] = Codec.union[Root] { alt =>
  alt[Product] |+| alt[Customer]
}

then the rootCodec generates a schema only for a specific message (either the Customer or the Product "record" schema), which is not compatible with a union schema.
See also https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/
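For comparison, a sketch of the vulcan-generic route (the field names are illustrative): deriving a codec for a sealed trait is documented to produce a union of the subtypes' record schemas, which matches the multi-event-type pattern from the linked post.

```scala
import vulcan.Codec
import vulcan.generic._

sealed trait Root
final case class Product(productId: Int) extends Root
final case class Customer(customerId: Int) extends Root

// Deriving for a sealed trait yields a union schema made up of
// the record schemas of its subtypes.
implicit val rootCodec: Codec[Root] = Codec.derive
```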
Hello, guys.
I'm trying to adopt fs2-kafka in my company. I'm currently writing my first consumer with it and I'm facing a NoSuchMethodError:
[error] (run-main-6) java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;)V
[error] java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;)V
[error] at vulcan.generic.package$MagnoliaCodec$.$anonfun$combine$4(package.scala:138)
[error] at scala.util.Either.map(Either.scala:353)
[error] at vulcan.generic.package$MagnoliaCodec$.$anonfun$combine$3(package.scala:132)
[error] at cats.data.Chain$.$anonfun$traverseViaChain$3(Chain.scala:708)
[error] at cats.Eval$.loop$1(Eval.scala:317)
[error] at cats.Eval$.cats$Eval$$evaluate(Eval.scala:363)
[error] at cats.Eval$FlatMap.value(Eval.scala:284)
[error] at cats.data.Chain$.traverseViaChain(Chain.scala:730)
[error] at cats.instances.ListInstances$$anon$1.traverse(list.scala:96)
[error] at cats.instances.ListInstances$$anon$1.traverse(list.scala:17)
[error] at cats.Traverse$Ops.traverse(Traverse.scala:162)
[error] at cats.Traverse$Ops.traverse$(Traverse.scala:161)
[error] at cats.Traverse$ToTraverseOps$$anon$3.traverse(Traverse.scala:185)
[error] at vulcan.generic.package$MagnoliaCodec$.$anonfun$combine$2(package.scala:131)
[error] at vulcan.AvroError$.catchNonFatal(AvroError.scala:54)
[error] at vulcan.generic.package$MagnoliaCodec$.combine$extension(package.scala:129)
.. confidential info here :) ..
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] stack trace is suppressed; run last service / Compile / bgRun for the full output
[error] Nonzero exit code: 1
[error] (service / Compile / run) Nonzero exit code: 1
[error] Total time: 1 s, completed Sep 30, 2020 1:16:56 PM
My current library versions are:
fs2-kafka - 1.0.0
vulcan - 1.0.1
I chose these versions to avoid eviction of Apache Avro, which is at version 1.9.1 for this setup.

show runtime:fullClasspath

gives me the correct version of avro - Attributed(/home/lamdness/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/avro/avro/1.9.1/avro-1.9.1.jar)
In IntelliJ IDEA I can navigate to the constructor of Schema$Field and it is there.
I don't know what I'm doing wrong. I'd appreciate any help.
Thanks!
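For what it's worth, the three-argument Schema.Field constructor in that stack trace only exists in newer Avro versions, so this error usually means an older Avro ended up on the runtime classpath of the forked JVM despite what the resolved dependency report shows. One possible fix in sbt (the version number is illustrative) is to force a single Avro version:

```scala
// build.sbt: force one Avro version across all configurations,
// overriding any transitively pulled-in older avro artifact.
dependencyOverrides += "org.apache.avro" % "avro" % "1.9.1"
```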
I want to create a Union type with two string types. As per the Avro specification, unions containing two array types or two map types are not permitted, but two named types with different names are permitted. However, I am not able to achieve this in vulcan. Can somebody help with this?
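For reference, a sketch of one way around this restriction (all names are illustrative): since Avro only allows duplicate schema types in a union when they are named types, each string can be wrapped in its own record and the union formed over the wrappers.

```scala
import cats.syntax.all._ // for |+|
import vulcan.Codec

sealed trait StringLike
final case class Name(value: String) extends StringLike
final case class Email(value: String) extends StringLike

// Each wrapper is a named record containing a single string field,
// so the two "string" alternatives get distinct names in the union.
implicit val nameCodec: Codec[Name] =
  Codec.record(name = "Name", namespace = "example") { field =>
    field("value", _.value).map(Name(_))
  }

implicit val emailCodec: Codec[Email] =
  Codec.record(name = "Email", namespace = "example") { field =>
    field("value", _.value).map(Email(_))
  }

implicit val stringLikeCodec: Codec[StringLike] =
  Codec.union[StringLike](alt => alt[Name] |+| alt[Email])
```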
Hi there,
I have a maybe silly question. I have looked into the Codec implementation and it looks like it does a sort of adaptation from a source type to a target type.
Does that mean that if I have a source GenericRecord and a codec with an Avro-compatible schema, it will be able to deserialise it?
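If it helps, a round-trip sketch (record name and field are illustrative) of the decoding entry point in question: Codec.decode takes the encoded value (a GenericRecord here) together with the writer schema.

```scala
import vulcan.Codec

final case class Person(name: String)

implicit val personCodec: Codec[Person] =
  Codec.record(name = "Person", namespace = "example") { field =>
    field("name", _.name).map(Person(_))
  }

// encode yields the generic Avro representation; decode takes it
// back together with the schema it was written with.
val result: Either[vulcan.AvroError, Person] =
  for {
    schema  <- personCodec.schema
    encoded <- personCodec.encode(Person("Ada"))
    decoded <- personCodec.decode(encoded, schema)
  } yield decoded
```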
@bplommer @vlovgr I am testing the vulcan evolution logic, and I have a question. It does support deserialising a ByteBuffer as a string. However, it does not support Array[Byte] to string.
I believe the rationale is that the Avro SDK, when deserialising, uses ByteBuffer rather than Array[Byte].
Is this correct? Would there be any benefit in also adding support for Array[Byte] (or IndexedSeq[Byte])?
it("should decode if schema is part of union") {
assertDecodeIs[SealedTraitCaseClass](
unsafeEncode[SealedTraitCaseClass](FirstInSealedTraitCaseClass(0)),
Right(FirstInSealedTraitCaseClass(0)),
Some(unsafeSchema[FirstInSealedTraitCaseClass])
)
}
The unsafeEncode[SealedTraitCaseClass] here, I believe, is using the schema for SealedTraitCaseClass rather than FirstInSealedTraitCaseClass.
I know. Fabio and I have developed a library that relies on Vulcan to apply the evolution logic, which is why I have been testing all the cases to find any that are not supported.
So far I have found this one, and another one with enums, but I am not sure about the enum one.
So if you change that unsafeEncode[SealedTraitCaseClass] to unsafeEncode[FirstInSealedTraitCaseClass], does it fail?