Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
    Jan Dolling
    @JDolling_twitter
    If I start "kafka-console-consumer.sh" with "--from-beginning" and "--topic dbserver1.inventory.customers" I see the records.
    My current configuration:
    mp.messaging.incoming.customers.connector=smallrye-kafka
    mp.messaging.incoming.customers.topic=dbserver1.inventory.customers
    mp.messaging.incoming.customers.group.id=demo1
    mp.messaging.incoming.customers.client.id=demo1
    mp.messaging.incoming.customers.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer
    Clement Escoffier
    @cescoffier
    This creates a random uuid like the connector
    Remove the groupId and clientId completely
    Jan Dolling
    @JDolling_twitter
    This has no effect to the offset. besides setting a random uuid for the clinet id
    Clement Escoffier
    @cescoffier
    This is weird.
    Jan Dolling
    @JDolling_twitter
    ah setting "mp.messaging.incoming.customers.auto.offset.reset=earliest" delivers what I want
    Clement Escoffier
    @cescoffier
    Ah yes!
    That what I was wondering
    Jan Dolling
    @JDolling_twitter
    Thanks
    Ryan J. McDonough
    @damnhandy
    We have developed some config sources that leverage AWS Parameter Store and AWS AppConfig and would like to contribute them. First, I'd like to know if there's interest in adding these sources. Second: Is there a CLA and/or contribution guidelines linked anywhere?
    David M. Lloyd
    @dmlloyd
    the first answer is "maybe". the second answer is https://smallrye.io/community/#contributing
    for the config sources - it depends on what it entails, what dependencies are involved etc.
    in general, sources which are simple and low dependency yet useful are more preferred, but there's definitely room for discussion
    if you open an issue @ smallrye-config we can discuss there
    Ryan J. McDonough
    @damnhandy
    will do! thx!
    Roberto Cortez
    @radcortez
    thanks!
    Rafael de Andrade Sousa
    @rafaelsousa

    Hi @All, I'd like to make a question concerning a connector I am trying to develop.

    I want to connect to an apache pulsar server, using smallrye-reactive-messaging-kafka as model. I see that it uses an io.debezium.kafka.KafkaCluster (which seems to be a embeddable cluster) to simulate the tests.

    In case I can't instantiate a pulsar container in testing, can I make it using a Docker container? Would it be something allowed in the project?

    Clement Escoffier
    @cescoffier
    @rafaelsousa yes, of course. If you look at other connectors (Amqp, Mqtt...) they use test containers to start the broker.
    Rafael de Andrade Sousa
    @rafaelsousa
    org.testcontainers.containers.GenericContainer ?
    Clement Escoffier
    @cescoffier
    Yes
    (But the doc is rather short and incomplete unfortunately)
    Rafael de Andrade Sousa
    @rafaelsousa
    OMG, this is awesome, thank you so much! :)
    David M. Lloyd
    @dmlloyd
    This is how I'm using GH actions to do CI with an MR-JAR project that needs building on 11 and testing on 8 and 11: https://github.com/jbossas/jboss-threads/blob/master/.github/workflows/build.yml
    hope it's helpful for someone
    this is compatible with the way we're doing MR JARs in smallrye-parent
    Roberto Cortez
    @radcortez
    cool thank you :)
    Rafael de Andrade Sousa
    @rafaelsousa
    Hi, I have a question concerning the implementation of a reactive connector.
    Clement Escoffier
    @cescoffier
    Hi, what’s your question?
    Rafael de Andrade Sousa
    @rafaelsousa

    `
    @ApplicationScoped
    @Connector(PulsarConnector.CONNECTOR_NAME)
    public class PulsarConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {

    static final String CONNECTOR_NAME = "smallrye-pulsar";
    
    @Inject
    private Instance<Vertx> instanceOfVertx;
    
    @Inject
    @ConfigProperty(name = "pulsar.bootstrap.servers", defaultValue = "localhost:9092")
    private String servers;
    
    private List<PulsarSource> sources = new CopyOnWriteArrayList<>();
    private List<PulsarSink> sinks = new CopyOnWriteArrayList<>();
    
    private boolean internalVertxInstance = false;
    private Vertx vertx;
    
    public void terminate(@Observes @BeforeDestroyed(ApplicationScoped.class) Object event) {
        sources.forEach(PulsarSource::closeQuietly);
        sinks.forEach(PulsarSink::closeQuietly);
    
        if (internalVertxInstance) {
            vertx.closeAndAwait();
        }
    }
    
    @PostConstruct
    void init() {
        if (instanceOfVertx.isUnsatisfied()) {
            internalVertxInstance = true;
            this.vertx = Vertx.vertx();
        } else {
            this.vertx = instanceOfVertx.get();
        }
    }
    
    @Override
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        try {
            String s = servers;
            PulsarSource<Object, Object> source = new PulsarSource<>(vertx, config, s);
            sources.add(source);
            return source.getSource();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
        return null;
    }

    `

    It is what I have to return at public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
    Clement Escoffier
    @cescoffier
    Looks good so far.
    Rafael de Andrade Sousa
    @rafaelsousa

    Have to confess it is a little complex, but the Apache pulsar client has for the concept of Source :

    Producer<String> stringProducer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();
    stringProducer.send("My message");

    And I'm having a bad time to figure out how can this relate to a publisher / source
    Clement Escoffier
    @cescoffier
    This would be the ‘source’ (subscriber)
    Rafael de Andrade Sousa
    @rafaelsousa

    So, it would be this :

    Producer<byte[]> producer = client.newProducer()
    .topic("my-topic")
    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
    .sendTimeout(10, TimeUnit.SECONDS)
    .blockIfQueueFull(true)
    .create();

    Clement Escoffier
    @cescoffier
    Basically your would return a SubscriberBuilder that would on each message send it as you shown
    Yes this would be the Publisher side.
    How does the producer retrieves the messages?
    Is it a poll mechanism or some kind of push?
    Rafael de Andrade Sousa
    @rafaelsousa
    The producer defines what topic it wants to send it... and send.
    Whatever consumer is subscribed to that topic is updated.
    Clement Escoffier
    @cescoffier
    I would need to have a look at the API. The consumer should be able to poll or indicate how many messages the consumer can handle. Otherwise it would ‘OOM’ quickly
    Rafael de Andrade Sousa
    @rafaelsousa
    This is the Pulsar's API
    Clement Escoffier
    @cescoffier
    Yes it has a ‘receive’ method polling a single message.
    Rafael de Andrade Sousa
    @rafaelsousa
    Looking into it right away!
    :)