@Bean
public MessageSource<Object> jdbcMessageSource() throws SQLException {
    JdbcPollingChannelAdapter adapter =
            new JdbcPollingChannelAdapter(odsDataSource(), "select msg from demo.integration");
    return adapter;
}

@Bean
public IntegrationFlow dbpollerFlow() throws Exception {
    return IntegrationFlows.from(jdbcMessageSource(),
                    c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
            .handle(printer)
            .get();
}
@Configuration
@IntegrationComponentScan
class IntegrationConfiguration {

    @Bean
    @ServiceActivator(inputChannel = "pubSubChannel")
    fun messageHandler(pubsubTemplate: PubSubTemplate): MessageHandler {
        return PubSubMessageHandler(pubsubTemplate, "pub-sub-topic")
    }
}

@MessagingGateway(defaultRequestChannel = "pubSubChannel")
interface RemoveTokenGateway {

    fun removeToken(@Header("token") token: String)
}
but when I try to inject this gateway into a service:
@Service
class MessageService(private val removeTokenGateway: RemoveTokenGateway) {
}
I get org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'removeTokenChannel' available.
name="mqToWebFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2"
or name="mqToWebFlow.subFlow#0.subFlow#0.org.springframework.integration.config.ConsumerEndpointFactoryBean#4"
, which aren't very descriptive. ;-) Any ideas?
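For what it's worth, in the Java DSL you can usually give those endpoints explicit ids so the generated "...ConsumerEndpointFactoryBean#n" names become readable. A minimal sketch; the flow, channel, and id names here are made up:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

public class NamedEndpointsExample {

    @Bean
    public IntegrationFlow mqToWebFlow() {
        return IntegrationFlows.from("mqToWebInput")
                // the explicit id replaces the generated endpoint bean name
                .handle((payload, headers) -> payload, e -> e.id("mqToWebHandler"))
                .get();
    }
}
```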
I think I need to override org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer#transferFilesFromRemoteToLocal, because I have to read files from a nested folder of an SMB shared folder. A FileListFilter<SmbFile> cannot be used, because the static method org.springframework.integration.file.support.FileUtils#purgeUnwantedElements is invoked before it and skips nested folders (it only accepts files, per org.springframework.integration.smb.inbound.SmbInboundFileSynchronizer#isFile).

We are getting "Pool has been closed" exceptions, which seem to be caused by the caching session factory returning closed sessions. The way we were using the factories was with dynamic SI flows for SFTP/FTP inbound adapters, as described here: https://stackoverflow.com/questions/43916317/strategy-to-refresh-update-sessionfactory-in-spring-integration . Do let us know if we are missing anything.
Hi guys, I don't understand the errorChannel mechanism.
I have defined a bean of type DirectChannel named 'errorChannel', I have created an IntegrationFlow from this errorChannel to perform error handling.
I have another flow created from an HttpInbound, when an error occurs in this flow I expect the message to be routed to my errorChannel but that's not the case.
Any idea what could have gone wrong, or what I misunderstood?
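If it helps: for request/reply inbound endpoints, an exception thrown on the request thread usually travels back to the caller rather than to the global errorChannel, so you normally have to point the inbound gateway at an error channel explicitly. A minimal sketch of that wiring, assuming the HTTP Java DSL; the path and handler logic are made up:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.MessagingException;

public class ErrorChannelExample {

    @Bean
    public IntegrationFlow httpFlow() {
        return IntegrationFlows.from(Http.inboundGateway("/demo")
                        // route exceptions raised downstream to the named channel
                        .errorChannel("errorChannel"))
                .handle((payload, headers) -> {
                    throw new RuntimeException("boom");
                })
                .get();
    }

    @Bean
    public IntegrationFlow errorFlow() {
        // consumes the ErrorMessages sent to 'errorChannel'
        return IntegrationFlows.from("errorChannel")
                .handle((payload, headers) -> {
                    MessagingException ex = (MessagingException) payload;
                    // the reply here becomes the HTTP response to the caller
                    return "handled: " + ex.getCause().getMessage();
                })
                .get();
    }
}
```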
I have an interesting scenario that's got me scratching my head.
My SI application accepts data via HTTP POST, transforms it, and writes to ActiveMQ. In the event that ActiveMQ is down, the caller still gets a 200. Behind the scenes, it sends the message to the error channel, writes to Mongo, and a separate retry process reads from Mongo and will attempt to push to ActiveMQ. It has been in production for 2+ years and works perfectly.
I am now in the process of converting this project to use Kafka instead of ActiveMQ and, unfortunately, it doesn't behave the same way. What I mean by that is when Kafka is down, it never makes it to the error channel and the caller ends up receiving an HTTP timeout.
The reason for the difference in behavior seems to stem from the fact that when sends to AMQ fail, JMS exceptions are thrown and SI can easily pivot to the error channel. The Kafka producer, on the other hand, seems to throw exceptions internally, but they never find their way back upstream. After a lengthy debugging session, I see KafkaProducerMessageHandler call sendFuture = this.kafkaTemplate.send(producerRecord); but it never returns, so the subsequent call to processSendResult() is never executed and it just sits there.
Is this a bug? Or is there some other way I should be configuring the Kafka producer to get it to behave the way I want?
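Not an official answer, but one thing that has bitten others: when the broker is unreachable the Kafka producer blocks on metadata for up to max.block.ms (60 s by default), and with an async handler the failure only surfaces later via the future. A hedged sketch of tightening those knobs and making the handler synchronous so the failure shows up on the request thread; the topic, timeouts, and bean wiring are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaFailFastExample {

    public KafkaProducerMessageHandler<String, String> kafkaHandler() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // fail fast instead of blocking for the default 60s when the broker is down
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5000);

        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);

        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(template);
        handler.setTopicExpression(new LiteralExpression("my-topic"));
        // wait for the send result so exceptions are raised on the calling thread
        // and can be routed to the error channel
        handler.setSync(true);
        handler.setSendTimeout(5000);
        return handler;
    }
}
```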
If I take Flux.from(reactiveSource()).map(Message::getPayload) and replace it with Flux.just("data"), it returns straight away. Somewhere the message cannot be read off the queue, but I am not sure why. Any ideas?

package id.verifymy.spring.integration.accumulators.config;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.PollerSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.messaging.Message;
import id.verifymy.spring.integration.accumulators.model.SimpleMessage;
import reactor.core.publisher.Flux;
@Configuration
@EnableIntegration
public class SpringIntegrationConfiguration {

    @Autowired
    IntegrationFlowContext integrationFlowContext;

    @Bean
    public Publisher<Message<Object>> reactiveSource() {
        return IntegrationFlows.from(messagesPerCustomerQueue()).log().toReactivePublisher();
    }

    @Bean
    public IntegrationFlow messagesPerCustomerFlow() {
        return IntegrationFlows.from(WebFlux.inboundGateway("/test")
                        .requestMapping(r -> {
                            r.methods(HttpMethod.POST);
                            r.produces(MediaType.APPLICATION_JSON_VALUE);
                        })
                        .requestPayloadType(SimpleMessage.class))
                .channel(messagesPerCustomerQueue())
                .handle((p, h) -> Flux.from(reactiveSource()).map(Message::getPayload)
                        // Flux.just("data")
                )
                .log()
                .bridge()
                .log()
                .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec poller() {
        return Pollers.fixedDelay(0);
    }

    @Bean
    public QueueChannel messagesPerCustomerQueue() {
        return MessageChannels.queue().get();
    }
}
@Configuration
@EnableIntegration
@EnableIntegrationMBeanExport(server = "mbeanServer", managedComponents = "input")
public class ContextConfiguration {

    @Bean
    public MBeanServerFactoryBean mbeanServer() {
        return new MBeanServerFactoryBean();
    }
}
I have a channel defined as channel(MessageChannels.executor("executorChannel", threadpoolTaskExecutorBean)). Why can't I see it in JMC under org.springframework.integration >> MessageChannel >> executorChannel attributes?
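In case it's the config rather than JMC: as far as I understand, managedComponents = "input" is a name pattern, so only components whose names match "input" get exported, and a channel named executorChannel wouldn't be included. A sketch widening the pattern, reusing the names from your snippet:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableIntegrationMBeanExport;
import org.springframework.jmx.support.MBeanServerFactoryBean;

@Configuration
@EnableIntegration
// "input" alone only exports components matching that name; add the executor
// channel (or use "*") so it shows up under org.springframework.integration
// > MessageChannel in JMC
@EnableIntegrationMBeanExport(server = "mbeanServer",
        managedComponents = {"input", "executorChannel"})
public class JmxContextConfiguration {

    @Bean
    public MBeanServerFactoryBean mbeanServer() {
        return new MBeanServerFactoryBean();
    }
}
```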
I have a JMS outbound channel adapter. I would like to send a message to my Apache Artemis broker and add a delay.
<int-jms:outbound-channel-adapter connection-factory="scheduledConnectionFactory" channel="tnpScheduledOutboundChannel" destination="tnpScheduledQueue" />
My Java code that adds the header to the Spring Integration message is:
return MessageBuilder.withPayload(sdpInfo).setHeader("_AMQ_SCHED_DELIVERY",sdpInfo.getDelay()).build();
The header is added as a Spring Integration header, but it's not picked up by JMS. Do I need to add some kind of JmsHeaderMapper?
Could someone point me to some documentation or an example?
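If the header isn't making it onto the outgoing JMS message, one option is a custom JmsHeaderMapper that sets the Artemis scheduling property explicitly. A sketch under that assumption; the class name is made up, it would be registered via the adapter's header-mapper attribute, and the javax.jms import may need to be jakarta.jms on newer versions:

```java
import javax.jms.JMSException;
import javax.jms.Message;

import org.springframework.integration.jms.DefaultJmsHeaderMapper;
import org.springframework.messaging.MessageHeaders;

/**
 * Illustrative mapper: copies the Spring Integration header onto the outgoing
 * JMS message as the long property Artemis reads for scheduled delivery.
 * Reference it from the adapter, e.g.
 * <int-jms:outbound-channel-adapter ... header-mapper="schedulingHeaderMapper"/>
 */
public class SchedulingJmsHeaderMapper extends DefaultJmsHeaderMapper {

    @Override
    public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
        super.fromHeaders(headers, jmsMessage);
        Object delay = headers.get("_AMQ_SCHED_DELIVERY");
        if (delay instanceof Number) {
            try {
                // Artemis expects the absolute delivery time (epoch millis) here
                jmsMessage.setLongProperty("_AMQ_SCHED_DELIVERY", ((Number) delay).longValue());
            }
            catch (JMSException e) {
                throw new IllegalStateException("Could not set scheduled delivery property", e);
            }
        }
    }
}
```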
Hi, I'm trying to validate incoming AMQP messages based on some required headers and, in case of an error, put the message into a DLQ, similar to what Spring Cloud Stream does. SCS creates a .dlq for every queue and puts error messages into it.
I'm wondering how I can achieve the same behaviour with the Spring Integration AMQP component?
Any help is much appreciated.
Here is a sample of the Kotlin code without validation:
@Bean
fun customerEventFlow(
    connectionFactory: ConnectionFactory,
    consumerRetryTemplate: RetryTemplate,
    amqpTemplate: AmqpTemplate
): IntegrationFlow {
    val inboundGateway = Amqp.inboundGateway(connectionFactory, amqpTemplate, qName)
        .retryTemplate(consumerRetryTemplate)
    return IntegrationFlows
        .from(inboundGateway)
        .log<Message<*>> { log.info("{}", it) }
        .channel(subscriptionInChannel)
        .get()
}
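Not sure this is the canonical answer, but the usual hand-rolled equivalent of what SCS does is to declare the queue with dead-letter arguments and reject failed messages without requeue, letting the broker move them to the DLQ. A rough Java sketch under those assumptions; the queue names are made up:

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DlqConfig {

    @Bean
    public Queue customerEventQueue() {
        // dead-letter failed messages to the default exchange, routed to "<queue>.dlq"
        return QueueBuilder.durable("customer-events")
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", "customer-events.dlq")
                .build();
    }

    @Bean
    public Queue customerEventDlq() {
        return QueueBuilder.durable("customer-events.dlq").build();
    }
}

// In the flow, a failed validation can then throw
// org.springframework.amqp.AmqpRejectAndDontRequeueException, so the broker
// dead-letters the message instead of redelivering it.
```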
If I have a TCP inbound adapter where I can receive messages containing different types of JSON data, what would be the best way to deal with routing these messages to the correct handler?
The alternatives I've considered are channelMapping or subFlowMapping to direct messages to the appropriate handlers.
Could anyone give some advice on which method would be preferable? Or if there are other, better methods which I have not considered?
I am using the Kotlin DSL, if it matters.
Here's a short example of what I have so far, handling just one type of incoming message:
@Bean
fun connectionFactory(): TcpClientConnectionFactorySpec =
    Tcp.netClient(properties.baseUrl, properties.port)
        .serializer(...)
        .deserializer(...)

@Bean
fun tcpOut() = integrationFlow(channels.outboundChannel()) {
    transform(Transformers.toJson())
    handle(Tcp.outboundAdapter(connectionFactory()))
}

@Bean
fun tcpIn() = integrationFlow(Tcp.inboundAdapter(connectionFactory())) {
    transform(Transformers.objectToString())
    transform(Transformers.fromJson(AuthenticationResponse::class.java))
    handle(apiService)
}

@Configuration
class ChannelsConfiguration {

    @Bean
    fun outboundChannel(): DirectChannel = MessageChannels.direct().get()
}

@Service
class ApiService(
    private val outboundChannel: DirectChannel
) {
    fun authenticationResponseHandler(response: AuthenticationResponse, headers: MessageHeaders) {
        ...
    }

    fun authenticate(authenticationRequest: AuthenticationRequest) {
        outboundChannel.send(GenericMessage(authenticationRequest))
    }
}
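On the routing question above: either approach works; subFlowMapping keeps everything in one flow definition, while channelMapping lets each handler live in its own flow. A rough Java DSL sketch of the sub-flow variant, which translates fairly directly to the Kotlin DSL; the "type" field, channel name, and handler methods are invented:

```java
import com.fasterxml.jackson.databind.JsonNode;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;

@Configuration
public class TcpRoutingExample {

    @Bean
    public IntegrationFlow tcpRoutingFlow() {
        return IntegrationFlows.from("tcpInboundChannel")
                .transform(Transformers.objectToString())
                .transform(Transformers.fromJson(JsonNode.class))
                // route on a hypothetical "type" field in the incoming JSON
                .<JsonNode, String>route(json -> json.get("type").asText(),
                        mapping -> mapping
                                .subFlowMapping("authenticationResponse", sf -> sf
                                        .handle((payload, headers) -> handleAuth((JsonNode) payload)))
                                .subFlowMapping("marketData", sf -> sf
                                        .handle((payload, headers) -> handleMarketData((JsonNode) payload))))
                .get();
    }

    private Object handleAuth(JsonNode json) {
        // delegate to the real handler, e.g. apiService.authenticationResponseHandler(...)
        return null; // no reply
    }

    private Object handleMarketData(JsonNode json) {
        return null; // no reply
    }
}
```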
I have a MessageProducerSupport-based inbound adapter. The obvious solution is to use @RefreshScope on the beans that require updated credentials; however, that won't affect the already-started stream. For the adapter to refresh correctly, doStop() and doStart() would need to be invoked. Is there any built-in integration between MessageProducerSupport.doStart()/doStop() and the Spring Cloud Config refreshable context? Or does the end-user application need to listen for the context refresh event and manually restart the inbound adapter?
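As far as I know there is no built-in bridge, so the usual approach is the second one you describe: react to the refresh event and bounce the adapter. A rough sketch under that assumption; the component name is made up, and RefreshScopeRefreshedEvent comes from spring-cloud-context:

```java
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.stereotype.Component;

@Component
public class AdapterRefresher {

    private final MessageProducerSupport inboundAdapter;

    public AdapterRefresher(MessageProducerSupport inboundAdapter) {
        this.inboundAdapter = inboundAdapter;
    }

    @EventListener
    public void onRefresh(RefreshScopeRefreshedEvent event) {
        // restart the already-running stream so it picks up the refreshed credentials
        this.inboundAdapter.stop();
        this.inboundAdapter.start();
    }
}
```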
Hello, can anyone help me / give me advice on how to set up the following:
I have an SFTP server which I'm polling (every 10 sec?) for a new file; then I retrieve this file locally and invoke a handler method (e.g. to read the file / create a flux of elements, save it to the local database, etc.).
Afterwards I want to pass that flux of elements along to another server (using RSocketOutboundGateway??)
so the second service receives that same flux of elements and can do some processing as well.
Currently I have the following setup:
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(true);
    fileSynchronizer.setRemoteDirectory(excelRemoteDirectory);
    fileSynchronizer.setPreserveTimestamp(true);
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("testdennis*.xlsx"));
    return fileSynchronizer;
}

@Bean(name = "directChannel")
public MessageChannel directChannel() {
    return new DirectChannel();
}

@Bean
@InboundChannelAdapter(channel = "directChannel", poller = @Poller(fixedDelay = "10000"))
public MessageSource<File> myMessageSource() {
    SftpInboundFileSynchronizingMessageSource messageSource =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
    messageSource.setLocalDirectory(new File("sftp-inbound"));
    messageSource.setAutoCreateLocalDirectory(true);
    messageSource.setLocalFilter(new SimplePatternFileListFilter("testdennis*.xlsx"));
    return messageSource;
}

@Bean(name = "fluxMessageChannel")
public MessageChannel fluxMessageChannel() {
    return new FluxMessageChannel();
}

@Bean
@Gateway(requestChannel = "fluxMessageChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway = new RSocketOutboundGateway("my-rsocket-route");
    rsocketOutboundGateway.setInteractionModel(RSocketInteractionModel.requestChannel);
    rsocketOutboundGateway.setExpectedResponseType(Void.class);
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    return new ClientRSocketConnector(URI.create("ws://localhost:8880"));
}
With the following (intermediate) handler / service activator defined:
@Service
public class ExcelFileHandler {

    @ServiceActivator(inputChannel = "directChannel", outputChannel = "fluxMessageChannel")
    public Flux<MyElement> handleMessage(Message<File> message) {
        File file = message.getPayload();
        log.info("Handling excel file '{}'", file.getName());
        return Flux.just(....);
    }
}
With this I get an error that the 'fluxMessageChannel' doesn't have subscribers to accept messages, but I'm not sure how to fix it in this setup.
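One thing that stands out (just a guess from the snippet): @Gateway belongs on gateway interface methods, so the RSocketOutboundGateway bean is never actually subscribed to fluxMessageChannel. Declaring it as a consumer with @ServiceActivator should attach a subscriber to the channel. A sketch, reusing your bean names and assuming the existing clientRSocketConnector bean:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;

@Configuration
public class RSocketConsumerConfig {

    @Bean
    // wires this MessageHandler as the subscriber of 'fluxMessageChannel'
    @ServiceActivator(inputChannel = "fluxMessageChannel")
    public RSocketOutboundGateway rsocketOutboundGateway(ClientRSocketConnector clientRSocketConnector) {
        RSocketOutboundGateway gateway = new RSocketOutboundGateway("my-rsocket-route");
        gateway.setInteractionModel(RSocketInteractionModel.requestChannel);
        gateway.setExpectedResponseType(Void.class);
        gateway.setClientRSocketConnector(clientRSocketConnector);
        return gateway;
    }
}
```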
Hi! I am new to Spring Integration and I am trying to integrate it (no pun intended) with Spring Cloud Task and Spring Batch. I would like to have a task that registers an IntegrationFlow and binds the lifecycle of the tasklet to that very same flow. The flow just performs a finite forwarding job. What is the closest mechanism to an imaginary but convenient architecture like the following?:
IntegrationFlowRegistration myLoaderFlow = this.flowContext.registration(tmpFlow).register();
myLoaderFlow.startAndRunWhile(outputChannel -> outputChannel.idleSinceLastEmission() = 15 min);
With this I would cover my two particular needs: making the lifecycle of the task bound to that of the flow (the startAndRunWhile part), and being able to define a condition that signals the end of the flow (the idleSinceLastEmission part).
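Nothing quite like startAndRunWhile exists as far as I know, but you can approximate it: register the flow, track the last emission with a ChannelInterceptor on the output channel, and have the tasklet loop until the idle threshold is reached, then destroy the registration. A rough sketch of the idea; all names and the polling interval are invented:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.dsl.context.IntegrationFlowContext.IntegrationFlowRegistration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.InterceptableChannel;

public class RunWhileNotIdle {

    public void runUntilIdle(IntegrationFlowContext flowContext, IntegrationFlow tmpFlow,
            InterceptableChannel outputChannel, Duration maxIdle) throws InterruptedException {

        AtomicLong lastEmission = new AtomicLong(System.currentTimeMillis());

        // record the time of every message that passes through the flow's output channel
        outputChannel.addInterceptor(new ChannelInterceptor() {

            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                lastEmission.set(System.currentTimeMillis());
                return message;
            }
        });

        IntegrationFlowRegistration registration = flowContext.registration(tmpFlow).register();
        try {
            // the "startAndRunWhile" part: block the tasklet until nothing
            // has been emitted for maxIdle
            while (System.currentTimeMillis() - lastEmission.get() < maxIdle.toMillis()) {
                Thread.sleep(1_000);
            }
        }
        finally {
            registration.destroy();
        }
    }
}
```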
onError(Throwable t) { } with a blank method body, because it swallows the exceptions completely.

I have a WebFlux.outboundGateway where I would like to handle server-side errors (404, 503, ...). I tried the pattern from the question at https://stackoverflow.com/questions/48452591/spring-integration-webflux-error-handling, but the doOnError handler never gets called. What do I need to do so that the gateway really emits a Mono and the exception can be handled downstream?
Hi.
I have a case where I need to listen to an S3 bucket, and when a new file arrives I need to consume that file, read it, and process it. They are CSV files.
Every day I'll receive around 750k CSV files in a short time. To process the files I was using Spring Batch, because I have some performance concerns and also need the ability to restart the processing where it stopped.
And for polling the S3 bucket I'm using Spring Integration, but now I'm trying to combine both:
for each file, start a job.
I found this: Spring Batch Integration
I know that it is a Spring Batch project and this channel is about integration, but I didn't find many examples, and the tutorials and videos are more than 4 years old.
Any recommendations for my case?
I'd like to know if I'm going about it the right way.
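Spring Batch Integration still seems like the right fit for "one job per file"; the key piece is the JobLaunchingGateway. A hedged sketch of how the polling flow could hand each downloaded file off to it; the job, channel, and parameter names are invented, and "s3LocalFilesChannel" stands in for whatever channel your S3 inbound adapter writes File messages to:

```java
import java.io.File;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class FilePerJobFlow {

    @Bean
    public IntegrationFlow csvJobFlow(JobLauncher jobLauncher, Job csvImportJob) {
        return IntegrationFlows.from("s3LocalFilesChannel")
                // turn each downloaded file into a launch request with the file path as a parameter
                .transform(File.class, file -> new JobLaunchRequest(csvImportJob,
                        new JobParametersBuilder()
                                .addString("input.file", file.getAbsolutePath())
                                .toJobParameters()))
                .handle(new JobLaunchingGateway(jobLauncher))
                // the JobExecution reply is logged and discarded here; route it instead if you monitor jobs
                .log()
                .nullChannel();
    }
}
```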
Hi again.
Based on my previous question, I followed the S3 polling docs for spring-integration-aws and also Spring Batch Integration to run my jobs. I got some results, but I'm getting some errors.
I created a question on Stack Overflow with more details: Question
I really appreciate any help.