import json
import typing
from starlette.responses import Response
class PrettyJSONResponse(Response):
    """JSON response whose body is rendered as indented, human-readable JSON.

    The payload is serialized with ``json.dumps`` using a four-space indent
    and then encoded as UTF-8.
    """

    media_type = "application/json"

    def render(self, content: typing.Any) -> bytes:
        """Serialize *content* to pretty-printed, UTF-8 encoded JSON.

        Args:
            content: Any object that ``json.dumps`` can serialize.

        Returns:
            The indented JSON document as UTF-8 bytes.

        Raises:
            ValueError: If *content* contains NaN or infinite floats
                (rejected because ``allow_nan=False``).
            TypeError: If *content* is not JSON serializable.
        """
        return json.dumps(
            content,
            # Keep non-ASCII characters as-is; they are UTF-8 encoded below.
            ensure_ascii=False,
            allow_nan=False,
            indent=4,
            # With ``indent`` set, every item separator is followed by a
            # newline, so ", " would leave a trailing space on each line.
            separators=(",", ": "),
        ).encode("utf-8")
@app.get("/", response_class=PrettyJSONResponse)
async def get_some_json():
    """Placeholder handler for ``GET /``; uses PrettyJSONResponse as its response class."""
    ...
@app.get("/long_task", status_code=202)  # 202=Accepted
async def long_task():
    """Queue the ``long_task`` job and return its task id.

    Returns:
        A dict of the form ``{'task_id': <id>}``; the id is what the
        ``/long_task/status/{task_id}`` endpoint expects.
    """
    # Imported inside the handler, exactly as in the original snippet.
    from backend.workers import tasks

    queued = tasks.long_task.apply_async(kwargs={'num': 30})
    return {'task_id': queued.task_id}


@app.get("/long_task/status/{task_id}", status_code=200)  # 200=OK
async def long_task_status(task_id: str):
    """Report the status of a previously queued ``long_task`` job.

    Args:
        task_id: Identifier returned by the ``/long_task`` endpoint.

    Returns:
        A dict of the form ``{'status': <status>}``.
    """
    from backend.workers import tasks

    result = tasks.long_task.AsyncResult(task_id)
    return {'status': result.status}
@app.get("/long_task", status_code=202)  # 202=Accepted
async def long_task():
    """Queue the ``long_task`` job and return its task id.

    Returns:
        A dict of the form ``{'task_id': <id>}``; the id is what the
        ``/long_task/status/{task_id}`` endpoint expects.
    """
    # Imported inside the handler, exactly as in the original snippet.
    from backend.workers import tasks

    queued = tasks.long_task.apply_async(kwargs={'num': 30})
    return {'task_id': queued.task_id}


@app.get("/long_task/status/{task_id}", status_code=200)  # 200=OK
async def long_task_status(task_id: str):
    """Report the status of a previously queued ``long_task`` job.

    Args:
        task_id: Identifier returned by the ``/long_task`` endpoint.

    Returns:
        A dict of the form ``{'status': <status>}``.
    """
    from backend.workers import tasks

    result = tasks.long_task.AsyncResult(task_id)
    return {'status': result.status}
@app.get("/long_task", status_code=202)  # 202=Accepted
async def long_task():
    """Queue the ``long_task`` job and return its task id.

    Returns:
        A dict of the form ``{'task_id': <id>}``; the id is what the
        ``/long_task/status/{task_id}`` endpoint expects.
    """
    # Imported inside the handler, exactly as in the original snippet.
    from backend.workers import tasks

    queued = tasks.long_task.apply_async(kwargs={'num': 30})
    return {'task_id': queued.task_id}


@app.get("/long_task/status/{task_id}", status_code=200)  # 200=OK
async def long_task_status(task_id: str):
    """Report the status of a previously queued ``long_task`` job.

    Args:
        task_id: Identifier returned by the ``/long_task`` endpoint.

    Returns:
        A dict of the form ``{'status': <status>}``.
    """
    from backend.workers import tasks

    result = tasks.long_task.AsyncResult(task_id)
    return {'status': result.status}
@app.get("/long_task", status_code=202)  # 202=Accepted
async def long_task():
    """Queue the ``long_task`` job and return its task id.

    Returns:
        A dict of the form ``{'task_id': <id>}``; the id is what the
        ``/long_task/status/{task_id}`` endpoint expects.
    """
    # Imported inside the handler, exactly as in the original snippet.
    from backend.workers import tasks

    queued = tasks.long_task.apply_async(kwargs={'num': 30})
    return {'task_id': queued.task_id}


@app.get("/long_task/status/{task_id}", status_code=200)  # 200=OK
async def long_task_status(task_id: str):
    """Report the status of a previously queued ``long_task`` job.

    Args:
        task_id: Identifier returned by the ``/long_task`` endpoint.

    Returns:
        A dict of the form ``{'status': <status>}``.
    """
    from backend.workers import tasks

    result = tasks.long_task.AsyncResult(task_id)
    return {'status': result.status}
from celery import Celery
# NOTE(review): not referenced by the code visible here; presumably meant for
# Celery's ``include`` option — confirm against the worker setup.
CELERY_TASK_LIST = ['worker.tasks']
def create_celery_app():
    """Build and configure the Celery application named ``'worker'``.

    Returns:
        The ``Celery`` instance with the settings below applied.
    """
    settings = {
        # Redis on localhost holds both the results (db 1) and the broker
        # queue (db 0).
        'result_backend': 'redis://localhost:6379/1',
        'broker_url': 'redis://localhost:6379/0',
        'broker_transport_options': {'confirm_publish': True},
        'task_track_started': True,
        'task_ignore_result': False,
        # JSON is the only accepted content type and the serializer used
        # for both tasks and results.
        'accept_content': ['json'],
        'task_serializer': 'json',
        'result_serializer': 'json',
        'worker_send_task_events': True,
        'timezone': 'Asia/Calcutta',
    }

    worker_app = Celery('worker')
    worker_app.conf.update(**settings)
    return worker_app
# Module-level Celery application, built once when this module is imported.
celery_app = create_celery_app()
from backend.celery_common import celery_app