but only if that scheduler is applied after the producer
Not sure what you mean by that, in fact
What I meant here was that even when I specifically ask the ThreadPoolScheduler to schedule events, OnNext()
continues to block. Eg this blocks -
return Observable.Create<int>(o =>
{
return ThreadPoolScheduler.Instance.Schedule(1, (ii, recurse) =>
{
Thread.Sleep(TimeSpan.FromSeconds(1));
o.OnNext(ii);
recurse(ii + 1);
});
});
OnNext()
blocking causing the loss of clock ticks, which I'd like to avoid. Uncommenting the ObserveOn()
call is the only way I've found to solve it, but I'd love to know if there is a more nuanced approach I am missing:using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
namespace rx
{
internal static class Program
{
private static void Main()
{
using var subscription = TickProducer()
//.ObserveOn(ThreadPoolScheduler.Instance)
.Select(Transform)
.Subscribe(Consume);
Thread.Sleep(10000);
}
private static IObservable<int> TickProducer()
{
return Observable.Create<int>(o =>
{
var stopping = false;
var thread = new Thread(() =>
{
var started = DateTime.UtcNow;
var lastTick = 0;
while (!stopping)
{
var seconds = (int) DateTime.UtcNow.Subtract(started).TotalSeconds;
if (seconds != lastTick)
{
o.OnNext(lastTick = seconds);
}
}
});
thread.Start();
return () =>
{
stopping = true;
thread.Join();
};
});
}
private static int Transform(int counter)
{
Thread.Sleep(TimeSpan.FromSeconds(0.5));
return -counter;
}
private static void Consume(int counter)
{
Console.WriteLine(counter);
if (new Random().Next(3) == 0)
Thread.Sleep(TimeSpan.FromSeconds(2)); // Occasional delay
}
}
}
DistinctUntilChanged()
is not working as expected, I am obviously missing something here
DistinctUntilChanged()
with Distinct()
does not help, which surprises me even more
var message = new MessageRecivedArgs() { Data = buffer };
// fire message event args
MessageRecivedSubject.OnNext(message);
MessageRecivedArgs
is disposible object that consumes large memory
Timeout
documentation states timeout will issue an error notification if a particular period of time elapses without any emitted items. This doesn't seem to be the case though, it seems like no matter what the observable must be completed in order for this to be registered. I'm just wondering why that bit isn't documented 🤔