RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Now I've changed it so that when onNext is called I emit the next value from the array. I need to show an infinite loop of screens, each shown for a different time.
private Observable<Integer> screenTimeSteam() {
    return Observable.create(emitter -> {
        StreamFinishEventListener finishListener = new StreamFinishEventListener() {
            int i = 0;
            @Override
            public void onFinished() {
                int item = i % timers.length;
                i++;
                emitter.onNext(timers[item]);
            }
        };
        setFinishListener(finishListener);
        finishListener.onFinished();
    });
} // screenTimeSteam
Is there an easier way?
Hi! I need some help knowing which operators to use in the following scenario:
I have a button that submits a code to the backend and another button that cancels the submission of the code. I already have a method that makes an API call to the backend and returns an Observable<Result<String>>; let's call this submitCode().
I would like to do the following:
submitButtonClickStream -> trigger submitCode() -> cancel submission if cancel stream onNexts -> subscribe to submitCode result and implement success and failure handlers
emitter
?
Pardon my limited knowledge of RxJava; how can I achieve the following? I am trying to implement something like the code below.
CustomIterator<String> iter = ... code to build custom iterator
Observable.<String>create(emitter -> {
        while (iter.hasNext()) {
            emitter.onNext(iter.next());
        }
    }, BackpressureMode.BUFFER)
    .map(id -> {
        Doc doc = fetchFromDB(id);
        return convertDocToVO(doc); // return the VO so the next map receives it
    })
    .map(vo -> complicatedTask)
    .forEach(/* Do final processing */);
I need some help around complicatedTask. The current non-reactive implementation performs an action in parallel (using the FJP common pool): for each VO document, it creates a Future and drops it into a blocking queue. The consumer blocks on each Future sequentially and, once the result is available, does some other processing. This model gives us two things:
... Futures are created.
Some clarifications:
complicatedTask does all its processing on the CPU; no I/O.
complicatedTask is another type of DTO.
If I want to achieve something similar in reactive, how should I do it?
If I return a Future from .map(vo -> complicatedTask), it will lead to a lot of Futures in memory.
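For reference, the Future-plus-blocking-queue model described above can be sketched in plain JDK code (no RxJava), which may help when comparing reactive alternatives. This is only a minimal sketch, not the asker's actual implementation: the class name BoundedFuturePipeline, the process helper, and the maxPending parameter are hypothetical, and it assumes the blocking queue is bounded so that the producer stalls once maxPending futures are waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class BoundedFuturePipeline {
    // Hypothetical helper: runs `task` for every input on the FJP common pool
    // and returns the results in input order. The bounded queue (an assumption)
    // holds at most `maxPending` futures, so the producer blocks when it is full.
    static <T, R> List<R> process(List<T> inputs, Function<T, R> task, int maxPending) {
        BlockingQueue<CompletableFuture<R>> queue = new ArrayBlockingQueue<>(maxPending);

        // Producer: creates one future per input and drops it into the queue.
        Thread producer = new Thread(() -> {
            try {
                for (T input : inputs) {
                    // supplyAsync without an executor uses ForkJoinPool.commonPool()
                    queue.put(CompletableFuture.supplyAsync(() -> task.apply(input)));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // consumer gave up; stop producing
            }
        });
        producer.start();

        // Consumer: blocks on each future sequentially, in submission order.
        List<R> results = new ArrayList<>(inputs.size());
        try {
            for (int i = 0; i < inputs.size(); i++) {
                results.add(queue.take().join());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            producer.interrupt(); // no-op if the producer has already finished
        }
    }
}
```

For example, BoundedFuturePipeline.process(Arrays.asList(1, 2, 3), x -> x * x, 2) returns the squares in input order, because the consumer joins the futures in the order they were queued, regardless of which task finishes first.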
Hi,
I'm a bit confused about how doOnComplete works.
Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
        .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));
Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));
groupBy.flatMap(characterStringGroupedFlowable -> bufferAndDelay(stats(characterStringGroupedFlowable)))
        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
        .test()
        .await();
private Flowable<String> stats(GroupedFlowable<Character, String> groupedFlowable) {
    return groupedFlowable.compose(upstream -> {
        AtomicLong count = new AtomicLong(0);
        return upstream
                .doOnNext(s -> {
                    count.incrementAndGet();
                    println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - [" + s + "] - count " + count.get());
                })
                .doOnComplete(() -> println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - complete - count " + count.get()));
    });
}
private Flowable<String> bufferAndDelay(Flowable<String> flowable) {
    return flowable.publish(stringFlowable ->
            stringFlowable.buffer(1)
                    .concatMap(bufferedStrings ->
                            Flowable.just(bufferedStrings)
                                    .doOnNext(x -> println(Thread.currentThread().getName() + " - before delay - " + x))
                                    .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation())
                                    .doOnNext(x -> println(Thread.currentThread().getName() + " - after delay - " + x))
                                    .flatMap(Flowable::fromIterable)
                    ));
}
Output:
pool-1-thread-1 - group: a - stats - [aa] - count 1
pool-1-thread-1 - before delay - [aa]
pool-1-thread-1 - group: b - stats - [ba] - count 1
pool-1-thread-1 - before delay - [ba]
pool-1-thread-1 - group: a - stats - [ab] - count 2
pool-1-thread-1 - group: b - stats - [bb] - count 2
pool-1-thread-1 - end emit
pool-1-thread-1 - group: a - stats - complete - count 2
pool-1-thread-1 - group: b - stats - complete - count 2
RxComputationThreadPool-1 - after delay - [aa]
RxComputationThreadPool-2 - after delay - [ba]
RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
RxComputationThreadPool-1 - before delay - [ab]
RxComputationThreadPool-2 - before delay - [bb]
RxComputationThreadPool-4 - after delay - [bb]
RxComputationThreadPool-3 - after delay - [ab]
RxComputationThreadPool-3 - before delay - [ac]
RxComputationThreadPool-5 - after delay - [ac]
The result is that:
pool-1-thread-1 - group: a - stats - complete - count 2
pool-1-thread-1 - group: b - stats - complete - count 2
...
RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
Why is stats - complete before stats - [ac] - count 3? Am I breaking the rule that says "emit event synchronously, not concurrently"?
When I remove publish from the bufferAndDelay method, the order is correct.
Shorter version:
Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
        .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));
Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));
groupBy.flatMap(characterStringGroupedFlowable -> {
            return stats(characterStringGroupedFlowable).publish(stringFlowable ->
                    stringFlowable.concatMap(s ->
                            Flowable.just(s)
                                    .delay(100, TimeUnit.MILLISECONDS)));
        })
        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
        .test()
        .await();
Output:
pool-1-thread-1 - group: a - stats - [aa] - count 1
pool-1-thread-1 - group: b - stats - [ba] - count 1
pool-1-thread-1 - group: a - stats - [ab] - count 2
pool-1-thread-1 - group: b - stats - [bb] - count 2
pool-1-thread-1 - end emit
pool-1-thread-1 - group: a - stats - complete - count 2
pool-1-thread-1 - group: b - stats - complete - count 2
RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
I want to pass the output of a Single to a POST API returning a Completable, but I am not able to achieve that. I posted an SO question but have had no help so far. Please suggest how to do this. https://stackoverflow.com/questions/53633071/how-to-pass-output-of-a-single-to-a-completable-in-rxjava