    Dave G
    @djgraff209

    Basically here's what I've got

    .enrich( e ->
        e.requestSubFlow(sf -> 
            sf.log("subflow-before")
                .handle(
                //Get info from each VM
                // Http.outboundGateway(vCenterConfigProperties.getVm().getGetVM())
                //     .httpMethod(HttpMethod.GET)
                //     .uriVariable("vm", "headers.vm")
                //     .mappedRequestHeaders("vmware-api-session-id")
                //     .expectedResponseType(String.class)
                WebFlux.outboundGateway(vCenterConfigProperties.getVm().getGetVM(), vcenterWebClient)
                        .httpMethod(HttpMethod.GET)
                        .uriVariable("vm", "headers.vm")
                        .expectedResponseType(String.class)
            )
            .logAndReply("subflow-after")
        )
        .propertyExpression("name", "#jsonPath(payload, '$.value.name')")
        .propertyExpression("guest_OS", "#jsonPath(payload, '$.value.guest_OS')")
        .propertyExpression("bios_uuid", "#jsonPath(payload, '$.value.identity.bios_uuid')")
        .propertyExpression("mac_addresses", "#jsonPath(payload, '$.value.nics[*].value.mac_address')")
        .notPropagatedHeaders(
            "vm", "server", "transfer-encoding", "contentType", "http_statusCode"
        )
    )

    For some reason the subflow's outbound gateway call never replies, and it stalls the entire flow. Am I missing something here?

    Ultimately the program stalls out and errors with a message like this
    2021-08-27 17:11:59.343 WARN 52973 --- [ctor-http-nio-2] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has exited due to a timeout: GenericMessage ... data omitted ...
    Dave G
    @djgraff209

    I think I solved this but I'm not 100% sure how/why this is working. The overall flow (which I had not posted) looks like the following

    return f->f.handle(
                    WebFlux.outboundGateway(vSphereConfigurationProperties.getVm().getListVm(), vsphereWebClient)
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(String.class)
                )
                .channel(MessageChannels.flux())
                .transform("#jsonPath(payload, '$.value[*].vm')")
                .split()
                .enrichHeaders(eh -> eh.headerExpression("vm", "payload"))
                .handle((p,h) -> new HashMap<String, Object>())
                .enrich( e ->
                    e.requestSubFlow(
                        sf ->
                            sf.handle(
                                WebFlux.outboundGateway(vSphereConfigurationProperties.getVm().getVmDetails(), vsphereWebClient)
                                        .httpMethod(HttpMethod.GET)
                                        .uriVariable("vm", "headers.vm")
                                        .mappedResponseHeaders()
                                        .expectedResponseType(String.class)
                            )
                            .bridge()
                    )
                    .propertyExpression("name", "#jsonPath(payload, '$.value.name')")
                    .propertyExpression("guest_OS", "#jsonPath(payload, '$.value.guest_OS')")
                    .propertyExpression("bios_uuid", "#jsonPath(payload, '$.value.identity.bios_uuid')")
                    .propertyExpression("mac_addresses", "#jsonPath(payload, '$.value.nics[*].value.mac_address')")
                )
                .transform(Transformers.toJson(ResultType.STRING))
                .aggregate()
                .logAndReply();

    I welcome any explanation on this. The biggest changes I made were adding the channel(MessageChannels.flux()) after the first outboundGateway and adding the .bridge() on the requestSubFlow.
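
For readers puzzling over the WARN line quoted earlier ("Reply message received but the receiving thread has exited due to a timeout"), here is a minimal plain-Java sketch of that general situation: a caller blocks for a bounded time while the reply is produced on another thread and arrives too late. It uses only java.util.concurrent and no Spring classes. The class name, the method `requestReply`, and the delays are hypothetical, and the sketch does not diagnose why this particular flow stalled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ReplyTimeoutSketch {

    // Hypothetical helper: the reply is produced on another thread after replyDelayMs,
    // while the caller waits at most timeoutMs. Returns null if the caller gave up first.
    static String requestReply(long replyDelayMs, long timeoutMs) {
        CompletableFuture<String> reply = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(replyDelayMs); // stands in for a slow asynchronous call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "reply";
        });
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The reply still completes later, but nobody is waiting for it any more.
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(requestReply(10, 2000)); // reply arrives within the wait
        System.out.println(requestReply(500, 50));  // caller times out before the reply
    }
}
```

The second call is the analogue of the warning: the work finishes, but the thread that asked for the result has already stopped waiting.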

    25 replies
    Erik Haqvinsson
    @erikhaq

    Hello.

    I have an async tcp client configured like so:

    ...
    @Bean
    fun connectionFactory(): TcpClientConnectionFactorySpec =
        Tcp.netClient(properties.baseUrl, properties.port)
            .serializer(serializer)
            .deserializer(serializer)
            .leaveOpen(true)
    
    @Bean
    fun tcpOut() = integrationFlow(channels.outboundChannel()) {
        transform(Transformers.toJson(ResultType.BYTES))
        handle(Tcp.outboundAdapter(connectionFactory()))
    }
    
    @Bean
    fun tcpIn() = integrationFlow(Tcp.inboundAdapter(connectionFactory())) {
        enrichHeaders {
            headerFunction<Message<*>>(JsonHeaders.RESOLVABLE_TYPE, ::resolveTypeHeader)
        }
        transform(Transformers.fromJson())
        handle(handlerService)
    }
    ...

    It works well, but I can't figure out how to dynamically handle multiple simultaneous connections.

    First, how do you initiate a new connection when sending a message through the outbound channel?
    I'm using a DirectChannel.

    Second, is there a way to send data on a specific connection?
    I'm thinking using the ip_connectionId header somehow.

    I have looked into CachingClientConnectionFactory but it seems to just cycle the connections used?

    Basically I want to keep a set of connections where I send different data depending on the contents of the payload.

    8 replies
    Alwyn Schoeman
    @alwyn

    I am looking at the tcp-client-server-multiplex sample and the default behaviour in the sample is to either throw an exception from the discard channel handler or to return a MessageTimeoutException.

    Both of these result in my original connection being closed. I can understand the reasoning, since you don't know what state the connection is in.

    In my setup though, I can have many messages already sent to the server that rely on that connection to get back. So I would prefer the connection to not be closed.

    When I return a value from the discard channel handler I get a response from the gateway, but on the server side I still get the following error:

    org.springframework.messaging.MessageHandlingException: Unable to find outbound socket in the [bean 'serverFlow.ip:tcp-outbound-channel-adapter#0'

    Note. I am not using the example as is, I rewrote it in Kotlin DSL since that is what I use in my target application. I can share the code if necessary.

    51 replies
    Dave G
    @djgraff209
    I think I have a fundamental misunderstanding on how to use WebClient (via WebFlux) in an integration flow. If I replace something like Http.outboundGateway() calls using WebFlux.outboundGateway() I would be under the impression that it would work identically. What I'm seeing is otherwise. The processing of the flow seems to go off asynchronously and things "downstream" are stuck waiting for something to come back - Ultimately I get WARN messages to the effect that nothing is receiving the reply message and that the thread has timed out. There is not enough reference implementation out there that shows how to do this. At present, the effort I'm undertaking I think is going to have to shift back to using RestTemplate.
    7 replies
    kswat
    @kswat
    Hi, can I get help on how to send messages to gateway interface to test
        @Bean
        public MessageChannel helloRequestChannel(){
            return MessageChannels.direct().get();
        }
        @Bean
        public IntegrationFlow sayHelloFlow(){
            return IntegrationFlows.from("helloRequestChannel")
                    .transform((String s) -> s.toUpperCase())
                    .get();
        }
        @MessagingGateway
        public interface EchoGateway {
            @Gateway(requestChannel = "helloRequestChannel")
            String echo(String message);
        }
    3 replies
    kswat
    @kswat
    Hi, isn't log() like a peek? How can I continue my flow after it?
        return IntegrationFlows.from("helloRequestChannel")
                .transform((String s) -> s.toUpperCase())
                .log()
                .get();
    6 replies
    Jon Das
    @jondas_gitlab

    I can't get this to work:

        @Bean
        public IntegrationFlow inbound() {
            return IntegrationFlows.from(http())
                    .handle(message -> System.out.println(message))
                    .get();
        }
    
        private HttpRequestHandlerEndpointSpec http() {
            return Http.inboundGateway("/foo")
                    .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
                    .requestPayloadType(String.class);
        }

    When I post to /foo, the message is received by handle() but the response is http 500 internal server error.

    9 replies
    Dave G
    @djgraff209
    I have produced a small project to demonstrate the issue I've been having with WebFlux & Spring-Integration. @artembilan @garyrussell - Guys I apologize for the delay I was over thinking the problem and finally got the example put together. Any thoughts or suggestions are greatly welcome
    57 replies
    Aleksandar Vidakovic
    @aleksvidak

    I have a sample configuration like:

    @Configuration
    @IntegrationComponentScan
    public class FtpIntegrationConfig {
    
      @Bean
      public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("host");
        sf.setPort(21);
        sf.setUsername("username");
        sf.setPassword("password");
        return new CachingSessionFactory<>(sf);
      }
    
      @Bean
      public IntegrationFlow ftpOutboundFlow() {
        return IntegrationFlows.from("toFtpChannel")
            .handle(
                Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
                    .useTemporaryFileName(false)
                    .fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
                    .remoteDirectoryExpression("headers['" + FileHeaders.REMOTE_DIRECTORY + "']"))
            .get();
      }
    
      @MessagingGateway
      public interface MyGateway {
    
        @Gateway(requestChannel = "toFtpChannel")
        Mono<Void> sendToFtp(
            byte[] content,
            @Header(FileHeaders.REMOTE_DIRECTORY) String remoteDirectory,
            @Header(FileHeaders.FILENAME) String fileName);
      }
    }

    I want to use the sendToFtp in a reactive chain, something like:

    dataService
            .fetchFile()
            .flatMap(file -> myGateway.sendToFtp(file, "", "random-file-name.xml"))
            .doOnSuccess(
                (x) -> {
                  log.info("Upload success");
                })
            .subscribe()

    ...but doOnSuccess never executes (even though the file is indeed sent to the FTP server). Can someone point me in the right direction? I'm not sure what I'm doing wrong. I am using spring-boot-starter-integration:2.5.4 and spring-integration-ftp:5.5.3.

    14 replies
    Alwyn Schoeman
    @alwyn
    Hi everyone, When adding custom metrics for say a MessageStore, is there any benefit in using the MetricsCaptor abstraction over directly using the MeterRegistry?
    2 replies
    Jaime Andrés Boyacá Silva
    @jandresboyaca
    Hi everyone, I have a question about message groups. Is a group created when the first message arrives?
    In other words, the group timeout starts counting only after the first message arrives, so it cannot handle a timeout measured from the start of the flow, right?
    18 replies
    neelagiri003
    @neelagiri003
    Hi, can anyone please explain with an example how the invoke() in RemoteFileTemplate works?
    47 replies
    MithilaKrishna
    @MithilaKrishna
    Hi, can anyone provide an example of the get method of SftpRemoteFileTemplate being used to retrieve a file from a remote directory?
    3 replies
    Jaime Andrés Boyacá Silva
    @jandresboyaca
    Hi Everyone, Can anyone help me with this splitter?
    8 replies
    gopalsai
    @gopalsai
    image.png

    Hi Everyone,
    can someone please help me with this?

    I am trying to upgrade Spring Boot from the 1.2.4 release to the 1.5.22 release. We are using Spring Integration, so I upgraded it from 4.1.x to 4.3.21.

    I am getting this error when running bootRun (screenshot attached),
    and I am using these annotations in my main.java:

    5 replies
    image.png
    gopalsai
    @gopalsai
    image.png
    image.png
    5 replies
    Dave G
    @djgraff209

    Running into a vexing problem with Spring Integration under Spring Boot 2.5.6. I'm using WebClient to retrieve a REST payload that is somehow exceeding a max buffer size of 262144. I have configured the client using a CodecCustomizer:

    @Bean
    public CodecCustomizer webClientCodecCustomizer() {
        return (configurer) -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024);
    }

    In my integration flow I'm using WebFlux.outboundGateway(uri, webClient).....

    I am at a loss as to why it is still sticking with the default setting.

    50 replies
    Felipe Rotilho
    @rotilho

    Hello! I'm quite new to Spring Integration and I'm not sure whether I'm misusing it or just missing something.

    I have an application that has inbound and outbound flux message channels. The messages from the inbound channel are currently consumed by different services and have multiple types (e.g. Message<HandshakeChallenge> and Message<HandshakeAnswer>).

    It seems @ServiceActivator tries to deliver all those messages even if the method signature doesn't match, so in the end I get exceptions like this one: EL1004E: Method call: Method process(org.project.HandshakeChallenge) cannot be found on type com.project.PeerManager

    Does anyone have an idea how I can consume events while respecting the method signature? Something similar to @EventListener from Spring Core.

    17 replies
    Mario
    @mmaryo
    Hello
    I'm looking for a scheduler and orchestrator for jobs/lambdas that are written in other languages.
    I see Spring Integration and Spring Cloud Data Flow, but I'm not sure I understand the difference.
    What I need is a system that
    • writes a message to a queue periodically. The task configs are stored in a database and can be changed dynamically. For example, Task A with the cron 0 0 0/6 1/1 ?. I just need Spring to write a message to a queue.
    • is able to schedule Task B (by sending a message to a queue) when Task A finishes successfully
    • if possible, has an admin dashboard to follow what happens
      Is Spring the right tool for that?
    6 replies
    Ghost
    @ghost~5f25c515d73408ce4feb1da3
    Hello. Can I have an example of Spring Integration with WebFlux and Kafka?
    26 replies
    Attoumane
    @akuma8
    Hi there,
    A question:
    Why is the spring-integration-zip artifact not included in the spring-integration-bom? I just added the dependency to a project and was astonished to see that I have to manage its version myself.
    7 replies
    Attoumane
    @akuma8
    Hi @artembilan,
    Is there a simple way to tell the FileReadingMessageSource not to read a file when it is not ready? For example, when another process is still writing to this file?
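
One common heuristic for "not ready yet" is to skip files that were modified very recently. Spring Integration's file module ships a LastModifiedFileListFilter built on that idea. The plain-Java sketch below only illustrates the check itself. The class name, method names, and the 5-second threshold are hypothetical, and a quiet period is a heuristic, not a guarantee that the writer has finished.

```java
import java.io.File;

class SettledFileCheck {

    // Pure check: the file counts as settled if its last modification is at least minAgeMillis old.
    static boolean isSettled(long lastModifiedMillis, long nowMillis, long minAgeMillis) {
        // File.lastModified() returns 0 when the file does not exist, so treat 0 as "not settled".
        return lastModifiedMillis > 0 && nowMillis - lastModifiedMillis >= minAgeMillis;
    }

    // Convenience overload for a real file, using the current system clock.
    static boolean isSettled(File file, long minAgeMillis) {
        return file.isFile()
                && isSettled(file.lastModified(), System.currentTimeMillis(), minAgeMillis);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isSettled(now - 10_000, now, 5_000)); // modified 10 s ago: settled
        System.out.println(isSettled(now - 1_000, now, 5_000));  // modified 1 s ago: still fresh
    }
}
```

A predicate like this could be passed to the same filterFunction(...) hook used on Files.inboundAdapter elsewhere in this room, for example file -> SettledFileCheck.isSettled(file, 5_000).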
    8 replies
    Attoumane
    @akuma8

    Hi integrators :),
    A question about this configuration:

        @Bean
        public IntegrationFlow readingZipFilesFlow(@Value("${resource.local.unzipped-dir}") String dirName) {
            UnZipTransformer unZipTransformer = new UnZipTransformer();
            unZipTransformer.setExpectSingleResult(false);
            unZipTransformer.setZipResultType(ZipResultType.FILE);
            unZipTransformer.setDeleteFiles(true);
            unZipTransformer.setWorkDirectory(uncompressedDir);
    
            return IntegrationFlows
                    .from(Files.inboundAdapter(this.fileService.getZipDirDestination().toFile())
                                    .filterFunction(file -> !file.isDirectory() && this.isZipFile(file))
                                    .preventDuplicates(false),
                            endpointConfigurer -> endpointConfigurer.poller(Pollers
                                    .fixedDelay(500)
                                    .maxMessagesPerPoll(1L)))
                    .transform(unZipTransformer)
                    .handle(this.fileService, "uploadFiles") // method uploadFiles is annotated with @Async
                    .get();
        }

    I would like to know how that flow could be run by multiple threads.
    I can potentially have 30k zip files to unzip and upload to a remote server, so I would like to involve multiple threads, each one executing its own readingZipFilesFlow.
    For testing purposes I tried this configuration:

    spring:
      task:
        scheduling:
          pool:
            size: 5 # I would like `readingZipFilesFlow` to be executed 5 times by 5 different threads
        execution:
          pool:
            core-size: 15 # this configuration is for @Async methods
            max-size: 30

    I would like readingZipFilesFlow to be executed by 5 different threads, meaning that having 5 threads polling the same directory at the same time (with each one reading only 1 file).
    Is it possible?
    Thank you
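
As a general illustration (plain Java, no Spring classes), the usual alternative to several pollers racing over one directory is a single producer that hands each file to a pool of workers. In Spring Integration a comparable hand-off is commonly done with an executor channel such as MessageChannels.executor(...) placed after the poller. The sketch below is an assumption-laden toy: the class name, `processAll`, and the file names are hypothetical, and the "work" is just recording which thread handled each name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WorkerPoolSketch {

    // Hands every file name to a fixed pool and returns which worker thread handled it.
    static Map<String, String> processAll(List<String> fileNames, int workers) {
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (String name : fileNames) {
            // Each file is submitted exactly once, so no two workers pick up the same file.
            pool.execute(() -> {
                handledBy.put(name, Thread.currentThread().getName());
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handledBy;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a.zip", "b.zip", "c.zip", "d.zip", "e.zip");
        processAll(files, 5).forEach((file, thread) -> System.out.println(file + " -> " + thread));
    }
}
```

Because only one component lists the directory, the workers never compete for the same file, which is the main risk when five pollers watch the same folder.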

    18 replies
    David Lawlor
    @David-Lawlor
    Hi, is it not recommended to use .get() when creating gateways? I ran into an issue of calling .get(), which causes the uriVariable() that I had added when creating the gateway to be lost
    12 replies
    Dave G
    @djgraff209

    I'm trying to reconcile a syntax thing with the way that Spring Integration flows are constructed
    using the Java DSL.

    In many cases I saw earlier on the typical pattern was:

    @Bean
    public IntegrationFlow someFlow() {
        return IntegrationFlows.from("input")
                                .filter("world"::equals)
                                .transform("Hello "::concat)
                                .handle(System.out::println)
                                .get();
    }

    In other cases I've seen, more or less the above but using a Lambda syntax I haven't fully understood but
    have been using extensively.

    @Bean
    public IntegrationFlow someFlow() {
        return f -> f.filter("world"::equals)
                        .transform("Hello "::concat)
                        .handle(System.out::println);
    }

    More or less I understand the two syntaxes, and understand that in both cases there will be a channel
    created with a name like someFlow.input.

    What I cannot reconcile is when I have to do something with an inbound adapter. Using the lambda syntax,
    how would that be "connected" to the flow.

    Most examples of using things like the Udp.inboundAdapter have to use the first style with an
    IntegrationFlows.from() and targeting someFlow by adding a .channel("someFlow.input").

    Thanks in advance!

    5 replies
    dgutierrez-stratio
    @dgutierrez-stratio

    Hi everyone, I've noticed a weird behaviour on the aggregator that I'm using. From time to time, some messages get stuck in INT_MESSAGE and INT_GROUP_TO_MESSAGE tables, never to be released. I've tested different configurations on my aggregator, which currently looks like this

      private IntegrationFlow aggregatedEventHandlerFlow(String sourceChannel, GenericHandler<?> handler,
          MessageGroupStore messageGroupStore, PlatformTransactionManager txManager, long timeout, int messageCount) {
        return IntegrationFlows.from(sourceChannel)
            .aggregate(a -> a.correlationStrategy(message -> findCorrelationStrategy(message.getPayload()))
                .messageStore(messageGroupStore)
                .releaseStrategy(new MessageCountReleaseStrategy(messageCount))
                .groupTimeout(timeout)
                .sendPartialResultOnExpiry(true)
                .expireGroupsUponTimeout(true)
                .expireGroupsUponCompletion(true)
                .forceReleaseAdvice(transactionInterceptor(txManager)))
            .handle(handler, GenericEndpointSpec::transactional)
            .routeToRecipients(new ErrorPayloadRouterSpecConsumer(Seq(), ERROR_PAYLOAD_CHANNEL_NAME,
                IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME))
            .get();
      }

    I don't know what I'm missing... do I need to specify a MessageGroupStoreReaper configuration?

    28 replies
    Attoumane
    @akuma8

    Hi integrators :smile: ,
    I have a strange behavior when using the UnZipTransformer class.
    I keep getting this Not a zip file exception:

    Caused by: org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.lang.IllegalStateException: Not a zip file: C:\Test\test.zip
        at org.springframework.integration.zip.transformer.UnZipTransformer.doZipTransform(UnZipTransformer.java:196) ~[spring-integration-zip-2.0.0.jar:na]
        at org.springframework.integration.zip.transformer.AbstractZipTransformer.doTransform(AbstractZipTransformer.java:105) ~[spring-integration-zip-2.0.0.jar:na]
        at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33) ~[spring-integration-core-5.5.8.jar:5.5.8]
        ... 22 common frames omitted
    Caused by: java.lang.IllegalStateException: Not a zip file: C:\Test\test.zip
        at org.springframework.integration.zip.transformer.UnZipTransformer.doZipTransform(UnZipTransformer.java:85) ~[spring-integration-zip-2.0.0.jar:na]
        ... 24 common frames omitted

    Whereas the file is a real zip.
    I can inspect the content of that test.zip file with any zip explorer like the zipinfo command.
    What could cause that error?
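
One thing worth ruling out is that the bytes seen at read time differ from what zipinfo sees later, for instance if the file is picked up before it is completely written. The plain-Java sketch below checks the leading signature bytes: a regular archive starts with the local file header signature PK\3\4, and an empty archive starts with the end-of-central-directory signature PK\5\6. The class and method names are hypothetical, the check is not what UnZipTransformer does internally, and unusual archives (self-extracting or spanned ones) may start differently.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class ZipSignatureCheck {

    // True if the first four bytes look like a ZIP local file header or an empty archive.
    static boolean hasZipSignature(byte[] header) {
        if (header == null || header.length < 4) {
            return false;
        }
        boolean pk = header[0] == 0x50 && header[1] == 0x4B;             // "PK"
        boolean localFileHeader = header[2] == 0x03 && header[3] == 0x04;
        boolean emptyArchive = header[2] == 0x05 && header[3] == 0x06;
        return pk && (localFileHeader || emptyArchive);
    }

    // Reads up to four bytes from the file and applies the signature check.
    static boolean hasZipSignature(Path file) throws IOException {
        byte[] header = new byte[4];
        int total = 0;
        try (InputStream in = Files.newInputStream(file)) {
            while (total < header.length) {
                int n = in.read(header, total, header.length - total);
                if (n < 0) {
                    break; // file is shorter than four bytes
                }
                total += n;
            }
        }
        return total == header.length && hasZipSignature(header);
    }

    public static void main(String[] args) throws IOException {
        // Pass a path on the command line to check a real file.
        if (args.length > 0) {
            System.out.println(hasZipSignature(Paths.get(args[0])));
        }
    }
}
```

Logging the result of such a check, together with the file size, at the moment the flow receives the file can show whether the transformer is looking at a complete archive.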

    30 replies
    Ayarroumi
    @Ayarroumi

    Hi Integrators,

    Is there any news on when Spring Integration 6.0.0 will be released, and how long this will be supported for both OSS support and Commercial support?

    1 reply
    Lazar Nevyanov
    @nevyanov_twitter

    Hello all :)

    Does anybody know if there is appropriate way to unit test specific IntegrationFlow? Are there any examples I can use?

    6 replies
    Dominic Stew
    @migroskub
    Hey, I want to understand a little more about a question that has been asked lately on SO. I wonder whether .gateway() would be appropriate in this case or not, and why?
    47 replies
    Dominic Stew
    @migroskub
    image.png

    This is what happens to the IDE when I try to follow the suggestions. I managed to fix this by doing

    .publishSubscribeChannel(s -> s
                    .subscribe(f -> f
                        .handle(bulkWriteToCockroach()))
                    .subscribe(f -> f
                        .handle(bulkWriteToPulsar()))
                    .get().setIgnoreFailures(false));

    That's because setIgnoreFailures doesn't return this. Anyway, that is pretty confusing. I think a docs addition for this kind of thing would be helpful. But the most problematic part from my point of view is that a failure in the first subscription didn't fail the next subscription.

    2 replies
    Really stuck
    5 replies
    In this part
    Kindly help
    Attoumane
    @akuma8

    Hi integrators,
    I have a strange behavior with @JmsListener and DefaultJmsListenerContainerFactory
    I have 2 methods in 2 different classes annotated with @JmsListener, something like:

    @Component
    public class FirstBusAdapter {
        @JmsListener(
                destination = UPDATE_DESTINATION_NAME,
                subscription = SUBSCRIPTION_NAME,
                containerFactory = "customJmsListenerContainerFactory")
        public void update(@Payload Json json, @Headers Map<String, Object> headers) {
    
        }
    }
    
    @Component
    public class SecondBusAdapter {
        @JmsListener(
                destination = CREATE_DESTINATION_NAME,
                subscription = SUBSCRIPTION_NAME,
                containerFactory = "customJmsListenerContainerFactory")
        public void create(@Payload Product product, @Headers Map<String, Object> headers) {
    
        }
    }

    The customJmsListenerContainerFactory is defined in a configuration class:

    @Configuration
    public class ServiceBusConfiguration {
    
        @Bean
        public DefaultJmsListenerContainerFactory customJmsListenerContainerFactory(ConnectionFactory connectionFactory, CustomMessageConverter messageConverter, CustomErrorHandler errorHandler) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setErrorHandler(errorHandler);
            factory.setMessageConverter(messageConverter);
            factory.setConnectionFactory(connectionFactory);
            factory.setSubscriptionDurable(Boolean.TRUE);
            return factory;
        }
    }

    And CustomMessageConverter is defined like:

    @Component
    public class CustomMessageConverter extends SimpleMessageConverter {
         ....
    }

    As we can see I use the same customJmsListenerContainerFactory in both @JmsListener but in one case the registered CustomMessageConverter is invoked and not in the other case.
    When a message arrives in the UPDATE_DESTINATION_NAME destination, the message converter is invoked but not in the CREATE_DESTINATION_NAME.
    Is it a normal behavior?
    I tried defining another DefaultJmsListenerContainerFactory bean with another SimpleMessageConverter and everything works. It is like we can't share the same MessageConverter.

    Alwyn Schoeman
    @alwyn

    I have the following Kotlin DSL in my configuration:

        @Bean
        fun jmsOutboundFlow(jmsConnectionFactory: ConnectionFactory) =
            integrationFlow {
                handle(
                    Jms.outboundAdapter(jmsConnectionFactory)
                        .apply {
                            destination(properties.mq.outgoingQueue)
                            channel(mqRequestChannel())
                        }
                )
            }

    mqRequestChannel() just returns a DirectChannel.

    This code compiled fine using Spring Boot 2.5.8 which depends on Spring Integration Core 5.5.7.

    Going to Spring Boot 2.5.9 and Spring Integration Core 5.5.8, I am getting an error on the channel(mqRequestChannel()) call stating: 'fun channel(messageChannel: MessageChannel): Unit' can't be called in this context by implicit receiver. Use the explicit one if necessary

    I'm still tracking down the changes to try and figure out what the cause is, but was hoping maybe this is a known issue?

    35 replies
    Attoumane
    @akuma8
    Hi integrators,
    I have a dumb question with this integration flow (see comments on the flow):
    @Bean
        public IntegrationFlow readingZipFilesFlow(@Value("${resource.local.unzipped-dir}") String dirName) {
            UnZipTransformer unZipTransformer = new UnZipTransformer();
            unZipTransformer.setExpectSingleResult(false);
            unZipTransformer.setZipResultType(ZipResultType.FILE);
            unZipTransformer.setDeleteFiles(true);
            unZipTransformer.setWorkDirectory(uncompressedDir);
    
            return IntegrationFlows
                    .from(Files.inboundAdapter(this.fileService.getZipDirDestination().toFile())
                                    .filterFunction(file -> !file.isDirectory() && this.isZipFile(file))
                                    .preventDuplicates(false),
                            endpointConfigurer -> endpointConfigurer.poller(Pollers
                                    .fixedDelay(500)
                                    .maxMessagesPerPoll(1L)))
                    .transform(unZipTransformer)
                    .transform(..... )//How to get the Message here? not the payload
                    .route(....) // Same thing here
                    .handle(this.fileService, "uploadFiles") 
                    .get();
        }
    8 replies
    sfgvieira
    @sfgvieira

    Hi all, I am doing a small poc with rsocket: on my server I set up a method with @ConnectMapping("poc-services") where I read the setup payload and store the requester in a map, and on my client I created a bean as

    @Bean
      public RSocketRequester getRSocketRequester() {
        return RSocketRequester.builder()
            .rsocketConnector(connector -> {
              connector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)));
              connector.payloadDecoder(PayloadDecoder.ZERO_COPY);
            })
            .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
            .setupRoute("poc-services")
            .setupData("test")
            .tcp("localhost", 7000);
      }

    When I run my client I expected it to connect to the server and the method annotated with @ConnectMapping to be triggered, but that is not the case. I then noticed that the RSocketRequester.builder() also provides an alternative as connectTcp(String host, int port) which works as I expected however it is deprecated. So I would like to ask what is the proper way of doing my setup :)

    4 replies
    dgutierrez-stratio
    @dgutierrez-stratio
    Hi everyone. I am writing a job orchestrator, and one of the orchestrated tasks must be able to listen on a specific channel (Kafka topic) until a message is received. Messaging is written using Spring Integration, and I should be able to dynamically listen on new channels (backed by Kafka in this particular case), so that when the task is executed a new flow is instantiated (I can do that with IntegrationFlowContext), and a new channel, chosen by the user, should also be created. Is there any tool similar to IntegrationFlowContext for dynamic channel creation? Thanks in advance!
    16 replies
    Samphea
    @SOMPHEAR1
    Hello everyone, How to handle exceptions when a session disconnect in the FTP server with spring integration?
    9 replies
    ankitJavaMadeSoEasy
    @ankitJavaMadeSoEasy
    After looking at the logs we found that the exception occurred in
    "UPDATE INT_MESSAGE_GROUP set UPDATED_DATE=? where GROUP_KEY=? and REGION=?"
    But the error code thrown was '0', so the Spring Integration class couldn't catch the proper exception; the exception was propagated to the top of the hierarchy and was caught as the superclass Exception.
    So we suspect that the issue was either in the Spring Integration flow, which wasn't able to catch the uncategorized error, or in the Postgres DB, which threw no error code.
    I would highly appreciate some help here. Please tell me if anything else is required from my side.
    I am attaching the detailed log captured in our code.
    1 reply
    ankitJavaMadeSoEasy
    @ankitJavaMadeSoEasy
    LOGS :