:point_up: September 21, 2022 11:31 AM anyone??
How do I configure the send and receive buffers when using Artery remoting?

Hi all, I'm trying to use a dependency that adds Kinesis KPL support to Akka. It has a KPLFlow class to provide that support. I'm relatively new to Akka and flows, but my objective is to take a Kafka source that already exists and use this flow, in place of the "native" Kinesis sink, to deliver the records. Is there a way to derive a sink from the flow class? Or is it possible to have the flow consume from the Kafka source directly and deliver to a target stream?

I've created a Stack Overflow question about this: https://stackoverflow.com/questions/73873966/akka-kafka-source-to-kinesis-sink-using-kpl

Hey, I am trying to run the end-to-end example from akka-persistence-cassandra on AlmaLinux, but running "sbt -Dconfig.resource=local.conf -Dakka.cluster.roles.0=write run" fails with this error: 'object actor is not a member of package akka'. How can I run the example?
Hi, how can I implement paged search with Alpakka Elasticsearch?
6 replies
sharding.init(Entity(entityKey)(createBehavior)) should create entities dynamically, but how do I pass the entityId to apply()?
Hi everyone, I am currently developing a Cluster Sharding application in which I need to query the active actors. As far as I can tell, the Akka docs say to send a GetClusterShardingStats message to the shards, but I couldn't find a working example. Can anyone share some example code?
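One common approach, sketched here under assumptions: the function passed to Entity receives an EntityContext, and its entityId can be forwarded to the behavior factory. The Counter actor, its Command protocol and the init helper are hypothetical names used only for illustration.

```scala
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}

// Hypothetical entity used only to illustrate the wiring.
object Counter {
  sealed trait Command
  case object Increment extends Command

  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("Counter")

  // The entity id is an ordinary argument of the behavior factory.
  def apply(entityId: String): Behavior[Command] =
    Behaviors.setup { context =>
      context.log.info("Starting counter {}", entityId)
      Behaviors.receiveMessage { case Increment => Behaviors.same }
    }

  // Sharding calls this function once per entity it starts,
  // passing an EntityContext that carries the entity id.
  def init(system: ActorSystem[_]): Unit =
    ClusterSharding(system).init(
      Entity(TypeKey)(entityContext => Counter(entityContext.entityId))
    )
}
```

The behavior is created lazily, the first time a message for a given entity id arrives.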
Collignon-Ducret Rémi
Hello! How can I intercept an actor's messages and responses? This is for tracing purposes :-)
3 replies
Hello guys, is Akka compatible with Oracle JDK 11? I am getting the following error when using it: Caused by: com.typesafe.config.ConfigException$UnresolvedSubstitution: application.conf @ jar:
Patrik Nordwall
@/all Umbrella release 22.10 of Akka is out. Check out the new features and improvements in https://akka.io/blog/news/2022/10/26/akka-22.10-released
1 reply
Levi Ramsey
@brabo-hi Akka is compatible with JDK11... that exception shouldn't have anything to do with JDK11
1 reply
It's now JDK 17
Akka now supports JDK 17 :)
Hi guys, I have an Akka HTTP server that receives a WebSocket connection. How can I connect this connection to an external WebSocket? Basically, I need both an Akka HTTP server and an Akka HTTP client.
This server will only be used as a connector between two WebSocket connections, acting as both WS server and WS client.
        path("ws") {
          val flowExternal: Flow[Message, Message, Any] = ???
          val flowInternal: Flow[Message, Message, Any] = ???
          Http().singleWebSocketRequest(WebSocketRequest("ws://"), flowInternal)
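A possible way to bridge the two sides, as a minimal sketch: Http().webSocketClientFlow returns a Flow[Message, Message, _] for the outgoing connection, and that flow can be handed straight to handleWebSocketMessages. The WebSocketBridge object and the externalUri parameter are placeholders, not part of the original snippet.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, WebSocketRequest}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Flow

object WebSocketBridge {
  // `externalUri` is a placeholder for the upstream WebSocket endpoint.
  def route(externalUri: String)(implicit system: ActorSystem): Route =
    path("ws") {
      // Messages from the incoming client go into this flow and on to the
      // external server; messages from the external server come out of it
      // and are sent back to the incoming client.
      val toExternal: Flow[Message, Message, Any] =
        Http().webSocketClientFlow(WebSocketRequest(externalUri))
      handleWebSocketMessages(toExternal)
    }
}
```

Each incoming connection materializes the flow once, so each one gets its own outgoing connection. Transforming or filtering messages would need additional stages around toExternal.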

Akka-HTTP: I'm trying to code a graceful failure from basicAuthentication and have tried wrapping my call to basicAuthentication like so:

val routes = handleRejections(totallyMissingHandler) { concat (
    (pathPrefix("admin")) {
       authenticateBasic(realm = "secure site", myUserPassAuthenticator) { userName =>
    }, ....etc...

and have a rejection handler totallyMissingHandler like so:

implicit def totallyMissingHandler = RejectionHandler.newBuilder()
  .handleNotFound { complete(StatusCodes.NotFound, "Oh man, what you are looking for is long gone.") }
  .handle { case AuthenticationFailedRejection(msg, _) => complete(StatusCodes.Unauthorized, msg.toString()) }
  .result()

and when I go to the protected path (ex. localhost:8080/admin) the .handle code gets triggered IMMEDIATELY and not when my authentication fails. Can someone help me out here?
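A likely explanation is that a browser's first request carries no Authorization header, so authenticateBasic rejects it with cause CredentialsMissing, and a handler that drops the challenge never gives the browser a reason to prompt. The sketch below distinguishes the two causes and keeps the WWW-Authenticate header; the AuthRejections object name and the message texts are illustrative assumptions.

```scala
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.`WWW-Authenticate`
import akka.http.scaladsl.server.{AuthenticationFailedRejection, RejectionHandler}
import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected}
import akka.http.scaladsl.server.Directives._

object AuthRejections {
  val handler: RejectionHandler =
    RejectionHandler.newBuilder()
      .handle {
        // No credentials were sent: pass the challenge on so the client can prompt.
        case AuthenticationFailedRejection(CredentialsMissing, challenge) =>
          respondWithHeader(`WWW-Authenticate`(challenge)) {
            complete(StatusCodes.Unauthorized, "Authentication required.")
          }
        // Credentials were sent but the authenticator refused them.
        case AuthenticationFailedRejection(CredentialsRejected, challenge) =>
          respondWithHeader(`WWW-Authenticate`(challenge)) {
            complete(StatusCodes.Unauthorized, "The supplied credentials are invalid.")
          }
      }
      .handleNotFound {
        complete(StatusCodes.NotFound, "Oh man, what you are looking for is long gone.")
      }
      .result()
}
```

With the challenge header in place, the CredentialsMissing branch still fires on the first request, but the browser then asks for credentials and retries.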

2 replies
Digvijay Singh
Hi folks, I have a use case where we need to close connections on the server regularly. Is there some setting to close client connections? Also, wondering if there is a way to close the connections gracefully, using Connection: Close header? I see akka-http handles that during service shutdown already.
Hi everyone. I'm working with flatMapMerge in Akka Streams and have it configured currently with a breadth of 5. What I seem to be seeing is that none of the sub sinks are materialized until 'breadth' substreams have been created. I'm running local tests but can't replicate this behaviour well locally, apologies. Does anyone know if this is some kind of optimisation with the FlattenMerge graph stage? The substreams take a reasonably long time to run, but given I don't think they're materialized until later I don't think this should be an issue. The flow is also very low throughput, meaning this issue causes quite a long delay in message processing. For flows with more throughput the breadth fills quickly and then the flow seems to process incoming messages 'normally' for a while, i.e. as it receives them.
I want to populate a nexus repo to take to an offline environment with akka, alpakka (and play). I've found https://github.com/lightbend/akka-dependencies which is awesome - is there anything similar for alpakka modules?
...or even just a list of all the module dependency names?
Gerret Sanders

I got tripped up yesterday by Source.future(). The scaladoc states:

Send the single value of the Future when it completes and there is demand. If the future fails the stream is failed with that exception.

That's true, but if the future fails, the stream supervisor is never triggered and that caused an issue in my code. I'm just wondering if that's expected behavior and if that's documented anywhere.
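One workaround, sketched under assumptions: mapAsync honors the supervision strategy attribute, so routing the future through it lets the decider see the failure. The FutureSupervision object and the supervised helper are hypothetical names, and resumingDecider is only one possible choice of decider.

```scala
import akka.NotUsed
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Source
import scala.concurrent.Future

object FutureSupervision {
  // Source.future fails the stream directly when the future fails.
  // Wrapping the future in mapAsync hands the failure to the decider instead.
  def supervised[T](future: => Future[T]): Source[T, NotUsed] =
    Source
      .single(())
      .mapAsync(parallelism = 1)(_ => future)
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
}
```

With the resuming decider, a failed future is dropped and the stream completes without emitting anything; a custom decider could log the exception first.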

3 replies

hey everyone, I'm using Akka Persistence and Projections, but I've got a read-after-write problem that I'm looking for ideas on. For certain business/front end reasons, after a certain write where an event is persisted and the state is updated, I need to block until that event is read by the projection so that the read side is synchronized.

One way this was approached was to add the data we need as part of the state of the event sourced actor, and expose a command that gets the state of the actor, but that's less than ideal. Does anyone have other ideas on approaching this?

When I use the 'parameters' directive to get the query params, e.g. parameters("from".as[Int], "size".as[Int]), and size is '10<iframe src=javascript:alert(1953)>', this response is sent to the browser:
HTTP/1.1 400 Bad Request
connection: close
server: ResourceManagement
content-length: 124
X-Powered-By: Express
date: Wed, 16 Nov 2022 03:10:35 GMT
content-type: text/plain; charset=UTF-8

The query parameter 'size' was malformed: '10<iframe src=javascript:alert(1953)>' is not a valid 32-bit signed integer value
How can I handle this error message? I don't want to send it to the browser, because it opens a security hole.
@TopSpoofer: you can register a custom RejectionHandler and handle rejections without exposing such details. This particular rejection is the MalformedQueryParamRejection but you might want to have a generic one.
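A minimal sketch of such a handler: it matches MalformedQueryParamRejection and answers with a fixed message that never echoes the client-supplied value. The SafeRejections object, the "search" path and the message wording are illustrative assumptions.

```scala
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.{MalformedQueryParamRejection, RejectionHandler, Route}
import akka.http.scaladsl.server.Directives._

object SafeRejections {
  val handler: RejectionHandler =
    RejectionHandler.newBuilder()
      .handle {
        // The parameter name comes from the route definition, not from the
        // client, so it is safe to include; the raw value is left out.
        case MalformedQueryParamRejection(name, _, _) =>
          complete(StatusCodes.BadRequest, s"Invalid value for query parameter '$name'.")
      }
      .result()
      // Everything else keeps the default behaviour.
      .withFallback(RejectionHandler.default)

  val route: Route =
    handleRejections(handler) {
      path("search") {
        parameters("from".as[Int], "size".as[Int]) { (from, size) =>
          complete(s"from=$from size=$size")
        }
      }
    }
}
```

Other rejections with client-controlled text may deserve the same treatment, which is where a more generic handler comes in.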
5 replies
Eduardas Kazakas
Greetings, can somebody point me in the right direction on how to update the Akka Persistence event_journal? I have implemented a basic event-sourced actor, but I would like to migrate from JSON serialization to Protobuf. There are plenty of examples of how to use Persistence Query for reading the journal, but almost none of how to write changes to it. There are modules like akka-gdpr and akka-persistence-update-jdbc, but these are commercial modules. What I have figured out so far is that there should be some sort of journal actor that could probably be used for persisting data in event_journal.
Patrik Nordwall
@kamiKAZIK There is a migration tool in the new akka-persistence-r2dbc plugin. Maybe you can use that for inspiration. https://github.com/akka/akka-persistence-r2dbc/blob/main/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala
Hey all. I'm currently reading 'Akka in action' and I have a question regarding Akka FSMs.
Transitions can be monitored internally and externally (https://doc.akka.io/docs/akka/current/fsm.html#monitoring-transitions)
  • internal: onTransition(handler)
  • external: SubscribeTransitionCallBack
    My question: when using onTransition, is the action executed sync or async? I.e.: can the onTransition logic lag behind the state changes, or is onTransition guaranteed to be executed before a possible next state change is triggered?
Hi, I'm consuming an external queue to run HTTP requests. Sometimes these requests time out: INFO akka.actor.LocalActorRef - Message [akka.actor.ReceiveTimeout$] to Actor[akka://OVDRootSystem/system/IO-TCP/selectors/$a/8#-66797564 6] was not delivered. [1] dead letters encountered. Instead of letting Akka retry, I want to nack the message so that the external queue redelivers it later for a retry. I can't find how to properly handle this case. Any tips?
4 replies
Francisco Lopez-Sancho

Dear hakkers,

We are happy to announce the 2.0.0-M1 release of Akka Diagnostics which is the continuation of Akka Enhancements. This release is the first milestone leading up to version 2.0.0 of Akka Diagnostics.

Akka Diagnostics 2.0.0-M1 contains:

  • The Thread Starvation Detector.

Happy hakking!

– The Akka Team

Arne Claassen
Is there a way to extract path segments or use PathMatcher outside of the request context? I.e. I'm using Url for my internal representation and frequently need to inspect their Path to get at specific segments, but have found no good way of doing so. Since it has head and tail I was hoping there would be a simple way to turn it into List[String] but so far no luck
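One option, assuming the type in question is akka-http's Uri.Path: it is a small linked structure of Slash, Segment and Empty nodes, so a short recursive match can flatten it. The PathSegments helper below is a hypothetical name, not an existing akka-http API.

```scala
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path
import scala.annotation.tailrec

object PathSegments {
  // Collects the non-empty segments of a path in order, skipping slashes.
  def segments(path: Path): List[String] = {
    @tailrec
    def loop(rest: Path, acc: List[String]): List[String] = rest match {
      case Path.Empty               => acc.reverse
      case Path.Slash(tail)         => loop(tail, acc)
      case Path.Segment(head, tail) => loop(tail, head :: acc)
    }
    loop(path, Nil)
  }
}
```

For example, PathSegments.segments(Uri("http://example.com/a/b/c").path) gives List("a", "b", "c").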
Hi, is it possible with Alpakka connecting to Cassandra to use a custom profile (datastax-java-driver config) or set the consistency level per session?
Araik Grigoryan
Hi. Is there a canonical way to get the Future-based variant of akka-http working with HTTP/2? Something like Http(actorSystem).singleRequest(...) (as described in https://doc.akka.io/docs/akka-http/10.4.0/client-side/request-level.html) but with HTTP/2.

Hi, the current Expand accepts a val extrapolate: In => Iterator[Out], which does nothing until the first element from upstream has arrived. What if I want to inject elements whenever the upstream is slow, including before the first element?

Use case: if I use akka-stream to implement a lock-step stream, I want to inject an EmptyActionFrame when there are no inputs.

2 replies
I think this can be achieved by making the current Expand accept a val extrapolate: Option[In] => Iterator[Out]
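An existing operator that may cover part of this is keepAlive, which injects an element whenever upstream has been silent for a given duration. Note that it is time-driven rather than demand-driven like expand, so it is a different trade-off, not a drop-in replacement. In the sketch below the Frame types and the withIdleFrames helper are assumptions for illustration.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object LockStep {
  // Hypothetical frame protocol for a lock-step stream.
  sealed trait Frame
  case object EmptyActionFrame extends Frame
  final case class ActionFrame(payload: String) extends Frame

  // Emits an EmptyActionFrame whenever `inputs` has produced nothing for `tick`.
  def withIdleFrames(inputs: Source[Frame, NotUsed], tick: FiniteDuration): Source[Frame, NotUsed] =
    inputs.keepAlive(tick, () => EmptyActionFrame)
}
```

With an input that never emits, LockStep.withIdleFrames(Source.never, 20.millis).take(3) would yield three EmptyActionFrame elements.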
Chetan Kumar
Is it wrong to use splitAfter and concatSubstreams together?
This test is failing with java.lang.IllegalArgumentException: Cannot push port (Split.out(1323563333)) twice, or before it being pulled
  "spiltAfter" should {
    "run" in {
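      // Assumed from the question: substreams are merged back with
      // concatSubstreams and checked through a TestSink probe.
      val probe = Source(1 to 10)
        .splitAfter(_ => true)
        .concatSubstreams
        .runWith(TestSink.probe[Int])

      val results = probe.request(10).expectNextN(10)

      results.size should be(10)
      results should contain allElementsOf (1 to 10)
    }
  }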
I have a big update to my Scala implementation of the IETF HTTP Message Signing spec. This library works with Akka (though at present it perhaps depends too much on IO).
So what is akka/akka ?
Hello all, when using Akka Persistence, can we safely access the actor context in thenRun for future processing? E.g.
              .thenRun((newState: State) => {
                context.pipeToSelf(futureProcessing) {
                  case Failure(exception) => InternalCommand.ResultProcess(message = exception.getMessage)
                  case Success(value) => InternalCommand.ResultProcess(message = value)
1 reply
Hello all, when using Akka Streams, how can I append a new line at the end of a file?
Hi everyone. I have an issue with actor parallelism. I have a fork-join ActorSystem with parallelism-min 4 and parallelism-max 16, running on a 24-core VM in the cloud. I made a router that, on receiving a request, creates a worker actor and hands the task to it. What I found is that when there are 3 tasks in the system, the 3 worker actors all run on the same thread, so the JVM's CPU usage never grows beyond 100%. But when there are 20 tasks, CPU usage reaches 1200%. So I wonder how I can make Akka run as much in parallel as possible. Thanks for reading my question, guys.
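One way to do this, as a minimal sketch: FileIO.toPath accepts a set of open options, and including APPEND makes writes go to the end of the file. The AppendLine object and appendNewline helper are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString
import java.nio.file.{OpenOption, Path, StandardOpenOption}
import scala.concurrent.Future

object AppendLine {
  // APPEND writes at the end of the file; CREATE makes the file if it is missing.
  private val appendOptions: Set[OpenOption] =
    Set(StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE)

  // Appends the platform line separator to `file`.
  def appendNewline(file: Path)(implicit system: ActorSystem): Future[IOResult] =
    Source
      .single(ByteString(System.lineSeparator()))
      .runWith(FileIO.toPath(file, appendOptions))
}
```

The same sink works for any Source[ByteString, _], so a whole stream of lines can be appended the same way.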
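For reference, Akka's reference.conf documents the fork-join pool size as ceil(available processors * parallelism-factor), bounded by parallelism-min and parallelism-max. A minimal sketch of that arithmetic in plain Scala follows; the ForkJoinSizing object is a hypothetical helper, and the factor of 1.0 used in the note below is an assumption, since the question does not state it.

```scala
object ForkJoinSizing {
  // Pool size = ceil(cores * factor), clamped to the range [min, max].
  def effectiveParallelism(cores: Int, factor: Double, min: Int, max: Int): Int =
    math.min(math.max(math.ceil(cores * factor).toInt, min), max)
}
```

With 24 cores, a factor of 1.0, min 4 and max 16, this gives 16 threads. Each actor still processes one message at a time, so three busy workers can occupy at most three of those threads.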
Ladinu Chandrasinghe
Hello all, what is the proper way to implement batched at-most-once processing using alpakka-kafka?
I have the following sub-optimal solution, but is there a better way that doesn't involve materializing an inner stream?
    def atMostOnceSource[K, V]: Source[ConsumerRecord[K, V], NotUsed] = {
        .committableSource[K, V](consumerSettings, Subscriptions.topics(allTopics))
        .groupedWithin(maxBatchSize, maxBatchDuration)
        .mapAsync(1) { messages: Seq[CommittableMessage[K, V]] =>
          val committableOffsetBatch =

            .map(_ => messages)

1 reply
Aditya Prasad
So, I have to use a shared singleton Java object in my Scala/Play codebase. The simplest way I can think of protecting access to this object (since it is heavyweight to construct) is using a synchronized{} block. It seems pretty gross, but it should work. Are there any better options? To clarify, it would be something like:
javaObj.synchronized {
Joseph Abrahamson
I've been looking at Erlang versus Akka and I realized one big distinction is that Akka has a lot of ceremony around "restarts" whereas Erlang processes just terminate and the supervisor is notified. I'm curious why Akka was designed this way? What sorts of situations are better handled in the "Akka way"?
1 reply
Animesh Raj

Hello Everyone

Hello sir,
I am new to open source contribution.
I already know Java; my tech stack and tools include C, C++, Python, Java, JavaScript, HTML, CSS, SQL, Bootstrap, ReactJS, ExpressJS, NodeJS and Git. I need a little help from your side to contribute to these amazing projects.
I need advice and help.
I am currently in the final year of my engineering degree.

Tobias Pfeifer
in akka-http is there a way (directive) to add response headers to all responses? There is mapResponseHeaders but it only works when the inner route yields a RouteResult.Complete. What I'm looking for is a way to add a header to any response via a directive without access to the RejectionHandler and ExceptionHandler in the outer route. With sealing the innerRoute I will change the exception behaviour from the outer route if it has a custom exception handler.