These are chat archives for ReactiveX/RxJava

13th
Nov 2015
Javier Domingo Cansino
@txomon
Nov 13 2015 10:54
Hey, anyone here active or should I rather send a forum topic?
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Nov 13 2015 14:32
Usually several people check this chat everyday, so you can post question here
Javier Domingo Cansino
@txomon
Nov 13 2015 16:39
I will upload the code to github
David Stemmer
@weefbellington
Nov 13 2015 16:59
gist works for small things too
Javier Domingo Cansino
@txomon
Nov 13 2015 17:10
I was going to opensource it anyway so,...
Dorus
@Dorus
Nov 13 2015 17:11
use observeOn instead of subscribeOn?
Oh wait, you tell it to do so with .observeOn(AndroidSchedulers.mainThread())
Javier Domingo Cansino
@txomon
Nov 13 2015 17:13
yep, I have to do both AFAIK...
Dorus
@Dorus
Nov 13 2015 17:13
You should rarely need subscribeOn
Javier Domingo Cansino
@txomon
Nov 13 2015 17:14
@Dorus but I want to execute it in another thread and receive it in the main thread...
David Stemmer
@weefbellington
Nov 13 2015 17:14
@txomon is it possible that makeUbusRpcClientCall(String method, String path, Map arguments); is switching threads?
Dorus
@Dorus
Nov 13 2015 17:17
@txomon what part of the code should execute on the other thread? Because right now everything after .observeOn(AndroidSchedulers.mainThread()) will be executed on the main thread, and anything before that on whatever thread triggers the event in Events.click(sendButton)
All subscribeOn does is subscribe on Schedulers.io, it has nothing to do with calls from sendButton, just the register of the event.
Javier Domingo Cansino
@txomon
Nov 13 2015 17:19
I don't understand...
@Dorus you mean I should switch threads within the sendButton callback?
Dorus
@Dorus
Nov 13 2015 17:23
I am not familiar enough with android to say if subscribeOn there is necessarily, might very well be, however it doesn't do anything beyond moving the subscribe logic to another scheduler. The events themselves will start on the event thread (whatever that is), and then be moved to the main thread when they run into observeOn. At least that's how i read it.
David Stemmer
@weefbellington
Nov 13 2015 17:24
@Dorus I thought that only the onNext, onError, and onCompleted calls will be executed on the main thread if you specify observeOn(AndroidScehdulers.mainThread(). The code inside the mapping functions should be executed on the I/O thread.
at least, that’s the way I read it
Dorus
@Dorus
Nov 13 2015 17:25
Doesn't flatMap have a overload that takes a scheduler you can use?
Javier Domingo Cansino
@txomon
Nov 13 2015 17:25
@Dorus android has 1 thread, the main/UI thread. Everything you see there is executed in the main thread. I thought that what .subscribeOn() did was to execute all the following stuff in another thread
Dorus
@Dorus
Nov 13 2015 17:25
No, you're confusing what subscribeOn does. What you want is done by observeOn
subscribeOn controlls the thread that register the event, but has nothing to do with the thread that executes the events.
Javier Domingo Cansino
@txomon
Nov 13 2015 17:26
ohhhh
David Stemmer
@weefbellington
Nov 13 2015 17:26
@Dorus that doesn’t sound right to me
Javier Domingo Cansino
@txomon
Nov 13 2015 17:27
@weefbellington I have just checked and it's true
but yeah, seems like exactly the opposite to me xD
Dorus
@Dorus
Nov 13 2015 17:28
@weefbellington As far as i know, internally, flatMap will execute the onNext, onError, and onCompleted of the underlying previous Observable, hence execute them on whatever thread it received. However it might then execute new work (as it's flatmap, it will most likely do so), on a new scheduler.
David Stemmer
@weefbellington
Nov 13 2015 17:29
Many implementations of ReactiveX use “Schedulers” to govern an Observable’s transitions between threads in a multi-threaded environment. You can instruct an Observable to do its work on a particular Scheduler by calling the Observable’s SubscribeOn operator.
The ObserveOn operator is similar, but more limited. It instructs the Observable to send notifications to observers on a specified Scheduler.
Dorus
@Dorus
Nov 13 2015 17:30
You can do something like source.observeOn(ioScheduler).map(e ->ioWork(e)).observeOn(uiScheduler).map(e -> uiWork(e))
yes, subscribeOn controlls on what thread the Observable will run.
For example, if you use Observable.create(o -> someLogic(o)) then someLogic(o) will run on whatever thread you specified in subscribeOn
However, your subscription will run on whatever thread you specify in observeOn
Just like all logic in intermediate operators.
(that do not introduce there own scheduler)
David Stemmer
@weefbellington
Nov 13 2015 17:34

before you said

subscribeOn controlls the thread that register the event, but has nothing to do with the thread that executes the events.

which I thought was a confusing statement
Dorus
@Dorus
Nov 13 2015 17:34
lemme try to reword it
take this exmaple:
TextView tv = (TextView) this.findViewById(R.id.hello_world);
//Interface
View.OnClickListener m_click_itf = new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                     // TODO Auto-generated method stub

            }
};
tv.setOnClickListener(m_click_itf);
When you subscribe to Events.text(ubusObject), there will be some underlying code running that calls something equivalent to tv.setOnClickListener(m_click_itf);.
idk the exact code, but something like ubusObject.setOnTextChangeListerner(onNext). That code will run on whatever thread is set by subscribeOn. However, that will most likely not be the same thread used to execute onNext later on.
Sorry for my poor knowledge of the right terminology :)
David Stemmer
@weefbellington
Nov 13 2015 17:45
I think the problem with @txomon’s code is that he is calling observeOn/subscribeOn too early
he should be calling it immediately before subscribe
that way it propogates back to every observable in the chain
I don’t think the code in flatMap is being run on the I/O thread because subscribeOn is getting called -only- on the previous observable
Dorus
@Dorus
Nov 13 2015 17:46
Yeah, or call observeOn twice, first to switch to a background or io thread, and next to the main thread right before subscribe
David Stemmer
@weefbellington
Nov 13 2015 17:47
I’d suggest doing it once
5.5. Call the ObserveOn operator as late and in as few places as possible
from the Reactive guidelines here:
Javier Domingo Cansino
@txomon
Nov 13 2015 17:49
in my case just deleted the subscribeon and changed observeOn
Dorus
@Dorus
Nov 13 2015 17:50
The entire document doesn't mention SubscribeOn. But my suggestion would be to call SubscribeOn as early and as little as possible also.
As in, only when you know you need it.
And then right after creating the Observable.
David Stemmer
@weefbellington
Nov 13 2015 17:50
by “early” do you mean as close to the subscribe call as possible?
Dorus
@Dorus
Nov 13 2015 17:50
yes
no
i mean close to the observable source
David Stemmer
@weefbellington
Nov 13 2015 17:52
I disagree, on Android anyway you generally want all the observables in the chain to execute on a specified I/O or computation thread
not just the first one
you almost never want work in the Observable chain being done on the UI thread for example
you only want the onNext, onError and onCompleted observer callbacks to do UI work
Dorus
@Dorus
Nov 13 2015 17:53
Mm, we need some code that demonstrate what observeOn and subscribeOn does....
Javier Domingo Cansino
@txomon
Nov 13 2015 17:53
@weefbellington you have confused them now
observeOn is the one making change the thread everything executes in
suscribeOn is the one that controls the thread the subscription is done in
@Dorus please assert =)
Dorus
@Dorus
Nov 13 2015 17:54
Correct :)
David Stemmer
@weefbellington
Nov 13 2015 17:55
observeOn determines the thread that the observer callbacks happen on
Javier Domingo Cansino
@txomon
Nov 13 2015 17:55
yes
Dorus
@Dorus
Nov 13 2015 17:55
I once had an Observable that emitted 10.000 events / second, the thread used to generate them was constantly active. Next i used publish().refCount(). In my main thread i subscribed 4 observers to this observable. However, funny thing, only the first observer got registered to the underlying observable. What happened? My main thread wend missing! It was busy generating 10.000 events/second. Once i used subscribeOn(ThreadPool).publish().refCount() it worked as expected and the event generation was done in the background.
David Stemmer
@weefbellington
Nov 13 2015 17:56
subscribeOn controls the thread that the subscription is done on, but for a cold observable, it also controls the thread that the work executes on
Dorus
@Dorus
Nov 13 2015 17:57

