
I am a newbie. In my current project, when the front end decides to start the Modbus service, I create a separate process for it. The tag values are obtained in the parent process and passed over a ZeroMQ PUB/SUB connection; I now want to update the values of the Modbus registers inside the Modbus service process.

I tried the approach from the updating_server.py example provided by pymodbus, using twisted.internet.task.LoopingCall() to update the register values, but with it in place my client can no longer connect to the server. I don't know why.

This is the log produced when a client connects to the server that was set up with LoopingCall():

(screenshot of the server log from the original post; not reproduced here)

Then I tried putting both the updating coroutine and StartTcpServer into the asyncio loop, but the update function was only entered once, right after startup, and never again.

Currently I'm using LoopingCall() to handle the updates, but I don't think this is a good approach.

This is the code that initializes the PUB socket and the readers for all of the tags.

from loop import cycle
import asyncio
from multiprocessing import Process
from persistence import models as pmodels
from persistence import service as pservice
from persistence import basic as pbasic
import zmq
from zmq.asyncio import Context
from common import logging
from server.modbustcp import i3ot_tcp as sertcp
import common.config as cfg
import communication.admin as ca
import json
import os
import signal
from datetime import datetime
from server.opcuaserver import i3ot_opc as seropc

async def main():
    future = []
    task = []
    global readers, readers_old, task_flag
    logger.debug("connecting to database and create table.")
    pmodels.connect_create()
    logger.debug("init read all address to create loop task.")
    cycle.init_readers(readers)
    ctx = Context()
    publisher = ctx.socket(zmq.PUB)
    logger.debug("init publish [%s].", addrs)
    publisher.bind(addrs)
    readers_old = readers.copy()
    for reader in readers:
        task.append(asyncio.ensure_future(
            cycle.run_readers(readers[reader], publisher)))
    if not len(task):
        task_flag = True
    logger.debug("task length [%s - %s].", len(task), task)
    opcua_server = LocalServer(seropc.opc_server, "opcua")
    future = [
        start_get_all_address(),
        start_api(),
        create_address_loop(publisher, task),
        modbus_server(),
        opcua_server.run()
    ]
    logger.debug("run loop...")
    await asyncio.gather(*future)

asyncio.run(main(), debug=False)

This is the code that reads the device tag values and publishes them.

async def run_readers(reader, publisher):
    while True:
        await reader.run(publisher)


class DataReader:
    def __init__(self, freq, clients):
        self._addresses = []
        self._frequency = freq
        self._stop_signal = False
        self._clients = clients
        self.signature = sign_data_reader(self._addresses)

    async def run(self, publisher):
        while not self._stop_signal:
            for addr in self._addresses:
                await addr.read()
                data = {
                    "type": "value",
                    "data": addr._final_value
                }
                publisher.send_pyobj(data)
                if addr._status:
                    if addr.alarm_log:
                        return_alarm_log = pbasic.get_log_by_time(addr.alarm_log['date'])
                        if return_alarm_log:
                            data = {
                                "type": "alarm",
                                "data": return_alarm_log
                            }
                            publisher.send_pyobj(data)
                    self.data_send(addr)
                    logger.debug("run send data")
            await asyncio.sleep(int(self._frequency))

    def stop(self):
        self._stop_signal = True

These are the Modbus server imports:

from common import logging
from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from persistence import service as pservice
from persistence import basic as pbasic
import zmq
import common.config as cfg
import struct
import os
import signal
from datetime import datetime
from twisted.internet.task import LoopingCall


def updating_writer(a):
    logger.info("in updates of modbus tcp server.")
    context = a[0]
    # while True:
    if check_pid(os.getppid()) is False:
        os.kill(os.getpid(), signal.SIGKILL)
    url = ("ipc://{}" .format(cfg.get('ipc', 'pubsub')))
    logger.debug("connecting to [%s].", url)
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect(url)
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    slave_id = 0x00
    msg = subscriber.recv_pyobj()
    logger.debug("updates.")
    if msg['data']['data_type'] in modbus_server_type and msg['type'] == 'value':
        addr = pservice.get_mbaddress_to_write_value(msg['data']['id'])
        if addr:
            logger.debug(
                "local address and length [%s - %s].",
                addr['local_address'], addr['length'])
            values = get_value_by_type(msg['data']['data_type'], msg['data']['final'])
            logger.debug("modbus server updates values [%s].", values)
            register = get_register(addr['type'])
            logger.debug(
                "register [%d] local address [%d] and value [%s].",
                register, addr['local_address'], values)
            context[slave_id].setValues(register, addr['local_address'], values)
        # time.sleep(1)


def tcp_server(pid):
    logger.info("Get server configure and device's tags.")
    st = datetime.now()
    data = get_servie_and_all_tags()
    if data:
        logger.debug("register address space.")
        register_address_space(data)
    else:
        logger.debug("no data to create address space.")

    length = register_number()
    store = ModbusSlaveContext(
        di=ModbusSequentialDataBlock(0, [0] * length),
        co=ModbusSequentialDataBlock(0, [0] * length),
        hr=ModbusSequentialDataBlock(0, [0] * length),
        ir=ModbusSequentialDataBlock(0, [0] * length)
    )
    context = ModbusServerContext(slaves=store, single=True)

    identity = ModbusDeviceIdentification()
    identity.VendorName = 'pymodbus'
    identity.ProductCode = 'PM'
    identity.VendorUrl = 'http://github.com/bashwork/pymodbus/'
    identity.ProductName = 'pymodbus Server'
    identity.ModelName = 'pymodbus Server'
    identity.MajorMinorRevision = '2.2.0'

    # ----------------------------------------------------------------------- #
    # set loop call and run server
    # ----------------------------------------------------------------------- #

    try:
        logger.debug("thread start.")
        loop = LoopingCall(updating_writer, (context, ))
        loop.start(1, now=False)

        # process = Process(target=updating_writer, args=(context, os.getpid(),))
        # process.start()

        address = (data['tcp_ip'], int(data['tcp_port']))
        nt = datetime.now() - st
        logger.info("modbus tcp server begin has used [%s] s.", nt.seconds)
        pservice.write_server_status_by_type('modbus', 'running')
        StartTcpServer(context, identity=identity, address=address)
    except Exception as e:
        logger.debug("modbus server start error [%s].", e)
        pservice.write_server_status_by_type('modbus', 'closed')

This is the code I created for the modbus process.

def process_stop(p_to_stop):
    global ptcp_flag
    pid = p_to_stop.pid
    os.kill(pid, signal.SIGKILL)
    logger.debug("process has closed.")
    ptcp_flag = False


def ptcp_create():
    global ptcp_flag
    pid = os.getpid()
    logger.debug("sentry pid [%s].", pid)
    ptcp = Process(target=sertcp.tcp_server, args=(pid,))
    ptcp_flag = True
    return ptcp


async def modbus_server():
    logger.debug("get mosbuc server's status.")
    global ptcp_flag
    name = 'modbus'
    while True:
        ser = pservice.get_server_status_by_name(name)
        if ser['enabled']:
            if ser['tcp_status'] == 'closed' or ser['tcp_status'] == 'running':
                tags = pbasic.get_tag_by_name(name)
                if len(tags):
                    if ptcp_flag is False:
                        logger.debug("[%s] status [%s].", ser['tcp_name'], ptcp_flag)
                        ptcp = ptcp_create()
                        ptcp.start()
                    else:
                        logger.debug("modbus server is running ...")
                else:
                    logger.debug("no address to create [%s] server.", ser['tcp_name'])
                    pservice.write_server_status_by_type(name, "closed")
            else:
                logger.debug("[%s] server is running ...", name)
        else:
            if ptcp_flag:
                process_stop(ptcp)
                logger.debug("[%s] has been closed.", ser['tcp_name'])
                pservice.write_server_status_by_type(name, "closed")
            logger.debug("[%s] server not allowed to running.", name)
        await asyncio.sleep(5)

This is the command used to run the Docker container.

/usr/bin/docker run --privileged --network host --name scout-sentry -v /etc/scout.cfg:/etc/scout.cfg -v /var/run:/var/run -v /sys:/sys -v /dev/mem:/dev/mem -v /var/lib/scout:/data --rm shulian/scout-sentry

This is the Docker configuration file /etc/scout.cfg.

[scout]
mode=product

[logging]
level=DEBUG

[db]
path=/data

[ipc]
cs=/var/run/scout-cs.sock
pubsub=/var/run/pubsub.sock

I want the Modbus value-update function to be triggered whenever a message arrives from ZeroMQ, and the registers to be updated correctly.

  • Your problem is not formulated with an MCVE-code and besides incomplete code, it is irreproducible, so hard to help with. Is your process granted write-rights in the config-setup target directory, where the **`ipc://`** transport-class is going to setup a ZeroMQ connection? Is the config-driven AccessNode target location an absolute-path `/some/loc/in/fs-tree/with/write-rights` or an `@`-abstracted path? Does it already exist prior to call? – user3666197 Aug 28 '19 at 08:56
  • @user3666197 Thank you for your answer! My project runs in Docker, and the corresponding files are mapped into the container. The ZeroMQ PUB/SUB link has been verified to communicate, and the holding-register value read in the picture is the one updated via the zmq SUB. – fish Aug 28 '19 at 09:19
  • Are you sure your Docker-container enjoys a configuration to pass-through the inter-platform communication using **`ipc://`** transport class? May you test the same idea with **`tcp://`** transport-class being used first to validate the pass-through visibility? – user3666197 Aug 28 '19 at 09:52
  • I am sure I have been able to communicate properly. – fish Aug 29 '19 at 01:06
  • I am sure I have not seen so far any **`PUB`**-side to communicate properly. The MCVE-code is incomplete ( also the explicit copy of all imports is missing - **`from twisted.internet.task import LoopingCall`** as an example ). StackOverflow Community builds on excellent Question formulation, so as to let others be able to reproduce your problem and solve it, without spending immense time to excavate and re-collect pieces of information, that were missing in the problem-formulation. – user3666197 Aug 29 '19 at 01:44
  • I'm very sorry. All imports and the PUB side have now been added. The PUB is in the parent-process section; because the PUB and SUB had already communicated successfully, I did not include it at first. The Modbus server configuration is read from the database, and a tag's Modbus address is the register address automatically allocated when the tag is added to the Modbus server. – fish Aug 29 '19 at 02:31
  • Do I read you well, that both the PUB-AccessNode(s?) and the SUB-AccessNode are on the same-"side" of the Docker container horizon-of-abstraction? – user3666197 Aug 29 '19 at 03:03
  • I'm not sure I understand what you said, but I have added the docker run command and the configuration file I used. Thanks. – fish Aug 29 '19 at 03:21

1 Answer

Let's start from the inside out.

Q : ...but with it in place my client can no longer connect to the server. I don't know why.

ZeroMQ is a smart, broker-less messaging / signaling middleware, or better, a platform for smart messaging. In case one does not feel so familiar with the art of the Zen-of-Zero, as present in the ZeroMQ architecture, one may like to start with "ZeroMQ Principles in less than Five Seconds" before diving into further details.


The Basis :

The PUB/SUB Scalable Formal Communication Archetype, borrowed here from ZeroMQ, does not come at zero cost.

This means that each infrastructure setup ( both on the PUB-side and on the SUB-side ) takes some, rather remarkable, time, and no one can be sure of when the AccessNode configuration has reached an RTO-state. So the SUB-side (as proposed above) ought to be either a permanent entity, or the user should not expect it to be RTO in zero time after a twisted.internet.task.LoopingCall() gets reinstated.

Preferred way: instantiate your (semi-)persistent zmq.Context(), get it configured so as to serve the <aContextInstance>.socket( zmq.PUB ) as needed, with a minimum safeguarding setup being the <aSocketInstance>.setsockopt( zmq.LINGER, 0 ), plus all the transport / queuing / security-handling details that the exosystem exposes to your code ( whitelisting, secure sizing and resources protection being the most probable candidates - the details depend on your application domain and on the risks that you are willing to face, being prepared to handle them ).
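
A minimal sketch of such a (semi-)persistent PUB-side setup ( the ipc:// endpoint is the one from the question's /etc/scout.cfg; everything else is illustrative, not the question's actual code ):

import zmq

# created ONCE, at process start - never per-call, never per-LoopingCall-tick
ctx = zmq.Context()                         # one Context per process is enough
publisher = ctx.socket(zmq.PUB)
publisher.setsockopt(zmq.LINGER, 0)         # do not hang on close with unsent messages
publisher.bind("ipc:///var/run/pubsub.sock")

# ...the very same socket instance is then re-used for the whole process lifetime
publisher.send_pyobj({"type": "value", "data": 42})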

ZeroMQ strongly discourages sharing ( zero-sharing ) <aContextInstance>.socket()-instances, yet the zmq.Context()-instance can be shared / re-used (ref. ZeroMQ Principles... ) / passed to more than one thread ( if needed ).
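
A minimal sketch of that sharing rule - one Context for the whole process, while every thread builds and owns its own socket ( the threads and the endpoint usage are illustrative only ):

import threading
import zmq

ctx = zmq.Context()                        # ONE Context instance for the whole process

def publisher_thread(endpoint, name):
    # each thread creates and owns its OWN socket; only the Context is shared
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.LINGER, 0)
    pub.connect(endpoint)                  # several PUB-s may connect to one bound SUB
    pub.send_pyobj({"type": "value", "from": name})
    pub.close()

for name in ("worker-1", "worker-2"):
    threading.Thread(target=publisher_thread,
                     args=("ipc:///var/run/pubsub.sock", name)).start()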

All <aSocketInstance>{.bind()|.connect()}-methods are expensive, so try to set up the infrastructure AccessPoint(s) and their due error-handling well before trying to use their mediated communication services.

Each <aSocketInstance>.setsockopt( zmq.SUBSCRIBE, ... ) is expensive, in that it may take ( depending on the (local/remote) version ) the form of a non-local, distributed behaviour - the local side "sets" the subscription, yet the remote side has to "be informed" about such a state-change and "implement" the operations in line with the actual (propagated) state. While in earlier versions all messages were dispatched from the PUB-side and all the SUB-side(s) were flooded with such data and left to do the "filtering" in a local-side internal Queue, the newer versions "implement" the Topic-filter on the PUB-side, which further increases the latency of bringing a new modus-operandi into action.
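
A minimal sketch of a topic-filtered subscription, assuming the same ipc:// endpoint as in the question ( note that the question subscribes with b"", i.e. to everything, and publishes with .send_pyobj(), whose single pickled frame would not match a human-readable topic prefix - hence the .send_multipart() form below ):

import zmq

ctx = zmq.Context()

# SUB-side: a prefix-based subscription ( the topic b"value" is illustrative )
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.LINGER, 0)
sub.connect("ipc:///var/run/pubsub.sock")
sub.setsockopt(zmq.SUBSCRIBE, b"value")    # instead of b"" ( receive everything )

# PUB-side: the topic must arrive as the first frame for the filter to match
pub = ctx.socket(zmq.PUB)
pub.bind("ipc:///var/run/pubsub.sock")
pub.send_multipart([b"value", b"<serialised payload goes here>"])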


Next comes the modus-operandi: how <aSocketInstance>.recv() gets results:

In their default API-state, .recv()-methods are blocking, potentially infinitely blocking, if no messages arrive.

Solution: avoid blocking forms of calling the ZeroMQ <aSocket>.recv()-methods by always using the zmq.NOBLOCK-mode thereof, or rather test the presence or absence of any expected message(s) with the <aSocket>.poll( zmq.POLLIN, <timeout> )-methods available, with zero or controlled timeouts. This makes you the master, who decides about the flow of code-execution. Not doing so, you knowingly let your code depend on an external sequence ( or absence ) of events, and your architecture is prone to awful problems with handling infinite blocking-states ( or potentially unsalvageable many-agents' distributed-behaviour live-locks or dead-locks ).
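
A minimal sketch of both non-blocking variants, assuming a SUB socket configured as in the question:

import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.LINGER, 0)
sub.connect("ipc:///var/run/pubsub.sock")
sub.setsockopt(zmq.SUBSCRIBE, b"")

# variant A: test for a message with a controlled timeout, then read it
if sub.poll(timeout=100, flags=zmq.POLLIN):        # wait at most 100 ms
    msg = sub.recv_pyobj(flags=zmq.NOBLOCK)        # guaranteed not to block here
else:
    msg = None                                     # nothing arrived - carry on

# variant B: ask outright and handle the "nothing there" case yourself
try:
    msg = sub.recv_pyobj(flags=zmq.NOBLOCK)
except zmq.Again:
    msg = None                                     # no message waiting right now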

Avoid uncontrolled cross-breeding of event-loops - like passing ZeroMQ-driven loops into an external "callback"-alike handler or into async-decorated code-blocks, where the stack of (non-)blocking logics may wreak havoc with the original idea just by throwing the system into an unresolvable state, where events miss the expected sequence of events, live-locks become unsalvageable, or only the first pass happens to go through.

Stacking asyncio-code with twisted LoopingCall()-s and async/await-decorated code + ZeroMQ blocking .recv()-s is either a Piece-of-Filigree-Precise-Art-of-Truly-a-Zen-Master, or a sure ticket to Hell - with all respect to the Art-of-Truly-Zen-Masters :o)
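
Applied to the question's updating_writer(), a sketch - not a drop-in implementation - that reuses the question's own helpers ( pservice, get_value_by_type(), get_register() and the already-built ModbusServerContext, here called modbus_context, all assumed to be in scope ), keeps one SUB socket for the whole process lifetime and lets every LoopingCall() tick drain whatever has arrived, without ever blocking the twisted reactor:

import zmq
from twisted.internet.task import LoopingCall

# created once, at process start - not inside the periodic callback
zmq_ctx = zmq.Context()
subscriber = zmq_ctx.socket(zmq.SUB)
subscriber.setsockopt(zmq.LINGER, 0)
subscriber.connect("ipc:///var/run/pubsub.sock")   # endpoint from /etc/scout.cfg
subscriber.setsockopt(zmq.SUBSCRIBE, b"")

def updating_writer(a):
    # drain ALL pending PUB messages this tick; never block the twisted reactor
    context = a[0]
    slave_id = 0x00
    while True:
        try:
            msg = subscriber.recv_pyobj(flags=zmq.NOBLOCK)
        except zmq.Again:
            break                                   # nothing left to read this tick
        if msg.get("type") != "value":
            continue
        # the question's modbus_server_type check is omitted here for brevity
        addr = pservice.get_mbaddress_to_write_value(msg["data"]["id"])
        if addr:
            values = get_value_by_type(msg["data"]["data_type"], msg["data"]["final"])
            register = get_register(addr["type"])
            context[slave_id].setValues(register, addr["local_address"], values)

# wired exactly as in the question, only the callback body has changed
loop = LoopingCall(updating_writer, (modbus_context, ))
loop.start(1, now=False)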

So, yes, complex thinking is needed -- welcome to the realms of distributed-computing!

user3666197