Hi, we have been using Akka.NET in our production environment for a while. Recently we started looking at Akka.NET Streams, and I have a question about it.
Let's say I have an Order class.
class Order {
    public string OrderID { get; set; }
    ...
}
The logic is: we collect a bunch of orders, transform them, and save them to a database, a typical ETL process.
The prototype could look like this:
var orders = new List<Order>();
orders.Add(new Order("orderID1"));
orders.Add(new Order("orderID2"));

var materializer = system.Materializer(); // RunForeach needs a materializer
Source.From(orders)
    .Select(x => { x.OrderID = x.OrderID + "_123"; return x; })
    .RunForeach(x => Console.WriteLine(x.OrderID), materializer);
After that, obviously, the original collection is mutated as well: each order's ID has "_123" appended. So my question is: in each flow stage, should I create a new instance of Order, apply the transform to the new instance, and pass that one on to the next stage? In other words, should I keep the original collection elements immutable?
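A minimal sketch of the immutable alternative the question describes: project each element to a new Order instead of mutating it in place (assuming Order has the single-string constructor used above, and reusing the orders list and materializer from the snippet):

// Each stage returns a fresh Order, so the source collection stays unchanged.
Source.From(orders)
    .Select(x => new Order(x.OrderID + "_123"))
    .RunForeach(x => Console.WriteLine(x.OrderID), materializer);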
Hi! I have a problem with clustering. I have 2 nodes which use sharding, and some Lighthouse nodes. After I restarted the nodes, they joined the cluster without issues, but the sharded actors cannot be spawned. The errors I get look like this:
Trying to register to coordinator at ["/system/sharding/ps_filesCoordinator/singleton/coordinator"], but no acknowledgement. Total [1600] buffered messages.
And there is a ton of them spamming all the time. The root cause seems to be related to restoring the state of the shard coordinator:
{
  "Depth": 0,
  "ClassName": "System.ArgumentException",
  "Message": "Region [akka://Oddjob/system/sharding/ps_files#1292415200] not registered\r\nParameter name: e",
  "Source": "Akka.Cluster.Sharding",
  "StackTraceString": " at Akka.Cluster.Sharding.PersistentShardCoordinator.State.Updated(IDomainEvent e)\r\n at Akka.Cluster.Sharding.PersistentShardCoordinator.ReceiveRecover(Object message)\r\n at Akka.Persistence.Eventsourced.<RecoveryStarted>b__90_0(Object message)\r\n at Akka.Actor.ActorBase.AroundReceive(Receive receive, Object message)\r\n at Akka.Persistence.Eventsourced.<>c__DisplayClass92_0.<Recovering>b__0(Receive receive, Object message)",
  "RemoteStackTraceString": null,
  "RemoteStackIndex": 0,
  "ExceptionMethod": {
    "Name": "Updated",
    "AssemblyName": "Akka.Cluster.Sharding",
    "AssemblyVersion": "1.3.2.0",
    "AssemblyCulture": "",
    "ClassName": "Akka.Cluster.Sharding.PersistentShardCoordinator+State",
    "Signature": "State Updated(IDomainEvent)",
    "MemberType": 8
  },
  "HResult": -2147024809,
  "HelpURL": null
}
Exception in ReceiveRecover when replaying event type ["Akka.Cluster.Sharding.PersistentShardCoordinator+ShardHomeAllocated"] with sequence number [8] for persistenceId ["/system/sharding/ps_filesCoordinator/singleton/coordinator"]
The weird thing is that it fails on sequence number 8 when there are 21 entries in the DB.
Any pointers on what could be the cause of the error here? I don't understand what "Region not registered" means :|
(I know that I can probably fix it by purging the DB, but I'd like to understand why it's happening.)
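For context on what "Region not registered" means: judging by the stack trace, the coordinator replays its event journal on recovery, and a ShardHomeAllocated event is only valid if an earlier ShardRegionRegistered event already added that region to the coordinator's state. A rough sketch of that invariant (illustrative only, not the actual Akka.NET source; Copy, Regions, and Shards are stand-ins for the coordinator's state members):

public State Updated(IDomainEvent e)
{
    switch (e)
    {
        case ShardRegionRegistered registered:
            // the region ref becomes known to the coordinator
            return Copy(regions: Regions.Add(registered.Region, ImmutableList<string>.Empty));
        case ShardHomeAllocated allocated:
            if (!Regions.ContainsKey(allocated.Region))
                // the ArgumentException from the log above
                throw new ArgumentException(
                    $"Region [{allocated.Region}] not registered", nameof(e));
            return Copy(shards: Shards.SetItem(allocated.Shard, allocated.Region));
        default:
            return this;
    }
}

So a failure at sequence number 8 suggests the journal contains a ShardHomeAllocated whose matching registration event was never persisted or was skipped during replay.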
Hi guys, I'm doing a demo project using Akka cluster and facing a problem sending messages to the cluster actor. I already run the Lighthouse project as the seed node, and I can deploy the actor remotely and get an actor reference, but the actor is not receiving messages. If I change use-role = api then it can receive messages.
class Program
{
    static void Main(string[] args)
    {
        var config = ConfigurationFactory.ParseString(@"
            akka {
                actor {
                    provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
                }
                remote {
                    log-remote-lifecycle-events = DEBUG
                    dot-netty.tcp {
                        hostname = ""127.0.0.1""
                        port = 5001
                    }
                }
                cluster {
                    #will inject this node as a self-seed node at run-time
                    seed-nodes = [""akka.tcp://akkademo@127.0.0.1:4053""]
                    roles = [actorserver]
                }
            }");

        using (ActorSystem system = ActorSystem.Create("akkademo", config))
        {
            Console.ReadLine();
        }
    }
}
And here is the API cluster side:
class Program
{
    static void Main(string[] args)
    {
        var config = ConfigurationFactory.ParseString(@"
            akka {
                actor {
                    provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
                    deployment {
                        /actorServerGraphQL {
                            router = round-robin-pool
                            nr-of-instances = 10
                            cluster {
                                enabled = on
                                max-nr-of-instances-per-node = 2
                                allow-local-routees = on
                                use-role = actorserver
                            }
                        }
                    }
                }
                remote {
                    log-remote-lifecycle-events = DEBUG
                    dot-netty.tcp {
                        hostname = ""127.0.0.1""
                        port = 5002
                    }
                }
                cluster {
                    #will inject this node as a self-seed node at run-time
                    seed-nodes = [""akka.tcp://akkademo@127.0.0.1:4053""]
                    roles = [api]
                }
            }");

        using (ActorSystem system = ActorSystem.Create("akkademo", config))
        {
            var actorServer = system.ActorOf(
                Props.Create<ActorServerGraphQL>().WithRouter(FromConfig.Instance),
                "actorServerGraphQL");
            actorServer.Tell(new QueryDatabase("Start query"));
            //var apiActor = system.ActorOf(Props.Create(() => new ApiActor(actorServer)), "apiActor");
            //apiActor.Tell(new QueryDatabase("Start query"));
            Console.ReadLine();
        }
    }
}
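One common cause, judging from the snippets: the Tell happens immediately after ActorSystem.Create, before this node has finished joining the cluster and before the pool router has deployed any routees onto actorserver nodes, so the first message lands in dead letters. A minimal sketch of deferring the first message until the node is actually up (same actorServer router and config as above):

using (ActorSystem system = ActorSystem.Create("akkademo", config))
{
    var actorServer = system.ActorOf(
        Props.Create<ActorServerGraphQL>().WithRouter(FromConfig.Instance),
        "actorServerGraphQL");

    // Wait until this node is a full cluster member before sending,
    // so the clustered pool router has routees to deliver to.
    var cluster = Akka.Cluster.Cluster.Get(system);
    cluster.RegisterOnMemberUp(() =>
        actorServer.Tell(new QueryDatabase("Start query")));

    Console.ReadLine();
}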
Do you think this might help ? http://getakka.net/articles/clustering/cluster-sharding.html#retrieving-sharding-state
@jalchr sorry, but I'm missing the context here.
On GetShardRegionState, the shard region will reply with a ShardRegionState containing data about the shards living in the current actor system and which entities are alive in each of them.
On GetClusterShardingStats, the shard region will reply with ClusterShardingStats, containing information about the shards living in the whole cluster and how many entities are alive in each of them.
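A minimal sketch of issuing those queries, assuming region is the IActorRef returned by ClusterSharding.Get(system).Start(...), and using the message/reply types shipped in Akka.Cluster.Sharding (timeout values are arbitrary):

// Shards and entities in the current actor system:
var regionState = await region.Ask<CurrentShardRegionState>(
    GetShardRegionState.Instance, TimeSpan.FromSeconds(5));
foreach (var shard in regionState.Shards)
    Console.WriteLine($"Shard {shard.ShardId}: {string.Join(", ", shard.EntityIds)}");

// Shard/entity counts across the whole cluster:
var stats = await region.Ask<ClusterShardingStats>(
    new GetClusterShardingStats(TimeSpan.FromSeconds(5)), TimeSpan.FromSeconds(10));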
Okay ... this gives info about all active shards, not a single one ... right?
If you send it via Tell, the shard will simply respond to the message sender (an actor). _shard is actually the ShardRegion. The general notion is that you can reach the owning shard via Context.Parent (from within an entity).

private void HandleFindRunningJob(FindRunningNewVideo newVideo)
{
    //var state = _shard.Ask<Shard.CurrentShardState>(Shard.GetCurrentShardState.Instance).Result;
    //var exists = state.EntityIds.Contains(newVideo.File.ToLower());
    var haveChild = Context.Child(newVideo.File.ToLower());
    if (haveChild != ActorRefs.Nobody)
    {
        ApiBroadcaster.Tell(new NewVideoFound(newVideo.File));
        _mediator.Tell(new Publish(Topics.Reporting, new ReportStatus(ReportStatusEnum.Info,
            $"HandleFindRunningJob() : 'already processed' file = {newVideo.File}")));
        _logger.Info($"HandleFindRunningJob() : 'already processed' file = {newVideo.File}");
    }
}
I see what you mean. File processing is a stateful problem for me. It goes through several steps, and I need to make sure I have exactly one actor (entity) per file.
Because right now, duplicate jobs are causing file-locking issues ("file is being used by another process" sort of thing).
Before I moved to persistent actors, I used var haveChild = Context.Child(newVideo.File.ToLower());
...
Now I'm using shards and entities, and I need to mimic the same functionality (see the sketch below).
Much Appreciated
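A minimal sketch of the shard-based equivalent of that Context.Child check, building on the commented-out lines in the handler above (from within an entity, Context.Parent is the owning Shard; the timeout is arbitrary):

// Ask the owning shard which entity IDs it is currently running.
// Note: this only covers entities in the same shard, not the whole region.
var state = await Context.Parent.Ask<Shard.CurrentShardState>(
    Shard.GetCurrentShardState.Instance, TimeSpan.FromSeconds(3));
var alreadyRunning = state.EntityIds.Contains(newVideo.File.ToLower());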
2017-12-09 16:03:35,998 [6] WARN Akka.Cluster.ClusterCoreDaemon - Cluster Node [akka.tcp://ArchiveSystem@140.125.4.1:16568] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://ArchiveSystem@140.125.4.2:16668, Uid=81451205 status = Up, role=[importer-old], upNumber=21), Member(address = akka.tcp://ArchiveSystem@140.125.4.3:16666, Uid=807193699 status = Up, role=[web], upNumber=19)]. Node roles [importer,pubsub-node]
2017-12-09 16:03:36,013 [6] INFO Akka.Cluster.ClusterCoreDaemon - Marking node(s) as REACHABLE [Member(address = akka.tcp://ArchiveSystem@140.125.4.2:16668, Uid=81451205 status = Up, role=[importer-old], upNumber=21)]. Node roles [importer,pubsub-node]
2017-12-09 16:03:36,013 [6] INFO Archive.Shared.Cluster.ClusterStatus - UnreachableMember: Member(address = akka.tcp://ArchiveSystem@140.125.4.2:16668, Uid=81451205 status = Up, role=[importer-old], upNumber=21), Role(s): importer-old
2017-12-09 16:03:36,013 [6] INFO Archive.Shared.Cluster.ClusterStatus - UnreachableMember: Member(address = akka.tcp://ArchiveSystem@140.125.4.3:16666, Uid=807193699 status = Up, role=[web], upNumber=19), Role(s): web
2017-12-09 16:03:36,013 [6] INFO Akka.Cluster.ClusterCoreDaemon - Leader can currently not perform its duties, reachability status: [Reachability([akka.tcp://ArchiveSystem@140.125.4.1:16568 -> UniqueAddress: (akka.tcp://ArchiveSystem@140.125.4.2:16668, 81451205): Reachable [Reachable] (29)], [akka.tcp://ArchiveSystem@140.125.4.1:16568 -> UniqueAddress: (akka.tcp://ArchiveSystem@140.125.4.3:16666, 807193699): Unreachable [Unreachable] (28)])], member status: [$akka.tcp://ArchiveSystem@140.125.4.1:16568 $Up seen=$True, $akka.tcp://ArchiveSystem@140.125.4.1:16668 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.2:16568 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.2:16668 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.3:16666 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.4:16666 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.5:4053 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.6:4053 $Up seen=$False]
2017-12-09 16:03:36,138 [6] INFO Archive.Shared.Cluster.ClusterStatus - ReachableMember: Member(address = akka.tcp://ArchiveSystem@140.125.4.2:16668, Uid=81451205 status = Up, role=[importer-old], upNumber=21), Role(s): importer-old
2017-12-09 16:03:36,138 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address:
2017-12-09 16:03:36,138 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,138 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,170 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address:
//==============================================================
HandleStarted();

var properties = new AppProperties(app.Properties);
var token = properties.OnAppDisposing;
if (token != CancellationToken.None)
{
    token.Register(() =>
    {
        // do stuff here for ending / disposing
        HandleStopped();
        log.Info("ASP.NET application stopped !");
    });
}

log.Info("ASP.NET application started !");
private void HandleStopped()
{
    log.Info("ASP.NET application stopped");
    if (ClusterSystem == null) return;

    log.Debug("Leaving cluster");
    ClusterHelper.Tell(new ClusterHelper.RemoveMember());
    var cluster = Akka.Cluster.Cluster.Get(ClusterSystem);
    cluster.RegisterOnMemberRemoved(() => MemberRemoved(ClusterSystem));
    cluster.Leave(cluster.SelfAddress);
    asTerminatedEvent.WaitOne(20000);
    ClusterSystem.Terminate().Wait(20000);
    log.Info("Actor system terminated, exiting");
}

private async void MemberRemoved(ActorSystem actorSystem)
{
    log.Info("Member removed ...");
    await actorSystem.Terminate();
    this.asTerminatedEvent.Set();
}
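(Side note: on Akka.NET 1.3+, much of this leave-and-wait choreography can likely be delegated to CoordinatedShutdown, which runs the cluster-leave and actor-system-termination phases in order; a minimal sketch, assuming the same ClusterSystem as above:)

// Runs the built-in shutdown phases (cluster leave, graceful stop,
// actor system termination) instead of the manual WaitOne/Terminate flow.
await CoordinatedShutdown.Get(ClusterSystem).Run();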
2017-12-09 16:03:43,795 [56] INFO Akka.Actor.LocalActorRef - Message Shutdown from akka://ArchiveSystem/system/sharding/FileHandler/67/%5C%5Cnas20%5Cd%24%5Carchive%5Cincoming%5Creuters%5C201712096110wd-northkorea-missilescontributors.xml/$c to akka://ArchiveSystem/system/sharding/FileHandler/67/%5C%5Cnas20%5Cd%24%5Carchive%5Cincoming%5Creuters%5C201712096110wd-northkorea-missilescontributors.xml/$c was not delivered. 4 dead letters encountered.
2017-12-09 16:03:43,826 [56] INFO Akka.Actor.LocalActorRef - Message FileValidationResult from akka://ArchiveSystem/system/sharding/FileHandler/67/%5C%5Cnas20%5Cd%24%5Carchive%5Cincoming%5Creuters%5C201712096110wd-northkorea-missilescontributors.xml/$c to akka://ArchiveSystem/system/sharding/FileHandler/67/%5C%5Cnas20%5Cd%24%5Carchive%5Cincoming%5Creuters%5C201712096110wd-northkorea-missilescontributors.xml was not delivered. 5 dead letters encountered.