but for a cold observable, it also controls the thread that the work executes on

That only holds true if you do not use observeOn, althou all code before observeOn will still run on the thread used in observeOn. Even that isn't 100% true, a cold observable can also scheduler it's work on another thread if it want, depending on it's implementation.

David Stemmer
@weefbellington
Nov 13 2015 17:59
I think we are mixing up definitions of “work"
I am defining “work” as actions that are executed as part of the observable chain
in which case subscribeOn shifts the “work” to a different thread
Javier Domingo Cansino
@txomon
Nov 13 2015 18:00
btw, how do I throw exceptions? because I can't modify the flatmap internal function to throw an exception...
Dorus
@Dorus
Nov 13 2015 18:00
Catch is and call onError()
David Stemmer
@weefbellington
Nov 13 2015 18:02
so, if you call subscribeOn(Schedulers.io) on the final observable in a chain, what I am saying is that it will shift the work done by the prior operators in the chain (map, flatmap, filter, etc.) to the I/O thread
if that assumption is wrong, then I have been using RxJava very wrongly for a year and a half
Javier Domingo Cansino
@txomon
Nov 13 2015 18:03
@Dorus but I have no reference to the subscriber... I mean, I just receive the objects...
Dorus
@Dorus
Nov 13 2015 18:03
you should have in flatMap
Javier Domingo Cansino
@txomon
Nov 13 2015 18:09
hell I really find confusing this java syntax...
Dorus
@Dorus
Nov 13 2015 18:12
mm i cant find any example online quickly, let me write one
Observable.create(o -> {
    try {
        o.onNext(call(i));
    } catch (Exception e) {
        o.onError(e);
    }
});
So including flatmap you get
.flatMap(i -> {
    return Observable.create(o -> {
        try {
            o.onNext(call(i));
        } catch (Exception e) {
            o.onError(e);
        }
    });
})
Dorus
@Dorus
Nov 13 2015 18:19
@weefbellington look at this one: http://reactivex.io/documentation/scheduler.html
It start on blue because of the third operator subscribeOn, then goes orange because of the first observeOn, all code is executed on orange, untill it goes pink on the last observeOn right before subscribe.
Dorus
@Dorus
Nov 13 2015 18:37

