RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Flowable
.create(emitter -> {
int i = 0;
while (i++ < 10) {
emitter.onNext(RandomUtils.nextInt() + ":" + i);
Thread.sleep(100);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER)
//.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.buffer(1, TimeUnit.SECONDS, Schedulers.trampoline())
.subscribe(list -> {
System.out.println("BlockedBufferRxTest.main:" + list.toString());
});
Thread.sleep(1000000);
Schedulers.trampoline() blocked the current main thread that cannot execute subscribe method. There are some overwrite buffer methods which called to Schedulers.computation() as input arguments.
Using RxJava 1.1.9, I have a problem as follows:
import rx.Observable;
public enum ZipWithTest {;
public static void main(String[] args) {
Observable
.just(1, 2, 3)
.doOnNext(next -> System.out.format("next: %d%n", next))
.zipWith(
Observable.just(10, 100, 1000),
(next, multiplier) -> next * multiplier)
.doOnNext(multiplied -> System.out.format("multiplied: %d%n", multiplied))
.toBlocking()
.last();
}
/*
next: 1
next: 2
next: 3
multiplied: 10
multiplied: 200
multiplied: 3000
*/
}
I was expecting to get the following:
next: 1
multiplied: 10
next: 2
multiplied: 200
next: 3
multiplied: 3000
What am I missing?
Observable.just(1, 2, 3).doOnNext(n -> out.println("emmit " + n))
.compose(o -> o.publish(p -> p.take(1)))
.subscribe(n -> out.println("final " + n));
// emmit 1 ⏎ final 1
Observable.just(1, 2, 3).doOnNext(n -> out.println("emmit " + n))
.compose(o -> o.publish(p -> p.take(1).mergeWith(p.ignoreElements())))
.subscribe(n -> out.println("final " + n));
// emmit 1 ⏎ final 1 ⏎ emmit 2 ⏎ emmit 3
@akarnokd yes, something exactly like this, thank you. I am still not sure how to chain those asynchronous calls though, since i operate on the database.
Observable.just(uriObservableList)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<RxObservableList<Uri>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "ONSUBSCRIBE ");
}
@Override
public void onNext(RxObservableList<Uri> uris) {
String s = makePathString(uris.getList());
AppExecutor.getInstance().diskIO().execute(() -> viewModel.updatePicturePaths(s)); // <-- THis async call should be performed on every onNext
mAdapter.updatePictures(uris.getList());
Log.d(TAG, "ONNEXT");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "ONERROR " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "ONCOMPLETE ");
}
});
How do I construct my Subscription so that before every onNext call I can perform a write operation on the database? I have tried doOnNext which is supposed to be executed before onNext, but it is always called from the main thread
@akarnokd already tried it.
viewModel.getPlace().map(this::getPicturePaths)
.subscribeOn(Schedulers.io())
.toObservable()
.concatMap(new Function<ArrayList<Uri>, Observable<RxObservableList<Uri>>>() {
@Override
public Observable<RxObservableList<Uri>> apply(ArrayList<Uri> uriArrayList) throws Exception {
mAdapter = new PictureRvAdapter(MarkerActivity.this, uriArrayList, MarkerActivity.this);
LinearLayoutManager layoutManager = new LinearLayoutManager(MarkerActivity.this, LinearLayoutManager.VERTICAL, false);
DividerItemDecoration decoration = new DividerItemDecoration(getApplicationContext(), VERTICAL);
mPicturesRV.addItemDecoration(decoration);
mPicturesRV.setLayoutManager(layoutManager);
mPicturesRV.setAdapter(mAdapter);
beginTransition(constraintSetEnd, 850);
Log.d(TAG, "Adapter set, transition started");
uriObservableList.setList(uriArrayList);
return uriObservableList.getObservable();
//return Observable.just(uriObservableList);
}
})
.doOnNext(new Consumer<RxObservableList<Uri>>() {
@Override
public void accept(RxObservableList<Uri> uris) throws Exception {
String pathString = makePathString(uris.getList());
viewModel.updatePicturePaths(pathString);
Log.d(TAG, "Updated pictures in database");
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<RxObservableList<Uri>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Onsubscribe called");
}
@Override
public void onNext(RxObservableList<Uri> uris) {
mAdapter.updatePictures(uris.getList());
Log.d(TAG, "Adapter pictures updated");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "OnError : " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "OnComplete called");
}
});
still gives me this error: MarkerActivity: OnError : java.lang.IllegalStateException: Cannot access database on the main thread since it may potentially lock the UI for a long period of time.
Observable<String> stream = Observable.fromArray("a", "b", "c");
Observable<String> request = Single.just("1").toObservable();
stream.withLatestFrom(request, (streamItem, requestItem) -> streamItem + requestItem)
.forEach(System.out::println);
// output: a1 ⏎ b1 ⏎ c1 ⏎
@Test
public void threadCounter() throws Exception {
int j = 0;
while (j++ < 1000) {
Flowable
.fromCallable(() -> {
Thread.sleep(1000);
return RandomUtils.nextInt();
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(c -> {
//System.out.println("c:" + c);
});
}
IoScheduler io = (IoScheduler) Schedulers.io();
int i = 0;
while (i++ < 10000) {
System.out.println("io size:" + io.size());
Thread.sleep(1000);
}
}
//close the window when all tabs are closed
tabDidOpen.buffer(tabDidClose)
.map { it.size - 1 }
.scan(0) { acc, current -> acc + current }
.filter { it == 0 }
.subscribe { closeWindow() }
val lines = Observable.create<String> { while (true) it.onNext(readLine()!!) }
.subscribeOn(Schedulers.newThread()).share()
val plus = lines.filter { "+" == it }.map { { acc: Int -> acc + 1 } }
val minus = lines.filter { "-" == it }.map { { acc: Int -> acc - 1 } }
val calc = mergeArray<(Int) -> Int>(plus, minus).scan(0) { acc, fn -> fn(acc) }
calc.doOnNext(::println).blockingLast()
Observable<(Int) -> Int>
mergeWith(...)
but when the single terminates, the other stream also terminates. Im trying to find behaviour like this
function debounceOnMissedHeartbeat<TKey, TValue> (dueTime:number, onDebounceItemFactory:(key:TKey) => TValue, scheduler:Rx.Scheduler) {
let sources:Rx.GroupedObservable = this
return Rx.Observable.create(o => {
return sources.subscribe(innerSource => {
let key:TKey = innerSource.key
let debouncedStream = innerSource.debounceWithSelector(dueTime, () => onDebounceItemFactory(key), scheduler)
o.onNext(debouncedStream)
},
ex => o.onError(ex),
() => o.onCompleted()
)
})
}
Rx.Observable.prototype.debounceOnMissedHeartbeat = debounceOnMissedHeartbeat
function debounceWithSelector (dueTime:number, itemSelector, scheduler:Rx.Scheduler) {
let source = this
return Rx.Observable.create(o => {
let disposables = new Rx.CompositeDisposable()
let debounceDisposable = new Rx.SerialDisposable()
disposables.add(debounceDisposable)
let debounce = () => {
debounceDisposable.setDisposable(
scheduler.scheduleFuture(
'',
dueTime,
() => {
let debouncedItem = itemSelector()
o.onNext(debouncedItem)
}
)
)
}
disposables.add(
source.subscribe(
item => {
debounce()
o.onNext(item)
},
ex => {
try {
o.onError(ex)
} catch (err1) {
}
},
() => o.onCompleted()
)
)
debounce()
return disposables
})
}
Rx.Observable.prototype.debounceWithSelector = debounceWithSelector