    Mathias
    @sirthias
    or use the error channel built into the streaming infrastructure
    Felix Palludan Hargreaves
    @hejfelix
    Any information on that error channel?
    Mathias
    @sirthias
    in the latter case the first error will stop the stream and clean up all resources
    you could use things like recover to deal with errors, but you have to be aware of the fact that elements might be skipped in the process.
    for example, if you map a Spout and your mapping function throws an exception
    then this exception will "kill" the stream, cause all resources to be cleaned up, and yield an error at all drains
    (if the drains are downstream)
    Felix Palludan Hargreaves
    @hejfelix
    I think we'll stick to Either
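A minimal sketch of the Either approach mentioned above, written against plain Scala collections rather than swave itself. The names `parse`, `parseAll` and `successes` are hypothetical and only illustrate the idea: failures travel as ordinary `Left` elements, so no exception escapes the mapping function and nothing is torn down.

```scala
import scala.util.Try

object EitherErrorsSketch {
  // Hypothetical per-element step that can fail; the failure is captured as a Left.
  def parse(s: String): Either[Throwable, Int] = Try(s.toInt).toEither

  // Every input yields exactly one output, so no element is skipped or lost.
  def parseAll(inputs: List[String]): List[Either[Throwable, Int]] =
    inputs.map(parse)

  // Downstream code decides what to do with failures, e.g. keep only the successes.
  def successes(inputs: List[String]): List[Int] =
    parseAll(inputs).collect { case Right(n) => n }
}
```

The same shape should carry over to a stream: map each element to an `Either` and handle the `Left` case wherever it is convenient, instead of relying on the stream's error channel.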
    Felix Palludan Hargreaves
    @hejfelix
    is it possible to name sub-streams?
    sidenote: It would be extremely helpful if you could link to the newest scaladocs from the github readme
    Mathias
    @sirthias
    yes, it's possible to assign names to parts of a pipeline.
    Re the docs: Of course we need docs, but so far they don't exist yet. Even the ScalaDoc wouldn't be helpful at this point (as there are simply way too few).
    At this point the only way to find your way around swave is to read the code and ask here or on the ML...
    Felix Palludan Hargreaves
    @hejfelix
    How does naming work for spout.fanout.sub?
    Mathias
    @sirthias
    so you basically want to give the exit points of a fan-out individual names?
    Currently there is no support for sth like that.
    You can name parts of a pipeline, including modules that have several ingresses and/or egresses.
    But there is no support for assigning names to those ingresses/egresses. At least not at this point.
    Felix Palludan Hargreaves
    @hejfelix
    Mathias
    @sirthias
    It allows for differing element weights. Essentially it has the same semantics as the akka-stream throttle variant with a cost parameter.
    E.g. you might want to throttle based on bytes/second. Then you might say: cost = 1024, per = 1.second, costFn = _.byteCount
    Felix Palludan Hargreaves
    @hejfelix
    ahh
    didn’t see the costFn
    now it all makes sense :)
    thanks
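To make the cost-based throttling described above more concrete, here is a rough model in plain Scala. It is not swave's or akka-stream's implementation. It assumes a budget that starts empty and refills continuously at `cost` units per `perMillis` milliseconds, and it computes the earliest time at which each element could be emitted. The names `emitTimesMillis` and `perMillis` are made up for this sketch.

```scala
object CostThrottleSketch {
  // Earliest emission time (in ms) of each element under the simplified model:
  // an element may pass once the budget has covered the cost of it and all its predecessors.
  def emitTimesMillis[T](elems: List[T], cost: Long, perMillis: Long)(costFn: T => Long): List[Long] =
    elems
      .scanLeft(0L)((spent, e) => spent + costFn(e)) // running total of element costs
      .tail                                          // drop the initial 0
      .map(total => total * perMillis / cost)        // time needed to accumulate that much budget
}
```

With `cost = 1024` and `perMillis = 1000` (i.e. 1024 bytes per second), three chunks of 512, 512 and 2048 bytes would be released at roughly 500 ms, 1000 ms and 3000 ms in this model: the large chunk waits longer because it weighs more.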
    Felix Palludan Hargreaves
    @hejfelix
    could you quickly list the differences between the different fanInMerge strategies?
    Mathias
    @sirthias
    which one in particular are you interested in?
    Felix Palludan Hargreaves
    @hejfelix
    Merge
    Mathias
    @sirthias
    merge simply requests from its upstreams as much as it can and pushes all incoming elements down to its downstream, without paying attention to where the element comes from
    to contrast it with concat: concat first drains the first upstream, then the second, etc.
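The difference between merge and concat described above can be modelled with plain Scala lists. This is only an illustration, not swave code: `Timed` and its `arrival` field are hypothetical stand-ins for "when an upstream produced the element", and a real merge is also driven by downstream demand.

```scala
object MergeVsConcatSketch {
  // An element together with the (hypothetical) time at which its upstream produced it.
  final case class Timed[T](arrival: Int, value: T)

  // concat: everything from the first upstream, then everything from the second,
  // regardless of when the elements became available.
  def concat[T](first: List[Timed[T]], second: List[Timed[T]]): List[T] =
    (first ++ second).map(_.value)

  // merge: elements are passed on in arrival order, whichever upstream they come from.
  def merge[T](first: List[Timed[T]], second: List[Timed[T]]): List[T] =
    (first ++ second).sortBy(_.arrival).map(_.value)
}
```

For example, with a first upstream producing `a1` at time 1 and `a2` at time 4, and a second producing `b1` at time 2 and `b2` at time 3, `concat` yields `a1, a2, b1, b2` while `merge` yields `a1, b1, b2, a2`.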
    Felix Palludan Hargreaves
    @hejfelix
    just in case I forget: thanks a lot for your help, we are progressing quite nicely with our project. Library seems quite stable so far
    Mathias
    @sirthias
    Thanks, Felix! That's great to hear!
    David Karnok
    @akarnokd
    Hi! I'm trying to understand how swave works (v 0.5.0). Is it by default asynchronous in every stage, i.e., even if I define a flow without any async()? Why didn't you go native Reactive-Streams instead of the converter? Given a Spout instance, can I convert it to an RS Publisher multiple times (currently, it throws some "already sealed" exception)?
    Mathias
    @sirthias
    @akarnokd Have you seen the documentation that's already available at http://swave.io ?
    David Karnok
    @akarnokd
    Mathias
    @sirthias
    That chapter will probably be written on monday. But the quick start chapter for example should be helpful. And the other chapters on Spouts and Drains as well.
    David Karnok
    @akarnokd
    Oh, I see now. Spout is single use only (like Java Stream).
    So if a Spout fails, I have to rebuild the whole stream, right?
    Mathias
    @sirthias
    If a Spout produces an error it will shut down along with the whole stream, yes. Since it's single-use it cannot be restarted.
    So, you'll have to recreate and start it again.
    David Karnok
    @akarnokd
    Thanks.
    Peter Schmitz
    @petomat

    Hey Mathias, great library! Keep on going! I'm currently looking for some fanIn semantics, as the following example shows:

        val s1 = Spout(1, 2, 3)
        val s2 = Spout("A", "B", "C")
        def mergeIntoTuple[A, B](s1: StreamOps[A], s2: Spout[B]): s1.Repr[(A, B)] = {
          case class Wrap[T](channel: Int, value: T) extends PS
          s1
            .map(Wrap(1, _))
            .attach(s2.map(Wrap(2, _)))
            .fanInMerge(eagerComplete = false)
            .scan((Option.empty[A], Option.empty[B])) {
              case ((opt1, opt2), Wrap(1, value: A @unchecked)) => Some(value) -> opt2
              case ((opt1, opt2), Wrap(2, value: B @unchecked)) => opt1 -> Some(value)
              case _ => sys.error("Not possible.")
            }
            .collect { case (Some(value1), Some(value2)) => value1 -> value2 }
        }
        implicit val env = StreamEnv()
        val streamRun = mergeIntoTuple(s1, s2).to(Drain.foreach(println).dropResult).run()
        // (1,A)
        // (2,A)
        // (2,B)
        // (3,B)
        // (3,C)
        env.shutdownOn(streamRun.termination)

    Did I miss such a combinator? If not, a more generic (and less heavyweight, e.g. a custom Stage) one would be nice, to just write s1.attach(s2).attach(s3).fanInMergeIntoTuple.map { case (x, y, z) => ... } ....

    Peter Schmitz
    @petomat
    By the way, improving the documentation, or at least adding some short scaladoc examples for combinators, would be helpful. I know it's important but tedious, and we are all short of time ...
    Mathias
    @sirthias
    There's already 'fanInToTuple', which is just a specialized 'fanInToProduct'.
    Is that what you're looking for?
    Peter Schmitz
    @petomat

    I know about fanInToTuple but the semantics is different:

      val s1 = Spout(1, 2, 3)
      val s2 = Spout("A", "B", "C")
      implicit val env = StreamEnv()
      val streamRun = s1.attach(s2).fanInToTuple.to(Drain.foreach(println).dropResult).run()
      // (1,A)
      // (2,B)
      // (3,C)
      env.shutdownOn(streamRun.termination)

    whereas my example yields:

      // (1,A)
      // (2,A)
      // (2,B)
      // (3,B)
      // (3,C)

    So fanInToTuple waits for each input to change, whereas my example above emits whenever one input changes (once the initial elements have been collected). It's like AND versus OR.
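The AND-versus-OR distinction above can be reproduced on plain Scala collections, independent of swave. In this sketch `zipped` models the positional pairing and `combineLatest` models the "emit on every change" behaviour using the same scan-and-collect idea as the example earlier. Both names are hypothetical, and the `Either` tags simply mark which input an element came from. The interleaving of the two inputs is fixed by hand here, whereas in a real merge it depends on timing.

```scala
object CombineLatestSketch {
  // "AND": one output per pair of new elements, matched by position.
  def zipped[A, B](as: List[A], bs: List[B]): List[(A, B)] = as.zip(bs)

  // "OR": one output per incoming element, once both inputs have produced at least one.
  def combineLatest[A, B](events: List[Either[A, B]]): List[(A, B)] =
    events
      .scanLeft((Option.empty[A], Option.empty[B])) {
        case ((_, latestB), Left(a))  => (Some(a), latestB) // new element from the first input
        case ((latestA, _), Right(b)) => (latestA, Some(b)) // new element from the second input
      }
      .collect { case (Some(a), Some(b)) => (a, b) }

  // The two inputs from the discussion, alternating strictly between them.
  val events: List[Either[Int, String]] =
    List(Left(1), Right("A"), Left(2), Right("B"), Left(3), Right("C"))
}
```

For this alternating input, `combineLatest(events)` gives `(1,A), (2,A), (2,B), (3,B), (3,C)`, while `zipped(List(1, 2, 3), List("A", "B", "C"))` gives `(1,A), (2,B), (3,C)`.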

    Eric Torreborre
    @etorreborre
    Hi @sirthias, I just want to tell you that the swave.io site is down
    Mathias
    @sirthias
    Thank you, Eric!
    There was a f*ckup w/ my registrar, it's in the process of coming back...
    Eric Torreborre
    @etorreborre
    Does that work for you? It's still down for me