Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Oct 26 2020 07:39
    patriknw labeled #29767
  • Oct 26 2020 07:39
    patriknw labeled #29767
  • Oct 26 2020 07:39
    patriknw labeled #29767
  • Oct 26 2020 07:39
    patriknw labeled #29767
  • Oct 26 2020 07:38
    patriknw commented #29767
  • Oct 26 2020 07:07
    patriknw commented #29765
  • Oct 26 2020 06:56
    patriknw commented #25468
  • Oct 26 2020 06:30
    wtfiwtz starred akka/akka
  • Oct 26 2020 04:31
    YunSeongKim7 starred akka/akka
  • Oct 25 2020 16:21
    nitikagarw commented #25468
  • Oct 25 2020 09:22
    fubiao starred akka/akka
  • Oct 25 2020 05:09
    saguywalker starred akka/akka
  • Oct 24 2020 21:47
    tt4n starred akka/akka
  • Oct 24 2020 21:20
    akka-ci commented #29672
  • Oct 24 2020 21:05
    dope9967 commented #29672
  • Oct 24 2020 21:03
    akka-ci commented #29672
  • Oct 24 2020 21:03
    akka-ci unlabeled #29672
  • Oct 24 2020 21:03
    akka-ci labeled #29672
  • Oct 24 2020 20:44
    dope9967 synchronize #29672
  • Oct 24 2020 20:31
    akka-ci unlabeled #29672
Zhenhao Li
@Zhen-hao
@hochgi I think you have to read through https://doc.akka.io/docs/akka-http/current/common/marshalling.html. I know it is painful...
3 replies
Cyril Cheneson
@ccheneson
Hi, FYI, I have browsed through the online doc of akka-stream and for Sink.combine, it seems there is a formatting problem
https://doc.akka.io/docs/akka/current/stream/operators/Sink/combine.html . The previous and next page looks fine.
1 reply
Aditya Maheshwari
@adityamundra
Screenshot 2021-06-14 at 12.53.56 PM.png
Screenshot 2021-06-14 at 12.54.07 PM.png

Hi Akka experts, I have a question regarding akka-http server. I am trying to handle a request in one of my route and if I do not pass any Content-type in the request by default akka is setting it to application/octet-stream.

Please find my implementation below: -

package com.example
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import scala.io.StdIn
object HttpServerContentTypeTest {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem(Behaviors.empty, "RandomNumbers")
    implicit val executionContext = actorSystem.executionContext
    val route = path("proxy"){
      Route { context =>
        println(context.request.entity.contentType)
        context.complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, ""))
      }
    }
    val bindingFuture = Http().newServerAt("localhost", 9000).bind(route)
    // press any key to stop the server
    StdIn.readLine()
    bindingFuture.flatMap(_.unbind()).onComplete(_ => actorSystem.terminate())
  }
}

I tested this via Postman(please find screenshot attached showing the headers and the body used). I am trying to send a post request with non-empty request body. Also I am not setting Content-type header.

I want that akka-http server internally does not set header Content-type. Is there a way to disable this default behavior?

12 replies
Igmar Palsenberg
@igmar
Is there any way to get a decent errors when materializing an JMS Source ?
2 replies
Zhenhao Li
@Zhen-hao
I have a question about this documentation page https://doc.akka.io/docs/akka-projection/current/running.html#tagging-events-in-eventsourcedbehavior
it says "must tag the events with a slice number". but with the Cassandra plugin already put tagged events into buckets. it seems users don't need to slice the tags for performance reasons. but how can the buckets be used for paralism in Akka projection?
5 replies
Zhenhao Li
@Zhen-hao
I find it confusing that Envelope can have both unserialized message/event and serialized one as payload in the Akka codebase. Wouldn't using separate Envelope types be safer and easier to understand?
Zhenhao Li
@Zhen-hao
I mean, for things like private def serializeAndDeserialize(envelope: Envelope): Envelope, readers really have to read every line in its definition to understand what's happening
Max
@maxstreese
Hi everyone! Hope this isn't too silly a question but I am working with an EventSourcedBehavior and I am wondering if there is some functionality that let's me run stuff after the behavior is restored? Or are there any best practices for this? So far the only one I can come up with is to have the behavior send itself a command which, due to how this restore works, will only be handled by the actor after the restore has finished, right?
Odd Möller
@odd
1 reply
Zhenhao Li
@Zhen-hao
does Akka projection has to run in the same actor system as Akka persistence?
6 replies
Gaël Ferrachat
@gael-ft

