We are using the Azure Conversation Transcriber for real-time speech-to-text with diarization. We need to add a pause/resume feature. We have tried several approaches, but none of them worked.
Azure only provides the stop_transcribing_async() function, which completely stops the current session.
I have attached the code we tried, but it is not working. Any help will be appreciated. The attached block of code contains the logic for pausing and resuming. Please advise what other approach we could follow.
In the code below, we stop the transcriber completely when the pause message ("inactive") is received and restart it when the resume message ("active") is received.
async def receive_audio(uuid, path):
    """Relay WebSocket audio for one connection into a conversation transcriber.

    Binary frames are appended to the connection's ``audio_buffer`` and
    written to the push stream while the connection state is ``"active"``.
    Text frames are control messages and are never treated as audio:
    ``"inactive"`` stops the running transcriber and closes its push stream,
    ``"active"`` creates and starts a fresh transcriber. Every text frame is
    stored as the connection's ``state``.

    Args:
        uuid: Key of the connection entry in ``CONNECTIONS.connections``.
        path: Not used by this function (presumably required by the
            WebSocket handler signature -- confirm against the caller).
    """
    audio_queue = Queue(maxsize=0)
    conversation_transcriber = None
    push_stream = None
    websocket = None  # bound before use so the cleanup below never hits an unbound name
    transcriber_state = False  # True only while a transcriber session is running
    try:
        conversation_transcriber, push_stream = create_conversation_transcriber(
            CONNECTIONS.connections[uuid]
        )
        # Start continuous recognition
        conversation_transcriber.start_transcribing_async().get()
        transcriber_state = True
        while True:
            # Receive audio data or a control message from the WebSocket
            websocket = CONNECTIONS.connections[uuid]["websocket"]
            data = await websocket.recv()
            connection = CONNECTIONS.connections[uuid]
            logger.info(connection['state'])
            if isinstance(data, str):
                logger.info(f"Current State: {connection['state']}")
                if data == "inactive" and transcriber_state:
                    # Guarded by transcriber_state so a repeated "inactive"
                    # does not stop/close an already stopped session.
                    logger.info("Pausing the transcriber...")
                    conversation_transcriber.stop_transcribing_async().get()
                    push_stream.close()
                    transcriber_state = False
                elif data == "active" and not transcriber_state:
                    logger.info("Resuming the transcriber...")
                    # The factory needs the connection details, exactly as
                    # in the initial call above.
                    conversation_transcriber, push_stream = (
                        create_conversation_transcriber(connection)
                    )
                    conversation_transcriber.start_transcribing_async().get()
                    transcriber_state = True
                connection["state"] = data
                # A control string must not be queued as audio.
                continue
            if connection["state"] == "active" and transcriber_state:
                audio_queue.put_nowait(data)
                while not audio_queue.empty():
                    chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                    connection["audio_buffer"] += chunk
                    push_stream.write(chunk)
    except websockets.exceptions.ConnectionClosed as e:
        logger.info("Connection closed")
        logger.info(e)
    except Exception as e:
        logger.error(f"Error in receive_audio: {e}")
    finally:
        try:
            # Release the session on every exit path, but only if it is still
            # running; a paused session was already stopped and closed.
            if transcriber_state:
                conversation_transcriber.stop_transcribing_async().get()
                push_stream.close()
        finally:
            if websocket is not None:
                await websocket.close(code=1000)
Here you can control the flow of audio data by pausing the input stream (i.e., stop feeding audio to the push stream). This simulates a pause in transcription without completely stopping the transcriber session.
App.py:
import logging
import azure.cognitiveservices.speech as speechsdk
import asyncio
from queue import Queue
import websockets
# Assume CONNECTIONS is a global dict to manage websocket connections.
CONNECTIONS = {}
async def receive_audio(uuid, path):
    """Stream WebSocket audio to a conversation transcriber with pause/resume.

    The transcriber session is started once and kept running. Text frames are
    control messages: ``"inactive"`` pauses and ``"active"`` resumes, and every
    text frame is stored in ``CONNECTIONS[uuid]["state"]``. Binary frames are
    appended to ``audio_buffer`` and written to the push stream only while
    that state is ``"active"``; audio that arrives while paused is dropped.

    Args:
        uuid: Key of the connection entry in ``CONNECTIONS``.
        path: Not used by this function.
    """
    audio_queue = Queue(maxsize=0)
    transcriber_state = False  # False means transcriber is paused
    session_started = False  # True once start_transcribing_async() has succeeded
    conversation_transcriber = None
    push_stream = None
    websocket = None  # bound before use so the cleanup below never hits an unbound name
    try:
        # Get the WebSocket connection and initialize the transcriber
        websocket = CONNECTIONS[uuid]["websocket"]
        connection_details = CONNECTIONS[uuid]
        conversation_transcriber, push_stream = create_conversation_transcriber(
            connection_details
        )
        # Start continuous recognition
        conversation_transcriber.start_transcribing_async().get()
        session_started = True
        transcriber_state = True
        logging.info("Started transcribing...")
        while True:
            # Receive control messages or audio data
            data = await websocket.recv()
            if isinstance(data, str):
                # Handle 'inactive' and 'active' state changes (pause/resume)
                logging.info(f"Received state: {data}")
                if data == "inactive" and transcriber_state:
                    # Pausing: keep the transcriber alive, but stop sending audio
                    logging.info("Pausing the transcriber... (not stopping)")
                    transcriber_state = False
                elif data == "active" and not transcriber_state:
                    # Resuming: continue sending audio to the transcriber
                    logging.info("Resuming the transcriber...")
                    transcriber_state = True
                CONNECTIONS[uuid]["state"] = data
                # A control string must not be queued as audio: appending a
                # str to the bytearray buffer raises TypeError.
                continue
            # If the connection is active, push the audio frame
            if CONNECTIONS[uuid]["state"] == "active":
                audio_queue.put_nowait(data)
                while not audio_queue.empty():
                    chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                    CONNECTIONS[uuid]["audio_buffer"] += chunk
                    push_stream.write(chunk)  # Keep writing to the open stream
    except websockets.exceptions.ConnectionClosed:
        logging.info("WebSocket connection closed.")
    except Exception as e:
        logging.error(f"Error in receive_audio: {e}")
    finally:
        try:
            # Release the speech resources on every exit path, not only when
            # the socket closes.
            if session_started:
                conversation_transcriber.stop_transcribing_async().get()
            if push_stream is not None:
                push_stream.close()
        finally:
            if websocket is not None:
                await websocket.close(code=1000)
                logging.info("WebSocket closed.")
