by

Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Attoumane
    @akuma8

    Hi,
    How to use the gateway request/reply feature with Redis?
    I have this configuration:
    Outbound side from appA:

    @MessagingGateway
    public interface CommonMessageGateways {
    
         @Gateway( requestChannel = INFOS_COMPLETE_PROFILE_CHANNEL )
         OnInfosProfileEvent checkInfosProfile( @Payload String username );  // I expect an OnInfosProfileEvent response
    }
    
    @Bean
    public DirectChannel infosCompleteProfleChannel() {
       return MessageChannels
                    .direct( INFOS_COMPLETE_PROFILE_CHANNEL )
                    .get();
    }
    
    @Bean
    public IntegrationFlow infosProfileQueueOutboundGateway( RedisConnectionFactory redisConnectionFactory,
                RedisSerializer<?> redisSerializer ) {
            var queueGateway = new RedisQueueOutboundGateway( INFOS_COMPLETE_PROFILE_CHANNEL, redisConnectionFactory );
            queueGateway.setExtractPayload( false );
            queueGateway.setSerializer( redisSerializer );
            queueGateway.setRequiresReply( true );
            return IntegrationFlows.from( INFOS_COMPLETE_PROFILE_CHANNEL )
                    .handle( queueGateway )
                    .get();
    }

    In the inbound side, from appB:

    @Bean
     public RedisQueueInboundGateway infosProfileInboundGateway( RedisConnectionFactory connectionFactory,
                ThreadPoolTaskExecutor taskExecutor, RedisSerializer<?> redisSerializer ) {
            var inboundGateway = new RedisQueueInboundGateway( INFOS_COMPLETE_PROFILE_CHANNEL, connectionFactory );
            inboundGateway.setTaskExecutor( taskExecutor );
            inboundGateway.setSerializer( redisSerializer );
            inboundGateway.setExtractPayload( false );
            return inboundGateway;
     }
    
    @Bean
    public IntegrationFlow getInfosProfileFlow( RedisQueueInboundGateway inboundGateway, SelfService selfService ) {
            return IntegrationFlows.from( inboundGateway )
                    .handle( selfService::handleGetInfosProfile )
                    .get();
    }
    
    //selfservice::handleGetInfosProfile signature is:
    OnInfosProfileEvent handleGetInfosProfile( Message<?> message ); // it returns the result I expect in `appA`

    The problem is the response is never sent to appA which fails with a timeout. What am I doing wrong? Another question, is the couple (RedisQueueOutboundGateway, RedisQueueInboundGateway ) mandatory for a request/reply scenario where a @MessagingGateway method is implied?
    Thanks

    21 replies
    Arajit Samanta
    @arajitsamanta

    I am using spring-integration-redis as my message story for aggregator. I followed this article https://docs.spring.io/spring-integration/docs/5.2.1.RELEASE/reference/html/redis.html on using Jackson as value serializer rather than using default JDK one. Here is my redis config

    @Slf4j
    @Configuration
    @Profile({"!local", "!test"})
    @Import(RedisAutoConfiguration.class)
    public class RedisMessageStoreConfiguration {
    
      @Bean
      public RedisSerializer<Object> redisValueSerializerJackson() {
        ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
        return new GenericJackson2JsonRedisSerializer(mapper);
      }
    
      @Bean
      public MessageGroupStore messageStore(
          RedisTemplate<?, ?> redisTemplate, RedisSerializer<Object> redisValueSerializerJackson) {
        RedisMessageStore redisMessageStore =
            new RedisMessageStore(redisTemplate.getConnectionFactory());
        redisMessageStore.setValueSerializer(redisValueSerializerJackson);
        log.info("Using REDIS message store.");
        return redisMessageStore;
      }
    
    }

    In my spring boot unit test I autowired the RedisTemplate redisTemplate, and doing assertTrue(redisTemplate.getValueSerializer() instanceof GenericJackson2JsonRedisSerializer) returns false as this is still using JdkSerializationRedisSerializer

    Is there anything I am missing ?

    2 replies
    Gaurav Rawat
    @gauravbrills
    Hi I had a quick question , I am having a dynamic Integration flow that starts with a executor Channel and then routeToreceipients (additional downstream flows). This works great but sometimes when our cluster nodes go up and down we need to stop some of these flows .The stopping destroy of flows works but we have observed some objects are maybe in the executor pool queue and these get relayed to child flows on stopping /destroying the flow . Causing the error dispatcher has not subscriber. Is there a way to gracefully destroup stop Integration flows so that it processes all queued up messages . Or is there another strategy we can use maybe a direct channel but I think we might loose on parallel processing there .
    Attoumane
    @akuma8
    Hi, @artembilan thanks a lot for your reviews. I did changes on my PR about Redis Stream and I can’t still manage to make some tests passed. Please let me know when you will take a look. Thanks
    Arajit Samanta
    @arajitsamanta

    Team, Wanted to check if it's possible to have aggregator component's group timeout as dynamic? To give some background here is how my groups looks like

    Group-1
     Request1-1 : timeout 1sec
     Request1-2 : timeout 2sec
    Group-2
     Request1-1 : timeout 3sec
     Request1-2 : timeout 1sec
     Request1-3 : timeout 5sec

    Requirement is to aggregator group-timeout to be maximum timeout value for that group. In this example group1 would timeout in 2s, where as group 2 would be in 5sec

    Looking at the reference doc it seems aggregator timeout is configuration based and gets configured when app gets started.

    Is there anyway above requirement can be achieved ?

    Gary Russell
    @garyrussell
    The groupTimeout can be an expression evaluated against the group (after the new message is added). If you are using the DSL, you can use a simple function:
    .groupTimeout(group -> {
         // iterate over group.getMessages() to find the highest timeout
        return timeout;
    }
    Arajit Samanta
    @arajitsamanta
    @garyrussell Is it possible to do using AggregatorFactoryBean ? I am using this to configure my aggregator timeout with SpelExpression
    Gary Russell
    @garyrussell
    Yes setGroupTimeoutExpression(new FunctionExpression<MessageGroup, Long>(group -> {...}));
    Arajit Samanta
    @arajitsamanta
    @garyrussell Thank you, I will try it
    Arajit Samanta
    @arajitsamanta

    @garyrussell Coming back to this thread. Your suggested solution work. Thanks you for that. However I have noticed couple of other things

    • My function gets invoked twice for each group for no timeout scenarios. My group has 3 message and it gets invoked once with first message, then another time with first and second message.
    • For timeout it gets invoked 3 times. Same as no timeout scenarios plus another time for the last message when it arrives after timeout,

    Is that how it designed to work ? Reason why I am bit worried is that My dynamic timeout logic determination is bit complex. It involves going over a list, and a map on each item on a list.

    Also in-general my application success is heavily depend on the performance of Splitter-Aggregator, wondering if you guys published any performance best practices, like what not do. Any docs or link would be helpful.

    1 reply
    Jakub
    @jmayday

    Hi. I'm quite new to Spring Integration and I'm trying to test my integration flow (reading email-like structure from JMS and putting it in hazelcast cache - just a made up scenario).
    https://github.com/jmayday/messaging-activemq-si/blob/master/src/test/java/com/example/messagingactivemqsi/MessagingActivemqSiApplicationTests.java
    Could you please review this code? I know it might be too much to ask, because somehow I managed to write the test, but there are too many doubts.

    What I did:

    • Configured integration between JMS and Hazelcast (if you want to run the app it's described in README and docker-compose is there)
    • Written test checking if message in JMS invokes my ServiceActivator annotated handler
    • I didn't include HazelcastConfig in test (I don't want to instantiate Hazelcast instance as I'm mocking message handler responsible for putting data in Hazelcast anyway.
    • I had to mock fields from IntegrationFlowConfig constructor (connectionFactory and IMap Hazelcast's structure)
    • Is it OK that IntegrationFlowConfig has dependency to IMap? Is the scope of beans defined in this class fine or there are too many of them? There are message channels, endpoints, integration flows. I've read that proper shape of config class helps to later benefit in test by specifying only those config classes which are relevant (like I skipped HazelcastConfig)

    I also don't quite understand @SpringIntegrationTest's noAutoStartup. Should I point it to {"inboundChannelAdapter", "hazelcastCacheWritingMessageHandler"}) in my case? Both are enpoints right? For now I just skipped this annotation and using @SprintBootTest

    83 replies
    Thomas Schulz
    @thoschulz_gitlab
    Hi, does anyone have knowledge regarding spring-integration , jms and activemq ?
    2 replies
    I'm tackling the following problem: jms:inbound-gateway consumes a message vom ActiveMQ and when processing is done the session is committed and message is comsumed in the broker. However, if my spring boot application is getting shut down while processing the session is committed which basically means the message is consumed and lost. I tried many things (message adapter, transactional, transaction manager etc.) but the basic problem stays: why a commit and no rollback when the application is shut down.
    1 reply
    Filip Hrisafov
    @filiphr
    Hey everyone, first let me say thank you to the Spring team behind Spring Integration. We just used the Spring Mail Integration to add a nice extension for our own usage

    I have one question related to the ImapMailReceiver. We are using the default configuration with polling every 5 seconds and we are seeing:

    attempting to receive mail from folder [INBOX]

    every 5 seconds logged on info. Is there a reason why this is logged on info and not on debug? Are we doing something wrong in our configuration?

    4 replies
    Alex Drawbond
    @lxdraw
    Hello, does anyone know of any good examples published online showing how to convert a message with a reactive payload into a non-reactive type?
    4 replies
    Gaurav Rawat
    @gauravbrills
    Hi is there a way to put some predicate or condition on SqsMessageDrivenChannelAdapter to control its rate of message polling directly ,as sometimes lot of messages get piled up which our cluster cannot handle (hence there visibility timeout usually expires in between processing or they get reprocessed again ). We want to consume/throttle based on certain conditions like no of inflight messages etc. One way is to handle based on stopping and starting the adapter but wanted to know a more elegant approach possible directly by using SI.
    8 replies
    Chisom
    @flinchy
    I want to stream data from dynamodb/amazon s3 to elasticsearch can anyone direct me to a resource or resources that can enable me to carry out this task
    1 reply
    Gaurav Rawat
    @gauravbrills
    @artembilan for the previous thread was thinking of one approach . If I throttle back by having the pollable channel queue size of one it will in a way throttle the sqs message container and hence pausig new message consumption . I tried simulating this and it works but was thinking can we have a custom poller with the sqs adapter so the default poller doesnt gets stuck in case I have a caller runs policy down stream .
    Artem Bilan
    @artembilan
    The SQS channel adapter is an event-driven. It can’t come with any polling configuration. For on demand requests we need fully new component.
    Gaurav Rawat
    @gauravbrills
    yes true but isnt the output channel a poolable channel and if this channel has a small queue size wont it throttle back to the event driven sqs adapter when it tries to push messages to the output channel ?
    Artem Bilan
    @artembilan
    Well, you talk a little bit about different place of the flow. You said SQS pollable, which is not what you say with the pollable channel...
    The pollable channel does nothing by itself. It may have a queue size to throttle , that's true
    but polling mechanism is done in the PollingConsumer with its poller configuration
    And that one indeed could be configured with some custom PollerMetadata like maxMessagesPerPoll and polling interval
    If you worry about a default TaskScheduler which really initiate a polling task periodically according the trigger provided for the PollingConsumer, then you need to take a look into a:
        /**
         * Configure a {@link TaskScheduler} for those components which logic relies
         * on the scheduled tasks.
         * If not provided, falls back to the global {@code taskScheduler} bean
         * in the application context, provided by the Spring Integration infrastructure.
         * @param taskScheduler the {@link TaskScheduler} to use.
         * @since 5.1.3
         * @see #getTaskScheduler()
         */
        public void setTaskScheduler(TaskScheduler taskScheduler) {
    of that PollingConsumer
    Gaurav Rawat
    @gauravbrills
    Question ,will destroying a flow also destroy associated channels created in the flow ,specially executor channels that have an associated thread pool , will it also destroy the pool?
    Artem Bilan
    @artembilan
    @gauravbrills , no. The ExecutorChannel doesn't control the provided Executor. Therefore the pool of threads is not going to be destroyed when you destroy an ExecutorChannel.
    This is really not a bean responsibility to do something with externally injected objects into it.
    Gaurav Rawat
    @gauravbrills
    sure observed the same Thanks @artembilan .
    Gabriel Popovici
    @popovici.gabriel_gitlab
    hey folks ... wondering if there is any Spring Interation Git/Github adapter/component? very much appreciated ...:)
    i need to write a messaging flow which uploads to a Git repo/bitbucket
    thanks
    Artem Bilan
    @artembilan
    No, there is no Git channel adapters. You can use a respective API to call from the service activator. Contribution is welcome though!
    Tim Feuerbach
    @grubeninspekteur
    Hi! Are there any plans to provide an official integration of Apache Pulsar?
    Artem Bilan
    @artembilan
    No, we don't have such a plan. Contribution is welcome!
    Gaurav Rawat
    @gauravbrills
    @grubeninspekteur if its streaming try to take cues from the kinesis adapter , I did the same by extending MessageProducerSupport for a custom KCL based kinesis adapter by adapting it to SI aws kinesis adapter . Or you create abstract endpoints to init its client . We are thinking of trying it as an replacement for kinesis so if I do will put in the details
    3 replies
    Arajit Samanta
    @arajitsamanta
    When using Spring Integration Aggregator with SimpleMessageStore, I am always receiving the last message in Aggregator Post Processor. However when using external message store like REDIS it receives a List of messages. Is this something intended to happen ?
    1 reply
    Michael Olshansky
    @molshansky-zipwhip

    Hey gang. I'm tinkering with SI Kafka's listener container for the first time but what I'm seeing in the documentation here (https://docs.spring.io/spring-kafka/docs/current/reference/html/#java-dsl-configuration-2) doesn't compile:

    .configureListenerContainer(c ->
    c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
    .id("topic1ListenerContainer"))

    "c" resolves to type Object and AckMode is no longer part of AbstractMessageListenerContainer.

    What's the "correct" way to change the ackMode?

    I'm on kafka 2.3.1, si-kafka 3.2.1 (also tried 3.3.0), and spring kafka 2.3.3.

    Gary Russell
    @garyrussell
    c.getContainerProperties().setAckMode(...);
    We will fix the docs.
    Michael Olshansky
    @molshansky-zipwhip
    Thanks. Much appreciated.
    Andrew Terra
    @andrewterra

    I am attempting to make the switch from a splitter aggregator approach to a scatter-gather. The issue that I am having revolves around the headers that are populated within the split channels are not being aggregated into the gatherer's output channel. Whatever headers are created and added within the orchestration of the split messages are not included on the final aggregated message that is sent across the output-channel of the gatherer.

    The gather is given a reference to the aggregator class that I created. I created a method called aggregatedMessageWithHeaders which is meant to combine all of the payload and create a union of all of the headers from the split messages. The payload processing works fine because I can see the outbound message is a combination of all of the passed in payloads, but the headers are the same as the message that went across the input-channel instead of the union of all of the input messages of the aggregator.

    Below you can find the implementation of that method, paired down to only include how I meant to aggregate the union of the headers since that is where my problem lies

       public Message<?> aggregateMessagesWithHeaders(List<Message<?>> messages) throws AggregatorException {
          Map<String, Object> messageHeaders = new HashMap<>();
    
          for (Message<?> message : messages) {
             messageHeaders.putAll(message.getHeaders());
          }
    
          return new GenericMessage<>("<combinedPayload>Hi Gary and Artem</combinedPayload>", messageHeaders);
    
       }

    Do you guys know why the headers that are populated along the split channels are not being populated in the final output message of the gatherer?

    24 replies
    gopalsai
    @gopalsai

    Hey, are there any tutorials for Spring Integration out there in youtube/ websites? other than the spring documentation?

    thank you, any help is much appreciated.

    6 replies
    Andrew Terra
    @andrewterra
    So I figured out the scatter-gather stuff from yesterday, now I'm having an issue with the output channel of the scatter-gather being released before all of the split messages have been aggregated and sent to the output channel. I tried setting the replyTimeout of the MessagingGatewaySupport class to 60 seconds, and while that works sometimes it is not working 100% of the time. Is there another way to prevent that output channel from continuing down the orchestration?
    5 replies
    Andrew Terra
    @andrewterra

    Sorry for all of these questions, I really appreciate you taking the time to look these over. It looks like I'm still having issues with the scatter-gather though without setting the replyTimeout of the MessagingGatewaySupport.

    So I'm having success when I call setReplyTimeout(60000) to set the timeout of the MessagingGatewaySupport class, but once I remove that I am getting consistent NullPointerExceptions within my orchestration. Interestingly, in the debug logs I can see in the scenario when a NullPointerException is thrown I can see the requests going through the orchestration correctly, and can see the split messages making the requests they need to make, and the aggregator aggregating those messages, as well as the final response transformer transforming to the correct final response.

    The only difference between these runs is that I'm removing the setReplyTimeout(60000) method call. The one with that call I can successfully see the final response output to the client, but when I remove it I receive a null pointer exception, but can see the response if i look inside the logs.

    This is the setup I currently have for my scatter-gather.

    <int:splitter input-channel="vMultiThreaded_Splitter" output-channel="vMultiThreaded_scatterGather"
                     ref="vMultiThreadedSplitter" method="split" />
    
    <int:channel id="vMultiThreaded_scatterGather">
        <int:dispatcher task-executor="executor"/>
    </int:channel>
    <task:executor id="executor" pool-size="4"/>
    
    <int:scatter-gather input-channel="vMultiThreaded_scatterGather"
                           output-channel="responseChannel"
                           gather-channel="vMultiThreaded_aggregator" >
        <int:scatterer apply-sequence="true"
                         default-output-channel="vMultiThreaded_splitterRouter" />
        <int:gatherer ref="vMultiThreaded_Response_Aggregator"
                        method="aggregateMessages"
                        message-store="vMultiThreadedRemoveMessageFromStore"
                        expire-groups-upon-completion="true" />
    </int:scatter-gather>
    
    <!-- These make the outbound requests, and return their responses to the gather-channel (vMultiThreaded_aggregator)  -->
    <int:header-value-router input-channel="vMultiThreaded_splitterRouter" header-name="vMultiThreaded_splitterHeader">
        <int:mapping value="needRefund" channel="NeedRefund_RouterInput"/>
        <int:mapping value="needInfo" channel="NeedInfo_Chain_Input"/>
        <int:mapping value="needChange" channel="NeedChange_RouterInput"/>
        <int:mapping value="needAdditionalInfo" channel="needAdditionalInfo_RouterInput"/>
    </int:header-value-router>
    
    <bean id="vMultiThreadedRemoveMessageFromStore" class="org.springframework.integration.store.SimpleMessageStore" />
    
    <bean id="vMultiThreadedReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
        <property name="messageGroupStore" ref="vMultiThreadedRemoveMessageFromStore" />
        <property name="timeout" value="300000" />
    </bean>
    10 replies
    Alwyn Schoeman
    @alwyn

    Hi there,

    I am using a message gateway interface that returns a Mono. The flow uses a TCP.outboundGateway with a custom serializer/deserializer.

    My question is whether I should be using reactive constructs in my deserializer processing the inputStream? Currently I am doing readNBytes() on the inputStream. Blockhound seems to think this is ok.

    Andrew Terra
    @andrewterra

    Hey, I've been trying to debug this issue I've been having intermittently for the past couple of days. Sometimes, not always, I've been getting a runtime exception from within one of my XSLT transformers. The runtime exception is as follows

    failed to transform message; nested exception is java.lang.RuntimeException: Internal error evaluating template  at line 5 in module file:Request.xsl

    and at line 5 of that XSL file is this template

       <xsl:template match="/">

    So it looks like it's having issues matching to the base level of the message, except when I look into the logs, I can see the preSend across the channel that the xslt transformer is inside, as well as being able to see a message with a payload. That is what makes me think that I'm possibly doing something wrong within my context file, as opposed to something being wrong within the XSL file. It's happening on both request transformers, but not always
    Here is my context file below:

    <bean id="Request1Xslt" class="com.swacorp.reservationservices.xslt.XsltTemplate">
        <constructor-arg value="request1.xsl" />
    </bean>
    
    <bean id="Request2Xslt" class="com.swacorp.reservationservices.xslt.XsltTemplate">
        <constructor-arg value="request2.xsl" />
    </bean>
    
    <int:chain input-channel="gatewayInput" output-channel="response">
        <int:gateway request-channel="splitter" error-channel="errorChannel" />
    </int:chain>
    
    <int:splitter input-channel="splitter" output-channel="scatterGather"
                     ref="vMultiThreadedSplitter" method="split" />
    
    <int:channel id="scatterGather">
        <int:dispatcher task-executor="executor"/>
    </int:channel>
    <task:executor id="executor" pool-size="2"/>
    
    <int:scatter-gather input-channel="scatterGather"
                           output-channel="gathererResponse"
                           gather-channel="aggregator" >
        <int:scatterer apply-sequence="true"
                         default-output-channel="splitterRouter" />
        <int:gatherer ref="Response_Aggregator"
                        method="aggregateMessages"
                        message-store="messageStore"
                        expire-groups-upon-completion="true" />
    </int:scatter-gather>
    
    <int:header-value-router input-channel="splitterRouter" header-name="splitterHeader">
        <int:mapping value="request1" channel="request1-input"/>
        <int:mapping value="request2" channel="request2-input"/>
    </int:header-value-router>
    
    <int:chain input-channel="request1-input" output-channel="aggregator">
        <!-- error happens here -->
        <int-xml:xslt-transformer xsl-templates="Request1Xslt"/>
    
        <int:gateway request-channel="request1"  />
    </int:chain>
    
    <int:chain input-channel="request2-input" output-channel="aggregator">
        <!-- or the error happens here -->
        <int-xml:xslt-transformer xsl-templates="Request2Xslt"/>
    
        <int:gateway request-channel="request2"  />
    </int:chain>

    Has anyone else seen this problem before? Where do you think it could possibly come from, it's weird that its not happening every time.

    14 replies
    srikanth reddy beerelly
    @sri-beerelly

    Hi everyone, we are migrating our app from spring boot 1.3.8 to 2.2.9 and having issues with the spring integration which used to work perfectly fine with an older version. It's throwing an exception with BeanFactory must not be null message, when our code triggers the spring integration method from an interface with @MessagingGateway annotation. I have created a sample project using spring initialzr, then added a simple configuration class with spring integration beans and I can able to reproduce the issue. Please find the following code for reference. @artembilan Any thoughts?

    HelloController.java

    package com.springintegration.sample;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.springintegration.sample.SpringIntegrationConfig.TestGateway;
    import com.springintegration.sample.util.StringUtil;
    
    @RestController
    public class HelloController {
    
        @Autowired
        private ConfigurableApplicationContext applicationContext;
    
        @RequestMapping("/hello")
        public String index() {
    
            TestGateway testGateway = applicationContext.getBean(TestGateway.class);
            try {
                testGateway.processRequest("Spring Integration");
    
            } catch (Exception e) {
                System.err.println("Error during the hello endpoint call: "
                        + e.getMessage());
                System.err.println(StringUtil.stackTraceToString(e));
    
    
            }
            return "Greetings from Spring Boot!";
        }
    
    }

    SpringIntegrationConfig .java

    package com.springintegration.sample;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.Gateway;
    import org.springframework.integration.annotation.IntegrationComponentScan;
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.annotation.Transformer;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.handler.ServiceActivatingHandler;
    import org.springframework.integration.transformer.MethodInvokingTransformer;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    import com.springintegration.sample.service.TempService;
    
    @Configuration
    @EnableIntegration
    @IntegrationComponentScan
    public class SpringIntegrationConfig {
        @MessagingGateway
        public interface TestGateway {
            @Gateway(requestChannel="testInputChannel", replyChannel="testOutputChannel")
            String processRequest(String request);
        }
    
        @Bean
        public MessageChannel testInputChannel() {
            System.out.println("In testInputChannel");
            return new DirectChannel();
        }
    
        @Bean
        public MessageChannel testOutputChannel() {
            System.out.println("In testOutputChannel");
            return new DirectChannel();
        }
    
        @Transformer(inputChannel="testInputChannel")
        @Bean
        public MessageHandler validationTransform() {
            ServiceActivatingHandler activatingHandler = new ServiceActivatingHandler(new MethodInvokingTransformer(tempService(), "testMethod"));
            activatingHandler.setOutputChannel(testOutputChannel());
            return activatingHandler;
        }
    
        @Bean
        public TempService tempService(){
            return new TempService();
        }
    }

    TempService.java

    package com.springintegration.sample.service;
    
    public class TempService {
        public void testMethod(String s) {
            System.out.println("TempService --- "+ s );
        }
    
    }
    Gary Russell
    @garyrussell

    @sri-beerelly I am not sure why it worked before (but you are jumping over many interim versions) but the transformer needs to be a @Bean

        @Transformer(inputChannel = "testInputChannel")
        @Bean
        public MessageHandler validationTransform() {
            ServiceActivatingHandler activatingHandler = new ServiceActivatingHandler(
                    transformer());
            activatingHandler.setOutputChannel(testOutputChannel());
            return activatingHandler;
        }
    
        @Bean
        public MethodInvokingTransformer transformer() {
            return new MethodInvokingTransformer(tempService(), "testMethod");
        }

    Also, the testMethod must return a reply, otherwise the gateway will hang waiting for it; you have no timeout defined.