Aaronontheweb on dev
Remove string interpolation fro… (compare)
Cannot find serializer with id [7]. The most probable reason is that the configuration entry 'akka.actor.serializers' is not in sync between the two systems.
/// <summary>
/// Actor that consumes the durable <c>InboundWeChatServer</c> AMQP queue on localhost:5672 and
/// forwards every MessagePack-decoded payload to the supplied chat server actor.
/// </summary>
public class AmqpPortalCoordinator : ReceiveActor
{
    const string inboundQueue = "InboundWeChatServer";

    private readonly IActorRef chatserver;
    private readonly ActorMaterializer mat;
    private readonly AmqpConnectionDetails connectionSettings;

    // Completion handle of the running stream, as returned by RunForeach.
    private Task incomingTask;

    /// <summary>
    /// Declares the inbound queue and starts the stream that pumps its messages to
    /// <paramref name="chatserver"/>.
    /// </summary>
    /// <param name="chatserver">Actor that receives each decoded message.</param>
    public AmqpPortalCoordinator(IActorRef chatserver)
    {
        this.chatserver = chatserver;
        mat = ActorMaterializer.Create(Context);

        connectionSettings = AmqpConnectionDetails.Create("localhost", 5672)
            .WithAutomaticRecoveryEnabled(true)
            .WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));

        var queueDeclaration = QueueDeclaration
            .Create(inboundQueue)
            .WithDurable(true)
            .WithAutoDelete(false);

        var queueSource = NamedQueueSourceSettings
            .Create(connectionSettings, inboundQueue)
            .WithDeclarations(queueDeclaration);

        var amqpSource = AmqpSource.CommittableSource(queueSource, 1000);

        var messageSource = amqpSource.Select(m =>
        {
            // Decode BEFORE acknowledging: acking first would remove the message from the
            // queue even when deserialization throws, silently losing it.
            var payload = MessagePackSerializer.Typeless.Deserialize(m.Message.Bytes.ToArray());
            m.Ack();
            return payload;
        });

        incomingTask = messageSource.RunForeach(msg => chatserver.Tell(msg), mat);
    }
}
/// <summary>
/// Actor that consumes the durable <c>InboundWeChatServer</c> AMQP queue on localhost:5672 and
/// forwards every MessagePack-decoded payload to the supplied chat server actor. The stream is
/// materialized with a kill switch so that <see cref="Dispose"/> can stop it.
/// </summary>
public class AmqpPortalCoordinator : ReceiveActor, IDisposable
{
    const string inboundQueue = "InboundWeChatServer";

    private readonly IActorRef chatserver;
    private readonly ActorMaterializer mat;
    private readonly AmqpConnectionDetails connectionSettings;

    // Materialized values of the running stream: the switch that stops it and the
    // task that completes when the sink has finished.
    private UniqueKillSwitch killswitch;
    private Task sourceTask;

