Harendra Kumar
@harendra-kumar

I'm wondering why this function doesn't exist: folded :: (IsStream t, Monad m, Foldable f) => Fold m a (f b) -> t m a -> t m b.

How is it different from foldMany?

And on the Reddit thread people are complaining that mapM etc. require MonadAsync.

I replied to the comments on reddit.

Julian Ospald
@hasufell
folded is applied only once
the output is e.g. a list, which is turned into a stream via toList/fromList without being evaluated
Harendra Kumar
@harendra-kumar
Not sure how this will be useful, we are essentially folding to a container and then turning the container back into a stream. It will likely have the same perf characteristics as folding to the container and then streaming from it.
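A minimal sketch of what such a folded could look like, assuming the streamly-0.8 Streamly.Prelude API and SerialT's MonadTrans/Monad instances; it makes the point above concrete: the fold eliminates the whole stream before anything is re-streamed.

import Control.Monad.Trans.Class (lift)
import Streamly.Data.Fold (Fold)
import qualified Streamly.Prelude as Stream

-- Sketch only: run the fold once to eliminate the stream into a container,
-- then stream the container back out.
folded :: (Monad m, Foldable f) => Fold m a (f b) -> Stream.SerialT m a -> Stream.SerialT m b
folded fld str = lift (Stream.fold fld str) >>= Stream.fromFoldable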
Julian Ospald
@hasufell
so the container has to be fully evaluated?
Harendra Kumar
@harendra-kumar
These are left folds, so yes. If you want to evaluate lazily, then right folds/scans/maps etc. are what's needed. Also, why would you want to write to a list and then consume from the list when you can consume the stream directly?
Julian Ospald
@hasufell
I don't understand what you mean. If you have such a Fold and want to apply it over a stream without eliminating the stream, then it seems you can't.
Julian Ospald
@hasufell
I also don't see a way in conduit without pulling the entire list into memory
Las
@Las:matrix.org
[m]
Something I forgot to mention when I joined: You might want to add this link to the README: https://matrix.to/#/#composewell_streamly:gitter.im
Since Gitter just uses Matrix now, you can just join using Matrix, and I assume that many more people use Matrix than Gitter.
Harendra Kumar
@harendra-kumar
ok, thanks for the suggestion. @adithyaov can you please look into it and update the readme.
Adam Conner-Sax
@adamcs:matrix.org
[m]
I've been working on a library to make the Frames library, currently using Pipes, stream-type agnostic. And I've written "back-ends" for Pipes and Streamly. So for the functions tested, I expose the stream through a typeclass where I try to use them as idiomatically as possible. I've got benchmarks for loading data from a file to a stream, converting to lines of Text, parsing that text into a Frames (Vinyl) record and then folding into a Frame, an efficient in-memory format. The upshot is that Pipes and Streamly are very close with Streamly being about 3% faster for that task and 5% slower if an extra mapping step (transforming the records before folding) is added. It's close enough that I have no issue but it does provide a real-world head-to-head comparison.
Repo: https://github.com/adamConnerSax/Frames-streamly/tree/StreamTypeClass
The typeclass and implementations are in: https://github.com/adamConnerSax/Frames-streamly/tree/StreamTypeClass/src/Frames/Streamly/Streaming
I'm happy to try variations and ideas!
NB: There is also a "Frames" variant in the benchmarks where I am comparing the current released version of Frames. You can ignore that here since there are various implementation differences between the Frames file handling and mine.
Harendra Kumar
@harendra-kumar

Streamly uses CPS (StreamK) as the fallback stream type but tries to use the direct stream type (StreamD) wherever possible. The CPS type is about as efficient as any other streaming library; in some cases it may be slightly better. However, when leveraged properly the direct type can be 10x faster, but you may not always be able to leverage it. For example, if you are using streams to read chunks from files and then processing the chunks independently, you won't see any difference in performance. However, if you are processing a stream of chars directly, then CPS won't stand a chance against stream fusion. Using streamly you can process a stream of chars directly, whereas in other streaming libraries you can't.

With that background, your results are expected. A margin of 5% is not significant enough to chase; you can see that much difference just from changes in the generated code layout, without making any change to the source. So it is hard to attribute where the difference is coming from.

You can try INLINE instead of INLINEABLE in your instance. Though that won't make much difference if CPS is in play.

Do you need these: -fspecialise-aggressively -fexpose-all-unfoldings? I wouldn't recommend them unless they are making a difference.

Adam Conner-Sax
@adamcs:matrix.org
[m]
Thanks for the help! So how do I structure stream operations to make direct stream use more likely?
Adam Conner-Sax
@adamcs:matrix.org
[m]
For instance, I am processing the file I read in chunks, reading as bytes but processing lines of Text. But once it's lines of text, everything else happens on those chunks: they are converted to records and then folded into the in-memory structure. So is streamly using StreamK for loading and then StreamD for the rest? Or once things are StreamK am I stuck there?
Is there some documentation that explains this stuff and how to optimize with it in mind?
Harendra Kumar
@harendra-kumar

In this case you are probably getting the optimal performance from both libraries. Reading chunks is a very small part of your processing, so it does not matter what you use for that. After this you are processing those chunks using text library operations and not streaming ops. text already uses stream fusion, so those ops are as efficient as possible.

The structure of your code is largely dictated by the fact that you want it portable for pipes as well as streamly. What streamly gives that other libraries don't is the ability to work efficiently on streams of very small items too, e.g. streams of Char. So you could avoid using text entirely and do that processing using streaming operations if you want to. Pipes and other libraries cannot do that because the overhead would be extremely high.

Because your chunk/text operations are already very efficient, your model works with good performance for both pipes and streamly. Using streamly for the entire processing would only give you better ergonomics by avoiding the use of text. But perf would remain the same, I guess.
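To make the "streams of Char" point concrete, a hedged sketch, assuming the streamly-0.8 API (Streamly.FileSystem.Handle.read, Streamly.Unicode.Stream.decodeUtf8, Stream.splitOnSuffix); it counts lines by streaming Chars directly, without going through text:

import System.IO (IOMode (ReadMode), withFile)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.FileSystem.Handle as Handle
import qualified Streamly.Prelude as Stream
import qualified Streamly.Unicode.Stream as Unicode

-- Count lines by streaming Chars directly instead of using the text library.
lineCount :: FilePath -> IO Int
lineCount path = withFile path ReadMode $ \h ->
  Stream.length
    $ Stream.splitOnSuffix (== '\n') Fold.drain  -- one segment per line
    $ Unicode.decodeUtf8                         -- Word8 -> Char
    $ Stream.unfold Handle.read h                -- byte stream from the handle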

Julian Ospald
@hasufell
I think I tried t m Char in my "file copy" streaming code once and it was very slow? Are you referring to using StreamD in that case?
or is this more about e.g. Parser type, that wouldn't need to parse in chunks?
Harendra Kumar
@harendra-kumar

You might have used fusion-breaking operations like bracket. bracket and other such operations (noted in the documentation) break the fusion loop, so in that case chunking is the only efficient option. However, you can then use char-based streaming ops on the individual chunks efficiently.

But anyway, in cases like file copy you do not want to copy char-by-char; you read chunks and write those chunks directly without touching the bytes in them.
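A chunked copy in that style might look like the following sketch, assuming streamly-0.8's Streamly.FileSystem.Handle readChunks/writeChunks:

import System.IO (IOMode (ReadMode, WriteMode), withFile)
import qualified Streamly.FileSystem.Handle as Handle
import qualified Streamly.Prelude as Stream

-- Copy a file chunk by chunk (Array Word8), never touching individual bytes.
copyFileChunked :: FilePath -> FilePath -> IO ()
copyFileChunked src dst =
  withFile src ReadMode $ \hIn ->
    withFile dst WriteMode $ \hOut ->
      Stream.fold (Handle.writeChunks hOut)
        $ Stream.unfold Handle.readChunks hIn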

Julian Ospald
@hasufell
yeah
but I was wondering if something like Parser m Char b would be possible or if it needs to be Parser m (Array Char) b
given that I use CPS style parsers
Harendra Kumar
@harendra-kumar
The same reasoning applies to parsers as well. Operations like append/sequence/monad composition force CPS parsers; if you are not using those, then Parser m Char b would be as efficient as it can be. But if CPS is forced then Parser m (Array Char) b, i.e. a chunked stream, would be required. However, you could apply the char parsers on the chunks efficiently, as most of the time you won't be needing CPS-forcing ops there. We can clearly document which operations force CPS for parsers; currently that may be missing.
Adam Conner-Sax
@adamcs:matrix.org
[m]
Okay, that makes sense, I think.
-fspecialise-aggressively -fexpose-all-unfoldings do make everything about 10% faster (more or less), but they have the largest effect on the Frames library version, which uses Pipes but somewhat differently from my version.
Thanks!
Adam Conner-Sax
@adamcs:matrix.org
[m]
One observation: I'm also having criterion regress allocs and numGCs and the streamly implementation allocates about twice as much and has about twice as many GCs as the pipes version. It doesn't cause much performance difference but it's curious...
Harendra Kumar
@harendra-kumar
Not sure, will have to review the implementation of your library to get a clue for that.
Adam Conner-Sax
@adamcs:matrix.org
[m]
I can try to break it down. But it only involves reading (unfolding) lines of Text, mapping over them (but the map is the same in both cases) and then folding. The fold is also the "same" though streamly and Pipes implement folds somewhat differently, right? Anyway, I'll try to benchmark those bits separately and see if I can find where the allocations/GCs differ.
Adam Conner-Sax
@adamcs:matrix.org
[m]

Benchmarking just the loading bit. 5000 lines of text. I create a stream of text and then count the lines (to make sure the stream "runs").
For Pipes I use the pipes-text library so the function is just Pipes.length . Pipes.Prelude.Text.readFileLn
For Streamly, I do

Streamly.map (T.pack . Streamly.Array.toList)
  . Streamly.Unicode.Array.lines
  . Streamly.Unicode.decodeUtf8
  . Streamly.File.toBytes

Pipes version runs in 4.8ms, 10.5 GCs, allocates 1.1e7 bytes
Streamly version runs in 6.9ms, 52.0 GCs, allocates 7.5e7 bytes
So that might be most of the GC/alloc difference

Adam Conner-Sax
@adamcs:matrix.org
[m]

Pipes is using Text.hGetLine and thus delegating all the splitting and decoding to Text. I'd like to try that with streamly. Not sure how to use the equivalent idiom of withFile. I guess bracket?
Okay, built with bracket as:

streamlyReadTextLines' :: (Streamly.IsStream t, Streamly.MonadAsync m, MonadCatch m) => FilePath -> t m Text
streamlyReadTextLines' fp = Streamly.bracket (liftIO $ IO.openFile fp IO.ReadMode) (liftIO . IO.hClose) $ Streamly.unfoldrM f where
  getOne :: IO.Handle -> IO (Either IOException Text)
  getOne h = try (Text.hGetLine h)
  f h = do
    tE <- liftIO $ getOne h
    case tE of
      Left _ -> return Nothing
      Right t -> return $ Just (t, h)
{-# INLINE streamlyReadTextLines' #-}

That version is much faster and has lower GC overhead: 1.5ms, 3 GCs, 3e6 bytes allocated.

Adam Conner-Sax
@adamcs:matrix.org
[m]
@harendra-kumar: I'm sure this is still not optimal. But this is such a common operation, perhaps it's worth putting an optimal version in streamly someplace?
Julian Ospald
@hasufell
Is there a way to express a non-empty stream, so that e.g. head doesn't return Maybe?
mostly thinking in terms of using streamly for async with fromEffect someAction
Julian Ospald
@hasufell
composewell/streamly#496 seems relevant
Adam Conner-Sax
@adamcs:matrix.org
[m]
Can I combine a scan and a fold to make a new fold? Like (x -> a -> m x) -> m x -> Fold m x b -> Fold m a b? I think I could write that but does the function exist someplace? And if not, is there a reason?
I'd be almost equally happy to do it on the unfold side, I guess: (x -> a -> m x) -> m x -> Unfold m b a -> Unfold m b x
Harendra Kumar
@harendra-kumar

@harendra-kumar: I'm sure this is still not optimal. But this is such a common operation, perhaps it's worth putting an optimal version in streamly someplace?

You are using a very inefficient way of converting the stream to text. You are decoding the byte stream, then converting it to a list and then converting back to text. Instead you can use File.toChunks to read arrays, convert the arrays to bytestring (using streamly-bytestring) and then convert that to text.
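One hedged way to combine those pieces (a variation that splits on newline bytes before decoding, so lines and multi-byte chars never straddle a chunk boundary); it assumes streamly-0.8's Handle.read and Streamly.Data.Array.Foreign.write, streamly-bytestring's fromArray, and text's decodeUtf8:

import Data.Text (Text)
import qualified Data.Text.Encoding as T
import qualified Streamly.Data.Array.Foreign as Array
import qualified Streamly.External.ByteString as SB  -- streamly-bytestring
import qualified Streamly.FileSystem.Handle as Handle
import qualified Streamly.Prelude as Stream
import qualified System.IO as IO

-- Stream a handle as lines of Text: split the byte stream on '\n' (10),
-- collect each line into an Array Word8, then Array -> ByteString -> Text.
readTextLines :: IO.Handle -> Stream.SerialT IO Text
readTextLines h =
    Stream.map (T.decodeUtf8 . SB.fromArray)
  $ Stream.splitOnSuffix (== 10) Array.write
  $ Stream.unfold Handle.read h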

Harendra Kumar
@harendra-kumar

Can I combine a scan and a fold to make a new fold? Like (x -> a -> m x) -> m x -> Fold m x b -> Fold m a b? I think I could write that but does the function exist someplace? And if not, is there a reason?

Unimplemented function signatures are already there in both the Fold and Unfold modules. I raised #1303 and #1304 for these. You can take up the issues and send a PR if you want to implement them.

Harendra Kumar
@harendra-kumar

Is there a way to express a non-empty stream, so that e.g. head doesn't return Maybe?

There is no type-safe way to express non-empty streams. You can write a partial wrapper on head (or your own fold) to avoid Maybe if you know the stream is non-empty. There is also headElse to supply a default value - https://streamly.composewell.com/haddocks/streamly-0.8.0/Streamly-Internal-Data-Stream-IsStream-Eliminate.html#v:headElse .
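The partial wrapper could be as simple as the sketch below; the name unsafeHead is made up here, and the streamly-0.8 Streamly.Prelude API is assumed:

import Data.Maybe (fromMaybe)
import qualified Streamly.Prelude as Stream

-- Partial: errors on an empty stream, so use it only when non-emptiness is
-- guaranteed by construction (e.g. a stream built with fromEffect someAction).
unsafeHead :: Monad m => Stream.SerialT m a -> m a
unsafeHead = fmap (fromMaybe (error "unsafeHead: empty stream")) . Stream.head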

Julian Ospald
@hasufell
how did you make that haddock website?
Adam Conner-Sax
@adamcs:matrix.org
[m]
Also, a different point of confusion: I have chunks of Word8 then bytestring. Do I make those into Text and then parse into lines--if I could figure out how--or into Arrays of Char or streams of Char and then parse?
Julian Ospald
@hasufell
So I wrote a simple newline parser... 100mb file as input, StreamK takes 3.5s, StreamD 2s and attoparsec below 1s: https://github.com/hasufell/parselines/blob/master/app/Main.hs
what could be the reason that attoparsec is more than twice as fast?
Julian Ospald
@hasufell

I'm trying this:

  newlineParserD :: (MonadIO m, MonadThrow m) => ParserD.Parser m (FArray.Array Word8) (FArray.Array Word8)
  newlineParserD = ParserD.Parser step initial extract
    where
      initial = pure $ ParserD.IPartial (mempty :: FArray.Array Word8)
      step s a
        | FArray.byteLength a == 0 = pure $ ParserD.Done 0 s
        | otherwise = do
            ma <- FArray.breakOn _lf a
            case ma of
              (prefix, Just suffix) ->
                pure $ ParserD.Done (FArray.length suffix) (s <> prefix)
              (prefix, Nothing)     ->
                pure $ ParserD.Partial 0 (s <> prefix)
      extract s = pure s

but it doesn't terminate. Somehow, count in Done doesn't seem to do what I think it does

Julian Ospald
@hasufell
I guess I mixed up the bytestream with the chunk stream... what I need here is the ability to not just reset the number of elements, but cut off part of the last element, so the next parser won't see the entire chunk, but only part of it
Julian Ospald
@hasufell
The parser API doesn't seem expressive enough for this
Julian Ospald
@hasufell
basically, the idea is to use as much breakOn on Arrays as possible (since that uses C's memchr)
Adam Conner-Sax
@adamcs:matrix.org
[m]
@harendra-kumar: After much effort (see above), I'm unable to write anything nearly as fast as using Text.hGetLine in a bracket. If you have a clear way to go IO.Handle -> t m Text, where that Text is lines rather than the text of the original chunks, I'd be happy to test it against what I've got. But it's slower to use Streamly.FileSystem.Handle.read and then parse, and once it's in arrays, getting from those arrays to lines doesn't seem at all simple.
That is, using readChunks makes it difficult to get lines of text out.