These are chat archives for ReactiveX/RxJava

12th
Feb 2016
purna455
@purna455
Feb 12 2016 00:00
you mean seperate threads?
Dorus
@Dorus
Feb 12 2016 00:01
I used Schedulers.newThread()
There are other options too
like a threadpool etc.
purna455
@purna455
Feb 12 2016 00:01
Schedulers.newThread() ? wat this does
even w/o this they run in seperate threads rt... i mean asynchronously
Dorus
@Dorus
Feb 12 2016 00:02
It's an scheduler that schedules all work on a new thread
mmm, there are a number of concepts running trough eachother :P
purna455
@purna455
Feb 12 2016 00:03
little bit confused
Dorus
@Dorus
Feb 12 2016 00:03
i think i'm not explaining it right
Lemme do it different
Letrs assume the Observables are implented propperly for once :P
I'm now jumping trough hoops here that makes it difficult
and unneeded too
purna455
@purna455
Feb 12 2016 00:04
so in my second case ..where all services are independent...you didnt use this Schedulers.newThread()...still you said it runs asynchronously and scheduler takes care of it?
Dorus
@Dorus
Feb 12 2016 00:04
Lets forget the stuff about schedulers for now
purna455
@purna455
Feb 12 2016 00:05
ok
are you giving different code for the above 2 cases :smile:
Dorus
@Dorus
Feb 12 2016 00:06
also the stuff about defer and just
The basis is that Observables can subscribe quickly and return, and emit in the background.
However what i did with defer doesn't do that.
The rest of the idea was correct :)
Only we need to create our observables different.
purna455
@purna455
Feb 12 2016 00:08
you mean no need to wrap the call inside deffer?
Dorus
@Dorus
Feb 12 2016 00:09
not if they already return an observable
They already are async
purna455
@purna455
Feb 12 2016 00:10
if they dont then wrapping in deffer makes the call async else sync :worried:
Dorus
@Dorus
Feb 12 2016 00:10
no no, i'm confusing. Sorry
I just used deffer to show they're lazy :)
Shouldn't be showing it like that
defer makes things lazy that arn't
but normally obsrvables already are
purna455
@purna455
Feb 12 2016 00:11
adding deffer or not doesn"t make difference rt?
as observable already makes lazy
Dorus
@Dorus
Feb 12 2016 00:14
Mm, i cant import that rxjava-async into my IDE
purna455
@purna455
Feb 12 2016 00:14
so to do this i need to import rxjava-async ?
purna455
@purna455
Feb 12 2016 00:15
and in the above code which part of code is called stream?
Dorus
@Dorus
Feb 12 2016 00:15
I'm doing this all wrong. Sorry agani
purna455
@purna455
Feb 12 2016 00:16
which is wrong?
and in the above code which part of code is called stream?
Dorus
@Dorus
Feb 12 2016 00:16
The observable create an stream.
In our case, obsrvable.just() creates an stream of 1 element.
But we can also use observable.range(1,10) to create a stream of 10 elements, 1 trough 10.
purna455
@purna455
Feb 12 2016 00:17
so each service is a stream..in this case 3 streams?
Dorus
@Dorus
Feb 12 2016 00:18
Yes
purna455
@purna455
Feb 12 2016 00:18
and subscribe for all the streams?
Dorus
@Dorus
Feb 12 2016 00:18

Lets go backto

Observable <serviceresult1> r1=callservice1();
Observable <serviceresult2> r2=callservice2();
Observable <serviceresult3> r3=callservice3();

and forget about defer and just.

You can do r1.subscribe() to connect and listen to servce1
And r2.subscribe() for service 2.
purna455
@purna455
Feb 12 2016 00:19
why?
Dorus
@Dorus
Feb 12 2016 00:19
Here we already assume callservice1() doesn't do anything. It only returns an observable that does thing once you subscribe.
purna455
@purna455
Feb 12 2016 00:19
confused :worried:
can you paste the updated code for both cases
Dorus
@Dorus
Feb 12 2016 00:21
    Observable<Integer> r1 = callservice1();
    Observable<Integer> r2 = callservice1();
    Observable<Integer> r3 = callservice1();
    Observable.zip(r1, r2, r3, (result1, result2, result3) -> {
        return 4;
    }).subscribe(e -> System.out.println(e));
purna455
@purna455
Feb 12 2016 00:24
Observable <serviceresult1> r1= callservice1(); Observable <serviceresult2> r2=callservice2(); Observable <serviceresult3> r3=callservice3(); result = r2.flatMap(e -> { if (condition(e)) { return r1; } else { return r3; } }) result.subscribe( e -> { / whatever you want to do with the values from r1 and r3/}, () -> { /* whatever you want to do when all values are processed 3
for dependent services
Dorus
@Dorus
Feb 12 2016 00:25
newlines after ``` :)
    Observable<Integer> r1 = callservice1();
    Observable<Integer> r2 = callservice1();
    Observable<Integer> r3 = callservice1();
    r1.flatMap(e -> {
        if (e == 0) {
            return r2;
        }
        return r3;
    }).subscribe(e -> System.out.println(e));
Something like that yes
Lets start with this again
The idea is that r1 is already running in the background when you subscribe, as does r2 and r3. This is handeled bu the creation of the Observable.
Dorus
@Dorus
Feb 12 2016 00:30
Namely the rxjava-async module has functions that makes these.
But we can create observables from many sources.
Observable.from has most options
Doh, Observable.from has even an Future overload
I think we can ignore rxjava-async
That page should tell you what you need :)
By default subscribe happens on the current thread.
But as soon as you hit some scheduled actions, it moves away from the starting thread.
However, subscribeOn(scheduler) can be used to move even the subscribing call to a new scheduler. And that scheduler might use a different thread.
purna455
@purna455
Feb 12 2016 14:42
so here(in flatmap example) all the service calls happens in different thread ,whereas the subscribe call will happen in the current thread from where its invoked?
Boy Wang
@boyw165
Feb 12 2016 15:24
The retryWhen operator is awesome. But how could it resubscribe the returned observable? That's a bit confusing because it seems like the returned observable subscribe to a subject, is it?