amine.chikhaoui
@amine.chikhaoui:matrix.org
[m]
Hi everybody, I'm experimenting with Akka + Play framework for the first time and have some questions. For example, say I have a web service endpoint /doAction which I want to use in a hybrid mode in terms of consistency. It would do a write to a database and send a message to a Kafka topic for consumption by downstream systems. The request from the client would return as soon as the database write is done, and the rest is eventually consistent. My confusion is how to make sure that this process is atomic, i.e. what happens if the app crashes halfway through (between the database update and sending the Kafka message)?
3 replies
Amine Chikhaoui
@amine.chikhaoui:matrix.org
[m]
@leviramsey: by Persistence Query/Projections you mean queries of the events journal ?
2 replies
Amine Chikhaoui
@amine.chikhaoui:matrix.org
[m]
got it @leviramsey though if the actor that writes to the DB is persistent, isn't that not recommended ? As I would imagine the Actor would persist the incoming event and then write to the database in the callback, but if the event is replayed for any reason that would attempt another write to the DB ?
my understanding of akka terminology is still basic so I might be saying nonsense :p
Levi Ramsey
@leviramsey
It's a little unorthodox for an actor to query its own events... the more normal approach would be to have another actor querying the event stream
1 reply
Amine Chikhaoui
@amine.chikhaoui:matrix.org
[m]
oh the persist callback can side effect ? but isn't that the same thing that gets called during replay
btw the persistent actor in my case wouldn't have any state, I can't think of a state that I can keep
Levi Ramsey
@leviramsey
No, only the event handler is called during replay, so it should be pure
Amine Chikhaoui
@amine.chikhaoui:matrix.org
[m]
as the state is kept in the DB basically
hm got it, I had a confusion and thought the event handler is both called during replay and in the persist callback
Levi Ramsey
@leviramsey
The event handler will typically get called (and does automatically in typed) in the persist cycle, thus why you shouldn't side effect there, but the callback only gets called on successful persistence of events
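
A minimal sketch of the distinction described above, written as a toy model in plain Scala rather than with Akka's actual API. Every name here (`ReplayDemo`, `Entity`, `persist`, `Added`) is hypothetical and only illustrates the idea: the event handler runs both in the persist cycle and on replay, while the persist callback runs only after an event has been stored.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of an event-sourced entity. This is NOT Akka's API;
// all names are hypothetical and exist only for illustration.
object ReplayDemo {
  sealed trait Event
  final case class Added(amount: Int) extends Event

  final case class State(total: Int)

  // Pure event handler: applied when persisting AND when replaying,
  // so it must not perform side effects.
  def eventHandler(state: State, event: Event): State = event match {
    case Added(n) => state.copy(total = state.total + n)
  }

  final class Entity(journal: ArrayBuffer[Event]) {
    // Recovery: fold the stored events through the event handler only.
    private var state: State = journal.foldLeft(State(0))(eventHandler)

    def current: State = state

    // Persist cycle: store the event, apply the event handler,
    // then run the callback. The callback is never invoked on replay.
    def persist(event: Event)(callback: State => Unit): Unit = {
      journal += event
      state = eventHandler(state, event)
      callback(state)
    }
  }
}
```

Creating a second `Entity` over the same journal simulates a restart: the state is rebuilt from the events, but none of the callbacks passed to `persist` run again, which is why side effects such as a DB write belong in the callback and not in the event handler.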
Amine Chikhaoui
@amine.chikhaoui:matrix.org
[m]
and it's not very odd if I use a persistent actor but without any state basically ?
Levi Ramsey
@leviramsey
Well it kind of will have state: things that should be published to Kafka but haven't yet been
(in which case the persistence query may well just be to find out which persistent actors need to be woken up to publish to Kafka)
Amine Chikhaoui
@amine.chikhaoui:matrix.org
[m]
but that's more of the state kept in the events journal right ? not the actor state itself
hmm I was thinking the query would be against the journal
but maybe you mean the query is against the persistent actors state ?
Levi Ramsey
@leviramsey
@amine.chikhaoui:matrix.org I'll have to go away from keyboard for a little while before continuing
Amine Chikhaoui
@amine.chikhaoui:matrix.org
[m]
oh it's indeed the journal from reading https://doc.akka.io/docs/akka/current/persistence-query.html
@leviramsey: sure, thanks a lot for the patience with my questions :)
Swoorup Joshi
@Swoorup
should I use actorselection or just pass actorRef?
6 replies
Swoorup Joshi
@Swoorup
is there a Nobody typed ActorRef?
2 replies
Swoorup Joshi
@Swoorup
even within the same system, is it ok to pass streamRef?
Matt Dziuban
@mrdziuban

hey all, I'm running an akka based play app in production and occasionally seeing this error -- trying to figure out what it means

Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.12)

does anyone have any ideas or could point me in the right direction?

Levi Ramsey
@leviramsey

@amine.chikhaoui:matrix.org: I apologize for some vagueness, btw, as it's not totally clear what you mean by write to a database. If it's CRUD-style operations, then the transaction can be modeled in a persistent actor as:

  • persist an event saying what message you want to publish to Kafka after updating the DB (signalling intent to update the DB)
  • in the callback for that persist, update the DB (in typed, you'd follow that up with a message to self to move to the next step)
  • after successfully updating the DB, persist an event saying that the DB was updated

Those events then allow either a projection to publish to Kafka or for the state of pending publishes to be tracked by the persistent actor.
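
The three steps above can be sketched as a toy model in plain Scala. This is not Akka code, and the names (`DbThenKafka`, `UpdateIntended`, `DbUpdated`, `State`) are assumptions made up for illustration. It only shows how the two events let the state after a restart tell apart "DB write not yet confirmed" from "DB written, Kafka publish still pending".

```scala
// Toy model of the three-step sequence. NOT Akka's API;
// all names are hypothetical.
object DbThenKafka {
  sealed trait Event
  // Step 1: intent to update the DB, carrying the message to publish afterwards.
  final case class UpdateIntended(id: String, kafkaMessage: String) extends Event
  // Step 3: the DB update (step 2, done in the persist callback) succeeded.
  final case class DbUpdated(id: String) extends Event

  // State derived purely from the events.
  final case class State(
      awaitingDb: Map[String, String],     // intent stored, DB write not confirmed
      readyToPublish: Map[String, String]) // DB write confirmed, publish pending

  val empty: State = State(Map.empty, Map.empty)

  // Pure event handler: no DB or Kafka access here.
  def eventHandler(state: State, event: Event): State = event match {
    case UpdateIntended(id, msg) =>
      state.copy(awaitingDb = state.awaitingDb + (id -> msg))
    case DbUpdated(id) =>
      state.awaitingDb.get(id) match {
        case Some(msg) =>
          State(state.awaitingDb - id, state.readyToPublish + (id -> msg))
        case None => state // unknown id: ignore
      }
  }

  // What a restart would see after replaying the journal.
  def recover(journal: Seq[Event]): State =
    journal.foldLeft(empty)(eventHandler)
}
```

If the app crashes between steps 1 and 3, `recover` leaves the entry in `awaitingDb`, so the DB write can be attempted again; in this sketch that assumes the write is safe to retry. Entries in `readyToPublish` are what a projection or the actor itself would still need to send to Kafka.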

If the DB write is itself appending internal state-changing events (i.e. you're already event-sourcing) and you want to have a message eventually published to Kafka which isn't easily completely derivable from the internal state events (e.g. it depends on the command and the internal state), I've sometimes defined a "command in an event's clothing" (e.g. RecordedNeedToPublishDomainEventToKafka) which wraps the message to be published to Kafka and is tagged. Then an events by tag query in a projection looks for that tag and publishes the wrapped message. The event itself is a no-op/identity function in the event handler.
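
A rough sketch of the "command in an event's clothing" idea, again as a plain-Scala toy model and not Akka's API. `BalanceChanged`, `PublishTag`, `tagsFor` and `toPublish` are hypothetical names; only `RecordedNeedToPublishDomainEventToKafka` comes from the message above. In a real setup the tagging and the tag-based query would presumably be done by the persistence plugin and a projection.

```scala
// Toy model of a tagged wrapper event. NOT Akka's API;
// names other than RecordedNeedToPublishDomainEventToKafka are hypothetical.
object TaggedOutbox {
  sealed trait Event
  // An ordinary internal state-changing event.
  final case class BalanceChanged(delta: Int) extends Event
  // Wraps the message that should eventually be published to Kafka.
  final case class RecordedNeedToPublishDomainEventToKafka(message: String) extends Event

  val PublishTag = "publish-to-kafka" // hypothetical tag name

  // The wrapper event is an identity function as far as state is concerned.
  def eventHandler(balance: Int, event: Event): Int = event match {
    case BalanceChanged(delta)                      => balance + delta
    case _: RecordedNeedToPublishDomainEventToKafka => balance
  }

  // Only the wrapper event carries the tag.
  def tagsFor(event: Event): Set[String] = event match {
    case _: RecordedNeedToPublishDomainEventToKafka => Set(PublishTag)
    case _                                          => Set.empty
  }

  // Stand-in for an events-by-tag query feeding a projection:
  // select tagged events and unwrap the messages to publish.
  def toPublish(journal: Seq[Event]): Seq[String] =
    journal
      .filter(e => tagsFor(e).contains(PublishTag))
      .collect { case RecordedNeedToPublishDomainEventToKafka(m) => m }
}
```

The point of the design is that the entity's state logic never has to know about Kafka: the wrapper event passes through the event handler unchanged and is picked up only by whatever consumes the tag.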

Amine Chikhaoui
@amine.chikhaoui:matrix.org
[m]
@leviramsey: thanks. Yes it's a CRUD style operation, and that sounds reasonable as a sequence. The only thing I want to make sure is possible is that the request from the client waits until the database update is done. So if my Controller asks a persistent actor persistentActor ? DoAction(foo), the reply would be sent within the persist callback as well, and then the rest continues.
Jason Pickens
@steinybot

I am trying to migrate some old code from 2.5 to 2.6. It was using ActorPublisher. I created a custom source which now has to use the ask pattern to get messages from the actor. The problem I have is that now I am hitting the case where the timeout expires just before the actor sends the response. I can’t just increase the timeout as I don’t know when the actor will reply (this is for a websocket). As far as I can tell this was never an issue with the old ActorPublisher since it was all actors. Is there a solution to this?

The best that I can come up with is to include a deadline in the request message so that the receiver knows not to send any replies after this deadline. This has to be earlier than the timeout to avoid the race condition, but not too early, as that will be a period of unresponsiveness. I can use a long timeout to reduce how often this happens. What are the downsides of making the ask timeout extremely long?

2 replies
I think Source.ask suffers this same problem.
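
The deadline idea described above could look roughly like the following, using only `scala.concurrent.duration.Deadline` from the standard library. `Request`, `Reply`, `margin`, `newRequest` and `handle` are hypothetical names, and the 500 ms margin is an arbitrary assumption, not a recommendation.

```scala
import scala.concurrent.duration._

// Sketch of a request that carries its own reply deadline.
// All names and the margin value are hypothetical.
object DeadlineAsk {
  final case class Request(payload: String, replyBy: Deadline)
  final case class Reply(payload: String)

  // Gap between the receiver's cut-off and the asker's timeout,
  // so a reply sent just before the cut-off can still arrive in time.
  val margin: FiniteDuration = 500.millis

  // Asker side: the deadline is set earlier than the ask timeout.
  def newRequest(payload: String, askTimeout: FiniteDuration): Request =
    Request(payload, Deadline.now + (askTimeout - margin))

  // Receiver side: stay silent once the deadline has passed,
  // so no reply races against an already-expired ask.
  def handle(request: Request): Option[Reply] =
    if (request.replyBy.hasTimeLeft()) Some(Reply(request.payload.toUpperCase))
    else None
}
```

The trade-off is the one noted in the message: the window of length `margin` before each timeout is a period in which the receiver will not answer, so a longer ask timeout makes that window a smaller fraction of the total time.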
Swoorup Joshi
@Swoorup
Thinking of designing a websocket architecture. I want to have some sort of command architecture, where the client can subscribe/unsubscribe to a particular set of server events. Are KillSwitches, MergeHub and BroadcastHub with Akka Streams the way to go about this? Are there any guidelines or references for doing so?
10 replies
Marcin Chwedczuk
@marcin-sumologic
Hi, I see that Agents (https://doc.akka.io/docs/akka/2.5/project/migration-guide-2.4.x-2.5.x.html#agents) disappeared from Akka 2.6. I wonder if there is any migration guide showing how I can change my code.
raboof
@raboof:matrix.org
[m]
Marcin Chwedczuk
@marcin-sumologic
PS. What I would really like to see is a code snippet with an example Agent -> Actor migration.
Swoorup Joshi
@Swoorup
anybody using cats effect with akka?
6 replies
Aditya Maheshwari
@adityamundra

Hi, I am using akka-http to create a proxy. I have a proxy route that takes a request, proxies it to some other target service, and returns the response it gets.

package com.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

import scala.io.StdIn

object HttpServerStreamingRandomNumbers {


  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem(Behaviors.empty, "myproxy")
    implicit val executionContext = actorSystem.executionContext
    val route = path("proxy"){
      Route { context =>
        println(context.request.entity.contentType)
        context.complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, ""))
      }
    }
    val bindingFuture = Http().newServerAt("localhost", 9000).bind(route)
    StdIn.readLine()
    bindingFuture.flatMap(_.unbind()).onComplete(_ => actorSystem.terminate())
  }
}

when I send curl request -

curl --location --request POST 'localhost:9000/proxy' --data-raw ''

Inside the route, when I try to get the content type of my request, I get application/x-www-form-urlencoded.
However, if no Content-type is passed in the request, I want the HTTP server not to add a Content-type header. How can I achieve this?

1 reply
Gilad Hoch
@hochgi

Hi,
Is there a built-in way to deserialize (and serialize) the HttpRequest object itself?

context: I'm building a throttled dispatcher that should receive messages from many (micro)services, and pipe/funnel them to a sensitive resource (hence the throttling based on that resource capacity).
It'll also prioritize and such.
Anyway, since the services' usage of the resource is multiple requests per "job", I figured my dispatcher API could just be a JsonArray of HttpRequests.
Internally I plan to use akka-streams to handle the capacity of the service as regular backpressure, and use the akka-http client API's cached connection pool flow to stream the requests to the resource; responses back are also streamed (chunked response).

So the question is basically, I want to deserialize JSON => HttpRequest (more specifically JSON Array => List[HttpRequest]).
Is there a built-in way to do so?
Would be nice if I could avoid all the needed definition and parsing validation…

reg. serialization part (on the client side) - it is less important since some (most) of the services are not in scala anyway, so we'll need to write this anyway…

Alex Myodov
@amyodov
Hello. Is there a way to configure Akka Cluster to always try to reconnect to the seed nodes? So, like, even if there is just one seed node - it will get back into the cluster even if it went down and then up again.
6 replies
Zhenhao Li
@Zhen-hao
@hochgi I think you have to read through https://doc.akka.io/docs/akka-http/current/common/marshalling.html. I know it is painful...
3 replies
Cyril Cheneson
@ccheneson
Hi, FYI, I have browsed through the online doc of akka-stream and for Sink.combine, it seems there is a formatting problem
https://doc.akka.io/docs/akka/current/stream/operators/Sink/combine.html . The previous and next page looks fine.
1 reply
Aditya Maheshwari
@adityamundra
Screenshot 2021-06-14 at 12.53.56 PM.png
Screenshot 2021-06-14 at 12.54.07 PM.png

Hi Akka experts, I have a question regarding the akka-http server. I am trying to handle a request in one of my routes, and if I do not pass any Content-type in the request, by default Akka sets it to application/octet-stream.

Please find my implementation below: -

package com.example
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import scala.io.StdIn
object HttpServerContentTypeTest {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem(Behaviors.empty, "RandomNumbers")
    implicit val executionContext = actorSystem.executionContext
    val route = path("proxy"){
      Route { context =>
        println(context.request.entity.contentType)
        context.complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, ""))
      }
    }
    val bindingFuture = Http().newServerAt("localhost", 9000).bind(route)
    // press any key to stop the server
    StdIn.readLine()
    bindingFuture.flatMap(_.unbind()).onComplete(_ => actorSystem.terminate())
  }
}

I tested this via Postman (please find the attached screenshots showing the headers and the body used). I am sending a POST request with a non-empty request body, and I am not setting the Content-type header.

I want the akka-http server not to set the Content-type header internally. Is there a way to disable this default behavior?

12 replies
Igmar Palsenberg
@igmar
Is there any way to get decent errors when materializing a JMS Source?
2 replies
Zhenhao Li
@Zhen-hao
I have a question about this documentation page https://doc.akka.io/docs/akka-projection/current/running.html#tagging-events-in-eventsourcedbehavior
It says "must tag the events with a slice number". But the Cassandra plugin already puts tagged events into buckets, so it seems users don't need to slice the tags for performance reasons. How, then, can the buckets be used for parallelism in Akka Projection?
5 replies
Zhenhao Li
@Zhen-hao
I find it confusing that Envelope can have both unserialized message/event and serialized one as payload in the Akka codebase. Wouldn't using separate Envelope types be safer and easier to understand?
Zhenhao Li
@Zhen-hao
I mean, for things like private def serializeAndDeserialize(envelope: Envelope): Envelope, readers really have to read every line in its definition to understand what's happening
Max
@maxstreese
Hi everyone! Hope this isn't too silly a question, but I am working with an EventSourcedBehavior and I am wondering if there is some functionality that lets me run stuff after the behavior is restored? Or are there any best practices for this? So far the only one I can come up with is to have the behavior send itself a command which, due to how this restore works, will only be handled by the actor after the restore has finished, right?
Odd Möller
@odd
1 reply
Zhenhao Li
@Zhen-hao
Does Akka Projection have to run in the same actor system as Akka Persistence?
6 replies