def create_conversation_transcriber(connection_details):
    """Build a ConversationTranscriber that reads from a new push audio stream.

    Args:
        connection_details: Mapping that provides the 'subscription_key' and
            'region' entries used for the speech configuration.

    Returns:
        A ``(transcriber, push_stream)`` tuple. The push stream is the one
        wired into the transcriber's audio configuration.
    """
    config = speechsdk.SpeechConfig(
        subscription=connection_details['subscription_key'],
        region=connection_details['region'],
    )

    # Stream format passed to the SDK: 16000 samples/s, 16 bits, 1 channel.
    pcm_format = speechsdk.audio.AudioStreamFormat(
        samples_per_second=16000,
        bits_per_sample=16,
        channels=1,
    )
    stream = speechsdk.audio.PushAudioInputStream(pcm_format)
    stream_config = speechsdk.audio.AudioConfig(stream=stream)

    return (
        speechsdk.transcription.ConversationTranscriber(config, stream_config),
        stream,
    )
# Helper function to get chunk from queue
def get_chunk_from_queue(q, chunk_size):
    """Pop the item at the head of ``q`` without blocking.

    ``chunk_size`` is accepted but not used: the queued item is returned
    as-is, whatever its length.

    Raises:
        queue.Empty: If ``q`` is a ``queue.Queue`` with nothing in it.
    """
    return q.get(block=False)
async def main():
    """Register a placeholder connection in CONNECTIONS and run receive_audio for it."""
    # Example WebSocket endpoint
    socket = await websockets.connect('ws://localhost:8000')

    # Connection entry: the socket, placeholder credentials, an empty audio
    # buffer, and the initial state.
    entry = {
        'websocket': socket,
        'subscription_key': 'your_azure_subscription_key',
        'region': 'your_azure_region',
        'audio_buffer': bytearray(),
        'state': 'active',
    }
    CONNECTIONS['dummy_uuid'] = entry

    # Start receiving audio for this connection
    await receive_audio('dummy_uuid', 'path/to/audio')
if __name__ == "__main__":
    # Configure logging first so the startup message below is emitted at INFO.
    logging.basicConfig(level=logging.INFO)
    logging.info("Starting the application...")

    # Drive the main() coroutine to completion on a new event loop.
    asyncio.run(main())
