blob: 85a316434448f022ffa1dbe0987f5f56446dd5cf [file] [log] [blame]
#!/usr/bin/env python3
"""
A little tool to encrypt/decrypt git secrets. Kinda like password-store, but
more purpose specific and portable.
It generally expects to work with directory structures as follows:
foo/bar/secrets/plain: plaintext files
/cipher: ciphertext files, with names corresponding to
plaintext files
Note: currently all plaintext/cipher files are at a single level, ie.: there
cannot be any subdirectory within a /plain or /cipher directory.
There are multiple secret 'roots' like this in hscloud, notably:
- cluster/secrets
- hswaw/kube/secrets
In the future, some repository-based configuration might exist to specify these
roots in a nicer way, possibly with different target keys per root.
This tool a bit of a swiss army knife, and can be used in the following ways:
- as a CLI tool to encrypt/decrypt files directly
- as a library for its encryption/decryption methods, and for a SecretStore
API, which allows for basic programmatic access to secrets, decrypting
things if necessary
- as a CLI tool to 'synchronize' a directory containing plain/cipher files,
which means encrypting every new plaintext file (or new ciphertext file),
and re-encrypting all files whose keys are different from the keys list
defined in this file.
"""
import argparse
import logging
import os
import sys
import subprocess
import tempfile
# Keys that are to be used to encrypt all secret roots.
keys = [
"63DFE737F078657CC8A51C00C29ADD73B3563D82", # q3k
"482FF104C29294AD1CAF827BA43890A3DE74ECC7", # inf
"F07205946C07EEB2041A72FBC60C64879534F768", # cz2
"0879F9FCA1C836677BB808C870FD60197E195C26", # implr
]
_logger_name = __name__
if _logger_name == '__main__':
_logger_name = 'secretstore'
logger = logging.getLogger(_logger_name)
class CLIException(Exception):
pass
def encrypt(src, dst):
cmd = ['gpg' , '--encrypt', '--armor', '--batch', '--yes', '--output', dst]
for k in keys:
cmd.append('--recipient')
cmd.append(k)
cmd.append(src)
subprocess.check_call(cmd)
def decrypt(src, dst):
cmd = ['gpg', '--decrypt', '--batch', '--yes', '--output', dst, src]
# catch stdout to make this code less chatty.
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
def _encryption_key_for_fingerprint(fp):
"""
Returns the encryption key ID for a given GPG fingerprint (eg. one from the
'keys' list.
"""
cmd = ['gpg', '-k', '--keyid-format', 'long', fp]
res = subprocess.check_output(cmd).decode()
# Sample output:
# pub rsa4096/70FD60197E195C26 2014-02-22 [SC] [expires: 2021-02-05]
# 0879F9FCA1C836677BB808C870FD60197E195C26
# uid [ultimate] Bartosz Stebel <bartoszstebel@gmail.com>
# uid [ultimate] Bartosz Stebel <implr@hackerspace.pl>
# sub rsa4096/E203C94E5CEBB3EF 2014-02-22 [E] [expires: 2021-02-05]
#
# We want to extract the 'sub' key with the [E] tag.
for line in res.split('\n'):
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) < 4:
continue
if parts[0] != 'sub':
continue
if not parts[3].startswith('[') or not parts[3].endswith(']'):
continue
usages = parts[3].strip('[]')
if 'E' not in usages:
continue
# Okay, we found the encryption key.
return parts[1].split('/')[1]
raise Exception("Could not find encryption key ID for fingerprint {}".format(fp))
_encryption_keys_cache = None
def encryption_keys():
"""
Return all encryption keys associated with the keys array.
"""
global _encryption_keys_cache
if _encryption_keys_cache is None:
_encryption_keys_cache = [_encryption_key_for_fingerprint(fp) for fp in keys]
return _encryption_keys_cache
def encrypted_for(path):
"""
Return for which encryption keys is a given GPG ciphertext file encrypted.
"""
cmd = ['gpg', '--pinentry-mode', 'cancel', '--list-packets', path]
res = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode()
# Sample output:
# gpg: encrypted with 4096-bit RSA key, ID E203C94E5CEBB3EF, created 2014-02-22
# "Bartosz Stebel <bartoszstebel@gmail.com>"
# gpg: encrypted with 2048-bit RSA key, ID 5C1B6B69E9F5EABE, created 2013-01-29
# "Piotr Dobrowolski <piotr.tytus.dobrowolski@gmail.com>"
# gpg: encrypted with 2048-bit RSA key, ID 386E893E110BC55B, created 2012-01-10
# "Sergiusz Bazanski (Low Latency Consulting) <serge@lowlatency.ie>"
# gpg: public key decryption failed: Operation cancelled
# gpg: decryption failed: No secret key
# # off=0 ctb=85 tag=1 hlen=3 plen=268
# :pubkey enc packet: version 3, algo 1, keyid 386E893E110BC55B
# data: [2047 bits]
# # off=271 ctb=85 tag=1 hlen=3 plen=268
# :pubkey enc packet: version 3, algo 1, keyid 5C1B6B69E9F5EABE
# data: [2048 bits]
# # off=542 ctb=85 tag=1 hlen=3 plen=524
# :pubkey enc packet: version 3, algo 1, keyid E203C94E5CEBB3EF
# data: [4095 bits]
# # off=1069 ctb=d2 tag=18 hlen=2 plen=121 new-ctb
# :encrypted data packet:
# length: 121
# mdc_method: 2
keys = []
for line in res.split('\n'):
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) < 9:
continue
if parts[:4] != [':pubkey', 'enc', 'packet:', 'version']:
continue
if parts[7] != 'keyid':
continue
keys.append(parts[8])
# Make unique.
return list(set(keys))
class SyncAction:
"""
SyncAction is a possible action taken to synchronize some secrets.
An action is some sort of side-effect bearing OS action (ie execution of
script or file move, or...) that can also 'describe' that it's acting - ie,
just return a human readable string of what it would be doing. These
describe descriptions are used for dry-runs of the secretstore sync
functionality.
"""
def describe(self):
return ""
def act(self):
pass
class SyncActionEncrypt:
def __init__(self, src, dst, reason):
self.src = src
self.dst = dst
self.reason = reason
def describe(self):
return f'Encrypting {os.path.split(self.src)[-1]} ({self.reason})'
def act(self):
return encrypt(self.src, self.dst)
class SyncActionDecrypt:
def __init__(self, src, dst, reason):
self.src = src
self.dst = dst
self.reason = reason
def describe(self):
return f'Decrypting {os.path.split(self.src)[-1]} ({self.reason})'
def act(self):
return encrypt(self.src, self.dst)
def sync(path: str, dry: bool):
"""Synchronize (decrypt and encrypt what's needed) a given secrets directory."""
# Turn the path into an absolute path just to make things safer.
path = os.path.abspath(path)
# Trim all trailing slashes to canonicalize.
path = path.rstrip('/')
plain_path = os.path.join(path, "plain")
cipher_path = os.path.join(path, "cipher")
# Ensure that at least one of the plain/cipher paths exist.
plain_exists = os.path.exists(plain_path)
cipher_exists = os.path.exists(cipher_path)
if not plain_exists and not cipher_exists:
raise CLIException('Given directory must contain a plain/ or cipher/ subdirectory.')
# Make missing directories.
if not plain_exists:
os.mkdir(plain_path)
if not cipher_exists:
os.mkdir(cipher_path)
# List files on both sides:
plain_files = [f for f in os.listdir(plain_path) if f != '.gitignore' and os.path.isfile(os.path.join(plain_path, f))]
cipher_files = [f for f in os.listdir(cipher_path) if os.path.isfile(os.path.join(cipher_path, f))]
# Helper function to turn a short filename within a directory to a pair
# of plain/cipher full paths.
def pc(p):
return os.path.join(plain_path, p), os.path.join(cipher_path, p)
# Make a set of all file names - no matter if only available as plain, as
# cipher, or as both.
all_files = set(plain_files + cipher_files)
# We'll be making a list of actions to perform to bring up given directory
# pair to a stable state.
actions = [] # type: List[SyncAction]
# First, for every possible file (either encrypted or decrypted), figure
# out which side is fresher based on file presence and mtime.
fresher = {} # type: Dict[str, str]
for p in all_files:
# Handle the easy case when the file only exists on one side.
if p not in cipher_files:
fresher[p] = 'plain'
continue
if p not in plain_files:
fresher[p] = 'cipher'
continue
plain, cipher = pc(p)
# Otherwise, we have both the cipher and plain version.
# Check if the decrypted version matches the plaintext version. If so,
# they're both equal.
f = tempfile.NamedTemporaryFile(delete=False)
f.close()
decrypt(cipher, f.name)
with open(f.name, 'rb') as fd:
decrypted_data = fd.read()
with open(plain, 'rb') as fc:
current_data = fc.read()
if decrypted_data == current_data:
fresher[p] = 'equal'
os.unlink(f.name)
continue
os.unlink(f.name)
# The plain and cipher versions differ. Let's choose based on mtime.
mtime_plain = os.path.getmtime(plain)
mtime_cipher = os.path.getmtime(cipher)
if mtime_plain > mtime_cipher:
fresher[p] = 'plain'
elif mtime_cipher > mtime_plain:
fresher[p] = 'cipher'
else:
raise CLIException(f'cipher/plain stalemate on {p}: contents differ, but files have same mtime')
# Find all files that need to be re-encrypted for changed keys.
reencrypt = set()
for p in cipher_files:
_, cipher = pc(p)
current = set(encrypted_for(cipher))
want = set(encryption_keys())
if current != want:
reencrypt.add(p)
# Okay, now actually construct a list of actions.
# First, all fresh==cipher keys need to be decrypted.
for p, v in fresher.items():
if v != 'cipher':
continue
plain, cipher = pc(p)
actions.append(SyncActionDecrypt(cipher, plain, "cipher version is newer"))
encrypted = set()
# Then, encrypt all fresh==plain files, and make note of what those
# are.
for p, v in fresher.items():
if v != 'plain':
continue
plain, cipher = pc(p)
actions.append(SyncActionEncrypt(plain, cipher, "plain version is newer"))
encrypted.add(p)
# Finally, re-encrypt all the files that aren't already being encrypted.
for p in reencrypt.difference(encrypted):
plain, cipher = pc(p)
actions.append(SyncActionEncrypt(plain, cipher, "needs to be re-encrypted for different keys"))
if len(actions) == 0:
logger.info('Nothing to do!')
else:
if dry:
logger.info('Would perform the following:')
else:
logger.info('Running actions...')
for a in actions:
logger.info(a.describe())
if not dry:
a.act()
class SecretStoreMissing(Exception):
pass
class SecretStore(object):
def __init__(self, plain_root, cipher_root):
self.proot = plain_root
self.croot = cipher_root
def exists(self, suffix):
p = os.path.join(self.proot, suffix)
c = os.path.join(self.croot, suffix)
return os.path.exists(c) or os.path.exists(p)
def plaintext(self, suffix):
p = os.path.join(self.proot, suffix)
c = os.path.join(self.croot, suffix)
has_p = os.path.exists(p)
has_c = os.path.exists(c)
if has_c and has_p and os.path.getctime(p) < os.path.getctime(c):
logger.info("Decrypting {} ({})...".format(suffix, c))
decrypt(c, p)
return p
def open(self, suffix, mode, *a, **kw):
p = os.path.join(self.proot, suffix)
c = os.path.join(self.croot, suffix)
if 'w' in mode:
return open(p, mode, *a, **kw)
if not self.exists(suffix):
raise SecretStoreMissing("Secret {} does not exist".format(suffix))
if not os.path.exists(p) or os.path.getctime(p) < os.path.getctime(c):
logger.info("Decrypting {} ({})...".format(suffix, c))
decrypt(c, p)
return open(p, mode, *a, **kw)
def main():
parser = argparse.ArgumentParser(description='Manage hscloud git-based secrets.')
subparsers = parser.add_subparsers(dest='mode')
parser_decrypt = subparsers.add_parser('decrypt', help='decrypt a single secret file')
parser_decrypt.add_argument('input', type=str, help='encrypted file path')
parser_decrypt.add_argument('output', type=str, default='-', help='decrypted file path file path (or - for stdout)')
parser_encrypt = subparsers.add_parser('encrypt', help='encrypt a single secret file')
parser_encrypt.add_argument('input', type=str, help='plaintext file path')
parser_encrypt.add_argument('output', type=str, default='-', help='encrypted file path file path (or - for stdout)')
parser_sync = subparsers.add_parser('sync', help='Synchronize a canonically formatted secrets/{plain,cipher} directory')
parser_sync.add_argument('dir', type=str, help='Path to secrets directory to synchronize')
parser_sync.add_argument('--dry', dest='dry', action='store_true')
parser_sync.set_defaults(dry=False)
logging.basicConfig(level='INFO')
args = parser.parse_args()
if args.mode == None:
parser.print_help()
sys.exit(1)
try:
if args.mode == 'encrypt':
encrypt(args.input, args.output)
elif args.mode == 'decrypt':
decrypt(args.input, args.output)
elif args.mode == 'sync':
sync(args.dir, dry=args.dry)
else:
# ???
raise Exception('invalid mode {}'.format(args.mode))
except CLIException as e:
logger.error(e)
sys.exit(1)
if __name__ == '__main__':
sys.exit(main() or 0)