These are chat archives for ReactiveX/RxJava

9th
Mar 2017
Exerosis
@Exerosis
Mar 09 2017 06:37
Is there a simple way to block a thread until an observable emits?
Sven Jacobs
@svenjacobs
Mar 09 2017 07:40

Hi all,
let’s assume I have a function that takes a URL to a service and returns a Single of a data class that holds the data returned by the service and an optional nextUrl of the paginated results. How would I concat all Singles of all paginated results in Rx-manner without having to rely on a tempory data structure that holds the intermediate results?

The point is in order to call the second request I require the nextUrl from the first request. If the second request also has a nextUrl there needs to be a third request and so forth until nextUrl is null.

Is this possible in a fluent, Rx-style manner?

@Exerosis You could just use blockingGet() for example (or toBlocking() if you’re on v1)
Exerosis
@Exerosis
Mar 09 2017 07:42
Oh that would work out, I actually just ended up calling first() instead
Sven Jacobs
@svenjacobs
Mar 09 2017 07:43
@Exerosis Doesn’t first() return a Single/Observable again?
Exerosis
@Exerosis
Mar 09 2017 07:45
If I read the docs correctly, it blocks until it can return the first element an observable emits. Let me double check
https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators right, ok turns out I didn't look carefully :P
Sven Jacobs
@svenjacobs
Mar 09 2017 07:47
@Exerosis If you specify a different scheduler for this Observable (for instance Schedulers.computation()) first() wouldn’t block the current thread
Exerosis
@Exerosis
Mar 09 2017 07:48
Yep, toBlocking().first() should do it :)
Sven Jacobs
@svenjacobs
Mar 09 2017 07:48
@Exerosis Excactly :D
Exerosis
@Exerosis
Mar 09 2017 07:49
Thank you!
Sven Jacobs
@svenjacobs
Mar 09 2017 07:49
np
Exerosis
@Exerosis
Mar 09 2017 08:23

Could somebody explain to me why the val is emitted directly after subing in this:
Observable<StockQuote> observable = Observable.fromCallable(() -> { return YahooFinance.get("NVDA").getQuote(); }).repeatWhen(o -> Observable.interval(20, TimeUnit.SECONDS)).replay(1).autoConnect(); observable.subscribe(val -> { System.out.println(val); });

but nothing is ever emitted by this:
Observable<StockQuote> observable = Observable.fromCallable(() -> { return YahooFinance.get("NVDA").getQuote(); }).repeatWhen(o -> Observable.interval(20, TimeUnit.SECONDS)).replay(1).autoConnect(); observable.observeOn(AndroidSchedulers.mainThread()).subscribe(val -> { System.out.println(val); });
I must not understand something here.