RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
map
etc.
map
and use a empty .subscribe()
after the flatmap. Might be appropriate to use doOnEach
. Perhaps only part of your logic needs to be there. Not my favorite solution but at least it's readable.
doOnTerminate
is called after a downstream error is at the discretion of the operator
private Observable<Invoices> getInvoicesObservable(Invoices invoices) {
return invoices == null || invoices.getInvoices() == null ? Observable.empty()
: Observable.combineLatest(Observable.just(invoices), Observable.from(invoices.getInvoices())
.filter(invoice -> invoice.getStatus() == InvoiceStatus.UNPAID)
.toSortedList(), (Invoices rawInvoice, List<Invoice> invoiceList) -> {
rawInvoice.setInvoices(invoiceList);
return rawInvoice;
});
}
combileLatest(..,…, getInvoicesObservable)
.asObservable()
.distinct()
but only for consecutive elements?
distinctUntilChanged()
?
private static int getLevelFor(String s) {
return s.length();
}
public void forFor() {
List<String> uids = Arrays.asList("A", "BB", "CCCCCC");
Observable.from(uids)
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(final String s) {
return Observable.range(1, getLevelFor(s))
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return s + "," + integer;
}
});
}
})
.toBlocking()
.forEach(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(o);
}
});
}
A,1
BB,1
BB,2
CCCCCC,1
CCCCCC,2
CCCCCC,3
CCCCCC,4
CCCCCC,5
CCCCCC,6