pythonpepper

Pepper robot error: A grammar named "modifiable_grammar" already exists


I am programming a interaction script in python 2.7 for the robot Pepper but when I try to set the vocabulary to detect the words I need, I get this error:

Traceback (most recent call last):
  File "choice_script.py", line 225, in <module>
    text, value = ask_pepper_question(question2, actions2)
  File "choice_script.py", line 209, in ask_pepper_question
    return handler.ask_question(question, response_actions, timeout)
  File "choice_script.py", line 70, in ask_question
    self._setup_vocabulary(possible_responses)
  File "choice_script.py", line 124, in _setup_vocabulary
    self.asr.setVocabulary(vocabulary, False)
  File "/opt/aldebaran/lib/python2.7/site-packages/naoqi.py", line 194, in __call__
    return self.__wrapped__.method_missing(self.__method__, *args, **kwargs)
  File "/opt/aldebaran/lib/python2.7/site-packages/naoqi.py", line 264, in method_missing
    raise e
RuntimeError:   ALSpeechRecognition::setVocabulary
                NuanceContext::addContext
        A grammar named "modifiable_grammar" already exists.

At first it worked by simply checking if i had already set the vocabulary so omitting the set line but now i need to change it so that doesnt work.

This is the code I would like to run:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Script para que Pepper reaccione a respuestas personalizables
Requiere Python 2.7 y NAOqi SDK
"""

from __future__ import print_function
from naoqi import ALProxy
import time

# Clave de ALMemory para marcar vocab inicializado
VOCAB_FLAG = "MyApp/VocabInitialized"
DISTINCT_VOCABS = []

class PepperInteractionHandler:
    def __init__(self, robot_ip="localhost", robot_port=9559):
        """
        Inicializa el manejador de interacciones de Pepper
        """
        self.robot_ip = robot_ip
        self.robot_port = robot_port
        self.tts = None
        self.asr = None
        self.memory = None
        self._initialize_robot()

    def _initialize_robot(self):
        """
        Inicializa los proxies del robot
        """
        try:
            self.tts = ALProxy("ALTextToSpeech", self.robot_ip, self.robot_port)
            self.asr = ALProxy("ALSpeechRecognition", self.robot_ip, self.robot_port)
            self.memory = ALProxy("ALMemory", self.robot_ip, self.robot_port)
            
            # Configurar idioma
            self.asr.setLanguage("Spanish")
            print("Robot inicializado correctamente")
            
        except Exception as e:
            print("Error conectando al robot: {}".format(e))
            print("Asegúrate de que:")
            print("1. El robot esté encendido")
            print("2. La IP sea correcta")
            print("3. Estés en la misma red")
            raise

    def ask_question(self, question, response_actions, timeout=15, confidence_threshold=0.5):
        """
        Hace una pregunta al usuario y procesa la respuesta
        
        Args:
            question (str): La pregunta que hará el robot
            response_actions (dict): Diccionario que mapea cada respuesta a la acción del robot
                                   Formato: {"respuesta": {"text": "texto_respuesta", "value": numero}}
            timeout (int): Tiempo límite para esperar respuesta en segundos
            confidence_threshold (float): Umbral de confianza para aceptar la respuesta
            
        Returns:
            tuple: (texto_respuesta, valor_numerico) o (None, None) si no se reconoció
        """
        if not self.tts or not self.asr or not self.memory:
            print("Error: Robot no inicializado correctamente")
            return (None, None)

        try:
            # Extraer respuestas posibles del diccionario
            possible_responses = list(response_actions.keys())
            
            # Configurar vocabulario dinámicamente
            self._setup_vocabulary(possible_responses)
            
            # Hacer la pregunta
            self.tts.say(question)
            
            # Iniciar reconocimiento de voz
            self.asr.subscribe("Dynamic_ASR")
            
            print("Robot esperando respuesta...")
            print("Respuestas posibles: {}".format(", ".join(possible_responses)))
            
            # Limpiar datos anteriores de memoria
            self.memory.insertData("WordRecognized", [])
            
            # Esperar respuesta del usuario
            recognized_response = self._wait_for_response(timeout, confidence_threshold)
            
            if recognized_response:
                # Procesar la respuesta y devolver tupla
                return self._process_response(recognized_response, response_actions)
            else:
                self.tts.say("No pude escuchar tu respuesta. Intenta hablar más claro.")
                return (None, None)
                
        except Exception as e:
            print("Error durante la interacción: {}".format(e))
            print(e)
            return (None, None)
        finally:
            # Asegurar que se detenga el reconocimiento
            try:
                self.asr.unsubscribe("Dynamic_ASR")
            except:
                pass

    def _setup_vocabulary(self, possible_responses):
        """
        Configura el vocabulario dinámicamente basado en las respuestas posibles
        """
        try:
            # Pausar ASR para cambiar vocabulario
            self.asr.pause(True)
            
            # Crear vocabulario expandido (incluir variaciones)
            vocabulary = []
            for response in possible_responses:
                vocabulary.append(response.lower())
                vocabulary.append(response.upper())
                vocabulary.append(response.capitalize())
            
            # Eliminar duplicados
            vocabulary = list(set(vocabulary))
            
            self.asr.setVocabulary(vocabulary, False)
            
            # Reanudar ASR
            self.asr.pause(False)
            
            print("Vocabulario configurado: {}".format(vocabulary))
            
        except Exception as e:
            print("Error configurando vocabulario")
            print(e)
            raise e

    def _wait_for_response(self, timeout, confidence_threshold):
        """
        Espera y procesa la respuesta del usuario
        """
        recognized = False
        start_time = time.time()
        time.sleep(1.0)  # Esperar antes de empezar a escuchar
        
        while not recognized and (time.time() - start_time) < timeout:
            # Verificar si hay una palabra reconocida
            word_recognized = self.memory.getData("WordRecognized")
            if word_recognized and len(word_recognized) > 1:
                recognized_word = word_recognized[0]
                confidence = word_recognized[1]
                
                print("Palabra detectada: '{}' con confianza: {:.2f}".format(recognized_word, confidence))
                
                if confidence > confidence_threshold:
                    print("Palabra reconocida con suficiente confianza: {}".format(recognized_word))
                    # Limpiar memoria después del reconocimiento
                    self.memory.insertData("WordRecognized", [])
                    return recognized_word
                else:
                    print("Confianza insuficiente ({:.2f}), continuando...".format(confidence))
                    # Limpiar para evitar re-procesamiento
                    self.memory.insertData("WordRecognized", [])
            
            time.sleep(0.1)
        
        return None

    def _process_response(self, recognized_word, response_actions):
        """
        Procesa la respuesta reconocida y devuelve la tupla correspondiente
        
        Returns:
            tuple: (texto_respuesta, valor_numerico) o (None, None) si no se encuentra
        """
        word_lower = recognized_word.lower()
        
        # Buscar la acción correspondiente
        for response_key, action_data in response_actions.items():
            if word_lower == response_key.lower():
                text = action_data.get("text", "")
                value = action_data.get("value", 0)
                
                print("Usuario respondió '{}' - Devolviendo: ('{}', {})".format(
                    recognized_word, text, value))
                return (text, value)
        
        # Si no se encuentra una acción específica
        print("Respuesta no reconocida en acciones: '{}'".format(recognized_word))
        return (None, None)


# Función de conveniencia para usar sin clase
def ask_pepper_question(question, response_actions, robot_ip="localhost", robot_port=9559, timeout=15):
    """
    Función de conveniencia para hacer una pregunta sin instanciar la clase
    
    Args:
        question (str): La pregunta que hará el robot
        response_actions (dict): Diccionario que mapea respuestas a acciones del robot
                               Formato: {"respuesta": {"text": "texto", "value": numero}}
        robot_ip (str): IP del robot
        robot_port (int): Puerto del robot
        timeout (int): Tiempo límite para respuesta
        
    Returns:
        tuple: (texto_respuesta, valor_numerico) o (None, None) si no se reconoció
    """
    handler = PepperInteractionHandler(robot_ip, robot_port)
    return handler.ask_question(question, response_actions, timeout)


# Ejemplo de uso
if __name__ == "__main__":

    print("\n=== Ejemplo 2: Múltiples opciones ===")
    # Ejemplo 2: Pregunta de múltiples opciones con diferentes valores
    question2 = "¿Qué te gustaría hacer? Puedes decir: bailar, cantar o hablar."
    actions2 = {
        "bailar": {"text": "¡Perfecto! Vamos a bailar juntos.", "value": 1},
        "cantar": {"text": "¡Qué divertido! Me encanta cantar.", "value": 2},
        "hablar": {"text": "Excelente, podemos tener una buena conversación.", "value": 3}
    }
    
    print("\n=== Ejemplo 2: Múltiples opciones ===")
    text, value = ask_pepper_question(question2, actions2)
    print("Resultado: ('{}', {})".format(text, value))
    
    # Ejemplo de uso del valor numérico para lógica de control
    if value == 1:
        print("Activando modo baile...")
    elif value == 2:
        print("Activando modo canto...")
    elif value == 3:
        print("Activando modo conversación...")
    else:
        print("No se reconoció la respuesta")

    print("\n=== Ejemplo 1: Múltiples opciones ===")
    # Ejemplo 1: Pregunta Sí/No con valores numéricos
    question1 = "¿Tienes alguna otra pregunta?"
    actions1 = {
        "si": {"text": "¡Vamos!", "value": 1},
        "no": {"text": "Entiendo. Ha sido un placer. No dudes en volver a consultarme.", "value": 0}
    }
    
    print("=== Ejemplo 1: Pregunta Sí/No ===")
    text, value = ask_pepper_question(question1, actions1)
    print("Resultado: ('{}', {})".format(text, value))
    
    # Si quieres que el robot diga el texto, lo puedes hacer después:
    if text:
        handler = PepperInteractionHandler()
        handler.tts.say(text)
    

I tried running the code with multiple inputs and it should work without the grammar error. But the error appeared randomly, usually in the second or so run.

This is the code of the naoqi.py file if it were of use:

import os
import sys
import ctypes
import weakref
import logging
import inspect
from functools import partial

import qi


def set_dll_directory():
    this_dir = os.path.abspath(os.path.dirname(__file__))
    bin_dir = os.path.join(this_dir, "..", "bin")
    if os.path.exists(bin_dir):
        ctypes.windll.kernel32.SetDllDirectoryA(bin_dir)

def load_inaoqi_deps():
    """ Helper to laod _inaoqi.so deps on linux """
    deps = [
            "libboost_python.so",
            "libboost_system.so",
            "libboost_chrono.so",
            "libboost_program_options.so",
            "libboost_thread.so",
            "libboost_filesystem.so",
            "libboost_regex.so",
            "libboost_locale.so",
            "libboost_signals.so",
            "libqi.so",
            "libalerror.so",
            "libalthread.so",
            "libalvalue.so",
            "libalcommon.so",
            "libalproxies.so",
            "libalpythontools.so",
            "libqipython.so",
            "libinaoqi.so"
    ]
    relpaths = [
            # in pynaoqi, this file is /naoqi.py and we search for /libqi.so
            [],
            # in deploys/packages/etc,
            # this file is $PREFIX/lib/python2.7/site-packages/naoqi.py
            # and we need $PREFIX/lib/libqi.so
            ["..", ".."],
            ]
    this_dir = os.path.abspath(os.path.dirname(__file__))
    for dep in deps:
        for relpath in relpaths:
            list_path = [this_dir] + relpath + [dep]
            full_path = os.path.join(*list_path)
            try:
                ctypes.cdll.LoadLibrary(full_path)
            except Exception:
                pass

if sys.platform.startswith("linux"):
    load_inaoqi_deps()

if sys.platform.startswith("win"):
    set_dll_directory()

import inaoqi

import motion
import allog

def _getMethodParamCount(func):
    r = inspect.getargspec(func)
    #normal args
    np = len(r[0])
    #*args (none if non existent)
    if r[1] is not None:
        np = np + 1
    #**kargs  (none if non existent)
    if r[2] is not None:
        np = np + 1
    return np

def autoBind(myClass, bindIfnoDocumented):
  """Show documentation for each
  method of the class"""

  # dir(myClass) is a list of the names of
  # everything in class
  myClass.setModuleDescription(myClass.__doc__)

  for thing in dir(myClass):
    # getattr(x, "y") is exactly: x.y
    function = getattr(myClass, thing)
    if callable(function):
        if (type(function) == type(myClass.__init__)):
            if (bindIfnoDocumented or function.__doc__ != ""):
                if (thing[0] != "_"):  # private method
                    if (function.__doc__):
                        myClass.functionName(thing, myClass.getName(), function.__doc__)
                    else:
                        myClass.functionName(thing, myClass.getName(), "")

                    for param in function.func_code.co_varnames:
                        if (param != "self"):
                            myClass.addParam(param)
                        myClass._bindWithParam(myClass.getName(),thing, _getMethodParamCount(function)-1)



class ALDocable():
  def __init__(self, bindIfnoDocumented):
    autoBind(self,bindIfnoDocumented)


# define the log handler to be used by the logging module
class ALLogHandler(logging.Handler):
  def __init__(self):
    logging.Handler.__init__(self)

  def emit(self, record):
    level_to_function = {
      logging.DEBUG: allog.debug,
      logging.INFO: allog.info,
      logging.WARNING: allog.warning,
      logging.ERROR: allog.error,
      logging.CRITICAL: allog.fatal,
    }
    function = level_to_function.get(record.levelno, allog.debug)
    function(record.getMessage(),
             record.name,
             record.filename,
             record.funcName,
             record.lineno)


# define a class that will be inherited by both ALModule and ALBehavior, to store instances of modules, so a bound method can be called on them.
class NaoQiModule():
  _modules = dict()

  @classmethod
  def getModule(cls, name):
    # returns a reference a module, giving its string, if it exists !
    if(name not in cls._modules):
      raise RuntimeError("Module " + str(name) + " does not exist")
    return cls._modules[name]()

  def __init__(self, name, logger=True):
    # keep a weak reference to ourself, so a proxy can be called on this module easily
    self._modules[name] = weakref.ref(self)
    self.loghandler = None
    if logger:
        self.logger = logging.getLogger(name)
        self.loghandler = ALLogHandler()
        self.logger.addHandler(self.loghandler)
        self.logger.setLevel(logging.DEBUG)

  def __del__(self):
    # when object is deleted, clean up dictionnary so we do not keep a weak reference to it
    del self._modules[self.getName()]
    if(self.loghandler != None):
        self.logger.removeHandler(self.loghandler)


class ALBroker(inaoqi.broker):
    def init(self):
        pass

class ALModule(inaoqi.module, ALDocable, NaoQiModule):

  def __init__(self,param):
    inaoqi.module.__init__(self, param)
    ALDocable.__init__(self, False)
    NaoQiModule.__init__(self, param)
    self.registerToBroker()

  def __del__(self):
    NaoQiModule.__del__(self)

  def methodtest(self):
    pass

  def pythonChanged(self, param1, param2, param3):
    pass

class MethodMissingMixin(object):
    """ A Mixin' to implement the 'method_missing' Ruby-like protocol. """
    def __getattribute__(self, attr):
        try:
            return object.__getattribute__(self, attr)
        except:
            class MethodMissing(object):
                def __init__(self, wrapped, method):
                    self.__wrapped__ = wrapped
                    self.__method__ = method
                def __call__(self, *args, **kwargs):
                    return self.__wrapped__.method_missing(self.__method__, *args, **kwargs)

            return MethodMissing(self, attr)

    def method_missing(self, *args, **kwargs):
        """ This method should be overridden in the derived class. """
        raise NotImplementedError(str(self.__wrapped__) + " 'method_missing' method has not been implemented.")


