These are chat archives for sirthias/swave

27th
Sep 2016
Felix Palludan Hargreaves
@hejfelix
Sep 27 2016 06:42
btw: using the pushspout reduces all that boilerplate to this:
object MySwaveQueueApp extends App {

  implicit val system                       = ActorSystem()
  implicit val executor                     = system.dispatcher
  implicit val dispatcher: ExecutionContext = system.dispatcher.prepare()
  implicit val env                          = StreamEnv()

  val bufferSize                = 2
  val pushSpout: PushSpout[Int] = Spout.push[Int](bufferSize, 2 * bufferSize)

  system.scheduler.schedule(0.seconds, 1.seconds) {
    pushSpout.offer(Random.nextInt)
  }

  pushSpout.take(3).foreach(println)

}
Felix Palludan Hargreaves
@hejfelix
Sep 27 2016 08:02
say I have a Spout[Seq[Int]], what’s the canonical way to create a sub-spout for each sequence?
is it maybe flattenConcat?
Felix Palludan Hargreaves
@hejfelix
Sep 27 2016 08:36
I’ve run into some trouble with the following code
  val bla: Spout[Seq[((String, String), Boolean)]] =
    pushSpout.map(publishable => publishable.filenames.map(publishable.project -> _)).fanOutBroadcast()
        .sub.map(_.filter(needsSigning).map(_ -> true)).end
        .sub.map(_.filterNot(needsSigning).map(_ -> false)).end
      .fanInMerge()

  bla.foreach(x => println(x.toString))
this compiles and runs, but it yields this exception
java.lang.IllegalStateException: Unexpected subscribe() from out 'ForeachDrainStage@757abc34/awaitingOnSubscribe' in PushSpoutStage@6b0d98e0/active
    at swave.core.impl.stages.Stage.illegalState(Stage.scala:99)
    at swave.core.impl.stages.Stage._subscribe0(Stage.scala:125)
    at swave.core.impl.stages.spout.PushSpoutStage._subscribe0(PushSpoutStage.scala:16)
    at swave.core.impl.stages.Stage._subscribe(Stage.scala:120)
    at swave.core.impl.stages.Stage.subscribe(Stage.scala:115)
    at swave.core.Drain.consume(Drain.scala:21)
    at swave.core.Spout$.to$extension(Spout.scala:35)
    at swave.core.Spout$.drainTo$extension(Spout.scala:55)
    at swave.core.Spout$.foreach$extension(Spout.scala:52)
    at controllers.UploadControllerSwaveLike$class.$init$(UploadControllerSwaveLike.scala:17)
    at controllers.UploadToolController.<init>(UploadToolController.scala:16)
    at modules.DefaultComponents.uploadToolController$lzycompute(DefaultComponents.scala:27)
    at modules.DefaultComponents.uploadToolController(DefaultComponents.scala:27)
    at modules.DefaultComponents.router$lzycompute(DefaultComponents.scala:24)
    at modules.DefaultComponents.router(DefaultComponents.scala:22)
    at play.api.BuiltInComponents$class.injector(Application.scala:250)
    at play.api.BuiltInComponentsFromContext.injector$lzycompute(ApplicationLoader.scala:103)
    at play.api.BuiltInComponentsFromContext.injector(ApplicationLoader.scala:103)
    at play.api.BuiltInComponents$class.application(Application.scala:259)
    at play.api.BuiltInComponentsFromContext.application$lzycompute(ApplicationLoader.scala:103)
    at play.api.BuiltInComponentsFromContext.application(ApplicationLoader.scala:103)
    at modules.ApplicationLoader.load(ApplicationLoader.scala:7)
    at play.core.server.DevServerStart$$anonfun$mainDev$1$$anon$1$$anonfun$get$1$$anonfun$apply$1$$anonfun$1$$anonfun$2.apply(DevServerStart.scala:168)
    at play.core.server.DevServerStart$$anonfun$mainDev$1$$anon$1$$anonfun$get$1$$anonfun$apply$1$$anonfun$1$$anonfun$2.apply(DevServerStart.scala:164)
    at play.utils.Threads$.withContextClassLoader(Threads.scala:21)
    at play.core.server.DevServerStart$$anonfun$mainDev$1$$anon$1$$anonfun$get$1$$anonfun$apply$1$$anonfun$1.apply(DevServerStart.scala:164)
    ... 14 more
I am unsure why this happens. I must be creating an illegal stream graph since there are no events pushed in at the time of the error
Mathias
@sirthias
Sep 27 2016 08:41
Ok, thanks for reporting. I'm on the train right now but will look into this ASAP
Felix Palludan Hargreaves
@hejfelix
Sep 27 2016 08:42
I don’t think it’s neccessarily a bug, but any insights would be super :)
Mathias
@sirthias
Sep 27 2016 08:42
An IllegalStateException is always a bug.... :)
Felix Palludan Hargreaves
@hejfelix
Sep 27 2016 08:43
yes, question is if the bug comes from my code or your library
bonus info: deleting the bla.foreach(…) line still yields the same bug
Mathias
@sirthias
Sep 27 2016 08:44
Since it is thrown from swave code it's a bug in the library. If you made any type of mistake you should get a helpful error message, not an internal ISE
I'll look into your snippet and report back here
Felix Palludan Hargreaves
@hejfelix
Sep 27 2016 08:44
thanks :)
Felix Palludan Hargreaves
@hejfelix
Sep 27 2016 08:51
I may have found the problem: I was draining the original spout twice
there was some lingering foreach later in the file
and now it works :)
I was draining pushSpout somewhere in my class
and later trying to drain bla
which was based on pushSpout
Felix Palludan Hargreaves
@hejfelix
Sep 27 2016 09:05
I believe the error message could be improved in this case since this will likely happen to many people because it is basically the only stateful part of the program
Mathias
@sirthias
Sep 27 2016 16:35
Ok, I see.
Yes, there should be a very clear error when you try to reuse any type of stream element (be it a Spout, Pipe, Drain or Module).
I'll add dedicated tests for that.
Thank you for reporting!