@KafkaListener(topics = "in", containerFactory = "testFactory")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String result = process(record);
    kafkaTemplate.send("out", result).addCallback(new ListenableFutureCallback<>() {

        @Override
        public void onFailure(Throwable throwable) {
            LOGGER.error("Failed to send message: {}", result, throwable);
        }

        @Override
        public void onSuccess(SendResult<String, String> sendResult) {
            LOGGER.debug("Successfully sent message: {}", result);
            ack.acknowledge();
        }
    });
}
I know I could call .get() on the returned future to ensure it's performed synchronously, but I want to unit test all the possible scenarios.
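For what it's worth, a minimal sketch of how both callback paths could be unit tested, assuming the listener's KafkaTemplate can be mocked; the test class name, the "broker down" failure, and the commented-out listener invocation and verifications are placeholders of mine, not from the snippet above:

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.SettableListenableFuture;

class OnMessageTest {

    @SuppressWarnings("unchecked")
    @Test
    void acknowledgesWhenSendSucceeds() {
        KafkaTemplate<String, String> template = mock(KafkaTemplate.class);
        Acknowledgment ack = mock(Acknowledgment.class);
        // Hand the listener a future we control, then complete it to drive onSuccess.
        SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();
        when(template.send(eq("out"), anyString())).thenReturn(future);

        // invoke the listener here with a test ConsumerRecord and the mock ack, then:
        future.set(mock(SendResult.class));
        // verify(ack).acknowledge();
    }

    @SuppressWarnings("unchecked")
    @Test
    void skipsAckWhenSendFails() {
        KafkaTemplate<String, String> template = mock(KafkaTemplate.class);
        Acknowledgment ack = mock(Acknowledgment.class);
        // Failing the future should drive onFailure, so no acknowledge happens.
        SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();
        when(template.send(eq("out"), anyString())).thenReturn(future);

        // invoke the listener here, then fail the future:
        future.setException(new RuntimeException("broker down"));
        // verify(ack, never()).acknowledge();
    }
}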
sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.common.errors.DisconnectException
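For reference, a hedged sketch of the two settings that log line points at, framed as a consumer factory bean; the broker address and both values are illustrative only, not recommendations:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Give the poll loop more headroom (default is 300000 ms)...
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
    // ...and/or hand back fewer records per poll (default is 500).
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    return new DefaultKafkaConsumerFactory<>(props);
}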
Hi everyone, I am new to Kafka and trying to test my application with embedded Kafka. My test class config is like below:
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka
And I am using below versions:
junit:4.13.2
kafka:2.6.0
kafka confluent avro serializer: 5.2.1
But I got below error:
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1786)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:413)
at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:116)
at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:277)
at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:635)
at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:390)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:325)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:123)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124)
... 25 more
Caused by: java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1285)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1272)
at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:315)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)
... 34 more
Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.client.ZKClientConfig
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 39 more
Could somebody please help if you have faced such an issue before? Thanks in advance.
hi - I see that the next release has an upgrade to Kafka 2.8.0. Should I expect a Kafka mismatch with the latest 2.7 release?
I'm having some weird errors like: Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/kafka/common/record/BufferSupplier
Hey all, is there a way for me to know if the Kafka broker is disconnected in my consumer? It is currently set up and working great via the @KafkaListener annotation on a method.
My use case: the Kafka broker has become disconnected, for example it was once connected and working but now no longer is. Is there a way I can hook into that so a method is called, letting me log appropriate details for the operations team?
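One possible hook for the question above (a sketch, with class and method names of my choosing): spring-kafka publishes container events such as NonResponsiveConsumerEvent, which fires when the consumer stops polling, for example after losing the broker, subject to the container's monitorInterval/noPollThreshold settings:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.stereotype.Component;

@Component
public class KafkaConnectivityLogger {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectivityLogger.class);

    // Published when the consumer thread hasn't polled within the configured
    // threshold, which is what a dead broker connection looks like from here.
    @EventListener
    public void onNonResponsive(NonResponsiveConsumerEvent event) {
        LOGGER.warn("Kafka consumer appears non-responsive: {}", event);
    }
}

Note this signals a stalled poll loop rather than the TCP disconnect itself, so it may be an approximation of what you want.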
Hi devs
I have set up a single-node Kafka using bitnami-kafka (https://github.com/bitnami/bitnami-docker-kafka/blob/master/docker-compose.yml) with some modifications, as below
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
ports:
- "2181:2181"
volumes:
- "/var/data/bitnami:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:2.7.0
ports:
- "9092:9092"
volumes:
- "/var/data/bitnami:/bitnami"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
Below is my application.yml for spring boot producer client
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: "my-app-group"
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
But the producer is not able to connect to the broker, failing with the following exception:
2021-07-29 10:25:30.139 ERROR 14455 --- [ad | producer-1] m.l.springkafka.producer.EventProducer : Error in publishing message. Root cause: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:620) ~[spring-kafka-2.7.4.jar:2.7.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:872) ~[spring-kafka-2.7.4.jar:2.7.4]
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1366) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:690) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:381) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) ~[kafka-clients-2.7.1.jar:na]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
Whereas I am able to produce using kafka-console-producer --topic test --bootstrap-server localhost:9092
and consume successfully using kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
Can anyone please provide some inputs here? TIA
How to handle this specific use case in Spring Batch - different names?
There is a CSV file with many records where the CSV header names and the DB domain object field names are different. How do I fetch the CSV data and create a model to persist in the database in Spring Batch? The code below throws errors since the header names and the DB/JPA names are different.
Example, employee.csv
Employee Id, Employee Name, Employee Address, Employee Address 2, Date Of Birth
Domain Object Employee.java
public String empId;
public String empName;
public String empAddress;
public String empAddress2;
public String empDOB;
Error thrown on the flat file reader
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.FileSystemResource;
import org.springframework.stereotype.Component;

@Component
@JobScope
public class EmployeeFlatFileItemReader extends FlatFileItemReader<Employee> implements StepExecutionListener {

    private String fileName;
    private String filePath;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fileName = stepExecution.getJobExecution().getJobParameters().getString("fileName");
        filePath = stepExecution.getJobExecution().getJobParameters().getString("filePath");
        setResource(new FileSystemResource(filePath));
    }

    public EmployeeFlatFileItemReader() {
        try {
            DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();

            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            // setNames takes varargs: one argument per column, not one comma-joined string
            tokenizer.setNames("Employee Id", "Employee Name", "Employee Address", "Employee Address 2", "Date Of Birth");
            lineMapper.setLineTokenizer(tokenizer);

            BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
            fieldSetMapper.setTargetType(Employee.class); // Throws error: CSV names don't match the Employee property names
            lineMapper.setFieldSetMapper(fieldSetMapper);

            setLineMapper(lineMapper);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return stepExecution.getExitStatus();
    }
}
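One way around the name mismatch (a sketch): BeanWrapperFieldSetMapper binds by matching property names, so a hand-written FieldSetMapper that reads each CSV column explicitly can bridge the two naming schemes:

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class EmployeeFieldSetMapper implements FieldSetMapper<Employee> {

    @Override
    public Employee mapFieldSet(FieldSet fieldSet) throws BindException {
        Employee employee = new Employee();
        // Translate the CSV header names to the domain field names explicitly.
        employee.empId = fieldSet.readString("Employee Id");
        employee.empName = fieldSet.readString("Employee Name");
        employee.empAddress = fieldSet.readString("Employee Address");
        employee.empAddress2 = fieldSet.readString("Employee Address 2");
        employee.empDOB = fieldSet.readString("Date Of Birth");
        return employee;
    }
}

Then setFieldSetMapper(new EmployeeFieldSetMapper()) replaces the BeanWrapperFieldSetMapper in the reader above.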
Registering errors with row details + line number + error message
I'm processing a CSV file using Spring Batch. I want to register the error with the row details, line number, and error message at every step where an error occurs; the error may happen in the reading stage, file input stage, processing stage, or writing stage.
How do I do this? What is the best practice?
Thanks
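A common pattern (a hedged sketch, assuming a fault-tolerant step with skips enabled; the class name is mine): a SkipListener, where read-side failures arrive as FlatFileParseException carrying both the raw row and its line number:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.item.file.FlatFileParseException;

public class CsvErrorRegisteringListener implements SkipListener<Employee, Employee> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CsvErrorRegisteringListener.class);

    @Override
    public void onSkipInRead(Throwable t) {
        if (t instanceof FlatFileParseException) {
            FlatFileParseException e = (FlatFileParseException) t;
            // FlatFileParseException exposes both the raw row and its line number.
            LOGGER.error("Read error at line {}: [{}] {}", e.getLineNumber(), e.getInput(), e.getMessage());
        }
    }

    @Override
    public void onSkipInProcess(Employee item, Throwable t) {
        LOGGER.error("Process error for item {}: {}", item, t.getMessage());
    }

    @Override
    public void onSkipInWrite(Employee item, Throwable t) {
        LOGGER.error("Write error for item {}: {}", item, t.getMessage());
    }
}

It only fires on a fault-tolerant step (faultTolerant().skip(...).skipLimit(...).listener(...)), so skips have to be enabled for the errors to reach it.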
I'm creating a ConcurrentMessageListenerContainer and starting the container. I'm using this container as I need to define the concurrency as well. Now, I also want my consumers to consume messages in a batch. Is that possible with this implementation?
As per https://stackoverflow.com/questions/65058534/is-there-a-way-to-update-the-number-of-concurrency-in-concurrentmessagelistenerc, I can change the concurrency by stopping and starting the container. Here, if I use multiple KafkaMessageListenerContainers instead of a ConcurrentMessageListenerContainer, and just increase/decrease these containers, will that be a better approach, considering I won't have to stop/start in this case?
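On the batch question, a sketch of what might work (the topic and group names are placeholders): ConcurrentMessageListenerContainer accepts a BatchMessageListener through its ContainerProperties, so batch consumption and concurrency can be combined in one container:

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

public ConcurrentMessageListenerContainer<String, String> batchContainer(
        ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("my-topic"); // placeholder topic
    containerProps.setGroupId("my-group"); // placeholder group
    containerProps.setMessageListener((BatchMessageListener<String, String>) records -> {
        // One invocation per poll(): 'records' is the whole fetched batch.
        records.forEach(r -> System.out.println(r.value()));
    });
    ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
    container.setConcurrency(3); // spins up 3 child KafkaMessageListenerContainers
    return container;
}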
Hi everyone.
I have an issue: when my microservices lose the connection to the Kafka servers, all of them consume 100% of the CPU.
Everything works fine once the connection to Kafka is reestablished, and I will try to secure the Kafka server so it never dies.
But I think this is a bug in the libraries, and I found this issue reported in the Kafka clients JIRA:
https://issues.apache.org/jira/browse/KAFKA-5766
But I don't know whether upgrading only the Kafka client fixes the bug, or whether this is a bug in spring-kafka too and I need to upgrade both libraries.
These are my versions:
spring-kafka: 1.1.7.RELEASE
kafka-clients: kafka-clients-0.10.0.1.jar
My first approach is to upgrade only kafka-clients to the latest stable 0.x version, 0.11.0.3.
If that doesn't work, I will upgrade spring-kafka to the latest stable 1.x version, 1.3.11.RELEASE.
Obviously the best practice is to upgrade everything to the latest version, and that is the roadmap, but right now I need to patch this bug because I have a lot of microservices and I can't upgrade all the libraries at the same time.
Does anyone remember anything about this problem?
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> config = new HashMap<>();
    setDefaults(config);
    return new KafkaStreamsConfiguration(config);
}

@Bean(name = B_STREAMS_BUILDER)
public FactoryBean<StreamsBuilder> myKStreamBuilder(
        @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}

@Bean("streamA")
public KStream<?, ?> kStream(@Qualifier(DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilder kStreamBuilder) {
    ...
}

@Bean("streamB")
public KStream<?, ?> kStream(@Qualifier(B_STREAMS_BUILDER) StreamsBuilder kStreamBuilder) {
    ...
}
Hi, I'm trying to connect to an AWS MSK Kafka cluster using IAM authentication, but my client isn't on EC2. I think I have to integrate this library https://github.com/aws/aws-msk-iam-auth, but the instructions they provide are for the Kafka client, not Spring Boot. It requires setting the following properties:
# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL
# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM
# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
So I'm trying to adapt this to the Spring Boot integration now, and I'm not sure how I can load software.amazon.msk.auth.iam.IAMLoginModule into the Spring client. What properties would I use? Is this even possible?
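A hedged guess at the wiring, not verified against MSK: Kafka client properties generally pass through untouched, whether set on the factory in Java config as below or under spring.kafka.properties.* in application.yml, so the four properties from the README may just carry over; the bootstrap address and serializers here are placeholders:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Bean
public ProducerFactory<String, String> mskProducerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-msk-bootstrap:9098"); // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // The same four properties from the aws-msk-iam-auth README, passed straight through:
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
    props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "software.amazon.msk.auth.iam.IAMLoginModule required;");
    props.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS,
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
    return new DefaultKafkaProducerFactory<>(props);
}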
Hi all. I've got an Avro schema and generated my POJOs. I can consume the messages fine, but when I want to turn the parent POJO into a String with either ObjectMapper or ObjectWriter from Jackson, I get weird exceptions like Caused by: org.apache.avro.AvroRuntimeException: Not an array.
Any chance anybody has a fix for this?
Have a great weekend. Cheers
Hi again guys, I have a problem with how the consumed objects are used that is leading to a ClassCastException; maybe you can throw some light on the issue.
First, I have this object generated from a schema (Avro in this case), which lands in the directory build/generated-main-avro-java/com/company/GeneratedObject.java
Then, I have the following KafkaListenerContainerFactory
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GeneratedObject>>
        generatedAvroContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, GeneratedObject> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerAvroFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
With the consumer:
@KafkaListener(
        topics = "topic",
        containerFactory = "generatedAvroContainerFactory",
        groupId = "${spring.kafka.consumer.group-id}"
)
public void consumer(ConsumerRecord<String, GeneratedObject> record, Acknowledgment acknowledgment) {
Messages arrive at the consumer, but as soon as I try to access any field of the object, record.value().getField(),
I get:
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.company.GeneratedObject (org.apache.avro.generic.GenericData$Record and com.company.GeneratedObject are in unnamed module of loader 'app')
Could it be that I'm missing some kind of configuration? Or is it unrelated to spring-kafka?
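In case it's the usual suspect (an assumption on my part, only valid if the Confluent KafkaAvroDeserializer is in play): GenericData$Record typically means the deserializer is running in generic mode, and the specific.avro.reader flag switches it to the generated class. A sketch of the factory, with placeholder addresses:

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Bean
public ConsumerFactory<String, GeneratedObject> consumerAvroFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    // Without this flag the Confluent deserializer hands back GenericData.Record,
    // which is exactly the ClassCastException above.
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    return new DefaultKafkaConsumerFactory<>(props);
}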
hello all, I have an issue with my consumer; I get this error:
o.a.k.c.c.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
I read that the time taken by message processing, between the call to poll() and the commit of the offset, may be longer than auto.commit.interval.ms; is that the only cause for this problem?
How can I know whether my consumer is using batch mode or not?
here is my configuration (I'm using Spring Cloud Stream, but I think this has nothing to do with Spring Cloud Stream, just with the configuration of the consumer):
cloud:
  stream:
    function:
      routing:
        enabled: true
    source: testProducer
    bindings:
      functionRouter-in-0:
        destination: test-consumer-topic
        group: test-consumer
      testProducer-out-0:
        destination: test-producer-topic
        producer:
          useNativeEncoding: true
    kafka:
      binder:
        brokers: 127.0.0.1:9092
        auto-create-topics: false
        headerMapperBeanName: kafkaHeaderMapper
        configuration:
          auto.offset.reset: earliest
        producer-properties:
          '[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
          '[value.serializer]': org.springframework.kafka.support.serializer.JsonSerializer
      bindings:
        testProducer-out-0:
          producer:
            messageKeyExpression: headers['kafka_messageKey']