I have a script that opens a subprocess, and this process waits for multiple inputs.
I tried subprocess.Popen(), but when combining the three standard streams it gets stuck in deadlocks...
So I tried wexpect (apparently the Windows version of pexpect), but it didn't work either.
Now I'm using Sarge, and my code currently looks like this:
import os
import subprocess
import sarge
import sys


def get_env():
    env = os.environ.copy()
    return env


def interactive(list_command: list, instruction_dict: dict):
    env = get_env()
    std_out = ""
    for a_number in instruction_dict:
        capture_stdout = sarge.Capture(buffer_size=-1)
        capture_stderr = sarge.Capture(buffer_size=-1)
        child_proc = sarge.Command(list_command, stdout=capture_stdout, stderr=capture_stderr,
                                   shell=True, env=env)
        child_proc.run(input=subprocess.PIPE,
                       async_=True)
        print(capture_stdout.readlines(timeout=1.0), capture_stderr.readlines(timeout=1.0))
        if instruction_dict[a_number][0] is not None:
            # if capture_stdout.expect(instruction_dict[a_number][0]) is not None or capture_stderr.expect(instruction_dict[a_number][0]) is not None:
            #     self.test_manager.log_event(msg="found line : " + instruction_dict[a_number][0] + "in PIPE", current_event_type=LogLevel.DEBUG, screen_only=False)
            print("in exec line : ", child_proc.stdout.readline(timeout=1.0), child_proc.stderr.readlines(timeout=1.0))
        else:
            print("in exec line : ", child_proc.stdout.readlines(timeout=1.0), child_proc.stderr.readlines(timeout=1.0))
        if instruction_dict[a_number][1] is not None:
            print(f"sending \"{instruction_dict[a_number][1]}\" to process in interactive")
            temp_bytes = str.encode((instruction_dict[a_number][1] + "\n"))
            child_proc.stdin.write(temp_bytes)
            try:
                child_proc.stdin.flush()  # blind flush
            except:
                pass
        print("Last line : ", child_proc.stdout.readlines(timeout=1.0), child_proc.stderr.readlines(timeout=1.0))
        child_proc.wait()
        std_out += child_proc.stdout.read().decode('utf-8')
        child_proc.kill()  # kill the subprocess
    print("output: ", std_out)


interactive(list_command=["process.exe", "-delete_datamodel", "MODELNAME"], instruction_dict={1: (None, "y")})
Output looks like this:
[] []
in exec line : [] []
send "y" to process in interactive
Last line : [] [b'User root connected to DB MODELNAME\r\n']
output: Are you sure to delete DB MODELNAME? (y/n)\n\nDelete Data Model Ok\n
In the console it looks like this:
C:\>process.exe -delete_data_model -db_name MODELNAME
Are you sure to delete DB MODELNAME ? (y/n)
y
User root connected to DB MODELNAME
Delete Data Model Ok
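For the single-answer case above, where the reply does not depend on reading the prompt first, a possible baseline is to let the standard library's `subprocess.run` feed stdin and collect both output streams in one call, which avoids the three-stream deadlock. The sketch below is only an illustration under assumptions: `CHILD_SCRIPT` is a hypothetical stand-in for process.exe (I am guessing it prints the "connected" line on stderr, as the Sarge output suggests), and `run_with_answers` is a made-up helper name. Because stdout and stderr are captured separately, it also shows which stream each line arrives on.

```python
import subprocess
import sys

# Hypothetical stand-in for process.exe: prompt on stdout, status line on stderr.
CHILD_SCRIPT = r"""
import sys
print("Are you sure to delete DB MODELNAME ? (y/n)")
answer = sys.stdin.readline().strip()
print("User root connected to DB MODELNAME", file=sys.stderr)
print("Delete Data Model Ok" if answer == "y" else "Aborted")
"""


def run_with_answers(command, answers):
    """Send every answer up front; return (stdout, stderr, return code)."""
    completed = subprocess.run(
        command,
        input="".join(answer + "\n" for answer in answers),  # one line per answer
        capture_output=True,  # stdout and stderr are kept apart
        text=True,            # decode bytes and normalise line endings
        timeout=30,           # do not hang forever if the child keeps waiting
    )
    return completed.stdout, completed.stderr, completed.returncode


if __name__ == "__main__":
    out, err, code = run_with_answers([sys.executable, "-c", CHILD_SCRIPT], ["y"])
    print("stdout lines:", out.splitlines())
    print("stderr lines:", err.splitlines())
    print("return code:", code)
```

This only works when the answers are known in advance; it cannot look at a question before replying.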
There are multiple problems:
1. I can't catch the first stdout line "Are you sure to delete DB MODELNAME ? (y/n)" during the loop; I only get [].
2. capture_stdout.expect crashes, but it might be the same problem as the first one if it can't read the first stdout line.
3. Lines are going to stderr instead of stdout (this might be caused by the process itself; I don't know how to test this).
4. For another command, I'll need to answer more than one interactive prompt, reading each question before answering it, so my instruction_dict will look like this:
instruction_dict = {1: ("Are you sure to recover the DB", "y"),
                    2: ("Do you want to stop the service", "y"),
                    3: ("Do you want to stop the DB", "y"),
                    4: ("Do you want to drop and create the DB", "y")}
Is this even possible? I have searched for a working example and didn't find one. I know I might be bad at searching, but...
Sorry for the terrible syntax (and bad English). I want to know if there is a pure Python solution (I have a working solution in Tcl, but I'm trying to get rid of it, and I didn't write the Tcl code).
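To make the goal concrete, here is a minimal sketch of the kind of expect-style loop described above, using only the standard library: a background thread drains the child's output into a queue, and the main loop waits for each expected text before sending the paired answer. Everything named here is an assumption for illustration: `CHILD_SCRIPT` is a hypothetical stand-in for the real program, `run_dialog` and `_pump` are made-up names, and stderr is merged into stdout so a prompt is found whichever stream it is printed on.

```python
import queue
import subprocess
import sys
import threading
import time

# Hypothetical stand-in for the real program: asks two questions in turn.
CHILD_SCRIPT = r"""
import sys
for question in ("Are you sure to recover the DB", "Do you want to stop the service"):
    sys.stdout.write(question + " ? (y/n)\n")
    sys.stdout.flush()  # the real program may not flush when writing to a pipe
    sys.stdout.write("answer was " + sys.stdin.readline().strip() + "\n")
    sys.stdout.flush()
"""


def _pump(stream, chunks):
    """Reader thread: copy raw bytes from the child into a queue, None at EOF."""
    while True:
        data = stream.read(4096)  # unbuffered pipe: returns once some bytes exist
        if not data:
            break
        chunks.put(data)
    chunks.put(None)


def run_dialog(command, instruction_dict, timeout=5.0):
    """Wait for each expected text, send the paired answer, return all output."""
    proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, bufsize=0)
    chunks = queue.Queue()
    threading.Thread(target=_pump, args=(proc.stdout, chunks), daemon=True).start()
    seen = b""        # everything received so far (stdout and stderr merged)
    search_from = 0   # position just after the previous match
    at_eof = False

    def pull(deadline):
        """Append the next chunk to `seen`, or fail once the deadline has passed."""
        nonlocal seen, at_eof
        remaining = deadline - time.monotonic()
        try:
            if remaining <= 0:
                raise queue.Empty
            data = chunks.get(timeout=remaining)
        except queue.Empty:
            proc.kill()
            raise TimeoutError("no matching output before the timeout") from None
        if data is None:
            at_eof = True
        else:
            seen += data

    for key in sorted(instruction_dict):
        expected, answer = instruction_dict[key]
        if expected is not None:
            needle = expected.encode()
            deadline = time.monotonic() + timeout
            while seen.find(needle, search_from) == -1:
                if at_eof:
                    raise RuntimeError(f"child exited before printing {expected!r}")
                pull(deadline)
            search_from = seen.find(needle, search_from) + len(needle)
        if answer is not None:
            proc.stdin.write((answer + "\n").encode())  # unbuffered, no flush needed

    proc.stdin.close()
    deadline = time.monotonic() + timeout
    while not at_eof:  # drain whatever the child prints after the last answer
        pull(deadline)
    proc.wait(timeout=timeout)
    return seen.decode(errors="replace")


if __name__ == "__main__":
    print(run_dialog([sys.executable, "-c", CHILD_SCRIPT],
                     {1: ("Are you sure to recover the DB", "y"),
                      2: ("Do you want to stop the service", "y")}))
```

One caveat with this approach: it can only match text that the child has actually written to the pipe. If the real program buffers its output when it is not attached to a console, the prompt may never arrive before the timeout, which might also explain the empty reads in the loop above.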