The Python Oracle

How to make a generic method in Python to execute multiple piped shell commands?


Music by Eric Matyas
https://www.soundimage.org
Track title: RPG Blues Looping

--

Chapters
00:00 How To Make A Generic Method In Python To Execute Multiple Piped Shell Commands?
01:24 Accepted Answer Score 3
03:03 Answer 2 Score 3
05:06 Thank you

--

Full question
https://stackoverflow.com/questions/5175...

--

Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...

--

Tags
#python #shell #subprocess

#avk47



ACCEPTED ANSWER

Score 3


Instead of using a shell string and trying to parse it with your own means, I'd ask the user to provide the commands as separate entities themselves. This avoids the obvious trap of detecting a | that is part of a command's data and not used as a shell pipe. Whether you ask them to provide commands as lists of strings or as single strings that you will shlex.split afterwards is up to the interface you want to expose. I'd choose the first for its simplicity in the following example.
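
For instance, even after tokenizing, a quoted "|" that is plain data looks identical to a real pipe, which is why parsing a full shell string yourself is fragile. A small standard-library illustration:

import shlex

cmdline = "grep '|' data.txt | wc -l"
# Naively splitting on "|" cuts straight through the quoted argument:
print(cmdline.split("|"))    # ["grep '", "' data.txt ", " wc -l"]
# shlex tokenizes correctly, but both "|" tokens now look the same:
print(shlex.split(cmdline))  # ['grep', '|', 'data.txt', '|', 'wc', '-l']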

Once you have the individual commands, a simple for loop is enough to pipe outputs of the previous commands to inputs of the next ones, as you have found yourself:

import subprocess

def pipe_subprocesses(*commands):
    if not commands:
        return

    next_input = None
    for command in commands:
        # Feed each command the stdout of the previous one.
        p = subprocess.Popen(command, stdin=next_input,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        next_input = p.stdout

    # Only the last process is collected here; see the caveats below.
    out, err = p.communicate()
    if err:
        print(err.decode().strip())
    else:
        print(out.decode())

Usage being:

>>> pipe_subprocesses(['ls', '-lhtr'], ['awk', '{print $9}'], ['wc', '-l'])
25
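
For comparison, this is what the function replaces; the one-liner below (using subprocess.check_output, available since Python 2.7) would instead hand quoting and pipe handling back to /bin/sh, with all the pitfalls that implies:

import subprocess

out = subprocess.check_output("ls -lhtr | awk '{print $9}' | wc -l",
                              shell=True)
print(out.decode())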

Now this is a quick and dirty way to get it set up and have it seemingly work as you want. But there are at least two issues with this code:

  1. You leak zombie processes/open process handles because no exit code except the last process's is collected; the OS keeps those resources open until you collect them;
  2. You can't access the information of a process that fails midway through.

To avoid that, you need to maintain a list of opened processes and explicitly wait for each of them. And because I don't know your exact use case, I'll just return the first process that failed (if any) or the last process (if not) so you can act accordingly:

import shlex
import subprocess

def pipe_subprocesses(*commands):
    if not commands:
        return

    processes = []
    next_input = None
    for command in commands:
        if isinstance(command, str):
            # Accept raw strings as well and tokenize them safely.
            command = shlex.split(command)
        p = subprocess.Popen(command, stdin=next_input,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        next_input = p.stdout
        processes.append(p)

    # Reap every process so none is left as a zombie.
    for p in processes:
        p.wait()

    for p in processes:
        if p.returncode != 0:
            return p
    return p  # return the last process in case everything went well

I also threw in some shlex handling as an example so you can mix raw strings and already-parsed lists:

>>> pipe_subprocesses('ls -lhtr', ['awk', '{print $9}'], 'wc -l')
25
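
Since the improved function returns a Popen object, the caller can act on it; one possible pattern, assuming the outputs are small enough to read after wait() without filling the pipe buffers:

p = pipe_subprocesses('ls -lhtr', ['awk', '{print $9}'], 'wc -l')
if p.returncode != 0:
    # The first failing command; its stderr pipe is still readable.
    print("failed:", p.args, p.stderr.read().decode())
else:
    print(p.stdout.read().decode())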



ANSWER 2

Score 3


Doing this yourself unfortunately involves a few edge cases that the shell normally takes care of for you, or alternatively that the shell silently ignores for you. Some concerns:

  • The function should always wait() for every process to finish, or else you will get what are called zombie processes.

  • The commands should be connected to each other using real pipes, that way the entire output doesn't need to be read into memory at once. This is the normal way pipes work.

  • The read end of every pipe should be closed in the parent process, so children can properly receive SIGPIPE when the next process closes its input. Without this, the parent keeps the pipe open, the child never knows to exit, and it may run forever.

  • Errors in child processes should be raised as exceptions, except SIGPIPE. It is left as an exercise to the reader to raise exceptions for SIGPIPE on the final process because SIGPIPE is not expected there, but ignoring it is not harmful.

Note that subprocess.DEVNULL does not exist prior to Python 3.3. I know there are some of you out there still living with 2.x; you will have to open /dev/null manually, or just decide that the first process in the pipeline gets to share stdin with the parent process.
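
On 2.x, a manual stand-in for subprocess.DEVNULL might look like this (a minimal sketch; the child keeps its own copy of the descriptor, so the parent can close the file right after spawning):

import os
import subprocess

devnull = open(os.devnull, 'rb')
try:
    proc = subprocess.Popen(['head', '-n', '1'], stdin=devnull,
                            stdout=subprocess.PIPE)
finally:
    devnull.close()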

Here is the code:

import signal
import subprocess

def run_pipe(*cmds):
    """Run a pipe that chains several commands together."""
    pipe = subprocess.DEVNULL
    procs = []
    try:
        for cmd in cmds:
            proc = subprocess.Popen(cmd, stdin=pipe,
                                    stdout=subprocess.PIPE)
            procs.append(proc)
            if pipe is not subprocess.DEVNULL:
                # Close the parent's copy of the previous stdout so the
                # earlier child can receive SIGPIPE when its reader exits.
                pipe.close()
            pipe = proc.stdout
        stdout, _ = proc.communicate()
    finally:
        # Must call wait() on every process, otherwise you get
        # zombies.
        for proc in procs:
            proc.wait()
    # Fail if any command in the pipe failed, except due to SIGPIPE
    # which is expected.
    for proc in procs:
        if (proc.returncode
            and proc.returncode != -signal.SIGPIPE):
            raise subprocess.CalledProcessError(
                proc.returncode, proc.args)
    return stdout

Here we can see it in action. The pipeline correctly terminates with yes (which runs until it receives SIGPIPE) and correctly fails with false (which always fails).

In [1]: run_pipe(["yes"], ["head", "-n", "1"])
Out[1]: b'y\n'

In [2]: run_pipe(["false"], ["true"])
---------------------------------------------------------------------------
CalledProcessError                        Traceback (most recent call last)
<ipython-input-2-db97c6876cd7> in <module>()
----> 1 run_pipe(["false"], ["true"])

~/test.py in run_pipe(*cmds)
     22     for proc in procs:
     23         if proc.returncode and proc.returncode != -signal.SIGPIPE:
---> 24             raise subprocess.CalledProcessError(proc.returncode, proc.args)
     25     return stdout

CalledProcessError: Command '['false']' returned non-zero exit status 1
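
In practice a caller would wrap the call to distinguish a failing pipeline from a successful one; a small sketch reusing run_pipe from above:

try:
    count = int(run_pipe(["ls", "-1"], ["wc", "-l"]))
    print("entries:", count)
except subprocess.CalledProcessError as exc:
    print("pipeline step failed:", exc.cmd, "status", exc.returncode)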