mirror of
https://github.com/auricom/home-cluster.git
synced 2025-09-17 18:24:14 +02:00
♻️ homelab
This commit is contained in:
@@ -0,0 +1,223 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Import and activate a SSL/TLS certificate into FreeNAS 11.1 or later
|
||||
Uses the FreeNAS API to make the change, so everything's properly saved in the config
|
||||
database and captured in a backup.
|
||||
|
||||
Requires paths to the cert (including the any intermediate CA certs) and private key,
|
||||
and username, password, and FQDN of your FreeNAS system.
|
||||
|
||||
Source: https://github.com/danb35/deploy-freenas
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import requests
|
||||
import time
|
||||
import configparser
|
||||
import socket
|
||||
from datetime import datetime, timedelta
|
||||
from urllib3.exceptions import InsecureRequestWarning
|
||||
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
|
||||
|
||||
API_KEY = os.getenv('CERTS_DEPLOY_API_KEY')
|
||||
|
||||
DOMAIN_NAME = socket.gethostname()
|
||||
TRUENAS_ADDRESS = 'localhost'
|
||||
VERIFY = False
|
||||
PRIVATEKEY_PATH = os.getenv('CERTS_DEPLOY_PRIVATE_KEY_PATH')
|
||||
FULLCHAIN_PATH = os.getenv('CERTS_DEPLOY_FULLCHAIN_PATH')
|
||||
PROTOCOL = 'http://'
|
||||
PORT = '80'
|
||||
FTP_ENABLED = bool(os.getenv('CERTS_DEPLOY_FTP_ENABLED', ''))
|
||||
S3_ENABLED = bool(os.getenv('CERTS_DEPLOY_S3_ENABLED', ''))
|
||||
now = datetime.now()
|
||||
cert = "letsencrypt-%s-%s-%s-%s" %(now.year, now.strftime('%m'), now.strftime('%d'), ''.join(c for c in now.strftime('%X') if
|
||||
c.isdigit()))
|
||||
|
||||
|
||||
# Set some general request params
|
||||
session = requests.Session()
|
||||
session.headers.update({
|
||||
'Content-Type': 'application/json'
|
||||
})
|
||||
if API_KEY:
|
||||
session.headers.update({
|
||||
'Authorization': f'Bearer {API_KEY}'
|
||||
})
|
||||
else:
|
||||
print ("Unable to authenticate. Specify 'CERTS_DEPLOY_API_KEY' in the os Env.")
|
||||
exit(1)
|
||||
if not PRIVATEKEY_PATH:
|
||||
print ("Unable to find private key. Specify 'CERTS_DEPLOY_PRIVATE_KEY_PATH' in the os Env.")
|
||||
exit(1)
|
||||
if not FULLCHAIN_PATH:
|
||||
print ("Unable to find private key. Specify 'CERTS_DEPLOY_FULLCHAIN_PATH' in the os Env.")
|
||||
exit(1)
|
||||
|
||||
# Load cert/key
|
||||
with open(PRIVATEKEY_PATH, 'r') as file:
|
||||
priv_key = file.read()
|
||||
with open(FULLCHAIN_PATH, 'r') as file:
|
||||
full_chain = file.read()
|
||||
|
||||
# Update or create certificate
|
||||
r = session.post(
|
||||
PROTOCOL + TRUENAS_ADDRESS + ':' + PORT + '/api/v2.0/certificate/',
|
||||
verify=VERIFY,
|
||||
data=json.dumps({
|
||||
"create_type": "CERTIFICATE_CREATE_IMPORTED",
|
||||
"name": cert,
|
||||
"certificate": full_chain,
|
||||
"privatekey": priv_key,
|
||||
})
|
||||
)
|
||||
|
||||
if r.status_code == 200:
|
||||
print ("Certificate import successful")
|
||||
else:
|
||||
print ("Error importing certificate!")
|
||||
print (r.text)
|
||||
sys.exit(1)
|
||||
|
||||
# Sleep for a few seconds to let the cert propagate
|
||||
time.sleep(5)
|
||||
|
||||
# Download certificate list
|
||||
limit = {'limit': 0} # set limit to 0 to disable paging in the event of many certificates
|
||||
r = session.get(
|
||||
PROTOCOL + TRUENAS_ADDRESS + ':' + PORT + '/api/v2.0/certificate/',
|
||||
verify=VERIFY,
|
||||
params=limit
|
||||
)
|
||||
|
||||
if r.status_code == 200:
|
||||
print ("Certificate list successful")
|
||||
else:
|
||||
print ("Error listing certificates!")
|
||||
print (r.text)
|
||||
sys.exit(1)
|
||||
|
||||
# Parse certificate list to find the id that matches our cert name
|
||||
cert_list = r.json()
|
||||
|
||||
new_cert_data = None
|
||||
for cert_data in cert_list:
|
||||
if cert_data['name'] == cert:
|
||||
new_cert_data = cert_data
|
||||
cert_id = new_cert_data['id']
|
||||
break
|
||||
|
||||
if not new_cert_data:
|
||||
print ("Error searching for newly imported certificate in certificate list.")
|
||||
sys.exit(1)
|
||||
|
||||
# Set our cert as active
|
||||
r = session.put(
|
||||
PROTOCOL + TRUENAS_ADDRESS + ':' + PORT + '/api/v2.0/system/general/',
|
||||
verify=VERIFY,
|
||||
data=json.dumps({
|
||||
"ui_certificate": cert_id,
|
||||
})
|
||||
)
|
||||
|
||||
if r.status_code == 200:
|
||||
print ("Setting active certificate successful")
|
||||
else:
|
||||
print ("Error setting active certificate!")
|
||||
print (r.text)
|
||||
sys.exit(1)
|
||||
|
||||
if FTP_ENABLED:
|
||||
# Set our cert as active for FTP plugin
|
||||
r = session.put(
|
||||
PROTOCOL + TRUENAS_ADDRESS + ':' + PORT + '/api/v2.0/ftp/',
|
||||
verify=VERIFY,
|
||||
data=json.dumps({
|
||||
"ssltls_certfile": cert,
|
||||
}),
|
||||
)
|
||||
|
||||
if r.status_code == 200:
|
||||
print ("Setting active FTP certificate successful")
|
||||
else:
|
||||
print ("Error setting active FTP certificate!")
|
||||
print (r.text)
|
||||
sys.exit(1)
|
||||
|
||||
if S3_ENABLED:
|
||||
# Set our cert as active for S3 plugin
|
||||
r = session.put(
|
||||
PROTOCOL + TRUENAS_ADDRESS + ':' + PORT + '/api/v2.0/s3/',
|
||||
verify=VERIFY,
|
||||
data=json.dumps({
|
||||
"certificate": cert_id,
|
||||
}),
|
||||
)
|
||||
|
||||
if r.status_code == 200:
|
||||
print ("Setting active S3 certificate successful")
|
||||
else:
|
||||
print ("Error setting active S3 certificate!")
|
||||
print (r)
|
||||
sys.exit(1)
|
||||
|
||||
# Get expired and old certs with same SAN
|
||||
cert_ids_same_san = set()
|
||||
cert_ids_expired = set()
|
||||
for cert_data in cert_list:
|
||||
if set(cert_data['san']) == set(new_cert_data['san']):
|
||||
cert_ids_same_san.add(cert_data['id'])
|
||||
|
||||
issued_date = datetime.strptime(cert_data['from'], "%c")
|
||||
lifetime = timedelta(days=cert_data['lifetime'])
|
||||
expiration_date = issued_date + lifetime
|
||||
if expiration_date < now:
|
||||
cert_ids_expired.add(cert_data['id'])
|
||||
|
||||
# Remove new cert_id from lists
|
||||
if cert_id in cert_ids_expired:
|
||||
cert_ids_expired.remove(cert_id)
|
||||
|
||||
if cert_id in cert_ids_same_san:
|
||||
cert_ids_same_san.remove(cert_id)
|
||||
|
||||
# Delete expired and old certificates with same SAN from freenas
|
||||
for cid in (cert_ids_same_san | cert_ids_expired):
|
||||
r = session.delete(
|
||||
PROTOCOL + TRUENAS_ADDRESS + ':' + PORT + '/api/v2.0/certificate/id/' + str(cid),
|
||||
verify=VERIFY
|
||||
)
|
||||
|
||||
for c in cert_list:
|
||||
if c['id'] == cid:
|
||||
cert_name = c['name']
|
||||
|
||||
if r.status_code == 200:
|
||||
print ("Deleting certificate " + cert_name + " successful")
|
||||
else:
|
||||
print ("Error deleting certificate " + cert_name + "!")
|
||||
print (r.text)
|
||||
sys.exit(1)
|
||||
|
||||
# Reload nginx with new cert
|
||||
# If everything goes right, the request fails with a ConnectionError
|
||||
try:
|
||||
r = session.post(
|
||||
PROTOCOL + TRUENAS_ADDRESS + ':' + PORT + '/api/v2.0/system/general/ui_restart',
|
||||
verify=VERIFY
|
||||
)
|
||||
|
||||
if r.status_code == 200:
|
||||
print ("Reloading WebUI successful")
|
||||
print ("deploy_freenas.py executed successfully")
|
||||
else:
|
||||
print ("Error reloading WebUI!")
|
||||
print ("{}: {}".format(r.status_code, r.text))
|
||||
sys.exit(1)
|
||||
except requests.exceptions.ConnectionError:
|
||||
print ("Error reloading WebUI!")
|
||||
sys.exit(1)
|
Reference in New Issue
Block a user