These are chat archives for ReactiveX/RxJava

11th
Feb 2016
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:41
hi
Do you know why i take an error? java.util.NoSuchElementException: Sequence contains no elements
PublishSubject<Integer> uploadObservable = PublishSubject.create();
        uploadObservable.onNext(1);
        uploadObservable.onNext(2);
        uploadObservable.onNext(3);
        uploadObservable.onCompleted();

        Observable<Integer> registerObservable = uploadObservable
                .last();

        uploadObservable.concatWith(registerObservable)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("completed");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        throwable.printStackTrace();
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                    }
                });
Dorus
@Dorus
Feb 11 2016 11:45
@charbgr You are using a publish subject. You need to call onNext after you subscribe, or use something different like a replay subject.
Because your publish subject already completed once last subscribes, last will see an empty sequence, and yield NoSuchElementException as per spec.
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:47
ohh thank you!
So you are suggesting replaysubject?
Dorus
@Dorus
Feb 11 2016 11:47
That's one option. Easy to get one source.replay(1).
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:48
by source you mean uploadObservable in my example?
Dorus
@Dorus
Feb 11 2016 11:49
yes
And you need to call replay and connect to it before you make the calls to onNext :)
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:50
you are right. if i replace PublishSubject with ReplaySubject everything works
Dorus
@Dorus
Feb 11 2016 11:50
Yup, it should.
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:51
if i use ReplaySubject i must use the replay method?
i don’t think so
Dorus
@Dorus
Feb 11 2016 11:51
no, one or the other
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:51
so cool :)
thank you
Dorus
@Dorus
Feb 11 2016 11:52
I mentioned replay because i assumed you where using the Subject as an example to replace your real source :)
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:53
the replay must be before last ?
Dorus
@Dorus
Feb 11 2016 11:53
no, before the onNext calls.
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:54
uploadObservable.onNext(1);
        uploadObservable.onNext(2);
        uploadObservable.onNext(3);
        uploadObservable.replay(1);
        uploadObservable.onCompleted();
this doesn’t work
Dorus
@Dorus
Feb 11 2016 11:54
noooo, lol
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:54
what?? :P
Dorus
@Dorus
Feb 11 2016 11:54
ConnectableObservable<Integer> uploadObservable2 = uploadObservable.replay(1);
Subscription sub = uploadObservable2.connect()
uploadObservable.onNext(1);
uploadObservable.onNext(2);
uploadObservable.onNext(3);
uploadObservable.onCompleted();
wait, that's wrong
edited
And then use uploadObservable2 to subscribe at.
Something along those lines, don't have a compiler at hand here.
Vasilis Charalampakis
@charbgr
Feb 11 2016 11:57
uploadObservable2 doesnt have connect
Dorus
@Dorus
Feb 11 2016 11:58
Remember uploadObservable2 is of type ConnectableObservable
This message was deleted
This message was deleted
delete nope, should have connect()
Dorus
@Dorus
Feb 11 2016 12:06
Oh also, you need to connect your replaySubject too of course, if you use that.
http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#Replay contains a good explanation.
Vasilis Charalampakis
@charbgr
Feb 11 2016 13:19
thanks @Dorus :)
Boy Wang
@boyw165
Feb 11 2016 14:24
Hi, I'm new to RxAndroid. I read some articles talking about this and still have a question about subscription and composite subscription. Why can a composite subscription can add an arbitrary subscription by unscribing it and still can get emissions from the observable?
In my opinion, I think subscription is like a voucher containing some reference to the subscriber so that it can unscrible from the observable. Please correct me if I'm wrong. :D
Dorus
@Dorus
Feb 11 2016 14:26
Do you mean, subscription a gets b added, and then b unsubscribe? That wont affect a, as the reference goes from a->b, not the other way around. If a does unsubscribe, b will go down with it.
At least that's how i understand it :)
Boy Wang
@boyw165
Feb 11 2016 14:30
Oh so that's why it is called composite subscription. Which also meant that it is able to pass the reference from a subscription to a composite subscription? Looks like so, COOL.
Dorus
@Dorus
Feb 11 2016 14:33
An subscription is nothing more than what you can compare to a cancelationToken. It holds a reference to the scheduler that does the work. If you unsubscribe it tells the subscription and everything before it to cancel all ongoing work. That way you wont receive new events.
Sometimes the ongoing work keeps going as it can't stop mid run, but it wont emit anymore, that's guaranteed. Canceling the work itself happens on a best-effort basis.
Boy Wang
@boyw165
Feb 11 2016 14:35
Oh I see. Thank you @Dorus .
Dorus
@Dorus
Feb 11 2016 14:36
Also using unsubscribe shouldn't be all that common at all. Normally you either wait for onCompleted or onError or use operators like take(n) that unsubscribe for you.
However chaining subscriptions can be very useful to keep the number of references you need to track low.
I often use it inside observable.create if i work with multiple subscriptions in there.
purna455
@purna455
Feb 11 2016 16:11
Currently we are using Java Futures in our Rest Api to Invoke multiple services asynchronously...