    /// <summary>
    /// Declares the inbound queue and starts the stream that pumps its messages to
    /// <paramref name="chatserver"/>.
    /// </summary>
    /// <param name="chatserver">Actor that receives each decoded message.</param>
    public AmqpPortalCoordinator(IActorRef chatserver)
    {
        this.chatserver = chatserver;
        mat = ActorMaterializer.Create(Context);

        connectionSettings = AmqpConnectionDetails.Create("localhost", 5672)
            .WithAutomaticRecoveryEnabled(true)
            .WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));

        var queueDeclaration = QueueDeclaration
            .Create(inboundQueue)
            .WithDurable(true)
            .WithAutoDelete(false);

        var queueSource = NamedQueueSourceSettings
            .Create(connectionSettings, inboundQueue)
            .WithDeclarations(queueDeclaration);

        var amqpSource = AmqpSource.CommittableSource(queueSource, 50);

        // Locals are named differently from the fields so nothing shadows them.
        var (streamSwitch, completion) = amqpSource
            .ViaMaterialized(KillSwitches.Single<CommittableIncomingMessage>(), Keep.Right)
            .ToMaterialized(Sink.ForEach<CommittableIncomingMessage>(m =>
            {
                // Decode BEFORE acknowledging: acking first would remove the message from
                // the queue even when deserialization throws, silently losing it.
                var payload = MessagePackSerializer.Typeless.Deserialize(m.Message.Bytes.ToArray());
                m.Ack();
                chatserver.Tell(payload);
            }), Keep.Both)
            .Run(mat);

        killswitch = streamSwitch;
        sourceTask = completion;
    }

    /// <summary>
    /// Shuts the stream down through the kill switch and waits for the sink to complete.
    /// </summary>
    public void Dispose()
    {
        killswitch?.Shutdown();

        try
        {
            sourceTask?.Wait();
        }
        catch (AggregateException)
        {
            // The stream had already terminated with a failure, so there is nothing left
            // to release here; Dispose must not throw.
        }
    }
}
/// <summary>
/// Request message type; its reply type is nested inside it as <see cref="Response"/>.
/// </summary>
public class RequestMessage
{
    /// <summary>
    /// Reply that corresponds to a <see cref="RequestMessage"/>.
    /// </summary>
    public class Response
    {
    }
}
/// <summary>
/// TestKit-based tests for <c>PutwallActor</c>.
/// </summary>
public class PutwallActorTests : TestKit
{
    /// <summary>
    /// Sends the same <c>RequestUpdateState</c> (identical message id) 100 times and verifies
    /// that exactly one response with that id reaches the probe.
    /// </summary>
    [Fact]
    public void RequestUpdateStateSentMultipleTimes_OnlyOneResponseBack()
    {
        var probe = CreateTestProbe();
        IActorRef putwallActor = Sys.ActorOf(PutwallActor.Props());
        var guidId = Guid.NewGuid();

        for (int i = 0; i < 100; i++)
        {
            putwallActor.Tell(new PutwallActor.CubbyMessage.RequestUpdateState(true, guidId), probe.Ref);
        }

        var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
        Assert.Equal(guidId, cubbyStateResponse.MessageId);

        // Without this check the test only proves "at least one" response; the quiet period
        // asserts that none of the remaining duplicates produced a reply.
        probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
    }
}
For example, if I added two ExpectMsg statements for the response, like:
// Waits for the first response delivered to the probe.
var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
// Waits for a second response; if the actor replies only once, this call times out and fails the test.
var cubbyStateResponse2 = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
The test would fail even though my functionality is correct, because the second ExpectMsg statement times out waiting for a response that is never sent.
/// <summary>
/// Sends one <c>RequestUpdateState</c>, checks that its response carries the same message id,
/// then re-sends the identical request 200 times and verifies that no further response arrives.
/// </summary>
[Fact]
public void RequestUpdateStateSentMultipleTimes_OnlyOneResponseBack()
{
    var probe = CreateTestProbe();
    IActorRef putwallActor = Sys.ActorOf(PutwallActor.Props());
    var guidId = Guid.NewGuid();

    // The first request must be answered exactly once, with the matching id.
    putwallActor.Tell(new PutwallActor.CubbyMessage.RequestUpdateState(true, guidId), probe.Ref);
    var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
    Assert.Equal(guidId, cubbyStateResponse.MessageId);

    for (int i = 0; i < 200; i++)
    {
        putwallActor.Tell(new PutwallActor.CubbyMessage.RequestUpdateState(true, guidId), probe.Ref);
    }

    // One quiet-period check after all duplicates have been sent. A 1 ms window after each
    // send can elapse before the actor has even processed the message, so a reply to the
    // last duplicate would go unnoticed; any reply still pending fails this check.
    probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
}
Hello everyone. Looking for some pointers on the right way to do the following with Akka:
Actor A needs to send a message to Actor B, however, they can’t see each other, and the only communication channel between these two actors is an Azure Blobstore that they both can read/write.
Should I be looking at implementing a custom mailbox? Custom transport? Or something else? Any recommendations welcome.