Hi there, I am trying to implement a circuit breaker with Resilience4j around the Spring Cloud Stream Kafka binder.
@Bean
public java.util.function.Consumer<Message<SomePojo>> process() {
    return message -> {
        var consumer = message.getHeaders().get(KafkaHeaders.CONSUMER, Consumer.class);
        var acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        var currentState = circuitBreakerRegistry.circuitBreaker(BACKEND_SERVICE).getState();
        if (currentState == CircuitBreaker.State.OPEN) {
            var topic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class);
            var partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
            consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
            if (acknowledgment != null) {
                acknowledgment.nack(10);
            }
        }
        // some business logic
        if (acknowledgment != null) {
            log.debug("Acknowledged");
            acknowledgment.acknowledge();
        }
    };
}
The above solution works for one message in the topic. When I have multiple messages, the app fails to poll after around 15 minutes. Did I miss anything?
Thank you.
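For what it's worth, the snippet above pauses the partition but never resumes it. A minimal, hedged sketch of one option: resume through the listener container when the breaker leaves OPEN. The container reference and its wiring (e.g. via a ListenerContainerCustomizer bean in Spring Cloud Stream) are assumptions, not part of the original.

// A hedged sketch, names assumed: resume consumption once the breaker
// transitions out of OPEN so the paused partition is polled again.
circuitBreakerRegistry.circuitBreaker(BACKEND_SERVICE).getEventPublisher()
        .onStateTransition(event -> {
            CircuitBreaker.State toState = event.getStateTransition().getToState();
            if (toState == CircuitBreaker.State.HALF_OPEN || toState == CircuitBreaker.State.CLOSED) {
                container.resume(); // assumed MessageListenerContainer reference
            }
        });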
Hey all, just a quick query, hoping one of you might be able to see where I've gone wrong.
In my application.properties I have
spring.kafka.bootstrap-servers=http://atesturl.com:9092
and then in application-test.properties I have
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Now when I run my actual tests with @ActiveProfiles("test"), I can see that the app runs with the embedded Kafka broker and that the @KafkaListeners are called correctly after publishing messages... but I can also see in the logs that it first tries to connect using the "http://atesturl.com:9092" from my default application.properties.
So I end up with this in the logs:
Failed authentication with http://atesturl.com:9092/{the ip address} (Authentication failed)
but afterwards it seems to use the embedded Kafka broker. It's almost like it starts up with the application.properties version before eventually being overridden by the one in the application-test.properties file.
I can see from https://stackoverflow.com/questions/50244744/spring-kafka-when-is-exactly-consumer-poll-called-behind-the-hood that Gary has answered the question of when exactly a consumer's poll() is called behind the scenes:
In 1.3 and above there is a single thread per consumer; the next poll() is performed after the last message from the previous poll has been processed by the listener.
However, if a poll retrieves 0 messages from Kafka (so there is basically nothing on the topic to process), how often does it poll then?
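For context, and hedged: the container's consumer thread loops continuously, so with zero records each poll() simply blocks for up to pollTimeout and is then reissued right away. A tiny sketch of the knob involved (topic name assumed):

// A hedged sketch: pollTimeout bounds how long each poll() blocks when the
// topic is empty; the container re-polls immediately after it returns.
ContainerProperties containerProps = new ContainerProperties("some-topic");
containerProps.setPollTimeout(5000); // the default; an empty poll() returns after 5 s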
Hi, I'm trying to build an application in which there will be two Spring Boot apps. One (A) will send a file path to a Kafka topic, and the other (B) will listen on the same topic. After B receives the file path from A, it uploads the file to object storage. The problem is that the upload takes time, since the file can be any size, so rebalances keep happening. I don't understand how to implement this in a better way.
What I'm doing now is as follows:
@KafkaListener(
        topics = "topic",
        containerFactory = "configureKafkaListenerContainerFactory",
        errorHandler = "kafkaConsumerErrorHandler")
public void consume(
        ConsumerRecord<String, String> record, Acknowledgment acknowledgment)
        throws Exception {
    UploadService.upload(record.value());
    acknowledgment.acknowledge();
}
Can anyone help me to do it in a better way, please?
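For context, a rebalance during a long upload usually means max.poll.interval.ms was exceeded between polls. A minimal, hedged sketch of raising it when building the consumer factory behind the container factory above (the address, group id, and the 30-minute value are assumptions):

// A hedged sketch, names and values assumed: allow long gaps between polls
// so a slow upload does not get the consumer evicted from the group.
@Bean
public ConsumerFactory<String, String> uploadConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "upload-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 60 * 1000); // 30 minutes
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // at most one file path per poll
    return new DefaultKafkaConsumerFactory<>(props);
}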
I have a class annotated with @Configuration and @EnableKafkaStreams. This class creates two beans of type KStream<String, String>. I've configured a shutdown hook which calls (inside a @PreDestroy callback) the method StreamsBuilderFactoryBean::close() (the StreamsBuilderFactoryBean is injected with @Autowired). Is this the right approach? Thanks.
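For reference, a minimal sketch of the setup described (hedged: the class name is assumed, and stop() is used as the lifecycle method; since StreamsBuilderFactoryBean is a SmartLifecycle, Spring already stops it on context shutdown, which may make the explicit hook redundant):

import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

// A hedged sketch of the described shutdown hook; the factory bean is a
// SmartLifecycle, so the context normally closes the streams on shutdown anyway.
@Configuration
@EnableKafkaStreams
public class StreamsShutdownConfig {

    @Autowired
    private StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    @PreDestroy
    public void closeStreams() {
        streamsBuilderFactoryBean.stop(); // stops the underlying KafkaStreams
    }
}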
kafkaTemplate.send("test-topic", 1, "body").addCallback(
        new ListenableFutureCallback<>() {

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("failure");
            }

            @Override
            public void onSuccess(SendResult<Integer, String> integerStringSendResult) {
                System.out.println("success");
            }
        });
org.apache.kafka.clients.consumer.ConsumerConfig The configuration 'spring.json.trusted.packages' was supplied but isn't a known config.
The consumer seems to work, and the config looks like what is described here: https://docs.spring.io/spring-boot/docs/2.4.5/reference/htmlsingle/#boot-features-kafka-extra-props

@KafkaListener(topics = "in", containerFactory = "testFactory")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String result = process(record);
    kafkaTemplate.send("out", result).addCallback(new ListenableFutureCallback<>() {

        @Override
        public void onFailure(Throwable throwable) {
            LOGGER.error("Failed to send message: {}", result, throwable);
        }

        @Override
        public void onSuccess(SendResult<String, String> stringStringSendResult) {
            LOGGER.debug("Successfully sent message: {}", result);
            ack.acknowledge();
        }
    });
}
I could call .get() on the returned future to ensure the send is performed synchronously, but I want to unit test all the possible scenarios.
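One hedged way to unit test both branches without a broker (Mockito and the listener wiring are assumptions): stub kafkaTemplate.send to return a SettableListenableFuture and complete it by hand:

// A hedged sketch, Mockito assumed: completing the future by hand drives
// onFailure / onSuccess deterministically in a plain unit test.
@Test
void doesNotAckWhenSendFails() {
    SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();
    when(kafkaTemplate.send(eq("out"), anyString())).thenReturn(future);

    listener.onMessage(record, ack); // bean under test; record and ack are mocks

    future.setException(new RuntimeException("boom")); // exercises onFailure
    verify(ack, never()).acknowledge();

    // for the success path instead:
    // future.set(new SendResult<>(producerRecord, recordMetadata));
    // verify(ack).acknowledge();
}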
sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.common.errors.DisconnectException
Hi everyone, I am new to Kafka and trying to test my application with embedded Kafka. My class config is like below:
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka
And I am using the versions below:
junit:4.13.2
kafka:2.6.0
kafka confluent avro serializer: 5.2.1
But I got the error below:
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1786)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:413)
at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:116)
at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:277)
at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:635)
at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:390)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:325)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:123)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124)
... 25 more
Caused by: java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1285)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1272)
at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:315)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)
... 34 more
Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.client.ZKClientConfig
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 39 more
Could somebody please help me if you have faced such an issue before? Thanks in advance.
Hi, I see that the next release has an upgrade to Kafka 2.8.0. Should I expect a Kafka mismatch with the latest 2.7 release? I'm having some weird errors like:
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/kafka/common/record/BufferSupplier
Hey all, is there a way for me to know in my consumer that the Kafka broker has disconnected? The consumer is currently set up and working great via the @KafkaListener annotation on a method.
My use case is this: if the Kafka broker has become disconnected (for example, it was once connected and working but now no longer is), is there a way I can hook into that so a method is called and I can log appropriate details for the operations team?
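One hedged option: spring-kafka's listener containers publish application events, and with monitorInterval/noPollThreshold set on ContainerProperties a NonResponsiveConsumerEvent is emitted when the consumer stops returning from poll(). A sketch, class name assumed:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.stereotype.Component;

// A hedged sketch: log for the operations team when the container reports
// a consumer that has not returned from poll() within the monitor window.
@Component
public class KafkaConnectivityLogger {

    private static final Logger log = LoggerFactory.getLogger(KafkaConnectivityLogger.class);

    @EventListener
    public void onNonResponsiveConsumer(NonResponsiveConsumerEvent event) {
        log.error("Kafka consumer appears disconnected or unresponsive: {}", event);
    }
}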
Hi devs, I have set up a single-node Kafka using bitnami-kafka (https://github.com/bitnami/bitnami-docker-kafka/blob/master/docker-compose.yml) with some modifications, as below:
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
ports:
- "2181:2181"
volumes:
- "/var/data/bitnami:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:2.7.0
ports:
- "9092:9092"
volumes:
- "/var/data/bitnami:/bitnami"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
Below is my application.yml for the Spring Boot producer client:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: "my-app-group"
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
But the producer is not able to connect to the broker, failing with the following exception:
2021-07-29 10:25:30.139 ERROR 14455 --- [ad | producer-1] m.l.springkafka.producer.EventProducer : Error in publishing message. Root cause: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:620) ~[spring-kafka-2.7.4.jar:2.7.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:872) ~[spring-kafka-2.7.4.jar:2.7.4]
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1366) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:690) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:381) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) ~[kafka-clients-2.7.1.jar:na]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
Whereas I am able to produce using kafka-console-producer --topic test --bootstrap-server localhost:9092
and consume successfully using kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092.
Can anyone please provide some input here? TIA
How do I handle this specific use case in Spring Batch, where the names are different?
There is a CSV file with many records where the CSV header names and the DB domain object field names differ. How do I read the CSV data and build a model to persist to the database in Spring Batch? The code below throws errors since the header names and the DB/JPA names are different.
Example, employee.csv
Employee Id, Employee Name, Employee Address, Employee Address 2, Date Of Birth
Domain Object Employee.java
public String empId;
public String empName;
public String empAddress;
public String empAddress2;
public String empDOB;
The error is thrown in the flat file reader:
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.core.io.FileSystemResource;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindException;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
@Component
@JobScope
public class EmployeeFlatFileItemReader extends FlatFileItemReader<Employee> implements StepExecutionListener {

    private String fileName;
    private String filePath;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fileName = stepExecution.getJobExecution().getJobParameters().getString("fileName");
        filePath = stepExecution.getJobExecution().getJobParameters().getString("filePath");
        setResource(new FileSystemResource(filePath));
    }

    public EmployeeFlatFileItemReader() {
        try {
            //init();
            DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>() {
                {
                    setLineTokenizer(new DelimitedLineTokenizer() {
                        {
                            setNames("Employee Id" + "," + "Employee Name" + "," + "Employee Address" + "," + "Employee Address 2" + "," + "Date Of Birth");
                        }
                    });
                    setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                        {
                            setTargetType(Employee.class); // Throws error
                        }
                    });
                }
            };
            setLineMapper(lineMapper);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return stepExecution.getExitStatus();
    }
}
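For comparison, one hedged way to bridge the differing names (assuming the public Employee fields shown above): give the tokenizer the CSV header names as separate varargs (not one comma-joined string), skip the header line, and map explicitly with a custom FieldSetMapper instead of BeanWrapperFieldSetMapper, which matches by bean property name:

// A hedged sketch: explicit mapping from CSV header names to the
// differently named Employee fields; could replace the constructor body above.
DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();

DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
// one varargs entry per column
tokenizer.setNames("Employee Id", "Employee Name", "Employee Address",
        "Employee Address 2", "Date Of Birth");
lineMapper.setLineTokenizer(tokenizer);

lineMapper.setFieldSetMapper(fieldSet -> {
    Employee employee = new Employee();
    employee.empId = fieldSet.readString("Employee Id");
    employee.empName = fieldSet.readString("Employee Name");
    employee.empAddress = fieldSet.readString("Employee Address");
    employee.empAddress2 = fieldSet.readString("Employee Address 2");
    employee.empDOB = fieldSet.readString("Date Of Birth");
    return employee;
});

setLineMapper(lineMapper);
setLinesToSkip(1); // the header row carries names, not data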
Registering errors with row details + line number + error message
I'm processing a CSV file using Spring Batch. I want to register errors with the row details + line number + error message at every step where an error occurs. The error may occur in the reading stage, the file input stage, the processing stage, or the writing stage.
How do I do this? What is the best practice?
Thanks
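One hedged, common approach (assuming a fault-tolerant step with skips configured): a SkipListener sees the failure at each stage, and for read failures FlatFileParseException carries the raw row and its line number. A sketch, with the Employee type borrowed from the job above:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.item.file.FlatFileParseException;

// A hedged sketch: record the row, line number, and message wherever the
// failure happened in the step.
public class CsvErrorRegistrar implements SkipListener<Employee, Employee> {

    private static final Logger log = LoggerFactory.getLogger(CsvErrorRegistrar.class);

    @Override
    public void onSkipInRead(Throwable t) {
        if (t instanceof FlatFileParseException) {
            FlatFileParseException e = (FlatFileParseException) t;
            log.error("Read failed at line {}: '{}' ({})", e.getLineNumber(), e.getInput(), e.getMessage());
        } else {
            log.error("Read failed: {}", t.getMessage());
        }
    }

    @Override
    public void onSkipInProcess(Employee item, Throwable t) {
        log.error("Processing failed for {}: {}", item, t.getMessage());
    }

    @Override
    public void onSkipInWrite(Employee item, Throwable t) {
        log.error("Write failed for {}: {}", item, t.getMessage());
    }
}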
I'm creating a ConcurrentMessageListenerContainer and starting the container. I'm using this container as I need to define the concurrency as well. Now I also want my consumers to consume messages in a batch. Is that possible with this implementation?
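A hedged sketch of one way (topic name, concurrency, and the consumerFactory parameter are assumptions): set a BatchMessageListener on the ContainerProperties and the container hands over each poll's records as a list, alongside the concurrency setting:

// A hedged sketch: a batch listener delivers the whole poll as a list
// and still works with the container's concurrency setting.
public ConcurrentMessageListenerContainer<String, String> batchContainer(
        ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("some-topic");
    containerProps.setMessageListener((BatchMessageListener<String, String>) records ->
            records.forEach(record -> System.out.println(record.value())));

    ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
    container.setConcurrency(3); // number of consumer threads in the group
    container.start();
    return container;
}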