With Tell, the shard will simply respond to the message sender (an actor). _shard is actually the ShardRegion; the general notion is to use Context.Parent (from within the entity).
private void HandleFindRunningJob(FindRunningNewVideo newVideo)
{
    //var state = _shard.Ask<Shard.CurrentShardState>(Shard.GetCurrentShardState.Instance).Result;
    //var exists = state.EntityIds.Contains(newVideo.File.ToLower());

    // A child with this name already exists => the file is already being handled.
    var haveChild = Context.Child(newVideo.File.ToLower());
    if (haveChild != ActorRefs.Nobody)
    {
        ApiBroadcaster.Tell(new NewVideoFound(newVideo.File));
        _mediator.Tell(new Publish(Topics.Reporting, new ReportStatus(ReportStatusEnum.Info,
            $"HandleFindRunningJob() : 'already processed' file = {newVideo.File}")));
        _logger.Info($"HandleFindRunningJob() : 'already processed' file = {newVideo.File}");
    }
}
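Purely as an illustration of the Tell-based variant described above (not code from this project), here is a minimal sketch using the same Shard.GetCurrentShardState / Shard.CurrentShardState messages as the commented-out Ask: the request goes out with Tell and the reply arrives as an ordinary message, so nothing blocks on .Result. The entity class name is hypothetical.
using Akka.Actor;
using Akka.Cluster.Sharding;

public class FileEntity : ReceiveActor   // hypothetical entity actor name
{
    public FileEntity()
    {
        Receive<Shard.CurrentShardState>(state =>
        {
            // state.EntityIds lists the entities currently alive in this shard,
            // e.g. the normalized file paths being processed.
        });
    }

    protected override void PreStart()
    {
        // From inside an entity, Context.Parent is the shard; with Tell the
        // reply comes back to Self as a normal message instead of blocking.
        Context.Parent.Tell(Shard.GetCurrentShardState.Instance);
    }
}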
I see what you mean. File processing is a stateful problem for me: it goes through several steps, and I need to make sure I have exactly one actor (entity) per file.
Right now the duplicate jobs are causing file-locking issues ("the file is being used by another process" sort of errors).
Before moving to persistent actors, I used var haveChild = Context.Child(newVideo.File.ToLower());
...
Now I'm using shards and entities, and I need to mimic the same functionality.
Much appreciated.
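Purely as an illustration (not the project's actual code), one way to get the "one entity per file" guarantee with Cluster.Sharding is to key the entity id on the lower-cased file path, so every message about the same file is routed to the same entity actor wherever it lives in the cluster. The FileHandlerEntity class, the FindRunningNewVideo shape and the shard count below are assumptions based on the snippets above.
using Akka.Actor;
using Akka.Cluster.Sharding;

public sealed class FindRunningNewVideo
{
    public FindRunningNewVideo(string file) { File = file; }
    public string File { get; }
}

public sealed class FileHandlerEntity : ReceiveActor   // hypothetical entity actor
{
    public FileHandlerEntity()
    {
        // Self.Path.Name is the entity id, i.e. the normalized file path.
        Receive<FindRunningNewVideo>(msg =>
        {
            // run the per-file processing steps here
        });
    }
}

public sealed class FileMessageExtractor : HashCodeMessageExtractor
{
    public FileMessageExtractor() : base(30) { }   // 30 shards, an arbitrary choice

    // One entity per file: the entity id is the lower-cased file path.
    public override string EntityId(object message)
        => (message as FindRunningNewVideo)?.File.ToLower();
}

public static class FileHandlerSharding
{
    // Called at start-up on each node that hosts the "FileHandler" region.
    public static IActorRef Start(ActorSystem system) =>
        ClusterSharding.Get(system).Start(
            "FileHandler",
            Props.Create<FileHandlerEntity>(),
            ClusterShardingSettings.Create(system),
            new FileMessageExtractor());
}
With this in place the "is it already running?" check becomes implicit: every FindRunningNewVideo for the same file lands on the same single entity, which can keep its own processing state.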
2017-12-09 16:03:35,998 [6] WARN Akka.Cluster.ClusterCoreDaemon - Cluster Node [akka.tcp://ArchiveSystem@140.125.4.1:16568] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://ArchiveSystem@140.125.4.2:16668, Uid=81451205 status = Up, role=[importer-old], upNumber=21), Member(address = akka.tcp://ArchiveSystem@140.125.4.3:16666, Uid=807193699 status = Up, role=[web], upNumber=19)]. Node roles [importer,pubsub-node]
2017-12-09 16:03:36,013 [6] INFO Akka.Cluster.ClusterCoreDaemon - Marking node(s) as REACHABLE [Member(address = akka.tcp://ArchiveSystem@140.125.4.2:16668, Uid=81451205 status = Up, role=[importer-old], upNumber=21)]. Node roles [importer,pubsub-node]
2017-12-09 16:03:36,013 [6] INFO Archive.Shared.Cluster.ClusterStatus - UnreachableMember: Member(address = akka.tcp://ArchiveSystem@140.125.4.2:16668, Uid=81451205 status = Up, role=[importer-old], upNumber=21), Role(s): importer-old
2017-12-09 16:03:36,013 [6] INFO Archive.Shared.Cluster.ClusterStatus - UnreachableMember: Member(address = akka.tcp://ArchiveSystem@140.125.4.3:16666, Uid=807193699 status = Up, role=[web], upNumber=19), Role(s): web
2017-12-09 16:03:36,013 [6] INFO Akka.Cluster.ClusterCoreDaemon - Leader can currently not perform its duties, reachability status: [Reachability([akka.tcp://ArchiveSystem@140.125.4.1:16568 -> UniqueAddress: (akka.tcp://ArchiveSystem@140.125.4.2:16668, 81451205): Reachable [Reachable] (29)], [akka.tcp://ArchiveSystem@140.125.4.1:16568 -> UniqueAddress: (akka.tcp://ArchiveSystem@140.125.4.3:16666, 807193699): Unreachable [Unreachable] (28)])], member status: [$akka.tcp://ArchiveSystem@140.125.4.1:16568 $Up seen=$True, $akka.tcp://ArchiveSystem@140.125.4.1:16668 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.2:16568 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.2:16668 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.3:16666 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.4:16666 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.5:4053 $Up seen=$False, $akka.tcp://ArchiveSystem@140.125.4.6:4053 $Up seen=$False]
2017-12-09 16:03:36,138 [6] INFO Archive.Shared.Cluster.ClusterStatus - ReachableMember: Member(address = akka.tcp://ArchiveSystem@140.125.4.2:16668, Uid=81451205 status = Up, role=[importer-old], upNumber=21), Role(s): importer-old
2017-12-09 16:03:36,138 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address:
2017-12-09 16:03:36,138 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,138 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,154 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address: akka.tcp://ArchiveSystem@140.125.4.3:16666,
2017-12-09 16:03:36,170 [6] WARN Archive.Shared.Cluster.ClusterStatus - Unreachable Member; Role: web, Status: Up, Address:
//==============================================================
HandleStarted();

var properties = new AppProperties(app.Properties);
var token = properties.OnAppDisposing;
if (token != CancellationToken.None)
{
    token.Register(() =>
    {
        // do stuff here for ending / disposing
        HandleStopped();
        log.Info("ASP.NET application stopped!");
    });
}
log.Info("ASP.NET application started!");

private void HandleStopped()
{
    log.Info("ASP.NET application stopped");
    if (ClusterSystem == null) return;

    log.Debug("Leaving cluster");
    ClusterHelper.Tell(new ClusterHelper.RemoveMember());

    var cluster = Akka.Cluster.Cluster.Get(ClusterSystem);
    cluster.RegisterOnMemberRemoved(() => MemberRemoved(ClusterSystem));
    cluster.Leave(cluster.SelfAddress);

    asTerminatedEvent.WaitOne(20000);
    ClusterSystem.Terminate().Wait(20000);
    log.Info("Actor system terminated, exiting");
}

private async void MemberRemoved(ActorSystem actorSystem)
{
    log.Info("Member removed ...");
    await actorSystem.Terminate();
    this.asTerminatedEvent.Set();
}
2017-12-09 16:03:43,795 [56] INFO Akka.Actor.LocalActorRef - Message Shutdown from akka://ArchiveSystem/system/sharding/FileHandler/67/%5C%5Cnas20%5Cd%24%5Carchive%5Cincoming%5Creuters%5C201712096110wd-northkorea-missilescontributors.xml/$c to akka://ArchiveSystem/system/sharding/FileHandler/67/%5C%5Cnas20%5Cd%24%5Carchive%5Cincoming%5Creuters%5C201712096110wd-northkorea-missilescontributors.xml/$c was not delivered. 4 dead letters encountered.
2017-12-09 16:03:43,826 [56] INFO Akka.Actor.LocalActorRef - Message FileValidationResult from akka://ArchiveSystem/system/sharding/FileHandler/67/%5C%5Cnas20%5Cd%24%5Carchive%5Cincoming%5Creuters%5C201712096110wd-northkorea-missilescontributors.xml/$c to akka://ArchiveSystem/system/sharding/FileHandler/67/%5C%5Cnas20%5Cd%24%5Carchive%5Cincoming%5Creuters%5C201712096110wd-northkorea-missilescontributors.xml was not delivered. 5 dead letters encountered.
@object if you don't have the remember-entities
flag set, cluster sharding won't produce too many events. The snapshot store will be used (by default) after every 1000 consecutive sharding events stored, but that only applies if the shard coordinator lives long enough to produce those 1000 events within a single incarnation.
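For reference, a minimal sketch (not from the original conversation) showing where those two knobs live; the values shown are the defaults just described, and in a real system this fragment would be merged with the rest of the cluster/remoting configuration.
using Akka.Actor;
using Akka.Configuration;

public static class ShardingConfigExample
{
    // Sketch only: remember-entities stays off to keep the event volume low,
    // and shards snapshot after every 1000 persisted sharding events.
    public static ActorSystem Create() =>
        ActorSystem.Create("ArchiveSystem", ConfigurationFactory.ParseString(@"
            akka.cluster.sharding {
                remember-entities = off
                snapshot-after = 1000
            }"));
}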
Also, I hope to be able to finish the DData-based cluster sharding soon (see akkadotnet/akka.net#3199). With it, event journals will no longer be needed.