Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
  • 20:57
    IgorFedchenko commented #4085
  • 20:32
    IgorFedchenko commented #4085
  • 20:01
    IgorFedchenko commented #4085
  • 19:55
    IgorFedchenko commented #4085
  • 16:22
    Aaronontheweb labeled #3997
  • 16:22
    Aaronontheweb closed #3997
  • 16:20
    IgorFedchenko commented #3997
  • 16:08
    IgorFedchenko commented #4085
  • 15:50
    Aaronontheweb assigned #4085
  • 15:50
    Aaronontheweb labeled #4085
  • 15:50
    Aaronontheweb labeled #4085
  • 15:49
    Aaronontheweb closed #4032
  • 14:58
    IgorFedchenko commented #4032
  • 14:57
    IgorFedchenko opened #4085
  • Dec 05 17:21
    Aaronontheweb synchronize #4079
  • Dec 05 17:20
    Aaronontheweb labeled #4084
  • Dec 05 17:20
    Aaronontheweb labeled #4084
  • Dec 05 17:20
    Aaronontheweb milestoned #4084
  • Dec 05 17:20

    Aaronontheweb on dev

    Remove string interpolation fro… (compare)

  • Dec 05 17:20
    Aaronontheweb closed #4084
Ismael Hamed
@ismaelhamed
Upgrading to 1.3.5 from 1.2.3, and I'm starting to see some random serialization exceptions upon restarting some nodes:
Cannot find serializer with id [7]. The most probable reason is that the configuration entry 'akka.actor.serializers' is not in sync between the two systems.
which is weird because that's Akka's persistence serializer, which is loaded automatically
Havret
@Havret
How to disable logging for akka persistence query?
image.png
Bartosz Sypytkowski
@Horusiath
@Havret only by using higher log level
@ismaelhamed have you specified persistence config as fallback explicitly? If I remember correctly, after 1.3 there were some problems with loading configurations - because we started supporting .net standard - i.e. app.config were no longer used due to lack of ConfigurationManager API.
@Greatsamps stash can be null when reached from constructor, it's set automatically after constructor call.
Greatsamps
@Greatsamps
@Horusiath Ok great, so doing a null pointer check in this case is ok
thanks for your help
or thinking about this, becasue the constructer has not completed, would that put the stash functionality on hold? should i refactor this?
Bartosz Sypytkowski
@Horusiath
if you need initialized actor, you can always use PreStart method override
AndreSteenbergen
@AndreSteenbergen
I am trying to use the amqp stream in combination with RabbitMQ. I intend to open source the example, but I am still figuring out how the do communication with clients using rabbitMQ streams. I have the following code in the PortalCoordinator (which is going to act as the server to clien portal).
   public class AmqpPortalCoordinator : ReceiveActor
{
    private readonly IActorRef chatserver;
    private readonly ActorMaterializer mat;
    const string inboundQueue = "InboundWeChatServer";

    private readonly AmqpConnectionDetails connectionSettings;
    private Task incomingTask;

    public AmqpPortalCoordinator(IActorRef chatserver)
    {
        this.chatserver = chatserver;

        mat = ActorMaterializer.Create(Context);

        connectionSettings = AmqpConnectionDetails.Create("localhost", 5672)
                                    .WithAutomaticRecoveryEnabled(true)
                                    .WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));

        var queueDeclaration = QueueDeclaration
                                    .Create(inboundQueue)
                                    .WithDurable(true)
                                    .WithAutoDelete(false);

        var queueSource = NamedQueueSourceSettings.Create(connectionSettings, inboundQueue).WithDeclarations(queueDeclaration);
        var amqpSource = AmqpSource.CommittableSource(queueSource, 1000);

        var messageSource = amqpSource.Select(m =>
        {
            var bytes = m.Message.Bytes;
            m.Ack();

            return MessagePackSerializer.Typeless.Deserialize(bytes.ToArray());
        });

        incomingTask = messageSource.RunForeach(msg => chatserver.Tell(msg), mat);
    }
}
Or am I missing the point using amqp sources?
My biggest concern is actually how to dispose the source, when the server stops for some reason
AndreSteenbergen
@AndreSteenbergen
Oh wait ... KillSwitch, I think Marc told me last moth ...
AndreSteenbergen
@AndreSteenbergen
What do you guys think?
public class AmqpPortalCoordinator : ReceiveActor, IDisposable
{
    private readonly IActorRef chatserver;
    private readonly ActorMaterializer mat;
    const string inboundQueue = "InboundWeChatServer";

    private readonly AmqpConnectionDetails connectionSettings;
    private UniqueKillSwitch killswitch;
    private Task sourceTask;

