strict_slashes
docs
I've written about how to use Postgres with Quart, covering my preferred libraries (Databases and asyncpg) and how to convert types automatically:
https://pgjones.dev/blog/quart-postgres-2021/
It would be good to know if you think this would be better as an extension.
Hi @pgjones I am stumped with the following. I'm trying to pass a cursor_id
to a websocket view, like so:
@home_app.websocket("/ws")
@login_required
async def ws():
    dbc = current_app.dbc
    cursor_id = int(request.args.get("cursor_id"))
But when I try to read the argument I get the error: "Attempt to access request outside of a relevant context". How can I pass that value from the template to the websocket endpoint?
FileStorage.stream not being an asyncio object, or would we be locked waiting for the (currently) 16KB chunks of data on every iteration? I think those would probably be fine considering the kernel can cache possible future file reads, but I'm not sure.
queue.get() also never worked
This is the code in the docs:
connected_websockets = set()

def collect_websocket(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        global connected_websockets
        queue = asyncio.Queue()
        connected_websockets.add(queue)
        try:
            return await func(queue, *args, **kwargs)
        finally:
            connected_websockets.remove(queue)
    return wrapper
This creates an empty queue and then adds it to the set, but the queue is completely empty. len(connected_websockets) would indicate how many connected websockets you have.
@pgjones sorry for the late reply, but here's my code:
from quart import Quart, websocket
from functools import wraps
import asyncio

app = Quart(__name__)
connected_websockets = set()

def collect_websocket(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        global connected_websockets
        queue = asyncio.Queue()
        connected_websockets.add(queue)
        print(connected_websockets)
        try:
            return await func(queue, *args, **kwargs)
        finally:
            connected_websockets.remove(queue)
    return wrapper

async def broadcast(message):
    for queue in connected_websockets:
        queue.put_nowait("New connection")

@app.websocket('/')
@collect_websocket
async def ws(queue):
    while True:
        data = await websocket.receive()
        print(data)
        await broadcast(data)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
You can ignore the host="0.0.0.0", that's just there because it's on Replit.
Your code puts "New connection" (ignoring the data) on a queue, but you never read the data from the queue. The connected_websockets set will contain a queue for each websocket connection though. What are you hoping will happen?
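To illustrate the point about reading from the queue, here is a minimal sketch using only asyncio, with no Quart involved. Each connection is modelled by a hypothetical `consumer` task that awaits `queue.get()`. In a real handler that read would sit in the websocket function and be followed by a send. The names `connected`, `consumer` and `main` are made up for the example.

```python
import asyncio

connected = set()  # one asyncio.Queue per simulated connection


async def broadcast(message):
    # Put the actual message (not a fixed string) on every registered queue.
    for queue in connected:
        queue.put_nowait(message)


async def consumer(name, queue, received):
    # The step missing from the pasted code: something has to read the queue.
    message = await queue.get()
    received.append((name, message))


async def main():
    received = []
    queues = {"a": asyncio.Queue(), "b": asyncio.Queue()}
    connected.update(queues.values())
    try:
        tasks = [
            asyncio.create_task(consumer(name, queue, received))
            for name, queue in queues.items()
        ]
        await broadcast("hello")
        await asyncio.gather(*tasks)
    finally:
        # Unregister the queues, mirroring the decorator's finally block.
        connected.difference_update(queues.values())
    return sorted(received)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without the `queue.get()` in `consumer`, the messages would simply pile up in each queue, which matches what the pasted code does.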
Hi, everyone.
I've just started using Quart for my tests. I'm using this simple code:
def start_app():
    from keyboard import press
    import quart
    app = quart.Quart(__name__)
    @app.route("/api", methods=["POST"])
    async def json():
        return {"hello": "quart world"}
    app.run(host='myip',port=5000)
start_app()
and it's working fine, except that I always get these unwanted messages when it starts:
- Serving Quart app 'main'
- Environment: production
- Please use an ASGI server (e.g. Hypercorn) directly in production
- Debug mode: False
- Running on http://myhost:5000 (CTRL + C to quit)
[2021-04-05 18:19:07,784] Running on http://myhost:5000 (CTRL + C to quit).
Can anyone please tell me how to disable them?
I've tried
logging.getLogger('werkzeug').setLevel(logging.CRITICAL)
logging.getLogger('app.serving').setLevel(logging.CRITICAL)
logging.getLogger('quart.serving').setLevel(logging.CRITICAL)
logging.getLogger('app.serving').disabled = True
logging.getLogger('quart.serving').disabled = True
logging.getLogger('app.serving').propagate = False
logging.getLogger('quart.serving').propagate = False
nothing helps
thank you, it worked in combination with
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, "w")
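Reassigning sys.stdout and sys.stderr like this never restores the streams and leaves the devnull handles open. A minimal sketch of a scoped alternative using contextlib from the standard library follows. `muted_output` is a hypothetical helper name. It only silences writes that go through sys.stdout and sys.stderr at the Python level, so a logging handler that already holds a reference to the original stream may still print.

```python
import contextlib
import os


@contextlib.contextmanager
def muted_output():
    """Temporarily send stdout and stderr to os.devnull, then restore them."""
    with open(os.devnull, "w") as devnull:
        with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
            yield


if __name__ == "__main__":
    with muted_output():
        print("this line is discarded")
    print("this line is visible again")
```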
but I actually wanted to start this function in background via this code:
import multiprocess as mp
proc = mp.Process(target=start_app, args=())
proc.start()
and that still doesn't work because besides those lines I'm also getting an empty line in console (the app waiting for the requests I guess).
Is there a way to start/stop the quart app in background?
import mock_service_app
import multiprocessing as mp
proc = mp.Process(target=mock_service_app.start_app, args=())
proc.start()
proc = mp.Process(target=start_app, args=(), initializer=mute)
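Note that multiprocessing.Process does not take an initializer argument (that parameter belongs to multiprocessing.Pool), so the last line would raise a TypeError. A minimal sketch of one alternative is to wrap the target in a function that mutes output in the child before calling it, and to stop the app with terminate(). `run_muted`, `background` and `fake_server` are hypothetical names, and `fake_server` only stands in for start_app() so the example runs without Quart.

```python
import multiprocessing as mp
import os
import sys
import time


def mute():
    """Point this process's stdout and stderr at os.devnull."""
    devnull = open(os.devnull, "w")
    sys.stdout = devnull
    sys.stderr = devnull


def run_muted(target, *args):
    """Child-process entry point: silence output, then call the real target."""
    mute()
    target(*args)


def fake_server():
    """Hypothetical stand-in for start_app(): prints a banner, then blocks."""
    print("Running on http://localhost:5000")
    while True:
        time.sleep(0.1)


def background(target, *args):
    """Build (but do not start) a daemon process that runs target muted."""
    return mp.Process(target=run_muted, args=(target, *args), daemon=True)


if __name__ == "__main__":
    proc = background(fake_server)
    proc.start()      # the server now runs in the background
    time.sleep(0.5)   # ... do other work, send requests, etc.
    proc.terminate()  # stop the background server
    proc.join()
```

The `if __name__ == "__main__":` guard matters on platforms where child processes are spawned rather than forked, because the module is imported again in the child.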