Patrik Nordwall
@patriknw
@Tochemey I see that the documentation shows the wrong groupId “com.typesafe.akka”, which should be “com.lightbend.akka”.
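(For illustration, a hypothetical sbt dependency line with the corrected groupId; the module name and version are placeholders, not from this thread:)

// Hypothetical coordinates, shown only to illustrate the corrected groupId
libraryDependencies += "com.lightbend.akka" %% "akka-module-name" % "x.y.z"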
Shivraj Sinhg
@Shiv1791

Hello @channel, can anyone please help me?
I am using these two sbt artifacts, "com.typesafe.akka" %% "akka-actor" % "2.6.5" and "com.typesafe.akka" %% "akka-remote" % "2.6.5", with the latest Scala version.
I am trying to create a simple remote application in Akka, with the configuration below:
MembersService {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
    }
  }
}

MemberServiceLookup {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2553
      }
    }
  }
}

with Scala code:

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object MembersService extends App {
  val config = ConfigFactory.load.getConfig("MembersService")
  val system = ActorSystem("MembersService", config)
  val worker = system.actorOf(Props[Worker], "remote-worker")
  println(s"Worker actor path is ${worker.path}")
}

object MemberServiceLookup extends App {
  val config = ConfigFactory.load.getConfig("MemberServiceLookup")
  val system = ActorSystem("MemberServiceLookup", config)
  val worker = system.actorSelection("akka.tcp://MembersService@127.0.0.1:2552/user/remote-worker")
  worker ! Worker.Work("Hi Remote Actor")
}

Now when I run this it gives me an exception that the address is already in use.
What should I do in that case?
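(For reference, a sketch assuming the classic Netty remoting setup above: if another process is still bound to port 2552 or 2553, one common way to avoid the clash is to let the OS pick a free port.)

# Sketch: port 0 asks the OS for any free port (classic Netty remoting),
# avoiding "address already in use" when several systems share a host
akka.remote.netty.tcp.port = 0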
Khal!l
@redkhalil

I need some help regarding Akka HTTP. It's not timing out when connecting; the connection attempt takes 1-2 minutes despite me setting akka.http.client.connecting-time to 3 seconds.

Failed Request in 90090 ms
Tcp command [Connect(host:10191,None,List(),Some(3 seconds),true)] failed because of java.net.ConnectException: Connection refused

Can anybody advise on how to set a timeout for the connection? Using Akka 2.5.31 and Akka HTTP 10.1.11.

I'd really appreciate it if someone could help with this. Has anybody successfully configured the connection timeout?
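(For reference, the client connect timeout setting in Akka HTTP's configuration is spelled connecting-timeout, so assuming that is the intended setting, a sketch would be:)

# Sketch: note the setting name is connecting-timeout, not connecting-time
akka.http.client.connecting-timeout = 3s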

lekan
@horlahlekhon
Hi, good day. Is it possible to forward-ask a message to an actor, i.e. forward an ask message?
Igmar Palsenberg
@igmar
Sure, why not
lekan
@horlahlekhon
ok, thanks.
I am trying to watch an actor from a supervisor, but after watching with context.watch(actorRef), context.children always returns empty. What could cause this? Kindly help, thanks.
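(A minimal sketch of the distinction involved; Worker and externalRef are hypothetical. context.watch registers for Terminated notifications but does not make the watched actor a child; context.children only lists actors created via context.actorOf in that context.)

import akka.actor.{Actor, ActorRef, Props, Terminated}

class Worker extends Actor {
  def receive: Receive = Actor.emptyBehavior
}

class Supervisor(externalRef: ActorRef) extends Actor {
  private val child = context.actorOf(Props[Worker], "worker") // appears in context.children
  context.watch(externalRef) // watched, but NOT added to context.children

  def receive: Receive = {
    case Terminated(ref) => println(s"$ref terminated") // fires for any watched ref
  }
}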
Oliver Schrenk
@oschrenk

How can I prove that FileIO is picking up my dispatcher?

I'm trying to change the underlying dispatcher of FileIO. The docs say that it is using akka.stream.blocking-io-dispatcher:

akka.stream {
  blocking-io-dispatcher = "my-dispatcher"
}

my-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 10
  }
  throughput = 1
}

But just to be really sure, I also set it explicitly in code

  FileIO
    .fromPath(source.value)
    .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
    .log(s"reading ${source.value}")
    .fold(ByteString.empty)(_ ++ _)
    .map(_.toArray)
    .map(ImmutableImage.loader().fromBytes)

But the log statements are still being written using the default dispatcher. Why?

12:01:47.624UTC DEBUG[default-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://default/system/Materializers/StreamSupervisor-0)] - [reading /Users/oliver/Downloads/e7lp1y1tuwt41.jpg] Element: [...]

Is the underlying file operation still done on my thread pool? Is logging just happening on another thread pool?

How can I prove that the reading is done by my own thread pool?
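(A sketch of one way to check, assuming the FileIO graph above with source.value and an implicit ActorSystem in scope: print the thread name from inside a stage. Note that attributes set with withAttributes apply to the operators composed before that point, which may explain why the later .log stage still runs on the default dispatcher.)

import akka.stream.ActorAttributes
import akka.stream.scaladsl.{FileIO, Sink}

FileIO
  .fromPath(source.value) // source.value as in the snippet above
  .map { bytes =>
    // Dispatcher threads carry the dispatcher id in their name, so this should
    // print something like "default-my-dispatcher-3" if the attribute applies
    println(s"stage running on: ${Thread.currentThread().getName}")
    bytes
  }
  .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
  .runWith(Sink.ignore)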

Thomas
@thomasschoeftner

Hi, I have a question regarding an akka-streams Graph, I built around ZipLatest:
I was trying to build a graph for processing data from a SourceWithContext. The idea is to split the context from the actual data, process all data chunks and then zip the transformed data with the context of the last element on the stream (each with the same context):

      val broadcast = b.add(Broadcast[(Out, Ctx)](2))
      val zip = b.add(ZipLatest[Out2, Ctx])
      val getLastContextFlow = Flow[(Out, Ctx)].reduce((_, cur) => cur).map(_._2)

      broadcast.out(0).map(_._1) ~> processingFlow ~> zip.in0
      broadcast.out(1)           ~> getLastContextFlow  ~> zip.in1
      stream.FlowShape(broadcast.in, zip.out)

However, this only works if the processingFlow emits just a single message.
I presume this is because reduce will only emit when the upstream has completed, so the ZipLatest will simply time out.

Is there a more elegant way to achieve this?
I can't really funnel the context through the processingFlow, because that's a library and it supports neither FlowWithContext nor tuple elements on the stream.

Any suggestions are welcome - thanks!

Vasily Kirichenko
@vasily-kirichenko
@thomasschoeftner I'm not sure, but maybe https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/expand.html could help.
Vasily Kirichenko
@vasily-kirichenko

@thomasschoeftner I don't think it's possible as streams are infinite in general. So, in order to yield elements you need to traverse the stream to the end once, to get the last element's context, then traverse it again, attaching the found context to every element:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source, SourceWithContext}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher

val source: SourceWithContext[Int, String, _] = Source(1 to 5).asSourceWithContext(x => s"$x ctx")

val resSource: Future[SourceWithContext[Int, String, _]] =
  source.runWith(Sink.last).map(_._2).map { lastCtx =>
    source
      .map(_ + 10)
      .mapContext(_ => lastCtx)
  }

println(Await.result(resSource.flatMap(_.runWith(Sink.seq)), 5.seconds))

out: Vector((11,5 ctx), (12,5 ctx), (13,5 ctx), (14,5 ctx), (15,5 ctx))

But, honestly, it'd be easier to just convert the stream to a Seq and traverse it twice.
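(A sketch of that two-pass alternative, reusing source and the implicits from the snippet above:)

// Materialize once to a Seq, then take the last element's context and
// attach it to every transformed element
val seqF: Future[Seq[(Int, String)]] = source.runWith(Sink.seq)
val res: Future[Seq[(Int, String)]] = seqF.map { pairs =>
  val lastCtx = pairs.last._2
  pairs.map { case (x, _) => (x + 10, lastCtx) }
}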

chandrakeerthy
@chandrakeerthy
Lightbend announced open-sourcing of Multi-DC persistence and the split brain resolver. Does anyone have a timeframe for when they will be available?
Ignasi Marimon-Clos
@ignasi35
@chandrakeerthy the issue tracking the current sprint (started yesterday, lasts 3 weeks) includes the work required to move SBR to OSS. Not sure when the next Akka release will be, but expect something within the next 3 or 4 weeks.
chandrakeerthy
@chandrakeerthy
@ignasi35 Thanks
eyal farago
@eyalfa
Hi guys, I've got a question about streams and resource management. I'm implementing a system where streams depend on some resources. These resources need to be closed once the stream completes (or fails) and, more importantly, can't be closed while the stream is still active. I've tried implementing this by relying on watchTermination and future composition; in addition, I have several custom graph stages that communicate their completion by completing a promise in their postStop method.
Long story short, it turned out that watchTermination does not provide strong enough guarantees, since the graph stage preceding it may still process events after the watchTermination matVal future completes. These events may be postStop, upstream/downstream finished/failed, or even a push/pull on another port.
Thinking about it, it seems that what I really need is a way to 'scope' parts of the graph, which seems impossible with the current API. A compromise might be watching the actors interpreting my graph (even better if I could somehow map async islands to actors). Is this somehow possible via the materializer API?
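(For context, a minimal sketch of the watchTermination pattern being described, assuming a hypothetical resource with a close() method and an implicit ActorSystem: the materialized future completes when this stage observes termination, but, as noted above, preceding stages may still process events afterwards, so this is not a hard fence.)

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("demo")
import system.dispatcher

val done: Future[Done] =
  Source(1 to 100)
    .watchTermination()(Keep.right) // materializes a Future[Done] for this point in the graph
    .to(Sink.ignore)
    .run()

// Hypothetical resource; note this may race with postStop of stages
// upstream of watchTermination, which is exactly the problem described
done.onComplete(_ => resource.close())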
yeryomenkom
@yeryomenkom
Hi! Is there a way to watch an actor from outside of the actor system?
Ishan Shah
@nullptr7
My IDE is complaining about a "Too many arguments for method parameters(ParameterDirectives.ParamMagnet)" error. I am currently using scalaVersion := "2.13.2" and akka-http "10.1.12"; however, it does run. But when I call the service it gives "Request is missing required query parameter 'query'".
parameters('query.?, 'operationName.?, 'variables.?) { (query, operationName, variables) =>
  QueryParser.parse(query) match {
    case Success(ast) =>
      variables.map(parse) match {
        case Some(Left(error)) => complete(BadRequest, formatError(error))
        case Some(Right(json)) => executeGraphQL(executeFn, ast, operationName, json, tracing.isDefined, authToken)
        case None              => executeGraphQL(executeFn, ast, operationName, Json.obj(), tracing.isDefined, authToken)
      }
    case Failure(error) => complete(BadRequest, formatError(error))
  }
}
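(For what it's worth, a sketch under the assumption that the IDE trips over the symbol-based syntax, which is deprecated on Scala 2.13: the string-name form is equivalent and may appease the IDE; the route body is elided.)

// Same directive with string parameter names instead of symbols
parameters("query".?, "operationName".?, "variables".?) { (query, operationName, variables) =>
  // ... same handling as above ...
}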
Patrick Skjennum
@Habitats
any recommendations for an up-to-date video/talk for an intermediate/intro to akka http?
that doesn't just explain "actors and why actors"
Matt Oliver
@halfmatthalfcat
It's more about Akka Streams playing within Akka HTTP, but a great talk by Heiko nonetheless.
Patrick Skjennum
@Habitats
hey I actually attended that talk -- totally forgot. thanks!
Amit
@triami
Hello, I have a simple linear stream with a Kafka-based source ("auto-offset-reset": "latest"), a transformer flow, and a sink that writes these elements to a file system. The sink and the flow are custom graph stages. An external app is responsible for producing these elements on a trigger (web based). What is the right way to know when the consumption stream is fully initialized, so I can trigger the production of these elements when that happens? In my current implementation, I am scheduling that trigger in the preStart of my custom sink graph stage with an artificial delay. Though that works for the most part, it is not deterministic and sometimes fires before the Kafka consumer is fully initialized. Please let me know if there is a better way to do this.
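(One deterministic alternative, sketched under the assumption that this is an Alpakka Kafka consumer; topic, actor, and system names are hypothetical. A rebalance listener is notified when partitions are actually assigned, which can serve as the "consumer is ready" signal instead of an artificial delay.)

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.kafka.{Subscriptions, TopicPartitionsAssigned}

class ReadySignal extends Actor {
  def receive: Receive = {
    case TopicPartitionsAssigned(_, partitions) =>
      println(s"assigned: $partitions") // trigger the external producer here
  }
}

implicit val system: ActorSystem = ActorSystem("consumer")
val listener: ActorRef = system.actorOf(Props[ReadySignal], "ready-signal")
// Attach the listener to the subscription used by the Kafka source
val subscription = Subscriptions.topics("my-topic").withRebalanceListener(listener)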
Adriano Santos
@sleipnir
Just out of curiosity, is there any movement to use something from Project Loom in Akka?
Igmar Palsenberg
@igmar
Akka hardly uses threads, so I wonder how useful it would be.
Adriano Santos
@sleipnir
Thanks for the answer @igmar. Sorry, but using "not many" or few threads (which is a relative and therefore unreliable measure) because of a reactive model does not mean that threads play no role, or that any gains from improvements in this field should be discarded. But the question was more about whether there was any case study in that direction.
Igmar Palsenberg
@igmar
It uses 1 thread / core by default. I haven't seen any move in this direction.
Igmar Palsenberg
@igmar
Maybe @patriknw knows better
Adriano Santos
@sleipnir
I must be mistaken; I thought that Akka dispatchers used threads, that they were configurable, and that the default dispatcher used the ForkJoinPool, which is the base scheduler for the Loom project. Sorry for the mistake.
I thought it was possible to implement a dispatcher using fibers from the Loom project.
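(For reference, dispatchers are indeed configurable thread pools; a minimal sketch of a custom dispatcher definition, with a hypothetical name, using the fork-join executor that also backs the default dispatcher:)

# Hypothetical dispatcher config; the default dispatcher uses fork-join-executor
my-custom-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 1.0
    parallelism-max = 8
  }
}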
Igmar Palsenberg
@igmar
AFAIK, Akka uses a fixed threadpool. One thread per core on the machine. And there is a threadpool for things like Futures.
But that is a separate one, less under the control of Akka.
Adriano Santos
@sleipnir
cool @igmar
Johan Andrén
@johanandren
@sleipnir we currently support the LTS versions of Java available right now, 8 and 11. Loom is still a work in progress; all we can do until it lands in an LTS that we support is follow it and think about how we can benefit.
Adriano Santos
@sleipnir
Great @johanandren thanks :D
Marc Rooding
@mrooding
Hi all, I've created a PR for akka-stream-alpakka-elasticsearch (akka/alpakka#2326) but until it's merged and released I'd like to build my own JAR. I've been able to build the alpakka jar but for some reason, when running it, it complains about the ApiVersion being compiled with JDK 13 and I'm trying to run it in a JDK 11 container. I'm not sure where this comes from since I don't have JDK 13 installed on my machine at all. Any clue what's going on here?
Enno
@ennru
ApiVersion is a Java enum in Alpakka Elasticsearch. So it must be your machine that makes it JDK 13?!
Marc Rooding
@mrooding
Yeah, that's my guess too, but how, if the only JDK I have is 1.8? I think I've been able to fix it by adding some javac options in Common.scala, but let's give it a test run first before I celebrate.
Charles Hunt
@CharlesAHunt
nifty new tool for visualizing and managing Akka clusters: https://github.com/Headstorm/shukra
Sven Ludwig
@iosven
Hi, I need some help. Having upgraded from Akka 2.5.27 to 2.5.30 and Akka HTTP from 10.1.11 to 10.1.12 in a small codebase that has a WebSocket route and two or three normal HTTP routes, in 3 unit tests I get akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$ was thrown, without any further information. I had no luck searching for this exception on the net.
What could cause this?
The exception only occurs in tests of the Web Socket route.
Sven Ludwig
@iosven
I could not find the reason for this; I suspect it is due to some changes in 2.5 made for Akka 2.6 compatibility. For now I have reverted back to 2.5.27 and 10.1.11.
Sven Ludwig
@iosven
If anyone wants to try my case, just make an example of a WebSocket client as in https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#singlewebsocketrequest that sends 1 message to a WebSocket server and is supposed to receive 10 or more messages from the server.
Johan Andrén
@johanandren
Please create an issue in the Akka HTTP issue tracker: https://github.com/akka/akka-http
The exception is part of the new support for distinguishing cancellation from failure coming upstream.
So a cancellation will be a NoMoreElementsNeeded internally; that shouldn't bubble up to userland, however, unless you have some homebuilt custom operators/graph stages.