    Ming Li
    @danwslightinc

    Hi there, I am trying to implement a circuit breaker with Resilience4j around the Spring Cloud Stream Kafka binder.

    @Bean
    public java.util.function.Consumer<Message<SomePojo>> process() {
        return message -> {
            var consumer = message.getHeaders().get(KafkaHeaders.CONSUMER, Consumer.class);
            var acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            var currentState = circuitBreakerRegistry.circuitBreaker(BACKEND_SERVICE).getState();
            if (currentState == CircuitBreaker.State.OPEN) {
                var topic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class);
                var partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
                consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
                if (acknowledgment != null) {
                    acknowledgment.nack(10);
                }
                return; // don't fall through and ack a record that was just nacked
            }
            // some business logic
            if (acknowledgment != null) {
                log.debug("Acknowledged");
                acknowledgment.acknowledge();
            }
        };
    }
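    A hedged note in case it helps: the container is generally unaware of a pause made directly on the raw Consumer, and nothing above ever resumes the partition once the breaker closes again, so consumption can stall. A sketch of a resume hook, assuming a Resilience4j registry and a plain spring-kafka listener container with a hypothetical id; with the Stream binder you would pause/resume the binding itself instead:

    import io.github.resilience4j.circuitbreaker.CircuitBreaker;
    import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.stereotype.Component;

    @Component
    public class CircuitBreakerContainerBridge {

        public CircuitBreakerContainerBridge(CircuitBreakerRegistry registry,
                                             KafkaListenerEndpointRegistry containers) {
            registry.circuitBreaker("backendService").getEventPublisher()
                    .onStateTransition(event -> {
                        // "myListener" is a hypothetical listener container id
                        MessageListenerContainer container = containers.getListenerContainer("myListener");
                        CircuitBreaker.State to = event.getStateTransition().getToState();
                        if (to == CircuitBreaker.State.OPEN) {
                            container.pause();   // stop fetching while the backend is down
                        } else if (to == CircuitBreaker.State.HALF_OPEN || to == CircuitBreaker.State.CLOSED) {
                            container.resume();  // resume from the last committed offset
                        }
                    });
        }
    }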

    The above solution works for one message in the topic. When I have multiple messages, the app fails to poll after around 15 minutes. Did I miss anything?

    Thank you.

    13 replies
    Jack Wilkinson
    @guacjack

    Hey all, just got a quick query, hoping one of you might be able to see where I've gone wrong.

    In my application.properties I have

    spring.kafka.bootstrap-servers=http://atesturl.com:9092

    and then in application-test.properties I have

    spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

    Now when I run my actual tests with @ActiveProfiles("test"), the app runs with the embedded Kafka broker and I can see the @KafkaListeners being called correctly after publishing messages... but I can see in the logs that it first tries to connect using the "http://atesturl.com:9092" from my default application.properties.

    So I end up with this in the logs:

    Failed authentication with http://atesturl.com:9092/{the ip address} (Authentication failed)

    but afterwards it seems to use the embedded Kafka broker. It's almost as if it starts up with the application.properties value before eventually being overridden by the one in application-test.properties.
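    In case it helps, one hedged pattern is to pin the broker address on the test class itself so the default value can never be picked up first (annotation values below are illustrative). Incidentally, Kafka bootstrap addresses are plain host:port pairs, not HTTP URLs, so the http:// prefix is suspect in its own right:

    // @EmbeddedKafka exposes the broker address via the
    // spring.embedded.kafka.brokers property, which the placeholder resolves.
    @SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
    @EmbeddedKafka(partitions = 1, topics = "some-topic") // hypothetical topic
    @ActiveProfiles("test")
    class MyKafkaIntegrationTest {
        // ...
    }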

    24 replies
    Jack Wilkinson
    @guacjack
    Anyone know if there is any kind of function or callback I can hook into for when my Kafka consumer has connected to the broker? I am connecting via @KafkaListener and everything works fine; I'm just trying to write an integration test at the moment to verify that it's connected.
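    For what it's worth, there is no single "connected" callback, but partition assignment is a reasonable proxy, since it only happens once the consumer has actually joined the group on the broker. A hedged sketch on the container factory (bean names are assumptions):

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // consumer is connected and has its assignment; flip a latch or log here
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }
        });
        return factory;
    }

    spring-kafka also publishes container events such as ConsumerStartedEvent that an @EventListener method can observe, though "started" alone does not guarantee the broker is reachable.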
    14 replies
    Knut Schleßelmann
    @kschlesselmann
    Hi! Are there currently any known issues with health checks in current releases? I notice the health check response time goes up in one of our services from a couple of ms to 10–20 seconds(!). Right now Kafka is my prime suspect. It doesn't matter if I choose Spring Boot 2.4.7 or 2.5.1. With 2.4.6 everything seems fine. I'm still debugging to get some better info :-/
    1 reply
    Jack Wilkinson
    @guacjack
    Does Spring-Kafka change the default max.poll.records from 500? Also, is there a way to see the default consumer properties that Spring Kafka is applying?
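    A hedged note on the second part: Spring only passes along properties that you (or Boot) set explicitly; kafka-clients applies its own defaults, such as max.poll.records=500, and logs the merged result at startup under "ConsumerConfig values:". To see what the factory hands to new consumers:

    // Sketch: prints only the explicitly-configured properties; client-side
    // defaults are applied (and logged) by kafka-clients itself.
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    public void dumpConsumerConfig() {
        consumerFactory.getConfigurationProperties()
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }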
    52 replies
    Jack Wilkinson
    @guacjack
    Also, how does spring-kafka deal with gracefully shutting down a consumer? If, for example, a SIGTERM is sent, does it process all existing records from the last poll before exiting?
    11 replies
    Jack Wilkinson
    @guacjack

    I can see from here https://stackoverflow.com/questions/50244744/spring-kafka-when-is-exactly-consumer-poll-called-behind-the-hood that Gary has answered the question of "when is a consumer's poll called behind the scenes", saying:

    In 1.3 and above there is a single thread per consumer; the next poll() is performed after the last message from the previous poll has been processed by the listener.

    However, if there are 0 messages retrieved from Kafka, so basically nothing on the queue to process, how often does it poll then?
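    A hedged note: the container calls poll() in a tight loop regardless of traffic; with nothing to fetch, each poll() simply blocks up to the container's pollTimeout (5000 ms by default) and the loop immediately polls again. For example:

    // Sketch: the topic name is hypothetical; pollTimeout bounds how long a
    // single poll() waits for records before the loop comes around again.
    ContainerProperties containerProps = new ContainerProperties("some-topic");
    containerProps.setPollTimeout(3000L);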

    2 replies
    Pritam Roy
    @pritamroy:matrix.org
    [m]

    Hi, I'm trying to build an application with two Spring Boot apps. One (A) sends a file path to a Kafka topic and another app (B) listens on the same topic. After B receives the file path from A, it uploads the file to object storage. The problem is that the upload takes time, since the file can be any size, so rebalances keep happening. I don't understand how to implement this in a better way.

    What I'm doing now is as follows:

    
    @KafkaListener(
            topics = "topic",
            containerFactory = "configureKafkaListenerContainerFactory",
            errorHandler = "kafkaConsumerErrorHandler")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment)
            throws Exception {
        UploadService.upload(record.value());
        acknowledgment.acknowledge();
    }
    
    Can anyone help me to do it in a better way, please?
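    A hedged suggestion, since rebalances here usually mean the poll loop exceeded max.poll.interval.ms during a long upload: fetch one record per poll and give each poll more headroom (values are illustrative):

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);             // one file path per poll
    consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1_800_000); // allow up to 30 min per upload

    Alternatively, hand the upload to a separate executor and pause the container until it finishes, so the poll loop keeps running.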
    7 replies
    plmakas
    @plmakas
    Hi, I'm deploying an application with @KafkaListener to Kubernetes. When there's an upgrade, the pods that are being terminated start to flood logs with warning "Connection to node xxx could not be established. Broker may not be available." until they are fully terminated. This makes browsing logs difficult in my centralized logging solution. Is there a way to avoid this warning in my case?
    1 reply
    Pritam Roy
    @pritamroy:matrix.org
    [m]
    Thanks @artembilan @garyrussell for your help.
    Rahul
    @rahul-raj-1
    Hello Team,
    What's the best strategy to read data from multiple topics? I have to read data from 5 topics and perform different processing based on the message. Should I have 1 KafkaListenerContainerFactory or 5 KafkaListenerContainerFactories (i.e. 1 for each topic)? Is there any recommendation from Spring Kafka?
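    For reference, a single factory can back any number of listeners; separate factories are mainly needed when the consumer settings differ per topic. A hedged sketch (topic names and the factory bean name are hypothetical):

    @KafkaListener(topics = "orders", containerFactory = "kafkaListenerContainerFactory")
    public void onOrder(ConsumerRecord<String, String> record) {
        // order-specific processing
    }

    @KafkaListener(topics = "payments", containerFactory = "kafkaListenerContainerFactory")
    public void onPayment(ConsumerRecord<String, String> record) {
        // payment-specific processing
    }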
    3 replies
    winkidzz
    @winkidzz
    Hello, I am using Spring Boot Kafka to create a streaming application. The spring.kafka.streams.state-dir property in application.yaml does not get picked up by Spring Kafka. Any help?
    29 replies
    Luigi Cerone
    @LuigiCerone
    Hello everyone, what is the best way to gracefully shut down Kafka Streams created with this library?
    I have a config class annotated with @Configuration @EnableKafkaStreams. This class creates two beans of type KStream<String, String>. I've configured a shutdown hook which calls (inside a @PreDestroy callback) the method StreamsBuilderFactoryBean::close() (StreamsBuilderFactoryBean is injected with @Autowired). Is this the right approach? Thanks.
    5 replies
    Igor Rudenko
    @fac30ff
    Hi here. I have a question about one thing: I'm using Confluent with the Avro schema registry, and I have an issue with a 500 error when retrieving a schema. Support says that happened because of absent schema caching in the Kafka client. So my question is: is schema caching present in the Kafka client, and if it is present, is it switched on or off by default?
    2 replies
    Aleksey Vasin
    @youagree
    Why does the onFailure callback not fire when I send a message through kafkaTemplate and the topic is not present (Error while fetching metadata with correlation id 43 : {test-topic=UNKNOWN_TOPIC_OR_PARTITION})?
    28 replies
    kafkaTemplate.send("test-topic", 1, "body")
            .addCallback(new ListenableFutureCallback<>() {
                @Override
                public void onFailure(Throwable throwable) {
                    System.out.println("failure");
                }

                @Override
                public void onSuccess(SendResult<Integer, String> integerStringSendResult) {
                    System.out.println("success");
                }
            });
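    A hedged explanation: with a missing topic the producer keeps retrying its metadata fetch, so the future only fails (and onFailure only fires) once max.block.ms, 60 seconds by default, has expired; the callback does run, just late. Lowering the timeout surfaces the error sooner:

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5_000); // fail fast when metadata never arrives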
    Ghenadii Batalski
    @ghenadiibatalski
    Hello, I'm receiving the following warning: org.apache.kafka.clients.consumer.ConsumerConfig The configuration 'spring.json.trusted.packages' was supplied but isn't a known config. The consumer seems to work, and the config looks like what is described here: https://docs.spring.io/spring-boot/docs/2.4.5/reference/htmlsingle/#boot-features-kafka-extra-props
    Am I doing something wrong, or is it a known/ignorable issue? Thx
    6 replies
    Ghenadii Batalski
    @ghenadiibatalski
    Hello, we are testing disaster recovery now and facing an interesting problem. We run some stateless Spring Boot microservices with Kafka on Kubernetes. After the Kafka cluster was fully shut down and started again, the nodes received new IP addresses. We use the DNS name of a broker service as the bootstrap servers value, but the clients cannot reconnect to a broker after its restart because they cached the now-invalid node IP addresses. Is there any possibility to resolve the IP addresses again from the bootstrap servers name if the (re)connection cannot be established? If not, is there any way to recognize such a misbehaving condition and make the Spring Boot service unhealthy, to be rebooted automatically by Kubernetes? Thanks and regards, Gena
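    Possibly relevant, though hedged: newer kafka-clients re-resolve DNS when opening new connections, and the client.dns.lookup setting controls whether all resolved IPs are tried. A sketch (the service name is an assumption):

    Map<String, Object> props = new HashMap<>();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "kafka.mynamespace.svc.cluster.local:9092");
    props.put(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips");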
    4 replies
    Pietro Galassi
    @pietrogalassi
    How do you configure EOS (exactly-once semantics)?
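    A minimal sketch of the usual starting point, assuming Spring Boot auto-configuration and hypothetical topic names: make the producer transactional, consume only committed records, and let the listener's sends commit atomically with the consumed offsets.

    // Boot properties (sketch):
    //   spring.kafka.producer.transaction-id-prefix=tx-
    //   spring.kafka.consumer.isolation-level=read_committed
    // With a transactional producer factory, a KafkaTransactionManager built on it
    // can be set on the listener container so sends and offsets commit together.
    @Component
    public class ExactlyOnceRelay {

        private final KafkaTemplate<String, String> kafkaTemplate;

        public ExactlyOnceRelay(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        @KafkaListener(topics = "in")
        public void listen(String value) {
            // inside the Kafka transaction: this send and the consumed offset
            // either commit together or roll back together
            kafkaTemplate.send("out", value.toUpperCase());
        }
    }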
    36 replies
    Deepak Emani
    @deepakemani-maersk
    Hi, we are looking at options for a Kafka consumer replaying messages based on a timestamp. We use Spring Kafka in one consumer and reactive Spring Kafka in another. Please point to any working examples for both types of services.
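    For the Spring Kafka side, a hedged sketch using AbstractConsumerSeekAware (the replay timestamp is an assumption); the reactive consumer would need the equivalent seek in its assignment listener:

    public class ReplayFromTimestampListener extends AbstractConsumerSeekAware {

        private final long replayFromEpochMs = 1_625_000_000_000L; // hypothetical timestamp

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                         ConsumerSeekCallback callback) {
            super.onPartitionsAssigned(assignments, callback);
            // seek each assigned partition to the first offset at/after the timestamp
            assignments.keySet().forEach(tp ->
                    callback.seekToTimestamp(tp.topic(), tp.partition(), replayFromEpochMs));
        }
    }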
    9 replies
    winkidzz
    @winkidzz
    Morning, are there any known issues with converting Avro to JSON and back to Avro as part of monitoring?
    2 replies
    Виктор
    @Vichukano
    Hello! I need to receive messages from one topic, process them, and send them to another. Is it correct to ack messages after the send succeeds, like this:
    @KafkaListener(topics = "in", containerFactory = "testFactory")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String result = process(record);
    kafkaTemplate.send("out", result).addCallback(new ListenableFutureCallback<>() { 
                @Override
                public void onFailure(Throwable throwable) {
                LOGGER.error("Failed to send message: {}", result, throwable);
                }
                @Override
                public void onSuccess(SendResult<String, String> stringStringSendResult) {
                     LOGGER.debug("Successfully send message: {}", result);
                     ack.acknowledge();
                }
            });
        }
    5 replies
    Jack Wilkinson
    @guacjack
    I'm currently trying to write a robust Kafka producer using KafkaTemplate.send. Does anyone have any good resources on how to handle the many exceptions that might be thrown from this call? I am calling .get() on it to ensure it's performed synchronously, but I want to unit test all the possible scenarios.
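    A hedged sketch of the failure modes a blocking send can surface (topic, generics, and timeout are illustrative); for unit tests, a mocked template whose future fails with each cause exercises the branches:

    public void sendSynchronously(KafkaTemplate<String, String> template) {
        try {
            SendResult<String, String> result =
                    template.send("some-topic", "payload").get(10, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // client/broker failure: e.getCause() is typically a KafkaException,
            // e.g. a delivery TimeoutException or RecordTooLargeException
        } catch (TimeoutException e) {
            // java.util.concurrent.TimeoutException: our own get() deadline elapsed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }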
    1 reply
    Aleksey Vasin
    @youagree
    Hello, what is the best practice to fix this? I have one consumer group which consumes from different topics and catches this: sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    2 replies
    Thanks for the answers.
    Rahul
    @rahul-raj-1
    Is there any way we can monitor consumer lag programmatically?
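    One hedged option is the plain kafka-clients AdminClient (group id and bootstrap address are assumptions):

    // Sketch: lag per partition = log-end offset minus the group's committed offset.
    public static void printLag() throws Exception {
        try (AdminClient admin = AdminClient.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets("my-group")
                    .partitionsToOffsetAndMetadata().get();
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            admin.listOffsets(request).all().get().forEach((tp, end) ->
                    System.out.println(tp + " lag=" + (end.offset() - committed.get(tp).offset())));
        }
    }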
    1 reply
    Luigi Cerone
    @LuigiCerone
    Hello, is there any way I can delete a topic via @NewTopic annotation? Thanks
    4 replies
    venkatasreekanth
    @venkatasreekanth
    When configuring consumption of topics using a Kafka listener, some of the listeners fail with a disconnect exception or a timeout exception. Is there a way to retry? The containers seem to simply hang there doing nothing:
    org.apache.kafka.common.errors.DisconnectException
    4 replies
    brightinnovator
    @brightinnovator

    I want to read 30 million (3 crore) CSV rows from a 2 GB CSV file and insert them into MySQL via Java.

    Could someone please tell me the fastest and most memory-efficient way, so I can avoid an out-of-memory exception and load the data in less time?

    Please kindly advise.
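    A hedged sketch of the usual streaming + JDBC-batching approach (table, columns, and batch size are illustrative; on MySQL, adding rewriteBatchedStatements=true to the JDBC URL typically speeds batches up further):

    // Stream the file line by line and batch the inserts so memory stays flat
    // regardless of file size. A real CSV needs a proper parser (quoted fields).
    public static void load(String jdbcUrl, String user, String pass) throws Exception {
        try (BufferedReader reader = Files.newBufferedReader(Path.of("employees.csv"));
             Connection con = DriverManager.getConnection(jdbcUrl, user, pass);
             PreparedStatement ps = con.prepareStatement(
                     "INSERT INTO employee (emp_id, emp_name) VALUES (?, ?)")) {
            con.setAutoCommit(false);
            String line = reader.readLine(); // skip the header row
            int count = 0;
            while ((line = reader.readLine()) != null) {
                String[] cols = line.split(",");
                ps.setString(1, cols[0]);
                ps.setString(2, cols[1]);
                ps.addBatch();
                if (++count % 10_000 == 0) { // flush periodically to bound memory
                    ps.executeBatch();
                    con.commit();
                }
            }
            ps.executeBatch();
            con.commit();
        }
    }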

    Igor Rudenko
    @fac30ff
    Hi there, I have a problem: Spring Kafka does not send the Kafka header named Id or the system time. How do I override the option to include these fields in the headers?
    1 reply
    springdev2022
    @springdev2022
    Can someone help me with links and suggestions for processing a CSV file and sending it to a Kafka topic via Spring Batch/Integration?
    6 replies
    aaysenur
    @aaysenur

    Hi everyone, I am new to Kafka and trying to test my application with embedded Kafka. My class config is like below:

    @RunWith(SpringRunner.class)
    @SpringBootTest
    @DirtiesContext
    @EmbeddedKafka

    And I am using below versions:
    junit:4.13.2
    kafka:2.6.0
    kafka confluent avro serializer: 5.2.1

    But I got the below error:
    Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1786)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:413)
    at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:116)
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:277)
    at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:635)
    at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:390)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:325)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:123)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124)
    ... 25 more
    Caused by: java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1285)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1272)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:315)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)
    ... 34 more
    Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.client.ZKClientConfig
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 39 more

    Could somebody please help me if you have faced such an issue before? Thanks in advance.

    1 reply
    Joe McCaffrey
    @JoeMcCaffrey

    Hi - I see that the next release has an upgrade to Kafka 2.8.0. Should I expect a Kafka mismatch with the latest 2.7 release?

    Having some weird errors like Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/kafka/common/record/BufferSupplier

    3 replies
    springdev2022
    @springdev2022
    Where can I find a good Spring Integration + Kafka sample that processes a CSV file and writes it to a DB or Kafka based on the configuration?
    7 replies
    brightinnovator
    @brightinnovator
    What open source tools are available to monitor/browse Kafka messages?
    Jack Wilkinson
    @guacjack

    Hey all, is there a way for me to know if the Kafka broker is disconnected in my consumer? It is currently set up and working great via the @KafkaListener annotation on a method.

    My use case is that the Kafka broker has become disconnected, for example: it was once connected and working but now no longer is. Is there a way I can hook into that so a method is called and I can log any appropriate details for the operations team?
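    One hedged option: when a poll() does not return within a threshold (which can happen when the broker becomes unreachable), the container can publish a NonResponsiveConsumerEvent that an @EventListener can forward to ops logging. Sketch, with illustrative thresholds:

    // On the container factory:
    factory.getContainerProperties().setMonitorInterval(10);   // check every 10 s
    factory.getContainerProperties().setNoPollThreshold(3.0f); // alert after 3x pollTimeout

    // Anywhere in the context:
    @EventListener
    public void onNonResponsive(NonResponsiveConsumerEvent event) {
        log.warn("Kafka consumer appears unresponsive: {}", event);
    }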

    14 replies
    Mukut Bhattacharjee
    @mukutbhattacharjee

    Hi devs

    I have set up a single-node Kafka using bitnami-kafka (https://github.com/bitnami/bitnami-docker-kafka/blob/master/docker-compose.yml) with some modifications, as below

    version: "2"
    
    services:
      zookeeper:
        image: docker.io/bitnami/zookeeper:3.7
        ports:
          - "2181:2181"
        volumes:
          - "/var/data/bitnami:/bitnami"
        environment:
          - ALLOW_ANONYMOUS_LOGIN=yes
      kafka:
        image: docker.io/bitnami/kafka:2.7.0
        ports:
          - "9092:9092"
        volumes:
          - "/var/data/bitnami:/bitnami"
        environment:
          - KAFKA_BROKER_ID=1
          - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
          - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
          - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
          - ALLOW_PLAINTEXT_LISTENER=yes
        depends_on:
          - zookeeper

    Below is my application.yml for the Spring Boot producer client

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: "my-app-group"
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

    But the producer is not able to connect to the broker, failing with the following exception

    2021-07-29 10:25:30.139 ERROR 14455 --- [ad | producer-1] m.l.springkafka.producer.EventProducer   : Error in  publishing message. Root cause: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
    
    org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
        at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:620) ~[spring-kafka-2.7.4.jar:2.7.4]
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:872) ~[spring-kafka-2.7.4.jar:2.7.4]
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1366) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:690) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:381) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) ~[kafka-clients-2.7.1.jar:na]
        at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
    Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation

    Whereas I am able to produce using kafka-console-producer --topic test --bootstrap-server localhost:9092 and consume successfully using kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

    Can anyone please provide some input here? TIA

    1 reply
    brightinnovator
    @brightinnovator

    How to handle this specific use case in Spring Batch - Different Names?

    There is a CSV file with many records where the CSV header names and the DB domain object names are different. How do I fetch the CSV data and create a model to persist in the database in Spring Batch? The below throws errors since the header and DB/JPA names are different.

    Example, employee.csv

    Employee Id, Employee Name, Employee Address, Employee Address 2, Date Of Birth
    Domain Object Employee.java

    public String empId;
    public String empName;
    public String empAddress;
    public String empAddress2;
    public String empDOB;
    Error thrown on the flat file reader

    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.StepExecutionListener;
    import org.springframework.batch.core.configuration.annotation.JobScope;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.mapping.FieldSetMapper;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.stereotype.Component;
    import org.springframework.validation.BindException;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.batch.item.file.transform.FieldSet;

    @Component
    @JobScope
    public class EmployeeFlatFileItemReader extends FlatFileItemReader<Employee> implements StepExecutionListener {

        private String fileName;
        private String filePath;

        @Override
        public void beforeStep(StepExecution stepExecution) {
            fileName = stepExecution.getJobExecution().getJobParameters().getString("fileName");
            filePath = stepExecution.getJobExecution().getJobParameters().getString("filePath");
            setResource(new FileSystemResource(filePath));
        }

        public EmployeeFlatFileItemReader() {
            try {
                //init();
                DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>() {
                    {
                        setLineTokenizer(new DelimitedLineTokenizer() {
                            {
                                // setNames takes varargs: one name per column,
                                // not a single comma-joined string
                                setNames("Employee Id", "Employee Name", "Employee Address",
                                        "Employee Address 2", "Date Of Birth");
                            }
                        });
                        setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                            {
                                setTargetType(Employee.class); // Throws error
                            }
                        });
                    }
                };
                setLineMapper(lineMapper);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            return stepExecution.getExitStatus();
        }
    }
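    A hedged sketch of one way around the name mismatch: keep the CSV header names on the tokenizer, but map columns to properties explicitly with a custom FieldSetMapper instead of BeanWrapperFieldSetMapper (and skip the header row with setLinesToSkip(1) on the reader):

    DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames("Employee Id", "Employee Name", "Employee Address",
            "Employee Address 2", "Date Of Birth");
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSet -> {
        Employee employee = new Employee();
        employee.empId = fieldSet.readString("Employee Id");
        employee.empName = fieldSet.readString("Employee Name");
        employee.empAddress = fieldSet.readString("Employee Address");
        employee.empAddress2 = fieldSet.readString("Employee Address 2");
        employee.empDOB = fieldSet.readString("Date Of Birth");
        return employee;
    });
    setLineMapper(lineMapper);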

    SauriBabu
    @SauriBabu
    I am using Spring Kafka in production code which makes a REST call to an external service. While writing an integration test using the EmbeddedKafka server, I am using a WireMock server to mock the external API call. My test case runs in the main thread and the Kafka consumer runs in its own thread, so the stub set up using WireMock is not getting picked up. Can anyone point me in some direction to handle this threading issue?
    Zane XiaoYe
    @Zane-XY
    Hi, is there any configuration that needs to be done in order to get OpenTelemetry instrumentation working?
    1 reply
    Rajh
    @Rajh
    Hi, I see there is now a @RetryableTopic feature. But can it be triggered manually to retry just a sub-list of messages from a batch?
    1 reply
    learningdeveloper2021
    @learningdeveloper2021

    Registering errors with row details + line number + error message

    I'm processing a CSV file using Spring Batch. I want to register errors with the row details + line number + error message in all the steps, wherever the error has occurred. The error may occur in the reading stage, file input stage, processing stage, or writing stage.

    How do I do this? What's the best practice?

    Thanks
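    A hedged sketch: on a fault-tolerant step, a SkipListener sees the failing item at each stage, and FlatFileParseException carries the raw line and line number for read errors (names are illustrative):

    public class CsvErrorAuditListener implements SkipListener<Employee, Employee> {

        @Override
        public void onSkipInRead(Throwable t) {
            if (t instanceof FlatFileParseException) {
                FlatFileParseException ffpe = (FlatFileParseException) t;
                // row details + line number + error message
                System.err.printf("read error at line %d: %s (%s)%n",
                        ffpe.getLineNumber(), ffpe.getInput(), ffpe.getMessage());
            }
        }

        @Override
        public void onSkipInProcess(Employee item, Throwable t) {
            System.err.println("process error for " + item + ": " + t.getMessage());
        }

        @Override
        public void onSkipInWrite(Employee item, Throwable t) {
            System.err.println("write error for " + item + ": " + t.getMessage());
        }
    }

    Register it on the step, e.g. with stepBuilder.faultTolerant().listener(new CsvErrorAuditListener()).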

    1 reply
    Akshansh Jain
    @akshanshjain95
    Hi, I have a use case where I need to define topics of my consumers at runtime. Also, other consumer configs are also provided at runtime. So I am creating a ConcurrentMessageListenerContainer and starting the container. I'm using this container as I need to define the concurrency as well. Now, I also want my consumers to consume messages in a batch. Is that possible with this implementation?
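    A hedged sketch: a BatchMessageListener set on manually built ContainerProperties delivers each poll's records as a list (topic name and concurrency are illustrative; the consumer factory must carry a group id):

    ContainerProperties containerProps = new ContainerProperties("runtime-topic");
    containerProps.setMessageListener((BatchMessageListener<String, String>) records ->
            records.forEach(rec -> System.out.println(rec.value())));

    ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
    container.setConcurrency(3);
    container.start();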
    3 replies
    Zane XiaoYe
    @Zane-XY
    I want to test with @EmbeddedKafka.
    I'm assuming that by putting this annotation on top of my test case,
    all my producer/consumer configuration in the main/java package will point to this in-memory Kafka, right?
    I don't need to re-define all the consumers/producers in test/java for this embedded Kafka, right?
    5 replies
    Akshansh Jain
    @akshanshjain95
    Hi, I have a use case where I want to increase/decrease number of consumers at runtime for a specific topic. As mentioned in the answer here - https://stackoverflow.com/questions/65058534/is-there-a-way-to-update-the-number-of-concurrency-in-concurrentmessagelistenerc, I can change the concurrency by stopping and starting the container. Here, if I use multiple KafkaMessageListenerContainers instead of ConcurrentMessageListenerContainer, and just increase/decrease these containers, will that be a better approach, considering I won't have to stop/start in this case.
    2 replies
    Zane XiaoYe
    @Zane-XY
    What does bootstrapServersProperty = "spring.kafka.bootstrap-servers" mean? Is spring.kafka.bootstrap-servers a special reserved property?
    1 reply
    Jack Wilkinson
    @guacjack

    Does anyone have any links to any useful guides for using Kafka Streams within Spring Boot?

    Also, do you have to use Spring Cloud Streams when using Kafka Streams within Spring?

    1 reply
    Zane XiaoYe
    @Zane-XY
    Get the "Topic ... not present in metadata after 60000 ms.” error while producing to the EmbededKafka. The broker server is set to “localhost:9092”.
    1 reply