Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Oct 26 2020 07:39
    patriknw labeled #29767
  • Oct 26 2020 07:39
    patriknw labeled #29767
  • Oct 26 2020 07:39
    patriknw labeled #29767
  • Oct 26 2020 07:39
    patriknw labeled #29767
  • Oct 26 2020 07:38
    patriknw commented #29767
  • Oct 26 2020 07:07
    patriknw commented #29765
  • Oct 26 2020 06:56
    patriknw commented #25468
  • Oct 26 2020 06:30
    wtfiwtz starred akka/akka
  • Oct 26 2020 04:31
    YunSeongKim7 starred akka/akka
  • Oct 25 2020 16:21
    nitikagarw commented #25468
  • Oct 25 2020 09:22
    fubiao starred akka/akka
  • Oct 25 2020 05:09
    saguywalker starred akka/akka
  • Oct 24 2020 21:47
    tt4n starred akka/akka
  • Oct 24 2020 21:20
    akka-ci commented #29672
  • Oct 24 2020 21:05
    dope9967 commented #29672
  • Oct 24 2020 21:03
    akka-ci commented #29672
  • Oct 24 2020 21:03
    akka-ci unlabeled #29672
  • Oct 24 2020 21:03
    akka-ci labeled #29672
  • Oct 24 2020 20:44
    dope9967 synchronize #29672
  • Oct 24 2020 20:31
    akka-ci unlabeled #29672
samwi
@samwi:matrix.org
[m]
DeathPactException by the parent because supervision failed.
Jesse
@JessePreiner
Heyhey, we've used testProbe.expectMessage(StatusReply.ack()) successfully, but testing errors doesn't seem to be working, ie testProbe.expectMessage(StatusReply.error("Error message")). The toStrings appear to be the same between expected/actual, but their equality is not
4 replies
anilkumble
@anilkumble
Why cluster client is not advised to build new applications?
As far as I know, it will create a tight coupling between the client and cluster systems. Is any other reason other than this?
Say If I am the only person building client and cluster systems then it won't be an issue right?
12 replies
anilkumble
@anilkumble
How to set trusted = /certs/ca.pem in GrpcClientSettings via code ?
How to disable security check in both gRPC server and client
Igor Baltiyskiy
@baltiyskiy
Why isn't outbound control connection restarted immediately when new incarnation appears? I observe that outbound connection is only restarted 15 minutes after.
14 replies
Johan Andrén
@johanandren
anilkumble
@anilkumble
public class Server1 {
  final ActorSystem system;
  public Server1(ActorSystem system) {
    this.system = system;
  }

  public static void main(String[] args) throws Exception {

    ActorSystem system = ActorSystem.create("Cluster", getConfig("127.0.0.1"));

    List<Address> seedList = new ArrayList<Address>();
    seedList.add(new Address("akka.tcp","Cluster", "127.0.0.1", 4001));
    seedList.add(new Address("akka.tcp","Cluster", "127.0.0.1", 4002));
    seedList.add(new Address("akka.tcp","Cluster", "127.0.0.1", 4003));
    seedList.add(new Address("akka.tcp","Cluster", "127.0.0.1", 4004));
    seedList.add(new Address("akka.tcp","Cluster", "127.0.0.1", 4005));
    Cluster.get(system).joinSeedNodes(seedList);

    new Server1(system).run();
  }

