I am programming a interaction script in python 2.7 for the robot Pepper but when I try to set the vocabulary to detect the words I need, I get this error:
Traceback (most recent call last):
File "choice_script.py", line 225, in <module>
text, value = ask_pepper_question(question2, actions2)
File "choice_script.py", line 209, in ask_pepper_question
return handler.ask_question(question, response_actions, timeout)
File "choice_script.py", line 70, in ask_question
self._setup_vocabulary(possible_responses)
File "choice_script.py", line 124, in _setup_vocabulary
self.asr.setVocabulary(vocabulary, False)
File "/opt/aldebaran/lib/python2.7/site-packages/naoqi.py", line 194, in __call__
return self.__wrapped__.method_missing(self.__method__, *args, **kwargs)
File "/opt/aldebaran/lib/python2.7/site-packages/naoqi.py", line 264, in method_missing
raise e
RuntimeError: ALSpeechRecognition::setVocabulary
NuanceContext::addContext
A grammar named "modifiable_grammar" already exists.
At first it worked by simply checking if i had already set the vocabulary so omitting the set line but now i need to change it so that doesnt work.
This is the code I would like to run:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Script para que Pepper reaccione a respuestas personalizables
Requiere Python 2.7 y NAOqi SDK
"""
from __future__ import print_function
from naoqi import ALProxy
import time
# Clave de ALMemory para marcar vocab inicializado
VOCAB_FLAG = "MyApp/VocabInitialized"
DISTINCT_VOCABS = []
class PepperInteractionHandler:
def __init__(self, robot_ip="localhost", robot_port=9559):
"""
Inicializa el manejador de interacciones de Pepper
"""
self.robot_ip = robot_ip
self.robot_port = robot_port
self.tts = None
self.asr = None
self.memory = None
self._initialize_robot()
def _initialize_robot(self):
"""
Inicializa los proxies del robot
"""
try:
self.tts = ALProxy("ALTextToSpeech", self.robot_ip, self.robot_port)
self.asr = ALProxy("ALSpeechRecognition", self.robot_ip, self.robot_port)
self.memory = ALProxy("ALMemory", self.robot_ip, self.robot_port)
# Configurar idioma
self.asr.setLanguage("Spanish")
print("Robot inicializado correctamente")
except Exception as e:
print("Error conectando al robot: {}".format(e))
print("Asegúrate de que:")
print("1. El robot esté encendido")
print("2. La IP sea correcta")
print("3. Estés en la misma red")
raise
def ask_question(self, question, response_actions, timeout=15, confidence_threshold=0.5):
"""
Hace una pregunta al usuario y procesa la respuesta
Args:
question (str): La pregunta que hará el robot
response_actions (dict): Diccionario que mapea cada respuesta a la acción del robot
Formato: {"respuesta": {"text": "texto_respuesta", "value": numero}}
timeout (int): Tiempo límite para esperar respuesta en segundos
confidence_threshold (float): Umbral de confianza para aceptar la respuesta
Returns:
tuple: (texto_respuesta, valor_numerico) o (None, None) si no se reconoció
"""
if not self.tts or not self.asr or not self.memory:
print("Error: Robot no inicializado correctamente")
return (None, None)
try:
# Extraer respuestas posibles del diccionario
possible_responses = list(response_actions.keys())
# Configurar vocabulario dinámicamente
self._setup_vocabulary(possible_responses)
# Hacer la pregunta
self.tts.say(question)
# Iniciar reconocimiento de voz
self.asr.subscribe("Dynamic_ASR")
print("Robot esperando respuesta...")
print("Respuestas posibles: {}".format(", ".join(possible_responses)))
# Limpiar datos anteriores de memoria
self.memory.insertData("WordRecognized", [])
# Esperar respuesta del usuario
recognized_response = self._wait_for_response(timeout, confidence_threshold)
if recognized_response:
# Procesar la respuesta y devolver tupla
return self._process_response(recognized_response, response_actions)
else:
self.tts.say("No pude escuchar tu respuesta. Intenta hablar más claro.")
return (None, None)
except Exception as e:
print("Error durante la interacción: {}".format(e))
print(e)
return (None, None)
finally:
# Asegurar que se detenga el reconocimiento
try:
self.asr.unsubscribe("Dynamic_ASR")
except:
pass
def _setup_vocabulary(self, possible_responses):
"""
Configura el vocabulario dinámicamente basado en las respuestas posibles
"""
try:
# Pausar ASR para cambiar vocabulario
self.asr.pause(True)
# Crear vocabulario expandido (incluir variaciones)
vocabulary = []
for response in possible_responses:
vocabulary.append(response.lower())
vocabulary.append(response.upper())
vocabulary.append(response.capitalize())
# Eliminar duplicados
vocabulary = list(set(vocabulary))
self.asr.setVocabulary(vocabulary, False)
# Reanudar ASR
self.asr.pause(False)
print("Vocabulario configurado: {}".format(vocabulary))
except Exception as e:
print("Error configurando vocabulario")
print(e)
raise e
def _wait_for_response(self, timeout, confidence_threshold):
"""
Espera y procesa la respuesta del usuario
"""
recognized = False
start_time = time.time()
time.sleep(1.0) # Esperar antes de empezar a escuchar
while not recognized and (time.time() - start_time) < timeout:
# Verificar si hay una palabra reconocida
word_recognized = self.memory.getData("WordRecognized")
if word_recognized and len(word_recognized) > 1:
recognized_word = word_recognized[0]
confidence = word_recognized[1]
print("Palabra detectada: '{}' con confianza: {:.2f}".format(recognized_word, confidence))
if confidence > confidence_threshold:
print("Palabra reconocida con suficiente confianza: {}".format(recognized_word))
# Limpiar memoria después del reconocimiento
self.memory.insertData("WordRecognized", [])
return recognized_word
else:
print("Confianza insuficiente ({:.2f}), continuando...".format(confidence))
# Limpiar para evitar re-procesamiento
self.memory.insertData("WordRecognized", [])
time.sleep(0.1)
return None
def _process_response(self, recognized_word, response_actions):
"""
Procesa la respuesta reconocida y devuelve la tupla correspondiente
Returns:
tuple: (texto_respuesta, valor_numerico) o (None, None) si no se encuentra
"""
word_lower = recognized_word.lower()
# Buscar la acción correspondiente
for response_key, action_data in response_actions.items():
if word_lower == response_key.lower():
text = action_data.get("text", "")
value = action_data.get("value", 0)
print("Usuario respondió '{}' - Devolviendo: ('{}', {})".format(
recognized_word, text, value))
return (text, value)
# Si no se encuentra una acción específica
print("Respuesta no reconocida en acciones: '{}'".format(recognized_word))
return (None, None)
# Función de conveniencia para usar sin clase
def ask_pepper_question(question, response_actions, robot_ip="localhost", robot_port=9559, timeout=15):
"""
Función de conveniencia para hacer una pregunta sin instanciar la clase
Args:
question (str): La pregunta que hará el robot
response_actions (dict): Diccionario que mapea respuestas a acciones del robot
Formato: {"respuesta": {"text": "texto", "value": numero}}
robot_ip (str): IP del robot
robot_port (int): Puerto del robot
timeout (int): Tiempo límite para respuesta
Returns:
tuple: (texto_respuesta, valor_numerico) o (None, None) si no se reconoció
"""
handler = PepperInteractionHandler(robot_ip, robot_port)
return handler.ask_question(question, response_actions, timeout)
# Ejemplo de uso
if __name__ == "__main__":
print("\n=== Ejemplo 2: Múltiples opciones ===")
# Ejemplo 2: Pregunta de múltiples opciones con diferentes valores
question2 = "¿Qué te gustaría hacer? Puedes decir: bailar, cantar o hablar."
actions2 = {
"bailar": {"text": "¡Perfecto! Vamos a bailar juntos.", "value": 1},
"cantar": {"text": "¡Qué divertido! Me encanta cantar.", "value": 2},
"hablar": {"text": "Excelente, podemos tener una buena conversación.", "value": 3}
}
print("\n=== Ejemplo 2: Múltiples opciones ===")
text, value = ask_pepper_question(question2, actions2)
print("Resultado: ('{}', {})".format(text, value))
# Ejemplo de uso del valor numérico para lógica de control
if value == 1:
print("Activando modo baile...")
elif value == 2:
print("Activando modo canto...")
elif value == 3:
print("Activando modo conversación...")
else:
print("No se reconoció la respuesta")
print("\n=== Ejemplo 1: Múltiples opciones ===")
# Ejemplo 1: Pregunta Sí/No con valores numéricos
question1 = "¿Tienes alguna otra pregunta?"
actions1 = {
"si": {"text": "¡Vamos!", "value": 1},
"no": {"text": "Entiendo. Ha sido un placer. No dudes en volver a consultarme.", "value": 0}
}
print("=== Ejemplo 1: Pregunta Sí/No ===")
text, value = ask_pepper_question(question1, actions1)
print("Resultado: ('{}', {})".format(text, value))
# Si quieres que el robot diga el texto, lo puedes hacer después:
if text:
handler = PepperInteractionHandler()
handler.tts.say(text)
I tried running the code with multiple inputs and it should work without the grammar error. But the error appeared randomly, usually in the second or so run.
This is the code of the naoqi.py file if it were of use:
import os
import sys
import ctypes
import weakref
import logging
import inspect
from functools import partial
import qi
def set_dll_directory():
this_dir = os.path.abspath(os.path.dirname(__file__))
bin_dir = os.path.join(this_dir, "..", "bin")
if os.path.exists(bin_dir):
ctypes.windll.kernel32.SetDllDirectoryA(bin_dir)
def load_inaoqi_deps():
""" Helper to laod _inaoqi.so deps on linux """
deps = [
"libboost_python.so",
"libboost_system.so",
"libboost_chrono.so",
"libboost_program_options.so",
"libboost_thread.so",
"libboost_filesystem.so",
"libboost_regex.so",
"libboost_locale.so",
"libboost_signals.so",
"libqi.so",
"libalerror.so",
"libalthread.so",
"libalvalue.so",
"libalcommon.so",
"libalproxies.so",
"libalpythontools.so",
"libqipython.so",
"libinaoqi.so"
]
relpaths = [
# in pynaoqi, this file is /naoqi.py and we search for /libqi.so
[],
# in deploys/packages/etc,
# this file is $PREFIX/lib/python2.7/site-packages/naoqi.py
# and we need $PREFIX/lib/libqi.so
["..", ".."],
]
this_dir = os.path.abspath(os.path.dirname(__file__))
for dep in deps:
for relpath in relpaths:
list_path = [this_dir] + relpath + [dep]
full_path = os.path.join(*list_path)
try:
ctypes.cdll.LoadLibrary(full_path)
except Exception:
pass
if sys.platform.startswith("linux"):
load_inaoqi_deps()
if sys.platform.startswith("win"):
set_dll_directory()
import inaoqi
import motion
import allog
def _getMethodParamCount(func):
r = inspect.getargspec(func)
#normal args
np = len(r[0])
#*args (none if non existent)
if r[1] is not None:
np = np + 1
#**kargs (none if non existent)
if r[2] is not None:
np = np + 1
return np
def autoBind(myClass, bindIfnoDocumented):
"""Show documentation for each
method of the class"""
# dir(myClass) is a list of the names of
# everything in class
myClass.setModuleDescription(myClass.__doc__)
for thing in dir(myClass):
# getattr(x, "y") is exactly: x.y
function = getattr(myClass, thing)
if callable(function):
if (type(function) == type(myClass.__init__)):
if (bindIfnoDocumented or function.__doc__ != ""):
if (thing[0] != "_"): # private method
if (function.__doc__):
myClass.functionName(thing, myClass.getName(), function.__doc__)
else:
myClass.functionName(thing, myClass.getName(), "")
for param in function.func_code.co_varnames:
if (param != "self"):
myClass.addParam(param)
myClass._bindWithParam(myClass.getName(),thing, _getMethodParamCount(function)-1)
class ALDocable():
def __init__(self, bindIfnoDocumented):
autoBind(self,bindIfnoDocumented)
# define the log handler to be used by the logging module
class ALLogHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
def emit(self, record):
level_to_function = {
logging.DEBUG: allog.debug,
logging.INFO: allog.info,
logging.WARNING: allog.warning,
logging.ERROR: allog.error,
logging.CRITICAL: allog.fatal,
}
function = level_to_function.get(record.levelno, allog.debug)
function(record.getMessage(),
record.name,
record.filename,
record.funcName,
record.lineno)
# define a class that will be inherited by both ALModule and ALBehavior, to store instances of modules, so a bound method can be called on them.
class NaoQiModule():
_modules = dict()
@classmethod
def getModule(cls, name):
# returns a reference a module, giving its string, if it exists !
if(name not in cls._modules):
raise RuntimeError("Module " + str(name) + " does not exist")
return cls._modules[name]()
def __init__(self, name, logger=True):
# keep a weak reference to ourself, so a proxy can be called on this module easily
self._modules[name] = weakref.ref(self)
self.loghandler = None
if logger:
self.logger = logging.getLogger(name)
self.loghandler = ALLogHandler()
self.logger.addHandler(self.loghandler)
self.logger.setLevel(logging.DEBUG)
def __del__(self):
# when object is deleted, clean up dictionnary so we do not keep a weak reference to it
del self._modules[self.getName()]
if(self.loghandler != None):
self.logger.removeHandler(self.loghandler)
class ALBroker(inaoqi.broker):
def init(self):
pass
class ALModule(inaoqi.module, ALDocable, NaoQiModule):
def __init__(self,param):
inaoqi.module.__init__(self, param)
ALDocable.__init__(self, False)
NaoQiModule.__init__(self, param)
self.registerToBroker()
def __del__(self):
NaoQiModule.__del__(self)
def methodtest(self):
pass
def pythonChanged(self, param1, param2, param3):
pass
class MethodMissingMixin(object):
""" A Mixin' to implement the 'method_missing' Ruby-like protocol. """
def __getattribute__(self, attr):
try:
return object.__getattribute__(self, attr)
except:
class MethodMissing(object):
def __init__(self, wrapped, method):
self.__wrapped__ = wrapped
self.__method__ = method
def __call__(self, *args, **kwargs):
return self.__wrapped__.method_missing(self.__method__, *args, **kwargs)
return MethodMissing(self, attr)
def method_missing(self, *args, **kwargs):
""" This method should be overridden in the derived class. """
raise NotImplementedError(str(self.__wrapped__) + " 'method_missing' method has not been implemented.")
class postType(MethodMissingMixin):
def __init__(self):
""
def setProxy(self, proxy):
self.proxy = weakref.ref(proxy)
# print name
def method_missing(self, method, *args, **kwargs):
list = []
list.append(method)
for arg in args:
list.append(arg)
result = 0
try:
p = self.proxy()
result = p.pythonPCall(list)
except RuntimeError,e:
raise e
return result
class ALProxy(inaoqi.proxy,MethodMissingMixin):
def __init__(self, *args):
self.post = postType()
self.post.setProxy(self)
if (len (args) == 1):
inaoqi.proxy.__init__(self, args[0])
elif (len (args) == 2):
inaoqi.proxy.__init__(self, args[0], args[1])
else:
inaoqi.proxy.__init__(self, args[0], args[1], args[2])
def call(self, *args):
list = []
for arg in args:
list.append(arg)
return self.pythonCall(list)
def pCall(self, *args):
list = []
for arg in args:
list.append(arg)
return self.pythonPCall(list)
def method_missing(self, method, *args, **kwargs):
list = []
list.append(method)
for arg in args:
list.append(arg)
result = 0
try:
result = self.pythonCall(list)
except RuntimeError,e:
raise e
#print e.args[0]
return result
@staticmethod
def initProxy(name, fut):
try:
proxy = ALProxy(name)
except:
print "Error: cannot get proxy to %s even though we waited for it" % name
return
globals()[name] = proxy
print "Got proxy to " + name
global _initCb
if _initCb is not None:
_initCb(name)
@staticmethod
def lazyLoad(session, name):
session.waitForService(name, _async = True).addCallback(partial(ALProxy.initProxy, name))
@staticmethod
def initProxies(initCb = None):
global _initCb
_initCb = initCb
#Warning: The use of these default proxies is deprecated.
global ALLeds
ALLeds = None
session = inaoqi._getDefaultSession()
ALProxy.lazyLoad(session, "ALFrameManager")
ALProxy.lazyLoad(session, "ALMemory")
ALProxy.lazyLoad(session, "ALMotion")
ALProxy.lazyLoad(session, "ALLeds")
ALProxy.lazyLoad(session, "ALLogger")
ALProxy.lazyLoad(session, "ALSensors")
def createModule(name):
global moduleList
str = "moduleList.append("+ "module(\"" + name + "\"))"
exec(str)
Disclaimer: I've also encountered this issue while using the python3 libqi-python package - thus it is possible that this may not be applicable in your case (though I really think it should also work with the 2.7 version).
As @furas mentioned, this issue can be fixed by changing the language. What worked in my case (installed languages english and german) was swapping between the installed languages and then setting the desired one last before starting the speech recognition.
def start_speech_recognition_service(language, keywords):
speech_recognition.setLanguage("German")
speech_recognition.setLanguage("English")
speech_recognition.setLanguage(language)
speech_recognition.setVocabulary(keywords, False)
speech_recognition.setAudioExpression(True)
speech_recognition.setVisualExpression(True)
speech_recognition.pause(False)
speech_recognition.subscribe("speech_recognition")