I want to trace a node and all of it's child nodes in a tree graph from top to down, then change the color and status of all nodes, so I use pyqt
to simulate this behavior.
There are two kinds of resident threads in run time:
ChangeColorThread()
- read thecoloring_queue
and change color and status of a node.coloring_queue
contain the next node will be changed status and color.TraceGraphThread()
- read thewaiting_queue
and trace all relative nodes.waiting_queue
contain the next node will be processed.
The color of all status for nodes:
init (no color) -> waiting (color = lightyellow) -> running (color = yellow) -> finish (color = green)
I decide to create only one ChangeColorThread()
thread and max_number - 1 TraceGraphThread()
threads in memory, because I think changing color/status is faster than tracing the graph.
This program has a problem: If I slide the horizontal or vertical scrollbar arbitrarily while tracing nodes, this program maybe crash. :(
I'm not sure what the reason is, but I guess the bottleneck maybe using QGraphicsSvgItem.update() too frequently or processing xml string of the graph. Do anyone know how to solve this problem? Thanks a lot!
The source code is below:
import re
import sys
import time
import logging
import random
import networkx as nx
import xml.etree.cElementTree as et
from graphviz import Digraph
from queue import Queue, Empty
from PyQt6.QtWidgets import *
from PyQt6.QtSvgWidgets import QGraphicsSvgItem
from PyQt6.QtSvg import QSvgRenderer
from PyQt6.QtCore import *
from PyQt6.QtGui import *
class MyWindow(QWidget):
def __init__(self):
super().__init__()
self.cfg = {} # store all QGraphicsSvgItem() objects and their status (init -> waiting -> running -> finish)
self.graph = nx.DiGraph() # store the upstream/downstream of every nodes and help tracing
self.viewer = QGraphicsView(self)
self.scene = QGraphicsScene(self)
self.scene.clear()
self.viewer.setScene(self.scene)
self.viewer.resetTransform()
layout = QVBoxLayout(self)
layout.addWidget(self.viewer)
def closeEvent(self, event):
global sys_close
sys_close = True
class SvgItem(QGraphicsSvgItem):
def __init__(self, renderer, id, node):
super().__init__()
self.id = id
self.node = node
self.setSharedRenderer(renderer)
self.setElementId(id)
bounds = renderer.boundsOnElement(id)
self.setPos(bounds.topLeft())
def mousePressEvent(self, event):
if self.id.find('node') != -1:
self.trace_nodes()
def trace_nodes(self):
global window
global coloring_queue
global waiting_queue
if window.cfg[self.node, 'status'] not in ['waiting', 'running']:
coloring_queue.put((self, 'waiting'))
waiting_queue.put(self)
class ChangeColorThread(QRunnable):
def set_thread_name(self, thread_name):
self.thread_name = thread_name
def run(self):
global svg
global window
global renderer
global coloring_queue
global sys_close
while True:
time.sleep(0.1)
if sys_close == True:
break
if not coloring_queue.empty():
event = coloring_queue.get(False)
item = event[0]
status = event[1]
# color of all status:
# init (no color) -> waiting (color = lightyellow) -> running (color = yellow) -> finish (color = green)
window.cfg[item.node, 'status'] = status
if status == 'waiting':
color = 'lightyellow'
elif status == 'running':
color = 'yellow'
elif status == 'finish':
color = 'green'
# change node color by replace xml string of svg graph
if item.id.find('node') != -1:
svg = re.sub('(<title>' + item.node + '</title>\n<polygon fill=")([a-z]*)(")(.{0,3000})(status: )([a-z]*)(</text>)', '\\1' + color + '\\3\\4\\5' + status + '\\7', svg, 1, flags = re.DOTALL)
renderer.load(bytes(svg, encoding='utf-8'))
item.update()
class TraceGraphThread(QRunnable):
def set_thread_name(self, thread_name):
self.thread_name = thread_name
def run(self):
global window
global coloring_queue
global waiting_queue
global sys_close
while True:
time.sleep(3)
if sys_close == True:
break
try:
if not waiting_queue.empty():
item = waiting_queue.get(False)
node_name = item.node
# simulate running something for 1 ~ 5 seconds then finish.
coloring_queue.put((item, 'running'))
time.sleep(random.randint(1, 5))
coloring_queue.put((item, 'finish'))
# process all the child nodes
out_num = len(window.graph.out_edges(node_name))
for n in range(out_num):
out_node_name = list(window.graph.out_edges(node_name))[n][1]
coloring_queue.put((window.cfg[out_node_name, 'item'], 'waiting'))
waiting_queue.put(window.cfg[out_node_name, 'item'])
except Empty:
logging.info(f'TraceGraphThread error: {self.name} queue empty.')
self.run() # Queue empty exception may happend in the multi-threads program. If happend, rerun this thread.
def generate_svg(edges):
dot = Digraph(comment='Test SVG Graph', format='svg', node_attr={'shape': 'record'})
for i in range(len(edges)):
lable_src = '{{node_name:|' + edges[i][0] + '|type}|{note1:|....................}|{note2:|....................}|{note3:|....................}|this is a test node\lthis is a test node\lthis is a test node\l|status: init\l}'
lable_dest = '{{node_name:|' + edges[i][1] + '}|{note1:|....................}|{note2:|....................}|{note3:|....................}|this is a test node\lthis is a test node\lthis is a test node\l|status: init\l}'
dot.node(edges[i][0], lable_src)
dot.node(edges[i][1], lable_dest)
dot.edge(edges[i][0], edges[i][1])
window.graph.add_edges_from([edges[i]])
return dot.pipe(encoding='utf-8')
logging.basicConfig(format="%(message)s", level=logging.INFO)
if __name__ == '__main__':
waiting_queue = Queue() # contain the next node will be processed
coloring_queue = Queue() # contain the next node will be changed status and color
sys_close = False # it become True when colse the main window, then all threads will be interupted
app = QApplication(sys.argv)
window = MyWindow()
# edges tuple contains two columns: source node, destination node.
edges = [('TEST_NODE_001', 'TEST_NODE_002'), ('TEST_NODE_002', 'TEST_NODE_A'),
('TEST_NODE_002', 'TEST_NODE_B'), ('TEST_NODE_002', 'TEST_NODE_C'),
('TEST_NODE_002', 'TEST_NODE_D'), ('TEST_NODE_002', 'TEST_NODE_E'),
('TEST_NODE_002', 'TEST_NODE_F'), ('TEST_NODE_002', 'TEST_NODE_G'),
('TEST_NODE_002', 'TEST_NODE_H'), ('TEST_NODE_002', 'TEST_NODE_I'),
('TEST_NODE_002', 'TEST_NODE_J'), ('TEST_NODE_A', 'TEST_NODE_A01'),
('TEST_NODE_B', 'TEST_NODE_B01'), ('TEST_NODE_B01', 'TEST_NODE_B02'),
('TEST_NODE_C', 'TEST_NODE_C01'), ('TEST_NODE_C01', 'TEST_NODE_C02'),
('TEST_NODE_C02', 'TEST_NODE_C03'), ('TEST_NODE_D', 'TEST_NODE_D01'),
('TEST_NODE_D01', 'TEST_NODE_D02'), ('TEST_NODE_D02', 'TEST_NODE_D03'),
('TEST_NODE_D03', 'TEST_NODE_D04'), ('TEST_NODE_D04', 'TEST_NODE_D05'),
('TEST_NODE_D05', 'TEST_NODE_D06'), ('TEST_NODE_D06', 'TEST_NODE_D07'),
('TEST_NODE_D07', 'TEST_NODE_D08'), ('TEST_NODE_D08', 'TEST_NODE_D09')]
svg = generate_svg(edges)
#print(f'svg: {svg}')
renderer = QSvgRenderer()
renderer.load(bytes(svg, encoding='utf-8'))
# get all id and node name from svg graph and draw items on screen
root = et.fromstring(svg)
for g in root.findall('.//{*}g'):
id = g.get('id')
node = g.find('.//{*}title').text
if id != 'graph0':
window.item = SvgItem(renderer, id, node)
window.scene.addItem(window.item)
window.cfg[node, 'item'] = window.item
window.cfg[node, 'status'] = 'init'
# create one ChangeColorThread() and max-1 TraceGraphThread() in memory
pool = QThreadPool.globalInstance()
max_thread = QThreadPool.globalInstance().maxThreadCount()
run = ChangeColorThread()
run.set_thread_name(f'Thread_0')
pool.start(run)
for i in range(max_thread-1):
run = TraceGraphThread()
run.set_thread_name(f'Thread_{i+1}')
pool.start(run)
window.show()
window.viewer.horizontalScrollBar().setSliderPosition(0) # let the horizontal scrollbar at the left
window.viewer.verticalScrollBar().setSliderPosition(-int(window.scene.height())) # let the vertical scrollbar at the top
sys.exit(app.exec())