Задать вопрос
@maryaTurova

Flask + Playwright?

Задача состоит в следующем:
Каждой вкладке присваивается ключ.

- flask принимает данные в виде словаря
{ "url": "http://example.com/", "key": key, "req": "test"}

- если ключ отсутствует в словаре вкладок, то открываем в новой вкладке url , делаем инъекцию в виде функции и присваиваем вкладке ключ.

- возвращаем результат функцию

Пример:

from playwright.sync_api import sync_playwright
from flask import Flask, request
import threading
import logging

app = Flask(__name__)
log = logging.getLogger('werkzeug')
log.setLevel(logging.CRITICAL)


class API:
    def __init__(self) -> None:
        self.context = None
        self.browser = None
        self.playwright = None
        self.playwright_thread = threading.Thread(target=self.run)
        self.playwright_thread.start()
        self.contexts = {}

    def inject_script(self, req) -> str:
        self.contexts[key].add_script_tag(content="""
            function sleep(ms) {
              return new Promise(resolve => setTimeout(resolve, ms));
            }
            async function custom(key){
                await sleep(1000);
                var data = {
                    "host": window.location.host,
                    "req": req
                }
                return data;
            }
        """)

    def check_page(self, data):
        key = data["key"]
        url = data["url"]
        req = data["req"]
        if key not in self.contexts:
            self.contexts[key] = self.context.new_page()
            self.contexts[key].route(url, lambda r: r.fulfill(status=200, content_type="text/html", ))
            self.contexts[key].goto(url)
            self.contexts[key].wait_for_load_state('domcontentloaded')
            self.inject_script(key)
        return self.contexts[key].evaluate(f"await custom('{req}')")

    def run(self) -> None:
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(channel="chrome", headless=False)
        self.context = self.browser.new_context(locale="en")

api = API()

@app.route('/set', methods=["POST"])
def set():
    data = request.get_json()
    result = api.check_page(data)
    return result


В примере получаю ошибку:
greenlet.error: Cannot switch to a different thread

Полагаю будут ошибки и в случае параллельных запросов.
Как решить сию задачу???
  • Вопрос задан
  • 63 просмотра
Подписаться 1 Простой Комментировать
Решения вопроса 1
fenrir1121
@fenrir1121
Начни с документации
Запрос в поиск по тексту ошибки выдаёт issue на гитхабе, где чётко сказано что playwright не является потокобезопасным и стоит использовать async.
Ну и касательно подобного использования словаря стоит либо использовать каналы для общения потоков и изменять его только в одном потоке, либо использовать мьютексы
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы