Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Ewan Higgs
    @ehiggs
    hey hey. Do you reckon Maybe should have a version of just that takes an Optional<T> that translates it to .map(o -> Maybe.just(o)).orElse(Maybe.never()) ?
    Also, None of the .from methods seem to support CompletableFuture directly and some of them claim that there is some blocking behaviour when using fromFuture. Is that the correct bridge between CompletableFuture<?> and e.g. Single<?> ?
    Menkir
    @Menkir
    image.png
    Hey guys im trying to understand the purpose of subscribeON and observeOn.
    Am i right with my understanding?
    Dan O'Reilly
    @dano
    @ehiggs A lot of what you're suggesting is provided in https://github.com/akarnokd/RxJava2Jdk8Interop, fwiw
    Ewan Higgs
    @ehiggs
    @dano, thanks :)
    Dan O'Reilly
    @dano
    there's MaybeInterop.fromOptional, and mapOptional
    i think FlowableInterop does it, too
    it also has some fromFuture APIs, not sure if they provide what you're looking for, though
    Ewan Higgs
    @ehiggs
    I'm having a weird problem where I want to split a stream into var first = stream.firstOrError() and var rest = stream.skip(1). Then I do first.map(f -> someFunction(f, rest)). But when the first is done with that first element it cancels the rest of the stream which seems to hang up on rest. This is when using reactive-grpc... (I don't seem to be able to reproduce it using Flowable.just type tests even though cancel is called inside rxjava). is there a way to multiplex a stream like this?
    Volkan Yazıcı
    @vy
    Both RxJava 1 and 2 are both using j.u.c.ScheduledThreadPoolExecutor in default schedulers and hence have unbounded task queues, right?
    Ewan Higgs
    @ehiggs
    Is there a class to inherit for a pass-thru component where I can just fill in onNext, onComplete, etc? I get a lot of messy code when I have to keep state between map and doOnComplete. It would be a lot cleaner if I could just inherit some nice class that handles subscriptions and backpressure for me.
    Ignacio Baca Moreno-Torres
    @ibaca
    :astonished: onNext and clear in the same sentence
    hehe I like to think that rxjava is like joinig pipes…
    Pipes2.jpg
    hehe and this is the onNext operator…
    pipe-burst.jpeg
    Ewan Higgs
    @ehiggs

    Ha. :) I guess the interface might be something like:

    interface StreamProcessor<T, R> {
      R onNext(T data);
      void onError(Throwable t);
      void onComplete();
    }

    Then Flowable and friends can have

    Flowable<R> map(StreamProcessor<T, R> proc) {
        return this.map(proc::onNext).doOnError(proc::onError).doOnCompelete(proc::onComplete);
    }

    Maybe this exists already(?)

    Ignacio Baca Moreno-Torres
    @ibaca
    doOnEach
    anyway, your StreamProcessor looks like… not the best idea, maybe a too-imperative-style, taking some ideas from functional programing, some immutability, will make your life much easier in RX, at that point, maybe you get better results if you just use materialize and the subscribe(n -> /*do everithing here, both events and lifecycle will be notified*/)
    Ewan Higgs
    @ehiggs
    Right but if I want to do things like finalize resource usage in onComplete that need access to things being used in onNext then I could make a closure inline. But it makes it hard to maintain as I try to build more and more things working asynchronously in the same group of requests. Pulling this out into something like StreamProcessor, I have a named type that I can test on its lonesome. :)
    Ignacio Baca Moreno-Torres
    @ibaca
    If you have long living resources then use using
    Incubator
    @incube8r
    Hello anyone familar with chaining aysnc calls?
    I mean I have a for-loop and for each for-loop it needs to call a async method, I want the "callback" not to return until all async methods have returned, but in the perspective of the original caller, it is just one callback , is this possible?
    Ghost
    @ghost~5ac67e39d73408ce4f948d87
    you can use awaitAll or allOf (java completable future), but it will wait for all to finish
    Incubator
    @incube8r
    @akumariiit is this blocking? I can't use blocking
    Incubator
    @incube8r
    This is my psuedo-code:
        @Override
        public Single<User> retrieve(String entityId) {
            BaasUser baasUser = new BaasUser();
            baasUser.setEntityId(entityId);
            baasUser.setIncludes(Arrays.asList("userProfile"));
            return baasUser.retrieve().map(user -> {
                String username = user.getUsername();
                String dateCreated = user.getDateCreated();
                String dateUpdated = user.getDateUpdated();
                BaasLink userProfileLink = user.getFirstLink();
                userProfileLink.getEntities().forEach(stubEntity -> {
                    Single<UserProfile> observable = stubEntity.retrieve().map(entity -> {
                        UserProfile userProfile = new UserProfile();
                        userProfile.setEntityId(entity.getStringProperty("entityId"));
                        userProfile.setName(entity.getStringProperty("name"));
                        return userProfile;
                    });
                    observable.subscribe(userProfile -> {
                        // until all UserProfile is fetched this 'retrieve' "callback" should not return
                    }, error -> {
                       // err 
                    });
                });
            });
        }
    Ignacio Baca Moreno-Torres
    @ibaca
    if you talks about data it is ussually easier to see how to combine it, for example, do you mean that you need to load a list of users asynchronous and when all of them are ready execute a method, then if you have some Single<User> loadUser(int id) you can Single<List<User>> loadedUsers = Observable.fromIterable(userIds).flatMapSingle(id -> loadUser(id)).toList())
    Incubator
    @incube8r
    @ibaca in my case I need to get all the "UserProfile" associated with a give User, it can be either zero or multiple, then until all are fetched (based on the loop) then and only then the callback should return be triggered
    (My problem here is the resource does not return the actual needed JSON object, only a stub for the Id) in which I need to still get the actual body of the profile assynchronously, and since we're on GWT I cannot do blocking :-(
    Incubator
    @incube8r
    @ibaca am I aiming for something that is even possible with RxJava? Or what I'm trying to do is impossible?
    Incubator
    @incube8r
        @Override
        public Single<User> retrieve(String entityId) {
            BaasUser baasUser = new BaasUser();
            baasUser.setEntityId(entityId);
            baasUser.setIncludes(Arrays.asList("userProfile"));
            return baasUser.retrieve().map(user -> {
                String username = user.getUsername();
                String dateCreated = user.getDateCreated();
                String dateUpdated = user.getDateUpdated();
    
                List<UserProfile> userProfiles = new LinkedList<>();
    
                BaasLink userProfileLink = user.getFirstLink();
                userProfileLink.getEntities().forEach(stubEntity -> {
                    Single<UserProfile> observable = stubEntity.retrieve().map(entity -> {
                        UserProfile userProfile = new UserProfile();
                        userProfile.setEntityId(entity.getStringProperty("entityId"));
                        userProfile.setName(entity.getStringProperty("name"));
                        return userProfile;
                    });
                    observable.subscribe(userProfile -> {
                        // until all UserProfile is fetched this 'retrieve' "callback" should not return
                        userProfiles.add(userProfile);
                    }, error -> {
                       // err
                    });
                });
                User user1 = new User();
                user1.setDateCreated(dateCreated);    
                user1.setDateUpdated(dateUpdated);    
                user1.setUsername(username);
                user1.setUserProfiles(userProfiles);
                return user1;
            });
        }
    Ghost
    @ghost~5ac67e39d73408ce4f948d87
    @incube8r yes it is blocking
    you can use awaitAll or allOf (java completable future), but it will wait for all to finish
    Johannes Haposan Napitupulu
    @haposan06
    image.png
    Hi, I want to ask why code above the program not output anything? I ran it on intellij.
    The program will give the output if I comment on line subscibeOn
    Ignacio Baca Moreno-Torres
    @ibaca
    @haposan06 the problem is your JVM is shuting down too early because in RX default schedulers creates deamon threads
    one dirty but simple solution is to change the subscribe by doOnNext and then add .ignoreElements().blockingAwait()
    so the main thread do not end until the observable completes
    Johannes Haposan Napitupulu
    @haposan06
    @ibaca Thanks man. This finally helps me give a nice sleep
    Johannes Haposan Napitupulu
    @haposan06
    image.png
    Hello, I want to ask about this snippet why it got compile error. The map R generic type should detect its return type from lambda function, and adjust it to comply with R return type of map cmiiw
    But the compiler detects the R return type of map as Objetct, not Integer
    Ignacio Baca Moreno-Torres
    @ibaca
    @haposan06 remove the fromCallable, just returns getString().map
    Johannes Haposan Napitupulu
    @haposan06
    That snippet is my version of oversimplifying my real solution here. I want to wrap my computation logic and execute it asynchronously. And inside the method, it will call another Single method.

    So it will looks like this @ibaca

    public Single<Integer> getLength(int a){
    return Single.fromCallable(()->{
    int b = a; //computation logic example
    int c = a + b; //computation logic example
    return getString().map(s -> {
    String d = "Hello" + c;
    return Single.just(d.length());
    });
    });
    }
    • I have no idea why the text uppercase
    Ignacio Baca Moreno-Torres
    @ibaca
    public Single<Integer> getLength(int a){
      return getString().map(s -> {
        int b = a; //computation logic example
        int c = a + b; //computation logic example
        String d = "Hello" + c;
        return d.length();
      });
    }
    Johannes Haposan Napitupulu
    @haposan06

    What if I want to put getString in the middle in my computation. For instance

    public Single<Integer> getLength(int a){
            return Single.fromCallable(()->{
                int b = a; //computation logic example
                int cArgument = a + b; //computation logic example
                return getString(cArgument).map(s -> {
                    String d = "Hello" + s;
                    return Single.just(d.length());
                });
            });
        }

    But I dont want to be like this

    public Single<Integer> getLength(int a){
            int b = a; //computation logic example
            int cArgument = a + b; //computation logic example
            return getString(cArgument).map(s -> {
                String d = "Hello" + s;
                return Single.just(d.length());
            });
    
        }

    Because it means it will execute the code above without defer. I want to use fromCallable because from the documentation

    • Allows you to defer execution of passed function until SingleObserver subscribes to the {@link Single}.
    • It makes passed function "lazy".