Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Feb 01 10:11
    @SystemFw banned @Hudsone_gitlab
  • Jan 31 2019 04:19
    404- forked
    404-/fs2
  • Jan 31 2019 03:01
    SethTisue commented #1232
  • Jan 30 2019 17:22
  • Jan 30 2019 13:45
  • Jan 30 2019 10:48
    pchlupacek commented #1406
  • Jan 30 2019 10:47
    pchlupacek commented #1406
  • Jan 30 2019 10:39
    pchlupacek commented #1407
  • Jan 30 2019 09:58
    lJoublanc commented #870
  • Jan 30 2019 09:42
    vladimir-popov commented #1407
  • Jan 30 2019 08:10
    vladimir-popov closed #1407
  • Jan 30 2019 08:10
    vladimir-popov commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 18:57
    SystemFw commented #1406
  • Jan 29 2019 17:47
    pchlupacek commented #1406
  • Jan 29 2019 17:42
    pchlupacek commented #1406
  • Jan 29 2019 17:39
    pchlupacek commented #1407
  • Jan 29 2019 17:39
    vladimir-popov edited #1407
  • Jan 29 2019 17:38
    vladimir-popov commented #1406
Michael Pilquist
@mpilquist
available is not supported - we inherit the default implementation of that method, which always returns 0
Dennis
@dennis4b_twitter
Ok so the following won't work... (from the scrimage library)
  def fromStream(in: InputStream, `type`: Int = CANONICAL_DATA_TYPE): Image = {
    require(in != null)
    require(in.available > 0)
    val bytes = IOUtils.toByteArray(in)
    apply(bytes, `type`)
}
Michael Pilquist
@mpilquist
Ouch no
Seems like that's broken in scrimage, as that method is supposed to return the number of bytes that can be read without blocking

E.g., from JavaDoc of available:

Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without blocking by the next invocation of a method for this input stream. The next invocation might be the same thread or another thread. A single read or skip of this many bytes will not block, but may read or skip fewer bytes.

Dennis
@dennis4b_twitter
Thanks, I'll work around it with the Array[Byte] constructor and report an issue
Gavin Bisesi
@Daenyth
require(in.available > 0) that's... not smart
if in.available returned 1, then the next line could still block on 5000 bytes after that one ready byte
Fabio Labella
@SystemFw
do you guys think this is useful? I'm debating whether to PR it or not
def spawn[F[_]: Concurrent, A](s: Stream[F, A]): Stream[F, Fiber[F, Unit]]
@mpilquist
Michael Pilquist
@mpilquist
Cool
Christopher Davenport
@ChristopherDavenport
evalMap(_.start)?
Fabio Labella
@SystemFw
not quite
you'll see the PR in a minute though
Christopher Davenport
@ChristopherDavenport
Oh, missed the unit. I get it now.
Gavin Bisesi
@Daenyth
What's the point?
Fabio Labella
@SystemFw
@Daenyth a very minimal "fork this stream"
not crucial by any means, hence why I was hesitant
Gavin Bisesi
@Daenyth
I see, you get the cancellation behavior
Fabio Labella
@SystemFw
yeah, link the Fiber to the Stream lifetime
we already have something similar for F
Stream.supervise
this is just the logical extension to Stream
Dennis
@dennis4b_twitter
@Daenyth Yeah I guess it's just a misunderstanding about the meaning of available and not an expectation of not blocking (and honestly I would have assumed the same thing - that it means how much data is available, though obviously that can not always be known in advance). Anyway I worked around it, and pdfbox is happy with the InputStream from fs2.io :)
Dennis
@dennis4b_twitter
val stream = Stream.emit(42) ++ Stream.eval(timer.sleep(1000.millis))     // timer is Timer[IO]

stream.repeat.map(v => logger.info(s"Value is ${v}")).compile.drain
This prints "42" and then "()" since the return value of timer.sleep is also emitted. What I would like is to only emit the 42, spaced 1 second apart.
What is the correct way to do this?My usecase is polling a database table for pending tasks, so a "return-if-found-else-sleep" kind of loop.
Dennis
@dennis4b_twitter
nicer but has the same result: val s = Stream.emit(10) ++ Stream.sleep[IO](1000.millis)
Dennis
@dennis4b_twitter
(in this simple example I could use awakeEvery)
Oleg Pyzhcov
@oleg-py
@dennis4b_twitter just use .drain on the stream you don't care about?
Dennis
@dennis4b_twitter
Ugh that easy! Thank you!
Fabio Labella
@SystemFw
@dennis4b_twitter there are Stream.sleep and Stream.sleep_ as well (sleep_ is sleep.drain)
as well as the various other combinators that can you can zipRight or zipLeft (fixedDelay, fixedRate, and so on)
for example, I would write your code like this
ah, and don't log in map
Stream
   .emit(42)
   .repeat
   .evalMap(v => IO(logger.info(s"Value is $v"))
   .zipLeft(Stream.fixedDelay(1.second))
Dennis
@dennis4b_twitter
interesting, what if the "42" is actually the result of some def checkForWork: IO[Option[Int]] ? So it can return None, in which case I want to sleep, or Some(n), in which case I want that to be emitted?
Fabio Labella
@SystemFw
in that case you're better off with a simple flatMap
but in many cases, Stream allows you to separate the "when" from the "what", which isn't possible with IO
so I tend to have timed streams separated, and zip them with my logic stream
Dennis
@dennis4b_twitter
If you don't mind me asking (I am struggling a bit still with timed streams) -> suppose I have a SignallingRef[IO,Int], which is the number of outstanding jobs. On job completion, I execute .update(_ - 1). I want to push this new value to the user, which is easy by mapping the counter.discrete value. However, updates can come in quick succession, so after sending one update I want to not send the next update sooner than in one second, so possibilities are:
1) nothing comes, nothing to do
2) next comes at 0.3 seconds, next at 0.6, next at 0.8, and next at 3,5 seconds -> I would like to send the next value (that was set at 0,8 seconds) to the user at 1,0 seconds, and because the next update at 3,5 was more than 1 second send it immediately when receiving it (but then again not sending any more updates until 4,5)
I hope this makes sense :-) It's a bit like debouncing I guess
Fabio Labella
@SystemFw
there is a debounce combinator I think
is that enough?
Dennis
@dennis4b_twitter
wow, actually yes
a last one: I have a Stream[IO,Message]. I want to make this into a Stream[IO,Option[Message]] with the following semantics: if there is a was no Message coming from the original stream within the last 10 seconds, emit a None. (My usecase: inject keepalives into a stream of websocket messages to the client, but instead of at some fixed interval do it only if I haven't sent any legitimate message within the last 10 seconds)
Dennis
@dennis4b_twitter
I currently merge with an interval stream, but that way I send the keepalives suboptimally because I don't need to send them if I am already sending other traffic
Fabio Labella
@SystemFw
yeah once you get into a situation when you need finer control, the strategy is generally to have an explicit queue
this particular case you have won't necessarily be super easy to implement
you can have a look at the implementation of groupWithin to take some inspiration
if not I can probably write it down for you at some point