Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Aditya Chaturvedi
    @apexkid
    It is not working.
    Ignacio Baca Moreno-Torres
    @ibaca
    yep, this is another problem, try to add the .subscribeOn(io()) in the save observable too
    it depends on how those methods are implemented (getToken and saveTokenToBackend)
    Aditya Chaturvedi
    @apexkid
    both methods are Retrofit api call
    Ignacio Baca Moreno-Torres
    @ibaca
    and the RxThreadCallAdapter should not handle this for you?
    are you configured that correctly?
    Aditya Chaturvedi
    @apexkid
    yes its working when i call it individually
    but not when i chain it
    Ignacio Baca Moreno-Torres
    @ibaca
    then just repeat the subscribeOn just after the api call
    Aditya Chaturvedi
    @apexkid
    brb
    Ignacio Baca Moreno-Torres
    @ibaca
    look like the API call apply the observeOn automattically (I think, not sure…)
    Ignacio Baca Moreno-Torres
    @ibaca
    uhm RxJava2CallAdapterFactory do not do that, not sure, anyways your problem MUST be that the flatMap has been evaluated on the mainThread so adding the subscribeOn must solve the problem, also, I really think that you should apply the RxJava2CallAdapterFactory using the RxJava2CallAdapterFactory.createWithScheduler(io()) call, that assert that all request call are evaluated in the io scheduler
    Aditya Chaturvedi
    @apexkid
    Thanks @ibaca . Your suggestion of changing the RxFactory worked. I still wonder why the issue was happening in the first place.
    I was already going .ObserveOn to my completable
    Ignacio Baca Moreno-Torres
    @ibaca
    :+1:
    Ewan Higgs
    @ehiggs
    hey hey. Do you reckon Maybe should have a version of just that takes an Optional<T> that translates it to .map(o -> Maybe.just(o)).orElse(Maybe.never()) ?
    Also, None of the .from methods seem to support CompletableFuture directly and some of them claim that there is some blocking behaviour when using fromFuture. Is that the correct bridge between CompletableFuture<?> and e.g. Single<?> ?
    Menkir
    @Menkir
    image.png
    Hey guys im trying to understand the purpose of subscribeON and observeOn.
    Am i right with my understanding?
    Dan O'Reilly
    @dano
    @ehiggs A lot of what you're suggesting is provided in https://github.com/akarnokd/RxJava2Jdk8Interop, fwiw
    Ewan Higgs
    @ehiggs
    @dano, thanks :)
    Dan O'Reilly
    @dano
    there's MaybeInterop.fromOptional, and mapOptional
    i think FlowableInterop does it, too
    it also has some fromFuture APIs, not sure if they provide what you're looking for, though
    Ewan Higgs
    @ehiggs
    I'm having a weird problem where I want to split a stream into var first = stream.firstOrError() and var rest = stream.skip(1). Then I do first.map(f -> someFunction(f, rest)). But when the first is done with that first element it cancels the rest of the stream which seems to hang up on rest. This is when using reactive-grpc... (I don't seem to be able to reproduce it using Flowable.just type tests even though cancel is called inside rxjava). is there a way to multiplex a stream like this?
    Volkan Yazıcı
    @vy
    Both RxJava 1 and 2 are both using j.u.c.ScheduledThreadPoolExecutor in default schedulers and hence have unbounded task queues, right?
    Ewan Higgs
    @ehiggs
    Is there a class to inherit for a pass-thru component where I can just fill in onNext, onComplete, etc? I get a lot of messy code when I have to keep state between map and doOnComplete. It would be a lot cleaner if I could just inherit some nice class that handles subscriptions and backpressure for me.
    Ignacio Baca Moreno-Torres
    @ibaca
    :astonished: onNext and clear in the same sentence
    hehe I like to think that rxjava is like joinig pipes…
    Pipes2.jpg
    hehe and this is the onNext operator…
    pipe-burst.jpeg
    Ewan Higgs
    @ehiggs

    Ha. :) I guess the interface might be something like:

    interface StreamProcessor<T, R> {
      R onNext(T data);
      void onError(Throwable t);
      void onComplete();
    }

    Then Flowable and friends can have

    Flowable<R> map(StreamProcessor<T, R> proc) {
        return this.map(proc::onNext).doOnError(proc::onError).doOnCompelete(proc::onComplete);
    }

    Maybe this exists already(?)

    Ignacio Baca Moreno-Torres
    @ibaca
    doOnEach
    anyway, your StreamProcessor looks like… not the best idea, maybe a too-imperative-style, taking some ideas from functional programing, some immutability, will make your life much easier in RX, at that point, maybe you get better results if you just use materialize and the subscribe(n -> /*do everithing here, both events and lifecycle will be notified*/)
    Ewan Higgs
    @ehiggs
    Right but if I want to do things like finalize resource usage in onComplete that need access to things being used in onNext then I could make a closure inline. But it makes it hard to maintain as I try to build more and more things working asynchronously in the same group of requests. Pulling this out into something like StreamProcessor, I have a named type that I can test on its lonesome. :)
    Ignacio Baca Moreno-Torres
    @ibaca
    If you have long living resources then use using
    Incubator
    @incube8r
    Hello anyone familar with chaining aysnc calls?
    I mean I have a for-loop and for each for-loop it needs to call a async method, I want the "callback" not to return until all async methods have returned, but in the perspective of the original caller, it is just one callback , is this possible?
    Ghost
    @ghost~5ac67e39d73408ce4f948d87
    you can use awaitAll or allOf (java completable future), but it will wait for all to finish
    Incubator
    @incube8r
    @akumariiit is this blocking? I can't use blocking
    Incubator
    @incube8r
    This is my psuedo-code:
        @Override
        public Single<User> retrieve(String entityId) {
            BaasUser baasUser = new BaasUser();
            baasUser.setEntityId(entityId);
            baasUser.setIncludes(Arrays.asList("userProfile"));
            return baasUser.retrieve().map(user -> {
                String username = user.getUsername();
                String dateCreated = user.getDateCreated();
                String dateUpdated = user.getDateUpdated();
                BaasLink userProfileLink = user.getFirstLink();
                userProfileLink.getEntities().forEach(stubEntity -> {
                    Single<UserProfile> observable = stubEntity.retrieve().map(entity -> {
                        UserProfile userProfile = new UserProfile();
                        userProfile.setEntityId(entity.getStringProperty("entityId"));
                        userProfile.setName(entity.getStringProperty("name"));
                        return userProfile;
                    });
                    observable.subscribe(userProfile -> {
                        // until all UserProfile is fetched this 'retrieve' "callback" should not return
                    }, error -> {
                       // err 
                    });
                });
            });
        }
    Ignacio Baca Moreno-Torres
    @ibaca
    if you talks about data it is ussually easier to see how to combine it, for example, do you mean that you need to load a list of users asynchronous and when all of them are ready execute a method, then if you have some Single<User> loadUser(int id) you can Single<List<User>> loadedUsers = Observable.fromIterable(userIds).flatMapSingle(id -> loadUser(id)).toList())
    Incubator
    @incube8r
    @ibaca in my case I need to get all the "UserProfile" associated with a give User, it can be either zero or multiple, then until all are fetched (based on the loop) then and only then the callback should return be triggered
    (My problem here is the resource does not return the actual needed JSON object, only a stub for the Id) in which I need to still get the actual body of the profile assynchronously, and since we're on GWT I cannot do blocking :-(
    Incubator
    @incube8r
    @ibaca am I aiming for something that is even possible with RxJava? Or what I'm trying to do is impossible?
    Incubator
    @incube8r
        @Override
        public Single<User> retrieve(String entityId) {
            BaasUser baasUser = new BaasUser();
            baasUser.setEntityId(entityId);
            baasUser.setIncludes(Arrays.asList("userProfile"));
            return baasUser.retrieve().map(user -> {
                String username = user.getUsername();
                String dateCreated = user.getDateCreated();
                String dateUpdated = user.getDateUpdated();
    
                List<UserProfile> userProfiles = new LinkedList<>();
    
                BaasLink userProfileLink = user.getFirstLink();
                userProfileLink.getEntities().forEach(stubEntity -> {
                    Single<UserProfile> observable = stubEntity.retrieve().map(entity -> {
                        UserProfile userProfile = new UserProfile();
                        userProfile.setEntityId(entity.getStringProperty("entityId"));
                        userProfile.setName(entity.getStringProperty("name"));
                        return userProfile;
                    });
                    observable.subscribe(userProfile -> {
                        // until all UserProfile is fetched this 'retrieve' "callback" should not return
                        userProfiles.add(userProfile);
                    }, error -> {
                       // err
                    });
                });
                User user1 = new User();
                user1.setDateCreated(dateCreated);    
                user1.setDateUpdated(dateUpdated);    
                user1.setUsername(username);
                user1.setUserProfiles(userProfiles);
                return user1;
            });
        }
    Ghost
    @ghost~5ac67e39d73408ce4f948d87
    @incube8r yes it is blocking
    you can use awaitAll or allOf (java completable future), but it will wait for all to finish
    Johannes Haposan Napitupulu
    @haposan06
    image.png