import asyncio
from contextlib import suppress
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
async def some_async_handler(event):
# дальше уже, что то сделать с ним
class SomeEventHandler(FileSystemEventHandler):
def __init__(self, loop, *args, **kwargs):
self._loop = loop
super().__init__(*args, **kwargs)
def on_any_event(self, event):
asyncio.run_coroutine_threadsafe(some_async_handler(event), self._loop)
loop = asyncio.get_event_loop()
event_handler = SomeEventHandler(loop)
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
loop.run_forever()
except KeyboardInterrupt:
observer.stop()
for task in asyncio.Task.all_tasks():
task.cancel()
with suppress(asyncio.CancelledError):
loop.run_until_complete(task)
finally:
loop.close()
observer.join()
cython critical_business_rules.py -o critical_business_rules.c
gcc -shared -pthread -fPIC -fwrapv -O2 -Wl,--strip-all -Wall -fno-strict-aliasing -I/usr/lib64/python3.6 -o critical_business_rules.so critical_business_rules.c
from random import randint, uniform
def argrandomizer(func):
def wrapper():
arg_1 = uniform(0.8, 2.2)
arg_2 = randint(-3, 3)
arg_3 = randint(-3, 3)
return func(arg_1, arg_2, arg_3)
return wrapper
@argrandomizer
def my_func(arg_1, arg_2, arg_3):
print( arg_1 )
print( arg_2 )
print( arg_3 )
my_func()
my_func()
my_func()
1.48763356514
-3
-3
2.07023550873
-2
3
1.48356419425
-2
-1
def foo(value=None):
value = value or []