    Dorus
    @Dorus
    @Bartolomeus-649 The Slack workspace has more channels, including one, #rxnet, that does seem to be more active.
    Cannot help you with the remaining concerns; I would be interested in the answers as well.
    Rich Bryant
    @richbryant
    @Bartolomeus-649 the core System namespace defines IObservable<T> and IObserver<T>. Rx.net is, as the name implies, a set of extensions and implementations thereof. It is not (anymore) a Microsoft-run library although many MS staff including Oren still work on it and it's widely used within MS. Think of it like Json.net which became ubiquitous in MS stuff even though it was always OSS.
    If that makes your clients twitchy, well, I guess no decent streaming for you.
    kepam-gc
    @kepam-gc

    Hi. I have a problem with error handling in Rx. I'm using SourceCache in .NET, and I'm using
    .Subscribe(_ => throw new Exception(), e => Log.Error(e))

    I would expect the lambda passed to onError to trigger, but it's not triggering.
    At the moment I'm getting an unhandled exception and the application crashes.
    I need a way to keep the system from crashing when an exception happens inside Subscribe.
    Does anyone have any clue why it's behaving like this, or how I can catch this exception?

    Dorus
    @Dorus
    @kepam-gc You can move your subscribe logic into Select instead. Callbacks passed to Subscribe itself should not throw.
    .Select(x => { throw new Exception(); })
    .Subscribe(x => { }, e => Log.Error(e));
    Shimmy
    @weitzhandler
    Rich Bryant
    @richbryant
    The Rx team can't do that. Only the .NET team can do that.
    whothefcares
    @whothefcares1_twitter
    Hi guys, I am new to Rx.NET. Wondering if I can push an observable into Redis, so that I can subscribe to the observable on a different server instance? Any help is appreciated.
    Rich Bryant
    @richbryant
    oof, tough question. Pop over to https://reactivex.slack.com/ and ask in the Rx channel there. It's much busier than this gitter.
    Dorus
    @Dorus
    @whothefcares1_twitter Well, Rx is just a query language for async streams. You would either need to publish the events from the stream, or publish the entire stream and apply an operator that does what you want on the other side. You cannot send a live stream over the wire, just like you cannot send a file pointer over the wire, lol.
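    A minimal sketch of the first option (publishing each event), assuming the StackExchange.Redis client and a string-valued stream; the channel name "events" and the connection string are made up:

    ```csharp
    using System;
    using System.Reactive.Linq;
    using StackExchange.Redis;

    class RedisBridge
    {
        static void Main()
        {
            var mux = ConnectionMultiplexer.Connect("localhost");
            var pub = mux.GetSubscriber();

            // Producer side: push each element of the observable into a Redis pub/sub channel.
            IObservable<string> source = Observable.Interval(TimeSpan.FromSeconds(1))
                                                   .Select(i => $"event-{i}");
            using var subscription = source.Subscribe(msg => pub.Publish("events", msg));

            // Consumer side (could run on another server): rebuild an observable from the channel.
            var incoming = Observable.Create<string>(o =>
            {
                var sub = mux.GetSubscriber();
                sub.Subscribe("events", (_, value) => o.OnNext(value));
                return () => sub.Unsubscribe("events");
            });
            using var consume = incoming.Subscribe(Console.WriteLine);

            Console.ReadLine();
        }
    }
    ```

    Note that Redis pub/sub is fire-and-forget: subscribers that connect late miss earlier events, so this only transports OnNext values, not the full Rx contract (OnError/OnCompleted would need their own encoding).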
    Jonathan Sheely
    @jsheely
    I need a hand with a round robin Observable. I'm creating a Buffer of 500 items and I want to join that buffer with one of 100 connections that round robin from a list.
    I think I want to zip with the buffer. Just trying to figure out how to get the other list to be HOT and repeat
    Jonathan Sheely
    @jsheely
    Observable.Range(0, 10).Select(o => {
            return o;    
        }).Repeat();
    This is close to what I want but I don't want it to actually run the .Select() each time. I want it to remember / store those values and just repeat them out when someone asks for the next one.
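    One way to get that behavior (a sketch; Expensive work is simulated and CreateConnection/Send are placeholder names): materialize the values into a list once, then cycle over the list with an infinite IEnumerable and use the Zip overload that takes an enumerable, which pulls elements lazily rather than racing ahead of the buffer:

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Linq;

    static class RoundRobin
    {
        // Infinite lazy cycle over a fixed list; the list itself is built only once.
        static IEnumerable<T> Cycle<T>(IReadOnlyList<T> items)
        {
            while (true)
                foreach (var item in items)
                    yield return item;
        }

        static void Demo(IObservable<int> source)
        {
            // The expensive Select runs exactly once, up front; values are stored in the list.
            var connections = Enumerable.Range(0, 100).Select(CreateConnection).ToList();

            source.Buffer(500)
                  .Zip(Cycle(connections), (batch, conn) => (batch, conn))
                  .Subscribe(pair => Send(pair.conn, pair.batch));
        }

        static object CreateConnection(int i) => new { Id = i }; // placeholder
        static void Send(object conn, IList<int> batch) { }      // placeholder
    }
    ```

    Because Zip with an IEnumerable second argument pulls one element per source element, the cycle advances only when a new buffer arrives, which gives the round-robin behavior without re-running the projection.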
    Dorus
    @Dorus
    @jsheely Wouldn't it be better to use an AsyncQueue here instead of an observable to create a queue/buffer?
    TD
    @davi1td
    Hi all, I'm new to Rx. I'm sure there is a better way than what I'm doing below; isn't there a way to repeat forever over time, but only after completion?
            public void StartGetEventsTimer()
            {
                Observable
                    .Timer(TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(5))
    
                    .ObserveOn(MainFrmSyncContext)
                    .Subscribe(async x =>
                    {
                        if (!GetDataIsrunning) await GetData();
                    });
    
            }
            public bool GetDataIsrunning;
            private async Task GetData()
            {
                if (GetDataIsrunning) return;
                Debug.WriteLine($"Started @ {DateTime.Now}");
                try
                {
                    GetDataIsrunning = true;
                    await UpdateGridsAsync();
                }
                finally
                {
                    GetDataIsrunning = false;
                }
    
    
            }
    Dorus
    @Dorus
    @davi1td Just a reminder: Subscribe does not support async callbacks, so this will run as fire-and-forget.
    https://stackoverflow.com/a/35190003/402027
    What you probably want to do here is make a SelectMany call for the GetData, and use Repeat() after that.
    TD
    @davi1td
    @Dorus Thanks, that makes sense, but I have no idea how to do SelectMany as an alternative. Currently using a Defer, Retry, Repeat, but it fires; I'll update with that and see what you think.
    TD
    @davi1td
    @Dorus So this is what I came up with; it does what I needed anyway, thanks.
    public class EventServer
    {
        public static IObservable<List<Event>> EventsList()
        {
            var waitTime = TimeSpan.FromSeconds(5);
            var exceptionwaitTime = TimeSpan.FromSeconds(10);
            return Observable.FromAsync(EventsTableQuery)
                .Delay(waitTime)
                .RetryWhen(onException => onException.Delay(exceptionwaitTime))
                .Repeat();
        }
        private static async Task<List<Event>> EventsTableQuery()
        {
            var result = 
                await Task.Run(() =>
                {
                    var returnList = new List<Event>();
                    for (int i = 1; i < 5; i++)
                    {
                        returnList.Add(new Event()
                            {ID = i, Date = DateTime.Now, Message = $"Test{i}"});
                    }
                    return returnList;
                });
            return result;
        }
    }
    public class Event 
    {
        public int ID { get; set; }
        public DateTime Date { get; set; }
        public string Message { get; set; }
    }
    Dorus
    @Dorus

    @davi1td yes that works, but this delays the data after it has been generated. You could reverse that with:

    Observable.Timer(waitTime)
        .SelectMany(_ => Observable.FromAsync(EventsTableQuery))
        .RetryWhen(onException => onException.Delay(exceptionwaitTime))
        .Repeat();

    Or if you want to stay closer to what you had:

    Observable.FromAsync(EventsTableQuery)
        .RetryWhen(onException => onException.Delay(exceptionwaitTime))
        .Concat(Observable.Empty<List<Event>>().Delay(waitTime))
        .Repeat();

    (I inverted the RetryWhen and the Delay in both cases, so you might need to extend exceptionwaitTime by waitTime to get your original behavior.)

    TD
    @davi1td
    @Dorus Awesome, thanks for all of the input, much appreciated. So many options... Observable.Timer vs Observable.FromAsync: is there a preferred one to use here?
    TD
    @davi1td
    @Dorus Also, all of the timings work perfectly fine the way you have them; I want to delay re-subscription longer if there's an exception...
    Dorus
    @Dorus

    @davi1td What I meant is that in your code, the delay for exceptions is waitTime + exceptionwaitTime, so 15 seconds. In mine it's only exceptionwaitTime, so 10 seconds :)

    As for the many options: yes, Rx surely is incredibly flexible; there are just so many ways to set things up, and the methods generalize so well that many combinations can give you the same behavior. Some years ago (whoa, last edit 2017), I tried to recreate a bunch of RxJS methods using a small subset of them, and I got quite a few :-D

    TD
    @davi1td

    @Dorus Love that Gist :)
    So this is what I have now; both run exactly the same, 10 sec on retry:

            return Observable.Timer(TimeSpan.FromSeconds(0), waitTime)
                .SelectMany(_ => Observable.FromAsync(x => BoschEventsTableQueryAsync()))
                .RetryWhen(onException => onException.Delay(exceptionwaitTime))
                .Repeat();
    
            return Observable.FromAsync(_ => BoschEventsTableQueryAsync())
                .RetryWhen(onException => onException.Delay(exceptionwaitTime))
                .Concat(Observable.Empty<List<BoschEvent>>().Delay(waitTime))
                .Repeat();

    Is one preferred over the other? Something I don't see?
    Thanks again ;)

    TD
    @davi1td

    So in the code below, while swallowing all errors is the desired flow, I'm now trying to bubble them up separately for other observers to see, via the Subject EventErrors exposed as the MyErrorListener observable. Is this OK, or is there a better way to do this?

            private static readonly Subject<Exception> EventErrors = new();
            public static IObservable<Exception> MyErrorListener = EventErrors.AsObservable();
    
    Observable.FromAsync(EventsTableQuery)
        .RetryWhen(onException => onException.Delay(exceptionwaitTime).Do(ex => EventErrors.OnNext(new IOException())))
        .Concat(Observable.Empty<List<Event>>().Delay(waitTime))
        .Repeat();

    Thanks -TD

    David Young
    @thedewi
    :wave: Hi all! I've looked for many hours and I can't figure this out: how can I project blocking I/O into an Observable, but allowing the next read to start immediately no matter how slow consumption is?
    It seems like the observer always has to return from OnNext() before the observable is allowed to start blocking for the next item...
    Bart de Boer
    @boekabart
    That is true indeed. Essentially, I think what you're saying is that you don't want any backpressure at all; that means you'll need a form of queue between observable and consumer. For example, convert your observable to an (async) enumerable.
    Two questions to consider:
    • Think in advance how 'bounded' your queue should be: if data comes in consistently faster than you consume it, you'll eventually run out of memory, so you need something like a 'drop' strategy...
    • Is consuming the I/O as quickly as you can absolutely necessary? I've learned that 'read as fast as I can, worry about consuming later' is not always better than 'consume as fast as I can'. The latter can be achieved by e.g. letting the consumer enumerate the I/O (sync or async) instead of observing it: "let the consumer pull" instead of "push to the consumer".
    @thedewi
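    A sketch of that bounded queue between observable and consumer, using System.Threading.Channels (available since .NET Core 3.0); the capacity of 1000 and the DropOldest policy are assumptions to tune, and Observable.Interval stands in for the real I/O source:

    ```csharp
    using System;
    using System.Reactive.Linq;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    class Decoupled
    {
        static async Task Main()
        {
            var channel = Channel.CreateBounded<long>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.DropOldest // the 'drop' strategy: shed oldest data when full
            });

            var source = Observable.Interval(TimeSpan.FromMilliseconds(10));

            // Producer side: TryWrite never blocks, so OnNext returns immediately.
            using var sub = source.Take(50).Subscribe(
                x => channel.Writer.TryWrite(x),
                () => channel.Writer.Complete());

            // Consumer side: pulls at its own pace, fully decoupled from the producer.
            await foreach (var item in channel.Reader.ReadAllAsync())
                Console.WriteLine(item);
        }
    }
    ```

    This keeps the producer's OnNext non-blocking while giving you explicit control over both queue depth and the overflow policy, which ObserveOn alone does not.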
    David Young
    @thedewi
    I don't think on average I'll fall behind; it just feels wrong that a consumer can slow down my producer, which is reading a hardware buffer and so must not be late.
    A buffer size of 1 would be infinitely better than 0.
    But I can't figure out how I am supposed to implement that with Rx.NET.
    David Young
    @thedewi
    Converting to an Enumerable does unblock my producer, but then I need something pulling on the consumer side, and I picked Rx specifically to avoid that.
    David Young
    @thedewi
    I feel like I'm missing something fundamental. Why is there support for various schedulers if everything is working so hard to be synchronous?
    David Young
    @thedewi
    I can convert to enumerable and back to observable, which fixes the blocking but then I can't find any way to unsubscribe.
    David Young
    @thedewi
    Am I trying to do something that is fundamentally against the Rx.NET contract?
    David Young
    @thedewi
    aha! .ObserveOn(ThreadPoolScheduler.Instance) actually lifts backpressure, but only if that scheduler is applied after the producer I want to remove backpressure from.
    Bart de Boer
    @boekabart
    Marshalling to a different thread (for example with ObserveOn) is indeed a way to do it; but be aware that you have zero queue control (the thread pool work-item queue is your queue now).
    What's the I/O you're reading? A socket?
    Bart de Boer
    @boekabart

    but only if that scheduler is applied after the producer

    Not sure what you mean by that, in fact

    David Young
    @thedewi
    @boekabart Reading sensors via a blocking API with a small buffer, so I want my reading thread to be blocked on Read(), not on an unpredictable OnNext(), or I might lose samples from any intermittent delay on the consumer end.
    (Which I do anticipate since my consumer is sometimes a single-threaded event loop)
    David Young
    @thedewi
    When you say "0 queue control", you make it sound like there was a queue I could control? Where would that be? In my experience there was no queue, since OnNext() did not return until all subscribers finished processing.

    but only if that scheduler is applied after the producer

    Not sure what you mean by that, in fact

    What I meant here was that even when I specifically ask the ThreadPoolScheduler to schedule events, OnNext() continues to block. E.g. this blocks:

                return Observable.Create<int>(o =>
                {
                    return ThreadPoolScheduler.Instance.Schedule(1, (ii, recurse) =>
                    {
                        Thread.Sleep(TimeSpan.FromSeconds(1));
                        o.OnNext(ii);
                        recurse(ii + 1);
                    });
                });
    David Young
    @thedewi
    Even in that case, I had to use .ObserveOn(ThreadPoolScheduler.Instance) to ensure a non-blocking OnNext()
    David Young
    @thedewi
    Here's a full example of OnNext() blocking causing the loss of clock ticks, which I'd like to avoid. Uncommenting the ObserveOn() call is the only way I've found to solve it, but I'd love to know if there is a more nuanced approach I am missing:
    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;
    
    namespace rx
    {
        internal static class Program
        {
            private static void Main()
            {
                using var subscription = TickProducer()
                    //.ObserveOn(ThreadPoolScheduler.Instance)
                    .Select(Transform)
                    .Subscribe(Consume);
                Thread.Sleep(10000);
            }
    
            private static IObservable<int> TickProducer()
            {
                return Observable.Create<int>(o =>
                {
                    var stopping = false;
                    var thread = new Thread(() =>
                    {
                        var started = DateTime.UtcNow;
                        var lastTick = 0;
                        while (!stopping)
                        {
                            var seconds = (int) DateTime.UtcNow.Subtract(started).TotalSeconds;
                            if (seconds != lastTick)
                            {
                                o.OnNext(lastTick = seconds);
                            }
                        }
                    });
                    thread.Start();
                    return () =>
                    {
                        stopping = true;
                        thread.Join();
                    };
                });
            }
    
            private static int Transform(int counter)
            {
                Thread.Sleep(TimeSpan.FromSeconds(0.5));
                return -counter;
            }
    
            private static void Consume(int counter)
            {
                Console.WriteLine(counter);
                if (new Random().Next(3) == 0)
                    Thread.Sleep(TimeSpan.FromSeconds(2)); // Occasional delay
            }
        }
    }
    Expected output is -1, -2, -3, ... with every counting number, no skipped numbers.
    David Young
    @thedewi
    Note that on average this consumer can definitely keep up - it can output 1.3 numbers per second. A queue length of 1 (or 2 for reliability) would result in no data loss. My problem is the queue length is 0.