Tim Pierson
@o1lo01ol1o
Interesting. Thanks @pranaysashank !
Tim Pierson
@o1lo01ol1o
Given an AheadT stream foo and some operations bar, will zipWith (,) (bar foo) foo compute foo once?
Pranay Sashank
@pranaysashank
No, it doesn't; it computes it twice.
That should be true for both the serial and concurrent styles.
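As a plain-IO analogy (a hedged sketch, not streamly code): consuming an effectful producer twice runs its effects twice, which is what happens to foo when both bar foo and foo are fed to zipWith:

```haskell
import Data.IORef

-- Count how many times an effectful producer runs. Zipping the
-- producer with itself consumes it twice, so the effects run twice,
-- analogous to zipWith (,) (bar foo) foo re-evaluating foo.
main :: IO ()
main = do
    counter <- newIORef (0 :: Int)
    let produce = modifyIORef' counter (+ 1) >> return [1, 2, 3 :: Int]
    xs <- produce            -- first consumption ("foo")
    ys <- produce            -- second consumption (via "bar foo")
    n  <- readIORef counter
    print (n, zip xs ys)     -- effects ran twice
```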
Tim Pierson
@o1lo01ol1o
Right, I guess that makes sense given the behavior of lists. Looking at the current master code, it seems like I need to move between Streams, Folds, and Unfolds to split a stream and then merge it while only executing side effects once. For example, is there a clear way to do the following:
foo :: AheadT IO Foo
bar :: AheadT IO Foo -> AheadT IO Bar
split :: Something m a -> (Something m a, Something m a)
merge :: Something m a -> Something m b -> Something m (a,b)

fooSplitbarMerged :: AheadT IO Foo -> AheadT IO (Foo, Bar)
Harendra Kumar
@harendra-kumar

@o1lo01ol1o evaluating the stream only once and cloning it multiple times, as in the zipWith example you gave, will be possible with the Pipe type that we are planning to introduce in the future.

The split signature that you gave above is not possible without buffering the whole stream. If you really want it, you can split the stream in a different manner, though it won't be as efficient as with the planned Pipe type: fork a thread that sends the original stream's elements to two STM TBChans, then, in the main thread, generate one stream from each channel and zip them the way you want.
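The fan-out idea above can be sketched in plain Haskell using stm's TBQueue standing in for TBChan (which lives in the stm-chans package). This is a hedged sketch of the technique, not streamly code:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
    (atomically, newTBQueueIO, readTBQueue, writeTBQueue)
import Control.Monad (forM_, replicateM)

main :: IO ()
main = do
    -- Two bounded queues, one per consumer of the "split" stream.
    qa <- newTBQueueIO 16
    qb <- newTBQueueIO 16
    let src = [1 .. 5 :: Int]  -- stands in for the original stream
    -- Producer thread: push each element to both queues.
    _ <- forkIO $ forM_ src $ \x -> atomically $ do
            writeTBQueue qa x
            writeTBQueue qb x
    -- Consumers: drain each queue, then "merge" by zipping.
    as <- replicateM (length src) (atomically (readTBQueue qa))
    bs <- replicateM (length src) (atomically (readTBQueue qb))
    print (zip as bs)
```

In real code each queue would be unfolded back into a stream and the two streams zipped, so the source's effects run only once.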

Tim Pierson
@o1lo01ol1o
@harendra-kumar Thanks. It looks like there's still a lot of implementation to do wrt Pipes; do you have any notion of which release you might target?
Harendra Kumar
@harendra-kumar
The upcoming release is 0.7. In the release after that, i.e. 0.7 + 1, we will focus on parsers and may have better support for pipes as well. In 0.7 + 2 we will try to complete the pipes work; that might be 6 months from now. However, it should not stop you from using streamly in any situation. Let us know what exactly the problem is and there may be an alternate solution for it.
Tim Pierson
@o1lo01ol1o
:thumbsup:
There's nothing blocking me currently. I have some utility functions lying around and I'm trying to gauge the path of least resistance for usable APIs.
Harendra Kumar
@harendra-kumar
In case you have not seen it, the streamly based wc implementation is idiomatic and faster than other implementations: https://github.com/ChrisPenner/wc/#interjection
Harendra Kumar
@harendra-kumar
I implemented a wc with correct handling of UTF-8 spaces. The version by Chris Penner did not handle multi-byte spaces. The serial version is very small, ~10 lines, and takes 2 seconds for a 500MB file, whereas macOS wc takes around 8 seconds for the same input. The parallel version is a bit longer than I would like it to be; 90% of the code is error handling. It takes 1 second for the same input.
composewell/streamly#291
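The core of such a wc is a single streaming fold over the characters. Here is a hedged plain-Haskell sketch of that fold, written over a String; the state machine below is an illustration of the technique, not the code from composewell/streamly#291 (which streams the file and handles UTF-8 spaces):

```haskell
import Data.Char (isSpace)

-- Single-pass (lines, words, chars) count. The only state carried
-- between characters is whether the previous one was whitespace,
-- which is what lets a word be counted exactly once.
wc :: String -> (Int, Int, Int)
wc = go True 0 0 0
  where
    go _ l w c []                = (l, w, c)
    go prevSpace l w c (x : xs)
        | x == '\n'  = go True l' w (c + 1) xs
        | isSpace x  = go True l  w (c + 1) xs
        | prevSpace  = go False l (w + 1) (c + 1) xs
        | otherwise  = go False l w (c + 1) xs
      where l' = l + 1

main :: IO ()
main = print (wc "hello world\nfoo bar baz\n")
```

Because the state is a small accumulator, the same fold translates directly to a streamly Fold and can be run over chunks in parallel.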
Harendra Kumar
@harendra-kumar
I will be speaking about streamly at FnConf 2019 (https://confengine.com/functional-conf-2019/schedule) in Bengaluru on 15th Nov. If anyone here is planning to attend, you can use my speaker discount code harendra-10di$c-functional when registering for the conference.
Tim Pierson
@o1lo01ol1o
Regarding the SString.decodeUtf8__ family of functions, what does "unreachable state" indicate? I'm running some tests on the first 8 million bytes of Wikipedia (https://cs.fit.edu/~mmahoney/compression/enwik8.bz2) and run into errors. The file is decompressed to a bytestring first using
toByteString :: (Monad m) => SerialT m Word8 -> m ByteString
toByteString stream = BSL.pack <$> S.toList stream

unzipFile :: (MonadIO m) => Path b File -> m ByteString
unzipFile f = do
    cb  <- liftIO $ openFile (toFilePath f) ReadMode
    cb' <- liftIO . toByteString $ S.unfold FH.read cb
    return $ BZip.decompress cb'
Harendra Kumar
@harendra-kumar
It means a bug in decoding! We should have printed enough context to debug it, but we haven't printed any info in the message. We can add that quickly.
We should probably recover from that state instead of erroring out.
It's coming from here:
    step' table _ (FreshPointDecodeFirst st x) = do
        let (Tuple' sv cp) = decode0 table x
        return $
            case sv of
                12 ->
                    Skip $
                    transliterateOrError
                        "Streamly.Streams.StreamD.decodeUtf8ArraysWith: Invalid UTF8 codepoint encountered"
                        (FreshPointDecodeInit st)
                0 -> error "unreachable state"
                _ -> Skip (FreshPointDecoding st sv cp)
Harendra Kumar
@harendra-kumar
@o1lo01ol1o which version (commit id) of streamly are you using?
Tim Pierson
@o1lo01ol1o
master:
- git: https://github.com/composewell/streamly
  commit: 0b30c6affd0cdbed7af0f62df366489b9ce67c3c
Might make sense to add that file as a test case then:
#!/usr/bin/env bash

mkdir data
pushd data
wget https://cs.fit.edu/~mmahoney/compression/enwik8.bz2
popd
Harendra Kumar
@harendra-kumar
I pushed debug code on the debug branch. Will it be possible for you to port your code to the latest master branch? The debug branch is based on latest master, and I will merge it to master as soon as CI passes. You can then build your code with the debug flag on; when the error occurs, it will print useful information for debugging why it happened.
In the meantime I will try to see if I can reproduce it using this file.
Tim Pierson
@o1lo01ol1o
will do
Harendra Kumar
@harendra-kumar
Some APIs may have been renamed or moved; in case you have trouble, ask me and I will let you know where to find them.
Tim Pierson
@o1lo01ol1o
Using the either version and only taking the Right parses, the error is now: Prelude.chr: bad argument: 860561971010101
Using decodeUtf8: Prelude.chr: bad argument: 51289257
Harendra Kumar
@harendra-kumar
That's strange; we are not using "chr" in the UTF-8 decoding code. We are using "unsafeChr", which should not give this error. Are you using "chr" somewhere in your code?
I have merged the debug branch into master. But I guess we have to figure out the "chr" error first.
Tim Pierson
@o1lo01ol1o
yes, immediately after decoding, I lowercase the char:
prep =
        S.filter (flip Set.member alphabet)
            .   S.map toLower
            -- .   S.map fromNormalRight
            -- .   S.filter isNormalRight
            .   SString.decodeUtf8
            $   joinly
            $   (S.fromList . BSL.unpack)
            <$> unzipFile fp
{-# INLINE joinly #-}
joinly :: (IsStream t, Monad m, Monad (t m)) => m (t m b) -> t m b
joinly x = S.concatMapM (const x) (return ())
Harendra Kumar
@harendra-kumar

@o1lo01ol1o I downloaded that file and ran UTF-8 decode on it successfully. I even ran decodeUtf8Lax on the compressed file and it did not have any trouble.

Are you using some library with bindings to some C program in your code which may potentially corrupt the memory of your system?

Harendra Kumar
@harendra-kumar
I pushed even more strict debug code to the master branch. If you compile the library with the "debug" flag on, it should print diagnostic information about what is going wrong.
Tim Pierson
@o1lo01ol1o
Are you using some library with bindings to some C program in your code which may potentially corrupt the memory of your system?
Hmmm, yes, actually, I am.
That's troubling, but thanks for looking into it!
Harendra Kumar
@harendra-kumar
Ah, then we have something to blame it on. It was looking like a mystery to me.
Tim Pierson
@o1lo01ol1o
@harendra-kumar I've been doing some digging and I can reproduce the bug without the suspect C FFI code: https://github.com/o1lo01ol1o/strmtst
the above uses
base ^>= 4.12.0.0
, containers
, exceptions
, monad-control
, mtl
, path
, random >= 1.1
, safe-exceptions
, streamly
, text
, transformers-base
, bytestring
, bzlib
and stackage lts-14.7. If you have a second, could you clone and confirm that you can reproduce?
the readme has instructions
Harendra Kumar
@harendra-kumar
Sure, I will try. bzlib at the bottom of this list could also be a suspect, as it uses a C library. I don't deny that we could also have a bug in streamly, as we juggle pointers and raw memory in the array manipulation code (Haskell code, but pointers are pointers). Let me check it.
We can manually unzip the file and try removing the unzip part from the pipeline.
Harendra Kumar
@harendra-kumar
As I suspected, if I remove the unzip part it works fine, and if I add unzip back I can reproduce the problem. The problem is likely to be in the bzip library.
Harendra Kumar
@harendra-kumar
It is also possible that there is a bug in streamly which is exposed by bzip. Let me see if that is a possibility.
Harendra Kumar
@harendra-kumar

I think I got the issue: it's a bug in streamly. A foreign pointer touch was missing for the decoding table memory, so it got freed by GHC and reused by the bzip library, corrupting it.

Thanks for reporting, I will push a fix soon.
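This class of bug (GC-managed memory freed while a raw Ptr derived from it is still in use) is guarded against with base's Foreign.ForeignPtr API. A minimal illustrative sketch, not the actual streamly fix:

```haskell
import Data.Word (Word8)
import Foreign.ForeignPtr
    (ForeignPtr, mallocForeignPtrBytes, touchForeignPtr, withForeignPtr)
import Foreign.Storable (peek, poke)

main :: IO ()
main = do
    -- GC-managed memory, standing in for the decoding table.
    fp <- mallocForeignPtrBytes 1 :: IO (ForeignPtr Word8)
    withForeignPtr fp $ \p -> poke p 42
    w <- withForeignPtr fp $ \p -> peek p
    -- If a raw Ptr derived from fp were dereferenced after the last
    -- visible use of fp, GHC could free the memory early; a
    -- touchForeignPtr after the final use is the missing guard.
    touchForeignPtr fp
    print w
```

withForeignPtr keeps the ForeignPtr alive only for the duration of the passed action; any raw pointer that escapes it needs an explicit touchForeignPtr after its last use.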

Tim Pierson
@o1lo01ol1o
Ah, great! Glad you were able to find the underlying cause!
Harendra Kumar
@harendra-kumar
@o1lo01ol1o the fix is now available on the master branch.
Tim Pierson
@o1lo01ol1o
@harendra-kumar Great, looks like it's all been fixed. Thanks for the quick fix!
Harendra Kumar
@harendra-kumar
streamly 0.7.0 has been released on Hackage (http://hackage.haskell.org/package/streamly). This release came after a long time but includes a lot of new functionality; see the changelog in the package. You can achieve a lot with this release.
Harendra Kumar
@harendra-kumar
My Functional Conf talk about streamly is now available on YouTube: https://www.youtube.com/watch?v=uzsqgdMMgtk