So, I'm writing a CLI to talk to Elasticsearch (because I was tired of long curl commands). Everything works great except when I want to talk to my cluster running inside Kubernetes, because it isn't exposed over the internet. I have to run a kubectl port-forward manually first, which I find very annoying.

To "fix" that, I started implementing a kind of "pre-command" that runs before the real command in my CLI. In my CLI config file, I define something like:

my_cluster:
  servers: https://localhost:9200
  pre_commands:
  - command: "kubectl use-context ..."
    wait_for_exit: True
  - command: "kubectl port-forward svc/es 9200"
    wait_for_exit: False
    wait_for_output: "Forwarding from 127.0.0.1:9200"

This config means that when talking to the cluster named "my_cluster", I want to run two shell commands beforehand:

  • kubectl use-context ... and wait for it to exit (since it's not a long-running command)
  • kubectl port-forward svc/es 9200 and wait for a specific output: I don't want to wait for this command to exit because it never does. However, it prints something on stdout once the port-forwarding is established, so I wait for that.

I'm using cliff to build the command line, and it exposes hooks through the methods named prepare_to_run_command and clean_up (defined here):

import re
import subprocess

def _run_shell_subcommand(self, command):
    # Naive split on spaces: arguments containing spaces are not supported.
    command = command.split(" ")
    process = subprocess.Popen(command, stdout=subprocess.PIPE)

    return process

def prepare_to_run_command(self, cmd):
    for command_block in self.context.pre_commands:
        process = self._run_shell_subcommand(command_block.get("command"))

        if command_block.get("wait_for_exit"):
            # Short-lived command: block until it exits.
            process.communicate()

        elif command_block.get("wait_for_output"):
            # Long-running command: read stdout until the expected line shows up.
            pattern = re.compile(command_block.get("wait_for_output"))

            while True:
                line = process.stdout.readline()
                if not line:
                    # EOF: the process closed stdout without printing the expected output.
                    break

                line = line.decode("utf-8").strip()

                if pattern.search(line):
                    break

        # Keep a handle on the process so clean_up can terminate it.
        command_block["process"] = process

def clean_up(self, cmd, result, err):
    for pre_command in self.context.pre_commands:
        pre_command.get("process").terminate()

It appears to wait successfully for the expected output and then move on to the real action I want to run against my Elasticsearch cluster. But this is where the problem arises: I usually end up with an error like:

Traceback (most recent call last):
  File "/Users/jeromepin/.asdf/installs/python/3.6.9/lib/python3.6/site-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "/Users/jeromepin/.asdf/installs/python/3.6.9/lib/python3.6/site-packages/urllib3/connectionpool.py", line 376, in _make_request
    self._validate_conn(conn)
  File "/Users/jeromepin/.asdf/installs/python/3.6.9/lib/python3.6/site-packages/urllib3/connectionpool.py", line 994, in _validate_conn
    conn.connect()
  File "/Users/jeromepin/.asdf/installs/python/3.6.9/lib/python3.6/site-packages/urllib3/connection.py", line 394, in connect
    ssl_context=context,
  File "/Users/jeromepin/.asdf/installs/python/3.6.9/lib/python3.6/site-packages/urllib3/util/ssl_.py", line 370, in ssl_wrap_socket
    return context.wrap_socket(sock, server_hostname=server_hostname)
  File "/Users/jeromepin/.asdf/installs/python/3.6.9/lib/python3.6/ssl.py", line 407, in wrap_socket
    _context=self, _session=session)
  File "/Users/jeromepin/.asdf/installs/python/3.6.9/lib/python3.6/ssl.py", line 817, in __init__
    self.do_handshake()
  File "/Users/jeromepin/.asdf/installs/python/3.6.9/lib/python3.6/ssl.py", line 1077, in do_handshake
    self._sslobj.do_handshake()
  File "/Users/jeromepin/.asdf/installs/python/3.6.9/lib/python3.6/ssl.py", line 689, in do_handshake
    self._sslobj.do_handshake()
ConnectionResetError: [Errno 54] Connection reset by peer

My best guess is that the port-forward is interrupted (while it shouldn't be) during the call to Elasticsearch, terminating the connection unexpectedly.

If I run the port-forward manually in another shell and issue my CLI command (without any pre_command), everything works like a charm.

If this isn't a good way to run shell commands, I'm open to suggestions too.

Jérôme Pin
  • You **have** to clear the `stdout` of each process, otherwise they will hang. Are you sure you clear all the `stdout`'s? Asking because I don't see that the stack trace is relevant to the code and perhaps there's some other magic in another part of the code that isn't getting handled properly, although I might be wrong. – Torxed Jun 01 '20 at 10:24
  • I don't. How do you do that? And why would processes hang because of that? – Jérôme Pin Jun 01 '20 at 10:35
  • Imagine you're in charge of filling a bucket with water and your friend is in charge of emptying it when it gets full. But your friend never empties it because he found a candy shop on the way to you and your bucket.. Should you keep filling the bucket anyway or close the water valve and wait for the bucket to become free? :) – Torxed Jun 01 '20 at 10:42
  • `.communicate()` should empty the `stdout` (and `stderr`) tho, but the problem is that it will *hang* until the process is done, meaning your other processes won't get the attention it needs. So if you're running multiple commands at the same time, you're better off to [poll the process and empty stdout manually](https://stackoverflow.com/a/41466247/929999) – Torxed Jun 01 '20 at 10:45
  • Haha nice metaphor :) However linux can handle multiple file-descriptors opened at the same time right ? And each process has its own stdout/stderr streams, no ? Anyway, I don't *have* to poll and read them, I only have to close them right ? – Jérôme Pin Jun 01 '20 at 11:57
  • Correct, each new `Popen()` gets its own file descriptor. And that stream takes X amount of bytes before completely hanging. You do have to poll and read them while the process is running, otherwise it might hang during normal operation depending on how much output is given etc. And you do need to close them, but only to avoid "maximum filehandles opened" errors; the hanging has more to do with reading the buffers. I'm still not convinced that this is the problem you're experiencing since I can't really tell which commands you're running and if they spam output or not :) – Torxed Jun 01 '20 at 12:07
  • I would highly recommend using Python's Pexpect library for this rather than rolling your own. There are a lot of edge cases. I have successfully implemented `kubectl port-forward` using this library. It is simple once you get a handle on using the library https://pexpect.readthedocs.io/en/stable/ – Brandon Nov 08 '22 at 23:22
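The "keep emptying stdout" idea from the comments above can be sketched as follows. This is a minimal sketch, not a confirmed fix for the connection reset: it waits for the expected line exactly like the question's loop, then hands the pipe to a background thread that keeps reading it so the child can never block on a full pipe buffer. The helper name `start_and_wait_for_output` and the use of a daemon thread are assumptions made for illustration; they come from neither cliff nor the original code.

```python
import subprocess
import threading


def start_and_wait_for_output(command, marker):
    """Start `command` (a list of arguments), block until a line of its
    output contains `marker`, then keep draining the output in the background.

    Hypothetical helper, written for illustration only.
    """
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so it cannot fill up unread
    )

    found = False
    # readline() returns b"" only at EOF, i.e. when the child closed its output.
    for raw_line in iter(process.stdout.readline, b""):
        if marker in raw_line.decode("utf-8", errors="replace"):
            found = True
            break

    if not found:
        process.wait()
        raise RuntimeError(
            "process exited before printing the expected output: %r" % marker
        )

    def _drain():
        # Discard everything the child prints from now on, until EOF.
        for _ in iter(process.stdout.readline, b""):
            pass

    drainer = threading.Thread(target=_drain, daemon=True)
    drainer.start()
    return process, drainer
```

In the question's setup this would be called with something like `start_and_wait_for_output(["kubectl", "port-forward", "svc/es", "9200"], "Forwarding from 127.0.0.1:9200")`, and `clean_up` would still call `terminate()` on the returned process. A plain substring test replaces the regular expression here, which is an intentional simplification.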
