
Simple question:

Rather than using S3 or GCS, I'd like to use MinIO as a local S3-compatible store to hold data sent from Airflow. How do I do this? Can I use the FileToGoogleCloudStorageOperator for it, or not really?

And if not this route for local storage (of large-ish images rather than db rows), what would you recommend?

Thanks!

jtlz2

1 Answer


Building off of a similar answer, this is what I had to do with the latest version of Airflow at time of writing (1.10.7):

First, create an S3 connection with the following information:

Connection Name: '<your connection name>' #  e.g. local_minio
Connection Type: S3
Extra: a JSON object with the following properties: 
 {
    "aws_access_key_id":"your_minio_access_key",
    "aws_secret_access_key": "your_minio_secret_key",
    "host": "http://127.0.0.1:9000"
 }
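
If you would rather generate the Extra value than type it by hand, here is a minimal sketch using only the standard library. The function name `build_minio_extra` is hypothetical (it is not part of Airflow), and the scheme check reflects my understanding that boto3 expects the endpoint to be a full URL such as http://127.0.0.1:9000 rather than a bare host:port.

```python
import json
from urllib.parse import urlparse


def build_minio_extra(access_key, secret_key, host):
    """Return a JSON string suitable for the connection's Extra field.

    Hypothetical helper: the keys mirror the Extra object shown above.
    """
    parsed = urlparse(host)
    # Assumption: the endpoint should be a full http(s) URL with a host part.
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"host must look like http://host:port, got {host!r}")
    return json.dumps({
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "host": host,
    })


if __name__ == "__main__":
    # Placeholder credentials; substitute your own MinIO keys.
    print(build_minio_extra("your_minio_access_key",
                            "your_minio_secret_key",
                            "http://127.0.0.1:9000"))
```

Paste the printed JSON into the Extra field of the connection form.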

Next, in your DAG, create a task using the S3Hook to interact with data. Here is an example that you can adapt for your needs:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.S3_hook import S3Hook

DEFAULT_ARGS = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 13),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('create_date_dimension', default_args=DEFAULT_ARGS,
          schedule_interval="@once")


def write_text_file(ds, **kwargs):
    with open("/tmp/test.txt", "w") as fp:
        # Add file generation/processing step here, e.g.:
        fp.write(ds)

    # Upload the generated file to MinIO only after the file is closed,
    # so that buffered writes have been flushed to disk
    s3 = S3Hook('local_minio')
    s3.load_file("/tmp/test.txt",
                 key="my-test-file.txt",
                 bucket_name="my-bucket")


# Create a task to call your processing function
t1 = PythonOperator(
    task_id='generate_and_upload_to_s3',
    provide_context=True,
    python_callable=write_text_file,
    dag=dag
)
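
Since the question mentions large-ish images, here is a rough sketch of how the same `load_file` call could be applied to a whole directory. The helper `upload_images`, the `images/` prefix, and the list of extensions are my own assumptions for illustration. The hook is passed in as an argument (for example `S3Hook('local_minio')`), so the function itself does not import Airflow.

```python
from pathlib import Path

# Assumed set of image extensions; adjust to your data.
IMAGE_SUFFIXES = {".png", ".jpg", ".jpeg"}


def upload_images(hook, directory, bucket_name, prefix="images/"):
    """Upload every image file under `directory` using hook.load_file.

    `hook` is expected to behave like S3Hook, i.e. expose
    load_file(filename, key, bucket_name, replace).
    Returns the list of object keys that were uploaded.
    """
    uploaded = []
    for path in sorted(Path(directory).rglob("*")):
        if path.is_file() and path.suffix.lower() in IMAGE_SUFFIXES:
            # Keep the sub-directory layout inside the bucket key.
            key = prefix + path.relative_to(directory).as_posix()
            # replace=True overwrites an existing key on re-runs.
            hook.load_file(filename=str(path), key=key,
                           bucket_name=bucket_name, replace=True)
            uploaded.append(key)
    return uploaded
```

Inside a PythonOperator callable this might be invoked as `upload_images(S3Hook('local_minio'), "/tmp/images", "my-bucket")`, where the directory path is again just a placeholder.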
iinuwa
  • I tried something similar, but I'm getting an error: ERROR - Failed to execute task: Invalid endpoint: http://xxxx:9001 – BuffaloDev Jun 14 '22 at 03:17