from logging import debug
from flask import Flask
from flask_socketio import SocketIO
from celery import Celery
import threading
# Flask application object plus the settings shared with Celery.
# NOTE(review): static_folder='/' combined with static_url_path="" looks like
# it maps the filesystem root as the static directory at the site root --
# confirm this is intended before deploying.
app = Flask("server", static_folder='/', static_url_path="")

# NOTE(review): SECRET_KEY is hard-coded in source; consider loading it from
# the environment or a secrets store.
# 'broker_url' and 'result_backend' both point at the same local Redis DB.
app.config.update({
    'SECRET_KEY': 'gjr39dkjn344_!67#',
    'broker_url': 'redis://127.0.0.1:6379/0',
    'result_backend': 'redis://127.0.0.1:6379/0',
})
def make_celery(app):
    """Create a Celery application bound to the given Flask app.

    The broker and result backend URLs are read from ``app.config`` under the
    keys ``'broker_url'`` and ``'result_backend'``; the whole Flask config is
    then copied into the Celery configuration.

    Args:
        app: The Flask application whose config and app context are used.

    Returns:
        The configured Celery instance, whose ``Task`` base class runs every
        task body inside ``app.app_context()``.

    Raises:
        KeyError: If ``'broker_url'`` or ``'result_backend'`` is missing from
            ``app.config``.
    """
    celery = Celery(
        "server",
        # Each argument now reads its own config key; previously the two were
        # swapped, which only worked because both URLs were identical.
        backend=app.config['result_backend'],
        broker=app.config['broker_url'],
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        """Task base class that executes the task within the Flask app context."""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    # Make every task created from this Celery instance use the context-aware base.
    celery.Task = ContextTask
    return celery
# Module-level Celery instance built from the Flask app; the @celery.task
# decorator in this module registers tasks against it.
celery = make_celery(app)
@celery.task()
def my_background_task():
    """Write a fixed string to ``text.txt`` and return ``"sss"``.

    Defined as a regular (synchronous) function: a Celery worker calls the
    task body directly and does not await it, so an ``async def`` task would
    only produce an un-awaited coroutine and never perform the write.

    Returns:
        The literal string ``"sss"``.
    """
    # The context manager guarantees the file is flushed and closed even if
    # the write raises.
    with open("text.txt", "w") as f:
        f.write("fsdsdf")
    return "sss"
@app.route("/")
async def main():
    """Handle ``GET /``: enqueue the background task and return ``"sss"``.

    NOTE(review): nothing is awaited here, and async views need Flask to be
    installed with its ``async`` extra -- confirm that is available, or make
    this a plain ``def``.

    Returns:
        The literal string ``"sss"`` as the response body.
    """
    # Stray "<code></code>" markup that followed this call was a syntax error
    # and has been removed.
    task = my_background_task.delay()
    print(task)
    return "sss"