I'm new to Modbus and I need to write a Python script to read TCP/IP messages from a Modbus client. I need to read 4 bytes starting at address 002049 using big-endian byte order. So my questions are:
I tried to follow "How to read byte order endian BIG with Python?" but couldn't find an answer there.
---UPDATE---
How can I do it using AsyncModbusTcpClient?
Code to read 4 bytes (2 registers) starting at address 2049 (address 002049 in Modbus):
from pyModbusTCP.client import ModbusClient
def read_modbus_registers(ip_address, port, start_address, num_registers):
    """Read holding registers over Modbus TCP and combine them big-endian.

    Args:
        ip_address: Host name or IP address of the Modbus TCP server.
        port: TCP port of the server.
        start_address: Address of the first holding register to read.
        num_registers: Number of consecutive 16-bit registers to read.

    Returns:
        The registers combined into a single unsigned integer, with the
        first register as the most significant word, or None when the
        connection or the read fails.
    """
    client = ModbusClient(host=ip_address, port=port, auto_open=True)
    try:
        # Connect explicitly and trust open()'s boolean result. auto_open
        # only connects lazily on the first request, so testing the open
        # state before any request would always report a failure.
        if not client.open():
            print(f"Failed to connect to Modbus server at {ip_address}:{port}")
            return None

        result = client.read_holding_registers(start_address, num_registers)
        if not result:
            print(f"Failed to read from Modbus server: {client.last_error}")
            return None

        # Fold every register that came back, most significant word first,
        # so the combination follows num_registers instead of assuming two.
        value = 0
        for register in result:
            value = (value << 16) | register
        print(f"Value read from address {start_address}: {value}")
        return value
    finally:
        # Release the socket on every path, including unexpected errors.
        client.close()
# Where the Modbus TCP server lives: host and Modbus TCP port
modbus_ip, modbus_port = "your_modbus_server_ip", 502
# What to read: 2 registers (4 bytes) starting at address 2049 (002049 in Modbus)
start_address, num_registers = 2049, 2
# Perform the read with the settings defined above
read_modbus_registers(
    ip_address=modbus_ip,
    port=modbus_port,
    start_address=start_address,
    num_registers=num_registers,
)
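To perform the task asynchronously, every call that talks to the client must be awaited, and the coroutine itself has to be run by an event loop.
Note that AsyncModbusTcpClient is the name of the asynchronous client in the pymodbus package, so check that the import below matches the library you actually have installed.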
import asyncio
from pyModbusTCP.asyncio import AsyncModbusTcpClient
async def read_modbus_registers_async(ip_address, port, start_address, num_registers):
    """Asynchronously read holding registers and combine them big-endian.

    Args:
        ip_address: Host name or IP address of the Modbus TCP server.
        port: TCP port of the server.
        start_address: Address of the first holding register to read.
        num_registers: Number of consecutive 16-bit registers to read.

    Returns:
        The registers combined into a single unsigned integer, with the
        first register as the most significant word, or None when the
        connection or the read fails or an exception is reported.
    """
    # NOTE(review): AsyncModbusTcpClient is the name of pymodbus's async
    # client. The attribute names used below (is_connected, last_error, an
    # awaitable close(), a list-like read result) should be confirmed
    # against the library that is actually installed.
    client = AsyncModbusTcpClient(host=ip_address, port=port)
    try:
        await client.connect()
        if not client.is_connected:
            print(f"Failed to connect to Modbus server at {ip_address}:{port}")
            return None

        result = await client.read_holding_registers(start_address, num_registers)
        if not result:
            print(f"Failed to read from Modbus server: {client.last_error}")
            return None

        # Fold every register that came back, most significant word first,
        # so the combination follows num_registers instead of assuming two.
        value = 0
        for register in result:
            value = (value << 16) | register
        print(f"Value read from address {start_address}: {value}")
        return value
    except Exception as e:
        # Deliberate best-effort boundary: report the problem, do not crash.
        print(f"Error: {e}")
        return None
    finally:
        # Close the Modbus TCP client on every path.
        await client.close()
# Connection settings for the Modbus TCP server
modbus_ip: str = "your_modbus_server_ip"
modbus_port: int = 502  # Modbus TCP port

# Register window: 2 registers (4 bytes) beginning at address 2049 (002049 in Modbus)
start_address: int = 2049
num_registers: int = 2
# Coroutine entry point that performs the asynchronous read
async def main():
    """Await the asynchronous register read using the module-level settings."""
    await read_modbus_registers_async(
        ip_address=modbus_ip,
        port=modbus_port,
        start_address=start_address,
        num_registers=num_registers,
    )
# Run the coroutine when executed as a script. asyncio.run() creates the
# event loop, runs main() to completion and closes the loop afterwards, which
# avoids the deprecated get_event_loop() call outside a running loop.
if __name__ == "__main__":
    asyncio.run(main())