blob: 0107080a0b213defa530b80ba400f4660b9ea7c3 [file] [log] [blame]
# encoding: utf-8
from datetime import datetime, timezone
import json
import logging
import os
from six import StringIO
import subprocess
import tempfile
logger = logging.getLogger(__name__)
_std_subj = {
"C": "PL",
"ST": "Mazowieckie",
"L": "Warsaw",
"O": "Warsaw Hackerspace",
"OU": "clustercfg",
}
_ca_csr = {
"CN": "Prototype Test Certificate Authority",
"key": {
"algo": "rsa",
"size": 2048
},
"names": [ _std_subj ],
}
_ca_config = {
"signing": {
"default": {
"expiry": "168h"
},
"profiles": {
"intermediate": {
"expiry": "8760h",
"usages": [
"signing",
"key encipherment",
"cert sign",
"crl sign",
"server auth",
"client auth",
],
"ca_constraint": {
"is_ca": True,
},
},
"server": {
"expiry": "8760h",
"usages": [
"signing",
"key encipherment",
"server auth"
]
},
"client": {
"expiry": "8760h",
"usages": [
"signing",
"key encipherment",
"client auth"
]
},
"client-server": {
"expiry": "8760h",
"usages": [
"signing",
"key encipherment",
"server auth",
"client auth"
]
}
}
}
}
class CAException(Exception):
pass
class CA(object):
def __init__(self, secretstore, certdir, short, cn):
self.ss = secretstore
self.cdir = certdir
self.short = short
self.cn = cn
self._init_ca()
def __str__(self):
return 'CN={} ({})'.format(self.cn, self.short)
@property
def _secret_key(self):
return 'ca-{}.key'.format(self.short)
@property
def _cert(self):
return os.path.join(self.cdir, 'ca-{}.crt'.format(self.short))
@property
def cert_data(self):
with open(self._cert) as f:
return f.read()
def _cfssl_call(self, args, obj=None, stdin=None):
p = subprocess.Popen(['cfssl'] + args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if obj is not None:
stdin = json.dumps(obj)
outs, errs = p.communicate(stdin.encode())
if p.returncode != 0:
raise Exception(
'cfssl failed. stderr: %r, stdout: %r, code: %r' % (
errs, outs, p.returncode))
out = json.loads(outs)
return out
def _init_ca(self):
if self.ss.exists(self._secret_key):
return
ca_csr = dict(_ca_csr)
ca_csr['CN'] = self.cn
logger.info("{}: Generating CA...".format(self))
out = self._cfssl_call(['gencert', '-initca', '-'], obj=ca_csr)
f = self.ss.open(self._secret_key, 'w')
f.write(out['key'])
f.close()
f = open(self._cert, 'w')
f.write(out['cert'])
f.close()
def gen_key(self, hosts, o=_std_subj['O'], ou=_std_subj['OU'], save=None):
"""お元気ですか?"""
cfg = {
"CN": hosts[0],
"hosts": hosts,
"key": {
"algo": "rsa",
"size": 4096,
},
"names": [
{
"C": _std_subj["C"],
"ST": _std_subj["ST"],
"L": _std_subj["L"],
"O": o,
"OU": ou,
},
],
}
cfg.update(_ca_config)
logger.info("{}: Generating key/CSR for {}".format(self, hosts))
out = self._cfssl_call(['genkey', '-'], obj=cfg)
key, csr = out['key'], out['csr']
if save is not None:
logging.info("{}: Saving new key to secret {}".format(self, save))
f = self.ss.open(save, 'w')
f.write(key)
f.close()
return key, csr
def gen_csr(self, key, hosts, o=_std_subj['O'], ou=_std_subj['OU']):
"""
Generate a CSR while already having a private key - for renewals, etc.
TODO(q3k): this shouldn't be a CA method, but a cert method.
"""
cfg = {
"CN": hosts[0],
"hosts": hosts,
"key": {
"algo": "rsa",
"size": 4096,
},
"names": [
{
"C": _std_subj["C"],
"ST": _std_subj["ST"],
"L": _std_subj["L"],
"O": o,
"OU": ou,
},
],
}
cfg.update(_ca_config)
logger.info("{}: Generating CSR for {}".format(self, hosts))
out = self._cfssl_call(['gencsr', '-key', key, '-'], obj=cfg)
return out['csr']
def sign(self, csr, save=None, profile='client-server'):
logging.info("{}: Signing CSR".format(self))
ca = self._cert
cakey = self.ss.plaintext(self._secret_key)
config = tempfile.NamedTemporaryFile(mode='w')
json.dump(_ca_config, config)
config.flush()
out = self._cfssl_call(['sign', '-ca=' + ca, '-ca-key=' + cakey,
'-profile='+profile, '-config='+config.name, '-'], stdin=csr)
cert = out['cert']
if save is not None:
name = os.path.join(self.cdir, save)
logging.info("{}: Saving new certificate to {}".format(self, name))
f = open(name, 'w')
f.write(cert)
f.close()
config.close()
return cert
def upload(self, c, remote_cert):
logger.info("Uploading CA {} to {}".format(self, remote_cert))
c.put(local=self._cert, remote=remote_cert)
def make_cert(self, *a, **kw):
return ManagedCertificate(self, *a, **kw)
class ManagedCertificate(object):
def __init__(self, ca, name, hosts, o=None, ou=None, profile='client-server'):
self.ca = ca
self.hosts = hosts
self.name = name
self.key = '{}.key'.format(name)
self.cert = '{}.cert'.format(name)
self.o = o
self.ou = ou
self.profile = profile
self.ensure()
def __str__(self):
return '{}'.format(self.name)
@property
def key_exists(self):
return self.ca.ss.exists(self.key)
@property
def key_data(self):
f = open(self.ca.ss.open(self.key))
d = f.read()
f.close()
return d
@property
def key_path(self):
return self.ca.ss.plaintext(self.key)
@property
def cert_path(self):
return os.path.join(self.ca.cdir, self.cert)
@property
def cert_exists(self):
return os.path.exists(self.cert_path)
@property
def cert_data(self):
with open(self.cert_path) as f:
return f.read()
@property
def cert_expires_soon(self):
if not self.cert_exists:
return False
out = self.ca._cfssl_call(['certinfo', '-cert', self.cert_path], stdin="")
not_after = datetime.strptime(out['not_after'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
until = not_after - datetime.now(timezone.utc)
if until.days < 30:
return True
return False
def ensure(self):
if self.key_exists and self.cert_exists and not self.cert_expires_soon:
return
key = None
if not self.key_exists:
logger.info("{}: Generating key...".format(self))
key, csr = self.ca.gen_key(self.hosts, o=self.o, ou=self.ou, save=self.key)
else:
logger.info("{}: Renewing certificate...".format(self))
# Use already existing key
csr = self.ca.gen_csr(self.key_path, self.hosts, o=self.o, ou=self.ou)
self.ca.sign(csr, save=self.cert, profile=self.profile)
def upload(self, c, remote_cert, remote_key, concat_ca=False):
logger.info("Uploading Cert {} to {} & {}".format(self, remote_cert, remote_key))
if concat_ca:
f = StringIO(self.cert_data + self.ca.cert_data)
c.put(local=f, remote=remote_cert)
else:
c.put(local=self.cert_path, remote=remote_cert)
c.put(local=self.key_path, remote=remote_key)
def upload_pki(self, c, pki, concat_ca=False):
self.upload(c, pki['cert'], pki['key'], concat_ca)