
I have a scenario where I have an AWS EMR setup with a few applications such as Spark, Hadoop, Hive, HCatalog, Zeppelin, Sqoop, etc., and another server which runs only Airflow.

I am working on a requirement where I want to move MySQL tables (which are on a different RDS instance) to Hive using Sqoop, and this job has to be triggered by Airflow.

Is it possible to achieve this using the SqoopOperator available in Airflow, given that Airflow is on a remote server? I believe not; if so, is there any other way to achieve this?

Thanks in advance.

rishm.msc
  • I'm sure [just like `hadoop distcp`](https://stackoverflow.com/a/51708322/3679900), this can be done via [`SSHOperator`](https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/contrib/operators/ssh_operator.py). But what was the final solution you used? – y2k-shubham Dec 10 '18 at 19:07

2 Answers


Yes, this is possible. I'll admit the documentation on how to use the operators is lacking, but if you understand the concept of hooks and operators in Airflow, you can figure it out by reading the code of the operator you're looking to use. In this case, you'll want to read through the SqoopHook and SqoopOperator code. Most of what I know about Airflow comes from reading the code; while I haven't used this operator myself, I'll try to help you out here as best I can.

Let's assume you want to execute this Sqoop command:

sqoop import --connect jdbc:mysql://mysql.example.com/testDb --username root --password hadoop123 --table student 

And you have a Sqoop server running on a remote host, which you can access with the Sqoop client at http://scoop.example.com:12000/sqoop/.

First, you'll need to create the connection in the Airflow Admin UI; call the connection sqoop. For the connection, fill in the host as scoop.example.com, the schema as sqoop, and the port as 12000. If you have a password, put it into a file on your server and fill in the Extra field with a JSON string that looks like {"password_file": "/path/to/password.txt"} (see the inline comments in the hook code about this password file).
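
If you'd rather script the setup than click through the UI, here is a minimal sketch (assuming Airflow 1.10's ORM; the password-file path is a hypothetical placeholder) of creating the same connection programmatically:

# Minimal sketch: create the 'sqoop' connection programmatically
# (assumes Airflow 1.10; the password-file path is hypothetical).
from airflow import settings
from airflow.models import Connection

sqoop_conn = Connection(
    conn_id='sqoop',
    conn_type='sqoop',
    host='scoop.example.com',
    schema='sqoop',
    port=12000,
    extra='{"password_file": "/path/to/password.txt"}',
)

session = settings.Session()
session.add(sqoop_conn)
session.commit()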

After you set up the connection in the UI, you can create a task using the SqoopOperator in your DAG file. It might look like this:

from airflow.contrib.operators.sqoop_operator import SqoopOperator

sqoop_mysql_import = SqoopOperator(task_id='sqoop_mysql_import',
                                   conn_id='sqoop',
                                   table='student',
                                   username='root',
                                   password='password',
                                   driver='jdbc:mysql://mysql.example.com/testDb',
                                   cmd_type='import',
                                   dag=dag)

The full list of parameters you might want to pass for imports can be found in the code here.

You can see how the SqoopOperator (and really the SqoopHook, which the operator leverages to connect to Sqoop) translates these arguments into command-line arguments here.

Really, this SqoopOperator just works by translating the kwargs you pass into sqoop client CLI commands. If you check out the SqoopHook, you can see how that's done and can probably figure out how to make it work for your case. Good luck!
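
To make that translation concrete, here is a rough, simplified sketch (not the actual hook code; see the link above for the real logic) of how the connection fields plus kwargs end up as a sqoop CLI invocation:

# Illustrative only -- a simplified version of what SqoopHook does with
# the Airflow connection and the operator kwargs (see sqoop_hook.py for
# the real implementation).
def build_sqoop_import_cmd(host, port, schema, table,
                           username=None, password_file=None, driver=None):
    # --connect is built from the connection's host, port and schema
    cmd = ['sqoop', 'import',
           '--connect', '{}:{}/{}'.format(host, port, schema),
           '--table', table]
    if username:
        cmd += ['--username', username]
    if password_file:
        cmd += ['--password-file', password_file]
    if driver:
        cmd += ['--driver', driver]
    return cmd

# e.g. build_sqoop_import_cmd('jdbc:mysql://mysql.example.com', 3306,
#                             'testDb', 'student', username='root')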

To troubleshoot, I would recommend SSHing into the server you're running Airflow on and confirming that you can run the Sqoop client from the command line and connect to the remote Sqoop server.
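
If it turns out the Sqoop client is only installed on the remote EMR master and not on the Airflow host (as the comments here discuss), one alternative is to run the same command over SSH with Airflow's SSHOperator. A minimal sketch, assuming a hypothetical Airflow SSH connection named ssh_emr that points at the EMR master node:

from airflow.contrib.operators.ssh_operator import SSHOperator

# Sketch only: run the sqoop CLI on the EMR master over SSH
# ('ssh_emr' is a hypothetical Airflow SSH connection).
sqoop_via_ssh = SSHOperator(
    task_id='sqoop_import_student',
    ssh_conn_id='ssh_emr',
    command='sqoop import --connect jdbc:mysql://mysql.example.com/testDb '
            '--username root --password hadoop123 --table student',
    dag=dag
)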

Mike
  • Thanks @Mike. But in your explanation I believe you are assuming that Airflow and Sqoop are installed on the same server, which is not the case: I have Airflow and Sqoop running on different servers. Hence, I believe this won't work. – rishm.msc Jul 09 '18 at 05:59
  • It will work; you just need to set up the connection differently. I will adjust the answer for the remote case. – Mike Jul 09 '18 at 22:37
  • Mike: Didn't work. conn_id in SqoopOperator is converted into --connect of the Sqoop CLI, which should be the source host for importing the table. https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/sqoop_hook.py#L150 – rishm.msc Jul 11 '18 at 00:22
  • You're right. It looks like you need to pass the properties in using `properties` https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/sqoop_hook.py#L53, but I'm not sure; I have never used the sqoop client to connect to a remote sqoop server, so I don't know how it's done. – Mike Jul 12 '18 at 12:57
  • First step would be to figure out how to execute sqoop commands from a remote host – Mike Jul 12 '18 at 12:58
  • **@Tarivs**, **@rishi91991** see my answer [here](https://stackoverflow.com/a/53743370/3679900). While it discusses using `HiveCliOperator` over *remote* server, the same idea can be applied to `SqoopOperator` – y2k-shubham Dec 19 '18 at 16:58

Try adding a step that uses script-runner.jar; there is more detail here.

aws emr create-cluster --name "Test cluster" --release-label emr-5.16.0 --applications Name=Hive Name=Pig --use-default-roles --ec2-attributes KeyName=myKey --instance-type m4.large --instance-count 3 --steps Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://region.elasticmapreduce/libs/script-runner/script-runner.jar,Args=["s3://mybucket/script-path/my_script.sh"]

Then you can add the step from Airflow like this:

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

SPARK_TEST_STEPS = [
    {
        'Name': 'demo',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://cn-northwest-1.elasticmapreduce/libs/script-runner/script-runner.jar',
            'Args': [
                "s3://d.s3.d.com/demo.sh",
            ]
        }
    }
]

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

step_adder.set_downstream(step_checker)
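
Note that the job_flow_id templates above pull from a create_job_flow task that isn't shown. A minimal sketch of that task, assuming a hypothetical JOB_FLOW_OVERRIDES dict holding your EMR cluster configuration, might look like:

from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator

# Sketch only: JOB_FLOW_OVERRIDES is a hypothetical dict describing the EMR
# cluster (name, release label, instances, applications, ...).
cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

cluster_creator.set_downstream(step_adder)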

And demo.sh looks like this:

#!/bin/bash
sqoop import --connect jdbc:mysql://mysql.example.com/testDb --username root --password hadoop123 --table student
Tarivs
  • The question specifically asked how to do this using the [`SqoopOperator`](https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/contrib/operators/sqoop_operator.py) available in `Airflow` – y2k-shubham Dec 10 '18 at 19:01
  • The question did ask about a specific operator, but I'm glad this answer is here, as it demonstrates a way to accomplish a sqoop job without the sqoop client installed, and it contains a working example that solves the problem the sqoop operator attempts to solve. – jhnclvr May 02 '19 at 13:18