James Phillips
@jdrphillips
What am I missing?
Igmar Palsenberg
@igmar
@harshjain2 That is just Akka within a container. What issues do you have?
Nik Gushchin
@Dr-Nikson

Guys, hello!)

I’m working on an actor that will do some deserialization (this process requires a lot of CPU).
The main requirement is that it can process only one message at a time (no parallelism allowed): the deserializer is mutable and stateful for performance reasons.

So, because of that, I’ve created a separate dispatcher for this kind of actor:

avro-deserializing-dispatcher {
  type = PinnedDispatcher
  executor = "thread-pool-executor"

  thread-pool-executor.allow-core-timeout = off
}

And my actor looks like:

import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}

implicit val executionContext: ExecutionContext = context.dispatcher

override def receive: Receive = {
  case Deserialize(record) =>
    val senderRef = sender()

    Thread.sleep(2000) // Just for tests

    // The blocking, stateful deserialization runs right here on the actor's thread
    val entity = deserializer
      .deserialize(record.topic(), record.headers(), record.value())

    Future.successful(entity).pipeTo(senderRef)
}

Is it correct?
Should I wrap it with concurrent.blocking?

I appreciate any help :)

Igmar Palsenberg
@igmar

Normally it's like

Future {
  // blocking work here
}(executionContext)

or CompletableFuture.supplyAsync(() -> { ... }, customExecutor);
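Put together with @Dr-Nikson's actor above, a rough sketch of that pattern; the class name and the "deserializing-blocking-dispatcher" are hypothetical (you would define such a dispatcher yourself), and if strict one-message-at-a-time processing is required, the executor behind that dispatcher has to be single-threaded:

import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}

class DeserializingActor(deserializer: Deserializer) extends Actor {
  // Hypothetical dedicated dispatcher for the blocking work, defined in application.conf
  private implicit val blockingEc: ExecutionContext =
    context.system.dispatchers.lookup("deserializing-blocking-dispatcher")

  override def receive: Receive = {
    case Deserialize(record) =>
      val senderRef = sender()
      Future {
        // CPU-heavy, stateful deserialization now runs on the dedicated dispatcher,
        // not on the thread processing the actor's mailbox
        deserializer.deserialize(record.topic(), record.headers(), record.value())
      }.pipeTo(senderRef)
  }
}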
ShankarShastri
@shankarshastri
@Dr-Nikson , are you ensuring that the dispatcher is set when the actor is created?
Igmar Palsenberg
@igmar
I prefer to couple it directly with the future execution. And because you can't do it any other way in Java :)
Nik Gushchin
@Dr-Nikson
@shankarshastri yes, I’ve configured the actor.deployment section to ensure my actors are using this dispatcher
Igmar Palsenberg
@igmar
Should be OK then. You can dump the dispatcher's name to ensure it's really using the right one.
Helena Edelson
@helena
@shankarshastri, re "How to dynamically add cluster master nodes in a akka-cluster”: if you're asking about the seed nodes and getting them dynamically, you can use Cluster Bootstrap and the relevant library for your dynamic environment in Akka Management: https://doc.akka.io/docs/akka-management/current/bootstrap/index.html
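For reference, a minimal sketch of what that usually involves, assuming akka-management and akka-management-cluster-bootstrap are on the classpath; the system name, service name and discovery method below are placeholders for whatever fits your environment:

import akka.actor.ActorSystem
import akka.management.scaladsl.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap

// In application.conf (placeholders):
//   akka.management.cluster.bootstrap.contact-point-discovery {
//     service-name     = "my-service"
//     discovery-method = kubernetes-api
//   }
val system = ActorSystem("my-cluster")   // placeholder system name
AkkaManagement(system).start()           // exposes the management HTTP endpoint
ClusterBootstrap(system).start()         // discovers contact points and forms/joins the cluster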
novemberswimmer
@novemberswimmer
Hello. I'm trying to find a description of, and understand, this parameter of ClusterEvent.CurrentClusterState: scala.collection.immutable.Map<java.lang.String, scala.Option<Address>> roleLeaderMap. Can someone please describe this parameter? The Scaladoc does not describe it.
Nik Gushchin
@Dr-Nikson

@igmar yep dispatcher is ok, thanks.

Now I’ve created a BalancingPool to achieve parallel execution (deserializing in my case), but it seems that I can’t configure the router to start on a new message without waiting for the previous one to finish processing… here is what I’ve got:

[16:55:33.048] [TestActorSystem-BalancingPool-/deserializersPool/router-7] - Start de-serializing record 2
[16:55:35.052] [TestActorSystem-BalancingPool-/deserializersPool/router-7] - Finish de-serializing record 2
[16:55:35.067] [TestActorSystem-BalancingPool-/deserializersPool/router-6] - Start de-serializing record 5
[16:55:35.071] [ScalaTest-run-running-KafkaAvroDeserializerPoolActorTest] - Collect de-serialized record 2
[16:55:37.070] [TestActorSystem-BalancingPool-/deserializersPool/router-6] - Finish de-serializing record 5
[16:55:37.071] [TestActorSystem-BalancingPool-/deserializersPool/router-5] - Start de-serializing record 4
[16:55:37.072] [ScalaTest-run-running-KafkaAvroDeserializerPoolActorTest] - Collect de-serialized record 5
[16:55:39.074] [TestActorSystem-BalancingPool-/deserializersPool/router-5] - Finish de-serializing record 4
[16:55:39.075] [TestActorSystem-BalancingPool-/deserializersPool/router-9] - Start de-serializing record 1
[16:55:39.075] [ScalaTest-run-running-KafkaAvroDeserializerPoolActorTest] - Collect de-serialized record 4
[16:55:41.080] [TestActorSystem-BalancingPool-/deserializersPool/router-9] - Finish de-serializing record 1
[16:55:41.081] [ScalaTest-run-running-KafkaAvroDeserializerPoolActorTest] - Collect de-serialized record 1

Now I’m using balancing pool like this:

val router: ActorRef = context.actorOf(
  BalancingPool(nrOfInstances).props(workerProps),
  "router"
)

def receive: Receive = {
  case w: KafkaAvroDeserializerActor.Message =>
    log.info("Message received")
    router.forward(w)
    log.info("Message routed")
}

And the config looks like this:

/deserializersPool/router {
  router = balancing-pool
  nr-of-instances = 5
  pool-dispatcher {
    executor = "thread-pool-executor"

    thread-pool-executor.allow-core-timeout = off
  }
}

Are there any options to process these messages in parallel, or should I write something custom?

Johan Andrén
@johanandren
@novemberswimmer looks like we are missing a Java API for that; the corresponding type would be Map<String, Optional<Address>>, where the keys are the roles and the values are the Optional leader addresses within the current DC. If there are no nodes of a certain role in the DC, the Optional will be Optional.empty
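On the Scala side you can read it straight from the cluster state snapshot; a small sketch, where the system name and printed wording are made up:

import akka.actor.ActorSystem
import akka.cluster.Cluster

val system = ActorSystem("my-cluster")   // placeholder
val state  = Cluster(system).state       // ClusterEvent.CurrentClusterState snapshot

// roleLeaderMap: Map[String, Option[Address]]; one entry per role.
// state.roleLeader(role) gives the same information for a single role.
state.roleLeaderMap.foreach { case (role, leader) =>
  println(s"role=$role leader=${leader.getOrElse("no nodes with this role in the DC")}")
}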
vikramdarsi
@vikramdarsi
Hi, I am new to Akka and want to understand the meaning of the message "akka.cluster.InternalClusterAction$InitJoinNack".
The issue we are observing is that the two seed nodes have not formed a cluster, and we see the above message being exchanged between them, in addition to "akka.tcp://opendaylight-cluster-data@10.1.23.143:2550/system/cluster/core/daemon/joinSeedNodeProcess"
vikramdarsi
@vikramdarsi
@johanandren Thanks for sharing the link. I want to know what could be the reasons for that message exchange when the seed-node list is properly set in the cluster config, and how this can be debugged further.
When the akka:type=Cluster MBean is queried on each node, the member list shown is empty. -> https://pastebin.com/YfXTknzG
Johan Andrén
@johanandren
@vikramdarsi I’d recommend reading up on cluster formation in the docs, and then verify that you are either doing the programmatic join or the config join correctly: https://doc.akka.io/docs/akka/current/cluster-usage.html#cluster-usage
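For illustration, the two join styles side by side; the system name and addresses are placeholders:

import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster

val system = ActorSystem("opendaylight-cluster-data")

// Config join: list the seed nodes in application.conf and do nothing in code, e.g.
//   akka.cluster.seed-nodes = ["akka.tcp://opendaylight-cluster-data@10.1.23.142:2550", ...]

// Programmatic join: hand the same addresses to the Cluster extension instead
Cluster(system).joinSeedNodes(List(
  Address("akka.tcp", "opendaylight-cluster-data", "10.1.23.142", 2550),
  Address("akka.tcp", "opendaylight-cluster-data", "10.1.23.143", 2550)
))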
vikramdarsi
@vikramdarsi
@johanandren Actually it is a working 3-node Akka cluster. When we powered down all three nodes and started only two of them, the problem appeared... I want to debug what could have gone wrong here
Johan Andrén
@johanandren
@vikramdarsi Perhaps configuring the one node you didn’t start as the seed node?
vikramdarsi
@vikramdarsi
@johanandren From akka.conf: seed-nodes = ["akka.tcp://opendaylight-cluster-data@10.1.23.142:2550",
"akka.tcp://opendaylight-cluster-data@10.1.23.143:2550",
"akka.tcp://opendaylight-cluster-data@10.1.23.144:2550"]
Is akka.tcp://opendaylight-cluster-data@10.1.23.142:2550 called the seed node as per the above config? Sorry for the dumb question.
We do not dynamically add new nodes to the cluster; the seed-node list is static
Johan Andrén
@johanandren
@vikramdarsi please read up on how cluster formation works in Akka
vikramdarsi
@vikramdarsi
@johanandren Sure, thanks for the information
RAJKUMAR NATARAJAN
@rajcspsg

Hi

I'm looking for Akka actor routing logic:

send n different messages to n different actors and get the results back from the actors to the parent actor
(i.e. similar to the Java threading below).

ExecutorService executorService =
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());

Callable<String> callableTask = () -> {
    TimeUnit.MILLISECONDS.sleep(300);
    return "Task's execution " + Math.random();
};

List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);

List<Future<String>> futures = executorService.invokeAll(callableTasks);
Brian Maso
@bmaso
@rajcspsg Look up Akka's RoundRobinRouter (https://doc.akka.io/docs/akka/2.2.3/scala/routing.html#RoundRobinRouter) -- the router should take the place of the "parent actor" you are describing, and the routees would be the n actors to which messages (tasks) are routed. And of course you'd need to use the ask pattern if you wanted the results of the n calculations sent back to you.
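A rough sketch of that scatter-gather in current Akka, where the router type is now called RoundRobinPool; WorkerActor and the task strings are made-up placeholders:

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

object ScatterGatherApp extends App {
  implicit val timeout: Timeout = 5.seconds
  val system = ActorSystem("Hello-Akka")
  import system.dispatcher   // ExecutionContext for Future.sequence

  val router = system.actorOf(RoundRobinPool(3).props(Props[WorkerActor]), "workers")

  // Fan out n messages via the router and gather the n replies into one Future
  val replies: Future[List[Any]] =
    Future.sequence(List("task-1", "task-2", "task-3").map(router ? _))

  replies.foreach(results => println(s"Collected: $results"))
}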
RAJKUMAR NATARAJAN
@rajcspsg
I'm thinking of Broadcast like below

object Broadcastpool extends App {
  val actorSystem = ActorSystem("Hello-Akka")
  val router =
    actorSystem.actorOf(BroadcastPool(5).props(Props[BroadcastPoolActor]))
  val result = router ? "hello"
}

but I don't know how to use the actors created by me rather than actorSystem.actorOf(BroadcastPool(5).props(Props[BroadcastPoolActor]))
Dhash Shrivathsa
@DhashS
what do you mean use them?
you have a router
the router (I don't think) modifies sender
Whatever your BroadcastPoolActor is will implement its behavior
so that should return one response, since the ask is a promise that completes when it has been replied to
but since it's a broadcast, they'll all reply and will end up in the promise temporary actor's deadLetters
or something
RAJKUMAR NATARAJAN
@rajcspsg
here is my sample code


import akka.actor.{ActorSystem, Props}
import akka.routing.BroadcastPool
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

object RouterApp extends App {

  implicit val timeout: Timeout = 5.seconds

  val actorSystem = ActorSystem("Hello-Akka")
  val actor1 = actorSystem.actorOf(Props(new BroadcastPoolActor(0)),  "actor0")
  val actor2 = actorSystem.actorOf(Props(new BroadcastPoolActor(1)),  "actor1")
  val actor3 = actorSystem.actorOf(Props(new BroadcastPoolActor(2)),  "actor2")
  val router =
    actorSystem.actorOf(BroadcastPool(5).props(Props[BroadcastPoolActor]))
  val result = router ? "hello"

}
instead of actorSystem.actorOf(BroadcastPool(5).props(Props[BroadcastPoolActor])) I want to have a BroadcastPool with my actors actor1, actor2 and actor3
is it possible?
Dhash Shrivathsa
@DhashS
existing actors?
RAJKUMAR NATARAJAN
@rajcspsg
yes
Dhash Shrivathsa
@DhashS
oh.
I'd just keep them in a list and write a little loop
if it's constant at compile time, it's easy
if it's dynamic, you'll have to add logic to register and deregister them
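A small sketch of that idea, with register/deregister for the dynamic case; the actor and message names here are invented for illustration:

import akka.actor.{Actor, ActorRef, Terminated}

case class Register(ref: ActorRef)
case class Deregister(ref: ActorRef)

class MyBroadcaster extends Actor {
  private var targets = Set.empty[ActorRef]

  def receive: Receive = {
    case Register(ref)   => targets += ref; context.watch(ref)
    case Deregister(ref) => targets -= ref
    case Terminated(ref) => targets -= ref            // drop terminated actors automatically
    case msg             => targets.foreach(_.forward(msg))   // the "little loop"
  }
}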
sinanspd
@sinanspd
Also you can do distributed pub-sub and have your actors subscribe to a publisher. That would technically mimic a broadcast that you can "add" your actors into at any time
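A minimal sketch of that pub-sub variant, assuming akka-cluster-tools is on the classpath and the actor system is cluster-enabled; the topic name and class are placeholders:

import akka.actor.{Actor, ActorRef}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}

class TopicSubscriber extends Actor {
  private val mediator: ActorRef = DistributedPubSub(context.system).mediator
  mediator ! Subscribe("broadcast-topic", self)   // actors can join the "broadcast" at any time

  def receive: Receive = {
    case msg => println(s"received: $msg")        // the SubscribeAck arrives here too
  }
}

// Any actor (or plain code holding the mediator) can then publish to all current subscribers:
//   mediator ! Publish("broadcast-topic", "hello")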
Dhash Shrivathsa
@DhashS
also, this is how it works with newly created actors

import akka.actor.{ActorSystem, Props}
import akka.routing.BroadcastPool

object RouterApp extends App {

  val actorSystem = ActorSystem("Hello-Akka")
  val router =
    actorSystem.actorOf(BroadcastPool(5).props(Props[MyActor]), "fiveOfMyActorsUnderASingleBroadcastActor")
  router ! "invoke your method pls"   // tell returns Unit, so there is no result to capture

}