These are chat archives for ReactiveX/RxJava

25th
Apr 2018
neo
@Raghav2211
Apr 25 2018 17:10

I need some guidance on the publisher/subscriber model of Reactor:

The client request is to consume some data via a Kafka consumer, so my thinking is that I will create an event in my system and a blocking subscriber which subscribes to the publisher. The event will go to the Kafka consumer, which consumes the batch size given in the client request (for example, 100k records will be consumed); after that, the publisher will publish the records one by one, performing some processing, or as batched records.

My question is: is this the right approach? And how can I create the publisher and subscriber as separate modules?
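As an illustration of splitting the publisher and subscriber into separate components, here is a minimal, Kafka-free sketch using the JDK's `java.util.concurrent.Flow` API. All class names are hypothetical; in a real setup the publisher would be fed from a Kafka consumer instead of a loop:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Publisher module: wraps the record source (a plain loop standing in for Kafka).
class RecordPublisher implements AutoCloseable {
    private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    public void publishBatch(int count) {
        for (int i = 0; i < count; i++) publisher.submit("record-" + i);
    }
    public void subscribe(Flow.Subscriber<? super String> s) { publisher.subscribe(s); }
    @Override public void close() { publisher.close(); }
}

// Subscriber module: requests records one by one (backpressure) and processes them.
class RecordSubscriber implements Flow.Subscriber<String> {
    final List<String> processed = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
    @Override public void onNext(String record) { processed.add(record); subscription.request(1); }
    @Override public void onError(Throwable t) { done.countDown(); }
    @Override public void onComplete() { done.countDown(); }
}

public class PubSubSketch {
    static int run() throws InterruptedException {
        RecordSubscriber sub = new RecordSubscriber();
        try (RecordPublisher pub = new RecordPublisher()) {
            pub.subscribe(sub);
            pub.publishBatch(5);
        } // close() delivers onComplete once the buffered records drain
        sub.done.await(5, TimeUnit.SECONDS);
        return sub.processed.size();
    }
    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints 5
    }
}
```

The two classes know nothing about each other beyond the `Flow` interfaces, which is what makes them separable into modules.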

Ignacio Baca Moreno-Torres
@ibaca
Apr 25 2018 20:48
Kafka has its own consumers, both low level (the Consumer API) and high level (the pretty easy and powerful Kafka Streams API)
public static Consumer<KafkaConsumer<?, ?>> topic(String topic) { return c -> c.subscribe(singleton(topic)); }
public static <K, V> Flowable<ConsumerRecord<K, V>> consume(Consumer<KafkaConsumer<?, ?>> to,
        Deserializer<K> keyDe, Deserializer<V> valueDe, Map<String, Object> props) {
    return Flowable.<ConsumerRecord<K, V>>create(src -> {
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDe, valueDe)) {
            src.setCancellable(consumer::wakeup);
            to.accept(consumer);
            @Nullable Iterator<? extends ConsumerRecord<K, V>> it = null;
            while (!src.isCancelled()) {
                try {
                    if (src.requested() == 0) {
                        Thread.yield();
                        continue;
                    }
                    if (it == null || !it.hasNext()) {
                        ConsumerRecords<K, V> poll = consumer.poll(100);
                        it = poll.iterator();
                    }
                    if (!it.hasNext()) continue;
                    while (!src.isCancelled() && src.requested() > 0 && it.hasNext()) {
                        src.onNext(it.next());
                    }
                } catch (WakeupException ignore) {
                    // wakeup always means close
                } catch (Exception ex) {
                    src.tryOnError(ex);
                }
            }
        }
    }, BackpressureStrategy.ERROR).subscribeOn(CONSUMER_SCHEDULER);
}
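The inner drain loop above only emits while downstream demand (`src.requested()`) remains, and keeps the partially consumed iterator across rounds so leftover records are emitted when more demand arrives. A stripped-down, Kafka-free sketch of that demand-bounded drain (all names hypothetical):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class DrainSketch {
    // Stand-in for consumer.poll(): returns one "batch" of records.
    static Iterator<Integer> poll(int from, int size) {
        List<Integer> batch = new ArrayList<>();
        for (int i = from; i < from + size; i++) batch.add(i);
        return batch.iterator();
    }

    // Same shape as the inner loop: emit only while demand remains,
    // leaving unconsumed records in the iterator for the next round.
    static List<Integer> drain(Iterator<Integer> it, AtomicLong requested) {
        List<Integer> out = new ArrayList<>();
        while (requested.get() > 0 && it.hasNext()) {
            out.add(it.next());
            requested.decrementAndGet();
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(3); // downstream only asked for 3
        Iterator<Integer> it = poll(0, 10);       // but a batch of 10 arrived
        System.out.println(drain(it, requested)); // prints [0, 1, 2]
        requested.set(7);                         // more demand arrives later
        System.out.println(drain(it, requested).size()); // prints 7 (the leftovers)
    }
}
```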
public static final Scheduler CONSUMER_SCHEDULER = new Scheduler() {
    final AtomicLong counter = new AtomicLong();
    final ThreadGroup group = new ThreadGroup("message-consumer");
    final ThreadFactory factory = target -> {
        Thread thread = new Thread(group, target, "message-consumer-" + counter.getAndIncrement());
        thread.setPriority(3); thread.setDaemon(true);
        return thread;
    };
    @Override public Worker createWorker() {
        return new NewThreadWorker(factory) {
            @Override public void dispose() { super.shutdown(); /*dispose in a non-interrupting fashion*/ }
        };
    }
};
Ignacio Baca Moreno-Torres
@ibaca
Apr 25 2018 20:54
even so, I have been experimenting and this is an example implementation. I need to create a custom scheduler because the default one disposes the workers immediately (interrupting them), which is not a good idea because you need to wait until the consumer is correctly closed
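The non-interrupting dispose can be illustrated with a plain `ExecutorService` analogy: `shutdown()` lets the in-flight task (standing in here for the consumer's close) finish, whereas `shutdownNow()` would interrupt it mid-close. This is only a sketch of the idea, not the actual RxJava worker internals:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownSketch {
    static boolean closesCleanly() throws InterruptedException {
        AtomicBoolean closedCleanly = new AtomicBoolean(false);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {
            try {
                Thread.sleep(200);       // stand-in for the consumer's close work
                closedCleanly.set(true); // only reached if not interrupted
            } catch (InterruptedException e) {
                // shutdownNow() would land here, aborting the close
            }
        });
        pool.shutdown(); // non-interrupting: lets the in-flight task finish
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return closedCleanly.get();
    }
    public static void main(String[] args) throws InterruptedException {
        System.out.println(closesCleanly()); // prints true
    }
}
```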