I develop two programs which are an SNMP agent and an SNMP manager, with the PySnmp library:
The different 'target' and 'user' are created by generic methods.
I managed to make these programs work with SNMPv2 (community system) from PySnmp, but I can't make them work with SNMPv3 (user system, authentication and encryption).
When the function that is supposed to send the notification runs, it returns 'None', so it tells me that something has gone wrong. But no error is triggered. Also, with the tcpdump tool, I can see that no udp frames are coming out of the agent. On the other hand, my SNMP agent and my SNMP manager are able to make GET requests and to respond to these requests.
The GET and TRAP / INFORM are done on objects coming from a custom MIB, which I include in my programs.
The agent and the manager are launched in two different containers (Docker), with a network between the two, and a DNS resolution of the names 'agent' and 'manager' to contact each other.
So my question is : Why doesn't the Notification send? How to correct this problem ? If you see any other errors in the code, don't hesitate.
Thanks in advance.
Unfortunately, my problem may come from the way I use the method that generates a notification, but mostly from the user and target configurations I make before. So I give you the code that creates these user and target.
In order for you to be able to run the code on your computer, I prefer to give you some examples of code that allow the agent and the manager to work together. These following codes only allow to make GET and NOTIFY.
However, here is the piece of code that does not work.
self.notif_origin.sendVarBinds(
self.engine,
str(target_name+'_inform_notif'),
None,
'context',
[((object_id), None)],
self.inform_cb
)
agent's code (only the code useful for understanding the problem). Normally the code should be able to run on your computers. Here you can find out how I create a user and a target.
from pysnmp.entity import engine, config
from pysnmp.entity.rfc3413 import cmdrsp, ntforg, context
from pysnmp.carrier.asyncore.dgram import udp
from pysnmp.smi import builder, view, rfc1902, error
from pysnmp.proto.rfc1902 import ObjectName, Null, OctetString
from socket import gethostbyname, gaierror
from collections import namedtuple
InetAddr = namedtuple('InetAddr', 'IP PORT')
User = namedtuple('User', 'NAME AUTHKEY PRIVKEY')
class Agent():
def __init__(self, address, user, security_engine_i) :
self.engine = engine.SnmpEngine(OctetString(hexValue=str(security_engine_id)))
self.context = context.SnmpContext(self.engine)
self.security_engine_id = security_engine_id
self.users = {}
self.targets = {}
config.addTransport(
self.engine,
udp.domainName,
udp.UdpTransport().openServerMode((address.IP, address.PORT))
)
try:
config.addV3User(
self.engine,
str(user.NAME),
config.usmHMAC384SHA512AuthProtocol, str(user.AUTHKEY),
config.usmAesCfb256Protocol, str(user.PRIVKEY)
)
except error.WrongValueError as _:
return None
config.addTargetParams(
self.engine,
'params',
str(user.NAME),
'authPriv'
)
config.addTransport(
self.engine,
udp.domainName+(1,),
udp.UdpSocketTransport().openClientMode()
)
config.addContext(
self.engine,
'context'
)
config.addVacmUser(
self.engine,
3,
str(user.NAME),
'authPriv',
notifySubTree=(1,3,6,1,4,1,1581)
)
self.import_mib()
cmdrsp.GetCommandResponder(self.engine, self.context)
self.notif_origin = ntforg.NotificationOriginator()
def import_mib(self):
self.mib_builder =self.engine.msgAndPduDsp.mibInstrumController.mibBuilder
self.mib_builder.addMibSources(builder.DirMibSource('/app/mibs'))
self.mib_builder.loadModules()
self.mib_view_controller = view.MibViewController(self.mib_builder)
def add_user(self, user):
if str(user.NAME) not in self.users.keys():
try:
config.addV3User(
self.engine,
str(user.NAME),
config.usmHMAC384SHA512AuthProtocol, str(user.AUTHKEY),
config.usmAesCfb256Protocol, str(user.PRIVKEY)
)
except error.WrongValueError as _:
return False
config.addVacmUser(
self.engine,
3,
str(user.NAME),
'authPriv',
(1,3,6,1,4,1,1581)
)
self.users[str(user.NAME)] = [str(user.AUTHKEY), str(user.PRIVKEY)]
return True
return False
def add_target(self, target_name, address):
if str(target_name) not in self.targets.keys():
try:
address_ip = gethostbyname(str(address.IP))
except gaierror:
return False
config.addTargetAddr(
self.engine,
str(target_name+'_inform_add'),
udp.domainName+(1,),
(str(address_ip), int(address.PORT)),
'params',
tagList='inform_tag'
)
config.addNotificationTarget(
self.engine,
str(target_name+'_inform_notif'),
'filter',
'inform_tag',
'inform'
)
self.targets[str(target_name)] = [str(address.IP), int(address.PORT)]
return True
return False
def convert(self, object):
try:
if object[0].isnumeric():
object_id = rfc1902.ObjectIdentity(object).loadMibs('MIB-DAT')
else:
object_id = rfc1902.ObjectIdentity('MIB-DAT', object, 0)
object_id.resolveWithMib(self.mib_view_controller)
return object_id
except error.SmiError:
return None
def get(self, object):
object_id = self.convert(object)
if object_id != None:
var_bind = self.context.getMibInstrum('').readVars([(ObjectName(object_id), Null(''))])
return var_bind[0][1]
return None
def set(self, object, value):
object_id = self.convert(object)
if object_id != None:
try:
var_bind = self.context.getMibInstrum('').writeVars([(ObjectName(object_id), OctetString(value))])
return var_bind[0][1]
except error.WrongValueError:
return False
return None
def inform(self, target_name, object):
if str(target_name) in self.targets.keys():
object_id = self.convert(object)
if object_id != None:
self.notif_origin.sendVarBinds(
self.engine,
str(target_name+'_inform_notif'),
None,
'context',
[((object_id), None)],
self.inform_cb
)
notif_num = self.get('notif_num')
self.set('notif_num', notif_num+1)
return True
return False
return False
def inform_cb(self, engine, request_handle, err_indication, err_status, err_index, binds, cb_ctx):
print(f'Notification \'{request_handle}\', status - \'{err_indication and err_indication or "delivered"}\'')
def run(self):
self.debuger()
self.engine.transportDispatcher.jobStarted(1)
try:
self.engine.transportDispatcher.runDispatcher()
finally:
self.engine.transportDispatcher.closeDispatcher()
An example of main() launching the agent :
from threading import Thread
from agent_snmp import Agent, InetAddr, User
def main():
security_engine_id = '8000000001020304'
agent_add = InetAddr('agent', 160) # IP = DNS resolution
agent_user = User('agent_user', 'agent_authkey', 'agent_privkey')
agent = Agent(agent_add, agent_user, security_engine_id)
thread_agent = Thread(target=agent.run)
manager_user = User('manager_user', 'manager_authkey', 'manager_privkey')
agent.add_user(manager_user)
manager_add = InetAddr('manager', 161) # IP = DNS resolution
agent.add_target('manager_target', manager_add)
thread_agent.start()
# Then call the agent methods you want
if __name__ == '__main__':
main()
manager's code (for the curious):
from pysnmp.entity import engine, config
from pysnmp.carrier.asyncore.dgram import udp
from pysnmp.entity.rfc3413 import cmdgen, ntfrcv
from pysnmp.smi import builder, rfc1902, error, view
from socket import gethostbyname, gaierror
from collections import namedtuple
InetAddr = namedtuple('InetAddr', 'IP PORT')
User = namedtuple('User', 'NAME AUTHKEY PRIVKEY')
class Manager():
def __init__(self, user, address):
self.engine = engine.SnmpEngine()
self.targets = {}
self.users = {}
try:
config.addV3User(
self.engine,
str(user.NAME),
config.usmHMAC384SHA512AuthProtocol, str(user.AUTHKEY),
config.usmAesCfb256Protocol, str(user.PRIVKEY)
)
except error.WrongValueError as _:
return False
config.addTargetParams(
self.engine,
'params',
str(user.NAME),
'authPriv'
)
config.addTransport(
self.engine,
udp.domainName,
udp.UdpSocketTransport().openClientMode()
)
config.addTransport(
self.engine,
udp.domainName+(1,),
udp.UdpTransport().openServerMode((address.IP, address.PORT))
)
self.import_mib()
ntfrcv.NotificationReceiver(self.engine, self.inform_cb)
def import_mib(self):
self.mib_builder = self.engine.msgAndPduDsp.mibInstrumController.mibBuilder
self.mib_builder.addMibSources(builder.DirMibSource('/app/mibs'))
self.mib_view_controller = view.MibViewController(self.mib_builder)
def add_target(self, target_name, address):
if str(target_name) not in self.targets.keys():
try:
address_ip = gethostbyname(str(address.IP))
except gaierror:
return False
config.addTargetAddr(
self.engine,
str(target_name),
udp.domainName,
(str(address_ip), int(address.PORT)),
'params'
)
self.targets[str(target_name)] = [str(address.IP), int(address.PORT)]
return True
return False
def add_user(self, user, security_engine_id):
if str(user.NAME) not in self.users.keys():
try:
config.addV3User(
self.engine,
str(user.NAME),
config.usmHMAC384SHA512AuthProtocol, str(user.AUTHKEY),
config.usmAesCfb256Protocol, str(user.PRIVKEY)
# OctetString(hexValue=str(security_engine_id))
)
except error.WrongValueError as _:
return False
self.users[str(user.NAME)] = [str(user.AUTHKEY), str(user.PRIVKEY), str(security_engine_id)]
return True
return False
def convert(self, object):
try:
if object[0].isnumeric():
object_id = rfc1902.ObjectIdentity(object).loadMibs('MIB-DAT')
else:
object_id = rfc1902.ObjectIdentity('MIB-DAT', object, 0)
object_id.resolveWithMib(self.mib_view_controller)
return object_id
except error.SmiError:
return None
def get(self, target_name, object):
if str(target_name) in self.targets.keys():
object_id = self.convert(object)
if object_id != None:
cmdgen.GetCommandGenerator().sendVarBinds(
self.engine,
str(target_name),
None,
'',
[((object_id), None)],
self.get_cb
)
return True
return None
return False
def get_cb(self, engine_observer, request_handle, err_indication, err_status, err_index, binds, cb_ctx):
if err_indication:
print(err_indication)
elif err_status and err_status != 2:
print(f'\'{err_status}\' at \'{err_index and binds[int(err_index) - 1][0] or "?"}\'')
else:
for oid, val in binds:
print(f'\'{oid}\' = \'{val}\'')
def inform_cb(self, engine, state_reference, context_engine_id, context_name, binds, cb_ctx):
print(f'Notification from Context_engine_id \'{context_engine_id}\', Context_name \'{context_name}\'')
for name, val in binds:
print(f'\'{name}\' = \'{val}\'')
def run(self):
self.debuger()
self.engine.transportDispatcher.jobStarted(1)
try:
self.engine.transportDispatcher.runDispatcher()
finally:
self.engine.transportDispatcher.closeDispatcher()
An example of main() launching the manager :
from threading import Thread
from manager_snmp import Manager, InetAddr, User
def main():
manager_user = User('manager_user', 'manager_authkey', 'manager_privkey')
manager_add = InetAddr('manager', 161)
manager = Manager(manager_user, manager_add)
thread_manager = Thread(target=manager.run)
agent_add = InetAddr('agent', 160)
manager.add_target('agent_target', agent_add)
agent_user = User('agent_user', 'agent_authkey', 'agent_privkey')
manager.add_user(agent_user, '8000000001020304')
thread_manager.start()
# Then call the manager methods you want
if __name__ == '__main__':
main()
And here is a part of the custom MIB I use :
MIB-DAT DEFINITIONS ::= BEGIN
org OBJECT IDENTIFIER ::= { iso 3 } -- "iso" = 1
dod OBJECT IDENTIFIER ::= { org 6 }
internet OBJECT IDENTIFIER ::= { dod 1 }
private OBJECT IDENTIFIER ::= { internet 4 }
enterprises OBJECT IDENTIFIER ::= { private 1 }
entreprise OBJECT IDENTIFIER ::= { enterprises 1581 }
dat OBJECT IDENTIFIER ::= { entreprise 1 }
notification OBJECT IDENTIFIER ::= { dat 1 }
notif_dat NOTIFICATION-TYPE
OBJECTS {notif_str}
STATUS mandatory
DESCRIPTION
"notif du DAT"
::= { notification 1 }
notif_str OBJECT-TYPE
SYNTAX OCTET STRING (SIZE(78))
ACCESS write-only
STATUS mandatory
DESCRIPTION
"String total de la notif"
DEFVAL { "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" }
::= { notification 2 }
notif_num OBJECT-TYPE
SYNTAX INTEGER (0..9999)
ACCESS read-only
STATUS mandatory
DESCRIPTION
"Numéro d'instance de la notif"
::= { notification 3 }
notif_type OBJECT-TYPE
SYNTAX OCTET STRING (SIZE(1))
ACCESS read-only
STATUS mandatory
DESCRIPTION
"Type de la notif"
DEFVAL { "E" }
::= { notification 4 }
dat_date OBJECT-TYPE
SYNTAX OCTET STRING (SIZE(10))
ACCESS read-only
STATUS mandatory
DESCRIPTION
"date sur le dat"
::= { dat 2 }
dat_time OBJECT-TYPE
SYNTAX OCTET STRING (SIZE(9))
ACCESS read-only
STATUS mandatory
DESCRIPTION
"Temps sur le dat"
::= { dat 3 }
nus OBJECT-TYPE
SYNTAX INTEGER (0..9999)
ACCESS read-only
STATUS mandatory
DESCRIPTION
"Numéro de série de l'appareil"
::= { dat 4 }
EtaValue ::= TEXTUAL-CONVENTION
STATUS mandatory
DESCRIPTION "mapping eta"
SYNTAX INTEGER {
V (1),
S (2),
H (3),
A (4),
M (5),
N (6)
}
eta OBJECT-TYPE
SYNTAX EtaValue
ACCESS read-only
STATUS mandatory
DESCRIPTION
"ETA de la MIB"
DEFVAL { 6 }
::= { dat 5 }
ModValue ::= TEXTUAL-CONVENTION
STATUS mandatory
DESCRIPTION "mapping mod"
SYNTAX INTEGER {
R (1),
F (2)
}
mod OBJECT-TYPE
SYNTAX ModValue
ACCESS read-only
STATUS mandatory
DESCRIPTION
"MOD de la MIB"
DEFVAL { 1 }
::= { dat 6 }
TinValue ::= TEXTUAL-CONVENTION
STATUS mandatory
DESCRIPTION "mapping tin"
SYNTAX INTEGER {
M (1),
E (2),
R (3),
A (4)
}
tin OBJECT-TYPE
SYNTAX TinValue
ACCESS read-only
STATUS mandatory
DESCRIPTION
"TIN de la MIB"
DEFVAL { 4 }
::= { dat 7 }
SseValue ::= TEXTUAL-CONVENTION
STATUS mandatory
DESCRIPTION "mapping sse"
SYNTAX INTEGER {
A (1),
B (2),
C (3),
D (4),
E (5),
F (6),
G (7),
H (8),
I (9),
J (10),
K (11),
L (12),
M (13),
N (14),
O (15),
P (16),
Q (17),
R (18),
S (19),
T (20),
U (21),
V (22),
W (23),
X (24),
Y (25),
Z (26)
}
sse OBJECT-TYPE
SYNTAX SseValue
ACCESS read-only
STATUS mandatory
DESCRIPTION
"SSE de la MIB"
DEFVAL { 1 }
::= { dat 8 }
cod OBJECT-TYPE
SYNTAX INTEGER (1..999)
ACCESS read-only
STATUS mandatory
DESCRIPTION
"COD de la MIB"
DEFVAL { 1 }
::= { dat 9 }
clp OBJECT-TYPE
SYNTAX INTEGER (0..5)
ACCESS read-only
STATUS mandatory
DESCRIPTION
"CLP de la MIB"
::= { dat 10 }
msg OBJECT-TYPE
SYNTAX OCTET STRING (SIZE(31))
ACCESS read-write
STATUS mandatory
DESCRIPTION
"String libre"
DEFVAL { "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" }
::= { dat 11 }
END
To put the MIB in place, you must first convert it into a format that PySnmp can understand with the mibdump tool, then place it in the location indicated in the 'import_mib()' method.
The ContextName was not automatically bind with the VacmUser, so the contextName had to be added by hand.
config.addVacmUser(
self.engine,
3,
str(user.NAME),
'authPriv',
notifySubTree=(1, 3, 6, 1, 4, 1, 1581),
contextName='context'
)