I have an interactive console program on Windows. To operate it, I need to send keystrokes such as 'e' or 'c' to its cmd window. That is convenient for a human operator, but very difficult to automate from another program.
Now I would like to wrap the console program so that it can be driven more easily from other programs, for example from Python.
However, the console program reads its keyboard input with 'getch()', which does not read from standard input, so I can't simply write the keys to its stdin.
Has anyone come across this problem? Thanks.
class ThreadWorker(threading.Thread):
    """Daemon thread that runs one callable with pre-bound arguments.

    Any ``Exception`` raised by the callable is caught and printed, so a
    failing worker never propagates an error out of the thread.
    """

    def __init__(self, callable, *args, **kwargs):
        """Remember what to run; the thread is not started yet.

        ``callable`` is invoked as ``callable(*args, **kwargs)`` from run().
        (The parameter name shadows the builtin but is kept so existing
        keyword callers keep working.)
        """
        super(ThreadWorker, self).__init__()
        self.callable = callable
        self.args = args
        self.kwargs = kwargs
        # Daemon threads do not keep the interpreter alive at exit.
        self.daemon = True

    def run(self):
        """Invoke the stored callable, printing (not raising) any exception."""
        try:
            self.callable(*self.args, **self.kwargs)
        except Exception as e:
            print(e)

    def start(self):
        """Set the module-level ``running`` flag to True, then start the thread."""
        global running
        running = True
        threading.Thread.start(self)
def console_presskey(char):
    """Write *char* to the child's stdin pipe and flush it straight away.

    ``proc`` is looked up as a module-level name when the function is called.
    """
    child_stdin = proc.stdin
    child_stdin.write(char)
    # Flush so the character is not left sitting in Python's buffer.
    child_stdin.flush()
def start():
def worker(pipe):
while running:
line = pipe.readline()
if line == '':
break
else:
print line,
proc = Popen("something.exe",stdout=PIPE,stdin=PIPE)
stdout_worker = ThreadWorker(worker, proc.stdout)
stderr_worker = ThreadWorker(worker, proc.stderr)
stdout_worker.start()
stderr_worker.start()
if __name__ == "__main__":
    # Demo run: launch the program, then send 'e' and 'c', pausing two
    # seconds before each key.
    start()
    for key in ('e', 'c'):
        sleep(2)
        console_presskey(key)
EDIT:
In the end, I used the win32 SendMessage function to get this done. I spawn the console program as a new subprocess, hide its window, and then get its pid and window handle (hWnd).
Here is the code:
import threading
import re
import os
from subprocess import Popen, PIPE,STDOUT,CREATE_NEW_CONSOLE,STARTUPINFO,STARTF_USESHOWWINDOW,SW_HIDE
from time import sleep
import win32process,win32con,win32gui,win32api
# Latest status values parsed from the console program's output by the reader
# in start(); they hold 'N/A' until a status block has been parsed.
TargetPower = 'N/A'
Mode = 'N/A'
Channel = 'N/A'
# Module-level handles. The empty string is the "not set yet" placeholder.
con_stdin = ''
con_stdout = ''
stdout_worker = ''
# start() and stop() declare this name global, so give it the same
# placeholder as its stdout sibling.
stderr_worker = ''
# NOTE(review): misspelled ("woker") and not referenced in the code shown
# here; kept so any external user of the name keeps working.
stdin_woker = ''
# Reader threads loop while this is true; ThreadWorker.start() sets it and
# stop() clears it.
running = False
proc = ''
# Filled by start() with the list of window handles owned by the child.
Console_hwnd = ''
#status = 'NotStarted' # or Running
class ThreadWorker(threading.Thread):
    """Daemon thread that runs one callable with pre-bound arguments.

    Any ``Exception`` raised by the callable is caught and printed, so a
    failing worker never propagates an error out of the thread.
    """

    def __init__(self, callable, *args, **kwargs):
        """Remember what to run; the thread is not started yet.

        ``callable`` is invoked as ``callable(*args, **kwargs)`` from run().
        (The parameter name shadows the builtin but is kept so existing
        keyword callers keep working.)
        """
        super(ThreadWorker, self).__init__()
        self.callable = callable
        self.args = args
        self.kwargs = kwargs
        # Daemon threads do not keep the interpreter alive at exit.
        self.daemon = True

    def run(self):
        """Invoke the stored callable, printing (not raising) any exception."""
        try:
            self.callable(*self.args, **self.kwargs)
        except Exception as e:
            print(e)

    def start(self):
        """Set the module-level ``running`` flag to True, then start the thread."""
        global running
        running = True
        threading.Thread.start(self)
