    Hello. My integration pipeline starts from a webfluxGateway, and in the middle of the pipeline I use ws:outbound-gateway. The success case works as expected, but when ws:outbound-gateway gets an error response, it throws SoapFaultClientException by default. In this case the error does not reach the error channel defined on the first webfluxGateway. I see the same bug was fixed; I'm using Spring Integration 5.1.2. Any idea how I can fix this?
    Hello @artembilan , I might have found another "edge-case" regarding spring-projects/spring-integration#3448 . Using the NullChannel does indeed subscribe to the payload now, but I wonder whether it is wise to keep onError(Throwable t) { } with a blank method body, because it swallows the exceptions completely.
    It is indeed a "NullChannel", but maybe a log message could be useful for end projects. Or at least document it more explicitly. What do you think?
    8 replies
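    As a rough illustration of the logging suggestion above, here is a minimal sketch of a subscriber that discards every item but still reports errors. It uses the JDK's java.util.concurrent.Flow.Subscriber as a stand-in, and the class name LoggingDiscardSubscriber and its seenErrors list are hypothetical; this is not the actual NullChannel code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical stand-in for a "null" consumer: items are dropped, errors are not.
public class LoggingDiscardSubscriber<T> implements Flow.Subscriber<T> {

    // Kept only so that callers (and tests) can inspect what was reported.
    final List<Throwable> seenErrors = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Request everything; this consumer never applies back-pressure.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        // Intentionally discard the item.
    }

    @Override
    public void onError(Throwable t) {
        // Report the failure instead of swallowing it silently.
        seenErrors.add(t);
        System.err.println("Discarding subscriber received an error: " + t);
    }

    @Override
    public void onComplete() {
        // Nothing to release.
    }
}
```

    Whether such a message belongs at WARN or DEBUG level is a separate design choice that this sketch does not try to settle.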
    Thilo-Alexander Ginkel
    Hi! Let's say, I have a WebFlux.outboundGateway where I would like to handle server-side errors (404, 503, ...). I tried the pattern from the question in https://stackoverflow.com/questions/48452591/spring-integration-webflux-error-handling, but the doOnError handler never gets called. What do I need to do so that the Gateway really emits a Mono and the exception can be handled downstream?
    6 replies
    Guilherme Bernardi

    I have a case where I need to listen to an S3 bucket and, when a new file arrives, consume, read, and process it. The files are CSVs.
    Every day I'll receive around 750k CSV files in a short time. To process them I was using Spring Batch, because I have performance concerns and also need to be able to restart processing from where it stopped.
    For polling the S3 bucket I'm using Spring Integration, and now I'm trying to combine the two.
    Each file should start a job.
    I found this: Spring Batch Integration
    I know that is a Spring Batch project and this room is about Integration, but I couldn't find many examples, and the tutorials and videos are more than 4 years old.

    Any recommendation about my case?
    I'd like to know if I'm doing it the right way.

    5 replies
    Guilherme Bernardi

    Hi again.
    Based on my previous question, I followed the S3 polling docs for spring-integration-aws and the Spring Batch Integration docs to run my jobs. I got some results, but I'm getting some errors.

    I created a question on Stack Overflow with more details: Question

    I'd really appreciate any help.

    Priyam Chaudhuri
    Does Spring Integration mail support polling multiple mailboxes using POP3? Is there a DSL code example for POP3 with Spring Integration mail?
    5 replies
    Erik Haqvinsson

    Is there any way to transform different messages using a jsonPath expression to differentiate between them?

    Right now I route messages to different channels, create a new integration flow for each channel, apply a message-specific transform there, and then send the message to a handling function, but that results in a lot of repeated code.

    9 replies
    Alwyn Schoeman
    Is there a way when using Kotlin/Java DSL to specify the type of the Message payload when used in an AggregatorSpec in order to avoid casting?
    14 replies
    Thilo-Alexander Ginkel
    Hi there! Is there a way to attach some kind of interceptor to each Spring Integration handler defined in a flow (without having to resort to AOP)?
    10 replies
    Christian German
    Hi everyone
    Is it normal behavior for spring-integration to use the taskScheduler from spring-boot (@EnableScheduling)?
    As far as I can see, @EnableScheduling creates a bean named taskScheduler with a default pool size of 1. Spring Integration does not create its own if such a bean exists, so this overrides Spring Integration's default pool size (10). Also, the property spring.integration.taskScheduler.poolSize is obviously ignored.
    13 replies

    Hi, I am trying to use file:inbound-channel-adapter, file:splitter and int-jms:outbound-channel-adapter.

    The process is the following. I poll a staging folder using the file:inbound-channel-adapter.
    Then I move the file to a processing folder using a service activator (a header enricher takes care of the new filename).
    I use the file:splitter to split the file into lines and then transform each line to a Java object. This is then
    passed to JMS via jms:outbound-channel-adapter. If all goes well, the file is moved to an archive folder; if "certain" errors occur, it should go to an error folder.

    It seems to work OK, but when I simulate a JMS connectivity issue (i.e. Artemis is down), I catch the error in my
    GlobalExceptionHandler. nested exception is javax.jms.JMSException:

    What I would like to do here is log the line in the file where the error occurred and move the file to an error directory,
    so that I can then deal with the file manually.

    My problem here is that the original message (ErrorMessage errMsg = (ErrorMessage) message;) is the actual File message, not the individual message for the specific line that was being sent to the JMS queue.
    So at the GlobalExceptionHandler level there is no way of knowing which line failed; I need to trace it in the logs.

    How can I do this?

    10 replies
    King Leonidas

    Hi everyone,

    I'm trying to fetch files from an SFTP server directory with subfolders. I know that I can use the SFTP inbound streaming adapter if the directory doesn't have subfolders, but my requirement is different: I need to get new files as well as updated ones, based on the timestamp. I tried using two outbound gateways, one for fetching the file list and another for the actual fetching, but I'm not sure whether I can apply the filter (new and updated files) as in sftp:inbound-streaming-channel-adapter. Is there a better way to do this? Thanks!

    4 replies
    Greg Eales
    Hi, I understand that spring-integration-aws doesn't work with spring-integration 5.4, so we can't use spring-boot 2.4 yet. Is this right?
    17 replies
    King Leonidas
    Hi! I am trying to use the RotatingServerAdvice to poll new files from multiple sources. I have a new requirement: for every new client, a new directory is created on the SFTP server, and I want to add that directory to the poll as well. So I would use int-sftp:outbound-gateway to list all directories, check whether a new directory has been created, and then add it to the key directories. Is that possible? Thank you.
    public RotatingServerAdvice advice() {
        List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
        keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
        keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
        keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
        keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
        keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
        keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
        return new RotatingServerAdvice(delegatingSf(), keyDirectories);
    }
    9 replies
    Gary Russell
    Please add three back ticks (as gates) before and after code snippets (on separate lines).
    See the markdown help (button to the right).
    Lukáš Vasek
    Hello, is this the right place to ask questions about JMS? I have the following issue. Basically, I'd like to send ContentType as a system property (not message) to Service Bus. Is there a simple way to do this? Thanks
    5 replies
    Guilherme Bernardi


    I'm trying to add a header to the IntegrationFlow (S3 polling) message.

    I created a question on Stack Overflow, in case someone can help me:



    2 replies
    Patrik Mihalcin

    Hi all
    I have

        Amqp.inboundAdapter(rabbitConnectionFactory, QUEUE)
                .messageConverter(new MarshallingMessageConverter(xmlMarshaller))
                .configureContainer(configurer -> configurer
        .publishSubscribeChannel(s -> s
                .subscribe(flow -> flow
                .subscribe(flow -> flow

    service publishes AMQP messages to another RabbitMQ queue

    I can see tx commit after whole flow is executed:
    CachingConnectionFactory: AMQChannel(amqp://guest@,8) channel.txCommit()

    Is there a way to tell CachingConnectionFactory to commit after .handle(service) step and before .channel(INTERNAL_EVENT_PROCESSED)?

    22 replies
    Hi, I am trying to implement an aggregator in order to do batch inserts of incoming JMS messages using jdbcTemplate.batchUpdate.
    Below is my configuration. The release-strategy-expression limit is variable; I have tried batches of 50, 100, and 500, and timeouts of 1000 to 5000 ms.
    However, I observe "polling"-like behavior, i.e. messages are consumed slowly, even slower than the timeout (I am using Artemis as the broker).
     <int:transformer input-channel="dInbound" ref="jsonToDTransformer" requires-reply="true" output-channel="dAggregatorChannel"/>
          <!-- aggregator -->
        <int:aggregator id="dAggregator"
                        release-strategy-expression="size() == ${aggregator.releaseStrategy.limit}"
        <!-- Update db -->
    10 replies
    Quick question: I am working on an older Boot project and it seems that, at least during mvn test, inner static configuration classes are picked up by ComponentScan. Is this by design, or is it an old bug?
    20 replies
    Patrik Mihalcin

    Hi all.. I'm facing a problem where I have flow:

    return IntegrationFlows.from(
            Amqp.inboundAdapter(rabbitConnectionFactory, QUEUE)
                    .configureContainer(container -> container
            .transform(fromJson(DeliveryUpdate.class, new Jackson2JsonObjectMapper(jacksonObjectMapper)))
            .publishSubscribeChannel(s -> s
                    .subscribe(f -> f
                    .subscribe(f -> f
                            .handle(mssDeliveryNotificationService, "process")

    The system that integrates with mine sends an AMQP message with a TypeId header containing the name of its external class: net.homecredit.mss.domain.types.DeliveryNotificationType

    At runtime spring integration throws exception:

    EL1004E: Method call: Method process(net.homecredit.mss.domain.types.DeliveryNotificationType) cannot be found on type net.homecredit.notifier.service.MssDeliveryUpdateService$$EnhancerBySpringCGLIB$$bc0ec70f

    Why does Spring Integration try to find a method with a parameter of the external class type?
    As you can see, I transform the payload to DeliveryUpdate, which is my internal class.

    9 replies
    Guilherme Bernardi

    Hi all.
    I'm trying to solve something: I'm consuming files from an S3 bucket and I'm getting this message:

    The remote file [s3object name] has not been transferred to the existing local file 'directory/filename'. Consider removing the local file.

    Based on this great answer, I understand that files are always copied from remote to local.

    So how could I avoid this warning?

    I created the metadataStore and set the LocalFilter like this:

    messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore(dataSource), ""));

    My idea is that if the user puts the same file in the bucket twice, I avoid processing it again.

    What am I doing wrong?

    10 replies

    We have implemented a delayer channel backed by a persistent message store in our application.

    Sometimes the Spring Integration flow cannot send a message upstream because of a throttling limit in the application. In this scenario we send that message to the delayer channel with a delay of 30s,

    so that sending it to the upstream application is retried after 30s.

    Sometimes we observe that messages get stuck in the int_messages table and are never retried. We don't find any exception or error in the logs.

    Can you please suggest some actions? For example, which packages should we enable logging for, and where else should we look to find out why delayed messages are sometimes not retried?

    12 replies
    Hi there,
    Could you migrate the JUnit version in org.springframework.integration:spring-integration-test-support to JUnit 5? I'm using version 5.4.4, and JUnit is still at 4.13.2.
    Gary Russell
    ?? It has support for both
    project('spring-integration-test-support') {
        description = 'Spring Integration Test Support - **No SI Dependencies Allowed**'
        dependencies {
            compileOnly 'org.apiguardian:apiguardian-api:1.0.0'
            api "org.hamcrest:hamcrest-library:$hamcrestVersion"
            api "org.mockito:mockito-core:$mockitoVersion"
            api "org.assertj:assertj-core:$assertjVersion"
            api 'org.springframework:spring-context'
            api 'org.springframework:spring-messaging'
            api 'org.springframework:spring-test'
            optionalApi ("junit:junit:$junit4Version") {
                exclude group: 'org.hamcrest'
            }
            optionalApi 'org.junit.jupiter:junit-jupiter-api'
            optionalApi 'org.apache.logging.log4j:log4j-core'
        }
    }
    Thank you @garyrussell, in my case only junit 4 was available, it's not a big deal since we can exclude it (what I did when I saw the problem)
    Gary Russell
    I see, yes, sorry, it was only recently changed from api to optionalApi on master (5.5.x).
    Artem Bilan
    That's correct. It is only a matter for the current 5.5. I realized that it could cause some frustration for end users, so I decided to move away from JUnit 4 as the default dependency there.
    We can't make such a change in a point release (some 5.4.x) because it is a breaking change.
    Let me add a Migration Guide note, then, since it is still a breaking change, although a minor one :smile:
    Patrik Mihalcin
    Hi all
    How can I get http response status code and body mapped to java class using @Gateway method in @MessagingGateway interface?
    2 replies
    Redion Muraj
    Hi, I'm trying to move emails to specific folders using java mail and I get this exception :
    class org.springframework.integration.mail.AbstractMailReceiver$IntegrationMimeMessage cannot be cast to class com.sun.mail.imap.IMAPMessage
    28 replies
    This is how I'm trying to move it
    Sedrak Dalaloyan
    Hi guys, sorry for the stupid question: why does the Spring Integration lock only work within the same JVM and not in a multi-server setup? A new tryLock call from another server just overrides the previous one.
    1 reply
    Is there some kind of bean in Spring Integration that I can use to send a message into a channel named "" programmatically? Something like: messageSender.sent(new GenericMessage("hello"), "myFlow.input")?
    6 replies
    Jimmy Bway

    Hi guys, i have a question
    When the @Aggregator, @ReleaseStrategy, and @CorrelationStrategy annotations are used, org.springframework.integration.config.annotation.AggregatorAnnotationPostProcessor#createHandler creates the MessageHandler instance. Is there a way to configure this instance so that expire-groups-upon-completion is taken into account?

    So far, the only way I have found is to create it programmatically:

        @ServiceActivator(inputChannel = "incoming.event.channel")
        public MessageHandler aggregator() {
            AggregatingMessageHandler aggregator = new AggregatingMessageHandler(
                    new SimpleMessageStore(0),
                    message -> Product.class.getSimpleName(),
            return aggregator;
    8 replies
    Eric Deandrea

    I’m trying to implement a Spring WebFlux controller class/endpoint which publishes a message to a subscriber, receives a reply, and then returns a Mono to the consumer. Something like

    public Mono<SomeObject> getSomething() {
      // Do something to publish a message to a consumer & return the reply

    Then somewhere else in the code have a consumer which listens for the event, processes it, and replies back.

    I’ve never used Spring Integration but found FluxMessageChannel (https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#fluxmessagechannel-and-reactivestreamsconsumer) and was wondering if that was the right approach.

    I’m trying to replicate what Vert.x does with its EventBus (https://vertx.io/docs/apidocs/io/vertx/core/eventbus/EventBus.html#request-java.lang.String-java.lang.Object-) and wondering what the right approach might be here.

    3 replies
    Hi! A brief question: is it possible to use spring-cloud-kubernetes-config to read a secret in the shape of <name>.properties: base64string without a deployment.yml that does the volume mount-path? I mean, if one puts the key-value pairs at the root of the configmap and uses spring.cloud.kubernetes.config.sources, the configmap name is enough to inject the properties as env vars. But what if those properties are wrapped as x.properties: | ..? Do we really need to mount them in the container before runtime, or is there an option to read them from the configmap 100% on the fly?
    3 replies
    Hi, I'm new to spring-integration and I'm playing with AggregatorSpec. My task is to group events until a timeout. Maybe you can help me?
    .aggregate(aggregatorSpec -> aggregatorSpec
    I tried .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(TimeoutCountSequenceSizeReleaseStrategy.DEFAULT_THRESHOLD, 5000)),
    but the group is not released when, for example, it contains only one message.
    I also tried .groupTimeoutExpression("5000"), but it isn't suitable for me.
    The flow I need: the aggregator receives the first message of a group, waits for some time, and then expires the group.
    19 replies
    Tim Feuerbach
    Hi, is it possible to poll multiple messages batch wise with a JdbcMessageStore? Judging from the API of MessageStore and Poller I would say no, but asking to be sure. Issuing a query per message seems a bit wasteful in our case. (I would be okay with the implications, e.g. the entire transaction failing if one message out of the batch fails)
    5 replies
    Ricardo Ferreira

    Hi! I'm very new to SI and I'm trying to combine Spring Cloud Stream with Kafka binder and SI (DSL). The use case is as follows:
    1- receive an activation event with a feature code on an input topic
    2- trigger the corresponding IntegrationFlow (function-based), which routes the message to 2 sub-flows depending on the type of the event
    3- each sub-flow handles the message differently and sends it to their defined output topic

    Integration flow definition:

        public IntegrationFlow activationFlow(ObjectMapper objectMapper) {
            return IntegrationFlows.from(Function.class, gateway -> gateway.beanName("onActivationRequest"))
                    .transform(Transformers.fromJson(Activation.class, new Jackson2JsonObjectMapper(objectMapper)))
                    .<Activation, Boolean>route(this::isAwesomeFeature,
                            mapping -> mapping
                                    .subFlowMapping(true, sf ->
                                    .subFlowMapping(false, sf -> sf.channel("onActivationRequest-out-1")))


            definition: onActivationRequest
              destination: feature-activation-request
              destination: awesome-activation-request
              destination: other-activation-request

    Every time the Integration Flow tries to send the message to an output the following error is logged:

    org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for (...)

    Maybe this is a dumb question but I'm really struggling with this simple use case. Any help would be greatly appreciated.

    9 replies

    We have implemented an aggregator backed by a persistent message store in our application,
    which runs in a clustered environment with two nodes.

    The aggregator works fine and releases the message if the poller on one node fetches all the rows.
    It fails to release if the pollers on both nodes each fetch only some of the rows.

    Consider an example:
    Release strategy: the message is released when the total number of rows reaches 21.
    The poller on node 1 fetches 9 rows, processes them, and inserts them into the INT_MESSAGE_GROUP table with the correlation key.

    The poller on node 2 fetches the remaining rows, processes them, and tries to insert them into the same INT_MESSAGE_GROUP table with the same correlation key,
    which ultimately results in PSQLException: ERROR: duplicate key value violates unique constraint "message_group_pk"

    Hence the message is never released.

    Please suggest some actions. We are currently using 5.2.1.RELEASE.

    11 replies
    Hi, I have a requirement to pick the oldest file from a remote SFTP server using the timestamp in the filename. Can this be achieved by using a Comparator?
    1 reply
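    To make the Comparator idea in the question concrete, here is a minimal plain-Java sketch that orders file names by an embedded timestamp and picks the oldest. The naming convention (a 14-digit yyyyMMddHHmmss stamp somewhere in the name), the class OldestFileSelector, and its methods are assumptions made for illustration; the question does not say what the real file names look like, and wiring such a comparator into an SFTP adapter is not shown.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OldestFileSelector {

    // Assumed convention: the name contains a 14-digit yyyyMMddHHmmss stamp.
    private static final Pattern STAMP = Pattern.compile("(\\d{14})");
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Extracts the timestamp from a file name, failing loudly if there is none.
    static LocalDateTime timestampOf(String fileName) {
        Matcher matcher = STAMP.matcher(fileName);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No timestamp in file name: " + fileName);
        }
        return LocalDateTime.parse(matcher.group(1), FORMAT);
    }

    // Orders names so that the earliest embedded timestamp comes first.
    static final Comparator<String> OLDEST_FIRST =
            Comparator.comparing(OldestFileSelector::timestampOf);

    // Returns the name with the earliest timestamp, or empty for an empty list.
    static Optional<String> oldest(List<String> fileNames) {
        return fileNames.stream().min(OLDEST_FIRST);
    }

    public static void main(String[] args) {
        List<String> names = List.of(
                "report_20210318123001.csv",
                "report_20210317080000.csv",
                "report_20210319000000.csv");
        System.out.println(oldest(names).orElse("none"));
    }
}
```

    The same comparator could in principle be applied to whatever listing of remote file names the SFTP side provides, adapting the element type from String to the remote file entry type.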