When you receive a "pause" command, you can buffer the incoming audio data and delay pushing it to the transcriber until a "resume" command is received.
`CONNECTIONS[uuid]["state"]` controls the flow of audio to the transcriber. When the state is "inactive", the audio stream is not fed to the transcriber.
Console Log:
![enter image description here](Here you can control the flow of audio data by pausing the input stream (i.e., stop feeding audio to the push stream). This simulates a pause in transcription without completely stopping the transcriber session.
App.py:
import logging
import azure.cognitiveservices.speech as speechsdk
import asyncio
from queue import Queue
import websockets
# Assume CONNECTIONS is a global dict to manage websocket connections.
CONNECTIONS = {}
async def receive_audio(uuid, path):
    """Stream WebSocket audio to a conversation transcriber with pause/resume.

    The transcriber session is started once and kept running. Text frames are
    control messages: ``"inactive"`` pauses and ``"active"`` resumes, and every
    text frame is stored in ``CONNECTIONS[uuid]["state"]``. Binary frames are
    appended to ``audio_buffer`` and written to the push stream only while
    that state is ``"active"``; audio that arrives while paused is dropped.

    Args:
        uuid: Key of the connection entry in ``CONNECTIONS``.
        path: Not used by this function.
    """
    audio_queue = Queue(maxsize=0)
    transcriber_state = False  # False means transcriber is paused
    session_started = False  # True once start_transcribing_async() has succeeded
    conversation_transcriber = None
    push_stream = None
    websocket = None  # bound before use so the cleanup below never hits an unbound name
    try:
        # Get the WebSocket connection and initialize the transcriber
        websocket = CONNECTIONS[uuid]["websocket"]
        connection_details = CONNECTIONS[uuid]
        conversation_transcriber, push_stream = create_conversation_transcriber(
            connection_details
        )
        # Start continuous recognition
        conversation_transcriber.start_transcribing_async().get()
        session_started = True
        transcriber_state = True
        logging.info("Started transcribing...")
        while True:
            # Receive control messages or audio data
            data = await websocket.recv()
            if isinstance(data, str):
                # Handle 'inactive' and 'active' state changes (pause/resume)
                logging.info(f"Received state: {data}")
                if data == "inactive" and transcriber_state:
                    # Pausing: keep the transcriber alive, but stop sending audio
                    logging.info("Pausing the transcriber... (not stopping)")
                    transcriber_state = False
                elif data == "active" and not transcriber_state:
                    # Resuming: continue sending audio to the transcriber
                    logging.info("Resuming the transcriber...")
                    transcriber_state = True
                CONNECTIONS[uuid]["state"] = data
                # A control string must not be queued as audio: appending a
                # str to the bytearray buffer raises TypeError.
                continue
            # If the connection is active, push the audio frame
            if CONNECTIONS[uuid]["state"] == "active":
                audio_queue.put_nowait(data)
                while not audio_queue.empty():
                    chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                    CONNECTIONS[uuid]["audio_buffer"] += chunk
                    push_stream.write(chunk)  # Keep writing to the open stream
    except websockets.exceptions.ConnectionClosed:
        logging.info("WebSocket connection closed.")
    except Exception as e:
        logging.error(f"Error in receive_audio: {e}")
    finally:
        try:
            # Release the speech resources on every exit path, not only when
            # the socket closes.
            if session_started:
                conversation_transcriber.stop_transcribing_async().get()
            if push_stream is not None:
                push_stream.close()
        finally:
            if websocket is not None:
                await websocket.close(code=1000)
                logging.info("WebSocket closed.")
def create_conversation_transcriber(connection_details):
    """Build a ConversationTranscriber that reads from a new push audio stream.

    Args:
        connection_details: Mapping that provides the 'subscription_key' and
            'region' entries used for the speech configuration.

    Returns:
        A ``(transcriber, push_stream)`` tuple. The push stream is the one
        wired into the transcriber's audio configuration.
    """
    config = speechsdk.SpeechConfig(
        subscription=connection_details['subscription_key'],
        region=connection_details['region'],
    )

    # Stream format passed to the SDK: 16000 samples/s, 16 bits, 1 channel.
    pcm_format = speechsdk.audio.AudioStreamFormat(
        samples_per_second=16000,
        bits_per_sample=16,
        channels=1,
    )
    stream = speechsdk.audio.PushAudioInputStream(pcm_format)
    stream_config = speechsdk.audio.AudioConfig(stream=stream)

    return (
        speechsdk.transcription.ConversationTranscriber(config, stream_config),
        stream,
    )
# Helper function to get chunk from queue
def get_chunk_from_queue(q, chunk_size):
    """Pop the item at the head of ``q`` without blocking.

    ``chunk_size`` is accepted but not used: the queued item is returned
    as-is, whatever its length.

    Raises:
        queue.Empty: If ``q`` is a ``queue.Queue`` with nothing in it.
    """
    return q.get(block=False)
async def main():
    """Register a placeholder connection in CONNECTIONS and run receive_audio for it."""
    # Example WebSocket endpoint
    socket = await websockets.connect('ws://localhost:8000')

    # Connection entry: the socket, placeholder credentials, an empty audio
    # buffer, and the initial state.
    entry = {
        'websocket': socket,
        'subscription_key': 'your_azure_subscription_key',
        'region': 'your_azure_region',
        'audio_buffer': bytearray(),
        'state': 'active',
    }
    CONNECTIONS['dummy_uuid'] = entry

    # Start receiving audio for this connection
    await receive_audio('dummy_uuid', 'path/to/audio')
if __name__ == "__main__":
    # Configure logging first so the startup message below is emitted at INFO.
    logging.basicConfig(level=logging.INFO)
    logging.info("Starting the application...")

    # Drive the main() coroutine to completion on a new event loop.
    asyncio.run(main())
When you receive a "pause" command, you can buffer the incoming audio data and delay pushing it to the transcriber until a "resume" command is received.
`CONNECTIONS[uuid]["state"]` controls the flow of audio to the transcriber. When the state is "inactive", the audio stream is not fed to the transcriber.
Console Log:
)