def get_hwnds_for_pid(pid):
    """Return a list of handles of the enumerated windows owned by *pid*.

    Invisible or disabled windows are not filtered out, so a hidden
    window is reported as well. The list is empty when nothing matches.
    """
    def _collect(window, matches):
        # GetWindowThreadProcessId returns a pair; the process id is the
        # second element.
        owner_pid = win32process.GetWindowThreadProcessId(window)[1]
        if owner_pid == pid:
            matches.append(window)
        # Always report success so that every window gets visited.
        return True

    matches = []
    win32gui.EnumWindows(_collect, matches)
    return matches
def sendkey(char):
global Console_hwnd
hwnd = Console_hwnd[0]
code = ord(char)
win32api.SendMessage(hwnd, win32con.WM_CHAR, code, 0)
print '[*]',char,'Pressed.'
#Another Keypress example. only works with keycode
#win32api.PostMessage(hwnd, win32con.WM_KEYDOWN, win32con.VK_F9, 0)
#print char,code,"down"
#win32api.PostMessage(hwnd, win32con.WM_KEYUP, code, 0)
#print char,code,"up"
def sendesc():
global Console_hwnd
hwnd = Console_hwnd[0]
win32api.PostMessage(hwnd, win32con.WM_KEYDOWN, win32con.VK_ESCAPE, 0)
print '[*]',"Esc down"
win32api.PostMessage(hwnd, win32con.WM_KEYUP, win32con.VK_ESCAPE, 0)
print '[*]',"Esc up"
def start():
def worker(pipe):
global TargetPower
global Mode
global Channel
global running
while running:
line = pipe.readline()
if line == '':
break
elif line.startswith('|') or line.startswith('==='):
pass
elif line.startswith("Operating in"):
info = line
for i in range(7):
info += pipe.readline()
#print 'ART> '+info,
try:
TargetPower = eval(re.search(r'(Output|Target) (power|Power) = .{4}',info).group(0).split('=')[1])
#27.0
Mode = re.search(r'gOffRate = (.+?),',info).group(1).lstrip().rstrip()
#6MBps
Channel = re.search(r'channel ([\d.]+GHz),',info).group(1)
#2.412GHz
except Exception,e:
TargetPower = 'N/A'
Mode = 'N/A'
Channel = 'N/A'
print e
elif line =='\r\n':
print '>',
else:
print 'ART>'+line,
print 'worker done.'
global proc
si = STARTUPINFO()
si.dwFlags |= STARTF_USESHOWWINDOW
si.wShowWindow = SW_HIDE
#proc = Popen("art.bat",stdout=PIPE,creationflags=CREATE_NEW_CONSOLE) #qt works!
proc = Popen("art.bat",stdout=PIPE,creationflags=CREATE_NEW_CONSOLE,startupinfo=si)
#proc = Popen("art.bat",stdout=PIPE,startupinfo=si) #hidden
#proc = Popen("cmd") #for test
sleep(2)
print '[*] pid: ',proc.pid
global Console_hwnd
Console_hwnd = get_hwnds_for_pid(proc.pid)
print '[*] hwnd:',Console_hwnd[0]
global stdout_worker
global stderr_worker
stdout_worker = ThreadWorker(worker, proc.stdout)
stderr_worker = ThreadWorker(worker, proc.stderr)
stdout_worker.start()
stderr_worker.start()
def stop():
    """Shut the console program down and reset the published status.

    Posts Escape three times to the console window, force-kills the
    process tree with ``taskkill``, clears ``running`` so the reader
    loops end, and resets ``TargetPower`` / ``Mode`` / ``Channel`` to
    'N/A'.
    """
    # All four names are rebound below; without these declarations the
    # assignments would only create locals and the module-level values
    # would never change.
    global running
    global TargetPower
    global Mode
    global Channel
    print('stop')
    for _ in range(3):
        sendesc()
    # /F forces termination and /T includes child processes. taskkill is
    # started but not waited for.
    Popen("taskkill /F /T /PID %i"%proc.pid , shell=True)
    running = False
    TargetPower = 'N/A'
    Mode = 'N/A'
    Channel = 'N/A'
if __name__ == "__main__":
    # Demo run: launch the program, press 'e' then 'c' with a one-second
    # pause before each key, let it work for ten seconds, then shut it down.
    start()
    for key in ('e', 'c'):
        sleep(1)
        sendkey(key)
    sleep(10)
    stop()
    # Park the main thread indefinitely. Sleeping instead of spinning on
    # ``pass`` keeps the wait from pegging a CPU core.
    while True:
        sleep(1)