  private static Config getConfig(String ipAddress) throws Exception {
    Config appConfig = ConfigFactory.parseFile(new File("/Users/kumble-8174/Downloads/akka-grpc-quickstart-java/src/main/resources/application2.conf"));
    appConfig = ConfigFactory.parseString(
            String.format("akka.actor.provider=%s%n", "cluster") +                                                  
                    String.format("akka.http.server.preview.enable-http2=%s%n", "on") +                                                    
                    String.format("akka.remote.artery.canonical.hostname=%s%n", ipAddress) +                                                    
                    String.format("akka.remote.artery.canonical.port=%s%n", 4001)  ))
    ).withFallback(appConfig);
    return appConfig;
  }

  public CompletionStage<ServerBinding> run() throws Exception {

    Function<HttpRequest, CompletionStage<HttpResponse>> service = GreeterServiceHandlerFactory.create(new GreeterServiceImpl(system), system);
    CompletionStage<ServerBinding> bound = Http.get(system).bindAndHandleAsync(service, ConnectWithHttps.toHostHttps("127.0.0.1", 5001)
            .withCustomHttpsContext(serverHttpContext()), SystemMaterializer.get(system).materializer());
    bound.thenAccept(binding -> System.out.println("gRPC server bound to: " + binding.localAddress()));
    return bound;
  }

Above is my code. gRPC port is 5001 and Actorsystem is running on port 4001
Similarly, I have 3 more servers in which the gRPC ports are 5002,5003,5004 and ActorSystem ports are 4002,4003,4004

But this is not forming a cluster. Kindly point out if any mistakes in the code.
I have tried sending a request from grpc client. grpc server receives it and executed as expected

anilkumble
@anilkumble
Team,
what is the difference betweeen akka.remote.netty.tcp.hostname and akka.remote.artery.canonical.hostname
anilkumble
@anilkumble
which protocal will be used to communicate in grpc client and server?
3 replies
Sciss
@Sciss

Hi there. http question. I manage to upload a file using

        Multipart.FormData.fromFile(
          name        = "file",
          contentType = MediaTypes.`application/octet-stream`,
          file        = f,
          chunkSize   = 100000, // critical for performance?
        )

but how can I add other form fields? for example I need to add "description" -> "bla". I used JSON instead of form data before, but I guess I can't use that when I use a multi-part file upload. Can I add the missing fields somehow?

anilkumble
@anilkumble
What is the meaning of the below statement in Akka gRPC?
When a server handles several services the handlers must be combined with akka.grpc.javadsl.ServiceHandler.concatOrNotFound:
1 reply
Stan Chen
@stancsz

Hi, I got a question regarding the DirectoryChangesSource source, I'm trying to stream changes from the file paths and would like to stream it to the downstream. however the subfolder directory is not detected.
i.e.

stan@stan-ryzenrig:~/github/akka-stream-processor/fs:$ touch detected.md
stan@stan-ryzenrig:~/github/akka-stream-processor/fs:$ 
>> Path: fs:/detected.md, Change: Creation
Path: fs:/detected.md, Change: Modification

^ this works

stan@stan-ryzenrig:~/github/akka-stream-processor/fs:$ cd status\=unprocessed/
stan@stan-ryzenrig:~/github/akka-stream-processor/fs:/status=unprocessed$ touch undetected.md
stan@stan-ryzenrig:~/github/akka-stream-processor/fs:/status=unprocessed$ 
>>

no detection of the file changes

anyone has experience with file dir based source directory streaming?

anilkumble
@anilkumble
How service discovery works in akka grpc client ?
Is this similiar to client side load balancer ?
2 replies
mjsmith707
@mjsmith707
Can anyone confirm system.actorSelection works with classic actors still on 2.6.13? I have some multi-jvm tests (and other single jvm also) which fail after upgrading from akka 2.5. I've printed out the tree and confirmed the path is correct (via EAS.printTree) yet it throws ActorNotFound. These are also akka.testkit.TestActors so I'm not sure if that matters now
mjsmith707
@mjsmith707
Disregard the above. Something else is happening I think
anilkumble
@anilkumble
What is mean stateful and stateless actors ?
Frederick Roth
@froth
Hi, I have a question alpakka/file and akka-stream graphdsl. I have a Broadcast[ByteString] which I want to put into Archive.zip. However Archive.zip needs a Source[ByteString], can I somehow convert the Broadcast (actual type: Outlet[ByteString]) to a Source, so that I can use Archive.zip?
30 replies
Christof Nolle
@nolledge

