    Rajh
    @Rajh
    Well, maybe I could use your RecordFilterStrategy to take care of deserialization errors and the DLT, keeping only the try/process/catch in the listeners calling a generic error handler bean
    Gary Russell
    @garyrussell

    to duplicate the code

    Temporarily, until we enhance the framework to support this use case.

    Rajh
    @Rajh
    Hi, I'm serving a REST endpoint which publishes into Kafka.
    I want exactly-once processing, so I'm using a transaction with an idempotent producer, and I'm doing the following: kafkaTemplate.send().get()
    Do you think the ".get()" is needed? (the producer is quite slow with all that)
    The REST endpoint is synchronous :s
    Gary Russell
    @garyrussell

    You must use get() to confirm delivery, if that is critical, but you should use the variant with a timeout, otherwise you risk hanging indefinitely.

    To clarify, though: "exactly once" really only applies to kafkaRead/process/kafkaWrite, and then the only guarantee is that the write and the read offset commit are atomic; the process step is at least once.

    http/process/kafkaWrite is simply transactional.
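
    A minimal sketch of the timed variant (hypothetical topic name and timeout; assuming a String/String KafkaTemplate on spring-kafka 2.x):

    import java.util.concurrent.TimeUnit;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;

    public class ConfirmedSender {

        // Send and block for the broker acknowledgement, but bound the wait so the
        // synchronous REST thread cannot hang forever if the broker is unreachable.
        public SendResult<String, String> sendAndConfirm(KafkaTemplate<String, String> template,
                String topic, String key, String value) throws Exception {
            return template.send(topic, key, value)   // ListenableFuture<SendResult<K, V>>
                    .get(10, TimeUnit.SECONDS);       // TimeoutException instead of an unbounded wait
        }
    }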

    Rajh
    @Rajh
    You mean I don't need the transaction in this case?
    That's what I thought, but I still need idempotence and .get(); can't test now, but since I introduced transactions the producer is quite slow
    Gary Russell
    @garyrussell
    Yes; you need transactions for this use case; I was just trying to explain the term "exactly once".
    Yes; it will slow things down; Kafka transactions are not really designed for one send/one commit. With many sends/one commit, the overhead is much lower.
    Rajh
    @Rajh
    Oh yes, but I was talking about the whole app :D
    But I'm not really sure why I need transactions here. If my producer is idempotent and I'm using .get() and producing to only one topic, it should be ok, shouldn't it?
    (btw I replaced my single record consumer with batch versions and it's ok now)
    (still seeing around ~400ms for the whole process instead of ~50ms ..)
    Gary Russell
    @garyrussell

    If it's simply a write to Kafka and there is no other possibility of failure thereafter, then no, you probably don't need transactions. If the POST might fail after the send, then you might want to use transactions (and read_committed on the consumer).

    I am not sure what you mean by "idempotent producer"; idempotency usually applies to consumers (i.e. they can handle duplicate messages).
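
    A rough fragment sketch of the transactional variant mentioned above (assumed topic name and property maps; the producer factory needs a transaction-id prefix for this to work):

    // Producer side: run the send inside a Kafka transaction.
    kafkaTemplate.executeInTransaction(ops -> ops.send("orders", key, value));

    // Downstream consumer side: only deliver records from committed transactions.
    consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");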

    Rajh
    @Rajh
    idempotent is a config for producers to prevent duplicates on retries; it's a Kafka config
    I'll do some more load test on monday :)
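
    For reference, that producer setting is a plain Kafka property; a minimal sketch (assumed broker address and serializers):

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // broker de-duplicates retries per partition
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all");              // required with idempotence
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
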
    Gary Russell
    @garyrussell
    Oh, right; forgot; sorry.
    Rajh
    @Rajh
    Why do we HAVE to use a transaction when transactions are configured?
    I mean the KafkaTemplate asserts that we are in a transaction if the KafkaTemplate is transactional.
    But what if we don't always want transactions?
    Gary Russell
    @garyrussell
    Configure two producer factories and templates; one with transactions, one without.
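
    A sketch of that setup (hypothetical bean names, assuming Spring Boot's KafkaProperties supplies the base producer properties):

    @Bean
    public KafkaTemplate<String, String> transactionalTemplate(KafkaProperties props) {
        DefaultKafkaProducerFactory<String, String> pf =
                new DefaultKafkaProducerFactory<>(props.buildProducerProperties());
        pf.setTransactionIdPrefix("tx-"); // this factory's producers are transactional
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public KafkaTemplate<String, String> plainTemplate(KafkaProperties props) {
        // No transaction-id prefix: sends from this template never require a transaction.
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props.buildProducerProperties()));
    }
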
    Rajh
    @Rajh
    The thing is I'm calling a business layer which injects a KafkaTemplate
    I don't really want to inject two KafkaTemplates and do something like if (kafkaTemplate.isInTransaction()) kafkaTemplateWithTx.send() else kafkaTemplateWithoutTx.send()
    cherepnalkovski
    @cherepnalkovski
    Hi All,
    I am trying to implement a Producer and a Consumer in different modules with Java + Spring Boot and an Avro schema.
    I am producing messages successfully (there is a 200 status on the schema registry server when the producer sends a message with the Avro schema), but I have errors when the message arrives at the consumer. It looks like the consumer doesn't know anything about the schema registry server or the schema format at all. There is no request to the schema registry server to get the schema format.
    Is there any example with a producer and consumer in different modules?
    Filip Halemba
    @filip-halemba
    @cherepnalkovski Did you set at least these two properties in your application.yml file or Java configuration file?
        consumer:
          ...
          value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        properties:
          schema.registry.url: "localhost:8081 - or another url to your schema registry"
    cherepnalkovski
    @cherepnalkovski

    @filip-halemba
    This is my configuration for the consumer

    @Configuration
    public class KafkaConsumerConfiguration
    {
        @Value(value = "${spring.kafka.bootstrapAddress}")
        private String bootstrapAddress;

        @Value(value = "${spring.kafka.schemaRegistryAddress}")
        private String schemaRegistryAddress;

        @Bean
        public Map<String, Object> consumerConfigs()
        {
            Map<String, Object> props = new HashMap<>();
            // list of host:port pairs used for establishing the initial connections to the Kafka cluster
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryAddress);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");

            return props;
        }

        @Bean
        public ConsumerFactory<String, Object> consumerFactory()
        {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new KafkaAvroDeserializer(), new KafkaAvroDeserializer());
        }

        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory()
        {
            ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());

            return factory;
        }
    }

    And I have another class, which is the consumer class:

    @KafkaListener(topics = "new_audit_topic_avro", groupId = "helloworld")
    public void consumeMessages(ConsumerRecord<String, GenericRecord> record) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(10);
        logger.info("received payload='{}'", record.toString());
        latch.await();
    }

    Filip Halemba
    @filip-halemba
    @cherepnalkovski Did you register the Consumer class as a Spring bean?
    Adrian Soria
    @adrianSoria
    In my AMQ tests with in-memory AMQ I can get the queues and do a queue.purge(); that way I avoid using @DirtiesContext. Is there any way to do a topic.purge() after a test is run with Kafka?
    cherepnalkovski
    @cherepnalkovski
    @filip-halemba Yes, this class has the @Service annotation so it's a Spring bean. I will post the error soon
    Gary Russell
    @garyrussell
    @adrianSoria If you have a consumer in your test, you can use consumer.seekToEnd(consumer.assignment()). There is no concept of "purging" a topic in Kafka.
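
    A rough sketch of that, assuming the test already has a subscribed Consumer:

    // There is no purge; instead, move every assigned partition's position to the
    // log end so records left over from a previous test are skipped.
    consumer.poll(Duration.ofMillis(100));      // make sure partitions are assigned first
    consumer.seekToEnd(consumer.assignment());  // lazy; takes effect on the next poll/position call
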
    Adrian Soria
    @adrianSoria
    thanks @garyrussell ! will try this out
    Filip Halemba
    @filip-halemba
    Hi :) Is there any option to add a custom HandlerMethodArgumentResolver to @KafkaListener other than copy-pasting the whole class and adding my own HandlerMethodArgumentResolver?
    Gary Russell
    @garyrussell
    @adrianSoria I prefer to use distinct topics for each test.
    @filip-halemba Currently, the only extension point is to add a custom MessageHandlerMethodFactory. It might be tricky to provide an extension point to add argument resolvers, because the order might be important. Feel free to open a new feature issue in GitHub (and contributions are welcome).
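
    That extension point looks roughly like this (MyArgumentResolver is hypothetical; note that supplying your own factory also means taking over its default resolver setup):

    @Configuration
    public class ListenerMethodConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            // Custom resolvers run after the built-in ones but before the final payload
            // resolver; that ordering constraint is the tricky part mentioned above.
            factory.setCustomArgumentResolvers(Collections.singletonList(new MyArgumentResolver()));
            registrar.setMessageHandlerMethodFactory(factory);
        }
    }
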
    cherepnalkovski
    @cherepnalkovski

    @filip-halemba Here is the error message :

    2020-01-21 15:10:27 - Commit list: {}
    2020-01-21 15:10:27 - Error while processing: null
    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition new_audit_topic_avro-0 at offset 0. If needed, please seek past the record to continue consumption.
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 21
    Caused by: java.lang.NullPointerException: null
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122) ~[kafka-avro-serializer-3.3.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93) ~[kafka-avro-serializer-3.3.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-3.3.1.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:993) ~[spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949) ~[spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:901) ~[spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_222-1-ojdkbuild]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_222-1-ojdkbuild]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_222-1-ojdkbuild]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222-1-ojdkbuild]

    Filip Halemba
    @filip-halemba
    @garyrussell thank you, I will look into it.
    cherepnalkovski
    @cherepnalkovski

    I have a Consumer in one module, and I need to receive messages serialized with an Avro schema.
    I don't have information about the schema (it's defined in some other module).
    How do I create my Consumer? What kind of parameter should I receive?
    How will this module communicate with the schema registry service?

    @KafkaListener(topics = "new_audit_topic_avro", groupId = "helloworld")
    public void consumeMessages(ConsumerRecord<String, GenericRecord> record) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        logger.info("received payload='{}'", record.toString());
        latch.await();
    }

    This is not working.

    Gary Russell
    @garyrussell

    Please learn how to format code (see the Markdown link to the right). The Caused by: java.lang.NullPointerException: null is in the deserializer itself and has nothing to do with Spring - I can't tell what the NPE is caused by because the line numbers don't line up with the source version I have. Use a debugger to see why you are getting an NPE - it's either a problem on the producer side or the schema can't be found in the registry.

    You should ask general questions about Kafka/Avro to the wider Kafka community; we are not experts on Avro here.

    cherepnalkovski
    @cherepnalkovski
    Thanks @garyrussell . I will try to find help there.
    Martin Thurau
    @martinth

    Hi! I have a (transactional) @KafkaListener that might take a long time (multiple minutes) to handle a single message (which is fine in itself; it does a bunch of remote calls to external APIs that are also a bit flaky and slow). When the listener then finishes and tries to commit the offset, it fails with org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

    To my understanding the reason for this is transaction.timeout.ms, which defaults to 60 seconds. Is that correct? Can I increase this limit somehow for this specific listener? I already have my own ConcurrentKafkaListenerContainerFactory for it, but there I can only set consumer properties, whereas transaction.timeout.ms is a producer setting. Where does the producer that commits the offset come from, and how can I change the setting for it (without setting it globally for all producers)?

    Gary Russell
    @garyrussell
    There is currently no way to do that; you can't change producer properties after the producer is created; you would need a separate producer factory for these producers. We could look at providing some mechanism but it will require changes to the framework; feel free to open a new feature issue in GitHub and we'll take a look.
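
    A sketch of the separate-factory idea (assumed timeout value; baseProducerProps stands for whatever the existing factory uses):

    // Dedicated producer factory for the slow listener's transactions, with a larger
    // broker-side transaction timeout (it must stay below the broker's
    // transaction.max.timeout.ms, which defaults to 15 minutes).
    Map<String, Object> slowTxProps = new HashMap<>(baseProducerProps);
    slowTxProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600_000); // 10 minutes (assumption)
    DefaultKafkaProducerFactory<String, String> slowTxPf = new DefaultKafkaProducerFactory<>(slowTxProps);
    slowTxPf.setTransactionIdPrefix("slow-tx-");
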
    Martin Thurau
    @martinth
    Bummer. Okay, I will see if I can somehow drop the "transactional" needs. If I just don't need the listener to be transactional, I could just ack the message as the first step and, in case of errors, manually "reschedule" to the same topic?
    Gary Russell
    @garyrussell
    The SeekToCurrentErrorHandler can handle such errors (re-seek so the record is re-delivered). You may need to increase max.poll.interval.ms from its default 5 minutes to avoid a rebalance.
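
    Roughly, on the container factory and consumer config (assumed values):

    // Re-seek on failure so the record is re-fetched and re-delivered on the next poll.
    factory.setErrorHandler(new SeekToCurrentErrorHandler());

    // Give a slow listener more time between polls before the broker triggers a rebalance.
    consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000); // 10 minutes (assumption)
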
    Martin Thurau
    @martinth
    Maybe I have a misunderstanding then, because I already have the SeekToCurrentErrorHandler on my ListenerContainerFactory. If I understood you correctly (and the code of SeekToCurrentErrorHandler, at which I just looked), the messages already get ack-ed immediately when I use it?
    Gary Russell
    @garyrussell
    No; it is not ack'd until the listener method exits (unless you are using MANUAL_IMMEDIATE ack mode); but we have to re-seek so the record is re-fetched on the next poll.
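
    For reference, the MANUAL_IMMEDIATE variant looks roughly like this (hypothetical topic and process() method; whether committing this early is a good idea is exactly what is being discussed):

    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    @KafkaListener(topics = "slow-topic")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        ack.acknowledge();   // commits the offset right away, before processing
        process(record);     // a failure here means the record is NOT re-delivered
    }
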
    Martin Thurau
    @martinth

    unless you are using MANUAL_IMMEDIATE ack mode

    Is this maybe what I want (stupid question, I know)? If I read this right, it will re-send the record to the same topic-partition when MANUAL_IMMEDIATE is used? So I could configure my AckMode to MANUAL_IMMEDIATE and then .acknowledge() at the start of my listener (which would commit the offset?). And on exception the SeekToCurrentErrorHandler will re-send the record for me.
    There is then a slight chance that I could lose a message if my service is killed after calling ack, but that would be okay for me.

    Gary Russell
    @garyrussell
    That is correct; but none of this will help if your listener takes longer than max.poll.interval.ms; you will get a rebalance. It's not at all clear why you want to commit the offset so early.
    Martin Thurau
    @martinth

    Okay, but max.poll.interval.ms is a consumer setting, which I could just change for the ConsumerFactory in my ListenerContainerFactory.

    It's not at all clear why you want to commit the offset so early.

    I assume it's because I don't have an in-depth understanding of what my problem is :/ Or maybe I haven't stated my problem clearly enough:
    I have a listener that takes its sweet time. I already poll only one record at a time, but I still get a ProducerFencedException when my listener finally finishes and commits. So I assume committing the offset early would solve my problem "neatly".

    Gary Russell
    @garyrussell
    No it will not; the transaction won't be committed until the listener exits and the failure will occur then. I thought you decided to drop transactions for this use case. I was discussing the scenario in that context.
    Martin Thurau
    @martinth

    Okay, I think I understand now! My mental model of "ack" vs. "commit" is still a bit flaky :/

    To rephrase: If I drop the transactional from my listener it does not really matter if I ack early or late as long as my listener does not take longer than max.poll.interval.ms.

    Martin Thurau
    @martinth
    Now that I think about it, I wonder why I added the @Transactional(transactionManager = "kafkaTransactionManager") in the first place. The only thing I do is take the incoming data, process it and then write to the database. And all the database interaction is already behind a JPA transaction manager, so it will be rolled back anyway in case of an uncaught exception. I don't have any other interaction with Kafka other than consuming.
    Thanks @garyrussell I think I have things to try out. Sorry for the stupid questions :o
    Gary Russell
    @garyrussell
    No problem; for future reference, if you do want to synchronize Kafka and JPA transactions you should inject a ChainedKafkaTransactionManager into the listener container; when using @Transactional, the container knows nothing about the Kafka transaction and cannot send the consumed offset(s) to the transaction (which is how Kafka makes consumption and production atomic).
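
    A sketch of that wiring (bean names and type parameters are assumptions):

    @Bean
    public ChainedKafkaTransactionManager<Object, Object> chainedTm(
            KafkaTransactionManager<Object, Object> kafkaTm, JpaTransactionManager jpaTm) {
        return new ChainedKafkaTransactionManager<>(kafkaTm, jpaTm);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> cf, ChainedKafkaTransactionManager<Object, Object> chainedTm) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cf);
        // The container starts the Kafka transaction itself and can therefore send the
        // consumed offsets to that transaction, which @Transactional alone cannot do.
        factory.getContainerProperties().setTransactionManager(chainedTm);
        return factory;
    }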