Пытаюсь через flask_apscheduler запустить фоново отправку данных через flask_socketio.
__init__.py
from flask import Flask
from flask_socketio import SocketIO
from flask_apscheduler import APScheduler
sio = SocketIO()
scheduler = APScheduler()
def create_app():
app = Flask(__name__)
app.config['SECRET_KEY'] = 'SECRET_KEY'
app.config['SCHEDULER_API_ENABLED'] = True
sio.init_app(app)
scheduler.init_app(app)
from .sio import socket as socket_blueprint
app.register_blueprint(socket_blueprint)
from .scheduler import tasks as tasks_blueprint
app.register_blueprint(tasks_blueprint)
return app
sio.py
from flask import Blueprint
from flask_socketio import emit, join_room
from . import sio
socket = Blueprint('sio', __name__)
@sio.on('connect')
def connect(message):
room = session.get('room')
join_room(room)
emit('message', {'text': f'Hi!'}, room=room)
scheduler.py
from flask import Blueprint
from . import scheduler, sio
tasks = Blueprint('scheduler', __name__)
def test():
sio.emit('test', 'test', room='test')
job = scheduler.add_job(func=test, trigger='interval', seconds=1, id='test', name='test', replace_existing=True)
scheduler.start()
При запуске возникает ошибка:
Job "test (trigger: interval[0:00:01], next run at: 2022-08-27 23:54:40 MSK)" raised an exception
Traceback (most recent call last):
File "e:\Python\Flask\cardgame\venv\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "E:\Python\Flask\cardgame\app\scheduler.py", line 13, in test
sio.emit('test', 'test', room='test')
File "e:\Python\Flask\cardgame\venv\lib\site-packages\flask_socketio\__init__.py", line 827, in emit
namespace = flask.request.namespace
File "e:\Python\Flask\cardgame\venv\lib\site-packages\werkzeug\local.py", line 316, in __get__
obj = instance._get_current_object() # type: ignore[misc]
File "e:\Python\Flask\cardgame\venv\lib\site-packages\werkzeug\local.py", line 513, in _get_current_object
raise RuntimeError(unbound_message) from None
RuntimeError: Working outside of request context.
This typically means that you attempted to use functionality that needed
an active HTTP request. Consult the documentation on testing for
information about how to avoid this problem.
Поискал в гугле и все ссылаются что необходимо передать контекст, так как шедулер работает вне потока flask. Однако где бы я не пытался передать контекст
with app.app_context():
- ошибка сохраняется.