python · azure · python-asyncio · azure-cognitive-services

How to include Pause and Resume Feature with Azure conversation transcriber?


We are using Azure Conversation Transcriber for realtime speech to text with diarization. We need to incorporate the pause_resume feature. We tried different ways but nothing worked.

Azure only provides stop_transcribing_async() function that completely stops the current session.

I have attached a block of code containing our pause/resume logic, but it is not working. Any help would be appreciated — please advise on any other approach we could follow.

In the below code, we stop the transcriber completely once "pause" message is passed and restart the same once the "resume" message is detected.

async def receive_audio(uuid, path):
    """Receive websocket frames for *uuid* and feed audio to an Azure
    conversation transcriber, tearing the session down on an "inactive"
    control message and rebuilding it on "active".

    Args:
        uuid: Key into the global CONNECTIONS.connections registry.
        path: Unused here; kept for the websocket-handler signature.
    """
    audio_queue = Queue(maxsize=0)

    transcriber_state = False  # True while a transcriber session is running
    conversation_transcriber = None
    push_stream = None
    websocket = None  # bound early so `finally` can close it safely
    try:
        websocket = CONNECTIONS.connections[uuid]["websocket"]
        conversation_transcriber, push_stream = create_conversation_transcriber(
            CONNECTIONS.connections[uuid]
        )

        # Start continuous recognition
        conversation_transcriber.start_transcribing_async().get()
        transcriber_state = True

        while True:
            # Receive control messages (str) or audio frames (bytes)
            data = await websocket.recv()

            logger.info(CONNECTIONS.connections[uuid]['state'])
            if isinstance(data, str):

                logger.info(f"Current State: {CONNECTIONS.connections[uuid]['state']}")
                if data == "inactive" and transcriber_state:
                    # BUG FIX: only stop when a session is actually running,
                    # otherwise a repeated "inactive" double-stops/closes.
                    logger.info("Pausing the transcriber...")
                    conversation_transcriber.stop_transcribing_async().get()
                    push_stream.close()
                    transcriber_state = False

                elif data == "active" and not transcriber_state:
                    logger.info(f"Resuming the transcriber...")
                    # BUG FIX: the original resume called
                    # create_conversation_transcriber() with no argument,
                    # which raises TypeError; pass the connection details the
                    # same way the initial start does.
                    conversation_transcriber, push_stream = create_conversation_transcriber(
                        CONNECTIONS.connections[uuid]
                    )
                    conversation_transcriber.start_transcribing_async().get()
                    transcriber_state = True

                CONNECTIONS.connections[uuid]["state"] = data
                # BUG FIX: control strings must never fall through into the
                # audio path below (a str cannot be written to the stream).
                continue

            if CONNECTIONS.connections[uuid]["state"] == "active":
                audio_queue.put_nowait(data)
                while not audio_queue.empty():
                    chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                    CONNECTIONS.connections[uuid]["audio_buffer"] += chunk
                    push_stream.write(chunk)

    except websockets.exceptions.ConnectionClosed as e:
        logger.info("Connection closed")
        logger.info(e)
    except Exception as e:
        logger.error(f"Error in receive_audio: {e}")

    finally:
        # Cleanup on every exit path (previously only on ConnectionClosed);
        # guards avoid a double-close after a pause already released them.
        if transcriber_state and conversation_transcriber:
            conversation_transcriber.stop_transcribing_async().get()
        if transcriber_state and push_stream:
            push_stream.close()
        if websocket:
            await websocket.close(code=1000)

