Samuel Williams
@ioquatix
Regarding the influxdb asynchronous adapter, it looks like it's not async in the sense we expect - it's using threads and mutexes which can cause deadlocks and other issues when combined with async.
However, it should be super simple to fix: you just need to replace that line with Async::IO::UDPSocket.
Hopefully going forward, with Ruby 3.0 it won't be needed.
Then, you should avoid using the thread/mutex implementation of influxdb - there is no point once the underlying socket is non-blocking.
Async::Task.current do
  @internet.post(url, headers, body)
end
Where did you get that idea from? It's not quite right...
Just do
@internet.post(url, headers, body)
Let me show you an example of how to do it.
Samuel Williams
@ioquatix
require 'async'
require 'async/http/internet'

class Middleware
    def initialize(app)
        @app = app
        @internet = Async::HTTP::Internet.new
    end

    def call(env)
        # Synchronous: waits for the response before continuing.
        response = @internet.post(...)

        # Asynchronous: schedules a child task and continues without waiting.
        Async do
            response = @internet.post(...)
        end

        # ...
    end
end
Samuel Williams
@ioquatix
@Paul I would be happy to help you in more detail if you provide a working test case/example
Paul Sadauskas
@paul

Thanks, @ioquatix!

Firstly, async-rspec is warning you that you are leaking file descriptors.

Yeah, I discovered it's coming from the aws-ruby-sdk, which uses a thread for request connection keep-alive.

I've squashed it for now by doing Aws.empty_connection_pools! after every request in my rack app. Not ideal, but I cache the results of the requests in memory for 10 minutes, so not a big deal.

Then, you should avoid using the thread/mutex implementation of influxdb - there is no point once the underlying socket is non-blocking.

I still would want the "buffer" aspect of it. Each request to my app (of which there's about 100/s) results in 0 or 1 measurement to report to influxdb. The influx API allows multiple measurements in a single write request, so I want to avoid the HTTP overhead. I have a buffer of measurements that gets flushed to influx after 1000 measurements or 5 seconds, instead of 100/s which would probably crush the influx server.
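
The "flush after 1000 measurements or 5 seconds" behaviour described above could be sketched roughly as follows. This is a minimal, stdlib-only illustration and not the code from the gist: the class name BufferedWriter, its keyword arguments, and the injectable clock are all assumptions made for the example, and the sender block stands in for the real HTTP POST.

```ruby
# Hypothetical sketch: buffers measurements and hands them to a sender block
# in batches. Nothing here comes from the influxdb gem or the async gem.
class BufferedWriter
  def initialize(max_size: 1000, max_age: 5.0,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) },
                 &sender)
    @max_size = max_size   # flush once this many measurements are buffered
    @max_age = max_age     # ... or once the oldest one is this many seconds old
    @clock = clock         # injectable so the age check can be tested
    @sender = sender       # called with an Array of measurements
    @buffer = []
    @oldest_at = nil
  end

  # Add one measurement; flush if the buffer is full or too old.
  def write(measurement)
    @oldest_at ||= @clock.call
    @buffer << measurement
    flush if due?
  end

  # Send everything buffered so far as a single batch.
  def flush
    return if @buffer.empty?

    batch, @buffer = @buffer, []
    @oldest_at = nil
    @sender.call(batch)
  end

  private

  def due?
    @buffer.size >= @max_size || (@clock.call - @oldest_at) >= @max_age
  end
end

batches = []
writer = BufferedWriter.new(max_size: 3) { |batch| batches << batch }
4.times { |i| writer.write("m#{i}") }
writer.flush # push out whatever is left
```

Note that this sketch only checks the age when a new measurement arrives. A quiet period would still need some kind of timer to trigger the flush.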

Where did you get that idea from? It's not quite right...

This line in the async README:

The cost of using Async{...} is minimal for initialization/server setup, but is not ideal for per-connection tasks.

I missed the one a few sentences above until just now:

If Async(&block) happens within an existing reactor, it will schedule an asynchronous task and return.

So I thought to be efficient within a falcon reactor, I needed to somehow get access to that reactor's current task.

I'll see if I can come up with a more detailed test case, but my app is a little more complicated. The flow looks like this:

  1. The App#call receives a request
  2. It passes the request body to a Parser to find relevant bits
  3. The interesting parts are passed to a Decoder to convert them into one of several possible Metrics
  4. The Metric is passed to an Adapter, for either the influxdb v1 or v2 api.
  5. The Adapter converts the Metric to a Measurement of the proper format depending on the version, and passes it to a Writer
  6. The Writer buffers the measurements and, when it's time, HTTP POSTs them all at once to influxdb
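
The flow above can be sketched as a chain of plain Ruby objects. Every class below is a hypothetical stand-in (the real Parser, Decoder, Adapter and Writer are not shown in this conversation), and the "name=value" input format is invented for the example. The point is only that the Writer, however far removed from the Rack app, is still reached through ordinary method calls made while handling the request.

```ruby
require "stringio"

# All names below are hypothetical stand-ins for the six steps listed above.
Metric = Struct.new(:name, :value)

class Parser
  # Step 2: pick the relevant parts out of the request body.
  def call(body)
    body.lines.map(&:strip).reject(&:empty?)
  end
end

class Decoder
  # Step 3: turn one relevant part ("name=value") into a Metric.
  def call(part)
    name, value = part.split("=", 2)
    Metric.new(name, Float(value))
  end
end

class Adapter
  def initialize(writer)
    @writer = writer
  end

  # Steps 4-5: convert the Metric to a measurement and pass it to the writer.
  def call(metric)
    @writer.write("#{metric.name} value=#{metric.value}")
  end
end

# Step 6 stand-in: collects measurements instead of POSTing them.
class CollectingWriter
  attr_reader :lines

  def initialize
    @lines = []
  end

  def write(line)
    @lines << line
  end
end

class App
  def initialize(parser:, decoder:, adapter:)
    @parser = parser
    @decoder = decoder
    @adapter = adapter
  end

  # Step 1: Rack entry point.
  def call(env)
    body = env["rack.input"].read
    @parser.call(body).each { |part| @adapter.call(@decoder.call(part)) }
    [204, {}, []]
  end
end

writer = CollectingWriter.new
app = App.new(parser: Parser.new, decoder: Decoder.new, adapter: Adapter.new(writer))
status, _headers, _body = app.call({ "rack.input" => StringIO.new("cpu=1.5\n\nmem=2\n") })
```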

So in my case, the Writer's @internet.post call is a long way removed from the rack App. But, it sounds like the thing I'm doing with Async::Task.current is automatically handled by Async(&block)? Here's my writer implementation: https://gist.github.com/paul/fe7e2121c2795ddf1519a664cc12e352#file-writer-rb-L49-L55. Instead of this, I just do Async do? Same thing with the timer on L75?

Paul Sadauskas
@paul
@picatz Thanks for digging into that. Seems gross :P I think the Aws.empty_connection_pools! thing after every request is gonna be fine.
Samuel Williams
@ioquatix
Regarding Async::Task.current do, I think what you want is just Async do.

I've squashed it for now by doing Aws.empty_connection_pools! after every request in my rack app. Not ideal, but I cache the results of the requests in memory for 10 minutes, so not a big deal.

That's a reasonable approach and very safe.

I made this for you; hopefully it clarifies what you are after w.r.t. upstream HTTP requests: https://github.com/socketry/falcon/blob/master/examples/internet/config.ru
Seahorse is a little bit tricky because they kind of hacked HTTP/1 using Net::HTTP and implemented their own (buggy) HTTP/2 client using the http2 gem.

Here's my writer implementation: (gist link)

that actually looks pretty much okay to me

@task.async will scope the child task to the specified parent. Async do will scope the task to whatever the current task is. It's slightly less efficient because it needs to look up a thread local, so @task.async can be better in perf critical code.
If you want to buffer metrics, it's possible
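
The difference between @task.async (explicit parent) and Async do (parent looked up at call time) can be illustrated with a toy model. This is emphatically not how the async gem is implemented: ToyTask, its methods, and the :toy_task key are invented for the example, and nothing here runs concurrently. It only shows which parent each child ends up scoped to.

```ruby
# Toy model only: this is NOT the async gem's implementation.
class ToyTask
  attr_reader :name, :parent, :children

  def initialize(name, parent = nil)
    @name = name
    @parent = parent
    @children = []
    parent.children << self if parent
  end

  # Like `@task.async`: the parent is the receiver, so no lookup is needed.
  def async(name)
    child = ToyTask.new(name, self)
    child.run { yield child }
    child
  end

  # Make this task the "current" one for the duration of the block.
  def run
    previous = Thread.current[:toy_task]
    Thread.current[:toy_task] = self
    yield
  ensure
    Thread.current[:toy_task] = previous
  end

  # Like `Async do`: the parent is whatever task is current right now,
  # which costs one extra lookup.
  def self.async(name, &block)
    current = Thread.current[:toy_task] or raise "no current task"
    current.async(name, &block)
  end
end

root = ToyTask.new("root")
writer_task = ToyTask.new("writer", root)

root.run do
  writer_task.async("explicit") { } # scoped to writer_task
  ToyTask.async("implicit") { }     # scoped to the current task (root)
end
```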
Paul Sadauskas
@paul
@ioquatix I seem to be running into an issue with async/http writing to influxdb. AFAIK, it doesn't support HTTP/2, but somehow async/http thinks it does and is running into an error: Protocol::HTTP2::StreamError: Stream closed. I'm trying to figure out how to capture the TLS keys so I can look at the packets in wireshark :-/
Samuel Williams
@ioquatix
That's odd, are you sure it doesn't support HTTP/2? Because HTTP/2 is only used when the server advertises support for it via ALPN.
Hold on...
Paul Sadauskas
@paul
 0.78s    debug: Async::HTTP::Protocol::HTTPS [pid=11127] [2019-11-22 22:24:49 -0700]
               | Negotiating protocol "h2"...
 0.92s    error: #<Async::Task:0x2aeadce473ec connected to #<Addrinfo: 52.51.140.92:8086 TCP (calvinklein-5713f949.influxcloud.net)> [fd=8] (failed)> [pid=11127] [2019-11-22 22:24:50 -0700]
               |   Protocol::HTTP2::StreamError: Stream closed!
Samuel Williams
@ioquatix
Sorry, I was trying to write it on mobile phone and it didn't come out correctly, I just jumped to my laptop.
Paul Sadauskas
@paul
maybe it does, but i don't see it documented anywhere
I was able to snag a wireshark capture and a keylog file, so I can sorta see the capture, but nothing interesting
Samuel Williams
@ioquatix
Run with CONSOLE_LEVEL=debug (or if that does nothing bundle update or use CONSOLE_LOG_LEVEL=debug)
Negotiating protocol "h2" means that ALPN from the SSL termination advertised HTTP/2 support
which is generally good
Because you want to reduce the overhead per log item
If you are using HTTP/2 there is almost no reason to use any kind of buffering/batching
The overhead per request is tiny
There could be a bug in the HTTP/2 implementation or some incompatibility, can you privately share some example with me so I can make a connection to that server?
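
One way to check independently what a server advertises via ALPN is a small probe using only Ruby's standard socket and openssl libraries. This is a diagnostic sketch under stated assumptions: the method name negotiated_protocol and its parameters are made up for the example, it does not use async-http at all, and it does not configure certificate verification, so it should not be used for anything beyond inspection.

```ruby
require "socket"
require "openssl"

# Hypothetical helper: returns the protocol the server selected via ALPN
# (for example "h2" or "http/1.1"), or nil if it selected none.
# Certificate verification is NOT configured here; diagnostic use only.
def negotiated_protocol(host, port = 443, offered: ["h2", "http/1.1"])
  context = OpenSSL::SSL::SSLContext.new
  context.alpn_protocols = offered # protocols the client offers

  tcp = TCPSocket.new(host, port)
  ssl = OpenSSL::SSL::SSLSocket.new(tcp, context)
  ssl.hostname = host   # send SNI so the server picks the right certificate
  ssl.sync_close = true # closing the TLS socket also closes the TCP socket
  ssl.connect
  ssl.alpn_protocol
ensure
  ssl ? ssl.close : tcp&.close
end
```

Calling it with the host and port from the log above (port 8086) would show whether "h2" is really being selected by whatever terminates TLS in front of influxdb.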
Paul Sadauskas
@paul
hunh... so the first Async works, the 2nd does not:
Async do
  internet = Async::HTTP::Internet.new
  response = internet.post(url, headers, body)
  pp response.status, response.headers.fields, response.read
ensure
  internet.close
end.wait

Async do
  endpoint = Async::HTTP::Endpoint.parse(url)

  client = Async::HTTP::Client.new(endpoint)

  response = client.post(url, headers, body)

  pp response.status, response.headers.fields, response.read

ensure
  client.close
end.wait
The second example came from the async README when I was trying to capture the ssl certs
Paul Sadauskas
@paul
@ioquatix This is interesting. Connecting to the url in the example ALSO fails the same way:
url = "https://www.codeotaku.com/index"
Async do
  endpoint = Async::HTTP::Endpoint.parse(url)

  client = Async::HTTP::Client.new(endpoint)

  response = client.post(url, [], body)

  pp response.status, response.headers.fields, response.read

ensure
  client.close
end.wait
 0.34s    error: #<Async::Task:0x2b019c0283e8 connected to #<Addrinfo: 192.81.135.133:443 TCP (www.codeotaku.com)> [fd=8] (failed)> [pid=15570] [2019-11-22 22:43:02 -0700]
               |   Protocol::HTTP2::StreamError: Stream closed!
Samuel Williams
@ioquatix
That's a bit odd, I can check and see if it fails on my end.
That is part of the specs and they are currently passing, so it seems a bit strange.
Paul Sadauskas
@paul
@ioquatix I've tried several things from your examples, removing the buffering, just doing Async { } and so on. I just can't seem to get it to make the requests in production. I usually get the same StreamError as above, but I occasionally get a Protocol::HTTP2::GoawayError. Is there a way to just force Async::HTTP to not attempt HTTP2?
Samuel Williams
@ioquatix
Can you make sure body is not just a string and you've wrapped it as per my example?
@paul let me know if that doesn't fix your issue. You definitely want to be using HTTP/2
Paul Sadauskas
@paul
I've tried both with and without. Oddly, I can't figure out how to reproduce it locally.
In prod, I'm running falcon behind caddy as a proxy. I'll have to see if I can set up something similar locally tomorrow.
Samuel Williams
@ioquatix
When you get a go away error it should give you a reason
Samuel Williams
@ioquatix
@Paul do you mind elaborating a bit more on what you are doing with influxdb?