Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Aleksandar Prokopec
    @axel22

    For this, use the await method instead of get on the channels service: https://github.com/reactors-io/reactors/blob/master/reactors-core/shared/src/main/scala/io/reactors/services/package.scala#L280

    (which will return you an asynchronous event stream with the channel value)

    Hope this helps!
    Aleksandar Prokopec
    @axel22
    Note also that your main method needs to be lifted into a reactor so that you can work with the Events[Channel] value returned by await. The standard way to do that is to use the piggyback scheduler and start a new reactor with it from the main method - here's an example test that does that: https://github.com/reactors-io/reactors/blob/d7bc66586cc19707db1c18c1c40f245973033dc4/reactors-core/jvm/src/test/scala/io/reactors/concurrent/jvm-services-tests.scala#L28
    Aleksandar Prokopec
    @axel22
    Btw, if you are defining a reactor with extends Reactor (a named template), then you don't need to use self (you only absolutely need self if you are defining an anonymous reactor - val proto = Reactor[String] { self => .... }).
    daniil-timofeev
    @daniil-timofeev
    Thanks for your advices
    daniil-timofeev
    @daniil-timofeev
    Now i move reactor in separate class, and spawn it from object with App trait. Separated reactor creates other reactors inside yourself.
    When i spawn that separate reactor with piggyback sheduler, it thrown an exception, when in default, there is no exception, but thread will go out from reactor and operations withhin event flows not passed
    object MoscowFrostDepths extends App{
    
    val system = new ReactorSystem("test-system")
      system.spawn({Proto.apply[CurvesReactor]}.withScheduler(JvmScheduler.Key.piggyback))
    }
    daniil-timofeev
    @daniil-timofeev
    CurvesReactor make some preprocessing and create 30-40 reactors for 1d grid numerical iterations with different coefficients. if coefficient fails in production, special reactor is used to write coefficient construction values to file, if grid calculation processed successfuly over time, result is writed to another file from all 30-40 reactors.
    java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    [error] (run-main-0) java.lang.IllegalStateException: Frame removed before being scheduled.
    java.lang.IllegalStateException: Frame removed before being scheduled.
        at io.reactors.ReactorSystem.trySpawnReactor(ReactorSystem.scala:164)
        at io.reactors.ReactorSystem.spawn(ReactorSystem.scala:89)
    ...
    Aleksandar Prokopec
    @axel22
    Could you share the complete example in a gist so that I can debug it?
    daniil-timofeev
    @daniil-timofeev
    ok, i share this example. Before i shared you can paste this code to scastie https://scastie.scala-lang.org
    and get the same exception
    import io.reactors._
    import io.reactors.JvmScheduler
    
    val system = new ReactorSystem("example-system")
    
    class WriteConsoleReactors extends Reactor[String] {
      main.events.onEvent { println(_) }
    }
    
    system.spawn(Proto[WriteConsoleReactors].withScheduler(JvmScheduler.Key.piggyback))
    Aleksandar Prokopec
    @axel22

    Thanks - I was able to reproduce this. The error is caused because the Scastie creates under-the-hood creates an inner class for WriteConsoleReactors. You can see the correct error message if you use the newThread scheduler: system.spawn(Proto[WriteConsoleReactors].withScheduler(JvmScheduler.Key.newThread)).

    Unfortunately, the exception is swallowed by the piggyback scheduler, so it's unintuitive what happens. I will fix this in the next release to ensure that the proper exception is shown.

    About the exception itself - since you cannot write top-level classes in Scastie, you should instead declare your reactors anonymously, as follows:

    import io.reactors._
    import io.reactors.JvmScheduler
    
    val system = new ReactorSystem("example-system")
    
    val writeConsoleReactor = Reactor[String] { self =>
      println("started ctor")
      self.main.events.onEvent { x =>
        println(x)
        println("exiting!")
        self.main.seal()
      }
    }
    
    val ch = system.spawn(writeConsoleReactor.withScheduler(JvmScheduler.Key.newThread))
    ch ! "There"
    
    println("done")
    Here's another example with the piggyback scheduler:
    import io.reactors._
    import io.reactors.JvmScheduler
    
    val system = new ReactorSystem("example-system")
    
    val writeConsoleReactor = Reactor[String] { self =>
      println("started ctor")
      self.main.events.onEvent { x =>
        println(x)
        println("exiting!")
        self.main.seal()
      }
    
      val console = self.main.channel
      self.system.spawn(Reactor[Unit] { self =>
          console ! "Hi!"
        self.main.seal()
      })
    }
    
    system.spawn(writeConsoleReactor.withScheduler(JvmScheduler.Key.piggyback))
    
    Thread.sleep(1000)
    
    println("done")
    Brandon Elam Barker
    @bbarker
    Any chance of a release/build for scala 2.12?
    Aleksandar Prokopec
    @axel22
    Hi Brandon!
    Thanks for the ping! Yes, I think that this should be possible. I am at the moment a bit preoccupied, and the earliest that I can jump on it is April 22nd.
    (otherwise, if you need it earlier, you could maybe tweak the build file to enable this, and I could publish it)
    Brandon Elam Barker
    @bbarker
    The PR hopefully saves some keystrokes, but it isn't all the way there yet in terms of building correctly, I think. No rush on my part, whenever you have time is great - thanks.
    Aleksandar Prokopec
    @axel22
    Thanks for the PR! I have a few additional changes that should land soon, at which point the PR will be automatically merged.
    daniil-timofeev
    @daniil-timofeev
    @cantor-chron Thanks for your code review. Make fixes according to your observation comments. I can implement test units just at the next week due to high current workload
    Cantor Chron
    @cantor-chron
    @daniil-timofeev Soungs great! Thanks a lot for the patch!
    daniil-timofeev
    @daniil-timofeev
    @axel22 Whether you plan to publish a release for scala 2.12.2?
    Aleksandar Prokopec
    @axel22
    @daniil-timofeev Scala 2.12 introduced a pretty important regression in Scala specialization (scala/bug#10277). Until this is fixed, using event streams and channels of primitive values causes boxing, and ScalaJS frontend for Reactors cannot be compiled at all. The current milestone for the issue is Scala 2.12.3.
    If you're only using the JVM version and do not care about specialization, then I would suggest that you build the 2.12 yourself and either publish it under your own Maven repo, or use the JAR directly. I would prefer not to release 2.12 officially until the issues above are fixed in Scala. Note that building Reactors with 2.12.2 is already supported in master (https://github.com/reactors-io/reactors/blob/master/project/Build.scala#L28).
    (++ 2.12.2 reactors-jvm/package should do it)
    daniil-timofeev
    @daniil-timofeev
    Thank you for the answer! I will wait 2.12.3 in that case.
    Matt Hicks
    @darkfrog26
    do you guys know that the benchmarks link is down?
    daniil-timofeev
    @daniil-timofeev
    @axel22 @cantor-chron please look
    to the following gist?
    daniil-timofeev
    @daniil-timofeev
    assert(want1Too == 1) now failed.
    Whether it is possible to allow to subscribe with onEvent after higher Events flow reaction?
    Bertrand Kang
    @bertrandkang
    @darkfrog26 It's internal - I'm working on posting the benchmarks publicly (https://github.com/reactors-io/benchmarks/tree/gh-pages and reactors-io.github.io/benchmarks/reactors-core/report).
    @daniil-timofeev Yes, it should be possible. Let me look into this.
    Cantor Chron
    @cantor-chron

    @bertrandkang I don't think this is possible - the subscription starts from the point where onEvent is called. The two onEvent calls are separate - the second subscription does not see the events that happened before it started. This is because we wanted to make streams stateless.

    If you really want the second subscription to get notified, you should cache state on mux explicitly - by calling toSignal(0) on it: val mux = factory.mux.toSignal(0)

    Then it should work.
    daniil-timofeev
    @daniil-timofeev
    as i undesrstand the code, onEvent regist callback at the Events.Emitter's Push trait, so map, collect and most other Event methods just apply functions to each value, coming from Events.Emitter to onEvent, and do this for each onEvent separately.
    daniil-timofeev
    @daniil-timofeev
    right?
    Cantor Chron
    @cantor-chron
    Yes. The map, collect, mux and so on are all lazy and stateless - they will execute no operations unless you explicitly call onEvent to create a subscription. Every time a onEvent is called, the operator chain creates a new subscription that starts a separate chain of event propagations.
    In other words, two onEvent calls do not share the same event pipeline by default. If you want to share some part of the pipeline between two subscriptions, then you need to call the toSignal method.
    daniil-timofeev
    @daniil-timofeev
    Thanks for the comment. I fixed my example and source code and it works now!
    Bertrand Kang
    @bertrandkang
    @darkfrog26 The benchmarks should now be regularly uploaded to http://reactors.io/benchmarks/reactors/report/ by our CI.
    Brandon Elam Barker
    @bbarker
    Nice
    Uros Nedic
    @urosn
    I am interested how Reactors is different from Akka?
    Fyodor Fedorowicz
    @fyodorovski
    @urosn There is a nice writeup about that http://reactors.io/resources/docs/reactors.pdf
    Brandon Elam Barker
    @bbarker
    How crazy would it be to attempt a port of reactors to dotty, just to make sure it is well supported in the future? Especially given that there seem to still be a long-standing blocker issue for scala 2.12 (#70), it might be nice to hit the ground running with Dotty - still, iirc dotty doesn't support javascript compilation yet, so now may not be the ideal time, for that reason and other reasons i don't know about
    Aleksandar Prokopec
    @axel22
    This is something for the Dotty developers to say, but since it's mostly standard Scala code, should be doable. JS won't work, since Dotty does not have it as a backend, and type specialization would have to be disabled.
    Brandon Elam Barker
    @bbarker
    ah, dotty doesn't support specialization yet?
    Aleksandar Prokopec
    @axel22
    I think that if it does, then it has a different implementation and api.
    Merlijn Boogerd
    @mboogerd
    Hi all! I am having trouble understanding the exact details of thread-safety in reactors. As a generalisation of the actor model, it appears to me that messages sent to a single channel are serialised and events on its eventstream handled in a thread-safe way. However, how about one reactor with several channels, do we get the same safety or are we required to manage our own synchronisation when handling multiple event streams within a reactor?
    Merlijn Boogerd
    @mboogerd
    I guess the above might be a silly question given the ambition of the project, but I wasn't 100% certain from just the documentation. Anyway, I added a small test and can see that connectors opened from within a reactor have their corresponding event-stream serialized with the main event-stream. I am really happy with how this framework is set up!
    What is the likelihood of this becoming the new core for Akka? :)
    Brandon Elam Barker
    @bbarker
    @mboogerd I haven't used reactors myself yet, but the last question (which I would also be curious about) would probably be better asked of the Akka devs
    daniil-timofeev
    @daniil-timofeev
    @mboogerd Hi. As i understand reactors, Events is build on Observable pattern, and multiple streams inside an actor does not run concurently. So you can sync two streams with .sync method, which will cache events from one of the event branches into corresponding array, until the another cache is empty.