5

Lets say I have a bash script that looks like this:

array=( 1 2 3 4 5 6 )

for each in "${array[@]}"
do
  echo "$each"

  command --arg1 $each

done

If I want to run the everything in the loop in parallel, I could just change command --arg1 $each to command --arg1 $each &.

But now lets say I want to take the results of command --arg1 $each and do something with those results like this:

array=( 1 2 3 4 5 6 )
for each in "${array[@]}"
do
  echo "$each"

  lags=($(command --arg1 $each)

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do

    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"

  done

done

If I just add a & to the end of command --arg1 $each, everything after command --arg1 $each will run without command --arg1 $each finishing first. How do I prevent that from happening? Also, how do I also limit the amount of threads the loop can occupy?

Essentially, this block should run in parallel for 1,2,3,4,5,6

  echo "$each"

  lags=($(command --arg1 $each)

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do

    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"

  done

-----EDIT--------

Here is the original code:

#!/bin/bash
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka.client.jaas.conf"
IFS=$'\n'
array=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --list --command-config /etc/kafka/client.properties --new-consumer))

lngth=${#array[*]}

echo "array length: " $lngth

timestamp=$(($(date +%s%N)/1000000))

log_time=`date +%Y-%m-%d:%H`

echo "log time: " $log_time

log_file="/home/ec2-user/laglogs/laglog.$log_time.log"

echo "log file: " $log_file

echo "timestamp: " $timestamp

get_lags () {

  echo "$1"

  lags=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --describe  --group $1 --command-config /etc/kafka/client.properties --new-consumer))

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do

    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"

  done
}

for each in "${array[@]}"
do 

   get_lags $each &

done

------EDIT 2-----------

Trying with answer below:

#!/bin/bash
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka.client.jaas.conf"
IFS=$'\n'
array=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --list --command-config /etc/kafka/client.properties --new-consumer))

