    David Young
    @thedewi
    I feel like I'm missing something fundamental. Why is there support for various schedulers if everything is working so hard to be synchronous?
    David Young
    @thedewi
    I can convert to enumerable and back to observable, which fixes the blocking but then I can't find any way to unsubscribe.
    David Young
    @thedewi
    Am I trying to do something that is fundamentally against the Rx.NET contract?
    David Young
    @thedewi
    aha! .ObserveOn(ThreadPoolScheduler.Instance) actually lifts backpressure, but only if that scheduler is applied after the producer I want to remove backpressure from.
    Bart de Boer
    @boekabart
    Marshalling to a different thread (for example with ObserveOn) is indeed a way to do it; but be aware that you have 0 queue control (the thread pool work-item queue is your queue now).
    What's the IO you're reading? A socket?
    Bart de Boer
    @boekabart

    but only if that scheduler is applied after the producer

    Not sure what you mean by that, in fact

    David Young
    @thedewi
    @boekabart Reading sensors via a blocking API with a small buffer, so I want my reading thread to be blocked on Read() not an unpredictable OnNext(), or I might lose samples just by any intermittent delay that occurs on the consumer end.
    (Which I do anticipate since my consumer is sometimes a single-threaded event loop)
    David Young
    @thedewi
    When you say "0 queue control" you make it sound like there was a queue I could control? Where would that be? In my experience there was no queue - since OnNext() did not return until all subscribers finished processing.

    but only if that scheduler is applied after the producer

    Not sure what you mean by that, in fact

    What I meant here was that even when I specifically ask the ThreadPoolScheduler to schedule events, OnNext() continues to block. Eg this blocks -

                return Observable.Create<int>(o =>
                {
                    return ThreadPoolScheduler.Instance.Schedule(1, (ii, recurse) =>
                    {
                        Thread.Sleep(TimeSpan.FromSeconds(1));
                        o.OnNext(ii);
                        recurse(ii + 1);
                    });
                });
    David Young
    @thedewi
    Even in that case, I had to use .ObserveOn(ThreadPoolScheduler.Instance) to ensure a non-blocking OnNext()
    David Young
    @thedewi
    Here's a full example of OnNext() blocking causing the loss of clock ticks, which I'd like to avoid. Uncommenting the ObserveOn() call is the only way I've found to solve it, but I'd love to know if there is a more nuanced approach I am missing:
    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;
    
    namespace rx
    {
        internal static class Program
        {
            private static void Main()
            {
                using var subscription = TickProducer()
                    //.ObserveOn(ThreadPoolScheduler.Instance)
                    .Select(Transform)
                    .Subscribe(Consume);
                Thread.Sleep(10000);
            }
    
            private static IObservable<int> TickProducer()
            {
                return Observable.Create<int>(o =>
                {
                    var stopping = false;
                    var thread = new Thread(() =>
                    {
                        var started = DateTime.UtcNow;
                        var lastTick = 0;
                        // Volatile read so the stop flag set by the unsubscribing thread is observed
                        while (!Volatile.Read(ref stopping))
                        {
                            var seconds = (int) DateTime.UtcNow.Subtract(started).TotalSeconds;
                            if (seconds != lastTick)
                            {
                                o.OnNext(lastTick = seconds);
                            }
                        }
                    });
                    thread.Start();
                    return () =>
                    {
                        stopping = true;
                        thread.Join();
                    };
                });
            }
    
            private static int Transform(int counter)
            {
                Thread.Sleep(TimeSpan.FromSeconds(0.5));
                return -counter;
            }
    
            private static void Consume(int counter)
            {
                Console.WriteLine(counter);
                if (new Random().Next(3) == 0)
                    Thread.Sleep(TimeSpan.FromSeconds(2)); // Occasional delay
            }
        }
    }
    Expected output is -1, -2, -3, ... with every counting number, no skipped numbers.
    David Young
    @thedewi
    Note that on average this consumer can definitely keep up - it can output 1.3 numbers per second. A queue length of 1 (or 2 for reliability) would result in no data loss. My problem is the queue length is 0.
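One way to get a small, explicit queue between the reading thread and the subscribers is sketched below. It is only a sketch: `ObserveViaBoundedQueue` and its `onDropped` callback are hypothetical names, not part of System.Reactive, and the choice to drop (rather than block) when the queue is full is an assumption. The producer's `OnNext()` returns as soon as the item is queued, and a dedicated thread delivers items downstream in order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Threading;

public static class BoundedQueueExtensions
{
    // Hypothetical operator (not part of System.Reactive): decouples the
    // producer from slow subscribers through a queue of fixed capacity.
    public static IObservable<T> ObserveViaBoundedQueue<T>(
        this IObservable<T> source, int capacity, Action<T> onDropped)
    {
        return Observable.Create<T>(observer =>
        {
            var queue = new BlockingCollection<T>(capacity);
            Exception error = null;

            // Dedicated consumer thread: drains the queue and is the only
            // thread that calls the downstream observer.
            var consumer = new Thread(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                    observer.OnNext(item);
                if (error != null) observer.OnError(error);
                else observer.OnCompleted();
            }) { IsBackground = true };
            consumer.Start();

            var upstream = source.Subscribe(
                item =>
                {
                    try
                    {
                        // TryAdd never blocks; false means the queue is full.
                        if (!queue.TryAdd(item)) onDropped(item);
                    }
                    catch (InvalidOperationException)
                    {
                        // Queue was closed by a concurrent unsubscribe; ignore.
                    }
                },
                ex => { error = ex; queue.CompleteAdding(); },
                () => queue.CompleteAdding());

            // Unsubscribe: stop the upstream and let the consumer thread exit.
            return () =>
            {
                upstream.Dispose();
                queue.CompleteAdding();
            };
        });
    }
}
```

In the example above this would replace the commented-out line, for instance `.ObserveViaBoundedQueue(2, dropped => Console.Error.WriteLine($"dropped {dropped}"))`, so any lost tick is at least reported instead of silently skipped.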
    Robert Rajakone
    @robie2011
    Question about disposing observable: If i have one subject and multiple observables created from it, is it enough to dispose my subject or is it also necessary to dispose my observables in this case?
    If so, is the order of dispose invocation relevant?
    Dorus
    @Dorus
    @robie2011 Preferably complete your subject?
    Dorus
    @Dorus
    Also, if your observables have operators like SelectMany on them with an active inner observable, those will stay active unless you also dispose the subscriptions.
    Otherwise, if you don't complete, I do think your subscriptions will eventually be GC'd, but there might be some delay in running this cleanup.
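A minimal sketch of the "complete your subject" suggestion, assuming a plain `Subject<int>` with two observables derived from it. The class and variable names are made up for illustration. It records each notification so the effect of `OnCompleted()` on both subscriptions is visible.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SubjectCompletionDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // Two derived observables sharing the same subject as their source.
        var evens = subject.Where(x => x % 2 == 0);
        var squares = subject.Select(x => x * x);

        var s1 = evens.Subscribe(
            x => log.Add($"even {x}"), () => log.Add("evens completed"));
        var s2 = squares.Subscribe(
            x => log.Add($"square {x}"), () => log.Add("squares completed"));

        subject.OnNext(1);
        subject.OnNext(2);

        // Completing the subject sends OnCompleted to both subscriptions.
        subject.OnCompleted();

        // Disposing afterwards is harmless here, in either order.
        s1.Dispose();
        s2.Dispose();
        subject.Dispose();
        return log;
    }
}
```

Calling `SubjectCompletionDemo.Run()` returns the recorded notifications, including one completion entry per subscription. Note that `Subject<T>.Dispose()` on its own drops the observers without sending them `OnCompleted`.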
    Robert Rajakone
    @robie2011
    Ok. I will try. Are there any books that you can recommend? @Dorus
    Dorus
    @Dorus
    Robert Rajakone
    @robie2011
    thx
    Rich Bryant
    @richbryant
    this is a little old but still valuable - https://www.manning.com/books/rx-dot-net-in-action
    Robert Rajakone
    @robie2011
    thx rich
    Robert Rajakone
    @robie2011
    Question: If I want to combine two streams and get the latest value of both, but don't want to wait until both have a value, how do I do that? CombineLatest seems to be the right operator, but it waits for both streams.
    Olly Atkins
    @oatkins
    One way is to make sure both have a value by using Prepend() on each stream to supply an initial default.
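A small sketch of that idea, using `StartWith` (the long-standing operator that puts a value in front of a sequence) in place of `Prepend`. The subjects and the choice of `null` as the initial default are assumptions for illustration. Each stream is projected to a nullable type so "no value yet" can be told apart from a real value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestWithDefaultsDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var left = new Subject<int>();
        var right = new Subject<int>();

        // Seed each side with null so CombineLatest can fire immediately.
        var combined = left.Select(x => (int?)x).StartWith((int?)null)
            .CombineLatest(
                right.Select(x => (int?)x).StartWith((int?)null),
                (l, r) => $"left={l}, right={r}");

        using (combined.Subscribe(log.Add))
        {
            left.OnNext(1);   // fires although right has no real value yet
            right.OnNext(2);
        }
        return log;
    }
}
```

The first entry is produced on subscription from the two defaults, and every later value on either side produces a new combined entry without waiting for the other side.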
    Magnus Lindhe
    @mgnslndh
    Hi! How can I combine Sample/Throttle to get values every X milliseconds (Sample) but also after Y amount of inactivity (Throttle)?
    Robert Rajakone
    @robie2011
    Maybe Window(TimeSpan) operator can do this for you… http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html#Window @mgnslndh
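Another possible approach, sketched under assumptions: share the source and merge a `Sample` branch with a `Throttle` branch. `SampleOrQuiet` is a hypothetical helper name, and this version can emit the same value twice (once from the sample tick and once after the quiet period), so it may need de-duplication depending on the use case.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SampleThrottleExtensions
{
    // Hypothetical helper: emits the latest value every sampleInterval
    // and also after quietPeriod of inactivity.
    public static IObservable<T> SampleOrQuiet<T>(
        this IObservable<T> source,
        TimeSpan sampleInterval,
        TimeSpan quietPeriod,
        IScheduler scheduler)
    {
        // Publish with a selector shares one subscription to the source
        // between the Sample branch and the Throttle branch.
        return source.Publish(shared =>
            shared.Sample(sampleInterval, scheduler)
                  .Merge(shared.Throttle(quietPeriod, scheduler)));
    }
}
```

Usage would look like `source.SampleOrQuiet(TimeSpan.FromMilliseconds(500), TimeSpan.FromMilliseconds(200), Scheduler.Default)`. Adding `DistinctUntilChanged()` afterwards removes the duplicates, at the cost of also swallowing genuinely repeated values.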
    Robert Rajakone
    @robie2011
    I want to do some calculation when any one of many observables changes. This can be done by projecting (Select) each observable to a dummy value, merging them all together, and finally calling the Select operator with the desired calculation.
    Question: Is there an easier way to do this than projecting every observable to a dummy value and merging them together?
    HairingX
    @HairingX
    Hard to say without more concrete example.
    To combine the latest values of several observables into an array you can use https://rxjs.dev/api/index/function/combineLatest. It requires all observables to send at least 1 value before it continues though
    Dorus
    @Dorus
    Since this is the rx.net chat, better to link to the general documentation: http://reactivex.io/documentation/operators/combinelatest.html
    HairingX
    @HairingX
    Old habits die hard 😆
    Robert Rajakone
    @robie2011
    I'll try to give an example. Assume we have the following observables: a1, a2, a3.
    If any one of these observables sends a message, then I want to call a function f1.
    Dorus
    @Dorus
    @robie2011 Either Observable.Merge(a1, a2, a3) or a1.CombineLatest(a2, a3) comes to mind, depending if you need all the latest values or just the one that changed.
    Robert Rajakone
    @robie2011
    @Dorus I don't need any values of these observables and they have different generic parameters (suppose T1, T2, T3). I just want to call my function.
    Dorus
    @Dorus
    @robie2011 Merge would be enough then
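Because `Merge` needs all sources to have the same element type, sources of types T1, T2 and T3 still have to be projected to a common type first. A short sketch, where `AsSignal` and `WhenAnyFires` are hypothetical helper names and `Unit` is the System.Reactive type for "no value":

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class AnyChangeExtensions
{
    // Hypothetical helper: discards the value and keeps only the event.
    public static IObservable<Unit> AsSignal<T>(this IObservable<T> source) =>
        source.Select(_ => Unit.Default);

    // Hypothetical helper: fires whenever any of the three sources fires.
    public static IObservable<Unit> WhenAnyFires<T1, T2, T3>(
        IObservable<T1> a1, IObservable<T2> a2, IObservable<T3> a3) =>
        Observable.Merge(a1.AsSignal(), a2.AsSignal(), a3.AsSignal());
}
```

With that in place the call site reduces to `AnyChangeExtensions.WhenAnyFires(a1, a2, a3).Subscribe(_ => f1());`.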
    Pavel Kabir
    @wh1t3cAt1k
    Hi, can anyone look at this and point at my dunce hat? I am questioning my sanity and can't understand how this can happen
    image.png
    On System.Reactive v.5.0.0 I was experiencing duplicate timestamped (!) values after DistinctUntilChanged, so I put some side effect code with a semicolon in the screenshot above. The breakpoint at the semicolon has been hit. How can that be?
    The UtcTicks values are indeed the same in the "marble" and in the memorized last value.
    It seems like DistinctUntilChanged() is not working as expected, I am obviously missing something here
    What's worse is that even replacing DistinctUntilChanged() with Distinct() does not help, which surprises me even more
    Pavel Kabir
    @wh1t3cAt1k
    Using Synchronize() on both combined source observables does not change the picture, I still see the breakpoint being hit
    ParadiseFallen
    @ParadiseFallen
    Hi, how can I dispose my object after sending it with OnNext()?
    
    var message = new MessageRecivedArgs() { Data = buffer };
                        // fire message event args
                        MessageRecivedSubject.OnNext(message);
    MessageRecivedArgs is a disposable object that consumes a lot of memory.
    How can I dispose it after all OnNext() calls are done, even for async listeners?
    Thomas
    @thsbrown
    Hello everyone, not sure if this is the place to come to get some help on an issue I've been facing with Timeout
    I've since figured out how to get the desired behavior, but it feels very odd to me that Timeout documentation states timeout will issue an error notification if a particular period of time elapses without any emitted items. This doesn't seem to be the case though, it seems like no matter what the observable must be completed in order for this to be registered. I'm just wondering why that bit isn't documented 🤔
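For comparison, a minimal repro sketch, assuming the `Timeout(TimeSpan)` overload from System.Reactive and a source that never emits and never completes. The class name is made up. If Timeout behaves as documented, the error notification should arrive without any completion from the source.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class TimeoutDemo
{
    public static string Run()
    {
        using var done = new ManualResetEventSlim();
        var outcome = "no notification";

        // Observable.Never never emits and never completes.
        using var subscription = Observable.Never<int>()
            .Timeout(TimeSpan.FromMilliseconds(200))
            .Subscribe(
                _ => { },
                ex => { outcome = ex.GetType().Name; done.Set(); },
                () => { outcome = "completed"; done.Set(); });

        // Wait up to 5 seconds for any terminal notification.
        done.Wait(TimeSpan.FromSeconds(5));
        return outcome;
    }
}
```

If `TimeoutDemo.Run()` reports a `TimeoutException` while the real pipeline does not, the difference is probably elsewhere in that pipeline, for example in where the Timeout operator sits relative to the other operators.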
    Daz0269
    @Daz0269
    Hello there, I'm a Unity game developer and a rookie at reactive programming.
    I want to start using it.
    However, I don't know whether I should start with UniRx or C# Rx.
    Could anyone familiar with both guide me on which to use, please?