RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
// Variant 1: the publish selector returns only take(1). Once that single item has been
// delivered the shared connection is released, so the upstream stops after the first value.
Observable.just(1, 2, 3)
        .doOnNext(value -> out.println("emmit " + value))
        .compose(upstream -> upstream.publish(shared -> shared.take(1)))
        .subscribe(value -> out.println("final " + value));
// prints: emmit 1 ⏎ final 1

// Variant 2: merging in shared.ignoreElements() keeps a consumer attached to the shared
// source, so the upstream runs to completion while only the first value reaches the subscriber.
Observable.just(1, 2, 3)
        .doOnNext(value -> out.println("emmit " + value))
        .compose(upstream -> upstream.publish(shared -> shared.take(1).mergeWith(shared.ignoreElements())))
        .subscribe(value -> out.println("final " + value));
// prints: emmit 1 ⏎ final 1 ⏎ emmit 2 ⏎ emmit 3
@akarnokd yes, something exactly like this, thank you. I am still not sure how to chain those asynchronous calls, though, since I operate on the database.
// Emits the uriObservableList wrapper once, subscribing on the io scheduler and delivering
// to the observer on the Android main thread.
Observable.just(uriObservableList)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<RxObservableList<Uri>>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "ONSUBSCRIBE ");
            }

            @Override
            public void onNext(RxObservableList<Uri> pictures) {
                String pathString = makePathString(pictures.getList());
                // The database write is handed to the disk executor by hand here;
                // this is the asynchronous call that should run for every onNext.
                AppExecutor.getInstance().diskIO().execute(() -> viewModel.updatePicturePaths(pathString));
                mAdapter.updatePictures(pictures.getList());
                Log.d(TAG, "ONNEXT");
            }

            @Override
            public void onError(Throwable error) {
                Log.d(TAG, "ONERROR " + error);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "ONCOMPLETE ");
            }
        });
How do I construct my subscription so that, before every onNext call, I can perform a write operation on the database? I have tried doOnNext, which is supposed to be executed before onNext, but it is always called from the main thread.
@akarnokd already tried it.
// Pipeline: load the stored picture paths on the io scheduler, bind them to the RecyclerView
// on the main thread, persist every subsequent list update on the io scheduler, and finally
// refresh the adapter on the main thread.
//
// subscribeOn() only chooses the thread on which the subscription (the initial load) happens.
// Items emitted later by uriObservableList.getObservable() are delivered on whatever thread
// produced them (presumably the main thread, given the reported "Cannot access database on
// the main thread" error), so each stage that needs a specific thread gets its own observeOn().
viewModel.getPlace().map(this::getPicturePaths)
        .subscribeOn(Schedulers.io())
        .toObservable()
        // The concatMap body creates the adapter and mutates views, so run it on the main thread.
        .observeOn(AndroidSchedulers.mainThread())
        .concatMap(new Function<ArrayList<Uri>, Observable<RxObservableList<Uri>>>() {
            @Override
            public Observable<RxObservableList<Uri>> apply(ArrayList<Uri> uriArrayList) throws Exception {
                mAdapter = new PictureRvAdapter(MarkerActivity.this, uriArrayList, MarkerActivity.this);
                LinearLayoutManager layoutManager = new LinearLayoutManager(MarkerActivity.this, LinearLayoutManager.VERTICAL, false);
                DividerItemDecoration decoration = new DividerItemDecoration(getApplicationContext(), VERTICAL);
                mPicturesRV.addItemDecoration(decoration);
                mPicturesRV.setLayoutManager(layoutManager);
                mPicturesRV.setAdapter(mAdapter);
                beginTransition(constraintSetEnd, 850);
                Log.d(TAG, "Adapter set, transition started");
                uriObservableList.setList(uriArrayList);
                return uriObservableList.getObservable();
                //return Observable.just(uriObservableList);
            }
        })
        // Move every emission of the inner observable onto the io scheduler before the
        // database write; without this hop doOnNext runs on the emitting thread.
        .observeOn(Schedulers.io())
        .doOnNext(new Consumer<RxObservableList<Uri>>() {
            @Override
            public void accept(RxObservableList<Uri> uris) throws Exception {
                String pathString = makePathString(uris.getList());
                viewModel.updatePicturePaths(pathString);
                Log.d(TAG, "Updated pictures in database");
            }
        })
        // Back to the main thread for the adapter update in onNext.
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<RxObservableList<Uri>>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "Onsubscribe called");
            }

            @Override
            public void onNext(RxObservableList<Uri> uris) {
                mAdapter.updatePictures(uris.getList());
                Log.d(TAG, "Adapter pictures updated");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "OnError : " + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "OnComplete called");
            }
        });