but when i gone through http://reactivex.io/intro.html

it says
"It is difficult to use Futures to optimally compose conditional asynchronous execution flows (or impossible, since latencies of each request vary at runtime). This can be done, of course, but it quickly becomes complicated (and thus error-prone) or it prematurely blocks on Future.get(), which eliminates the benefit of asynchronous execution."

would really interested to know how RxJava solves this problem...i found it difficult to understand from the documentation..Please help!!

Should i really replace Futures with RxJava to achieve better performance???our Api will have heavy load of requests..

Dorus
@Dorus
Feb 11 2016 17:17
Yes, rxjava solves this. I believe there should be an from future call also. Will need a little more detailed questions thou
purna455
@purna455
Feb 11 2016 18:27
sorry atleast for me the documentation is a bit confusing...can someone gimme a highlevel benefit of using RxJava Observable in terms of asynchronous calls??
Dorus
@Dorus
Feb 11 2016 19:40
(link to video is on that site)
purna455
@purna455
Feb 11 2016 20:59
i just saw this video to be honest i really didnt get how Observable solves the Future.get() thread blocking problem
Dorus
@Dorus
Feb 11 2016 21:03

Oh, i guess i understood your question wrong. I think you mean this snipped?

This can be done, of course, but it quickly becomes complicated (and thus error-prone) or it prematurely blocks on Future.get(), which eliminates the benefit of asynchronous execution.

If you block threads (threadpool threads at worse), that's really bad for performance. Rx helps to avoid calls like Future.get() that might block because you only schedule the callback and stay in the async context at all time.

