    Joshua Lutz
    @MystikalPooka
    seems my Delay and timers aren't getting through in my unity unit tests... odd
    Phiroc
    @Phiroc
    Hello,
    is this the right place to ask questions about Rx.NET?
    Dorus
    @Dorus
    Just ask and see who answers :)
    Phiroc
    @Phiroc
    OK. Has anyone used Rx.Net to monitor Windows directories?
    Joshua Lutz
    @MystikalPooka
    Not personally, but I'm sure the same logic applies as to any stream. Have any specific questions?
    Peter Bons
    @Peter_Bons_twitter
    Hi all. I need some pointers on how I can achieve the following: say I get a stream of data and I want to process it using a web API. This API is limited to 10 calls per second. As soon as I reach this limit I need to wait until the second has elapsed and then resume processing. What operator could I use for this? I don't think Buffer will do, because if I get fewer than 10 items to send to the API per second I do not want it to wait until the second has elapsed.
    Dorus
    @Dorus
    @Peter_Bons_twitter Source.Select(e => Observable.Merge(Observable.Empty().Delay(TimeSpan.FromSeconds(1)), MyCall())).Merge(10)
    This will take max 10 concurrent calls, and if a call returns within one second, it will extend the call to one second.
    Jonathan Sheely
    @jsheely
    I'm trying to wrap my head around threading and ObserveOn. I want all my timer work to happen in the background but I want the Console.WriteLine to happen on the main thread. I just can't figure out how to get the main thread in a console application
    I got close with Observable.Timer().ObserveOn(new EventLoopScheduler()) which does the work on a background thread and writes it out to the same event loop thread. But I want Thread 1.
    Jonathan Sheely
    @jsheely
    Getting closer. Now I found ObserveOn(DispatcherScheduler.Current), which won't echo anything out on subscribe. Which I think means the thread is deadlocked.
    Jonathan Sheely
    @jsheely
    Hmm still can't seem to get this to work right. Perhaps it's not possible?
    Jonathan Sheely
    @jsheely
    Okay. So I solved it by using https://github.com/StephenCleary/AsyncEx and doing AsyncContext.Run(() => {...}) and then using ObserveOn(SynchronizationContext.Current) in my console app
    using Nito.AsyncEx;
    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace test_threading
    {
        public class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine($"Main Thread: {Thread.CurrentThread.ManagedThreadId}");
                bool isEnabled = true;
    
                AsyncContext.Run(() =>
                {
                    Console.WriteLine($"Async Thread: {Thread.CurrentThread.ManagedThreadId}");
                    var obs = Observable.Interval(TimeSpan.FromMilliseconds(300))
                    .TakeUntil(o => !isEnabled)
                    .Do(i =>
                    {
                        Console.WriteLine($"Background thread {Thread.CurrentThread.ManagedThreadId}");
                    })
                    .ObserveOn(SynchronizationContext.Current)
                    .Subscribe(o => Console.WriteLine($"[1] Running...{o} thread: {Thread.CurrentThread.ManagedThreadId}"));
    
                    _ = Task.Delay(TimeSpan.FromSeconds(5)).ContinueWith(t =>
                    {
                        Console.WriteLine($"Setting value to false, thread: {Thread.CurrentThread.ManagedThreadId}");
                        isEnabled = false;
                    });
                });
    
                Console.ReadLine();
            }
        }
    }
    Jonathan Sheely
    @jsheely
    Final result for anyone interested.
    Peter Bons
    @Peter_Bons_twitter
    @Dorus Great, works like a charm. Now I need to figure out how this actually works.
    Dorus
    @Dorus
    @Peter_Bons_twitter hehe, i can break it up for you
    Source
      .Select(e => Observable.Merge( // merge the following two streams:
        Observable.Empty().Delay(TimeSpan.FromSeconds(1)), // this observable will not complete until one second has passed.
        MyCall()) // this will make the call.
      ).Merge(10) // this will run at most 10 inner observables in parallel
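A compilable sketch of the breakdown above, for anyone who wants to try it. The names `RateLimitSketch`, `Throttled`, and the fake `MyCall(int)` are assumptions made for illustration, not part of the original answer; the real call would replace the `Task.Delay` stand-in. It relies on Rx.NET's `Delay` also delaying completion, so the empty observable keeps each slot occupied for the full interval.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class RateLimitSketch
{
    // Hypothetical stand-in for the real web API call.
    private static IObservable<string> MyCall(int item) =>
        Observable.FromAsync(async () =>
        {
            await Task.Delay(50); // pretend network latency
            return $"processed {item}";
        });

    // Runs at most maxConcurrent calls at once. Every call occupies its
    // slot for at least minSlot, even if the call itself returns sooner.
    public static IObservable<string> Throttled(
        IObservable<int> source, int maxConcurrent, TimeSpan minSlot) =>
        source
            .Select(item => Observable.Merge(
                // Emits nothing, but only completes after minSlot has passed.
                Observable.Empty<string>().Delay(minSlot),
                // The actual call; its result flows through unchanged.
                MyCall(item)))
            // Subscribe to at most maxConcurrent inner observables at a time.
            .Merge(maxConcurrent);
}
```

Something like `RateLimitSketch.Throttled(Observable.Range(1, 25), 10, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine)` would then start the 25 calls in groups of at most ten per second.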
    Tom Dowdell
    @tomdowdell
    Is there a way to subscribe to an observable without having rx hold a strong reference to the callback, etc?
    For example, within an object Foo a call is made to public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext); passing in a local (lambda) method for onNext. This seems to prevent the object from being GCed.
    The solution may be flexible; only requirement is to not have rx hold a strong reference.
    Johan Larsson
    @JohanLarsson
    There is weak event handler et al
    seanalford
    @seanalford
    Hello, I'm new to Rx and need help with choosing an operator. I have an Observable.Timer that InvokesCommand(SendKeyCommand(RefreshKey)) every 1 second. The SendKey command is also bound to buttons on the UI. Which operator should I use to filter out the 1 second timer Refresh if the user is pressing keys on the UI?
    Johan Larsson
    @JohanLarsson
    There is Where for filtering but I did not get the spec.
    Adam Buchanan
    @RejectKid
    I have a stream of messages (let's say the object has an ID on it). Every x milliseconds I want to be able to get the latest message for all IDs that have come across. How do I do this?
    captainmannering
    @captainmannering
    @RejectKid - did you get a solution for your question? I have the same need as you.
    Dorus
    @Dorus
    @captainmannering @RejectKid Something like
    var timer = Observable.Interval(time).Publish().RefCount(); // repeating ticker, shared by all groups
    
    Source
      .GroupBy(...)
      .SelectMany(group => group.Sample(timer))
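A self-contained sketch of the group-then-sample idea above. The `Message` class and the names `LatestPerIdSketch` and `LatestPerId` are hypothetical, and it assumes a repeating ticker built with `Observable.Interval` and shared via `Publish().RefCount()`. Note that `Sample` only emits for an ID when that ID received a new message since the previous tick.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical message shape; only the Id matters for grouping.
public sealed class Message
{
    public Message(int id, string payload) { Id = id; Payload = payload; }
    public int Id { get; }
    public string Payload { get; }
}

public static class LatestPerIdSketch
{
    public static IObservable<Message> LatestPerId(
        IObservable<Message> source, TimeSpan period)
    {
        // One shared ticker, so every group is sampled at the same instants.
        var ticks = Observable.Interval(period).Publish().RefCount();

        return source
            .GroupBy(m => m.Id)                         // one inner stream per ID
            .SelectMany(group => group.Sample(ticks));  // latest message per ID at each tick
    }
}
```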
    Johan Larsson
    @JohanLarsson
    @Dorus you up for reviewing a PR?
    Just a couple of lines so don't worry :)
    Dorus
    @Dorus
    @JohanLarsson I can take a look and give my opinion, but I don't see any link :)
    Dorus
    @Dorus
    @JohanLarsson Reminds me of Buffer(2,1)
    Johan Larsson
    @JohanLarsson
    yes, it is the same
    Dorus
    @Dorus
    looks good at first sight
    Johan Larsson
    @JohanLarsson
    But typed
    I'm not super happy with the names Pair() and Paired<T>
    Dorus
    @Dorus
    Buffer(2,1).Select(a => new Paired(a[0], a[1]))
    Johan Larsson
    @JohanLarsson
    Thought about WithPrevious()
    Dorus
    @Dorus
    Why not use a tuple?
    {Previous: ..., Current: ...}
    Johan Larsson
    @JohanLarsson
    Why not use a struct?
    I don't use value tuples much so don't really know how the named fields work when it is in a library
    Dorus
    @Dorus
    Can those be generic? Tuples can have named fields.
    It depends on what version of .NET and C# you have. Tuples are kinda new.
    Dorus
    @Dorus
    But just your own class seems fine too.
    Johan Larsson
    @JohanLarsson
    Is that about struct?
    Dorus
    @Dorus
    Yeah i was wondering if a struct could be generic.
    But i see no harm in using a class, you could argue about the name indeed.
    Paired with current and previous fields is kinda weird.
    Johan Larsson
    @JohanLarsson
    agreed
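For reference, a sketch of both options discussed above: a generic struct (C# structs can be generic) and a named value tuple. The names `Paired<T>`, `WithPrevious`, and `WithPreviousTuple` are only placeholders, since the chat did not settle on one. One detail worth knowing is that `Buffer(2, 1)` emits a trailing one-element buffer when the source completes, so this sketch filters it out before indexing.

```csharp
using System;
using System.Reactive.Linq;

// Placeholder name; a generic readonly struct holding two consecutive values.
public readonly struct Paired<T>
{
    public Paired(T previous, T current)
    {
        Previous = previous;
        Current = current;
    }

    public T Previous { get; }
    public T Current { get; }
}

public static class PairingSketch
{
    // Struct-based variant.
    public static IObservable<Paired<T>> WithPrevious<T>(this IObservable<T> source) =>
        source
            .Buffer(2, 1)
            // Skip the final partial buffer emitted on completion.
            .Where(window => window.Count == 2)
            .Select(window => new Paired<T>(window[0], window[1]));

    // Value-tuple variant; the element names are part of the method signature.
    public static IObservable<(T Previous, T Current)> WithPreviousTuple<T>(
        this IObservable<T> source) =>
        source
            .Buffer(2, 1)
            .Where(window => window.Count == 2)
            .Select(window => (Previous: window[0], Current: window[1]));
}
```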