
I have two programs:

  • emitter.exe - runs an infinite loop, and prints on stdout a message all the time.
  • receiver.exe - also runs an infinite loop, waits for messages on stdin and processes them.

On Unix-like systems I can run something like this and everything works well:

$ emitter | receiver

Of course, this doesn't work in PowerShell. PowerShell first tries to buffer the emitter's entire output and passes it to the receiver only once the emitter completes. In our case the output is never passed on, because both programs loop forever.

What is the simplest workaround to emulate the Unix-like behaviour?

Szymon Bednorz

1 Answer


I guess that a simple way would be to wrap emitter.exe and receiver.exe in two PowerShell functions written as pipeline processors.

Here is a simple example script in pure PowerShell:

function emitter() {
  [CmdletBinding(SupportsShouldProcess=$True)]
  [OutputType([String])] Param ()
  Begin {}
  Process {
    sleep -m 1000
    Write-Output "FIRST" 
    sleep -m 5000
    Write-Output "TWO" 
    sleep -m 4000
    Write-Output "THREE"
  }
  End {}  
}
function receiver() {
  [CmdletBinding(SupportsShouldProcess=$True)]
  [OutputType([String])]
  Param ([Parameter(Mandatory=$true, ValueFromPipeline=$true)][String[]] $msg)
  Begin {
    Write-Host "Start"
  }
  Process {
    Write-Host "Received $msg"
  }
  End {
    Write-Host  "quit"
  }
}
emitter | receiver 

In the emitter function, replace Write-Output ... with a call to your emitter.exe, and in the receiver function replace Write-Host "Received $msg" with a call to your receiver.exe. Or better, pass them as parameters to these two functions. For the receiver, you have to use System.Diagnostics.Process, as the output from the emitter must be transmitted on the fly. The built-in PowerShell Start-Process cannot do the job because it does not permit write operations on the process's standard input (I have looked for a workaround without success so far).

EDIT: An example implementation with the output of WHERE.EXE piped to FINDSTR /C:x :

function emitter() {
  [CmdletBinding(SupportsShouldProcess=$True)]
  [OutputType([String])] Param ([String] $cmd)
  Begin {}
  Process {
    & $cmd
  }
  End {}  
}
function receiver() {
  [CmdletBinding(SupportsShouldProcess=$True)]
  [OutputType([String])] 
  Param ([String] $cmd, [String] $cmdpar,
         [Parameter(Mandatory=$false, ValueFromPipeline=$true)][String[]] $msg)
  Begin {
    $cmdinfo = Get-Command $cmd -ErrorAction SilentlyContinue
    if (!$cmdinfo) {
      # throw rather than exit: exit would end the whole PowerShell session
      throw "Failed to find $cmd.."
    }
    $fullpath = $cmdinfo.Path
    $ProcessInfo = New-Object System.Diagnostics.ProcessStartInfo 
    $ProcessInfo.FileName = $fullpath
    $ProcessInfo.RedirectStandardError = $false
    $ProcessInfo.RedirectStandardOutput = $false
    $ProcessInfo.RedirectStandardInput = $true
    $ProcessInfo.UseShellExecute = $false 
    $ProcessInfo.CreateNoWindow = $false;
    $ProcessInfo.Arguments = $cmdpar
    $Process = New-Object System.Diagnostics.Process 
    $Process.StartInfo = $ProcessInfo 
    if ($Process.Start()) {
      $StreamWriter = $Process.StandardInput;
      Write-Host "Start"
    } else {
      # throw rather than exit: exit would end the whole PowerShell session
      throw "Failed to open $cmd.."
    }
  }
  Process {
    if ($StreamWriter) { 
      $StreamWriter.WriteLine([String]$msg); 
      $StreamWriter.Flush();
    }
  }
  End {
    if ($StreamWriter) { 
       $StreamWriter.Close();
       $Process.WaitForExit();
       Write-Host  "quit"
    }
  }
}
emitter "where.exe"  | receiver "findstr.exe" "/C:x"

This will output, depending on your locale, something like:

Start
The syntax of this command :
Examples :
    WHERE /R c:\windows *.exe *.dll *.bat
quit

UPDATE

Here is a far more complex PowerShell cmdlet that can be used to daisy-chain external commands and/or PS cmdlets. It can also be used to debug your pipelines. It permits continuous pipelining with PowerShell pipeline processing (ping -t 127.0.0.1 | emitter_receiver "more.com", for example). It comes with a small performance cost compared with the cmd.exe equivalent. And of course, don't try a continuous pipeline with sort.exe, which needs its whole input before it can output anything.

The code, with explanatory comments:
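
Before reading the full cmdlet, it may help to see its core mechanism in isolation: a background runspace pushes lines into a synchronized queue while the foreground drains that queue as lines arrive. The following is only a minimal sketch of that pattern, not the answer's code. The names $queue, $worker and $received are illustrative, and the worker merely simulates a slow external command instead of reading a real process's STDOUT.

```powershell
# Shared FIFO queue; the Synchronized wrapper makes Enqueue/Dequeue thread-safe.
$queue = [System.Collections.Queue]::Synchronized([System.Collections.Queue]::new())

# Background runspace standing in for the STDOUT reader.
# Here it simply simulates a slow command that emits three lines.
$worker = [powershell]::Create().AddScript({
  param($q)
  foreach ($i in 1..3) {
    Start-Sleep -Milliseconds 200
    $q.Enqueue("line $i")
  }
}).AddArgument($queue)
$job = $worker.BeginInvoke()

# Foreground loop: forward lines as soon as they show up,
# and keep polling until the worker is done and the queue is empty.
$received = @()
while (-not $job.IsCompleted -or $queue.Count -gt 0) {
  while ($queue.Count -gt 0) {
    $received += [String]$queue.Dequeue()
  }
  Start-Sleep -Milliseconds 10
}

# Collect the worker's result (none here) and release the runspace.
$worker.EndInvoke($job) | Out-Null
$worker.Dispose()
$received
```

In the cmdlet, the same idea is applied twice: one runspace reads the command's STDOUT into a queue, another writes queued pipeline input to its STDIN.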

# Unix-like daisy chaining of console commands with PowerShell pipeline processor.
#
#
#  The wrapper function for external console command. 
#
function emitter_receiver() {
  [CmdletBinding(SupportsShouldProcess=$False)]
  [OutputType([String])] 
  Param ([Parameter(Mandatory=$True,
                    HelpMessage="Command to execute, without parameters")][String] $cmd, 
         [Parameter(Mandatory=$False,
                    HelpMessage="Command parameters")][String] $cmdpar=$null,
         [Parameter(Mandatory=$False,
                    HelpMessage="Pad buffered pipeline size")][Int32] $bufferedPipe=0,
         [Parameter(Mandatory=$False,
                    HelpMessage="Debug pipeline flag")][Boolean] $debugPipe=$False,
         [Parameter(Mandatory=$False, ValueFromPipeline=$True)][String[]] $msg)
<#
    .SYNOPSIS
        Allows Unix-like daisy chaining of pipelined commands with PowerShell pipeline.
 
    .DESCRIPTION
        Allows pipelining "on the fly" of external console commands with the
        PowerShell pipe processing.  For external console commands, the PowerShell 
        pipe processing waits for the termination of the command before piping its
        output to the next channel, thus preventing any "tail"/poll type of usage.
        This function wraps the external console command to inject its input
        and output in the PowerShell pipeline processor in a Unix-like way,
        each member of the pipeline will be executed in parallel and will consume
        outputs of previous member "on the fly" without a wait for their termination.
        PowerShell 7.1 at least, not tested with previous release.
        
        Indeed, don't try with sort.exe.
 
    .PARAMETER cmd
        Specify the external console command to execute, without any parameters. 
        The function will search the command with the Get-Command cmdlet, so a 
        relative path or only the command filename can be provided. The complete
        name including extension must be provided.

    .PARAMETER cmdpar
        Specify a parameters string for the external console command to execute. 
        
        Optional, none by default.
         
    .PARAMETER bufferedPipe
        Size of the padding to add to the STDIN of command like findstr.exe.
        Some commands have buffered input processing and do not process anything
        until their input buffer is full. For findstr.exe, with a continuous 
        input pipe, set it to 8000 for "fluent" output, otherwise findstr.exe 
        will wait to have 8 KB of input before processing/outputting anything.        
        
        Optional, 0 by default. Capped at a maximum of 8000.

    .PARAMETER debugPipe
        Boolean flag to get a debug output on console for the pipe processing. 
        
        Optional, $False by default.

    .EXAMPLES
        A simple daisy chaining of common console commands:
         
        ps> emitter_receiver "where.exe" | emitter_receiver "findstr.exe" "/C:x" | emitter_receiver "more.com" 
         
        >     WHERE /R c:\windows *.exe *.dll *.bat
        > examples :
        > The syntax of this commande is :

        The real interest of this function is for continuous pipeline of console commands output :
         
        ps> emitter_receiver "ping" "-t 127.0.0.1" | emitter_receiver "more.com"

        > pinging [127.0.0.1] with 32 bytes of data:
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     ...

        ps> emitter_receiver "ping.exe" "-t 127.0.0.1" | emitter_receiver "findstr.exe" "/C:TTL" 8000
         
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     ...

        ps> emitter_receiver "ping.exe" "-t 127.0.0.1" | emitter_receiver "findstr.exe" "/C:TTL" 8000 | emitter_receiver "more.com"

        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     ...
    
        A direct "ping -t 127.0.0.1 | findstr /C:TTL" or "ping -t 127.0.0.1 | more" in PowerShell will hang.
        
        The function can also be used for only part of the pipeline, for the commands that can't handle continuous input:
        
        ps> ping -t 127.0.0.1 | emitter_receiver "more.com"

        > pinging [127.0.0.1] with 32 bytes of data:
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     ...

        ps> ping.exe -t 127.0.0.1 | emitter_receiver "findstr.exe" "/C:TTL" 8000

        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        >     ...
        
        
    .INPUTS
        Pipelined input from a PowerShell pipeline processor.
    .OUTPUTS
        Pipelined output from the provided external console command.
    .EXCEPTIONS
         [System.IO.FileNotFoundException] "Failed to find .." : The provided command can't be found with Get-Command.
         [System.Exception] "Failed to open .." : The provided command can't be launched or failed
         [System.Exception] "Can't get a stack..." : Can't get a stack, memory and/or .Net related trouble
         [System.Exception] "Input write to $cmd failed ..." : Can't write anymore in command STDIN
         [System.Exception] "Output read from $cmd failed ..." : Can't read anymore from command STDOUT
         [System.Exception] "Unexpected leave, pipe broken ?..." : Abnormal termination of the pipe processing
#>
  Begin {
    # If we fail, stop the pipe...
    $ErrorActionPreference = "Stop"
    # Look for the command in PATH
    $cmdinfo = $(Get-Command $cmd)
    if (!$cmdinfo -or !$cmdinfo.Path) {
      throw  [System.IO.FileNotFoundException] "Failed to find $cmd..";
    }
    $fullpath = $cmdinfo.Path
    # Use of System.Diagnostics.Process to launch the command and redirect its STDIN/STDOUT
    $ProcessInfo = New-Object System.Diagnostics.ProcessStartInfo 
    $ProcessInfo.FileName = $fullpath
    $ProcessInfo.RedirectStandardError = $False
    $ProcessInfo.RedirectStandardOutput = $True
    $ProcessInfo.RedirectStandardInput = $True
    $ProcessInfo.UseShellExecute = $False 
    $ProcessInfo.CreateNoWindow = $False;
    if ($cmdpar) {
      $ProcessInfo.Arguments = $cmdpar
    }
    $Process = New-Object System.Diagnostics.Process 
    $Process.StartInfo = $ProcessInfo 
    # Reference to the stream reader and writer for the command's STDOUT / STDIN processing
    $StdInStreamWriter = $null;
    $StdOutStreamReader = $null;
    # Sleep time for polling STDIN. To not slow too much the pipeline processing, 
    # we take 1ms. We have to avoid stressing the CPU with the polling.
    $SleepTime=1;
    # Control of input parameters if any
    if ($bufferedPipe -gt 8000) { $bufferedPipe = 8000; }
    if ($bufferedPipe -lt 0) { $bufferedPipe = 0; }
    # Launch the command 
    $ProcessStarted = $Process.Start()
    if ($ProcessStarted) {
      $StdInStreamWriter = $Process.StandardInput;
      $StdOutStreamReader = $Process.StandardOutput;
      $StdInStreamWriter.AutoFlush = $True;
    } else {
      throw  [System.Exception] "Failed to open $cmd..";
    }
    # We use two FIFO stacks to exchange STDIN and STDOUT stream data between
    # the PowerShell pipeline processing in the 'premise-style' context and the
    # process we have launched for the command. We will feed them in the
    # following two poll subprocesses for STDIN and STDOUT as no async operations
    # are allowed in a 'premise-style' context.
    $StdOutStack = new-object System.Collections.Queue;
    if (!$StdOutStack) {
      throw [System.Exception] "Can't get a stack..."
    }
    $StdInStack = new-object System.Collections.Queue;
    if (!$StdInStack) {
      throw [System.Exception] "Can't get a stack..."
    }
    # We create two poll subprocesses to read from STDOUT and write into STDIN 
    # of the process we have launched for the command. As these subprocesses  
    # are started from the 'begin' pipeline part of the function, they can then 
    # perform the STDOUT reads that feed the STDOUT stack and the STDIN writes 
    # from the STDIN stack. These operations are not directly doable in the 
    # 'process' pipeline part of the function as it is a 'premise-style' context.
    #
    # STDOUT poll subprocess
    #
    $OutProc = [powershell]::Create().AddScript({
      Param ([parameter(Mandatory=$True)]$args)
      $Process = $args[0]
      $StdOutStreamReader = $args[1]
      $SleepTime = $args[2]
      $cmd = $args[3]
      $StdOutStack = $args[4]
      $debugPipe = $args[5]
      while (!$StdOutStreamReader.EndOfStream) {
        $msgproc = $StdOutStreamReader.ReadLine()
        # Debug trace only: ungated, every line would pile up in the runspace output.
        if ($debugPipe) { Write-Output ($cmd+": OUT_S: "+[String]$msgproc) }
        try {
          $syncStdOutStack = [System.Collections.Queue]::Synchronized($StdOutStack);
          $syncStdOutStack.Enqueue($msgproc)
        } finally {
          $syncStdOutStack = $null
        }
      }
      if ($debugPipe) { Write-Output ($cmd+": OUT_S terminated.") }
    })
    $tmp = $OutProc.AddParameter("args", @($Process, $StdOutStreamReader, $SleepTime, $cmd, $StdOutStack, $debugPipe))
    $OutJob = $OutProc.BeginInvoke()
    #
    # STDIN poll subprocess
    #
    $InProc = [powershell]::Create().AddScript({
      Param ([parameter(Mandatory=$True)]$args)
      $Process = $args[0]
      $StdInStreamWriter = $args[1]
      $SleepTime = $args[2]
      $cmd = $args[3]
      $StdInStack = $args[4]
      $debugPipe = $args[5]
      $bufferedPipe = $args[6]
      if ($bufferedPipe -gt 0) { $dumb_findstr = ("12"*4000).Replace("1", [char]27) }
      while (!$Process.hasExited -and $StdInStreamWriter) {
        try {
          $syncStdInStack = [System.Collections.Queue]::Synchronized($StdInStack);
          while ($syncStdInStack.Count -gt 0) {
            # Drain every queued message in this pass; the reference is
            # released in the finally block below.
            $stack_msg = [String]$syncStdInStack.Dequeue();
            if ($debugPipe) { Write-Output ($cmd+": IN_S: "+[String]$stack_msg) }
            $check = $StdInStreamWriter.WriteLine([String]$stack_msg)
            if ($bufferedPipe -gt 0) {
              # As some commands (findstr...) buffer their input in every case, 
              # we send up to 8000 padding characters (ESC alternating with "2")
              # just to overcome that. The size is clamped at 0 so that a message
              # longer than $bufferedPipe cannot make Substring throw.
              $padsize = [Math]::Max(0, $bufferedPipe-$stack_msg.Length-($stack_msg.Length%2))
              $check = $StdInStreamWriter.Write($dumb_findstr.Substring(0,$padsize))
            }
          };
        } finally {
          $syncStdInStack = $null;
        }
        # To avoid straining the CPU needlessly, we wait far longer between each 
        # flush of the stack. This does not really affect performance, as
        # incoming messages are still stacked by the 'Process' part of the
        # function while we wait.
        $tmp = [System.Threading.Thread]::Sleep($SleepTime * 5);
      }
      if ($debugPipe) { Write-Output ($cmd+": IN_S terminated.") }
    })
    $tmp = $InProc.AddParameter("args", @($Process, $StdInStreamWriter, $SleepTime, $cmd, $StdInStack, $debugPipe, $bufferedPipe))
    $InJob = $InProc.BeginInvoke()
  }
  Process {
    # If we are in the process part, that means there is input from the PowerShell
    # pipeline to process. We send this input to the STDIN of the command through
    # a FIFO stack and a polling subprocess. Each input from the PowerShell pipeline 
    # processor is injected in the command STDIN.
    if ($StdInStreamWriter -and $StdInStack) { 
      try {
        $syncStdInStack = [System.Collections.Queue]::Synchronized($StdInStack);
        $syncStdInStack.Enqueue($msg)
        if ($debugPipe) { Write-Host ($cmd+": IN_P "+$msg) }
      } finally {
        $syncStdInStack = $null
      }
    }
    if ($debugPipe) { Write-Host ($cmd+": INST "+$InProc.InvocationStateInfo.State) }
    if ($debugPipe) { Write-Host ($cmd+": OUST "+$OutProc.InvocationStateInfo.State) }
    # While processing input from the pipe, we send on the fly command 
    # output to the pipeline processor.
    if ($StdOutStack) {
      try {
        $syncStdOutStack = [System.Collections.Queue]::Synchronized($StdOutStack);
        if ($debugPipe) { Write-Host ($cmd+": OUSTS "+$syncStdOutStack.Count) }
        while ($syncStdOutStack.Count -gt 0) {
          $stack_msg = [String]$syncStdOutStack.Dequeue();
          if ($debugPipe) { Write-Host ($cmd+": OUT_P "+[String]$stack_msg) }
          Write-Output ([String]$stack_msg)
        };
      } finally {
        $syncStdOutStack = $null;
      }
    }
  }
  End {
    if ($debugPipe) { Write-Host ($cmd+": No more input") }
    # If there is still input for command to process, we wait.
    # We do that until the process terminates, so the pipelining will be
    # maintained.
    try {
      $inputDone = $False
      while(!$inputDone -and !$Process.hasExited -and $StdInStack) {
        $syncStack = [System.Collections.Queue]::Synchronized($StdInStack);
        if ($syncStack.Count -gt 0) {
          $syncStack = $null
          $tmp = [System.Threading.Thread]::Sleep($SleepTime);
        } else {
          $inputDone = $True
        }
      }
    } finally {
      $syncStack = $null
    }
    # At the end, we are sure we have no more input to process, 
    # so we immediately close the command STDIN. That way the command
    # will "know" there is no more input to process. That also allows
    # the async read of its output to complete.
    if ($StdInStreamWriter) { 
       $StdInStreamWriter.Close();
    }
    # The command has no more input, but it still can output things.
    # We wait until the command terminated and send to the pipe 
    # output everything the command sends on STDOUT.
    while (!$Process.hasExited) {
      try {
        # If we can sync the OUT stack, that means that the Output
        # subprocess has nothing more to write.
        if ($StdOutStack) {
          $syncStack = [System.Collections.Queue]::Synchronized($StdOutStack);
          while ($syncStack.Count -gt 0) {
            $stack_msg = [String]$syncStack.Dequeue();
            if ($debugPipe) { Write-Host ($cmd+": OUT_E "+[String]$stack_msg) }
            Write-Output ([String]$stack_msg)
          };
        }
      } finally {
        $syncStack = $null;
      }
      # To avoid straining the CPU needlessly, we wait a little longer between 
      # each check of STDOUT, as the command can be in a "long run"; thus we 
      # feed the stack at a slower rhythm.
      $tmp = [System.Threading.Thread]::Sleep($SleepTime * 2);
    }
    # We are finally at complete termination of the command.
    if ($Process.hasExited) {
      if ($debugPipe) { Write-Host "$cmd terminated." }
      while(!$InJob.IsCompleted -or !$OutJob.IsCompleted) {
        [System.Threading.Thread]::Sleep($SleepTime);
      }
      if ($InJob.IsCompleted) {
        if ($InProc.InvocationStateInfo.State -eq "Failed") {
          Write-Host ($cmd+": JOB IN Fail:"+$InProc.InvocationStateInfo.State)
          Write-Host ($cmd+": JOB IN Fail:"+$InProc.InvocationStateInfo.Reason)
          throw [System.Exception] "Input write to $cmd failed ..."
        }
        $res = $InProc.EndInvoke($InJob)
        if ($debugPipe) { Write-Host ("JOB IN terminated : "+$res) }
      }
      if ($OutJob.IsCompleted) {
        if ($OutProc.InvocationStateInfo.State -eq "Failed") {
          Write-Host ($cmd+":JOB OUT Fail:"+$OutProc.InvocationStateInfo.State)
          Write-Host ($cmd+":JOB OUT Fail:"+$OutProc.InvocationStateInfo.Reason)
          throw [System.Exception] "Output read from $cmd failed ..."
        } 
        $res = $OutProc.EndInvoke($OutJob)
        if ($debugPipe) { Write-Host ("JOB OUT terminated : "+$res) }
      }
      # If the async read of command output is still not completed, we wait.
      # When completed, we send its output to the pipeline processor.
      if ($StdOutStack) {
        try {
          $syncStack = [System.Collections.Queue]::Synchronized($StdOutStack);
          while ($syncStack.Count -gt 0) {
            $stack_msg = [String]$syncStack.Dequeue();
            if ($debugPipe) { Write-Host ($cmd+": OUT_E "+[String]$stack_msg) }
            Write-Output ([String]$stack_msg)
          };
        } finally {
          $syncStack = $null;
        }
      }
      # If we are here, command output was entirely processed, so we close command STDOUT.
      if ($StdOutStreamReader) { 
        $StdOutStreamReader.Close();
      }
    } else {
      throw [System.Exception] "Unexpected leave, pipe broken ?..."
    }
  }
}
Zilog80
  • It looks like your solution works well. The only problem I've had was to pass the absolute path to the *receiver* program (the relative path throws an error in `if ($Process.Start()) {` line: `Exception calling "Start" with "0" argument(s): "The system cannot find the file specified"`. Btw it surprises me that almost 100 lines of code are required for such a basic feature in PS... – Szymon Bednorz May 27 '21 at 18:55
  • You're using the PowerShell pipelining which doesn't work very well with executables outside PowerShell. For a relative path, you can use `Get-Command` to automatically get the full path, I'll edit to include that. I will also add a single _emitter/receiver_ function so it will be possible to pipe many executables the same way. – Zilog80 May 27 '21 at 19:03
  • By "piping multiple executables the same way" you mean for example something like that: `emitter-receiver bob.exe | emitter-receiver alice.exe | emitter-receiver carol.exe`? – Szymon Bednorz May 27 '21 at 19:07
  • Yes. Almost done, I still have to polish it. Edit for relative paths is done. – Zilog80 May 27 '21 at 19:11
  • That was more work than I thought. "emitter_receiver" in one cmdlet is done, check the update in the answer. – Zilog80 May 29 '21 at 23:26
  • Wow, it must have been lots of work. I've just tested it and the code works beautifully. – Szymon Bednorz May 31 '21 at 19:40