I am a newbie. My current project is when the current end decides to start the modbus service, I will create a process for the modbus service. Then the value is obtained in the parent process, through the ZeroMQ PUB/SUB
to pass the value, I now want to update the value of the modbus register in the modbus service process.
I tried the method mentioned by pymodbus provided by updating_server.py, and twisted.internet.task.LoopingCall()
to update the value of the register, but this will make it impossible for me to connect to my server with the client. I don't know why?
Use LoopingCall()
to establish the server, the log when the client connects.
Then I tried to put both the uploading and startTCPserver in the async loop, but the update was only entered for the first time after the startup, and then it was not entered.
Currently, I'm using the LoopingCall()
to handle updates, but I don't think this is a good way.
This is the code I initialized the PUB and all the tags that can read the tag.
from loop import cycle
import asyncio
from multiprocessing import Process
from persistence import models as pmodels
from persistence import service as pservice
from persistence import basic as pbasic
import zmq
from zmq.asyncio import Context
from common import logging
from server.modbustcp import i3ot_tcp as sertcp
import common.config as cfg
import communication.admin as ca
import json
import os
import signal
from datetime import datetime
from server.opcuaserver import i3ot_opc as seropc
async def main():
future = []
task = []
global readers, readers_old, task_flag
logger.debug("connecting to database and create table.")
pmodels.connect_create()
logger.debug("init read all address to create loop task.")
cycle.init_readers(readers)
ctx = Context()
publisher = ctx.socket(zmq.PUB)
logger.debug("init publish [%s].", addrs)
publisher.bind(addrs)
readers_old = readers.copy()
for reader in readers:
task.append(asyncio.ensure_future(
cycle.run_readers(readers[reader], publisher)))
if not len(task):
task_flag = True
logger.debug("task length [%s - %s].", len(task), task)
opcua_server = LocalServer(seropc.opc_server, "opcua")
future = [
start_get_all_address(),
start_api(),
create_address_loop(publisher, task),
modbus_server(),
opcua_server.run()
]
logger.debug("run loop...")
await asyncio.gather(*future)
asyncio.run(main(), debug=False)
This is to get the device tag value and publish it.
async def run_readers(reader, publisher):
while True:
await reader.run(publisher)
class DataReader:
def __init__(self, freq, clients):
self._addresses = []
self._frequency = freq
self._stop_signal = False
self._clients = clients
self.signature = sign_data_reader(self._addresses)
async def run(self, publisher):
while not self._stop_signal:
for addr in self._addresses:
await addr.read()
data = {
"type": "value",
"data": addr._final_value
}
publisher.send_pyobj(data)
if addr._status:
if addr.alarm_log:
return_alarm_log = pbasic.get_log_by_time(addr.alarm_log['date'])
if return_alarm_log:
data = {
"type": "alarm",
"data": return_alarm_log
}
publisher.send_pyobj(data)
self.data_send(addr)
logger.debug("run send data")
await asyncio.sleep(int(self._frequency))
def stop(self):
self._stop_signal = True
modbus server imports
from common import logging
from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from persistence import service as pservice
from persistence import basic as pbasic
import zmq
import common.config as cfg
import struct
import os
import signal
from datetime import datetime
from twisted.internet.task import LoopingCall
def updating_writer(a):
logger.info("in updates of modbus tcp server.")
context = a[0]
# while True:
if check_pid(os.getppid()) is False:
os.kill(os.getpid(), signal.SIGKILL)
url = ("ipc://{}" .format(cfg.get('ipc', 'pubsub')))
logger.debug("connecting to [%s].", url)
ctx = zmq.Context()
subscriber = ctx.socket(zmq.SUB)
subscriber.connect(url)
subscriber.setsockopt(zmq.SUBSCRIBE, b"")
slave_id = 0x00
msg = subscriber.recv_pyobj()
logger.debug("updates.")
if msg['data']['data_type'] in modbus_server_type and msg['type'] == 'value':
addr = pservice.get_mbaddress_to_write_value(msg['data']['id'])
if addr:
logger.debug(
"local address and length [%s - %s].",
addr['local_address'], addr['length'])
values = get_value_by_type(msg['data']['data_type'], msg['data']['final'])
logger.debug("modbus server updates values [%s].", values)
register = get_register(addr['type'])
logger.debug(
"register [%d] local address [%d] and value [%s].",
register, addr['local_address'], values)
context[slave_id].setValues(register, addr['local_address'], values)
# time.sleep(1)
def tcp_server(pid):
logger.info("Get server configure and device's tags.")
st = datetime.now()
data = get_servie_and_all_tags()
if data:
logger.debug("register address space.")
register_address_space(data)
else:
logger.debug("no data to create address space.")
length = register_number()
store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [0] * length),
co=ModbusSequentialDataBlock(0, [0] * length),
hr=ModbusSequentialDataBlock(0, [0] * length),
ir=ModbusSequentialDataBlock(0, [0] * length)
)
context = ModbusServerContext(slaves=store, single=True)
identity = ModbusDeviceIdentification()
identity.VendorName = 'pymodbus'
identity.ProductCode = 'PM'
identity.VendorUrl = 'http://github.com/bashwork/pymodbus/'
identity.ProductName = 'pymodbus Server'
identity.ModelName = 'pymodbus Server'
identity.MajorMinorRevision = '2.2.0'
# ----------------------------------------------------------------------- #
# set loop call and run server
# ----------------------------------------------------------------------- #
try:
logger.debug("thread start.")
loop = LoopingCall(updating_writer, (context, ))
loop.start(1, now=False)
# process = Process(target=updating_writer, args=(context, os.getpid(),))
# process.start()
address = (data['tcp_ip'], int(data['tcp_port']))
nt = datetime.now() - st
logger.info("modbus tcp server begin has used [%s] s.", nt.seconds)
pservice.write_server_status_by_type('modbus', 'running')
StartTcpServer(context, identity=identity, address=address)
except Exception as e:
logger.debug("modbus server start error [%s].", e)
pservice.write_server_status_by_type('modbus', 'closed')
This is the code I created for the modbus process.
def process_stop(p_to_stop):
global ptcp_flag
pid = p_to_stop.pid
os.kill(pid, signal.SIGKILL)
logger.debug("process has closed.")
ptcp_flag = False
def ptcp_create():
global ptcp_flag
pid = os.getpid()
logger.debug("sentry pid [%s].", pid)
ptcp = Process(target=sertcp.tcp_server, args=(pid,))
ptcp_flag = True
return ptcp
async def modbus_server():
logger.debug("get mosbuc server's status.")
global ptcp_flag
name = 'modbus'
while True:
ser = pservice.get_server_status_by_name(name)
if ser['enabled']:
if ser['tcp_status'] == 'closed' or ser['tcp_status'] == 'running':
tags = pbasic.get_tag_by_name(name)
if len(tags):
if ptcp_flag is False:
logger.debug("[%s] status [%s].", ser['tcp_name'], ptcp_flag)
ptcp = ptcp_create()
ptcp.start()
else:
logger.debug("modbus server is running ...")
else:
logger.debug("no address to create [%s] server.", ser['tcp_name'])
pservice.write_server_status_by_type(name, "closed")
else:
logger.debug("[%s] server is running ...", name)
else:
if ptcp_flag:
process_stop(ptcp)
logger.debug("[%s] has been closed.", ser['tcp_name'])
pservice.write_server_status_by_type(name, "closed")
logger.debug("[%s] server not allowed to running.", name)
await asyncio.sleep(5)
This is the command that Docker runs.
/usr/bin/docker run --privileged --network host --name scout-sentry -v /etc/scout.cfg:/etc/scout.cfg -v /var/run:/var/run -v /sys:/sys -v /dev/mem:/dev/mem -v /var/lib/scout:/data --rm shulian/scout-sentry
This is the Docker configuration file /etc/scout.cfg
.
[scout]
mode=product
[logging]
level=DEBUG
[db]
path=/data
[ipc]
cs=/var/run/scout-cs.sock
pubsub=/var/run/pubsub.sock
I want to be able to trigger the modbus value update function when there is a message coming from ZeroMQ, and it will be updated correctly.