This channel is available for all Akka enthusiasts—newbies as well as gurus—for the exchange of knowledge and the coordination of efforts around Akka. For more info see: http://bit.ly/akka-gitter
I have an EventSourcedBehavior and I am wondering if there is some functionality that lets me run logic after the behavior is restored? Or are there any best practices for this? So far the only one I can come up with is to have the behavior send itself a command which, due to how this restore works, will only be handled by the actor after the restore has finished, right?
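For what it's worth, Akka Persistence Typed also exposes a RecoveryCompleted signal that fires once replay has finished, which avoids the self-sent command. A minimal sketch — the Command/ItemAdded/State types here are placeholders, not from the question:

```scala
import akka.persistence.typed.{PersistenceId, RecoveryCompleted}
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

sealed trait Command                                 // placeholder command type
final case class ItemAdded(item: String)             // placeholder event type
final case class State(items: List[String] = Nil)    // placeholder state type

def behavior(id: String): EventSourcedBehavior[Command, ItemAdded, State] =
  EventSourcedBehavior[Command, ItemAdded, State](
    persistenceId  = PersistenceId.ofUniqueId(id),
    emptyState     = State(),
    commandHandler = (_, _) => Effect.none,
    eventHandler   = (state, evt) => state.copy(items = evt.item :: state.items)
  ).receiveSignal {
    case (state, RecoveryCompleted) =>
      // Runs exactly once, after the journal has been replayed --
      // post-restore initialization can go here.
      println(s"restored ${state.items.size} items")
  }
```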
Hello,
Is there a way to connect a Sink to a Source, using the value produced by the Sink to build the Source's elements? Note, I already saw fromSinkAndSource, but AFAIK we can't use the element from the 'sink side' to build the element of the 'source side'.
Example: we are using a library which provides a Sink. We would like to use the output of the Sink to perform actions such as DB calls and then return the DB call result as a Source.
Thanks
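One hedged sketch of a workaround, assuming the library's Sink materializes a Future of its result (librarySink and dbCall are placeholder names, not a real API): run the upstream into the sink, then build a fresh Source from the materialized Future.

```scala
import scala.concurrent.{ExecutionContext, Future}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}

// The sink and source are connected only through the materialized Future:
// the Source is not fed element-by-element from the sink side.
def sinkThenSource[In, Out, Row](
    upstream: Source[In, _],
    librarySink: Sink[In, Future[Out]],  // assumed: the sink materializes its output
    dbCall: Out => Future[Row]           // placeholder for the DB lookup
)(implicit mat: Materializer, ec: ExecutionContext): Source[Row, _] =
  Source.future(upstream.runWith(librarySink).flatMap(dbCall))
```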
Should replication_factor for keyspace akka_projection have the value 1? Isn't 2 safer?
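For context: a replication factor of 1 means a single copy of the data, so losing one Cassandra node loses that keyspace. On a multi-node cluster a higher factor is generally safer (2, or commonly 3). A sketch of the CQL — the strategy and datacenter name are assumptions about your cluster:

```sql
ALTER KEYSPACE akka_projection
  WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3};
```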
Hey guys, I got a question about batching that I cannot understand. Consider the code below:
msgSource
.wireTap(_ => Metrics.consumed.inc())
.via(ActorFlow.ask(ref)(TopSecretSingleFastMessage)(Timeout(3.seconds)))
.batch(5000, fst => List(fst))((s, msg) => s.prepended(msg)) <- this always creates batches of size 1
// .groupedWithin(5000, 5.seconds) <- this would be fast
.wireTap(msgs => Metrics.batched.set(msgs.size)) // either constantly 5000 or dynamic in case of batch (ideally...)
.via(ActorFlow.ask(ref)(TopSecretMultiSlowMessage)(Timeout(3.seconds)))
.runWith(commitSink)
.onComplete {
case Success(_) => ref ! Shutdown("Completed")
case Failure(e) => ref ! Shutdown("Failed", Some(e))
}
Focusing on the commented-out stage and the one above it: when I run the above with the batch stage, I get batch sizes of 1 and, according to my metrics, a throughput of 400 msgs/sec. When I use the groupedWithin stage instead, I get a throughput of 50,000 msgs/sec. As msgSource represents a Kafka topic with a sizeable number of messages, there is really no limit on throughput from that side. My question is this: if with groupedWithin I see a throughput of 50,000 msgs/sec and with batch only 400 msgs/sec, then why doesn't batch increase the batch size up from 1? I mean, clearly my stages below the aggregation stage (be that batch or groupedWithin) are slower than upstream can provide, right? How come batch doesn't... batch?
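A possible explanation (not a definitive diagnosis of this stream): batch only accumulates elements while the downstream is backpressuring, whereas groupedWithin waits for a count or a timeout. If the downstream ask pulls again as soon as each reply arrives, batch happily emits size-1 batches and the slow per-message round trip dominates. A minimal sketch of the difference in isolation, with toy numbers and an ActorSystem assumed:

```scala
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("batch-demo")

// batch grows a batch only while downstream is NOT ready for the next element;
// a downstream that keeps up sees batches of size 1.
Source(1 to 100)
  .batch(max = 5000, seed = first => Vector(first))((acc, next) => acc :+ next)
  .runForeach(batch => println(s"batch of ${batch.size}"))

// groupedWithin emits on count or timeout regardless of backpressure.
Source(1 to 100)
  .groupedWithin(5000, 5.seconds)
  .runForeach(group => println(s"group of ${group.size}"))
```

(Side note: the original `(s, msg) => s.prepended(msg)` builds each batch in reverse arrival order; appending, as above, preserves order.)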
Hi,
when replying with a chunked response (entity of type Source[ByteString, _]), what might cause the downstream to end early with:
akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$: null
Context: my source is basically Source.queue[ByteString]. I grab the materialized value of the queue itself using a promise:
val p = Promise[SourceQueueWithComplete[ByteString]]()
val s = Source.queue[ByteString](size, OverflowStrategy.backpressure).mapMaterializedValue[NotUsed] { q => p.success(q); NotUsed }
p.future.foreach(doSomethingThatPushToQueue)
HttpResponse(entity = s, …)
Any ideas?
Have you tried preMaterialize?
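If the hint is Source.preMaterialize, a sketch of how it removes the Promise entirely — the buffer size and names are placeholders:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem("example")

// preMaterialize hands back the queue immediately, plus a not-yet-materialized
// Source that can be used later, e.g. as the HttpResponse entity.
val (queue, entitySource): (SourceQueueWithComplete[ByteString], Source[ByteString, NotUsed]) =
  Source.queue[ByteString](256, OverflowStrategy.backpressure).preMaterialize()

queue.offer(ByteString("chunk"))  // producer side, no Promise indirection
// HttpResponse(entity = HttpEntity(ContentTypes.`application/octet-stream`, entitySource))
```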
I have a stream whose materialized value type is [_, Cancellable]. How could I convert this to [_, NotUsed] without materializing the stream?
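For the materialized-value question: mapMaterializedValue rewrites the materialized value without running the stream. A sketch of dropping a Cancellable:

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.scaladsl.Source

// Source.tick materializes a Cancellable; mapMaterializedValue discards it.
// Nothing runs until the stream is actually materialized elsewhere.
val ticks: Source[String, NotUsed] =
  Source.tick(1.second, 1.second, "tick").mapMaterializedValue(_ => NotUsed)
```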
Regarding NoMoreElementsNeeded: it seems like the stream created from your source is cancelled before you call doSomethingThatPushToQueue.
context.self.narrow[A|B] is the answer
isInstanceOf[ActorContext[...]] works. It compiles. I'll see if it works...
@bblfish:matrix.org If all you want to do is send an A|B to yourself, I'd probably do something like
def someUtility(sender: (A | B) => Unit): Unit =
  sender(SomeAOrBMessage)

Behaviors.setup[A | B | C] { context =>
  someUtility(context.self.tell)
  Behaviors.same // setup must return a behavior
}
(No idea whether or not I've completely mangled Scala 3 syntax there...)
asInstanceOf works for the moment, which is allowing me to stabilise my code a bit. I'll look at that idea a bit later, and take these warnings into account :-) (I only use asInstanceOf as a measure of last resort...)
Hi, I have code that I think fails silently, and this is my suspect, since status is deprecated and always set to Success(Done):
def upload(settings: SftpSettings, path: String, data: ByteString): Future[Either[Throwable, ByteString]] =
Source.single(data).runWith(Sftp.toPath(path, settings)).map(_.status.toEither.map(_ => data))
Does this mean that even exception cases will be mapped to Success(Done)? Anyway, what is the idiomatic way to handle errors? Using recover?
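A hedged sketch of the recover-based shape, assuming Alpakka's Sftp.toPath and that stream failures surface as a failed Future rather than through the deprecated status field:

```scala
import scala.concurrent.{ExecutionContext, Future}
import akka.stream.Materializer
import akka.stream.alpakka.ftp.SftpSettings
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.scaladsl.Source
import akka.util.ByteString

def upload(settings: SftpSettings, path: String, data: ByteString)(
    implicit mat: Materializer, ec: ExecutionContext): Future[Either[Throwable, ByteString]] =
  Source.single(data)
    .runWith(Sftp.toPath(path, settings))
    .map(_ => Right(data))          // don't inspect IOResult.status: it is always Success(Done)
    .recover { case e => Left(e) }  // stream failures arrive as a failed Future
```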
Hi team, I am trying to use an Akka Extension. I have seen several examples where the companion object of the class extends ExtensionId, but in that case the createExtension function can only use the ActorSystem to create an instance of the class.
For cases where variables other than the ActorSystem are needed, I am trying the approach below, but with this the extension is registered multiple times.
trait BFactory {
protected def configKeyPath: String
}
case class A(client: String, config: Configuration) extends Extension
case class B(override val configKeyPath: String) extends ExtensionId[A] with BFactory {
override def createExtension(system: ExtendedActorSystem): A = {
val configuration = Configuration(system.settings.config, configKeyPath)
val client: String = ???
A(client, configuration)
}
}
val aInstance: A = B(configKeyPath).apply(system)
Does anyone have any idea how to use ExtensionId with a case class?
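One hedged workaround, assuming you look extensions up with varying configKeyPath values: cache one ExtensionId instance per key in the companion object, so every lookup for the same key reuses the identical id. Configuration is dropped here and the client value left as ??? to stay close to the question's placeholders:

```scala
import java.util.concurrent.ConcurrentHashMap
import akka.actor.{ExtendedActorSystem, Extension, ExtensionId}

final case class A(client: String, configKeyPath: String) extends Extension

final class B private (val configKeyPath: String) extends ExtensionId[A] {
  override def createExtension(system: ExtendedActorSystem): A = {
    val client: String = ???  // placeholder, as in the question
    A(client, configKeyPath)
  }
}

object B {
  // One id instance per key: repeated B("x") lookups reuse the same
  // ExtensionId, so the extension cannot be registered twice for one key.
  private val ids = new ConcurrentHashMap[String, B]()
  def apply(configKeyPath: String): B =
    ids.computeIfAbsent(configKeyPath, path => new B(path))
}

// usage: val aInstance: A = B(configKeyPath)(system)
```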