This still gives me this error: MarkerActivity: OnError : java.lang.IllegalStateException: Cannot access database on the main thread since it may potentially lock the UI for a long period of time.
// Demonstrates withLatestFrom: every item of `stream` is combined with the most recent
// item of `request` (here the single value "1").
Observable<String> stream = Observable.fromArray("a", "b", "c");
Observable<String> request = Single.just("1").toObservable();
stream
        .withLatestFrom(request, (letter, latestResponse) -> letter + latestResponse)
        .forEach(System.out::println);
// prints: a1 ⏎ b1 ⏎ c1 ⏎
/**
 * Manual experiment rather than an assertion-based test: starts 1000 one-second tasks on
 * the io scheduler, then prints the value of {@code IoScheduler.size()} once per second.
 */
@Test
public void threadCounter() throws Exception {
    // Launch 1000 independent flows; each one sleeps for a second before producing a value.
    for (int launched = 0; launched < 1000; launched++) {
        Flowable
                .fromCallable(() -> {
                    Thread.sleep(1000);
                    return RandomUtils.nextInt();
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(value -> {
                    // The produced value is deliberately ignored.
                    //System.out.println("c:" + c);
                });
    }

    // Schedulers.io() is cast to its implementation class to reach size().
    IoScheduler ioScheduler = (IoScheduler) Schedulers.io();
    for (int tick = 0; tick < 10000; tick++) {
        System.out.println("io size:" + ioScheduler.size());
        Thread.sleep(1000);
    }
}
// Close the window when all tabs are closed.
//
// Each tabDidClose event ends a buffer holding the tabDidOpen events seen since the previous
// close, so `size - 1` is the net change in open tabs for that interval and the running sum
// is the current number of open tabs.
tabDidOpen.buffer(tabDidClose)
    .map { opened -> opened.size - 1 }
    .scan(0) { openTabs, delta -> openTabs + delta }
    // scan(seed) emits the seed itself as its first item; drop it so that the initial 0
    // does not close the window at subscription time, before any tab event has occurred.
    .skip(1)
    .filter { it == 0 }
    .subscribe { closeWindow() }
// Reads stdin line by line on a dedicated thread and shares the lines between the
// "+" and "-" filters below.
val lines = Observable.create<String> { emitter ->
    // Stop reading once the downstream has disposed, and complete normally at end of input
    // (readLine() returns null at EOF) instead of failing with a NullPointerException.
    while (!emitter.isDisposed) {
        val line = readLine() ?: break
        emitter.onNext(line)
    }
    emitter.onComplete()
}
    .subscribeOn(Schedulers.newThread()).share()
// Each recognised input line becomes a function that transforms the running total.
val plus = lines.filter { "+" == it }.map { { acc: Int -> acc + 1 } }
val minus = lines.filter { "-" == it }.map { { acc: Int -> acc - 1 } }
// Apply the functions in arrival order, starting from 0; scan emits every intermediate total.
val calc = mergeArray<(Int) -> Int>(plus, minus).scan(0) { acc, fn -> fn(acc) }
// Print each total and block the calling thread until the input stream ends.
calc.doOnNext(::println).blockingLast()
Observable<(Int) -> Int>
mergeWith(...)
but when the Single terminates, the other stream also terminates. I'm trying to find behaviour like this:
/**
 * Operator for an observable of grouped observables: re-emits every group wrapped with
 * debounceWithSelector, so that each group is given the item built by
 * onDebounceItemFactory(key) for its own key as the debounce item.
 *
 * @param dueTime passed through to debounceWithSelector
 * @param onDebounceItemFactory builds the debounce item for a given group key
 * @param scheduler passed through to debounceWithSelector
 * @returns an observable that emits one debounced stream per incoming group
 */
function debounceOnMissedHeartbeat<TKey, TValue> (dueTime:number, onDebounceItemFactory:(key:TKey) => TValue, scheduler:Rx.Scheduler) {
  const groups:Rx.GroupedObservable = this
  return Rx.Observable.create(observer => {
    // Wrap a single group; the key is captured now and used lazily by the item factory.
    const emitDebounced = group => {
      const groupKey:TKey = group.key
      const debounced = group.debounceWithSelector(dueTime, () => onDebounceItemFactory(groupKey), scheduler)
      observer.onNext(debounced)
    }
    // Errors and completion of the outer stream are forwarded unchanged.
    return groups.subscribe(
      emitDebounced,
      ex => observer.onError(ex),
      () => observer.onCompleted()
    )
  })
}
// Install the operator on every Rx observable.
Rx.Observable.prototype.debounceOnMissedHeartbeat = debounceOnMissedHeartbeat
/**
 * Operator that forwards every item of the source and, in addition, emits the value
 * returned by itemSelector() when the timer armed by the most recent debounce() call fires.
 * The timer is armed once at subscription time and re-armed on every source item; after it
 * fires it is not armed again until the next source item arrives.
 *
 * @param dueTime delay handed to scheduler.scheduleFuture
 * @param itemSelector zero-argument function producing the item to emit when the timer fires
 * @param scheduler scheduler on which the timer is scheduled
 * @returns an observable of the source items interleaved with the selector items
 */
function debounceWithSelector (dueTime:number, itemSelector, scheduler:Rx.Scheduler) {
let source = this
return Rx.Observable.create(o => {
// Everything owned by this subscription is collected here and returned at the end.
let disposables = new Rx.CompositeDisposable()
// Holds the currently scheduled timer; setting a new one replaces the previous
// (NOTE(review): relies on SerialDisposable disposing the replaced timer — confirm for the Rx version in use).
let debounceDisposable = new Rx.SerialDisposable()
disposables.add(debounceDisposable)
// (Re)arms the timer that emits the selector item.
let debounce = () => {
debounceDisposable.setDisposable(
scheduler.scheduleFuture(
// First argument is an unused placeholder (presumably the scheduler state — confirm).
'',
dueTime,
() => {
let debouncedItem = itemSelector()
o.onNext(debouncedItem)
}
)
)
}
disposables.add(
source.subscribe(
item => {
// Re-arm the timer before forwarding the item downstream.
debounce()
o.onNext(item)
},
ex => {
// Forward the error; anything thrown by the downstream onError is deliberately ignored.
try {
o.onError(ex)
} catch (err1) {
}
},
() => o.onCompleted()
)
)
// Arm the timer immediately so that a source that never emits still triggers the selector item.
debounce()
return disposables
})
}
// Install the operator on every Rx observable.
Rx.Observable.prototype.debounceWithSelector = debounceWithSelector
/**
 * For every group emitted by the receiver, emits the result of calling
 * `debounceWithSelector` on that group with the item produced by
 * [onDebounceItemFactory] for the group's key. Errors and completion of the receiver are
 * forwarded unchanged.
 *
 * The subscription to the receiver is registered with the emitter, so disposing the
 * returned observable also disposes the upstream subscription.
 *
 * NOTE(review): the second type parameter is named `Any` and therefore shadows
 * `kotlin.Any` inside this declaration; it is kept as-is to leave the signature untouched.
 * NOTE(review): `debounceWithSelector` is declared elsewhere; the factory is invoked
 * eagerly here, exactly as in the original call — confirm that matches its signature.
 *
 * @param dueTime passed through to `debounceWithSelector`
 * @param onDebounceItemFactory builds the debounce item for a given group key
 * @param scheduler passed through to `debounceWithSelector`
 */
fun <TKey, Any> Observable<GroupedObservable<TKey, Any>>.debounceOnMissedHeartbeat(dueTime: Long, onDebounceItemFactory:(key: TKey) -> Any, scheduler: Scheduler) =
    Observable.create<Any> { o ->
        val upstream = this.subscribe(
            { innerSource ->
                // GroupedObservable.getKey() is declared nullable on the Java side, but the
                // key is exactly the TKey value the group was created with (null included
                // when TKey itself is a nullable type), so cast instead of asserting non-null.
                @Suppress("UNCHECKED_CAST")
                val key = innerSource.key as TKey
                val debouncedStream = innerSource.debounceWithSelector(dueTime, onDebounceItemFactory(key), scheduler)
                o.onNext(debouncedStream)
            },
            { ex: Throwable -> o.onError(ex) },
            { o.onComplete() })
        // Tie the upstream subscription to the emitter so that a downstream dispose
        // propagates upstream instead of leaving the subscription running.
        o.setDisposable(upstream)
    }