Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    骨来(PeterLi)
    @pli2014
    welcome to talk about this topic about RxJava and graphQL in graph DSL description
    Jaumard
    @jaumard
    Hello people, is there a way with RXJava2 to have a cancellationToken to cancel a single ? What I’m trying to achieve is a long series of actions, at some point the user want to cancel it, so I want to stop my series of actions, but to cancel my actions I need to excecute some asynchronous task and I don’t want my single to finish until the cancellation is done. Is such a thing possible and how ? I have no clue how to implement this :/
    Ignacio Baca Moreno-Torres
    @ibaca
    Un-subscribe the single when the cancelation is done?
    David Stocking
    @dmstocking

    @jaumard Ok so first I don't know if this is a great idea. I am just going to assume you have your reasons why just unsubscribing isn't good enough. Now on to how you would do this. If you wanted a cancellation token like C#, you could do it fairly easily if you used Observable.create. C# cancellation tokens are glorified boolean holders anyway. You could also use Maybe.create, Flowable.create, etc. I don't think you could do this with Single unless you just say it was an error on cancel. What you would do is in your tight loop you just check your cancellation token. My example will be in Kotlin (sorry I am primarily a Android programmer)

    Observable.create { emitter ->
        for (val action : actions) ->
            if (token.cancelled()) {
                break
            }
            emitter.onNext(action())
        }
        emitter.onComplete()
    }

    Depending on what you are trying to do this might work. If you have a chain of Maybes, you could also just decide to not return the next step

    fun Maybe<T>.ifNotCancelled(defered: () -> Maybe<R>): Maybe<R> {
        return this.filter { !token.isCancelled() }
             .flatMap(defered)
    }
    
    step1()
        .ifNotCancelled { step2() }
        .ifNotCancelled { step3() }
        .ifNotCancelled { step4() }

    There are quite a few ways to do this and without more info I am just taking shots in the dark, but I hope that help gives you an idea of what you can do. The reason this is a little strange is probably because you don't want to model something as part of the stream. Usually it is just easier to keep all the information as part of the stream then you can do whatever you want via more RxJava operators like filter, flatMap, map etc. One of the ways we do this in my project is with switchMap. When you don't stay inside the stream, your kind of just using RxJava as a glorified Functional library / Thread replacement which isn't horrible but kind of leaves all these great ideas behind.

    Jaumard
    @jaumard
    @dmstocking thanks for the feedback ! No problem with kotlin I’m also an Android programmer pimarily ^^
    That’s the path I was thinking to take (and put the cancel as an error on my single); Just wanted some feedbacks on how was my solutions as I’m not expert in RX ^^. The full story is that we were given a proprietary SDK to interact with a BT device, this SDK work based on one listener with around 40 methods to override and implements, and we can put only one listener on the SDK. I want to hide this stuff by wrapping it into RX calls to be more readable/maintenable once used on the app (actually it’s a Flutter plugin but same problem). And the SDK doesn’t allow simultanous calls so once you cancel something you need to wait until it’s successfuly cancelled and you have receive the right callback from the SDK.
    I know the solution of the token inside the create is not perfect, but I have to comply with the SDK I have and make it work ^^
    I’ll go that way and see how it goes :)
    Darrin
    @darrinps
    Still trying to understand how to scan for a limited time then return a list to work with. I am using the RxAndroidBle to scan for all BLE devices. I am trying to look for all devices starting with ABC for example. When I create a test by emitting values using Observable.intervalRange, everything works perfectly. If I don't try to constrain the RxAndroidBle.scanBleDevices function by using a .take() and just subscribe to the results, that works as expected. When I try to build up a list of BLE devices it finds and then return it though, it doesn't work (nothing ever gets returned).
    I call the function like this:
              List<String> bleDevicesWithMyPrefix = Scanner.create(context).scanForPrefix("ABC").blockingGet();
    Darrin
    @darrinps
         fun scanForPrefix(prefix: String): Single<MutableList<String?>> {
                return scanAllDevices()
                .take(15, TimeUnit.SECONDS, timeoutScheduler)
                .map { it.bleDevice.name }
                .filter { inspectScanResult(it,prefix) }
                .toList()
    }
              private fun scanAllDevices(): Observable<ScanResult>
            = rxBleClient.scanBleDevices(
            ScanSettings.Builder()
                    .setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
                    .setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES)
                    .build(),
            ScanFilter.Builder()
                    .build())
    Darrin
    @darrinps
    The test I set up works perfectly. It looks like this:
               //Begin test code
    var emittedList: List<String> = listOf("MMM", "OOO", "ABC", "ABCDEF", "ZZZ", "ABC", "RRRRRRR", "ZZABC")
    
    private fun scanAllDevicesTEST(): Observable<ScanResult> {
    
        return Observable
                .intervalRange(0, emittedList.size.toLong(), 0, 200, TimeUnit.MILLISECONDS, timeoutScheduler)
                .map { index -> ScanResult(MyDevice(emittedList[index.toInt()]), 4, 10, ScanCallbackType.CALLBACK_TYPE_ALL_MATCHES, MyRecord()) }
    
    }
    So I am confused why it works for the test which emits values just like the real thing does, and not for the real thing. Again, if I don't do a take, and subscribe instead, then the scanAllDevices code does scan correctly. I just can't allow it to scan forever and I need one complete list. Not something that gets sent back an element at a time.
    Darrin
    @darrinps
    For example, doing this I do get the BLE devices scanned successfully. I just cannot use them coming back async. I need a complete list or as much of a list as can be found in a specified time period. Note that the scanner takes less than a second to get all of the values when it is working, but I gave it up to 15 just to take out that variable. Here is the code that does at least scan.
        private fun scanAllDevices(context: Context) {
    
        val rxBleClient = RxBleClient.create(context);
    
        rxBleClient.scanBleDevices(
                 ScanSettings.Builder ()
                        .setScanMode(ScanSettings.SCAN_MODE_LOW_POWER) // change if needed
                        .setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES) // change if needed
                        .build(),
                ScanFilter.Builder()
                        .build()
        )
                .observeOn(Schedulers.io())
                .subscribe{scanResult->
                    val rxBleDevice = rxBleClient.getBleDevice(scanResult.getBleDevice().getMacAddress());
                    Logger.d(TAG, "The device has name ${rxBleDevice.name}")
                }
    }
    Ignacio Baca Moreno-Torres
    @ibaca
    Not sure, but add various ".doOnNext" to log element to see which operator is not doing what you expect
    erge123
    @erge123
    @songyunlu
    Darrin
    @darrinps
    Thanks for the suggestion. I did try that, and didn't see any .doOnNext methods ever get hit anywhere. Anyway, I figured out how to do what I want, but I still am not certain why what I had did not work. What I came up with looks like this:
        button_scan3.setOnClickListener {
            Log.d(TAG, "\n\nFiring up Scanner3")
    
            val scanner = SuiteScanner.create(this.applicationContext)
            val set = mutableSetOf<String>()
    
            scanner.scanForAllBLEDevices3("ABC")
                    .doOnComplete { Log.d(TAG, "Completed...Scanner3")
                        for(name in set ) {
                            Log.d(TAG, "BLE Device: $name")
                        }
                    }
                    .subscribe { scanResult ->
                        val rxBleDevice = scanResult.getBleDevice()
                        Log.d(TAG, "The device has name ${rxBleDevice.name}")
                        val name = rxBleDevice.name
                        if (name != null) {
                            set.add(name)
                        }
                    }
    
        }                                                                                                                                                                                                                                                                                                                                                                                                                                           
    
        fun scanForAllBLEDevices3(prefix: String): Observable<ScanResult> {
    
        Log.w(TAG, "We are in scanForAllBLEDevices3.")
    
        return rxBleScanner()
                .take(2, TimeUnit.SECONDS, timeoutScheduler)
                .observeOn(Schedulers.io())
                .filter { device -> inspectScanResult(device.bleDevice.name, prefix) }
    }
    
    
    private fun inspectScanResult(foundDevice: String?, prefix: String): Boolean {
        Log.d(TAG, " Inspecting: $foundDevice")
    
        if(foundDevice == null) {
            return false
        }
    
        return (foundDevice.startsWith(prefix))
    }
    So I'll just do what needs to be done in the onComplete()
    Yannick Lecaillez
    @ylecaillez
    Hi !
    Is there an operator which behaves like take(1) but does not cancel upstream once the first item has been emitted ?
    I need that because i send multiple request concurrently (flatMap) but i only care about the first response coming back. Still i want the subsequent request to be performed. Any idea ?
    Sadly, Flowable.amb() is also cancelling remaining requests
    Yannick Lecaillez
    @ylecaillez
    Hmm, switchMap() might be what i'm looking for. Sorry for the noise.
    David Karnok
    @akarnokd
    Beware that switchMap cancels the active inner source if a new upstream item arrives and gets mapped in.
    Also not completing and not cancelling may leak resources.
    Yannick Lecaillez
    @ylecaillez
    Yes this does not work for my use case i ended with something like that:
    Using compose:
    .compose(f -> {
                               final AtomicBoolean isFirst = new AtomicBoolean();
                               return f.filter(r -> isFirst.compareAndSet(false, true));
                           })
    David Stocking
    @dmstocking
    I think you could create your own operator doOnFirst() easy if your in kotlin
    fun Observable<T>.doOnFirst(action: (T) -> Unit) {
        var first = true;
        return this.doOnNext { item ->
            if (first) {
                first = false
                action(item)
            }
        }
    }
    骨来(PeterLi)
    @pli2014
            Flowable
                    .create(emitter -> {
                        int i = 0;
                        while (i++ < 10) {
                            emitter.onNext(RandomUtils.nextInt() + ":" + i);
                            Thread.sleep(100);
                        }
                        emitter.onComplete();
                    }, BackpressureStrategy.BUFFER)
                    //.subscribeOn(Schedulers.computation())
                    .observeOn(Schedulers.io())
                    .buffer(1, TimeUnit.SECONDS, Schedulers.trampoline())
                    .subscribe(list -> {
                        System.out.println("BlockedBufferRxTest.main:" + list.toString());
                    });
    
             Thread.sleep(1000000);
    Schedulers.trampoline() blocked the current main thread that cannot execute subscribe method. There are some overwrite buffer methods which called to Schedulers.computation() as input arguments.
    Yannick Lecaillez
    @ylecaillez
    @dmstocking How is this different from what i've done ? I'm in Java btw.
    Isn't there a backpressure problem here ?
    Volkan Yazıcı
    @vy

    Using RxJava 1.1.9, I have a problem as follows:

    import rx.Observable;
    
    public enum ZipWithTest {;
    
        public static void main(String[] args) {
            Observable
                    .just(1, 2, 3)
                    .doOnNext(next -> System.out.format("next: %d%n", next))
                    .zipWith(
                            Observable.just(10, 100, 1000),
                            (next, multiplier) -> next * multiplier)
                    .doOnNext(multiplied -> System.out.format("multiplied: %d%n", multiplied))
                    .toBlocking()
                    .last();
        }
    
        /*
        next: 1
        next: 2
        next: 3
        multiplied: 10
        multiplied: 200
        multiplied: 3000
         */
    
    }

    I was expecting to get the following:

        next: 1
        multiplied: 10
        next: 2
        multiplied: 200
        next: 3
        multiplied: 3000

    What am I missing?

    骨来(PeterLi)
    @pli2014
    @vy doOnNext only go through every element in just(1, 2, 3) source.
    so that it invokes an action when it calls {@code onNext}
    Volkan Yazıcı
    @vy
    @pli2014 I got it. But why is it getting every element from just(1,2,3) and then zipping with the rest? Shouldn't they go in parallel? That is, one from source, one from multiplier, one from source, one from multiplier, and so on.
    骨来(PeterLi)
    @pli2014
    zipwith behavior is just that
    DoOnNext is called after ever onNext op from every element of source
    Ignacio Baca Moreno-Torres
    @ibaca
    @vy you should really upgrade to rxjava 2, but as rx1 observable has backpressure this might work as you said, but the problem is that zipWith has a buffer, so the 3 next call are juts filling the zip buffer, not sure if in rx1 exists the zipWith operator overload to indicate the buffer size, but you can do it in rx2, in that case you need to use Flowable instead of Observable to support backpressure
    Ignacio Baca Moreno-Torres
    @ibaca
    @ylecaillez you can use publish and merge the take(1) with a ignoreElements, so the source observable keep subscribe untils it completes (you can exgract the compose lamda to reuse the operator)
    Observable.just(1, 2, 3).doOnNext(n -> out.println("emmit " + n))
            .compose(o -> o.publish(p -> p.take(1)))
            .subscribe(n -> out.println("final " + n));
    // emmit 1 ⏎ final 1
    Observable.just(1, 2, 3).doOnNext(n -> out.println("emmit " + n))
            .compose(o -> o.publish(p -> p.take(1).mergeWith(p.ignoreElements())))
            .subscribe(n -> out.println("final " + n));
    // emmit 1 ⏎ final 1 ⏎ emmit 2 ⏎ emmit 3
    Mark Raynsford
    @io7m
    'ello. i'm wondering if there's a recommended way to achieve the following on 1.x: i have a hot observable (a publishsubject) but i'd like to buffer the last produced item so that new subscribers always see the most recently published event when they subscribe (and then see any new events when i publish them after that)
    i'm currently stuck on 1.x due to an incompatible third-party dependency in the project i'm working on, unfortunately
    sorry, that should read "buffer the most recently produced value"
    Mark Raynsford
    @io7m
    i assume that i want some combination of replay and defer, but i can't quite work it out
    Mark Raynsford
    @io7m
    ... turns out a BehaviorSubject does exactly this
    sorry for the noise!
    moritz bust
    @busti
    How does ReactiveX compile to so many different languages? Is a transpiler used to achieve this, or is every code-base managed separately?
    Igor Bozin
    @igorbzin_gitlab
    I am trying to get a grasp of RxJava. How would i chain following situation in calls with RxJava: i retrieve a Single<User> from my database, loading a user by id for example, this should happen async. After loading the user object, i put some of his specific data in an arraylist. After that finished I want to listen on this array for changes, so that when an element gets added or removed I write to the database async to update the user. I have following code so far, also implemented the piece you have suggested
    David Karnok
    @akarnokd
    @Busti RxJava compiles to a JVM bytecode and any JVM language should be able to interact with it. The language adapters such as RxScala, RxKotlin, etc. are usually thin wrappers around RxJava so technically you still run Java bytecode beneath.
    @igorbzin_gitlab You mean something like this? https://stackoverflow.com/q/28816691/61158
    moritz bust
    @busti
    @akarnokd Thank you for your answer. I was just wondering if non-related languages in the ReactiveX project somehow shared a common codebase since there are so many different languages supported. Something like haxe comes to mind, but I do realize that that would be a bad idea.
    Igor Bozin
    @igorbzin_gitlab

    @akarnokd yes, something exactly like this, thank you. I am still not sure how to chain those asynchronous calls though, since i operate on the database.

            Observable.just(uriObservableList)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<RxObservableList<Uri>>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "ONSUBSCRIBE ");
                        }
    
                        @Override
                        public void onNext(RxObservableList<Uri> uris) {
                            String s = makePathString(uris.getList());
                            AppExecutor.getInstance().diskIO().execute(() -> viewModel.updatePicturePaths(s)); // <-- THis async call should be performed on every onNext 
                            mAdapter.updatePictures(uris.getList());
                            Log.d(TAG, "ONNEXT");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "ONERROR  " + e);
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "ONCOMPLETE  ");
                        }
                    });

    How do I construct my Subscription so that before every onNext call I can perform a write operation on the database? I have tried doOnNext which is supposed to be executed before onNext, but it is always called from the main thread

    David Stocking
    @dmstocking
    @ylecaillez woopsy I didn't notice your message. Yours appears to be really similar. The only difference appears to be that I take a action to run not just filter the first value. Yours kind of looks like it implements first().
    Viktor Vostrikov
    @wellbranding
    I have previously used MediatorLiveData to combine results from FCM, Room and Networking in my Repository. It worked well. However, now I want more complexity and some additional bonuses from RxJava. This is why I have decided to use RxJava this time. I can combine Room and Networking with Observable.concatArrayEager(my _observables) also I have used merge, but I want to get only latest changed value, not all observables, like it is done with MediatorLiveData However, I don't now how to do that after FCM pushes value, and how should I notify my main observable after new changes occur? No examples on this issue whatsoever. It is crucial part. I receive FCM in my BroadCastReceiver and then notified my repository's livedata, which notified my MediatorLiveData... How to do that with RxJava? Would really welcome any advice, because it is really important issue.