I am currently trying to program a robot arm which is controlled by a Raspberry Pi.
Everything works fine so far, except for one thing and I already googled and tried everything for many hours but can't find a working solution.
For the movement of the robot arm it is necessary to run all motors "simultaneously" with threads (works fine).
The problem I have is that I need to update a label which shows the current angle of an axis (motor) as soon as it finished its movement but other motors are still running (threads).
After a lot of research I thought I found the solution by using a queue and Tkinters after-method. But it still doesn't work as the labels text only gets updated after all threads terminated.
I wrote an example code where I want to get a label update for motor "one" which will finish its for-loop (100 iterations) before motor "two" (500 iterations). I expected the label to get updated as soon as motor one reached its target while motor two is still runing.
But although I used the after-method it still waits till motor two finished before updating the label.
Hope you can help me!
from tkinter import *
import threading
import time
from queue import *
class StepperMotors:
def __init__(self, root):
self.root = root
self.start_btn = Button(root, text="Start", command=lambda:self.start_movement())
self.start_btn.config(width = 10)
self.start_btn.grid(row=1,column=1)
self.label_one = Label(root, text='')
self.label_one.config(width = 10)
self.label_one.grid(row=2, column=1)
self.label_two = Label(root, text='')
self.label_two.config(width = 10)
self.label_two.grid(row=3, column=1)
def start_movement(self):
self.thread_queue = Queue()
self.root.after(100, self.wait_for_finish)
thread_one = threading.Thread(target=self.motor_actuation, args=(1,100))
thread_two = threading.Thread(target=self.motor_actuation, args=(2,500))
thread_one.start()
thread_two.start()
thread_one.join()
thread_two.join()
def motor_actuation(self, motor, iterations):
for i in range(iterations):
i = i+1
update_text = str(motor) + " " + str(i) + "\n"
print(update_text)
time.sleep(0.01)
self.thread_queue.put(update_text)
def wait_for_finish(self):
try:
self.text = self.thread_queue.get()
self.label_one.config(text=self.text)
except self.thread_queue.empty():
self.root.after(100, self.wait_for_finish)
if __name__ == "__main__":
root = Tk()
root.title("test")
stepper = StepperMotors(root)
root.mainloop()