Recently I had to read from and write to stdin/stdout in Python in a way that works across platforms.
For linux:
On Linux we can use the select module. It is a wrapper around the POSIX select
function. It lets you pass multiple file descriptors and wait for them to become ready. Once they are ready you are notified and the read/write
operation can be performed. Here is a little code that will give you an idea.
Here nodejs is a Docker environment with a nodejs image:
# Newline-terminated JSON request for the child process.  It is encoded
# explicitly because BytesIO only accepts bytes (a text str is rejected on
# Python 3); on Python 2 the result is the same byte string as before.
stdin_buf = BytesIO((json.dumps(fn) + "\n").encode("utf-8"))
stdout_buf = BytesIO()
stderr_buf = BytesIO()

rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
wselect = [nodejs.stdin]  # type: List[BytesIO]

# Keep going while there is still a pipe we want to write to or read from.
while wselect or rselect:
    # Wait until at least one of the watched pipes is ready.
    rready, wready, _ = select.select(rselect, wselect, [])
    try:
        if nodejs.stdin in wready:
            # Send the request in chunks of at most PIPE_BUF bytes.
            b = stdin_buf.read(select.PIPE_BUF)
            if b:
                os.write(nodejs.stdin.fileno(), b)
            else:
                # Nothing left to send: stop watching stdin.
                wselect = []
        for pipe, buf in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
            if pipe in rready:
                b = os.read(pipe.fileno(), select.PIPE_BUF)
                if b:
                    buf.write(b)
                else:
                    # Empty read: stop watching this pipe.
                    rselect.remove(pipe)
        # Once the collected stdout ends with a newline, stop reading.
        # getvalue() returns bytes, so the suffix must be bytes as well.
        if stdout_buf.getvalue().endswith(b"\n"):
            rselect = []
    except OSError:
        break
For windows
The code example above involves both read and write operations on stdin and stdout.
This code will not work on Windows, because the Windows implementation of select does not allow stdin and stdout to be passed as arguments.
The docs say:
File objects on Windows are not acceptable, but sockets are. On Windows, the underlying select() function is provided by the WinSock library, and does not handle file descriptors that don’t originate from WinSock.
First I must mention that there are a lot of libraries for non-blocking I/O on Windows, like asyncio (Python 3), gevent (for Python 2.7), msvcrt, and then there is pywin32's win32event, which alerts you when your socket is ready to read/write data. But none of them allowed me to read from or write to stdin/stdout; they gave errors like
An operation is performed on something that is not a socket
Handles only expect integer values
etc.
There are some other libraries, like Twisted, that I haven't tried.
Now, to implement functionality like that of the code above on the Windows platform, I used threads. Here is my code:
# Newline-terminated JSON request for the child process.  It is encoded
# explicitly because BytesIO only accepts bytes (a text str is rejected on
# Python 3); on Python 2 the result is the same byte string as before.
stdin_buf = BytesIO((json.dumps(fn) + "\n").encode("utf-8"))
stdout_buf = BytesIO()
stderr_buf = BytesIO()

rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
wselect = [nodejs.stdin]  # type: List[BytesIO]

# Maximum number of bytes requested per read call.
READ_BYTES_SIZE = 512

# Queues through which the worker threads hand data to the main loop.
input_queue = Queue.Queue()
output_queue = Queue.Queue()
error_queue = Queue.Queue()

# Locks used as stop signals.  Each one starts out held; the reader threads
# poll it with a non-blocking acquire and exit once the main loop releases it.
no_more_output = threading.Lock()
no_more_output.acquire()
no_more_error = threading.Lock()
no_more_error.acquire()
# Feed the constructed command into the input queue; the queued chunks are
# later written to nodejs's stdin.
def put_input(input_queue):
    """Copy stdin_buf into *input_queue* in chunks of at most READ_BYTES_SIZE bytes.

    sys.stdout is flushed before every read.  The function returns as soon as
    stdin_buf yields an empty read.
    """
    while True:
        sys.stdout.flush()
        chunk = stdin_buf.read(READ_BYTES_SIZE)
        if not chunk:
            return
        input_queue.put(chunk)
# Collect nodejs's stdout into the output queue until told to stop.
def get_output(output_queue):
    """Read nodejs's stdout and put every non-empty chunk on *output_queue*.

    The loop ends when a non-blocking acquire of no_more_output succeeds.
    An empty read does not end the loop by itself; it is simply skipped.
    """
    while True:
        if no_more_output.acquire(False):
            return
        chunk = os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
        if chunk:
            output_queue.put(chunk)
# Collect nodejs's stderr into the error queue until told to stop.
def get_error(error_queue):
    """Read nodejs's stderr and put every non-empty chunk on *error_queue*.

    The loop ends when a non-blocking acquire of no_more_error succeeds.
    An empty read does not end the loop by itself; it is simply skipped.
    """
    while True:
        if no_more_error.acquire(False):
            return
        chunk = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
        if chunk:
            error_queue.put(chunk)
# Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively.
# They are marked as daemon threads: the readers sit in a blocking os.read()
# and the main loop does not join every one of them on every exit path, so a
# non-daemon reader could keep the interpreter from exiting.
input_thread = threading.Thread(target=put_input, args=(input_queue,))
input_thread.daemon = True
input_thread.start()
output_thread = threading.Thread(target=get_output, args=(output_queue,))
output_thread.daemon = True
output_thread.start()
error_thread = threading.Thread(target=get_error, args=(error_queue,))
error_thread.daemon = True
error_thread.start()
def _signal_stop(lock):
    """Release *lock* to tell its reader thread to stop.

    Safe to call more than once: releasing a lock that is already unlocked
    raises threading.ThreadError, which is ignored here.  Without this guard
    a second release would raise an error that the OSError handler of the
    loop below does not catch.
    """
    try:
        lock.release()
    except threading.ThreadError:
        pass


# Set to True once at least one chunk has been seen on the respective stream.
output_ready = False
error_ready = False

# Keep going while there is still a pipe we want to write to or read from.
while wselect or rselect:
    try:
        if nodejs.stdin in wselect:
            if not input_queue.empty():
                os.write(nodejs.stdin.fileno(), input_queue.get())
            elif not input_thread.is_alive():
                # Input thread finished and its queue is drained.
                wselect = []
        if nodejs.stdout in rselect:
            if not output_queue.empty():
                output_ready = True
                stdout_buf.write(output_queue.get())
            elif output_ready:
                # Output was seen earlier and the queue is empty now:
                # stop reading and tell the reader threads to exit.
                rselect = []
                _signal_stop(no_more_output)
                _signal_stop(no_more_error)
                output_thread.join()
        if nodejs.stderr in rselect:
            if not error_queue.empty():
                error_ready = True
                stderr_buf.write(error_queue.get())
            elif error_ready:
                # Error output was seen earlier and the queue is empty now.
                rselect = []
                _signal_stop(no_more_output)
                _signal_stop(no_more_error)
                output_thread.join()
                error_thread.join()
        # Once the collected stdout ends with a newline, stop reading.
        # getvalue() returns bytes, so the suffix must be bytes as well.
        if stdout_buf.getvalue().endswith(b"\n"):
            rselect = []
            _signal_stop(no_more_output)
            _signal_stop(no_more_error)
            output_thread.join()
    except OSError:
        break
So the best option for me turned out to be threads. This article is a nice read if you want to know more.