I'm not very familiar with Future myself, so i find it hard to give examples :)
purna455
@purna455
Feb 11 2016 21:15
ok lemme understand with an example... if a client is invoking multiple service calls (s1,s2,which returns futures)asynchronously..and s1.get();s2.get() ,in this case s2.get() will not be executed until s1.get() returns.And if i use Observable as it doesn"t use get and it uses callbacks ie s1.map(..);s2.map(..) both are executed at same time and whichever returns first the associated callback will be executed ..and at the same time the worker thread(client ) which calls these services is not blocked...
This message was deleted
Dorus
@Dorus
Feb 11 2016 21:18
is s2 dependant on the result of s1?
or should they run parallel and have there results combined?
purna455
@purna455
Feb 11 2016 21:19
run parallel
Dorus
@Dorus
Feb 11 2016 21:19
And what do you want to do with the results?
combine them? Do some independant action?
Do the same action twice?
purna455
@purna455
Feb 11 2016 21:20
i have both type of scenarios...
Dorus
@Dorus
Feb 11 2016 21:20
ok, lemme go over them
If you want to combine the results, you use zip(s1, s2, (s1result, s2result) => { ... })and you wont have any thread waiting. (in my example s1 and s2 are already observables)
purna455
@purna455
Feb 11 2016 21:21
and if its independent?
Dorus
@Dorus
Feb 11 2016 21:21
If you want to do the same action on both, you can use merge(s1, s2)
If you want different actions, you can use s1.subscribe(); s2.subscribe()
Of course you decorate subscribe with the actual action you want to perform.
merge is also powerful because is serialize the results.
So no need to think about concurrency.
purna455
@purna455
Feb 11 2016 21:22
so in all the 3 cases the worker thread is not blocked rt?
Dorus
@Dorus
Feb 11 2016 21:22
nope
purna455
@purna455
Feb 11 2016 21:23
ok in the video he said result.Map({s->if(s.equals(x){}})??
Dorus
@Dorus
Feb 11 2016 21:24
map tranforms the result from an async function to anther result.
purna455
@purna455
Feb 11 2016 21:25
ok i guess i need to check how its different from subscribe
Dorus
@Dorus
Feb 11 2016 21:25
with futures, something like this:
Object o = myFuture.get();
if (s.equals(x)) {
   return y;
}
return z;
here, on the first line, you block a thread.
purna455
@purna455
Feb 11 2016 21:27
yeah
please show using observable
Dorus
@Dorus
Feb 11 2016 21:27
i'm working on it :)
purna455
@purna455
Feb 11 2016 21:27
sorry
Dorus
@Dorus
Feb 11 2016 21:28
my IDE wouldn't cooperate :)
purna455
@purna455
Feb 11 2016 21:29
np thanks for your help..im newbie to RXjava ..we are still evaluating whether to use RxJava or Java8 CompletableFuture....trying to find the difference..completableFuture also provides callback for async calls
Dorus
@Dorus
Feb 11 2016 21:31
o1.map(s -> {
    if (s.equals(x)) {
        return y;
    }
    return z;
});
i'm skipping some details here
o1.map() returns an observable, you still need to subscribe to it to tell it what to do when the results return.
In the future example, you have a function running, absorbing a thread.
o1.map() wont do anything until you subscribe.
Anyway the really interesting operator is flatMap, that allows you to run multiple async functions and gather the results back together.
purna455
@purna455
Feb 11 2016 21:37
thats zip right?
Dorus
@Dorus
Feb 11 2016 21:37
well, with zip you start multiple calls out of 1 source item.
With flatMap you can start one for every source item.
Usually you combine both.
Sheet 32-34 in the presentation shows a nice example
purna455
@purna455
Feb 11 2016 21:38
which presentation sheets?
nr 46 shows zip i think
flatMap and zip
purna455
@purna455
Feb 11 2016 21:43
ok lemme understand o1 is an observable result from a service call..and o1.map modifies and returns the modified result...and o1.subscribe is the actual callback which will be executed when map is returned...so for this service call example we need perform 3 steps from client side 1:make service call which returns observable result;2: o1.map to modify the result ,3: o1.subscribe (callback()) which executes the callback..please correct me if im wrong..
Dorus
@Dorus
Feb 11 2016 21:44
actually o1.map(...).subscribe(...)
    Observable<Integer> o1 = Observable.just(1);
    o1.map(s -> s + 1).subscribe(s -> System.out.println(s));
This will print '2'.
    Observable<Integer> o1 = Observable.just(1,2,3);
    o1.map(s -> s + 1).subscribe(s -> System.out.println(s));
prints
2
3
4
purna455
@purna455
Feb 11 2016 21:47
ok in Observable.just(1) will not block the client thread...??
Dorus
@Dorus
Feb 11 2016 21:48
no, it create an observable that contains 1 value (1).
public void runFuture(Future<Integer> f) {
    Observable<Integer> o1 = Observable.from(f);
    o1.map(s -> s + 1).subscribe(s -> System.out.println(s));
}
here we take the result from a future and print it after adding 1.
Of course, future is a boring source, it only gives you one element.
But it's a start.
This function returns instantly.
And it print from a background thread.

Better would be to use

public Subscription runFuture(Future<Integer> f) {
    Observable<Integer> o1 = Observable.from(f);
    return o1.map(s -> s + 1).subscribe(s -> System.out.println(s));
}

Now the client can also cancel the work if needed.

Dorus
@Dorus
Feb 11 2016 21:56
mmm, thenCompose looks comparable to map
or actually flatMap should be used.
I wonder if it has equalivant operators for the rest of the tranformations
Still i think Rx is strong on sources with multiple values, i dont think Futures can do that. Even with single values Rx offers some nice additions.
Dorus
@Dorus
Feb 11 2016 22:10

@purna455 Did i lose you? :)

I think i should go back up a level and ask you what you really want to do. Rx is good with event streams, combining them, query them. Async calls are pretty much streams with 1 event. It can combine with those seamless, but if that's all you using it for, there are indeed other powerful frameworks too.

havn't watched if fully but it looks like it's about your question
purna455
@purna455
Feb 11 2016 22:47
sorry Dorus i lost connection im back lemme go thru your posts
purna455
@purna455
Feb 11 2016 22:53
ok lemme explain my usecase ...we are having a rest api which internally calls mutiple external services some are independent and some are dependent on each other...so currently we are calling all these services asynchronously and using Futures...im trying to understand if we use observable do we get any performance benefit...as our api will always has huge load of user requests.....
Dorus
@Dorus
Feb 11 2016 22:55
That does sound perfect for Rx
purna455
@purna455
Feb 11 2016 22:55

so if i use observable in Observable <serviceresult1> r1=callservice1();

Observable <serviceresult2> r2=callservice2();

Observable <serviceresult3> r3=callservice3();

