"""Demo: replay a WAV file through a queue into an in-memory buffer and cut slices from it."""
import io
|
|
import queue
|
|
import threading
|
|
import time
|
|
import wave
|
|
|
|
|
|
class SipCall:
    """Simulated call that replays a WAV file as a stream of raw audio chunks.

    The WAV header is read once at construction so the channel count, sample
    width and frame rate are available to consumers. The audio itself is
    pushed into an internal queue by ``wave_to_queue`` and drained by
    ``process_recorded_audio``.
    """

    _queue: queue.Queue
    _file_path: str
    _channels: int
    _sampwidth: int
    _framerate: int

    def __init__(self, file_path) -> None:
        """Store *file_path* and read its WAV format parameters.

        Args:
            file_path: Path of the WAV file to replay.

        Raises:
            Any exception ``wave.open`` raises for a missing or invalid file.
        """
        self._file_path = file_path
        self._queue = queue.Queue()
        with wave.open(file_path, "rb") as f:
            self._channels = f.getnchannels()
            self._sampwidth = f.getsampwidth()
            self._framerate = f.getframerate()

    def wave_to_queue(self, chunk_size=4096, sleep=0.1):
        """Feed the WAV file into the queue forever, restarting at end of file.

        This method never returns; it is meant to run in its own thread.

        Args:
            chunk_size: Number of frames read from the file per chunk.
            sleep: Seconds to pause after each chunk is queued.
        """
        while True:
            produced = False
            with wave.open(self._file_path, "rb") as f:
                while frames := f.readframes(chunk_size):
                    self._queue.put_nowait(frames)
                    produced = True
                    time.sleep(sleep)
            if not produced:
                # A file without audio frames would otherwise be reopened in
                # a tight loop with no pause at all.
                time.sleep(sleep)

    def process_recorded_audio(self, iters=10000):
        """Drain queued chunks without blocking and return them concatenated.

        Args:
            iters: Maximum number of chunks to take from the queue in one call.

        Returns:
            A ``bytearray`` with the collected audio; empty if nothing was queued.
        """
        audio_bytes = bytearray()
        for _ in range(iters):
            try:
                frame = self._queue.get_nowait()
            except queue.Empty:
                # Only "queue is empty" ends the drain; any other error is a
                # real problem and must reach the caller.
                break
            if frame:
                audio_bytes.extend(frame)
        return audio_bytes
|
|
|
|
|
|
def get_audio(call: SipCall, buff: io.BytesIO, poll_interval=0.01):
    """Continuously move queued audio from *call* into *buff*.

    This function never returns; it is meant to run in its own thread.

    Args:
        call: Source whose ``process_recorded_audio`` yields pending audio bytes.
        buff: Buffer that the audio is appended to.
        poll_interval: Seconds to wait before polling again when no audio
            was available.
    """
    while True:
        if audio_bytes := call.process_recorded_audio():
            buff.write(audio_bytes)
        else:
            # Nothing queued right now: pause instead of spinning at full CPU.
            time.sleep(poll_interval)
|
|
|
|
|
|
def get_audio_duration_seconds(call: SipCall, audio_data: bytes):
    """Return how many seconds of audio *audio_data* holds in the format of *call*.

    Trailing bytes that do not make up a whole frame are ignored.
    """
    bytes_per_frame = call._channels * call._sampwidth
    frame_count, _ = divmod(len(audio_data), bytes_per_frame)
    return frame_count / float(call._framerate)
|
|
|
|
|
|
def read_audio_by_range(call: SipCall, buff: io.BytesIO, start_seconds, end_seconds):
    """Return the audio between *start_seconds* and *end_seconds* of *buff*.

    Args:
        call: Provides the channel count, sample width and frame rate.
        buff: Buffer holding the raw audio collected so far.
        start_seconds: Start of the range, in seconds from the buffer start.
        end_seconds: End of the range, in seconds from the buffer start.

    Returns:
        The raw audio bytes of the requested range, clamped to what the
        buffer currently holds; empty when the range lies outside it.
    """
    audio_data = buff.getvalue()
    bytes_per_frame = call._channels * call._sampwidth
    num_frames = len(audio_data) // bytes_per_frame
    # Truncate to int so fractional seconds give valid slice indices, and
    # clamp both ends so negative values cannot index from the buffer's end.
    start_frame = min(num_frames, max(0, int(start_seconds * call._framerate)))
    end_frame = min(num_frames, max(start_frame, int(end_seconds * call._framerate)))
    frames = audio_data[start_frame * bytes_per_frame:end_frame * bytes_per_frame]
    print("read_audio_by_range duration ", get_audio_duration_seconds(call, frames))
    return frames
|
|
|
|
|
|
def read_audio_by_last(call: SipCall, buff: io.BytesIO, last_seconds):
    """Return the most recent *last_seconds* of audio held in *buff*.

    Args:
        call: Provides the channel count, sample width and frame rate.
        buff: Buffer holding the raw audio collected so far.
        last_seconds: Length of the tail to return, in seconds.

    Returns:
        The raw audio bytes of the tail; the whole buffer when it holds less
        than *last_seconds* of audio.
    """
    audio_data = buff.getvalue()
    bytes_per_frame = call._channels * call._sampwidth
    num_frames = len(audio_data) // bytes_per_frame
    # Truncate to int so fractional seconds give a valid slice index.
    tail_frames = int(last_seconds * call._framerate)
    start_frame = min(num_frames, max(0, num_frames - tail_frames))
    frames = audio_data[start_frame * bytes_per_frame:]
    print("read_audio_by_last duration ", get_audio_duration_seconds(call, frames))
    return frames
|
|
|
|
|
|
def audio_to_text(call: SipCall, audio_buff: io.BytesIO):
    """Placeholder for speech recognition over the collected audio.

    Planned behavior (not implemented yet):
    - cut the audio into chunks and send them for recognition;
    - reading from *audio_buff* must not block execution;
    - how the text is returned is still undecided, but it must be
      non-blocking as well.
    """
    return None
|
|
|
|
|
|
def _write_wav(call: SipCall, path, frames):
    """Write *frames* to *path* as a WAV file in the audio format of *call*."""
    with wave.open(path, "wb") as wf:
        wf.setnchannels(call._channels)
        wf.setsampwidth(call._sampwidth)
        wf.setframerate(call._framerate)
        wf.writeframes(frames)


def main():
    """Run the demo: stream ``voice.wav``, then save two slices of the buffer.

    Starts the producer, collector and recognition threads, waits seven
    seconds, writes the last three seconds and the 1-7 second range to WAV
    files, then idles until interrupted with Ctrl+C.
    """
    call = SipCall("voice.wav")
    audio_buff = io.BytesIO()
    # The workers loop forever, so they are daemon threads: the process can
    # then exit as soon as the main thread is interrupted.
    workers = [
        threading.Thread(target=call.wave_to_queue, args=(4096, 0.1), daemon=True),
        threading.Thread(target=get_audio, args=(call, audio_buff), daemon=True),
        threading.Thread(target=audio_to_text, args=(call, audio_buff), daemon=True),
    ]
    for worker in workers:
        worker.start()
    time.sleep(7)

    _write_wav(call, "read_audio_by_last.wav", read_audio_by_last(call, audio_buff, 3))
    _write_wav(call, "read_audio_by_range.wav", read_audio_by_range(call, audio_buff, 1, 7))

    print("CTRL+C to exit")
    try:
        # Keep the main thread alive so the workers continue until Ctrl+C.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
|
|
|
|
|
|
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|