This channel is available for all Akka enthusiasts—newbies as well as gurus—for the exchange of knowledge and the coordination of efforts around Akka. For more info see: http://bit.ly/akka-gitter
Hi all, I'm trying to use a dependency that adds Kinesis KPL support to akka. It has a KPLFlow class to provide that support. I'm relativly new to akka and flows, but my objective would be to have a kafka source, that is already created and have some type of sink, to replace the "native" kinesis sink and use this flow to deliver the records. Is there a way to extract from the flow class this? Or is it possible with just the flow "consume" from the kafka source and deliver to a target stream ?
I've create a stackoverflow question regarding this https://stackoverflow.com/questions/73873966/akka-kafka-source-to-kinesis-sink-using-kpl
path("ws") {
val flowExternal: Flow[Message, Message, Any] = ???
val flowInternal: Flow[Message, Message, Any] = ???
Http().singleWebSocketRequest(WebSocketRequest("ws://127.0.0.1:port/internal"), flowInternal)
handleWebSocketMessages(flowExternal)
}
Akka-HTTP: I'm trying to code a graceful failure from basicAuthentication and have tried wrapping my call to basicAuthentication like so:
val routes = handleRejections(totallyMissingHandler) { concat (
(pathPrefix("admin")) {
authenticateBasic(realm = "secure site", myUserPassAuthenticator) { userName =>
welcomeuser(userName)
}
}, ....etc...
and have a rejection handler totallyMissingHandler
like so:
implicit def totallyMissingHandler = RejectionHandler.newBuilder()
.handleNotFound { complete(StatusCodes.NotFound, "Oh man, what you are looking for is long gone.") }
.handle { case AuthenticationFailedRejection(msg, _) => complete(StatusCodes.Unauthorized, msg.toString()) }
.result()
and when I go to the protected path (ex. localhost:8080/admin
) the .handle
code gets triggered IMMEDIATELY and not when my authentication fails. Can someone help me out here?
Connection: Close
header? I see akka-http handles that during service shutdown already.
I got tripped up yesterday by Source.future()
. The scaladoc states:
Send the single value of the Future when it completes and there is demand. If the future fails the stream is failed with that exception.
That's true, but if the future fails, the stream supervisor is never triggered and that caused an issue in my code. I'm just wondering if that's expected behavior and if that's documented anywhere.
hey everyone, I'm using Akka Persistence and Projections, but I've got a read-after-write problem that I'm looking for ideas on. For certain business/front end reasons, after a certain write where an event is persisted and the state is updated, I need to block until that event is read by the projection so that the read side is synchronized.
One way this was approached was to add the data we need as part of the state of the event sourced actor, and expose a command that gets the state of the actor, but that's less than ideal. Does anyone have other ideas on approaching this?
HTTP/1.1 400 Bad Request
connection: close
server: ResourceManagement
content-length: 124
X-Powered-By: Express
date: Wed, 16 Nov 2022 03:10:35 GMT
content-type: text/plain; charset=UTF-8
The query parameter 'size' was malformed:
'10<iframe src=javascript:alert(1953)>' is not a valid 32-bit signed integer value
onTransition(handler)
SubscribeTransitionCallBack
onTransition
, is the action executed sync or async? I.e.: can the onTransition
logic lag behind the state changes, or is onTransition
guaranteed to be executed before a possible next state change is triggered?INFO akka.actor.LocalActorRef - Message [akka.actor.ReceiveTimeout$] to Actor[akka://OVDRootSystem/system/IO-TCP/selectors/$a/8#-66797564
6] was not delivered. [1] dead letters encountered.
Instead of let akka retying I want to Nack the message from the queue and let the external redeveliver the message further to retry. I can't find how to properly handle this case. Any tips?
Dear hakkers,
We are happy to announce the 2.0.0-M1 release of Akka Diagnostics which is the continuation of Akka Enhancements. This release is the first milestone leading up to version 2.0.0 of Akka Diagnostics.
Akka Diagnostics 2.0.0-M1 contains:
Happy hakking!
– The Akka Team
Url
for my internal representation and frequently need to inspect their Path
to get at specific segments, but have found no good way of doing so. Since it has head
and tail
I was hoping there would be a simple way to turn it into List[String]
but so far no luck
Should be possible, see https://doc.akka.io/docs/alpakka/current/cassandra.html#custom-session-creation
Thx :)
Http(actorSystem).singleRequest(...)
(as described in https://doc.akka.io/docs/akka-http/10.4.0/client-side/request-level.html) but with HTTP/2.
Hi, The current Expand
accepts a val extrapolate: In => Iterator[Out]
,which will do nothing if the first element from upstream is not arrived. What if I want to inject elements when the upstream is slow, includes the first elements.
usage: If I use akka-stream to implement a lock-step stream, I will inject EmptyActionFrame
if there is no inputs.
splitAfter
and concatStreams
together?java.lang.IllegalArgumentException: Cannot push port (Split.out(1323563333)) twice, or before it being pulled
"spiltAfter" should {
"run" in {
val probe = Source(1 to 10)
.splitAfter(_ => true)
.concatSubstreams
.runWith(TestSink())
val results = probe.request(10).expectNextN(10)
probe.expectComplete()
results.size should be(10)
results should contain allElementsOf (1 to 10)
}
}
theRun
for future processing? eg. Effect
.none
.thenRun((newState: State) => {
context.pipeToSelf(futureProcessing) {
case Failure(exception) => InternalCommand.ResultProcess(message = exception.getMessage)
case Success(value) => InternalCommand.ResultProcess(message = value)
}
})
def atMostOnceSource[K, V]: Source[ConsumerRecord[K, V], NotUsed] = {
Consumer
.committableSource[K, V](consumerSettings, Subscriptions.topics(allTopics))
.groupedWithin(maxBatchSize, maxBatchDuration)
.mapAsync(1) { messages: Seq[CommittableMessage[K, V]] =>
val committableOffsetBatch =
CommittableOffsetBatch(messages.map(_.committableOffset))
Source
.single(committableOffsetBatch)
.toMat(Committer.sink(committerSettings))(Keep.right)
.run()
.map(_ => messages)
}
.mapConcat(identity)
}
synchronized{}
block. It seems pretty gross, but it should work. Are there any better options? To clarify, it would be something like:javaObj.synchronized {
javaObj.setSomeState();
javaObj.callSomeMethod();
javaObj.unsetState();
}
@itsarraj
Hello Everyone
Hello sir ,
I am new to open source contribution.
I already know java , my tech stacks & tools includes C, C++ , Python , Java, JavaScript , HTML , CSS , SQL , Bootstrap, ReactJS, ExpressJS, NodeJS & Git . I need a little help from your side to contribute to these amazing projects.
I need advice and help
Currently I am Final Year of my Engineering
mapResponseHeaders
but it only works when the inner route yields a RouteResult.Complete
. What I'm looking for is a way to add a header to any response via a directive without access to the RejectionHandler and ExceptionHandler in the outer route. With sealing the innerRoute I will change the exception behaviour from the outer route if it has a custom exception handler.