Zhenhao Li
@Zhen-hao
Does Akka Projection have to run in the same actor system as Akka Persistence?
6 replies
Gaël Ferrachat
@gael-ft

Hello,

Is there a way to connect a Sink to a Source, using the value produced by the Sink to build the Source element?

Note: I already saw fromSinkAndSource, but AFAIK we can't use the element from the 'sink side' to build the element of the 'source side'.

Example:
We are using a library which provides a Sink.
We would like to use the output of the Sink to perform actions such as DB calls, and then return the result of the DB call as a Source.

Thanks

3 replies
Zhenhao Li
@Zhen-hao
In this documentation page https://doc.akka.io/docs/akka-projection/current/getting-started/running.html
why does replication_factor for the keyspace akka_projection have the value 1? Isn't 2 safer?
3 replies
Max
@maxstreese

Hey guys, I got a question about batching I cannot understand. Consider the below code:

msgSource
  .wireTap(_ => Metrics.consumed.inc())
  .via(ActorFlow.ask(ref)(TopSecretSingleFastMessage)(Timeout(3.seconds)))
  .batch(5000, fst => List(fst))((s, msg) => s.prepended(msg)) // <- this always creates batches of size 1
  // .groupedWithin(5000, 5.seconds) <- this would be fast
  .wireTap(msgs => Metrics.batched.set(msgs.size)) // either constantly 5000 or dynamic in case of batch (ideally...)
  .via(ActorFlow.ask(ref)(TopSecretMultiSlowMessage)(Timeout(3.seconds)))
  .runWith(commitSink)
  .onComplete {
    case Success(_) => ref ! Shutdown("Completed")
    case Failure(e) => ref ! Shutdown("Failed", Some(e))
  }

Focusing on the commented-out stage and the one above it: when I run the above with the batch stage, I get batch sizes of 1 and, according to my metrics, a throughput of 400 msgs/sec. When I use the groupedWithin stage instead, I get a throughput of 50,000 msgs/sec. As the msgSource represents a Kafka topic which has a sizeable number of messages, there is really no limit in terms of throughput from that side. My question is this: if with groupedWithin I see a throughput of 50,000 msgs/sec and with batch I see a throughput of only 400 msgs/sec, then why doesn't batch increase the batch size up from 1? I mean, clearly my stages below the aggregation stage (be that batch or groupedWithin) are slower than upstream can provide, right? How come batch doesn't... batch?

12 replies
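For reference, here is a minimal standalone sketch of how batch behaves, assuming Akka 2.6 and Scala 2.13. The numeric source, the throttle rate, and the name BatchSketch are made up for illustration and are not taken from the code above. As documented, batch only aggregates while downstream is backpressuring; if downstream already has demand when an element arrives, that element is passed on as a batch of one.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BatchSketch {
  // Runs a fast source into a deliberately slow consumer and returns the batch sizes.
  def batchSizes()(implicit system: ActorSystem): Seq[Int] = {
    val done = Source(1 to 1000)
      // Aggregates only while downstream is backpressuring, up to 100 elements.
      .batch(100, first => List(first))((acc, elem) => elem :: acc)
      // Hypothetical slow consumer: at most 10 batches per second.
      .throttle(10, 1.second)
      .map(_.size)
      .runWith(Sink.seq)
    Await.result(done, 30.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("batch-sketch")
    // The individual sizes depend on timing, so they are only printed.
    try println(batchSizes())
    finally system.terminate()
  }
}
```

If the printed sizes stay at 1 in a real pipeline, that suggests the stage after batch is signalling demand as fast as elements arrive, so it is worth checking where the actual bottleneck sits.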
Nathan Fischer
@nrktkt:matrix.org
[m]
any alpakka maintainers on here?
would this be the right channel to talk about alpakka dev?
Gilad Hoch
@hochgi

Hi,
when replying with a chunked response (entity of type Source[ByteString, _]),
what might cause the downstream to end early with:

akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$: null

context:
My source is basically Source.queue[ByteString].
I grab the materialized value of the queue itself using a promise:

val p = Promise[SourceQueueWithComplete[ByteString]]()
val s = Source.queue[ByteString](size, OverflowStrategy.backpressure).mapMaterializedValue[NotUsed]{q => p.success(q) ; NotUsed}
p.future.foreach(doSomethingThatPushToQueue)
HttpResponse(entity = s, …)

Any ideas?

Nathan Fischer
@nrktkt:matrix.org
[m]
I'm not sure what's causing that error off the top of my head, but is there a reason you need to use a promise instead of something like preMaterialize?
Gilad Hoch
@hochgi
I'm not aware of preMaterialize. Thanks for the tip, I'll educate myself on this and see if this solves my problem.
Gilad Hoch
@hochgi
@nrktkt:matrix.org It does simplify my code, and saves an async boundary (so thank you!), but the problem still remains :(
4 replies
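For anyone reading along, a minimal sketch of the preMaterialize variant mentioned above, assuming Akka 2.6. The object and method names are hypothetical, and this only shows how to get hold of the queue without a Promise; it does not address the NoMoreElementsNeeded cancellation.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import akka.util.ByteString

object PreMaterializeSketch {
  // Returns the queue to push into, plus a Source that can be handed on,
  // for example as the data of a chunked response entity.
  def queueAndSource(bufferSize: Int)(
      implicit system: ActorSystem
  ): (SourceQueueWithComplete[ByteString], Source[ByteString, NotUsed]) =
    Source
      .queue[ByteString](bufferSize, OverflowStrategy.backpressure)
      // preMaterialize starts the queue side now and returns its materialized value
      // together with a Source[ByteString, NotUsed] for the consumer.
      .preMaterialize()
}
```

Note that the returned Source is backed by the already running queue, so it is meant to be consumed by one response rather than reused across requests.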
Matias Partanen
@mpartan
def streamSomething: Source[Something, Cancellable] = {
    Source
      .tick(0.second, 1.second, NotUsed)
      .mapAsync(1) { _ =>
        doSomething
      }
}

override def streamSomething(in: Request): Source[Response, NotUsed] = {
  ...
}
I'm using the above structure with Akka gRPC / ScalaPB gRPC-Web. Source.tick returns a Source of type [_, Cancellable]. How can I convert this to type [_, NotUsed] without materializing the stream?
6 replies
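One way to do this is mapMaterializedValue, which changes only the type of the materialized value and does not run anything. A minimal sketch, assuming Akka 2.6; the object name and the NotUsed element type are placeholders:

```scala
import akka.NotUsed
import akka.actor.Cancellable
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

object TickSketch {
  // Source.tick materializes a Cancellable that can stop the ticks.
  val ticks: Source[NotUsed, Cancellable] =
    Source.tick(0.seconds, 1.second, NotUsed)

  // Replace the materialized value with NotUsed; the stream is not materialized here.
  val ticksNotUsed: Source[NotUsed, NotUsed] =
    ticks.mapMaterializedValue(_ => NotUsed)
}
```

The trade-off is that the Cancellable is discarded, so the ticks stop only when downstream cancels or the stream is otherwise shut down.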
Nathan Fischer
@nrktkt:matrix.org
[m]
looking at usages of NoMoreElementsNeeded, it seems like the stream created from your source is cancelled before you call doSomethingThatPushToQueue
Swoorup Joshi
@Swoorup
is there a dynamic version of this?
7 replies
I want the usage to be like runnableGraph.run() to get a new source
bblfish
@bblfish:matrix.org
[m]
Can one narrow an ActorContext[A|B|C] to an ActorContext[A|B]? (In Scala 3)
I have a function that needs the narrower context, and so I thought it would be better to code for that.
especially as the function/method is going to be called in two different places where the narrower context is needed.
bblfish
@bblfish:matrix.org
[m]
Oh, asking the question helped me find the answer on Google. context.self.narrow[A|B] is the answer
bblfish
@bblfish:matrix.org
[m]
Ah, no, that does not work: it allows me to narrow the ActorRef, not the context.
Swoorup Joshi
@Swoorup
why do you want to do that? @bblfish:matrix.org
isn’t context pretty much just used within the scope where you create it?
bblfish
@bblfish:matrix.org
[m]
as I said, I have a function that needs the context to send a message to itself, and that is called in two different places.
It may be that just asInstanceOf[ActorContext[...]] works. It compiles. I'll see if it works...
Swoorup Joshi
@Swoorup
use a partial function?
bblfish
@bblfish:matrix.org
[m]
I'll need to clean up later ...
Swoorup Joshi
@Swoorup
Behaviors.receiveMessagePartial {
  case Down if remaining == 1 =>
    notifyWhenZero.tell(Done)
    zero
  case Down =>
    counter(remaining - 1)
}
bblfish
@bblfish:matrix.org
[m]
Mhh. I'll have to think of modelling it like that...
Swoorup Joshi
@Swoorup
the context is typed with all the possible messages the behaviour takes
you use narrow to limit the visibility of internal messages to consumers of that ActorRef.
Nathan Fischer
@nrktkt:matrix.org
[m]
see if you can get a stack trace from that on the second usage of the stream
Zhenhao Li
@Zhen-hao
hi, what is the offset in EventEnvelope for? is it needed only by the query/read side? namely, not needed for actor state recovery
3 replies
Swoorup Joshi
@Swoorup
Has anybody rolled their own ES on top of Akka? I find the default ES API a bit too verbose.
2 replies
Hamed Nourhani
@hnourhani
Hi guys, I have a problem with Akka TestKit. I want to verify that an actor does not receive a specific message after receiving the initial message,
but when I use expectNoMsg the test fails with this error message:
```
java.lang.AssertionError: assertion failed: received unexpected message RealMessage
```
Levi Ramsey
@leviramsey

@bblfish:matrix.org If all you want to do is send an A|B to yourself, I'd probably do something like

def someUtility(sender: (A|B) => Unit): Unit = {
  sender(SomeAOrBMessage)
}

Behaviors.setup[A|B|C] { context =>
  someUtility(context.self.tell(_))
}

(No idea whether or not I've completely mangled Scala 3 syntax there...)

1 reply
bblfish
@bblfish:matrix.org
[m]
Thanks. The asInstanceOf works for the moment, which is allowing me to stabilise my code a bit. I'll look at that idea a bit later, and take these warnings into account :-) (I only use asInstanceOf as a measure of last resort...)
Blaž Marinović
@bmarinovic

Hi, I have code that I think silently fails and this is my suspect since status is deprecated and always set to Success(Done):

def upload(settings: SftpSettings, path: String, data: ByteString): Future[Either[Throwable, ByteString]] =
    Source.single(data).runWith(Sftp.toPath(path, settings)).map(_.status.toEither.map(_ => data))

Does this mean that even exception cases will be mapped to Success(Done)? Anyway, what is the idiomatic way to handle errors? Using recover?

2 replies
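A minimal sketch of the recover approach, assuming Akka 2.6, where IOResult.status is deprecated and failures are expected to surface as a failed Future rather than inside the IOResult. That assumption is worth verifying against the Alpakka version in use. The helper and object names are hypothetical; the `upload` parameter stands in for `Source.single(data).runWith(Sftp.toPath(path, settings))`.

```scala
import akka.stream.IOResult
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object UploadResultSketch {
  // Turns the materialized Future[IOResult] into an Either without touching `status`.
  def toEither(upload: Future[IOResult], data: ByteString)(
      implicit ec: ExecutionContext
  ): Future[Either[Throwable, ByteString]] =
    upload
      // A completed Future is treated as a successful upload.
      .map(_ => Right(data): Either[Throwable, ByteString])
      // A failed Future becomes a Left instead of propagating the exception.
      .recover { case NonFatal(e) => Left(e) }
}
```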
Łukasz Drygała
@ldrygala
Hi, I'm trying to update alpakka-google-cloud-storage to 3.0.1 and I get akka.stream.alpakka.google.ResumableUpload$InvalidResponseException with the message: No Location header. Does anyone have a similar problem?
1 reply
Swoorup Joshi
@Swoorup
with ActorSink.actorRefWithBackpressure
why do I need an init message?
how can I bypass this inconvenience?
2 replies
Nitika Agarwal
@nitikagarw

Hi team, I am trying to use an Akka Extension. I have seen several examples where the companion object of the class extends ExtensionId, but in that case the createExtension function can only use the ActorSystem to create an instance of the class.
For cases where values other than the ActorSystem are needed, I am trying the approach below, but with this the extension is registered multiple times.

trait BFactory {
  protected def configKeyPath: String
}

case class A(client: String, config: Configuration) extends Extension

case class B(override val configKeyPath: String) extends ExtensionId[A] with BFactory {

  override def createExtension(system: ExtendedActorSystem): A = {
    val configuration = Configuration(system.settings.config, configKeyPath)
    val client: String = ???
    A(client, configuration)
  }
}

val aInstance: A = B(configKeyPath).apply(system)

Does anyone have an idea how to use ExtensionId with a case class?

2 replies
Michael Mirold
@mmirold
Hi all. This question might only be tangential to Akka itself, but using a current IntelliJ with files that contain many Akka HTTP directives renders our machines almost unusable (perpetual 600% CPU doing some recursive implicit optimization). Does anyone else have this issue? I have already opened an issue with JetBrains, but to little avail. The issue seems to be related to wildcard imports.
Zhenhao Li
@Zhen-hao
hi, what's the implicit that turns an akka.persistence.typed.scaladsl.EffectBuilder into an akka.persistence.typed.scaladsl.Effect?
4 replies
Aleksei Shamenev
@coffius

Hi there 👋

I wonder if someone can help me understand how to properly drop an actor's state in Akka. In some of our cases, when an actor is being recovered, it is possible to get an unparsable event/snapshot from its store. In this case we want to drop the current state of the actor and start from scratch. But I cannot find anything like that in the Akka docs 😕

Any ideas how to achieve such a behaviour? Is it even possible?

Zhenhao Li
@Zhen-hao
@coffius you can set akka.persistence.snapshot-store-plugin-fallback.snapshot-is-optional = true in your config.