# Files: wave-threading/main.py
# Commit 0a5bedcc94 ("demo") by Tolya Khabarov, 2025-02-11 23:43:21 +07:00
# 113 lines, 3.8 KiB, Python

import io
import queue
import threading
import time
import wave
class SipCall:
    """Demo stand-in for a SIP call that streams a WAV file through a queue.

    A producer (``wave_to_queue``) reads the file chunk by chunk and pushes
    the raw frames into an unbounded ``queue.Queue``; a consumer drains them
    with ``process_recorded_audio``. The audio format (channels, sample width,
    frame rate) is read once from the file header at construction time.
    """

    _queue: queue.Queue
    _file_path: str
    _channels: int
    _sampwidth: int
    _framerate: int

    def __init__(self, file_path) -> None:
        """Store the WAV path and read its audio format from the header.

        Args:
            file_path: Path to a WAV file readable by ``wave.open``.

        Raises:
            Whatever ``wave.open`` raises for a missing or invalid file.
        """
        self._file_path = file_path
        self._queue = queue.Queue()
        with wave.open(file_path, "rb") as f:
            self._channels = f.getnchannels()
            self._sampwidth = f.getsampwidth()
            self._framerate = f.getframerate()

    def wave_to_queue(self, chunk_size=4096, sleep=0.1):
        """Push the file's frames into the queue forever, looping at EOF.

        Args:
            chunk_size: Number of frames requested per ``readframes`` call.
            sleep: Seconds to pause after each chunk is enqueued.

        This method never returns; it is meant to run in its own thread.
        """
        while True:
            # Re-open the file on every pass so playback restarts from frame 0.
            with wave.open(self._file_path, "rb") as f:
                # readframes() returns empty bytes at end of file.
                while frames := f.readframes(chunk_size):
                    self._queue.put_nowait(frames)
                    time.sleep(sleep)

    def process_recorded_audio(self, iters=10000):
        """Drain queued chunks without blocking and return them concatenated.

        Args:
            iters: Maximum number of queue items to take in one call.

        Returns:
            A ``bytearray`` with the drained audio; empty when nothing was
            queued. Falsy (empty) chunks are consumed but not appended.
        """
        audio_bytes = bytearray()
        for _ in range(iters):
            try:
                frame = self._queue.get_nowait()
            except queue.Empty:
                # Only "queue is drained" ends the loop; any other error is a
                # real problem and must propagate instead of being swallowed.
                break
            if frame:
                audio_bytes.extend(frame)
        return audio_bytes
def get_audio(call: SipCall, buff: io.BytesIO, idle_sleep=0.01):
    """Continuously move audio drained from ``call`` into ``buff``.

    Args:
        call: Object whose ``process_recorded_audio()`` returns the bytes
            currently available (possibly empty).
        buff: In-memory buffer that the drained audio is appended to.
        idle_sleep: Seconds to pause when nothing was available, so the loop
            does not spin a CPU core while waiting for new audio.

    This function never returns; it is meant to run in its own thread.
    """
    while True:
        if audio_bytes := call.process_recorded_audio():
            buff.write(audio_bytes)
        else:
            # Nothing queued right now: yield instead of busy-waiting.
            time.sleep(idle_sleep)
def get_audio_duration_seconds(call: SipCall, audio_data: bytes):
    """Return how many seconds of audio ``audio_data`` holds.

    The byte count is converted to whole frames using the call's channel
    count and sample width (a trailing partial frame is ignored), then
    divided by the call's frame rate.
    """
    bytes_per_frame = call._channels * call._sampwidth
    whole_frames, _ = divmod(len(audio_data), bytes_per_frame)
    return whole_frames / float(call._framerate)
