These are chat archives for ReactiveX/RxJava

14th
Nov 2015
Dorus
@Dorus
Nov 14 2015 00:00
Ok, so you have a stream that fires every time text changes. When you subscribe to this stream, and the user types Reactive, your stream will contain R, Re, Rea, Reac... right?
Javier Domingo Cansino
@txomon
Nov 14 2015 00:00
The stuff needs to be executed in Schedulers.io() and the result be received in the main thread
@Dorus it may be emitting all the time, but it goes to a buffer by combinelatest that just saves the last value
Dorus
@Dorus
Nov 14 2015 00:01

Please realize that after you subscribed to

 return Observable.combineLatest(
    ubusObjectText,
    ubusMethodText,
    new Func2<String, String, Object>() {
        @Override
        public Object call(String ubusObject, String ubusMethod) {
            OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
            return activity.makeUbusRpcClientCall(ubusObject, ubusMethod, null);
        }
    }
);

you will execute the new Func2 on every single text change.

Javier Domingo Cansino
@txomon
Nov 14 2015 00:02
no
Dorus
@Dorus
Nov 14 2015 00:02
yes :)
Javier Domingo Cansino
@txomon
Nov 14 2015 00:02
because the root listener is sendCallClick, isn't it?
I mean, the subscription is effective when triggered by sendCallClick
combinelatest is not the same as 'get the last value'.
Actually you really do not want to use combinelatest here. You simply want to read editable.toString() directly i think.
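To make the point about combineLatest concrete, here is a minimal plain-Java model of its behaviour. It is only a sketch and does not use RxJava: `LatestPair`, `onLeft`, `onRight` and `demo` are hypothetical names. Once both inputs have produced a value, the combining function runs again on every further change of either input. If that function were an RPC call, it would fire on every keystroke rather than once per button click.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Toy model of combineLatest semantics; not the RxJava implementation. */
final class LatestPair {
    private String left;   // latest value from the first input, null until one arrives
    private String right;  // latest value from the second input, null until one arrives
    private final BiFunction<String, String, String> combiner;
    final List<String> emitted = new ArrayList<>();

    LatestPair(BiFunction<String, String, String> combiner) {
        this.combiner = combiner;
    }

    void onLeft(String value)  { left = value;  emitIfReady(); }
    void onRight(String value) { right = value; emitIfReady(); }

    // The combiner runs on EVERY change once both sides have a value.
    private void emitIfReady() {
        if (left != null && right != null) {
            emitted.add(combiner.apply(left, right));
        }
    }

    /** Simulates typing into two text fields, one character at a time. */
    static List<String> demo() {
        LatestPair pair = new LatestPair((object, method) -> object + "." + method);
        pair.onLeft("R");
        pair.onLeft("Re");
        pair.onLeft("Rea");   // nothing emitted yet: the right side has no value
        pair.onRight("g");    // first run of the combiner
        pair.onRight("go");   // second run
        pair.onLeft("Reac");  // third run
        return pair.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Reading both fields directly inside the click handler avoids all of those intermediate runs: the work happens once per click, with whatever text is in the fields at that moment.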

the design pattern I have understood is to basically convert every input in an observable

That is not a good design pattern. It works in some cases but is completely wrong in others.

Javier Domingo Cansino
@txomon
Nov 14 2015 00:05
ok, so in my case
Dorus
@Dorus
Nov 14 2015 00:05
You do not want to do something every time text changes.
Javier Domingo Cansino
@txomon
Nov 14 2015 00:05
I just want to gather my text on demand, by the sendbutton
Dorus
@Dorus
Nov 14 2015 00:05
You want to do something only when a button is hit right?
Javier Domingo Cansino
@txomon
Nov 14 2015 00:05
yes
I was thinking that the .subscribe() would take effect only when the root Observable was used
sendCallClick().{...}.subscribe()
so when sendCallClick() was triggered, both ubusObject and ubusMethod's latest values would be gathered, and passed into a function
Dorus
@Dorus
Nov 14 2015 00:11
.map(new Func1<Object, Object>() {
        @Override
        public Observable<Object> call(Object t) {
             OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
             return activity.makeUbusRpcClientCall(ubusObject.toString(), ubusMethod.toString(), null);
        }
})
That should be enough
Javier Domingo Cansino
@txomon
Nov 14 2015 00:11
it returns an object
Dorus
@Dorus
Nov 14 2015 00:11
oh right
yeah i wasn't sure on that, i always prefer to give correct types if i know them
Javier Domingo Cansino
@txomon
Nov 14 2015 00:13
object, like literally because it's json
deserialized json
Dorus
@Dorus
Nov 14 2015 00:13
ah right
well not too important
let's see, now you want to execute the final subscribe on the mainThread right?
So at the very least, move observeOn down, to after the map
Next thing we need to worry about: What thread is dispatching sendCallClick?
Javier Domingo Cansino
@txomon
Nov 14 2015 00:16
yeah, the idea is that everything but the .subscribe function should be executed in the io thread
Dorus
@Dorus
Nov 14 2015 00:16
Oh btw, i replaced ubusObjectText with ubusObject in the map code up here, used the wrong variable.
Javier Domingo Cansino
@txomon
Nov 14 2015 00:16
yeah I noticed
I now have
        sendCallClick
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.io())
            .map(new Func1<Object, Object>() {
                @Override
                public Object call(Object o) {
                    OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                    Object ret;
                    ret = activity.makeUbusRpcClientCall(
                            ubusObject.getText().toString(),
                            ubusMethod.getText().toString(),
                            null);
                    return ret;
                }
            })
            .subscribe(((OnCustomCallFragmentInteractionListener) main).getCallResultObserver());
Dorus
@Dorus
Nov 14 2015 00:17
You have to indent code with at least 4 spaces
or a tab
Javier Domingo Cansino
@txomon
Nov 14 2015 00:18
that's it,... I supposed they were really with github flavored markdown, and I was doing the backticks thing
so, the subscribeOn will make sure the getCallResultObserver()'s Observer gets executed on mainThread
Dorus
@Dorus
Nov 14 2015 00:19
I would suggest you start with this:
sendCallClick
    .map(new Func1<Object, Object>() {
        @Override
        public Object call(Object o) {
            OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
            Object ret;
            ret = activity.makeUbusRpcClientCall(
                    ubusObject.getText().toString(),
                    ubusMethod.getText().toString(),
                    null);
            return ret;
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(((OnCustomCallFragmentInteractionListener) main).getCallResultObserver());
Javier Domingo Cansino
@txomon
Nov 14 2015 00:19
but the calling thread is the mainThread
Dorus
@Dorus
Nov 14 2015 00:19
And then try to find out what thread is running the map.
Javier Domingo Cansino
@txomon
Nov 14 2015 00:19
the one registering this
Dorus
@Dorus
Nov 14 2015 00:19
The one that registers it isn't important
The one that executes the events is.
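The distinction between the registering thread and the executing thread can be shown without Rx at all. The sketch below is plain Java under stated assumptions: `CallbackThreadDemo` and the thread name `event-source` are made up for illustration, and the executor merely stands in for whatever component fires the events. The listener is created on the calling thread, but it runs on the thread that invokes it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Plain-Java illustration: a callback runs on the thread that fires it. */
final class CallbackThreadDemo {
    /** Returns {registeringThreadName, executingThreadName}. */
    static String[] run() {
        AtomicReference<String> executedOn = new AtomicReference<>();

        // "Registering" the listener happens here, on the calling thread.
        String registeredOn = Thread.currentThread().getName();
        Runnable listener = () -> executedOn.set(Thread.currentThread().getName());

        // The event source fires the listener from its own thread.
        ExecutorService eventSource =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-source"));
        eventSource.execute(listener);
        eventSource.shutdown();
        try {
            if (!eventSource.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new String[] { registeredOn, executedOn.get() };
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("registered on " + threads[0] + ", executed on " + threads[1]);
    }
}
```

Which thread actually delivers a UI event depends on the framework, so logging `Thread.currentThread().getName()` inside the callback is the quickest way to find out.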
Javier Domingo Cansino
@txomon
Nov 14 2015 00:20
well, in theory it will be the same one
Dorus
@Dorus
Nov 14 2015 00:20
Can you run it and find out?
Javier Domingo Cansino
@txomon
Nov 14 2015 00:20
sure
Dorus
@Dorus
Nov 14 2015 00:20
Because it's the thread that gets run by view.addTextChangedListener

I don't think this will be the main thread, but if it is, you can fix it with

sendCallClick
    .observeOn(Schedulers.io())
    .map(new Func1<Object, Object>() {
        @Override
        public Object call(Object o) {
            OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
            Object ret;
            ret = activity.makeUbusRpcClientCall(
                    ubusObject.getText().toString(),
                    ubusMethod.getText().toString(),
                    null);
            return ret;
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(((OnCustomCallFragmentInteractionListener) main).getCallResultObserver());

Try that next

Javier Domingo Cansino
@txomon
Nov 14 2015 00:24
nothing happens xD
Dorus
@Dorus
Nov 14 2015 00:24
Also a small nitpick: Line 40 of Events.java, replace return subject; with return subject.asObservable();
By "nothing happens", do you mean the main thread is locked up?
In that case use the second piece of code i gave :)
The one with sendCallClick.observeOn(Schedulers.io()) added on the second line.
Javier Domingo Cansino
@txomon
Nov 14 2015 00:25
no no, I mean like literally nothing
Dorus
@Dorus
Nov 14 2015 00:25
Okay, that's not good
Javier Domingo Cansino
@txomon
Nov 14 2015 00:26
yeah xD
I will try the second one
with all the observeOn calls
Dorus
@Dorus
Nov 14 2015 00:26
If it still fails, also add .subscribeOn(Schedulers.io()) back :(
Javier Domingo Cansino
@txomon
Nov 14 2015 00:27
I don't know why, I mean, it should crash...
maybe I am treating the error...
Dorus
@Dorus
Nov 14 2015 00:27
Why should it crash?
Javier Domingo Cansino
@txomon
Nov 14 2015 00:27
but it should be logging
because networking ops in main (ui) thread are not permitted
for obvious reasons
ok, the second one with the 2 observeOn works as expected
Dorus
@Dorus
Nov 14 2015 00:28
Wonderful :D

btw, why

        Object ret;
        ret = activity.makeUbusRpcClientCall(
                ubusObject.getText().toString(),
                ubusMethod.getText().toString(),
                null);
        return ret;

I would fly with

        return activity.makeUbusRpcClientCall(
                ubusObject.getText().toString(),
                ubusMethod.getText().toString(),
                null);
Javier Domingo Cansino
@txomon
Nov 14 2015 00:28
let me upload latest version
we have a problem with raising the exception
I mean throwing the exception
Dorus
@Dorus
Nov 14 2015 00:29
ooh i know
Javier Domingo Cansino
@txomon
Nov 14 2015 00:29
ok, it's updated now
Dorus
@Dorus
Nov 14 2015 00:30
you get exceptions but you do not catch them, you should catch them and forward them in onError()
Javier Domingo Cansino
@txomon
Nov 14 2015 00:30
ohhhhh
ok I now understand
hell, the previous flatmap + combineLatest was so unreadable
Dorus
@Dorus
Nov 14 2015 00:30
yeah, we have to go back to flatmap
Javier Domingo Cansino
@txomon
Nov 14 2015 00:30
I didn't know where to extract that o
Dorus
@Dorus
Nov 14 2015 00:30
but not combineLatest
Javier Domingo Cansino
@txomon
Nov 14 2015 00:30
why flatmap?
Dorus
@Dorus
Nov 14 2015 00:31
observable.create is your friend i think.
Javier Domingo Cansino
@txomon
Nov 14 2015 00:32
@Dorus where do I get the onError() from?
because I need a way to hook into the pipeline..
Dorus
@Dorus
Nov 14 2015 00:34
let me look up my example with observable.create
i can write it easily but you want java 7 not java 8 ;)
Javier Domingo Cansino
@txomon
Nov 14 2015 00:34
I really prefer the super verbose way
Dorus
@Dorus
Nov 14 2015 00:34
I'm puzzling over how to do that
Javier Domingo Cansino
@txomon
Nov 14 2015 00:34
it's more clear for me
Dorus
@Dorus
Nov 14 2015 00:35
.flatMap(i -> {
        return Observable.create(o -> {
            try {
                o.onNext(call(i));
            } catch (Exception e) {
                o.onError(e);
            }
        });
    })
That's the Java 8 way. I'm trying to get to 7 :)
Javier Domingo Cansino
@txomon
Nov 14 2015 00:35
ok, so lets work with that without .create
if you had to do it by hand
Dorus
@Dorus
Nov 14 2015 00:36
no, you need that create. That's the entire point.
Javier Domingo Cansino
@txomon
Nov 14 2015 00:36
ah
so I cannot do the same thing without the create?
I thought create was just a quick link
Dorus
@Dorus
Nov 14 2015 00:37
.flatMap(new Func1<Integer, Observable<Object>>() {
        @Override
        public Observable<Object> call(Integer i) {
            return Observable.create(o -> {
                try {
                    o.onNext(call(i));
                } catch (Exception e) {
                    o.onError(e);
                }
            });
        }
    })
Javier Domingo Cansino
@txomon
Nov 14 2015 00:37
like a wrapper
Dorus
@Dorus
Nov 14 2015 00:37
There, it was easy once i found the right button in eclipse :P
Javier Domingo Cansino
@txomon
Nov 14 2015 00:38
o where did it come from?
oh is lambda
ok
Dorus
@Dorus
Nov 14 2015 00:39
i'm still not there
Javier Domingo Cansino
@txomon
Nov 14 2015 00:40
    sendCallClick
            .observeOn(Schedulers.io())
            .flatMap(new Func1<Object, Observable<Object>>() {
                @Override
                public Observable<Object> call(Object o) {
                    return Observable.create(
                            new Observable.OnSubscribe<Object>() {
                                @Override
                                public void call(Subscriber<? super Object> subscriber) {

                                }
                            }
                    )

                }
            })
it's what I am getting
Dorus
@Dorus
Nov 14 2015 00:40
    .flatMap(new Func1<Object, Observable<Object>>() {
        @Override
        public Observable<Object> call(Object i) {
            return Observable.create(new OnSubscribe<Object>() {
                @Override
                public void call(Subscriber<? super Object> o) {
                    try {
                        o.onNext(caller(i));
                    } catch (Exception e) {
                        o.onError(e);
                    }
                }
            });
        }
    })
And now i need to insert your code
Javier Domingo Cansino
@txomon
Nov 14 2015 00:42
            .flatMap(new Func1<Object, Observable<Object>>() {
                @Override
                public Observable<Object> call(Object o) {
                    return Observable.create(
                            new Observable.OnSubscribe<Object>() {
                                @Override
                                public void call(Subscriber<? super Object> subscriber) {
                                    OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                                    try {
                                        subscriber.onNext(
                                                activity.makeUbusRpcClientCall(
                                                        ubusObject.getText().toString(),
                                                        ubusMethod.getText().toString(),
                                                        null));
                                    } catch (Throwable e) {
                                        subscriber.onError(e);
                                    }
                                }
                            }
                    );

                }
            })
now
that's it, isn't it?
Dorus
@Dorus
Nov 14 2015 00:42
sendCallClick
    .observeOn(Schedulers.io())
    .flatMap(new Func1<Object, Observable<Object>>() {
        @Override
        public Observable<Object> call(Object i) {
            return Observable.create(new OnSubscribe<Object>() {
                @Override
                public void call(Subscriber<? super Object> o) {
                    try {
                        OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                        o.onNext(activity.makeUbusRpcClientCall(
                                ubusObject.getText().toString(),
                                ubusMethod.getText().toString(),
                                null));
                    } catch (Exception e) {
                        o.onError(e);
                    }
                }
            });
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(((OnCustomCallFragmentInteractionListener) main).getCallResultObserver());
Javier Domingo Cansino
@txomon
Nov 14 2015 00:43
yayh!!!
Dorus
@Dorus
Nov 14 2015 00:43
If we have the same code now, it should be correct!
YAY!
Javier Domingo Cansino
@txomon
Nov 14 2015 00:43
mother of god F* java7
Dorus
@Dorus
Nov 14 2015 00:43
Now give it a spin :)
Javier Domingo Cansino
@txomon
Nov 14 2015 00:43
it's unbearable..
sure
btw, why flatmap instead of map?
Dorus
@Dorus
Nov 14 2015 00:44
Because we want to be able to emit both onNext and onError
So we create a new observable (with Observable.create)
But now we have a stream of nested Observables
Javier Domingo Cansino
@txomon
Nov 14 2015 00:44
oh, with each request, instead of with each emission?
Dorus
@Dorus
Nov 14 2015 00:45
Each time you call makeUbusRpcClientCall, a new observable is created around it.
But you do not want these observables, you want their content.
So flatmap flattens this again.
Flatten from Observable<Observable<Object>> -> Observable<Object>.
Do you follow that?
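A synchronous analogy may help with the flattening step. The sketch below uses `java.util.stream`, not RxJava, and `callFor` is a hypothetical stand-in for one RPC call that yields a single result. Mapping each click to a stream would give a stream of streams; `flatMap` unwraps the inner streams so the results arrive in one flat sequence.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Synchronous analogy for flattening; java.util.stream, not RxJava. */
final class FlattenDemo {
    // Hypothetical stand-in for one RPC call producing a single result.
    static Stream<String> callFor(String click) {
        return Stream.of("result of " + click);
    }

    static List<String> flattened() {
        List<String> clicks = Arrays.asList("click1", "click2", "click3");
        // map(...) alone would give Stream<Stream<String>>;
        // flatMap unwraps each inner stream into the outer one.
        return clicks.stream()
                .flatMap(FlattenDemo::callFor)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(flattened());
    }
}
```

The analogy only covers the types. An Rx flatMap also subscribes to each inner observable and merges their emissions as they arrive, which a sequential Java stream does not model.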
Javier Domingo Cansino
@txomon
Nov 14 2015 00:47
ufff
not really, if we go step by step
we have
Dorus
@Dorus
Nov 14 2015 00:47

We have a stream

click --- click --- click

Javier Domingo Cansino
@txomon
Nov 14 2015 00:47
let's start from Map
Dorus
@Dorus
Nov 14 2015 00:47
For every click, we create a observable
Javier Domingo Cansino
@txomon
Nov 14 2015 00:47
yes
I thought the observable was the stream
and the clicks just emissions
items I mean
Dorus
@Dorus
Nov 14 2015 00:48
click -  ----------------------click -----------------
 |                              |
 \--makeUbusRpcClientCall()-|   \--makeUbusRpcClientCall()-|
Javier Domingo Cansino
@txomon
Nov 14 2015 00:49
yes
so the items are the clicks, we agree on that
Dorus
@Dorus
Nov 14 2015 00:49
Now we want to get the results back into a single stream.
Javier Domingo Cansino
@txomon
Nov 14 2015 00:50
so you mean that the click is like a fork?
Dorus
@Dorus
Nov 14 2015 00:50
you know this notation right
--------- \\ endless stream
----|       \\ finished stream
----x      \\ error stream
Javier Domingo Cansino
@txomon
Nov 14 2015 00:50
now I do
Dorus
@Dorus
Nov 14 2015 00:50
Marble diagrams
Javier Domingo Cansino
@txomon
Nov 14 2015 00:50
now I do =)
Dorus
@Dorus
Nov 14 2015 00:50
They're easier to read in pictures. Ascii art is hard :)
Javier Domingo Cansino
@txomon
Nov 14 2015 00:51
the thing is that, why is it important to have a single stream
Dorus
@Dorus
Nov 14 2015 00:51
Ok, as you see, the observables that call makeUbusRpcClientCall() end in onComplete()
But you want the results from them.
So you need to merge()
Javier Domingo Cansino
@txomon
Nov 14 2015 00:51
there is no onComplete()
they are endless streams
oh wait, should my code be like
Dorus
@Dorus
Nov 14 2015 00:52
Remember this code?
            public void call(Subscriber<? super Object> o) {
                try {
                    OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                    o.onNext(activity.makeUbusRpcClientCall(
                            ubusObject.getText().toString(),
                            ubusMethod.getText().toString(),
                            null));
                } catch (Exception e) {
                    o.onError(e);
                }
            }
Ooh right
We forgot something
Javier Domingo Cansino
@txomon
Nov 14 2015 00:53
            .flatMap(new Func1<Object, Observable<Object>>() {
                @Override
                public Observable<Object> call(Object o) {
                    return Observable.create(
                            new Observable.OnSubscribe<Object>() {
                                @Override
                                public void call(Subscriber<? super Object> subscriber) {
                                    OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                                    try {
                                        subscriber.onNext(
                                                activity.makeUbusRpcClientCall(
                                                        ubusObject.getText().toString(),
                                                        ubusMethod.getText().toString(),
                                                        null));
                                    } catch (Throwable e) {
                                        subscriber.onError(e);
                                    }
                                    subscriber.onComplete();
                                }
                            }
                    );

                }
            })
yeah
Dorus
@Dorus
Nov 14 2015 00:53
            public void call(Subscriber<? super Object> o) {
                try {
                    OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                    o.onNext(activity.makeUbusRpcClientCall(
                            ubusObject.getText().toString(),
                            ubusMethod.getText().toString(),
                            null));
                } catch (Exception e) {
                    o.onError(e);
                }
            }
            o.onComplete();
Javier Domingo Cansino
@txomon
Nov 14 2015 00:53
should it with that onComplete()
Dorus
@Dorus
Nov 14 2015 00:53
dôh
Javier Domingo Cansino
@txomon
Nov 14 2015 00:53
yeah
Dorus
@Dorus
Nov 14 2015 00:54
Else they stay open, memory leaks :worried:
Javier Domingo Cansino
@txomon
Nov 14 2015 00:54
            .flatMap(new Func1<Object, Observable<Object>>() {
                @Override
                public Observable<Object> call(Object o) {
                    return Observable.create(
                            new Observable.OnSubscribe<Object>() {
                                @Override
                                public void call(Subscriber<? super Object> subscriber) {
                                    OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                                    try {
                                        subscriber.onNext(
                                                activity.makeUbusRpcClientCall(
                                                        ubusObject.getText().toString(),
                                                        ubusMethod.getText().toString(),
                                                        null));
                                    } catch (Throwable e) {
                                        subscriber.onError(e);
                                    }
                                    subscriber.onCompleted();
                                }
                            }
                    );

                }
            })
ok, now it makes more sense
so the observable that my observer gets its values from gets created on each click
Dorus
@Dorus
Nov 14 2015 00:55
I'm used to Rx.Net where Observable.create has a slightly different syntax :D
Javier Domingo Cansino
@txomon
Nov 14 2015 00:55
yeah no worries
I appreciate a hell of a lot the time you are spending with me
Dorus
@Dorus
Nov 14 2015 00:56

A well-formed Observable must invoke either the Subscriber's onCompleted method exactly once or its onError method exactly once.

Important line in the RxJava docs.

Javier Domingo Cansino
@txomon
Nov 14 2015 00:56
yeah, I read that but had never used onXxxxx() xD
Dorus
@Dorus
Nov 14 2015 00:56
I'm now wondering if i ever did this wrong in rx.net also.
no worries, i'm learning myself too here :)
Javier Domingo Cansino
@txomon
Nov 14 2015 00:56
so the flatMap will get in charge of getting the closed observer
and feeding the next one
so I will get all of them as if they were one
Dorus
@Dorus
Nov 14 2015 00:57
yes
Javier Domingo Cansino
@txomon
Nov 14 2015 00:57
as if the ubuscall was continuous
Dorus
@Dorus
Nov 14 2015 00:57
more like, as if they all run on the same level.
Now sendCallClick is a continuous stream again.
Javier Domingo Cansino
@txomon
Nov 14 2015 00:59
okey, let me try because I have still the problem with throwing the exception
but I think it's elsewhere
Dorus
@Dorus
Nov 14 2015 01:01
yeah, rx can be a bit risky when you call functions with exceptions inside the lambdas (or anon classes)
In rx.net it tends to ignore them, if you are lucky they fall out into your IDE (if you are unlucky your IDE goes down, i've seen it happen).
In RxJava it can give nasty results like killing the scheduler or worse.
without any log or warning.
Javier Domingo Cansino
@txomon
Nov 14 2015 01:02
heh
the problem is that the exception is not being caught now xD
Dorus
@Dorus
Nov 14 2015 01:03
The solution is to properly catch them and throw them in onError of course.
Be careful with things like catch (UbusRpcException e) that might not catch all possible errors.
Javier Domingo Cansino
@txomon
Nov 14 2015 01:03
yeah
look at that
I have updated it
so now trust me when I say that makeUbusRpcClientCall throws an exception
Dorus
@Dorus
Nov 14 2015 01:04
lol, Throwable is a bit of overkill, i believe you should only catch Exceptions if i remember correctly.
Javier Domingo Cansino
@txomon
Nov 14 2015 01:05
onError is prepared for Throwable, so i am going that way
Dorus
@Dorus
Nov 14 2015 01:05
Ah right
(i'm not overly familiar with this part of RxJava as you notice)
Javier Domingo Cansino
@txomon
Nov 14 2015 01:05
problem, that Log.d("Catching throwable in Observable" );
is not getting called
but there is a raise there
Dorus
@Dorus
Nov 14 2015 01:06
What if you move the Log.d to the subscribe function?
Anyway it should run Log.d if there is an error :S
I doubt OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main; manages to raise an error. You could put that line inside the try to be 100% safe.
Javier Domingo Cansino
@txomon
Nov 14 2015 01:07
the problem is that there is no error at all
Dorus
@Dorus
Nov 14 2015 01:08
Can you put a breakpoint on that line?
The one with Log.d or even inside the try
Javier Domingo Cansino
@txomon
Nov 14 2015 01:10
sure
I am rebooting because I left no space in the device /tmp
xDD
Dorus
@Dorus
Nov 14 2015 01:11
Anyway we're reaching a point where it should be working. I'm starting to get confused :(
Javier Domingo Cansino
@txomon
Nov 14 2015 01:11
oh it works
but the problem is that I don't get anything if there is an error
Dorus
@Dorus
Nov 14 2015 01:11
You mean you dont get a line in the log right? Because you are not supposed to get anything else :)
Javier Domingo Cansino
@txomon
Nov 14 2015 01:11
yeah
I just have log lines xDDD
no visual feedback
Dorus
@Dorus
Nov 14 2015 01:14
Anyway, in any case onError() will end the subscription. That might not always be what you want.
Javier Domingo Cansino
@txomon
Nov 14 2015 01:14
xDDD
Dorus
@Dorus
Nov 14 2015 01:14
If you want to recover from the error you need additional code (or simply not raise onError()). I saw in the old code you ignored UbusRpcException; you might still want to do that, but do crash on other errors, for example.
Javier Domingo Cansino
@txomon
Nov 14 2015 01:15
the onError() doesn't end the subscription
I mean, I get the fail log many times
Dorus
@Dorus
Nov 14 2015 01:15
That's how merge works.
mm wait
Javier Domingo Cansino
@txomon
Nov 14 2015 01:15
although maybe because onError is not happening xD
...
so I literally set debug points in every line within the subscription... that is just not stopping
Dorus
@Dorus
Nov 14 2015 01:17
So even activity.makeUbusRpcClientCall is not happening?
Javier Domingo Cansino
@txomon
Nov 14 2015 01:18
also, I have debugged inside makeUbusRpcClientCall, and if I go one step further than the raise (this is, the throw), it kills the debugger
xD
it really kills everything, even the app
Dorus
@Dorus
Nov 14 2015 01:19
Btw try to place subscriber.onCompleted(); inside the try.
Obviously we shouldn't call onCompleted after we had an error.
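The difference between the two placements can be checked with a toy model. This is a plain-Java sketch, not RxJava: `RecordingSink`, `completeAfterCatch` and `completeInsideTry` are hypothetical names, and the sink only records which notifications it receives. With the completion call after the try/catch, a failing call produces two terminal notifications; with it inside the try, exactly one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Toy model of the onNext/onError/onCompleted contract; not RxJava itself. */
final class TerminalEventDemo {
    /** Records the notifications it receives, in order. */
    static final class RecordingSink {
        final List<String> events = new ArrayList<>();
        void onNext(Object value) { events.add("onNext(" + value + ")"); }
        void onError(Throwable e) { events.add("onError(" + e.getMessage() + ")"); }
        void onCompleted()        { events.add("onCompleted"); }
    }

    // Problematic shape: onCompleted sits after the try/catch,
    // so it is also reached after onError.
    static List<String> completeAfterCatch(Callable<Object> work) {
        RecordingSink sink = new RecordingSink();
        try {
            sink.onNext(work.call());
        } catch (Exception e) {
            sink.onError(e);
        }
        sink.onCompleted();
        return sink.events;
    }

    // Fixed shape: onCompleted is only reached when the work succeeded.
    static List<String> completeInsideTry(Callable<Object> work) {
        RecordingSink sink = new RecordingSink();
        try {
            sink.onNext(work.call());
            sink.onCompleted();
        } catch (Exception e) {
            sink.onError(e);
        }
        return sink.events;
    }

    public static void main(String[] args) {
        Callable<Object> failing = () -> { throw new IllegalStateException("boom"); };
        System.out.println(completeAfterCatch(failing)); // two terminal notifications
        System.out.println(completeInsideTry(failing));  // one terminal notification
    }
}
```

Returning right after `onError` and calling `onNext`/`onCompleted` after the try block is an equivalent fix, and it keeps exceptions thrown by the sink itself out of the catch.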
Javier Domingo Cansino
@txomon
Nov 14 2015 01:20
I changed the code to
    sendCallClick
            .observeOn(Schedulers.io())
            .flatMap(new Func1<Object, Observable<Object>>() {
                @Override
                public Observable<Object> call(Object o) {
                    return Observable.create(
                            new Observable.OnSubscribe<Object>() {
                                @Override
                                public void call(Subscriber<? super Object> subscriber) {
                                    OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                                    Object ret;
                                    try {
                                        ret = activity.makeUbusRpcClientCall(
                                                ubusObject.getText().toString(),
                                                ubusMethod.getText().toString(),
                                                null);
                                    } catch (Throwable e) {
                                        Log.d(TAG, "Catching throwable in Observable");
                                        subscriber.onError(e);
                                        return;
                                    }
                                    subscriber.onNext(ret);
                                    subscriber.onCompleted();
                                }
                            }
                    );

                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(((OnCustomCallFragmentInteractionListener) main).getCallResultObserver());
Dorus
@Dorus
Nov 14 2015 01:20
That works also
Javier Domingo Cansino
@txomon
Nov 14 2015 01:20
and there is the debugging thing, that doesn't let me debug
Dorus
@Dorus
Nov 14 2015 01:20
But really shouldn't make any difference :/
Javier Domingo Cansino
@txomon
Nov 14 2015 01:20
and if I try to debug inside makeUbusRpcClientCall, it kills the app
Dorus
@Dorus
Nov 14 2015 01:21
Meh, too much Rx for the debugger?
can you debug if you strip Rx?
I think the problem is elsewhere
Javier Domingo Cansino
@txomon
Nov 14 2015 01:23
strip Rx...
Dorus
@Dorus
Nov 14 2015 01:23
strip it out, remove it, run just
activity.makeUbusRpcClientCall(
                                            ubusObject.getText().toString(),
                                            ubusMethod.getText().toString(),
                                            null)
(on the right thread)
Javier Domingo Cansino
@txomon
Nov 14 2015 01:24
heh
I don't even know how these thread things work..
for java I mean
maybe we can try raising the exception there
?
instead of making the call, I raise directly the exception
Dorus
@Dorus
Nov 14 2015 01:25
Well you just schedule the work on a scheduler
easiest way
Schedulers.io().createWorker().schedule(action)
Where action is your action
i.e. new Action0() { @Override public void call() { ...code... } }
Javier Domingo Cansino
@txomon
Nov 14 2015 01:27
I tried with throwing the exception there
it didn't work
but wait
WTF!
Dorus
@Dorus
Nov 14 2015 01:27
Now i'm curious
Javier Domingo Cansino
@txomon
Nov 14 2015 01:27
yeah, the thing is that this looks like I am running an old version...
Dorus
@Dorus
Nov 14 2015 01:27
Aaaaah
Told you something else was wrong
Javier Domingo Cansino
@txomon
Nov 14 2015 01:31
mega fail
Dorus
@Dorus
Nov 14 2015 01:31
Well, keep battling your compiler, i'm sure you
Javier Domingo Cansino
@txomon
Nov 14 2015 01:31
hahaha
Dorus
@Dorus
Nov 14 2015 01:31
win :)
Javier Domingo Cansino
@txomon
Nov 14 2015 01:31
no
I filled my /tmp
xDDD
Dorus
@Dorus
Nov 14 2015 01:32
But it's late here so i'm going to say goodnight.
Does it work now?
I'll read up tomorrow
Javier Domingo Cansino
@txomon
Nov 14 2015 01:32
thanks a lot!!
it's 1:32 here too
xD
and yes, onError finishes the merge thing
Dorus
@Dorus
Nov 14 2015 01:35
yeah, that was the next topic. If you do not call onError you can go back to map even, else you can use one of the error operators to recover. Read up on them :)
Javier Domingo Cansino
@txomon
Nov 14 2015 01:35
yep
Javier Domingo Cansino
@txomon
Nov 14 2015 01:35
see you tomorrow!
Dorus
@Dorus
Nov 14 2015 01:35
retry() is my first guess,
gnight :D
Javier Domingo Cansino
@txomon
Nov 14 2015 01:35
ok, thank you very much!
Javier Domingo Cansino
@txomon
Nov 14 2015 01:53
@Dorus seems like your idea would be to make it as an operator. That won't work because my observer won't get notified about the error at all, it will be intercepted by the chain's .retry()
maybe I should not use Exceptions at all...
I mean, they are a nasty way to do things anyway, and in Java are really heavy IIRC...
Javier Domingo Cansino
@txomon
Nov 14 2015 01:58
I am going to change all my code to not use Exceptions except when they are unrecoverable
Abhinav Solan
@eyeced
Nov 14 2015 06:45
Hey guys .. need the best way to solve this problem .. I have an API call which fetches data in a time range .. this data partially lies in one DB (newer data) and the rest of it lies in another DB, so what would be the best way to merge this data .. in most cases data from the DB which has the newer data would be enough; only if the time range spans both would I need to fetch data from the older DB .. what could be the best method in rxjava to merge these 2 streams .. the 2nd stream would be dependent on the last element of the 1st stream
Dorus
@Dorus
Nov 14 2015 10:38
@eyeced concat them?
@txomon .retry() has an overload that lets you specify which exceptions to retry and which to fail on (forward).
Javier Domingo Cansino
@txomon
Nov 14 2015 11:27
@Dorus the thing is that if it retries, it doesn't reach the next step in the pipe
I mean, the error dies on retry()
so no onError() reaches the subscriber
Dorus
@Dorus
Nov 14 2015 11:38
What exactly do you want to do with the onError? If you've correctly responded to it by continuing to wait for the next click, that's all you need right?
Also you can add a doOnError(e -> writeLog()) to write your log file, if you really want to.
Or just write to the log file before you call subscriber.onError(e), that's what you're currently doing right?
Javier Domingo Cansino
@txomon
Nov 14 2015 11:46
The thing is that onError unsubscribes the observer
I can only subscribe if I do it from the pipeline it seems
Dorus
@Dorus
Nov 14 2015 11:47
I do not quite understand what you mean.
retry() will resubscribe, and outside the stream you can actually subscribe again if you keep a reference to your observable.
It won't be the same subscription of course.
But keeping it inside the stream is much cleaner.
(for non-fatal errors)
Javier Domingo Cansino
@txomon
Nov 14 2015 11:54
No no
the thing is that the error will be handled by retry()
no onError() will reach the subscriber
    sendCallClick
            .observeOn(Schedulers.io())
            .flatMap(new Func1<Object, Observable<Object>>() {
                @Override
                public Observable<Object> call(Object o) {
                    return Observable.create(
                            new Observable.OnSubscribe<Object>() {
                                @Override
                                public void call(Subscriber<? super Object> subscriber) {
                                    OnCustomCallFragmentInteractionListener activity = (OnCustomCallFragmentInteractionListener) main;
                                    try {
                                        subscriber.onNext(
                                                activity.makeUbusRpcClientCall(
                                                        ubusObject.getText().toString(),
                                                        ubusMethod.getText().toString(),
                                                        null));
                                    } catch (Throwable e) {
                                        subscriber.onError(e);
                                    }
                                    subscriber.onCompleted();
                                }
                            }
                    );

                }
            })
            .retry()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(((OnCustomCallFragmentInteractionListener) main).getCallResultObserver());
the subscriber will never execute onError()
because .retry() is doing it for it
so the only way to pass the error on is not to signal an error, but to emit an item saying there was a failure
that is, I need to report that there is an error without exceptions
Dorus
@Dorus
Nov 14 2015 12:12
@txomon Did you look at the overload i told you to use?
@txomon .retry() has an overload that lets you specify which exceptions to retry and which to fail on (forward).
http://reactivex.io/RxJava/javadoc/rx/Observable.html#retry%28rx.functions.Func2%29
Also, put the onCompleted inside the try, else you end up calling both onError and onCompleted.
And that's against the Rx contract.
Javier Domingo Cansino
@txomon
Nov 14 2015 12:18
Yeah, I previously had an else, but else doesn't exist in java xD
I did a return already
@Dorus anyway, it will get unsubscribed and I know no way of getting subscribed from the subscriber
Dorus
@Dorus
Nov 14 2015 12:21
For non-fatal errors, you let retry catch them and continue to monitor the clicks. For fatal errors, you close the view (and possibly more), and subscribe again when the user opens the view again.
Javier Domingo Cansino
@txomon
Nov 14 2015 12:21
@Dorus the thing is, how do I get notified of the error in the subscriber?
Dorus
@Dorus
Nov 14 2015 12:21
Another thing: you never store the subscription from subscribe. I would store it and make sure it gets unsubscribed in the onDetach()
Ah
subscriber has multiple functions
Javier Domingo Cansino
@txomon
Nov 14 2015 12:22
I need to give feedback if an error occurs
Dorus
@Dorus
Nov 14 2015 12:22
onNext, onError and onCompleted
So you just listen to both onNext and onError
Javier Domingo Cansino
@txomon
Nov 14 2015 12:22
yeah, but onError won't be called if an error occurs
Dorus
@Dorus
Nov 14 2015 12:22
ooh, a non fatal error?
Javier Domingo Cansino
@txomon
Nov 14 2015 12:22
ok, I think we are going in circles
yeah
Dorus
@Dorus
Nov 14 2015 12:22
Just use Do i think
Javier Domingo Cansino
@txomon
Nov 14 2015 12:22
that's it =)
Do?
Dorus
@Dorus
Nov 14 2015 12:23
Or do it in the retry function, but that's ugly
doOnError
mm i'm thinking
I think do is the best solution
Javier Domingo Cansino
@txomon
Nov 14 2015 12:27
the thing is, why do I get unsubscribed
I don't want to get unsubscribed
this is a flatmap
if it would be a map it would make sense
but this is a flow of its own each time
I am now trying to see how retry() works, because I should be able to do the same from my subscriber
meaning, if I can make a retry() from the subscriber that would be nice
Dorus
@Dorus
Nov 14 2015 12:36
You could, but it would be really ugly. With circular references and stuff. You would need a reference to your subscribe function inside your subscribe function so you can subscribe, and you also need a reference to the preceding observable. That's less problematic but still not something i would recommend.
Remember the Rx syntax
OnNext*(OnCompleted|OnError)?
0 or more OnNext calls, optionally followed by either an OnCompleted or an OnError.
After an OnError, no other events are allowed. Therefore you either need to remove the OnError from the sequence, or the sequence will end (=unsubscribe).
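As a quick illustration of that grammar, here is a minimal sketch in plain Java (no RxJava needed). The class name RxGrammar and the one-letter event encoding (N = onNext, C = onCompleted, E = onError) are made up for this example; it only checks a recorded sequence of events against the rule.

```java
import java.util.regex.Pattern;

public class RxGrammar {
    // Hypothetical encoding: N = onNext, C = onCompleted, E = onError.
    // The pattern mirrors OnNext*(OnCompleted|OnError)?
    private static final Pattern CONTRACT = Pattern.compile("N*(C|E)?");

    /** Returns true when the whole event sequence follows the grammar. */
    static boolean isValid(String events) {
        return CONTRACT.matcher(events).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("NNC")); // true: items, then completion
        System.out.println(isValid("NE"));  // true: item, then error
        System.out.println(isValid("NEC")); // false: onCompleted after onError
        System.out.println(isValid("NEN")); // false: onNext after onError
    }
}
```

The "NEC" case is what happens when onCompleted is called outside the try block after onError has already fired.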
Javier Domingo Cansino
@txomon
Nov 14 2015 12:40
yeah, I removed that problem by putting the onCompleted inside the try
anyway, I will need to switch to that non-error stuff
Dorus
@Dorus
Nov 14 2015 12:41
Anyway, another option is to use onErrorReturn on the inner observable (inside flatMap), and insert a special onNext item that you handle in the subscriber to display the error.
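A rough sketch of what such a "special onNext item" could look like, written in plain Java without RxJava so it runs on its own. CallResult, attempt and ErrorAsItem are hypothetical names invented for this example, and the exception message is just sample data; the point is only that a failure becomes a regular item, so the consumer keeps receiving results afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class ErrorAsItem {
    /** Hypothetical holder: carries either a value or the error that replaced it. */
    static final class CallResult<T> {
        final T value;
        final Throwable error;

        private CallResult(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> CallResult<T> success(T value) {
            return new CallResult<>(value, null);
        }

        static <T> CallResult<T> failure(Throwable error) {
            return new CallResult<>(null, error);
        }

        boolean isError() {
            return error != null;
        }
    }

    /** Runs one call and turns a thrown exception into a regular item. */
    static <T> CallResult<T> attempt(Callable<T> call) {
        try {
            return CallResult.success(call.call());
        } catch (Exception e) {
            return CallResult.failure(e);
        }
    }

    public static void main(String[] args) {
        CallResult<String> first = attempt(() -> "first reply");
        CallResult<String> failed = attempt(() -> {
            throw new IllegalStateException("not authenticated");
        });
        CallResult<String> second = attempt(() -> "second reply");

        List<CallResult<String>> received = new ArrayList<>();
        received.add(first);
        received.add(failed);
        received.add(second);

        // The consumer handles both kinds of item and is never cut off by the failure.
        for (CallResult<String> r : received) {
            System.out.println(r.isError() ? "error: " + r.error.getMessage() : "value: " + r.value);
        }
    }
}
```

In the Rx chain, the equivalent would presumably be an onErrorReturn on the inner observable that maps the Throwable to such a holder, so the outer click stream never sees an onError.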
Javier Domingo Cansino
@txomon
Nov 14 2015 12:42
Yeah, but for that, I just emit an error as usual
I mean, I control the ubusRPC lib
so I can make it send the error as an object rather than throwing an exception
Dorus
@Dorus
Nov 14 2015 12:43
You can also use onErrorResumeNext instead of onErrorReturn, so that you can rethrow errors also.
That's possible but i would prefer not to mutilate the library for some specific uses.
Javier Domingo Cansino
@txomon
Nov 14 2015 12:44
well, the thing is: Java exceptions cost a lot of resources, is that true for java 6?
if it is, I'd rather use exceptions for real errors than for just a "you forgot to authenticate your request"
or stuff like that
Dorus
@Dorus
Nov 14 2015 12:45
a lot of resources is a big word, at least back in the days when i tested it (for java 5), it was 100x slower than returning a value, but i could still throw 150.000 of them per second.
Javier Domingo Cansino
@txomon
Nov 14 2015 12:46
sure, 100x is enough, I am just getting started =)
Dorus
@Dorus
Nov 14 2015 12:46
So a single exception when a user clicks a button and something goes wrong is perfectly acceptable.
Really
do not EVER care about performance
unless you have performance issues. Then benchmark and improve the worst parts.
In all other cases, write the best possible code
Javier Domingo Cansino
@txomon
Nov 14 2015 12:46
it's not about performance but about it being really annoying to handle an exception in the rx pipeline
Dorus
@Dorus
Nov 14 2015 12:47
Throwing an exception in a hot loop in the normal flow is bad, both for performance and readability. Using exceptions to mark special flow is what they are made for
Dorus
@Dorus
Nov 14 2015 12:58
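sendCallClick
    .flatMap(e ->
        Observable.create(o -> {
            try {
                o.onNext(makeUbusCall());
                o.onCompleted(); // only reached when the call succeeded
            } catch (Throwable t) {
                o.onError(t);
            }
        }))
    // side effect only: show the message, the error still travels on to retry()
    .doOnError(e -> {
        if (e instanceof UbusException) {
            displayErrorMessage();
        }
    })
    // predicate gets (retry count, error): resubscribe on UbusException, forward anything else
    .retry((c, e) -> e instanceof UbusException)
    .subscribe(...);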
Javier Domingo Cansino
@txomon
Nov 14 2015 13:26
oohhh I understand what you mean
so exceptions should be treated in the flow rather than in the subscriber
Dorus
@Dorus
Nov 14 2015 13:27
Yeah, only fatal errors should reach the subscriber.
At least, those fatal enough to end the subscription
Btw, i just thought of an alternative to Observable.create:
.flatMap(e -> {
    try {
        return Observable.just(makeUbusCall());
    } catch (Throwable t) {
        return Observable.error(t);
    }
})
Javier Domingo Cansino
@txomon
Nov 14 2015 14:41
no, we need the oncomplete
oh no, we don't
ufff
we start with the threading issue again
let me try a few variations
Javier Domingo Cansino
@txomon
Nov 14 2015 15:01
yeah, seems to be happening correctly
Dorus
@Dorus
Nov 14 2015 17:32
Of course my suggestions are correct :D
Abhinav Solan
@eyeced
Nov 14 2015 18:17
thanks @Dorus .. actually the next observable would depend on the last element of the first observable .. is there some other way .. or would I just have to fetch obs1.last() and then create the next stream
Dorus
@Dorus
Nov 14 2015 19:31

@eyeced yeah i get what you mean. I do not know of a default operator to do that, but not too hard to cook it up yourself:

    Observable.create(o -> {
        Observable<Integer> source1 = Observable.just(1); // init source 1 here.
        ConnectableObservable<Integer> s1 = source1.publish();
        Observable<Integer> s2 = s1.lastOrDefault(2) // return a default value if source 1 is empty. 
                .flatMap(e -> Observable.just(e + 1)); // init source 2 here based on e.
        Observable.concat(s1, s2).subscribe(o);
        o.add(s1.connect());
    });

That could actually make a nice Transformer i think.

Dorus
@Dorus
Nov 14 2015 19:37
<T> Transformer<T, T> concatOnLast(T defaultValue, Func1<T, Observable<T>> nextObservable) {  
    return source -> Observable.create(o -> {
        ConnectableObservable<T> s1 = source.publish();
        Observable<T> s2 = s1.lastOrDefault(defaultValue) // return a default value if source 1 is empty. 
                .flatMap(e -> nextObservable.call(e)); // init source 2 here based on e.
        Observable.concat(s1, s2).subscribe(o);
        o.add(s1.connect());
    });
}
Untested so please double check if it does what i think it should do :D
Abhinav Solan
@eyeced
Nov 14 2015 19:46
thanks a lot @Dorus .. this looks pretty neat
Dorus
@Dorus
Nov 14 2015 19:47
Sure, i'm getting the hang of RxJava here, love to puzzle around with it ^_^
Let me know if it does (or doesn't) work :)