"""async testing module"""
import asyncio
import datetime
async def put_message_on_s3(message_date_: datetime.datetime, n_exec: int) -> str:
    """Sleep for a delay derived from ``n_exec``, then print and return a status.

    Despite the name, nothing is uploaded here: the coroutine only sleeps,
    prints one line and returns a string.

    Args:
        message_date_: Timestamp printed in the form ``March 5, 2021``.
        n_exec: Execution number. The delay is ``(12 - n_exec) / 2 + 0.1``
            seconds, so larger numbers wait less (a non-positive delay
            returns from the sleep right away).

    Returns:
        The string ``"execution number: <n_exec>"``.
    """
    delay = (12 - n_exec) / 2 + 0.1
    await asyncio.sleep(delay)
    # "%-d" (day without zero padding) is a platform-specific strftime
    # extension that is rejected on some platforms (e.g. Windows), so the
    # day number is taken from the ``day`` attribute instead.
    date_text = "{} {}, {}".format(
        message_date_.strftime("%B"),
        message_date_.day,
        message_date_.strftime("%Y"),
    )
    print(
        "message_date {} and number of execution = {}".format(date_text, n_exec)
    )
    return "execution number: " + str(n_exec)
# Module-level state shared by the code in this script.
loop = asyncio.get_event_loop()  # handle on the current event loop
tasks = list()  # futures scheduled by main_func are appended here
groups = list()
groups_tuple = ()
async def main_func():
    """Schedule ten put_message_on_s3 calls, one per second, and collect results.

    Every scheduled task is appended to the module-level ``tasks`` list.
    After the tenth one has been scheduled, each task in ``tasks`` is awaited
    in list order.

    Returns:
        A list with the return value of every awaited task.
    """
    for sequence_no in range(10):
        scheduled = asyncio.ensure_future(
            put_message_on_s3(datetime.datetime.now(), sequence_no)
        )
        tasks.append(scheduled)
        print("\nNEXT MESSAGE\n")
        # Pause before scheduling the next call; tasks that were already
        # scheduled keep running on the loop during this sleep.
        await asyncio.sleep(1)

    collected = []
    for pending in tasks:
        collected.append(await pending)
    return collected
# Run main_func to completion on an event loop and print what it returned.
loop = asyncio.get_event_loop()
if loop.is_closed():
    # A closed loop cannot run anything, so create a fresh one.
    loop = asyncio.new_event_loop()
try:
    completion = loop.run_until_complete(main_func())
    print("put_message_on_s3()= {}".format(completion))
finally:
    # Close the loop even if main_func raises, so it is never left open.
    loop.close()
"""async testing module"""
import asyncio
import datetime
async def put_message_on_s3(message_date_: datetime.datetime, n_exec: int) -> str:
    """Sleep for a delay derived from ``n_exec``, then print and return a status.

    Despite the name, nothing is uploaded here: the coroutine only sleeps,
    prints one line and returns a string.

    Args:
        message_date_: Timestamp printed in the form ``March 5, 2021``.
        n_exec: Execution number. The delay is ``(12 - n_exec) / 2 + 0.1``
            seconds, so larger numbers wait less (a non-positive delay
            returns from the sleep right away).

    Returns:
        The string ``"execution number: <n_exec>"``.
    """
    delay = (12 - n_exec) / 2 + 0.1
    await asyncio.sleep(delay)
    # "%-d" (day without zero padding) is a platform-specific strftime
    # extension that is rejected on some platforms (e.g. Windows), so the
    # day number is taken from the ``day`` attribute instead.
    date_text = "{} {}, {}".format(
        message_date_.strftime("%B"),
        message_date_.day,
        message_date_.strftime("%Y"),
    )
    print(
        "message_date {} and number of execution = {}".format(date_text, n_exec)
    )
    return "execution number: " + str(n_exec)