Hello,

Is there a way to connect a Sink to a Source using the value produced by the Sink to build the Source element ?

Note, I already saw fromSinkAndSource but AFAIK, we can't use the element from 'sink side' to build the element of 'source side'

Example:
We are using a library which provides a Sink.
We would like to use to Output of the Sink to perform actions such as DB calls and then return the DB call as a Source.

Thanks

3 replies
Zhenhao Li
@Zhen-hao
In this documentation page https://doc.akka.io/docs/akka-projection/current/getting-started/running.html
why does replication_factor for keyspace akka_projection have value 1? isn't 2 safer?
3 replies
Max
@maxstreese

Hey guys, I got a question about batching I cannot understand. Consider the below code:

msgSource
  .wireTap(_ => Metrics.consumed.inc())
  .via(ActorFlow.ask(ref)(TopSecretSingleFastMessage)(Timeout(3.seconds)))
  .batch(5000, fst => List(fst))((s, msg) => s.prepended(msg)) <- this always creates batches of size 1
  // .groupedWithin(5000, 5.seconds) <- this would be fast
  .wireTap(msgs => Metrics.batched.set(msgs.size)) // either constantly 5000 or dynamic in case of batch (ideally...)
  .via(ActorFlow.ask(ref)(TopSecretMultiSlowMessage)(Timeout(3.seconds)))
  .runWith(commitSink)
  .onComplete {
    case Success(_) => ref ! Shutdown("Completed")
    case Failure(e) => ref ! Shutdown("Failed", Some(e))
  }

Focusing on the commented out stage and the one above: When I run the above with the batch stage, I get batch sizes of 1 and according to my metrics have a throughput of 400 msgs/sec. When I use the groupedWithin stage instead, I get a throughput of 50.000 msgs/sec. As the msgSource represents some Kafka topic which has a sizeable number of messages, there is really no limit in terms of throughput from that side. My question is this: If with groupedWithin I see a throughput of 50.000 msgs/sec and with batch I see a throughput of only 400 msgs/sec, then why doesn't batch increase the batch size up from 1? I mean clearly my stages below the aggregation stage (be that batch or groupedWithin are slower than upstream can provide, right? How come batch doesn't... batch?

12 replies
Nathan Fischer
@nrktkt:matrix.org
[m]
any alpakka maintainers on here?
would this be the right channel to talk about alpakka dev?
Gilad Hoch
@hochgi

Hi,
when replying a chunked response (entity of type Source[ByteString, _]),
what might cause the downstream to end early with:

akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$: null

context:
My source is basically Source.queue[ByteString].
I grab the materialized value of the queue itself using a promise:

val p = Promise[SourceQueueWithComplete[ByteString]]()
val s = Source.queue[ByteString](size, , OverflowStrategy.backpressure).mapMaterializedValue[NotUsed]{q => p.success(q) ; NotUsed}
p.future.foreach(doSomethingThatPushToQueue)
HttpResponse(entity = s, …)

Any ideas?

