Advertisement
Guest User

Untitled

a guest
Oct 15th, 2019
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.75 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. import argparse
  3. import base64
  4. import json
  5. import os
  6. import shlex
  7. import subprocess
  8. import sys
  9.  
  10.  
  11. def main(raw_args=sys.argv[1:]):
  12. parser = argparse.ArgumentParser(
  13. description="Grab a certificate out of Traefik's acme.json file")
  14. parser.add_argument('acme_json', help='path to the acme.json file')
  15. parser.add_argument('domain', help='domain to get certificate for')
  16. parser.add_argument('dest_dir',
  17. help='path to the directory to store the certificate')
  18. parser.add_argument(
  19. '--post-update', required=False,
  20. help='command to run after updating the certificate')
  21.  
  22. args = parser.parse_args(raw_args)
  23.  
  24. new_privkey, new_fullchain = read_domain_certs(args.acme_json, args.domain)
  25.  
  26. privkey_filename = "{}.key".format(args.domain)
  27. cert_filename = "{}.crt".format(args.domain)
  28.  
  29. old_privkey = read_cert(args.dest_dir, privkey_filename)
  30. old_fullchain = read_cert(args.dest_dir, cert_filename)
  31.  
  32. if new_privkey != old_privkey or new_fullchain != old_fullchain:
  33. print('Certificates changed! Writing new files...')
  34. write_cert(args.dest_dir, privkey_filename, new_privkey)
  35. write_cert(args.dest_dir, cert_filename, new_fullchain)
  36.  
  37. if args.post_update is not None:
  38. print('Running post update command "%s"' % (args.post_update,))
  39. post_update(args.post_update)
  40. else:
  41. print('Certificates unchanged. Skipping...')
  42.  
  43. print('Done')
  44.  
  45.  
  46. def read_cert(storage_dir, filename):
  47. cert_path = os.path.join(storage_dir, filename)
  48. if os.path.exists(cert_path):
  49. with open(cert_path) as cert_file:
  50. return cert_file.read()
  51. return None
  52.  
  53.  
  54. def write_cert(storage_dir, filename, cert_content):
  55. cert_path = os.path.join(storage_dir, filename)
  56. with open(cert_path, 'w') as cert_file:
  57. cert_file.write(cert_content.decode('utf-8'))
  58. os.chmod(cert_path, 0o600)
  59.  
  60.  
  61. def read_domain_certs(acme_json_path, domain):
  62. with open(acme_json_path) as acme_json_file:
  63. acme_json = json.load(acme_json_file)
  64.  
  65. certs_json = acme_json['DomainsCertificate']['Certs']
  66. domain_certs = [cert for cert in certs_json
  67. if domain in cert['Domains']['Main'] ]
  68.  
  69. if not domain_certs:
  70. raise RuntimeError(
  71. 'Unable to find certificate for domain "%s"' % (domain,))
  72. elif len(domain_certs) > 1:
  73. raise RuntimeError(
  74. 'More than one (%d) certificates for domain "%s"' % (domain,))
  75.  
  76. [domain_cert] = domain_certs
  77. return (base64.b64decode(domain_cert['Certificate']['PrivateKey']),
  78. base64.b64decode(domain_cert['Certificate']['Certificate']))
  79.  
  80.  
  81. def post_update(command):
  82. subprocess.check_call(shlex.split(command))
  83.  
  84.  
  85. if __name__ == '__main__':
  86. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement