RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
OK, I missed one detail: B can be merged with C
Observable<Long> a = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
Observable<Long> b = a.map(v -> -v);
Observable<Long> c = Observable.interval(500, TimeUnit.MILLISECONDS).map(x -> x + 1000);
Observable.zip(a, Observable.merge(b, c), (v1, v2) -> "o1: " + v1 + ", o2: " + v2).subscribe(System.out::println);
Then zip does the wrong job too. combineLatestRight: does that exist in RxJava?
PublishSubject<Integer> seq = PublishSubject.create();
Observable<Integer> seq2 = seq.map(x -> {
    timesCalled += 1;
    return x * 2;
});
seq2.subscribe(e -> System.out.println("Hello 1: " + e));
seq2.subscribe(e -> System.out.println("Hello 2: " + e));
seq.onNext(1);
System.out.println("Times map called: " + timesCalled);
This code shows that map is called twice in this case. (Is that an error or not?) And I get 2 different objects, one in each subscription; I expected the same one.
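To make the "map called twice" behaviour concrete without tying it to a particular RxJava version, here is a minimal plain-Java model. MiniSubject, map, mapShared and MapPerSubscriberDemo are hypothetical names made up for this sketch, not RxJava API. The idea it illustrates: when every subscriber gets its own mapping step, one onNext runs the function once per subscriber; when the mapping is done once and the result is fanned out, the function runs once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a subject; this is NOT the RxJava API.
class MiniSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void onNext(T value) {
        for (Consumer<T> s : subscribers) {
            s.accept(value);
        }
    }

    // Per-subscriber map: each subscribe() wires its own call to fn.
    <R> MiniSubject<R> map(Function<T, R> fn) {
        MiniSubject<T> upstream = this;
        return new MiniSubject<R>() {
            @Override
            void subscribe(Consumer<R> subscriber) {
                upstream.subscribe(v -> subscriber.accept(fn.apply(v)));
            }
        };
    }

    // Shared map: fn runs once per value, the result is fanned out.
    <R> MiniSubject<R> mapShared(Function<T, R> fn) {
        MiniSubject<R> shared = new MiniSubject<>();
        this.subscribe(v -> shared.onNext(fn.apply(v)));
        return shared;
    }
}

class MapPerSubscriberDemo {
    // Returns how many times the mapping function ran for one onNext(1)
    // with two subscribers attached.
    static int countCalls(boolean shared) {
        int[] timesCalled = {0};
        MiniSubject<Integer> seq = new MiniSubject<>();
        Function<Integer, Integer> doubler = x -> {
            timesCalled[0]++;
            return x * 2;
        };
        MiniSubject<Integer> seq2 = shared ? seq.mapShared(doubler) : seq.map(doubler);
        seq2.subscribe(e -> System.out.println("Hello 1: " + e));
        seq2.subscribe(e -> System.out.println("Hello 2: " + e));
        seq.onNext(1);
        return timesCalled[0];
    }

    public static void main(String[] args) {
        System.out.println("per-subscriber map calls: " + countCalls(false));
        System.out.println("shared map calls: " + countCalls(true));
    }
}
```

In RxJava 1.x the shared variant roughly corresponds to putting share() (that is, publish().refCount()) after the map, so the upstream chain is subscribed once and its results are multicast.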
publish creates a ConnectableObservable, which waits for connect before emitting something. Sounds like it's not really aimed at the current problem, right? Or am I getting it wrong? Or do I need publish just to create a new observable which keeps the needed objects?
map it again, by the way
Observable.from(aCollectionOfThousandsOfStrings): does the flatMap signature with maxConcurrent of, say, 100 ensure that no more than 100 invocations of the map function are made at a time?
Observable.from(...) (or range) to swamp my IO layer with too many requests in an instant
Observable.range(1, 1000)
    .map(id -> buildUrlToResource(id))
    // driver can only cope with around 150 asynchronous requests
    .flatMap(url -> driver.fireRequestAsync(url), 100)
    .doOnNext(response -> showNotification(response.content()))
    .toBlocking().last(); // wait for last response
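The goal behind the maxConcurrent argument (never more than N requests in flight) can be sketched with plain java.util.concurrent, which makes the bound easy to check. This is not how RxJava's flatMap is implemented; BoundedConcurrencyDemo and peakInFlight are hypothetical names, and the short sleep only stands in for driver.fireRequestAsync(url).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedConcurrencyDemo {
    // Fires `total` fake requests with at most `maxConcurrent` in flight
    // and returns the highest in-flight count that was observed.
    static int peakInFlight(int total, int maxConcurrent) {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(32);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int id = 1; id <= total; id++) {
                // The producer waits here until one of the slots is free.
                permits.acquireUninterruptibly();
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                pending.add(CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(2); // stand-in for the real async request
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // free the slot for the next request
                    }
                }, pool));
            }
            // Wait for the last responses, like toBlocking().last() above.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in flight: " + peakInFlight(200, 10));
    }
}
```

The counter is incremented only after a permit is acquired and decremented before the permit is released, so the observed peak can never exceed maxConcurrent.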
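import rx.Observable;
import rx.functions.Func2;

// Pairs each item with the one that follows it: (x1, x2), (x2, x3), ...
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    private final Func2<T, T, R> mergeFunc;

    public LookbackTransformer(Func2<T, T, R> mergeFunc) {
        this.mergeFunc = mergeFunc;
    }

    @Override
    public Observable<R> call(Observable<T> source) {
        // Note: source is subscribed twice, once directly and once via skip(1).
        Observable<T> offsetSource = source.skip(1);
        return source.zipWith(offsetSource, mergeFunc);
    }
}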
interval which emits [1, 2, 3, 4, 5] and map it to [(1,2),(2,3),(3,4),(4,5)]
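For reference, the same pairing can be written over a plain list, which makes the expected output easy to check. This is only a sketch of the mapping itself, not of the Observable version; LookbackSketch and lookback are hypothetical names, and BiFunction stands in for Func2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class LookbackSketch {
    // Combines every element with its successor, like zipping the
    // sequence with itself shifted by one.
    static <T, R> List<R> lookback(List<T> source, BiFunction<T, T, R> mergeFunc) {
        List<R> out = new ArrayList<>();
        for (int i = 0; i + 1 < source.size(); i++) {
            out.add(mergeFunc.apply(source.get(i), source.get(i + 1)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> pairs = lookback(Arrays.asList(1, 2, 3, 4, 5),
                (a, b) -> "(" + a + "," + b + ")");
        System.out.println(pairs); // prints [(1,2), (2,3), (3,4), (4,5)]
    }
}
```

With n input items the result has n - 1 pairs, which matches what zipping a sequence against itself with skip(1) produces, since zip stops when the shorter side completes.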