This channel is available for all Akka enthusiasts—newbies as well as gurus—for the exchange of knowledge and the coordination of efforts around Akka. For more info see: http://bit.ly/akka-gitter
[_, Cancellable]
. How could I convert this to type of [_, NotUsed]
without materializing the stream?
NoMoreElementsNeeded
, it seems like the stream created from your source is cancelled before you call doSomethingThatPushToQueue
context.self.narrow[A|B]
is the answer
isInstanceOf[ActorContext[...]]
works. It compiles. I'll see if it works...
@bblfish:matrix.org If all you want to do is send an A|B
to yourself, I'd probably do something like
def someUtility(sender: (A|B) => Unit): Unit = {
sender(SomeAOrBMessage)
}
Behaviors.setup[A|B|C] { context =>
someUtility(context.self.tell(_))
}
(No idea whether or not I've completely mangled Scala 3 syntax there...)
asInstanceOf
works for the moment, which is allowing me to stabilise my code a bit. I'll look at that idea a bit later, and take these warnings into account :-) (I only use asInstanceOf as a measure of last resort...)
Hi, I have code that I think silently fails and this is my suspect since status
is deprecated and always set to Success(Done)
:
def upload(settings: SftpSettings, path: String, data: ByteString): Future[Either[Throwable, ByteString]] =
Source.single(data).runWith(Sftp.toPath(path, settings)).map(_.status.toEither.map(_ => data))
Does this mean that even exception cases will be mapped to Success(Done)
? Anyway, what is idiomatic way to handle error? Using recover
?
Hi team, I am trying to use Akka Extension. I have seen several examples where the companion object of the class extends the ExtensionId
but in this case the createExtension
function can only use the ActorSystem to create an instance of a class.
For cases where variables other than ActorSystem is need, I am trying as below but with this, the extension is registered multiple times.
trait BFactory {
protected def configKeyPath: String
}
case class A(client: String, config: Configuration) extends Extension
case class B(override val configKeyPath: String) extends ExtensionId[A] with BFactory {
override def createExtension(system: ExtendedActorSystem): A = {
val configuration = Configuration(system.settings.config, configKeyPath)
val client: String = ???
A(client, configuration)
}
}
val aInstance:A = B(configKeyPath).apply(system)
Does anyone have any idea on how to use ExtensionId
with case class?
Hi there 👋
I wonder if someone can help me understand how to properly drop an actor's state in akka. In some of our cases when an actor is being recovered it is possible to get an unparsable event/snapshot from its store. In this case we want to drop the current state of the actor and start from scratch. But I can not find anything like that in the Akka docs 😕
Any ideas how to achieve such a behaviour? Is it even possible?
# Set this to true if successful loading of snapshot is not necessary.
# This can be useful when it is alright to ignore snapshot in case of
# for example deserialization errors. When snapshot loading fails it will instead
# recover by replaying all events.
# Don't set to true if events are deleted because that would
# result in wrong recovered state if snapshot load fails.
Hi everyone, I think I am running into an issue because I may not understand something about the correct use of concurrency facilities in Akka. The scenario is this: I have some actor which is receiving messages from Kafka via Alpakka Kafka. It does this throughout the day. Message throughput is quite high I would say, at around 20 to 30 k msgs/sec. Periodically (three times a day) the actor receives a message that prompts it to upload a snapshot of the collected data to an AWS S3 bucket via Alpakka S3. This is done in the below code:
case PerformSnapshot => Effect
.none
.thenRun { s =>
implicit val sys = ctx.system
implicit val ec = ctx.executionContext
val timeFormatter = time.format.DateTimeFormatter.ofPattern("yyyyMMdd").withZone(timeZone)
val key = s"top-secred-data-bucket/${timeFormatter.format(s.currentSnapshot.timestamp)}/${s.currentSnapshot.metaData.id}"
val self = ctx.self
Source
.single(s.currentCollection.values.toSeq)
.runWith(blobSink(key))
.onComplete {
case Success(_) => self ! UpdateSnapshot(Some(s.currentSnapshot))
case Failure(e) => self ! Shutdown("Failed to upload to S3", Some(e))
}
}
The actor and application are running fine throughout the day. The load on the application is not near its provisioned resource limits. But when the snapshot is triggered this log signals something is off and triggers the app to shut down:
Kafka commit failed after=5 ms,
commitsInProgress=0,
exception=org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
My question: Does that signal that I am not as concurrent as I think I am with the above code and the S3 upload actually blocks the actor from processing further messages? Or is that 5 ms setting simply too low?
We are using an alpakka pipeline that uses Source(Consumer.committableSource) as KafkaConsumer and Sink.ActorRefWithAck as the sink for BackPressure.
The problem statement - We need to commit Kafka offset only after reading and processing the value in the sinkActor. How can we do it? How can I commit the offset in the SinkActor? We need to process the consumed element in the Sink actor as the Init method keeps one element in the buffer.