What:
I am creating a custom console for an application I am making that has to handle processes (I/O) and its UI.
Why:
I am using Tkinter as the UI library. This needs an active main loop in order not to hang. When the Popen process waits for an input it freezes the process object which freezes the main loop in the application.
I was wondering if there was a way to prevent the Popen object from freezing or do I need to rethink how I'm running the thread?
Question Applicable Functions:
def _loop(self):
while self._run:
if not self.main_queue_in.empty():
self._parse_queue_command(self.main_queue_in.get())
if self._cur_open_tag:
if self._cur_process.poll() is not None and not str(self._cur_process.returncode).__contains__("stide-returncode-used"):
self._write("\nProcess finished with exit code %s\n" % str(self._cur_process.returncode))
self._cur_process.returncode = "<stide-returncode-used-{%s}>" % str(self._cur_process.returncode)
self._top_bar_cv.tag_bind(self._cur_open_tag + "_close", "<Button-1>", lambda e: self._cterm_redraw())
self._close_terminal([i for i in self._registered_runtimes if i[1] == self._cur_process][0], redraw_tabs=False)
else:
if not self._cur_process.stdout.closed:
out = self._cur_process.stdout.read()
if out != b'':
self._write(out.decode("utf-8"))
else:
self._cur_process.returncode = self._cur_process.returncode if self._cur_process.returncode else -1
self.cv.update()
def _stdin_input(self):
inp = self._stdin_entry.get()
if inp in [None, ""]:
return
if not self._cur_process:
self._main_text.configure(state="normal")
self._main_text.insert(END, "\n[ERR] No Process Selected...")
self._main_text.see(END)
self._main_text.configure(state="disabled")
self._stdin_entry.delete(0, END)
return
# out, errs = self._cur_process.communicate(input=b'%b' % bytes(inp, encoding='utf8'))
self._cur_process.stdin.write(inp)
self._write("<stideinput>%s</stideinput>\n" % inp)
# if out:
# self._write(out)
# if errs:
# self._write(errs)
self._stdin_entry.delete(0, END)
Code Breakdown:
The _loop function contains an infinite loop that runs for the duration of the time the application is live. It contains checks like if the queue is not empty, read an execute the command; if there is a terminal that is currently open, check if it is finished then display the exit code if so; and check if there is any new output and display to the screen if so.
The _stdin_input function takes the input from an Entry object and runs some validity checks like if there is an active process and if the input is null. The input is then written to the standard in the pipe and the input is displayed on the screen.
Entire Terminal Object:
class StringsTerminal:
def __init__(self, queue_in, queue_out):
self.main_queue_in = queue_in
self.main_queue_out = queue_out
self._registered_runtimes = REG_RUNTIMES # [[script_path, process, id]]
for i in list(self._registered_runtimes):
if i[1].returncode is not None:
REG_RUNTIMES.remove(i)
self._registered_runtimes.remove(i)
self._cur_open_tag = None
self._cur_process = None
self._run = True
self.WINDOW_WIDTH = 807
self.WINDOW_HEIGHT = 453
self.root = Tk()
self.root.title("Project Terminal(s)")
self.root.resizable(False, False)
self.root.geometry(f"{self.WINDOW_WIDTH}x{self.WINDOW_HEIGHT}")
self.cv = Canvas(self.root, width=int(self.WINDOW_WIDTH), height=int(self.WINDOW_HEIGHT), background="#AAAAAA")
self.cv.pack(side='top', fill='both', expand=1)
self.root.focus()
self._draw_top_bar()
self._draw_main_win()
self._draw_stdin_section()
self.root.after(1, lambda: self.root.focus_force())
self._loop()
self.root.protocol("WM_DELETE_WINDOW", self._close)
self.root.mainloop()
def _close(self):
for i in self._registered_runtimes:
self._close_terminal(i, send=False)
# self.main_queue_out.put("destroy")
self._run = False
self.root.destroy()
gc.collect()
# Close Thread...
@cache
def _limit_length(self, text, max_length):
return text[:max_length] + "..." if len(text) > max_length else text
def check(self):
print("DEBUG: Terminal UI queue is currently operational.")
def _parse_queue_command(self, command):
reg = {"run_command": self.run_command, "check": self.check}
if len(command.split("~")) > 1:
return reg[command.split("~")[0]](command.split("~")[1])
else:
return reg[command.split("~")[0]]()
def run_command(self, command):
match command:
case "destroy":
self._close()
case "register":
obj = self.main_queue_in.get()
self._registered_runtimes.append(obj)
self._open_terminal(obj)
self._draw_tabs()
case "stop":
obj = self.main_queue_in.get()
self._close_terminal(obj, send=False)
case _:
raise Exception("Invalid command: " + command)
def _loop(self):
while self._run:
if not self.main_queue_in.empty():
self._parse_queue_command(self.main_queue_in.get())
if self._cur_open_tag:
if self._cur_process.poll() is not None and not str(self._cur_process.returncode).__contains__("stide-returncode-used"):
self._write("\nProcess finished with exit code %s\n" % str(self._cur_process.returncode))
self._cur_process.returncode = "<stide-returncode-used-{%s}>" % str(self._cur_process.returncode)
self._top_bar_cv.tag_bind(self._cur_open_tag + "_close", "<Button-1>", lambda e: self._cterm_redraw())
self._close_terminal([i for i in self._registered_runtimes if i[1] == self._cur_process][0], redraw_tabs=False)
else:
if not self._cur_process.stdout.closed:
out = self._cur_process.stdout.read()
if out != b'':
self._write(out.decode("utf-8"))
else:
self._cur_process.returncode = self._cur_process.returncode if self._cur_process.returncode else -1
self.cv.update()
def _draw_main_win(self):
self._main_text = Text(self.root, width=98, height=22, state="normal", bd=0, bg="black", fg="white")
self._main_text.insert("1.0", "No Terminal Selected" if len(self._registered_runtimes) else "No Terminals Available")
self._main_text.see(END)
self._main_text.configure(state="disabled")
self._vsb = Scrollbar(self.root, orient="vertical", command=self._main_text.yview)
self._main_text.configure(yscrollcommand=self._vsb.set)
self.cv.create_window(2, 54, window=self._main_text, anchor=NW, tag="main_text")
self._vsb.place(in_=self._main_text, relx=1.0, relheight=1.0, bordermode="outside")
self._main_text.tag_configure("green", foreground="green")
def _open_terminal(self, item, *_):
print("Open Terminal", item)
if self._cur_open_tag == "%s(%s)" % (os.path.basename(item[0]), item[2]):
return
self._main_text.configure(state="normal")
self._main_text.delete("1.0", END)
self._main_text.configure(state="disabled")
self._cur_open_tag = "%s(%s)" % (os.path.basename(item[0]), item[2])
if self._cur_open_tag in list(OUT_HOLD.keys()):
self._main_text.configure(state="normal")
self._main_text.insert("1.0", OUT_HOLD[self._cur_open_tag])
self._colour_text()
self._main_text.configure(state="disabled")
self._cur_process = item[1]
self._draw_tabs()
def _colour_text(self):
self._main_text.configure(state="normal")
count = IntVar(self.root)
index = self._main_text.search("<stideinput>.*<\/stideinput>", "1.0", END, count=count, regexp=True)
self._main_text.mark_set("searchLimit", END)
while count.get() > 0:
if index == "" or count.get() == 0:
return
self._main_text.mark_set("matchStart", index)
self._main_text.mark_set("matchEnd", "%s+%sc" % (index, count.get()))
text = self._main_text.get("matchStart", "matchEnd").replace("<stideinput>", "").replace("</stideinput>", "")
index = self._main_text.search("<stideinput>", "matchStart", "matchEnd", count=count, regexp=True)
self._main_text.delete(index, "%s+%sc" % (index, count.get()))
index = self._main_text.search("<\/stideinput>", "matchStart", "matchEnd", count=count, regexp=True)
self._main_text.delete(index, "%s+%sc" % (index, count.get()))
index = self._main_text.search(text, "matchStart", "matchEnd", count=count, regexp=False)
self._main_text.mark_set("matchStart", index)
self._main_text.mark_set("matchEnd", "%s+%sc" % (index, count.get()))
self._main_text.tag_add("green", "matchStart", "matchEnd")
index = self._main_text.search("<stideinput>.*<\/stideinput>", "matchEnd", "searchLimit", count=count, regexp=True)
self._main_text.configure(state="disabled")
def _cterm_redraw(self):
self._main_text.configure(state="normal")
self._main_text.delete("1.0", END)
self._main_text.insert("1.0", "No Terminal Selected" if len(self._registered_runtimes) else "No Terminals Available")
self._main_text.configure(state="disabled")
self._draw_tabs()
def _close_terminal(self, item, *_, send=True, redraw_tabs=True):
print("Close Terminal", item)
if send:
self.main_queue_out.put("close")
self.main_queue_out.put((item[0], item[2]))
if self._cur_open_tag == "%s(%s)" % (os.path.basename(item[0]), item[2]):
self._cur_open_tag = None
self._cur_process = None
self._registered_runtimes.remove(item)
if redraw_tabs:
self._cterm_redraw()
def _draw_tabs(self):
self._top_bar_cv.delete("tabs")
x_step = 150
for c, i in enumerate(self._registered_runtimes):
cur_tag = "%s(%s)" % (os.path.basename(i[0]), i[2])
colour = "#9A9A9A" if self._cur_open_tag == cur_tag else "#7A7A7A"
Utils.round_rectangle(self._top_bar_cv, c * x_step + 5, 5, c * x_step + 150, 35, radius=5, fill=colour, outline="#8A8A8A", tags=(cur_tag, "tabs"))
self._top_bar_cv.create_text(c * x_step + 10, 11, text=self._limit_length(os.path.basename(i[0] + " {%s}" % str(i[2])), 15), font=("Artifakt Element", 9, "bold"), fill="#CACACA", anchor=NW, tag=(cur_tag, "tabs"))
self._top_bar_cv.tag_bind(cur_tag, "<Button-1>", lambda e, x=list(i): self._open_terminal(x, e))
Utils.round_rectangle(self._top_bar_cv, c * x_step + 125, 10, c * x_step + 145, 30, radius=5, fill=colour, outline="#8A8A8A", tag=(cur_tag + "_close", "tabs"))
self._top_bar_cv.create_text(c * x_step + 130, 10, text="X", font=("Artifakt Element", 12, "bold"), fill="#CACACA", anchor=NW, tag=(cur_tag + "_close", "tabs"))
self._top_bar_cv.tag_bind(cur_tag + "_close", "<Button-1>", lambda e, x=list(i): self._close_terminal(x, e))
self._top_bar_cv.configure(scrollregion=self._top_bar_cv.bbox("all"))
def _draw_top_bar(self):
self._top_bar_cv = Canvas(self.root, width=self.WINDOW_WIDTH - 4, height=35, background="#7A7A7A", bd=0, highlightthickness=0, relief='ridge')
self.cv.create_window(2, 2, window=self._top_bar_cv, anchor=NW)
self._hsb_tb = Scrollbar(self.root, orient="horizontal", command=self._top_bar_cv.xview)
self._top_bar_cv.configure(xscrollcommand=self._hsb_tb.set)
self._hsb_tb.place(in_=self._top_bar_cv, rely=1.0, relwidth=1.0, bordermode="outside")
self._draw_tabs()
def _draw_stdin_section(self):
self._stdin_entry = Utils.PlaceholderEntry(self.root, placeholder="Terminal Input", bg="#7A7A7A", cursor="xterm", fg="white", width=120, relief=FLAT, bd=5)
self.cv.create_window(10, 417, window=self._stdin_entry, anchor=NW)
go_button = Button(self.root, text=">>", command=self._stdin_input, background="#7A7A7A", cursor="hand2", foreground="white", activeforeground="#7A7A7A", width=5, relief=FLAT)
self.cv.create_window(750, 417, window=go_button, anchor=NW)
def _stdin_input(self):
inp = self._stdin_entry.get()
if inp in [None, ""]:
return
if not self._cur_process:
self._main_text.configure(state="normal")
self._main_text.insert(END, "\n[ERR] No Process Selected...")
self._main_text.see(END)
self._main_text.configure(state="disabled")
self._stdin_entry.delete(0, END)
return
# out, errs = self._cur_process.communicate(input=b'%b' % bytes(inp, encoding='utf8'))
self._cur_process.stdin.write(inp)
self._write("<stideinput>%s</stideinput>\n" % inp)
# if out:
# self._write(out)
# if errs:
# self._write(errs)
self._stdin_entry.delete(0, END)
def _write(self, out):
self._main_text.configure(state="normal")
self._main_text.insert(END, str(out))
self._main_text.see(END)
self._main_text.configure(state="disabled")
OUT_HOLD[self._cur_open_tag] = self._main_text.get("1.0", END)
self._colour_text()
It is incredibly difficult to create a reproducible example of this because it extends off the main application runtime. To help people run this on their computers I have created a Github repository of the project where people can download and run the whole code. By clicking the Terminal button, you can open the terminal screen in question. The applicable code can be found in the src/UI/TerminalUI.py file. https://github.com/ItzTheDodo/Showcase-StringsIDE