Hi, wondering if someone could clarify some best-practice advice for me.
I have a queue: scala.collection.mutable.Queue[T] that gets populated from some external source. I would like to create an Observable[T] from this queue that dequeues elements from the queue if they are there.
I'm able to quite easily do this by:
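// Non-blocking dequeue: yields None when the queue is currently empty
def dequeue: Task[Option[T]] = Task { if (queue.nonEmpty) Option(queue.dequeue()) else Option.empty }
// collect drops the empty results and unwraps the values in one step
val obs = Observable.repeatEvalF(dequeue).collect { case Some(value) => value }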
Would this be the recommended way of creating the Observable? My fear is that it might be inefficient due to constant polling of the queue for values.
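On the polling concern: one possible way around the busy loop is to hand elements over through a blocking queue, whose take() parks the consumer until something arrives. This is only a minimal sketch, assuming the mutable.Queue (which is not thread-safe) can be swapped for a java.util.concurrent.LinkedBlockingQueue; BlockingDequeueSketch, dequeueBlocking and demo are hypothetical names, and Int stands in for T.

```scala
import java.util.concurrent.LinkedBlockingQueue

object BlockingDequeueSketch {
  // Assumption: a thread-safe blocking queue replaces the mutable.Queue.
  val queue = new LinkedBlockingQueue[Int]()

  // take() blocks the calling thread until an element is available,
  // so the consumer never spins on an empty queue.
  def dequeueBlocking(): Int = queue.take()

  // Small demo: a producer thread stands in for the external source.
  def demo(): List[Int] = {
    val producer = new Thread(() => {
      (1 to 3).foreach { i =>
        Thread.sleep(10) // simulate elements arriving over time
        queue.put(i)
      }
    })
    producer.start()
    // Each call waits for the next element instead of polling.
    val received = List.fill(3)(dequeueBlocking())
    producer.join()
    received
  }
}
```

In the snippet above, the blocking call could presumably take the place of the body of dequeue, ideally run on a scheduler meant for blocking work, since take() holds a thread while it waits.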
Observable.bufferTimedWithPressure(period, maxSize), but counting by the internal structure of the elements rather than by the number of elements the Observable sees. Is there such an operation on Observable? Something like Observable[A].bufferTimedWithPressure2(period, maxSize: Int, sizer: A => Int). The idea is that each element contains a lot of data, and I am not sure that inlining them is an effective way to process things.
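To make the counting rule concrete, here is a rough sketch on a plain List: batches are closed by summed weight rather than by element count. It is not a Monix operator and it ignores the time window and back-pressure entirely; batchByWeight, maxWeight and the object name are hypothetical, with sizer playing the role of the A => Int function from the question.

```scala
object WeightedBatchSketch {
  // Groups elements into batches whose summed weight stays within maxWeight.
  // An element heavier than maxWeight on its own is emitted as a single-element batch.
  def batchByWeight[A](items: List[A], maxWeight: Int)(sizer: A => Int): List[List[A]] = {
    val (done, current, _) =
      items.foldLeft((Vector.empty[List[A]], Vector.empty[A], 0)) {
        case ((closed, open, weight), item) =>
          val w = sizer(item)
          if (open.nonEmpty && weight + w > maxWeight)
            (closed :+ open.toList, Vector(item), w) // close the batch, start a new one
          else
            (closed, open :+ item, weight + w)       // item still fits in the open batch
      }
    // Flush whatever is left in the open batch.
    if (current.nonEmpty) (done :+ current.toList).toList else done.toList
  }
}
```

For example, WeightedBatchSketch.batchByWeight(List("aa", "bbb", "c"), 5)(_.length) keeps "aa" and "bbb" together (weight 5) and starts a new batch for "c".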
EventQueueListener
SQSObservable.apply(queueUrl)
.collect(deserialize)
.consumeWith(msgHandler)
.runAsync
How can I control the number of payloads that are processed at any point in time? When I run this, the DB CPU goes through the roof. Am I thinking about this wrong?
Observable with one of the interval functions and zip it with your SQSObservable. This way you can limit it to processing 1 payload per specific duration. You could also take it in batches and then wait etc.
Does mapParallelUnordered back-pressure the source observable in this case?
delayOnNext(duration). There is also delayOnNextBySelector, which takes a function A => Observable[B] and could be used for the same thing like this: .delayOnNextBySelector(x => Observable(x).delaySubscription(1.milliseconds)). Instead of Observable(x), it could check a signal to choose the delay.
CircuitBreaker
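import java.util.concurrent.TimeUnit
import com.typesafe.scalalogging.StrictLogging
import monix.execution.{Scheduler, UncaughtExceptionReporter}
import monix.execution.schedulers.SchedulerService

object Main extends App with StrictLogging {
  // Route exceptions that escape scheduled tasks to the logger
  val exceptionLogger = UncaughtExceptionReporter { ex =>
    logger.error("Uncaught exception", ex)
  }

  implicit val scheduler: SchedulerService =
    Scheduler.io(reporter = exceptionLogger)

  Configuration.load.fold(
    err => logger.error(s"Errors while loading config $err"),
    config => {
      // Log the loaded config every 5 seconds, starting immediately
      scheduler.scheduleAtFixedRate(
        initialDelay = 0,
        period = 5,
        TimeUnit.SECONDS,
        () => {
          logger.debug(s"FROM LOG Loaded $config")
          println(s"Loaded $config")
        }
      )
    }
  )
}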