Basically here's what I've got
.enrich(e ->
    e.requestSubFlow(sf ->
            sf.log("subflow-before")
                .handle(
                    // Get info from each VM
                    // Http.outboundGateway(vCenterConfigProperties.getVm().getGetVM())
                    //     .httpMethod(HttpMethod.GET)
                    //     .uriVariable("vm", "headers.vm")
                    //     .mappedRequestHeaders("vmware-api-session-id")
                    //     .expectedResponseType(String.class)
                    WebFlux.outboundGateway(vCenterConfigProperties.getVm().getGetVM(), vcenterWebClient)
                        .httpMethod(HttpMethod.GET)
                        .uriVariable("vm", "headers.vm")
                        .expectedResponseType(String.class)
                )
                .logAndReply("subflow-after")
        )
        .propertyExpression("name", "#jsonPath(payload, '$.value.name')")
        .propertyExpression("guest_OS", "#jsonPath(payload, '$.value.guest_OS')")
        .propertyExpression("bios_uuid", "#jsonPath(payload, '$.value.identity.bios_uuid')")
        .propertyExpression("mac_addresses", "#jsonPath(payload, '$.value.nics[*].value.mac_address')")
        .notPropagatedHeaders(
            "vm", "server", "transfer-encoding", "contentType", "http_statusCode"
        )
)
For some reason - the Subflow/outboundgateway call never replies --- it stalls out the entire processing - am I missing something here?
2021-08-27 17:11:59.343 WARN 52973 --- [ctor-http-nio-2] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has exited due to a timeout: GenericMessage ... data omitted ...
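The warning text describes a generic race: a caller blocks for a reply with a timeout, gives up, and the reply arrives afterwards. Here is a minimal plain-Java sketch of that situation. It is not Spring Integration code, the class and method names are hypothetical, and it makes no claim about the root cause in the flow above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class LateReplyDemo {

    // Hypothetical stand-in for a caller that blocks on a reply produced on another thread.
    public static String requestReply(long replyDelayMillis, long timeoutMillis) {
        CompletableFuture<String> reply = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(replyDelayMillis); // simulate a slow asynchronous call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "reply";
        });
        try {
            // The caller waits at most timeoutMillis for the reply.
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The reply may still complete later, but nobody is waiting for it any more.
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(requestReply(10, 2000)); // reply arrives well within the timeout
        System.out.println(requestReply(500, 50));  // caller gives up before the reply arrives
    }
}
```

In the second call the reply is still produced, just after the waiting thread has left, which is the situation the log line reports.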
I think I solved this but I'm not 100% sure how/why this is working. The overall flow (which I had not posted) looks like the following
return f -> f
        .handle(
            WebFlux.outboundGateway(vSphereConfigurationProperties.getVm().getListVm(), vsphereWebClient)
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class)
        )
        .channel(MessageChannels.flux())
        .transform("#jsonPath(payload, '$.value[*].vm')")
        .split()
        .enrichHeaders(eh -> eh.headerExpression("vm", "payload"))
        .handle((p, h) -> new HashMap<String, Object>())
        .enrich(e ->
            e.requestSubFlow(sf ->
                    sf.handle(
                            WebFlux.outboundGateway(vSphereConfigurationProperties.getVm().getVmDetails(), vsphereWebClient)
                                .httpMethod(HttpMethod.GET)
                                .uriVariable("vm", "headers.vm")
                                .mappedResponseHeaders()
                                .expectedResponseType(String.class)
                        )
                        .bridge()
                )
                .propertyExpression("name", "#jsonPath(payload, '$.value.name')")
                .propertyExpression("guest_OS", "#jsonPath(payload, '$.value.guest_OS')")
                .propertyExpression("bios_uuid", "#jsonPath(payload, '$.value.identity.bios_uuid')")
                .propertyExpression("mac_addresses", "#jsonPath(payload, '$.value.nics[*].value.mac_address')")
        )
        .transform(Transformers.toJson(ResultType.STRING))
        .aggregate()
        .logAndReply();
I welcome any explanation on this. The biggest changes I made were adding the channel(MessageChannels.flux()) after the first outboundGateway and adding the .bridge() on the requestSubFlow.
Hello.
I have an async tcp client configured like so:
...
@Bean
fun connectionFactory(): TcpClientConnectionFactorySpec =
    Tcp.netClient(properties.baseUrl, properties.port)
        .serializer(serializer)
        .deserializer(serializer)
        .leaveOpen(true)
@Bean
fun tcpOut() = integrationFlow(channels.outboundChannel()) {
    transform(Transformers.toJson(ResultType.BYTES))
    handle(Tcp.outboundAdapter(connectionFactory()))
}
@Bean
fun tcpIn() = integrationFlow(Tcp.inboundAdapter(connectionFactory())) {
    enrichHeaders {
        headerFunction<Message<*>>(JsonHeaders.RESOLVABLE_TYPE, ::resolveTypeHeader)
    }
    transform(Transformers.fromJson())
    handle(handlerService)
}
...
It works well, but I can't figure out how to dynamically handle multiple simultaneous connections.
First, how do you initiate a new connection when sending a message through the outbound channel? I'm using a DirectChannel.
Second, is there a way to send data on a specific connection? I'm thinking of using the ip_connectionId header somehow.
I have looked into CachingClientConnectionFactory, but it seems to just cycle the connections used?
Basically I want to keep a set of connections where I send different data depending on the contents of the payload.
I am looking at the tcp-client-server-multiplex sample, and the default behaviour in the sample is to either throw an exception from the discard channel handler or to return a MessageTimeoutException.
Both of these result in my original connection being closed. I can understand the reasoning, since you don't know what state the connection is in.
In my setup though, I can have many messages already sent to the server that rely on that connection to get back. So I would prefer the connection to not be closed.
When I return a value from the discard channel handler I get a response from the gateway, but on the server side I still get the following error:
org.springframework.messaging.MessageHandlingException: Unable to find outbound socket in the [bean 'serverFlow.ip:tcp-outbound-channel-adapter#0'
Note: I am not using the example as is; I rewrote it in Kotlin DSL since that is what I use in my target application. I can share the code if necessary.
Http.outboundGateway() calls using WebFlux.outboundGateway() I would be under the impression that it would work identically. What I'm seeing is otherwise. The processing of the flow seems to go off asynchronously and things "downstream" are stuck waiting for something to come back. Ultimately I get WARN messages to the effect that nothing is receiving the reply message and that the thread has timed out. There are not enough reference implementations out there that show how to do this. At present, I think the effort I'm undertaking is going to have to shift back to using RestTemplate.
public MessageChannel helloRequestChannel() {
    return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow sayHelloFlow() {
    return IntegrationFlows.from("helloRequestChannel")
            .transform((String s) -> s.toUpperCase())
            .get();
}
@MessagingGateway
public interface EchoGateway {
    @Gateway(requestChannel = "helloRequestChannel")
    String echo(String message);
}
I can't get this to work:
@Bean
public IntegrationFlow inbound() {
    return IntegrationFlows.from(http())
            .handle(message -> System.out.println(message))
            .get();
}
private HttpRequestHandlerEndpointSpec http() {
    return Http.inboundGateway("/foo")
            .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
            .requestPayloadType(String.class);
}
When I post to /foo, the message is received by handle() but the response is HTTP 500 Internal Server Error.
I have a sample configuration like:
@Configuration
@IntegrationComponentScan
public class FtpIntegrationConfig {
    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("host");
        sf.setPort(21);
        sf.setUsername("username");
        sf.setPassword("password");
        return new CachingSessionFactory<>(sf);
    }
    @Bean
    public IntegrationFlow ftpOutboundFlow() {
        return IntegrationFlows.from("toFtpChannel")
                .handle(
                    Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
                        .useTemporaryFileName(false)
                        .fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
                        .remoteDirectoryExpression("headers['" + FileHeaders.REMOTE_DIRECTORY + "']"))
                .get();
    }
    @MessagingGateway
    public interface MyGateway {
        @Gateway(requestChannel = "toFtpChannel")
        Mono<Void> sendToFtp(
                byte[] content,
                @Header(FileHeaders.REMOTE_DIRECTORY) String remoteDirectory,
                @Header(FileHeaders.FILENAME) String fileName);
    }
}
I want to use the sendToFtp in a reactive chain, something like:
dataService
    .fetchFile()
    .flatMap(file -> myGateway.sendToFtp(file, "", "random-file-name.xml"))
    .doOnSuccess(
        (x) -> {
            log.info("Upload success");
        })
    .subscribe()
..but doOnSuccess never executes (even though the file is indeed sent to the FTP server). Can someone point me in the right direction? I'm not sure what I am doing wrong. I am using spring-boot-starter-integration:2.5.4 and spring-integration-ftp:5.5.3.
Hi Everyone,
can someone please help me with this?
I am trying to upgrade Spring Boot from release 1.2.4 to release 1.5.22. We are using Spring Integration, so I upgraded Spring Integration from 4.1.x to 4.3.21,
and I am getting this error while trying to bootRun
(attaching the screenshot)
and I am using these annotations in my main.java
Running into a vexing problem with Spring Integration under Spring Boot 2.5.6. I'm using WebClient to retrieve a REST payload that is somehow exceeding a max buffer size of 262144. I have configured the client using a CodecCustomizer:
@Bean
public CodecCustomizer webClientCodecCustomizer() {
    return (configurer) -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024);
}
In my integration flow I'm using WebFlux.outboundGateway(uri, webClient)...
I am at a loss as to why it is still sticking with the default setting.
Hello! I'm quite new to Spring Integration and I'm not sure if I'm misusing it or just missing something.
I have an application that has inbound and outbound flux message channels. The messages from the inbound channel are currently consumed by different services and they have multiple types (e.g. Message<HandshakeChallenge> and Message<HandshakeAnswer>).
It seems @ServiceActivator tries to deliver all those messages even if the method signature doesn't match, so in the end I get exceptions like this one: EL1004E: Method call: Method process(org.project.HandshakeChallenge) cannot be found on type com.project.PeerManager
Does anyone have an idea how I can consume events respecting the method signature? Something similar to @EventListener from Spring Core.
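What the question asks for amounts to dispatching on the payload's runtime type before a handler method is invoked. Below is a minimal plain-Java sketch of that idea. It is not Spring Integration API: the dispatcher class and the two payload classes are hypothetical stand-ins for the types named above. Spring Integration has its own type-based routing components, and the sketch only illustrates the principle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class PayloadTypeDispatcher {

    // Hypothetical payload types standing in for the ones mentioned in the question.
    public static class HandshakeChallenge {
        public final String nonce;
        public HandshakeChallenge(String nonce) { this.nonce = nonce; }
    }

    public static class HandshakeAnswer {
        public final String signature;
        public HandshakeAnswer(String signature) { this.signature = signature; }
    }

    private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

    // Registers a handler that only ever sees payloads of the given type.
    public <T> PayloadTypeDispatcher on(Class<T> type, Consumer<? super T> handler) {
        handlers.put(type, payload -> handler.accept(type.cast(payload)));
        return this;
    }

    // Delivers the payload to the first handler whose type matches; returns false if none does.
    public boolean dispatch(Object payload) {
        for (Map.Entry<Class<?>, Consumer<Object>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                entry.getValue().accept(payload);
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        PayloadTypeDispatcher dispatcher = new PayloadTypeDispatcher()
                .on(HandshakeChallenge.class, c -> seen.add("challenge " + c.nonce))
                .on(HandshakeAnswer.class, a -> seen.add("answer " + a.signature));
        dispatcher.dispatch(new HandshakeAnswer("sig"));
        dispatcher.dispatch(new HandshakeChallenge("n1"));
        System.out.println(seen);
    }
}
```

Each handler is invoked only for its own payload type, and unmatched payloads are reported through the return value instead of an exception.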
Hi integrators :),
A question about this configuration:
@Bean
public IntegrationFlow readingZipFilesFlow(@Value("${resource.local.unzipped-dir}") String dirName) {
    UnZipTransformer unZipTransformer = new UnZipTransformer();
    unZipTransformer.setExpectSingleResult(false);
    unZipTransformer.setZipResultType(ZipResultType.FILE);
    unZipTransformer.setDeleteFiles(true);
    unZipTransformer.setWorkDirectory(uncompressedDir);
    return IntegrationFlows
            .from(Files.inboundAdapter(this.fileService.getZipDirDestination().toFile())
                    .filterFunction(file -> !file.isDirectory() && this.isZipFile(file))
                    .preventDuplicates(false),
                endpointConfigurer -> endpointConfigurer.poller(Pollers
                    .fixedDelay(500)
                    .maxMessagesPerPoll(1L)))
            .transform(unZipTransformer)
            .handle(this.fileService, "uploadFiles") // method uploadFiles is annotated with @Async
            .get();
}
I would like to know how that flow could be run by multiple threads.
I can potentially have 30k zip files to unzip and upload their content to a remote server, so I would like to involve multiple threads, each one executing its own readingZipFilesFlow.
For testing purpose I tried this configuration:
spring:
  task:
    scheduling:
      pool:
        size: 5 # I would like `readingZipFilesFlow` to be executed 5 times by 5 different threads
    execution:
      pool:
        core-size: 15 # this configuration is for @Async methods
        max-size: 30
I would like readingZipFilesFlow to be executed by 5 different threads, meaning 5 threads polling the same directory at the same time (with each one reading only 1 file).
Is it possible?
Thank you
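One common shape for this kind of workload is a single poller that only discovers files and hands each one to a fixed-size worker pool, rather than five pollers competing for the same directory. In Spring Integration that hand-off is usually done with an executor-backed channel. The plain-Java sketch below shows only the hand-off itself, with hypothetical names and a placeholder where the unzip-and-upload work would go.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelFileWorkers {

    // Hands every file name to a fixed pool and records which worker thread handled it.
    public static Map<String, String> processAll(List<String> fileNames, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(fileNames.size());
        try {
            for (String fileName : fileNames) {
                pool.submit(() -> {
                    try {
                        // Placeholder for the real work (unzip, upload): each file is taken by exactly one worker.
                        handledBy.put(fileName, Thread.currentThread().getName());
                    } finally {
                        done.countDown();
                    }
                });
            }
            done.await(); // wait until every submitted file has been handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return handledBy;
    }

    public static void main(String[] args) {
        List<String> files = List.of("a.zip", "b.zip", "c.zip", "d.zip", "e.zip", "f.zip");
        processAll(files, 5).forEach((file, thread) -> System.out.println(file + " -> " + thread));
    }
}
```

Because only one component lists the directory, no two workers can pick up the same file, and the pool size alone bounds the concurrency.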
I'm trying to reconcile a syntax thing with the way that Spring Integration flows are constructed
using the Java DSL.
In many cases I saw earlier on the typical pattern was:
@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlows.from("input")
            .filter("world"::equals)
            .transform("Hello "::concat)
            .handle(System.out::println)
            .get();
}
In other cases I've seen, more or less the above but using a Lambda syntax I haven't fully understood but
have been using extensively.
@Bean
public IntegrationFlow someFlow() {
    return f -> f.filter("world"::equals)
            .transform("Hello "::concat)
            .handle(System.out::println);
}
More or less I understand the two syntaxes, and understand that in both cases there will be a channel created with a name like someFlow.input.
What I cannot reconcile is when I have to do something with an inbound adapter. Using the lambda syntax, how would that be "connected" to the flow?
Most examples of using things like the Udp.inboundAdapter have to use the first style with an IntegrationFlows.from() and target someFlow by adding a .channel("someFlow.input").
Thanks in advance!
Hi everyone, I've noticed a weird behaviour on the aggregator that I'm using. From time to time, some messages get stuck in the INT_MESSAGE and INT_GROUP_TO_MESSAGE tables, never to be released. I've tested different configurations on my aggregator, which currently looks like this:
private IntegrationFlow aggregatedEventHandlerFlow(String sourceChannel, GenericHandler<?> handler,
        MessageGroupStore messageGroupStore, PlatformTransactionManager txManager, long timeout, int messageCount) {
    return IntegrationFlows.from(sourceChannel)
            .aggregate(a -> a.correlationStrategy(message -> findCorrelationStrategy(message.getPayload()))
                    .messageStore(messageGroupStore)
                    .releaseStrategy(new MessageCountReleaseStrategy(messageCount))
                    .groupTimeout(timeout)
                    .sendPartialResultOnExpiry(true)
                    .expireGroupsUponTimeout(true)
                    .expireGroupsUponCompletion(true)
                    .forceReleaseAdvice(transactionInterceptor(txManager)))
            .handle(handler, GenericEndpointSpec::transactional)
            .routeToRecipients(new ErrorPayloadRouterSpecConsumer(Seq(), ERROR_PAYLOAD_CHANNEL_NAME,
                    IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME))
            .get();
}
I don't know what I'm missing... do I need to specify a MessageGroupStoreReaper configuration?
Hi integrators :smile: ,
I have a strange behavior when using the UnZipTransformer class.
I keep getting this Not a zip file exception:
Caused by: org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.lang.IllegalStateException: Not a zip file: C:\Test\test.zip
at org.springframework.integration.zip.transformer.UnZipTransformer.doZipTransform(UnZipTransformer.java:196) ~[spring-integration-zip-2.0.0.jar:na]
at org.springframework.integration.zip.transformer.AbstractZipTransformer.doTransform(AbstractZipTransformer.java:105) ~[spring-integration-zip-2.0.0.jar:na]
at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33) ~[spring-integration-core-5.5.8.jar:5.5.8]
... 22 common frames omitted
Caused by: java.lang.IllegalStateException: Not a zip file: C:\Test\test.zip
at org.springframework.integration.zip.transformer.UnZipTransformer.doZipTransform(UnZipTransformer.java:85) ~[spring-integration-zip-2.0.0.jar:na]
... 24 common frames omitted
Whereas the file is a real zip.
I can inspect the content of that test.zip file with any zip explorer like the zipinfo command.
What could cause that error?
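One way to narrow this down is to check what the file looks like on disk at the moment the flow picks it up, for example whether it already starts with the ZIP local-file-header signature or is still being written. The sketch below is a plain-Java diagnostic under that assumption. The class name is hypothetical, and it does not reproduce whatever check UnZipTransformer performs internally.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ZipSignatureCheck {

    // True if the file begins with the local-file-header signature "PK\003\004".
    // An empty archive starts with a different signature and is reported as false here.
    public static boolean startsWithZipSignature(Path file) {
        byte[] head = new byte[4];
        try (InputStream in = Files.newInputStream(file)) {
            int read = in.readNBytes(head, 0, head.length);
            return read == 4 && head[0] == 'P' && head[1] == 'K' && head[2] == 3 && head[3] == 4;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The path is an illustrative default; pass the real file as the first argument.
        Path file = Paths.get(args.length > 0 ? args[0] : "test.zip");
        System.out.println(file + " starts with ZIP signature: " + startsWithZipSignature(file));
    }
}
```

Calling this from the file filter and logging the result alongside the file size would show whether the file is complete when it is polled.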
.gateway() would be appropriate in this case or not, and why?
This is what happens to the IDE when I try to follow the suggestions. I managed to fix this by doing
.publishSubscribeChannel(s -> s
        .subscribe(f -> f
                .handle(bulkWriteToCockroach()))
        .subscribe(f -> f
                .handle(bulkWriteToPulsar()))
        .get().setIgnoreFailures(false));
That's because setIgnoreFailures doesn't return this. Anyway, that is pretty confusing. I think a docs addition for this kind of thing would be helpful. But the most problematic part from my point of view is that a failure in the first subscription didn't fail the next subscription.
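The chaining issue can be reproduced without any Spring classes. In the sketch below, a hypothetical spec type returns itself from every configuration method, while the object it builds has an ordinary void setter. A call to that setter can therefore only come last in the chain, and it is accepted as the body of a Consumer lambda because a Consumer returns nothing. All names, and the flag's initial value, are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FluentVsVoidSetter {

    // Hypothetical built object with a JavaBean-style setter that returns void.
    public static class Channel {
        public final List<String> subscribers = new ArrayList<>();
        private boolean ignoreFailures = true; // arbitrary initial value for the demo

        public void setIgnoreFailures(boolean ignoreFailures) {
            this.ignoreFailures = ignoreFailures;
        }

        public boolean isIgnoreFailures() {
            return ignoreFailures;
        }
    }

    // Hypothetical fluent spec: each configuration method returns this, get() exposes the target.
    public static class ChannelSpec {
        private final Channel channel = new Channel();

        public ChannelSpec subscribe(String name) {
            channel.subscribers.add(name);
            return this;
        }

        public Channel get() {
            return channel;
        }
    }

    // Mirrors a DSL method that takes a configurer lambda.
    public static Channel configure(Consumer<ChannelSpec> configurer) {
        ChannelSpec spec = new ChannelSpec();
        configurer.accept(spec);
        return spec.get();
    }

    public static void main(String[] args) {
        // The void setter ends the chain; nothing can be chained after it.
        Channel channel = configure(s -> s
                .subscribe("first")
                .subscribe("second")
                .get().setIgnoreFailures(false));
        System.out.println(channel.subscribers + " ignoreFailures=" + channel.isIgnoreFailures());
    }
}
```

Placing another .subscribe(...) after setIgnoreFailures(false) would not compile, since there is no return value to call it on.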
Hi integrators,
I have a strange behavior with @JmsListener and DefaultJmsListenerContainerFactory.
I have 2 methods in 2 different classes annotated with @JmsListener, something like:
@Component
public class FirstBusAdapter {
    @JmsListener(
            destination = UPDATE_DESTINATION_NAME,
            subscription = SUBSCRIPTION_NAME,
            containerFactory = "customJmsListenerContainerFactory")
    public void update(@Payload Json json, @Headers Map<String, Object> headers) {
    }
}
@Component
public class SecondBusAdapter {
    @JmsListener(
            destination = CREATE_DESTINATION_NAME,
            subscription = SUBSCRIPTION_NAME,
            containerFactory = "customJmsListenerContainerFactory")
    public void create(@Payload Product product, @Headers Map<String, Object> headers) {
    }
}
The customJmsListenerContainerFactory is defined in a configuration class:
@Configuration
public class ServiceBusConfiguration {
    @Bean
    public DefaultJmsListenerContainerFactory customJmsListenerContainerFactory(ConnectionFactory connectionFactory, CustomMessageConverter messageConverter, CustomErrorHandler errorHandler) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setErrorHandler(errorHandler);
        factory.setMessageConverter(messageConverter);
        factory.setConnectionFactory(connectionFactory);
        factory.setSubscriptionDurable(Boolean.TRUE);
        return factory;
    }
}
And CustomMessageConverter is defined like:
@Component
public class CustomMessageConverter extends SimpleMessageConverter {
....
}
As we can see, I use the same customJmsListenerContainerFactory in both @JmsListener annotations, but the registered CustomMessageConverter is invoked in one case and not in the other.
When a message arrives in the UPDATE_DESTINATION_NAME destination, the message converter is invoked, but not when one arrives in CREATE_DESTINATION_NAME.
Is this normal behavior?
I tried defining another DefaultJmsListenerContainerFactory bean with another SimpleMessageConverter and everything works. It is as if we can't share the same MessageConverter.
I have the following Kotlin DSL in my configuration:
@Bean
fun jmsOutboundFlow(jmsConnectionFactory: ConnectionFactory) =
    integrationFlow {
        handle(
            Jms.outboundAdapter(jmsConnectionFactory)
                .apply {
                    destination(properties.mq.outgoingQueue)
                    channel(mqRequestChannel())
                }
        )
    }
mqRequestChannel() just returns a DirectChannel.
This code compiled fine using Spring Boot 2.5.8 which depends on Spring Integration Core 5.5.7.
Going to Spring Boot 2.5.9 and Spring Integration Core 5.5.8, I am getting an error on the channel(mqRequestChannel()) call stating 'fun channel(messageChannel: MessageChannel): Unit' can't be called in this context by implicit receiver. Use the explicit one if necessary
I'm still tracking down the changes to try to figure out what the cause is, but was hoping maybe this is a known issue?
@Bean
public IntegrationFlow readingZipFilesFlow(@Value("${resource.local.unzipped-dir}") String dirName) {
    UnZipTransformer unZipTransformer = new UnZipTransformer();
    unZipTransformer.setExpectSingleResult(false);
    unZipTransformer.setZipResultType(ZipResultType.FILE);
    unZipTransformer.setDeleteFiles(true);
    unZipTransformer.setWorkDirectory(uncompressedDir);
    return IntegrationFlows
            .from(Files.inboundAdapter(this.fileService.getZipDirDestination().toFile())
                    .filterFunction(file -> !file.isDirectory() && this.isZipFile(file))
                    .preventDuplicates(false),
                endpointConfigurer -> endpointConfigurer.poller(Pollers
                    .fixedDelay(500)
                    .maxMessagesPerPoll(1L)))
            .transform(unZipTransformer)
            .transform(..... ) // How to get the Message here? not the payload
            .route(....) // Same thing here
            .handle(this.fileService, "uploadFiles")
            .get();
}
Hi all, I am doing a small poc with rsocket: on my server I set up a method with @ConnectMapping("poc-services") where I read the setup payload and store the requester in a map, and on my client I created a bean as
@Bean
public RSocketRequester getRSocketRequester() {
    return RSocketRequester.builder()
            .rsocketConnector(connector -> {
                connector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)));
                connector.payloadDecoder(PayloadDecoder.ZERO_COPY);
            })
            .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
            .setupRoute("poc-services")
            .setupData("test")
            .tcp("localhost", 7000);
}
When I run my client I expected it to connect to the server and the method annotated with @ConnectMapping to be triggered, but that is not the case. I then noticed that RSocketRequester.builder() also provides an alternative, connectTcp(String host, int port), which works as I expected; however, it is deprecated. So I would like to ask what the proper way of doing my setup is :)