This channel is available for all Akka enthusiasts—newbies as well as gurus—for the exchange of knowledge and the coordination of efforts around Akka. For more info see: http://bit.ly/akka-gitter
Hi team, I am trying to use an Akka Extension. I have seen several examples where the companion object of the class extends ExtensionId, but in that case the createExtension function can only use the ActorSystem to create an instance of the class.
For cases where values other than the ActorSystem are needed, I am trying the approach below, but with it the extension is registered multiple times.
trait BFactory {
  protected def configKeyPath: String
}
case class A(client: String, config: Configuration) extends Extension
case class B(override val configKeyPath: String) extends ExtensionId[A] with BFactory {
  override def createExtension(system: ExtendedActorSystem): A = {
    val configuration = Configuration(system.settings.config, configKeyPath)
    val client: String = ???
    A(client, configuration)
  }
}
val aInstance: A = B(configKeyPath).apply(system)
Does anyone have any idea how to use ExtensionId with a case class?
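A likely cause, as far as I can tell from the Akka sources: ExtensionId defines equals and hashCode by reference identity (and marks them final), so case-class value equality never applies and every new B(configKeyPath) counts as a brand-new extension id. One way around it is a single object as the ExtensionId whose extension caches one instance per config key. A minimal sketch; AClients, forKey, buildClient and demo are hypothetical names, and the client is only a placeholder string:

```scala
import java.util.concurrent.ConcurrentHashMap

import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId }

// Stand-in for the `A` from the question; `client` is only a placeholder string.
final case class A(client: String, configKeyPath: String)

// Hypothetical extension: one per ActorSystem, building at most one `A` per config key path.
final class AClients(system: ExtendedActorSystem) extends Extension {
  private val instances = new ConcurrentHashMap[String, A]()

  def forKey(configKeyPath: String): A =
    // computeIfAbsent is atomic, so concurrent callers share a single instance per key.
    instances.computeIfAbsent(configKeyPath, key => A(buildClient(key), key))

  // Hypothetical: build the real client here, e.g. from system.settings.config.
  private def buildClient(configKeyPath: String): String =
    s"client-for-$configKeyPath"
}

// A single, stable ExtensionId object, so Akka registers AClients only once per system.
object AClients extends ExtensionId[AClients] {
  override def createExtension(system: ExtendedActorSystem): AClients =
    new AClients(system)

  // Usage example: two lookups of the same key and one lookup of another key.
  def demo(): (A, A, A) = {
    val system = ActorSystem("extension-demo")
    try {
      val ext = AClients(system)
      (ext.forKey("service-a"), AClients(system).forKey("service-a"), ext.forKey("service-b"))
    } finally system.terminate()
  }
}
```

The per-key state lives inside one registered extension instead of in many ExtensionId instances, which keeps the registry lookup stable.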
Hi there 👋
I wonder if someone can help me understand how to properly drop an actor's state in Akka. In some of our cases, when an actor is being recovered, it is possible to get an unparsable event/snapshot from its store. In that case we want to drop the actor's current state and start from scratch, but I cannot find anything like that in the Akka docs 😕
Any ideas how to achieve such behaviour? Is it even possible?
# Set this to true if successful loading of snapshot is not necessary.
# This can be useful when it is alright to ignore snapshot in case of
# for example deserialization errors. When snapshot loading fails it will instead
# recover by replaying all events.
# Don't set to true if events are deleted because that would
# result in wrong recovered state if snapshot load fails.
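In the akka-persistence reference.conf, that comment appears to document the snapshot-is-optional setting in the snapshot-store-plugin-fallback section (assuming an Akka version that ships it). Note it only covers snapshots: recovery falls back to replaying events, so an unparsable event is not handled by it. A sketch of turning it on; OptionalSnapshots is a hypothetical name:

```scala
import com.typesafe.config.{ Config, ConfigFactory }

object OptionalSnapshots {
  // Assumption: an Akka version whose reference.conf contains snapshot-is-optional.
  // The plugin fallback section applies to every snapshot store plugin that does not
  // override the setting in its own config section.
  val config: Config = ConfigFactory
    .parseString("""
      akka.persistence.snapshot-store-plugin-fallback.snapshot-is-optional = true
    """)
    .withFallback(ConfigFactory.load())
}
```

The config would then be passed when creating the system, e.g. ActorSystem("app", OptionalSnapshots.config). Per the warning in the comment, this is only safe if events are never deleted.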
Hi everyone, I think I am running into an issue because I may not understand something about the correct use of concurrency facilities in Akka. The scenario is this: I have an actor that receives messages from Kafka via Alpakka Kafka throughout the day. Message throughput is fairly high, around 20-30k msgs/sec. Periodically (three times a day) the actor receives a message that prompts it to upload a snapshot of the collected data to an AWS S3 bucket via Alpakka S3. This is done in the code below:
case PerformSnapshot =>
  Effect
    .none
    .thenRun { s =>
      implicit val sys = ctx.system
      implicit val ec = ctx.executionContext
      val timeFormatter = time.format.DateTimeFormatter.ofPattern("yyyyMMdd").withZone(timeZone)
      val key = s"top-secred-data-bucket/${timeFormatter.format(s.currentSnapshot.timestamp)}/${s.currentSnapshot.metaData.id}"
      // capture self so the Future callback below does not touch ctx from another thread
      val self = ctx.self
      // values.toSeq builds the whole collection synchronously, on the actor's thread
      Source
        .single(s.currentCollection.values.toSeq)
        .runWith(blobSink(key))
        // the upload itself runs asynchronously; the callback only sends a message back
        .onComplete {
          case Success(_) => self ! UpdateSnapshot(Some(s.currentSnapshot))
          case Failure(e) => self ! Shutdown("Failed to upload to S3", Some(e))
        }
    }
The actor and application run fine throughout the day, and the load on the application is nowhere near its provisioned resource limits. But when the snapshot is triggered, the following log signals that something is off, and the app shuts down:
Kafka commit failed after=5 ms,
commitsInProgress=0,
exception=org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
My question: does this signal that I am not as concurrent as I think with the above code, and that the S3 upload actually blocks the actor from processing further messages? Or is that 5 ms setting simply too low?
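Reading the log line, after=5 ms looks like the time the failed commit attempt took rather than a timeout setting; the exception itself says the time between polls exceeded max.poll.interval.ms, after which the group rebalanced. The upload in thenRun is not awaited, but anything synchronous in that block runs on the actor's thread, and if the snapshot somehow stalls the consumer (for example through thread starvation or long GC pauses while copying and uploading), the poll deadline can be missed. If profiling points that way, one thing to try while investigating is giving the poll loop more slack. A sketch; SnapshotConsumerSettings, the broker address and the group id are hypothetical:

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object SnapshotConsumerSettings {
  def apply(system: ActorSystem): ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // hypothetical broker address
      .withGroupId("snapshot-consumer")       // hypothetical consumer group
      // Kafka's default max.poll.interval.ms is 300000 (5 minutes); 10 minutes is an
      // illustrative value, not a recommendation.
      .withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000")

  // Builds the settings, returns the configured value and shuts the system down.
  def demo(): String = {
    val system = ActorSystem("consumer-settings-demo")
    try SnapshotConsumerSettings(system).properties(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
    finally system.terminate()
  }
}
```

This only widens the window; if the stall is real, its cause (large synchronous copies, blocking calls, dispatcher starvation) still needs to be found.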
We are using an Alpakka pipeline that uses Consumer.committableSource as the Kafka source and Sink.actorRefWithAck as the sink, for backpressure.
The problem: we need to commit the Kafka offset only after the value has been read and processed in the sink actor. How can we do that? How can I commit the offset in the sink actor? We need to process the consumed element in the sink actor, as the init method keeps one element in the buffer.
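Rather than committing inside the sink actor, one option is to keep the actor for processing but drive it with ask from the stream, and let Alpakka Kafka's Committer.sink commit each offset only after the actor has replied. A minimal sketch, assuming a hypothetical Process/Processed protocol, with plain Long values standing in for CommittableOffset so it runs without a broker:

```scala
import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.pattern.ask
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.Timeout

// Hypothetical protocol between the stream and the processing actor.
final case class Process(value: String)
case object Processed

// Processes each value, then replies so the stream knows the offset may be committed.
class ProcessorActor extends Actor {
  def receive: Receive = {
    case Process(value) =>
      // ... real processing of `value` goes here ...
      sender() ! Processed
  }
}

object ProcessThenCommit {
  // Emits an element's offset only after the actor has replied for that element.
  // mapAsync keeps the upstream order, so offsets reach the committer in sequence.
  def processThenEmitOffset[Offset](processor: ActorRef, parallelism: Int)(
      implicit timeout: Timeout, ec: ExecutionContext): Flow[(String, Offset), Offset, NotUsed] =
    Flow[(String, Offset)].mapAsync(parallelism) { case (value, offset) =>
      (processor ? Process(value)).map(_ => offset)
    }

  // With Alpakka Kafka the flow would sit between the source and the committer, e.g.
  //   Consumer.committableSource(settings, Subscriptions.topics("topic"))
  //     .map(msg => (msg.record.value, msg.committableOffset))
  //     .via(processThenEmitOffset(processor, parallelism = 1))
  //     .runWith(Committer.sink(CommitterSettings(system)))
  def demo(): Seq[Long] = {
    implicit val system: ActorSystem = ActorSystem("process-then-commit-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    import system.dispatcher
    try {
      val processor = system.actorOf(Props(new ProcessorActor))
      val offsets = Source(List("a" -> 1L, "b" -> 2L, "c" -> 3L))
        .via(processThenEmitOffset[Long](processor, parallelism = 2))
        .runWith(Sink.seq)
      Await.result(offsets, 5.seconds)
    } finally system.terminate()
  }
}
```

Backpressure still works here: mapAsync never has more than parallelism asks in flight, so the source is not pulled faster than the actor replies.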
Hi, when answering with a chunked HTTP response in akka-http using a Source[ByteString, _], what may cause an early downstream cancellation?
Context:
I'm replying with entity =
Source
.queue[ByteString](size, OverflowStrategy.backpressure)
.buffer(size, OverflowStrategy.backpressure)
.preMaterialize() // grabbing the materialized queue so I can dynamically push to stream
And I was getting curl: (56) Recv failure: Connection reset by peer in the middle of the stream when testing the API with curl.
To investigate I added this custom stage: https://gist.github.com/hochgi/cc354f9b80ca427a4f4d7313c78e4350
and I added this to the source given as entity:
val inspector = …
Source
.queue[ByteString](size, OverflowStrategy.backpressure)
.via(inspector)
.buffer(size, OverflowStrategy.backpressure)
.preMaterialize() // grabbing the materialized queue so I can dynamically push to stream
So I can clearly see in the logs that:
[2021-07-07 11:28:32,480] [ERROR] [cls.FQCN|-dispatcher-akka.actor.default-dispatcher-11] - [some ctx] downstream completed
akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$: null
The configs I tried (I also use the client API; in fact I pipe multiple requests from the client into a single stream for the chunked response):
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  stdout-loglevel = "OFF"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  http {
    client {
      idle-timeout = infinite
      stream-cancellation-delay = 1 minute
    }
    host-connection-pool {
      max-connections = 48
      min-connections = 4
      max-open-requests = 256 # must be a power of 2
      idle-timeout = infinite
    }
  }
}
But adding and trying all these configs didn't help much.
Any idea what I can try next?
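All the settings tried above are client-side (akka.http.client and host-connection-pool), but the chunked response to curl travels over a server connection, which is governed by akka.http.server.*. One thing worth ruling out is the server idle-timeout (60 s by default), which closes a connection on which no data flowed for that long, e.g. when the queue stays quiet between piped client requests. A sketch of disabling it, assuming that is the culprit; ServerTimeouts is a hypothetical name:

```scala
import com.typesafe.config.{ Config, ConfigFactory }

object ServerTimeouts {
  // Server-side settings govern the connection that streams the chunked response back
  // to curl; the akka.http.client and host-connection-pool settings do not apply to it.
  val config: Config = ConfigFactory
    .parseString("""
      akka.http.server.idle-timeout = infinite
    """)
    .withFallback(ConfigFactory.load())
}
```

If the cancellations stop with this config (passed via ActorSystem("app", ServerTimeouts.config)), a finite but longer timeout is probably safer than infinite in production.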
Can anyone tell me the correct way to make an HTTP/2 request with akka-http?
The docs say I have to use:
Http.get(system)
.connectionTo("127.0.0.1")
.toPort(8443)
.http2();
But in the akka-http version I use (akka-http_2.13 v10.1.12) there is no method called connectionTo() in the akka.http.javadsl.Http class. (Plus, I don't know why the official javadoc doesn't include akka-http_2.13, so it's super hard to find any documentation.)
I'm trying to make HTTP/2 requests because I'm stuck on this error after migrating from akka-http_2.11 to akka-http_2.13, and I'm guessing it might be because the server I'm sending requests to defaults to HTTP/2:
[ERROR] [07/08/2021 05:58:57.732] [graph-api-akka.actor.default-dispatcher-19] [akka://graph-api/system/Materializers/StreamSupervisor-2/flow-6-0-PoolFlow] Error in stage [akka.http.impl.engine.client.OutgoingConnectionBlueprint$PrepareResponse@25f31b7]: The server-side HTTP version is not supported
akka.http.scaladsl.model.IllegalResponseException: The server-side HTTP version is not supported
at akka.http.impl.engine.client.OutgoingConnectionBlueprint$PrepareResponse$$anon$3.onPush(OutgoingConnectionBlueprint.scala:191)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
...
And it is really painful for me to debug. The API server I'm requesting, the Facebook Graph API, should support both HTTP/1.1 and HTTP/2, so I can't figure out why I get "The server-side HTTP version is not supported". Or is there anything I can do in my akka-http code to force the requests to be HTTP/1.1?
Thank you so much for your time to help!
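As far as I know, the connectionTo(...).toPort(...).http2() builder from the docs belongs to the later 10.2.x line, and the 10.1.x client only speaks HTTP/1.x, so requests from 10.1.12 are already HTTP/1.1 (the HttpRequest default). Since the error complains about the version the server answered with, looking at the raw response status line of the same endpoint may help more than changing the request. For completeness, a sketch that makes the protocol explicit; GraphRequests, meRequest and the URL are hypothetical:

```scala
import akka.http.scaladsl.model.{ HttpProtocols, HttpRequest }

object GraphRequests {
  // Hypothetical endpoint; HTTP/1.1 is already HttpRequest's default protocol,
  // this only states it explicitly.
  def meRequest(accessToken: String): HttpRequest =
    HttpRequest(
      uri = s"https://graph.facebook.com/me?access_token=$accessToken",
      protocol = HttpProtocols.`HTTP/1.1`)
}
```

The request would then be sent as usual, e.g. Http().singleRequest(GraphRequests.meRequest(token)).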
Hi, I'm trying to see the claim "~2.5 million actors per 1 GB of heap" in action, but couldn't achieve it with a naive approach. The lowest number I can see is approx. 800 B per actor, with an explicit GC call in jvisualvm.
I've tried subtracting the memory usage for 1 actor, but it's still much higher than the expected ~400 bytes per actor. Can someone point out what I'm missing? P.S. I'm really a newbie in JVM land.
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
object GreeterMain {
  final case class Start(n: Int)
  // A minimal actor that ignores every message it receives.
  def greeter: Behavior[String] = Behaviors.receiveMessage[String] { _ =>
    Behaviors.same
  }
  def apply(): Behavior[Start] =
    Behaviors.receive { (ctx, msg) =>
      // foreach: the spawned refs are not needed, so don't build a collection of them
      (1 to msg.n).foreach { _ =>
        ctx.spawnAnonymous(greeter)
      }
      Behaviors.same
    }
  @main def hello: Unit = {
    Thread.sleep(10000) // time to attach jvisualvm before the actors are spawned
    val greeterMain: ActorSystem[Start] =
      ActorSystem(GreeterMain(), "AkkaQuickStart")
    greeterMain ! GreeterMain.Start(1000000)
  }
}
jvisualvm result https://ibb.co/6W6RqLT
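For reference, the ~400 B expectation follows directly from the claim; dividing a 1 GiB heap by 2.5 million actors gives:

```latex
\frac{1\ \text{GiB}}{2.5 \times 10^{6}\ \text{actors}}
  = \frac{1\,073\,741\,824\ \text{B}}{2\,500\,000}
  \approx 429\ \text{B per actor}
```

With 1 GB taken as $10^{9}$ B it is exactly 400 B, so the ~800 B measured here is roughly twice that budget.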
I'm getting "You have to wait for the previous offer to be resolved to send another request". I was under the impression that backpressure makes the source wait for the current requests to complete when the buffer gets full. Is my understanding correct, and what can I do to fix this issue?
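With Source.queue(size, OverflowStrategy.backpressure), backpressure reaches the producer through the Future returned by offer: while the buffer is full that Future stays pending, and (as far as I know) only one offer may be pending at a time, so offering again before it completes fails with exactly this message. A minimal sketch that waits for each offer before making the next one; SequentialOffers, offerAll and demo are hypothetical names:

```scala
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._

import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ OverflowStrategy, QueueOfferResult }
import akka.stream.scaladsl.{ Keep, Sink, Source, SourceQueueWithComplete }

object SequentialOffers {
  // Offers elements one at a time, only after the previous offer's Future has completed.
  def offerAll[T](queue: SourceQueueWithComplete[T], elems: List[T])(
      implicit ec: ExecutionContext): Future[Done] =
    elems match {
      case Nil => Future.successful(Done)
      case head :: tail =>
        queue.offer(head).flatMap {
          case QueueOfferResult.Enqueued => offerAll(queue, tail)
          case other => Future.failed(new IllegalStateException(s"Offer of $head failed: $other"))
        }
    }

  // Pushes 1..n through a small buffer into Sink.seq and returns what came out.
  def demo(n: Int): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("offer-demo")
    import system.dispatcher
    try {
      val (queue, result) = Source
        .queue[Int](4, OverflowStrategy.backpressure)
        .toMat(Sink.seq)(Keep.both)
        .run()
      val all = offerAll(queue, (1 to n).toList).flatMap { _ =>
        queue.complete()
        result
      }
      Await.result(all, 10.seconds)
    } finally system.terminate()
  }
}
```

If several producers really need to offer concurrently, newer Akka 2.6 releases appear to offer a Source.queue overload with a maxConcurrentOffers parameter; check the API docs for the version in use.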
@brightinnovator I would use Akka Streams.
I'd write a small stream recipe and run it.
I'd try to get away with the official CSV parser; I have had very good experience with it: https://doc.akka.io/docs/alpakka/current/data-transformations/csv.html
However, if you have very, very large rows in the CSV, you might want to consider reading it in a plain fashion with Akka Streams FileIO https://doc.akka.io/docs/akka/current/stream/stream-io.html#streaming-file-io and then employing Akka Streams Framing behind that in some way (splitting at some char(s) or char combination(s) within a row, so just a bit differently than in the example at the following link): https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#parsing-lines-from-a-stream-of-bytestrings
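A minimal stream recipe with the Alpakka CSV parser, assuming the file has a header row; CsvRecipe and demo are hypothetical names, and the 1 MiB line limit is an illustrative value for large rows (lineScanner's default limit is much smaller):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.{ CsvParsing, CsvToMap }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.ByteString

object CsvRecipe {
  // Bytes in, one Map(header -> value) per CSV row out.
  // maximumLineLength is raised so that very long rows do not fail the stream.
  val rows: Flow[ByteString, Map[String, String], NotUsed] =
    CsvParsing
      .lineScanner(maximumLineLength = 1024 * 1024)
      .via(CsvToMap.toMapAsStrings())

  // In-memory input stands in for FileIO.fromPath(Paths.get("data.csv")).
  def demo(): Seq[Map[String, String]] = {
    implicit val system: ActorSystem = ActorSystem("csv-demo")
    try {
      val input = Source.single(ByteString("id,name\n1,alice\n2,bob\n"))
      Await.result(input.via(rows).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }
}
```

For a real file the source would be FileIO.fromPath(path), with the same rows flow attached.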
@brightinnovator Having your small stream recipe ready and running, you can then consider tuning, e.g. by increasing the default buffer size of stream stages, and/or by allowing parallel inserts into the database, and/or by tuning the database connection pool, and/or by introducing one or more asynchronous boundaries (via the one-liner .async), and/or by adding explicit .buffer stages.
If you find that there is some maximum sweet spot in throughput given your CSV, RAM and CPU, you could also add a throttle at that rate, which in Akka Streams is pretty much a one-liner using the built-in stage .throttle(...).
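Put together, those tuning knobs might look like the sketch below. TunedIngest, insertRow and demo are hypothetical (insertRow stands in for a real async database call), and the buffer size, parallelism and 1000 rows/s rate are assumptions to be tuned against your own measurements:

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }

object TunedIngest {
  type Row = Map[String, String]

  // Hypothetical async database insert; swap in a real client call that returns a Future.
  def insertRow(row: Row): Future[Int] = Future.successful(1)

  def run(rows: Source[Row, Any], parallelism: Int)(implicit system: ActorSystem): Future[Int] =
    rows
      .buffer(1024, OverflowStrategy.backpressure) // explicit buffer stage
      .async                                       // asynchronous boundary after the buffer
      .mapAsync(parallelism)(insertRow)            // up to `parallelism` inserts in flight
      .throttle(1000, 1.second)                    // assumed sweet spot: 1000 rows per second
      .runWith(Sink.fold(0)(_ + _))                // number of rows inserted

  def demo(n: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("ingest-demo")
    try {
      val rows = Source(1 to n).map(i => Map("id" -> i.toString))
      Await.result(run(rows, parallelism = 4), 10.seconds)
    } finally system.terminate()
  }
}
```

mapAsync keeps the input order; if order does not matter for the inserts, mapAsyncUnordered can squeeze out a bit more throughput.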
Tcp.IncomingConnection.
@iosven Dropping messages is not intended and not good for me. I believe I found the issue. Artery TCP uses Akka Streams to send messages between cluster members. Messages to one actor are deserialized sequentially, whereas messages to different actors are deserialized concurrently in up to 4 "lanes" by default. Single-threaded deserialization cannot take advantage of a multicore CPU, so it generates too little demand upstream. The sender queue becomes full, after which messages are dropped.
The relevant documentation is the comment for the config option outbound-message-queue-size.
https://doc.akka.io/docs/akka/current/general/configuration-reference.html#config-akka-remote-artery
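Both settings involved sit under akka.remote.artery.advanced. Since lanes are chosen per recipient, as described above, raising inbound-lanes mainly helps when traffic is spread over several recipients; for a single hot recipient the outbound queue size is the knob the linked documentation points at. A sketch of overriding them; ArteryTuning is a hypothetical name and the values are illustrative assumptions, not recommendations:

```scala
import com.typesafe.config.{ Config, ConfigFactory }

object ArteryTuning {
  // Illustrative values only; see the Artery reference configuration for the defaults.
  val config: Config = ConfigFactory
    .parseString("""
      akka.remote.artery.advanced {
        # more lanes: messages to different recipients deserialized in parallel
        inbound-lanes = 8
        # larger send queue before outbound messages are dropped
        outbound-message-queue-size = 6144
      }
    """)
    .withFallback(ConfigFactory.load())
}
```

A bigger queue only absorbs bursts; if the single receiver is persistently slower than the sender, messages will still be dropped eventually.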
hi, I'm running a very simple Akka Streams job that doesn't involve Cassandra at all, but I still see this in the logs:
[2021-07-19 10:16:28,095] [WARN] [com.datastax.oss.driver.internal.core.control.ControlConnection] [] [s0-admin-1] - [s0] Error connecting to Node(endPoint=/127.0.0.1:9042, hostId=null, hashCode=480e8ba1), trying next node (ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)) {}
why is that?