Quynh Anh "Emma" Nguyen

Hi, after a recent Akka version update I started seeing this error: akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$. How can I handle it?

Looking at https://doc.akka.io/api/akka/current/akka/stream/Attributes$$CancellationStrategy$.html, this seems to be new behavior in 2.6.x. The default, PropagateFailure, which is not configurable, has this documentation:

Strategy that treats cancelStage in different ways depending on the cause that was given to the cancellation.
If the cause was a regular, active cancellation (SubscriptionWithCancelException.NoMoreElementsNeeded), the stage receiving this cancellation is completed regularly.
If another cause was given, this is treated as an error and the behavior is the same as with failStage.

So it seems it shouldn't raise this error to the user level. What am I missing? After seeing akka/akka#30071 I tried 2.6.10, 2.6.15, etc., but it doesn't work in my case.

4 replies
Todor Kolev
Hi, is there a way to generate an OpenAPI spec from Akka HTTP routes directly, without using annotations, or do I need to use a wrapper for that (tapir, endpoints4s, etc.)? Thanks!
federico cocco
Hello, can somebody explain the recover method for Akka Streams to me? My understanding is that besides catching the error and transforming it, it will also complete the stream. Is there a way I can catch an upstream error and transform it so that the stream continues? Thanks
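For what it's worth, a minimal runnable sketch of the distinction (assuming Akka Streams 2.6): recover emits one fallback element and then completes the stream, while recoverWithRetries can switch to an alternative Source so elements keep flowing:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("recover-demo")

// recover emits a single fallback element and then COMPLETES the stream:
val recovered = Source(1 to 5)
  .map(n => if (n == 3) throw new RuntimeException("boom") else n)
  .recover { case _: RuntimeException => -1 }
  .runFold(List.empty[Int])(_ :+ _)

println(Await.result(recovered, 3.seconds)) // List(1, 2, -1): the rest is lost

// recoverWithRetries switches to an alternative Source instead, so the
// stream continues with new elements rather than completing:
val continued = Source(1 to 5)
  .map(n => if (n == 3) throw new RuntimeException("boom") else n)
  .recoverWithRetries(attempts = 1, { case _: RuntimeException => Source(100 to 102) })
  .runFold(List.empty[Int])(_ :+ _)

println(Await.result(continued, 3.seconds)) // List(1, 2, 100, 101, 102)

system.terminate()
```

If the goal is to drop failing elements and keep consuming the same upstream, a resuming supervision strategy (Supervision.resumingDecider) is usually the tool rather than recover.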
Shayne Koestler
Hello all, I'm using akka typed cluster sharding and I'm curious if there's a way to watch/supervise remote actors?
8 replies

I've read in the docs that an empty rejection list can mean there is no route for the given path, but that is not the case here: the provided (path, method) is covered by the Route. Is there any way to determine why this happens?

Is anyone able to help with such a case, or at least redirect me to where I could describe this issue?

Domantas Petrauskas
Hi, does Akka Projection have anything similar to Lagom's ReadSideProcessor globalPrepare? The only thing I found is an alternative to prepare in HandlerLifecycle start. Looking at how the API is constructed, this could also be the responsibility of ShardedDaemonProcess, but I haven't found anything relevant there either.
Hi everyone, could you please tell me about the internals of new actor creation?
I was assigned to a legacy project that has a single actor responsible for creating an actor per connection (we don't use Akka HTTP; we use another web framework). I have the impression this actor is a bottleneck, as we sometimes see spikes in new connections. A memory dump shows that we have more open sockets than worker actors (created per connection).
Which is the best open-source reporting tool for a Java application that also has an open-source designer?
Youngoh Kim
I changed the classic actor to use the ask pattern heavily via Await.result; could this be a problem?
If we want to replace this pattern, how should we develop it? The ask code is much more readable than just using the tell pattern.
How can I improve this without performance issues? Currently I suspect a container-down issue caused by CPU rising after the server has run for a few days, because of this part.
2 replies
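A sketch of one common refactoring (actor and message names are hypothetical): keep the ask, but compose the Future and pipe the result back instead of blocking a dispatcher thread with Await.result:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// A worker standing in for whatever the real actor asks:
class Worker extends Actor {
  def receive: Receive = { case s: String => sender() ! s.toUpperCase }
}

// Instead of Await.result(worker ? req, ...), which parks a dispatcher
// thread and caps throughput, compose the Future and pipe it back:
class Orchestrator(worker: ActorRef) extends Actor {
  import context.dispatcher
  implicit val timeout: Timeout = 3.seconds

  def receive: Receive = {
    case req: String =>
      (worker ? req).mapTo[String].pipeTo(sender())
  }
}

val system = ActorSystem("ask-demo")
val worker = system.actorOf(Props(new Worker))
val orchestrator = system.actorOf(Props(new Orchestrator(worker)))

implicit val timeout: Timeout = 3.seconds
val reply = Await.result((orchestrator ? "hello").mapTo[String], 3.seconds)
println(reply) // HELLO

system.terminate()
```

The code stays close to the readability of ask while the actor never blocks between messages.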
Gaël Ferrachat

Hello here,

From my understanding, by default Akka completes a stream operator on UpstreamFinish.
I would like to know how to keep an operator alive after upstream completion. Do I have to create my own GraphStage and override the method onUpstreamFinish()?

My code currently looks like:

val someFuture: Future[MyObject]
val longlifeSink: Sink[MyObject, Future[Done]]


From the example above, I would like my longlifeSink to continue even after the UpstreamFinish; how can I do that?
Note I am asking because I would like to avoid having to create a GraphStage ...

@gael-ft: typically after upstream completes there is nothing left to do - after the sink has processed its last incoming element, what is the value of keeping it 'alive'?
Levi Ramsey
One approach, if you want the same sink to be fed by multiple streams with independent lifecycles, is to use Source.queue or Source.actorRef (or similar) and have your wireTap just send elements to the queue/actor.
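A minimal sketch of that approach (assuming Akka 2.6; Sink.fold stands in for the heavy sink): the sink runs in its own queue-backed stream, so completion of any feeding stream does not complete it:

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("queue-demo")
import system.dispatcher

// The long-lived sink gets its own stream with its own lifecycle:
val (queue, sinkDone) = Source
  .queue[Int](bufferSize = 256, OverflowStrategy.backpressure)
  .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both)
  .run()

// Short-lived upstreams just offer elements; when they complete, the
// queue-backed stream stays alive:
val offered = Future.sequence((1 to 3).map(queue.offer))
Await.result(offered, 3.seconds)

// The sink completes only when we decide to complete the queue:
queue.complete()
println(Await.result(sinkDone, 3.seconds)) // 6

system.terminate()
```

In the original scenario, the wireTap (or alsoTo) side would call queue.offer instead of running the heavy sink directly.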
Gaël Ferrachat

Hello @raboof:matrix.org ,

The idea is that the longlifeSink is a quite heavy operator.
But when the Source emits the value from the Future, it then emits the onUpstreamFinish, and it looks like that closes the ongoing wireTap before it actually completes.

I don't want it to remain alive forever; I want it to run all the code of the wireTap Sink and then close the operator.
Am I missing something?

Hello @leviramsey ,

As I said earlier, reusing the Sink is not what I am trying to achieve; I want it to run completely before it gets closed by the default onUpstreamFinish, which (by default) calls completeStage.

I was not very clear in my first message ...
What does your sink look like? Is it a custom graph?
Tom Burke
Hey guys, I am new to the Gitter community and not sure if this is the right room to be posting this; if not, it would be great if someone could redirect me. I am currently working on behalf of a company that uses Akka as their main backend and is looking for 5-6 new contractors to join their team. You could be based anywhere, as they have teams that work in all timezones, and the rates are €350 - €450 per day on 12-month contracts. Let me know if this could be for you.
Patrik Nordwall
We don't encourage job ads in this forum, but leaving it for this time. Hope you find the right people for the job.
Gaël Ferrachat
Here what my Sink looks like:

// in fact a method arg
val wuid: String = ...

// method body
      .setup { case (mat, _) =>
        implicit val materializer: ActorMaterializer = mat
        implicit val classicSystem: classic.ActorSystem = mat.system

        val workspaceDb = ReactiveDatabase(classicSystem.toTyped).forWorkspace(wuid)

          .flatMapConcat { urn =>
            val dbReadQuery: org.reactivestreams.Publisher[Metadata] = ...

              .map(replaceValueOf(urn, _)) // some changes in the Metadata itself, but anyway ...
              .zipWithIndex // just trick to get the batch number available for logs
              .flatMapConcat { case (metaSeq, index) =>
                val dbWriteQuery = ...

                  .log(s"test", extractBatchLog(urn, index))
      .mapMaterializedValue(_ => NotUsed)
Levi Ramsey
Remember that wireTap by design drops elements if the sink backpressures (e.g. because it's currently processing a message)... is the sink in the wireTap busy when upstream finishes emitting?
Gaël Ferrachat
I tried with alsoTo as well :(
In fact it is alsoTo, until it works hehe
Levi Ramsey
If the element-dropping isn't desirable, alsoTo can be used, at the expense of propagating backpressure upstream. Alternatively, especially if the upstream is bursty, you can put a backpressuring buffer in front of the sink
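A runnable sketch of the buffered alsoTo variant (the slow counting sink is a stand-in for the real heavy sink):

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("alsoTo-demo")

// A deliberately slow "heavy" sink that counts what actually reaches it:
val heavySink = Sink.fold[Int, Int](0) { (count, _) =>
  Thread.sleep(5) // simulate expensive per-element work
  count + 1
}

// The buffer absorbs bursts; upstream is only backpressured once it fills.
// Unlike wireTap, alsoTo never drops elements.
val received = Source(1 to 50)
  .alsoToMat(
    Flow[Int]
      .buffer(64, OverflowStrategy.backpressure)
      .toMat(heavySink)(Keep.right)
  )(Keep.right)
  .toMat(Sink.ignore)(Keep.left)
  .run()

println(Await.result(received, 10.seconds)) // 50: every element arrived

system.terminate()
```

The materialized count confirms that, unlike with wireTap, no element was dropped even though the side sink is slower than the upstream.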
Gaël Ferrachat
I'll try some of the proposals you gave me, review the workflow a little more, and come back to let you know.
Really appreciate the help here ;)
Vipin Tiwari
Hello, I'm seeing the below error:
"akka.http.impl.engine.client.OutgoingConnectionBlueprint$UnexpectedConnectionClosureException: The http server closed the connection unexpectedly before delivering responses for 1 outstanding requests" from an app which uses an internal library. From research I found that I can add a keep-alive timeout to resolve it: https://doc.akka.io/docs/akka-http/current/common/timeouts.html#keep-alive-timeout.
My question is: should I add this config (akka.http.host-connection-pool.keep-alive-timeout) in the main app, or in the internal lib where the actual call takes place? Thanks for the help.
Levi Ramsey
It depends on how the internal library starts the ActorSystem. I would try in the main app first, since that's where Akka would look by default.
1 reply
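For reference, a sketch of the setting in application.conf (the value is illustrative; it should be shorter than the server's idle timeout, and it only takes effect in whichever ActorSystem's configuration Akka HTTP actually loads):

```hocon
# Drop pooled connections idle longer than this, so the client never
# reuses a connection the server is about to close.
akka.http.host-connection-pool.keep-alive-timeout = 30 seconds
```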
Vadim Bondarev

Hello, has anyone thought about implementing an additional "zombie fencing" mechanism for akka-cluster + sharding applications in an Akka-idiomatic way, without an external linearizable system like ZooKeeper or etcd? SBR doesn't give a 100% guarantee against multiple writers (specifically zombie processes) existing at the same time. I understand that preventing concurrent writes can be done at the db level for dbs that support serializable or higher consistency levels, which is not the case for most dbs out there.

My best idea right now is a custom CRDT (MVRegister) to detect concurrent writes. I recently implemented exactly that for my project. Although this approach might work well for some applications, it still has drawbacks, mainly the fact that even with sharding, which gives us the single-writer principle, all writes have to go through the ddata layer, but we only take advantage of that during network partitions.
I wonder if anyone has other ideas about how this can be achieved in an Akka-idiomatic way.

31 replies
Chaminda Bandara

When I connect to WildFly 10 I am getting the below error...

[root@3gq4 jmxquery-1.3]# ./check_jmx -U service:jmx:rmi:///jndi/rmi:// -O java.lang:type=Memory -A HeapMemoryUsage -K used -I HeapMemoryUsage -J used -vvvv -w 731847066 -c 1045495808 -username admin -password password
JMX CRITICAL - Failed to retrieve RMIServer stub: javax.naming.CommunicationException [Root exception is java.rmi.ConnectIOException: error during JRMP connection establishment; nested exception is:
java.net.SocketTimeoutException: Read timed out] connecting to java.lang:type=Memory by URL service:jmx:rmi:///jndi/rmi:// Failed to retrieve RMIServer stub: javax.naming.CommunicationException [Root exception is java.rmi.ConnectIOException: error during JRMP connection establishment; nested exception is:
java.net.SocketTimeoutException: Read timed out]
at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:369)
at javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:270)
at jmxquery.DefaultJMXProvider.getConnector(DefaultJMXProvider.java:17)
at jmxquery.JMXQuery.connect(JMXQuery.java:91)
at jmxquery.JMXQuery.runCommand(JMXQuery.java:68)
at jmxquery.JMXQuery.main(JMXQuery.java:114)
Caused by: javax.naming.CommunicationException [Root exception is java.rmi.ConnectIOException: error during JRMP connection establishment; nested exception is:
java.net.SocketTimeoutException: Read timed out]
at com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:136)
at com.sun.jndi.toolkit.url.GenericURLContext.lookup(GenericURLContext.java:205)
at javax.naming.InitialContext.lookup(InitialContext.java:417)
at javax.management.remote.rmi.RMIConnector.findRMIServerJNDI(RMIConnector.java:1955)
at javax.management.remote.rmi.RMIConnector.findRMIServer(RMIConnector.java:1922)
at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:287)
... 5 more
Caused by: java.rmi.ConnectIOException: error during JRMP connection establishment; nested exception is:
java.net.SocketTimeoutException: Read timed out
at sun.rmi.transport.tcp.TCPChannel.createConnection(TCPChannel.java:307)
at sun.rmi.transport.tcp.TCPChannel.newConnection(TCPChannel.java:202)
at sun.rmi.server.UnicastRef.newCall(UnicastRef.java:343)
at sun.rmi.registry.RegistryImpl_Stub.lookup(RegistryImpl_Stub.java:116)
at com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:132)
... 10 more
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at sun.rmi.transport.tcp.TCPChannel.createConnection(TCPChannel.java:246)
... 14 more

Yufei Cai

Hi, I have 2 questions about the split-brain resolver.

  1. When a cluster node is downed by another node, it logs "SBR took decision DownSelfQuarantinedByRemote" many times. One node logged it 18 times before going down (Cluster size was 15). Is that expected behavior?

  2. A node experienced about 4-10s of stop-the-world garbage collection. It marked a subset of the cluster "unreachable" after waking up because it did not process their messages on time. After which an SBR leader took the decision "DownIndirectlyConnected" and downed the garbage-collecting node as well as all other nodes marked unreachable by it. Is that supposed to happen?

14 replies

I need to run 1000+ socket clients across several nodes. Each client listens to a unique endpoint.


I plan to represent each client as an actor with supervision, recovery, event handling, etc., and hope that Akka Cluster will:

  • spawn my actors, where each actor represents a socket client
  • help recover actors from failing states via supervision policies
  • evenly distribute actors across several nodes
  • guarantee that I have at most one actor for each client
  • enable communication between actors with at-least-once semantics via Akka messaging

What else should I expect?
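A hedged sketch of what the actor-per-client setup could look like with typed Cluster Sharding (all names hypothetical): sharding provides the at-most-one-instance-per-id guarantee and the distribution across nodes:

```scala
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}

object SocketClient {
  sealed trait Command
  final case class Connect(endpoint: String) extends Command

  // One entity per client id; Cluster Sharding guarantees at most one
  // live instance per id across the cluster and rebalances on node changes.
  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("SocketClient")

  def apply(clientId: String): Behavior[Command] =
    Behaviors.receiveMessage {
      case Connect(endpoint) =>
        // open this client's unique endpoint here
        Behaviors.same
    }
}

def init(system: ActorSystem[_]): Unit =
  ClusterSharding(system).init(Entity(SocketClient.TypeKey) { entityContext =>
    SocketClient(entityContext.entityId)
  })
```

One caveat on the last bullet: plain Akka messaging is at-most-once; at-least-once semantics needs something extra, such as Akka's reliable delivery support.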

Hi guys! Could anyone help me figure out how to specify an SSL context provider for ConnectionContext from akka-http? Netty has this feature via SslContextBuilder#sslContextProvider, but I can't find anything similar for ConnectionContext. I need it to set up REST and gRPC API routes.
Levi Ramsey

For describing a sink which is constructed based on the first element of the stream, one would previously use Sink.lazyInit, which is of course now deprecated. The deprecation message suggests Flow.prefixAndTail(1) with Sink.lazyFutureSink in place of lazyInit, but it seems that this better matches the intent:

def logic: Source[ByteString, NotUsed] = ???
def mySink(id: ByteString): Sink[ByteString, Future[Done]] = ???

val finished = logic.prefixAndTail(1).to {
  Sink.fromMaterializer { (mat, att) =>
    val donePromise = Promise[Done]()
    Sink.foreach {
      case (prefix, tail) =>
        prefix.headOption match {
          case None => donePromise.success(Done) // could also fail it, I guess, if that's one's preference
          case Some(head) =>
    .mapMaterializedValue(_ => donePromise.future)

I for one don't see an apparent way with prefixAndTail(1).to(Sink.lazyFutureSink(...)) to have the Future[Sink] depend on the element emitted by prefixAndTail.

4 replies
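For illustration, one runnable way to build the sink from the first element without a custom GraphStage (mySink here is a stand-in): materialize the single (prefix, tail) pair, then run the tail into the constructed sink:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("prefix-demo")
import system.dispatcher

// Hypothetical: the sink's behavior depends on the first element.
def mySink(first: Int): Sink[Int, Future[Int]] = Sink.fold[Int, Int](first)(_ + _)

val result: Future[Int] =
  Source(1 to 5)
    .prefixAndTail(1)
    .runWith(Sink.head) // exactly one (prefix, tail) pair
    .flatMap { case (prefix, tail) =>
      prefix.headOption match {
        case None        => Future.successful(0)        // empty upstream
        case Some(first) => tail.runWith(mySink(first)) // sink built from 1st element
      }
    }

println(Await.result(result, 3.seconds)) // 15 (= 1 + 2 + 3 + 4 + 5)

system.terminate()
```

Note the tail Source must be materialized before the substream subscription timeout (5 seconds by default), so run it promptly inside the flatMap.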
Zhenhao Li
Shameless plug: I've built https://pairtime.com, a marketplace for pair problem solving (beyond programming), as an event-sourced, distributed, and stateful app using Akka.
While I have enjoyed getting so much help from the community and the Akka team, I'm also willing to pay for people's time and knowledge.
So, it's a tool to make the community more interdependent. You can create a service profile and allow yourself to be found by the world (only if you want) in PairTime's profile search (https://pairtime.com/service-providers). I don't want to be the only one publicly providing pairing services on Akka.
Matt Oliver
So I'm running into a weird situation with two Typed Singletons (using request/response). First singleton creates a messageAdapter for the second and fires off a message. The second singleton successfully receives the message, however when responding, the message is not received by the first singleton (ever). However, if I first ask the second singleton from the first using the same message (which is received successfully) and then subsequently use request/response, messages are received correctly.
Hi all, I've got a question about the usage of ActorContext[_].scheduleOnce(...) inside the CommandHandler of an EventSourcedBehavior: how do you pass the Cancellable around? Can I somehow do that as part of the persistent actor, or do I need to resort to some other non-persistent actor to do the scheduling? In a non-persistent actor this form of scheduling and cancelling a message to be sent from an actor to itself is no problem, but in the persistent case I am currently stuck.
3 replies
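One way to avoid passing a Cancellable around at all (a sketch with hypothetical commands and events): wrap the EventSourcedBehavior in Behaviors.withTimers, so timers are keyed and can be cancelled by key from any command handler:

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
import scala.concurrent.duration._

sealed trait Command
case object Start extends Command
case object Timeout extends Command

sealed trait Event
case object Started extends Event

final case class State(running: Boolean)

// The TimerScheduler lives with the actor, outside the persisted state,
// so no Cancellable has to travel through commands or events.
def apply(id: String): Behavior[Command] =
  Behaviors.withTimers { timers =>
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId(id),
      emptyState = State(running = false),
      commandHandler = { (_, cmd) =>
        cmd match {
          case Start =>
            Effect.persist(Started).thenRun { (_: State) =>
              timers.startSingleTimer("timeout", Timeout, 30.seconds)
            }
          case Timeout =>
            timers.cancel("timeout") // cancel by key, no Cancellable needed
            Effect.none
        }
      },
      eventHandler = (_, _) => State(running = true)
    )
  }
```

Keep in mind timers are not persisted; if a timer must survive a restart, it has to be restarted on recovery (for example from the RecoveryCompleted signal).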
Hi, I seem to be running into an unexpected performance degradation using Cluster Sharding. Details are described in akka/akka#30546; I would appreciate some guidance, or confirmation whether these numbers look like normal behaviour. Thanks
Hey there! Does anyone know if it's possible to specify custom ciphers for GrpcClientSettings?
Patrik Nordwall
Akka 2.6.16 is out with a new Durable State persistence model. https://discuss.lightbend.com/t/akka-2-6-16-released-including-durable-state-persistence/8660
Odd Möller
That's great news! :point_up:
How do I pass custom context to a supervision strategy in an Akka stream?
@leviramsey ^^: Can you please help here? I need the whole payload printed just before resuming the stream in the supervision strategy. Is it possible to pass the whole payload to the supervision strategy?
2 replies
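The Decider only ever receives a Throwable, so one workaround (a sketch; ElementFailure is a hypothetical wrapper) is to carry the payload inside a custom exception and log it in the decider before resuming:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("supervision-demo")

// Hypothetical wrapper: carry the failing payload inside the exception.
final case class ElementFailure(payload: Int, cause: Throwable)
    extends RuntimeException(cause)

val decider: Supervision.Decider = {
  case ElementFailure(payload, _) =>
    println(s"dropping failed payload: $payload") // log the whole payload
    Supervision.Resume
  case _ => Supervision.Stop
}

val result = Source(1 to 5)
  .map { n =>
    // wrap any failure together with the element that caused it
    try { if (n == 3) throw new RuntimeException("boom") else n * 10 }
    catch { case e: Throwable => throw ElementFailure(n, e) }
  }
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runFold(List.empty[Int])(_ :+ _)

println(Await.result(result, 3.seconds)) // List(10, 20, 40, 50)

system.terminate()
```

With Supervision.Resume the failing element is dropped and the stream continues, while the decider has access to the payload through the wrapper exception.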

Hello folks, I am kind of confused by the intro doc for Durable State, which states that only events are stored in the event-sourcing model. To me that is not accurate, because we do have a snapshot store in the event-sourcing model that keeps the stateful actor's full state on disk, unless Lightbend decides to scrap that. This is from the intro doc:

Akka Persistence also supports Event Sourcing based implementation, where only the events that are persisted by the actor are stored, but not the actual state of the actor. By storing all events, using this model, a stateful actor can be recovered by replaying the stored events to the actor, which allows it to rebuild its state.

I know that a stateful actor in the event-sourcing model can recover by just using its state snapshot, which is no different from Durable State. I can agree that when someone disables snapshots, all events must be replayed.

6 replies
Chetan Kumar
Hello folks, is there a way to dynamically change the parameters of the groupedWeightedWithin operator, apart from implementing a custom GraphStage?
Hey guys, I'm trying to make a flow that has two inputs (a and b) and one output. I want the latest value on input a to determine whether elements from input b are allowed to pass through to the output.
So kind of like a stateful gate
It looks like I may be able to use mergeLatest?
A combination of zipLatest and statefulMapConcat.
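A minimal sketch of the stateful-gate idea (names illustrative): tag the control and data inputs, merge them into one channel, and keep the gate's state inside statefulMapConcat:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("gate-demo")

// Left is the control input (a), Right is the data input (b); the gate
// state lives inside statefulMapConcat.
def gate[T]: Flow[Either[Boolean, T], T, NotUsed] =
  Flow[Either[Boolean, T]].statefulMapConcat { () =>
    var open = false
    {
      case Left(g)  => open = g; Nil               // update the gate
      case Right(v) => if (open) v :: Nil else Nil // pass only when open
    }
  }

// In a real app the two inputs would be separate sources merged into the
// gate, e.g. controlSource.map(Left(_)).merge(dataSource.map(Right(_))).
val out = Source(List[Either[Boolean, Int]](
    Left(false), Right(1), Left(true), Right(2), Right(3), Left(false), Right(4)
  ))
  .via(gate[Int])
  .runFold(List.empty[Int])(_ :+ _)

println(Await.result(out, 3.seconds)) // List(2, 3)

system.terminate()
```

With a merge the relative ordering of control and data elements is nondeterministic, which is usually acceptable for a gate; zipLatest is an alternative when every data element should be paired with the latest control value instead.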
Can someone please explain how Akka helps in reducing infrastructure cost? Is it that we need fewer nodes once we move to Akka?
Patrik Nordwall
VarunVats9: Akka is rather efficient; performance has always been an important factor. This means that fewer resources (hardware, nodes) are often required for handling the same or higher load compared to other solutions. We have seen that in many real-world customer cases where applications were migrated to Akka.