Nathan Fischer
@nrktkt:matrix.org
[m]
I'm not sure what's causing that error off the top of my head, but is there a reason you need to use a promise instead of something like preMaterialize?
Gilad Hoch
@hochgi
I'm not aware of preMaterialize. Thanks for the tip, I'll educate myself on this and see if this solves my problem.
Gilad Hoch
@hochgi
@nrktkt:matrix.org It does simplifies my code, and saves an async boundary (so thank you!), but the problem still remains :(
4 replies
Matias Partanen
@mpartan
def streamSomething: Source[Something, Cancellable] = {
    Source
      .tick(0.second, 1.second, NotUsed)
      .mapAsync(1) { _ =>
        doSomething
      }
}

override def streamSomething(in: Request): Source[Response, NotUsed] = {
  ...
}
I'm using the following structure with Akka GRPC / ScalaPB gRPC web. Source.tick returns Source that is of type [_, Cancellable]. How could I convert this to type of [_, NotUsed] without materializing the stream?
6 replies
Nathan Fischer
@nrktkt:matrix.org
[m]
looking at usages of NoMoreElementsNeeded, it seems like the stream created from your source is cancelled before you call doSomethingThatPushToQueue
Swoorup Joshi
@Swoorup
is there a dynamic version of this?
5 replies
I want the usage to be like runnableGraph.run() to get a new source
bblfish
@bblfish:matrix.org
[m]
Can one narrow an ActorContext[A|B|C] to an ActorContet[A|B] ? (In Scala3)
I have a function that needs the narrower context, and so I thought it would be better to code for that.
especially as the function/method is going to be called in two different places where the narrower context is needed.
bblfish
@bblfish:matrix.org
[m]
Oh, asking the question helped me find the answer on Google. context.self.narrow[A|B] is the answer
bblfish
@bblfish:matrix.org
[m]
Ah, no that does not work, that alllows me to narrow the actorRef not the context.
Swoorup Joshi
@Swoorup
why do you want to do that? @bblfish:matrix.org
isn’t context pretty much just used within the scope where you create it?
bblfish
@bblfish:matrix.org
[m]
as I said, I have a function that needs the context to send a message to itself, and that is called in two different places.
It may be that just isInstanceOf[ActorContext[...]] works. It compiles. I'll see if it works...
Swoorup Joshi
@Swoorup
use a partial function?
bblfish
@bblfish:matrix.org
[m]
I'll need to clean up later ...
Swoorup Joshi
@Swoorup
Behaviors.receiveMessagePartial {
  case Down if remaining == 1 =>
    notifyWhenZero.tell(Done)
    zero
  case Down =>
    counter(remaining - 1)
}
bblfish
@bblfish:matrix.org
[m]
Mhh. I'll have to think of modelling it like that...
Swoorup Joshi
@Swoorup
the context would have all the possible message the behaviour takes
you use narrow to limit visibility of internal messages to consumers of that actorref.
Nathan Fischer
@nrktkt:matrix.org
[m]
see if you can get a stack trace from that on the second usage of the stream
Zhenhao Li
@Zhen-hao
hi, what is the offset in EventEnvelope for? is it needed only by the query/read side? namely, not needed for actor state recovery
3 replies
Swoorup Joshi
@Swoorup
anybody rolls out their own ES ontop of akka? I find the default ES api bit too verbosy.
2 replies
Hamed Nourhani
@hnourhani
Hi guys , i have a problem with Akka testkit , i have logic and want to know if an actor not receives an specific message after receiving the initial message ,
but when i use expectNoMsg , tests fail and i receive this error message :
```
java.lang.AssertionError: assertion failed: received unexpected message RealMessage
Levi Ramsey
@leviramsey

@bblfish:matrix.org If all you want to do is send an A|B to yourself, I'd probably do something like

def someUtility(sender: (A|B) => Unit): Unit = {
  sender(SomeAOrBMessage)
}

Behaviors.setup[A|B|C] { context =>
  someUtility(context.self.tell(_))
}

(No idea whether or not I've completely mangled Scala 3 syntax there...)

1 reply
bblfish
@bblfish:matrix.org
[m]
Thanks. The asInstanceOf works for the moment, which is allowing me to stabilise my code a bit. I'll look at that idea a bit later, and take these warnings into account :-) (I only use asInstanceOf as a measure of last resort...)
Blaž Marinović
@bmarinovic

Hi, I have code that I think silently fails and this is my suspect since status is deprecated and always set to Success(Done):

def upload(settings: SftpSettings, path: String, data: ByteString): Future[Either[Throwable, ByteString]] =
    Source.single(data).runWith(Sftp.toPath(path, settings)).map(_.status.toEither.map(_ => data))

Does this mean that even exception cases will be mapped to Success(Done)? Anyway, what is idiomatic way to handle error? Using recover?

2 replies