@app.get("/schedule/show_schedules/",response_model=CurrentScheduledJobsResponse,tags=["schedule"])
async def get_scheduled_syncs():
"""
Will provide a list of currently Scheduled Tasks
"""
schedules = []
for job in Schedule.get_jobs():
schedules.append({"job_id": str(job.id), "run_frequency": str(job.trigger), "next_run": str(job.next_run_time)})
return {"jobs":schedules}
add_job()
?
so İ have a Bot
class which is from twitchio
package, and that's running asynchronously.
what would I have to do if I wanted to make apshceduler and twitchio bot run together?
aim: I will update the data periodically so my bot will respond faster
from twitchio.ext import commands
class Bot(commands.Bot):
def __init__(self):
...
bot = Bot()
bot.run() # there is run_until_complete function in run()
maxiumum number of running instances reached(1)
but I set max_instances to 5try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("127.0.0.1", 47200))
log.info("socket bind")
except socket.error:
log.info("socket error")
pass
else:
log.info("socket - register extensions")
scheduler = AsyncIOScheduler()
log.basicConfig()
log.getLogger("apscheduler").setLevel(log.DEBUG)
scheduler.start()
scheduler.add_job(
test_function,
trigger="interval",
id="test_job",
replace_existing=True,
seconds=30,
)
bot = Bot()
bot.run()