Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Johan Larsson
    @JohanLarsson
    Pair<T> is not ideal as a public type with fields Current and Previous
    Dorus
    @Dorus
    Idk if i like verb type names.
    Yeah but Pairwised doesn't improve that.
    Dorus
    @Dorus
    interesting
    Johan Larsson
    @JohanLarsson
    ^ is the reason I'm going on about Paired<T> stay consistent with the framework
    Dorus
    @Dorus
    But then you can write almost all your types in paste tense.
    But it's not wrong since we work with async data. Mmmm
    I can agree with consistency.
    aka Observable<Timestamp<T>> timestamp(this Observable<T> source, IScheduler scheduler) in C#
    Adam Buchanan
    @RejectKid
    @Dorus Perfect thank you!
    RedlineTriad
    @RedlineTriad
    Is there an easier way to return the last element of an Observable on subscribe other than wrapping than wrapping it in a BehaviourSubject?
    José Manuel Nieto
    @SuperJMN
    I don't think so :(
    Dorus
    @Dorus
    @SuperJMN I think you can ;-)
    José Manuel Nieto
    @SuperJMN
    Oh, yes, now that you say! Sorry, I'm such a pessimistic person sometimes ! haha
    RedlineTriad
    @RedlineTriad
    @Dorus wouldn't that return all the previous ones? i only want the most recent.
    Dorus
    @Dorus
    @RedlineTriad You can pass in the buffer size. Set it to 1.

    See

    Replay<TSource>(IObservable<TSource>, Int32)
    Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying bufferSize notifications.

    RedlineTriad
    @RedlineTriad
    Ah, yep, that sound like what i want.
    Dorus
    @Dorus

    Another interesting you could sometimes use is

    Replay<TSource, TResult>(IObservable<TSource>, Func<IObservable<TSource>, IObservable<TResult>>, Int32)
    Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence replaying bufferSize notifications.

    You call it with source.Replay((src) => { ... }, 1)
    (src) => { ... } here is the selector function and inside there nyouy can use src as many times as you want and it will replay the latest value each time you subscribe to it, but after Replay you have a normal Observable instead of ConnectableObservable that you need to connect to etc.

    For example source.Replay(src => src.concat(src)) will emit source like normal, but replay the last value on complete.

    @RedlineTriad
    RedlineTriad
    @RedlineTriad
    Sounds nice, but I don't understand half of it, I am currently at the level where I still have memory leaks.
    Bartolomeus-649
    @Bartolomeus-649

    What is the status of reactive programming in c#/.net?

    1. Is there a "native" reactive technology that goes beyond System.Observable, and which is fully supported, maintained and actively being developed by Microsoft?
    2. One could get the impression that https://github.com/dotnet/reactive is the current and active repo on Github, but on that page it say "Join the conversation" and links to: http://reactiveui.net/slack.
      Ones there the latest message is from 2015 and the the channel is called "#dont-talk-in-here".
    3. When you check out Microsofts documentation for Reactive Extensions) it is from 2011 and has huge warning that the content is no longer updated.
    4. On ReactiveUI.net there's a blog post from 2018 indicating that System.Reactive is abandoned by Microsoft, and superseded by something called "Reactor".

    All these things combined makes my customers confused about the status of reactive programming in .NET and unless I find and point them to clear, concise and unambiguous resources which ensures that Microsoft is behind the technology, actively developing and supporting it and that is has a road-map where it show that it is a core part of the .NET Framework for many versions to come.

    They were burned by Microsoft abandoning both Silverlight and LightSwitch.

    Dorus
    @Dorus
    @Bartolomeus-649 The slack space has more channels, including one #rxnet that does seem to be more active.
    Cannot help you with the remaining concerns, i would be interested in the answers as well.
    Rich Bryant
    @richbryant
    @Bartolomeus-649 the core System namespace defines IObservable<T> and IObserver<T>. Rx.net is, as the name implies, a set of extensions and implementations thereof. It is not (anymore) a Microsoft-run library although many MS staff including Oren still work on it and it's widely used within MS. Think of it like Json.net which became ubiquitous in MS stuff even though it was always OSS.
    If that makes your clients twitchy, well, I guess no decent streaming for you.
    kepam-gc
    @kepam-gc

    Hi. I have problem with error handling within R. I'm using SourceCache in .net. And I'm using
    .Subscribe(()=> throw new Exception, e=> Log.Error(e))

    I would suspect lambda passed to onError will trigger, its not triggering.
    At this moment I'm getting Unhandled exception and application crash.
    I need a way to not crash the system when exception in subscribe happening.
    Does anyone have any clue why it's behaving like this or how can I catch this exception?

    Dorus
    @Dorus
    @kepam-gc you can move your subscribe logic to Select instead. Calls in subscribe itself should not thow.
    .Select(()=> throw new Exception())
    .Subscribe(() => {}, e=> Log.Error(e));
    Shimmy
    @weitzhandler
    Rich Bryant
    @richbryant
    The Rx team can't do that. Only the .NET team can do that.
    whothefcares
    @whothefcares1_twitter
    Hi Guys, I am new to rx.net. wondering if I can push an observable into redis? so that I can subscribe to the observable on different server instance. Please any help is appreciated
    Rich Bryant
    @richbryant
    oof, tough question. Pop over to https://reactivex.slack.com/ and ask in the Rx channel there. It's much busier than this gitter.
    Dorus
    @Dorus
    @whothefcares1_twitter Well Rx is just a query language for async steams. You would either need to publish the events from the stream, or publish the entire stream and publish an operator that does what you want. You cannot send a live stream over the line just like you cannot send a file pointer over the line, lol.
    Jonathan Sheely
    @jsheely
    I need a hand with a round robin Observable. I'm creating a Buffer of 500 items and I want to join that buffer with one of 100 connections that round robin from a list.
    I think I want to zip with the buffer. Just trying to figure out how to get the other list to be HOT and repeat
    Jonathan Sheely
    @jsheely
    Observable.Range(0, 10).Select(o => {
            return o;    
        }).Repeat();
    This is close to what I want but I don't want it to actually run the .Select() each time. I want it to remember / store those values and just repeat them out when someone asks for the next one.
    Dorus
    @Dorus
    @jsheely Woulnd't it be better to use an AsyncQueue here instead of an observable to create a queue/buffer?
    TD
    @davi1td
    Hi all, I'm new to RX, i'm sure there is a better way than what i'm doing below, isn't there a way to repeat forever over time, but only after completion?
            public void StartGetEventsTimer()
            {
                Observable
                    .Timer(TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(5))
    
                    .ObserveOn(MainFrmSyncContext)
                    .Subscribe(async x =>
                    {
                        if (!GetDataIsrunning) await GetData();
                    });
    
            }
            public bool GetDataIsrunning;
            private async Task GetData()
            {
                if (GetDataIsrunning) return;
                Debug.WriteLine($"Started @ {DateTime.Now}");
                try
                {
                    GetDataIsrunning = true;
                    await UpdateGridsAsync();
                }
                finally
                {
                    GetDataIsrunning = false;
                }
    
    
            }
    Dorus
    @Dorus
    @davi1td Just a reminder Subscribe does not support async calls. This will run under fire-and-forget mechanism.
    https://stackoverflow.com/a/35190003/402027
    What you probably want to do here is make a SelectMany call for the getdata, and use Repeat() after that.
    TD
    @davi1td
    @Dorus Thanks that makes sense but have no idea how to do SelectMany as an alternative, Currently using a Defer , retry, repeat but it fires, i'll update with that and see what you think
    TD
    @davi1td
    @Dorus So this is what I came up with, does what I needed anyway, thanks
    public class EventServer
    {
        public static IObservable<List<Event>> EventsList()
        {
            var waitTime = TimeSpan.FromSeconds(5);
            var exceptionwaitTime = TimeSpan.FromSeconds(10);
            return Observable.FromAsync(EventsTableQuery)
                .Delay(waitTime)
                .RetryWhen(onException => onException.Delay(exceptionwaitTime))
                .Repeat();
        }
        private static async Task<List<Event>> EventsTableQuery()
        {
            var result = 
                await Task.Run(() =>
                {
                    var returnList = new List<Event>();
                    for (int i = 1; i < 5; i++)
                    {
                        returnList.Add(new Event()
                            {ID = i, Date = DateTime.Now, Message = $"Test{i}"});
                    }
                    return returnList;
                });
            return result;
        }
    }
    public class Event 
    {
        public int ID { get; set; }
        public DateTime Date { get; set; }
        public string Message { get; set; }
    }
    Dorus
    @Dorus

    @davi1td yes that works, but this delays the data after it has been generated. You could reverse that with:

    Observable.Timer(waitTime)
        .SelectMany(_ => Observable.FromAsync(EventsTableQuery))
        .RetryWhen(onException => onException.Delay(exceptionwaitTime))
        .Repeat();

    Or if you want to stay closer to what you had:

    Observable.FromAsync(EventsTableQuery)
        .RetryWhen(onException => onException.Delay(exceptionwaitTime))
        .Concat(Observable.Empty().Delay(waitTime))
        .Repeat();

    (I inverted the retryWhen and the Delay in both cases, so you might need to extend exceptionwaitTime with waitTime to get your behavior)

    TD
    @davi1td
    @Dorus Awesome Thanks for all of the input, much appreciated. So many options... Observable.Timer vs Observable.FromAsync is there a preferred one to use here ?
    TD
    @davi1td
    @Dorus Also All of the timings work perfectly fine the way you have them, i want to delay re-subscription longer if exception ...
    Dorus
    @Dorus

    @davi1td What i meant is that in your code, the delay for exceptions is waitTime + exceptionwaitTime, so 15 seconds. In mine it's only exceptionwaitTime, so 10 seconds :)

    As for the many options: Yes Rx surely is incredible flexible, there are just so many ways to set things up and methods generalize so well many can give you the same properties if you combine them correctly. Some years ago (whoa, last edit 2017), i tried to recreate a bunch of RxJs method using a small subset of them, and i got quite a few :-D

    TD
    @davi1td

    @Dorus Love that Gist :)
    So this is what I have now, both run exactly the same, 10 sec on retry

            return Observable.Timer(TimeSpan.FromSeconds(0), waitTime)
                .SelectMany(_ => Observable.FromAsync(x => BoschEventsTableQueryAsync()))
                .RetryWhen(onException => onException.Delay(exceptionwaitTime))
                .Repeat();
    
            return Observable.FromAsync(_ => BoschEventsTableQueryAsync())
                .RetryWhen(onException => onException.Delay(exceptionwaitTime))
                .Concat(Observable.Empty<List<BoschEvent>>().Delay(waitTime))
                .Repeat();

    Is one preferred over the other? Something I don't see?
    Thanks again ;)