These are chat archives for ReactiveX/RxJava

18th
Mar 2016
fAns1k
@fAns1k
Mar 18 2016 11:14
hi guys, please help me with this snippet, it looks simple but for some reason it doesn't work as expected
Observable.from(videoStreams)
        .delay(videoStream -> playlistSubject.startWith((Void) null)
                .doOnNext(new Action1<Void>() {
                    @Override
                    public void call(Void aVoid) {
                        Timber.d("Emit delay");
                    }
                }))
my goal is to emit items one by one, to play them in a media player
Dorus
@Dorus
Mar 18 2016 11:15
delay merely delays each item until its inner observable emits. That happens instantly because of startWith.
fAns1k
@fAns1k
Mar 18 2016 11:16
i tried to fix it this way =)
Dorus
@Dorus
Mar 18 2016 11:16
What's expected?
fAns1k
@fAns1k
Mar 18 2016 11:17
i get a list of items (e.g. 5) and i start to play the first one immediately
others should play after first, etc
media player gives me event that video is ended, and then i play next one
Dorus
@Dorus
Mar 18 2016 11:20
So you need to buffer the items until the previous one finishes. Try concatMap.
fAns1k
@fAns1k
Mar 18 2016 11:20
i cannot interact with player inside the stream
that’s why i use subject (PublishSubject) to send event to push new item
Dorus
@Dorus
Mar 18 2016 11:25
So if i understand you correctly, you have 2 streams: one that emits 5 items and should re-emit them 1 at a time, each time the other stream signals that the previous item has finished playing?
I'm not even 100% sure if this is a reactive problem.
fAns1k
@fAns1k
Mar 18 2016 11:26
i guess it is
Dorus
@Dorus
Mar 18 2016 11:27
anyway, you can even use zip: source.zipWith(playerReady.startWith(ready), (item, r) -> item).
fAns1k
@fAns1k
Mar 18 2016 11:27
ok, will try, thanks for response =)
Dorus
@Dorus
Mar 18 2016 11:28
zip will wait for both streams to have emitted something.
We append a start value to playerReady so that it always starts as ready, so the first item from source is emitted.
fAns1k
@fAns1k
Mar 18 2016 11:28
yep, it pairs items one to one from the two streams
Dorus
@Dorus
Mar 18 2016 11:28
Then the next item from source will be forwarded as soon as playerReady yields again.
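The gating described above can be modelled without any library. In RxJava 1.x itself it would likely be written as Observable.zip(Observable.from(videoStreams), playlistSubject.startWith((Void) null), (stream, ready) -> stream). The sketch below is only a plain-Java model of that pairing, not RxJava code: ZipGate, onItem and onPlayerReady are hypothetical names, and it is single-threaded on purpose.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Plain-Java model of zip(source, playerReady.startWith(ready)); not RxJava itself.
// ZipGate and its method names are hypothetical, chosen for this illustration.
final class ZipGate<T> {
    private final Queue<T> pendingItems = new ArrayDeque<>();
    private int readySignals = 1; // models startWith(ready): the player starts out ready
    private final Consumer<T> downstream;

    ZipGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // An item arrives from the source stream.
    void onItem(T item) {
        pendingItems.add(item);
        drain();
    }

    // The player reports that the previous item has finished.
    void onPlayerReady() {
        readySignals++;
        drain();
    }

    // Pair one pending item with one ready signal, the way zip pairs elements by index.
    private void drain() {
        while (readySignals > 0 && !pendingItems.isEmpty()) {
            readySignals--;
            downstream.accept(pendingItems.poll());
        }
    }

    // Demo: three items arrive at once, but only one is released per ready signal.
    static List<String> demo() {
        List<String> played = new ArrayList<>();
        ZipGate<String> gate = new ZipGate<>(played::add);
        gate.onItem("a");     // released immediately thanks to the initial ready signal
        gate.onItem("b");     // held back
        gate.onItem("c");     // held back
        gate.onPlayerReady(); // releases "b"; "c" waits for the next signal
        return played;
    }
}
```

Calling ZipGate.demo() returns the items released so far, "a" and "b", while "c" stays queued until another ready signal arrives.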
fAns1k
@fAns1k
Mar 18 2016 11:28
thanks
Dorus
@Dorus
Mar 18 2016 11:29
Good luck :)
fAns1k
@fAns1k
Mar 18 2016 11:39
it seems you are right =)
thanks for your help
really simple issue (
@Dorus
Dorus
@Dorus
Mar 18 2016 11:39
np ;)
cavemansspa
@cavemansspa
Mar 18 2016 12:00
@Dorus -- did you see those groovy github examples -- is that still the way to implement a non-blocking Observable?
Dorus
@Dorus
Mar 18 2016 13:07
@cavemansspa I'm not sure where to look. Observables are normally never blocking anyway.
You mean using Thread.start stuff? Well, i would prefer to have cancelation implemented and also use the Schedulers class (pass in a scheduler to those functions). But overall it's not against the Rx contract to do it like that.
Dorus
@Dorus
Mar 18 2016 13:15
Also, it totally IS possible to unsubscribe from customObservableBlocking. Just do customObservableBlocking.take(2). Unsubscribes after 2 items :-)
(just that it will then ignore all subsequent calls to onNext)
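As a rough illustration of the two preferences mentioned above (cancellation, and passing in the thing that decides where the work runs), here is a plain-Java sketch. It is not the RxJava API and not the code from the examples being discussed: CancellableSource, start and cancel are hypothetical names, and a java.util.concurrent.Executor stands in for an Rx Scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Plain-Java sketch, not the RxJava API: CancellableSource is a hypothetical name.
final class CancellableSource {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    // Plays the role of unsubscribe: asks the producer to stop.
    void cancel() {
        cancelled.set(true);
    }

    // Emits 0, 1, 2, ... up to limit on the given executor (a stand-in for a Scheduler).
    void start(Executor executor, int limit, Consumer<Integer> onNext, Runnable onCompleted) {
        executor.execute(() -> {
            for (int i = 0; i < limit; i++) {
                if (cancelled.get()) {
                    return; // consumer went away: stop producing instead of emitting into the void
                }
                onNext.accept(i);
            }
            if (!cancelled.get()) {
                onCompleted.run();
            }
        });
    }

    // Demo: consume two values, then cancel, similar in spirit to take(2).
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        CancellableSource source = new CancellableSource();
        // Runnable::run executes the task on the calling thread, which keeps the demo deterministic.
        source.start(Runnable::run, 100, value -> {
            received.add(value);
            if (received.size() == 2) {
                source.cancel();
            }
        }, () -> received.add(-1));
        return received;
    }
}
```

With the same-thread executor used in demo(), the producer stops after the second value and never signals completion. Swapping in a thread pool executor would move the loop off the calling thread without changing the producer.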
cavemansspa
@cavemansspa
Mar 18 2016 13:26
@Dorus - yes, i was referring to the Thread.start approach. i thought that maybe the schedulers functionality came after those examples were created.
Dorus
@Dorus
Mar 18 2016 13:26
I have no idea who made those examples or when :)
cavemansspa
@cavemansspa
Mar 18 2016 13:27
looks like ben christensen
two years ago
Dorus
@Dorus
Mar 18 2016 13:28
Looks like it. Like i said, they're not wrong.
cavemansspa
@cavemansspa
Mar 18 2016 13:30
okay thanks for input -- wasn't sure if there was a better way since a lot of stuff has been added since those were put together.
Dorus
@Dorus
Mar 18 2016 13:31
The most important part is to stay within the contract: Serialize calls to onNext and finish with either onCompleted or onError.
Other than that you can call code from any thread.
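The contract stated above can be sketched as a small guard in plain Java. This is an illustration only, not an RxJava class (RxJava 1.x has its own SerializedSubscriber for this purpose): ContractGuard is a hypothetical name, and it records signals in a list instead of forwarding them to a real subscriber.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the contract; ContractGuard is a hypothetical name, not an RxJava class.
final class ContractGuard {
    private final List<String> log = new ArrayList<>();
    private boolean terminated = false;

    // synchronized: calls coming from different threads never overlap.
    synchronized void onNext(String value) {
        if (!terminated) {
            log.add("next:" + value);
        }
    }

    // Only the first terminal signal counts; everything after it is dropped.
    synchronized void onCompleted() {
        if (!terminated) {
            terminated = true;
            log.add("completed");
        }
    }

    synchronized void onError(Throwable error) {
        if (!terminated) {
            terminated = true;
            log.add("error:" + error.getMessage());
        }
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(log);
    }

    // Demo: values emitted from two different threads, then one terminal signal.
    static List<String> demo() {
        ContractGuard guard = new ContractGuard();
        try {
            Thread first = new Thread(() -> guard.onNext("a"));
            first.start();
            first.join(); // wait, so the order of "a" and "b" is deterministic
            Thread second = new Thread(() -> guard.onNext("b"));
            second.start();
            second.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        guard.onCompleted();
        guard.onNext("late");                             // dropped: stream already terminated
        guard.onError(new RuntimeException("too late"));  // dropped: only one terminal signal
        return guard.snapshot();
    }
}
```

The demo emits from two threads and the guard still records a single ordered sequence ending in exactly one terminal signal.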