I use the following code in Python to generate authorization headers for getting an object in an S3 bucket (where AWS_SECRET_ACCESS_KEY
, AWS_ACCESS_KEY_ID
, AWS_DEFAULT_REGION
, and DEFAULT_BUCKET
are all set via environment variables in another piece of code)
def get_s3_headers(object_name):
'''Get authorization headers required to access a certain object in S3.'''
headers = _sig_v4_headers(pre_auth_headers={},
service='s3',
host=f'{DEFAULT_BUCKET}.s3.amazonaws.com',
method='GET',
path='/' + object_name,
query={ 'X-Amz-Expires' : '120' },
payload=b'')
return headers
def _sig_v4_headers(pre_auth_headers, service, host, method, path, query, payload):
# Define use of signature v4
algorithm = 'AWS4-HMAC-SHA256'
now = datetime.datetime.utcnow()
amzdate = now.strftime('%Y%m%dT%H%M%SZ')
datestamp = now.strftime('%Y%m%d')
payload_hash = hashlib.sha256(payload).hexdigest()
credential_scope = f'{datestamp}/{AWS_DEFAULT_REGION}/{service}/aws4_request'
pre_auth_headers_lower = {
header_key.lower(): ' '.join(header_value.split())
for header_key, header_value in pre_auth_headers.items()
}
required_headers = {
'host': host,
'x-amz-content-sha256': payload_hash,
'x-amz-date': amzdate,
}
headers = {**pre_auth_headers_lower, **required_headers}
header_keys = sorted(headers.keys())
signed_headers = ';'.join(header_keys)
def signature():
def canonical_request():
canonical_uri = urllib.parse.quote(path, safe='/~')
quoted_query = sorted(
(urllib.parse.quote(key, safe='~'), urllib.parse.quote(value, safe='~'))
for key, value in query.items()
)
canonical_querystring = '&'.join(f'{key}={value}' for key, value in quoted_query)
canonical_headers = ''.join(f'{key}:{headers[key]}\n' for key in header_keys)
print(canonical_querystring)
return f'{method}\n{canonical_uri}\n{canonical_querystring}\n' + \
f'{canonical_headers}\n{signed_headers}\n{payload_hash}'
def sign(key, msg):
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
string_to_sign = f'{algorithm}\n{amzdate}\n{credential_scope}\n' + \
hashlib.sha256(canonical_request().encode('utf-8')).hexdigest()
date_key = sign(('AWS4' + AWS_SECRET_ACCESS_KEY).encode('utf-8'), datestamp)
region_key = sign(date_key, AWS_DEFAULT_REGION)
service_key = sign(region_key, service)
request_key = sign(service_key, 'aws4_request')
return sign(request_key, string_to_sign).hex()
return {
'uri' : f'https://{host}{path}',
'headers' : {
**pre_auth_headers,
'x-amz-date': amzdate,
'x-amz-content-sha256': payload_hash,
'Authorization': f'{algorithm} Credential={AWS_ACCESS_KEY_ID}/{credential_scope}, '
f'SignedHeaders={signed_headers}, Signature=' + signature(),
}
}
Calling get_s3_headers with a valid object key returns something similar to the following:
"headers": {
"Authorization": "AWS4-HMAC-SHA256 Credential=AKIAYS3VM3EBIFL7FKE5/20220324/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=<string of characters>",
"x-amz-content-sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"x-amz-date": "20220324T193132Z"
},
"uri": "https://hotspot-storage.s3.amazonaws.com/posts/61e0dd8056716196cf357434"
Calling GET
on https://hotspot-storage.s3.amazonaws.com/posts/61e0dd8056716196cf357434?X-Amz-Expires=120
returns the correct image with no expiration errors. Changing X-Amz-Expires
to any other value returns a "Signature not valid" exception as expected. However, I can still use the link and the headers after 120 seconds, as if X-Amz-Expires
doesn't actually do anything. I have zero clue why this is not working, so any help would be much appreciated.
EDIT
Changing X-Amz-Expires
to 0 in both the query string and Python code still allows access to the link. The expiration time always defaults to 15 minutes.