# Module-level state shared by the code in this script.
loop = asyncio.get_event_loop()  # handle on the current event loop
tasks = list()  # futures scheduled by main_func are appended here
groups = list()
groups_tuple = ()
async def main_func():
    """Schedule ten put_message_on_s3 calls, one per second, and collect results.

    Every scheduled task is appended to the module-level ``tasks`` list.
    After the tenth one has been scheduled, each task in ``tasks`` is awaited
    in list order.

    Returns:
        A list with the return value of every awaited task.
    """
    for sequence_no in range(10):
        scheduled = asyncio.ensure_future(
            put_message_on_s3(datetime.datetime.now(), sequence_no)
        )
        tasks.append(scheduled)
        print("\nNEXT MESSAGE\n")
        # Pause before scheduling the next call; tasks that were already
        # scheduled keep running on the loop during this sleep.
        await asyncio.sleep(1)

    collected = []
    for pending in tasks:
        collected.append(await pending)
    return collected
# Run main_func to completion on an event loop and print what it returned.
loop = asyncio.get_event_loop()
if loop.is_closed():
    # A closed loop cannot run anything, so create a fresh one.
    loop = asyncio.new_event_loop()
try:
    completion = loop.run_until_complete(main_func())
    print("put_message_on_s3()= {}".format(completion))
finally:
    # Close the loop even if main_func raises, so it is never left open.
    loop.close()
"""async testing module"""
import asyncio
import datetime
async def put_message_on_s3(message_date_: datetime.datetime, n_exec: int) -> str:
    """Sleep for a delay derived from ``n_exec``, then print and return a status.

    Despite the name, nothing is uploaded here: the coroutine only sleeps,
    prints one line and returns a string.

    Args:
        message_date_: Timestamp printed in the form ``March 5, 2021``.
        n_exec: Execution number. The delay is ``(12 - n_exec) / 2 + 0.1``
            seconds, so larger numbers wait less (a non-positive delay
            returns from the sleep right away).

    Returns:
        The string ``"execution number: <n_exec>"``.
    """
    delay = (12 - n_exec) / 2 + 0.1
    await asyncio.sleep(delay)
    # "%-d" (day without zero padding) is a platform-specific strftime
    # extension that is rejected on some platforms (e.g. Windows), so the
    # day number is taken from the ``day`` attribute instead.
    date_text = "{} {}, {}".format(
        message_date_.strftime("%B"),
        message_date_.day,
        message_date_.strftime("%Y"),
    )
    print(
        "message_date {} and number of execution = {}".format(date_text, n_exec)
    )
    return "execution number: " + str(n_exec)
# Module-level state shared by the code in this script.
loop = asyncio.get_event_loop()  # handle on the current event loop
tasks = list()  # futures scheduled by main_func are appended here
groups = list()
groups_tuple = ()
async def main_func():
    """Schedule ten put_message_on_s3 calls, one per second, and collect results.

    Every scheduled task is appended to the module-level ``tasks`` list.
    After the tenth one has been scheduled, each task in ``tasks`` is awaited
    in list order.

    Returns:
        A list with the return value of every awaited task.
    """
    for sequence_no in range(10):
        scheduled = asyncio.ensure_future(
            put_message_on_s3(datetime.datetime.now(), sequence_no)
        )
        tasks.append(scheduled)
        print("\nNEXT MESSAGE\n")
        # Pause before scheduling the next call; tasks that were already
        # scheduled keep running on the loop during this sleep.
        await asyncio.sleep(1)

    collected = []
    for pending in tasks:
        collected.append(await pending)
    return collected
# Run main_func to completion on an event loop and print what it returned.
loop = asyncio.get_event_loop()
if loop.is_closed():
    # A closed loop cannot run anything, so create a fresh one.
    loop = asyncio.new_event_loop()
try:
    completion = loop.run_until_complete(main_func())
    print("put_message_on_s3()= {}".format(completion))
finally:
    # Close the loop even if main_func raises, so it is never left open.
    loop.close()
А yapf, кстати, работает... Перешёл на него.