This channel is available for all Akka enthusiasts—newbies as well as gurus—for the exchange of knowledge and the coordination of efforts around Akka. For more info see: http://bit.ly/akka-gitter
Hi Akka experts, I have a question regarding the akka-http server. I am handling a request in one of my routes, and if I do not pass any Content-Type header in the request, Akka sets it to application/octet-stream by default.
Please find my implementation below:
package com.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

import scala.io.StdIn

object HttpServerContentTypeTest {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem(Behaviors.empty, "RandomNumbers")
    implicit val executionContext = actorSystem.executionContext

    val route = path("proxy") {
      Route { context =>
        println(context.request.entity.contentType)
        context.complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, ""))
      }
    }

    val bindingFuture = Http().newServerAt("localhost", 9000).bind(route)
    // press ENTER to stop the server
    StdIn.readLine()
    bindingFuture.flatMap(_.unbind()).onComplete(_ => actorSystem.terminate())
  }
}
I tested this via Postman (please find the screenshot attached showing the headers and the body used). I am sending a POST request with a non-empty request body, and I am not setting the Content-Type header.
I want the akka-http server to not set the Content-Type header internally. Is there a way to disable this default behavior?
I have an EventSourcedBehavior and I am wondering if there is some functionality that lets me run stuff after the behavior is restored. Or are there any best practices for this? So far the only one I can come up with is to have the behavior send itself a command which, due to how the restore works, will only be handled by the actor after the restore has finished, right?
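For this there is the RecoveryCompleted signal in the typed Persistence API, which fires exactly once after the state has been restored. A minimal sketch (Command, Event, State, and State.empty are placeholder types, not from the original message):

```scala
import akka.persistence.typed.{PersistenceId, RecoveryCompleted}
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

def behavior(id: String): EventSourcedBehavior[Command, Event, State] =
  EventSourcedBehavior[Command, Event, State](
    persistenceId = PersistenceId.ofUniqueId(id),
    emptyState = State.empty,
    commandHandler = (state, cmd) => Effect.none,
    eventHandler = (state, evt) => state
  ).receiveSignal {
    case (state, RecoveryCompleted) =>
      // runs once, after the state has been restored from the journal
      println(s"recovered: $state")
  }
```

This avoids the self-send trick, although sending yourself a command still works if you need the post-recovery work to go through the normal command handler.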
Hello,
Is there a way to connect a Sink to a Source, using the value produced by the Sink to build the Source element?
Note, I already saw fromSinkAndSource, but AFAIK we can't use the element from the 'sink side' to build the element of the 'source side'.
Example:
We are using a library which provides a Sink.
We would like to use the output of the Sink to perform actions such as DB calls, and then return the DB call result as a Source.
Thanks
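If the library's Sink materializes its result (e.g. as a Future), one option is to run the first stream and then build the second Source from that materialized value. A sketch assuming Akka Streams on the classpath, with Sink.fold standing in for the library's Sink and dbCall as a hypothetical DB lookup:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("sink-to-source")
import system.dispatcher

// Stand-in for the library's Sink: materializes the summed input as a Future[Int].
val librarySink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

// Hypothetical DB call built from the sink's result.
def dbCall(n: Int): Future[String] = Future.successful(s"row-$n")

// Run the first stream, then feed its materialized value into a new Source.
val resultF: Future[Int] = Source(1 to 10).runWith(librarySink)
val out = Source.future(resultF).mapAsync(1)(dbCall)

val rows = Await.result(out.runWith(Sink.seq), 10.seconds)
println(rows)
Await.result(system.terminate(), 10.seconds)
```

This runs the sink-side stream to completion first; if you need element-by-element feedback rather than the final materialized value, that is indeed not what fromSinkAndSource gives you.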
Should replication_factor for keyspace akka_projection have value 1? Isn't 2 safer?
Hey guys, I got a question about batching I cannot understand. Consider the code below:

msgSource
  .wireTap(_ => Metrics.consumed.inc())
  .via(ActorFlow.ask(ref)(TopSecretSingleFastMessage)(Timeout(3.seconds)))
  .batch(5000, fst => List(fst))((s, msg) => s.prepended(msg)) // <- this always creates batches of size 1
  // .groupedWithin(5000, 5.seconds) // <- this would be fast
  .wireTap(msgs => Metrics.batched.set(msgs.size)) // either constantly 5000, or dynamic in case of batch (ideally...)
  .via(ActorFlow.ask(ref)(TopSecretMultiSlowMessage)(Timeout(3.seconds)))
  .runWith(commitSink)
  .onComplete {
    case Success(_) => ref ! Shutdown("Completed")
    case Failure(e) => ref ! Shutdown("Failed", Some(e))
  }
Focusing on the commented-out stage and the one above it: when I run the above with the batch stage, I get batch sizes of 1 and, according to my metrics, a throughput of 400 msgs/sec. When I use the groupedWithin stage instead, I get a throughput of 50,000 msgs/sec. As msgSource represents a Kafka topic with a sizeable number of messages, there is really no limit on throughput from that side. My question is this: if with groupedWithin I see a throughput of 50,000 msgs/sec and with batch only 400 msgs/sec, why doesn't batch increase the batch size up from 1? I mean, clearly my stages below the aggregation stage (be that batch or groupedWithin) are slower than upstream can provide, right? How come batch doesn't... batch?
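One thing worth checking: batch only aggregates while the downstream has not pulled yet, so internal buffering around it (ActorFlow.ask is built on mapAsync, which pulls eagerly to fill its buffer) can mean batch almost never sees backpressure and emits singletons. A minimal demo, assuming Akka Streams on the classpath, where a deliberately throttled downstream makes batch aggregate:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("batch-demo")

// Fast upstream, slow downstream: batch can aggregate while the
// throttle stage is backpressuring.
val batches = Await.result(
  Source(1 to 100)
    .batch(10, first => List(first))((acc, e) => e :: acc)
    .throttle(1, 50.millis) // simulate a slow consumer
    .runWith(Sink.seq),
  1.minute)

println(batches.map(_.size)) // the first batch may be 1, later ones larger (up to 10)
Await.result(system.terminate(), 10.seconds)
```

If the same source is run without the throttle, each element tends to be pulled immediately and the batches stay at size 1, which matches the behaviour described above.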
Hi, when replying with a chunked response (entity of type Source[ByteString, _]), what might cause the downstream to end early with:
akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$: null
Context: my source is basically Source.queue[ByteString]. I grab the materialized value of the queue itself using a promise:

val p = Promise[SourceQueueWithComplete[ByteString]]()
val s = Source.queue[ByteString](size, OverflowStrategy.backpressure)
  .mapMaterializedValue[NotUsed] { q => p.success(q); NotUsed }
p.future.foreach(doSomethingThatPushToQueue)

HttpResponse(entity = s, …)
Any ideas?
preMaterialize?
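For reference, Source#preMaterialize hands you the queue directly, without the Promise dance. A small sketch assuming Akka Streams on the classpath (a fold sink stands in for the HTTP response consuming the source):

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("prematerialize-demo")
import system.dispatcher

// preMaterialize materializes only the queue side; the returned Source can
// then be handed to HttpResponse as the entity stream.
val (queue, source) =
  Source.queue[ByteString](16, OverflowStrategy.backpressure).preMaterialize()

val done = source.runWith(Sink.fold(ByteString.empty)(_ ++ _))

// Push a couple of chunks, then close the stream.
for {
  _ <- queue.offer(ByteString("hello "))
  _ <- queue.offer(ByteString("world"))
} queue.complete()

val result = Await.result(done, 10.seconds).utf8String
println(result) // hello world
Await.result(system.terminate(), 10.seconds)
```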
My stream has a materialized value of type [_, Cancellable]. How could I convert this to [_, NotUsed] without materializing the stream?
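One option (assuming Akka Streams) is mapMaterializedValue, which rewrites the materialized value at blueprint time, without running the stream:

```scala
import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("matval-demo")

val ticks: Source[String, Cancellable] = Source.tick(0.seconds, 10.millis, "tick")

// Replace the Cancellable with NotUsed; nothing is materialized here.
val noMat: Source[String, NotUsed] = ticks.mapMaterializedValue(_ => NotUsed)

val firstTwo = Await.result(noMat.take(2).runWith(Sink.seq), 10.seconds)
println(firstTwo)
Await.result(system.terminate(), 10.seconds)
```

Note that after this you lose the handle for cancelling the tick source, so it only makes sense when the materialized value is genuinely not needed.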
Regarding NoMoreElementsNeeded: it seems like the stream created from your source is cancelled before you call doSomethingThatPushToQueue.
context.self.narrow[A|B] is the answer
isInstanceOf[ActorContext[...]] works. It compiles. I'll see if it works...
@bblfish:matrix.org If all you want to do is send an A|B to yourself, I'd probably do something like
def someUtility(sender: (A|B) => Unit): Unit = {
  sender(SomeAOrBMessage)
}

Behaviors.setup[A|B|C] { context =>
  someUtility(context.self.tell(_))
}
(No idea whether or not I've completely mangled Scala 3 syntax there...)
asInstanceOf works for the moment, which is allowing me to stabilise my code a bit. I'll look at that idea a bit later, and take these warnings into account :-) (I only use asInstanceOf as a measure of last resort...)
Hi, I have code that I think silently fails, and this is my suspect, since status is deprecated and always set to Success(Done):

def upload(settings: SftpSettings, path: String, data: ByteString): Future[Either[Throwable, ByteString]] =
  Source.single(data).runWith(Sftp.toPath(path, settings)).map(_.status.toEither.map(_ => data))
Does this mean that even exception cases will be mapped to Success(Done)? Anyway, what is the idiomatic way to handle errors? Using recover?