Hi, I'm wondering if someone could clarify some best-practice advice for me.
I have a queue: scala.collection.mutable.Queue[T]
that gets populated from some external source. I would like to create an Observable[T]
from this queue that dequeues elements from the queue if they are there.
I'm able to quite easily do this by:
def dequeue: Task[Option[T]] = Task { if (queue.nonEmpty) Option(queue.dequeue) else Option.empty }
val obs = Observable.repeatEvalF(dequeue).filter(_.isDefined).map(_.get)
Would this be the recommended way of creating the Observable? My fear is that it might be inefficient due to constant polling of the queue for values.
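One possible way to avoid the busy polling, sketched under assumptions: the producer is able to write into a thread-safe `java.util.concurrent.LinkedBlockingQueue` instead of `scala.collection.mutable.Queue` (which is not thread-safe), and Monix 3.x is on the classpath. `fromBlockingQueue` is a hypothetical helper name, not a Monix API.

```scala
import java.util.concurrent.LinkedBlockingQueue
import monix.execution.Scheduler
import monix.reactive.Observable

// Hypothetical helper: turns a blocking queue into an Observable.
// Assumption: the external source can be pointed at a LinkedBlockingQueue.
def fromBlockingQueue[T](queue: LinkedBlockingQueue[T], io: Scheduler): Observable[T] =
  Observable
    .repeatEval(queue.take()) // take() parks the thread until an element arrives, so there is no spinning
    .executeOn(io)            // run the blocking call on a scheduler meant for blocking I/O
```

The trade-off is that one thread of the `io` scheduler stays parked while the queue is empty, which is why a dedicated scheduler such as `Scheduler.io()` is passed in rather than the default compute pool.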
Observable.bufferTimedWithPressure(period, maxSize), but not counting the elements viewed by the Observable. Instead, counting based on the internal structure of the elements. Is there such an operation on Observable? Something like Observable[A].bufferTimedWithPressure2(period, maxSize: Int, sizer: A => Int). The idea is that each element contains a lot of data, and I am not sure that inlining them is an effective way to process things.
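To make the requested `sizer` semantics concrete, here is a minimal sketch in plain Scala, with no Monix involved. It only shows the size-based half of the rule (no timer), and `batchByWeight` is a hypothetical name, not an existing operator.

```scala
// Hypothetical helper, not part of Monix: groups elements so that the summed
// weight of each batch (as reported by `sizer`) stays within `maxWeight`.
def batchByWeight[A](items: Iterator[A], maxWeight: Int)(sizer: A => Int): Iterator[Vector[A]] =
  new Iterator[Vector[A]] {
    private val buffered = items.buffered

    def hasNext: Boolean = buffered.hasNext

    def next(): Vector[A] = {
      val batch = Vector.newBuilder[A]
      // Always take at least one element, so a single oversized item
      // ends up in a batch of its own instead of stalling the stream.
      val first = buffered.next()
      batch += first
      var weight = sizer(first)
      // Keep adding while the next element still fits into the budget.
      while (buffered.hasNext && weight + sizer(buffered.head) <= maxWeight) {
        val a = buffered.next()
        weight += sizer(a)
        batch += a
      }
      batch.result()
    }
  }
```

For example, with `sizer = _.length` and `maxWeight = 5`, the strings "aa", "bbb", "c" would be grouped as ("aa", "bbb") and then ("c").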
EventQueueListener
SQSObservable.apply(queueUrl)
  .collect(deserialize)
  .consumeWith(msgHandler)
  .runAsync
How can I control the number of payloads that are processed at any point in time? When I run this the db CPU goes through the roof. Am I thinking about this wrong?
Observable with one of the interval functions and zip it with your SQSObservable. This way you can limit it to processing 1 payload per specific duration. You could also take it in batches and then wait etc.
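A minimal sketch of the zip-with-interval idea, assuming Monix 3.x. `throttled` is a hypothetical helper name, and `source` stands in for the SQSObservable from the question.

```scala
import monix.reactive.Observable
import scala.concurrent.duration._

// Hypothetical helper: lets at most one element through per `period`.
// zip only emits once both sides have produced an element, so the source
// is back-pressured to the rate of the interval ticks.
def throttled[A](source: Observable[A], period: FiniteDuration): Observable[A] =
  Observable
    .intervalAtFixedRate(period) // emits 0, 1, 2, ... once per period
    .zip(source)                 // pair each tick with the next payload
    .map { case (_, payload) => payload } // drop the tick, keep the payload
```

Usage would look something like `throttled(SQSObservable.apply(queueUrl), 1.second)` in front of the existing `.collect(deserialize)` step.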
mapParallelUnordered back pressure the source observable in this case?
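For reference, a small sketch of how `mapParallelUnordered` can cap the number of payloads in flight, assuming Monix 3.x. `boundedProcessing` and `handle` are hypothetical names; `handle` stands in for whatever the real message handler does (for example a DB write).

```scala
import monix.eval.Task
import monix.reactive.Observable

// Hypothetical helper: runs `handle` for each element, with at most
// `maxInFlight` tasks executing concurrently. Results are emitted in
// completion order, not in source order.
def boundedProcessing[A, B](source: Observable[A], maxInFlight: Int)(handle: A => Task[B]): Observable[B] =
  source.mapParallelUnordered(maxInFlight)(handle)
```

With `maxInFlight = 2`, no more than two handler tasks should be running at once, which is one way to keep the load on the database bounded.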
delayOnNext(duration). There is also delayOnNextBySelector which takes a function A => Observable[B] and could be used for the same thing like this: .delayOnNextBySelector(x => Observable(x).delaySubscription(1.milliseconds)). Instead of Observable(x) it could check a signal to choose the delay.
CircuitBreaker
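import java.util.concurrent.TimeUnit

import com.typesafe.scalalogging.StrictLogging
import monix.execution.{Scheduler, UncaughtExceptionReporter}
import monix.execution.schedulers.SchedulerService

object Main extends App with StrictLogging {
  // Route uncaught exceptions from scheduled work to the logger
  val exceptionLogger = UncaughtExceptionReporter { ex =>
    logger.error("Uncaught exception", ex)
  }

  implicit val scheduler: SchedulerService =
    Scheduler.io(reporter = exceptionLogger)

  Configuration.load.fold(
    err => logger.error(s"Errors while loading config $err"),
    config => {
      // Log the loaded config every 5 seconds, starting immediately
      scheduler.scheduleAtFixedRate(
        initialDelay = 0,
        period = 5,
        TimeUnit.SECONDS,
        () => {
          logger.debug(s"FROM LOG Loaded $config")
          println(s"Loaded $config")
        }
      )
    }
  )
}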
Task run-loop which is more sophisticated. Since it's properly encapsulated, it doesn't violate referential transparency from the user's point of view. I might be missing something, so I'd appreciate any clarifications from @alexandru if necessary :)
@nullhappens
however, when I use a scheduler to run my Runnable, it seems that the log configuration gets lost (I'm thinking it has something to do with the classpath for the Runnable being different?).
What do you mean? Do you get an error?
It would be helpful if you could provide a sample project to run this example.