
I just started using Airflow to coordinate our ETL pipeline.

I encountered a broken pipe error when I run a DAG.

I've seen a general Stack Overflow discussion here.

My case is more on the Airflow side. According to the discussion in that post, the possible root cause is:

The broken pipe error usually occurs if your request is blocked or takes too long; after the request side times out, it closes the connection, and then, when the response side (server) tries to write to the socket, it throws a broken pipe error.

This might be the real cause in my case: I have a PythonOperator that starts another job outside of Airflow, and that job can be very lengthy (i.e. 10+ hours). I wonder what mechanism Airflow provides that I can leverage to prevent this error.
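
Roughly, the setup looks like the sketch below (the callable name and script path are placeholders, not the actual pipeline, and a dag object is assumed to already be defined):

    # Rough sketch of my setup (placeholder names): a PythonOperator whose
    # callable launches a long-running external job and blocks until it finishes.
    import subprocess

    from airflow.operators.python_operator import PythonOperator

    def run_long_job(**context):
        # hypothetical external pipeline script; the real job runs outside Airflow
        # and can take 10+ hours to complete
        subprocess.check_call(["/path/to/long_pipeline.sh"])

    long_job = PythonOperator(
        task_id="run_long_job",
        python_callable=run_long_job,
        provide_context=True,
        dag=dag,
    )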

Can anyone help?

UPDATE 1 (2019-03-03):

Thanks to @y2k-shubham for suggesting the SSHOperator. I was able to use it to set up an SSH connection successfully and run some simple commands on the remote side (indeed, the default SSH connection has to be set to localhost because the job runs on localhost), and I can see the correct results of hostname and pwd.
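
For reference, the sanity-check task looked roughly like the sketch below (the task id is a placeholder, and a dag object is assumed):

    # Sketch of the UPDATE 1 sanity check: run trivial commands over SSH to
    # localhost using the default connection (ssh_default pointed at localhost)
    from airflow.contrib.operators.ssh_operator import SSHOperator

    check_ssh = SSHOperator(
        task_id="check_ssh",
        ssh_conn_id="ssh_default",
        command="hostname && pwd",
        dag=dag,
    )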

However, when I attempted to run the actual job, I received the same error; again, the error comes from the pipeline job rather than the Airflow DAG/task.

UPDATE 2 (2019-03-03):

I had a successful run (airflow test) with no error, followed by another failed run (scheduler) with the same error from the pipeline.

mdivk
  • I've run `bash` scripts (`Hadoop` jobs) on a remote system through `Airflow` (`SSHHook`) that got **stuck for over 7-8 hours** when the disk (on the remote system) ran out of space, and resumed once I manually deleted data. `hadoop` jobs do come with the *goodness* of `YARN`, which does the job of keeping things *alive* even when *stuck* and gracefully resuming everything. But the [`keepalive_interval`](https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/hooks/ssh_hook.py#L68) also helps prevent timeouts from `Airflow`'s end (a sketch follows these comments). So moving from `PythonOperator` to `SSHOperator` might work for you – y2k-shubham Mar 03 '19 at 06:03
  • Thank you so much for the enlightening explanation. Can you kindly write an answer here with a sample SSHOperator? – mdivk Mar 03 '19 at 13:12
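
A minimal sketch of the keepalive suggestion from the comment above (the connection id and interval value are illustrative, not from the thread):

    # Sketch: an SSHHook with keepalive_interval set, so the SSH connection keeps
    # sending keepalive packets while the remote job runs for hours
    from airflow.contrib.hooks.ssh_hook import SSHHook

    ssh_hook = SSHHook(
        ssh_conn_id="ssh_default",
        keepalive_interval=60,  # seconds between keepalive packets (illustrative)
    )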

1 Answer


While I'd suggest you keep looking for a more graceful way of achieving what you want, I'm putting up example usage as requested.


First, you've got to create an SSHHook. This can be done in two ways:

  • The conventional way, where you supply all requisite settings like host, user, password (if needed), etc. from the client code where you are instantiating the hook. I'm citing an example from test_ssh_hook.py, but you should thoroughly go through SSHHook as well as its tests to understand all possible usages.

    ssh_hook = SSHHook(remote_host="remote_host",
                       port="port",
                       username="username",
                       timeout=10,
                       key_file="fake.file")
    
  • The Airflow way, where you put all connection details inside a Connection object that can be managed from the UI, and only pass its conn_id to instantiate your hook.

    ssh_hook = SSHHook(ssh_conn_id="my_ssh_conn_id")
    

    Of course, if you're relying on SSHOperator, then you can directly pass the ssh_conn_id to the operator.

    ssh_operator = SSHOperator(task_id="ssh_task",
                               ssh_conn_id="my_ssh_conn_id")
    

Now if you're planning to have a dedicated task for running a command over SSH, you can use SSHOperator. Again, I'm citing an example from test_ssh_operator.py, but go through the sources for a better picture.

 task = SSHOperator(task_id="test",
                    command="echo -n airflow",
                    dag=self.dag,
                    timeout=10,
                    ssh_conn_id="ssh_default")

But then you might want to run a command over SSH as part of your bigger task. In that case, you don't want an SSHOperator; you can still use just the SSHHook. The get_conn() method of SSHHook provides you with an instance of paramiko's SSHClient, with which you can run a command using the exec_command() call:

# Obtain a paramiko SSHClient from the hook, then run the command on it
from airflow.contrib.hooks.ssh_hook import SSHHook

ssh_client = SSHHook(ssh_conn_id="my_ssh_conn_id").get_conn()

my_command = "echo airflow"
stdin, stdout, stderr = ssh_client.exec_command(
  command=my_command,
  get_pty=my_command.startswith("sudo"),
  timeout=10)

If you look at SSHOperator's execute() method, it is a rather complicated (but robust) piece of code trying to achieve a very simple thing. For my own usage, I created some snippets that you might want to look at:

  • For using SSHHook independently of SSHOperator, have a look at ssh_utils.py (a rough sketch of this approach follows this list)
  • For an operator that runs multiple commands over SSH (you can achieve the same thing by using bash's && operator), see MultiCmdSSHOperator
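
As a rough illustration of the first point, below is a minimal sketch (not the actual ssh_utils.py; the connection id, command, and task id are placeholders, and a dag object is assumed) of driving SSHHook from inside a PythonOperator callable:

    # Sketch: drive SSHHook from inside a PythonOperator callable instead of a
    # dedicated SSHOperator task (connection id and command are placeholders)
    from airflow.contrib.hooks.ssh_hook import SSHHook
    from airflow.operators.python_operator import PythonOperator

    def run_remote_command(**context):
        ssh_client = SSHHook(ssh_conn_id="my_ssh_conn_id").get_conn()
        stdin, stdout, stderr = ssh_client.exec_command("echo airflow", timeout=10)
        exit_status = stdout.channel.recv_exit_status()  # blocks until the command finishes
        if exit_status != 0:
            raise RuntimeError("remote command failed: " + stderr.read().decode())
        return stdout.read().decode()

    remote_task = PythonOperator(
        task_id="run_remote_command",
        python_callable=run_remote_command,
        provide_context=True,
        dag=dag,
    )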
y2k-shubham
  • Thank you very much for the details. Another anomaly I noticed is with the SSH connection setting in Admin/Connections: the SSH_Default connection is refreshed for each DAG run, and I need to regenerate it in order to use it again. – mdivk Mar 04 '19 at 14:15