I'm currently using a tool called Modbus Slave, which works pretty well through its GUI. However, I would like to have some automated tests, which is why I would like to use pymodbus. The setup I would like to have with pymodbus is the following:
- On a STM32 I have a serial modbus master running.
- I would like to have two pymodbus slaves running, each with a "data storage": for example, 3100 registers on Slave ID N°1 and 3100 registers on Slave ID N°248.
- Then, I would like my Modbus master to be able to write to the data registers of Slave ID N°1 or Slave ID N°248.
- I would like to be able to update the context of a slave (Slave ID N°1, for example) while it is running.
What is in bold is working. As I don't have much experience with Python/pymodbus, I would like to know the best way to proceed.
EDIT:
Current code of my pymodbus sync server, running with a context of 3100 registers (default value 0) for Slave IDs 1 and 248:
#!/usr/bin/env python3
"""
Server sync
"""
from threading import Thread
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.version import version
from pymodbus.datastore import (
ModbusSequentialDataBlock,
ModbusServerContext,
ModbusSlaveContext,
)
# --------------------------------------------------------------------------- #
# configure the service logging
# --------------------------------------------------------------------------- #
import logging
# basicConfig() installs the default stderr handler on the root logger (it is
# a no-op when the root logger already has handlers); the level is then set
# explicitly so DEBUG records pass the root logger in either case.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# --------------------------------------------------------------------------- #
# import the various client implementations
# --------------------------------------------------------------------------- #
from pymodbus.server.sync import (
StartSerialServer,
)
from pymodbus.transaction import (
ModbusRtuFramer,
)
def run_sync_server():
    """Build the datastore for slaves 1 and 248 and start the serial RTU server.

    Each slave gets its own block of 3100 holding registers, all initialised
    to 0, so a write addressed to one unit ID is not mirrored on the other.

    :returns: Whatever ``StartSerialServer`` returns.
        NOTE(review): in pymodbus 2.5.x the synchronous ``StartSerialServer``
        appears to block in ``serve_forever()`` and return ``None`` -- confirm
        against the installed version before relying on the return value.
    """
    port = "COM6"
    register_count = 3100
    slave_ids = (0x01, 0xF8)

    # One ModbusSequentialDataBlock per slave: handing the same block object
    # to both contexts would make the two unit IDs share a single register
    # bank, so a write to slave 1 would also change slave 248.
    context = {
        slave_id: ModbusSlaveContext(
            hr=ModbusSequentialDataBlock(0, [0x00] * register_count),
        )
        for slave_id in slave_ids
    }
    # Build data storage
    store = ModbusServerContext(slaves=context, single=False)

    # Run server
    txt = f"### start server, listening on {port} - serial"
    print(txt)
    server = StartSerialServer(
        context=store,  # Data storage
        # identity=identity,  # server identify
        timeout=0.01,  # waiting time for request to complete
        port=port,  # serial port
        # custom_functions=[],  # allow custom handling
        framer=ModbusRtuFramer,  # The framer strategy to use
        # handler=None,  # handler for each session
        stopbits=1,  # The number of stop bits to use
        bytesize=8,  # The bytesize of the serial messages
        parity="N",  # Which kind of parity to use
        baudrate=256000,  # The baud rate to use for the serial device
    )
    return server
if __name__ == "__main__":
    server = run_sync_server()
    # NOTE(review): the synchronous StartSerialServer in pymodbus 2.5.x appears
    # to block until the server stops and then return None, in which case an
    # unconditional server.shutdown() would raise AttributeError. Only shut
    # down when a server object was actually handed back.
    if server is not None:
        server.shutdown()
Concerning the update_server, this is what I have for now:
#!/usr/bin/env python3
# pylint: disable=missing-any-param-doc,differing-param-doc
"""Pymodbus Server With Updating Thread.
This is an example of having a background thread updating the
context while the server is operating. This can also be done with
a python thread::
from threading import Thread
Thread(target=updating_writer, args=(context,)).start()
"""
from threading import Thread
import time
import logging
import re
from pymodbus.datastore import (
ModbusSequentialDataBlock,
ModbusServerContext,
ModbusSlaveContext,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.server.asynchronous import StartSerialServer
from pymodbus.version import version
# --------------------------------------------------------------------------- #
# configure the service logging
# --------------------------------------------------------------------------- #
# basicConfig() installs a stderr handler on the root logger; without any
# handler, the DEBUG records enabled below never reach the console because
# logging's last-resort handler only emits WARNING and above.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# --------------------------------------------------------------------------- #
# define your callback process
# --------------------------------------------------------------------------- #
def updating_writer(extra, poll_interval=5):
    """Poll register 3001 of slave 1 until it reads 0, then write 0x10 to register 1000.

    The slave context is looked up in ``extra`` rather than in a module-level
    global, so the function works with whatever datastore the caller passes
    to the thread.

    :param extra: Object that returns a slave context when indexed by unit ID
        (the ``__main__`` block passes its ``ModbusServerContext``). The slave
        context must provide ``getValues`` and ``setValues``.
    :param poll_interval: Seconds to sleep after each read. Defaults to 5.
    """
    print("Check register")
    slave_id = 0x1
    slave = extra[slave_id]

    # 0x3 and 0x6 are the Modbus function codes for "read holding registers"
    # and "write single register"; they are passed as the first argument of
    # getValues/setValues.
    while True:
        value = slave.getValues(0x3, 3001, count=1)
        time.sleep(poll_interval)
        if value[0] == 0:
            break
    slave.setValues(0x6, 1000, [0x10])
if __name__ == "__main__":
    # Give every slave its own block of 3100 holding registers (all 0).
    # Reusing one ModbusSequentialDataBlock object for both contexts would
    # make slaves 1 and 248 share a single register bank.
    context = {
        slave_id: ModbusSlaveContext(
            hr=ModbusSequentialDataBlock(0x00, [0x00] * 3100),
        )
        for slave_id in (0x01, 0xF8)
    }
    store = ModbusServerContext(slaves=context, single=False)
    # NOTE(review): only the updater thread is started here; this script never
    # calls StartSerialServer, so nothing serves `store` over the serial port.
    Thread(target=updating_writer, args=(store,)).start()
I had to fix a lot of things in the pymodbus examples because they are not up to date with the pymodbus library (2.5.3).