These are chat archives for paypal/squbs

24th
Dec 2016
Anil Gursel
@anilgursel
Dec 24 2016 01:18
@akara I figured out the problem in the stream. It looks like the custom stage was shutting down before Broadcast sent it anything. What’s surprising, though, is that Broadcast has the following logic:
    def onPush(): Unit = {
      pendingCount = downstreamsRunning
      val elem = grab(in)

      var idx = 0
      val itr = out.iterator

      while (itr.hasNext) {
        val o = itr.next()
        val i = idx
        if (!isClosed(o)) {
          push(o, elem)
          pending(i) = true
        }
        idx += 1
      }
    }
So, if the port is already closed, it just skips that port and continues.
Akara Sucharitakul
@akara
Dec 24 2016 01:19
That's cbState, right?
Anil Gursel
@anilgursel
Dec 24 2016 01:19
yes
Akara Sucharitakul
@akara
Dec 24 2016 01:20
But why is cbState closed? Is that because of the scope of the test?
Anil Gursel
@anilgursel
Dec 24 2016 01:20
In the test, the Source has a limited number of elements.
and on upstreamFinished, I was prematurely closing the stage.
Akara Sucharitakul
@akara
Dec 24 2016 01:21
Got it. So we seem to have a timing issue. The feedback loop in case of a stream depends on the stream being alive.
Anil Gursel
@anilgursel
Dec 24 2016 01:21
I need to work on those parts.
yes
Akara Sucharitakul
@akara
Dec 24 2016 01:22
But in reality, the cbState is a state machine that exists perpetually as long as this process is still alive.
This assumes sharing of state across multiple materialized streams, of course.
Anil Gursel
@anilgursel
Dec 24 2016 01:22
correct
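(For illustration only, a minimal sketch of what a perpetually-living cbState could look like: a plain JVM object shared across materializations, so the same breaker trips for all connections to a host. The CbState name, the thresholds, and the missing half-open handling are simplifications, not the actual squbs code.)

    import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

    object CbState {
      sealed trait State
      case object Closed extends State
      case object Open extends State
    }

    // One instance per target host, handed to every stream materialization.
    class CbState(maxFailures: Int) {
      import CbState._
      private val state = new AtomicReference[State](Closed)
      private val failureCount = new AtomicInteger(0)

      def fail(): Unit =
        if (failureCount.incrementAndGet() >= maxFailures) state.set(Open)

      def succeed(): Unit = {
        failureCount.set(0)
        state.set(Closed)
      }

      def shortCircuit(): Boolean = state.get() == Open
    }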
Akara Sucharitakul
@akara
Dec 24 2016 01:23
One option is to make sure it does not finish on upstreamFinished, and keep it alive. But for how long?
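(A minimal sketch, assuming the Akka Streams GraphStage API, of how a stage can stay alive past upstreamFinished: call setKeepGoing(true) and override onUpstreamFinish so it does not complete the stage. KeepAliveStage and its pass-through behavior are hypothetical, just to show the mechanism.)

    import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
    import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

    class KeepAliveStage[T] extends GraphStage[FlowShape[T, T]] {
      val in = Inlet[T]("KeepAliveStage.in")
      val out = Outlet[T]("KeepAliveStage.out")
      override val shape = FlowShape(in, out)

      override def createLogic(attributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          // Keep the stage (and its state) around even after ports close.
          override def preStart(): Unit = setKeepGoing(true)

          setHandler(in, new InHandler {
            override def onPush(): Unit = push(out, grab(in))
            // The default would call completeStage(); deciding when to shut
            // down is exactly the open question ("but for how long?").
            override def onUpstreamFinish(): Unit = ()
          })

          setHandler(out, new OutHandler {
            override def onPull(): Unit = if (!isClosed(in)) pull(in)
          })
        }
    }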
Anil Gursel
@anilgursel
Dec 24 2016 01:23
I am currently focusing on no state sharing across materializations. I always keep it in mind; however, the problem is already difficult enough for one materialization :smile:
I cannot say anything on that yet. The code I had was mostly a copy from Zip, so that part I need to think about and rewrite.
Akara Sucharitakul
@akara
Dec 24 2016 01:24
Yes, but a circuit breaker never stands by itself - it is supposed to break the circuit for all connections to the same host.
It makes me think whether this feedback loop in the stream is the right way.
Anil Gursel
@anilgursel
Dec 24 2016 01:25
that’s absolutely true.
I am actually worried about the parallelism in the flow. The feedback loop gets executed pretty late in the game. So, with maxFailure being 3, after 20 failures it still doesn’t get updated.
Akara Sucharitakul
@akara
Dec 24 2016 01:26
Again, we can do so, but we need to know when to shut down cbState.
Anil Gursel
@anilgursel
Dec 24 2016 01:26
yes, I am questioning the feedback loop as well. However, I am not sure what’s a better alternative. Agent has similar problems.
Akara Sucharitakul
@akara
Dec 24 2016 01:26
Yes, buffers can come in the way very easily.
If worst comes to worst, it goes to Atomic/Volatile.
Anil Gursel
@anilgursel
Dec 24 2016 01:28
that would require us to update the Atomic in one GraphStage and read it from another one, which is not gonna be great.
Akara Sucharitakul
@akara
Dec 24 2016 01:28
Remember PersistentBuffer with commit?
Anil Gursel
@anilgursel
Dec 24 2016 01:29
It was quite challenging though :worried:
Akara Sucharitakul
@akara
Dec 24 2016 01:29
Yes.
I just had a conversation with the team using it this morning. They are running into some issues where they fill the buffer faster than they read it out. This is because there is no back-pressure through the buffer.
Over time, that's not going to be healthy.
Anil Gursel
@anilgursel
Dec 24 2016 01:31
it has to be a temporary buffer; otherwise, it would fill up the space.
Akara Sucharitakul
@akara
Dec 24 2016 01:31
But there is a lot more to do on the read side. We need to check whether we have enough Kafka producers.
Yes. They have some monitoring, but it needs more. The flow should be unbalanced only if the downstream gets stuck, like during a Kafka rebalance.
In normal runs, that must not happen.
Anil Gursel
@anilgursel
Dec 24 2016 01:32
I agree
Akara Sucharitakul
@akara
Dec 24 2016 01:32
So they have to either throttle the incoming, or increase the speed on the outgoing. I think there is still a lot to do on the latter.
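(As a rough sketch of the throttling option, assuming plain Akka Streams; the element type and the rate are placeholders, not the team's real numbers.)

    import scala.concurrent.duration._
    import akka.stream.ThrottleMode
    import akka.stream.scaladsl.Flow

    // Shape the incoming rate in front of the buffer so it cannot be filled
    // faster than the Kafka-bound side drains it.
    val throttleIncoming = Flow[Array[Byte]]
      .throttle(elements = 1000, per = 1.second, maximumBurst = 1000, mode = ThrottleMode.Shaping)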
Akara Sucharitakul
@akara
Dec 24 2016 01:54
@anilgursel One more for preview: akara/squbs@4c8046d
This is the one that has all the marshallers moved from httpclient to ext. I also beefed up the test coverage on both the httpclient and the marshallers. The only thing I'm still working on is the doc. It is not yet part of this commit.
It potentially has a small merge conflict with #363 so I'll wait for #363 to be merged before submitting this PR, with conflicts resolved.
Anil Gursel
@anilgursel
Dec 24 2016 18:49
ok, will review it as well.
Anil Gursel
@anilgursel
Dec 24 2016 18:55

@akara Looks like the problem is with the stream running mechanics. Even with the use of an Atomic, I still have the problem of the CB state not getting updated. Here is the basic stream:

    def withCircuitBreaker(circuitBreaker: CircuitBreaker): Flow[In, Try[Out], Mat] = {

      import org.squbs.streams.StreamTimeout._
      Flow.fromGraph(GraphDSL.create(flow.withTimeout(circuitBreaker.callTimeout)) { implicit b =>
        timeoutFlow =>
          import GraphDSL.Implicits._

          // Report each element's outcome back to the circuit breaker.
          val feedback = Flow[Try[Out]].map { e =>
            e match {
              case Success(_) => circuitBreaker.succeed()
              case Failure(_) => circuitBreaker.fail()
            }
            e
          }
          // Outlet 1 (fail fast) when the circuit is open, outlet 0 otherwise.
          val partition = b.add(Partition(2, (_: In) => if (circuitBreaker.shortCircuit()) 1 else 0))
          val merge = b.add(Merge[Try[Out]](2))
          val failFast = Flow[In].map { _ => Failure(new RuntimeException("CB FailFast")) } // TODO Add fallback and right exceptions

          partition ~> timeoutFlow ~> feedback ~> merge
          partition ~> failFast                ~> merge

          FlowShape(partition.in, merge.out)
      })
    }

circuitBreaker.succeed() basically increments an atomic. With ~25 elements in the stream, this does not work. The reason is that the logic in partition gets called for all 25 elements first, and only then do I see the logic in feedback get called for all 25 elements, sequentially.

I wonder if it has anything to do with us having an async boundary in timeoutFlow for the Broadcast. But understanding the element processing mechanics is difficult.
yeap, that seems to be the case.
when I eliminate the async boundary, the behaviour is different
Let me fix the async boundary first, then tackle this. If that’s the culprit, we might potentially go back to using an Agent or a feedback loop.
Anil Gursel
@anilgursel
Dec 24 2016 19:01

I wonder if it has anything to do with us having an async boundary in timeoutFlow for the Broadcast.

I am referring to paypal/squbs#354. Looking into it now. Probably with some buffers we can eliminate it. Once that’s done, the circuit breaker might start working.
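(One possible shape of the buffer idea, sketched with plain Akka Streams; the branch it goes on, the element type, and the buffer size are assumptions rather than the actual #354 fix.)

    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.Flow

    // A small explicit buffer on one branch of the Broadcast/Zip pair can
    // break the cycle's deadlock without introducing an .async boundary.
    def buffered[In] = Flow[In].buffer(1, OverflowStrategy.backpressure)
    // broadcast ~> buffered ~> zip.in0   (wired inside the timeout flow's GraphDSL)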

Akara Sucharitakul
@akara
Dec 24 2016 19:04
Let's try. I tend to see batching behavior in the stream.
Also, the failure check in feedback may need to be more specific to Failure(TimeoutException) so we do not gloss over other potential cases.
Anil Gursel
@anilgursel
Dec 24 2016 19:11
Actually, some discussion is needed around that. The reason I did it that way is to align with the existing CircuitBreaker, please see https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala#L65. However, if we think about HTTP, there are scenarios where we may need to consider a call as failed even if there is no exception, e.g., a 500 Internal Server Error.
Akara Sucharitakul
@akara
Dec 24 2016 19:13
Hmm, you're right. The response itself can be an error condition.
Akara Sucharitakul
@akara
Dec 24 2016 19:23
A 500 can be caused by errors processing the request as well, although it shouldn't. A 503 is clear. So we need a scheme for users to decide what constitutes an error condition, similar to timeouts. We need to provide defaults, though.
Anil Gursel
@anilgursel
Dec 24 2016 19:24
I agree
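(A sketch of what such a scheme could look like, assuming akka-http types; the failureDecider name and the 503-only default are illustrative, not a settled design.)

    import scala.util.{Failure, Success, Try}
    import akka.http.scaladsl.model.{HttpResponse, StatusCodes}

    // User-pluggable decision of what counts as a failure for the breaker.
    // Possible default: any exception (including timeouts) plus 503 responses.
    val failureDecider: Try[HttpResponse] => Boolean = {
      case Failure(_)        => true
      case Success(response) => response.status == StatusCodes.ServiceUnavailable
    }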