brightinnovator
@brightinnovator
@iosven Thank you so much... thanks a lot for your kindness and time.
Juan Martinez
@reidrac_gitlab
Hello. I have made a service with Akka Streams that uses TCP with TLS. Would it be possible in some way to access the client cert information? I'm not an expert, but I guess what I want is the server session context for each session once the client has connected. I only see Tcp.IncomingConnection.
19 replies
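One possible angle, sketched under the assumption that the TLS handshake is driven with the stream-level akka.stream.scaladsl.TLS bidi stage (rather than bindWithTls, which hides the session): each decrypted chunk arrives as a TLSProtocol.SessionBytes carrying the negotiated javax.net.ssl.SSLSession, and the client certificate chain can be read from it when the server's SSLEngine required client auth:

    import java.security.cert.Certificate
    import akka.NotUsed
    import akka.stream.TLSProtocol.{SessionBytes, SslTlsInbound}
    import akka.stream.scaladsl.Flow
    import akka.util.ByteString

    // Downstream of the TLS stage, every element is a SessionBytes holding the SSLSession.
    // getPeerCertificates throws SSLPeerUnverifiedException if the client sent no certificate.
    val withClientCert: Flow[SslTlsInbound, ByteString, NotUsed] =
      Flow[SslTlsInbound].collect {
        case SessionBytes(session, bytes) =>
          val clientCerts: Array[Certificate] = session.getPeerCertificates
          // e.g. log or validate clientCerts.head here before passing the bytes on
          bytes
      }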
Yufei Cai
@yufei-cai

@iosven Dropping messages is not intended and not good for me. I believe I found the issue. Artery TCP uses Akka Streams to send messages between cluster members. Messages to one actor are deserialized sequentially, whereas messages to different actors are deserialized concurrently, in up to 4 "lanes" by default. Single-threaded deserialization cannot take advantage of a multicore CPU and generates too little demand upstream; the sender queue becomes full, after which messages are dropped.

The relevant documentation is the comment for the config option outbound-message-queue-size.
https://doc.akka.io/docs/akka/current/general/configuration-reference.html#config-akka-remote-artery

1 reply
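For reference, the knobs involved live under akka.remote.artery.advanced; a sketch of the relevant settings (values shown are illustrative; check the configuration reference linked above for your version's defaults):

    akka.remote.artery.advanced {
      # parallel deserialization lanes for inbound messages
      inbound-lanes = 4
      # per-destination outbound queue; messages are dropped once it overflows
      outbound-message-queue-size = 3072
    }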
yan
@yanoziel
Hello, design question...
I have a requirement where I can have multiple auction actors, each holding a different set of items. When adding items to a given auction, I don't want duplicate items across auctions. Say auction1 has (item1, item2) and auction2 has (item3, item1): item1 in auction2 shouldn't be allowed because it's already present in auction1.
How would I design this? It's just a PoC on a local machine.
1 reply
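One way to sketch this for a local PoC (protocol and names invented for illustration): route every add through a single registry actor that owns the global item-to-auction map, so duplicates are rejected before any auction actor sees them:

    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.scaladsl.Behaviors

    sealed trait Command
    final case class AddItem(auctionId: String, itemId: String, replyTo: ActorRef[Response]) extends Command

    sealed trait Response
    case object Added extends Response
    final case class Rejected(alreadyIn: String) extends Response

    // owners maps itemId -> auctionId; the single registry serializes all additions,
    // so uniqueness across auctions holds without any locking
    def registry(owners: Map[String, String]): Behavior[Command] =
      Behaviors.receiveMessage { case AddItem(auctionId, itemId, replyTo) =>
        owners.get(itemId) match {
          case Some(existing) =>
            replyTo ! Rejected(existing)
            Behaviors.same
          case None =>
            replyTo ! Added
            registry(owners + (itemId -> auctionId))
        }
      }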
Mike Krumlauf
@mjkrumlauf
Hello, I have an Elasticsearch Alpakka question. I need to create an index and write documents, but I need dynamic templates to be applied when the index is created. Is that possible in Alpakka Elasticsearch, or do I need to drop back to a REST client and the "raw" Elasticsearch API to do so?
lccsetsun
@lccsetsun
123
Is there a Java Akka demo example anywhere?
Zhenhao Li
@Zhen-hao

Hi, I'm running a very simple Akka Streams job that doesn't involve Cassandra at all, but I still see this in the logs:

[2021-07-19 10:16:28,095] [WARN] [com.datastax.oss.driver.internal.core.control.ControlConnection] [] [s0-admin-1] - [s0] Error connecting to Node(endPoint=/127.0.0.1:9042, hostId=null, hashCode=480e8ba1), trying next node (ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)) {}

why is that?

vikramdarsi
@vikramdarsi
Hi Akka Team, I have a basic question on how to turn off WARN logs. We have a three-node Akka cluster; when one of the nodes goes down, the other two nodes continuously write the logs below:
2021-07-20T07:41:38,463 | WARN | opendaylight-cluster-data-akka.actor.default-dispatcher-15 | Materializer | 135 - com.typesafe.akka.slf4j - 2.5.26 | [outbound connection to [akka://opendaylight-cluster-data@10.1.23.87:2550], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(10.1.23.87:2550,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
2021-07-20T07:41:38,463 | WARN | opendaylight-cluster-data-akka.actor.default-dispatcher-15 | RestartWithBackoffFlow | 135 - com.typesafe.akka.slf4j - 2.5.26 | Restarting graph due to failure. stack_trace: (akka.stream.StreamTcpException: Tcp command [Connect(10.1.23.87:2550,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused)
2021-07-20T07:41:39,156 | WARN | opendaylight-cluster-data-akka.actor.default-dispatcher-15 | Materializer | 135 - com.typesafe.akka.slf4j - 2.5.26 | [outbound connection to [akka://opendaylight-cluster-data@10.1.23.87:2550], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(10.1.23.87:2550,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
vikramdarsi
@vikramdarsi
I could not find a way to disable these Akka Streams logs from the akka.conf file, so I used log4j2 to disable them.
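For anyone hitting the same issue, a log4j2 override along these lines is one option (a sketch; the logger names are guessed from the log lines above and may need adjusting to what your output shows):

    <!-- log4j2.xml fragment: raise the level for the noisy loggers only -->
    <Loggers>
      <Logger name="akka.stream.Materializer" level="ERROR"/>
      <Logger name="akka.stream.scaladsl.RestartWithBackoffFlow" level="ERROR"/>
      <Root level="INFO">
        <AppenderRef ref="Console"/>
      </Root>
    </Loggers>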
Objektwerks
@objektwerks
Using Akka-Http and IntelliJ, I and others have been seeing the following false errors for the past 5 years:
1) cannot resolve overloaded method 'complete' ... 2) No implicit FromRequestUnmarshaller ... 3) No implicit marshalling.ToEntityMarshaller
These errors are not displayed by VSCode + Metals, nor does sbt report them.
So my question is: does anyone on the Akka-Http team know why JetBrains has been unable to resolve these false errors?
volgad
@volgad

Hi all, I am running a set of JVM/Akka systems on the same server. The different components communicate with each other via messages to remote actors or streams. Some of the components involve fairly complex optimization tasks running every 5 minutes or so; they produce an output that is required and flows through the system, so in that sense they are somewhat blocking.

In order to isolate them, I am running the associated stream/source on actors having their own thread-pool executor or fork-join dispatcher (I didn't notice/manage to get a significant difference when choosing one or the other).

My question is the following: at the moment each dispatcher is defined in the system config of its respective system, but I am trying to use a "common dispatcher" that would be shared by all the JVMs. For example, instead of defining a 4-thread pool dispatcher in each system, would it be more efficient to define one 8-thread pool dispatcher that could be shared? I don't know if this is possible, where the conf for the "shared dispatcher" would be defined (since each system has its own config), or where I would run the "shared" dispatcher.

Any comment or reference to an article or a blog post on a similar problem would be much appreciated. Thank you in advance.

15 replies
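Worth noting for context: OS threads can't be shared across JVM processes, so a pool truly common to all the JVMs isn't achievable. Within any one system, though, a dispatcher is defined once in that system's config and can be shared by every blocking actor in it. A minimal sketch (names illustrative):

    # application.conf: one pool, defined once, referenced by many actors
    blocking-opt-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 8
      }
      throughput = 1
    }

    // in code: pin each optimization actor (or stream materialization) onto it
    val optimizer = system.actorOf(OptimizerActor.props.withDispatcher("blocking-opt-dispatcher"))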
Youngoh Kim
@Kyocfe
Akka version: 2.5.32
ScalaTest version: 3.0.8
I am trying to write test code for an Akka classic actor.
For the ask pattern I can assert on the value I expect, but for the tell pattern, how can I handle it in my test code?
Are there any good examples of test code that is actually used?
3 replies
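For the tell side, the classic TestKit's TestProbe is the usual tool: the probe poses as the sender and you assert on what the actor sends it. A self-contained sketch for Akka 2.5.x with ScalaTest 3.0.x (the Worker actor and its messages are invented):

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.testkit.{TestKit, TestProbe}
    import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

    // Tiny actor under test, invented for the example
    final case class Work(id: String)
    final case class WorkDone(id: String)
    class Worker extends Actor {
      def receive: Receive = { case Work(id) => sender() ! WorkDone(id) }
    }

    class WorkerSpec extends TestKit(ActorSystem("WorkerSpec"))
        with WordSpecLike with Matchers with BeforeAndAfterAll {

      override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

      "a worker" should {
        "reply to the sender on tell" in {
          val probe  = TestProbe()
          val worker = system.actorOf(Props[Worker])

          worker.tell(Work("job-1"), probe.ref)   // probe poses as the sender
          probe.expectMsg(WorkDone("job-1"))      // assert on the tell-based reply
        }
      }
    }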
José Luis Colomer Martorell
@beikern

Hello,

Akka version 2.6.9 and akka-lease-kubernetes 1.0.9

I'm trying to use k8s leases in cluster singleton and shards.

My application.conf has the following keys defined:

    akka.cluster.sharding.use-lease = "akka.coordination.lease.kubernetes"
    akka.cluster.singleton.use-lease = "akka.coordination.lease.kubernetes"

Both singleton and shards are classic (typed ones are not fixed until 2.6.11 and 2.6.15 respectively)

At bootstrap, this error shows up:

07:14:50.363 ERROR [akka.coordination.lease.scaladsl.LeaseProvider]   - Invalid lease configuration for 
leaseName [my-actor-system-singleton-akka://my-actor-system/system/sharding/myEntityCoordinator], 
configPath [akka.coordination.lease.kubernetes] lease-class [akka.coordination.lease.kubernetes.KubernetesLease]. 
The class must implement scaladsl.Lease or javadsl.Lease and have constructor with LeaseSettings parameter and 
optionally ActorSystem parameter.

That's a bit weird, because akka.coordination.lease.kubernetes.KubernetesLease does have a secondary constructor with LeaseSettings and ExtendedActorSystem:

  def this(leaseSettings: LeaseSettings, system: ExtendedActorSystem) =
    this(system, new AtomicBoolean(false), leaseSettings)

Then this error appears:

akka.actor.InvalidActorNameException: Invalid actor path element 
[kubernetesLease3-my-actor-system-singleton-akka://my-actor-system/system/sharding/myEntityCoordinator-my-actor-system@100.xxx.xxx.xxx:XXXXX], 
illegal character [/] at position: 48. Actor paths MUST: not start with `$`, include only ASCII letters and can only contain these 
special characters: -_.*$+:@&=,!~';.

Am I missing anything? Maybe I have to define additional keys in my application.conf?

Thanks in advance!

José Luis Colomer Martorell
@beikern

It seems like val kubernetesLease = "com.lightbend.akka.management" %% "akka-lease-kubernetes" % "1.1.1" fixes the actor name problem.

But I believe the 63-character truncation is biting me (the makeDNS1039Compatible method), generating name collisions. Well, it's better than before though :)

José Luis Colomer Martorell
@beikern
akka/akka-management#910 Oh, there's an issue describing the problem.
Hope it helps others :)
James Phillips
@jdrphillips
Is there a way in akka-http to easily turn a Route and an HttpRequest into an HttpResponse, without using the testkit and without starting a server? Starting a server not bound to a port could be a solution, but I didn't spot a way of doing that.
1 reply
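One candidate is Route.toFunction (Akka HTTP 10.2+; older versions had Route.asyncHandler), which turns a Route into a plain HttpRequest => Future[HttpResponse] without binding a server; a sketch:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
    import akka.http.scaladsl.server.Route
    import scala.concurrent.Future

    implicit val system: ActorSystem = ActorSystem("route-eval")

    val route: Route = ??? // your existing route

    // materializes the route's handling logic; no port, no testkit
    val handler: HttpRequest => Future[HttpResponse] = Route.toFunction(route)
    val response: Future[HttpResponse] = handler(HttpRequest(uri = "/ping"))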
Adesh Atole
@AdeshAtole

Hello people. Would anyone recommend Akka for developing CRUD apps at scale?

Just some background about myself: I have been developing in Spring for the last 2 years, and now came across Akka. I am trying to understand what kinds of use cases Akka is a perfect fit for. I went through some docs and other material, but didn't find anything that addresses my question of use-case fit, so I am not able to comprehend where it can be used. Please help me understand the use cases it can cater to in a real-world production app.

14 replies
Objektwerks
@objektwerks
Does anyone on the Akka-Http team know why JetBrains has been unable to resolve false Akka-Http DSL errors? Sad to see no comment from Lightbend on my post and question above. Despite multiple bugs filed on this issue, JetBrains has not resolved it. The productivity hit is such that IntelliJ is virtually unusable, except for big code refactoring efforts. Metals is no IntelliJ, but at least it doesn't falsely report Akka-Http DSL errors. I've enjoyed using Akka-Http for many years, going back to its predecessor, Spray. But perhaps it's time for a change, at least from a coding-sanity perspective. Umm... I will say no more. All the best to the Akka community going forward. Cheers!
Raymond Te Hok
@rtehok

Hi there, I am looking for some help with this issue I have: https://stackoverflow.com/questions/68376263/akka-stream-check-source-and-upload

I managed to produce this code

    val source: Source[akka.util.ByteString, Any] = ???

    val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
      S3.multipartUpload(bucket = "csvfiles", key = s"${user.id.value}/$dataSetId.csv")

    val firstRowsSink: Sink[ByteString, Future[Seq[String]]] =
      Flow[akka.util.ByteString]
        .via(Framing.delimiter(akka.util.ByteString.fromString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))
        .map(_.utf8String)
        .filterNot(_ == "")
        .take(10)
        .toMat(Sink.seq)(Keep.right)

    val (eventualUpload: Future[MultipartUploadResult], eventualFirstRows: Future[Seq[String]]) =
      RunnableGraph
        .fromGraph(GraphDSL.create(s3Sink, firstRowsSink)(Tuple2.apply) {
          implicit builder =>
            (s3S, firstRowsS) =>
              import GraphDSL.Implicits._
              val broadcast = builder.add(Broadcast[ByteString](2, eagerCancel = false))
              source ~> broadcast.in
              broadcast.out(0) ~> Flow[ByteString].async ~> s3S
              broadcast.out(1) ~> Flow[ByteString].async ~> firstRowsS
              ClosedShape
        })
        .run()

    val result = for {
      seq <- eventualFirstRows
      fileMetaData <- extractMetadata(seq)
      projectId <- addProject(fileMetaData)
    } yield (projectId, fileMetaData)

    onComplete(result) {
      case Success((projectId, fileMetaData)) =>
        Future(
          eventualUpload.onComplete {
            case Failure(exception) =>
              logError(exception.getStackTrace.mkString("\n"))
            case Success(_) =>
              logDebug(s"File uploaded (name: $fileName)")
          }
        )
        genericCreatedWithJsonContent(projectId.toJsonResponse)
      case Failure(e) =>
        internalServerError(Some(s"Error while uploading the file $fileName"))
    }

While I am receiving the projectId early in the HTTP response (as expected), I get an akka.stream.SubscriptionWithCancelException$StageWasCompleted$ and the file uploaded to S3 has ~1000 rows (of a 10k+ row file). Am I doing something wrong (maybe in the Future)? Why would the second broadcast stream be cancelled?
Thank you for your help and input.

2 replies
brightinnovator
@brightinnovator
What open source tools are available to monitor / browse Kafka messages?
SeymurMammadrza
@SeymurMammadrza

Hi people. I am a newbie (intern) Scala developer coming from Java. Our new project is using Akka and I have really fundamental problems resolving some issues. I'm preparing a demo project based on the Akka Platform guide to learn some things better. However, I don't know how to store my data coming from gRPC in the State of an Actor; I need it for making changes at certain points. Adding, removing, or updating a field of a task are my goals. Below, I have a "task" coming from .proto and a "summary" to keep these tasks in the state. What is the best practice for keeping tasks? With a map I could only save the id and name of tasks, but I want to keep them as objects; that way I could reach their fields to change them. Sorry for my long post, I really couldn't find the way through research on the internet and it has already been some days...

    message Task {
      string taskId = 1;
      string name = 2;
      Category category = 3;
      Priority priority = 4;
    }

    enum Category {
      UNKNOWN_CATEGORY = 0;
      LEISURE = 1;
      WORK = 2;
      CHORE = 3;
      OTHER = 4;
    }

    enum Priority {
      UNKNOWN_PRIORITY = 0;
      LOW = 1;
      MIDDLE = 2;
      HIGH = 3;
    }

    message TodoList {
      string todoListId = 1;
      repeated Task tasks = 2;
    }

    final case class Summary(tasks: Map[String, String], finished: Boolean) extends CborSerializable

Nathan Fischer
@nrktkt:matrix.org [m]
Why can you only save the id and name in a map?

    case class Task(id: String, name: String, category: Category, priority: Priority)
    Map[String, Task]

?
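Applied to the Summary above, that could look like the following (Task here is a plain domain case class mirroring the protobuf message; the helper methods are invented):

    final case class Task(id: String, name: String, category: Category, priority: Priority)

    final case class Summary(tasks: Map[String, Task], finished: Boolean) extends CborSerializable {
      // state transitions return a new immutable Summary
      def addTask(task: Task): Summary        = copy(tasks = tasks + (task.id -> task))
      def removeTask(taskId: String): Summary = copy(tasks = tasks - taskId)
      def updatePriority(taskId: String, p: Priority): Summary =
        tasks.get(taskId) match {
          case Some(t) => addTask(t.copy(priority = p))
          case None    => this
        }
    }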
razertory
@razertory
Hey guys, is Akka a good way of implementing online code interviews?
Sean Farrow
@SeanFarrow
Hi all, are there any examples of asynchronously downloading a file using Akka HTTP? The file is very large and is compressed in the 7-Zip format; any help appreciated.
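A minimal client-side sketch (URL and file name made up): Http().singleRequest returns the response while the body is still streaming, and entity.dataBytes can be piped straight to disk with FileIO, so the archive never has to fit in memory; unpacking the 7-Zip payload would be a separate step afterwards:

    import java.nio.file.Paths
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.HttpRequest
    import akka.stream.IOResult
    import akka.stream.scaladsl.FileIO
    import scala.concurrent.Future

    implicit val system: ActorSystem = ActorSystem("downloader")
    import system.dispatcher

    // the response entity is a stream; FileIO writes it chunk by chunk
    val done: Future[IOResult] = Http()
      .singleRequest(HttpRequest(uri = "https://example.com/big-file.7z"))
      .flatMap(_.entity.dataBytes.runWith(FileIO.toPath(Paths.get("big-file.7z"))))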
Johannes Rudolph
@jrudolph
@/all We are happy to announce the latest Akka HTTP release, 10.2.5; see https://akka.io/blog/news/2021/07/27/akka-http-10.2.5-released for more information.
SeymurMammadrza
@SeymurMammadrza
Dear @nrktkt:matrix.org, I kept the id out of the task entity and mapped it to the task. Thank you for your time and help. 🙂
volgad
@volgad
Hi all, I am looking for a reference / article / blog post on how to handle a TimeoutException from a remote actor. I have already checked that every message is handled properly, and this doesn't seem to be the issue; it seems more related to thread starvation. I tried to change the dispatcher from fork-join to a dedicated dispatcher, but this doesn't seem to have solved the issue. From the Akka docs, my understanding is that the next step is to try recovering my stream / supervising the actor with retry, but is there any approach specific to timeouts? Any reference would be appreciated. Thank you!
9 replies
I can also do a recover on the result of the future from ask or askWithStatus, but I was looking for something more Akka-centric.
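One more Akka-centric option could be akka.pattern.retry, which wraps a future factory such as an ask and re-runs it on failure (actor and message invented for illustration):

    import akka.actor.{ActorRef, ActorSystem, Scheduler}
    import akka.pattern.{ask, retry}
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration._

    implicit val system: ActorSystem  = ActorSystem()
    implicit val timeout: Timeout     = 5.seconds
    implicit val scheduler: Scheduler = system.scheduler
    import system.dispatcher

    val remoteActor: ActorRef = ??? // the actor whose asks time out

    // re-issues the ask up to 3 times, 2 seconds apart, before the future fails for good
    val reply: Future[Any] = retry(() => remoteActor ? "compute", 3, 2.seconds)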
sebarys
@sebarys

Hello everyone,
I have a question about the akka-http logRequestResult directive. I've added to it a function like the one below, to log successful responses at debug level and error responses and rejections at info level:

  private def requestBasicInfoAndResponseStatus(req: HttpRequest): RouteResult => Option[LogEntry] = {
    case RouteResult.Complete(res) if res.status.isFailure() => Some(
      LogEntry(s"Request ${req.method} to ${req.uri} resulted in failure with status ${res.status}", Logging.InfoLevel)
    )
    case RouteResult.Complete(res) => Some(
      LogEntry(s"Request ${req.method} to ${req.uri} resulted in response with status ${res.status}", Logging.DebugLevel)
    )
    case RouteResult.Rejected(rejections) => Some(
      LogEntry(s"Request ${req.method} to ${req.uri} was rejected with rejections: $rejections", Logging.InfoLevel)
    )
  }

In the logs I see very often:

Request HttpMethod(POST/GET/PATCH) to http://URL/PATH was rejected with rejections: List()

I've read in the docs that an empty rejection list could be related to no route matching the given path, but that is not the case here: the provided (path, method) is covered by the Route. Is there any way to determine why this happens?

Carlos De Los Angeles
@cdelosangeles
Hello everyone! I have been dealing with a problem with Akka Cluster since last week. The general problem is that once the cluster is up and running, if I take down the primary seed node (the main seed for starting the cluster), when I restart it the node cannot rejoin the cluster, essentially creating a split brain with one node inside. I see the other nodes receiving, and successfully replying to, the join requests from the main seed node within milliseconds of receiving them; I just never see the main node processing the response. This cluster has been running for a couple of years without issues, and we just recently increased the number of nodes from 20 to 30 in two DCs, for a total of 60 nodes. It is multi-DC, but only for metadata transfer; a request received in one DC will never be processed in the other DC. Has anyone experienced something similar?
1 reply
Carlos De Los Angeles
@cdelosangeles
One more note on the question above. Other nodes are able to leave and join the cluster without issue.
Max
@maxstreese
Hi everyone, me again, the guy asking unlimited questions. This time it's Akka Streams' turn: I have a TCP connection and everything is working fine. But I am wondering if there is an established pattern for the following: the API I connect to offers compression, which one can enable and disable. This means that ideally I would like to have a decompression stage in the flow that can be turned on and off. Is there any Akka Streams pattern for this? Currently I handle both the delimiting of messages and the decompression in my connection-handler actor, but I would like to move this part into the stream if possible.
Carlos De Los Angeles
@cdelosangeles
Hello Max. How would you determine if a message needs to be decompressed or not? Message parameter/type? Or is it purely on/off for all messages?
7 replies
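If the toggle is per-session rather than per-message, one low-tech pattern (a sketch; decompress stands in for whatever codec the API uses) is to leave the stage permanently in the flow and make it a pass-through while compression is off:

    import java.util.concurrent.atomic.AtomicBoolean
    import akka.NotUsed
    import akka.stream.scaladsl.Flow
    import akka.util.ByteString

    val compressionEnabled = new AtomicBoolean(false) // flipped when the API toggles compression

    def decompress(bytes: ByteString): ByteString = ??? // whatever codec the API negotiates

    // sits permanently in the pipeline; only does work while the flag is set
    val maybeDecompress: Flow[ByteString, ByteString, NotUsed] =
      Flow[ByteString].map { bytes =>
        if (compressionEnabled.get()) decompress(bytes) else bytes
      }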
Stephen Hogan
@hogiyogi597
I am trying to serve my Swagger yaml using the getFromResourceDirectory route but it isn't working. I have split my Swagger yaml into multiple yaml files that are referenced by the main yaml file. However, when my server loads index.html, it can load the initial yaml file perfectly, but when it requests the referenced yaml files it fails to marshal them, saying that the content type is incorrect. Has anyone had any experience working with Swagger (and not using the Swagger Akka libraries)?
1 reply
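If the referenced yaml files are being served with a guessed Content-Type, one thing worth trying (a sketch; the media-type choice is an assumption) is a custom ContentTypeResolver, which getFromResourceDirectory picks up implicitly:

    import akka.http.scaladsl.model.{ContentType, ContentTypes}
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route
    import akka.http.scaladsl.server.directives.ContentTypeResolver

    implicit val yamlAwareResolver: ContentTypeResolver = new ContentTypeResolver {
      def apply(fileName: String): ContentType =
        if (fileName.endsWith(".yaml") || fileName.endsWith(".yml"))
          ContentTypes.`text/plain(UTF-8)` // or register a proper application/yaml media type
        else
          ContentTypeResolver.Default(fileName) // fall back to extension-based resolution
    }

    val swaggerRoute: Route = getFromResourceDirectory("swagger")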
Quynh Anh "Emma" Nguyen
@qaemma

Hi, after a recent Akka version update I started seeing this error: akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$. How can I handle it?

Looking at https://doc.akka.io/api/akka/current/akka/stream/Attributes$$CancellationStrategy$.html, it seems to be new behavior in 2.6.x. The default is PropagateFailure, which is not configurable and has this documentation:

Strategy that treats cancelStage in different ways depending on the cause that was given to the cancellation.
If the cause was a regular, active cancellation (SubscriptionWithCancelException.NoMoreElementsNeeded), the stage receiving this cancellation is completed regularly.
If another cause was given, this is treated as an error and the behavior is the same as with failStage.

So it seems it shouldn't raise this error to the user level. What am I missing? I have tried 2.6.10, 2.6.15, etc. after seeing akka/akka#30071, but it doesn't work in my case.

4 replies
Todor Kolev
@todor-kolev
Hi, is there a way to generate an OpenAPI spec from akka-http routes directly, without using annotations, or do I need to use a wrapper for that (tapir, endpoints4s, etc.)? Thanks!
federico cocco
@federico.cocco_gitlab
Hello, is somebody able to explain the recover method for Akka Streams to me? My understanding is that rather than just catching the error and transforming it, it will also complete the stream. Is there another way I can catch an upstream error and transform it so that the stream continues? Thanks
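That matches the documented behavior: recover emits one final element for the failure and then completes, so it cannot resume the stream. Dropping the failing element and carrying on is what a resuming supervision strategy does; a minimal sketch:

    import akka.actor.ActorSystem
    import akka.stream.{ActorAttributes, Supervision}
    import akka.stream.scaladsl.{Sink, Source}

    implicit val system: ActorSystem = ActorSystem()

    // element 5 throws; with the resuming decider it is dropped and the stream carries on
    Source(1 to 10)
      .map(n => if (n == 5) throw new RuntimeException("boom") else n)
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
      .runWith(Sink.foreach(println)) // prints 1,2,3,4,6,7,8,9,10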
Shayne Koestler
@skoestler
Hello all, I'm using Akka Typed Cluster Sharding and I'm curious: is there a way to watch/supervise remote actors?
8 replies
sebarys
@sebarys

I've read in the docs that an empty rejection list could be related to no route matching the given path, but that is not the case here: the provided (path, method) is covered by the Route. Is there any way to determine why this happens?

Is anyone able to help with such a case, or at least redirect me to where I could describe this issue?

Domantas Petrauskas
@ptrdom
Hi, does Akka Projection have anything similar to Lagom's ReadSideProcessor globalPrepare? The only thing I found is an alternative to prepare in HandlerLifecycle's start. Looking at how the API is constructed, this could also be the responsibility of ShardedDaemonProcess, but I haven't found anything relevant there either.
constantOut
@constantOut
Hi everyone, could you please tell me about the internals of new actor creation?
I was assigned to a legacy project which has a single actor responsible for creating an actor per connection (we don't use Akka HTTP; we use another web framework). I have the impression that this actor is a bottleneck, as we sometimes have spikes in new connections. A memory dump shows that we have more open sockets than worker actors (which are created per connection).
springdev2022
@springdev2022
Which is the best open source reporting tool for a Java application that also has an open source report designer?
Youngoh Kim
@Kyocfe
I changed the classic actor to use the ask pattern a lot, via Await.result; could this be a problem?
If we want to replace this pattern, how should we develop it? The code becomes much less readable if we only use the tell pattern.
How can I improve this without performance issues? Currently I think there is a container-down issue, due to CPU rising after the server has been running for a few days, because of this part.
2 replies
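A common non-blocking replacement for Await.result inside a classic actor is pipeTo: keep the ask, but have its result delivered back to the actor as an ordinary message (types invented for illustration):

    import akka.actor.{Actor, ActorRef}
    import akka.pattern.{ask, pipe}
    import akka.util.Timeout
    import scala.concurrent.duration._

    // Message types invented for the example
    final case class Compute(id: String)
    final case class Response(id: String, result: Int)

    class Coordinator(worker: ActorRef) extends Actor {
      implicit val timeout: Timeout = 3.seconds
      import context.dispatcher

      def receive: Receive = {
        case Compute(id) =>
          // no Await: the future's result comes back to this actor as a message
          (worker ? Compute(id)).mapTo[Response].pipeTo(self)
        case Response(id, result) =>
          // continue here, on the actor's own thread, without ever blocking the dispatcher
          println(s"$id -> $result")
      }
    }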