RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Rx is about streams. If you are just doing timers, I wouldn't bother with Rx. Rx is kind of a philosophy: you change how you think about writing code so that it is about creating pipelines to transform data.
With your specific example, maybe you are listing a bunch of Bluetooth devices for the user. The way you have it now, the user needs to wait 4 seconds to get all the devices. But you can easily change that with Rx. You could remove the take and use scan instead, and then you would get a new list of devices every time you find one. Because Observables are async and support multiple values, you can do that. With Futures you are limited to one value, so you couldn't update the list, and doing it synchronously would block. I personally use Rx quite a bit as an Android developer. A lot of the easy code is stuff like:
database.query()
.map { queryToViewModel(it) }
.subscribe { bindModel(it) }
And now I don't actually do anything directly and my view model gets updated automatically.
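To make the take-versus-scan point concrete, here is a minimal, library-free sketch in plain Java of what a scan-style accumulation produces: one growing snapshot per discovered device instead of a single list at the end. The class name, method name, and device names are made up for illustration; this is not RxJava's API, and the real operator has more to it (for example, its seeded overload also emits the seed first).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Library-free model of a scan-style accumulation; hypothetical names, not RxJava's API.
final class ScanSketch {

    // Returns every intermediate list a scan-style accumulation would emit:
    // one growing snapshot per discovered device name.
    static List<List<String>> runningLists(List<String> discovered) {
        List<List<String>> emissions = new ArrayList<>();
        List<String> soFar = new ArrayList<>();
        for (String name : discovered) {
            soFar.add(name);
            // Emit an immutable copy so earlier snapshots never change afterwards.
            emissions.add(Collections.unmodifiableList(new ArrayList<>(soFar)));
        }
        return emissions;
    }

    public static void main(String[] args) {
        // Hypothetical device names, for illustration only.
        List<String> found = Arrays.asList("ABC-1", "ABC-2", "ABC-3");
        for (List<String> snapshot : runningLists(found)) {
            System.out.println(snapshot);
        }
    }
}
```

With three devices the UI would get three updates, each one device longer than the last, rather than one update after the whole scan window has elapsed.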
@jaumard Ok, so first, I don't know if this is a great idea. I am just going to assume you have your reasons why just unsubscribing isn't good enough. Now on to how you would do this. If you wanted a cancellation token like in C#, you could do it fairly easily with Observable.create. C# cancellation tokens are glorified boolean holders anyway. You could also use Maybe.create, Flowable.create, etc. I don't think you could do this with Single unless you treat cancellation as an error. What you would do is check your cancellation token inside your tight loop. My example will be in Kotlin (sorry, I am primarily an Android programmer):
Observable.create { emitter ->
    // Check the token before each unit of work and stop early once it is cancelled
    for (action in actions) {
        if (token.isCancelled()) {
            break
        }
        emitter.onNext(action())
    }
    emitter.onComplete()
}
Depending on what you are trying to do this might work. If you have a chain of Maybes, you could also just decide to not return the next step
fun <T, R> Maybe<T>.ifNotCancelled(deferred: () -> Maybe<R>): Maybe<R> {
    // If the token is cancelled, filter empties the Maybe and the next step never runs
    return this.filter { !token.isCancelled() }
        .flatMap { deferred() }
}
step1()
.ifNotCancelled { step2() }
.ifNotCancelled { step3() }
.ifNotCancelled { step4() }
There are quite a few ways to do this, and without more info I am just taking shots in the dark, but I hope that gives you an idea of what you can do. The reason this feels a little strange is probably that you don't want to model something as part of the stream. Usually it is easier to keep all the information in the stream, so you can do whatever you want with more RxJava operators like filter, flatMap, map, etc. One of the ways we do this in my project is with switchMap. When you don't stay inside the stream, you're kind of just using RxJava as a glorified functional library / thread replacement, which isn't horrible but leaves all these great ideas behind.
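For reference, the "glorified boolean holder" idea from the first approach can be modelled without RxJava at all. The following is a minimal plain-Java sketch under that assumption: Token, runUntilCancelled, and the sample actions are hypothetical names invented here, and the returned list stands in for the values an emitter would push downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Library-free model of the token check; all names here are hypothetical.
final class CancellationSketch {

    // The token is just a thread-safe boolean holder.
    static final class Token {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);

        void cancel() {
            cancelled.set(true);
        }

        boolean isCancelled() {
            return cancelled.get();
        }
    }

    // Runs the actions in order, checking the token before each one,
    // and returns the results produced before cancellation was observed.
    static <T> List<T> runUntilCancelled(List<Supplier<T>> actions, Token token) {
        List<T> results = new ArrayList<>();
        for (Supplier<T> action : actions) {
            if (token.isCancelled()) {
                break; // stop before doing any more work
            }
            results.add(action.get());
        }
        return results;
    }

    public static void main(String[] args) {
        Token token = new Token();
        List<Supplier<Integer>> actions = Arrays.asList(
                () -> 1,
                () -> { token.cancel(); return 2; }, // cancels, but still returns its own value
                () -> 3);                            // never runs
        System.out.println(runUntilCancelled(actions, token)); // prints [1, 2]
    }
}
```

The check only happens between actions, so an action that is already running finishes first. That is the same trade-off as checking the token once per iteration of the tight loop.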
List<String> bleDevicesWithMyPrefix = Scanner.create(context).scanForPrefix("ABC").blockingGet();
private fun scanAllDevices(): Observable<ScanResult>
= rxBleClient.scanBleDevices(
ScanSettings.Builder()
.setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
.setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES)
.build(),
ScanFilter.Builder()
.build())
//Begin test code
var emittedList: List<String> = listOf("MMM", "OOO", "ABC", "ABCDEF", "ZZZ", "ABC", "RRRRRRR", "ZZABC")
private fun scanAllDevicesTEST(): Observable<ScanResult> {
return Observable
.intervalRange(0, emittedList.size.toLong(), 0, 200, TimeUnit.MILLISECONDS, timeoutScheduler)
.map { index -> ScanResult(MyDevice(emittedList[index.toInt()]), 4, 10, ScanCallbackType.CALLBACK_TYPE_ALL_MATCHES, MyRecord()) }
}
private fun scanAllDevices(context: Context) {
    val rxBleClient = RxBleClient.create(context)
    rxBleClient.scanBleDevices(
        ScanSettings.Builder()
            .setScanMode(ScanSettings.SCAN_MODE_LOW_POWER) // change if needed
            .setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES) // change if needed
            .build(),
        ScanFilter.Builder()
            .build()
    )
        .observeOn(Schedulers.io())
        .subscribe { scanResult ->
            val rxBleDevice = rxBleClient.getBleDevice(scanResult.getBleDevice().getMacAddress())
            Logger.d(TAG, "The device has name ${rxBleDevice.name}")
        }
}
button_scan3.setOnClickListener {
Log.d(TAG, "\n\nFiring up Scanner3")
val scanner = SuiteScanner.create(this.applicationContext)
val set = mutableSetOf<String>()
scanner.scanForAllBLEDevices3("ABC")
.doOnComplete { Log.d(TAG, "Completed...Scanner3")
for(name in set ) {
Log.d(TAG, "BLE Device: $name")
}
}
.subscribe { scanResult ->
val rxBleDevice = scanResult.getBleDevice()
Log.d(TAG, "The device has name ${rxBleDevice.name}")
val name = rxBleDevice.name
if (name != null) {
set.add(name)
}
}
}
fun scanForAllBLEDevices3(prefix: String): Observable<ScanResult> {
Log.w(TAG, "We are in scanForAllBLEDevices3.")
return rxBleScanner()
.take(2, TimeUnit.SECONDS, timeoutScheduler)
.observeOn(Schedulers.io())
.filter { device -> inspectScanResult(device.bleDevice.name, prefix) }
}
private fun inspectScanResult(foundDevice: String?, prefix: String): Boolean {
Log.d(TAG, " Inspecting: $foundDevice")
if(foundDevice == null) {
return false
}
return (foundDevice.startsWith(prefix))
}
.compose(f -> {
final AtomicBoolean isFirst = new AtomicBoolean();
return f.filter(r -> isFirst.compareAndSet(false, true));
})
Flowable
.create(emitter -> {
int i = 0;
while (i++ < 10) {
emitter.onNext(RandomUtils.nextInt() + ":" + i);
Thread.sleep(100);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER)
//.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.buffer(1, TimeUnit.SECONDS, Schedulers.trampoline())
.subscribe(list -> {
System.out.println("BlockedBufferRxTest.main:" + list.toString());
});
Thread.sleep(1000000);
Schedulers.trampoline() blocks the current (main) thread, so the subscribe callback never gets to run. Some of the buffer overloads use Schedulers.computation() as their scheduler argument instead.
Using RxJava 1.1.9, I have a problem as follows:
import rx.Observable;
public enum ZipWithTest {;
public static void main(String[] args) {
Observable
.just(1, 2, 3)
.doOnNext(next -> System.out.format("next: %d%n", next))
.zipWith(
Observable.just(10, 100, 1000),
(next, multiplier) -> next * multiplier)
.doOnNext(multiplied -> System.out.format("multiplied: %d%n", multiplied))
.toBlocking()
.last();
}
/*
next: 1
next: 2
next: 3
multiplied: 10
multiplied: 200
multiplied: 3000
*/
}
I was expecting to get the following:
next: 1
multiplied: 10
next: 2
multiplied: 200
next: 3
multiplied: 3000
What am I missing?
Observable.just(1, 2, 3).doOnNext(n -> out.println("emit " + n))
.compose(o -> o.publish(p -> p.take(1)))
.subscribe(n -> out.println("final " + n));
// emit 1 ⏎ final 1
Observable.just(1, 2, 3).doOnNext(n -> out.println("emit " + n))
.compose(o -> o.publish(p -> p.take(1).mergeWith(p.ignoreElements())))
.subscribe(n -> out.println("final " + n));
// emit 1 ⏎ final 1 ⏎ emit 2 ⏎ emit 3