I have three scripts that need to be combined to process data in a pipeline. The scripts run forever, until execution is interrupted by the user. This is how they are executed in a terminal:
script1_producer.sh | script2_processor.sh | script3_processor.sh
script1_producer.sh produces the data to be processed (as an example, it just prints incrementing numbers):
#!/bin/bash
i=1
while true; do
  echo "$i"
  i=$((i + 1))
  sleep 1
done
script2_processor.sh consumes data from script1 and produces a new stream of data (multiplying each number by 2):
#!/bin/bash
while read -r line; do
  echo "$((line * 2))"
done < "${1:-/dev/stdin}"
script3_processor.sh consumes data from script2 and produces a new stream of data (prepending the letter A to each number):
#!/bin/bash
while read -r line; do
  echo "A$line"
done < "${1:-/dev/stdin}"
This is the resulting output when running script1_producer.sh | script2_processor.sh | script3_processor.sh:
A2
A4
A6
...
Now I would like these scripts to be controlled by Python subprocesses connected with pipes. In the end I need to process the output of script3_processor.sh and perform an operation on each line. I'm trying to implement this using asyncio, though it would also be fine not to use asyncio if that's possible.
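For reference, my understanding from the subprocess docs is that a plain blocking version could be wired up roughly like the sketch below, since a Popen's stdout is a real file object with a fileno() (an untested sketch, using the same scripts as above):

import subprocess

# Each stage's stdout is a real file object, so it can be passed
# directly as the next stage's stdin.
p1 = subprocess.Popen(["./script1_producer.sh"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["./script2_processor.sh"], stdin=p1.stdout, stdout=subprocess.PIPE)
p3 = subprocess.Popen(["./script3_processor.sh"], stdin=p2.stdout, stdout=subprocess.PIPE, text=True)

# Close the parent's copies of the intermediate pipes so earlier
# stages see SIGPIPE if a later stage exits.
p1.stdout.close()
p2.stdout.close()

for line in p3.stdout:  # blocks, yielding one line at a time
    print(line.strip())  # process each line here

Still, I would like to get the asyncio variant working.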
This is my (very naive) attempt, process_pipes.py:
import asyncio
import subprocess
import os


async def async_receive():
    p1 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=subprocess.PIPE,
    )

    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=p1.stdout,
        stdout=subprocess.PIPE,
    )

    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=p2.stdout,
        stdout=subprocess.PIPE,
    )

    # Read just one line to test
    data = await p3.stdout.readline()
    print(data)

asyncio.run(async_receive())
Unfortunately, I'm getting the following exception when executing this script:
Traceback (most recent call last):
  File "process_pipes.py", line 28, in <module>
    asyncio.run(async_receive())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "process_pipes.py", line 12, in async_receive
    p2 = await asyncio.create_subprocess_exec(
  File "/usr/lib/python3.8/asyncio/subprocess.py", line 236, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1630, in subprocess_exec
    transport = await self._make_subprocess_transport(
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 197, in _make_subprocess_transport
    transp = _UnixSubprocessTransport(self, protocol, args, shell,
  File "/usr/lib/python3.8/asyncio/base_subprocess.py", line 36, in __init__
    self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 789, in _start
    self._proc = subprocess.Popen(
  File "/usr/lib/python3.8/subprocess.py", line 808, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  File "/usr/lib/python3.8/subprocess.py", line 1477, in _get_handles
    p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'
I read some examples on Stack Overflow and elsewhere telling me to handle the pipes differently, but I could not get them to work in my scenario.
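If I understood those examples correctly, the suggestion is to connect the stages with raw OS pipes, so that every stdin/stdout argument is a real file descriptor instead of a StreamReader. This is my (untested) reading of them, using the same scripts as above:

import asyncio
import os
import subprocess


async def async_receive():
    # One OS-level pipe per link in the chain; these are real file
    # descriptors, so they can be handed to the child processes.
    r1, w1 = os.pipe()
    r2, w2 = os.pipe()

    p1 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh", stdout=w1)
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh", stdin=r1, stdout=w2)
    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh", stdin=r2, stdout=subprocess.PIPE)

    # The parent still holds copies of all four descriptors; close
    # them so only the children keep the pipe ends open.
    for fd in (r1, w1, r2, w2):
        os.close(fd)

    data = await p3.stdout.readline()
    print(data)


asyncio.run(async_receive())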
How can I mimic running script1_producer.sh | script2_processor.sh | script3_processor.sh and process the output of script3 in Python?