Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    mreddimasi
    @mreddimasi
    so the UI doesn't have dependency to wait for the DB insert to happen
    in the datalayer
    Zak Taccardi
    @ZakTaccardi
    I'm not sure exactly. You should set up a test environment for testing Rx behaviors
    You could just call an asynchronous method in .doOnNext which would do the insert without blocking the stream
    mreddimasi
    @mreddimasi
    @ZakTaccardi ... Thanks so much for your help and patience, will definitely try your suggestions
    Laurent
    @Crystark

    Hi! I'm trying to get the max element from my observable. The thing is there can be more than one "max" element and i want to randomize the choice between those when it's the case. Currently i'm using rxjava-math and doing the following:

                .compose(o -> OperatorMinMax.max(o, (bp1, bp2) -> {
                    int c = bp1.compareTo(bp2);
                    return c == 0 ? (Randoms.R.nextBoolean() ? 1 : -1) : c; // Randomize order if equal
                }))

    I'm breaking the comparator contract though even though it's done on purpose. Would there be a more friendly way of doing so ? I was thinking about reducing my elements to a list of all max elements and then shuffling them and taking first but it seems more complicated in terms of code and would probably be less perfomant.

    David Hoepelman
    @dhoepelman

    @Laurent I assume you want to gather the whole stream before returning an answer? You can use a fold/reduce/collect to gather all max elements and then shuffle/select:

    source.collect(() -> new ArrayList<ElementType>(),
    (list, e) -> {
        if(list.isEmpty() || list.get(0).compareTo(e) == 0) {
            list.add(e);
        } else if(!list.isEmpty() && list.get(0).compareTo(e) < 0) {
            list.clear();
            list.add(e); 
        }
    }).map(list -> new Optional(list.get(Random.nextInt(list.size()))))

    note: it’s nicer to use an immutable list + fold, but I’m not versed enough in the Java API’s. Here’s the scala version:

    source.foldLeft(List())((acc, e) => acc match {
        case Nil => List(e)
        case head :: _ if head < e => List(e)
        case head :: _ if head == e => e :: acc
        case _ => acc
    }).map(list => list.get(Random.nextInt(list.size)))

    Note that in your current implementation you don’t uniformely pick elements, the earlier elements of the stream have 1/2^n chance of being picked while the last max element has a 1/2 chance of being picked

    Laurent
    @Crystark
    @dhoepelman yes indeed, the chances aren't uniform in my current version. I guess I didn't notice how important the difference would be. Actually not shuffling at all wouldn't change much in my current use case. I guess what fold left does is actually the same as the RxJava .reduce() méthod which takes an initial value as first parameter ? I'll try that thanks
    Nelson Pestana
    @npestana
    Hi everyone. I'm using ReactiveX 2 with retrofit, and using the operator merge to join multiple requests. Currently it receives from all requests a List with objects of the same class. How can I identify each request on the result? Does each call of accept in the consumer happens in the same order of the member in the array parameter of merge?
    Zak Taccardi
    @ZakTaccardi
    not really sure what you mean
    Nelson Pestana
    @npestana

    The code:

    Observable.merge(observablesList).subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<String>>() {
                @Override
                public void accept(@NonNull List<String> busStops) throws Exception {
                    Log.e("List content", String.valueOf(busStops.toString()));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    Log.e("Error", throwable.toString());
                }
            });

    In each call of accept I need to know each observable (from merged) it belongs to.

    Zak Taccardi
    @ZakTaccardi
    then your list of observables needs to contain that additional value
    your list of observables should have each element be : Observable<ResponseWithSource> where ResponseWithSource is:
    class ResponseWithSource{
        Response response;
        Source source; //the additional info you want along with your response
    }
    Nelson Pestana
    @npestana
    Oh I see. Now I need to check my retrofit code, because it returns each observable. I think I have to make it return just the list, and instantiate the class you suggest when creating the observable.
    Thanks :)
    Nelson Pestana
    @npestana
    Now I have a diliema, retrofit 2 returns an Observable<List<String>>, and I can't change the list part because retrofit needs it. Is there anyway to create a new observable from that one to use MyResponse class. My code:
    // Retrofit interface
    @GET("/{id}")
    Observable<List<String>> loadOne(@Path("id") String id);
    
    // Class
    public static class MyResponse {
        List<String> data;
        String id;
    
        public MyResponse(List<String> data, String id) {
            this.data = data;
            this.id = id;
        }
    }
    
    // Load data
    Observable<List<String>> observable = api.loadOne("1");
    Zak Taccardi
    @ZakTaccardi
    @npestana why is your response static?
    Observable.just("1")
                    .switchMap { one -> api.loadOne(one)
                            .map { response -> 
                                one //you can access one here
                            }
                    }
    Nelson Pestana
    @npestana
    @ZakTaccardi because it is a nested class here I'm testing the code
    Zak Taccardi
    @ZakTaccardi
    oh my bad, wups
    note you should have your api return Observable<Response<List<String>>, otherwise you won't be able to handle exceptions properly. Response is a okhttp/retrofit class
    Nelson Pestana
    @npestana
    @ZakTaccardi oh, I'm going to test it now. Maybe that response comes with the some request info. I'm still having an hard time to have the a request parameter available there in the response
    tuananhtd
    @tuananhtd
    Hi guys, my project got built successfully but when running it got Exception in thread "main" java.lang.NoClassDefFoundError: io/reactivex/Observable. Do you guys have any idea?
    Mark Elston
    @melston
    @tuananhtd , it sounds like rxjava isn't in your classpath at runtime. Check out the 'Getting Started' section at https://github.com/ReactiveX/RxJava
    tuananhtd
    @tuananhtd
    Thanks Mark
    Neil Okamoto
    @gonewest818
    Slightly off-topic - can someone confirm if there's any plan to resume work on https://github.com/ReactiveX/RxClojure ?
    Bayarmanlai
    @hyperamli_twitter

    hi guys,
    I have methods:

    Observable<List<Product>> getProducts()
    Observable<List<Category>> getCategories()

    I want to implement:

    Observable<List<Product>> getEnrichedProducts() {
        // call getProducts
        // call getCategories
        // 1. enrich every single product with category object then return list of enriched product
        // 2. If getCategories throws exception just return original products returned from getProducts
    }
    Johannes Schneider
    @jschneider

    Hey guys. I have a Flowable<byte[]>.
    It provides raw communication data.

    I want to convert these byte-Arrays into "Java Objects".

    Unfortunately the byte[] arrays do not correspond to the beginning/end of each object. I first have to detect the beginning/end of each object.

    Any ideas how to solve this with RxJava?

    Johannes Schneider
    @jschneider
    Thanks. I will take a look.
    Laurent
    @Crystark

    Hey there. I'm using RxNetty and receiving an Observable<ByteBuf> as I download a very large gzip file. The thing is I don't want to download the whole file before starting to read it. I'd like to be able to do the following:

    Observable<ByteBuf> o = ...;
    Observable<String> byLine = o
        .compose(toCompositeByteBuf) // Single<CompositeByteBuf>
        .compose(toByteBufInputStream) // Single<ByteBufInputStream>
        .compose(toGZIPInputStream) // Single<GZIPInputStream>
        .compose(splitLines); // Observable<String>

    The final observable would emit as many lines as are in the original file. My problem is in toCompositeByteBufand toByteBufInputStream: I don't want to wait for the CompositeByteBuf to be collected before starting to emit the lines and I need the InputStream to be disposed of only when all ByteBuf have been added to the CompositeByteBuf.
    I was looking at Observable.using but I'm not sure how it can answer my problem. Do you guys have any idea how I can solve that ?

    Laurent
    @Crystark
    The more I think about this the more it seems I would need an observable-backed InputStream but that seems a bit too much for my current skills.
    Michael Zinn
    @RedNifre
    Is it possible to only get the most recent element of an observable or null if the observable has no elements yet?
    BehaviorSubject comes close but it also gives me all the following elements. I only need one.
    (I'm totally new to rx)
    Fabricio
    @ofabricio
    it seems it's BehaviorSubject what you want
    Michael Zinn
    @RedNifre
    So I just unsubscribe after the first onNext to make sure I only get one element?
    Fabricio
    @ofabricio
    if you only need one then .take(1)
    Michael Zinn
    @RedNifre
    ah! However, what if the subject is empty and I don't want to do anything if the first element comes later?
    Fabricio
    @ofabricio
    behavior requires an initial value (it can be initialized with null)
    what are you trying to do?
    Michael Zinn
    @RedNifre
    Open a dialog box where the user can enter a username and password. It's an app, there's a persistence that provides an Observable<Credentials>. The idea is that when the dialog gets opened the input fields get filled with the username and password, if it exists, but the text fields shouldn't update later (it would override what the user is entering).
    I COULD add a synchronous "getCredentials" method to the persistence, but I hope there's a way to have the Observable<Credentials> to be the only source of credentials comming from the persistence.
    Fabricio
    @ofabricio
    then thats it.
    getCredentials().take(1).subscribe(cred => {
      if (cred == null) return;
      nameField = cred.name; 
      etc.
    });
    getCred stream must be a behaviorSub
    Michael Zinn
    @RedNifre
    Currently, the cred can't be null, the observable only emits valid credentials at the moment.
    Fabricio
    @ofabricio
    but the credential is filled in the login right?
    Michael Zinn
    @RedNifre
    if there are persisted credentials those will be filled into the dialog when the dialog opens. When the user entered credentials and hits save, the dialog pushes those credentials into another BehaviorSubject which is also injected in the persistence. The persistence then first saves those credentials and then pushes it to the other BehaviorSubject, which is also the one the dialog subscribes to. On startup, the persistence also pushes the credentials into the BehaviorSubject, if they exist.
    Fabricio
    @ofabricio
    so you're afraid it'll replace the field?
    Michael Zinn
    @RedNifre
    It won't happen in practice because the dialog is the only thing writing to the persistence and the dialog closes when you save. It works, but it doesn't feel correct.