by

Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Martin Krasser
    @krasserm
    So you have to join reply at the end of your transformations
    or, when using a graph builder, pipe the transformation result directly back to the source flow
    Jeremy Lyman
    @maarek
    Do you know what that would look like in the graph builder?
    I guess that's what the error is. Im trying to look at the documentation for the streams now.
    Would it be a bidirectional flow then?
    Martin Krasser
    @krasserm
    Just a second, I'm taking a closer look at your example ...
    Jeremy Lyman
    @maarek
                b.from(httpShape)
                        .via(greetingShape)
                        .viaFanOut(broadcaster)
                        .toInlet(httpShape.in());
                b.from(broadcaster)
                        .to(esShape);
                b.from(broadcaster)
                        .to(loggerSinkShape);
    Martin Krasser
    @krasserm
    This looks good.
    Jeremy Lyman
    @maarek
    This seems to pipe it back as you said
    By attaching the inlets.
    Martin Krasser
    @krasserm
    I didn't manage to get your example running
    Jeremy Lyman
    @maarek
    :thumbsup:
    It's missing some code?
    :)
    Martin Krasser
    @krasserm
    imports, base class, ...
    Jeremy Lyman
    @maarek
    Yep :P
    So with jetty camel it seems like Flow works but Source does not work
    Martin Krasser
    @krasserm
    yes, jetty endpoints create in-out message exchanges
    Jeremy Lyman
    @maarek
    When I used receiveBody, jetty would reply back with 200 echo but it would never stream.
    How do you recognize that with other camel endpoints? :P
    Martin Krasser
    @krasserm
    look at the docs
    Jeremy Lyman
    @maarek
    Lol
    Thanks for helping out with the sample above. Glad it's working finally.
    Martin Krasser
    @krasserm
    You're welcome
    Martin Krasser
    @krasserm
    @/all just released version 0.8.1 (with a fix for #37)
    Martin Krasser
    @krasserm
    @/all there's now a Streamz 0.9-M1 release with dependencies to FS2 0.10.0-M3, cats 0.9.0 and cats-effect 0.3. You can find the links to the 0.9.x documentation at https://github.com/krasserm/streamz/blob/master/README.md#streamz-09x
    XING Yun
    @BigFatDog
    Hello, how can I read a large and static Dask DataFrame into streamz? The DataFrame has a time attribute that spans about 1 month. I want to put a small part of the DataFrame into streamz firstly, and later read data in hour/minute steps per second. Any hints on this?
    Martin Krasser
    @krasserm
    You first need to be able to load a Dask DataFrame into a JVM application. I've never worked with Dask nor do I know a library that loads Dask or Pandas data frames into a JVM application. Given that you find such a library (or have some other Java/Scala code that can read the data frame directly) you need to write a custom Akka Streams Source or a custom FS2 Stream for "loading" the read data into Streamz.
    XING Yun
    @BigFatDog
    Thank you for your reply. It's complicated, I need to think another strategy. I'll come back if I still use this approach and get a solution
    Gavin Bisesi
    @Daenyth
    :wave:
    @krasserm dunno if you hang out in here, but I was hoping to jump on #54 before the big breaking release
    let me know what you think
    Wael
    @waeljammal
    Hi, I'm trying to understand how ack and nack work at the moment. Does the library handle exceptions and treat them as a nack? Seems unless I add a try/catch block the stream just hangs and next message is never received. How can I handle errors and do a nack? Also is there support for retry policy? Thanks
    Martin Krasser
    @krasserm
    @waeljammal that's an open ticket krasserm/streamz#40. At the moment, received messages are automatically acked if conversion to internal message format succeeds else the stream fails. See https://github.com/krasserm/streamz/blob/master/streamz-camel-akka/src/main/scala/streamz/camel/akka/EndpointConsumer.scala#L69 for details. Later downstream errors are not communicated back to the Camel endpoint. But a stream should never hang in this case. Are you using the Camel FS2 or Akka integration?
    Wael
    @waeljammal
    It was hanging because I did not have a supervisor on that akka flow, I have resolved that issue now. Only issue remaining is if the flow fails to send an email I need to nack it so it ends up in the DLQ to be retried later, would also be very handy if there was a way to specify a retry policy. Thanks for the link to the ticket as well.
    Martin Krasser
    @krasserm
    If streamz already supprted application-level acks you'd anyway have to configure a retry policy on the Camel endpoint and not via the streamz API. Alternatively, consider using error handling strategies supported by Akka Streams.
    Gregory Nwosu
    @gregnwosu
    hello ... I have a timed window from some upstream stream, some of the windows have no data, in this case i want to get the last element published from the stream from the previous window
    how can i code this?
    Gavin Bisesi
    @Daenyth
    @gregnwosu can you show an example of what you mean?
    Gregory Nwosu
    @gregnwosu
    sure one second
    Gregory Nwosu
    @gregnwosu

    ```df = streamz.timed_window(stream,0.1).map(concat_dfs).to_dataframe(example=df_structure)

    df```

    this yields

    a dataframe like

    symbol timestamp bid ask
    0 GBPUSD 2019-08-13 14:40:27.702121+00:00 1.20804 1.20821
    0 AUDUSD 2019-08-13 14:40:27.710115+00:00 0.68072 0.68087

    but sometimes the dataframe is completely empty
    Gavin Bisesi
    @Daenyth
    ```scala
    code here
    ```
    fwiw
    streamz.timed_window let me see where that comes from. I'm most familiar with the akka/fs2 layer
    Gregory Nwosu
    @gregnwosu
    im so sorry im in wrong chat
    theres a python library called streamz