RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
@jaumard OK, first off, I don't know if this is a great idea; I'm just going to assume you have your reasons why unsubscribing isn't good enough. Now on to how you would do this. If you want a cancellation token like C#'s, you can do it fairly easily with Observable.create. C# cancellation tokens are glorified boolean holders anyway. You could also use Maybe.create, Flowable.create, etc. I don't think you could do this with Single unless you treat cancellation as an error. What you do is check your cancellation token inside your tight loop. My example is in Kotlin (sorry, I'm primarily an Android programmer):
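Observable.create { emitter ->
    // `actions` and `token` are assumed to be in scope
    for (action in actions) {
        // stop emitting as soon as cancellation has been requested
        if (token.isCancelled()) {
            break
        }
        emitter.onNext(action())
    }
    emitter.onComplete()
}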
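For anyone who wants to see the "glorified boolean holder" idea without RxJava in the way, here is a minimal plain-Java sketch of the same tight loop. The names `CancellationToken` and `runUntilCancelled` are made up for illustration and are not part of RxJava or any other library; the token is assumed to be nothing more than an AtomicBoolean wrapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class CancellationTokenSketch {

    /** Hypothetical token: a thread-safe boolean holder, nothing more. */
    static final class CancellationToken {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);

        void cancel() {
            cancelled.set(true);
        }

        boolean isCancelled() {
            return cancelled.get();
        }
    }

    /** Runs the actions in order and stops before the next one once the token is cancelled. */
    static <T> List<T> runUntilCancelled(List<Supplier<T>> actions, CancellationToken token) {
        List<T> results = new ArrayList<>();
        for (Supplier<T> action : actions) {
            if (token.isCancelled()) {
                break; // same check as in the emitter loop
            }
            results.add(action.get());
        }
        return results;
    }

    public static void main(String[] args) {
        CancellationToken token = new CancellationToken();
        List<Supplier<String>> actions = Arrays.asList(
                () -> "step1",
                () -> { token.cancel(); return "step2"; }, // cancellation arrives during step 2
                () -> "step3");
        System.out.println(runUntilCancelled(actions, token)); // prints [step1, step2]
    }
}
```

The check only happens between actions, so an action that is already running always finishes; that is the same trade-off as in the emitter loop.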
Depending on what you are trying to do, this might work. If you have a chain of Maybes, you could also just decide not to return the next step:
// `token` is assumed to be in scope
fun <T, R> Maybe<T>.ifNotCancelled(deferred: () -> Maybe<R>): Maybe<R> {
    return this.filter { !token.isCancelled() }
        .flatMap { deferred() }
}
step1()
    .ifNotCancelled { step2() }
    .ifNotCancelled { step3() }
    .ifNotCancelled { step4() }
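The filter-then-flatMap trick can be tried out without RxJava. The sketch below uses java.util.Optional as a stand-in for Maybe, which is an assumption made purely for illustration: Optional is synchronous and has no notion of subscription, so this only shows how the short-circuit works, not how Maybe behaves on a scheduler. `IfNotCancelledSketch` and its method are hypothetical names.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class IfNotCancelledSketch {

    /** Runs the next step only if the previous one produced a value and nobody has cancelled. */
    static <T, R> Optional<R> ifNotCancelled(Optional<T> previous,
                                             AtomicBoolean cancelled,
                                             Supplier<Optional<R>> next) {
        return previous
                .filter(ignored -> !cancelled.get()) // drop the value once cancelled
                .flatMap(ignored -> next.get());     // an empty Optional never reaches the next step
    }

    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Optional<String> step1 = Optional.of("step1");

        Optional<String> afterStep2 = ifNotCancelled(step1, cancelled, () -> {
            cancelled.set(true); // simulate a cancel request arriving during step 2
            return Optional.of("step2");
        });
        Optional<String> afterStep3 =
                ifNotCancelled(afterStep2, cancelled, () -> Optional.of("step3"));

        System.out.println(afterStep2); // prints Optional[step2]
        System.out.println(afterStep3); // prints Optional.empty
    }
}
```

Once the flag is set, every later step sees an empty value and is skipped, which is the same effect as the chain completing without a value.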
There are quite a few ways to do this, and without more info I'm just taking shots in the dark, but I hope that gives you an idea of what you can do. The reason this feels a little strange is probably that you don't want to model the cancellation as part of the stream. Usually it is easier to keep all the information in the stream; then you can do whatever you want with more RxJava operators like filter, flatMap, map, etc. One of the ways we do this in my project is with switchMap. When you don't stay inside the stream, you're kind of just using RxJava as a glorified functional library / thread replacement, which isn't horrible but leaves all these great ideas behind.
List<String> bleDevicesWithMyPrefix = Scanner.create(context).scanForPrefix("ABC").blockingGet();
private fun scanAllDevices(): Observable<ScanResult> =
    rxBleClient.scanBleDevices(
        ScanSettings.Builder()
            .setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
            .setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES)
            .build(),
        ScanFilter.Builder()
            .build())
//Begin test code
var emittedList: List<String> = listOf("MMM", "OOO", "ABC", "ABCDEF", "ZZZ", "ABC", "RRRRRRR", "ZZABC")
private fun scanAllDevicesTEST(): Observable<ScanResult> {
    return Observable
        .intervalRange(0, emittedList.size.toLong(), 0, 200, TimeUnit.MILLISECONDS, timeoutScheduler)
        .map { index -> ScanResult(MyDevice(emittedList[index.toInt()]), 4, 10, ScanCallbackType.CALLBACK_TYPE_ALL_MATCHES, MyRecord()) }
}
private fun scanAllDevices(context: Context) {
    val rxBleClient = RxBleClient.create(context)
    rxBleClient.scanBleDevices(
        ScanSettings.Builder()
            .setScanMode(ScanSettings.SCAN_MODE_LOW_POWER) // change if needed
            .setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES) // change if needed
            .build(),
        ScanFilter.Builder()
            .build()
    )
        .observeOn(Schedulers.io())
        .subscribe { scanResult ->
            val rxBleDevice = rxBleClient.getBleDevice(scanResult.getBleDevice().getMacAddress())
            Logger.d(TAG, "The device has name ${rxBleDevice.name}")
        }
}
button_scan3.setOnClickListener {
    Log.d(TAG, "\n\nFiring up Scanner3")
    val scanner = SuiteScanner.create(this.applicationContext)
    val set = mutableSetOf<String>()
    scanner.scanForAllBLEDevices3("ABC")
        .doOnComplete {
            Log.d(TAG, "Completed...Scanner3")
            for (name in set) {
                Log.d(TAG, "BLE Device: $name")
            }
        }
        .subscribe { scanResult ->
            val rxBleDevice = scanResult.getBleDevice()
            Log.d(TAG, "The device has name ${rxBleDevice.name}")
            val name = rxBleDevice.name
            if (name != null) {
                set.add(name)
            }
        }
}
fun scanForAllBLEDevices3(prefix: String): Observable<ScanResult> {
    Log.w(TAG, "We are in scanForAllBLEDevices3.")
    return rxBleScanner()
        .take(2, TimeUnit.SECONDS, timeoutScheduler)
        .observeOn(Schedulers.io())
        .filter { device -> inspectScanResult(device.bleDevice.name, prefix) }
}
private fun inspectScanResult(foundDevice: String?, prefix: String): Boolean {
    Log.d(TAG, " Inspecting: $foundDevice")
    if (foundDevice == null) {
        return false
    }
    return foundDevice.startsWith(prefix)
}
.compose(f -> {
    final AtomicBoolean isFirst = new AtomicBoolean();
    return f.filter(r -> isFirst.compareAndSet(false, true));
})
Flowable
    .create(emitter -> {
        int i = 0;
        while (i++ < 10) {
            emitter.onNext(RandomUtils.nextInt() + ":" + i);
            Thread.sleep(100);
        }
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER)
    //.subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .buffer(1, TimeUnit.SECONDS, Schedulers.trampoline())
    .subscribe(list -> {
        System.out.println("BlockedBufferRxTest.main:" + list.toString());
    });
Thread.sleep(1000000);
Schedulers.trampoline() runs the buffer timer on the current (main) thread and blocks it, so the subscribe callback never gets to execute. There are overloaded buffer methods that take no Scheduler argument and use Schedulers.computation() for you.
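To make the difference concrete, here is a plain-Java sketch of "periodic work on the caller's thread" versus "periodic work on a dedicated timer thread". It is not how RxJava implements either scheduler; `PeriodicTimerSketch` and both methods are hypothetical names, and the loop is bounded by `ticks` only so the demo terminates, whereas a real periodic timer has no such bound.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class PeriodicTimerSketch {

    /** Trampoline-like: the caller itself sleeps and ticks, so it can do nothing else meanwhile. */
    static String runOnCallerThread(int ticks, long periodMillis) {
        String worker = null;
        for (int i = 0; i < ticks; i++) {
            try {
                Thread.sleep(periodMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            worker = Thread.currentThread().getName(); // the tick runs on the caller
        }
        return worker;
    }

    /** Timer-thread style: scheduling returns at once and the ticks run elsewhere. */
    static String runOnTimerThread(int ticks, long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(ticks);
        AtomicReference<String> worker = new AtomicReference<>();
        timer.scheduleAtFixedRate(() -> {
            worker.set(Thread.currentThread().getName());
            done.countDown();
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        // scheduleAtFixedRate has already returned; the caller is free to do other work here.
        try {
            done.await(); // wait only so the demo can report which thread ticked
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return worker.get();
    }

    public static void main(String[] args) {
        System.out.println("caller-thread ticks ran on: " + runOnCallerThread(3, 50));
        System.out.println("timer-thread ticks ran on:  " + runOnTimerThread(3, 50));
    }
}
```

In the first variant the thread that asked for the timer is the thread that ticks, which is the situation the main thread ends up in with the trampoline scheduler above.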
Using RxJava 1.1.9, I have a problem as follows:
import rx.Observable;
public enum ZipWithTest {;
    public static void main(String[] args) {
        Observable
            .just(1, 2, 3)
            .doOnNext(next -> System.out.format("next: %d%n", next))
            .zipWith(
                Observable.just(10, 100, 1000),
                (next, multiplier) -> next * multiplier)
            .doOnNext(multiplied -> System.out.format("multiplied: %d%n", multiplied))
            .toBlocking()
            .last();
    }
    /*
    next: 1
    next: 2
    next: 3
    multiplied: 10
    multiplied: 200
    multiplied: 3000
    */
}
I was expecting to get the following:
next: 1
multiplied: 10
next: 2
multiplied: 200
next: 3
multiplied: 3000
What am I missing?
Observable.just(1, 2, 3).doOnNext(n -> out.println("emit " + n))
    .compose(o -> o.publish(p -> p.take(1)))
    .subscribe(n -> out.println("final " + n));
// emit 1 ⏎ final 1
Observable.just(1, 2, 3).doOnNext(n -> out.println("emit " + n))
    .compose(o -> o.publish(p -> p.take(1).mergeWith(p.ignoreElements())))
    .subscribe(n -> out.println("final " + n));
// emit 1 ⏎ final 1 ⏎ emit 2 ⏎ emit 3