@weefbellington I made some code to demonstrate what i mean:

    Observable.create(o -> {
        o.onNext(0);
        Scheduler s = Schedulers.newThread();
        s.createWorker().schedule(() -> o.onNext(1), 1, TimeUnit.SECONDS);
    })
    .doOnEach(i -> System.out.println(i.getValue() + " do1 on " + Thread.currentThread().getName()))
    .subscribeOn(Schedulers.newThread())
    .doOnEach(i -> System.out.println(i.getValue() + " do2 on " + Thread.currentThread().getName()))
    .observeOn(Schedulers.newThread())
    .doOnEach(i -> System.out.println(i.getValue() + " do3 on " + Thread.currentThread().getName()))
    .subscribe(i -> System.out.println(i + " subscribe on " + Thread.currentThread().getName()));

Result in:

0 do1 on RxNewThreadScheduler-2
0 do2 on RxNewThreadScheduler-2
0 do3 on RxNewThreadScheduler-1
0 subscribe on RxNewThreadScheduler-1
1 do1 on RxNewThreadScheduler-3
1 do2 on RxNewThreadScheduler-3
1 do3 on RxNewThreadScheduler-1
1 subscribe on RxNewThreadScheduler-1

subscribeOn(Schedulers.newThread()) creates newThread-2, so Observable.create will run on this scheduler and call the first onNext on it, however, observeOn switches to ThreadScheduler-1. Both 0 do3, 0 subscribe and 1 do3, 1 subscribe are called on this thread. Now our Observable decides to schedule the next OnNext 1 second later on a new thread, resulting in o.onNext(1) to be called on NewThread-3 in 1 do1 and 1 do2.

