Yuriy Badalyantc
@LMnet
import monix.eval.Task
import scala.concurrent.duration._
// run inside a TaskApp; `logger` is any SLF4J-style logger in scope

def acquire: Task[String] = {
  Task(logger.info("Resource acquiring...")) >> Task.sleep(5.seconds) >> Task {
    logger.info("Resource acquired")
    "some resource"
  }
}

def release(resource: String): Task[Unit] = {
  Task(logger.info(s"Releasing $resource"))
}

val action = acquire.bracket { r =>
  Task(logger.info(s"Using $r"))
}(release)

val result = action.timeoutTo(100.millis, Task {
  logger.info("Resource acquiring timed out")
})

Task(logger.info("Started")) >> result.as(ExitCode.Success)

Hi everybody! With the code above I see this in my console:

21:30:02.159 Started
21:30:02.201 Resource acquiring...
21:30:02.304 Resource acquiring timed out

It looks like the resource isn't closed after the timeout. Why?

Andrew Onyshchuk
@oandrew
@LMnet AFAIK it's not released because it hasn't been acquired: acquire gets cancelled before it succeeds.
Yuriy Badalyantc
@LMnet
@oandrew but what if my resource acquisition is still running in the background?
Looks like it could lead to a resource leak
Piotr Gawryś
@Avasil
It should release, the program is probably exiting too early
Yuriy Badalyantc
@LMnet

@Avasil yes, you're right. I added a timeout after my operation and saw this:

02:00:11.153 Started
02:00:11.187 Resource acquiring...
02:00:11.289 Resource acquiring timed out
02:00:16.194 Resource acquired
02:00:16.207 Releasing some resource
02:00:16.207 Using some resource
02:00:17.298 Stop

It looks better, but still strange: why do I get Releasing some resource and Using some resource at the same time? I thought that if something goes wrong with resource acquisition, the resource should not be used, it should just be closed.

And it's still a bit strange that I need to wait for this manually
Piotr Gawryś
@Avasil
Indeed, I will check it out, thanks! It seems to work correctly for cats.effect.IO and we just wrap IOApp, so it should be fixable

It looks better, but still strange: why do I get Releasing some resource and Using some resource at the same time? I thought that if something goes wrong with resource acquisition, the resource should not be used, it should just be closed.

Looks like it starts use and then tries to cancel it, instead of not running it at all. If you add Task.cancelBoundary at the beginning of use then it won't print. I'll double-check whether this is intended for some reason, and if not I'll look for a fix
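The suggested workaround might look like this (a minimal sketch; acquire and release are simplified stand-ins for the versions above):

```scala
import monix.eval.Task

def acquire: Task[String] = Task("some resource")
def release(r: String): Task[Unit] = Task(println(s"Releasing $r"))

// Task.cancelBoundary at the start of `use` checks for cancellation before
// the body runs, so a `use` that races with a timeout is skipped rather
// than started and then cancelled mid-flight.
val action = acquire.bracket { r =>
  Task.cancelBoundary >> Task(println(s"Using $r"))
}(release)
```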

Yuriy Badalyantc
@LMnet
@Avasil I tried Task.cancelBoundary and it works as expected. Thanks for the help!
But it would be cool if it weren't necessary.
Piotr Gawryś
@Avasil
You're welcome. I already made a PR for both Monix and Cats-Effect that will fix it, thanks for the report
Yuriy Badalyantc
@LMnet
Great!
Krutarth Patel
@tunnol

Hi everyone, I need help understanding what's wrong with this setup

Observable
      .fromFuture(subscriber.subscribe())
      .flatMap(t => Observable.fromIterable(t))
      .mapParallelUnordered(5)(event => Task.fromFuture(processor.process(event)))
      .mapParallelUnordered(5)(event => Task.fromFuture(publisher.publish(event)))
      .mapParallelUnordered(5)(event => Task.fromFuture(purger.purge(event)))
      .restartUntil(_ => false)
      .consumeWith(Consumer.complete)
      .runToFuture

subscriber is basically subscribing / polling (internally) for new messages from a queue, and each message is then processed -> published to another queue -> purged from the source queue. I am observing that a subset of messages read off the queue gets re-processed, but I am unable to see how that's happening. Is there something wrong with my setup?

Krutarth Patel
@tunnol
To add more info to the above: I added some debug logs to see where it stops. These messages complete publisher.publish but don't get to purger.purge. They get dropped somewhere in between (the reason I say dropped is that once a message exits publish, it doesn't enter purge at any later time)
Piotr Gawryś
@Avasil
Does it keep processing messages when some messages skip purge step? And are there any errors?
Krutarth Patel
@tunnol
@Avasil yeah, because that message isn't purged from the queue on the first attempt, it comes back and the pipeline attempts to process it again. I didn't see any errors between the publish stage and purge
Piotr Gawryś
@Avasil
Does it ever print for these messages if you do Task(println(event)) >> Task.fromFuture(purger.purge(event))?
Kasper Kondzielski
@ghostbuster91
Hi, is there any way to start a monix-kafka consumer from a given offset?
Piotr Gawryś
@Avasil

@ghostbuster91 We don't have anything nice unfortunately but it's possible:

  1. Turn off commits here: https://github.com/monix/monix-kafka/blob/master/kafka-1.0.x/src/main/resources/monix/kafka/default.conf#L79 or in KafkaConsumerConfig
  2. Use the constructor which accepts Task[KafkaConsumer[K, V]] and pass a consumer with your offset of choice

I'd love to have builders for it (as well as a partitioned source) and I hope we will include them eventually, but I always find something else to prioritize more :'D
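A sketch of step 2, assuming a recent monix-kafka (the Properties values, topic, partition, and the 42L offset are only illustrative, and config field names may differ slightly between versions):

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import monix.eval.Task
import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

val cfg = KafkaConsumerConfig.default.copy(
  enableAutoCommit = false // step 1: commits turned off
)

// Step 2: build the underlying consumer ourselves, seek to the offset we
// want, and hand it over via the Task[KafkaConsumer[K, V]] constructor.
val consumerTask: Task[KafkaConsumer[String, String]] = Task {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "my-group")
  val consumer = new KafkaConsumer[String, String](
    props, new StringDeserializer, new StringDeserializer)
  val partition = new TopicPartition("my-topic", 0)
  consumer.assign(List(partition).asJava)
  consumer.seek(partition, 42L) // start reading from offset 42
  consumer
}

val fromOffset = KafkaConsumerObservable[String, String](cfg, consumerTask)
```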

Kasper Kondzielski
@ghostbuster91
@Avasil thanks for the info
ziggystar
@ziggystar
Is there some pattern to open, let's say, an Observable[File] into an Observable[InputStream], then read from the InputStream on several occasions, triggered by another Observable, and ensure proper closing?
ziggystar
@ziggystar
In case you're on SO, I've posted my problem as a question there. Will also be easier for others to find: https://stackoverflow.com/questions/59147184/how-to-transform-a-monix-reactive-stream-into-a-resourceful-value-and-ensure-a-p
ziggystar
@ziggystar
D'uh. There is just Observable.bracket, right?
Piotr Gawryś
@Avasil
Yup :D
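For reference, a minimal sketch of that bracketing idea via the companion-object variant Observable.resource (readAllBytes is just a stand-in for whatever reads are actually needed):

```scala
import java.io.{File, FileInputStream, InputStream}
import monix.eval.Task
import monix.reactive.Observable

// Open each file, let the inner stream use it, and guarantee close() on
// completion, error, or cancellation of that inner subscription.
def contents(files: Observable[File]): Observable[Array[Byte]] =
  files.flatMap { file =>
    Observable
      .resource(Task[InputStream](new FileInputStream(file)))(in => Task(in.close()))
      .mapEval(in => Task(in.readAllBytes())) // Java 9+; any read works here
  }
```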
Kasper Kondzielski
@ghostbuster91
Does take(n) on KafkaConsumerObservable guarantee to close the consumer after receiving n values?
Piotr Gawryś
@Avasil
Yes, IIRC it should close on all ways of termination
Kasper Kondzielski
@ghostbuster91
Awesome, thanks
Kasper Kondzielski
@ghostbuster91
Hi, I'm trying to set up tests for my consumers and producers. I am using a real Kafka instance through Testcontainers. In the first test I would like to send and receive a message. Because I don't want to start Kafka for each test, the Kafka instance is reused between tests, so I randomly pick a new groupId for each test. Somehow this isn't working: my consumer waits indefinitely. I noticed that if I kill it and restart the test using the previous groupId, everything works well. Any ideas?
Kasper Kondzielski
@ghostbuster91
Here is my test:
  it should "work" in {
    val consumerCfg = KafkaConsumerConfig.default.copy(
      bootstrapServers = List(container.kafkaContainer.getBootstrapServers),
      groupId = UUID.randomUUID().toString
    )
    val producerCfg = KafkaProducerConfig.default.copy(
      bootstrapServers = List(container.kafkaContainer.getBootstrapServers)
    )
    println(consumerCfg.groupId)
    val topicName = "my-topic2"
    val receivedMsg = for {
      _ <- Resource
        .make(
          Task
            .eval(KafkaProducer[String, String](producerCfg, s))
        )(_.close())
        .use(_.send(topicName, "message1"))
      response <- KafkaConsumerObservable[String, String](
        consumerCfg,
        List(topicName)
      ).take(1).toListL
    } yield response
    receivedMsg.runSyncUnsafe() shouldBe List("message1")
  }
Kasper Kondzielski
@ghostbuster91
It turned out I was missing autoOffsetReset = AutoOffsetReset.Earliest
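For anyone hitting the same thing: a fresh groupId has no committed offsets, and the default auto.offset.reset of "latest" makes the consumer wait for messages produced after it subscribes. A sketch of the fix (assuming AutoOffsetReset lives in monix.kafka.config; the bootstrap server is illustrative):

```scala
import java.util.UUID
import monix.kafka.KafkaConsumerConfig
import monix.kafka.config.AutoOffsetReset

val consumerCfg = KafkaConsumerConfig.default.copy(
  bootstrapServers = List("localhost:9092"),
  groupId = UUID.randomUUID().toString,
  // A new group id starts with no committed offset; Earliest makes it
  // replay the topic from the beginning instead of waiting at "latest".
  autoOffsetReset = AutoOffsetReset.Earliest
)
```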
Honza Strnad
@hanny24
Hi, it seems to me I have found a nasty bug in 3.1.0. monix/monix#1090
Carlos Eduardo Melo
@cemelo
@hanny24 there's no bug there. You're passing null for the implicit parameter list that expects a CanAwait. Without it, the error message is: "Don't call Awaitable methods directly, use the Await object." Using Await.result works as expected.
Honza Strnad
@hanny24
I don’t think that’s the (main) issue. There are even comments in the code that shows you how to “fix” the code.
Piotr Gawryś
@Avasil
Thanks for the report, I will take a look when I have a spare moment. My guess is that it has something to do with runToFuture not inserting any extra async boundaries, so it tries progressing on the main thread but that thread is blocked
Carlos Eduardo Melo
@cemelo
@hanny24 it is. I just tested it using scastie. Will send the code here in a sec.
Honza Strnad
@hanny24
@Avasil yes, that would somehow explain the behavior I was seeing.
Carlos Eduardo Melo
@cemelo
You're not supposed to call .result directly on the Future[_], as stated in the documentation. That's why they added the implicit param to that method.
Honza Strnad
@hanny24
I know I'm not supposed to use it directly. I even know that it works with Await.result. Nevertheless, the code I sent should work. I actually found the bug while I was using the ScalaFutures trait from scalatest.
Carlos Eduardo Melo
@cemelo
k
Kasper Kondzielski
@ghostbuster91
Hi, how can I create a circe deserializer for monix-kafka?
Piotr Gawryś
@Avasil
I usually use String and do it in mapEval, but if you need KafkaConsumer[YourA, YourB] then I can find an example tomorrow
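The String + mapEval approach could look roughly like this (Event and the topic name are hypothetical; Task.fromEither works here because io.circe.Error extends Throwable):

```scala
import io.circe.Decoder
import io.circe.generic.semiauto.deriveDecoder
import io.circe.parser.decode
import monix.eval.Task
import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}

final case class Event(id: String, value: Int) // hypothetical payload
implicit val eventDecoder: Decoder[Event] = deriveDecoder

// Consume values as plain Strings and decode them with circe afterwards,
// instead of writing a custom Kafka Deserializer.
val events =
  KafkaConsumerObservable[String, String](KafkaConsumerConfig.default, List("events"))
    .mapEval(record => Task.fromEither(decode[Event](record.value())))
```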
matfournier
@matfournier
Is there a way to run an Observable for only a specified length of time? E.g. run this observable for 50 seconds. This is different from "cancel the observable if no activity is detected for X time"; this is more "attempt to consume this observable within this time frame, and if you can't, cancel"
My current approach of merging my regular observable with one that throws after 50 seconds works in that it interrupts after 50 seconds; however, if the other observable finishes in 5 seconds, I'm still observing for the next 45 seconds...
Piotr Gawryś
@Avasil
@matfournier Try Observable.takeByTimespan
matfournier
@matfournier
Facepalm. Thank you
But how would I detect that I've hit the limit? That this has happened?
Piotr Gawryś
@Avasil
The simplest way is probably doOnEarlyStop. Something like doOnComplete or guarantee should work as well
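A sketch of combining the two (the interval source is just a stand-in):

```scala
import scala.concurrent.duration._
import monix.eval.Task
import monix.reactive.Observable

val source = Observable.intervalAtFixedRate(1.second) // stand-in source

val capped = source
  // doOnEarlyStop fires when a downstream operator stops the stream early,
  // which is what takeByTimespan does once the 50s window elapses.
  .doOnEarlyStop(Task(println("hit the 50s limit")))
  .takeByTimespan(50.seconds)
  .doOnComplete(Task(println("done (either finished or capped)")))
```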