maralorn
@maralorn:maralorn.de
[m]
e.g. S.drain $ mapM print $ racing stream1 stream2
Julian Ospald
@hasufell
how do you know what finishes first if you don't eliminate the streams?
maralorn
@maralorn:maralorn.de
[m]
@hasufell: I don't care.
@hasufell: I mean, with ahead I might. But I can imagine streams where I wouldn't.
Ah, wait. I guess I can try raising an exception.
Julian Ospald
@hasufell
the exception doesn't happen unless you eliminate the stream
and since there are different eliminations (including head), I don't understand how you could have a race function prior to performing the elimination
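Julian's point can be made concrete outside streamly. In this sketch, "who finishes first" is decided only while both actions are actually being run. It makes some assumptions:

- It uses only base.
- `raceIO` is a hypothetical helper, not a streamly function.
- It omits the exception safety of the async package's `race`: if the caller is interrupted while waiting, the two threads are not cleaned up.

```haskell
import Control.Concurrent (forkIO, killThread, newEmptyMVar, putMVar, takeMVar, threadDelay)
import Control.Exception (SomeException, throwIO, try)

-- Catch everything so that a failing action still reports back.
tryAny :: IO a -> IO (Either SomeException a)
tryAny = try

-- | Hypothetical helper: run both actions concurrently and return the
-- result of whichever finishes first (Left = first, Right = second).
-- The other thread is killed. Nothing races until raceIO itself runs.
raceIO :: IO a -> IO b -> IO (Either a b)
raceIO left right = do
  done <- newEmptyMVar
  t1 <- forkIO (tryAny left >>= putMVar done . fmap Left)
  t2 <- forkIO (tryAny right >>= putMVar done . fmap Right)
  winner <- takeMVar done   -- the first result to arrive wins
  mapM_ killThread [t1, t2] -- no-op for the thread that already finished
  either throwIO pure winner -- re-throw if the winner failed

main :: IO ()
main = do
  r <- raceIO (threadDelay 200000 >> pure "slow") (threadDelay 10000 >> pure (42 :: Int))
  print r -- Right 42
```

A stream-level race is the same idea applied at elimination time: both sides have to be run by some consumer before either can "win".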
Harendra Kumar
@harendra-kumar

@maralorn:maralorn.de there are a couple of ways to achieve what you want. You can take a look at the race section in this doc: https://github.com/composewell/streamly/blob/master/docs/streamly-vs-async.md .

But there is another way as well.

maralorn
@maralorn:maralorn.de
[m]
@harendra-kumar: Thank you. That document is exactly what I needed.
Amazingly, a DuckDuckGo search for "Haskell streamly race" didn't find it …
Harendra Kumar
@harendra-kumar
Somehow search engines do not give much priority to GitHub repos. We should put all the docs on the website; then search would work better, I guess.
Google does return a result from Hackage as the second link. None of the results point to the GitHub repo, though.
BTW, that document may be a bit old, and some newer features may not be covered in it. We will revise it. But if you are in doubt about something, just ask here.
Harendra Kumar
@harendra-kumar
The parallelMin operation is exactly race; it was introduced later.
@hasufell you can see this https://github.com/composewell/streamly-lz4/ .

@hasufell in the tar code what does this do:

  liftIO $ putStrLn $ "mid"
  head' <- liftIO $ S.head stream

Are you trying to read from the head of the original stream or after the 512 byte header?

maralorn
@maralorn:maralorn.de
[m]
@harendra-kumar: Have I observed correctly that S.drain $ asyncly $ S.fromListM [action1, action2] will not run the actions in parallel?
Can you explain a bit why that is?
Harendra Kumar
@harendra-kumar
It may or may not; how are you testing?

- If they are very small actions, e.g. just return 1, they may run in the same thread.
- If the first action blocks, the second one will be spawned in a new thread.

It does not make sense to run tiny actions in separate threads, so a new thread is spawned only when it is really required.
async is for demand-driven concurrency: it evaluates actions only when needed and at the rate at which the consumer is consuming them. parallel, on the other hand, always spawns a thread, irrespective of the size of the action or the demand.
maralorn
@maralorn:maralorn.de
[m]
@harendra-kumar: Huh, what I observed was that action1 blocked (it waited on an STM action) and action2 didn't get started.
The same happened when using parallely.
Harendra Kumar
@harendra-kumar
Ah, fromListM has a bug which is fixed in master branch.
You can use fromFoldableM instead.
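One way to check the behavior described above is a small program built so that it can only finish if action2 runs while action1 is still blocked. It assumes:

- streamly 0.7, where `asyncly` is exported from the `Streamly` module;
- the stm package.

action1 waits on a TMVar that only action2 fills. So the program can finish only if the async scheduler starts action2 in a separate thread while action1 is blocked.

```haskell
import Control.Concurrent.STM (atomically, newEmptyTMVarIO, putTMVar, takeTMVar)
import Streamly (asyncly)
import qualified Streamly.Prelude as S

main :: IO ()
main = do
  gate <- newEmptyTMVarIO
  let -- blocks until action2 has filled the gate
      action1 = atomically (takeTMVar gate) >> putStrLn "action1 done"
      -- fills the gate, releasing action1
      action2 = putStrLn "action2 running" >> atomically (putTMVar gate ())
  -- With demand-driven async evaluation, action1 blocking should cause
  -- action2 to be picked up by another worker thread. If action2 were
  -- never started, this program would not terminate normally.
  S.drain $ asyncly $ S.fromFoldableM [action1, action2]
```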
maralorn
@maralorn:maralorn.de
[m]
@harendra-kumar: Ah, thank you. Now things make a lot more sense.
It’s so important to have a support channel for a library.^^
Julian Ospald
@hasufell
if StreamD is faster, why is there even StreamK?
Julian Ospald
@hasufell
does it provide anything that StreamD can't?
Harendra Kumar
@harendra-kumar
It's because of appends and all the problems derived from them. StreamD has quadratic complexity with respect to the number of appends; StreamK is linear.
You can see this by using StreamD.cons or StreamD.append thousands of times.
StreamK provides linear consing and appending.
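A toy, base-only model shows why the two representations scale differently. These are simplified stand-ins, not streamly's actual StreamD/StreamK definitions, and every name here is hypothetical.

- **Direct style (StreamD-like):** append wraps the state in another `Either` layer. With n right-nested appends, each step pattern-matches through every enclosing layer, which costs O(n) per element and O(n²) overall.
- **CPS style (StreamK-like):** append hands control directly to the continuation. Right-nested appends therefore do constant work per element.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}

-- Direct style: a step function over an explicit (hidden) state.
data Step s a = Yield a s | Stop

data StreamD a = forall s. StreamD (s -> Step s a) s

emptyD :: StreamD a
emptyD = StreamD (const Stop) ()

singletonD :: a -> StreamD a
singletonD x = StreamD step True
  where
    step True = Yield x False
    step False = Stop

-- Each append adds an Either layer that every later step must unwrap.
appendD :: StreamD a -> StreamD a -> StreamD a
appendD (StreamD step1 s1) (StreamD step2 s2) = StreamD step (Left s1)
  where
    step (Left s) = case step1 s of
      Yield x s' -> Yield x (Left s')
      Stop -> step (Right s2)
    step (Right s) = case step2 s of
      Yield x s' -> Yield x (Right s')
      Stop -> Stop

toListD :: StreamD a -> [a]
toListD (StreamD step s0) = go s0
  where
    go s = case step s of
      Yield x s' -> x : go s'
      Stop -> []

-- CPS style: a stream is a function taking "yield" and "stop" continuations.
newtype StreamK a = StreamK {runK :: forall r. (a -> StreamK a -> r) -> r -> r}

nilK :: StreamK a
nilK = StreamK (\_ stop -> stop)

consK :: a -> StreamK a -> StreamK a
consK x xs = StreamK (\yield _ -> yield x xs)

singletonK :: a -> StreamK a
singletonK x = consK x nilK

-- When xs ends, control jumps straight into ys; no state is nested.
appendK :: StreamK a -> StreamK a -> StreamK a
appendK xs ys =
  StreamK (\yield stop -> runK xs (\x rest -> yield x (appendK rest ys)) (runK ys yield stop))

toListK :: StreamK a -> [a]
toListK s = runK s (\x rest -> x : toListK rest) []

main :: IO ()
main = do
  let n = 5000 :: Int
  -- n right-nested appends: quadratic in the direct-style model
  print (sum (toListD (foldr appendD emptyD (map singletonD [1 .. n])))) -- 12502500
  -- the same shape in CPS: linear
  print (sum (toListK (foldr appendK nilK (map singletonK [1 .. n])))) -- 12502500
```

Raising n makes the gap visible: the first line slows down roughly with n², the second roughly with n.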
maralorn
@maralorn:maralorn.de
[m]
Huh, the Tutorial says "fmap" is always serial. Does that mean that inserting an fmap between an asyncly and some concurrent stream will render that asyncly moot?
maralorn
@maralorn:maralorn.de
[m]
New question. I have written this translation from ListT m a to IsStream t => t m a. Have I done it wrong and does this function already exist somewhere? foldMap (\(x, rest) -> S.yield x <> toStream rest) <=< ListT.uncons
Harendra Kumar
@harendra-kumar

Huh, the Tutorial says "fmap" is always serial. Does that mean that inserting an fmap between an asyncly and some concurrent stream will render that asyncly moot?

asyncly just forces the type of the argument stream to AsyncT, so it remains in effect irrespective of fmap or any other serial operations. Any combinators within that block that support concurrency will have async behavior.

Harendra Kumar
@harendra-kumar

New question. I have written this translation from ListT m a to IsStream t => t m a. Have I done it wrong and does this function already exist somewhere? foldMap (\(x, rest) -> S.yield x <> toStream rest) <=< ListT.uncons

Looks OK at first glance. Can you give the full definition? Which ListT are you using? Does it type check?

The tutorial gives code for interoperation with other streaming libraries https://hackage.haskell.org/package/streamly-0.7.3/docs/Streamly-Tutorial.html#g:39, list-t would be something similar.
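Harendra's question about which ListT is in use is still open. Assuming it is the one from the list-t package, one possible alternative to the hand-written recursion is to use `ListT.uncons` directly as the step function of streamly's `unfoldrM`. Each `uncons` returns the next element together with the rest of the ListT, or `Nothing` at the end. The sketch assumes streamly 0.7 (`IsStream` and `MonadAsync` exported from `Streamly`) and list-t.

```haskell
import ListT (ListT)
import qualified ListT
import Streamly (IsStream, MonadAsync)
import qualified Streamly.Prelude as S

-- | Convert a list-t ListT into any streamly stream type. Each unfold step
-- runs ListT.uncons in the underlying monad; Nothing ends the stream.
toStream :: (IsStream t, MonadAsync m) => ListT m a -> t m a
toStream = S.unfoldrM ListT.uncons

main :: IO ()
main = do
  xs <- S.toList (toStream (ListT.fromFoldable [1, 2, 3 :: Int]))
  print xs -- [1,2,3]
```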

Julian Ospald
@hasufell
So... is the idea to have an API that hides the Array type from the stream without giving up chunk size?
maralorn
@maralorn:maralorn.de
[m]
Is there example code anywhere for streaming values into a file via an aeson instance?
Harendra Kumar
@harendra-kumar

So... is the idea to have an API that hides the Array type from the stream without giving up chunk size?

Not necessarily hide, we can even work on chunk level i.e. on a stream of arrays. Stream of arrays would be similar to a lazy bytestring but more explicit.

Is there example code anywhere for streaming values into a file via an aeson instance?

aeson works with bytestrings; you can use streamly-bytestring to work with bytestrings. I know people use streamly with aeson extensively, but there is no example. Maybe we can provide one, but streamly will have its own JSON parser in the future.
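In the absence of an official example, here is a minimal sketch that writes one JSON document per line (JSON Lines). It consumes the stream incrementally: each value is encoded with aeson's `encode` into a lazy ByteString and written as soon as it arrives. It makes these assumptions:

- It does not use streamly-bytestring.
- It assumes streamly 0.7 (`SerialT` exported from `Streamly`).
- The `Reading` type and the `writeJsonLines` helper are hypothetical.

```haskell
{-# LANGUAGE DeriveGeneric #-}

import Data.Aeson (ToJSON, encode)
import qualified Data.ByteString.Lazy.Char8 as BL
import GHC.Generics (Generic)
import Streamly (SerialT)
import qualified Streamly.Prelude as S
import System.IO (IOMode (WriteMode), withFile)

-- Hypothetical record type, for illustration only.
data Reading = Reading {sensor :: String, value :: Double}
  deriving (Show, Generic)

instance ToJSON Reading -- generic default encoding

-- | Hypothetical helper: write each stream element to the file as one
-- JSON document per line, while the file handle is open.
writeJsonLines :: ToJSON a => FilePath -> SerialT IO a -> IO ()
writeJsonLines path stream =
  withFile path WriteMode $ \h ->
    S.mapM_ (BL.hPutStrLn h . encode) stream

main :: IO ()
main =
  writeJsonLines "readings.jsonl" $
    S.fromList [Reading "a" 1.5, Reading "b" 2.5]
```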

maralorn
@maralorn:maralorn.de
[m]
Next question: Is there a function to chunk by time? I would like to collect outputs for 500ms or 500 entries, whichever comes first, and then emit them (when there is at least one entry).
Harendra Kumar
@harendra-kumar
maralorn
@maralorn:maralorn.de
[m]
@harendra-kumar: I will take a look.
Ah, looks nice.
But does that also call the fold when nothing gets emitted in that time interval?
Harendra Kumar
@harendra-kumar

If you can use the master branch (soon to be released as 0.8.0), you can combine folds to do whatever you want. You can generate the haddock docs and see the docs for "Streamly.Internal.Data.Fold" module.

For example:

>>> Stream.fold (Fold.takeInterval 1.0 Fold.toList) $ Stream.delay 0.1 $ Stream.fromList [1..]
[1,2,3,4,5,6,7,8,9,10,11]

Now you can combine this with Fold.take 1 to get at least one element before you check for the interval.

maralorn
@maralorn:maralorn.de
[m]
I am currently stuck with streamly 0.7.0. But good to know anyway. I guess it wouldn't be hard to write what I want myself.
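Writing it by hand outside streamly could look roughly like this. It assumes the producer writes into an STM `TQueue`, and `nextBatch` is a hypothetical helper, not a streamly API. The helper works in three phases:

1. Block until the first entry arrives, so an empty batch is never emitted.
2. Start a timer.
3. Keep collecting until either the size limit is reached or the timer fires.

The interval is measured from the first entry. For "500 ms or 500 entries", call `nextBatch 500 500000 q` in a loop. A timer thread flipping a `TVar` is used instead of `registerDelay` so the sketch also works on the non-threaded runtime.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.STM
  (TQueue, atomically, check, newTQueueIO, newTVarIO, orElse, readTQueue, readTVar, writeTQueue, writeTVar)
import Control.Exception (bracket)

-- | Hypothetical helper: wait for at least one item, then keep collecting
-- until maxItems items are gathered or maxWaitUs microseconds have passed
-- since the first item arrived. Never returns an empty batch.
nextBatch :: Int -> Int -> TQueue a -> IO [a]
nextBatch maxItems maxWaitUs q = do
  first <- atomically (readTQueue q) -- blocks until the first entry arrives
  expired <- newTVarIO False
  -- A timer thread flips the flag when the interval is over.
  bracket
    (forkIO (threadDelay maxWaitUs >> atomically (writeTVar expired True)))
    killThread
    (\_ -> collect expired (maxItems - 1) [first])
  where
    collect expired n acc
      | n <= 0 = pure (reverse acc)
      | otherwise = do
          -- Take the next item, or give up once the flag is set.
          next <- atomically $
            (Just <$> readTQueue q)
              `orElse` (readTVar expired >>= check >> pure Nothing)
          case next of
            Just x -> collect expired (n - 1) (x : acc)
            Nothing -> pure (reverse acc) -- interval over

main :: IO ()
main = do
  q <- newTQueueIO
  mapM_ (atomically . writeTQueue q) [1 .. 7 :: Int]
  nextBatch 5 500000 q >>= print -- size limit reached: [1,2,3,4,5]
  nextBatch 5 500000 q >>= print -- interval ends first: [6,7]
```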
Harendra Kumar
@harendra-kumar
@hasufell I have a working prototype of a well-performing version of your tar-bytestring repo. I added some functionality to streamly for efficient parsing of array streams. The code is quite idiomatic; it's composed monadically using parsers. It runs in constant memory with the same timings as before. Here are the GC statistics for untarring the 2GB ghc-9.0.1 tarball:
   2,529,138,120 bytes allocated in the heap
         756,424 bytes copied during GC
          82,776 bytes maximum residency (2 sample(s))
          32,464 bytes maximum slop
               0 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      2207 colls,     0 par    0.037s   0.035s     0.0000s    0.0001s
  Gen  1         2 colls,     0 par    0.000s   0.000s     0.0002s    0.0003s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    2.924s  ( 33.413s elapsed)
  GC      time    0.037s  (  0.035s elapsed)
  EXIT    time    0.000s  (  0.000s elapsed)
  Total   time    2.961s  ( 33.449s elapsed)

  %GC     time       0.0%  (0.0% elapsed)

  Alloc rate    864,985,759 bytes per MUT second

  Productivity  98.7% of total user, 99.9% of total elapsed