dependabot[bot] on nuget
Bump NUnit from 3.7.1 to 3.13.3… (compare)
dependabot[bot] on nuget
Bump Fsharp.Core from 6.0.1 to … (compare)
dependabot[bot] on nuget
Bump FluentAssertions from 5.10… (compare)
dependabot[bot] on nuget
Bump System.Collections.Immutab… (compare)
dependabot[bot] on nuget
Bump System.Threading.Channels … (compare)
dependabot[bot] on nuget
Bump System.Configuration.Confi… (compare)
dependabot[bot] on nuget
Bump Microsoft.Extensions.Depen… (compare)
dependabot[bot] on nuget
Bump Microsoft.Extensions.Depen… (compare)
// On a SubscribeToManager request: inject the ClusterClientReceptionist default config as a
// top-level fallback, create and watch a cluster client, then send a fresh SubscribeToManager
// to the cluster manager path through that client.
Receive<ClusterManager.SubscribeToManager>(subscribeRequest =>
{
    var system = Program.MyActorSystem;
    system.Settings.InjectTopLevelFallback(ClusterClientReceptionist.DefaultConfig());

    var clientProps = ClusterClient.Props(ClusterClientSettings.Create(system));
    clusterClient = system.ActorOf(clientProps);
    Context.Watch(clusterClient);

    var forwardedSubscription = new ClusterClient.Send(
        ActorPaths.ClusterManagerActor.Path,
        new ClusterManager.SubscribeToManager());
    clusterClient.Tell(forwardedSubscription);
});
// Log the AddressTerminated value carried by each Terminated notification.
Receive<Terminated>(terminated =>
{
    var addressTerminated = terminated.AddressTerminated.ToString();
    _logger.Info("Address Terminated: {0}", addressTerminated);
});
@Horusiath do you have any idea why Sink.FromSubscriber
gives this error?
[INFO][16-Apr-16 14:10:51][Thread 0008][akka://ReactiveRabbit/user/StreamSupervisor-0/Flow-0-0-unknown-operation] Message RequestMore from NoSender to akka://ReactiveRabbit/user/StreamSupervisor-0/Flow-0-0-unknown-operation was not delivered. 1 dead letters encountered.
Here is how it's called:
// Encode three sample strings as UTF-8 messages and run them into a sink built from the
// subscriber returned by connection.Publish for the inbound exchange.
var trailMessages = new List<string>
{
    "message1",
    "message2",
    "message3"
};
var encodedMessages = Source.From(trailMessages)
    .Map(msg => new Message(Encoding.UTF8.GetBytes(msg)));
var exchangeSink = Sink.FromSubscriber(connection.Publish(inboundExchange.Name, ""));
encodedMessages.RunWith(exchangeSink, mat);
/// <summary>
/// Subscriber that publishes every received Routed element to a single exchange on the
/// supplied channel.
/// </summary>
/// <remarks>
/// Demand is signalled one element at a time: one element is requested on subscription and one
/// more after each successful publish. Elements are buffered in a queue and published by a
/// background task so that <see cref="OnNext(Routed)"/> itself never calls into the channel.
/// </remarks>
internal class ExchangeSubscriber : ISubscriber<Routed>
{
    private readonly IModel _channel;
    private readonly string _exchange;
    private readonly AtomicReference<ISubscription> _active = new AtomicReference<ISubscription>();
    private readonly ConcurrentQueue<Routed> _buffer = new ConcurrentQueue<Routed>();

    // Number of elements that were enqueued but whose publish attempt has not finished yet.
    // A publishing task is running exactly while this is greater than zero.
    private int _pendingPublishes = 0;

    private int _closeRequested = 0; // 0 = close not requested, 1 = close requested

    /// <summary>Creates a subscriber that publishes to <paramref name="exchange"/> via <paramref name="channel"/>.</summary>
    /// <param name="channel">Channel used for publishing; closed by this subscriber on completion or failure.</param>
    /// <param name="exchange">Name of the target exchange; must be shorter than 255 characters.</param>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> or <paramref name="exchange"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="exchange"/> has 255 or more characters.</exception>
    public ExchangeSubscriber(IModel channel, string exchange)
    {
        if (channel == null)
        {
            throw new ArgumentNullException(nameof(channel));
        }

        if (exchange == null)
        {
            throw new ArgumentNullException(nameof(exchange));
        }

        if (exchange.Length >= 255)
        {
            throw new ArgumentOutOfRangeException(nameof(exchange), exchange.Length, "exchange.Length>=255");
        }

        _channel = channel;
        _exchange = exchange;
    }

    /// <summary>Buffers <paramref name="element"/> and makes sure a publishing task is running.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="element"/> is null.</exception>
    public void OnNext(Routed element)
    {
        if (element == null)
        {
            throw new ArgumentNullException(nameof(element));
        }

        // Enqueue BEFORE incrementing: every counted unit is then guaranteed to have an element
        // in the queue, and a running publisher cannot miss it (it only exits once the counter
        // drops to zero). Only the 0 -> 1 transition starts a new task.
        _buffer.Enqueue(element);
        if (Interlocked.Increment(ref _pendingPublishes) == 1)
        {
            Task.Factory.StartNew(PublishFromBuffer, TaskCreationOptions.AttachedToParent);//revisit this
        }
    }

    /// <summary>
    /// Publishes buffered elements one by one until no pending element is left.
    /// Runs on at most one task at a time (guarded by <see cref="_pendingPublishes"/>).
    /// </summary>
    private void PublishFromBuffer()
    {
        do
        {
            Routed element;
            if (_buffer.TryDequeue(out element))
            {
                Publish(element);
            }
        }
        while (Interlocked.Decrement(ref _pendingPublishes) > 0);
    }

    /// <summary>
    /// Publishes a single element and requests the next one. On any failure the subscription is
    /// cancelled and the channel closed. This method never throws.
    /// </summary>
    private void Publish(Routed routed)
    {
        try
        {
            _channel.BasicPublish(_exchange, routed.RoutingKey,
                Conversions.ToBasicProperties(routed.Message),
                routed.Message.Body);
            _active.Value.Request(1);
        }
        catch (Exception)
        {
            // Deliberate best-effort: this class has no error channel, so a failed publish
            // terminates the stream by cancelling upstream and closing the channel.
            try
            {
                try
                {
                    // Value is null when an element arrives before OnSubscribe.
                    _active.Value?.Cancel();
                }
                finally
                {
                    CloseChannel();
                }
            }
            catch (Exception)
            {
                // Cleanup itself failed. Letting this escape would kill the publishing task
                // without decrementing the pending counter, which would stall all later
                // elements and make ShutdownWhenFinished wait forever.
            }
        }
    }

    /// <summary>
    /// Accepts the first subscription and requests one element; any further subscription is cancelled.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public void OnSubscribe(ISubscription subscription)
    {
        if (subscription == null)
        {
            throw new ArgumentNullException(nameof(subscription));
        }

        if (_active.CompareAndSet(null, subscription))
        {
            subscription.Request(1);
        }
        else
        {
            subscription.Cancel();
        }
    }

    /// <summary>Upstream failed: waits for pending publishes, then closes the channel.</summary>
    public void OnError(Exception cause)
    {
        ShutdownWhenFinished();
    }

    /// <summary>Upstream completed: waits for pending publishes, then closes the channel.</summary>
    public void OnComplete()
    {
        ShutdownWhenFinished();
    }

    /// <summary>Untyped entry point; casts to Routed and delegates to <see cref="OnNext(Routed)"/>.</summary>
    public void OnNext(object element)
    {
        OnNext((Routed)element);
    }

    /// <summary>
    /// Closes the channel if it is still open and this is the first close request.
    /// </summary>
    private void CloseChannel()
    {
        var firstCloseRequest = Interlocked.Exchange(ref _closeRequested, 1) == 0;

        // The lock is taken even for repeated requests so that a caller returns only after a
        // close that is already in progress on another thread has finished.
        lock (_channel)
        {
            if (firstCloseRequest && _channel.IsOpen)
            {
                _channel.Close();
            }
        }
    }

    /// <summary>
    /// Blocks the calling thread until every buffered element has been processed, then closes
    /// the channel.
    /// </summary>
    private void ShutdownWhenFinished()
    {
        // SpinUntil backs off (yield/sleep) instead of hot-spinning, and waiting on the calling
        // thread avoids occupying an extra pool thread just to block on it.
        SpinWait.SpinUntil(() => Volatile.Read(ref _pendingPublishes) == 0);
        CloseChannel();
    }
}
// Stack a behavior that first offers the message to OldBecome; if that does not handle it,
// a Response message triggers UnbecomeStacked. Returns whether the message was handled.
BecomeStacked(msg =>
{
    if (OldBecome(msg))
    {
        return true;
    }

    var responseMatch = msg.Match().With<Response>(response => UnbecomeStacked());
    return responseMatch.WasHandled;
});
Suspend
is supported now: there is an overload of Receive that accepts a Func<T, Task>; it has been renamed to ReceiveAsync in 1.0.7.