Christopher Davenport
@ChristopherDavenport
evalMap(_.start)?
Fabio Labella
@SystemFw
not quite
you'll see the PR in a minute though
Christopher Davenport
@ChristopherDavenport
Oh, missed the unit. I get it now.
Gavin Bisesi
@Daenyth
What's the point?
Fabio Labella
@SystemFw
@Daenyth a very minimal "fork this stream"
not crucial by any means, hence why I was hesitant
Gavin Bisesi
@Daenyth
I see, you get the cancellation behavior
Fabio Labella
@SystemFw
yeah, link the Fiber to the Stream lifetime
we already have something similar for F
Stream.supervise
this is just the logical extension to Stream
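A minimal sketch of what "link the Fiber to the Stream lifetime" looks like with the existing `Stream.supervise`, assuming fs2 1.x/2.x on cats-effect 1.x/2.x (the versions current at the time of this conversation). The object name `SuperviseExample` and the helper `withBackground` are hypothetical and only illustrate the idea; they are not the API from the PR being discussed.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext

object SuperviseExample {
  // Assumption: a global ContextShift is good enough for a demo.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Starts `background` on a fiber when the stream is run and cancels that
  // fiber when the stream finalizes, so the fiber cannot outlive `main`.
  def withBackground[A](background: IO[Unit], main: Stream[IO, A]): Stream[IO, A] =
    Stream.supervise(background).flatMap(_ => main)
}
```

Here `withBackground(IO.never, Stream.emits(List(1, 2, 3)))` still terminates, because the never-ending background action is cancelled once the main stream is done.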
Dennis
@dennis4b_twitter
@Daenyth Yeah I guess it's just a misunderstanding about the meaning of available and not an expectation of not blocking (and honestly I would have assumed the same thing - that it means how much data is available, though obviously that can not always be known in advance). Anyway I worked around it, and pdfbox is happy with the InputStream from fs2.io :)
Dennis
@dennis4b_twitter
val stream = Stream.emit(42) ++ Stream.eval(timer.sleep(1000.millis))     // timer is Timer[IO]

stream.repeat.map(v => logger.info(s"Value is ${v}")).compile.drain
This prints "42" and then "()" since the return value of timer.sleep is also emitted. What I would like is to only emit the 42, spaced 1 second apart.
What is the correct way to do this? My use case is polling a database table for pending tasks, so a "return-if-found-else-sleep" kind of loop.
Dennis
@dennis4b_twitter
nicer but has the same result: val s = Stream.emit(10) ++ Stream.sleep[IO](1000.millis)
Dennis
@dennis4b_twitter
(in this simple example I could use awakeEvery)
Oleg Pyzhcov
@oleg-py
@dennis4b_twitter just use .drain on the stream you don't care about?
Dennis
@dennis4b_twitter
Ugh that easy! Thank you!
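A minimal sketch of the `.drain` suggestion, assuming fs2 1.x/2.x on cats-effect 1.x/2.x and a `Timer[IO]` in scope as in the question. The object name `DrainSleepExample` and the 100 ms interval are made up for illustration.

```scala
import cats.effect.{IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object DrainSleepExample {
  // Assumption: a Timer built from the global ExecutionContext.
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Emit 42, then sleep. `.drain` runs the sleep for its effect only and
  // discards the unit value it would otherwise emit downstream.
  val ticks: Stream[IO, Int] =
    (Stream.emit(42) ++ Stream.sleep[IO](100.millis).drain).repeat

  // The first three elements are all 42; no `()` shows up in between.
  def firstThree: List[Int] = ticks.take(3).compile.toList.unsafeRunSync()
}
```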
Fabio Labella
@SystemFw
@dennis4b_twitter there are Stream.sleep and Stream.sleep_ as well (sleep_ is sleep.drain)
as well as the various other combinators that you can zipRight or zipLeft with (fixedDelay, fixedRate, and so on)
for example, I would write your code like this
ah, and don't log in map
Stream
   .emit(42)
   .repeat
   .evalMap(v => IO(logger.info(s"Value is $v")))
   .zipLeft(Stream.fixedDelay(1.second))
Dennis
@dennis4b_twitter
interesting, what if the "42" is actually the result of some def checkForWork: IO[Option[Int]] ? So it can return None, in which case I want to sleep, or Some(n), in which case I want that to be emitted?
Fabio Labella
@SystemFw
in that case you're better off with a simple flatMap
but in many cases, Stream allows you to separate the "when" from the "what", which isn't possible with IO
so I tend to have timed streams separated, and zip them with my logic stream
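One way the "simple flatMap" for `checkForWork: IO[Option[Int]]` might look, assuming fs2 1.x/2.x on cats-effect 1.x/2.x. The object name `PollExample`, the `poll` helper and its `idle` parameter are hypothetical; `checkForWork` stands in for the real database query.

```scala
import cats.effect.{IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object PollExample {
  // Assumption: a Timer built from the global ExecutionContext.
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  def poll(checkForWork: IO[Option[Int]], idle: FiniteDuration): Stream[IO, Int] = {
    // Found work: emit it right away. Nothing found: sleep and emit nothing.
    def step(found: Option[Int]): Stream[IO, Int] = found match {
      case Some(n) => Stream.emit(n)
      case None    => Stream.sleep_[IO](idle)
    }
    // Check once, react to the result, then start over.
    Stream.eval(checkForWork).flatMap(step).repeat
  }
}
```

With this shape the loop only sleeps when there was nothing to do, so a backlog of pending tasks is drained without artificial delays.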
Dennis
@dennis4b_twitter
If you don't mind me asking (I am still struggling a bit with timed streams) -> suppose I have a SignallingRef[IO,Int], which is the number of outstanding jobs. On job completion, I execute .update(_ - 1). I want to push this new value to the user, which is easy by mapping the counter.discrete stream. However, updates can come in quick succession, so after sending one update I don't want to send the next one until at least one second has passed. The possibilities are:
1) nothing comes, nothing to do
2) next comes at 0.3 seconds, next at 0.6, next at 0.8, and next at 3.5 seconds -> I would like to send the latest value (the one set at 0.8 seconds) to the user at 1.0 seconds, and because the update at 3.5 arrives more than 1 second after that, send it immediately on receiving it (but then again not send any more updates until 4.5)
I hope this makes sense :-) It's a bit like debouncing I guess
Fabio Labella
@SystemFw
there is a debounce combinator I think
is that enough?
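A rough sketch of applying `debounce` to the counter, assuming fs2 1.x/2.x on cats-effect 1.x/2.x. The object name `DebounceExample` and the `throttled` helper are hypothetical. Note that `debounce` may also hold back the first value for the full period, so this approximates the behaviour described above rather than matching it exactly.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object DebounceExample {
  // Assumption: global ExecutionContext for both ContextShift and Timer.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // A rate-limited view of the counter: bursts of updates within `period`
  // collapse into the most recent value.
  def throttled(counter: SignallingRef[IO, Int], period: FiniteDuration): Stream[IO, Int] =
    counter.discrete.debounce(period)
}
```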
Dennis
@dennis4b_twitter
wow, actually yes
a last one: I have a Stream[IO,Message]. I want to make this into a Stream[IO,Option[Message]] with the following semantics: if there was no Message coming from the original stream within the last 10 seconds, emit a None. (My use case: inject keepalives into a stream of websocket messages to the client, but instead of doing it at some fixed interval, do it only if I haven't sent any legitimate message within the last 10 seconds)
Dennis
@dennis4b_twitter
I currently merge with an interval stream, but that way I send the keepalives suboptimally because I don't need to send them if I am already sending other traffic
Fabio Labella
@SystemFw
yeah once you get into a situation when you need finer control, the strategy is generally to have an explicit queue
this particular case you have won't necessarily be super easy to implement
you can have a look at the implementation of groupWithin to take some inspiration
if not I can probably write it down for you at some point
the main difficulty is that you have a timeout that can timeout itself
i.e. when the stream emits, you start a 10 second timeout
but if it does emit again, you need to time out the timeout
Dennis
@dennis4b_twitter
a Queue would work if there was something like timedDequeue(1.seconds) that would return Some(_) or None on timeout
Fabio Labella
@SystemFw
I've implemented this logic in several ways for different use cases
i.e. once in the cats effect channel, (for a similar use case), using one strategy
and once in groupWithin (with a different strategy)
(and several other times in other places)
if you look at groupWithin you can see how it's done
Dennis
@dennis4b_twitter
looking now
Fabio Labella
@SystemFw
let me send you a few links