    public AmqpPortalCoordinator(IActorRef chatserver)
    {
        this.chatserver = chatserver;

        mat = ActorMaterializer.Create(Context);

        connectionSettings = AmqpConnectionDetails.Create("localhost", 5672)
                                    .WithAutomaticRecoveryEnabled(true)
                                    .WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));

        var queueDeclaration = QueueDeclaration
                                    .Create(inboundQueue)
                                    .WithDurable(true)
                                    .WithAutoDelete(false);

        var queueSource = NamedQueueSourceSettings.Create(connectionSettings, inboundQueue).WithDeclarations(queueDeclaration);
        var amqpSource = AmqpSource.CommittableSource(queueSource, 50);

        var (killswitch, task) = amqpSource
                                    .ViaMaterialized(KillSwitches.Single<CommittableIncomingMessage>(), Keep.Right)
                                    .ToMaterialized(Sink.ForEach<CommittableIncomingMessage>((m) => {
                                        var bytes = m.Message.Bytes;
                                        m.Ack();
                                        chatserver.Tell(MessagePackSerializer.Typeless.Deserialize(bytes.ToArray()));
                                    }), Keep.Both).Run(mat);
        this.killswitch = killswitch;
        this.sourceTask = task;
    }

    public void Dispose()
    {
        killswitch.Shutdown();
        sourceTask.Wait();
    }
}
nathvi
@nathvi
Would it be a good idea to structure request/response message classes in the following way?
 public class RequestMessage
{
    public class Response
    {}
}
nathvi
@nathvi
That way it's easy to find the response message by using the syntax RequestMsg.Response
nathvi
@nathvi
If I'm building a message buffer to remove duplicates in an actor, is it a good idea to be able to change the eviction policy at runtime?
nathvi
@nathvi
I'm wondering how I can verify that if I send a certain actor 1000 messages with the same message id, it only responds back to the sender one message?
public class PutwallActorTests : TestKit
    {
        [Fact]
        public void RequestUpdateStateSentMultipleTimes_OnlyOneResponseBack()
        {
            var probe = CreateTestProbe();
            IActorRef putwallActor = Sys.ActorOf(PutwallActor.Props());
            var guidId = Guid.NewGuid();
            for (int i = 0; i < 100; i++)
            {
                putwallActor.Tell(new PutwallActor.CubbyMessage.RequestUpdateState(true, guidId), probe.Ref);
            }

            var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
            Assert.Equal(guidId, cubbyStateResponse.MessageId);
        }
    }
I know this ^^^ isn't right, but I'm not sure where to go from here.
Stijn Herreman
@stijnherreman
an actor can hold state (depending on your design), so keep a list of message IDs you already replied to
nathvi
@nathvi
I know that part, I'm just not sure on how I can test that in my unit tests :smile:

For example if I added two response message statements like:

var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
var cubbyStateResponse2 = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();

The test would fail, but my functionality would be correct because the unit test times out on the second statement.

Stijn Herreman
@stijnherreman
oh, well it's not a 100% guarantee because it works with a timeout, but this should be good enough:
var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
probe.ExpectNoMsg()
nathvi
@nathvi
:sparkles: :clap:
Thanks!
Stijn Herreman
@stijnherreman
the ExpectNoMsg call will block until the timeout (default or via argument) is reached and then continue, asserting you didn't receive a message
of course if your actor replies 5 minutes later with a message, the test won't catch it, but hopefully that doesn't occur :)
nathvi
@nathvi
Well, this is all on my local machine, so it should be fine :)
nathvi
@nathvi
Here's my final test that works as expected
[Fact]
        public void RequestUpdateStateSentMultipleTimes_OnlyOneResponseBack()
        {
            var probe = CreateTestProbe();
            IActorRef putwallActor = Sys.ActorOf(PutwallActor.Props());
            var guidId = Guid.NewGuid();
            putwallActor.Tell(new PutwallActor.CubbyMessage.RequestUpdateState(true, guidId), probe.Ref);
            var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
            Assert.Equal(guidId, cubbyStateResponse.MessageId);

            for (int i = 0; i < 200; i++)
            {
                putwallActor.Tell(new PutwallActor.CubbyMessage.RequestUpdateState(true, guidId), probe.Ref);
                probe.ExpectNoMsg(TimeSpan.FromMilliseconds(1));
            }
        }
Stijn Herreman
@stijnherreman
I'd put the ExpectNoMsg outside of the loop with a longer timeout, and personally (in my own tests) I just send a second message instead of hundreds, to verify that it only responds to the first message. But other than that your code seems fine.
nathvi
@nathvi
Why the longer timeout?
nathvi
@nathvi
@stijnherreman
Caio Proiete
@caioproiete

Hello everyone. Looking for some pointers on the right way to do the following with Akka:

Actor A needs to send a message to Actor B, however, they can’t see each other, and the only communication channel between these two actors is an Azure Blobstore that they both can read/write.

Should I be looking at implementing a custom mailbox? Custom transport? Or something else? Any recommendations welcome.

Aaron Stannard
@Aaronontheweb
@caioproiete .... why is the azure blob store the only thing they can use to communicate with each other?
and these actors are running on different processes I take it?
Caio Proiete
@caioproiete
@Aaronontheweb These are remote actors that run in completely separate networks and they cannot see each other due to firewalls, etc. But they can go to a specific place (e.g. Blobstore) where they can both read/write and exchange messages
I could easily serialize my some messages and make it work, but I’d prefer to use the Akka infrastructure
Caio Proiete
@caioproiete
For example, what is the recommended way to use RabbitMQ, ZeroMQ, etc. with Akka, so that it becomes the channel where Akka messages go through?
I basically want that, but replacing the channel with something else that will take care of persisting the message to/from Azure Blobstore (or whatever the shared place is)
Stijn Herreman
@stijnherreman
@nathvi because your actor could still be busy, 1ms isn't that long. 1 second gives you better assurance that the actor doesn't send out a message.
Paul Trandem
@ptrandem
Does anyone know if there's a way to get the CurrentMessage in PostStop() ?
Aaron Stannard
@Aaronontheweb
@ptrandem technically there isn't a CurrentMessage in that state since it occurs after the actor has begun terminating
it'd be the system message that triggers the actor to run its PostStop sequence
that would be the current message there