Hi guys, I have an issue implementing an AuthenticationDirective[Identity]inside a trait. To be more precise the signature looks a lot like this:

trait AuthDirectives extends LazyLogging {

  def authenticate(implicit authService: AuthService): AuthenticationDirective[Identity] = ???
}

When extending the trait and using the authenticate like this with the implicit in scope:

    authenticate { _ => complete(StatusCodes.OK) }

It won't compile saying "missing parameter type". I guess what is happening is that the compiler assumes I want to provide the authService explicitly. What I actually want is my implicit being applied implicitly and than call the apply of the resulting AuthenticationDirective. It works when I am a little more precise of what I want:

    authenticate.apply { _ => complete(StatusCodes.OK) }

But I would like to get rid of the apply. Is there a way around that?

5 replies
Philipp Renerig
@renphi
Hey folks, I have a question regarding akka http since the update to 10.2.4 (from 10.1.12) the akka http server throws illegal-request-target (Illegal request-target: Invalid input '[', expected query-char, pct-encoded or 'EOI' (line 1, column 65))
when I send this URI: http://localhost:3000/Products?params=%7B%22from%22:0,%22size%22:32,%22attributes%22:[],%22active%22:true%7D with 10.1.12 this wasn't an issue, any ideas? Thanks a lot for any hints...
Zhenhao Li
@Zhen-hao
hi, I don't want to bother with setting up a query side DB for now. what's the easiest way to read all events of an actor? all I need to do is apply a fold function to get what I want. In my opinion, queries that can easily be expressed in a fold function don't need to dedicate read server, unless for performance optimization
4 replies
lekan
@horlahlekhon
Hi, is there a way to correlate response with request using websocket. like the way we use http request with context, i.e Http().cachedHostConnectionPoolHttps[UUID]("www.google.com") where UUID will be unique for every request and one can correlate request to its response.
OlafurD
@OlafurD

Hello

Im defining a readiness check for a clustered akka application, and i'm programatically accessing the HealthChecks object defined in akka.management. According to the rolling update docs (https://doc.akka.io/docs/akka/current/additional/rolling-updates.html#cluster-sharding) the readiness check should not return ready until sharding has been initalized on the node. However, it looks like the implementation is using akka.management.cluster.scaladsl.ClusterMembershipCheck which returns ready if the status of the node is up or weakly-up.

Am I misunderstanding something? Shouldn't there be a different check than the node status to see if sharding has been initialized, or does a node status of up or weakly-up imply that sharding has been initialized?

anilkumble
@anilkumble
Team, may I know the reason for the warnings
19:55:04.566 WARN  sharding-akka.actor.default-dispatcher-27 akka.tcp://sharding@127.0.0.1:2552/system/sharding/entity - entity: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
19:55:04.566 INFO  sharding-akka.actor.default-dispatcher-25 akka://sharding/deadLetters - Message [akka.cluster.sharding.ShardRegion$StartEntity] from Actor[akka://sharding/system/sharding/entity#-930908833] to Actor[akka://sharding/deadLetters] was not delivered. [40] dead letters encountered. If this is not an expected behavior, then [Actor[akka://sharding/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
19:55:06.566 WARN  sharding-akka.actor.default-dispatcher-2 akka.tcp://sharding@127.0.0.1:2552/system/sharding/entity - entity: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
19:55:06.567 INFO  sharding-akka.actor.default-dispatcher-25 akka://sharding/deadLetters - Message [akka.cluster.sharding.ShardRegion$StartEntity] from Actor[akka://sharding/system/sharding/entity#-930908833] to Actor[akka://sharding/deadLetters] was not delivered. [41] dead letters encountered. If this is not an expected behavior, then [Actor[akka://sharding/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Derek P. Moore
@derekm

Hi all! @oNouguier_twitter is presenting about his Pravega Alpakka connector tomorrow at our monthly Pravega.io meetup: https://community.cncf.io/events/details/cncf-pravega-community-presents-pravega-community-meeting-1/

