    David Stocking
    @dmstocking
    Thanks @akarnokd. I guess it makes sense that if you use a TestScheduler local to the test you will still get GCed. I think it is probably best if I add an explicit dispose for these async tests. Besides leaking memory, I think I could possibly have an Observable triggered from another test before the GC has run. I do not want flakiness in my tests.
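    (A minimal sketch of the explicit dispose mentioned above, assuming RxJava 2 and a TestScheduler created inside the test. The function name and the interval source are illustrative and not from the chat. Disposing at the end of the test means nothing depends on when the GC happens to run.)

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical test body: returns how many ticks were seen before and after dispose().
fun ticksBeforeAndAfterDispose(): Pair<Int, Int> {
    val scheduler = TestScheduler()
    val received = mutableListOf<Long>()

    // An endless ticker driven by the test-local scheduler.
    val disposable = Observable.interval(1, TimeUnit.SECONDS, scheduler)
        .subscribe { received.add(it) }

    scheduler.advanceTimeBy(3, TimeUnit.SECONDS)
    val before = received.size

    // Explicit teardown: stop the stream now instead of waiting for the GC.
    disposable.dispose()
    scheduler.advanceTimeBy(3, TimeUnit.SECONDS)

    return Pair(before, received.size)
}
```

    (In a JUnit test the dispose() call would typically live in an @After method or a finally block, so it also runs when an assertion fails.)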
    Ankush Grover
    @groverankush
    I came across a scenario where I have to return a Single A, and before returning that Single I have to save an ArrayList, which is contained in the Single A, in the local database. Since I am fairly new to Rx concepts I am unable to figure out the right way.
    Can anyone please help?
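    (One possible shape for this, sketched under assumptions: RxJava 2, a hypothetical class A that holds the list, and a hypothetical LocalDb whose save returns a Completable. None of these names come from the chat. The idea is to chain the save in front of the value with flatMap, so the Single only succeeds after the save has finished.)

```kotlin
import io.reactivex.Completable
import io.reactivex.Single

// Hypothetical stand-ins for "Single A" and the local database.
data class A(val items: ArrayList<String>)

class LocalDb {
    val saved = mutableListOf<String>()

    // Wraps the (here in-memory) save so it can be chained like any other Rx step.
    fun saveAll(items: List<String>): Completable =
        Completable.fromAction { saved.addAll(items) }
}

// Saves the list first, then passes the original A downstream.
fun loadAndStore(source: Single<A>, db: LocalDb): Single<A> =
    source.flatMap { a -> db.saveAll(a.items).andThen(Single.just(a)) }
```

    (If the save is a plain synchronous call, doOnSuccess on the Single is a simpler alternative; the flatMap form is only needed when the save is itself asynchronous.)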
    David Karnok
    @akarnokd
    @groverankush You should probably read the Getting started and look for RxJava tutorials or any RxJava book first.
    mghildiy
    @mghildiy
    Hi...any book suggestion for a beginner in reactive programming?
    David Karnok
    @akarnokd
    Nurkiewicz: Reactive Programming with RxJava: http://shop.oreilly.com/product/0636920042228.do
    Ignacio Baca Moreno-Torres
    @ibaca
    This is one of the best-known introductions to reactive programming: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
    And this is my preferred practical tutorial to start using Rx operators without understanding the theory of reactive programming http://reactivex.io/learnrx/
    Ignacio Baca Moreno-Torres
    @ibaca
    It's like forcing someone to understand what a monad is before they can start using jQuery :-1: hehe. Just start using it, and then learn the theory behind it.
    mghildiy
    @mghildiy
    Thanks
    Darrin
    @darrinps

    I'm trying to do something that I thought would be very simple but can't get it to work. I have a function (f1) that calls another function (f2). It in turn calls a third one (f3). Now, f3 has the job of scanning for devices. It will just keep scanning away. Function f2 I want to use as a time limiter for f3. Its role is to give f3 a given number of seconds to complete its work, then return all devices that f3 found that start with a given prefix. Then, that list of devices is to be returned to f1. Functions f1 and f3 are working as needed. Only f2 is buggy. The way it is coded right now, the result list gets sent back immediately (before any values get placed into it). How would you change this to do what I am trying to do, please? Here is what it looks like (in Kotlin) now:

    fun returnAllDevicesStartingWith(devicePrefix: String): List<String> {
        Logger.d(TAG, "About to scan for devices. Will return those starting with $devicePrefix")

        val mutableList = mutableListOf<String>()

        val result = scanForDevices()
            .take(4, TimeUnit.SECONDS, timeoutScheduler)
            .subscribe { scanResult ->
                val name = scanResult.bleDevice.name

                Logger.d(TAG, "Potential device match named $name")

                if (name != null) {
                    if (name.startsWith(prefix = devicePrefix)) {
                        Logger.d(TAG, "Match found $name")
                        mutableList.plus(name)
                    }
                }
            }
        return mutableList
    }

    David Stocking
    @dmstocking

    @darrinps your code looks a little strange to me. Is this a "BlockingObservable", or is it running on this thread? It doesn't look like it. So here is what it looks like would happen: returnAllDevicesStartingWith() would be called and it would set up your observable, but that would run on another thread that might not have run yet. So you immediately return your empty list. Meanwhile, on the other thread, you are going to add the devices as you get them. If you have had any experience with Java Futures, it is the same thing: because it is async, it might not have run yet. A super rough example:

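        public List<Integer> calculate(Integer input) {
            List<Integer> list = new ArrayList<Integer>();
            // The task runs on another thread and may not have executed yet...
            executor.submit(() -> {
                list.add(input * input);
            });
            // ...so the caller most likely receives an empty list here.
            return list;
        }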

    To fix, I would suggest that you continue the Observable chain. Rx does spread (kind of like a Maybe monad or Optional, if you have ever used them), so instead your function would be:

    fun returnAllDevicesStartingWith(devicePrefix: String): Single<List<String>> {
        return scanForDevices()
            ...
            .toList();
    }
    Darrin
    @darrinps

    It's on the same thread. Really nothing can be done until the function completes (in real life, it will only be allowed to run for 2 seconds or so). Yes, you are correct in that the way it is right now the function returns an empty list right away. I tried doing what you suggested, but .toList() isn't available. Here is what I tried, but it is not allowed:

    fun returnAllDevicesStartingWith(devicePrefix: String): Single<List<String>> {
        Logger.d(TAG, "About to scan for devices. Will return those starting with $devicePrefix")

        val mutableList = mutableListOf<String>()

        val result = scanForDevices()
            .take(4, TimeUnit.SECONDS, timeoutScheduler)
            .subscribe { scanResult ->
                val name = scanResult.bleDevice.name

                Logger.d(TAG, "Potential device match named $name")

                if (name != null) {
                    if (name.startsWith(prefix = devicePrefix)) {
                        Logger.d(TAG, "Match found $name")
                        mutableList.plus(name)
                    }
                }
            }.toList()   //IS NOT ALLOWED HERE
    }

    David Stocking
    @dmstocking

    I meant something more like this

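    fun returnAllDevicesStartingWith(devicePrefix: String): Single<List<String>> {
        return scanForDevices()
            .take(4, TimeUnit.SECONDS, timeoutScheduler)
            // Filter before mapping: the name can be null, and RxJava 2 does not allow null items.
            .filter { it.bleDevice.name?.startsWith(devicePrefix) == true }
            .map { it.bleDevice.name!! }
            .toList()
    }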

    (I don't know if you are using RxJava1 or 2, so I assumed 2)

    Darrin
    @darrinps
    Yes. Gotcha. Let me try that. Appreciate it!
    David Stocking
    @dmstocking
    Then the next guy can do returnAllDevicesStartingWith("blue").map { ... } .filter { ... } .whatever { ... }
    There were some resources earlier in the chat on starting with RxJava that would probably help a lot.
    Darrin
    @darrinps
    I've Googled for things on timers and only pulling for a given time and didn't see much that helped. Honestly, the more I use Rx the more I question using it. It's probably great for a certain percentage of things, but most others that I could do easily without it I find way too complicated in Rx.
    David Stocking
    @dmstocking

    Rx is about streams. If you are just doing timers, I wouldn't bother with Rx. Rx is kind of a philosophy: you change how you think about writing code, so that it becomes about creating pipelines to transform data.

    With your specific example, maybe you are listing a bunch of Bluetooth devices for the user. The way you have it now, the user needs to wait for 4 seconds to get all the devices. But you can easily change that with Rx. You could remove the take and use scan instead, and then you would get a new list of devices every time you find one. Because Observables are async and support multiple values, you can do that. With Futures you are limited to one value, so you couldn't update, and synchronously you would block. I personally use Rx quite a bit as an Android developer. A lot of the easy code is stuff like

    database.query()
        .map { queryToViewModel(it) }
        .subscribe { bindModel(it) }

    And now I don't actually do anything directly and my view model gets updated automatically.
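
    (A minimal sketch of the scan idea above, assuming RxJava 2. deviceNames() is a hypothetical stand-in for the real BLE scan and simply emits three fixed names. Each time a matching name arrives, the stream emits a new, longer list, so a UI could update as devices are found instead of waiting for a timeout.)

```kotlin
import io.reactivex.Observable

// Hypothetical stand-in for the stream of device names found by a scan.
fun deviceNames(): Observable<String> =
    Observable.just("ABC-1", "XYZ", "ABC-2")

// Emits a growing snapshot of the matching names each time a new match is found.
fun runningMatches(prefix: String): Observable<List<String>> =
    deviceNames()
        .filter { it.startsWith(prefix) }
        // Accumulate into a new immutable list for every match.
        .scan(emptyList<String>()) { found, name -> found + name }
        // scan emits the initial empty list first; drop it.
        .skip(1)
```

    (With the three names above and the prefix "ABC", subscribers would see a one-element list followed by a two-element list.)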

    Darrin
    @darrinps
    Understood on the streams thing. I think I am understanding now what you changed....the Single is just what it says...a single item. So I'm guessing that it will wait to return the entire list at once since in this case it happens to be a Single that will give back a List<String>. So far so good. In my function which calls this one I do this: List<String> devices = bleUtility.returnAllDevicesStartingWith(prefix).blockingGet();
    Darrin
    @darrinps
    In my case, it won't just be displaying, but rather connecting to them, so that's why getting a newly created list for each new device found would be much more complication than it is worth. At least to me. This works great BTW. I just tested it. THANKS!
    David Stocking
    @dmstocking
    Hmm that would be a little weird, but there are other use cases. I use them a lot for request response type flows.
    connection.send(packet)
        .filter { it.type == WHATEVER }
        .subscribe { handle(it) }
    骨来(PeterLi)
    @pli2014
    Hi guys, I have a question about RxJava call chains, which fill the stack trace with a lot of operator onNext frames.
    It is hard to diagnose business exceptions from the logs when the calls are asynchronous or end in an error state.
    Ignacio Baca Moreno-Torres
    @ibaca
    Yep, crazy stack traces are a known inconvenience with Rx.
    There are tricks to get better stack traces, like https://github.com/akaita/RxJava2Debug
    Search on Google; there are a lot of interesting discussions, e.g. ReactiveX/RxJava#3521
    骨来(PeterLi)
    @pli2014
    @ibaca got it, thanks for this solution.
    骨来(PeterLi)
    @pli2014
    Has anyone implemented a flow DSL graph for debugging/logging? A Flowable has a function chain that includes map/flatMap/filter etc. Asynchronous call chains are very complex, and asynchronous debug stacks are always confusing.
    Facebook's GraphQL solved this issue with data fetchers, which can fetch data across any method from multiple data sources asynchronously.
    GraphQL is a query language and DSL specification that describes data and structures data fetchers.
    骨来(PeterLi)
    @pli2014
    Anyone is welcome to discuss this topic: RxJava and GraphQL in terms of a graph DSL description.
    Jaumard
    @jaumard
    Hello people, is there a way with RxJava2 to have a cancellation token to cancel a Single? What I’m trying to achieve is a long series of actions. At some point the user wants to cancel it, so I want to stop my series of actions, but to cancel them I need to execute some asynchronous task, and I don’t want my Single to finish until the cancellation is done. Is such a thing possible, and how? I have no clue how to implement this :/
    Ignacio Baca Moreno-Torres
    @ibaca
    Unsubscribe (dispose) the Single when the cancellation is done?
    David Stocking
    @dmstocking

    @jaumard Ok, so first, I don't know if this is a great idea. I am just going to assume you have your reasons why just unsubscribing isn't good enough. Now on to how you would do this. If you wanted a cancellation token like C#, you could do it fairly easily if you used Observable.create. C# cancellation tokens are glorified boolean holders anyway. You could also use Maybe.create, Flowable.create, etc. I don't think you could do this with Single unless you just say it was an error on cancel. What you would do is check your cancellation token in your tight loop. My example will be in Kotlin (sorry, I am primarily an Android programmer):

    Observable.create { emitter ->
        for (action in actions) {
            // Stop emitting as soon as the token has been cancelled.
            if (token.isCancelled()) {
                break
            }
            emitter.onNext(action())
        }
        emitter.onComplete()
    }

    Depending on what you are trying to do this might work. If you have a chain of Maybes, you could also just decide to not return the next step

    fun <T, R> Maybe<T>.ifNotCancelled(deferred: () -> Maybe<R>): Maybe<R> {
        // An empty Maybe after the filter means the remaining steps are skipped.
        return this.filter { !token.isCancelled() }
            .flatMap { deferred() }
    }
    
    step1()
        .ifNotCancelled { step2() }
        .ifNotCancelled { step3() }
        .ifNotCancelled { step4() }

    There are quite a few ways to do this, and without more info I am just taking shots in the dark, but I hope that helps give you an idea of what you can do. The reason this is a little strange is probably because you don't want to model something as part of the stream. Usually it is just easier to keep all the information as part of the stream; then you can do whatever you want via more RxJava operators like filter, flatMap, map, etc. One of the ways we do this in my project is with switchMap. When you don't stay inside the stream, you're kind of just using RxJava as a glorified functional library / thread replacement, which isn't horrible but kind of leaves all these great ideas behind.
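
    (A rough sketch of keeping cancellation inside the stream with switchMap, assuming RxJava 2. The "start"/"cancel" command strings and both subjects are hypothetical stand-ins, and this is not necessarily how the project mentioned above uses it. switchMap disposes the previous inner stream whenever a new upstream item arrives, so a "cancel" event cuts off the running work without any external token.)

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Returns the work items that actually reached the subscriber.
fun switchMapCancelDemo(): List<Int> {
    val commands = PublishSubject.create<String>() // hypothetical "start" / "cancel" events
    val work = PublishSubject.create<Int>()        // stand-in for a long-running series of actions
    val seen = mutableListOf<Int>()

    val subscription = commands
        .switchMap { command ->
            // Each new command disposes the previous inner subscription.
            val next: Observable<Int> = if (command == "start") work else Observable.empty()
            next
        }
        .subscribe { seen.add(it) }

    commands.onNext("start")
    work.onNext(1)
    work.onNext(2)
    commands.onNext("cancel") // switchMap unsubscribes from work here
    work.onNext(3)            // nobody is listening any more
    subscription.dispose()

    return seen
}
```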

    Jaumard
    @jaumard
    @dmstocking thanks for the feedback! No problem with Kotlin, I’m also primarily an Android programmer ^^
    That’s the path I was thinking of taking (and putting the cancel as an error on my Single); I just wanted some feedback on my solution as I’m not an expert in Rx ^^. The full story is that we were given a proprietary SDK to interact with a BT device. This SDK works through one listener with around 40 methods to override and implement, and we can put only one listener on the SDK. I want to hide this stuff by wrapping it in Rx calls so it is more readable/maintainable once used in the app (actually it’s a Flutter plugin, but same problem). And the SDK doesn’t allow simultaneous calls, so once you cancel something you need to wait until it’s successfully cancelled and you have received the right callback from the SDK.
    I know the solution of the token inside the create is not perfect, but I have to comply with the SDK I have and make it work ^^
    I’ll go that way and see how it goes :)
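    (A minimal sketch of wrapping a single-listener SDK call in a Single, assuming RxJava 2. SdkListener and FakeSdk are hypothetical and heavily reduced; the real SDK reportedly has around 40 callbacks and its API is not shown in the chat. The sketch covers only the wrapping, not waiting for the SDK's cancel callback.)

```kotlin
import io.reactivex.Single

// Hypothetical, heavily reduced stand-in for the SDK's listener interface.
interface SdkListener {
    fun onReadDone(value: String)
    fun onError(reason: String)
}

// Hypothetical fake SDK that accepts exactly one listener.
class FakeSdk {
    var listener: SdkListener? = null

    // A real SDK would call back later; the fake answers immediately.
    fun startRead() {
        listener?.onReadDone("42")
    }
}

fun readValue(sdk: FakeSdk): Single<String> = Single.create<String> { emitter ->
    sdk.listener = object : SdkListener {
        override fun onReadDone(value: String) {
            emitter.onSuccess(value)
        }

        override fun onError(reason: String) {
            emitter.onError(IllegalStateException(reason))
        }
    }
    // Runs when the Single terminates or is disposed: free the only listener slot.
    emitter.setCancellable { sdk.listener = null }
    sdk.startRead()
}
```

    (Since the SDK takes only one listener and no simultaneous calls, callers would have to run such Singles one after another, for example by chaining them with flatMap.)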
    Darrin
    @darrinps
    Still trying to understand how to scan for a limited time then return a list to work with. I am using the RxAndroidBle to scan for all BLE devices. I am trying to look for all devices starting with ABC for example. When I create a test by emitting values using Observable.intervalRange, everything works perfectly. If I don't try to constrain the RxAndroidBle.scanBleDevices function by using a .take() and just subscribe to the results, that works as expected. When I try to build up a list of BLE devices it finds and then return it though, it doesn't work (nothing ever gets returned).
    I call the function like this:
              List<String> bleDevicesWithMyPrefix = Scanner.create(context).scanForPrefix("ABC").blockingGet();
    Darrin
    @darrinps
    fun scanForPrefix(prefix: String): Single<MutableList<String?>> {
        return scanAllDevices()
            .take(15, TimeUnit.SECONDS, timeoutScheduler)
            .map { it.bleDevice.name }
            .filter { inspectScanResult(it, prefix) }
            .toList()
    }

    private fun scanAllDevices(): Observable<ScanResult> =
        rxBleClient.scanBleDevices(
            ScanSettings.Builder()
                .setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
                .setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES)
                .build(),
            ScanFilter.Builder()
                .build())
    Darrin
    @darrinps
    The test I set up works perfectly. It looks like this:
               //Begin test code
    var emittedList: List<String> = listOf("MMM", "OOO", "ABC", "ABCDEF", "ZZZ", "ABC", "RRRRRRR", "ZZABC")
    
    private fun scanAllDevicesTEST(): Observable<ScanResult> {
    
        return Observable
                .intervalRange(0, emittedList.size.toLong(), 0, 200, TimeUnit.MILLISECONDS, timeoutScheduler)
                .map { index -> ScanResult(MyDevice(emittedList[index.toInt()]), 4, 10, ScanCallbackType.CALLBACK_TYPE_ALL_MATCHES, MyRecord()) }
    
    }
    So I am confused why it works for the test which emits values just like the real thing does, and not for the real thing. Again, if I don't do a take, and subscribe instead, then the scanAllDevices code does scan correctly. I just can't allow it to scan forever and I need one complete list. Not something that gets sent back an element at a time.
    Darrin
    @darrinps
    For example, doing this I do get the BLE devices scanned successfully. I just cannot use them coming back async. I need a complete list or as much of a list as can be found in a specified time period. Note that the scanner takes less than a second to get all of the values when it is working, but I gave it up to 15 just to take out that variable. Here is the code that does at least scan.
    private fun scanAllDevices(context: Context) {
        val rxBleClient = RxBleClient.create(context);

        rxBleClient.scanBleDevices(
                ScanSettings.Builder()
                        .setScanMode(ScanSettings.SCAN_MODE_LOW_POWER) // change if needed
                        .setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES) // change if needed
                        .build(),
                ScanFilter.Builder()
                        .build()
        )
                .observeOn(Schedulers.io())
                .subscribe { scanResult ->
                    val rxBleDevice = rxBleClient.getBleDevice(scanResult.getBleDevice().getMacAddress());
                    Logger.d(TAG, "The device has name ${rxBleDevice.name}")
                }
    }
    Ignacio Baca Moreno-Torres
    @ibaca
    Not sure, but add several .doOnNext calls to log the elements and see which operator is not doing what you expect.
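    (A minimal sketch of that suggestion, assuming RxJava 2. The function name and the log callback are made up for illustration. doOnSubscribe and doOnError are included as well, because a stream that is never subscribed to, or that fails early, would also show no doOnNext output.)

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

// Logs what passes each stage so the operator that swallows items becomes visible.
fun tracedNames(
    source: Observable<String>,
    prefix: String,
    log: (String) -> Unit
): Single<List<String>> =
    source
        .doOnSubscribe { log("subscribed") }
        .doOnNext { log("from source: $it") }
        .filter { it.startsWith(prefix) }
        .doOnNext { log("after filter: $it") }
        .doOnError { log("error: $it") }
        .toList()
```

    (Called with Log.d or println as the log callback, the output shows how far each element travels down the chain.)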
    erge123
    @erge123
    @songyunlu
    Darrin
    @darrinps
    Thanks for the suggestion. I did try that, and didn't see any .doOnNext methods ever get hit anywhere. Anyway, I figured out how to do what I want, but I still am not certain why what I had did not work. What I came up with looks like this:
        button_scan3.setOnClickListener {
            Log.d(TAG, "\n\nFiring up Scanner3")
    
            val scanner = SuiteScanner.create(this.applicationContext)
            val set = mutableSetOf<String>()
    
            scanner.scanForAllBLEDevices3("ABC")
                    .doOnComplete { Log.d(TAG, "Completed...Scanner3")
                        for(name in set ) {
                            Log.d(TAG, "BLE Device: $name")
                        }
                    }
                    .subscribe { scanResult ->
                        val rxBleDevice = scanResult.getBleDevice()
                        Log.d(TAG, "The device has name ${rxBleDevice.name}")
                        val name = rxBleDevice.name
                        if (name != null) {
                            set.add(name)
                        }
                    }
    
        }

    fun scanForAllBLEDevices3(prefix: String): Observable<ScanResult> {
        Log.w(TAG, "We are in scanForAllBLEDevices3.")

        return rxBleScanner()
            .take(2, TimeUnit.SECONDS, timeoutScheduler)
            .observeOn(Schedulers.io())
            .filter { device -> inspectScanResult(device.bleDevice.name, prefix) }
    }
    
    
    private fun inspectScanResult(foundDevice: String?, prefix: String): Boolean {
        Log.d(TAG, " Inspecting: $foundDevice")
    
        if(foundDevice == null) {
            return false
        }
    
        return (foundDevice.startsWith(prefix))
    }
    So I'll just do what needs to be done in the onComplete()
    Yannick Lecaillez
    @ylecaillez
    Hi !
    Is there an operator which behaves like take(1) but does not cancel upstream once the first item has been emitted ?