Aleksei Shamenev

Hi there 👋

I wonder if someone can help me understand how to properly drop an actor's state in Akka. In some of our cases, when an actor is being recovered, it is possible to get an unparsable event/snapshot from its store. In that case we want to drop the current state of the actor and start from scratch, but I cannot find anything like that in the Akka docs 😕

Any ideas how to achieve such a behaviour? Is it even possible?

Zhenhao Li
@coffius you can set akka.persistence.snapshot-store-plugin-fallback.snapshot-is-optional = true in your config.
      # Set this to true if successful loading of snapshot is not necessary.
      # This can be useful when it is alright to ignore snapshot in case of
      # for example deserialization errors. When snapshot loading fails it will instead
      # recover by replaying all events.
      # Don't set to true if events are deleted because that would
      # result in wrong recovered state if snapshot load fails.
This information is not available in the web docs, only in the code...
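For reference, in application.conf the setting quoted above would sit under the snapshot-store fallback path (a minimal fragment; whether you set it on the fallback or directly on your configured snapshot-store plugin is up to your setup):

```hocon
# Applies as a fallback to all snapshot-store plugins:
akka.persistence.snapshot-store-plugin-fallback {
  # Ignore unreadable snapshots (e.g. deserialization errors) and
  # recover by replaying all events instead.
  snapshot-is-optional = true
}
```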

Hi everyone, I think I am running into an issue because I may not understand something about the correct use of concurrency facilities in Akka. The scenario is this: I have some actor which is receiving messages from Kafka via Alpakka Kafka. It does this throughout the day. Message throughput is quite high I would say, at around 20 to 30 k msgs/sec. Periodically (three times a day) the actor receives a message that prompts it to upload a snapshot of the collected data to an AWS S3 bucket via Alpakka S3. This is done in the below code:

case PerformSnapshot => Effect
      .thenRun { s =>
        implicit val sys = ctx.system
        implicit val ec = ctx.executionContext
        val timeFormatter = time.format.DateTimeFormatter.ofPattern("yyyyMMdd").withZone(timeZone)
        val key = s"top-secred-data-bucket/${timeFormatter.format(s.currentSnapshot.timestamp)}/${s.currentSnapshot.metaData.id}"
        val self = ctx.self
        // (the S3 upload call, which returns a Future, was elided from the
        //  original message; .onComplete below is attached to that Future)
          .onComplete {
            case Success(_) => self ! UpdateSnapshot(Some(s.currentSnapshot))
            case Failure(e) => self ! Shutdown("Failed to upload to S3", Some(e))
          }
      }

The actor and application are running fine throughout the day. The load on the application is not near its provisioned resource limits. But when the snapshot is triggered this log signals something is off and triggers the app to shut down:

Kafka commit failed after=5 ms,
exception=org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.

My question: Does that signal that I am not as concurrent as I think I am with the above code and the S3 upload actually blocks the actor from processing further messages? Or is that 5 ms setting simply too low?
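For reference, the upload itself only blocks the actor if the actor waits on the Future; the intended fire-and-message-back shape is sketched below with plain Futures (uploadToS3 is a hypothetical stand-in for the Alpakka S3 call, and the string results stand in for the messages sent back to self):

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Stand-in for the real Alpakka S3 upload, which returns a Future.
def uploadToS3(): Future[String] = Future { Thread.sleep(100); "etag" }

val outcome = Promise[String]()

// The actor only installs the callback and moves on; completion comes
// back later as a message (modelled here with a Promise).
uploadToS3().onComplete {
  case Success(_) => outcome.success("UpdateSnapshot") // would be: self ! UpdateSnapshot(...)
  case Failure(_) => outcome.success("Shutdown")       // would be: self ! Shutdown(...)
}

// Only this demo waits; a real actor would never block like this.
val result = Await.result(outcome.future, 5.seconds)
println(result) // prints UpdateSnapshot
```

In Typed Akka the same thing is usually expressed with ctx.pipeToSelf, which additionally avoids closing over mutable actor state from another thread.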

6 replies
Hi, what are the advantages and disadvantages of Akka compared with CAF (C++ Actor Framework)? Are they competitors?
Swoorup Joshi
one is boilerplatey, other is not prolly
Nathan Fischer
They're in pretty different languages, so I wouldn't say they're competitors
It seems unlikely that you'd choose your language based on the actor framework library, more likely the other way around
Swoorup Joshi
That reminds me: who here has worked with Elixir and gone back to Scala?

We are using an Alpakka pipeline with Source(Consumer.committableSource) as the Kafka consumer and Sink.actorRefWithAck as the sink for backpressure.

The problem statement: we need to commit the Kafka offset only after reading and processing the value in the sink actor. How can we do that? How can I commit the offset in the sink actor? We need to process the consumed element in the sink actor, as the Init message keeps one element in the buffer.

We need to control consumption based on demand. Is there any alternative to this design?
@JeasK: I wonder if you could use Flow.ask instead of Sink.actorRefWithAck; then you could put the committing sink after that
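A rough sketch of that suggestion (not compiled here; processor, Process, the topic name, and consumerSettings are hypothetical placeholders, and mapAsync(1) plays the role of Flow.ask):

```scala
import akka.pattern.ask
import akka.util.Timeout
import akka.kafka.{CommitterSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import scala.concurrent.duration._

implicit val timeout: Timeout = 5.seconds
import system.dispatcher // ExecutionContext for the .map below

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
  .mapAsync(1) { msg =>
    // ask the processing actor; hold on to the offset until it acknowledges
    (processor ? Process(msg.record.value())).map(_ => msg.committableOffset)
  }
  // commit only after the actor has replied
  .runWith(Committer.sink(CommitterSettings(system)))
```

With this shape the ack-protocol of Sink.actorRefWithAck is no longer needed: mapAsync's parallelism bound provides the backpressure.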
Gilad Hoch

Hi, when answering a chunked HTTP response in akka-http with a Source[ByteString, _] - what may cause an early downstream cancellation?

I'm replying with entity =

Source
  .queue[ByteString](size,  OverflowStrategy.backpressure)
  .buffer(size, OverflowStrategy.backpressure)
  .preMaterialize() // grabbing the materialized queue so I can dynamically push to stream

And I was getting curl: (56) Recv failure: Connection reset by peer in the middle of the stream when testing the API with curl.
To investigate I added this custom stage: https://gist.github.com/hochgi/cc354f9b80ca427a4f4d7313c78e4350
and I added this to the source given as entity:

val inspector = …

Source
  .queue[ByteString](size,  OverflowStrategy.backpressure)
  .buffer(size, OverflowStrategy.backpressure)
  .preMaterialize() // grabbing the materialized queue so I can dynamically push to stream

So I can clearly see in the logs that:

[2021-07-07 11:28:32,480] [ERROR] [cls.FQCN|-dispatcher-akka.actor.default-dispatcher-11] - [some ctx] downstream completed
akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$: null

The configs I tried (I also use the client API - actually I pipe multiple requests from client into a single stream for the chunked response):

akka {

  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  stdout-loglevel = "OFF"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  http {

    client {
      idle-timeout = infinite
      stream-cancellation-delay = 1 minute
    }

    host-connection-pool {
      max-connections = 48
      min-connections = 4
      max-open-requests = 256 # must be a power of 2
      idle-timeout = infinite
    }
  }
}

But adding and trying all these configs didn't help much.
Any idea what I can try next?


Can anyone tell me the correct way to launch an HTTP/2 request with akka-http?
The doc website says I have to


But in the akka-http version I use (akka-http_2.13 v10.1.12) there isn't a method called connectionTo() in the akka.http.javadsl.Http class. (Plus, I don't know why the official javadoc doesn't include akka-http_2.13, so it's super hard to find any documentation.)

I'm trying to launch HTTP/2 requests because I'm stuck on this error after migrating from akka-http_2.11 to akka-http_2.13, and I'm guessing it might be because the server I'm sending requests to defaults to HTTP/2:

[ERROR] [07/08/2021 05:58:57.732] [graph-api-akka.actor.default-dispatcher-19] [akka://graph-api/system/Materializers/StreamSupervisor-2/flow-6-0-PoolFlow] Error in stage [akka.http.impl.engine.client.OutgoingConnectionBlueprint$PrepareResponse@25f31b7]: The server-side HTTP version is not supported
akka.http.scaladsl.model.IllegalResponseException: The server-side HTTP version is not supported
     at akka.http.impl.engine.client.OutgoingConnectionBlueprint$PrepareResponse$$anon$3.onPush(OutgoingConnectionBlueprint.scala:191)
     at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
     at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
     at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)

And it is really painful for me to debug. The API server I'm requesting, the Facebook Graph API, should support both HTTP/1.1 and HTTP/2, so I can't figure out why I got "The server-side HTTP version is not supported". Or is there anything I can do in my akka-http code to force the requests to use HTTP/1.1?

Thank you so much for your time to help!

4 replies
Zhenhao Li
is the next Akka release going to support Scala 3?
2 replies
Tanjim Hossain

Hi, I'm trying to see the claim "~2.5 million actors per 1 GB heap" in action, but couldn't achieve it with a naive approach. The lowest number I can get is approx. 800 bytes per actor, with an explicit GC call in jvisualvm.
I've tried subtracting the memory usage without the actors, but it's still much higher than the expected ~400 bytes per actor. Can someone point out what I'm missing? P.S. I'm really a newbie in JVM land

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object GreeterMain {
  final case class Start(n: Int)

  // do-nothing behavior; the body was elided in the original message
  def greeter: Behavior[String] = Behaviors.receiveMessage { message =>
    Behaviors.same
  }

  def apply(): Behavior[Start] =
    Behaviors.receive { (ctx, msg) =>
      // spawn loop; the body was elided in the original message
      (1 to msg.n).foreach { i => ctx.spawn(greeter, s"greeter-$i") }
      Behaviors.same
    }

  @main def hello: Unit = {
    val greeterMain: ActorSystem[Start] =
      ActorSystem(GreeterMain(), "AkkaQuickStart")
    greeterMain ! GreeterMain.Start(1000000)
  }
}

jvisualvm result https://ibb.co/6W6RqLT

Patrik Nordwall
@audacioustux that number might be outdated, and actors have grown bigger (but also more capable). Might be smaller with classic actors.
2 replies
Tanjim Hossain
thanks a lot for the info
Shafqat Ullah
Is it more like 1 million actors per GB heap now? Does anyone else have any further data point?
Vinay Pandey
Hello -- I am relatively new to Akka. I am trying to use the Http CachedConnectionPool with Source.queue, and I keep getting You have to wait for the previous offer to be resolved to send another request. I was under the impression that backpressure makes the source wait for the current requests to complete in case the buffer gets full. Is my understanding correct, and what can I do to fix this issue?
Vinay Pandey
I have also tried using throttling, with ThrottleMode.shaping -- this still gives me the above error
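For context, Source.queue throws that error when offer() is called again before the previous offer's Future has resolved; with OverflowStrategy.backpressure the Future completes only once there is downstream demand, so offers must be serialized. A hedged sketch of one way to do that (not compiled here; queue and the request type are placeholders for your materialized SourceQueueWithComplete and its element type):

```scala
import akka.stream.QueueOfferResult
import scala.concurrent.Future

// Chain the offers so each one starts only after the previous resolved.
def offerAll(requests: List[HttpRequest]): Future[Unit] =
  requests.foldLeft(Future.successful(())) { (prev, req) =>
    prev.flatMap { _ =>
      queue.offer(req).map {
        case QueueOfferResult.Enqueued => ()
        case other => throw new RuntimeException(s"offer failed: $other")
      }
    }
  }
```

Throttling alone doesn't help because it shapes the rate but doesn't stop a second offer() from racing the first one's Future.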
Hi everyone, just a quick question about sharding: Is there a best practice for getting an aggregate which would require getting some info from all existing entities? So far I could not find a pointer on this. Many thanks!
9 replies

I want to read 3 crore (30 million) CSV rows from a 2 GB CSV file and insert them into MySQL via Java.

Could someone please help me find the fastest and most memory-efficient way to avoid an out-of-memory exception and load the data in less time?

Please kindly advise.

Zhenhao Li
@brightinnovator this doesn't seem to be about Akka. you might have a better chance posting on Stack Overflow.
Yufei Cai
Does Artery TCP have more throughput for messages between different actors on 2 nodes than between the same pair of actors? I'm seeing more dropped messages when the traffic between the 2 nodes was between 2 actors than when the traffic was split between different senders and receivers. The lane config is unchanged from the default of 4 inbound and 1 outbound.

@brightinnovator I would use Akka Streams.

I'd write a small stream recipe and run it.

I'd try to get away with the official CSV parser, I have had very good experience with that: https://doc.akka.io/docs/alpakka/current/data-transformations/csv.html

However, if you have very, very large rows in the CSV you might want to consider reading it in a plain fashion with the Akka Streams FileIO https://doc.akka.io/docs/akka/current/stream/stream-io.html#streaming-file-io and then, behind that, to employ an Akka Streams Framing in some way (splitting at some char(s) or char combination(s) within a row, so just a bit different than in the example behind the following link): https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#parsing-lines-from-a-stream-of-bytestrings

@brightinnovator In order to have your MySQL database as the sink in such a stream recipe, you may want to look at https://doc.akka.io/docs/alpakka/current/slick.html

@brightinnovator Having your small stream recipe ready and running, you can then consider tuning, e.g. by increasing the default buffer size of stream stages, and/or by allowing for parallel inserts into the database, and/or tuning the database connection pool, and/or introducing one or more asynchronous boundaries (via the one-liner .async), and or adding explicit .buffer stages.

If you figure out that there is some max sweet spot in the throughput given your CSV, RAM, and CPU, you could also add a throttle at that rate, which in Akka Streams is pretty much a one-liner using the built-in stage .throttle(...).

@brightinnovator Lastly, if you want to know more about some patterns of maximizing throughput in Akka Streams, study this video: https://www.youtube.com/watch?v=MzosGtjJdPg
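Pulling the advice above together, a minimal sketch of such a stream recipe (not compiled here; "data.csv", the batch size, and insertBatch, a hypothetical DAO function doing one batched JDBC/Slick insert, are assumptions):

```scala
import java.nio.file.Paths
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{FileIO, Sink}

FileIO.fromPath(Paths.get("data.csv"))      // stream the file, never whole in memory
  .via(CsvParsing.lineScanner())            // Alpakka CSV: bytes -> CSV fields
  .via(CsvToMap.toMapAsStrings())           // header row -> column-name keys
  .grouped(1000)                            // batch rows for efficient inserts
  .mapAsync(parallelism = 4)(insertBatch)   // parallel batched inserts into MySQL
  .runWith(Sink.ignore)
```

The grouped/mapAsync pair is where most of the tuning mentioned above happens: batch size, insert parallelism, and optionally a .throttle or .buffer stage in between.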
@yufei-cai Is this dropping of messages something intended by you or do you consider that a problem? Are you saturating some path your messages travel to the point where stuff would be dropped for some reason?
@yufei-cai I just quickly scanned for the word "drop" in https://doc.akka.io/docs/akka/current/remoting-artery.html but I could not relate any of the passages where "drop" appears to your words. Note I am not experienced with Artery, just wanted to understand.
@iosven Thank you so much....Thanks a lot for your kindness and time....
Juan Martinez
Hello. I have made a service with Akka Streams that uses TCP with TLS. Would it be possible in some way to access the client cert information? I'm not an expert, but I guess what I want is the server session context for each session once the client has been connected. I only see Tcp.IncomingConnection.
19 replies
Yufei Cai

@iosven Dropping messages is not intended and not good for me. I believe I found the issue. Artery TCP uses Akka streams to send messages between cluster members. Messages to 1 actor are deserialized sequentially whereas messages to different actors are deserialized concurrently in up to 4 "lanes" by default. Single-threaded deserialization cannot take advantage of multicore CPU. It generates too little demand upstream. The sender queue becomes full, after which messages are dropped.

The relevant documentation is the comment for the config option outbound-message-queue-size.

1 reply
Hello, design question...
I have this requirement that I can have multiple auction actors, each with a different set of items, but when adding items to a given auction, I don't want duplicated items. Say
auction1 has (item1, item2) and auction2 has (item3, item1); here item1 for auction2 shouldn't be allowed because it's already present in auction1...
How do I design this? This is just a PoC on a local machine
1 reply
Mike Krumlauf
Hello, I have an Elasticsearch Alpakka question. I need to create an index and write documents, but I need dynamic templates to be applied when the index is created. Is that possible in Alpakka Elasticsearch, or do I need to drop back to a REST client and the "raw" Elasticsearch API to do so?
Are there any Java Akka demo examples?
Zhenhao Li

hi, I'm running a very simple Akka Streams job that doesn't involve Cassandra at all, but I still see this in the logs:

[2021-07-19 10:16:28,095] [WARN] [com.datastax.oss.driver.internal.core.control.ControlConnection] [] [s0-admin-1] - [s0] Error connecting to Node(endPoint=/, hostId=null, hashCode=480e8ba1), trying next node (ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)) {}

why is that?

Hi Akka Team, I have a basic question about turning off WARN logs. We have a three-node Akka cluster; when one of the nodes goes down, the other two nodes continuously write the logs below:
2021-07-20T07:41:38,463 | WARN | opendaylight-cluster-data-akka.actor.default-dispatcher-15 | Materializer | 135 - com.typesafe.akka.slf4j - 2.5.26 | [outbound connection to [akka://opendaylight-cluster-data@], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
2021-07-20T07:41:38,463 | WARN | opendaylight-cluster-data-akka.actor.default-dispatcher-15 | RestartWithBackoffFlow | 135 - com.typesafe.akka.slf4j - 2.5.26 | Restarting graph due to failure. stack_trace: (akka.stream.StreamTcpException: Tcp command [Connect(,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused)
2021-07-20T07:41:39,156 | WARN | opendaylight-cluster-data-akka.actor.default-dispatcher-15 | Materializer | 135 - com.typesafe.akka.slf4j - 2.5.26 | [outbound connection to [akka://opendaylight-cluster-data@], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
I could not find a way to disable these Akka stream logs from the akka.conf file, and hence used log4j2 to disable them
Using Akka-Http and Intellij, I and others have been seeing the following false errors for the past 5 years:
1) cannot resolve overloaded method 'complete' ... 2) No implicit FromRequestUnmarshaller ... 3) No implicit marshalling.ToEntityMarshaller
These errors are not displayed via VSCode-Metals; nor does SBT report them.
So my question is: Does anyone on the Akka-Http team know why Jetbrains has been unable to resolve these false errors?

Hi all, I am running a set of JVM / Akka systems on the same server. The different components communicate with each other via messages / remote actors or streams. Some of the components run fairly complex optimization tasks, say every 5 minutes; they produce an output that is required and flows through the system, so in that sense they are somewhat blocking.

In order to isolate them, I am running the associated stream/source on actors that have their own thread-pool executor or fork-join dispatcher (I didn't notice/manage to get a significant difference when choosing one or the other).

My question is the following: at the moment, each dispatcher is defined in the system config of its respective system, but I am trying to use a "common dispatcher" that would be used by all the JVMs. For example, instead of defining a 4-thread-pool dispatcher in each system, would it be more efficient to define an 8-thread-pool dispatcher that could be shared? I don't know if this is possible, where the conf for the "shared dispatcher" would be defined since each system has its own config, or where I would run the "shared" dispatcher.

Any comments / references to an article or a blog on a similar problem would be much appreciated. Thank you in advance

15 replies
Youngoh Kim
akka version: 2.5.32
scalatest version: 3.0.8
I am trying to write test code for an Akka classic actor.
For the ask pattern I can assert on the value I expect, but for the tell pattern, how can I handle it in my test code?
Are there any good examples of test code that are actually used?
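One common approach for the tell pattern with classic actors is to use a TestProbe as the sender and assert on what the probe receives (a sketch inside a TestKit-based spec; actorUnderTest, DoWork, and WorkDone are hypothetical):

```scala
import akka.testkit.TestProbe
import scala.concurrent.duration._

val probe = TestProbe() // needs the spec's implicit ActorSystem

// Send with an explicit sender so replies go to the probe, not deadLetters.
actorUnderTest.tell(DoWork("x"), probe.ref)

// Assert on the reply the actor tells back.
probe.expectMsg(3.seconds, WorkDone("x"))
```

Probes can also be injected as collaborators so you can assert on messages the actor tells to other actors, not just replies.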
3 replies
José Luis Colomer Martorell


Akka version 2.6.9 and akka-lease-kubernetes 1.0.9

I'm trying to use k8s leases in cluster singleton and shards.

My application.conf has the following keys defined:

akka.cluster.sharding.use-lease = "akka.coordination.lease.kubernetes"
akka.cluster.singleton.use-lease = "akka.coordination.lease.kubernetes"

Both singleton and shards are classic (typed ones are not fixed until 2.6.11 and 2.6.15 respectively)

At bootstrapping, this error shows:

07:14:50.363 ERROR [akka.coordination.lease.scaladsl.LeaseProvider]   - Invalid lease configuration for 
leaseName [my-actor-system-singleton-akka://my-actor-system/system/sharding/myEntityCoordinator], 
configPath [akka.coordination.lease.kubernetes] lease-class [akka.coordination.lease.kubernetes.KubernetesLease]. 
The class must implement scaladsl.Lease or javadsl.Lease and have constructor with LeaseSettings parameter and 
optionally ActorSystem parameter.

That's a bit weird, because akka.coordination.lease.kubernetes.KubernetesLease has a secondary constructor with LeaseSettings and ExtendedActorSystem

  def this(leaseSettings: LeaseSettings, system: ExtendedActorSystem) =
    this(system, new AtomicBoolean(false), leaseSettings)

Then this error appears:

akka.actor.InvalidActorNameException: Invalid actor path element 
illegal character [/] at position: 48. Actor paths MUST: not start with `$`, include only ASCII letters and can only contain these 
special characters: -_.*$+:@&=,!~';.

Am I missing anything? Maybe I have to define additional keys in my application.conf?

Thanks in advance!