Adam Conner-Sax
@adamConnerSax
I think for applications that require the entire thing in memory—certain map-reduce folds or serializations—this is the way to go. Streamly works perfectly for things that are, well, better fits for streaming!
Harendra Kumar
@harendra-kumar
Streamly arrays are supposed to be used when you have to buffer large amounts of data in memory.
Adam Conner-Sax
@adamConnerSax

@harendra-kumar Yep. But Frames is also efficient binary storage and with some advantages since it retains much of the vinyl API. So I am trying to work at the edges of the interfaces. I’m getting there!
A (mostly) unrelated question: if I am using ST as my PrimMonad for a speculative stream (AheadT ST a), can the stream actions actually run concurrently? Or does that also require MonadAsync? And if it does, how is that clear from the types? I am writing a streamTransform for Frames (given inCoreAoS :: PrimMonad m => Streamly.SerialT m (Frames.Record as) -> m (FrameRec as))

streamTransform ::
  forall t1 as bs m.
  (IsStream t1
  , Prim.PrimMonad m
  , Frames.RecVec as
  , Frames.RecVec bs
  )
  => (t1 m (Frames.Record as) -> Streamly.SerialT m (Frames.Record bs)) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
streamTransform f = inCoreAoS . f . Streamly.fromFoldable

and the following, e.g., compiles:

concurrentFilter :: (Frames.RecVec as) => (Frames.Record as -> Bool) -> Frames.FrameRec as -> Frames.FrameRec as
concurrentFilter f frame = runST $ streamTransform (Streamly.aheadly . Streamly.filter f) frame
Harendra Kumar
@harendra-kumar

@adamConnerSax Concurrent operations have a MonadAsync constraint. filter is not a concurrent operation; it does not have a MonadAsync constraint, which is why it works.

The Streamly.Prelude module doc says this: "Functions having a MonadAsync constraint work concurrently when used with appropriate stream type combinator."

Adam Conner-Sax
@adamConnerSax
That makes sense! Thanks, @harendra-kumar ! BTW, why isn’t filter concurrent? I could filter the items independently, right?
Harendra Kumar
@harendra-kumar

filter applies a pure function to each element of the stream, like map. It's usually not expensive enough to run concurrently; the overhead of concurrency would usually be more than the cost of the function. However, filterM (like mapM) may make sense to make concurrent, e.g. if we are filtering using a predicate that requires IO or network access, then we can do multiple IO operations concurrently.

It's just not implemented yet, though it should be simple to do.

For that matter, most functions with an M suffix would make sense to be concurrent.
Adam Conner-Sax
@adamConnerSax
Thanks for the explanation. If you had a large enough finite stream, or an expensive enough filter function, wouldn’t it at some point be worth concurrency?
Harendra Kumar
@harendra-kumar
Expensive filter function case, yes, I mentioned that above. Large stream with a cheap filter function would require a different concurrency strategy. You will have to chunk the stream such that each large enough chunk can be filtered independently, but then just chunking (and later aggregating) overhead may be more than filtering overhead. So the answer is not simple.
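To make the tradeoff concrete, here is a stand-alone sketch of the chunking strategy in plain Haskell, using only base's Control.Concurrent. The names chunksOf and concurrentChunkedFilter are made up for illustration; this is not streamly API:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM)
import Data.List (unfoldr)

-- Split a list into chunks of at most n elements.
chunksOf :: Int -> [a] -> [[a]]
chunksOf n = unfoldr (\xs -> if null xs then Nothing else Just (splitAt n xs))

-- Filter each chunk in its own thread and concatenate the results in
-- the original order. This only pays off when the chunks (or the
-- predicate) are expensive enough to amortize the forking and
-- aggregation overhead, which is exactly the tradeoff described above.
concurrentChunkedFilter :: Int -> (a -> Bool) -> [a] -> IO [a]
concurrentChunkedFilter chunkSize p xs = do
  vars <- forM (chunksOf chunkSize xs) $ \chunk -> do
    var <- newEmptyMVar
    _ <- forkIO $ do
      let r = filter p chunk
      length r `seq` putMVar var r  -- force the spine so the work happens in this thread
    return var
  concat <$> mapM takeMVar vars

main :: IO ()
main = concurrentChunkedFilter 4 even [1 .. 20 :: Int] >>= print
```

Note the explicit forcing before putMVar: without it, laziness would defer the actual filtering to the consuming thread and the forked threads would do no useful work.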
Harendra Kumar
@harendra-kumar
@nhenin I have been working on the fs events notification feature for streamly. You can find the Linux PR here: composewell/streamly#632; it's ready to be merged. Feedback is welcome.
The macOS PR is here: composewell/streamly#628. While playing with the fs events feature on my MacBook, I think Apple can proudly say that this is their "buggiest feature ever". I have documented the bugs that I discovered in the streamly docs; some bugs seem to have been fixed in recent updates.
Nicolas Henin
@nhenin
@harendra-kumar thank you! I'm going to have a look
sorry for the delay, I was not looking at the channel for a while :-)
streamA
  :: ( MonadReader Dependencies m
     , Monad m )
  => S.SerialT m (Either Error a)
  -> S.SerialT m (Either Error b)
streamA streamInput = undefined -- using streamB with something like fmap

streamB
  :: ( MonadReader Dependencies m
     , Monad m )
  => S.SerialT m a
  -> S.SerialT m b
streamB = ...
Is there a way to do this with streamly?
Nicolas Henin
@nhenin
@harendra-kumar: this is really cool! I'm definitely going to use this
I had something that was doing the job, but it was pretty ugly and certainly not performant...
Harendra Kumar
@harendra-kumar

master branch is going through some breaking changes, we have a major release around the corner. We are trying to keep the changelog up to date to guide users about how to deal with the breaking changes.

Some of the Internal modules and APIs have been renamed. I know some packages depend on Internal APIs as well.

Harendra Kumar
@harendra-kumar
Parsers is the new exciting feature in this release, though it remains internal until the API stabilizes.
Terminating folds are the exciting new feature that will be exposed officially; you can think of them as parsers without backtracking. The composable folds no longer run forever: e.g. the head fold terminates immediately after producing the head element, unlike before, when it ran until the whole stream was drained. This is not yet committed to master, but will be committed soon.
Harendra Kumar
@harendra-kumar
Parsers are already committed to master and pretty stable. You can build the haddocks to check them out. The API is much like Parsec/attoparsec but more scalable, and it can conveniently work on infinite streams. So if you know how to use other parsing libraries, you do not need to learn anything new.
Harendra Kumar
@harendra-kumar
@nhenin the fs events PRs are now merged into master. You can give it a try. There is also an executable to test with; you can just run cabal run FileSystem.Event <dirname> to try it. If you create or modify files in the directory dirname, it should print the events on screen.
Nicolas Henin
@nhenin
OK, great :-) I'll give you feedback when I use your update
Nicolas Henin
@nhenin
@harendra-kumar I want to start seriously testing my stream processors... do you have examples I could have a look at?
I'm looking at your test folder at the moment : https://github.com/composewell/streamly/blob/master/test/Streamly/Test
to get inspiration :-)
Harendra Kumar
@harendra-kumar
Great. Our tests are not well organized yet. The old tests are in the test directory itself. We are organizing the newer tests in test/Streamly/Test. Changes will be coming in 0.8.0 soon.
Nicolas Henin
@nhenin
Hi, something that looks trivial, but somehow I can't find how to pipe Folds:
Fold m a b -> Fold m b c -> Fold m a c
It looks like a Monad instance for Fold: foldAtoB >>= foldBtoC
Nicolas Henin
@nhenin
I'm doing this so far: S.scan foldAtoB & S.scan foldBtoC
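For what it's worth, the scan trick works because scanning with the first fold turns its running results into a stream the second fold can consume. Here is a stand-alone sketch of that composition with a minimal pure Fold type; this is illustrative only, streamly's actual Fold type and combinators differ:

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- A minimal left fold: a step function, an initial state, and an
-- extraction function from state to result.
data Fold a b = forall s. Fold (s -> a -> s) s (s -> b)

runFold :: Fold a b -> [a] -> b
runFold (Fold step begin extract) = extract . foldl step begin

-- Pipe two folds: feed each intermediate result of the first fold
-- (its scan output) into the second fold, yielding Fold a c.
-- This mirrors what S.scan foldAtoB & S.scan foldBtoC does on streams.
pipeFold :: Fold a b -> Fold b c -> Fold a c
pipeFold (Fold step1 begin1 extract1) (Fold step2 begin2 extract2) =
  Fold step (begin1, begin2) (extract2 . snd)
  where
    step (s1, s2) a =
      let s1' = step1 s1 a
      in (s1', step2 s2 (extract1 s1'))

sumFold :: Num a => Fold a a
sumFold = Fold (+) 0 id

lastFold :: Fold a (Maybe a)
lastFold = Fold (\_ a -> Just a) Nothing id

main :: IO ()
main = print (runFold (pipeFold sumFold lastFold) [1, 2, 3, 4 :: Int])
-- the running sums are 1,3,6,10, so the piped fold yields Just 10
```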
Nicolas Henin
@nhenin
The real example:
detectingStarving
  :: ( MemoryStreamLoggable m log
     , MonadReader Dependencies m
     , MonadIO m
     , S.MonadAsync m
     , MonadCatch m
     , Appendable a
     , UUIDProvider a )
  => log Input
  -> log (Proposing.Packaging.Input a)
  ->  S.SerialT m ()
detectingStarving inputLog outputLog
  = stream infinitely inputLog
      & S.scan foldConsumptionDeltaAndIsConsensusReached
      & S.scan foldStarvingInterpretation
      & S.mapM (\starvingOffset -> do
          let itemId = getDeterministicUUID starvingOffset
          Dependencies {logger} <- ask
          log logger INFO "Detected starving of pipeline"
          void $ nonIdempotentAppend outputLog Proposing.Packaging.PipelineStarving {..})

foldStarvingInterpretation
  :: (Monad m)
  => SF.Fold m (Integer,Any) Offset
foldStarvingInterpretation
  = SF.lfilter (== (0,Any True)) $ fromIntegralToOffset <$> SF.length

foldConsumptionDeltaAndIsConsensusReached :: (MonadReader Dependencies m) => SF.Fold m Input (Integer,Any)
foldConsumptionDeltaAndIsConsensusReached
  = (,) <$> foldToConsumptionDelta
        <*> foldToIsConsensusReached

foldToIsConsensusReached :: (Monad m , Monoid Any) => SF.Fold m Input Any
foldToIsConsensusReached
  = SF.foldMap
      (\case
        ConsensusReached _ -> Any True
        _ -> Any False )

foldToConsumptionDelta :: (MonadReader Dependencies m, Integral a) => SF.Fold m Input a
foldToConsumptionDelta
  = SF.lmapM
      (\inputItem -> do
       return $ case inputItem of
         LocalProposalProduced ->  1
         LocalProposalConsumed -> -1
         ConsensusReached  _   ->  0)
       SF.sum
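The (,) <$> foldToConsumptionDelta <*> foldToIsConsensusReached composition above distributes each input element to both folds and pairs their results (a "tee"). A minimal stand-alone illustration of that Applicative behavior, as a sketch of the idea rather than streamly's actual instance:

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- A minimal left fold: step, initial state, extraction.
data Fold a b = forall s. Fold (s -> a -> s) s (s -> b)

instance Functor (Fold a) where
  fmap f (Fold step begin extract) = Fold step begin (f . extract)

-- Applicative combines folds by running them side by side on the
-- same input (teeing), which is how the pair fold above is built.
instance Applicative (Fold a) where
  pure b = Fold const () (const b)
  Fold stepL beginL exL <*> Fold stepR beginR exR =
    Fold step (beginL, beginR) (\(l, r) -> exL l (exR r))
    where step (l, r) a = (stepL l a, stepR r a)

runFold :: Fold a b -> [a] -> b
runFold (Fold step begin extract) = extract . foldl step begin

sumFold, countFold :: Fold Int Int
sumFold = Fold (+) 0 id
countFold = Fold (\n _ -> n + 1) 0 id

main :: IO ()
main = print (runFold ((,) <$> sumFold <*> countFold) [1 .. 4])
-- each element feeds both folds, so this prints (10,4)
```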
Harendra Kumar
@harendra-kumar
Parsers are just like folds, but more powerful. Similar combinators will be coming to folds as well in the upcoming release.
The folds are being made terminating so that you can apply them repeatedly to a stream.
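A toy way to picture this: a terminating fold returns a result plus the unconsumed rest of the input, so it can be applied again and again. A list-level sketch, where TFold, sumN, and repeatedly are illustrative names rather than streamly's API:

```haskell
-- A terminating fold over lists: it consumes a prefix of the input
-- and returns its result together with the unconsumed rest.
newtype TFold a b = TFold ([a] -> (b, [a]))

-- A fold that consumes at most n elements and sums them; it
-- terminates without draining the whole input.
sumN :: Num a => Int -> TFold a a
sumN n = TFold (\xs -> let (h, t) = splitAt n xs in (sum h, t))

-- Apply a terminating fold repeatedly until the input is exhausted,
-- collecting the results: the essence of "apply them repeatedly".
repeatedly :: TFold a b -> [a] -> [b]
repeatedly f@(TFold run) xs
  | null xs   = []
  | otherwise = let (b, rest) = run xs in b : repeatedly f rest

main :: IO ()
main = print (repeatedly (sumN 3) [1 .. 10 :: Int])
-- sums of the chunks [1,2,3],[4,5,6],[7,8,9],[10]: [6,15,24,10]
```

A non-terminating fold like the old head would never return control, so this repeated application would be impossible.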
Harendra Kumar
@harendra-kumar
This paper https://rubenpieters.github.io/assets/papers/JFP20-pipes.pdf shows streamly is the fastest streaming library. However, it seems they used benchmarks that convert the stream to a list, so the list-creation overhead dominates the cost; that may have masked the real performance differences in the micro-benchmarks.
Nicolas Henin
@nhenin
thanks
Nicolas Henin
@nhenin
Hi @harendra-kumar, I need to deserialize items (in JSON) that I have written with writeChunks2 (a tweak of writeChunks with Fold2)
An example of the content of the file :
{"tag":"DummyRequest","contents":-63}{"tag":"DummyRequest","contents":84}{"tag":"DummyRequest","contents":-70}{"tag":"DummyRequest","contents":-63}
I'm using 0.7.2
I don't see functions that could help me... I guess you have done some work for that in the new releases? I'll have a look in the meantime
Nicolas Henin
@nhenin
I need to implement a function that does the following:
read
  :: ( MonadAsync m
     , FromJSON item )
  => FilePath
  -> SerialT m item
I'm using Aeson so far, but it could be any other alternative, obviously.
It's a bit the same logic as streaming items over a network, in terms of parsing/deserialisation.
Nicolas Henin
@nhenin
I'm looking at Parsers ...
Harendra Kumar
@harendra-kumar
This PR composewell/streamly#605 implements a JSON parser in streamly; it is still a work in progress and will be committed soon.
It works though.
Nicolas Henin
@nhenin
the issue I have is that I don't store the JSON in a conventional way:
{"tag":"DummyRequest","contents":12}{"tag":"DummyRequest","contents":67}{"tag":"DummyRequest","contents":2}
instead of:
[{"tag":"DummyRequest","contents":12},{"tag":"DummyRequest","contents":67},{"tag":"DummyRequest","contents":2}]
Nicolas Henin
@nhenin
I have found a temporary solution about this... 🙂
Harendra Kumar
@harendra-kumar
You can perhaps define a parser that parses a single {...} JSON object and then apply that parser to the stream coming from the file using parseMany.
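The idea in miniature, outside of streamly: a routine that consumes exactly one top-level {...} object can be run repeatedly over the raw text, which is roughly what applying a parser with parseMany amounts to. Below is a stand-alone sketch of just the object-splitting step, tracking brace depth and skipping string literals; splitObjects is an illustrative name, not a streamly function:

```haskell
-- Split a string of concatenated top-level JSON objects, e.g.
-- {"a":1}{"b":2}, into the individual object texts. Tracks brace
-- depth and skips over string literals (including \" escapes) so
-- braces inside keys or values don't confuse the splitter.
splitObjects :: String -> [String]
splitObjects [] = []
splitObjects s =
  let (obj, rest) = takeObject 0 False False [] s
  in reverse obj : splitObjects rest
  where
    -- arguments: brace depth, inside-string flag, escaped flag,
    -- accumulator (reversed), remaining input
    takeObject :: Int -> Bool -> Bool -> String -> String -> (String, String)
    takeObject _ _ _ acc [] = (acc, [])
    takeObject depth inStr esc acc (c:cs)
      | esc                = takeObject depth inStr False (c:acc) cs
      | inStr && c == '\\' = takeObject depth inStr True  (c:acc) cs
      | inStr && c == '"'  = takeObject depth False False (c:acc) cs
      | inStr              = takeObject depth True  False (c:acc) cs
      | c == '"'           = takeObject depth True  False (c:acc) cs
      | c == '{'           = takeObject (depth + 1) False False (c:acc) cs
      | c == '}'           = if depth == 1
                               then (c:acc, cs)
                               else takeObject (depth - 1) False False (c:acc) cs
      | otherwise          = takeObject depth False False (c:acc) cs

main :: IO ()
main = mapM_ putStrLn
  (splitObjects "{\"tag\":\"DummyRequest\",\"contents\":-63}{\"tag\":\"DummyRequest\",\"contents\":84}")
```

Each piece can then be handed to Aeson's decode individually, whereas a real streaming parser would do this incrementally without materializing the whole file.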