raboof
@raboof:matrix.org
[m]
I guess those errors are meant to map 1-to-1 with https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll#response-body and aren't used for errors 'outside' of that? but I can see how it could be useful to have an API that allows per-request reporting of the latter type of errors as well, indeed
ctazz
@ctazz

I've written code that uses akka.io to make TCP connections. This code has been running in production for about three years and has been performing well. Suddenly, with just a few days' notice, I'm told that I need to use TCP with TLS (with no SSL certificate required). I'd like to make a small change to the existing code, but I don't see any documentation about how to do TLS with akka.io. Instead I see suggestions about switching to Streams.

I'd much prefer making a small change to existing code over re-implementing it in Streams. Is there any way I can modify my existing akka.io TCP code to make a TLS connection? Perhaps by copying code from Akka 2.2.x, which did support TCP with TLS in akka.io? My production code is using Akka version 2.5.32.

3 replies
kerr
@hepin1989
an updated grpc bench result has been published.
Zhenhao Li
@Zhen-hao
hi, is there an easy way to turn a sink into a flow that passes data through the sink?
21 replies
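For the sink-to-flow question above, one option in Akka Streams is `alsoTo`, which feeds every element to an attached sink while still passing it downstream. The following is a minimal sketch, assuming akka-stream 2.6 or later on the classpath (so the materializer is derived from the implicit ActorSystem); `PassThroughSketch`, `passThrough` and `run` are hypothetical names used only for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PassThroughSketch {
  // Hypothetical helper: wraps a sink in a flow that sends each element
  // to the sink and also emits it downstream unchanged.
  def passThrough[T](sink: Sink[T, _]): Flow[T, T, NotUsed] =
    Flow[T].alsoTo(sink)

  // Runs a tiny stream through the helper and returns what came out downstream.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("pass-through-sketch")
    try {
      val result = Source(1 to 3)
        .via(passThrough(Sink.foreach[Int](n => println(s"side sink saw $n"))))
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

With `alsoTo`, backpressure from the attached sink slows the whole flow. If the side sink should never slow the main stream, `wireTap` is the related operator to look at; it drops elements for the side sink when that sink cannot keep up.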
anqit
@anqit
I posted a question on Stack Overflow about 5 months ago, but it didn't seem to get any traction. I was able to proceed without having solved the issue, but it is starting to get a bit more pressing, and I would love to discuss possible solutions, if any
happy to clarify anything if needed
nrktkt
@nrktkt:matrix.org
[m]
what's the correct syntax for pattern matching a `akka.http.scaladsl.model.Uri.Path`?
1 reply
nrktkt
@nrktkt:matrix.org
[m]
Path.Slash(Path.Segment(seg1, Path.Slash(Path.Segment(seg2, ....)))) works but...
Richie Lee
@redball3
Hi everyone, can anyone tell me if there is an equivalent of FS2's Stream.iterate(elem)(_ + 1) in Akka? I'm really struggling to find anything
2 replies
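For the `Stream.iterate` question above, two Akka Streams operators can produce the same unbounded sequence: `Source.unfold`, which threads a state value through a function, and `Source.fromIterator` combined with the standard library's `Iterator.iterate`. A minimal sketch, assuming akka-stream on the classpath; `IterateSketch`, `iterate` and `iterateViaIterator` are hypothetical names.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

object IterateSketch {
  // Emits start, f(start), f(f(start)), ... without end.
  // unfold's function returns (next state, element to emit).
  def iterate[A](start: A)(f: A => A): Source[A, NotUsed] =
    Source.unfold[A, A](start)(current => Some((f(current), current)))

  // The same sequence, built from the standard library's Iterator.iterate.
  // fromIterator takes a factory so each materialization gets a fresh iterator.
  def iterateViaIterator[A](start: A)(f: A => A): Source[A, NotUsed] =
    Source.fromIterator(() => Iterator.iterate(start)(f))
}
```

Both sources are infinite, so a downstream `take(n)` or `takeWhile(...)` is needed if the stream should complete, for example `IterateSketch.iterate(0)(_ + 1).take(10)`.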
Tilmann Bartsch
@bartsch:matrix.detact.de
[m]

Hello akka-community,

what is the reason that the WebSocketClientBlueprint.handshake BidiFlow drops the entity of the WebSocket handshake GET response in the following lines?

case ResponseStart(status, protocol, attributes, headers, entity, close) =>
  val response = new HttpResponse(status, headers, attributes, HttpEntity.Empty, protocol)

(source: https://github.com/akka/akka-http/blob/eaf87a3d80b7ac037f50e19e6220ca3e0531c41b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocketClientBlueprint.scala#L103-L104)

Is there a way to access this entity?

eyal farago
@eyalfa
hi, starting to look into akka-grpc for grpc client. I've noticed the default client backend is netty, any specific reason for that? what's the current status of the akka-http client backend? thanks!
1 reply
Andy Navarro
@navandres
Hi there: I'm looking into an issue I'm running into while trying to debug one of my akka microservices.
2022-04-18 15:07:00,611 [ akka.remote.Remoting] WARN [akka.remote.Remoting] - Association to [akka.tcp://MyakkaProject@localhost:4949] having UID [-585843325] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote ActorSystem must be restarted to recover from this situation.
I've tried multiple things here, but it seems that Akka cluster remoting is timing out while trying to determine whether my actor system is alive, and giving up, since I'm actually debugging that Akka app
Andy Navarro
@navandres
Tried bumping akka.cluster.auto-down-unreachable-after to 1000s, but it doesn't seem to be working.
14 replies
Any ideas that might help me?
Trevor Burton-McCreadie
@thinkmorestupidless
I'm working on something using Akka Projections and have a question regarding this section in the docs about tags on events in Akka Persistence https://doc.akka.io/docs/akka-projection/current/running.html#tagging-events-in-eventsourcedbehavior - specifically this sentence - "As a rule of thumb, the number of tags should be a factor of ten greater than the planned maximum number of cluster nodes. It doesn’t have to be exact" - What's the impact of having, say, 10 tags vs. 100 tags or 1000? Are there more queries hitting the database? What should I be aware of when ramping up the number of tags?
1 reply
lixiaofeng0316
@lixiaofeng0316
hello akka
kerr
@hepin1989
@lixiaofeng0316 hello hakker
Seeta Ramayya
@Seetaramayya

In the context of akka/akka-stream-contrib#93

If I do not know the initial page token, how can I use PagedSource (or any other source operator)? (I like PagedSource, it is a clean approach :) )

For example, our 3rd party REST API provides a token after the first invocation, which we then need to use to keep iterating until the token is null / None. I could not come up with any nice solution apart from the following (and Option[Option[T]]). Do you have any suggestions?

I feel this is a common use case. Do you already have something in the library for it?

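import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.collection.immutable
import scala.util.{Failure, Success, Try}

case class Page[T, K](items: immutable.Iterable[T], nextKey: Option[K])

final class PagedSourceStageAsync[T, K](initial: Option[K], f: Option[K] => Future[Page[T, K]])
    extends GraphStage[SourceShape[T]] {
  import scala.concurrent.ExecutionContext.Implicits.global
  val outlet: Outlet[T] = Outlet("PagedSourceStageAsync.out")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      // Key of the next page to fetch; None means "no token yet" on the first call.
      private[this] var state: Option[K] = initial
      private[this] var asyncHandler: Try[Page[T, K]] => Unit = _

      override def preStart(): Unit = {
        // The async callback brings the Future's result back into the stage safely.
        val ac = getAsyncCallback[Try[Page[T, K]]] {
          case Failure(ex) => fail(outlet, ex)
          case Success(Page(items, None)) =>
            // items is a collection, so emit its elements one by one
            // (push takes a single T); complete once they are all out.
            emitMultiple(outlet, items)
            complete(outlet)
          case Success(Page(items, Some(nextKey))) =>
            emitMultiple(outlet, items)
            state = Some(nextKey)
        }
        asyncHandler = ac.invoke
      }

      // Each pull fetches the page for the current key.
      def onPull(): Unit = f(state).onComplete(asyncHandler)

      setHandler(outlet, this)
    }

  override def shape: SourceShape[T] = SourceShape(outlet)
}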
4 replies
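The Option[Option[T]] idea mentioned above can be expressed without a custom GraphStage by using `Source.unfoldAsync`. The outer Option says whether another request is needed, and the inner Option carries the token (absent on the first call). A minimal sketch, assuming akka-stream on the classpath; `Page` mirrors the case class from the message, while `PagedSourceSketch`, `pagedSource` and `fetch` are hypothetical names.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}

object PagedSourceSketch {
  final case class Page[T, K](items: immutable.Iterable[T], nextKey: Option[K])

  // State: None            = stop, no more pages
  //        Some(None)      = fetch the first page, no token known yet
  //        Some(Some(key)) = fetch the page identified by key
  def pagedSource[T, K](fetch: Option[K] => Future[Page[T, K]])(
      implicit ec: ExecutionContext): Source[T, NotUsed] =
    Source
      .unfoldAsync[Option[Option[K]], immutable.Iterable[T]](Some(None)) {
        case None => Future.successful(None) // completes the stream
        case Some(key) =>
          fetch(key).map { page =>
            // Next state is Some(Some(k)) while a token exists, else None.
            Some((page.nextKey.map(k => Some(k)), page.items))
          }
      }
      .mapConcat(items => items) // flatten each page into single elements
}
```

A failed Future returned by `fetch` fails the stream. Pages with no items are handled as well, since `mapConcat` simply emits nothing for them and the next page is requested.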
Adrian Legaspi
@akilegaspi
This message was deleted
10 replies
anqit
@anqit

I have a question about reading entities in with akka-http. I am creating a REST API and have a data model that includes 2 parent models that can have several children models that link them together. For example:

case class ParentA(id: String)
case class ParentB(id: String)
case class Child(idA: String, idB: String, ...other fields)

The "other fields" all have default values, so I would like to create an endpoint that allows creating several of these child entities by providing only the non-default parent B IDs, with the parent A ID coming from the URL path, for example:

POST https://so-cool-api.com/parentA/<parentA_id>
BODY:
{
    "parentBIds": ["idB1", "idB2", ...]
}

This would then create one Child instance for each parentBId provided, all with idA = <parentA_id>.
However, I would also like to allow default values to be overridden:

POST https://so-cool-api.com/parentA/<parentA_id>
BODY:
{
    "children": [
        { "idB": "idB1", ...other fields },
        { "idB": "idB2", ...other fields }
    ]
}

Is this possible without doing custom JSON handling in the endpoint and relying solely on implicit marshalling? Is this even a good idea? I believe the form of the body in the first example will be used most and is therefore convenient to allow, but maybe it's better to only allow the second type of request, and just omit fields that I want to rely on default values for?
Thanks for any advice!

1 reply
dan
@danbergelt

I am attempting to keep a WebSocket connection open in combination with a webSocketClientFlow, and am having a fair bit of difficulty.

My top-level flow looks something like this (pseudocode). Imagine that I want to proxy certain messages somewhere else, while for other message types I simply want to return a default message to the client:

Flow[Message].flatMapConcat({
  case msg: T => proxy(msg)
  case msg: U => Source.single(TextMessage(""))
})

The proxy function is something like the following. I'm attempting to keep the connection open in the way described in the docs:

Source.single(msg).concatMat(Source.maybe)(Keep.right).via(out)

Where the connected Flow is simply something like:

val out = Http().webSocketClientFlow(WebSocketRequest(uri)).mapAsync(1)(transformMessage)

I'm passing to an echo ws:// endpoint as a test, and while the first message gets echoed back to me, following messages do not get sent. So, curious what I'm missing here.

I'm also curious to hear of any alternative recommended approaches to selectively passing messages into the connected Flow, if this is not optimal - the HTTP framework I'm using (Play) expects a Flow to be returned inside the endpoint fn, so this was the most straightforward method I settled on. And order matters, so merging and/or the Graph DSL didn't seem like they fit my needs
NatElkins-RF
@NatElkins-RF
Hi, is there a guide to a very bare-bones Akka configuration setup? I am just trying to get a very small proof-of-concept set up but I'm having some trouble. Repo is here: https://github.com/NatElkins/akka_test
15 replies
image.png
^^ That's the error I receive when I run it
Pierre Marais
@Deeds67

Hello, could anyone help me compose these two directives? One directive depends on the value provided by the other, and I'm trying to compose them into a single directive:

def firstDirective(): Directive1[String] = {
  ...
}

def secondDirective(valueFromFirst: String): Directive0 = {
  ...
}

def composedFirstAndSecond = firstDirective().tmap(x => secondDirective(x))

This compiles, but when using the composedFirstAndSecond directive, it fails to compile:

type mismatch;
[error]  found   : akka.http.scaladsl.server.StandardRoute
[error]  required: akka.http.scaladsl.server.Directive[Unit] => (akka.http.scaladsl.server.RequestContext => scala.concurrent.Future[akka.http.scaladsl.server.RouteResult])
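When one directive depends on a value extracted by another, `flatMap` on a `Directive1` is the usual combinator in akka-http: it passes the extracted value to a function that returns the next directive, and the result has the type of that inner directive. A minimal sketch, assuming akka-http 10.x on the classpath; the directive bodies (`headerValueByName("X-User")` and `authorize(...)`) are placeholders for the elided implementations above, not the original code, and `ComposedDirectiveSketch` is a hypothetical name.

```scala
import akka.http.scaladsl.server.{Directive0, Directive1, Route}
import akka.http.scaladsl.server.Directives._

object ComposedDirectiveSketch {
  // Placeholder body (assumption): extracts the value of a request header.
  def firstDirective(): Directive1[String] =
    headerValueByName("X-User")

  // Placeholder body (assumption): rejects the request unless the value is accepted.
  def secondDirective(valueFromFirst: String): Directive0 =
    authorize(valueFromFirst == "admin")

  // flatMap feeds the extracted String into the second directive.
  // The composed directive extracts nothing, hence Directive0.
  val composedFirstAndSecond: Directive0 =
    firstDirective().flatMap(value => secondDirective(value))

  // A Directive0 wraps an inner route without passing it any arguments.
  val route: Route =
    composedFirstAndSecond {
      complete("ok")
    }
}
```

Giving the composed value an explicit `Directive0` type annotation makes a mismatch show up at the definition rather than at the place where the directive is used.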
Srepfler Srdan
@schrepfler
Has anyone benchmarked akka projections writing into postgresql (from Kafka with external offset into DB)? I'm looking at around 200 tps
I'm writing using vanilla r2dbc if it makes any sense
Srepfler Srdan
@schrepfler
via the R2DBCHandler
Ehenoma
@ehenoma
What's the role of a non-persistent actor once I build a system with persistent actors?
How can I then trust a non-persistent actor if its state is not recoverable?
1 reply
Unless it is short lived
Daave
@Yomanz
I'm having an issue where alpakka-amqp is creating lots of queues for no apparent reason, I'm using the RPC & PubSub examples from the docs.
Yaroslav Hryniuk
@yarhrn
Hi, is there any reason why akka-http uses old akka?
5 replies
Patrik Nordwall
@patriknw
Akka Projections 1.2.4 has been released. A particularly nasty bug has been fixed, which is relevant if you have forgotten to use the ProjectionBehavior.Stop message with, for example, Sharded Daemon Process.
Aljosha
@AljoshaVieth

Hey there,
I'm building a local cluster that runs in a single JVM, and I need to send a message from one actor to another on a different node as soon as all nodes are up. I'm listening to the up events and would like to send the message as soon as I recognize that a specific node is up. However, I need an ActorRef to that actor, and since I'm in a Behavior of MemberEvent, I cannot use the ask pattern properly because I have to catch the response inside this behavior.
I use a structure similar to the cluster system described here: https://developer.lightbend.com/guides/akka-sample-cluster-scala/

Is it possible to get the ActorRef by using the member's address of the node?

Or is it possible to implement it with the ask pattern anyway?

I would really appreciate your feedback

Trevor Burton-McCreadie
@thinkmorestupidless
Is it possible to use something like https://grpc.github.io/grpc-java/javadoc/io/grpc/ClientInterceptor.html in Akka gRPC? The use case is logging/monitoring: being able to do something when a request is sent and when a response to that request is received (or if the request fails)
Avinash Kumar
@avinash0161
I want to know the CPU time taken by a specific portion of an actor's code. Any suggestions? I have looked at Kamon, but I don't think it supports such a metric. I am looking at profilers such as YourKit, which will work when I am running the application on my local machine. But I don't know what options will work when I run my application in a cloud-based distributed setting.
Gaël Ferrachat
@gael-ft

Hi there !

I know we can "mock" Akka (typed) Extensions by providing an ExtensionSetup within the ActorSystemSetup when we create the ActorSystem.

I can't find how to do this for classic Extensions. Has anyone already done it?

anqit
@anqit

Hey there, I'm trying to read a query parameter in a REST API I'm building with akka-http and having trouble unmarshalling it to a domain object. This is my route definition:

override lazy val getManyRoute: Route = (pathEndOrSingleSlash & get & parameters("userId".as[UserId].?)) { maybeUserId =>
        complete {
            eventService.getEventsBy(maybeUserId)
        }
    }

Everything compiles fine, so I am assuming the necessary unmarshalling implicits are in scope. Using curl to test the endpoint, I get the following error:

➜  ~ curl "localhost:8080/events?userId=6be0f45e-084e-41db-9e5b-e6bc8d208b54"
The query parameter 'userId' was malformed:
expected whitespace or eof got 'be0f45...' (line 1, column 2)%

It seems like it's reading just the first character ('6') from the UUID, and for some reason is not including the rest of the string in the parameter. However, if I change the Route definition to read a String and manually "unmarshall" it, it works fine. Something like the following:

override lazy val getManyRoute: Route = (pathEndOrSingleSlash & get & parameters("userId".?)) { maybeUserId =>
                                                                                    //   ^ no more unmarshalling
        complete {
            eventService.getEventsBy(maybeUserId.map(methodToUnmarshallStringToUserId))
        }
    }

Any ideas? Is there something I'm missing in the original route definition?

5 replies
anqit
@anqit
If this helps at all, this is the log message from the request:
  Request : HttpRequest(HttpMethod(GET),http://localhost:8080/events/?userId=6be0f45e-084e-41db-9e5b-e6bc8d208b54&,List(Timeout-Access: <function1>, Host, User-Agent: curl/7.64.1, Accept: */*),HttpEntity.Strict(none/none,0 bytes total),HttpProtocol(HTTP/1.1))
  Response: Rejected(List(MalformedQueryParamRejection(userId,expected whitespace or eof got 'be0f45...' (line 1, column 2),None), TransformationRejection(akka.http.scaladsl.server.directives.BasicDirectives$$Lambda$1058/0x0000000800799840@412742fd), MethodRejection(HttpMethod(POST))))
Taher Vohra
@taherv
Hi, I'm a newbie Akka user (coming from the C++/Go world).
How do I get detailed internal information while my test program is running?
I have written a test Akka Streams program and would just like to learn how Akka boots up, how the system starts, how actors/streams are created, how shutdown works, etc.
Thanks for your help
Taher Vohra
@taherv
Also, is this code sample expected to work?
https://doc.akka.io/docs/akka/current/stream/operators/Source/actorRef.html#source-actorref
I tried the sample code: the "if elem == Done.done()" check never triggers and the stream never completes
1 reply