import io
import queue
import threading
import time
import wave


class SipCall:
    """Replay a WAV file into an in-memory queue of raw frame chunks.

    The WAV header is read once at construction so that the channel count,
    sample width and frame rate are available without reopening the file.
    """

    _queue: queue.Queue
    _file_path: str

    def __init__(self, file_path) -> None:
        """Remember *file_path* and read its audio format from the header.

        Raises whatever ``wave.open`` raises when the file is missing or is
        not a readable WAV file.
        """
        self._file_path = file_path
        self._queue = queue.Queue()
        with wave.open(file_path, "rb") as f:
            self._channels = f.getnchannels()
            self._sampwidth = f.getsampwidth()
            self._framerate = f.getframerate()

    def wave_to_queue(self, chunk_size=4096, sleep=0.1):
        """Push the file's frames onto the queue, looping over it forever.

        Args:
            chunk_size: Number of frames read per chunk.
            sleep: Seconds to pause after each queued chunk.

        This method never returns; it is meant to be a thread target.
        """
        while True:
            queued_any = False
            with wave.open(self._file_path, "rb") as f:
                while frames := f.readframes(chunk_size):
                    self._queue.put_nowait(frames)
                    queued_any = True
                    time.sleep(sleep)
            if not queued_any:
                # A file without frames would otherwise be reopened in a
                # tight loop with no pause at all.
                time.sleep(sleep)

    def process_recorded_audio(self, iters=10000):
        """Drain up to *iters* chunks from the queue without blocking.

        Returns:
            A ``bytearray`` with the drained chunks concatenated in queue
            order; empty when the queue had nothing to offer.
        """
        audio_bytes = bytearray()
        for _ in range(iters):
            try:
                frame = self._queue.get_nowait()
            except queue.Empty:
                # Only an empty queue ends the drain; any other error is a
                # real problem and must not be silently swallowed.
                break
            if frame:
                audio_bytes.extend(frame)
        return audio_bytes


def _frame_size(call: SipCall) -> int:
    """Return the number of bytes occupied by one frame of *call*'s audio."""
    return call._channels * call._sampwidth


def _seconds_to_frames(call: SipCall, seconds) -> int:
    """Convert a duration in seconds (int or float) to a whole frame count."""
    return int(round(seconds * call._framerate))


def get_audio(call: SipCall, buff: io.BytesIO, idle_sleep=0.01):
    """Continuously move queued audio from *call* into *buff*.

    Args:
        call: Source of queued audio chunks.
        buff: Buffer that receives the drained bytes via ``write``.
        idle_sleep: Seconds to wait when the queue was empty, so that the
            loop does not spin at full CPU while no audio is available.

    This function never returns; it is meant to be a thread target.
    """
    while True:
        if audio_bytes := call.process_recorded_audio():
            buff.write(audio_bytes)
        else:
            time.sleep(idle_sleep)


def get_audio_duration_seconds(call: SipCall, audio_data: bytes):
    """Return the duration of *audio_data* in seconds as a float.

    Only whole frames are counted; trailing bytes that do not make up a
    complete frame are ignored.
    """
    num_frames = len(audio_data) // _frame_size(call)
    return num_frames / float(call._framerate)


def read_audio_by_range(call: SipCall, buff: io.BytesIO, start_seconds, end_seconds):
    """Return the raw frames between *start_seconds* and *end_seconds*.

    Both bounds may be ints or floats. They are clamped to the audio that
    is currently in *buff*, and an end that lies before the start yields
    an empty result. The duration of the returned slice is printed.
    """
    audio_data = buff.getvalue()
    frame_size = _frame_size(call)
    num_frames = len(audio_data) // frame_size

    start_frame = min(num_frames, max(0, _seconds_to_frames(call, start_seconds)))
    # Clamp the end to at least the start: a negative end offset would
    # otherwise turn into a negative slice index counted from the tail.
    end_frame = min(num_frames, max(start_frame, _seconds_to_frames(call, end_seconds)))

    frames = audio_data[start_frame * frame_size:end_frame * frame_size]
    print("read_audio_by_range duration ", get_audio_duration_seconds(call, frames))
    return frames


def read_audio_by_last(call: SipCall, buff: io.BytesIO, last_seconds):
    """Return the raw frames of the final *last_seconds* seconds in *buff*.

    *last_seconds* may be an int or a float. When it exceeds the buffered
    duration the whole buffer is returned. The duration of the returned
    slice is printed.
    """
    audio_data = buff.getvalue()
    frame_size = _frame_size(call)
    num_frames = len(audio_data) // frame_size

    wanted_frames = _seconds_to_frames(call, last_seconds)
    start_frame = min(num_frames, max(0, num_frames - wanted_frames))

    frames = audio_data[start_frame * frame_size:]
    print("read_audio_by_last duration ", get_audio_duration_seconds(call, frames))
    return frames


def audio_to_text(call: SipCall, audio_buff: io.BytesIO):
    """Placeholder for speech recognition over the buffered audio."""
    # The logic for slicing the audio into pieces and sending them off for
    # recognition goes here.
    # Reading from audio_buff does not block execution.
    # How exactly to return the text is not decided yet, but it must be
    # non-blocking as well.
    pass


def _write_wav(call: SipCall, path, frames):
    """Write *frames* to a WAV file at *path* using *call*'s audio format."""
    with wave.open(path, "wb") as wf:
        wf.setnchannels(call._channels)
        wf.setsampwidth(call._sampwidth)
        wf.setframerate(call._framerate)
        wf.writeframes(frames)


def main():
    """Stream ``voice.wav`` through the pipeline and dump two sample slices.

    Starts the producer, collector and recognition threads, waits seven
    seconds, then writes the last three seconds and the 1-7 second range of
    the collected audio to separate WAV files.
    """
    call = SipCall("voice.wav")
    audio_buff = io.BytesIO()

    # Daemon threads: the producer and collector loop forever, so they must
    # not keep the process alive once the main thread decides to exit.
    workers = [
        threading.Thread(target=call.wave_to_queue, args=(4096, 0.1), daemon=True),
        threading.Thread(target=get_audio, args=(call, audio_buff), daemon=True),
        threading.Thread(target=audio_to_text, args=(call, audio_buff), daemon=True),
    ]
    for worker in workers:
        worker.start()

    time.sleep(7)

    frames = read_audio_by_last(call, audio_buff, 3)
    _write_wav(call, "read_audio_by_last.wav", frames)

    frames = read_audio_by_range(call, audio_buff, 1, 7)
    _write_wav(call, "read_audio_by_range.wav", frames)

    print("CTRL+C to exit")
    try:
        # Keep the pipeline running until the user interrupts it.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()