Fabio Labella
@SystemFw
ah, gotcha, you want a stream of arrays
David Flynn
@WickedUk_gitlab
yeah
Fabio Labella
@SystemFw
import cats.effect.{Blocker, IO}
import fs2.Stream
import fs2.io.file.ReadCursor
import java.nio.file.Paths

// assumes an implicit ContextShift[IO] is in scope
def readAtOffsets(
    fileName: String,
    offsetLengths: List[(Long, Int)]
): Stream[IO, Array[Byte]] = {
  val cursor =
    Blocker[IO].flatMap(ReadCursor.fromPath[IO](Paths.get(fileName), _))

  Stream.resource(cursor).flatMap { cursor =>
    Stream
      .emits(offsetLengths)
      .flatMap {
        case (offset, length) =>
          cursor
            .seek(offset)
            .readUntil(length, offset + length)
            .void
            .stream
            .chunks
            .map(_.toArray)
      }
  }
}
David Flynn
@WickedUk_gitlab

I did a benchmark vs the simple:

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

def readAtOffsetsSimple(fileName: String, offsetLengths: Seq[(Long, Int)]): Unit = {
  val fc = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ)

  for {
    (offset, length) <- offsetLengths
  } yield {
    val buf = ByteBuffer.allocate(length)
    val len = fc.read(buf, offset)
    if (len < 0) None
    else Some((buf, len))
  }

  fc.close()
}

The stream solution for 1 million random 100-byte reads in a 700 MB file averaged just under 3 minutes. The simple version took about 4 seconds, a factor of ~45.

Fabio Labella
@SystemFw
couple of things I've changed:
  • I'm opening the file once, whereas you open and close it at each read.
  • If you emit the List in a Stream, it's then easier to do the traversal you want, as a simple Stream.flatMap
  • you can avoid reconstructing that Chunk
yeah check if the opening and closing has something to do with it
your code opens and closes the file a million times (literally, in your benchmark)
David Flynn
@WickedUk_gitlab
That's better. Now getting just under a minute, a factor of ~14.5
Also, I'm not on any sort of performance rig that would give definitive benchmarks, so these numbers should be taken with a pinch of salt.
Fabio Labella
@SystemFw
what's the actual buffer size you are likely to read in your use case?
David Flynn
@WickedUk_gitlab
Probably somewhere around 100, and increasing offsets
Time is critical, so given the file is less than 1 GB it probably makes sense in that case to load the lot into memory.
But for non-critical reads of binary files, the stream solution may be useful.
Would be interesting to compare reads from memory vs reads from SSD, I suspect it's still a significant factor
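The load-it-all-into-memory approach mentioned above could be sketched roughly like this (a hypothetical helper, not from the thread; `readAtOffsetsInMemory` is an assumed name, and it assumes the file fits comfortably in a single array, i.e. under 2 GB):

```scala
import java.nio.file.{Files, Path}
import java.util.Arrays

// Hypothetical sketch: read the whole file once, then serve every
// (offset, length) request from the in-memory byte array.
def readAtOffsetsInMemory(path: Path, offsetLengths: Seq[(Long, Int)]): Seq[Array[Byte]] = {
  val bytes = Files.readAllBytes(path) // one disk read for the whole file
  offsetLengths.map { case (offset, length) =>
    // copyOfRange takes [from, to) indices into the backing array
    Arrays.copyOfRange(bytes, offset.toInt, offset.toInt + length)
  }
}
```

For random reads this trades startup cost and heap for the elimination of all per-read syscalls, which is usually the right trade when the file is small and latency matters.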
David Flynn
@WickedUk_gitlab

Another question: what's the best way to debug stream applications? Obviously there is a lot of machinery under the hood (I stepped a simple stream all the way through). Sadly, auto-stepping and reading a report doesn't exist in IntelliJ (or does it, via some plugin?).

When I've debugged systems like this before, I've used stepping filters to cut out all the machinery. It looks like breakpoints on the code portions also work, though that's clunky to step through. Any suggestions?

David Flynn
@WickedUk_gitlab

One further question: Does fs2 support memoisation?

If I pass my stream elements through f(s), and f is a pure function, then f(s) for a particular s should return the same value each time. Thus if f is a big computation, f only needs running once. If f has side effects that are captured, and it's okay to rerun the captured side effects each time, can that again be memoised to avoid having to run f each time?

Fabio Labella
@SystemFw
I generally don't use debuggers (and I don't mean this with any snark whatsoever), but a combination of using referential transparency and println debugging. But tbh most of my problems end up being distributed systems problems these days, so...
in general, memoisation breaks referential transparency
there are some tools in cats effect to memoise effects, but you need to apply them manually
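One such tool (a sketch against the cats-effect 2 API, which the fs2 version in this thread targets) is `Concurrent.memoize`, which has to be applied manually as Fabio says:

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.implicits._
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Concurrent.memoize returns F[F[A]]: the outer effect allocates the cache,
// the inner effect runs `expensive` at most once and replays its result.
val expensive: IO[Int] = IO(println("running")) *> IO.pure(42)

val program: IO[(Int, Int)] =
  Concurrent.memoize(expensive).flatMap { memoised =>
    (memoised, memoised).tupled // "running" is printed only once
  }
```

Note the double `F` layer is what makes this compatible with referential transparency: you explicitly choose where the cache is created, rather than it happening invisibly.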
btw, referential transparency is what could easily have told you that you were opening/closing files repeatedly in the example above
David Flynn
@WickedUk_gitlab
By using RT, you mean extracting the function somewhere else (e.g. a repl) and trying it out with the same inputs (perhaps gained vs a println)?
Fabio Labella
@SystemFw
often, even just replacing definition (basically inlining) yields a lot of insight
take this for example
val resource = Stream.resource(ReadCursor.fromPath[IO](Paths.get(fileName), blocker))

val outStream = offsetLengths.foldLeft(Stream.empty.covaryAll[IO, Array[Byte]]) {
  case (streamAcc, (offset, length)) =>
    streamAcc ++ resource.flatMap {
      _.seek(offset).readUntil(length, offset + length).void.stream.mapChunks { c => Chunk(c.toArray) }
    }
}
because of RT, we can always replace an expression for its value, without any change in behaviour
so I can replace resource
streamAcc ++ Stream.resource(ReadCursor.fromPath...).flatMap
which tells you that the fromPath program is happening on each iteration
this is similar (in a way) to a debugger, in the sense that you can "zoom" in and out by replacing definitions or abstracting them out
and can tell you a surprisingly large amount about what's going on (since most of the time it boils down to a semantic error in one's own code)
when that's not enough, adding println debugging generally makes up the rest
also, we have debug and debugChunks on Stream, and we're in the process of adding tracing to IO, to help with the remaining cases
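The `debug` and `debugChunks` combinators mentioned here read roughly like this (a minimal sketch; both default to printing via `println`, so they slot into a pipeline without changing its values):

```scala
import cats.effect.IO
import fs2.Stream

// debug logs each element, debugChunks each chunk, as they flow past;
// useful for pinpointing exactly where values stop flowing.
val traced: Stream[IO, Int] =
  Stream(1, 2, 3)
    .covary[IO]
    .debug(o => s"elem: $o")
    .debugChunks(c => s"chunk: $c")
```

Because they are just stages in the stream, you can move them around to bisect a pipeline, much like moving a breakpoint.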
David Flynn
@WickedUk_gitlab
One of the problems I have is working out, at an arbitrary point, what effects have been built up so far. When it's not using free monads (or similar), you can look at whatever got output, logs, and the contents of various variables at the point of execution, and that gives you a good indication of how you got to that state. If the state is in error at that point, you know the events that rightly or wrongly created it; if the state is correct, you know something later probably went wrong. With leaving everything to the end before it gets interpreted, you have the problem of everything else added on afterwards, and all the irrelevancies before, making it hard to zoom in on the little bit which went wrong (when you might not know the exact point or scope/size of the problem).
This is why I mentioned about using a debugger (logs might contain so much stuff or not capture the evidence you're after)
Same sort of comment about receiving someone else's code and having to work out what it does and how it works before you make changes, a debugger to see the flow around and how things are getting calculated can be useful and I'm not sure how to replace that without printlns everywhere.
Evgenii
@ibanezn04
Hi, I'm trying to unzip an archive and parse the JSON files inside into objects, but for some reason the stream is always empty. Can somebody help me understand what's going on?
https://gist.github.com/ibanezn04/15429e56b9e19184c69b752b97eddd1d
Artūras Šlajus
@arturaz

@ibanezn04 this might help. It's an implementation of unzipping using 7zip because JDK fails on some newer ZIP files. Unfortunately 7zip needs random seek access, so you have to buffer the zip file either to memory or disk.

https://gist.github.com/arturaz/9a45a8b3807fc625e4feb006b3a94ab1

And I'm not sure, but maybe this issue could help functional-streams-for-scala/fs2#1822

Gavin Bisesi
@Daenyth
Hmm, I just ran into something with the gzip pipe I've never seen before
NonProgressiveDecompressionException: buffer size 4096 is too small; gunzip cannot make progress
Does anyone know under what circumstances this will trigger? I can obviously bump up the size, but I'd like to know why it happened in the first place
Rob Kelly
@rlkelly

is there a recommended way to do:

def foo[F[_], O, O2, O3](s: Stream[F, O], left: O => O2, right: O => O3): Stream[F, (O2, O3)] = ???

if I want to split them into two streams and zip them back together, preferably asynchronously?

would it just be s.map(left).parZip(s.map(right)) ?
Anthony Cerruti
@srnb_gitlab
My brain tells me there should be a way to turn (O => O2, O => O3) into O => (O2, O3) in cats, but I can't remember the name of it
@rlkelly What happens if you (left, right).product?
Sorry
It'd be left.product(right)
@ val a: Int => String = (i: Int) => i.toString
a: Int => String = ammonite.$sess.cmd2$$$Lambda$1964/810898134@7c5f29c6

@ val b: Int => Int = (i: Int) => i * 2
b: Int => Int = ammonite.$sess.cmd3$$$Lambda$1973/33299582@483b7dc4

@ a.product(b)
res4: Int => (String, Int) = cats.instances.Function1Instances$$anon$7$$Lambda$1984/543938464@de63949

@ res4(3)
res5: (String, Int) = ("3", 6)
Anthony Cerruti
@srnb_gitlab
Then you could s.map over that product
Anthony Cerruti
@srnb_gitlab
Evaluating both functions asynchronously though is a problem I don't know how to solve unfortunately
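One possible sketch for the concurrent part (an assumption on my side, not something from the thread): lift each function application into the effect and combine the two with `parTupled`, so `left` and `right` run in parallel for each element:

```scala
import cats.effect.{ContextShift, IO}
import cats.implicits._
import fs2.Stream
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Hypothetical helper: apply left and right concurrently per element.
// Only worth it if the two functions are expensive enough to offset
// the overhead of forking fibers for every element.
def fanOutPar[O, O2, O3](
    s: Stream[IO, O],
    left: O => O2,
    right: O => O3
): Stream[IO, (O2, O3)] =
  s.evalMap(o => (IO(left(o)), IO(right(o))).parTupled)
```

For pure and cheap functions, the `s.map(left.product(right))` version above is the better choice, since it avoids any concurrency machinery.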
David Flynn
@WickedUk_gitlab
Any thoughts on an efficient way to do the 'common' takeWhileThenUntil(f)(f2), or takeWhileThenWhileNot(f) (as before, if f == f2)? The application would be: the stream contains a bunch of messages (header, body, header, body...), f is true if a line is part of a header, otherwise the line is part of a body, and I want a stream of (header + body).
Assume no line in a body resembles a header, so no state is needed.
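One possible Pull-based sketch for this (a hypothetical helper, element-by-element for clarity rather than speed): accumulate lines and flush a message whenever a header line follows a non-header line.

```scala
import fs2.{Chunk, Pipe, Pull, Stream}

// Hypothetical sketch: emit one Chunk per (header + body) message.
// A new message starts whenever a header line follows a non-header line.
def messages[F[_], O](isHeader: O => Boolean): Pipe[F, O, Chunk[O]] = {
  def go(s: Stream[F, O], acc: List[O], prevWasHeader: Boolean): Pull[F, Chunk[O], Unit] =
    s.pull.uncons1.flatMap {
      case None =>
        // end of input: flush whatever is left
        if (acc.isEmpty) Pull.done
        else Pull.output1(Chunk.seq(acc.reverse))
      case Some((o, rest)) =>
        if (isHeader(o) && !prevWasHeader && acc.nonEmpty)
          // body -> header transition: the previous message is complete
          Pull.output1(Chunk.seq(acc.reverse)) >> go(rest, List(o), prevWasHeader = true)
        else
          go(rest, o :: acc, prevWasHeader = isHeader(o))
    }
  in => go(in, Nil, prevWasHeader = true).stream
}
```

A production version would probably want to work on whole chunks via `uncons` rather than `uncons1`, but the single-element version shows the state machine more plainly.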