This channel is available for all Akka enthusiasts—newbies as well as gurus—for the exchange of knowledge and the coordination of efforts around Akka. For more info see: http://bit.ly/akka-gitter
EventSourcedBehavior: What if I have an actor that, besides having some state to be persisted, should also hold regular old actor state that is forgotten as soon as the actor stops? As an example of the case I am envisioning: the stateful actor consumes commands that sometimes mutate the state and sometimes also broadcast messages based on the state to a set of subscribing actors. I am sure I am just not seeing something, but as you are not supposed to mix EventSourcedBehavior with any other behavior, I am having difficulties. So what's the established way of doing this? Can I mark some fields of my state as not to be persisted or something?
hey all, I'm running an akka based play app in production and occasionally seeing this error -- trying to figure out what it means
"Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.12)"
does anyone have any ideas or could point me in the right direction?
@amine.chikhaoui:matrix.org: I apologize for some vagueness, btw, as it's not totally clear what you mean by write to a database. If it's CRUD-style operations, then the transaction can be modeled in a persistent actor as:
Those events then allow either a projection to publish to Kafka or for the state of pending publishes to be tracked by the persistent actor.
If the DB write is itself appending internal state-changing events (i.e. you're already event-sourcing) and you want to have a message eventually published to Kafka which isn't easily completely derivable from the internal state events (e.g. it depends on the command and the internal state), I've sometimes defined a "command in an event's clothing" (e.g. RecordedNeedToPublishDomainEventToKafka) which wraps the message to be published to Kafka and is tagged. Then an events by tag query in a projection looks for that tag and publishes the wrapped message. The event itself is a no-op/identity function in the event handler.
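A minimal sketch of that "command in an event's clothing" idea, written as plain Scala functions so it runs without a journal. Everything except the name RecordedNeedToPublishDomainEventToKafka is hypothetical (the Deposited event, the State shape, the tag string). With Akka Persistence Typed, functions like these would be passed as the event handler and to withTagger of an EventSourcedBehavior.

```scala
// Hypothetical domain model; only RecordedNeedToPublishDomainEventToKafka
// is a name taken from the message above.
object KafkaOutboxSketch {
  sealed trait Event
  final case class Deposited(amount: Long) extends Event
  // Wraps the message that a projection should eventually publish to Kafka.
  final case class RecordedNeedToPublishDomainEventToKafka(payload: String) extends Event

  final case class State(balance: Long)

  // Assumed tag name; a projection would run an events-by-tag query for it.
  val PublishTag = "publish-to-kafka"

  // Event handler: the wrapper event is an identity function on the state.
  def eventHandler(state: State, event: Event): State = event match {
    case Deposited(amount)                          => state.copy(balance = state.balance + amount)
    case _: RecordedNeedToPublishDomainEventToKafka => state
  }

  // Tagger: only the wrapper event carries the tag.
  def tagger(event: Event): Set[String] = event match {
    case _: RecordedNeedToPublishDomainEventToKafka => Set(PublishTag)
    case _                                          => Set.empty
  }

  // Command side: the outgoing message depends on the command and the current
  // state, so it is recorded next to the state-changing event.
  def eventsForDeposit(state: State, amount: Long): List[Event] = {
    val newBalance = state.balance + amount
    List(
      Deposited(amount),
      RecordedNeedToPublishDomainEventToKafka(s"deposited=$amount balance=$newBalance")
    )
  }
}
```

Persisting both events in one go keeps the state change and the "needs publishing" record atomic, which is the point of the pattern.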
persistentActor ? DoAction(foo)
the reply would be within the persist callback as well, then the rest continues.
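A rough sketch of replying from inside the persist callback with a classic PersistentActor. DoAction is taken from the line above; ActionRecorded, ActionDone, the class name, and the persistence id are made up for illustration, and a journal plugin is assumed to be configured.

```scala
import akka.persistence.PersistentActor

// DoAction comes from the chat message; the other names are hypothetical.
final case class DoAction(value: String)
final case class ActionRecorded(value: String)
final case class ActionDone(value: String)

class ActionActor extends PersistentActor {
  // Assumed id; a real actor would derive it from its entity id.
  override def persistenceId: String = "action-actor-1"

  private var recorded = Vector.empty[String]

  // Replay: rebuild state from stored events, never reply here.
  override def receiveRecover: Receive = {
    case ActionRecorded(value) => recorded :+= value
  }

  override def receiveCommand: Receive = {
    case DoAction(value) =>
      persist(ActionRecorded(value)) { event =>
        // Runs only after the journal has confirmed the write.
        recorded :+= event.value
        // sender() is still the original asker inside the persist callback.
        sender() ! ActionDone(event.value)
      }
  }
}
```

The asker's future then completes only once the event is durable, so whatever follows the ask can rely on the write having happened.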
I am trying to migrate some old code from 2.5 to 2.6. It was using ActorPublisher. I created a custom source which now has to use the ask pattern to get messages from the actor. The problem I have is that now I am hitting the case where the timeout expires just before the actor sends the response. I can’t just increase the timeout as I don’t know when the actor will reply (this is for a websocket). As far as I can tell this was never an issue with the old ActorPublisher since it was all actors. Is there a solution to this?
The best that I can come up with is to include a deadline in the request message so that the receiver knows not to send any replies after this deadline. The deadline has to be earlier than the timeout to avoid the race condition, but not too much earlier, as the gap between the two is a period of unresponsiveness. I can use a long timeout to reduce the frequency with which this happens. What are the downsides of making the ask timeout extremely long?
Source.ask suffers this same problem.
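For what it's worth, the deadline idea could look roughly like this, using only scala.concurrent.duration from the standard library. Poll, pollFor, and shouldReply are hypothetical names, and the margin is a guess that would need tuning.

```scala
import scala.concurrent.duration._

object DeadlineSketch {
  // Hypothetical request message: carries the latest moment at which a
  // reply is still useful to the asker.
  final case class Poll(replyBy: Deadline)

  // Asker side: the deadline is the ask timeout minus a safety margin,
  // so it always falls before the ask itself times out.
  def pollFor(askTimeout: FiniteDuration, margin: FiniteDuration): Poll =
    Poll(Deadline.now + (askTimeout - margin))

  // Receiver side: reply only while the deadline has not passed; otherwise
  // keep the element for the next request instead of sending it into a
  // timed-out ask.
  def shouldReply(request: Poll): Boolean = request.replyBy.hasTimeLeft()
}
```

Note that Deadline is based on System.nanoTime, so this only makes sense when asker and receiver run in the same JVM; across nodes the message would need a wall-clock timestamp and some tolerance for clock skew.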
Hi, I am using akka-http to create a proxy. I have a proxy route that takes a request, proxies it to some other target service, and returns the response it gets.
package com.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.io.StdIn
import scala.util.Random

object HttpServerStreamingRandomNumbers {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem(Behaviors.empty, "myproxy")
    implicit val executionContext = actorSystem.executionContext

    val route = path("proxy") {
      Route { context =>
        println(context.request.entity.contentType)
        context.complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, ""))
      }
    }

    val bindingFuture = Http().newServerAt("localhost", 9000).bind(route)
    StdIn.readLine()
    bindingFuture.flatMap(_.unbind()).onComplete(_ => actorSystem.terminate())
  }
}
When I send this curl request:
curl --location --request POST 'localhost:9000/proxy' --data-raw ''
Inside the route, when I read the content type of the request, I get application/x-www-form-urlencoded.
However, if no Content-Type is passed in the request, I want the HTTP server not to add a Content-Type header. How can I achieve this?
Hi,
Is there a built-in way to deserialize (and serialize) the HttpRequest object itself?
context: I'm building a throttled dispatcher that should receive messages from many (micro)services, and pipe/funnel them to a sensitive resource (hence the throttling based on that resource capacity).
It'll also prioritize and such.
Anyway, since the services' usage of the resource is multiple requests per "job", I figured my dispatcher API could just be a JsonArray of HttpRequests.
Internally I plan to use akka-streams to handle the capacity of the service as regular backpressure, and use the akka-http client API's cached connection pool flow to stream the requests to the resource; the responses back are also streamed (chunked response).
So the question is basically: I want to deserialize JSON => HttpRequest (more specifically JSON Array => List[HttpRequest]).
Is there a built-in way to do so?
Would be nice if I could avoid all the needed definition and parsing validation…
Regarding the serialization part (on the client side): it is less important, since some (most) of the services are not in Scala anyway, so we'll need to write this anyway…
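In case no ready-made JSON format for HttpRequest turns up, one possible shape is a small DTO that any JSON library can decode, plus a conversion into the akka-http model. RequestDto and its fields are hypothetical, and the sketch assumes bodies are JSON text; decoding the JSON array into RequestDto values is left to whichever JSON library is in use.

```scala
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import scala.util.Try

// Hypothetical wire format for one request inside the JSON array.
final case class RequestDto(
    method: String,
    uri: String,
    headers: Map[String, String],
    body: Option[String])

object RequestDto {
  // Converts a decoded DTO into an akka-http HttpRequest, reporting
  // validation problems as Left instead of throwing.
  def toHttpRequest(dto: RequestDto): Either[String, HttpRequest] =
    for {
      method <- HttpMethods
        .getForKey(dto.method.toUpperCase)
        .toRight(s"unknown HTTP method: ${dto.method}")
      uri <- Try(Uri(dto.uri)).toEither.left.map(e => s"invalid URI: ${e.getMessage}")
    } yield {
      val headers = dto.headers.toList.map { case (name, value) => RawHeader(name, value) }
      // Assumption: any body present is JSON text.
      val entity: RequestEntity = dto.body match {
        case Some(text) => HttpEntity(ContentTypes.`application/json`, text)
        case None       => HttpEntity.Empty
      }
      HttpRequest(method = method, uri = uri, headers = headers, entity = entity)
    }
}
```

A decoded List[RequestDto] can then be mapped through toHttpRequest before being fed into the connection pool flow. Headers such as Content-Type are modeled on the entity in akka-http, so passing them as raw headers is not a reliable way to set them.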