Dorus
@Dorus
Feb 11 2016 22:56
That looks a lot like the scenario in the 'Reactive Streams with Rx at JavaOne 2014' presentation
How are you calling service1, 2 and 3 with no parameters?
purna455
@purna455
Feb 11 2016 22:57
there are parameters i just showed an example
Dorus
@Dorus
Feb 11 2016 22:57
Where do they come from?
purna455
@purna455
Feb 11 2016 22:58
from the i/p to the rest api which internally calls these services
Dorus
@Dorus
Feb 11 2016 22:58
ok nvm i didnt understand that :P
purna455
@purna455
Feb 11 2016 22:59
ok there is an rest api which internally calls all these services
client will invoke the api which internally calls all the three services
based on the service2 result i need to call service3 or service 1
api takes care of sending parameters ..please ignore that part
ok for the given scenario how should i handle this with observable
Dorus
@Dorus
Feb 11 2016 23:03
What's the result of service 2 and 3? Is it the same?
purna455
@purna455
Feb 11 2016 23:04
no different
obviously it cannot be same :smile:
as they are two different services
Dorus
@Dorus
Feb 11 2016 23:04
In that case, source.flatMap(e -> service2).flatmap(e -> {if (e.service1) {return service1; } else {return service2; }})
I mean the datatype
purna455
@purna455
Feb 11 2016 23:05
ok
Dorus
@Dorus
Feb 11 2016 23:05
if they both return the same type you can combine the results
Else you need to split it into two streams
That perform different actions
purna455
@purna455
Feb 11 2016 23:05
what is source here?
Dorus
@Dorus
Feb 11 2016 23:05
Ofcourse it's possible to perform different actions until they contain the same data again.
purna455
@purna455
Feb 11 2016 23:06
please take this as reference
Observable <serviceresult1> r1=callservice1();
Observable <serviceresult2> r2=callservice2();
Observable <serviceresult3> r3=callservice3();
Dorus
@Dorus
Feb 11 2016 23:06
Yeah that's where you got me confused. You got no parameters or return values.
purna455
@purna455
Feb 11 2016 23:07
ok all the methods which callservices will return Observable<resulttype>
this is possible rt
Dorus
@Dorus
Feb 11 2016 23:07
sounds good
Ok, i assume you want to start at r2
result = r2.flatMap(e -> {
    if (condition(e)) {
        return r1;
    } else {
        return r3;
    }
})
And you can then get the results by calling
result.subscribe(
  e -> { /* whatever you want to do with the values from r1 and r3*/}, 
  () -> { /* whatever you want to do when all values are processed */}
)
purna455
@purna455
Feb 11 2016 23:11
before this result=r2.flatMap(...) lines of code i can call the services as below rt??
Observable <serviceresult1> r1=callservice1();
Observable <serviceresult2> r2=callservice2();
Observable <serviceresult3> r3=callservice3();
then
This message was deleted
This message was deleted
result = r2.flatMap(e -> {
if (condition(e)) {
return r1;
} else {
return r3;
}
})
Dorus
@Dorus
Feb 11 2016 23:12
The last bits of code i pasted are better.
purna455
@purna455
Feb 11 2016 23:12
but where is the actual call to service here
Dorus
@Dorus
Feb 11 2016 23:12
Also surround your code with ``` or jump in of 4.
purna455
@purna455
Feb 11 2016 23:13
i mean before using flatmap do i need to call the methods which call services?
Dorus
@Dorus
Feb 11 2016 23:15
Observable <serviceresult1> r1=callservice1();
This should probably be
Observable <serviceresult1> r1=Observable.defer(() -> Observable.just(callservice1()));
When you said callservice1 returns an observable, i assumed it would be called when you subscribve to it.
That's how you normally call functions wrapped in Observables
And when you subscribe, it execute everything in the chain.
purna455
@purna455
Feb 11 2016 23:17
confused lemme write the code please coorect it ,i need to surround my code with ''' rt?
Dorus
@Dorus
Feb 11 2016 23:17
I think you are confused about how Observables are created and used :)
When you create a Observable, it's like declaring a function. It's there but it's not running.
Like lazy evaluation
Only when you subscribe, the inner function is ran.
So first you build up an observable chain, and then you subscribe to the entire chain.
Observable.defer(() -> Observable.just(callservice1()))

Here we make two steps. First the inner lambda () -> Observable.just(callservice1()))

This translate to

    private Observable <serviceresult1> sr1() {
        return Observable.just(callservice1());
    }
purna455
@purna455
Feb 11 2016 23:21

‘''Observable <serviceresult1> r1=Observable.defer(() -> Observable.just(callservice1()));
Observable <serviceresult2> r2=Observable.defer(() -> Observable.just(callservice2()));
Observable <serviceresult3> r3=Observable.defer(() -> Observable.just(callservice3()));

result = r2.flatMap(e -> { if (condition(e)) { return r1; } else { return r3; } })

result.subscribe( e -> { / whatever you want to do with the values from r1 and r3/}, () -> { /* whatever you want to do when all values are processed (i didn’t get this part r1,r3 are already processed before reaching here)’''

please correct me if im wrong
Dorus
@Dorus
Feb 11 2016 23:22
Use the ` right next to your 1 on the upper left of your keyboard :)
purna455
@purna455
Feb 11 2016 23:22
ok
Dorus
@Dorus
Feb 11 2016 23:22
Next, Observable.defer adds lazy evaluation.
purna455
@purna455
Feb 11 2016 23:22
Observable <serviceresult1> r1=Observable.defer(() -> Observable.just(callservice1())); Observable <serviceresult2> r2=Observable.defer(() -> Observable.just(callservice2())); Observable <serviceresult3> r3=Observable.defer(() -> Observable.just(callservice3())); result = r2.flatMap(e -> { if (condition(e)) { return r1; } else { return r3; } }) result.subscribe( e -> { / whatever you want to do with the values from r1 and r3/}, () -> { /* whatever you want to do when all values are processed (i didn’t get this part r1,r3 are already processed before reaching here)
Dorus
@Dorus
Feb 11 2016 23:22
Three times `
purna455
@purna455
Feb 11 2016 23:22
i did
Dorus
@Dorus
Feb 11 2016 23:22
Also you can edit if you're quick enough :)
mm, new line perhapts
after and before the last ```
Observable <serviceresult1> r1=Observable.defer(() -> Observable.just(callservice1()));
Observable <serviceresult2> r2=Observable.defer(() -> Observable.just(callservice2()));
Observable <serviceresult3> r3=Observable.defer(() -> Observable.just(callservice3()));
result = r2.flatMap(e -> { if (condition(e)) { return r1; } else { return r3; } })
result.subscribe( e -> { / whatever you want to do with the values from r1 and r3/}, () -> { /* whatever you want to do when all values are processed (i didn’t get this part r1,r3 are already processed before reaching here)
3
purna455
@purna455
Feb 11 2016 23:24
yep exactly
so where the actual service calls happening?
Dorus
@Dorus
Feb 11 2016 23:24
When you subscribe
it calls r2
r2 is deffer
that calls callservice2()
and wraps its result into an Observable.
Then the flatMap takes this result.
purna455
@purna455
Feb 11 2016 23:25
1 sec
Dorus
@Dorus
Feb 11 2016 23:25
And feeds it to the if statement
And then calls either r1 or r3
And the result of that is fed to the lambda in subscribe.
So the calls happen when you subscribe, inside the lambda we declared earlier for that purpose.
but they are only called on the 5th line where you use subscribe
(or rather, scheduled somewhere in the background, as subscribe also returns instantly)
purna455
@purna455
Feb 11 2016 23:32
lemme go thru and understand :)
what defer does?
Dorus
@Dorus
Feb 11 2016 23:37
lazy evaluation mostly.
I had to use something to change a function call to a Observable.
purna455
@purna455
Feb 11 2016 23:37
ok ...so in this context as services are dependent ..it looks like its calling synchronously...
correct me if im wrong
Dorus
@Dorus
Feb 11 2016 23:38
yes, that's what you want right?
You cant choose between r1 and r3 if you dont know the results of r2.
purna455
@purna455
Feb 11 2016 23:38
ok...in this context its synchronous rt
ok..if same services assuming they are independent how can i call them asynchronously...
Dorus
@Dorus
Feb 11 2016 23:40
Observable.zip(r1, r2, r3, (result1, result2, result3) -> { return 4; });
return 4; ofcourse needs to be replaced with what you actually want to return using the three results.
purna455
@purna455
Feb 11 2016 23:40
and subscribe as well?
Dorus
@Dorus
Feb 11 2016 23:41
yes
It wont execute until you subscribe.
Actually that with zip is what happened in the netflix example i let you watch.
he creates 3 observables, what are pretty much 3 calls, and the zips them.
purna455
@purna455
Feb 11 2016 23:42
thats really helpful !!! in this first case(dependent serviceS) how its different w/o using Observable and call the services synchronously...one by one
Dorus
@Dorus
Feb 11 2016 23:42
He used bookmarks, rating and metadata.
Well if you call them one by one, your thread is locked up right?
In the observable example, you return an Observable where you subscribe too once you know what you want to do with the results
Or you can return a Subscription instead.
If you already know what action you want to follow up with.
Either way the code all runs on an Scheduler, if you supply one.
purna455
@purna455
Feb 11 2016 23:45
oh you mean even when i use Observable in executing dependent services synchronously it wont block the thread?
Dorus
@Dorus
Feb 11 2016 23:45
So you wont block anywhere
purna455
@purna455
Feb 11 2016 23:45
oh you mean even when i use Observable in executing dependent services synchronously it wont block the thread?
Dorus
@Dorus
Feb 11 2016 23:46
yes
That's the entire point
subscribe() returns almost instantly
or well...
That depend on how you implement the first Observable.
purna455
@purna455
Feb 11 2016 23:47
so even the services need to run in order and the actual service calls happen in subscribe which process all calls at once so thread not blocked?
Dorus
@Dorus
Feb 11 2016 23:48
Actually i think we might need Observable.start instead of defer, so you can specify a scheduler.
purna455
@purna455
Feb 11 2016 23:48
:worried:
scheduler comes into play when subscribe is called rt?
Dorus
@Dorus
Feb 11 2016 23:49
well, if your functions are futures you just use observable.from ofcourse.
or use subscribeOn
purna455
@purna455
Feb 11 2016 23:49
no futures all servicecalls returning Observable
Dorus
@Dorus
Feb 11 2016 23:50
Yeah but it depend already on how the Observable is implemented.
If they already return Observables we can skip out on defer and stuff.
Normal (cold) observables dont do anything untill you subscribe.
And when you subscribe, they might do some work sync
but should quickly schedule themselves
For example when making an IO call
They schedule the result etc.
so they dont block
Then the thread returns and can call something else
In case of subscribe, it returns to our main function.
in case of zip, it will call r3 after r1 returns.
purna455
@purna455
Feb 11 2016 23:52
Observable <serviceresult1> r1= Observable.just(callservice1());
Observable <serviceresult2> r2=Observable.just(callservice2());
Observable <serviceresult3> r3=Observable.just(callservice3());
result = r2.flatMap(e -> { if (condition(e)) { return r1; } else { return r3; } })
result.subscribe( e -> { / whatever you want to do with the values from r1 and r3/}, () -> { /* whatever you want to do when all values are processed (i didn’t get this part r1,r3 are already processed before reaching here)
3
if callservice returns Observable
Dorus
@Dorus
Feb 11 2016 23:52
If your function doesn't return an Observable, and doesn't use anything async. We can still convert it by scheduling the entire function. Thats what deffer did.
purna455
@purna455
Feb 11 2016 23:52
cool
Dorus
@Dorus
Feb 11 2016 23:53
or well, i'm testing deffer now with thread.sleep :)
purna455
@purna455
Feb 11 2016 23:54
you mean if we wrap the service call in deffer it executes the call in asynchronously??
Dorus
@Dorus
Feb 11 2016 23:54
sorry i confused myself
Defer alone isn't enough to make a function async
purna455
@purna455
Feb 11 2016 23:55
:worried:
Dorus
@Dorus
Feb 11 2016 23:55
i normaly write in Rx.Net. Woops :P
Dont think it's different there
purna455
@purna455
Feb 11 2016 23:56
really appreciate your time.sorry for asking newbie questions :smile:
that really helps and will try to implement Observable in our api...
Dorus
@Dorus
Feb 11 2016 23:58
public static void main(String[] args) throws IOException, InterruptedException {
    Observable<Integer> r1 = Observable.defer(() -> Observable.just(sr1())).subscribeOn(Schedulers.newThread());
    Observable<Integer> r2 = Observable.defer(() -> Observable.just(sr1())).subscribeOn(Schedulers.newThread());
    Observable<Integer> r3 = Observable.defer(() -> Observable.just(sr1())).subscribeOn(Schedulers.newThread());
    Observable.zip(r1, r2, r3, (result1, result2, result3) -> {
        return 4;
    }).subscribe(e -> System.out.println(e));
    System.in.read();

}


private static Integer sr1() {
    System.out.println("Start sleep");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    System.out.println("End sleep");
    return 1;
}
This will run all 3 sevices on new threads.