I have tested almost all wrappers for Python to do the same.
PyKCS11 is not really solid.
I recommend to use one of these two possibilities:
1. PyCurl
When you configure correctly your OpenSSL and so your cURL, the Python wrapper to cUrl can do this.
Here is a simple implementation:
import pycurl
from io import BytesIO
import pprint
import json
c = pycurl.Curl()
url = 'https://yourserver/endpoint'
c.setopt(pycurl.URL, url)
# set proxy-insecure
c.setopt(c.SSL_VERIFYPEER, 0)
c.setopt(c.SSL_VERIFYHOST, False)
c.setopt(c.VERBOSE, 0)
c.setopt(pycurl.SSLENGINE, 'pkcs11')
c.setopt(pycurl.SSLCERTTYPE, 'eng')
c.setopt(pycurl.SSLKEYTYPE, 'eng')
c.setopt(pycurl.SSLCERT, 'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC;type=cert;pin-value=pin-pin')
c.setopt(pycurl.SSLKEY, 'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC;type=private;pin-value=pin-pin')
# set headers
c.setopt(pycurl.HEADER, True)
# c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:8.0) Gecko/20100101 Firefox/8.0')
c.setopt(pycurl.HTTPHEADER, ("HEADER_TO_ADD:VALUE",))
buffer = BytesIO()
c.setopt(c.WRITEDATA, buffer)
c.perform()
# HTTP response code, e.g. 200.
print('>>> Status: %d' % c.getinfo(c.RESPONSE_CODE))
# Elapsed time for the transfer.
print('>>> Time: %f' % c.getinfo(c.TOTAL_TIME))
# getinfo must be called before close.
c.close()
body = buffer.getvalue().decode('utf-8')
print('>>> Body:\n', body)
if body.find('{') >= 0:
body = body[body.find('{'):]
dictionary = json.loads(body)
pprint.pprint(dictionary)
Note that the pkcs#11 URI Scheme must be compliant to the RFC7512
It can be discovered using this command:
p11tool --provider=/usr/lib/libeTPkcs11.so --list-all
All fields including the pin must be url encoded. Use an online website to encode/decode string(pin)<->url_encoded
2. M2Crypto
This is the best pkcs#11 implementation done for Python in my point of view.
It allows to override urllib2 and so requests calls thanks to the requests.Session.mount method and the requests.adapters.BaseAdapter.
I put here some code to use it with urllib2:
from M2Crypto import m2urllib2 as urllib2
from M2Crypto import m2, SSL, Engine
# load dynamic engine
e = Engine.load_dynamic_engine("pkcs11", "/usr/lib/x86_64-linux-gnu/engines-1.1/libpkcs11.so")
pk = Engine.Engine("pkcs11")
pk.ctrl_cmd_string("MODULE_PATH", "/usr/lib/libeTPkcs11.so")
m2.engine_init(m2.engine_by_id("pkcs11"))
pk.ctrl_cmd_string("PIN", 'pin-pin')
cert = pk.load_certificate('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC')
key = pk.load_private_key('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC', pin='pin-pin')
ssl_context = SSL.Context('tls')
ssl_context.set_cipher_list('EECDH+AESGCM:EECDH+aECDSA:EECDH+aRSA:EDH+AESGCM:EDH+aECDSA:EDH+aRSA:!SHA1:!SHA256:!SHA384:!MEDIUM:!LOW:!EXP:!aNULL:!eNULL:!PSK:!SRP:@STRENGTH')
ssl_context.set_default_verify_paths()
ssl_context.set_allow_unknown_ca(True)
SSL.Connection.postConnectionCheck = None
m2.ssl_ctx_use_x509(ssl_context.ctx, cert.x509)
m2.ssl_ctx_use_pkey_privkey(ssl_context.ctx, key.pkey)
opener = urllib2.build_opener(ssl_context)
urllib2.install_opener(opener)
url = 'https://yourserver/endpoint'
content = urllib2.urlopen(url=url).read()
# content = opener.open(url)
print(content)
Note that, we don't indicate type and pin in the pkcs#11 URI as for PyCurl.
The PIN is indicated as string and not url encoded when passed to load_private_key.
Finally, I found a good implementation for the HttpAdapter to mount on requests to do the calls.
Here is the original implementation.
I added some lines to support the PIN code and to disable checking hostname with this:
M2Crypto.SSL.Connection.postConnectionCheck = None
And here is how to mount the adapter:
from requests import Session
from m2requests import M2HttpsAdapter
import pprint, json
request = Session()
m2httpsadapter = M2HttpsAdapter()
# Added by me to set a pin when loading private key
m2httpsadapter.pin = 'pin-pin'
# Need this attribute set to False else we cannot use direct IP (added by me to disable checking hostname)
m2httpsadapter.check_hostname = False
request.mount("https://", m2httpsadapter)
request.cert=('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC',
'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC')
headers = {'HEADER_TO_ADD_IF_WANTED': 'VALUE', }
r = request.get("https://yourserver/endpoint", headers=headers, verify=False)
print(r.status_code)
pprint.pprint(json.loads(r.raw.data.decode('utf-8')))