I'm trying to get an S3 hook in Apache Airflow using the Connection object.
It looks like this:
```python
import json

from airflow.models.connection import Connection
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3ConnectionHandler:
    def __init__(self):
        # values are read from a configuration class, which loads them from env variables
        self._s3 = Connection(
            conn_type="s3",
            conn_id=config.AWS_CONN_ID,
            login=config.AWS_ACCESS_KEY_ID,
            password=config.AWS_SECRET_ACCESS_KEY,
            extra=json.dumps({"region_name": config.AWS_DEFAULT_REGION}),
        )

    @property
    def s3(self) -> Connection:
        # get_live_connection and self.logger are defined elsewhere in my project
        return get_live_connection(self.logger, self._s3)

    @property
    def s3_hook(self) -> S3Hook:
        return self.s3.get_hook()
```
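The hook is then pulled from the handler roughly like this (simplified; `config` and `get_live_connection` are my own helpers):

```python
handler = S3ConnectionHandler()
hook = handler.s3_hook  # accessing the property ends up calling Connection.get_hook()
```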
I get an error:
```
Broken DAG: [...] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/connection.py", line 282, in get_hook
    return hook_class(**{conn_id_param: self.conn_id})
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 354, in __init__
    raise AirflowException('Either client_type or resource_type must be provided.')
airflow.exceptions.AirflowException: Either client_type or resource_type must be provided.
```
Why does this happen? From what I understand, S3Hook calls the constructor of its parent class (AwsBaseHook in base_aws.py, judging by the traceback) and passes client_type="s3" as a string, so the check should pass. How can I fix this?
I took this hook configuration from here.
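For reference, as far as I can tell from the provider source, S3Hook is a thin wrapper that injects client_type before delegating to the parent (paraphrased from memory, so the exact code may differ by version):

```python
# roughly what airflow/providers/amazon/aws/hooks/s3.py does
class S3Hook(AwsBaseHook):
    def __init__(self, *args, **kwargs) -> None:
        kwargs["client_type"] = "s3"
        super().__init__(*args, **kwargs)
```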
EDIT: I get the same error even when creating the S3 hook directly:
```python
@property
def s3_hook(self) -> S3Hook:
    # return self.s3.get_hook()
    return S3Hook(
        aws_conn_id=config.AWS_CONN_ID,
        region_name=self.config.AWS_DEFAULT_REGION,
        client_type="s3",
        config={
            "aws_access_key_id": self.config.AWS_ACCESS_KEY_ID,
            "aws_secret_access_key": self.config.AWS_SECRET_ACCESS_KEY,
        },
    )
```
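For comparison, my understanding is that AwsBaseHook normally takes its credentials from the Airflow connection (or boto3's own credential chain) rather than from constructor kwargs, and that its config parameter expects a botocore.config.Config object, not a dict of keys. So a plain construction would look like this (a sketch, assuming a connection with this ID is already registered with Airflow via an env var, a secrets backend, or the metadata DB):

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# credentials and region come from the registered Airflow connection,
# not from constructor arguments
hook = S3Hook(aws_conn_id=config.AWS_CONN_ID)
```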