This channel is available for all Akka enthusiasts—newbies as well as gurus—for the exchange of knowledge and the coordination of efforts around Akka. For more info see: http://bit.ly/akka-gitter
Hello everyone,
I have a question about the akka-http logRequestResult directive. I've passed it a function like the one below, to log successful responses at debug level and error responses and rejections at info level:
private def requestBasicInfoAndResponseStatus(req: HttpRequest): RouteResult => Option[LogEntry] = {
  case RouteResult.Complete(res) if res.status.isFailure() =>
    Some(LogEntry(s"Request ${req.method} to ${req.uri} resulted in failure with status ${res.status}", Logging.InfoLevel))
  case RouteResult.Complete(res) =>
    Some(LogEntry(s"Request ${req.method} to ${req.uri} resulted in response with status ${res.status}", Logging.DebugLevel))
  case RouteResult.Rejected(rejections) =>
    Some(LogEntry(s"Request ${req.method} to ${req.uri} was rejected with rejections: $rejections", Logging.InfoLevel))
}
In the logs I very often see:
Request HttpMethod(POST/GET/PATCH) to http://URL/PATH was rejected with rejections: List()
I've read in the docs that an empty rejection list can mean there is no route for the given path, but that isn't the case here - the provided (path, method) is covered by the Route. Is there any way to determine why this happens?
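For context, a sketch of how such a function is plugged into the directive (myRoute is a placeholder for the real routing tree, and an implicit classic ActorSystem is assumed to be in scope for the logging context):
import akka.actor.ActorSystem
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.DebuggingDirectives

implicit val system: ActorSystem = ActorSystem() // provides the implicit LoggingContext
val myRoute: Route = ???                         // placeholder for the real routes

// the LoggingMagnet conversion accepts a HttpRequest => RouteResult => Option[LogEntry]
val loggedRoute: Route =
  DebuggingDirectives.logRequestResult(requestBasicInfoAndResponseStatus _)(myRoute)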
Hi, after a recent Akka version update I started seeing this error: akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$. How can I handle it?
Looking at https://doc.akka.io/api/akka/current/akka/stream/Attributes$$CancellationStrategy$.html, it seems to be new behavior in 2.6.x. The default is PropagateFailure, which is not configurable and has this documentation:
Strategy that treats cancelStage in different ways depending on the cause that was given to the cancellation.
If the cause was a regular, active cancellation (SubscriptionWithCancelException.NoMoreElementsNeeded), the stage receiving this cancellation is completed regularly.
If another cause was given, this is treated as an error and the behavior is the same as with failStage.
So it seems it shouldn't surface this error at the user level. What am I missing? I have tried 2.6.10, 2.6.15, etc. after seeing akka/akka#30071, but it doesn't work in my case.
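If the exception surfaces in a materialized Future (an assumption, since the message doesn't say where it shows up), one way to treat it as a benign completion is to match on the object, roughly like this sketch:
import akka.Done
import akka.stream.SubscriptionWithCancelException
import scala.concurrent.{ExecutionContext, Future}

// hypothetical helper: streamDone is whatever Future the stream in question materializes
def ignoreBenignCancellation(streamDone: Future[Done])(implicit ec: ExecutionContext): Future[Done] =
  streamDone.recover {
    // NoMoreElementsNeeded signals that downstream simply stopped demanding, not a real failure
    case SubscriptionWithCancelException.NoMoreElementsNeeded => Done
  }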
A question about the recover method for Akka Streams: my understanding is that, besides catching the error and transforming it, it will also close the stream. Is there another way I can catch an upstream error and transform it so that the stream continues? Thanks
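Two possible approaches, sketched under the assumption that the failure is thrown inside the stream itself: recoverWithRetries switches to a fallback source when upstream fails, while a resuming supervision strategy drops only the failing element and keeps the stream running.
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("recover-example")

// Option 1: replace the failed upstream with a fallback source (1 attempt)
Source(1 to 5)
  .map(n => if (n == 3) throw new RuntimeException("boom") else n)
  .recoverWithRetries(1, { case _: RuntimeException => Source.single(-1) })
  .runForeach(println) // prints 1, 2, -1

// Option 2: resume on error, dropping only the failing element
Source(1 to 5)
  .map(n => if (n == 3) throw new RuntimeException("boom") else n)
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
  .runForeach(println) // prints 1, 2, 4, 5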
I've read in the docs that an empty rejection list can mean there is no route for the given path, but that isn't the case here - the provided (path, method) is covered by the Route. Is there any way to determine why this happens?
Is anyone able to help with this case, or at least point me to where I could report this issue?
Is there an equivalent of ReadSideProcessor's globalPrepare? The only thing I found is an alternative to prepare: start in HandlerLifecycle. Looking at how the API is constructed, this could also be the responsibility of ShardedDaemonProcess, but I haven't found anything relevant there either.
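For reference, a sketch of where start sits in Akka Projection's Handler API (MyEvent and MyProjectionHandler are made-up names for illustration):
import akka.Done
import akka.projection.eventsourced.EventEnvelope
import akka.projection.scaladsl.Handler
import scala.concurrent.Future

final case class MyEvent(data: String) // hypothetical event type

class MyProjectionHandler extends Handler[EventEnvelope[MyEvent]] {

  // HandlerLifecycle.start runs each time this handler instance is (re)started;
  // note it is per projection instance, not once per cluster like globalPrepare
  override def start(): Future[Done] =
    Future.successful(Done) // e.g. create tables or warm caches here

  override def process(envelope: EventEnvelope[MyEvent]): Future[Done] =
    Future.successful(Done) // handle the event here
}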
Hello here,
From my understanding, by default Akka completes a stream operator on UpstreamFinish. I would like to know how to keep an operator alive after upstream completion. Do I have to create my own GraphStage and override the method onUpstreamFinish()?
My code currently looks like:
val someFuture: Future[MyObject] = ???
val longlifeSink: Sink[MyObject, Future[Done]] = ???

Source
  .fromFuture(someFuture)
  .wireTap(longlifeSink)
  .runWith(Sink.head)
In the example above, I would like my longlifeSink to continue even after the UpstreamFinish. How can I do that?
Note that I am asking because I would like to avoid having to create a GraphStage...
Hello @raboof:matrix.org,
The idea is that the longlifeSink is a quite heavy operator.
But when the Source emits the value from the Future, it then emits onUpstreamFinish, and it looks like that closes the ongoing wireTap before it actually completes.
I don't want it to remain alive, I want it to run all the code of the wireTap Sink and then close the operator.
Am I missing something?
Hello @leviramsey,
As I said earlier, reusing the Sink is not what I am trying to achieve; I want it to run completely before it gets closed by the default onUpstreamFinish, which (by default) calls completeStage.
// in fact a method arg
val wuid: String = ...

// method body
Flow
  .setup { case (mat, _) =>
    implicit val materializer: ActorMaterializer = mat
    implicit val classicSystem: classic.ActorSystem = mat.system

    val workspaceDb = ReactiveDatabase(classicSystem.toTyped).forWorkspace(wuid)

    Flow[Urn]
      .flatMapConcat { urn =>
        val dbReadQuery: org.reactivestreams.Publisher[Metadata] = ...

        Source
          .fromPublisher(dbReadQuery)
          .map(replaceValueOf(urn, _)) // some changes in the Metadata itself, but anyway ...
          .grouped(20)
          .async
          .zipWithIndex // just trick to get the batch number available for logs
          .flatMapConcat { case (metaSeq, index) =>
            val dbWriteQuery = ...

            Source
              .fromPublisher(dbWriteQuery)
              .log(s"test", extractBatchLog(urn, index))
          }
      }
  }
  .mapMaterializedValue(_ => NotUsed)
  .toMat(Sink.ignore)(Keep.right)
}
alsoTo, until it works hehe
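A minimal sketch of that alsoTo variant (MyObject stands in for the real type from the earlier snippet); unlike wireTap, alsoTo backpressures on the side sink instead of dropping elements, so the element is delivered to the heavy sink rather than potentially lost:
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("alsoTo-example")

final case class MyObject(value: String) // placeholder for the real type

val someFuture: Future[MyObject] = Future.successful(MyObject("hello"))

// stand-in for the heavy sink; the real one would do the expensive work here
val longlifeSink: Sink[MyObject, Future[Done]] =
  Sink.foreach[MyObject](obj => println(s"heavy work on $obj"))

val result: Future[MyObject] =
  Source
    .future(someFuture) // non-deprecated counterpart of Source.fromFuture
    .alsoTo(longlifeSink)
    .runWith(Sink.head)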
Hello, has anyone thought about implementing an additional "zombie fencing" mechanism for akka-cluster + sharding applications in an Akka-idiomatic way, without an external linearizable system like ZooKeeper or etcd? SBR doesn't give a 100% guarantee against multiple writers (specifically zombie processes) existing at the same time. I understand that preventing concurrent writes can be done at the db level for databases that support serializable or higher consistency, which is not the case for most databases out there.
My best idea right now is to have a custom CRDT (MVRegister) to detect concurrent writes; I recently implemented exactly that for my project. Although this approach might work well for some applications, it still has drawbacks - mainly the fact that, even though sharding gives us the single-writer principle, all writes have to go through the ddata layer, while we only take advantage of that during network partitions.
I wonder if anyone has other ideas about how this can be achieved in an Akka-idiomatic way.
When I connect to WildFly 10 I am getting the error below...
[root@3gq4 jmxquery-1.3]# ./check_jmx -U service:jmx:rmi:///jndi/rmi://194.135.92.51:9990/jmxrmi -O java.lang:type=Memory -A HeapMemoryUsage -K used -I HeapMemoryUsage -J used -vvvv -w 731847066 -c 1045495808 -username admin -password password
JMX CRITICAL - Failed to retrieve RMIServer stub: javax.naming.CommunicationException [Root exception is java.rmi.ConnectIOException: error during JRMP connection establishment; nested exception is:
java.net.SocketTimeoutException: Read timed out] connecting to java.lang:type=Memory by URL service:jmx:rmi:///jndi/rmi://194.135.92.51:9990/jmxrmijava.io.IOException: Failed to retrieve RMIServer stub: javax.naming.CommunicationException [Root exception is java.rmi.ConnectIOException: error during JRMP connection establishment; nested exception is:
java.net.SocketTimeoutException: Read timed out]
at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:369)
at javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:270)
at jmxquery.DefaultJMXProvider.getConnector(DefaultJMXProvider.java:17)
at jmxquery.JMXQuery.connect(JMXQuery.java:91)
at jmxquery.JMXQuery.runCommand(JMXQuery.java:68)
at jmxquery.JMXQuery.main(JMXQuery.java:114)
Caused by: javax.naming.CommunicationException [Root exception is java.rmi.ConnectIOException: error during JRMP connection establishment; nested exception is:
java.net.SocketTimeoutException: Read timed out]
at com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:136)
at com.sun.jndi.toolkit.url.GenericURLContext.lookup(GenericURLContext.java:205)
at javax.naming.InitialContext.lookup(InitialContext.java:417)
at javax.management.remote.rmi.RMIConnector.findRMIServerJNDI(RMIConnector.java:1955)
at javax.management.remote.rmi.RMIConnector.findRMIServer(RMIConnector.java:1922)
at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:287)
... 5 more
Caused by: java.rmi.ConnectIOException: error during JRMP connection establishment; nested exception is:
java.net.SocketTimeoutException: Read timed out
at sun.rmi.transport.tcp.TCPChannel.createConnection(TCPChannel.java:307)
at sun.rmi.transport.tcp.TCPChannel.newConnection(TCPChannel.java:202)
at sun.rmi.server.UnicastRef.newCall(UnicastRef.java:343)
at sun.rmi.registry.RegistryImpl_Stub.lookup(RegistryImpl_Stub.java:116)
at com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:132)
... 10 more
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at sun.rmi.transport.tcp.TCPChannel.createConnection(TCPChannel.java:246)
... 14 more
Hi, I have 2 questions about the split-brain resolver.
When a cluster node is downed by another node, it logs "SBR took decision DownSelfQuarantinedByRemote" many times. One node logged it 18 times before going down (Cluster size was 15). Is that expected behavior?
A node experienced about 4-10s of stop-the-world garbage collection. After waking up, it marked a subset of the cluster "unreachable" because it did not process their messages on time, after which an SBR leader took the decision "DownIndirectlyConnected" and downed the garbage-collecting node as well as all other nodes marked unreachable by it. Is that supposed to happen?
Hi!
I need to run 1000+ WebSocket clients on several nodes; each client listens to a unique endpoint.
https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html
I plan to represent each client as an actor with supervision, recovery, event handling, etc., and hope that Akka Cluster will
What else should I expect?
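For reference, a single client along the lines of the linked docs could look roughly like this (a sketch; the ws:// URL and the messages are placeholders, and each such connection would typically be owned by one of the client actors):
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Keep, Sink, Source}

implicit val system: ActorSystem = ActorSystem("ws-clients")

// what to do with frames coming from the server
val incoming = Sink.foreach[Message] {
  case TextMessage.Strict(text) => println(s"received: $text")
  case _                        => () // ignore streamed/binary frames in this sketch
}

// what this client sends; a long-lived client would keep this source open
val outgoing = Source.single(TextMessage("hello"))

// one connection to one (placeholder) endpoint
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://example.com/client-1"))

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .toMat(incoming)(Keep.both)
    .run()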
For describing a sink which is constructed based on the first element of the stream, one would previously use Sink.lazyInit, which is of course now deprecated. The deprecation message suggests Flow.prefixAndTail(1) with Sink.lazyFutureSink in place of lazyInit, but it seems that this better matches the intent:
def logic: Source[ByteString, NotUsed] = ???
def mySink(id: ByteString): Sink[ByteString, Future[Done]] = ???
val finished = logic.prefixAndTail(1).to {
  Sink.fromMaterializer { (mat, att) =>
    val donePromise = Promise[Done]()
    Sink
      .foreach[(Seq[ByteString], Source[ByteString, NotUsed])] {
        case (prefix, tail) =>
          prefix.headOption match {
            case None => donePromise.success(Done) // could also fail it, I guess, if that's one's preference
            case Some(head) =>
              donePromise.completeWith(tail.runWith(mySink(head).addAttributes(att))(mat))
          }
      }
      .addAttributes(att)
      .mapMaterializedValue(_ => donePromise.future)
  }
}
I for one don't see an apparent way with prefixAndTail(1).to(Sink.lazyFutureSink(...)) to have the Future[Sink] depend on the element emitted by prefixAndTail.
ActorContext[_].scheduleOnce(...) inside the CommandHandler of an EventSourcedBehavior: how do you pass the Cancellable around? Can I do that somehow as part of the persistent actor, or do I need to resort to some other non-persistent actor to do the scheduling? In a non-persistent actor this form of scheduling and cancelling a message to be sent from an actor to itself is no problem, but in the persistent case I am currently stuck.
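One pattern that avoids holding on to a Cancellable at all (a sketch with a made-up protocol; TimeoutEntity, StartTimeout, CancelTimeout and TimedOut are hypothetical names) is to wrap the EventSourcedBehavior in Behaviors.withTimers and use the keyed TimerScheduler from inside the command handler:
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
import scala.concurrent.duration._

object TimeoutEntity {
  sealed trait Command
  case object StartTimeout extends Command
  case object CancelTimeout extends Command
  case object TimedOut extends Command

  sealed trait Event
  final case class State()

  def apply(id: String): Behavior[Command] =
    Behaviors.withTimers { timers => // keyed timers, so no Cancellable has to be passed around
      EventSourcedBehavior[Command, Event, State](
        persistenceId = PersistenceId.ofUniqueId(id),
        emptyState = State(),
        commandHandler = { (_, cmd) =>
          cmd match {
            case StartTimeout =>
              timers.startSingleTimer("timeout", TimedOut, 30.seconds)
              Effect.none
            case CancelTimeout =>
              timers.cancel("timeout")
              Effect.none
            case TimedOut =>
              Effect.none // react to the timeout here
          }
        },
        eventHandler = (state, _) => state
      )
    }
}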