    Raymond Roestenburg
    @RayRoestenburg
    1.3.3 does not support 2.6
    2.0 will
    Jason Carreira
    @jasoncarreira
    Don't I lose typed actors with pre-2.6?
    16 replies
    Thomas
    @thomasschoeftner

    Hi - I have a question regarding testing of Streamlets.
    We have some Egresses - essentially Streamlets without outlets - which are being tested with ScalaTest.

    a) How would you write assertions on a Streamlet which has no Outlet?

    b) We tried to inject mocks into the Streamlets; it appears these throw exceptions on the Akka-managed threads, which are never caught by the unit-test thread.
    What would be the best way to inject test behavior (ideally via mocks) into Streamlets?

    154 replies
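    A minimal sketch of one way to test such an egress without the Cloudflow runtime: factor the side-effecting logic behind a small trait, run the stream with plain Akka Streams inside the ScalaTest body, and assert once the materialized Future completes, so failures surface on the test thread rather than staying on Akka's dispatcher threads. All domain names here (Record, Store, EgressLogic) are hypothetical placeholders, not part of the Cloudflow testkit.

        import akka.Done
        import akka.actor.ActorSystem
        import akka.stream.ActorMaterializer
        import akka.stream.scaladsl.{Sink, Source}
        import org.scalatest.concurrent.ScalaFutures
        import org.scalatest.matchers.should.Matchers
        import org.scalatest.wordspec.AnyWordSpec
        import scala.concurrent.Future

        // Hypothetical record type and the dependency the egress writes to.
        final case class Record(id: String)
        trait Store { def save(r: Record): Unit }

        // The egress's sink is built from the injected Store, so tests can pass a stub.
        object EgressLogic {
          def buildSink(store: Store): Sink[Record, Future[Done]] =
            Sink.foreach[Record](store.save)
        }

        class EgressLogicSpec extends AnyWordSpec with Matchers with ScalaFutures {
          implicit val system: ActorSystem = ActorSystem("egress-test")
          implicit val mat: ActorMaterializer = ActorMaterializer() // not needed on Akka 2.6+

          "the egress sink" should {
            "pass every record to the store" in {
              val seen  = scala.collection.mutable.Buffer.empty[Record]
              val store = new Store { def save(r: Record): Unit = seen += r } // stub instead of a mock

              val done = Source(List(Record("a"), Record("b"))).runWith(EgressLogic.buildSink(store))

              // Waiting on the materialized Future brings stream failures back onto the test thread.
              whenReady(done) { _ => seen.map(_.id) shouldBe Seq("a", "b") }
            }
          }
        }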
    Oleg Myagkov
    @OBenner
    Hi! I use the RocksDB state backend, which stores state in HDFS. I get the error: Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. Is it necessary to extend the Cloudflow Docker image with the necessary libraries?
    21 replies
    Jason Carreira
    @jasoncarreira
    On to the next hurdle... Any guide or hints on setting up CloudFlow on AKS? I'm willing to be a guinea pig
    15 replies
    Thomas
    @thomasschoeftner
    Hi again - how can we inject streamlet configuration in tests based on the AkkaStreamlet Testkit?
    There is no blueprint, so we don't have a named streamlet instance to configure.
    Many thanks!
    8 replies
    Oleg Myagkov
    @OBenner

    Hi! I use the RocksDB state backend, which stores state in HDFS. I get the error: Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. Is it necessary to extend the Cloudflow Docker image with the necessary libraries?

    In continuation of this topic... we made a Docker image with the Hadoop libraries (flink-shaded-hadoop-2-uber). The previous error went away, but now there is another one related to authorization, even though HDFS has authorization disabled: logFile

    2 replies
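    For reference, one possible way to get those shaded Hadoop classes onto the classpath of the image that buildAndPublish produces (instead of hand-building a custom image) is to declare the uber-jar as an ordinary dependency; this is a sketch only, and the version shown is an assumption that has to match the Flink release in use.

        // build.sbt (sketch): make flink-shaded-hadoop-2-uber available so Flink can load its
        // HDFS filesystem; "2.8.3-10.0" is a placeholder version, not taken from this thread.
        libraryDependencies += "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0"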
    Jason Carreira
    @jasoncarreira
    I got permissions to ACR and buildAndPublish works, but now I get this:
    kubectl cloudflow deploy acr4369eec9.azurecr.io/mtb/cloudflow-poc:2-dd21081-dirty                  
    
    [Error] Could not verify protocol version. Kubernetes API returned an error: No Cloudflow operators detected in the cluster. Exiting
    kubectl get cloudflow --all-namespaces
    NAMESPACE   NAME      AGE
    cloudflow   default   d3h
    11 replies
    Jason Carreira
    @jasoncarreira
    @RayRoestenburg You pointed me to this, I think? https://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-i/
    Is there a good way to add the HTTP routes to cloudflow streamlets to be able to query their state like he does in that?
    Raymond Roestenburg
    @RayRoestenburg
    Have a look at HttpServerLogic; you can extend AkkaServerStreamlet and expose an HTTP service
    6 replies
    kozy
    @kozy
    Hi, I am trying to create a streamlet with singleWebSocketRequest and fromSinkAndSourceMat. From what I understand singleWebSocketRequest materialises the stream. I am pretty new to this but I think the approach now should be to override the run() method and put that logic there. Would someone be so kind to confirm or point me elsewhere? Thanks!
    Raymond Roestenburg
    @RayRoestenburg
    Hi @kozy , that sounds correct. I would try to modify from this example: https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#singlewebsocketrequest (and as you say put this in a run() method). I have not had time myself to try this out, I might get to that next week.
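    As a rough sketch of what could go inside that overridden run(), here is the plain akka-http client wiring from the page above, using Flow.fromSinkAndSourceMat as asked; the Cloudflow side (pushing received frames to an outlet) is left out, and wsUri plus the message handling are placeholders.

        import akka.Done
        import akka.actor.ActorSystem
        import akka.http.scaladsl.Http
        import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
        import akka.stream.ActorMaterializer
        import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
        import scala.concurrent.{Future, Promise}

        implicit val system: ActorSystem = ActorSystem("ws-client")
        implicit val mat: ActorMaterializer = ActorMaterializer() // implicit from the system on Akka 2.6+

        val wsUri = "wss://example.org/stream" // placeholder endpoint

        // Incoming frames; in a streamlet this is where data would be handed to the outlet.
        val incoming: Sink[Message, Future[Done]] = Sink.foreach[Message] {
          case TextMessage.Strict(text) => println(text)
          case _                        => () // streamed/binary frames ignored in this sketch
        }

        // Outgoing frames; Source.maybe keeps the connection open until its promise is completed.
        val outgoing: Source[Message, Promise[Option[Message]]] =
          Source.single(TextMessage("subscribe")).concatMat(Source.maybe[Message])(Keep.right)

        val clientFlow: Flow[Message, Message, (Future[Done], Promise[Option[Message]])] =
          Flow.fromSinkAndSourceMat(incoming, outgoing)(Keep.both)

        // singleWebSocketRequest materializes the flow right away, which is why it belongs in run().
        val (upgradeResponse, (closed, closePromise)) =
          Http().singleWebSocketRequest(WebSocketRequest(wsUri), clientFlow)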
    Jason Carreira
    @jasoncarreira
    @RayRoestenburg Trying to figure out how to hook in the ask pattern as part of the HttpServerLogic
    HttpServerLogic serverLogic = new HttpServerLogic(this) {
        @Override
        public Route createRoute(WritableSinkRef sinkRef) {
            return path("aggregate", () ->
                    get(() ->
                            completeOKWithFuture(
                                    AskPattern.ask(
                                            actorRef,
                                            (ActorRef<TransactionAggregate> replyTo) -> new TransactionAggregateActor.GetAggregation(replyTo),
                                            Duration.ofSeconds(2),
                                            system.scheduler()
                                    )
                            )
                    )
            );
        }
    };
    Doesn't like the type going into completeOKWithFuture, says it wants a CompletionStage<RequestEntity> but is getting CompletionStage<TransactionAggregate>. Also, do I have to do anything to make the server logic run?
    Raymond Roestenburg
    @RayRoestenburg
    You need to use the serverlogic in a streamlet:
      class TestHttpServer extends AkkaServerStreamlet {
         implicit val jsonformatData: RootJsonFormat[Data] = jsonFormat2(Data.apply)
    
         val outlet = AvroOutlet[Data]("out", _.id.toString)
         val shape = StreamletShape(outlet)
    
         override def createLogic = new HttpServerLogic(this) {
           val writer = sinkRef(outlet)
           override def route(): Route = {
             put {
               entity(as[Data]) { data ⇒
                 if (data.id == 42) {
                   onSuccess(writer.write(data)) { _ ⇒
                     complete(StatusCodes.OK)
                   }
                 } else complete(StatusCodes.BadRequest)
               }
             }
           }
         }
       }
    For instance (sorry for the Scala, we're going to do a lot of docs next week, so hopefully we'll have far more examples)
    You likely still need to translate the response of the actor to an HttpResponse?
    Are you sure that is not telling you that it needs a CompletionStage<ResponseEntity>?
    So you need to translate your TransactionAggregate to an HTTP ResponseEntity @jasoncarreira
    Jason Carreira
    @jasoncarreira
    How do I do that? There don't seem to be implementations of ResponseEntity
    Raymond Roestenburg
    @RayRoestenburg
    Please have a look at https://doc.akka.io/docs/akka-http/10.1.12/introduction.html; there are examples there of how to complete with JSON responses
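    In Scala the same thing would look roughly like this with akka-http's spray-json support (the akka-http-spray-json module); the TransactionAggregate fields and the askAggregate stand-in for the actor ask are assumptions.

        import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
        import akka.http.scaladsl.server.Directives._
        import akka.http.scaladsl.server.Route
        import spray.json.DefaultJsonProtocol._
        import spray.json.RootJsonFormat
        import scala.concurrent.Future

        object JsonRouteSketch {
          // Hypothetical shape of the aggregate the actor replies with.
          final case class TransactionAggregate(count: Long, total: Double)
          implicit val aggregateFormat: RootJsonFormat[TransactionAggregate] =
            jsonFormat2(TransactionAggregate.apply)

          // Stand-in for the AskPattern call above.
          def askAggregate(): Future[TransactionAggregate] =
            Future.successful(TransactionAggregate(0L, 0.0))

          val route: Route =
            path("aggregate") {
              get {
                // complete(...) uses the implicit spray-json format to render the case class as JSON
                onSuccess(askAggregate()) { agg => complete(agg) }
              }
            }
        }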
    Jason Carreira
    @jasoncarreira
    HttpServerLogic<NotUsed> serverLogic = new HttpServerLogic<NotUsed>(this,
                    null, null, context) {
                @Override
                public Route createRoute(WritableSinkRef<NotUsed> sinkRef) {
                    return path("aggregate", () ->
                            get(() ->
                                    {
                                        CompletionStage<TransactionAggregate> completionStage = AskPattern.ask(
                                                actorRef,
                                                TransactionAggregateActor.GetAggregation::new,
                                                Duration.ofSeconds(2),
                                                system.scheduler()
                                        );
                                        Marshaller<TransactionAggregate, RequestEntity> marshaller =
                                                Jackson.<TransactionAggregate>marshaller(JsonMappers.mapper);
                                        return completeOKWithFuture(
                                                completionStage,
                                                marshaller
                                        );
                                    }
                            )
                    );
    
                }
            };
    Don't have an outlet or a marshaller to the outlet type...
    Jason Carreira
    @jasoncarreira
    When I run this with runLocal how do I hit this endpoint?
    Raymond Roestenburg
    @RayRoestenburg
    A streamlet must have at least 1 inlet or 1 outlet
    runLocal will show where the endpoints are
    (through localhost)
    Jason Carreira
    @jasoncarreira
    Okay, I've got an inlet to the streamlet. This is the one that's taking in results and passing them to an actor to keep an aggregate total. I also want to add an HTTP endpoint to query that actor for the current aggregate state. Is that possible, or do I need to somehow make the actor visible to a different streamlet which holds this HttpServerLogic for its graph logic?
    [success] /Users/jason/projects/cloudflow-poc/transactionIngest/src/main/blueprint/blueprint.conf verified.
    [info] No local.conf file location configured. 
    [info] Set 'runLocalConfigFile' in your build to point to your local.conf location 
    ---------------------------------- Streamlets ----------------------------------
    processor [mtb.mercury.cloudflow.transaction.processor.TransactionProcessor]
    sink [mtb.mercury.cloudflow.transaction.egress.TransactionEgress]
        - HTTP port [3000]
    source [mtb.mercury.cloudflow.transaction.ingest.TransactionSource]
    --------------------------------------------------------------------------------
    
    --------------------------------- Connections ---------------------------------
    source.transactions -> processor.transaction-requests
    processor.transaction-responses -> sink.transaction-responses
    --------------------------------------------------------------------------------
    but getting localhost:3000 doesn't do anything
    Ah...
    2020-05-28 10:42:36,328 ERROR [localRunner] - Streamlet [sink] failed to launch. Reason: null
    cloudflow.runner.LocalRunner$StreamletLaunchFailure: Streamlet [sink] failed to launch. Reason: null
            at cloudflow.runner.LocalRunner$$anonfun$$nestedInanonfun$run$6$1.applyOrElse(LocalRunner.scala:151)
            at cloudflow.runner.LocalRunner$$anonfun$$nestedInanonfun$run$6$1.applyOrElse(LocalRunner.scala:149)
            at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
            at scala.util.Failure.recoverWith(Try.scala:236)
            at cloudflow.runner.LocalRunner$.$anonfun$run$6(LocalRunner.scala:149)
            at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
            at scala.collection.Iterator.foreach(Iterator.scala:941)
            at scala.collection.Iterator.foreach$(Iterator.scala:941)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
            at scala.collection.IterableLike.foreach(IterableLike.scala:74)
            at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
            at scala.collection.TraversableLike.map(TraversableLike.scala:238)
            at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
            at scala.collection.AbstractTraversable.map(Traversable.scala:108)
            at cloudflow.runner.LocalRunner$.run(LocalRunner.scala:142)
            at cloudflow.runner.LocalRunner$.$anonfun$main$2(LocalRunner.scala:87)
            at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
            at scala.Console$.withOut(Console.scala:167)
            at cloudflow.runner.LocalRunner$.main(LocalRunner.scala:84)
            at cloudflow.runner.LocalRunner.main(LocalRunner.scala)
    Caused by: java.lang.NullPointerException: null
            at cloudflow.akkastream.util.javadsl.HttpServerLogic.<init>(HttpServerLogic.scala:96)
            at mtb.mercury.cloudflow.transaction.egress.TransactionEgress$1.<init>(TransactionEgress.java:54)
            at mtb.mercury.cloudflow.transaction.egress.TransactionEgress.createLogic(TransactionEgress.java:53)
            at cloudflow.akkastream.AkkaStreamlet.run(AkkaStreamlet.scala:70)
            at cloudflow.akkastream.AkkaStreamlet.run(AkkaStreamlet.scala:34)
            at cloudflow.streamlets.Streamlet.run(Streamlet.scala:106)
            at cloudflow.runner.LocalRunner$.$anonfun$run$7(LocalRunner.scala:147)
            at scala.util.Success.$anonfun$map$1(Try.scala:255)
            at scala.util.Success.map(Try.scala:213)
            at cloudflow.runner.LocalRunner$.$anonfun$run$6(LocalRunner.scala:145)
            ... 17 common frames omitted
    Jason Carreira
    @jasoncarreira
    Gotta be the nulls I'm passing in for the outlet and unmarshaller
    Don't know what else to pass in, though
    Raymond Roestenburg
    @RayRoestenburg
    There is a fix for that in a later version; you don't need to have an outlet any more in an HttpServerLogic. But in the meantime you could add a dummy outlet.
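    A sketch of that workaround, modelled directly on the TestHttpServer example above: keep a dummy outlet purely so HttpServerLogic gets a real outlet/sinkRef instead of the nulls that caused the NullPointerException, and serve only the query route. Data is the case class from the example above; the route body is a placeholder where the ask to the aggregating actor would go.

        class AggregateQueryServer extends AkkaServerStreamlet {
          implicit val jsonformatData: RootJsonFormat[Data] = jsonFormat2(Data.apply)

          // Dummy outlet: nothing is ever written to it, it only exists so the
          // HttpServerLogic constructor has an outlet to build a sinkRef from.
          val dummyOutlet = AvroOutlet[Data]("dummy-out", _.id.toString)
          val shape       = StreamletShape(dummyOutlet)

          override def createLogic = new HttpServerLogic(this) {
            override def route(): Route =
              path("aggregate") {
                get {
                  // placeholder: complete with the aggregate queried from the actor instead
                  complete(StatusCodes.OK)
                }
              }
          }
        }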
    Jason Carreira
    @jasoncarreira
    When / where can I get a later version?
    Only building from source?
    Raymond Roestenburg
    @RayRoestenburg
    We're releasing 2.0 next week
    yes, and I would not do that right now
    We're in the process of some testing, and a lot has changed between 1.3.3 and 2.0
    Jason Carreira
    @jasoncarreira
    Okay. I can wait on adding this feature to next week. Do you have a writeup of the changes coming in 2.0?
    Raymond Roestenburg
    @RayRoestenburg
    Soon! :-)
    Jason Carreira
    @jasoncarreira
    Haha okay
    Raymond Roestenburg
    @RayRoestenburg
    You can look at the commits in master of course and the closed PRs
    Jason Carreira
    @jasoncarreira
    Will 2.0 support Akka 2.6.5?
    1 reply
    Jason Carreira
    @jasoncarreira
    Also, any chance Cloudflow will support querying / updating the Confluent Schema Registry?
    4 replies
    Anil Kumar
    @akandach
    How do I connect to Azure Event Hub from Cloudflow? Is there any example available?
    1 reply
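    There is no Cloudflow-specific example in this thread, but Event Hubs exposes a Kafka-compatible endpoint, so standard Kafka client settings can point at it; below is a hedged Alpakka Kafka sketch with <namespace>, the group id, and the connection string as placeholders (how a Cloudflow deployment itself is pointed at an external Kafka is a separate, deployment-level concern).

        import akka.actor.ActorSystem
        import akka.kafka.ConsumerSettings
        import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

        implicit val system: ActorSystem = ActorSystem("eventhubs-client")

        // Event Hubs speaks the Kafka protocol on port 9093 over SASL_SSL/PLAIN; the username is
        // literally "$ConnectionString" and the password is the Event Hubs connection string.
        val consumerSettings =
          ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
            .withBootstrapServers("<namespace>.servicebus.windows.net:9093")
            .withGroupId("aggregate-consumer") // placeholder group id
            .withProperty("security.protocol", "SASL_SSL")
            .withProperty("sasl.mechanism", "PLAIN")
            .withProperty(
              "sasl.jaas.config",
              """org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<connection-string>";"""
            )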