Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Justin Tuchek
    @justintuchek
    Think it's harder to reason about who's subscriptions/disposables are you clearing because you first need to see if the thing is a Singleton or not and then try to figure out who else may be using it so you don't clobber somebody else's stream unintentionally.
    Carlo Xavier Lopez S
    @carloxavier_twitter
    good point thanks @jtuchek
    defaultbr
    @defaultbr

    someone can say if this make sense:

          checkCode(code)
                                .flatMap {
                                    response ->
                                    if(response.isSuccessful) {
                                        saveSetupToDatabase(response.body()!!)
    
                                        fetchCompaniesSetup(response.body()!!.imeirecord.oid)
    
                                                .flatMap { response ->
                                                    if(response.isSuccessful) {
                                                        saveCompaniesToDatabase(response.body()!!)
                                                    }
                                                        Observable.just(response)
    
                                                }
                                    } else {
                                        Observable.just(response)
                                    }
                                }
    
                                .subscribeOn(scheduler.io())
                                .observeOn(scheduler.ui())
                                .subscribe(
                                        { response ->
                                            if (response.isSuccessful) {
                                                println("sucess no 1")
                                                view?.hideLoading()
                                                view?.dataSaved()
                                            } else {
                                                println("handle retrofit errorbody2")
                                                view?.hideLoading()
                                                handleErrorBody(response)
                                            }
                                        },
                                        {   it ->
                                            println("handle exception")
                                            view?.hideLoading()
                                            handleException(it)
                                        }
    
                                )

    for some reason i think its wrong, i dont know why, is there a better way?

    Halim Dev
    @halimpuckjava
    hi, how i logger in console any events (onsubscribe, onext, onerror, oncomplete) come from observable with rxjava ?? there a LOG operator for reactor but for rxjava there is some think like this ???
    Joshua Street
    @jjstreet
    hello all
    hopefully have a simple question
    i have a class that acts as a client, returning singles
    i want poll with this single
    public Flowable<List<String>> poll() {
        return this.client.getData(this.host)
                .subscribeOn(this.scheduler)
                .retryWhen(errors -> Flowable.timer(this.retryDelay, TimeUnit.MILLISECONDS, this.scheduler))
                .repeatWhen(success -> success.delay(this.delay, TimeUnit.MILLISECONDS, this.scheduler));
    }
    this is how i have setup my polling method
    i have a working test for repeatWhen
    but my problem is trying to write a test for retrying
    i can't figure out if i have working
    Joshua Street
    @jjstreet
    hmm this works
    public Flowable<List<String>> poll() {
        return this.client.getData(this.host)
                .retryWhen(errors -> Flowable.timer(this.retryDelay, TimeUnit.MILLISECONDS, this.scheduler))
                .subscribeOn(this.scheduler)
                .repeatWhen(success -> success.delay(this.delay, TimeUnit.MILLISECONDS, this.scheduler));
    }
    David Karnok
    @akarnokd
    You have to compose over errors in retryWhen, just like in repeatWhen.
    Guillaume DROUET
    @gdrouet
    Is there a way to specify custom thread names in Schedulers without delegating a specific ExecutorService ?
    Ignacio Baca Moreno-Torres
    @ibaca
    various schedulers have a contructor that accept a ThreadFactory and there is a RxThreadFactory that allows to use a prefix, with this combination defining custom thread name is pretty trivial, most of this code is in the internal package but I think it is reasonable to use it, just keep it in one place
    David Karnok
    @akarnokd
    There exist various create methods that you can use to specify custom-threaded schedulers.
    Guillaume DROUET
    @gdrouet
    Thanks!
    neo
    @Raghav2211

    I need some guidance on publisher subscriber model of reactor :-

    Client request is cosume some data from kafka consumer so i'm thinking that way that i will create an event in my system and a blocking subsciber which subscribe the publisher. Event will go to kafkacosumer and consume the batch which has been given in the client request(for example 100k records will be comsume) , after that the publisher will publish the record one by one performing some processing or batch record

    My question is it the right approch to do this ? And how i can create the publisher and subscriber as a separate module

    Ignacio Baca Moreno-Torres
    @ibaca
    kafka has its own consumers, both low level (Consumer API) and high level (the pretty easy and powerful kafka streams api)
    public static Consumer<KafkaConsumer<?, ?>> topic(String topic) { return c -> c.subscribe(singleton(topic)); }
    public static <K, V> Flowable<ConsumerRecord<K, V>> consume(Consumer<KafkaConsumer<?, ?>> to,
            Deserializer<K> keyDe, Deserializer<V> valueDe, Map<String, Object> props) {
        return Flowable.<ConsumerRecord<K, V>>create(src -> {
            try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDe, valueDe)) {
                src.setCancellable(consumer::wakeup);
                to.accept(consumer);
                @Nullable Iterator<? extends ConsumerRecord<K, V>> it = null;
                while (!src.isCancelled()) {
                    try {
                        if (src.requested() == 0) {
                            Thread.yield();
                            continue;
                        }
                        if (it == null || !it.hasNext()) {
                            ConsumerRecords<K, V> poll = consumer.poll(100);
                            it = poll.iterator();
                        }
                        if (!it.hasNext()) continue;
                        while (!src.isCancelled() && src.requested() > 0 && it.hasNext()) {
                            src.onNext(it.next());
                        }
                    } catch (WakeupException ignore) {
                        // wakeup always means close
                    } catch (Exception ex) {
                        src.tryOnError(ex);
                    }
                }
            }
        }, BackpressureStrategy.ERROR).subscribeOn(CONSUMER_SCHEDULER);
    }
    public static final Scheduler CONSUMER_SCHEDULER = new Scheduler() {
        final AtomicLong counter = new AtomicLong();
        final ThreadGroup group = new ThreadGroup("message-consumer");
        final ThreadFactory factory = target -> {
            Thread thread = new Thread(group, target, "message-consumer-" + counter.getAndIncrement());
            thread.setPriority(3); thread.setDaemon(true);
            return thread;
        };
        @Override public Worker createWorker() {
            return new NewThreadWorker(factory) {
                @Override public void dispose() { super.shutdown(); /*dispose in a non-interrupting fashion*/ }
            };
        }
    };
    Ignacio Baca Moreno-Torres
    @ibaca
    even so, I have been experimenting and this is an example implementation, I need to create a custom scheduler bc default one disposes the workers innmediatelly which is not a good idea bc you need to wait until the consumer is correctly closed
    nihiluis
    @nihiluis
    suppose I launch a http request and get a maybe which I turn to an Observable (right now), which method should I use on the observable to repeat the request if the response has an errors field?
    I thought repeatWhen would be good, but it only offers me an Observable<Any> to check if it has an errors field. I'm not sure if I can use it
    Ignacio Baca Moreno-Torres
    @ibaca
    Not sure this is the best option, but maping the response to itself but throwing an exception if is an error response (kind of KnownServerResponseError) and using the retry operator to handle both request and response error might work
    Ignacio Baca Moreno-Torres
    @ibaca
    Idealy this should be done by some request utility, not per request endpoint
    defaultbr
    @defaultbr

    Hello, someone know how to make this work syncronously?:

      .subscribeOn(scheduler.computation())
                    .observeOn(scheduler.ui())
                    .flatMap { ids -> Observable.fromIterable(ids) }
                    .flatMap { id -> requestEmployeeData(id) }
                    .doOnNext { response -> println("response ${response}") }
                    .subscribe()

    the requestEmployeeData is a retrofit request that returns Observable<Response<SyncEmployeeData>>, the issue is that its creating more than 100 requests at the same time but i only want to start the next on the ids list after the doOnNext is processed, is that possible?

    Ignacio Baca Moreno-Torres
    @ibaca
    I don't understand, do you want to resolve all ids, only the first, the last or just one each time sequentially? Anyway the UI shceduler should be added after the retrofit flatmap and only if actually need to use the UI thread, not required for the println
    defaultbr
    @defaultbr
    @ibaca i have a list of 100 ids, i want do iterate for each one and do the request, but i want the request to be made in a sync way (not async). Just start the request for id=2 when the request of id=1 is resolved, make sense ? i need this behaviour to not """ddos the server """
    defaultbr
    @defaultbr
    tried the blockingFirst but only works when the app is open and visible (its running in a service)
    Adam Dohnal
    @dohnala
    @defaultbr what about trying concat with map instead of flatMap? Something like this
            Observable.concat(
                    Observable
                            .fromIterable(ids)
                            .map(id -> requestEmployeeData(id))
                            .subscribeOn(Schedulers.computation()))
                    .doOnNext(response -> println("response ${response}"))
                    .subscribe();
    defaultbr
    @defaultbr
    will try
    defaultbr
    @defaultbr
    @dohnala cool, this worked!!!! thanks
    Ivan Schütz
    @i-schuetz
    is there a way to send 2 consecutive error events to a subject?
    basically I want that it continues normally when there are errors
    Ivan Schütz
    @i-schuetz
    of course given that any observers of it recover with onErrorResumeNext, onErrorReturn, etc
    Ivan Schütz
    @i-schuetz
    mySubject
        .onErrorResumeNext(Function {
            println("caught")
            Observable.empty<Item>()
        })
        .subscribe()
    
    mySubject.onError(Throwable(""))
    mySubject.onError(Throwable(""))
    Ivan Schütz
    @i-schuetz
    the use case... is that I forward the results of some observables to this subject and want to let the observer of the subject decide how to handle the errors
    Ivan Schütz
    @i-schuetz
    the only solution I can imagine right now is to create a separate subject for throwables and subscribe to it to get the error stream... which is sent via onNext(throwable) instead of onError(throwable)... but this is a bit ugly
    David Karnok
    @akarnokd
    onError can be sent once per subject. You can either have a separate error subject where you onNext(Throwable) or you can have a Subject<Notification<T>> and wrap items and errors into notifications: Notification.createOnNext, Notification.createOnError.
    Matteo Veroni
    @mavek87
    Hi everyone. Can you suggest me any simple and easy tutorial or video on rxjava for newbies?
    Ignacio Baca Moreno-Torres
    @ibaca
    this is fun and interactive http://reactivex.io/learnrx/, good start point if you don’t know where to start… :+1:
    Daimhim
    @Daimhim
    In subscribe I want success or failure,Which to use?
    Marc Guilera
    @marcguilera
    hi. i would like to match urls like /user/:id/login with incoming urls like /user/123/login so that i get a map contiaining id => 123
    i would like to make it generic so not have the user or login in the regex
    James Nelson
    @JamesXNelson
    the only uri templating library I've built is closed source.
    Marc Guilera
    @marcguilera
    :(