David Stemmer
@weefbellington
Nov 13 2015 19:20
@Dorus you’re right, I just tested it independently. I was mistaken in thinking that observeOn only scheduled the thread for the final subscriber. It actually schedules the thread for all subsequent Observers in the chain, including those created by the lift operator
David Stemmer
@weefbellington
Nov 13 2015 19:26
however I would still contend in most cases, it’s best to only call observeOn once, right before the final subscribe call. On Android, I always call subscribeOn to schedule where the work will begin. If you forget to do this, you’ll often end up subscribing on the UI thread and locking it up.
Dorus
@Dorus
Nov 13 2015 19:32
Yeah, it's ugly when a Observable eats the subscribing thread, but often not something you have control over. Easy enough to add subscribeOn to work around it, and then we end up adding subscribeOn everywhere 'just to be safe' without really understanding why it (isn't) needed.
Tbh i find it a bit of a oversight that it's not mandatory for observables (or at least best practice), to schedule there work asap. Only reason against that i've heard is performance.
Dorus
@Dorus
Nov 13 2015 19:59
David Stemmer
@weefbellington
Nov 13 2015 20:46
@Dorus I suppose I don’t understand Erik’s perspective. What’s so bad about the mental model that “subscribeOn can be used to schedule work on a background thread”?
is he saying that originating Observables should always shedule their work using Worker#schedule, instead of just doing it on whatever thread the onSubscribe callback happens on?
Dorus
@Dorus
Nov 13 2015 21:39
@weefbellington Well i'm afraid to make hard statements here, but my best guess would be: Because that mental model is wrong. You assume onNext will be called on the same thread that registered the event, while i highly doubt that is the case (other than by sheer luck). In fact, the entire goal of subscribeOn is to register events on the UI thread in languages where this is required, while you subscribe in another threads.
Another valid use of subscribeOn is to keep ownership of the subscribing thread, typical when the subscribe action is long-running. I dont think that's the case when your source is Events.click.
observeOn is the correct method to ensure some work is on on a certain thread.
As switching threads is expensive, you want minimize it to where you really need it.
Javier Domingo Cansino
@txomon
Nov 13 2015 22:59
That example with this explanation was really mind blowing for me
@Dorus could you make that example in Java7 (no lambda) syntax? I really find difficult to understand the old syntax and I don't know how to translate lambdas to old one
:(
Dorus
@Dorus
Nov 13 2015 23:16

@txomon I replaced the lambda in Observable.create, as i think that was the most difficult one to grasp.

    Observable.create(new OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> o) {
            o.onNext(0);
            Scheduler s = Schedulers.newThread();
            s.createWorker().schedule(new Action0() {
                @Override
                public void call() {
                    o.onNext(1);
                }
            }, 1, TimeUnit.SECONDS);
        }
    })

Is this better?

Once you are used to lambdas, the old syntax is completely unreadable :)
Javier Domingo Cansino
@txomon
Nov 13 2015 23:36
@Dorus indeed, that's my problem, I come from C and python
Java is too much code
I just realized I am using also a combine latest
but why is it nested...
hell I hate nested stuff
I don't do any of it in C nor python :( or not that much at least
Dorus
@Dorus
Nov 13 2015 23:43

Oh you can remove the nesting, it takes it one step further:

Observable.create(new subscrib())

and

