Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    V k k Sa
    @vkks_gitlab
    How to unit test the below. Note - I want to unit test using simple SpringJunit4...not full blown springboot
    @Bean
    public MessageSource<Object> jdbcMessageSource() throws SQLException {
       JdbcPollingChannelAdapter a = new JdbcPollingChannelAdapter(odsDataSource(), "select msg from demo.integration");
       return a;
    }
    
    @Bean
    public IntegrationFlow dbpollerFlow() throws Exception{        
        return IntegrationFlows.from(jdbcMessageSource()
                        , c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                    .handle(printer)
                    .get();                
    }
    128 replies
    V k k Sa
    @vkks_gitlab
    image.png
    Thomas Einwaller
    @tompson
    Hi, I am having troubles using the @MessagingGateway annotation in Kotlin
    my config looks like:
    @Configuration
    @IntegrationComponentScan
    class IntegrationConfiguration {
    
        @Bean
        @ServiceActivator(inputChannel = "pubSubChannel")
        fun messageHandler(pubsubTemplate: PubSubTemplate): MessageHandler {
            return PubSubMessageHandler(pubsubTemplate, "pub-sub-topic")
        }
    }
    
    @MessagingGateway(defaultRequestChannel = "pubSubChannel")
    interface RemoveTokenGateway {
        fun removeToken(@Header("token") token: String)
    }

    but when I try to inject this into the server:

    @Service
    class MessageService(private val removeTokenGateway: RemoveTokenGateway) {
    }

    I get org.springframework.beans.factory.NoSuchBeanDefinitionException No bean named 'removeTokenChannel' available

    13 replies
    Thiago Milczarek Sayao
    @tsayao
    Hi. I have a IntegrationFlows.from().transform().channel().get() flow. How do I ignore null output from transform() ?
    22 replies
    Thilo-Alexander Ginkel
    @ginkel
    Hi there! I am trying to figure out how to set the name Micrometer metrics label of the various components of a Spring Integration flow when using the Java DSL. ATM, these are auto-generated such as name="mqToWebFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2" or name="mqToWebFlow.subFlow#0.subFlow#0.org.springframework.integration.config.ConsumerEndpointFactoryBean#4", which aren't very descriptive. ;-) Any ideas?
    5 replies
    Greg Eales
    @0x006EA1E5
    Hi, is this the right place to ask about spring-integration-aws? We are seeing some undesired behaviour with kinesis when reading from a stream closed shards
    85 replies
    Mikhail Slyunchenko
    @michael_gitlab_gitlab
    Hi there! I have updated spring integration file from 4.3.12 to 5.2.3 and I get trouble with org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer#transferFilesFromRemoteToLocal because I should read files from nested folder of smb shared folder.
    The implimentation of that method has been changed, and my FileListFilter<SmbFile> can not be using because org.springframework.integration.file.support.FileUtils#purgeUnwantedElements static method is invoked before that and do skip nested folder (accept file only org.springframework.integration.smb.inbound.SmbInboundFileSynchronizer#isFile).
    Is there best way for copy files by filter from nested folder of smb shared folder?
    79 replies
    swaindhruv8
    @swaindhruv8
    Hi Team,
    swaindhruv8
    @swaindhruv8
    i am putting my scenario. Client will send a single request to my application which will route to multiple external host system for a single request. Some of the external systems are legacy system which is accepting only XML request, some are accepting json. Then collect the response from the host system , aggregate the response and send single response to client. Please suggest if I can achieve through SI. if so, please suggest with the workflow of xml based configuration like , howmany gateway, router,transformer and the sequence to use it.
    9 replies
    Gaurav Rawat
    @gauravbrills
    Does Spring Integration caching session factory change to prevent race conditions require any changes to application code [Release 5.1.12] , after upgrading the same we started getting Pool has been closed exceptions which seems to be cause of caching session factory returning closed sessions . THe way we were using the factories was with dynamic SI flows for sftp/ftp inbound adapters like here https://stackoverflow.com/questions/43916317/strategy-to-refresh-update-sessionfactory-in-spring-integration . Do let us know if we are missing anything
    15 replies
    V k k Sa
    @vkks_gitlab
    Hi, I have a publish subscribe with 2 subscribers, and has a series of handle activator calls after it. Inside subscribe flow there are handle methods and return some object. This is existing flow, so not changing return [to void). The problem I have is - I have to add another handle to do say public MessageHandler justSuppressing() {return m ->{};}. Is there a better way?
    29 replies
    BigMikeATL
    @BigMikeATL_twitter
    When using routeToRecipients(), if I send to multiple channels and there's an error downstream from one of those channels, would I be correct in understanding that the transactions are unique to each channel and do not propagate back to the flow that contained the routeToRecipients() call?
    14 replies
    ghilainm
    @ghilainm

    Hi guys, I don't understand the errorChannel mechanism.

    I have defined a bean of type DirectChannel named 'errorChannel', I have created an IntegrationFlow from this errorChannel to perform error handling.

    I have another flow created from an HttpInbound, when an error occurs in this flow I expect the message to be routed to my errorChannel but that's not the case.

    Any idea what could have go wrong, or what I misunderstood?

    3 replies
    dsinghal09
    @dsinghal09

    Hi guys,

    I have a requirement to continuously poll an email and look for an attachment, if there's any attachment I need to download it and save it on a remote server.

    I was able to connect my mail server and download the attachment, but now I'm stuck on how to forward this message channel to connect with SFTP server and save the file there, can you please guide on how I can do this ?
    3 replies
    gopalsai
    @gopalsai
    Hey, moving from 4.1.5 spring integration to 5.3.2 release…..is “@EnableIntegrationMBeanExport” this annotation not needed anymore in latest release?
    2 replies
    Thiago Milczarek Sayao
    @tsayao
    how do I create a MessageHandler that does something (like a JdbcMessageHandler) and if it fails does something else (like another JdbcMessageHandler) that inserts on another table
    5 replies
    BigMikeATL
    @BigMikeATL_twitter

    I have an interesting scenario that's got me scratching my head.

    My SI application accepts data via HTTP POST, transforms it, and writes to ActiveMQ. In the event that ActiveMQ is down, the caller still gets a 200. Behind the scenes, it sends the message to the error channel, writes to Mongo, and a separate retry process reads from Mongo and will attempt to push to ActiveMQ. It has been in production for 2+ years and works perfectly.

    I am now in the process of converting this project to use Kafka instead of ActiveMQ and, unfortunately, it doesn't behave the same way. What I mean by that is when Kafka is down, it never makes it to the error channel and the caller ends up receiving a HTTP timeout.

    The reason for the difference in behavior seems to stem from the fact that when sends to AMQ fail, JMS Exceptions are thrown and SI can easily pivot to the error channel. The Kafka producer, on the other hand, seems to throw exceptions internally but they never find their way back up stream. After a lengthy debugging session, I see KafkaProducerMessageHandler call sendFuture = this.kafkaTemplate.send(producerRecord); but it never returns, thus the subsequent call to processSendResult() is never executed and it just sits there.

    Is this a bug? or is there some other way I should be configuring the Kafka producer to get it to behave the way I want?

    5 replies
    DarkEdges
    @darkedges
    Morning I am trying to post to queue and return the value back via a REST Interface. I can see the messages flow across the platoform but the request hangs. If I remove Flux.from(reactiveSource()).map(Message::getPayload) and replace with Flux.just("data") it returns straight away. Somewhere the message cannot be read of the queue, but I am not sure why. Any ideas?
    package id.verifymy.spring.integration.accumulators.config;
    
    import org.reactivestreams.Publisher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.HttpMethod;
    import org.springframework.http.MediaType;
    import org.springframework.integration.channel.QueueChannel;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.integration.dsl.PollerSpec;
    import org.springframework.integration.dsl.Pollers;
    import org.springframework.integration.dsl.context.IntegrationFlowContext;
    import org.springframework.integration.scheduling.PollerMetadata;
    import org.springframework.integration.webflux.dsl.WebFlux;
    import org.springframework.messaging.Message;
    
    import id.verifymy.spring.integration.accumulators.model.SimpleMessage;
    import reactor.core.publisher.Flux;
    
    @Configuration
    @EnableIntegration
    public class SpringIntegrationConfiguration {
    
        @Autowired
        IntegrationFlowContext integrationFlowContext;
    
        @Bean
        public Publisher<Message<Object>> reactiveSource() {
            return IntegrationFlows.from(messagesPerCustomerQueue()).log().toReactivePublisher();
        }
    
        @Bean
        public IntegrationFlow messagesPerCustomerFlow() {
            return IntegrationFlows.from(WebFlux.inboundGateway("/test").requestMapping(r -> {
                r.methods(HttpMethod.POST);
                r.produces(MediaType.APPLICATION_JSON_VALUE);
            }).requestPayloadType(SimpleMessage.class)).channel(messagesPerCustomerQueue())
                    .handle((p, h) -> Flux.from(reactiveSource()).map(Message::getPayload)
                    // Flux.just("data")
                    ).log().bridge().log().get();
        }
    
        @Bean(name = PollerMetadata.DEFAULT_POLLER)
        public PollerSpec poller() {
            return Pollers.fixedDelay(0);
        }
    
        @Bean
        public QueueChannel messagesPerCustomerQueue() {
            return MessageChannels.queue().get();
        }
    }
    25 replies
    V k k Sa
    @vkks_gitlab
    This message was deleted
    1 reply
    V k k Sa
    @vkks_gitlab
    Hello, I am using the code as in the link @Configuration @EnableIntegration @EnableIntegrationMBeanExport(server = "mbeanServer", managedComponents = "input") public class ContextConfiguration { @Bean public MBeanServerFactoryBean mbeanServer() { return new MBeanServerFactoryBean(); } }
    4 replies
    V k k Sa
    @vkks_gitlab
    Can I ask what is the DSL equivalent of
    <channel id="mychannel"> <queue/> </channel>
    22 replies
    V k k Sa
    @vkks_gitlab
    Hello, i have a boot app, v 1.4.3 and integration v4.3.6, . I added a configuration with @EnableIntegrationMBeanExporter with MBeanServerFactoryBean. When I start app using farmStart, I see org.springframework.boot, but not integration in JMC. But I see Registered endpoint without MessageSource.... in logs. please can I get troubleshooting steps
    58 replies
    V k k Sa
    @vkks_gitlab
    Hi, I have a ThreadPoolTaskExecutor bean with thread group name and thread name prefix specified, This is set in a channel channel(MessageChannels.executor("executorChannel", threadpoolTaskExecutorBean). why I cant see it in JMC? org.springframework.integration >> MessageChannel >> executorChannel attributes
    139 replies
    Gaurav Rawat
    @gauravbrills
    😁 yup true, saw your pr discussions really extensible this is coupled by what all you can do with advices .. The possibilities are just endless 👍
    V k k Sa
    @vkks_gitlab
    image.png
    V k k Sa
    @vkks_gitlab
    image.png
    Artem Bilan
    @artembilan
    image.png
    V k k Sa
    @vkks_gitlab
    image.png
    dmpour23
    @dmpour23

    I have a jms outbound channel adapter. I would like to send a message to my Apache Artemis Broker and add a delay.

    <int-jms:outbound-channel-adapter connection-factory="scheduledConnectionFactory" channel="tnpScheduledOutboundChannel" destination="tnpScheduledQueue" />

    My java code that adds the header to the spring integration message is:

     return MessageBuilder.withPayload(sdpInfo).setHeader("_AMQ_SCHED_DELIVERY",sdpInfo.getDelay()).build();

    The header is added as a spring integration header. But its not picked up by JMS. Do i need to add some kind of JmsHeaderMapper?
    Could someone point to some documentation or example.

    9 replies
    Arthur Kazemi
    @bidadh

    Hi, I'm trying to validate incoming Amqp message based on some required headers and in case of the error put it to dlq similar to what spring-cloud-stream does. scs creates .dlq for every queue and puts error messages to the .dlq wondering how may I achieve same behaviour with spring integration amqp component?

    Any help is much appreciated

    here is a sample kotlin code without validation:

      @Bean
      fun customerEventFlow(
        connectionFactory: ConnectionFactory,
        consumerRetryTemplate: RetryTemplate,
        amqpTemplate: AmqpTemplate
      ): IntegrationFlow {
        val inboundGateway = Amqp.inboundGateway(connectionFactory, amqpTemplate, qName)
          .retryTemplate(consumerRetryTemplate)
        return IntegrationFlows
          .from(inboundGateway)
          .log<Message<*>> { log.info("{}", it) }
          .channel(subscriptionInChannel)
          .get()
      }
    5 replies
    David Parry
    @davidparry
    If I have a <int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/> and the somePojo has anotherMethod annotated @ServiceActivator both method signatures take MyMessage Object will they both be called when a jmsMessage is received?
    10 replies
    Erik Haqvinsson
    @erikhaq

    If I have a tcp inbound adapter, where I can receive messages containing different types JSON data, what would be the best way to deal with routing these messages to the correct handler?

    The alternatives I've considered are:

    • configure a route and use either channelMapping or subFlowMapping to direct messages to appropriate handlers
    • use different inbound and outbound adapters for every different type of message

    Could anyone give some advice on which method would be preferable?
    Or if there are other, better methods which I have not considered?

    I am using kotlin DSL if it matters.

    Here's a short example of what I have so far just handling one type of incoming message:

    @Bean
    fun connectionFactory(): TcpClientConnectionFactorySpec =
        Tcp.netClient(properties.baseUrl, properties.port)
            .serializer(...)
            .deserializer(...)
    
    @Bean
    fun tcpOut() = integrationFlow(channels.outboundChannel()) {
        transform(Transformers.toJson())
        handle(Tcp.outboundAdapter(connectionFactory()))
    }
    
    @Bean
    fun tcpIn() = integrationFlow(Tcp.inboundAdapter(connectionFactory())) {
        transform(Transformers.objectToString())
        transform(Transformers.fromJson(AuthenticationResponse::class.java))
        handle(apiService)
    }
    
    @Configuration
    class ChannelsConfiguration {
        @Bean
        fun outboundChannel(): DirectChannel = MessageChannels.direct().get()
    }
    
    @Service
    class ApiService(
        private val outboundChannel: DirectChannel
    ) {
    
        fun authenticationResponseHandler(response: AuthenticationResponse, headers: MessageHeaders) {
            ...
        }
    
        fun authenticate(authenticationRequest: AuthenticationRequest) {
            outboundChannel.send(GenericMessage(authenticationRequest))
        }
    
    }
    30 replies
    bahram
    @MarsDown
    hi
    Is it possible to connect to exchange mail by spring integration mail?
    31 replies
    Elena Felder
    @elefeint
    We have an spring-cloud/spring-cloud-gcp#2605 in Spring Cloud GCP, in which the user has a requirement to rotate credentials stored in a file. The service used (Pub/Sub) is wrapped in a MessageProducerSupport-based inbound adapter. The obvious solution is to use @RefreshScope on the beans that require updated credentials; however that won't affect the already-started stream. For the adapter to correctly refresh, doStop() and doStart() would need to be invoked. Is there any built-in integration between MessageProducerSupport.doStart() and doStop() and Spring Cloud Config refreshable context? Or does the end-user application need to listen to context refresh event and manually restart the inbound adapter?
    33 replies
    dnijssen
    @dnijssen

    Hello, can anyone help me / give me advice how to setup the following;
    I have a SFTP server which i'm polling (every 10sec?) for a new file, than I'll retrieve this file locally, and invoke a handler method (e.g. to read the file / create a flux of elements, save it to the local database, etc)
    Afterwards I want to pass along that flux of elements to another server (using RSocketOutboundGateway??)
    so the second service retrieves that same flux of elements, and can do some processing as well.

    Currently I have the following setup;

    @Bean
        public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
            SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
            fileSynchronizer.setDeleteRemoteFiles(true);
            fileSynchronizer.setRemoteDirectory(excelRemoteDirectory);
            fileSynchronizer.setPreserveTimestamp(true);
            fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("testdennis*.xlsx"));
    
            return fileSynchronizer;
        }
    
        @Bean(name = "directChannel")
        public MessageChannel directChannel() {
            return new DirectChannel();
        }
    
        @Bean
        @InboundChannelAdapter(channel = "directChannel", poller = @Poller(fixedDelay = "10000"))
        public MessageSource<File> myMessageSource() {
            SftpInboundFileSynchronizingMessageSource messageSource = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
            messageSource.setLocalDirectory(new File("sftp-inbound"));
            messageSource.setAutoCreateLocalDirectory(true);
            messageSource.setLocalFilter(new SimplePatternFileListFilter("testdennis*.xlsx"));
    
            return messageSource;
        }
    
        @Bean(name = "fluxMessageChannel")
        public MessageChannel fluxMessageChannel() {
            return new FluxMessageChannel();
        }
    
        @Bean
        @Gateway(requestChannel = "fluxMessageChannel")
        public RSocketOutboundGateway rsocketOutboundGateway() {
            RSocketOutboundGateway rsocketOutboundGateway = new RSocketOutboundGateway("my-rsocket-route");
            rsocketOutboundGateway.setInteractionModel(RSocketInteractionModel.requestChannel);
            rsocketOutboundGateway.setExpectedResponseType(Void.class);
            rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    
            return rsocketOutboundGateway;
        }
    
        @Bean
        public ClientRSocketConnector clientRSocketConnector() {
            return new ClientRSocketConnector(URI.create("ws://localhost:8880"));
        }

    With the following (intermediate) handler / service activator defined;

    @Service
    public class ExcelFileHandler {
    
        @ServiceActivator(inputChannel = "directChannel", outputChannel = "fluxMessageChannel")
        public Flux<MyElement> handleMessage(Message<File> message) {
            File file = message.getPayload();
            log.info("Handling excel file '{}'", file.getName());
    
            return Flux.just(....);
        }
    }
    got the following exception; fluxMessageChannel -> doesn't have subscribers to accept messages But I'm not aware how to fix it in this setup..
    168 replies
    nightswimmings
    @nightswimmings

    Hi! I am new to Spring Integration and I am trying to integrate it (no pun intended) with Spring Cloud Task and Spring Batch. I would like to have a task that registers an integrationflow and bind the lifecycle of the tasklet to the very same flow. The flow is just performing a finite forwarding. What is the closest mechanism to an imaginary but convenient arch like the following?:

    IntegrationFlowRegistration myLoaderFlow = this.flowContext.registration(tmpFlow).register();
    myLoaderFlow.startAndRunWhile(outputChannel -> outputChannel.idleSinceLastEmission() = 15 min);

    With this I would enable my 2 particular needs. Make the lifecycle of the task binded to that of the flow (the startAndRunWhile part) and being able to define a condition that signals the end of the flow (the idleSinceLastEmission part)

    nightswimmings
    @nightswimmings
    The input of the flow is currently a JMS queue (JmsMessageDrivenEndpoint) with about 200k messages to be routed on each execution in case it is relevant
    I saw pollers like SimpleActiveIdleReceiveMessageAdvice (if I understood correctly the complex documentation), are not recommended for big loads, but MessageDrivenChannels are not blocking and I don't see any way to configure something like DynamicPeriodicTrigger, or anything to process idle times
    nightswimmings
    @nightswimmings
    Reading about routers and group aggregators I saw some idle timeout options but I am not sure they are a good option for my case
    31 replies
    brrshh
    @brrshh
    Hello. My integration pipeline starts from webfluxGateway, in the middle of pipline i used ws:outbound-gateway. For success case all works as expected, but in case ws:outbound-gateway get error response by default it throws SoapFaultClientException. In this case error is not coming to the error channel, defined in first webfluxGateway. I see same bug was fixed, i used spring integration 5.1.2. Any idea how can i fix?
    dnijssen
    @dnijssen
    Hello @artembilan , I might have found another "edge-case" regarding spring-projects/spring-integration#3448 . Using the NullChannel indeed subscribes the payload now, but I wonder if it is so smart to keep the onError(Throwable t) { } with a blank method body. Cause it is swallows the exceptions completely.
    Although it is indeed a "NullChannel" , but maybe a log message could be useful for end projects. Or atleast document is more explicitly, what do you think?
    8 replies
    Thilo-Alexander Ginkel
    @ginkel
    Hi! Let's say, I have a WebFlux.outboundGateway where I would like to handle server-side errors (404, 503, ...). I tried the pattern from the question in https://stackoverflow.com/questions/48452591/spring-integration-webflux-error-handling, but the doOnError handler never gets called. What do I need to do so that the Gateway really emits a Mono and the exception can be handled downstream?
    6 replies
    Guilherme Bernardi
    @guibernardi

    Hi.
    I have a case where I need to listen to a S3 Bucket and when a new file is there, I need to consume that file, read and process. It's CSV files.
    Everyday I'll receive like 750k CSVs files in a short time, to process the files I was using Spring Batch due to I need some concerns with perfomance and also the possibility to restart the processing where it stopped.
    And for pooling the S3 bucket I'm using Spring Integration, but now I'm trying to combine both.
    For each file start a job.
    I found this: Spring Batch Integration
    I know that is a Spring Batch project and here it's about integration, but I didn't find so many examples and also the tutorials and videos are older than 4 years.

    Any recommendation about my case?
    I'd like to know if I'm doing in the right way.

    5 replies
    Guilherme Bernardi
    @guibernardi

    Hi again.
    Based on my previous question I followed S3 Pooling docs with spring-integration-aws and also spring batch integration to run my jobs and I got some results, but I getting some errors.

    I created a question in Stack overflow with more details: Question

    I really appreciate any help.