Solution

  • Here you can control the flow of audio data by pausing the input stream (i.e., stop feeding audio to the push stream). This simulates a pause in transcription without completely stopping the transcriber session.

    App.py:

    import logging
    import azure.cognitiveservices.speech as speechsdk
    import asyncio
    from queue import Queue
    import websockets
    
    # Assume CONNECTIONS is a global dict to manage websocket connections.
    CONNECTIONS = {}
    
    async def receive_audio(uuid, path):
        """Receive websocket audio for *uuid* and stream it to an Azure
        conversation transcriber.

        Pause/resume is simulated by gating the audio flow: on an "inactive"
        control message we simply stop writing to the push stream (the
        transcriber session stays alive); on "active" we start writing again.

        Args:
            uuid: Key into the global CONNECTIONS registry.
            path: Unused here; kept for the websocket-handler signature.
        """
        audio_queue = Queue(maxsize=0)
        transcriber_state = False  # False means transcriber is paused
        conversation_transcriber = None
        push_stream = None
        websocket = None  # bound early so `finally` can close it safely

        try:
            # Get the WebSocket connection and initialize the transcriber
            websocket = CONNECTIONS[uuid]["websocket"]
            connection_details = CONNECTIONS[uuid]

            conversation_transcriber, push_stream = create_conversation_transcriber(connection_details)

            # Start continuous recognition once; it stays running across pauses.
            conversation_transcriber.start_transcribing_async().get()
            transcriber_state = True
            logging.info("Started transcribing...")

            while True:
                # Receive control messages (str) or audio data (bytes)
                data = await websocket.recv()

                if isinstance(data, str):
                    # Handle 'inactive' and 'active' state changes (pause/resume)
                    logging.info(f"Received state: {data}")
                    if data == "inactive" and transcriber_state:
                        # Pausing: keep the transcriber alive, but stop sending audio
                        logging.info("Pausing the transcriber... (not stopping)")
                        transcriber_state = False

                    elif data == "active" and not transcriber_state:
                        # Resuming: continue sending audio to the transcriber
                        logging.info("Resuming the transcriber...")
                        transcriber_state = True

                    CONNECTIONS[uuid]["state"] = data
                    # BUG FIX: control strings must never reach the audio
                    # queue/push stream, so skip the audio path below.
                    continue

                # If transcriber is active, continue pushing audio data
                if CONNECTIONS[uuid]["state"] == "active":
                    audio_queue.put_nowait(data)
                    while not audio_queue.empty():
                        chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                        CONNECTIONS[uuid]["audio_buffer"] += chunk
                        push_stream.write(chunk)  # Keep writing to the open stream

        except websockets.exceptions.ConnectionClosed:
            logging.info("WebSocket connection closed.")
        except Exception as e:
            logging.error(f"Error in receive_audio: {e}")
        finally:
            # Cleanup on every exit path (previously the transcriber/stream
            # were only released on ConnectionClosed, leaking on other errors).
            if conversation_transcriber:
                conversation_transcriber.stop_transcribing_async().get()
            if push_stream:
                push_stream.close()
            if websocket:
                await websocket.close(code=1000)
            logging.info("WebSocket closed.")
    
    def create_conversation_transcriber(connection_details):
        """Build a ConversationTranscriber backed by a push audio stream.

        Args:
            connection_details: Mapping with 'subscription_key' and 'region'.

        Returns:
            (transcriber, push_stream): the transcriber to start/stop, and the
            PushAudioInputStream that audio chunks must be written to.
        """
        config = speechsdk.SpeechConfig(
            subscription=connection_details['subscription_key'],
            region=connection_details['region']
        )
        # 16 kHz, 16-bit, mono PCM — the format the websocket audio arrives in.
        stream = speechsdk.audio.PushAudioInputStream(
            speechsdk.audio.AudioStreamFormat(
                samples_per_second=16000, bits_per_sample=16, channels=1
            )
        )
        transcriber = speechsdk.transcription.ConversationTranscriber(
            config, speechsdk.audio.AudioConfig(stream=stream)
        )
        return transcriber, stream
    
    # Helper function to get chunk from queue
    def get_chunk_from_queue(q, chunk_size):
        """Drain up to *chunk_size* bytes of queued audio into one chunk.

        BUG FIX: the original ignored ``chunk_size`` and returned a single
        queue item, raising queue.Empty on an empty queue. This accumulates
        frames until the requested size is reached or the queue is drained.

        Args:
            q: queue.Queue of bytes-like audio frames.
            chunk_size: Target chunk size in bytes.

        Returns:
            bytes: Concatenated audio (may slightly exceed chunk_size if the
            last frame overshoots); b"" if the queue was empty.
        """
        buf = bytearray()
        # q.empty() is safe here: this queue has a single consumer.
        while len(buf) < chunk_size and not q.empty():
            buf += q.get_nowait()
        return bytes(buf)
    
    async def main():
        """Register a demo connection entry and run receive_audio for it."""
        # Example WebSocket endpoint; swap in the real signalling server.
        connection = {
            'websocket': await websockets.connect('ws://localhost:8000'),
            'subscription_key': 'your_azure_subscription_key',
            'region': 'your_azure_region',
            'audio_buffer': bytearray(),
            'state': 'active'  # start in the un-paused state
        }
        CONNECTIONS['dummy_uuid'] = connection

        # Start receiving audio for this connection
        await receive_audio('dummy_uuid', 'path/to/audio')
    
    if __name__ == "__main__":
        # Configure root logging before any other module emits records.
        logging.basicConfig(level=logging.INFO)
        logging.info("Starting the application...")
        
        # Run the asyncio event loop to execute the main coroutine.
        asyncio.run(main())
    

    Console Log:

    *(Console-log screenshot: the original image link was garbled during extraction — its URL slot contained a duplicate of the entire answer above. Re-attach the screenshot here.)*