Building on a similar answer, this is what I had to do with the latest version of Airflow at the time of writing (1.10.7):
First, create an S3 connection with the following information (a scripted alternative is sketched after the field list):

Conn Id: <your connection name>, e.g. local_minio
Conn Type: S3
Extra: a JSON object with the following properties:

{
  "aws_access_key_id": "your_minio_access_key",
  "aws_secret_access_key": "your_minio_secret_key",
  "host": "http://127.0.0.1:9000"
}
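If you prefer to script the connection instead of clicking through the UI, a minimal sketch that registers it in Airflow's metadata database looks like this (the conn_id, credentials, and endpoint are placeholders for your own values):

import json

from airflow import settings
from airflow.models import Connection

# Register the MinIO-backed S3 connection in Airflow's metadata DB.
conn = Connection(
    conn_id="local_minio",
    conn_type="s3",
    extra=json.dumps({
        "aws_access_key_id": "your_minio_access_key",
        "aws_secret_access_key": "your_minio_secret_key",
        "host": "http://127.0.0.1:9000",
    }),
)

session = settings.Session()
# Only add the connection if it does not exist yet.
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
session.close()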
Next, in your DAG, create a task using the S3Hook to interact with data. Here is an example that you can adapt for your needs:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.S3_hook import S3Hook

DEFAULT_ARGS = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 13),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('create_date_dimension', default_args=DEFAULT_ARGS,
          schedule_interval="@once")


def write_text_file(ds, **kwargs):
    with open("/tmp/test.txt", "w") as fp:
        # Add your file generation/processing step here, e.g.:
        fp.write(ds)

    # Upload the generated file to MinIO
    s3 = S3Hook('local_minio')
    s3.load_file("/tmp/test.txt",
                 key="my-test-file.txt",
                 bucket_name="my-bucket")


# Create a task to call your processing function
t1 = PythonOperator(
    task_id='generate_and_upload_to_s3',
    provide_context=True,
    python_callable=write_text_file,
    dag=dag
)
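Once the DAG has run, you can sanity-check the upload by reading the object back through the same hook, e.g. from a Python shell on the Airflow host. This sketch assumes the local_minio connection and the bucket/key used above:

from airflow.hooks.S3_hook import S3Hook

# Read the uploaded object back from MinIO to verify the transfer.
s3 = S3Hook('local_minio')
if s3.check_for_key("my-test-file.txt", bucket_name="my-bucket"):
    print(s3.read_key("my-test-file.txt", bucket_name="my-bucket"))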