Spring AMQP - support for Spring programming model with AMQP, especially but not limited to RabbitMQ
No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
Please learn how to use Gitter markdown (click the M on the right). You need to surround your code with three back-ticks (fences) on separate lines before and after the code).
What is
customPublisherSubscriber.receiveMessageFromResponseQ(
? I don't see that either in your code or the test.
correlationKey
in the rabbit template in order to use custom correlation header.I have a multi-threaded Spring Boot batch job which appears to be writing messages out-of-order to a RabbitMQ queue, even though the calling code is sequential. E.g. the job's main thread writes message1, before spawning a second thread that writes message2. Anyway, I'm trying to understand the options for guaranteeing that messages are written in order:
1) setChannelTransacted(true)
- doesn't help guarantee order?
2) use scoped operations - the code for writing messages is spread out across the steps of my batch job so it's hard to find a good transaction boundary, for a single invoke()
call;
3) use ThreadChannelConnectionFactory
- only helps guarantee order within a single thread, not across multiple threads;
4) is there some way I can force the RabbitTemplate (which is shared across threads) to use a single channel/connection for every message it writes, even if it's much slower?
I need help. Could any of you please help me understand what's going on here:
This is what I get...
May 08 21:35:44Z srv01 app_name INFO o.s.a.r.c.CachingConnectionFactory: Attempting to connect to: [127.0.0.1:5672]
May 08 21:35:44Z srv01 app_name INFO o.s.a.r.l.SimpleMessageListenerContainer: Restarting Consumer@f05ac43: tags=[[]], channel=null, acknowledgeMode=MANUAL local queue size=0
May 08 21:35:44Z srv01 app_name WARN c.r.c.i.ForgivingExceptionHandler: An unexpected connection driver error occured (Exception message: Socket closed)
May 08 21:35:49Z srv01 app_name WARN c.r.c.i.ForgivingExceptionHandler: An unexpected connection driver error occured (Exception message: Socket closed)
May 08 21:35:49Z srv01 app_name INFO o.s.a.r.c.CachingConnectionFactory: Attempting to connect to: [127.0.0.1:5672]
May 08 21:35:55Z srv01 app_name INFO o.s.a.r.l.SimpleMessageListenerContainer: Restarting Consumer@48b7af3f: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
May 08 21:35:55Z srv01 app_name ERROR o.s.a.r.l.SimpleMessageListenerContainer: Failed to check/redeclare auto-delete queue(s).#012org.springframework.amqp.AmqpTimeoutException: java.util.concurrent.TimeoutException#012#011at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:73)#012#011at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:524)#012#011at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:751)#012#011at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)#012#011at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2089)#012#011at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2062)#012#011at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2042)#012#011at org.spring...
but then... It connects
May 08 21:35:55Z srv01 app_name INFO o.s.a.r.c.CachingConnectionFactory: Attempting to connect to: [127.0.0.1:5672]
May 08 21:35:55Z srv01 app_name WARN c.r.c.i.ForgivingExceptionHandler: An unexpected connection driver error occured (Exception message: Socket closed)
May 08 21:36:00Z srv01 app_name INFO o.s.a.r.c.CachingConnectionFactory: Created new connection: rabbitConnectionFactory#5a08b084:187/SimpleConnection@171a3a0c [delegate=amqp://user@127.0.0.1:5672/, localPort= 59784]
May 08 21:53:35Z srv02 app_name INFO o.a.c.c.C.[Tomcat].[localhost].[/]: Initializing Spring DispatcherServlet 'dispatcherServlet'
May 08 21:53:36Z srv02 app_name INFO o.s.web.servlet.DispatcherServlet: Initializing Servlet 'dispatcherServlet'
May 08 21:53:36Z srv02 app_name INFO o.s.web.servlet.DispatcherServlet: Completed initialization in 24 ms
Amqp.inboundAdapter
I use in my spring integration flow?Amqp.inboundAdapter
and consumer priorities: https://www.rabbitmq.com/consumer-priority.html
Hey Team,
I am trying to write integration testcase using testContainer for RabbitMQ using RabbitMQContainer for rabbittemplate.convertAndSend. Everything works fine for a consumer application where connection has been established at app startup. But for a producer app, where connection gets established when convertAndSend mehtod call - .I am facing connection refused error.
'@SpringBootTest(properties = "spring.main.allow-bean-definition-overriding=true", webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles(profiles = "test")
@ContextConfiguration(initializers = {EventIT.Initializer.class})
@Testcontainers
public class EventIT {
@Container
private static final RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3-management")
.withExposedPorts(5672, 15672).waitingFor(Wait.forLogMessage(".*Server startup complete.*",1));
@Autowired
private RabbitTemplate rabbitTemplate;
@DisplayName("MessagePost")
@Test
public void testProduceMessage() {
Assertions.assertTrue(rabbit.isRunning());
rabbitTemplate.convertAndSend("New message");
}
public static class Initializer implements
ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
System.out.println("Initialize Padma");
TestPropertyValues.of(
"spring.rabbitmq.host=" + rabbit.getContainerIpAddress(),
"spring.rabbitmq.port=" + rabbit.getMappedPort(5672)
).applyTo(configurableApplicationContext);
}
}
}'
Hi spring-amqp team,
I am using @RabbitListener to create a consumer but I don't want to force the queue to already exist. When the app starts, if the queue doesn't exist a com.rabbitmq.client.ShutdownSignalException is thrown with the following error message:
channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'some-queue' in vhost '/', class-id=50, method-id=10)
I know one simple solution is to make sure the queue already exists, another is to create the queue in the config using @Bean, but I don't want either solution because I don't want the queue to exist in some situations. So all I want is to be able to catch whatever exception is thrown by @RabbitListener and just ignore it, how can I accomplish this please?
Thank you very much in advance for your insight and help!
null
in the binding, although perhaps programmatically it is possible.
Hello, I'm having a trouble using selector. I'm using JMS with azure's service bus. This is what I did
@Component
@Slf4j
public class MessageTestRunner implements CommandLineRunner {
private static final String QUEUE_NAME = "lva-test-queue";
private static final String PING_SELECTOR = "selector = 'PING'";
private static final String PONG_SELECTOR = "selector = 'PONG'";
private static final String SB_SCHEDULED_ENQUEUE_HEADER = "x-opt-scheduled-enqueue-time";
private final JmsTemplate jmsTemplate;
public MessageTestRunner(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@Override
public void run(String... args) throws Exception {
jmsTemplate.convertAndSend(QUEUE_NAME, ping(), m -> {
m.setStringProperty("selector", "PING");
return m;
});
jmsTemplate.convertAndSend(QUEUE_NAME, pong(), m -> {
m.setStringProperty("selector", "PONG");
return m;
});
}
private PingMessage ping() {
final PingMessage msg = new PingMessage();
msg.setAt(ZonedDateTime.now());
return msg;
}
private PongMessage pong() {
final PongMessage msg = new PongMessage();
msg.setAt(ZonedDateTime.now());
return msg;
}
@JmsListener(destination = QUEUE_NAME, selector = PING_SELECTOR, containerFactory = JMS_FACTORY_NAME)
public void handle(PingMessage message) {
log.debug("Handling ping message [{}]", message);
}
@JmsListener(destination = QUEUE_NAME, selector = PONG_SELECTOR, containerFactory = JMS_FACTORY_NAME)
public void handle(PongMessage message) {
log.debug("Handling pong message [{}]", message);
}
}
however when I run this code I'm getting converting exception
org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method could not be invoked with incoming message
Endpoint handler details:
Method [public void com.example.MessageTestRunner.handle(com.example.data.model.PingMessage)]
Bean [com.example.MessageTestRunner@7aae1170]
; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.data.model.PongMessage] to [com.example.data.model.PingMessage] for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@72eb85b7, failedMessage=org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@72eb85b7
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:118) ~[spring-jms-5.3.10.jar:5.3.10]
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77) ~[spring-jms-5.3.10.jar:5.3.10]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.3.10.jar:5.3.10]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.3.10.jar:5.3.10]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.10.jar:5.3.10]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) ~[spring-jms-5.3.10.jar:5.3.10]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) ~[spring-jms-5.3.10.jar:5.3.10]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) ~[spring-jms-5.3.10.jar:5.3.10]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerC
Hello,
I'm trying to implement some JUnits tests to cover a RabbitListener and MessagePostProcessor and I'm running into odd issues. I'm hoping that its just a simple configuration issue.
Here is a link to my spring-rabbit-test-example github example of this issue
The Spring Boot application runs successfully and delivers the messages as I would expect. However when I run the RabbitTests test cases, It throws the following exception:
I have searched all over for examples of using the spring-rabbit-test code but I haven't had much luck in finding a complete example, especially one that uses JUnit5
org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: No listener for thinghappened
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:82)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2192)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2138)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1065)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1130)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1123)
at com.example.junitstuff.RabbitTests.test(RabbitTests.java:46)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
Hi spring-amqp team,
I am currently using @Autowired to get a ConnectionFactory and then creating a RabbitTemplate as a @Bean by passing that autowired ConnectionFactory to it; I am also using @RabbitListener to create a listener. It seems that both publishing and consuming are using the same connection. Best practices suggest that I should use separate connections for publishing and consuming, could you please suggest how this can be done? Thank you very much in advance for your insight and help!
DOWN
in case of expired certificates, so the service is restarted eventually by the scheduler. we'll consider opening a feature request, because it would be much better if the service wouldnt have to be restarted in the first place
@Component
public class FactoryRefresher {
private final CachingConnectionFactory ccf;
private final RabbitListenerEndpointRegistry registry;
private final ExtendedFactoryBean extended = new ExtendedFactoryBean();
private SSLContext context;
FactoryRefresher(CachingConnectionFactory ccf, RabbitListenerEndpointRegistry registry,
@Value("${spring.rabbitmq.ssl.trust-store}") Resource trustStore,
@Value("${spring.rabbitmq.ssl.key-store}") Resource keyStore) {
this.ccf = ccf;
this.registry = registry;
this.extended.setTrustStoreResource(trustStore);
this.extended.setTrustStorePassphrase("secret");
this.extended.setKeyStoreResource(keyStore);
this.extended.setKeyStorePassphrase("secret");
ccf.getRabbitConnectionFactory().setSslContextFactory(this::context);
this.extended.setUpSSL();
}
private SSLContext context(String name) {
this.extended.setUpSSL();
return this.context;
}
/**
* Call this from an actuator to rebuild the SSLContext
*/
public void refreshConnection() {
this.registry.stop();
// stop any producers too, if possible
this.extended.setUpSSL();
this.ccf.resetConnection();
this.registry.start();
// restart producers
}
private class ExtendedFactoryBean extends RabbitConnectionFactoryBean {
@Override
protected void setUpSSL() {
try {
KeyManager[] keyManagers = configureKeyManagers();
TrustManager[] trustManagers = configureTrustManagers();
SSLContext context = createSSLContext();
context.init(keyManagers, trustManagers, null); // or a secure random
FactoryRefresher.this.context = context;
}
catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
@RabbitListener
used to pick up on JSON deserialization issues (ie. we get a ListenerExecutionFailedException, with as cause MessageConversionException). Since 2.4.0, these errors are (by default?) not delivered anymore to this error handler. I noticed the docs also mention this is intended. I did not however see this behavior change in release or upgrade notes, nor if there perhaps is a way to get back the old behavior.
Hi, I have 3 problems in my Spring Boot Microservices example.
Here is my first question : https://stackoverflow.com/questions/73226572/docker-compose-issue-org-hibernate-hibernateexception-access-to-dialectresoluti
Here is my second question : https://stackoverflow.com/questions/73227538/rabbitmq-cannot-see-queue-and-cannot-send-message-from-one-service-to-another-in
Here is my third question : https://stackoverflow.com/questions/73228395/spring-cloud-api-gateway-cannot-access-to-url-of-a-service-through-api-gateway
I'm making an example with Spring Cloud.
I have problem on Docker side.
When I run docker-compose,
1) Keycloak,rabbitmq,mysql and config server are running.
2) Here is my error in Eureka Server
Connect Timeout Exception on Url - http://configserver:9191/. Will be trying the next u
rl if available
2022-08-17 11:39:08.699 WARN 1 --- [ main] o.s.c.c.c.ConfigServerConfigDataLoader : Could not locate PropertySource ([ConfigServerConfigDataResource@17fc391b uris = array<
String>['http://configserver:9191/'], optional = true, profiles = list['default']]): I/O error on GET request for "http://configserver:9191/discovery-server/default": Connection refuse
d (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
3 ) Here is my error in API Gateway
DiscoveryClient_API-GATEWAY/b522a17da5b3:api-gateway:8600: registering service...
2022-08-17 11:40:25.122 INFO 1 --- [nfoReplicator-0] c.n.d.s.t.d.RedirectingEurekaHttpClient : Request execution error. endpoint=DefaultEndpoint{ serviceUrl='http://localhost:8761/eu
reka/}, exception=I/O error on POST request for "http://localhost:8761/eureka/apps/API-GATEWAY": Connect to localhost:8761 [localhost/127.0.0.1] failed:
4 ) Here is my error in Management Service
Request execution failed with message: I/O error on PUT request for "http://eurekaserve
r:8761/eureka/apps/MANAGEMENT-SERVICE/2c9b83d65d89:management-service:9002": Connect to eurekaserver:8761 [eurekaserver/172.18.0.5] failed: Connection refused (Connection refused); nes
ted exception is org.apache.http.conn.HttpHostConnectException: Connect to eurekaserver:8761 [eurekaserver/172.18.0.5] failed: Connection refused
5) Here is my errors in User Service, Advertisement Service and Report Service
java.lang.ClassCastException: class org.hibernate.dialect.MySQL8Dialect cannot be cast to class java.sql.Driver (org.hibernate.dialect.MySQL8Dialect is in unnamed module of loader org.
springframework.boot.loader.LaunchedURLClassLoader @2096442d; java.sql.Driver is in module java.sql of loader 'platform')
Here is the project link : https://github.com/Rapter1990/SpringBootMicroservices
@Component
@RequiredArgsConstructor
public class RabbitBrokerBean implements MessagePublishable, MessageSubscribable, MessageListener {
private SimpleMessageListenerContainer container;
private RabbitAdmin rabbitAdmin;
private RabbitMQSetting setting;
@Override
public void initialize(Map<String, Object> optionMap, Set<String> systemIdSet) {
try {
setting = new RabbitMQSetting(optionMap, systemIdSet);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
var exchange = new FanoutExchange(setting.getExchangeName());
this.rabbitAdmin = new RabbitAdmin(connectionFactory);
this.rabbitAdmin.declareExchange(exchange);
for (String queueName : setting.getQueueSet()) {
var queue = new Queue(queueName, false);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange));
}
} catch (AmqpException e) {
throw new MessageBrokerException();
}
}