lngth=${#array[*]}

echo "array length: " $lngth

timestamp=$(($(date +%s%N)/1000000))

log_time=`date +%Y-%m-%d:%H`

echo "log time: " $log_time

log_file="/home/ec2-user/laglogs/laglog.$log_time.log"

echo "log file: " $log_file

echo "timestamp: " $timestamp

max_proc_count=8

run_for_each() {
  local each=$1
  echo "Processing: $each" >&2
  IFS=$'\n' read -r -d '' -a lags < <(kafka-consumer-groups --bootstrap-server kafka1:9092 --describe --command-config /etc/kafka/client.properties --new-consumer --group "$each" && printf '\0')
  for result in "${lags[@]}"; do
    printf '%(%Y-%m-%dT%H:%M:%S)T\t%s\t%s\n' -1 "$each" "$result"
  done >>"$log_file"
}

export -f run_for_each
export log_file # make log_file visible to subprocesses

printf '%s\0' "${array[@]}" |
  xargs -P "$max_proc_count" -n 1 -0 bash -c 'run_for_each "$@"'
lightweight
  • 3,227
  • 14
  • 79
  • 142
  • You want to push the contents of multiple parallel instances into one array, _lags_ (or) process output of one instance at at time? – Inian Apr 09 '17 at 15:21
  • I want to process each element in `for each in "${array[@]}"` in parallel...so if `"${array[@]}"` has 6, I'd like to process all 6 in parallel. Also, lets say I have 6 elements in there, but how could I limit that to run a max of 4 at a time in parallel? – lightweight Apr 09 '17 at 15:28
  • The problem is with your storage in the arrays. How would you like to do it? For example as per your previous comment, output of all 4 parallel instances in one array (or) something similar? – Inian Apr 09 '17 at 15:33
  • @Inian, I added a small edit hoping to clarify, trying to explain this the best I can... – lightweight Apr 09 '17 at 15:37
  • There is no way you can have child processes writing to an array (or other data structure in memory) that belongs to the parent process. You need to use regular files or some other natively multi-process data structure provided by the OS, like FIFOs. – Fred Apr 09 '17 at 15:41
  • 1
    You "array" isn't. `array=( 1 2 3 4 5 6 )` would be syntax to define a six-element array. – Charles Duffy Apr 09 '17 at 15:43
  • I made an edit to correct that, and I also thought of one way to do this but don't know if its the most efficient... – lightweight Apr 09 '17 at 15:49
  • If you have your own solution, feel free to add it using the "Add An Answer" button. Adding a proposed answer (even your own proposed answer) *as an answer* means it gets commented and voted on separate from the question itself, and that you can accept it to mark your question closed. – Charles Duffy Apr 09 '17 at 15:52
  • @CharlesDuffy...well, I kind of do...but its not complete...I dont know how to limit the amount of threads it uses...I think you answer is the best one...need to run it still and make sure it does what I want, going to step away for a bit but will later – lightweight Apr 09 '17 at 15:55
  • Just fixed a major bug there (wasn't actually setting `each` from `$1`. Do make sure you're testing with the current version. – Charles Duffy Apr 09 '17 at 15:57
  • BTW, where does `timestamp` come from? Your code uses it in several places but never sets it. If you want all your processes to use a single timestamp related to when the entire script started, for instance, that's something I'd need to know for my answer to implement it correctly (would need to `export` that variable to the environment for subprocesses to be able to see it). – Charles Duffy Apr 09 '17 at 16:08
  • (As an aside, see [What is the appropriate action when the answer to a question is added to the question itself?](https://meta.stackoverflow.com/questions/267434/what-is-the-appropriate-action-when-the-answer-to-a-question-is-added-to-the-que) re: recent edits). – Charles Duffy Apr 09 '17 at 16:11

5 Answers5

4

The convenient thing to do is to push your background code into a separate script -- or an exported function. That way xargs can create a new shell, and access the function from its parent. (Be sure to export any other variables that need to be available in the child as well).

array=( 1 2 3 4 5 6 )
max_proc_count=8
log_file=out.txt

run_for_each() {
  local each=$1
  echo "Processing: $each" >&2
  IFS=$' \t\n' read -r -d '' -a lags < <(yourcommand --arg1 "$each" && printf '\0')
  for result in "${lags[@]}"; do
    printf '%(%Y-%m-%dT%H:%M:%S)T\t%s\t%s\n' -1 "$each" "$result"
  done >>"$log_file"
}

export -f run_for_each
export log_file # make log_file visible to subprocesses

printf '%s\0' "${array[@]}" |
  xargs -P "$max_proc_count" -n 1 -0 bash -c 'run_for_each "$@"'

Some notes:

  • Using echo -e is bad form. See the APPLICATION USAGE and RATIONALE sections in the POSIX spec for echo, explicitly advising using printf instead (and not defining an -e option, and explicitly defining than echo must not accept any options other than -n).
  • We're including the each value in the log file so it can be extracted from there later.
  • You haven't specified whether the output of yourcommand is space-delimited, tab-delimited, line-delimited, or otherwise. I'm thus accepting all these for now; modify the value of IFS passed to the read to taste.
  • printf '%(...)T' to get a timestamp without external tools such as date requires bash 4.2 or newer. Replace with your own code if you see fit.
  • read -r -a arrayname < <(...) is much more robust than arrayname=( $(...) ). In particular, it avoids treating emitted values as globs -- replacing *s with a list of files in the current directory, or Foo[Bar] with FooB should any file by that name exist (or, if the failglob or nullglob options are set, triggering a failure or emitting no value at all in that case).
  • Redirecting stdout to your log_file once for the entire loop is somewhat more efficient than redirecting it every time you want to run printf once. Note that having multiple processes writing to the same file at the same time is only safe if all of them opened it with O_APPEND (which >> will do), and if they're writing in chunks small enough to individually complete as single syscalls (which is probably happening unless the individual lags values are quite large).
Charles Duffy
  • 280,126
  • 43
  • 390
  • 441
  • 2
    This is a good, simple solution. It has to be noted that it works in cases where the results of each individual operations can be worked on individually (i.e. no interaction after processing). Also, `xargs` will launch only `$max_proc_count` processes at a time, but each task will result in a new process being launched, which may be sub-optimal in some rare, performance-critical cases, but is simpler (and therefore preferable) in most cases compared to creating/controlling you own worker processes. – Fred Apr 09 '17 at 16:34
  • *nod*. When `wait -n` gets fixed to exit once per completed job with no corner cases or exceptions, I'll be comfortable doing without `xargs`. As it is, though, there are currently enough corner cases (ie. when you have multiple SIGCHLDs coming in very close to each other) to make the roll-it-yourself approach tricky to get right. – Charles Duffy Apr 09 '17 at 16:36
  • (Basically, instead of being able to assume that one return from `wait -n` means you can now start one more process, you need to go through and test the individual PIDs to see *how many* died, since you can't be sure it was just one). – Charles Duffy Apr 09 '17 at 16:39
  • Good to know. I see how that would make it simpler to launch worker processes from the main script. This problem is not present if each worker process reads the next task to be executed, but this requires having a way to tell worker processes what to do (e.g. a FIFO), so a bit more complicated. If I needed something like this, I would build it as a reusable framework receiving the actual worker function names as arguments to separate the plumbing. – Fred Apr 09 '17 at 17:00
  • @CharlesDuffy, I tried it this way but got this error for some reason...`line 6: printf: \`(': invalid format character`. I made an edit to show my full code as well – lightweight Apr 09 '17 at 18:53
  • @user2061886, as I said in the answer, `printf %()T` requires bash 4.2 or newer. It looks like you don't have that, and so should fall back to getting your `$timestamp` a different way. – Charles Duffy Apr 10 '17 at 13:21
  • This solution seems to have several problems: It does not work as it is (a dummy argument is needed in the end of the last line) - thus it is not correct. It uses around 10x RAM of the output. Thus you cannot use it for output that is bigger than 10% of your RAM. Runtime is proportional to number of words. Processing `yes | head -n 1000000;` takes around 10 sec - even though it is only 2MB. Since you are focused efficiency it might be worth mentioning. – Ole Tange Apr 15 '17 at 23:34
  • 1
    If this was meant as a general solution you might want to fix: It splits lines that has more than a single word on separate lines. It does not guard against mixing long outputs. – Ole Tange Apr 15 '17 at 23:36
4

A lot of lenghty and theoretical answers here, I'll try to keep it simple - what about using | (pipe) to connect the commands as usual ?;) (And GNU parallel, which excels for these type of tasks).

seq 6 | parallel -j4 "command --arg1 {} | command2 > results/{}"

The -j4 will limit number of threads (jobs) as requested. You DON'T want to write to a single file from multiple jobs, output one file per job and join them after the parallel processing is finished.

liborm
  • 2,634
  • 20
  • 32
  • `parallel` is actually able to collate output, so writing to separate output files and collating explicitly is unneeded. (That said, personally, `parallel` is a tool I'm unable to condone -- it's a big mess of perl with features that interact in underspecified ways). Also, as I describe in my answer, writing directly to a shared output file is actually safe under some specific circumstances (`O_APPEND` and sufficiently short individual writes), and I believe that the OP falls under them here. – Charles Duffy Apr 10 '17 at 13:25
  • 1
    @CharlesDuffy there's an old indian saying about not reading the source code of tools that help you to get the job done. "safe" and "sufficiently short" together sounds fishy, a bit like "It's safe to sync your threads with `sleep()` using a sufficiently long wait." (You only know it's not sufficient after it fails.) – liborm Apr 10 '17 at 21:15
  • http://stackoverflow.com/a/35260110/14122 discusses the O_APPEND case even more explicitly. – Charles Duffy Apr 10 '17 at 21:27
  • 1
    (as for "tools that help you get the job done", I care about correctness -- if piping the name of the file created by `touch $'/tmp/evil/$(rm -rf $HOME)\'$(rm -rf $HOME)\''` into `parallel` is going to result in it running a shell that executes the command substitutions embedded in same, I want to *know* that, and restrict the domain of cases where I use or advise use of that tool appropriately. If I don't know how it behaves in the corner cases? I'm not advising its use by anyone). – Charles Duffy Apr 10 '17 at 21:29
  • It's getting a bit off topic, here we're piping a sequence of numbers to the tool. And I'm having a hard time to reproduce the code execution vulnerability with a bit ancient `parallel` (2013something). Do you have a CVE link? – liborm Apr 10 '17 at 21:48
  • @CharlesDuffy I cannot reproduce your behaviour: `mkdir /tmp/evil;touch $'/tmp/evil/$(rm -rf $HOME)\'$(rm -rf $HOME)\''; find /tmp/evil | parallel echo` None of the released version since 2012 does anything but `echo`ing the file name correctly. Are you sure you are using the GNU version? – Ole Tange Apr 15 '17 at 14:28
  • If you prefer having the input in an array: `parallel 'echo {}; command --arg1 {}' ::: "${array[@]}" > myoutput` – Ole Tange Apr 15 '17 at 14:30
  • @OleTange, I didn't assert that it was vulnerable. I asserted that I'm unaware of any documented semantics that guarantee it *not* to be vulnerable -- and if I don't know a tool's behavior in the corner cases, I'm not going to advise its use. In GNU `find -exec` and other common tools, `{}` means "just perform a literal string substitution here" -- which *does* cause the vulnerability given. Obviously, `parallel` is doing magic. I don't trust magic. – Charles Duffy Apr 15 '17 at 15:06
  • (Digressing a bit: In the `find -exec` case, it's mid-argument string substitution, specifically, that's problematic: `-exec sh -c 'foo {} >{}.out' \;` is vulnerable, whereas `-exec 'for arg; do foo "$arg" >"$arg.out"; done' _ {} +` is not; the latter is all POSIX requires, but GNU does the former in addition). – Charles Duffy Apr 15 '17 at 15:11
  • ...That said, this has inspired me to go read the docs a bit and look for holes in the relevant functionality and/or examples. Back in a bit. :) – Charles Duffy Apr 15 '17 at 15:19
  • ...so, for the example in the docs titled "EXAMPLE: Using shell variables", `V=$'$(touch vulnerable-A) \'$(touch vulnerable-B)\''` creates the file `vulnerable-B`. (Continuing to look for more, will comment as they're found). – Charles Duffy Apr 15 '17 at 15:23
  • `parallel --shellquote`, by contrast, is nice and conservative -- should be safe with any POSIX-compliant shell. I'm **assuming** this is identical logic to what's used to quote the items iterated over, but haven't found an explicit guarantee to that effect). – Charles Duffy Apr 15 '17 at 15:36
  • (For those not referring to the current man page, `parallel echo "'$V'" ::: suffix` to exercise the example two comments up; perhaps that might be better replaced with `parallel echo "$(printf '%s\0' "$V" | parallel -0 --shellquote)" ::: suffix` to show practices safe with unknown values). – Charles Duffy Apr 15 '17 at 15:37
  • @OleTange, ...by the way, in the argument-appending case where `{}` is unused, does `parallel` start a shell at all? If not, I think we may have a scenario in which I find it utterly unobjectionable (except, perhaps, for the need to *audit* arguments at runtime to ensure that none of them contains unintentional replacement strings; a flag or option that disabled replacement strings would moot that need). – Charles Duffy Apr 15 '17 at 16:37
1

Using GNU Parallel it looks like this:

array=( 1 2 3 4 5 6 )
parallel -0 --bar --tagstring '{= $_=localtime(time)."\t".$_; =}' \
  command --arg1 {} ::: "${array[@]}" > output

GNU Parallel makes sure output from different jobs is not mixed.

If you prefer the output from jobs mixed:

parallel -0 --bar --line-buffer --tagstring '{= $_=localtime(time)."\t".$_; =}' \
  command --arg1 {} ::: "${array[@]}" > output-linebuffer

Again GNU Parallel makes sure to only mix with full lines: You will not see half a line from one job and half a line from another job.

It also works if the array is a bit more nasty:

array=( "new
line" 'quotes"  '"'" 'echo `do not execute me`')

Or if the command prints long lines half-lines:

command() {
  echo Input: "$@"
  echo '"  '"'"
  sleep 1
  echo -n 'Half a line '
  sleep 1
  echo other half

  superlong_a=$(perl -e 'print "a"x1000000')
  superlong_b=$(perl -e 'print "b"x1000000')
  echo -n $superlong_a
  sleep 1
  echo $superlong_b
}
export -f command

GNU Parallel strives to be a general solution. This is because I have designed GNU Parallel to care about correctness and try vehemently to deal correctly with corner cases, too, while staying reasonably fast.

GNU Parallel guards against race conditions and does not split words in the output on each their line.

array=( $(seq 30) )
max_proc_count=30

command() {
  # If 'a', 'b' and 'c' mix: Very bad                                                         
  perl -e 'print "a"x3000_000," "'
  perl -e 'print "b"x3000_000," "'
  perl -e 'print "c"x3000_000," "'
  echo
}
export -f command

parallel -0 --bar --tagstring '{= $_=localtime(time)."\t".$_; =}' \
  command --arg1 {} ::: "${array[@]}" > parallel.out

# 'abc' should always stay together
# and there should only be a single line per job
cat parallel.out | tr -s abc

GNU Parallel works fine if the output has a lot of words:

array=(1)
command() {
  yes "`seq 1000`" | head -c 10M
}
export -f command

parallel -0 --bar --tagstring '{= $_=localtime(time)."\t".$_; =}' \
  command --arg1 {} ::: "${array[@]}" > parallel.out

GNU Parallel does not eat all your memory - even if the output is bigger than your RAM:

array=(1)
outputsize=1000M
export outputsize
command() {
    yes "`perl -e 'print \"c\"x30_000'`" | head -c $outputsize
}
export -f command

parallel -0 --bar --tagstring '{= $_=localtime(time)."\t".$_; =}' \
  command --arg1 {} ::: "${array[@]}" > parallel.out
Ole Tange
  • 31,768
  • 5
  • 86
  • 104
  • Please disclose that you're the (original) author of GNU parallel - not just here, but in _all_ answers you post. – mklement0 Apr 15 '17 at 23:12
0

You know how to execute commands in separate processes. The missing part is how to allow those processes to communicate, as separate processes cannot share variables.

Basically, you must chose whether to communicate using regular files, or inter-process communication/FIFOs (which still boils down to using files).

The general approach :

  • Decide how you want to present tasks to be executed. You could have them as separate files on the filesystem, as a FIFO special file that can be read from, etc. This could be a simple as writing to a separate file each command to be executed, or writing each command to a FIFO (one command per line).

  • In the main process, prepare the files describing tasks to perform or launch a separate process in the background that will feed the FIFO.

  • Then, still in the main process, launch worker processes in the background (with &), as many of them as you want parallel tasks being executed (not one per task to perform). Once they have been launched, use wait to, well, wait until all processes are finished. Separate processes cannot share variables, you will have to write any output that needs to be used later to separate files, or a FIFO, etc. If using a FIFO, remember more than one process can write to a FIFO at the same time, so use some kind of mutex mechanism (I suggest looking into the use of mkdir/rmdir for that purpose).

  • Each worker process must fetch the next task (from a file/FIFO), execute it, generate the output (to a file/FIFO), loop until there are no new tasks, then exit. If using files, you will need to use a mutex to "reserve" a file, read it, and then delete it to mark it as taken care of. This would not be needed for a FIFO.

  • Depending on the case, your main process may have to wait until all tasks are finished before handling the output, or in some cases may launch a worker process that will detect and handle output as it appears. This worker process would have to either be stopped by the main process once all tasks have been executed, or figure out for itself when all tasks have been executed and exit (while being waited on by the main process).

This is not detailed code, but I hope it gives you an idea of how to approach problems like this.

Fred
  • 6,590
  • 9
  • 20
0

(Community Wiki answer with the OP's proposed self-answer from the question -- now edited out):


So here is one way I can think of doing this, not sure if this is the most efficient way and also, I can't control the amount of threads (I think, or processes?) this would use:

array=( 1 2 3 4 5 6 )

lag_func () {
  echo "$1"

  lags=($(command --arg1 $1)

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do
    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"
  done
}

for each in "${array[@]}"
do
  lag_func $each &
done
Community
  • 1
  • 1
Charles Duffy
  • 280,126
  • 43
  • 390
  • 441
  • Other than the usual ways it's buggy (missing quotes, for instance, and side effects of using the `array=( $(...) )` idiom) and the acknowledged caveat around controlling the number of threads, this is entirely workable. See http://shellcheck.net/ for guidance on several of those bugs. – Charles Duffy Apr 09 '17 at 16:07