Feel free to join us tomorrow, Friday, April 16th, at 7 AM Pacific time to learn more about Olivier's work with Akka Streams and Pravega!

Frederick Roth
@froth
Hey, I have a question concerning akka typed and using an AbstractBehavior as the root Behavior of the ActorSystem. When I do so, the first message I send to the ActorSystem gets swallowed. I am not sure if this is a bug or I am doing someting wrong. I implemented a minimal example by using the akka/akka-scala-seed.g8 template: https://github.com/froth/akka-typed-question/blob/main/src/main/scala/com/github/froth/AkkaQuickstart.scala.
Frederick Roth
@froth
In Line 77 and 78 I send two messages to the ActorSystem. But only the second triggers the Actor
1 reply
Nikita Glushchenko
@NPCRUS
Hi guys, i bump into problem while writing tests for typed actors: someActor.ask(ref => Message(ref)). Is there any way to retrieve or wildcard this ref while writing tests, i need to mention that this call is happening somewhere from akka-http route calback
VarunVats9
@VarunVats9
Hi All, I'm looking for some solution on Akka Stream, where I want to make N parallel Rest API, and once all gives back the result, need to merge all those into one event, and then push that to Kafka.
4 replies
Louis FRULEUX
@bluesheeptoken

Hello guys, I might be missing something completely obvious... Using akka-http:10.2.4, I cannot create a Directive1 from parameter with .optional (NameOptionReceptacle for the full name)

import akka.http.scaladsl.server.Directives._

object Main extends App {
  parameter("name".?) // Does not compile, it probably cannot resolve the def forNOR[T] because there is probably a `FSOU` instance missing
}

I feel I am missing an obvious import, but I cannot see which one :/

Casper Koning
@CasperKoning

For an Akka Streams Flow where we use throttle to model an external API's rate limit,

Flow[ApiRequest]
  // Throttle the number of requests we can send to the external API
  .throttle(
    elements = requestsPerSecond,
    per = 1.second,
    maximumBurst = requestsPerSecond,
    mode = ThrottleMode.Shaping
  )
  // Execute the API request
  .mapAsync(parallelism) { request =>
    httpClient.executeRequest(request)
  }

How would one go about testing this?

For example, I'd like a test where one publishes 10 ApiRequests to the Flow, and verifies that only 2 make it through on the other end of the Flow every second, provided that requestsPerSecond = 2.

6 replies
bblfish
@bblfish:matrix.org
[m]
IS there a quick way to pretty print an HTTPMessage for documentation so that one gets what is sent on the wire?
bblfish
@bblfish:matrix.org
[m]
Something like this is a bit longer than I hoped for, but it will do
  extension (msg: HttpMessage)
    def documented: String =
      val l: List[String] = msg match
        case req: HttpRequest =>
          import req._
          s"${method.value} $uri ${protocol.value}" :: {
            for {h <- headers.toList} yield s"${h.name}: ${h.value}"
          }
        case res: HttpResponse =>
          import res._
          protocol.value +" "+ status.value :: {
            for {h <- headers.toList } yield s"${h.name}: ${h.value}"
          }
      import msg._
      val ct = entity match
        case HttpEntity.Empty => List()
        case e: HttpEntity if e.contentType != ContentTypes.NoContentType =>
          ("Content-Type: "+e.contentType) ::
          e.contentLengthOption.map("Content-Length: "+_).toList
      (l ::: ct).mkString("\n")
Gabriele Favaretto
@gabriele83
Hi, what is the best way to distribute 10MB/20MB actor's state (sharded actor in a cluster), to thousands of shard actors in the cluster?
  • I have excluded DData for this reason: "you cannot have too large data entries, because then the remote message size will be too large"
  • I excluded Replicated Event Sourcing because I am using a kb8 custer with autoscaling and I cannot fulfill the following constraint: "All replicas need to be known up front"
29 replies
reggiemurphy
@reggiemurphy

