These are chat archives for ReactiveX/RxJava

26th
Jul 2018
mghildiy
@mghildiy
Jul 26 2018 03:18
Thanks
Darrin
@darrinps
Jul 26 2018 17:42

I'm trying to do something that I thought would be very simple, but I can't get it to work. I have a function (f1) that calls another function (f2), which in turn calls a third one (f3). f3 has the job of scanning for devices; it will just keep scanning away. I want to use f2 as a time limiter for f3. Its role is to give f3 a given number of seconds to do its work, then return all devices f3 found whose names start with a given prefix. That list of devices is then returned to f1. Functions f1 and f3 are working as needed; only f2 is buggy. The way it is coded right now, the result list gets sent back immediately (before any values get placed into it). How would you change this to do what I'm trying to do? Here is what it looks like (in Kotlin) now:

fun returnAllDevicesStartingWith(devicePrefix: String): List<String> {

    Logger.d(TAG, "About to scan for devices. Will return those starting with $devicePrefix")

    val mutableList = mutableListOf<String>()

    val result = scanForDevices()
        .take(4, TimeUnit.SECONDS, timeoutScheduler)
        .subscribe { scanResult ->
            val name = scanResult.bleDevice.name

            Logger.d(TAG, "Potential device match named $name")

            if (name != null) {
                if (name.startsWith(prefix = devicePrefix)) {
                    Logger.d(TAG, "Match found $name")
                    mutableList.plus(name)
                }
            }
        }

    return mutableList
}

David Stocking
@dmstocking
Jul 26 2018 18:03

@darrinps your code looks a little strange to me. Is this a "BlockingObservable", or is it running on this thread? It doesn't look like it. So here is what it looks like would happen: returnAllDevicesStartingWith() gets called and sets up your observable, but the work happens on another thread that might not have run yet, so you immediately return your empty list. Meanwhile, the other thread adds the devices as it gets them. If you have any experience with Java Futures, it is the same thing: because it is async, it might not have run yet. A super rough example:

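    public List<Integer> calculate(Integer input) {
        List<Integer> list = new ArrayList<>();
        // The task is only queued here; it runs later on the executor's thread...
        executor.submit(() -> {
            list.add(input * input);
        });
        // ...so the list is usually still empty when it is returned.
        return list;
    }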

To fix it, I would suggest that you continue the Observable chain. Rx does spread (kind of like a Maybe monad or Optional, if you have ever used them), so instead your function would be:

fun returnAllDevicesStartingWith(devicePrefix: String): Single<List<String>> {
    return scanForDevices()
        ...
        .toList()
}
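
For anyone who wants to run the Future analogy, here is a minimal stdlib-only Java sketch of the two shapes discussed above. It is not RxJava: CompletableFuture stands in for Single, and join() plays roughly the role of blockingGet(). The names AsyncListSketch, squareInto and squareAsync are made up for illustration, and the latch exists only so the "returned too early" case is deterministic instead of a coin flip.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class AsyncListSketch {

    // Broken shape: hands the list back before the background task has filled it.
    // The latch (an assumption of this demo) holds the task back until released.
    public static List<Integer> squareInto(int input, CountDownLatch release) {
        List<Integer> list = new CopyOnWriteArrayList<>();
        CompletableFuture.runAsync(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            list.add(input * input);
        });
        return list; // still empty at this point
    }

    // Fixed shape: return the asynchronous value itself and let the caller
    // decide when (and whether) to wait for it.
    public static CompletableFuture<List<Integer>> squareAsync(int input) {
        return CompletableFuture.supplyAsync(() -> Collections.singletonList(input * input));
    }

    public static void main(String[] args) {
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> tooEarly = squareInto(3, release);
        System.out.println(tooEarly.isEmpty()); // true: the task has not run yet
        release.countDown();
        System.out.println(squareAsync(3).join()); // [9]
    }
}
```

The second method is the same move as returning Single<List<String>> from the scan function: the caller gets something it can keep chaining on, or block on once at the very edge.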
Darrin
@darrinps
Jul 26 2018 18:11

It's on the same thread. Really, nothing can be done until the function completes (in real life, it will only be allowed to run for 2 seconds or so). Yes, you are correct that the way it is right now, the function returns an empty list right away. I tried doing what you suggested, but .toList() isn't available. Here is what I tried, but it isn't allowed:

fun returnAllDevicesStartingWith(devicePrefix: String): Single<List<String>> {

    Logger.d(TAG, "About to scan for devices. Will return those starting with $devicePrefix")

    val mutableList = mutableListOf<String>()

    val result = scanForDevices()
        .take(4, TimeUnit.SECONDS, timeoutScheduler)
        .subscribe { scanResult ->
            val name = scanResult.bleDevice.name

            Logger.d(TAG, "Potential device match named $name")

            if (name != null) {
                if (name.startsWith(prefix = devicePrefix)) {
                    Logger.d(TAG, "Match found $name")
                    mutableList.plus(name)
                }
            }
        }.toList()   //IS NOT ALLOWED HERE

}

David Stocking
@dmstocking
Jul 26 2018 18:14

I meant something more like this

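fun returnAllDevicesStartingWith(devicePrefix: String): Single<List<String>> {
    return scanForDevices()
        .take(4, TimeUnit.SECONDS, timeoutScheduler)
        // RxJava 2 streams cannot carry null, so drop unnamed devices before mapping
        .filter { it.bleDevice.name != null }
        .map { it.bleDevice.name!! }
        .filter { it.startsWith(devicePrefix) }
        .toList()
}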

(I don't know if you are using RxJava1 or 2, so I assumed 2)

Darrin
@darrinps
Jul 26 2018 18:15
Yes. Gotcha. Let me try that. Appreciate it!
David Stocking
@dmstocking
Jul 26 2018 18:15
Then the next guy can do returnAllDevicesStartingWith("blue").map { ... } .filter { ... } .whatever { ... }
There were some resources earlier in the chat on starting with RxJava that would probably help a lot.
Darrin
@darrinps
Jul 26 2018 18:18
I've Googled for things on timers and only pulling for a given time and didn't see much that helped. Honestly, the more I use Rx the more I question using it. It's probably great for a certain percentage of things, but most others that I could do easily without it I find way too complicated in Rx.
David Stocking
@dmstocking
Jul 26 2018 18:31

Rx is about streams. If you are just doing timers, I wouldn't bother with Rx. Rx is kind of a philosophy: you change how you think about writing code so that it is about creating pipelines that transform data.

With your specific example, maybe you are listing a bunch of Bluetooth devices for the user. The way you have it now, the user needs to wait 4 seconds to get all the devices. But you can easily change that with Rx. You could remove the take and use scan instead, and then you would get a new list of devices every time you find one. Because Observables are async and support multiple values, you can do that. With Futures you are limited to one value, so you couldn't update, and synchronously you would block. I personally use Rx quite a bit as an Android developer. A lot of the easy code is stuff like:

database.query()
    .map { queryToViewModel(it) }
    .subscribe { bindModel(it) }

And now I don't actually do anything directly and my view model gets updated automatically.

Darrin
@darrinps
Jul 26 2018 18:34
Understood on the streams thing. I think I understand now what you changed... the Single is just what it says: a single item. So I'm guessing that it will wait and return the entire list at once, since in this case it happens to be a Single that gives back a List<String>. So far so good. In my function which calls this one I do this: List<String> devices = bleUtility.returnAllDevicesStartingWith(prefix).blockingGet();
Darrin
@darrinps
Jul 26 2018 18:41
In my case, it won't just be displaying, but rather connecting to them, so that's why getting a newly created list for each new device found would be much more complication than it is worth. At least to me. This works great BTW. I just tested it. THANKS!
David Stocking
@dmstocking
Jul 26 2018 19:03
Hmm, that would be a little weird, but there are other use cases. I use them a lot for request/response-type flows.
connection.send(packet)
    .filter { it.type == WHATEVER }
    .subscribe { handle(it) }