def read_audio_by_range(call: SipCall, buff: io.BytesIO, start_seconds, end_seconds):
    """Return the audio between ``start_seconds`` and ``end_seconds``.

    Args:
        call: Source of the audio format (channels, sample width, frame rate).
        buff: Buffer holding the raw frames captured so far.
        start_seconds: Offset of the first frame, in seconds from the start
            of the buffer. May be fractional.
        end_seconds: Offset just past the last frame, in seconds. May be
            fractional.

    Returns:
        The selected frames as ``bytes``. Both offsets are clamped to the
        buffered audio, so an out-of-range or inverted interval yields a
        shorter (possibly empty) result instead of an error.
    """
    frame_size = call._channels * call._sampwidth
    audio_data = buff.getvalue()
    num_frames = len(audio_data) // frame_size

    # Round to whole frames: slice indices must be integers, and a float
    # product (fractional seconds) would otherwise raise TypeError.
    start_frame = min(num_frames, max(0, round(start_seconds * call._framerate)))
    # Never let the end fall before the start; a negative end would otherwise
    # become a negative slice index and silently count from the buffer's end.
    end_frame = max(start_frame, min(num_frames, round(end_seconds * call._framerate)))

    frames = audio_data[start_frame * frame_size:end_frame * frame_size]
    print("read_audio_by_range duration ", get_audio_duration_seconds(call, frames))
    return frames
def read_audio_by_last(call: SipCall, buff: io.BytesIO, last_seconds):
    """Return the most recent ``last_seconds`` of buffered audio.

    Args:
        call: Source of the audio format (channels, sample width, frame rate).
        buff: Buffer holding the raw frames captured so far.
        last_seconds: Length of the tail to return, in seconds. May be
            fractional.

    Returns:
        The trailing frames as ``bytes``; the whole buffer when it holds less
        than ``last_seconds`` of audio.
    """
    frame_size = call._channels * call._sampwidth
    audio_data = buff.getvalue()
    num_frames = len(audio_data) // frame_size

    # Round to whole frames: slice indices must be integers, and a float
    # product (fractional seconds) would otherwise raise TypeError.
    wanted_frames = round(last_seconds * call._framerate)
    start_frame = min(num_frames, max(0, num_frames - wanted_frames))

    frames = audio_data[start_frame * frame_size:]
    print("read_audio_by_last duration ", get_audio_duration_seconds(call, frames))
    return frames
def audio_to_text(call: SipCall, audio_buff: io.BytesIO):
    """Placeholder for speech recognition over the buffered audio.

    Planned behavior (not implemented yet):
    - slice the audio into chunks and send them off for recognition;
    - read from ``audio_buff`` without blocking execution;
    - how exactly to return the text is still undecided, but it also has to
      happen without blocking.

    Currently does nothing and returns ``None``.
    """
    return None
def main():
    """Run the demo: stream ``voice.wav`` in threads and dump two excerpts.

    Starts three threads (file -> queue producer, queue -> buffer collector,
    and the speech-to-text stub), waits 7 seconds, then writes the last
    3 seconds and the 1s-7s range of the collected audio to WAV files.
    The threads are not marked as daemon and the first two loop forever, so
    the process stays alive after this function returns.
    """
    call = SipCall("voice.wav")
    audio_buff = io.BytesIO()

    def save_wav(path, frames):
        # Write raw frames using the same format as the source file.
        with wave.open(path, "wb") as wf:
            wf.setnchannels(call._channels)
            wf.setsampwidth(call._sampwidth)
            wf.setframerate(call._framerate)
            wf.writeframes(frames)

    workers = [
        threading.Thread(target=call.wave_to_queue, args=(4096, 0.1)),
        threading.Thread(target=get_audio, args=(call, audio_buff)),
        threading.Thread(target=audio_to_text, args=(call, audio_buff)),
    ]
    for worker in workers:
        worker.start()

    # Give the pipeline time to accumulate audio before sampling it.
    time.sleep(7)

    save_wav("read_audio_by_last.wav", read_audio_by_last(call, audio_buff, 3))
    save_wav("read_audio_by_range.wav", read_audio_by_range(call, audio_buff, 1, 7))

    print("CTRL+C to exit")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()