Hi, is there any information out there on the performance of persistent actors with jdbc? We ran a test to get the max throughput of persisting events in our system and the results are lower than we expected. We are persisting events synchronously and when we record the time taken to persist a single event like below we are getting an average time of 9ms.

val startTime = Instant.now()
Effect.persist(SomeEvent).thenRun(_ => recordTime(Instant.now() - startTime)

We are using a MySQL database and the akka-persistence-jdbc plugin. We do serialize the events with avro but the 50th percentile time for this is only 0.8ms. Are there any more metrics we can get here around the persistence time? Would 9ms be expected for the time taken to persist one event?

We use HikariCP for the database connection pool and looking at the metrics the average connection usage in the thread pool is approximately 3ms so a breakdown of what is happening would really help.
Swoorup Joshi
@Swoorup
anyone here using cinnamon?
1 reply
pruthvi578
@pruthvi578
I get empty data while reading response entity sometimes. But able to manually get information using curl. Is there an error handling mechanism to ensure that we receive data from response entity in akka http client?
NOUGUIER Olivier
@oNouguier_twitter

Hello guys, I might be missing something completely obvious... Using akka-http:10.2.4, I cannot create a Directive1 from parameter with .optional (NameOptionReceptacle for the full name)

import akka.http.scaladsl.server.Directives._

object Main extends App {
  parameter("name".?) // Does not compile, it probably cannot resolve the def forNOR[T] because there is probably a `FSOU` instance missing
}

I feel I am missing an obvious import, but I cannot see which one :/

:-/ weird ... I never had issue with that, would you mind to share your conf / compilation error ?

raboof
@raboof:matrix.org
[m]
@pruthvi578: did you check the status code of the response entity?
volgad
@volgad
Hi All, would anybody have any experience / feedback running an akka system / akka cluster on Epyc or threadripper based servers? The context is in a data stream platform with services communicating with each other. Would this be very dependent onm fine tunning the dispatchers used by each actor / akka stream system? The server would also run kafka / mongo + some python services that consumer / send back to kafka and that akka can also receive as a stream
4 replies
Would the many cores be useful for this or I am better looking for more raw speed with less cores? Any feedback or reference would be super appreciated
Thanks a lot
Shubham Girdhar
@girdharshubham
Hi!
Is there a way to get a website's certificate information using Scala?
I was trying out Akka Http Client side api but couldn't make much progress there
Avshalom Orenstein
@avshRepo
Hi can someone explain how alfakka works behind the scenes? Meaning if I have topic with 3 partitions does it open new stream for each one partition?
Sven Ludwig
@iosven
Hi, is it possible to run a Spark program (with Spark Context and Spark Session) from within an Akka Streams recipe for each element that passes a certain stage, let's say a mapAsync stage with parallelism set to 1?
@avshRepo You probably are asking about alpakka-kafka? There is a separate gitter channel for it. You can subscribe to a whole topic consuming the partitions in parallel, or you can subscribe to a certain partition, I think. So it depends.
Andrey Ladniy
@AndreyLadniy

I need unusual akka-persistence behavior.

I want to use the database as the state of the persistent actor. The state of this object can be infinitely large, it is possible that it may not fit in memory, and to make a decision, you only need an object from this state and it is important to maintain the chronological order of creating and changing these objects (therefore, it is impossible to put them in separate actors).

Therefore, I came up with the idea to create a state table in the database, update it after writing the event in the 'thenRun' method, in case of failure to update this table, stop the actor. In the process of restoring the author's state based on the number of the last used message number in the table update process, also perform an update if there were messages, but there was no updates.

I think with this usage scheme, the database will have exactly the same state as in the case of the classic state in memory

To check commands, use query to this table.

Is it possible to use the persistent actor in this way?

7 replies
Andrey Ladniy
@AndreyLadniy
In fact, I have a simple task. There is a command "push object Y after object X". And if there is such "object X", the event "object Y pushed" is persisted (so ever after "object Z" can be pushed after "object Y" ). So it is important to maintain the chronological order of events, but the number of objects can be very large.