class subscrib implements OnSubscribe<Integer> {
    @Override
    public void call(Subscriber<? super Integer> o) {
        System.out.println(" create1 on " + Thread.currentThread().getName());
        o.onNext(0);
        System.out.println(" create2 on " + Thread.currentThread().getName());
        Scheduler s = Schedulers.newThread();
        s.createWorker().schedule(() -> {
            System.out.println(" create3 on " + Thread.currentThread().getName());
            o.onNext(1);
            System.out.println(" create4 on " + Thread.currentThread().getName());
        } , 1, TimeUnit.SECONDS);
    }
}
Javier Domingo Cansino
@txomon
Nov 13 2015 23:44
@Dorus do you mind if we make the examples with my code?
that way I can follow the transformations, the new thread thing is confusing me xD
    sendCallClick
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.io())
            .flatMap(new Func1<Object, Observable<Object>>() {
                @Override
                public Observable<Object> call(Object _) {
                    return Observable.combineLatest(
                            ubusObjectText,
                            ubusMethodText,
                            new Func2<String, String, Object>() {
                                @Override
                                public Object call(String ubusObject, String ubusMethod) {
                                    OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                                    Object ret;
                                    ret = activity.makeUbusRpcClientCall(ubusObject, ubusMethod, null);
                                    return ret;
                                }
                            }
                    );
                }
            })
            .subscribe(((OnCustomCallFragmentInteractionListener) main).getCallResultObserver());
Dorus
@Dorus
Nov 13 2015 23:48

Another interesting observation i make here:

You use sendCallClick.subscribeOn but on ubusObjectText you do not call subscribeOn. (Not that i think it's needed, just a observation).

Javier Domingo Cansino
@txomon
Nov 13 2015 23:48
so we have this which is basically .flatmap + combineLatest
Dorus
@Dorus
Nov 13 2015 23:48
Ok, what part is wrong/confusing?
Javier Domingo Cansino
@txomon
Nov 13 2015 23:48
yeah I was going to say that
why do I need that flatmap?
I was taking that as a reference
Dorus
@Dorus
Nov 13 2015 23:50
Because the result of Observable.combineLatest is a observable. If you use map instead, you end up with a Observable of Observables. Observable<Observable<Object>>. flatMap is actually a map(...).merge() so it wraps the results from multiple combineLatest back into 1 Observable.
Javier Domingo Cansino
@txomon
Nov 13 2015 23:52
but why do I need a map
Dorus
@Dorus
Nov 13 2015 23:52
Mmm, i'm not even sure why you are using combineLatest
Javier Domingo Cansino
@txomon
Nov 13 2015 23:52
to gather all the inputs at once
the design pattern I have understood is to basically convert every input in an observable
and manage them from the pipeline
Dorus
@Dorus
Nov 13 2015 23:53
Right now you say: Every time the user click, start observing phoneNumberText and messageBodyText and create a new message when they change.
This is most likely not what you want
if the user clicks twice, you start to observe twice
Javier Domingo Cansino
@txomon
Nov 13 2015 23:53
that way you don't have different setups, like this goes through Rx and this doesn't
Dorus
@Dorus
Nov 13 2015 23:54
lets look at this observable:
final Observable<String> ubusObjectText = Events.text(ubusObject);
Javier Domingo Cansino
@txomon
Nov 13 2015 23:54
yes, that's an observable created once
Dorus
@Dorus
Nov 13 2015 23:54
What does it do? I would guess it send a event every time the text in ubusObject changes?
or is it a static value?
(in that case, i would use Observable.just to create it)
it's just an utility function that takes care of the creation
what I understand it does is to create an observable from a TextView
Dorus
@Dorus
Nov 13 2015 23:57
Mmm in Rx.Net we simply have a observable.fromEvent(). I'm wondering if RxJava has something similar, it should have.
Javier Domingo Cansino
@txomon
Nov 13 2015 23:58
There is but the problem is that there is a lot of android going on there
Dorus
@Dorus
Nov 13 2015 23:58
yeah okay, let's focus on the problem. I'm easily distracted as you notice ;)
Javier Domingo Cansino
@txomon
Nov 13 2015 23:58
so the solution is to make a wrapper that takes care of doing .onNext()
Dorus
@Dorus
Nov 13 2015 23:59
The imporant part is that you change afterTextChanged into a observable stream.
Javier Domingo Cansino
@txomon
Nov 13 2015 23:59
indeed
The idea in those lines is to make sendCallClick gather all the Observables' last values and execute a function with it