James Phillips
@jdrphillips
@oleg-py Thanks for your reply above!
Oleg Pyzhcov
@oleg-py
yw
Phong Nguyen
@phongngtuan
thanks @oleg-py @Avasil :)
Michael Zuber
@mgzuber

Hi, wondering if someone could clarify some best practice advice to me.

I have a queue: scala.collection.mutable.Queue[T] that gets populated from some external source. I would like to create an Observable[T] from this queue that dequeues elements from the queue if they are there.

I'm able to quite easily do this by:

def dequeue: Task[Option[T]] = Task { if (queue.nonEmpty) Option(queue.dequeue) else Option.empty }
val obs = Observable.repeatEvalF(dequeue).filter(_.isDefined).map(_.get)

Would this be the recommended way of creating the Observable? My fear is that it might be inefficient due to constant polling of the queue for values.

Oleg Pyzhcov
@oleg-py
Yes, I'd expect this to poll constantly, potentially tying up a whole thread
My preference would be to replace the Queue with a Subject
The second best is to dequeue everything, but execute a Task every once in a while
Michael Zuber
@mgzuber
Replacing Queue with Subject still has the problem of having to create an Observable
Oleg Pyzhcov
@oleg-py
Subject is an observable :)
Michael Zuber
@mgzuber
yeah exactly
Oleg Pyzhcov
@oleg-py
So if you can replace your enqueue with onNext on, say, ConcurrentSubject.publish, you should be all set
Michael Zuber
@mgzuber
Yes!
Okay that's what I was looking for
Thanks for your help @oleg-py :)
Oleg Pyzhcov
@oleg-py
You're welcome :)
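A minimal sketch of the suggestion above, assuming Monix 3.x on the classpath: the producer calls onNext on a ConcurrentSubject instead of enqueueing, and consumers subscribe to the subject itself, so nothing polls. The object name, the demo method and the Int payload are illustrative and not from the conversation.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.subjects.ConcurrentSubject
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object SubjectInsteadOfQueue {
  def demo(): List[Int] = {
    // Takes the place of the mutable Queue; the subject is itself an Observable.
    val subject = ConcurrentSubject.publish[Int]

    // Subscribe before pushing: a publish subject only delivers events
    // to subscribers that are already attached when the event arrives.
    val seen = ListBuffer.empty[Int]
    val done = subject.foreach { x => seen += x; () }

    // What used to be queue.enqueue(...) on the producer side.
    subject.onNext(1)
    subject.onNext(2)
    subject.onNext(3)
    subject.onComplete()

    // Block only for the sake of the demo; real code would keep composing.
    Await.result(done, 5.seconds)
    seen.toList
  }
}
```

If events may arrive before anyone has subscribed, a replaying variant such as ConcurrentSubject.replay may fit better than publish.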
Yohann B
@ybr
Hi, I am looking for a way to get the same behaviour as Observable.bufferTimedWithPressure(period, maxSize), but counting against the internal structure of the elements rather than the number of elements seen by the Observable. Is there such an operation on Observable? Something like Observable[A].bufferTimedWithPressure2(period, maxSize: Int, sizer: A => Int). The idea is that each element contains lots of data, and I am not sure that inlining them is an effective way to process things.
Yohann B
@ybr
hum just saw monix/monix#447
Alexandru Nedelcu
@alexandru

New PR: monix/monix#738

scuddalo
@scuddalo
@alexandru @Avasil monix noob question: I have an observable that fetches items from Amazon SQS, transforms the payload and calls some db function to persist to postgresql. It looks something like this: EventQueueListener SQSObservable.apply(queueUrl) .collect(deserialize) .consumeWith(msgHandler) .runAsync. How can I control the number of payloads that are processed at any point in time? When I run this, the db CPU goes through the roof. Am I thinking about this wrong?
Piotr Gawryś
@Avasil
Something like max N payloads per second?
You could use 2nd Observable with one of the interval functions and zip it with your SQSObservable. This way you can limit it to processing 1 payload per specific duration. You could also take it in batches and then wait etc.
Should be quite simple to achieve any custom behavior you need
scuddalo
@scuddalo
thanks! Instead of writing a custom sqs observable... I was thinking of something like this:
Observable
  .repeatEval(sqsClient.receiveMessage(queueUrl))
  .flatMap(m => Observable.fromIterable(...))
  .collect(deserialize)
  .mapParallelUnordered(1)(handlePayload)
@Avasil would mapParallelUnordered back pressure the source observable in this case?
Piotr Gawryś
@Avasil
I think the problem is that even processing elements one-by-one you are too fast
so in both cases you will process 1 element at a time
scuddalo
@scuddalo
oh..
Piotr Gawryś
@Avasil
but if you process, let's say, 1000 / s this way and your DB can handle 10 requests, then it's no help :D
scuddalo
@scuddalo
true, that makes sense
maybe I will try adding a forced delay
or is there a better way to do it?
Piotr Gawryś
@Avasil
Yes, I think it might be the simplest way to enforce a limit, as long as you know how much your DB can handle. The downside is that you will probably use an arbitrary number for requests / s, but I think you could make it configurable or even dynamic with a little more work
scuddalo
@scuddalo
Anything I can look at to make it dynamic?
Piotr Gawryś
@Avasil
You would have to receive feedback from sqsClient / db. I haven't worked with Amazon SQS and don't know how possible it is
scuddalo
@scuddalo
I see. I use postgresql and the doobie framework. How can I get feedback from the db?
Do I have to measure how long each request is taking?
and use that as a signal?
Piotr Gawryś
@Avasil
Sorry I don't have an answer right now, I'd start with trying delayOnNext(duration). There is also delayOnNextBySelector which takes a function A => Observable[B] and could be used for the same thing like this: .delayOnNextBySelector(x => Observable(x).delaySubscription(1.milliseconds)). Instead of Observable(x) it could check a signal to choose delay
scuddalo
@scuddalo
Thank you so much! I really appreciate your help
Piotr Gawryś
@Avasil
But hmm, I think you could actually measure how long DB takes to respond and decide on that
You could even go fancy and use CircuitBreaker
scuddalo
@scuddalo
I will take a look at the circuit breaker. I will start thinking about how to measure db time. Thanks again!
Piotr Gawryś
@Avasil
You're welcome, hopefully you manage to put my loose ideas together:)
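The forced-delay idea from this thread can be sketched roughly as follows, assuming Monix 3.x. Observable.range stands in for the SQS source, handlePayload here is a hypothetical placeholder for the database write, and the 20 ms delay is an arbitrary number, not a recommendation.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object ThrottledPipeline {
  // Hypothetical stand-in for the doobie/PostgreSQL write.
  def handlePayload(payload: Long): Task[Long] = Task(payload)

  def demo(): List[Long] = {
    val seen = ListBuffer.empty[Long]

    val done = Observable
      .range(0L, 5L)                          // stand-in for the SQS source
      .delayOnNext(20.millis)                 // forced delay before each element is passed on
      .mapParallelUnordered(1)(handlePayload) // at most one write in flight
      .foreach { x => seen += x; () }

    // Block only for the sake of the demo.
    Await.result(done, 5.seconds)
    seen.toList
  }
}
```

Making the delay configurable is a matter of passing the duration in; making it dynamic would need some feedback signal such as measured write latency, which this sketch does not attempt.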
Diego Pineda
@nullhappens
Hello, I am very new to Monix and to FP as a whole. I have an issue that I have not yet found a solution for. I need to build a simple app that runs a simple job every X seconds. While configuring the application, I set up logback for logging; however, when I use a scheduler to run my Runnable, the log configuration seems to get lost (I'm thinking it has something to do with the classpath for the Runnable being different?). Below please find the code. Any help would be greatly appreciated.
import java.util.concurrent.TimeUnit

import com.typesafe.scalalogging.StrictLogging
import monix.execution.{Scheduler, UncaughtExceptionReporter}
import monix.execution.schedulers.SchedulerService

object Main extends App with StrictLogging {

  // Route exceptions that escape scheduled tasks to the logger
  val exceptionLogger = UncaughtExceptionReporter { ex =>
    logger.error("Uncaught exception", ex)
  }

  implicit val scheduler: SchedulerService =
    Scheduler.io(reporter = exceptionLogger)

  // Configuration is the application's own config loader
  Configuration.load.fold(
    err => logger.error(s"Errors while loading config $err"),
    config => {
      // Run the job every 5 seconds, starting immediately
      scheduler.scheduleAtFixedRate(
        initialDelay = 0,
        period = 5,
        TimeUnit.SECONDS,
        () => {
          logger.debug(s"FROM LOG Loaded $config")
          println(s"Loaded $config")
        }
      )
    }
  )
}
StrictLogging just adds a logger variable and is part of Typesafe scala-logging
The code basically never prints the debug statement to stdout
Radek Miček
@radekm
Hi! Why does Observable use Future for acknowledgments instead of Task?