Процессор почему-то загружен только на 25%.
Диск HDD. Данные загрузились за 5 часов.
import asyncio
import heapq
from asyncio import DefaultEventLoopPolicy, SelectorEventLoop
from asyncio.base_events import MAXIMUM_SELECT_TIMEOUT, _format_handle
from asyncio.log import logger
_MIN_SCHEDULED_TIMER_HANDLES = 100
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
class MyEventLoop(SelectorEventLoop):
def run_forever(self):
print("run_forever")
return super().run_forever()
def _run_once(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
print("-"*40)
print(f"_run_once. {self._scheduled=}, {self._ready=}")
sched_count = len(self._scheduled)
if (
sched_count > _MIN_SCHEDULED_TIMER_HANDLES
and self._timer_cancelled_count / sched_count
> _MIN_CANCELLED_TIMER_HANDLES_FRACTION
):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
print(f"select timeout {timeout=}")
event_list = self._selector.select(timeout)
print(f"got {len(event_list)} events: {event_list=}")
self._process_events(event_list)
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
print(f"{self._scheduled=}")
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
print(f"{self._ready=}")
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning(
"Executing %s took %.3f seconds", _format_handle(handle), dt
)
finally:
self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
print("_run_once FINISHED")
print("=" * 40)
class X(DefaultEventLoopPolicy):
def new_event_loop(self):
return MyEventLoop()
asyncio.set_event_loop_policy(X())
async def c():
pass
async def a():
print("start a")
await c()
print("finish a")
async def b():
print("start b")
await c()
print("finish b")
async def main():
asyncio.create_task(a())
print("create task")
asyncio.create_task(b())
if __name__ == "__main__":
asyncio.run(main())
def run(main, *, debug=None):
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
if not coroutines.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))
loop = events.new_event_loop()
try:
events.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop) # Здесь
loop.run_until_complete(loop.shutdown_asyncgens()) # Здесь
loop.run_until_complete(loop.shutdown_default_executor()) # И здесь
finally:
events.set_event_loop(None)
loop.close()