Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Joshua Street
    @jjstreet
    i've talked to some folks in the spring boot channel and they state that it could be related to what thread context class loader RxJava uses
    it shouldn't be using Tomcat's, and should be using the Application's
    does anyone have insight how i can confirm this is the problem and how to fix it?
    Joshua Street
    @jjstreet
    turns out using the IO scheduler had cached stale threads
    shutting down and starting the IO scheduler fixes the problem
    David Karnok
    @akarnokd
    For container usages, you have to manually shutdown the schedulers via Schedulers.shutdown() when the container lifecycle ends. It is also recommended you cancel any outstanding flows before that.
    Joshua Street
    @jjstreet
    thanks @akarnokd for the confirmation.
    Paul DeMarco
    @pauldemarco
    hi everyone, i have a function that I'd like to return a completable on whether or not it kicked off the longer running observable correctly.
    how can I achieve this?
    Completable startNofications() {
      longRunningObservable
             .flatMap(foo -> foo.setupNotification())
             .doOnNext( // Setup correctly, return completable to function)
             .map(otherwork)
             .subscribe(data -> ...)
    }
    Mark Raynsford
    @io7m
    'lo. is there such a thing as a Subject that protects against infinite recursion? consider the case where a subscriber to a Subject publishes events to that same Subject.
    that'll almost certainly result in a StackOverflowException sooner or later. it'd be nice if it were possible to electively avoid that on the first recursion
    Dorus
    @Dorus
    @io7m Dont use subjects?
    Exerosis
    @Exerosis
    Is there some type of Observable that has two types of data? I know a pair works fine, but I'm considering using this lib in an API I'm working on, Observables would be useful, but I would feel obligated to replace existing methods like onAdd(Consumer<Type> listener) with observable equivalents. In which case, I would end up doing a ton of pairing and then unpairing, which isn't something I really want to do(performance and more work for API user).
    Mark Raynsford
    @io7m
    @Dorus it's a possibility, but I'm exploring all options (that was basically the response I expected, I realize subjects aren't popular)
    David Karnok
    @akarnokd
    @io7m Use toSerialized(): it protects the Subject from reentrance and concurrent usage problems.
    Mark Raynsford
    @io7m
    @akarnokd Ah, interesting. Thank you! I'll give it a shot
    I sort of blanked when I saw a "serialized" subject type as I thought it referred to Serializable... :smile:
    David Karnok
    @akarnokd
    @Exerosis No, that requires a complete new library.
    Mark Raynsford
    @io7m
    @akarnokd Playing around a bit with serialized subjects... It does indeed project the subject from issues of re-entrancy and the like, but the problem is then that it turns a stack overflow exception into non-termination instead. Is there perhaps some way to make the recursion bounded? With the original stack overflow, the recursion is only bounded by the maximum stack size. I'm curious if there's some way to set a very low bound (say eight recursive calls before an unchecked exception of some type is raised)
    David Karnok
    @akarnokd
    Don't use Subjects and redesign your flows so that you don't need to trigger "yourself".
    Mark Raynsford
    @io7m
    right
    Exerosis
    @Exerosis
    @akarnokd Ah I see, although I actually did realize I can emulate the effect by creating a custom consumer. The issue is, unless I make a custom observable(have yet to check if this is even possible) I will still need to explicitly type my lambdas.
    Stephen Berger
    @GettingNifty
    Is this a general Java discussion room
    Or is there one
    Hi all
    Stephen Berger
    @GettingNifty
    plm75
    @plm75
    Hello, I'm planning on learning RxJava soon and have a little experience with RxSwift and RxJS. I was wondering if RxJava had the equivalent of the following library https://github.com/RxSwiftCommunity/Action , either as a separate library or as a language feature ? Thanks
    Stephen Berger
    @GettingNifty
    Oh I hate swift
    Stephen Berger
    @GettingNifty
    Well I'm not too good at web programming though.
    Yannick Lecaillez
    @ylecaillez

    Hi guys !
    I'm working on a custom groupBy() and reading the RxJava's one i wonder if there is a bug or something i don't get: it seems that groupBy() is request()ing twice its upstream for each receive item when each of these item is creating a new group:

    public void testGroup() {
            final AtomicLong requested = new AtomicLong();
            Flowable.range(1, 2048)
                    .doOnRequest(requested::addAndGet)
                    .groupBy(e -> e)
                    .subscribe(g -> g.subscribe(TestSubscriber.create()));
    
            System.out.println("Requested: " + requested);
    }

    Displays:
    Requested: 4224

    Ideally i would have expected 128 + 2048 so that requested of the source remains at 128.
    Am i missing something ?
    (I'm using RxJava 2.1.0)
    Stephen Berger
    @GettingNifty
    What's adsAndGet
    Yannick Lecaillez
    @ylecaillez
    its AtomicLong.addAndGet(requested)
    Stephen Berger
    @GettingNifty
    So is it retrieving and adding it toitself somehow .. I'm a newb
    Or your e reference
    Calling itself? How does that work
    Yannick Lecaillez
    @ylecaillez
    The problem i can see is if 2048 is not 2048 but something way bigger, at some point the requested value from the source will be huge.
    And if the source is at some point able to produce such number of items going into one group, we might hit an OOME ?
    @GettingNifty This is lamba. Basically its more or less the equivalent of:
    .doOnRequest(new LongConsumer() {
        void accept(long t) {
            requested.addAndGet(t);
        }
    })
    Stephen Berger
    @GettingNifty
    Maybe the one is the second array
    Try 0
    Yannick Lecaillez
    @ylecaillez
    Ok, i'm able to produce an OOME with groupBy() :-(
    Stephen Berger
    @GettingNifty
    I'm confused
    Stephen Berger
    @GettingNifty
    I think you need to schedule it or exit somehow
    var source = Observable.Interval(TimeSpan.FromSeconds(0.1)).Take(10);
    ''var group = source.GroupBy(i => i % 3);
    group.SelectMany(
    grp =>
    grp.Max()
    .Select(value => new { grp.Key, value }))
    .Dump("group");''
    ::hh::
    How do u format
    Yannick Lecaillez
    @ylecaillez

    This is how i'm able to produce an OOME:

    ```java