RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
map and use an empty .subscribe() after the flatmap. Might be appropriate to use doOnEach. Perhaps only part of your logic needs to be there. Not my favorite solution but at least it's readable.
Whether doOnTerminate is called after a downstream error is at the discretion of the operator.
private Observable<Invoices> getInvoicesObservable(Invoices invoices) {
    return invoices == null || invoices.getInvoices() == null
            ? Observable.empty()
            : Observable.combineLatest(
                    Observable.just(invoices),
                    Observable.from(invoices.getInvoices())
                            .filter(invoice -> invoice.getStatus() == InvoiceStatus.UNPAID)
                            .toSortedList(),
                    (Invoices rawInvoice, List<Invoice> invoiceList) -> {
                        rawInvoice.setInvoices(invoiceList);
                        return rawInvoice;
                    });
}
combineLatest(..,…, getInvoicesObservable)
.asObservable()
.distinct() but only for consecutive elements?
distinctUntilChanged()?
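A minimal sketch of the difference between the two operators, assuming RxJava 1.x (the `rx.Observable` API used elsewhere in this log). The class and method names are made up for illustration: `distinct()` drops every value it has already seen, while `distinctUntilChanged()` only drops a value that equals the one immediately before it.

```java
import rx.Observable;

import java.util.Arrays;
import java.util.List;

public class DistinctSketch {
    // Hypothetical helper: collects source.distinct() into a list.
    static List<Integer> allDistinct(Observable<Integer> source) {
        return source.distinct().toList().toBlocking().single();
    }

    // Hypothetical helper: collects source.distinctUntilChanged() into a list.
    static List<Integer> consecutiveDistinct(Observable<Integer> source) {
        return source.distinctUntilChanged().toList().toBlocking().single();
    }

    public static void main(String[] args) {
        Observable<Integer> source = Observable.from(Arrays.asList(1, 1, 2, 2, 1, 3, 3));
        System.out.println(allDistinct(source));         // [1, 2, 3]
        System.out.println(consecutiveDistinct(source)); // [1, 2, 1, 3]
    }
}
```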
private static int getLevelFor(String s) {
    return s.length();
}

public void forFor() {
    List<String> uids = Arrays.asList("A", "BB", "CCCCCC");
    Observable.from(uids)
            .flatMap(new Func1<String, Observable<String>>() {
                @Override
                public Observable<String> call(final String s) {
                    return Observable.range(1, getLevelFor(s))
                            .map(new Func1<Integer, String>() {
                                @Override
                                public String call(Integer integer) {
                                    return s + "," + integer;
                                }
                            });
                }
            })
            .toBlocking()
            .forEach(new Action1<Object>() {
                @Override
                public void call(Object o) {
                    System.out.println(o);
                }
            });
}
A,1
BB,1
BB,2
CCCCCC,1
CCCCCC,2
CCCCCC,3
CCCCCC,4
CCCCCC,5
CCCCCC,6
Guys -- complete beginner here, still haven't got a grasp of some of the concepts in ReactiveX.
So I have the following functions, all synchronous: a(), b(), c(), d() and e(). Those functions are to be called once. b() and c() depend on a(), d() depends on b() and e() depends on c(). I'd like this to happen: a->[b->d||c->e], arrow and || being series and parallel, respectively. I've got this so far:
Observable.defer(() -> Observable.just(a()))
        .map(v -> {
            Observable.just(b(v))
                    .map(v2 -> d(v2));
            Observable.just(c(v))
                    .map(v3 -> e(v3));
            return (Void) null;
        });
So this is what I'd want to happen when the outer observable is subscribed to: a() is called, returning v. This is then fed into b() and c(), being mapped to d() and e() as they return. If everything goes well, no exceptions are thrown and the subscriber receives null. However, this is not what happens, and just is called before the deferred a(), if I'm not mistaken. Doing Observable.defer(() -> Observable.just(b())) for b() and c() causes them never to be called (obviously), as they're never subscribed to.
Any tips? Thanks!
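One possible way to wire up a->[b->d||c->e], as a sketch only and assuming RxJava 1.x. In the snippet above, the inner observables built inside map are never subscribed to, so d() and e() never run. Returning the branches from flatMap makes them part of the outer chain, and giving each branch its own subscribeOn lets the two run concurrently. The bodies of a() to e() below are hypothetical stand-ins, and the choice of Schedulers.io() is an assumption, not a recommendation.

```java
import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.List;

public class ForkJoinSketch {
    // Hypothetical stand-ins for the synchronous a() ... e() from the question.
    static int a() { return 1; }
    static int b(int v) { return v + 1; }
    static int c(int v) { return v * 10; }
    static String d(int v2) { return "d(" + v2 + ")"; }
    static String e(int v3) { return "e(" + v3 + ")"; }

    static Observable<String> pipeline() {
        // fromCallable defers a() until something subscribes.
        return Observable.fromCallable(() -> a())
                .flatMap(v -> Observable.merge(
                        // Branch 1: b -> d, on its own scheduler thread.
                        Observable.fromCallable(() -> d(b(v))).subscribeOn(Schedulers.io()),
                        // Branch 2: c -> e, on its own scheduler thread.
                        Observable.fromCallable(() -> e(c(v))).subscribeOn(Schedulers.io())));
    }

    public static void main(String[] args) {
        // Blocks until both branches finish; emission order is not guaranteed.
        List<String> results = pipeline().toList().toBlocking().single();
        System.out.println(results);
    }
}
```

If the subscriber only cares about completion or failure, appending .ignoreElements() to the merged observable drops the results while still propagating errors from either branch.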