Jeroen Nouws
@MOES-Media
@Configuration
@AutoConfigureAfter(CodecConfiguration::class)
@ConditionalOnClass(RSocketServiceTransport::class)
@ConditionalOnBean(ServiceMessageCodec::class)
class RsocketTransportConfiguration {

    @Bean
    fun rsocketServiceTransport(scalecubeProperties: ScalecubeProperties,
                                headersCodec: HeadersCodec, dataCodecs: List<DataCodec>): ServiceTransport {
        var serviceTransport = RSocketServiceTransport()
        scalecubeProperties.cluster?.transport?.let { transport ->
            serviceTransport = serviceTransport.tcpServer {
                TcpServer.create().runOn(it).port(transport.port)
            }

            transport.rsocket?.let { rsocket ->
                serviceTransport = serviceTransport.numOfWorkers(rsocket.workerCount)
            }
        }
        serviceTransport = serviceTransport.dataCodecs(dataCodecs)
        serviceTransport = serviceTransport.headersCodec(headersCodec)

        return serviceTransport
    }
}
Jeroen Nouws
@MOES-Media
also this behaviour was exactly the same when using the spring-boot-starter, I know you said it was experimental but just a heads up :)
Jeroen Nouws
@MOES-Media
ok so after reading up on how the ServiceLoader exactly works, I created my own custom JacksonCodec which looks like this
package be.moesmedia.imera.projects.gateway.configuration.codecs

import be.moesmedia.imera.projects.gateway.configuration.SpringContext
import com.fasterxml.jackson.databind.ObjectMapper
import io.scalecube.services.transport.api.DataCodec
import io.scalecube.services.transport.api.HeadersCodec
import java.io.InputStream
import java.io.OutputStream
import java.lang.reflect.Type

class CustomJacksonCodec : DataCodec, HeadersCodec {

    companion object {
        const val contentType = "application/json"
    }

    private lateinit var objectMapper: ObjectMapper

    override fun contentType(): String = contentType
    override fun encode(stream: OutputStream?, headers: MutableMap<String, String>?) = getObjectMapper().writeValue(stream, headers)

    @Suppress("UNCHECKED_CAST")
    override fun decode(stream: InputStream): MutableMap<String, String> = when (stream.available()) {
        0 -> HashMap()
        else -> (getObjectMapper().readValue(stream, HashMap::class.java)) as HashMap<String, String>
    }

    override fun encode(stream: OutputStream, value: Any) =
            getObjectMapper().writeValue(stream, value)

    override fun decode(stream: InputStream, type: Type): Any = getObjectMapper().readValue(stream, getObjectMapper().typeFactory.constructType(type))

    private fun getObjectMapper() = if (!this::objectMapper.isInitialized) {
        this.objectMapper = SpringContext.getBean(ObjectMapper::class.java)!!
        this.objectMapper
    } else {
        this.objectMapper
    }
}
SpringContext is a simple Component that implements the ApplicationContextAware interface. It holds a reference to the Spring context, from which I can load the correct ObjectMapper
@Component
class SpringContext : ApplicationContextAware {
    @Throws(BeansException::class)
    override fun setApplicationContext(context: ApplicationContext) {
        Companion.context = context
    }

    companion object {
        private var context: ApplicationContext? = null

        fun <T : Any?> getBean(beanClass: Class<T>): T? {
            return context?.getBean(beanClass)
        }
    }
}
Ronen
@ronenhamias
@MOES-Media thanks for the information.
I think it would be a good idea to put the information into an issue and submit it to the spring support repository.
if you are interested in the spring support, help improving it is always welcome.
Jeroen Nouws
@MOES-Media
@ronenhamias when I'm done with the POC I'm willing to help out a bit, but currently a bit busy
I learned a lot this weekend about how to configure Scalecube in a Spring application. What are the development guidelines?
felipevillar
@felipevillar
Hi @ronenhamias , is there an example somewhere on ScaleCube working with an Aeron transport? I was also curious to see ScaleCube working with something like an SBE codec (or any codec based on buffer-wrapping rather than field copying). When I looked at SBE with RSocket it seemed inefficient due to buffer-copying between the Netty ByteBufs and the Agrona buffers.
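The difference between buffer-wrapping and field-copying raised in the question above can be sketched in plain Java. This is only an illustration using java.nio.ByteBuffer: the QuoteCopy and QuoteFlyweight classes and their 16-byte layout are hypothetical, and the sketch uses neither the SBE, Agrona, nor ScaleCube APIs.

```java
import java.nio.ByteBuffer;

public class FlyweightSketch {

    /** Field-copying style: decoding allocates an object and copies each field out of the buffer. */
    static final class QuoteCopy {
        final long instrumentId;
        final double price;

        QuoteCopy(long instrumentId, double price) {
            this.instrumentId = instrumentId;
            this.price = price;
        }

        static QuoteCopy decode(ByteBuffer buffer, int offset) {
            return new QuoteCopy(buffer.getLong(offset), buffer.getDouble(offset + 8));
        }
    }

    /** Buffer-wrapping style: a reusable flyweight that reads and writes fields in place. */
    static final class QuoteFlyweight {
        static final int LENGTH = 16; // hypothetical layout: 8-byte id followed by 8-byte price

        private ByteBuffer buffer;
        private int offset;

        QuoteFlyweight wrap(ByteBuffer buffer, int offset) {
            this.buffer = buffer;
            this.offset = offset;
            return this;
        }

        long instrumentId() {
            return buffer.getLong(offset);
        }

        double price() {
            return buffer.getDouble(offset + 8);
        }

        QuoteFlyweight price(double value) {
            buffer.putDouble(offset + 8, value);
            return this;
        }
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(QuoteFlyweight.LENGTH);
        buffer.putLong(0, 42L);
        buffer.putDouble(8, 101.5);

        QuoteCopy copy = QuoteCopy.decode(buffer, 0);
        QuoteFlyweight flyweight = new QuoteFlyweight().wrap(buffer, 0);

        // Writing through the flyweight mutates the shared buffer; the copy is unaffected.
        flyweight.price(102.0);

        System.out.println("copy price      = " + copy.price);
        System.out.println("flyweight price = " + flyweight.price());
    }
}
```

The flyweight avoids per-message allocation, but every holder of the buffer sees its writes, which is why this style is usually paired with single-threaded access.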
Ronen
@ronenhamias

We do it internally in our enterprise project; we didn't open that up as open source
as part of our project
http://exberry.io/

but there is a project called Aeron POC where we did all our research, and the research itself is public info

@felipevillar
there is another project
the reactor team moved reactor aeron into our hands
Ronen
@ronenhamias
and we internally have reactor cluster for our high frequency trading platform running in production
based on the deep research we have done around aeron
Ronen
@ronenhamias
where we contributed to, and now maintain, the reactor project for aeron that was officially done by the reactor team
"When I looked at SBE with RSocket it seemed inefficient due to buffer-copying between the Netty ByteBufs and the Agrona buffers"
you are very much correct, there is an issue with reactor and netty.
so we have our own transport, since in scalecube we are not bound to one transport / protocol / codec
Ronen
@ronenhamias
so we do use the rsocket transport for day-to-day use, but when we want really low latency on a critical path, where having UDP really gives the benefit we want and where it's relevant, we use the Aeron transport.
and still Reactor-Aeron gives better performance than Reactor-netty (you can check the benchmarks)
but it's not a replacement for reactor netty, since these should be used for different use cases
felipevillar
@felipevillar

I see thanks @ronenhamias , that sounds interesting. Since the buffers underlying SBE codecs are mutable and shared (and typically accessed from a single thread) I was mainly interested in seeing how well such a codec would fit with the "share nothing" & reactive style of programming. I'll have a closer look at the scalecube/reactor-aeron project, maybe that will give me a better understanding.

and we internally have reactor cluster for our high frequency trading platform running in production

just checking... by "reactor cluster" did you mean a ScaleCube cluster, or a real-logic/aeron-cluster?

Ronen
@ronenhamias
real-logic/aeron-cluster
scalecube provides 2-phase deserialization; it's very much desired that the data will be mutable
@felipevillar
and yes, for that transport we use a single-thread approach
it's not a problem, since on such a critical path, where it brings value, it handles millions of messages per sec with low latency
of course you must be careful not to block
also with Aeron you can't block, so it fits very well
felipevillar
@felipevillar
I see... that does sound very cool
郝羽
@harry-hao

Hi, I ran into an error that I don't understand. Would someone please take a look?

Thanks.

The error is:

Error receiving frame:
java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(1): AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 0, widx: 1, cap: 1/1, unwrapped: PooledUnsafeDirectByteBuf(ridx: 4, widx: 427, cap: 1024))

I am using:

  • <spring-boot.version>2.2.6.RELEASE</spring-boot.version>
  • <scalecube.version>2.9.0</scalecube.version>
  • <scalecube-cluster.version>2.4.10</scalecube-cluster.version>
  • <rsocket.version>1.0.0-RC6</rsocket.version>

Code to reproduce:

@Slf4j
public class HelloScalecubeApplicationTest {

    public interface Hello {
        Mono<Void> hello();
    }

    public class HelloImpl implements Hello {

        @Override
        public Mono<Void> hello() {
            log.info("hello");
            return Mono.empty();
        }
    }

    @Test
    public void testScalecubeServices() {
        Hello hello = mock(Hello.class);

        when(hello.hello()).thenReturn(Mono.empty());

        Microservices seed = Microservices.builder()
                .discovery(ScalecubeServiceDiscovery::new)
                .transport(RSocketServiceTransport::new)
                .startAwait();

        Microservices proxy = Microservices.builder()
                .discovery(endpoint -> new ScalecubeServiceDiscovery(endpoint)
                        .membership(membership -> membership.seedMembers(seed.serviceAddress())))
                .transport(RSocketServiceTransport::new)
                .startAwait();

        Microservices stub = Microservices.builder()
                .discovery(endpoint -> new ScalecubeServiceDiscovery(endpoint)
                        .membership(membership -> membership.seedMembers(seed.serviceAddress())))
                .transport(RSocketServiceTransport::new)
                .services(new HelloImpl())
                .startAwait();

        proxy.call().api(Hello.class).hello().block();

        verify(hello, times(1)).hello();
    }

}
郝羽
@harry-hao
Oops, there's a mistake in the code snippet. I updated it and the error remains.
@Slf4j
public class HelloScalecubeApplicationTest {

    public interface Hello {
        Mono<Void> hello();
    }

    @Test
    public void testScalecubeServices() {
        Hello hello = mock(Hello.class);

        when(hello.hello()).thenReturn(Mono.empty());

        Microservices seed = Microservices.builder()
                .discovery(ScalecubeServiceDiscovery::new)
                .transport(RSocketServiceTransport::new)
                .startAwait();

        Microservices proxy = Microservices.builder()
                .discovery(endpoint -> new ScalecubeServiceDiscovery(endpoint)
                        .membership(membership -> membership.seedMembers(seed.serviceAddress())))
                .transport(RSocketServiceTransport::new)
                .startAwait();

        Microservices stub = Microservices.builder()
                .discovery(endpoint -> new ScalecubeServiceDiscovery(endpoint)
                        .membership(membership -> membership.seedMembers(seed.serviceAddress())))
                .transport(RSocketServiceTransport::new)
                .services(hello)
                .startAwait();

        proxy.call().api(Hello.class).hello().block();

        verify(hello, times(1)).hello();
    }

}
Jeroen Nouws
@MOES-Media
that's an issue with your codecs, if I remember correctly
if you scroll up you can see a config I made for running scalecube with spring (sidenote: this is from a POC, I don't necessarily use best practices)
the biggest caveat you have to take into account is how Scalecube instantiates its codecs
Scalecube utilizes the ServiceLoader, which uses the no-args constructor to create a new instance of the codec. By doing this, Scalecube instantiates it outside of the spring-context, so you don't have any reference to your spring beans
that's why I created the SpringContext class, which implements the ApplicationContextAware interface: when the applicationContext is initialized, this class gets a reference to it and stores it in a static reference that can be used in my CustomJacksonCodec
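The workaround described above, a class created through its no-args constructor that lazily pulls a dependency from a static holder, can be sketched without Spring or Scalecube. Everything here is a hypothetical stand-in: ContextHolder plays the role of the SpringContext class, Formatter plays the role of the ObjectMapper bean, and the reflective call only approximates how ServiceLoader creates a provider.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StaticHolderSketch {

    /** Hypothetical stand-in for the static ApplicationContext reference. */
    static final class ContextHolder {
        private static final Map<Class<?>, Object> BEANS = new ConcurrentHashMap<>();

        static <T> void register(Class<T> type, T bean) {
            BEANS.put(type, bean);
        }

        static <T> T getBean(Class<T> type) {
            Object bean = BEANS.get(type);
            if (bean == null) {
                throw new IllegalStateException("No bean registered for " + type.getName());
            }
            return type.cast(bean);
        }
    }

    /** Hypothetical dependency that a container would normally inject. */
    static final class Formatter {
        private final String prefix;

        Formatter(String prefix) {
            this.prefix = prefix;
        }

        String format(String value) {
            return prefix + value;
        }
    }

    /** Codec-like class that only has a public no-args constructor. */
    public static final class LazyCodec {
        private volatile Formatter formatter;

        public LazyCodec() {
            // Nothing can be injected here: the instance is created outside the container.
        }

        public String encode(String value) {
            return formatter().format(value);
        }

        // Resolve the dependency on first use, after the holder has been populated.
        private Formatter formatter() {
            Formatter local = formatter;
            if (local == null) {
                local = ContextHolder.getBean(Formatter.class);
                formatter = local;
            }
            return local;
        }
    }

    public static void main(String[] args) throws Exception {
        // Created reflectively through the no-args constructor, before any bean exists.
        LazyCodec codec = LazyCodec.class.getDeclaredConstructor().newInstance();

        // Later, the "container" starts and populates the holder.
        ContextHolder.register(Formatter.class, new Formatter("json:"));

        System.out.println(codec.encode("hello"));
    }
}
```

The lookup has to be lazy because the codec may be instantiated before the holder is populated; calling encode earlier would fail with the IllegalStateException from the holder.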
Jeroen Nouws
@MOES-Media
Also you didn't annotate your interface with the scalecube @Service annotation and the @ServiceMethod (also from Scalecube)
郝羽
@harry-hao
I fixed the annotations problem, and I have removed spring from the hello world program.
The error still persists.
Then I removed the seed, ran scalecube-seed and used it as the seed, and the error was gone.
So I think it is related to scalecube-cluster (since it happens while connecting to the seed), but I still can't figure out why.
郝羽
@harry-hao
Ok, found the problem, it's my mistake.
I was using the service address to do cluster discovery, but the two endpoints talk different protocols.
For the cluster address, it should be seed.discovery().address().
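The kind of failure seen above, a reader asking for 4 bytes when only 1 is available, is what tends to happen when bytes from one protocol are parsed by a decoder for another. The following is a rough analogy in plain Java using java.nio.ByteBuffer, not a reproduction of the Netty or RSocket code path; readLeadingInt and describe are hypothetical helpers.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class FramingMismatchSketch {

    /** Hypothetical decoder step that expects every frame to start with a 4-byte integer. */
    static int readLeadingInt(ByteBuffer frame) {
        // Throws BufferUnderflowException when fewer than 4 bytes remain.
        return frame.getInt();
    }

    static String describe(ByteBuffer frame) {
        try {
            return "leading field = " + readLeadingInt(frame);
        } catch (BufferUnderflowException e) {
            return "cannot read 4 bytes, only " + frame.remaining() + " available";
        }
    }

    public static void main(String[] args) {
        // A frame in the layout this decoder expects.
        ByteBuffer expected = ByteBuffer.allocate(4);
        expected.putInt(7);
        expected.flip();

        // A 1-byte frame, as another protocol might legitimately send.
        ByteBuffer foreign = ByteBuffer.wrap(new byte[] {0x01});

        System.out.println(describe(expected));
        System.out.println(describe(foreign));
    }
}
```

In the thread, the equivalent fix was to stop pointing cluster discovery at the service transport address and use seed.discovery().address() instead, so that each endpoint only receives the protocol it expects.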
Eugene Utkin
@eutkin
Hey, everybody. I think the Microservices class is too big. Is there any plan to break it down into several separate modules, for example by extracting Builder into a separate class?
Ronen
@ronenhamias
the microservices class is altogether a bootstrapping factory
the question is, what is the benefit of breaking it down?
the idea is that it provides a single entry point to all the sub modules
I guess it's a matter of style, but when you break something up it should also bring new value
like some reuse... or some flexibility...
in such a case it's better to point to some specific value we can gain.
Eugene Utkin
@eutkin
the module is growing in size and is becoming difficult to maintain, imho