RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
onNext
item be emitted, will it fail to be emitted silently, or will the call fail with an exception?
onNext [...] The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
Hi, I have some flow which I can't resolve with provided operations. Or I just don't see something important.
I have sequence A
which produces sequence B
. Then later I combine them with combineLatest
operator, and get two notifies on each A
update. But I want one and latest.
Short illustration:
Observable<Long> o1 = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
Observable<Long> o2 = o1.map(v -> -v);
Observable.combineLatest(o1, o2, (v1, v2) -> "o1: " + v1 + ", o2: " + v2)
.subscribe(System.out::println);
Provides output like this:
o1: 1, o2: -1
o1: 1, o2: -2
o1: 2, o2: -2
o1: 3, o2: -2
o1: 3, o2: -3
o1: 3, o2: -4
o1: 4, o2: -4
o1: 4, o2: -5
o1: 5, o2: -5
Ok. I missed one detail. B
can be merged with C
Observable<Long> a = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
Observable<Long> b = a.map(v -> -v);
Observable<Long> c = Observable.interval(500, TimeUnit.MILLISECONDS).map(x -> x + 1000);
Observable.zip(a, Observable.merge(b, c), (v1, v2) -> "o1: " + v1 + ", o2: " + v2).subscribe(System.out::println);
Then zip
does wrong job too
combineLatestRight
. Does that exists in RxJava?
PublishSubject<Integer> seq = PublishSubject.create();
Observable<Integer> seq2= seq.map(x ->{
timesCalled += 1;
return x * 2;
});
seq2.subscribe(e -> System.out.println("Hello 1: " + e));
seq2.subscribe(e -> System.out.println("Hello 2: " + e));
seq.onNext(1);
System.out.println("Times map called: " + timesCalled);
This code shows that map
in this case called twice. (Is that a error or not?). And I get 2 different objects in each subscription, expected same.
publish
creates ConnectableObservable
. Which waits connect
before emitting somethig. Sounds like not really aimed at current problem, right? Or I get it wrong? Or I need publish just to create new observable which keeps needed objects.
map
it again by the way
Observable.from(aCollectionOfThousandsOfStrings)
flatMap
signature with maxConcurrent of, say, 100 ensure that no more than 100 invocations of the map function are made at a time?
Observable.from(...)
(or range
) to swamp my io layer with too many requests in an instantObservable.range(1, 1000)
.map(id -> buildUrlToResource(id))
//driver can only cope with around 150 asynchronous requests
.flatMap(url -> driver.fireRequestAsync(url),
100)
.doOnNext(response -> showNotification(response.content())
.toBlocking().last(); //wait for last response