Nathan Fischer
@nrktkt:matrix.org
[m]
any alpakka maintainers on here?
would this be the right channel to talk about alpakka dev?
Gilad Hoch
@hochgi

Hi,
when replying with a chunked response (entity of type Source[ByteString, _]),
what might cause the downstream to end early with:

akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$: null

context:
My source is basically Source.queue[ByteString].
I grab the materialized value of the queue itself using a promise:

val p = Promise[SourceQueueWithComplete[ByteString]]()
val s = Source.queue[ByteString](size, OverflowStrategy.backpressure).mapMaterializedValue[NotUsed]{ q => p.success(q); NotUsed }
p.future.foreach(doSomethingThatPushToQueue)
HttpResponse(entity = s, …)

Any ideas?

Nathan Fischer
@nrktkt:matrix.org
[m]
I'm not sure what's causing that error off the top of my head, but is there a reason you need to use a promise instead of something like preMaterialize?
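A minimal sketch of the preMaterialize approach Nathan mentions, assuming Akka 2.6-style APIs (the buffer size of 256 is illustrative):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem("example")

// preMaterialize materializes the queue immediately and hands back a
// new Source that can be materialized later (e.g. as a response entity),
// so no Promise indirection is needed.
val (queue: SourceQueueWithComplete[ByteString], source: Source[ByteString, NotUsed]) =
  Source.queue[ByteString](bufferSize = 256, OverflowStrategy.backpressure).preMaterialize()

// push elements directly to the pre-materialized queue
queue.offer(ByteString("hello"))
queue.complete()
```

The `source` half can then be used as the response entity where the original code used `s`.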
Gilad Hoch
@hochgi
I'm not aware of preMaterialize. Thanks for the tip, I'll educate myself on this and see if this solves my problem.
Gilad Hoch
@hochgi
@nrktkt:matrix.org It does simplify my code, and saves an async boundary (so thank you!), but the problem still remains :(
4 replies
Matias Partanen
@mpartan
def streamSomething: Source[Something, Cancellable] = {
    Source
      .tick(0.second, 1.second, NotUsed)
      .mapAsync(1) { _ =>
        doSomething
      }
}

override def streamSomething(in: Request): Source[Response, NotUsed] = {
  ...
}
I'm using the following structure with Akka gRPC / ScalaPB gRPC-Web. Source.tick returns a Source of type [_, Cancellable]. How could I convert this to [_, NotUsed] without materializing the stream?
6 replies
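One way to discard the Cancellable materialized value without running the stream is mapMaterializedValue, sketched here against the snippet above:

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.scaladsl.Source

// mapMaterializedValue only transforms the materialized value;
// it does not run (materialize) the stream.
val ticks: Source[NotUsed, NotUsed] =
  Source
    .tick(0.seconds, 1.second, NotUsed)
    .mapMaterializedValue(_ => NotUsed)
```

The trade-off is that the Cancellable is lost, so the tick source can then only be stopped by cancelling downstream.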
Nathan Fischer
@nrktkt:matrix.org
[m]
looking at usages of NoMoreElementsNeeded, it seems like the stream created from your source is cancelled before you call doSomethingThatPushToQueue
Swoorup Joshi
@Swoorup
is there a dynamic version of this?
7 replies
I want the usage to be like runnableGraph.run() to get a new source
bblfish
@bblfish:matrix.org
[m]
Can one narrow an ActorContext[A|B|C] to an ActorContext[A|B]? (In Scala 3)
I have a function that needs the narrower context, and so I thought it would be better to code for that.
especially as the function/method is going to be called in two different places where the narrower context is needed.
bblfish
@bblfish:matrix.org
[m]
Oh, asking the question helped me find the answer on Google. context.self.narrow[A|B] is the answer
bblfish
@bblfish:matrix.org
[m]
Ah, no, that does not work; that allows me to narrow the ActorRef, not the context.
Swoorup Joshi
@Swoorup
why do you want to do that? @bblfish:matrix.org
isn’t context pretty much just used within the scope where you create it?
bblfish
@bblfish:matrix.org
[m]
as I said, I have a function that needs the context to send a message to itself, and that is called in two different places.
It may be that just asInstanceOf[ActorContext[...]] works. It compiles. I'll see if it works...
Swoorup Joshi
@Swoorup
use a partial function?
bblfish
@bblfish:matrix.org
[m]
I'll need to clean up later ...
Swoorup Joshi
@Swoorup
Behaviors.receiveMessagePartial {
  case Down if remaining == 1 =>
    notifyWhenZero.tell(Done)
    zero
  case Down =>
    counter(remaining - 1)
}
bblfish
@bblfish:matrix.org
[m]
Mhh. I'll have to think of modelling it like that...
Swoorup Joshi
@Swoorup
the context would have all the possible messages the behaviour takes
you use narrow to limit visibility of internal messages to consumers of that actorref.
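To illustrate the narrowing Swoorup describes, a small Scala 3 sketch with made-up message types (Public/Internal are illustrative names, not from the conversation):

```scala
import akka.actor.typed.ActorRef

sealed trait Public
case object Ping extends Public
sealed trait Internal
case object Tick extends Internal

// narrow is safe for incoming messages: every Public message
// is also a Public | Internal message, so the wide actor can
// still handle everything the narrowed ref accepts.
def expose(self: ActorRef[Public | Internal]): ActorRef[Public] =
  self.narrow[Public]
```

This limits what consumers of the ref can send, while the behaviour (and its context) keeps the full protocol.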
Nathan Fischer
@nrktkt:matrix.org
[m]
see if you can get a stack trace from that on the second usage of the stream
Zhenhao Li
@Zhen-hao
hi, what is the offset in EventEnvelope for? is it needed only by the query/read side? namely, not needed for actor state recovery
3 replies
Swoorup Joshi
@Swoorup
anybody rolled out their own ES on top of akka? I find the default ES API a bit too verbose.
2 replies
Hamed Nourhani
@hnourhani
Hi guys, I have a problem with Akka TestKit. I want to verify that an actor does not receive a specific message after it receives the initial message,
but when I use expectNoMsg the tests fail and I receive this error message:
```
java.lang.AssertionError: assertion failed: received unexpected message RealMessage
```
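For reference, a sketch assuming the classic TestKit (the probe setup is illustrative). Note that expectNoMsg is deprecated in favour of expectNoMessage, and that the AssertionError means the actor really did send RealMessage within the window, so the failure may be reporting real behaviour rather than a testkit problem:

```scala
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.testkit.TestProbe

implicit val system: ActorSystem = ActorSystem("test")
val probe = TestProbe()

// expectNoMessage fails if *any* message arrives within the window
probe.expectNoMessage(500.millis)
```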
Levi Ramsey
@leviramsey

@bblfish:matrix.org If all you want to do is send an A|B to yourself, I'd probably do something like

def someUtility(sender: (A|B) => Unit): Unit = {
  sender(SomeAOrBMessage)
}

Behaviors.setup[A|B|C] { context =>
  someUtility(context.self.tell(_))
}

(No idea whether or not I've completely mangled Scala 3 syntax there...)

1 reply
bblfish
@bblfish:matrix.org
[m]
Thanks. The asInstanceOf works for the moment, which is allowing me to stabilise my code a bit. I'll look at that idea a bit later, and take these warnings into account :-) (I only use asInstanceOf as a measure of last resort...)
Blaž Marinović
@bmarinovic

Hi, I have code that I think silently fails and this is my suspect since status is deprecated and always set to Success(Done):

def upload(settings: SftpSettings, path: String, data: ByteString): Future[Either[Throwable, ByteString]] =
    Source.single(data).runWith(Sftp.toPath(path, settings)).map(_.status.toEither.map(_ => data))

Does this mean that even exception cases will be mapped to Success(Done)? Anyway, what is the idiomatic way to handle errors? Using recover?

2 replies
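A sketch of one idiomatic shape, assuming a recent Akka Streams / Alpakka where IO failures fail the materialized Future itself rather than being reported through the deprecated status field:

```scala
import scala.concurrent.{ExecutionContext, Future}
import akka.actor.ActorSystem
import akka.stream.alpakka.ftp.SftpSettings
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.scaladsl.Source
import akka.util.ByteString

// map the success case and recover the failed-Future case,
// instead of inspecting the deprecated IOResult.status
def upload(settings: SftpSettings, path: String, data: ByteString)(
    implicit system: ActorSystem, ec: ExecutionContext): Future[Either[Throwable, ByteString]] =
  Source
    .single(data)
    .runWith(Sftp.toPath(path, settings))
    .map(_ => Right(data))
    .recover { case e => Left(e) }
```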
Łukasz Drygała
@ldrygala
Hi, I'm trying to update alpakka-google-cloud-storage to 3.0.1 and I got akka.stream.alpakka.google.ResumableUpload$InvalidResponseException with message: No Location header. Does anyone have a similar problem?
1 reply
Swoorup Joshi
@Swoorup
with ActorSink.actorRefWithBackpressure
why do I need an init message?
how can I bypass the inconvenience?
2 replies
Nitika Agarwal
@nitikagarw

Hi team, I am trying to use Akka Extensions. I have seen several examples where the companion object of the class extends ExtensionId, but in that case the createExtension function can only use the ActorSystem to create an instance of the class.
For cases where variables other than the ActorSystem are needed, I am trying the approach below, but with this the extension is registered multiple times.

trait BFactory {
  protected def configKeyPath: String
}

case class A(client: String, config: Configuration) extends Extension

case class B(override val configKeyPath: String) extends ExtensionId[A] with BFactory {

  override def createExtension(system: ExtendedActorSystem): A = {
    val configuration = Configuration(system.settings.config, configKeyPath)
    val client: String = ???
    A(client, configuration)
  }
}

val aInstance:A = B(configKeyPath).apply(system)

Does anyone have any idea on how to use ExtensionId with case class?

2 replies
Michael Mirold
@mmirold
Hi all. This question might be only tangential to akka itself, but using a current IntelliJ with files that contain many Akka HTTP directives renders our machines almost unusable (perpetual 600% CPU doing some recursive implicit optimization). Is anyone else seeing this issue? I already opened an issue with JetBrains, but to little avail. The issue seems to be related to wildcard imports.
Max
@maxstreese
This message was deleted
2 replies
Zhenhao Li
@Zhen-hao
hi, what's the implicit that turns a akka.persistence.typed.scaladsl.EffectBuilder into a akka.persistence.typed.scaladsl.Effect?
4 replies
Seeta Ramayya
@Seetaramayya
This message was deleted
Aleksei Shamenev
@coffius

Hi there 👋

I wonder if someone can help me understand how to properly drop an actor's state in akka. In some of our cases when an actor is being recovered it is possible to get an unparsable event/snapshot from its store. In this case we want to drop the current state of the actor and start from scratch. But I can not find anything like that in the Akka docs 😕

Any ideas how to achieve such a behaviour? Is it even possible?

Zhenhao Li
@Zhen-hao
@coffius you can set akka.persistence.snapshot-store-plugin-fallback.snapshot-is-optional = true in your config.
      # Set this to true if successful loading of snapshot is not necessary.
      # This can be useful when it is alright to ignore snapshot in case of
      # for example deserialization errors. When snapshot loading fails it will instead
      # recover by replaying all events.
      # Don't set to true if events are deleted because that would
      # result in wrong recovered state if snapshot load fails.
this information is not available in the web docs, only in code...
Max
@maxstreese

Hi everyone, I think I am running into an issue because I may not understand something about the correct use of concurrency facilities in Akka. The scenario is this: I have some actor which is receiving messages from Kafka via Alpakka Kafka. It does this throughout the day. Message throughput is quite high, I would say, at around 20 to 30 k msgs/sec. Periodically (three times a day) the actor receives a message that prompts it to upload a snapshot of the collected data to an AWS S3 bucket via Alpakka S3. This is done in the code below:

case PerformSnapshot => Effect
      .none
      .thenRun { s =>
        implicit val sys = ctx.system
        implicit val ec = ctx.executionContext
        val timeFormatter = time.format.DateTimeFormatter.ofPattern("yyyyMMdd").withZone(timeZone)
        val key = s"top-secred-data-bucket/${timeFormatter.format(s.currentSnapshot.timestamp)}/${s.currentSnapshot.metaData.id}"
        val self = ctx.self
        Source
          .single(s.currentCollection.values.toSeq)
          .runWith(blobSink(key))
          .onComplete {
            case Success(_) => self ! UpdateSnapshot(Some(s.currentSnapshot))
            case Failure(e) => self ! Shutdown("Failed to upload to S3", Some(e))
          }
      }

The actor and application are running fine throughout the day. The load on the application is not near its provisioned resource limits. But when the snapshot is triggered this log signals something is off and triggers the app to shut down:

Kafka commit failed after=5 ms,
commitsInProgress=0,
exception=org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.

My question: Does that signal that I am not as concurrent as I think I am with the above code and the S3 upload actually blocks the actor from processing further messages? Or is that 5 ms setting simply too low?

6 replies
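For what it's worth, runWith returns a Future immediately, so the upload itself should not block the actor's message processing. A common mitigation for this particular rebalance error (a hedged suggestion, not a confirmed diagnosis of the case above) is to give the consumer more time between polls via max.poll.interval.ms; a config sketch assuming Alpakka Kafka's standard layout, with an illustrative value:

```hocon
# application.conf sketch -- the value is illustrative, not a recommendation
akka.kafka.consumer {
  kafka-clients {
    # allow more time between poll() calls before the broker
    # rebalances the group (Kafka's default is 300000 = 5 minutes)
    max.poll.interval.ms = 600000
  }
}
```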
tim
@goodchinas
hi, what are the advantages and disadvantages of akka compared with CAF (the C++ Actor Framework)? are they competitors?