    Jeremy Lyman
    @maarek
    Yep :P
    So with camel-jetty, it seems like a Flow works but a Source does not
    Martin Krasser
    @krasserm
    yes, jetty endpoints create in-out message exchanges
    Jeremy Lyman
    @maarek
    When I used receiveBody, Jetty would reply with a 200 echo but it would never stream.
    How do you recognize that with other Camel endpoints? :P
    Martin Krasser
    @krasserm
    look at the docs
    Jeremy Lyman
    @maarek
    Lol
    Thanks for helping out with the sample above. Glad it's working finally.
    Martin Krasser
    @krasserm
    You're welcome
    Martin Krasser
    @krasserm
    @/all just released version 0.8.1 (with a fix for #37)
    Martin Krasser
    @krasserm
    @/all there's now a Streamz 0.9-M1 release with dependencies on FS2 0.10.0-M3, cats 0.9.0 and cats-effect 0.3. You can find the links to the 0.9.x documentation at https://github.com/krasserm/streamz/blob/master/README.md#streamz-09x
    XING Yun
    @BigFatDog
    Hello, how can I read a large, static Dask DataFrame into streamz? The DataFrame has a time attribute that spans about one month. I want to feed a small part of the DataFrame into streamz first, and later read data in hour/minute-sized steps, one step per second. Any hints on this?
    Martin Krasser
    @krasserm
    You first need to be able to load a Dask DataFrame into a JVM application. I've never worked with Dask nor do I know a library that loads Dask or Pandas data frames into a JVM application. Given that you find such a library (or have some other Java/Scala code that can read the data frame directly) you need to write a custom Akka Streams Source or a custom FS2 Stream for "loading" the read data into Streamz.
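    A minimal sketch of that last step, assuming the rows have somehow already been read into the JVM (the `Row` type, its fields, and the loading code are hypothetical stand-ins, not part of streamz):

    ```scala
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.Source

    // Hypothetical row type standing in for one record of the data frame.
    final case class Row(symbol: String, bid: Double, ask: Double)

    object LoadSketch {
      implicit val system: ActorSystem = ActorSystem("loader")

      // Wrap already-loaded rows in a custom Akka Streams Source.
      // Source.fromIterator produces a fresh iterator on each materialization.
      def rowSource(rows: Iterable[Row]): Source[Row, NotUsed] =
        Source.fromIterator(() => rows.iterator)
    }
    ```

    From there, the Source can be converted to an FS2 stream with streamz-converter, or consumed directly with Akka Streams.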
    XING Yun
    @BigFatDog
    Thank you for your reply. It's complicated; I need to think of another strategy. I'll come back if I still use this approach and find a solution.
    Gavin Bisesi
    @Daenyth
    :wave:
    @krasserm dunno if you hang out in here, but I was hoping to jump on #54 before the big breaking release
    let me know what you think
    Wael
    @waeljammal
    Hi, I'm trying to understand how ack and nack work at the moment. Does the library handle exceptions and treat them as a nack? It seems that unless I add a try/catch block, the stream just hangs and the next message is never received. How can I handle errors and do a nack? Also, is there support for a retry policy? Thanks
    Martin Krasser
    @krasserm
    @waeljammal that's an open ticket, krasserm/streamz#40. At the moment, received messages are automatically acked if conversion to the internal message format succeeds; otherwise the stream fails. See https://github.com/krasserm/streamz/blob/master/streamz-camel-akka/src/main/scala/streamz/camel/akka/EndpointConsumer.scala#L69 for details. Downstream errors that occur later are not communicated back to the Camel endpoint, but a stream should never hang in this case. Are you using the Camel FS2 or Akka integration?
    Wael
    @waeljammal
    It was hanging because I did not have a supervisor on that Akka flow; I have resolved that issue now. The only remaining issue is that if the flow fails to send an email, I need to nack it so it ends up in the DLQ to be retried later. It would also be very handy if there were a way to specify a retry policy. Thanks for the link to the ticket as well.
    Martin Krasser
    @krasserm
    If streamz already supported application-level acks, you would still have to configure a retry policy on the Camel endpoint and not via the streamz API. Alternatively, consider using the error handling strategies supported by Akka Streams.
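    One of those Akka Streams strategies can be sketched with a supervision attribute (the `Email` type and the mail-sending flow below are hypothetical):

    ```scala
    import akka.NotUsed
    import akka.stream.{ActorAttributes, Supervision}
    import akka.stream.scaladsl.Flow

    final case class Email(to: String, body: String) // hypothetical message type

    // Resume on transient send failures (the failing element is dropped),
    // stop the stream on anything else.
    val decider: Supervision.Decider = {
      case _: java.io.IOException => Supervision.Resume
      case _                      => Supervision.Stop
    }

    def sendMailFlow: Flow[Email, Unit, NotUsed] = ??? // hypothetical mail sender

    val resilientFlow =
      sendMailFlow.withAttributes(ActorAttributes.supervisionStrategy(decider))
    ```

    Note that resuming drops the element rather than nacking it back to the Camel endpoint, which is exactly what the open ticket about application-level acks is tracking.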
    Gregory Nwosu
    @gregnwosu
    hello ... I have a timed window over some upstream stream, and some of the windows have no data. In that case I want to emit the last element published in the previous window.
    How can I code this?
    Gavin Bisesi
    @Daenyth
    @gregnwosu can you show an example of what you mean?
    Gregory Nwosu
    @gregnwosu
    sure one second
    Gregory Nwosu
    @gregnwosu

    ```python
    df = streamz.timed_window(stream, 0.1).map(concat_dfs).to_dataframe(example=df_structure)
    df
    ```

    this yields

    a dataframe like

    ```
      symbol                         timestamp      bid      ask
    0 GBPUSD  2019-08-13 14:40:27.702121+00:00  1.20804  1.20821
    0 AUDUSD  2019-08-13 14:40:27.710115+00:00  0.68072  0.68087
    ```

    but sometimes the dataframe is completely empty
    Gavin Bisesi
    @Daenyth
    ```scala
    code here
    ```
    fwiw
    streamz.timed_window ... let me see where that comes from. I'm most familiar with the akka/fs2 layer
    Gregory Nwosu
    @gregnwosu
    I'm so sorry, I'm in the wrong chat
    there's a Python library also called streamz
    Gavin Bisesi
    @Daenyth
    ahh
    yeah I wondered about the underscores :D
    Tristan Lohman
    @gatorcse
    I am converting an Akka Source to an FS2 stream. The toStream() method takes an implicit materializer. What is run with this materializer: just the things in the Source, or also everything in the FS2 stream and everything that follows? Basically, I'm using a DB library that uses Akka, and I was going to grab the actor system from that driver, create a materializer from it, and pass that in to the toStream method, but I want to make sure that is safe. Also, this is probably obvious, but I have no idea how Akka Streams works, and I don't need the materialized value, just the streamed elements.
    Martin Krasser
    @krasserm
    The materializer is only needed for the Source internals. It is not used downstream in the FS2 part (created with toStream()). Creating an ActorMaterializer from the available actor system is safe.
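    A rough sketch of that setup (the driver-specific accessor is elided with `???`, and the exact implicits toStream() needs vary by streamz and cats-effect version):

    ```scala
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source
    import cats.effect.IO
    import streamz.converter._

    def driverSystem: ActorSystem = ??? // obtained from the DB driver in practice

    // One long-lived materializer, created at app start and reused everywhere.
    implicit lazy val mat: ActorMaterializer = ActorMaterializer()(driverSystem)

    // Only the Source internals run on `mat`; the resulting stream runs in IO.
    def toFs2[A](src: Source[A, _]): fs2.Stream[IO, A] = src.toStream()
    ```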
    Gavin Bisesi
    @Daenyth
    @gatorcse note that the materializer has a lifecycle, so you should construct it at the start of your app
    "db library that uses akka" if that's slick+alpakka I have a snippet for streaming straight from slick
    Tristan Lohman
    @gatorcse
    it’s reactivemongo
    @Daenyth I create the Mongo client instance in a Resource, and then inside use I grab the underlying actor materializer (the driver client instance exposes it).
    Gavin Bisesi
    @Daenyth
    If they expose it for you I'd expect them to be responsible for shutdown
    You might also look at fs2-mongo iirc if you're not married to that library
    Tristan Lohman
    @gatorcse
    eh, I kinda already am. My client uses it everywhere, plus I already know how to use it. I'm mostly doing simple calls and wrapping them with IO.fromFuture(IO { … }), but I have a few instances where I need to consume the change event stream.
    Gotta finish up that native cats Mongo driver I'm working on...
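    For reference, the Future-wrapping pattern mentioned above looks roughly like this (`findDocument` is a hypothetical driver call; cats-effect 2 is assumed, which needs an implicit ContextShift):

    ```scala
    import cats.effect.{ContextShift, IO}
    import scala.concurrent.{ExecutionContext, Future}

    implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

    def findDocument(id: String): Future[String] = ??? // hypothetical driver call

    // Defer Future creation inside IO so the call does not start eagerly
    // and can be retried or sequenced like any other IO.
    def findDocumentIO(id: String): IO[String] =
      IO.fromFuture(IO(findDocument(id)))
    ```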
    Juan José Morales
    @juanux
    Hi. Do you know if there is a problem with this project on Maven? I'm unable to add it to my project due to a "not found" error. Any help please?
    Juan José Morales
    @juanux
    I am using `libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.10-M2" // uses FS2 1.0.0`
    Juan José Morales
    @juanux
    Solved. It was a problem with my build.sbt config
    Gavin Bisesi
    @Daenyth
    @juanux Can you file a ticket?