Greatsamps
@Greatsamps
I have not tried sending a message to it to see if one is also thrown on ReceiveAny
any thoughts on this? I can obviously do a null check before calling UnstashAll, but I'm concerned that it's not actually working
Bartosz Sypytkowski
@Horusiath
@Greatsamps where are you actually calling BecomeLoaded()?
Greatsamps
@Greatsamps
@Horusiath this is a cut-down implementation, in reality there is another method in the constructor under the ReceiveAny that does some work, then calls BecomeLoaded()
Ismael Hamed
@ismaelhamed
Upgrading to 1.3.5 from 1.2.3, and I'm starting to see some random serialization exceptions upon restarting some nodes:
Cannot find serializer with id [7]. The most probable reason is that the configuration entry 'akka.actor.serializers' is not in sync between the two systems.
which is weird because that's Akka's persistence serializer, which is loaded automatically
Havret
@Havret
How to disable logging for akka persistence query?
Bartosz Sypytkowski
@Horusiath
@Havret only by using higher log level
@ismaelhamed have you specified persistence config as fallback explicitly? If I remember correctly, after 1.3 there were some problems with loading configurations, because we started supporting .NET Standard, i.e. app.config was no longer used due to the lack of the ConfigurationManager API.
@Greatsamps stash can be null when reached from constructor, it's set automatically after constructor call.
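On the fallback suggestion to @ismaelhamed above, here is a minimal sketch of adding the persistence defaults explicitly when building the configuration. The system name and the HOCON string are placeholders, and whether this resolves the serializer error is not confirmed in this conversation.

```csharp
using Akka.Actor;
using Akka.Configuration;

public static class Program
{
    public static void Main()
    {
        // Placeholder application config; substitute the real HOCON here.
        var appConfig = ConfigurationFactory.ParseString("akka.loglevel = INFO");

        // Append Akka.Persistence's built-in defaults as a fallback, so its
        // serializer bindings are present even if nothing else loads them.
        var config = appConfig.WithFallback(
            Akka.Persistence.Persistence.DefaultConfig());

        // "my-system" is an assumed name for illustration.
        using (var system = ActorSystem.Create("my-system", config))
        {
            // Create actors here.
        }
    }
}
```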
Greatsamps
@Greatsamps
@Horusiath Ok great, so doing a null pointer check in this case is ok
thanks for your help
or thinking about this, because the constructor has not completed, would that put the stash functionality on hold? should I refactor this?
Bartosz Sypytkowski
@Horusiath
if you need initialized actor, you can always use PreStart method override
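A minimal sketch of the PreStart approach, assuming a stashing actor like the one discussed. LoadingActor, the Loaded message, and the echo behavior are hypothetical names made up for illustration, not the original code.

```csharp
using Akka.Actor;

// Hypothetical completion signal, for illustration only.
public sealed class Loaded
{
    public static readonly Loaded Instance = new Loaded();
    private Loaded() { }
}

// Hypothetical actor, for illustration only.
public class LoadingActor : ReceiveActor, IWithUnboundedStash
{
    // Assigned by Akka.NET after the constructor returns,
    // so it is still null inside the constructor.
    public IStash Stash { get; set; }

    public LoadingActor()
    {
        // Only register handlers here. They run when messages are
        // processed, by which time Stash has been set.
        Receive<Loaded>(_ => BecomeLoaded());
        ReceiveAny(_ => Stash.Stash());
    }

    protected override void PreStart()
    {
        // Do the initialization work here instead of in the constructor,
        // then signal completion to ourselves.
        Self.Tell(Loaded.Instance);
    }

    private void BecomeLoaded()
    {
        // Switch to the normal behavior (a simple echo in this sketch)...
        Become(() => ReceiveAny(msg => Sender.Tell(msg)));
        // ...and replay everything that arrived while loading.
        Stash.UnstashAll();
    }
}
```

With this shape the constructor never touches Stash, so no null check is needed.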
AndreSteenbergen
@AndreSteenbergen
I am trying to use the AMQP stream in combination with RabbitMQ. I intend to open source the example, but I am still figuring out how to do communication with clients using RabbitMQ streams. I have the following code in the PortalCoordinator (which is going to act as the server-to-client portal).
public class AmqpPortalCoordinator : ReceiveActor
{
    private readonly IActorRef chatserver;
    private readonly ActorMaterializer mat;
    const string inboundQueue = "InboundWeChatServer";

    private readonly AmqpConnectionDetails connectionSettings;
    private Task incomingTask;

    public AmqpPortalCoordinator(IActorRef chatserver)
    {
        this.chatserver = chatserver;

        mat = ActorMaterializer.Create(Context);

        connectionSettings = AmqpConnectionDetails.Create("localhost", 5672)
                                    .WithAutomaticRecoveryEnabled(true)
                                    .WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));

        var queueDeclaration = QueueDeclaration
                                    .Create(inboundQueue)
                                    .WithDurable(true)
                                    .WithAutoDelete(false);

        var queueSource = NamedQueueSourceSettings.Create(connectionSettings, inboundQueue).WithDeclarations(queueDeclaration);
        var amqpSource = AmqpSource.CommittableSource(queueSource, 1000);

        var messageSource = amqpSource.Select(m =>
        {
            var bytes = m.Message.Bytes;
            m.Ack();

            return MessagePackSerializer.Typeless.Deserialize(bytes.ToArray());
        });

        incomingTask = messageSource.RunForeach(msg => chatserver.Tell(msg), mat);
    }
}
Or am I missing the point using amqp sources?
My biggest concern is actually how to dispose the source, when the server stops for some reason
AndreSteenbergen
@AndreSteenbergen
Oh wait ... KillSwitch, I think Marc told me last month ...
AndreSteenbergen
@AndreSteenbergen
What do you guys think?
public class AmqpPortalCoordinator : ReceiveActor, IDisposable
{
    private readonly IActorRef chatserver;
    private readonly ActorMaterializer mat;
    const string inboundQueue = "InboundWeChatServer";

    private readonly AmqpConnectionDetails connectionSettings;
    private UniqueKillSwitch killswitch;
    private Task sourceTask;

    public AmqpPortalCoordinator(IActorRef chatserver)
    {
        this.chatserver = chatserver;

        mat = ActorMaterializer.Create(Context);

        connectionSettings = AmqpConnectionDetails.Create("localhost", 5672)
                                    .WithAutomaticRecoveryEnabled(true)
                                    .WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));

        var queueDeclaration = QueueDeclaration
                                    .Create(inboundQueue)
                                    .WithDurable(true)
                                    .WithAutoDelete(false);

        var queueSource = NamedQueueSourceSettings.Create(connectionSettings, inboundQueue).WithDeclarations(queueDeclaration);
        var amqpSource = AmqpSource.CommittableSource(queueSource, 50);

        var (killswitch, task) = amqpSource
                                    .ViaMaterialized(KillSwitches.Single<CommittableIncomingMessage>(), Keep.Right)
                                    .ToMaterialized(Sink.ForEach<CommittableIncomingMessage>((m) => {
                                        var bytes = m.Message.Bytes;
                                        m.Ack();
                                        chatserver.Tell(MessagePackSerializer.Typeless.Deserialize(bytes.ToArray()));
                                    }), Keep.Both).Run(mat);
        this.killswitch = killswitch;
        this.sourceTask = task;
    }

    public void Dispose()
    {
        killswitch.Shutdown();
        sourceTask.Wait();
    }
}
nathvi
@nathvi
Would it be a good idea to structure request/response message classes in the following way?
public class RequestMessage
{
    public class Response
    {}
}
nathvi
@nathvi
That way it's easy to find the response message by using the syntax RequestMsg.Response
nathvi
@nathvi
If I'm building a message buffer to remove duplicates in an actor, is it a good idea to be able to change the eviction policy at runtime?
nathvi
@nathvi
I'm wondering how I can verify that if I send a certain actor 1000 messages with the same message id, it only sends one response back to the sender?
public class PutwallActorTests : TestKit
{
    [Fact]
    public void RequestUpdateStateSentMultipleTimes_OnlyOneResponseBack()
    {
        var probe = CreateTestProbe();
        IActorRef putwallActor = Sys.ActorOf(PutwallActor.Props());
        var guidId = Guid.NewGuid();
        for (int i = 0; i < 100; i++)
        {
            putwallActor.Tell(new PutwallActor.CubbyMessage.RequestUpdateState(true, guidId), probe.Ref);
        }

        var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
        Assert.Equal(guidId, cubbyStateResponse.MessageId);
    }
}
I know this ^^^ isn't right, but I'm not sure where to go from here.
Stijn Herreman
@stijnherreman
an actor can hold state (depending on your design), so keep a list of message IDs you already replied to
nathvi
@nathvi
I know that part, I'm just not sure how I can test that in my unit tests :smile:

For example if I added two response message statements like:

var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
var cubbyStateResponse2 = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();

The test would fail even though my functionality is correct, because the unit test times out on the second statement.

Stijn Herreman
@stijnherreman
oh, well it's not a 100% guarantee because it works with a timeout, but this should be good enough:
var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
probe.ExpectNoMsg();
nathvi
@nathvi
:sparkles: :clap:
Thanks!
Stijn Herreman
@stijnherreman
the ExpectNoMsg call will block until the timeout (default or via argument) is reached and then continue, asserting you didn't receive a message
of course if your actor replies 5 minutes later with a message, the test won't catch it, but hopefully that doesn't occur :)
nathvi
@nathvi
Well, this is all on my local machine, so it should be fine :)
nathvi
@nathvi
Here's my final test that works as expected
[Fact]
public void RequestUpdateStateSentMultipleTimes_OnlyOneResponseBack()
{
    var probe = CreateTestProbe();
    IActorRef putwallActor = Sys.ActorOf(PutwallActor.Props());
    var guidId = Guid.NewGuid();
    putwallActor.Tell(new PutwallActor.CubbyMessage.RequestUpdateState(true, guidId), probe.Ref);
    var cubbyStateResponse = probe.ExpectMsg<PutwallActor.CubbyMessage.RequestUpdateState.Response>();
    Assert.Equal(guidId, cubbyStateResponse.MessageId);

    for (int i = 0; i < 200; i++)
    {
        putwallActor.Tell(new PutwallActor.CubbyMessage.RequestUpdateState(true, guidId), probe.Ref);
        probe.ExpectNoMsg(TimeSpan.FromMilliseconds(1));
    }
}
Stijn Herreman
@stijnherreman
I'd put the ExpectNoMsg outside of the loop with a longer timeout, and personally (in my own tests) I just send a second message instead of hundreds, to verify that it only responds to the first message. But other than that your code seems fine.
nathvi
@nathvi
Why the longer timeout?
nathvi
@nathvi
@stijnherreman
Caio Proiete
@caioproiete

Hello everyone. Looking for some pointers on the right way to do the following with Akka:

Actor A needs to send a message to Actor B, however, they can’t see each other, and the only communication channel between these two actors is an Azure Blobstore that they both can read/write.

Should I be looking at implementing a custom mailbox? Custom transport? Or something else? Any recommendations welcome.

Aaron Stannard
@Aaronontheweb
@caioproiete .... why is the azure blob store the only thing they can use to communicate with each other?
and these actors are running on different processes I take it?
Caio Proiete
@caioproiete
@Aaronontheweb These are remote actors that run in completely separate networks and they cannot see each other due to firewalls, etc. But they can go to a specific place (e.g. Blobstore) where they can both read/write and exchange messages
I could easily serialize my messages myself and make it work, but I'd prefer to use the Akka infrastructure
Caio Proiete
@caioproiete
For example, what is the recommended way to use RabbitMQ, ZeroMQ, etc. with Akka, so that it becomes the channel where Akka messages go through?
I basically want that, but replacing the channel with something else that will take care of persisting the message to/from Azure Blobstore (or whatever the shared place is)
Stijn Herreman
@stijnherreman
@nathvi because your actor could still be busy, 1ms isn't that long. 1 second gives you better assurance that the actor doesn't send out a message.
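A minimal sketch of the test shape suggested above: send the duplicate once, then make a single ExpectNoMsg call with a one-second timeout outside any loop. DedupActor, Request, and Response are hypothetical stand-ins for PutwallActor and its messages, which are not shown in full in this conversation.

```csharp
using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Xunit;

// Hypothetical messages, for illustration only.
public sealed class Request
{
    public Request(Guid messageId) { MessageId = messageId; }
    public Guid MessageId { get; }
}

public sealed class Response
{
    public Response(Guid messageId) { MessageId = messageId; }
    public Guid MessageId { get; }
}

// Hypothetical actor that replies only to the first message per id.
public class DedupActor : ReceiveActor
{
    private readonly HashSet<Guid> _seen = new HashSet<Guid>();

    public DedupActor()
    {
        Receive<Request>(r =>
        {
            // HashSet.Add returns false when the id was already recorded.
            if (_seen.Add(r.MessageId))
                Sender.Tell(new Response(r.MessageId));
        });
    }
}

public class DedupActorTests : TestKit
{
    [Fact]
    public void DuplicateRequest_GetsOnlyOneResponse()
    {
        var probe = CreateTestProbe();
        var actor = Sys.ActorOf(Props.Create(() => new DedupActor()));
        var id = Guid.NewGuid();

        actor.Tell(new Request(id), probe.Ref);
        actor.Tell(new Request(id), probe.Ref); // duplicate

        var response = probe.ExpectMsg<Response>();
        Assert.Equal(id, response.MessageId);

        // One longer wait gives the actor time to finish processing
        // the duplicate before the test asserts silence.
        probe.ExpectNoMsg(TimeSpan.FromSeconds(1));
    }
}
```

The trade-off is that the test always takes at least one second, since ExpectNoMsg blocks for the full timeout.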