Zhenhao Li
@Zhen-hao
maybe it is old. I get the error only when I query a non-existing ddata entity
Arsene
@Tochemey

Hello guys. Happy weekend. I have an application that spawns a bunch of actors. Within those actors I am making gRPC calls to an external resource. I know it is best practice to move those kinds of blocking calls onto a different dispatcher. I would like to know how I can set a different dispatcher for those gRPC calls, rather than using the main dispatcher of the running actor system, given that the gRPC clients use the same actor system. I am using Akka gRPC for the clients.

Hello guys, any suggestions, please?

anilkumble
@anilkumble

Sorry @patriknw, earlier I didn't post my question clearly. I have a cluster with 4 nodes and I want to send a message to any of the nodes in the cluster from outside the cluster.
As you said, gRPC would be a good option.

I have downloaded the sample gRPC project (Java) and ran it; a single IP address (not a list of IPs or cluster details) and a port are configured in the app config file.

What changes need to be made if I want to communicate with the cluster rather than a single machine?

Could someone in the channel clarify:
1) How do I communicate with a cluster of nodes over gRPC, rather than a single IP, from outside the cluster?
2) In the sample gRPC example I can see a few files in the cert folder. What is the role of those files? Do they ensure security?

Knut Petter Meen
@kpmeen_gitlab
I have a question regarding Akka HTTP and WebSockets. I'm looking to add some form of rate limiting (e.g. bytes/sec) for incoming messages over a WebSocket. Will the akka-streams throttle function also apply backpressure to the client via the WebSocket? Or will the messages just accumulate on the server because the client isn't aware of the limit of the stream on the server?
3 replies
Ilya
@squadgazzz
Hello there! I have a question about Akka HTTP. Is it possible to set an akka.http.client.idle-timeout config when I use GrpcClientSettings.connectToServiceAt?
2 replies
Ori Dagan
@oridag
Hi, does anyone know when an Alpakka release depending on akka-http 10.2 is planned?
Enno
@ennru
You can pull up the Akka HTTP dependency yourself if you want to upgrade today. We'll likely open Alpakka for version 3.0.0 pull requests soon. Alpakka 3.0.0 will have Akka 2.6 and Akka HTTP 10.2 as minimal versions.
Patrik Nordwall
@patriknw
@anilkumble for example Kubernetes headless service for load balancing
@Tochemey Blocking gRPC calls? That sounds wrong. They return Future/CompletionStage. You can pipe the result back to the actor without blocking. https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html#send-future-result-to-self
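For reference, the non-blocking pattern from that link looks roughly like this in Akka Typed. This is a sketch: GreeterServiceClient, HelloRequest, and the message names stand in for whatever your Akka gRPC generated client actually provides.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import scala.util.{Failure, Success}

sealed trait Command
final case class Lookup(name: String) extends Command
private final case class WrappedReply(text: String) extends Command

// client.sayHello returns a Future; pipeToSelf turns its completion into an
// ordinary message, so no actor thread is ever blocked on the gRPC call.
def behavior(client: GreeterServiceClient): Behavior[Command] =
  Behaviors.receive { (ctx, msg) =>
    msg match {
      case Lookup(name) =>
        ctx.pipeToSelf(client.sayHello(HelloRequest(name))) {
          case Success(reply) => WrappedReply(reply.message)
          case Failure(ex)    => WrappedReply(s"call failed: ${ex.getMessage}")
        }
        Behaviors.same
      case WrappedReply(text) =>
        ctx.log.info(text)
        Behaviors.same
    }
  }
```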
Arsene
@Tochemey
@patriknw Thanks for the reply.
Arsene
@Tochemey
@patriknw is it possible to use the PipeTo pattern in akka persistence?
Martin Zachrison
@cyberzac

How do I close a TCP socket connection from within the flow? In the snippet below:

Tcp().bind(interface, port).runForeach { connection => connection.handleWith(flow) }

how can I disconnect the connection inside the flow? The use case is that the client sends a close message and then the server should close the connection.

3 replies
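One common way to get this behaviour: when the Flow passed to handleWith completes, Akka Streams closes the TCP connection, so completing the flow is enough. A sketch, assuming a text protocol where the literal message "close" should end the session:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

// takeWhile completes the stream as soon as the client sends "close";
// completing the handler Flow makes the server close the TCP connection.
val closingFlow: Flow[ByteString, ByteString, NotUsed] =
  Flow[ByteString]
    .takeWhile(bytes => bytes.utf8String.trim != "close")
    .map(bytes => bytes) // echo; replace with the real protocol logic
```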
anilkumble
@anilkumble

@anilkumble for example Kubernetes headless service for load balancing

@patriknw
Is this the answer to the question below?

1). how to communicate with cluster of nodes in gRPC rather than single IP from outside of the cluster?

Team, I have contacted Lightbend via this. How will I be notified, through mail? I wanted to know about the subscription plan.
Thanks
Thanks
gavinbaumanis
@gavinbaumanis

I don't pretend to know anything about what I am talking about, so it might not even be possible... But I THINK I want to use the SMACK stack so that I can promise the following:
ALL messages MUST be sent
ALL messages MUST be sent in order
ALL messages MUST be processed in order
(messages can be resent, but the order needs to be maintained. E.g. 1,2,3,4; for some reason 3 has an issue.
1,2,4 got received, so we know 3 is missing. So resend 3, then 4,5,6. It doesn't matter that 4 is sent a second time after 3.)

Can anyone help with a crude design? That is, what modules do I need to use?
akka-persistence / cluster / sharding etc.? Thanks very much

Derek Wickern
@dwickern

I'm trying to do something like this request/reply pattern, except from the side making the requests rather than serving them.
https://doc.akka.io/docs/alpakka/current/jms/consumer.html#request-reply

I post a request to a Sink, then wait for the response on a Source. I attach an ID to each request so that I can correlate the response to the original request. I guess at this point I'm looking to get an API like Request => Future[Response]. Any advice?
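One way to get that Request => Future[Response] shape without extra libraries is a correlation map of Promises keyed by request ID. A plain-Scala sketch with hypothetical Request/Response types; the register/complete calls would be wired into the JMS Sink and Source stages respectively:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Future, Promise}

final case class Request(id: String, body: String)
final case class Response(correlationId: String, body: String)

// Correlates responses arriving on the Source with requests posted to the
// Sink, exposing the Request => Future[Response] API described above.
final class Correlator {
  private val pending = new ConcurrentHashMap[String, Promise[Response]]()

  // Call before offering the request to the JMS Sink.
  def register(request: Request): Future[Response] = {
    val promise = Promise[Response]()
    pending.put(request.id, promise)
    promise.future
  }

  // Call for each message consumed from the response Source.
  def complete(response: Response): Unit = {
    val promise = pending.remove(response.correlationId)
    if (promise != null) promise.success(response)
  }
}
```

The returned Future fires when a response with a matching correlation ID arrives; un-matched responses are silently dropped here, which a real implementation would probably log.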

Enno
@ennru
@dwickern We have a PR that experiments with JMS Request/Reply; it would be great if you'd like to take it further... akka/alpakka#2143
Derek Wickern
@dwickern
interesting
nathanccleung
@nathanccleung

@ennru Hi, I've encountered an issue connecting to AWS Keyspaces (Cassandra) with Alpakka Cassandra ("2.0.2"), Akka 2.6 typed and JDK 11. I get the warning below; could you please let me know what I have set wrong in the configuration? Many thanks.

Error while opening new channel (ConnectionInitException: [s0|id: 0xdc9f77c8, L:/172.29.225.48:58153 - R:18.141.148.50/18.141.148.50:9142] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.6.1, CLIENT_ID=275bbf27-2926-4330-8d08-45c80ef86fdf}): failed to send request (javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 18.141.148.50 found))

Configuration as below

datastax-java-driver {
  basic {
    contact-points = ["cassandra.ap-southeast-1.amazonaws.com:9142"]
    load-balancing-policy {
      class = DefaultLoadBalancingPolicy
      local-datacenter = "ap-southeast-1"
    }
  }
  advanced {
    reconnect-on-init = true
    connection {
      init-query-timeout = 10000 milliseconds
      control-connection.timeout = ${datastax-java-driver.advanced.connection.init-query-timeout}
      max-requests-per-connection = 32768
      max-requests-per-connection-remote = 2000
      pool {
        local.size = 1
        remote.size = 1
      }
    }
    auth-provider {
      class = PlainTextAuthProvider
      username = "user"
      password = "password"
    }
    ssl-engine-factory {
      class = DefaultSslEngineFactory
      truststore-path = "./src/main/resources/cassandra_truststore.jks"
      truststore-password = "truststore-password"
    }
  }
}
alpakka.cassandra {
  session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"
  service-discovery {
    name = ""
    lookup-timeout = 1 s
  }
  session-dispatcher = "akka.actor.default-dispatcher"
  datastax-java-driver-config = "datastax-java-driver"
}

Code

val sessionSettings = CassandraSessionSettings("alpakka.cassandra")
implicit val cassandraSession: CassandraSession =
  CassandraSessionRegistry.get(ctx.system).sessionFor(sessionSettings)
Source(Set(ddp))
  .via(CassandraFlow.create(CassandraWriteSettings.defaults, sql, statementBinder))
  .map(Some(_))
  .runWith(Sink.head)
Anton Kuroedov
@atk91
Hello everyone! I have an application which processes HTTP requests, and some of them cause quite large memory allocations. If I just make these allocations directly inside my routes, the application will crash with an OutOfMemoryError while processing too many concurrent requests. One possible solution I see is to make n actors which receive messages with a task description, process them, and send the result back. Since an actor does its job single-threaded and sequentially, total memory usage won't exceed n * maxMemoryUsagePerTask. Is this a "correct" approach, or am I reinventing the wheel?
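For what it's worth, the same bound can be had without actors by gating the tasks on a Semaphore. A plain-Scala sketch; whether blocking a pool thread on acquire is acceptable depends on how your ExecutionContext is sized:

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{ExecutionContext, Future}

// At most maxConcurrent tasks run their body at once; the rest wait on the
// semaphore, so peak memory stays near maxConcurrent * maxMemoryUsagePerTask.
final class BoundedRunner(maxConcurrent: Int)(implicit ec: ExecutionContext) {
  private val permits = new Semaphore(maxConcurrent)

  def run[A](task: () => A): Future[A] = Future {
    permits.acquire() // blocks a pool thread, not the HTTP request handler
    try task()
    finally permits.release()
  }
}
```

Caveat: blocking acquire can starve a small shared pool, so a dedicated ExecutionContext with at least maxConcurrent threads is the safer pairing.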
Patrik Nordwall
@patriknw
@gavinbaumanis take a look at Reliable Delivery https://doc.akka.io/docs/akka/current/typed/reliable-delivery.html
Aaron Bossert
@abossert
I am new to Akka, so please be gentle... I am using 2.6.10, attempting to use the functional design pattern as shown in the documentation, and have a question about expensive operations. I have a stream of messages coming from another actor that need to be processed. The way I designed the analytic to run is that I instantiate my class (let's call it MyAnalytic), which loads up a few lookups in the form of hashmaps, and every time the calculate method is called it produces a MyAnalyticInstance containing the result of the analytic calculation. I previously used this successfully in an Apache Storm topology by loading the analytic class in the bolt's "prepare" method, so the analytic class only had to be loaded once, when the bolt started up. I want to implement the same kind of workflow using Akka Actors and am not sure what the best practice is for this type of use case. Can an actor be reused so that I can replicate the same behavior as in my Storm topology? Is it better to pass a reference to the analytic class when spawning actors? Or is there a different/better way to do this?
Zachary Kagan
@zakagan
Hi! I have a question about the redirect method and routes. I am trying to redirect to a signed URL that provides access to a GCS bucket. However, the bucket path has a protected character in it (=) and Google's signed-URL method percent-encodes it as "%3D" in the signed URL. The problem is that the Akka HTTP redirect method decodes this back to "=", which doesn't match the signed URL. Is there any way to prevent this encoding/decoding on redirects?
Roman
@rshirochenko
Hi! I can't find how to pass a custom header (string key->value) for an Akka gRPC streaming call. It seems that https://doc.akka.io/api/akka-grpc/1.0.2/akka/grpc/GrpcClientSettings.html has no options for it. I found a way for non-streaming calls, https://doc.akka.io/docs/akka-grpc/current/client/details.html#request-metadata, but I need something for streams. Any known workaround here?
2 replies
Zhenhao Li
@Zhen-hao

hi, I have now some concerns about this warning related to distributed data

Oct 15 17:54:00 server my-app[4702]: WARNING: An illegal reflective access operation has occurred
Oct 15 17:54:00 server my-app[4702]: WARNING: Illegal reflective access by org.lmdbjava.ByteBufferProxy$AbstractByteBufferProxy (file:/nix/store/qzr9x2nijszm2zana05m5m76hg3cdch9-my-app/lib/org.lmdbjava.lmdbjava-0.7.0.jar) to field java.nio.Buffer.address
Oct 15 17:54:00 server my-app[4702]: WARNING: Please consider reporting this to the maintainers of org.lmdbjava.ByteBufferProxy$AbstractByteBufferProxy
Oct 15 17:54:00 server my-app[4702]: WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
Oct 15 17:54:00 server my-app[4702]: WARNING: All illegal access operations will be denied in a future release

it seems that these are logged when a query is made against a ddata key that has not been created, but I think that is a very normal use case; for example, I'm implementing a cache with ddata

9 replies
Dominik Dorn
@domdorn

Hi!

I'd like to register a new HTTP-VERB in my Akka-HTTP application (to be specific "KILL" to kill a specific entity actor).. my code looks like this:

  import akka.http.scaladsl.server.directives.MethodDirectives._
  def route(scenarioId: ScenarioId)(
    implicit correlationId: CorrelationId,
    userId: UserId) =
    concat(
      method(HttpMethod.custom("KILL", false, false, requestEntityAcceptance = Disallowed)) {
        complete(s"kill received for scenario $scenarioId")
      },
      method(HttpMethods.DELETE) {
        complete(s"kill received (through delete) for scenario $scenarioId")
      }
    )

while the request is matched when I'm using the DELETE verb, my own KILL verb is not recognized:

⇒  curl -XDELETE http://localhost:8091/api/v4/scenario/1/
kill received (through delete) for scenario ScenarioId(id=1)
⇒  curl -XKILL http://localhost:8091/api/v4/scenario/1/
Unsupported HTTP method%

I looked through the code and assume I'd have to register my verb in the ObjectRegistry of HttpMethods, but access to that method is private, so I don't know how to do that.
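For what it's worth, recent Akka HTTP versions expose this through ParserSettings rather than the private ObjectRegistry. A sketch, assuming an ActorSystem named system and the route from the snippet above:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethod
import akka.http.scaladsl.model.RequestEntityAcceptance.Disallowed
import akka.http.scaladsl.settings.{ParserSettings, ServerSettings}

implicit val system: ActorSystem = ActorSystem("http")

// Define the verb once and reuse the same instance in the route's
// method(...) directive.
val KILL: HttpMethod =
  HttpMethod.custom("KILL", safe = false, idempotent = false,
    requestEntityAcceptance = Disallowed)

// Registering the verb with the request parser is what makes the server
// accept it at all; the route directive alone is not enough.
val parserSettings = ParserSettings.forServer(system).withCustomMethods(KILL)
val serverSettings = ServerSettings(system).withParserSettings(parserSettings)

Http().newServerAt("localhost", 8091).withSettings(serverSettings).bind(route)
```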

Arsene
@Tochemey
Hello Akka geeks, when using pipeToSelf, what happens to the next messages? Do I need to explicitly stash them? I need some education on this.
2 replies
Zachary Kagan
@zakagan
Hi, sorry for the spam. I added some additional detail to the percent decoding issue I'm facing on redirected uris as a stackoverflow question. If you have a chance can you take a look?
Igmar Palsenberg
@igmar

hi, I have now some concerns about this warning related to distributed data


Just ignore it. It's a warning, and will most likely be solved before it becomes a real issue.

brabo-hi
@brabo-hi
Hi, I have some Akka persistent sharded actors that each hold a single WebSocket connection. I am running into the following issue: every time a new persistent actor is created (with a new WebSocket connection), all the other connections are dropped and closed. Do you have any idea what the cause could be?
Safta Catalin Mihai
@saftacatalinmihai
Hello. I have a (maybe dumb) question:
Is there a way to differentiate, from the RecoveryCompleted signal, between a normal recovery of a persistent actor (after passivation, let's say) and a recovery caused by remember-entities=on after a crash?
sdas987
@sdas987
The Alpakka SQS Source discards the Message Attributes set on the message that is put in the queue. Is there a workaround for it? Thanks in advance.
1 reply
Patrik Nordwall
@patriknw
@saftacatalinmihai no difference. It loads the snapshot (if any) and then replays the events after that point.
@Tochemey other messages may be received while waiting for the future to complete.
anilkumble
@anilkumble
Hi, I am using the default dispatcher.
If I create 100 actors by iterating in a for loop, how will threads be allocated to the actors?
Will each actor hold a thread, or will threads be shared among actors?
1 reply
Jan Ypma
@jypma
@anilkumble Threads are shared amongst actors, and Akka's dispatcher will decide which actors get invoked on what thread, and how many messages they will process before moving on to the next actor.
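That behaviour is configurable in application.conf. A sketch of a dedicated dispatcher (the name and sizes are examples, not recommendations):

```hocon
# Example dispatcher in application.conf; actors are assigned to it with
# Props.withDispatcher("blocking-io-dispatcher") in classic Akka, or
# DispatcherSelector.fromConfig("blocking-io-dispatcher") in Akka Typed.
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16   # threads shared by all actors on this dispatcher
  }
  throughput = 1           # messages an actor processes before yielding its thread
}
```

The throughput setting is exactly the "how many messages before moving on to the next actor" knob mentioned above.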
Nikita Matveenko
@nikitapecasa

Hello, it looks like
contact-point-discovery.required-contact-point-nr is used only to form a new cluster; when joining an existing cluster the config is ignored.
Is this expected behaviour?

There are two places in the documentation; one makes the usage clear, while the other might mislead a bit.

4 replies
Zhenhao Li
@Zhen-hao
hi, I have my own serializer, like class AvroSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest.
How can I write unit tests for it? How can I provide an ExtendedActorSystem?
4 replies
yangpei1010110
@yangpei1010110
hello
how to exit the room???
Adrian
@adrian-salajan
left of your profile picture there is a button with 3 lines and circles (Room settings) > Leave
Salman Ahmed
@debugging
I created an akka-http app using the g8 template.
Doing ~compile doesn't seem to react to my code changes...
Any tips?
~run also doesn't work; making code changes doesn't reload when I refresh the browser.
Daniel Cullender
@dcullender
Hi Akka gRPC team, is there a way to enable compression?
Daniel Cullender
@dcullender
On the server side
2 replies
Arsene
@Tochemey
Hello geeks. At times it is imperative to perform a bunch of async operations inside a persistent actor, even though it is not recommended. Using pipeToSelf and Effect.noReply, I would like to know how best one can guarantee that the other incoming messages are properly stashed, because while waiting for the async operation to complete, other messages may arrive as well. Let's say my persistent actor performs two or three async ops before an event is persisted. The easiest and craziest way is to just await the future's completion, which can be damaging, or to block on that future using a different thread pool.
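Akka Persistence Typed has built-in effects for exactly this: Effect.stash() buffers a command and thenUnstashAll() releases the buffer, so awaiting or blocking is not needed. A rough sketch of the shape; the State, Command, Event names and the startAsyncWork helper are made up for illustration:

```scala
import akka.persistence.typed.scaladsl.Effect

sealed trait State
case object Idle extends State
case object Busy extends State

sealed trait Command
final case class Process(data: String) extends Command
final case class AsyncDone(result: String) extends Command

sealed trait Event
final case class WorkStarted(data: String) extends Event  // event handler: Idle -> Busy
final case class Processed(result: String) extends Event  // event handler: Busy -> Idle

def commandHandler(state: State, cmd: Command): Effect[Event, State] =
  (state, cmd) match {
    case (Idle, Process(data)) =>
      // startAsyncWork would use context.pipeToSelf, so AsyncDone arrives
      // later as an ordinary command
      Effect.persist(WorkStarted(data)).thenRun(_ => startAsyncWork(data))
    case (Busy, AsyncDone(result)) =>
      // persist the outcome and release everything stashed meanwhile
      Effect.persist(Processed(result)).thenUnstashAll()
    case (Busy, _) =>
      Effect.stash() // buffer commands instead of blocking or dropping them
  }
```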