These are chat archives for ReactiveX/RxJava

22nd
Mar 2017
David Hoepelman
@dhoepelman
Mar 22 2017 09:19

@Laurent I assume you want to gather the whole stream before returning an answer? You can use a fold/reduce/collect to gather all max elements and then shuffle/select:

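source.collect(() -> new ArrayList<ElementType>(),
(list, e) -> {
    if (list.isEmpty() || list.get(0).compareTo(e) == 0) {
        list.add(e); // first element, or a tie with the current max
    } else if (list.get(0).compareTo(e) < 0) {
        list.clear(); // e is a new strict max: drop the old candidates
        list.add(e);
    }
}).map(list -> list.isEmpty()
    ? Optional.<ElementType>empty() // empty source: nothing to pick
    : Optional.of(list.get(ThreadLocalRandom.current().nextInt(list.size()))))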

Note: it’s nicer to use an immutable list + fold, but I’m not versed enough in the Java APIs. Here’s the Scala version:

source.foldLeft(List.empty[ElementType])((acc, e) => acc match {
    case Nil => List(e)
    case head :: _ if head < e => List(e)
    case head :: _ if head == e => e :: acc
    case _ => acc
}).map(list => list(Random.nextInt(list.size)))

Note that your current implementation doesn’t pick elements uniformly: the earlier elements of the stream have a 1/2^n chance of being picked, while the last max element has a 1/2 chance.
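As an illustration of a uniform pick, here is a minimal plain-Java sketch that selects one of the max elements in a single pass without storing all of them. It is not the code discussed above: the class name `UniformMaxPicker` and the method `pickMax` are made up for this example, and it iterates over an `Iterable` rather than an RxJava stream. The idea is to count ties and replace the current choice with probability 1/ties, which leaves every tied element equally likely.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

public class UniformMaxPicker {

    /**
     * Returns one of the maximal elements of source, each with equal
     * probability, or null when source is empty.
     */
    public static <T> T pickMax(Iterable<T> source, Comparator<? super T> order, Random rnd) {
        T chosen = null;
        int ties = 0; // number of elements equal to the current max seen so far
        for (T e : source) {
            int cmp = (ties == 0) ? 1 : order.compare(e, chosen);
            if (cmp > 0) {
                chosen = e; // strictly larger: restart the tie count
                ties = 1;
            } else if (cmp == 0) {
                ties++;
                // Replace with probability 1/ties so all ties stay equally likely.
                if (rnd.nextInt(ties) == 0) {
                    chosen = e;
                }
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bbb", "cc", "ddd", "eee");
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        // Prints one of "bbb", "ddd" or "eee".
        System.out.println(pickMax(words, byLength, new Random()));
    }
}
```

The same per-element step could presumably be used as the accumulator of a reduce, with the pair (chosen, ties) as the running state.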

Laurent
@Crystark
Mar 22 2017 14:21
@dhoepelman yes indeed, the chances aren't uniform in my current version. I guess I didn't notice how important the difference would be. Actually, not shuffling at all wouldn't change much in my current use case. I guess foldLeft does the same thing as the RxJava .reduce() method that takes an initial value as its first parameter? I'll try that, thanks
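For reference, RxJava 2's `reduce(seed, reducer)` does apply the reducer to the accumulator and each item in turn, much like `foldLeft`. The sketch below shows the fold step from the Scala version as a pure Java function that never mutates its accumulator. The class name `MaxFold` and the method `step` are hypothetical, and the `main` method folds over a plain list instead of an Observable so that it runs without RxJava on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MaxFold {

    /** One fold step: returns an unmodifiable list of every max element seen so far. */
    public static <T extends Comparable<T>> List<T> step(List<T> acc, T e) {
        if (acc.isEmpty()) {
            return Collections.singletonList(e);
        }
        int cmp = acc.get(0).compareTo(e);
        if (cmp < 0) {
            return Collections.singletonList(e); // e is a new strict max
        }
        if (cmp == 0) {
            List<T> next = new ArrayList<>(acc); // copy, never mutate acc
            next.add(e);
            return Collections.unmodifiableList(next);
        }
        return acc; // e is smaller: keep the accumulator as is
    }

    public static void main(String[] args) {
        List<Integer> acc = Collections.emptyList(); // the seed
        for (Integer e : Arrays.asList(3, 7, 2, 7, 5)) {
            acc = step(acc, e);
        }
        System.out.println(acc); // prints [7, 7]
    }
}
```

Because the accumulator is immutable, an empty list should be safe to pass as the seed. With a mutable seed, `collect` or `reduceWith` (which take a factory for the initial value) are the safer choice, since a seed passed to `reduce` is shared between subscribers.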
Nelson Pestana
@npestana
Mar 22 2017 17:38
Hi everyone. I'm using ReactiveX 2 with Retrofit, and using the merge operator to join multiple requests. Currently it receives a List of objects of the same class from every request. How can I identify each request in the result? Does each call to accept in the consumer happen in the same order as the members of the array passed to merge?
Zak Taccardi
@ZakTaccardi
Mar 22 2017 17:39
not really sure what you mean
Nelson Pestana
@npestana
Mar 22 2017 17:45

The code:

Observable.merge(observablesList).subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(@NonNull List<String> busStops) throws Exception {
                Log.e("List content", busStops.toString());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                Log.e("Error", throwable.toString());
            }
        });

In each call to accept I need to know which of the merged observables the list belongs to.

Zak Taccardi
@ZakTaccardi
Mar 22 2017 17:47
then your list of observables needs to contain that additional value
each element of your list of observables should be an Observable<ResponseWithSource>, where ResponseWithSource is:
class ResponseWithSource{
    Response response;
    Source source; //the additional info you want along with your response
}
Nelson Pestana
@npestana
Mar 22 2017 17:53
Oh I see. Now I need to check my retrofit code, because it returns each observable. I think I have to make it return just the list, and instantiate the class you suggest when creating the observable.
Thanks :)
Nelson Pestana
@npestana
Mar 22 2017 18:46
Now I have a dilemma: Retrofit 2 returns an Observable<List<String>>, and I can't change the list part because Retrofit needs it. Is there any way to create a new observable from that one that uses the MyResponse class? My code:
// Retrofit interface
@GET("/{id}")
Observable<List<String>> loadOne(@Path("id") String id);

// Class
public static class MyResponse {
    List<String> data;
    String id;

    public MyResponse(List<String> data, String id) {
        this.data = data;
        this.id = id;
    }
}

// Load data
Observable<List<String>> observable = api.loadOne("1");
Zak Taccardi
@ZakTaccardi
Mar 22 2017 22:53
@npestana why is your response static?
Observable.just("1")
                .switchMap { one -> api.loadOne(one)
                        .map { response -> 
                            one //you can access one here
                        }
                }
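The same idea in Java, as a hedged sketch: the lambda passed to the mapping step captures the request parameter, so it can be packed into MyResponse next to the data. To keep the example runnable without RxJava or Retrofit, it uses `CompletableFuture` as a stand-in for `Observable`, and `loadOne` here is a hypothetical fake that returns canned data. With RxJava the analogous call would be `api.loadOne(id).map(data -> new MyResponse(data, id))`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class TagWithRequest {

    /** Mirrors the MyResponse class from the question. */
    public static class MyResponse {
        public final List<String> data;
        public final String id;

        public MyResponse(List<String> data, String id) {
            this.data = data;
            this.id = id;
        }
    }

    /** Hypothetical stand-in for api.loadOne(id): returns canned data, no network call. */
    public static CompletableFuture<List<String>> loadOne(String id) {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList("stop-" + id + "-a", "stop-" + id + "-b"));
    }

    /** Wraps the raw list together with the id that was used to request it. */
    public static CompletableFuture<MyResponse> loadTagged(String id) {
        // The lambda captures id, so it is still in scope when the data arrives.
        return loadOne(id).thenApply(data -> new MyResponse(data, id));
    }

    public static void main(String[] args) {
        MyResponse r = loadTagged("1").join();
        System.out.println(r.id + " -> " + r.data); // prints 1 -> [stop-1-a, stop-1-b]
    }
}
```

Once every source emits MyResponse instead of a bare list, the consumer of the merged stream can read the id from each item and no longer depends on arrival order.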
Nelson Pestana
@npestana
Mar 22 2017 22:56
@ZakTaccardi because it is a nested class in the place where I'm testing the code
Zak Taccardi
@ZakTaccardi
Mar 22 2017 22:56
oh my bad, wups
note you should have your api return Observable<Response<List<String>>>, otherwise you won't be able to handle exceptions properly. Response is an okhttp/retrofit class
Nelson Pestana
@npestana
Mar 22 2017 23:00
@ZakTaccardi oh, I'm going to test it now. Maybe that response comes with some request info. I'm still having a hard time making a request parameter available there in the response