import sys
import asyncio
import pyaudio
def callback(queue: asyncio.Queue, input: bytes, frames: int, time: dict, status: int):
print('callback')
queue.put_nowait(input)
return None, pyaudio.paContinue
class InputAudio():
def __init__(self, pyaudio):
self.queue = asyncio.Queue()
self.stream = pyaudio.open(format=pyaudio.get_format_from_width(2),
channels=1 if sys.platform == 'darwin' else 2,
rate=44100,
input=True,
stream_callback=lambda a, b, c, d: callback(self.queue, a, b, c, d)
)
def __aiter__(self):
return self
async def __anext__(self):
print('__anext__')
return await self.queue.get()
def close(self): self.stream.close()
async def main():
p = pyaudio.PyAudio()
input = InputAudio(p)
output = p.open(format=p.get_format_from_width(2),
channels=1 if sys.platform == 'darwin' else 2,
rate=44100,
output=True)
async for data in input:
print('here')
output.write(data)
input.close()
output.close()
p.terminate()
asyncio.run(main())
Вызывается один раз __anext__ и много раз callback:
__anext__
callback
callback
callback
...
Может ли это связанно с тем что PyAudio запускает callback в отдельном потоке?
Это игрушечный пример, и все нужно сделать именно через 2 отдельных стрима
P.S
Немного разобрался, вот рабочая версия(правда с большим отставанием)
import sys
import asyncio
import pyaudio
from typing import Callable
from queue import Queue
class InputAudio():
def __init__(self, audio):
self.futures: Queue[asyncio.Future] = Queue()
self.buffer = Queue()
self.loop = asyncio.get_running_loop()
def callback(input: bytes, frames: int, time: dict, status: int):
if self.futures.qsize() > 0:
future = self.futures.get_nowait()
self.loop.call_soon_threadsafe(lambda: future.set_result(input))
else:
self.buffer.put_nowait(input)
return None, pyaudio.paContinue
self.stream = audio.open(format=audio.get_format_from_width(2),
channels=1 if sys.platform == 'darwin' else 2,
rate=44100,
input=True,
stream_callback=callback
)
def __aiter__(self):
return self
def __anext__(self):
future = self.loop.create_future()
if self.buffer.qsize() > 0: future.set_result(self.buffer.get_nowait())
else: self.futures.put_nowait(future)
return future
def close(self): self.stream.close()
async def main():
p = pyaudio.PyAudio()
input = InputAudio(p)
output = p.open(format=p.get_format_from_width(2),
channels=1 if sys.platform == 'darwin' else 2,
rate=44100,
output=True,
frames_per_buffer=44100)
async for data in input:
output.write(data)
input.close()
p.terminate()
asyncio.run(main())
Но я все еще недостаточно разобрался в теме чтобы обьяснить это