This question, "How to read from an os.pipe() without getting blocked?", shows how to check whether an os.pipe has any data on Linux. For that you need to put the pipe into non-blocking mode:

import os, fcntl
fcntl.fcntl(thePipe, fcntl.F_SETFL, os.O_NONBLOCK)
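
For reference, a minimal sketch of what the non-blocking read looks like on POSIX once the flag is set. It uses `os.set_blocking` (Python 3.5+) in place of the `fcntl` call; the helper name `read_available` is made up for illustration. As far as I know, `os.set_blocking` only gained support for pipes on Windows in Python 3.12, so treat this as a POSIX example:

```python
import os

def read_available(fd, size=1024):
    """Return the bytes that are ready on fd, or None if nothing is buffered."""
    try:
        return os.read(fd, size)
    except BlockingIOError:
        # The pipe is open but currently empty (EAGAIN on POSIX).
        return None

r, w = os.pipe()
os.set_blocking(r, False)      # same effect as setting O_NONBLOCK via fcntl

os.write(w, b"xxx")
first = read_available(r)      # data is buffered, so it is returned
second = read_available(r)     # the pipe is empty now, so this does not block
print(first, second)

os.close(r)
os.close(w)
```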

On Windows we have this:

ImportError: No module named fcntl

But os.pipe is there:

>>> os.pipe()
(3, 4)

So, is it possible to do a non-blocking read on, or peek at the contents of, os.pipe on Windows?

anatoly techtonik
  • See also http://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python (not a duplicate but there is some overlap). – ideasman42 Jan 28 '16 at 04:50

3 Answers

Answering my own question after digging for some time through StackOverflow.

UPDATE: Things changed thanks to @HarryJohnston.

At first the answer was no: it is not possible to do a non-blocking read on os.pipe on Windows. From this answer I learned that:

The term for non-blocking / asynchronous I/O in Windows is 'overlapped' - that's what you should be looking at.

os.pipe on Windows is implemented through the CreatePipe API (see here and ... well, I couldn't find the os.pipe code in the Python sources). CreatePipe makes anonymous pipes, and anonymous pipes do not support asynchronous I/O.

But then @HarryJohnston commented that, according to the SetNamedPipeHandleState documentation, an anonymous pipe can be put into non-blocking mode. I wrote a test and it failed with OSError: [Errno 22] Invalid argument. The error message seemed wrong, so I checked what a non-blocking read should return when no data is available. After reading the MSDN note on named pipe modes I found that it should be ERROR_NO_DATA, which has the integer value 232. Adding a ctypes.WinError() call to the exception handler revealed the expected [Error 232] The pipe is being closed.

So the answer is yes, it is possible to do a non-blocking read on os.pipe on Windows, and here is the proof:

import msvcrt
import os

from ctypes import windll, byref, wintypes, GetLastError, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL

LPDWORD = POINTER(DWORD)

PIPE_NOWAIT = wintypes.DWORD(0x00000001)

ERROR_NO_DATA = 232

def pipe_no_wait(pipefd):
  """ pipefd is an integer as returned by os.pipe """

  SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
  SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
  SetNamedPipeHandleState.restype = BOOL

  h = msvcrt.get_osfhandle(pipefd)

  res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
  if res == 0:
      print(WinError())
      return False
  return True


if __name__  == '__main__':
  # CreatePipe
  r, w = os.pipe()

  pipe_no_wait(r)

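  # First round trip: data is available, so the read succeeds.
  print(os.write(w, b'xxx'))
  print(os.read(r, 1024))
  try:
    print(os.write(w, b'yyy'))
    print(os.read(r, 1024))
    # The pipe is empty now, so this read raises instead of blocking.
    print(os.read(r, 1024))
  except OSError as e:
    print(dir(e), e.errno, GetLastError())
    print(WinError())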
    if GetLastError() != ERROR_NO_DATA:
        raise
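
The question also asks about peeking. A minimal sketch, not taken from the answers here: on Windows, `PeekNamedPipe` reports how many bytes are waiting without consuming them, and its documentation says the handle may be the read end of an anonymous pipe. The helper name `bytes_available` is hypothetical, and the POSIX branch (the `FIONREAD` ioctl) is included only so the sketch runs on both platforms. Treat the Windows branch as an assumption to verify on your setup:

```python
import os
import sys

def bytes_available(fd):
    """Return the number of bytes waiting in pipe fd, without reading them."""
    if sys.platform == "win32":
        import ctypes
        import msvcrt
        from ctypes import wintypes

        kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
        kernel32.PeekNamedPipe.argtypes = [
            wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
            wintypes.LPDWORD, wintypes.LPDWORD, wintypes.LPDWORD]
        kernel32.PeekNamedPipe.restype = wintypes.BOOL

        total = wintypes.DWORD(0)
        # No buffer is passed, so nothing is copied; only the count is filled in.
        ok = kernel32.PeekNamedPipe(msvcrt.get_osfhandle(fd), None, 0,
                                    None, ctypes.byref(total), None)
        if not ok:
            raise ctypes.WinError(ctypes.get_last_error())
        return total.value

    # POSIX: FIONREAD stores the number of readable bytes in an int.
    import array
    import fcntl
    import termios
    count = array.array("i", [0])
    fcntl.ioctl(fd, termios.FIONREAD, count)
    return count[0]

r, w = os.pipe()
before = bytes_available(r)     # nothing written yet
os.write(w, b"hello")
after = bytes_available(r)      # data is counted but stays in the pipe
print(before, after, os.read(r, 1024))
os.close(r)
os.close(w)
```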
anatoly techtonik
  • Pipes (including anonymous pipes) do support nonblocking I/O for backwards compatibility with LAN manager. See [SetNamedPipeHandleState](https://msdn.microsoft.com/en-us/library/windows/desktop/aa365787(v=vs.85).aspx). But I have no idea whether Python supports it. – Harry Johnston Dec 29 '15 at 05:34
  • @HarryJohnston interesting. Need some proof code to test if it really works. – anatoly techtonik Dec 29 '15 at 05:52
  • @HarryJohnston it worked. =) I also thought that I can apply it to fix this issue https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python but got stuck. Maybe later. – anatoly techtonik Dec 29 '15 at 10:16
  • You should use `WinDLL` instead of `windll`. The global loaders cache libraries which cache function pointers. This was a bad idea that causes problems when different packages use the same functions with their own prototypes (i.e. `restype`, `argtypes`, `errcheck`). – Eryk Sun Dec 30 '15 at 00:39
  • Using `WinDLL` also lets you set `use_last_error=True` to protect the thread's `LastError` value. In a high-level environment like Python a lot of low-level code can execute between statements, so the only way to reliably access the `LastError` value is to capture it in a thread-local variable. Get and set this value using `get_last_error` and `set_last_error`. – Eryk Sun Dec 30 '15 at 00:44
  • The code to raise an exception changes to `raise ctypes.WinError(ctypes.get_last_error())`. Do this in an [`errcheck` function](https://docs.python.org/3/library/ctypes.html#ctypes._FuncPtr.errcheck) set as `SetNamedPipeHandleState.errcheck`. You can assign a single error checking function to all WinAPI functions that return a false value to indicate an error (e.g. 0, `FALSE`, `NULL`). ctypes calls this for you automatically, so you don't have to repeatedly check for errors in your code. – Eryk Sun Dec 30 '15 at 00:49
  • @eryksun is the performance overhead negligible when the function pointer is not cached? – anatoly techtonik Dec 30 '15 at 03:47
  • The function pointer is still cached on your module's `WinDLL` instance. The problem with `ctypes.windll` is that it caches the `WinDLL` instance itself, i.e. under the hood `ctypes.windll.kernel32` caches a single instance of `WinDLL('kernel32')`. Function pointers set on that `WinDLL` instance are then shared by every module, which has the potential for inconsistent prototypes (e.g. maybe you use `POINTER(c_byte)` and someone else uses `POINTER(c_char)`, or adds their own custom `errcheck` function, etc). – Eryk Sun Dec 30 '15 at 03:54
  • note: `multiprocessing.connection.Pipe` may also use `SetNamedPipeHandleState`. It might be useful to look at how it is implemented. – jfs Jan 29 '16 at 12:50
  • I don't think `POINTER` is a member of `ctypes.wintypes`. It's part of `ctypes`. – cs01 Apr 10 '17 at 16:37
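
Putting Eryk Sun's comments together, a minimal sketch of the same `pipe_no_wait` helper using a private `WinDLL` instance, `use_last_error=True`, and an `errcheck` hook. The name `check_bool` is made up for illustration, and the Windows-only part is guarded so the module still imports elsewhere; this is one reading of the comments, not code from the answer:

```python
import ctypes
import sys

PIPE_NOWAIT = 0x00000001

def check_bool(result, func, args):
    """errcheck hook: turn a FALSE return value into an OSError."""
    if not result:
        # WinError and get_last_error exist only in Windows builds of ctypes.
        raise ctypes.WinError(ctypes.get_last_error())
    return args

if sys.platform == "win32":
    import msvcrt
    from ctypes import wintypes

    # A private WinDLL instance: prototypes set here are not shared with
    # other modules, and use_last_error=True keeps a per-thread copy of the
    # error code captured right after each call.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    kernel32.SetNamedPipeHandleState.argtypes = [
        wintypes.HANDLE, wintypes.LPDWORD, wintypes.LPDWORD, wintypes.LPDWORD]
    kernel32.SetNamedPipeHandleState.restype = wintypes.BOOL
    kernel32.SetNamedPipeHandleState.errcheck = check_bool

    def pipe_no_wait(pipefd):
        """Put the pipe behind file descriptor pipefd into PIPE_NOWAIT mode."""
        mode = wintypes.DWORD(PIPE_NOWAIT)
        kernel32.SetNamedPipeHandleState(
            msvcrt.get_osfhandle(pipefd), ctypes.byref(mode), None, None)
```

With the hook in place a failed call raises `OSError` on its own, so the caller no longer needs to test the return value. Note that this only covers calls made through ctypes; the error raised by `os.read` in the answer does not go through this mechanism.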

Not sure, but this answer from @jfs can probably be reused as well, and it is quite elegant.
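
The answer linked above is not reproduced here. As a separate, portable illustration: jfs's comment on the first answer mentions `multiprocessing.connection.Pipe`, whose connection objects have a `poll()` method that reports whether data is waiting. A minimal sketch (the helper name `recv_available` is hypothetical; note these are message-oriented connections, not raw `os.pipe` file descriptors):

```python
from multiprocessing import Pipe

# duplex=False gives a one-way pipe: (receiving end, sending end).
reader, writer = Pipe(duplex=False)

def recv_available(conn):
    """Return the next message if one is waiting, else None, without blocking."""
    if conn.poll(0):              # timeout 0: return immediately
        return conn.recv_bytes()
    return None

empty = recv_available(reader)    # nothing has been sent yet
writer.send_bytes(b"xxx")
ready = reader.poll(1.0)          # wait up to one second for the message
data = recv_available(reader)
print(empty, ready, data)

reader.close()
writer.close()
```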

This answer is basically @anatolytechtonik's answer but with classes.

import msvcrt
import os

# ctypes bindings needed to call the Win32 SetNamedPipeHandleState function.
from ctypes import windll, byref, wintypes, GetLastError, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL

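# LPDWORD is a pointer to DWORD, PIPE_NOWAIT is the non-blocking pipe mode
# flag, and ERROR_NO_DATA is the Win32 error code checked for in `read` when
# a non-blocking read finds the pipe empty.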
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
ERROR_NO_DATA = 232


class AdvancedFD:
    """
    A wrapper for a file descriptor so that we can call:
        `<AdvancedFD>.read(number_of_bytes)` and
        `<AdvancedFD>.write(data_as_bytes)`

    After `config_non_blocking()` has been called, reading from a pipe
    with no data returns b"".

    Methods:
        write(data: bytes) -> None
        read(number_of_bytes: int) -> bytes
        config_non_blocking() -> bool
        rawfd() -> int
        close() -> None
    """
    def __init__(self, fd: int):
        self.fd = fd
        self.closed = False

    def __del__(self) -> None:
        """
        When this object is garbage collected close the fd
        """
        self.close()

    def close(self) -> None:
        """
        Closes the file descriptor.
        Note: it cannot be reopened and might raise an error if it is
        being used. You don't have to call this function. It is automatically
        called when this object is being garbage collected.
        """
        # Actually release the descriptor, and only once.
        if not self.closed:
            self.closed = True
            os.close(self.fd)

    def write(self, data: bytes) -> None:
        """
        Writes a string of bytes to the file descriptor.
        Note: Must be bytes.
        """
        os.write(self.fd, data)

    def read(self, x: int) -> bytes:
        """
        Reads `x` bytes from the file descriptor.
        Note: `x` must be an int
              Returns the bytes. Use `<bytes>.decode()` to convert it to a str
        """
        try:
            return os.read(self.fd, x)
        except OSError as error:
            err_code = GetLastError()
            # If the error code is `ERROR_NO_DATA`
            if err_code == ERROR_NO_DATA:
                # Return an empty string of bytes
                return b""
            else:
                # Otherwise raise the error
                website = "https://learn.microsoft.com/en-us/windows/win32/" +\
                          "debug/system-error-codes--0-499-"
                raise OSError("An exception occurred. Error code: %i Look up"
                              " the error code here: %s" % (err_code, website))

    def config_non_blocking(self) -> bool:
        """
        Makes the file descriptor non blocking.
        Returns `True` if successful, otherwise returns `False`
        """

        # Please note that this is kindly plagiarised from:
        # https://stackoverflow.com/a/34504971/11106801
        SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
        SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
        SetNamedPipeHandleState.restype = BOOL

        handle = msvcrt.get_osfhandle(self.fd)

        res = windll.kernel32.SetNamedPipeHandleState(handle,
                                                      byref(PIPE_NOWAIT), None,
                                                      None)
        return not (res == 0)

    def rawfd(self) -> int:
        """
        Returns the raw fd as an int.
        """
        return self.fd


class NonBlockingPipe:
    """
    Creates 2 file descriptors and wraps them in the `AdvancedFD` class
    so that we can call:
        `<AdvancedFD>.read(number_of_bytes)` and
        `<AdvancedFD>.write(data_as_bytes)`

    It also makes the `read_fd` non blocking. When reading from a non-blocking
    pipe with no data it returns b"".

    Methods:
        write(data: bytes) -> None
        read(number_of_bytes: int) -> bytes
        rawfds() -> (int, int)
        close() -> None
    """
    def __init__(self):
        self.read_fd, self.write_fd = self.create_pipes()
        self.read_fd.config_non_blocking()

    def __del__(self) -> None:
        """
        When this object is garbage collected close the fds
        """
        self.close()

    def close(self) -> None:
        """
        Closes both file descriptors.
        Note: it cannot be reopened and might raise an error if it is
        being used. You don't have to call this function. It is automatically
        called when this object is being garbage collected.
        """
        self.read_fd.close()
        self.write_fd.close()

    def create_pipes(self) -> (AdvancedFD, AdvancedFD):
        """
        Creates 2 file descriptors and wraps them in the `AdvancedFD` class
        so that we can call:
            `<AdvancedFD>.read(number_of_bytes)` and
            `<AdvancedFD>.write(data_as_bytes)`
        """
        read_fd, write_fd = os.pipe()
        return AdvancedFD(read_fd), AdvancedFD(write_fd)

    def write(self, data: bytes) -> None:
        """
        Writes a string of bytes to the file descriptor.
        Note: Must be bytes.
        """
        self.write_fd.write(data)

    def read(self, number_of_bytes: int) -> bytes:
        """
        Reads `number_of_bytes` bytes from the file descriptor.
        Note: `number_of_bytes` must be an int
              Returns the bytes. Use `<bytes>.decode()` to convert it to a str
        """
        return self.read_fd.read(number_of_bytes)

    def rawfds(self) -> (int, int):
        """
        Returns the raw file descriptors as ints in the form:
            (read_fd, write_fd)
        """
        return self.read_fd.rawfd(), self.write_fd.rawfd()


if __name__  == "__main__":
    # Create the nonblocking pipe
    pipe = NonBlockingPipe()

    pipe.write(b"xxx")
    print(pipe.read(1024)) # Check if it can still read properly

    pipe.write(b"yyy")
    print(pipe.read(1024)) # Read all of the data in the pipe
    print(pipe.read(1024)) # Check if it is non blocking
TheLizzard