Harendra Kumar
@harendra-kumar
Yeah, that looks clean.
Julian Ospald
@hasufell
you prefer that over BFS?
I'm more worried about acquired resources than I am about 10k directory entries in memory
given the async nature of the filesystem
Harendra Kumar
@harendra-kumar
There is no clear choice; they are duals, and the problems are dual too. The worst case of DFS is the best case of BFS. It's memory vs open dirs. DFS saves memory when you have large trees, but a very deep tree can make you run out of open fds, so you may have to cut the traversal if it looks too deep. With BFS, on the other hand, you have to put a limit on the pending entries so that you do not run out of memory.
Julian Ospald
@hasufell
Well, on windows I once managed to create a directory structure so deep that windows wasn't able to delete it anymore xD
Harendra Kumar
@harendra-kumar
hmm...interesting
BTW, if you read each directory entirely instead of reading it incrementally like a stream, the DFS traversal also opens only one dir at a time, but then it behaves like BFS from a resource consumption point of view: it turns the open-fd problem into a memory problem. That is what the listDir example I pointed to does.
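A minimal sketch of that read-each-directory-entirely DFS, using plain System.Directory (walkDFS is a hypothetical name for illustration, not the actual listDir example):

```
import System.Directory (doesDirectoryExist, listDirectory)
import System.FilePath ((</>))

-- Reads each directory in full before descending, so at most one directory
-- handle is open at a time; the trade-off is holding the entries in memory.
walkDFS :: FilePath -> IO [FilePath]
walkDFS dir = do
  entries <- listDirectory dir  -- reads the whole dir and closes it
  concat <$> mapM visit entries
  where
    visit entry = do
      let path = dir </> entry
      isDir <- doesDirectoryExist path
      if isDir
        then (path :) <$> walkDFS path
        else pure [path]
```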
Kristof Bastiaensen
@kuribas
is it possible in streamly to do operations which don't use list functions? For example, inserting (x+1) after every even value x.
[3, 5, 4, 2, 7] -> [3, 5, 4, 5, 2, 3, 7]
efficiently?
Harendra Kumar
@harendra-kumar
One would think of concatMap for such jobs, but concatMap is inefficient because it does not fuse. However, you can use unfoldMany instead, which fuses and is as efficient as it gets:
> import qualified Streamly.Prelude as Stream
> import qualified Streamly.Data.Unfold as Unfold
> Stream.toList $ Stream.unfoldMany (Unfold.lmap (\x -> if even x then [x, x + 1] else [x]) Unfold.fromList) $ Stream.fromList [3, 5, 4, 2, 7::Int]
[3,5,4,5,2,3,7]
Kristof Bastiaensen
@kuribas
right, thanks!
Kristof Bastiaensen
@kuribas
What would be the best way to generate a stream from a list of chunks in a database?
say I have a list of bytestrings stored in a database, which represents a continuous stream of some values.
Kristof Bastiaensen
@kuribas
also, do the lists in Unfold.lmap fuse away?
Harendra Kumar
@harendra-kumar

say I have a list of bytestrings stored in a database, which represents a continuous stream of some values.

It depends on how you parse the bytestrings into Haskell values. One efficient way could be as follows (if your stream is large, efficiency may be a concern; if it is not, or efficiency does not matter here, there could be other, simpler ways of doing it):

  1. If you have a stream of bytestrings, you can map it to a stream of streamly arrays using the streamly-bytestring package. The bytestring-to-array mapping is free of cost, as the underlying representation is the same.

  2. Then use foldArrMany from https://streamly.composewell.com/haddocks/streamly-0.8.2/Streamly-Internal-Data-Array-Stream-Foreign.html to fold the array stream into a stream of Haskell values. The fold to be used in foldArrMany can be constructed from a regular streamly Fold or Parser using fromFold or fromParser from https://streamly.composewell.com/haddocks/streamly-0.8.2/Streamly-Internal-Data-Array-Stream-Fold-Foreign.html.

If you are using some other way of deserializing/parsing the values then the method will be different. You will have to manually construct a stream from the parsed values. How the stream is to be constructed may depend on the efficiency required.
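As a rough sketch of steps 1 and 2 above (the module names come from the links, but foldArrMany and fromFold are internal APIs whose exact signatures may differ in your streamly version, so treat this as an untested illustration):

```
import Data.ByteString (ByteString)
import Data.Word (Word8)
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as Stream
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.External.ByteString as Strict  -- streamly-bytestring
import qualified Streamly.Internal.Data.Array.Stream.Foreign as ArrayStream
import qualified Streamly.Internal.Data.Array.Stream.Fold.Foreign as ArrayFold

-- Decode a stream of ByteString chunks into a stream of values, producing
-- one value per run of the byte-level fold 'f'.
decodeChunks :: Fold.Fold IO Word8 a -> SerialT IO ByteString -> SerialT IO a
decodeChunks f =
      ArrayStream.foldArrMany (ArrayFold.fromFold f)  -- assumed signature
    . Stream.map Strict.toArray  -- zero-copy: bytestring and Array share layout
```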

Harendra Kumar
@harendra-kumar

also, do the lists in Unfold.lmap fuse away?

Yes, they do. You can see there are no list constructors in the generated core. Haskell code:

Stream.length $ Stream.unfoldMany (Unfold.lmap (\x -> if even x then [x, x + 1] else [x]) Unfold.fromList) $ Stream.fromList [3, 5, 4, 2, 7 :: Int]

GHC-generated core; this is the entire core corresponding to the above code:


$s$wgo_r4zi
  = \ sc_s4qt sc1_s4qp sc2_s4qq sc3_s4qr _ sc5_s4qo ->
      case remInt# sc1_s4qp 2# of {
        __DEFAULT ->
          case remInt# sc2_s4qq 2# of {
            __DEFAULT ->
              case remInt# sc3_s4qr 2# of {
                __DEFAULT -> (# sc_s4qt, I# (+# 4# sc5_s4qo) #);
                0# -> (# sc_s4qt, I# (+# 5# sc5_s4qo) #)
              };
            0# ->
              case remInt# sc3_s4qr 2# of {
                __DEFAULT -> (# sc_s4qt, I# (+# 5# sc5_s4qo) #);
                0# -> (# sc_s4qt, I# (+# 6# sc5_s4qo) #)
              }
          };
        0# ->
          case remInt# sc2_s4qq 2# of {
            __DEFAULT ->
              case remInt# sc3_s4qr 2# of {
                __DEFAULT -> (# sc_s4qt, I# (+# 5# sc5_s4qo) #);
                0# -> (# sc_s4qt, I# (+# 6# sc5_s4qo) #)
              };
            0# ->
              case remInt# sc3_s4qr 2# of {
                __DEFAULT -> (# sc_s4qt, I# (+# 6# sc5_s4qo) #);
                0# -> (# sc_s4qt, I# (+# 7# sc5_s4qo) #)
              }
          }
      }

$s$wgo1_r4zj
  = \ sc_s4pK
      sc1_s4pF
      sc2_s4pG
      sc3_s4pH
      sc4_s4pI
      sc5_s4pJ
      sc6_s4pE ->
      case remInt# sc1_s4pF 2# of {
        __DEFAULT ->
          case remInt# sc2_s4pG 2# of {
            __DEFAULT ->
              $s$wgo_r4zi
                sc_s4pK sc3_s4pH sc4_s4pI sc5_s4pJ sc2_s4pG (+# sc6_s4pE 1#);
            0# ->
              $s$wgo_r4zi
                sc_s4pK
                sc3_s4pH
                sc4_s4pI
                sc5_s4pJ
                (+# sc2_s4pG 1#)
                (+# 2# sc6_s4pE)
          };
        0# ->
          case remInt# sc2_s4pG 2# of {
            __DEFAULT ->
              $s$wgo_r4zi
                sc_s4pK sc3_s4pH sc4_s4pI sc5_s4pJ sc2_s4pG (+# 2# sc6_s4pE);
            0# ->
              $s$wgo_r4zi
                sc_s4pK
                sc3_s4pH
                sc4_s4pI
                sc5_s4pJ
                (+# sc2_s4pG 1#)
                (+# 3# sc6_s4pE)
          }
      }

The remInt# corresponds to the even check; the rest is just a tight loop with (+#) operations to increment the count. There are no traces of a list.
Harendra Kumar
@harendra-kumar
GHC has magical powers!
Harendra Kumar
@harendra-kumar

One thing to note here though - gcc would optimize the even check (x % 2) down to a simple bitwise AND, whereas GHC uses a much more expensive remInt# operation.

Perhaps we can specialize this implementation in base to a more efficient one for Int:

even n          =  n `rem` 2 == 0
odd             =  not . even
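For comparison, the bitwise check gcc reduces it to would look like this in Haskell (evenInt is a hypothetical name, not something in base):

```
import Data.Bits ((.&.))

-- Even iff the lowest bit is clear: a single AND instead of a division.
evenInt :: Int -> Bool
evenInt n = n .&. 1 == 0
```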
Kristof Bastiaensen
@kuribas
maybe llvm does better there?
Harendra Kumar
@harendra-kumar
I was thinking about that: maybe the remInt# can be optimized out before the asm is generated if we use the LLVM backend. We can check the assembly to verify.
But the problem is that the LLVM backend is not always better for GHC; in many cases the NCG now works better than LLVM, at least that was the case last I checked. Maybe the NCG itself can do such simple optimizations; it just has to recognize that remInt# by 2 can be replaced by an equivalent bitwise operation.
Kristof Bastiaensen
@kuribas
that sounds reasonable
one use case I had in mind was time series manipulation, for example resampling a time series to a fixed period.
If the input is irregular, that means both aggregating and generating values.
For example, taking all the values in the next period and averaging them.
then iterating.
I just wonder if this use case is too specific for streamly...
Kristof Bastiaensen
@kuribas
hmm, maybe foldMany to group by period...
and zipping by the period grid
Harendra Kumar
@harendra-kumar
See chunksOfTimeout to group by time.
Or intervalsOf.
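A small usage sketch, assuming streamly-0.8's Streamly.Prelude (the Stream.delay pacing is only there to make the one-second intervals visible):

```
import qualified Streamly.Prelude as Stream
import qualified Streamly.Data.Fold as Fold

main :: IO ()
main =
  Stream.mapM_ print
    $ Stream.intervalsOf 1.0 Fold.sum   -- sum whatever arrives each second
    $ Stream.delay 0.2                  -- pace the input at ~5 elements/second
    $ Stream.fromList [1 .. 10 :: Int]
```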
Kristof Bastiaensen
@kuribas
hmm neat.
can the time be made explicit there?
like a stream (Time, Double)
well, I guess I can look at the implementation and modify it for my needs.
Harendra Kumar
@harendra-kumar
That can be implemented perhaps in terms of the existing combinators.
Let me check a bit.
One very powerful combinator you can look at is classifySessionsBy; it derives time from explicit timestamps, as you want: https://streamly.composewell.com/haddocks/streamly-0.8.2/Streamly-Internal-Data-Stream-IsStream-Reduce.html#v:classifySessionsBy
Kristof Bastiaensen
@kuribas
those are internal functions, can I use them?
Harendra Kumar
@harendra-kumar
One way could be: apply a rollingMap on the input stream to get the time diff between every two successive elements; then use foldMany with a fold that distributes the input to two different folds and zips their outputs, (1) your grouping fold and (2) a sum fold that accumulates the time intervals; then use Fold.takeWhile on the combined fold to terminate it once the required time interval has been accumulated.
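As a simpler sketch of per-period grouping with explicit timestamps (this buckets by floor (t / period) rather than following the rollingMap/tee recipe exactly, assumes streamly-0.8's groupsBy, lmap, and mean, and does not emit empty periods, which the zipping idea would address):

```
import qualified Streamly.Prelude as Stream
import qualified Streamly.Data.Fold as Fold

-- Average the values of (timestamp, value) pairs falling in each period.
avgPerPeriod :: Double -> Stream.SerialT IO (Double, Double) -> Stream.SerialT IO Double
avgPerPeriod period =
    Stream.groupsBy samePeriod (Fold.lmap snd Fold.mean)
  where
    bucket (t, _) = floor (t / period) :: Integer
    samePeriod a b = bucket a == bucket b
```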

those are internal functions, can I use them?

These are not your regular Internal functions, in the sense of being "used only in the guts of the library". We introduce experimental APIs as internal before we expose them; in most cases such APIs are marked as pre-release in the documentation. So yes, you can use them, but the API may be unstable: function signatures may change in the future, or in some cases an API may go away. But if it goes away there will be some equivalent way of doing things, of course.

Harendra Kumar
@harendra-kumar
As of now there are a lot of such Internal APIs, which we are going to expose in future releases as things stabilize.
BTW, classifySessionsBy, even though internal, is being used in large production systems by some users of streamly.
Kristof Bastiaensen
@kuribas
right
Harendra Kumar
@harendra-kumar

The next release of streamly, 0.9.0, will consist of two packages: streamly and streamly-core. The original streamly package is broken down into these two. streamly-core will depend only on GHC boot libraries; currently it also depends on two non-boot libraries (monad-control, heaps) for backward compatibility, and these dependencies will be removed in future releases when the deprecated functionality goes away.

The streamly-core package consists of the following functionality:

  • Serial Streams
  • Fold
  • Unfold
  • Parser
  • Array
  • File Handle IO
  • Console IO
  • Unicode streams

The streamly package is a super-package that re-exports everything from the streamly-core package, plus the following:

  • Concurrent streams
  • Filesystem events, dir, file IO
  • Network IO
  • Unicode functionality that depends on unicode-data

Overall, the streamly package will continue to behave as it did earlier. I will post an update about the significant breaking changes in this release later.

maralorn
@maralorn:maralorn.de [m]
Is this something I should worry about?
```
warning: [-Wmissed-specialisations]
    Could not specialise imported function ‘Streamly.Internal.Control.Concurrent.captureMonadState’
      when specialising ‘Streamly.Internal.Data.SVar.newParallelVar’
      when specialising ‘Streamly.Internal.Data.Stream.IsStream.Common.mkParallel’
    Probable fix: add INLINABLE pragma on ‘Streamly.Internal.Control.Concurrent.captureMonadState’
```
It happens when I compile my own project, which is quite performance sensitive.