class postType(MethodMissingMixin):
    def __init__(self):
        ""

    def setProxy(self, proxy):
        self.proxy = weakref.ref(proxy)
     #   print name

    def method_missing(self, method, *args, **kwargs):
          list = []
          list.append(method)
          for arg in args:
            list.append(arg)
          result = 0
          try:
                  p =  self.proxy()
                  result = p.pythonPCall(list)
          except RuntimeError,e:
                raise e

          return result



class ALProxy(inaoqi.proxy,MethodMissingMixin):

    def __init__(self, *args):
        self.post = postType()
        self.post.setProxy(self)
        if (len (args) == 1):
            inaoqi.proxy.__init__(self, args[0])
        elif (len (args) == 2):
            inaoqi.proxy.__init__(self, args[0],  args[1])
        else:
            inaoqi.proxy.__init__(self, args[0], args[1], args[2])

    def call(self, *args):
        list = []
        for arg in args:
            list.append(arg)

        return self.pythonCall(list)


    def pCall(self, *args):
        list = []
        for arg in args:
            list.append(arg)

        return self.pythonPCall(list)


    def method_missing(self, method, *args, **kwargs):
          list = []
          list.append(method)
          for arg in args:
            list.append(arg)
          result = 0
          try:
                result = self.pythonCall(list)
          except RuntimeError,e:
                raise e
                #print e.args[0]

          return result

    @staticmethod
    def initProxy(name, fut):
        try:
            proxy = ALProxy(name)
        except:
            print "Error: cannot get proxy to %s even though we waited for it" % name
            return

        globals()[name] = proxy
        print "Got proxy to " + name

        global _initCb
        if _initCb is not None:
            _initCb(name)

    @staticmethod
    def lazyLoad(session, name):
        session.waitForService(name, _async = True).addCallback(partial(ALProxy.initProxy, name))

    @staticmethod
    def initProxies(initCb = None):
        global _initCb
        _initCb = initCb

        #Warning: The use of these default proxies is deprecated.
        global ALLeds
        ALLeds = None

        session = inaoqi._getDefaultSession()

        ALProxy.lazyLoad(session, "ALFrameManager")
        ALProxy.lazyLoad(session, "ALMemory")
        ALProxy.lazyLoad(session, "ALMotion")
        ALProxy.lazyLoad(session, "ALLeds")
        ALProxy.lazyLoad(session, "ALLogger")
        ALProxy.lazyLoad(session, "ALSensors")


def createModule(name):
    global moduleList
    str = "moduleList.append("+ "module(\"" + name + "\"))"
    exec(str)

Solution

  • Disclaimer: I've also encountered this issue while using the python3 libqi-python package - thus it is possible that this may not be applicable in your case (though I really think it should also work with the 2.7 version).

    As @furas mentioned, this issue can be fixed by changing the language. What worked in my case (installed languages english and german) was swapping between the installed languages and then setting the desired one last before starting the speech recognition.

    def start_speech_recognition_service(language, keywords):
            speech_recognition.setLanguage("German")
            speech_recognition.setLanguage("English")
    
            speech_recognition.setLanguage(language)
            speech_recognition.setVocabulary(keywords, False)
            speech_recognition.setAudioExpression(True)
            speech_recognition.setVisualExpression(True)
    
            speech_recognition.pause(False)
            speech_recognition.subscribe("speech_recognition")