I guess that a simple way would be to encapsulate emitter.exe
and receiver.exe
in two PowerShell functions specified as pipe processor.
Here is a simple example script in full PowerShell :
function emitter() {
[CmdletBinding(SupportsShouldProcess=$True)]
[OutputType([String])] Param ()
Begin {}
Process {
sleep -m 1000
Write-Output "FIRST"
sleep -m 5000
Write-Output "TWO"
sleep -m 4000
Write-Output "THREE"
}
End {}
}
function receiver() {
[CmdletBinding(SupportsShouldProcess=$True)]
[OutputType([String])]
Param ([Parameter(Mandatory=$true, ValueFromPipeline=$true)][String[]] $msg)
Begin {
Write-Host "Start"
}
Process {
Write-Host "Received $msg"
}
End {
Write-Host "quit"
}
}
emitter | receiver
Replace Write-Output
... in emitter
function with a call to your emitter.exe
and Write-Host "Received $msg"
in receiver
function with a call to your receiver.exe
.
Or better, pass them as parameter to this two functions. Fo the receiver, you have to use System.Diagnostics.Process
as we need to transmit the output from emitter on the fly. The built-in Powerhell Start-Process
is not able to do the job as it does not permit write operations on the process standard input (I've looked for a workaround for that without success until now).
EDIT: An example implementation with the output of WHERE.EXE
piped to FINDSTR /C:x
:
function emitter() {
[CmdletBinding(SupportsShouldProcess=$True)]
[OutputType([String])] Param ([String] $cmd)
Begin {}
Process {
& $cmd
}
End {}
}
function receiver() {
[CmdletBinding(SupportsShouldProcess=$True)]
[OutputType([String])]
Param ([String] $cmd, [String] $cmdpar,
[Parameter(Mandatory=$false, ValueFromPipeline=$true)][String[]] $msg)
Begin {
$cmdinfo = $(Get-Command $cmd)
if (!$cmdinfo) {
Write-Host "Failed to find $cmd.."
exit;
}
$fullpath = $cmdinfo.Path
$ProcessInfo = New-Object System.Diagnostics.ProcessStartInfo
$ProcessInfo.FileName = $fullpath
$ProcessInfo.RedirectStandardError = $false
$ProcessInfo.RedirectStandardOutput = $false
$ProcessInfo.RedirectStandardInput = $true
$ProcessInfo.UseShellExecute = $false
$ProcessInfo.CreateNoWindow = $false;
$ProcessInfo.Arguments = $cmdpar
$Process = New-Object System.Diagnostics.Process
$Process.StartInfo = $ProcessInfo
if ($Process.Start()) {
$StreamWriter = $Process.StandardInput;
Write-Host "Start"
} else {
Write-Host "Failed to open $cmd.."
exit;
}
}
Process {
if ($StreamWriter) {
$StreamWriter.WriteLine([String]$msg);
$StreamWriter.Flush();
}
}
End {
if ($StreamWriter) {
$StreamWriter.Close();
$Process.WaitForExit();
Write-Host "quit"
}
}
}
emitter "where.exe" | receiver "findstr.exe" "/C:x"
Which will output, depending of your locales, something like :
Start
The syntax of this command :
Examples :
WHERE /R c:\windows *.exe *.dll *.bat
quit
UPDATE
Here is a far more complex PowerShell Cmdlet that can be used to daisy chain external commands and/or PS cmdlets. It can be used also to debug your pipelines. It permits continuous pipelining with PowerShell pipeline processing (ping -t 127.0.0.1 | emitter_recevier "more.com"
as an example). It comes with a little performance cost regarding the equivalent with cmd.exe
.
And indeed, don't try a continuous pipeline with sort.exe
.
The code with explanation comments :
# Unix-like daisy chaining of console commands with PowerShell pipeline processor.
#
#
# The wrapper function for external console command.
#
function emitter_receiver() {
[CmdletBinding(SupportsShouldProcess=$False)]
[OutputType([String])]
Param ([Parameter(Mandatory=$True,
HelpMessage="Command to execute, without parameters")][String] $cmd,
[Parameter(Mandatory=$False,
HelpMessage="Command parameters")][String] $cmdpar=$null,
[Parameter(Mandatory=$False,
HelpMessage="Pad buffered pipeline size")][Int32] $bufferedPipe=0,
[Parameter(Mandatory=$False,
HelpMessage="Debug pipeline flag")][Boolean] $debugPipe=$False,
[Parameter(Mandatory=$False, ValueFromPipeline=$True)][String[]] $msg)
<#
.SYNOPSIS
Allows Unix-like daisy chaining of pipelined commands with PowerShell pipeline.
.DESCRIPTION
Allows pipelining "on the fly" of external console commands with the
PowerShell pipe processing. For external console commands, the PowerShell
pipe processing waits for the termination of the command before piping its
output to the next channel, thus preventing any "tail"/pool usage type.
This function wraps the external console command to inject its input
and output in the PowerShell pipeline processor in a Unix-like way,
each member of the pipeline will be executed in parallel and will consume
outputs of previous member "on the fly" without a wait for their termination.
PowerShell 7.1 at least, not tested with previous release.
Indeed, don't try with sort.exe.
.PARAMETER cmd
Specify the external console command to execute, without any parameters.
The function will search the command with the Get-Command cmdlet, so a
relative path or only the command filename can be provided. The complete
name including extension must be provided.
.PARAMETER cmdpar
Specify a parameters string for the external console command to execute.
Optionnal, none by default.
.PARAMETER bufferedPipe
Size of the padding to add to the STDIN of command like findstr.exe.
Some commands has buffered input processing and do not process anything
until their input buffer is full. For findstr.exe, with a continous
input pipe, set it to 8000 for "fluent" output, otherwise findstr.exe
will wait to have 8kb of input before processing/outputting anything.
Optionnal, 0 by default. 8000 at a maximum, capped by default.
.PARAMETER debugPipe
Boolean flag to get a debug output on console for the pipe processing.
Optionnal, $False by default.
.EXAMPLES
A simple daisy chaning of common console commands :
ps> emitter_receiver "where.exe" | emitter_receiver "findstr.exe" "/C:x" | emitter_receiver "more.com"
> WHERE /R c:\windows *.exe *.dll *.bat
> examples :
> The syntax of this commande is :
The real interest of this function is for continuous pipeline of console commands output :
ps> emitter_receiver "ping" "-t 127.0.0.1" | emitter_receiver "more.com"
> pinging [127.0.0.1] with 32 bytes of data:
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> ...
ps> emitter_receiver "ping.exe" "-t 127.0.0.1" | emitter_receiver "findstr.exe" "/C:TTL" 8000
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> ...
ps> emitter_receiver "ping.exe" "-t 127.0.0.1" | emitter_receiver "findstr.exe" "/C:TTL" 8000 | emitter_receiver "more.com"
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> ...
A direct "ping -t 127.0.0.1 | findstr /C:TTL" or "ping -t 127.0.0.1 | more" in PowerShell will hang.
The function can be used only in part of the pipeline for commands that can't handle continous input :
ps> ping -t 127.0.0.1 | emitter_receiver "more.com"
> pinging [127.0.0.1] with 32 bytes of data:
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> ...
ps> ping.exe -t 127.0.0.1 | emitter_receiver "findstr.exe" "/C:TTL" 8000
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
> ...
.INPUTS
Pipelined input from a PowerShell pipeline processor.
.OUTPUTS
Pipelined output from the provided external console command.
.EXCEPTIONS
[System.IO.FileNotFoundException] "Failed to find .." : The provided command can't be found with Get-Command.
[System.Exception] "Failed to open .." : The provided command can't be launched or failed
[System.Exception] "Can't get a stack..." : Can't get a stack, memory and/or .Net related trouble
[System.Exception] "Input write to $cmd failed ..." : Can't write anymore in command STDIN
[System.Exception] "Output read from $cmd failed ..." : Can't read anymore from command STDOUT
[System.Exception] "Unexpected leave, pipe broken ?..." : Abnormal termination of the pipe processing
#>
Begin {
# If we fail, stop the pipe...
$ErrorActionPreference = "Stop"
# Look for the command in PATH
$cmdinfo = $(Get-Command $cmd)
if (!$cmdinfo -or !$cmdinfo.Path) {
throw [System.IO.FileNotFoundException] "Failed to find $cmd..";
}
$fullpath = $cmdinfo.Path
# Use of System.Diagnostics.Process to launch the command and redirect its STDIN/STDOUT
$ProcessInfo = New-Object System.Diagnostics.ProcessStartInfo
$ProcessInfo.FileName = $fullpath
$ProcessInfo.RedirectStandardError = $False
$ProcessInfo.RedirectStandardOutput = $True
$ProcessInfo.RedirectStandardInput = $True
$ProcessInfo.UseShellExecute = $False
$ProcessInfo.CreateNoWindow = $False;
if ($cmdpar) {
$ProcessInfo.Arguments = $cmdpar
}
$Process = New-Object System.Diagnostics.Process
$Process.StartInfo = $ProcessInfo
# Reference to the stream reader and writer for the command's STDOUT / STDIN processing
$StdInStreamWriter = $null;
$StdOutStreamReader = $null;
# Sleep time for polling STDIN. To not slow too much the pipeline processing,
# we take 1ms. We have to avoid stressing the CPU with the polling.
$SleepTime=1;
# Control of input parameters if any
if ($bufferedPipe -gt 8000) { $bufferedPipe = 8000; }
if ($bufferedPipe -lt 0) { $bufferedPipe = 0; }
# Launch the command
$ProcessStarted = $Process.Start()
if ($ProcessStarted) {
$StdInStreamWriter = $Process.StandardInput;
$StdOutStreamReader = $Process.StandardOutput;
$StdInStreamWriter.AutoFlush = $True;
} else {
throw [System.Exception] "Failed to open $cmd..";
}
# We use two FIFO stacks to exchange STDIN and STDOUT stream data between
# the PowerShell pipeline processing in the 'premise-style' context and the
# process we have launched for the command. We will feed them in the
# following two poll subprocesses for STDIN and STDOUT as no async operations
# are allowed in a 'premise-style' context.
$StdOutStack = new-object System.Collections.Queue;
if (!$StdOutStack) {
throw [System.Exception] "Can't get a stack..."
}
$StdInStack = new-object System.Collections.Queue;
if (!$StdInStack) {
throw [System.Exception] "Can't get a stack..."
}
# We create two poll subprocesses to read from STDOUT and write into STDIN
# of the process we have launched for the command. As theses subprocesses
# are started from the 'begin' pipeline part of the function, they can then
# makes STDOUT read that feed the STDOUT stack and STDIN write from the
# STDIN stack. Theses operations are not directly doable into the 'process'
# pipeline part of the function as it is a 'premise-style' context.
#
# STDOUT poll subprocess
#
$OutProc = [powershell]::Create().AddScript({
Param ([parameter(Mandatory=$True)]$args)
$Process = $args[0]
$StdOutStreamReader = $args[1]
$SleepTime = $args[2]
$cmd = $args[3]
$StdOutStack = $args[4]
$debugPipe = $args[5]
while (!$StdOutStreamReader.EndOfStream) {
$msgproc = $StdOutStreamReader.ReadLine()
Write-Output ($cmd+": OUT_S: "+[String]$msgproc)
try {
$syncStdOutStack = [System.Collections.Queue]::Synchronized($StdOutStack);
$syncStdOutStack.Enqueue($msgproc)
} finally {
$syncStdOutStack = $null
}
}
if ($debugPipe) { Write-Output ($cmd+": OUT_S terminated.") }
})
$tmp = $OutProc.AddParameter("args", @($Process, $StdOutStreamReader, $SleepTime, $cmd, $StdOutStack, $debugPipe))
$OutJob = $OutProc.BeginInvoke()
#
# STDIN poll subprocess
#
$InProc = [powershell]::Create().AddScript({
Param ([parameter(Mandatory=$True)]$args)
$Process = $args[0]
$StdInStreamWriter = $args[1]
$SleepTime = $args[2]
$cmd = $args[3]
$StdInStack = $args[4]
$debugPipe = $args[5]
$bufferedPipe = $args[6]
if ($bufferedPipe -gt 0) { $dumb_findstr = ("12"*4000).Replace("1", [char]27) }
while (!$Process.hasExited -and $StdInStreamWriter) {
try {
$syncStdInStack = [System.Collections.Queue]::Synchronized($StdInStack);
while ($syncStdInStack.Count -gt 0) {
$stack_msg = [String]$syncStdInStack.Dequeue();
$syncStdInStack = $null;
if ($debugPipe) { Write-Output ($cmd+": IN_S: "+[String]$stack_msg) }
#$check = $StdInStreamWriter.Write($dumb_findstr.Substring(0,$padsize))
$check = $StdInStreamWriter.WriteLine([String]$stack_msg)
if ($bufferedPipe -gt 0) {
# As some command (findstr...) can have buffered input in every case,
# we send a padding up to 4000 escape bytes just to overcome that...
$padsize = $bufferedPipe-$stack_msg.Length-($stack_msg.Length%2)
$check = $StdInStreamWriter.Write($dumb_findstr.Substring(0,$padsize))
}
};
} finally {
$syncStdInStack = $null;
}
# To not strain unusefully the CPU, we wait far more time between each
# flush to the stack. This does not affect really the performance as
# the incoming message are still stacked as we wait the 'Process'
# part of the function.
$tmp = [System.Threading.Thread]::Sleep($SleepTime * 5);
}
if ($debugPipe) { Write-Output ($cmd+": IN_S terminated.") }
})
$tmp = $InProc.AddParameter("args", @($Process, $StdInStreamWriter, $SleepTime, $cmd, $StdInStack, $debugPipe, $bufferedPipe))
$InJob = $InProc.BeginInvoke()
}
Process {
# If we are in the process part, that means there is input from the PowerShell
# pipeline to process. We send this input to the STDIN of the command through
# a FIFO stack and a polling subprocess. Each input from the PowerShell pipeline
# processor is injected in the command STDIN.
if ($StdInStreamWriter -and $StdInStack) {
try {
$syncStdInStack = [System.Collections.Queue]::Synchronized($StdInStack);
$syncStdInStack.Enqueue($msg)
if ($debugPipe) { Write-Host ($cmd+": IN_P "+$msg) }
} finally {
$syncStdInStack = $null
}
}
if ($debugPipe) { Write-Host ($cmd+": INST "+$InProc.InvocationStateInfo.State) }
if ($debugPipe) { Write-Host ($cmd+": OUST "+$OutProc.InvocationStateInfo.State) }
# While processing input from the pipe, we send on the fly command
# output to the pipeline processor.
if ($StdOutStack) {
try {
$syncStdOutStack = [System.Collections.Queue]::Synchronized($StdOutStack);
if ($debugPipe) { Write-Host ($cmd+": OUSTS "+$syncStdOutStack.Count) }
while ($syncStdOutStack.Count -gt 0) {
$stack_msg = [String]$syncStdOutStack.Dequeue();
if ($debugPipe) { Write-Host ($cmd+": OUT_P "+[String]$stack_msg) }
Write-Output ([String]$stack_msg)
};
} finally {
$syncStdOutStack = $null;
}
}
}
End {
if ($debugPipe) { Write-Host ($cmd+": No more input") }
# If there is still input for command to process, we wait.
# We do that until the process terminates, so the pipelining will be
# maintained.
try {
$inputDone = $False
while(!$inputDone -and !$Process.hasExited -and $StdInStack) {
$syncStack = [System.Collections.Queue]::Synchronized($StdInStack);
if ($syncStack.Count -gt 0) {
$syncStack = $null
$tmp = [System.Threading.Thread]::Sleep($SleepTime);
} else {
$inputDone = $True
}
}
} finally {
$syncStack = $null
}
# At end, we are sure we have no more input to process,
# so we immediately close the command STDIN. That way the command
# will "know" there is no more input to process. That also Allows
# the async read of its output to complete.
if ($StdInStreamWriter) {
$StdInStreamWriter.Close();
}
# The command has no more input, but it still can output things.
# We wait until the command terminated and send to the pipe
# output everything the command sends on STDOUT.
while (!$Process.hasExited) {
try {
# If we can sync the OUT stack, that means that the Output
# subprocess has nothing more to write.
if ($StdOutStack) {
$syncStack = [System.Collections.Queue]::Synchronized($StdOutStack);
while ($syncStack.Count -gt 0) {
$stack_msg = [String]$syncStack.Dequeue();
if ($debugPipe) { Write-Host ($cmd+": OUT_E "+[String]$stack_msg) }
Write-Output ([String]$stack_msg)
};
}
} finally {
$syncStack = $null;
}
# To not strain unusefully the CPU, we wait a little more time between
# each check of STDOUT as the command can be in a "long run", thus we'll
# feed the stack at a lesser rhythm.
$tmp = [System.Threading.Thread]::Sleep($SleepTime * 2);
}
# We are finally at complete termination of the command.
if ($Process.hasExited) {
if ($debugPipe) { Write-Host "$cmd terminated." }
while(!$InJob.IsCompleted -or !$OutJob.IsCompleted) {
[System.Threading.Thread]::Sleep($SleepTime);
}
if ($InJob.IsCompleted) {
if ($InProc.InvocationStateInfo.State -eq "Failed") {
Write-Host ($cmd+": JOB IN Fail:"+$InProc.InvocationStateInfo.State)
Write-Host ($cmd+": JOB IN Fail:"+$InProc.InvocationStateInfo.Reason)
throw [System.Exception] "Input write to $cmd failed ..."
}
$res = $InProc.EndInvoke($InJob)
if ($debugPipe) { Write-Host ("JOB IN terminated : "+$res) }
}
if ($OutJob.IsCompleted) {
if ($OutProc.InvocationStateInfo.State -eq "Failed") {
Write-Host ($cmd+":JOB OUT Fail:"+$OutProc.InvocationStateInfo.State)
Write-Host ($cmd+":JOB OUT Fail:"+$OutProc.InvocationStateInfo.Reason)
throw [System.Exception] "Output read from $cmd failed ..."
}
$res = $OutProc.EndInvoke($OutJob)
if ($debugPipe) { Write-Host ("JOB OUT terminated : "+$res) }
}
# If the async read of command output is still not completed, we wait.
# When completed, we send its output to the pipeline processor.
if ($StdOutStack) {
try {
$syncStack = [System.Collections.Queue]::Synchronized($StdOutStack);
while ($syncStack.Count -gt 0) {
$stack_msg = [String]$syncStack.Dequeue();
if ($debugPipe) { Write-Host ($cmd+": OUT_E "+[String]$stack_msg) }
Write-Output ([String]$stack_msg)
};
} finally {
$syncStack = $null;
}
}
# If we are here, command output was entirely processed, so we close command STDOUT.
if ($StdOutStreamReader) {
$StdOutStreamReader.Close();
}
} else {
throw [System.Exception] "Unexpected leave, pipe broken ?..."
}
}
}