commit 5922d2bb08105cd441ab4db246b9634aa188506d Author: Darsey Litzenberger Date: Tue Jul 8 23:00:02 2025 -0600 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9ec28cf --- /dev/null +++ b/.gitignore @@ -0,0 +1,135 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +/build/ +develop-eggs/ +/dist/ +/downloads/ +eggs/ +.eggs/ +/lib/ +/lib64/ +/parts/ +/sdist/ +/var/ +/wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# PyCharm +.idea/ + +# ctags output file +tags diff --git a/NOTES.txt b/NOTES.txt new file mode 100644 index 0000000..973b37a --- /dev/null +++ b/NOTES.txt @@ -0,0 +1,10 @@ +# Initial install (generated private key on pusher host) +cat fullchain privkey > all.pem +scp all.pem arachnia:/ +ssh arachnia "/certificate import name=www_ssl_cert file-name=all.pem no-key-export=yes; /ip service set www-ssl certificate=www_ssl_cert; /ip service set api-ssl certificate=www_ssl_cert" + +# Subsequent install (renewed on pusher host) +cat fullchain > renew.pem +scp renew.pem arachnia:/ +ssh arachnia "/certificate import name=www_ssl_cert file-name=renew.pem" +#ssh arachnia "/ip service set www-ssl certificate=www_ssl_cert" diff --git a/pkcs12_export.py b/pkcs12_export.py new file mode 100644 index 0000000..85466dd --- /dev/null +++ b/pkcs12_export.py @@ -0,0 +1,116 @@ +#!python3 +# dlitz 2025 + +import fcntl +import os +import re +import subprocess +from contextlib import ExitStack, contextmanager + + +class PKCS12Exporter: + + openssl_prog = "openssl" + + def export_pkcs12(self, privkey_data, cert_data, chain_data, passphrase): + assert re.search(r"^-----BEGIN (.*) PRIVATE KEY-----\n", privkey_data, re.M) + assert re.search(r"^-----BEGIN CERTIFICATE-----\n", cert_data, re.M) + assert "PRIVATE KEY" not in cert_data + assert chain_data is None or "PRIVATE KEY" not in chain_data + + fullchain_data = cert_data + "\n" + (chain_data or "") + "\n" + + all_data = privkey_data + "\n" + fullchain_data + + with self.pipe_with_buffered_data(passphrase, text=True) as passphrase_r: + cmd = [ + self.openssl_prog, + "pkcs12", + "-export", + "-passout", + f"fd:{passphrase_r.fileno():d}", + # "-macalg", "SHA256", + # "-keypbe", "AES-256-CBC", + # "-certpbe", "NONE", + ] + return subprocess.check_output( + cmd, pass_fds=[passphrase_r.fileno()], input=all_data.encode() + ) + + def open_pipes(self, *, text=False, encoding=None, errors=None): + rfd, wfd = os.pipe() + rfile = wfile = None + try: + rfile = open(rfd, "r" if text else "rb", encoding=encoding, errors=errors) + wfile = open(wfd, "w" if text else "wb", encoding=encoding, errors=errors) + return rfile, wfile + except: + if rfile is not None: + rfile.close() + else: + os.close(rfd) + if wfile is not None: + wfile.close() + else: + os.close(wfd) + raise + + def pipe_with_buffered_data(self, data, *, text=False, encoding=None, errors=None): + # Open a pipe and place a small string into its buffer, then return a file + # open for reading the pipe. + rfile, wfile = self.open_pipes(text=text) + try: + if text: + bdata = data.encode(encoding=wfile.encoding, errors=wfile.errors) + else: + bdata = data + pipe_buffer_size = fcntl.fcntl(wfile.fileno(), fcntl.F_GETPIPE_SZ) + if pipe_buffer_size < len(bdata): + pipe_buffer_size = fcntl.fcntl( + wfile.fileno(), fcntl.F_SETPIPE_SZ, len(bdata) + ) + assert pipe_buffer_size >= len(bdata) + wfile.write(data) + wfile.close() + return rfile + except: + wfile.close() + rfile.close() + raise + +if __name__ == '__main__': + from argparse import ArgumentParser + from pathlib import Path + import getpass + import sys + + parser = ArgumentParser( + description="push TLS privkey & certificate to MikroTik RouterOS router" + ) + parser.add_argument( + "-k", "--privkey", type=Path, required=True, help="private key file" + ) + parser.add_argument("--cert", type=Path, required=True, help="certificate file") + parser.add_argument( + "--chain", type=Path, help="separate certificate chain file (optional)" + ) + parser.add_argument("-o", "--output", type=Path, help="output file") + args = parser.parse_args() + + privkey_data = args.privkey.read_text() + cert_data = args.cert.read_text() + chain_data = args.chain.read_text() if args.chain is not None else None + + key_passphrase = getpass.getpass("set the passphrase:") + + pkcs12_data = PKCS12Exporter().export_pkcs12( + privkey_data=privkey_data, + cert_data=cert_data, + chain_data=chain_data, + passphrase=key_passphrase, + ) + + if args.output: + args.output.write_bytes(pkcs12_data) + else: + sys.stdout.buffer.write(pkcs12_data) diff --git a/pkcs12_export_proc.py b/pkcs12_export_proc.py new file mode 100644 index 0000000..829c9e3 --- /dev/null +++ b/pkcs12_export_proc.py @@ -0,0 +1,71 @@ +#!python3 +# dlitz 2025 + +from contextlib import ExitStack, contextmanager +import multiprocessing +import subprocess +import os +import re + +class PKCS12Exporter: + + openssl_prog = "openssl" + + def export_pkcs12(self, privkey_data, cert_data, chain_data, passphrase): + assert re.search(r"^-----BEGIN (.*) PRIVATE KEY-----\n", privkey_data, re.M) + assert re.search(r"^-----BEGIN CERTIFICATE-----\n", cert_data, re.M) + assert "PRIVATE KEY" not in cert_data + assert chain_data is None or "PRIVATE KEY" not in chain_data + + fullchain_data = cert_data + "\n" + (chain_data or "") + "\n" + + all_data = privkey_data + "\n" + fullchain_data + + with ExitStack() as stack: + passphrase_r, passphrase_w = stack.enter_context(self.open_pipes(text=True)) + + cmd = [ + self.openssl_prog, + "pkcs12", + "-export", + "-passout", f"fd:{passphrase_r.fileno():d}", + "-macalg", "SHA256", + "-keypbe", "AES-256-CBC", + "-certpbe", "NONE", + ] + + passphrase_proc = multiprocessing.Process(target=self.data_writer, args=(passphrase_w, passphrase)) + passphrase_proc.start() + passphrase_w.close() + + print(cmd) + #subprocess.run(["bash", "-x", "-c", "ls -l /dev/fd/; cat /dev/fd/5; cat /dev/fd/3"], pass_fds=(passphrase_r.fileno(), privkey_r.fileno())) + #subprocess.run(cmd, pass_fds=(passphrase_r.fileno(), privkey_r.fileno()), input=fullchain_data.encode()) + subprocess.run(cmd, pass_fds=(passphrase_r.fileno(), privkey_r.fileno()), input=all_data.encode()) + + passphrase_proc.terminate() + passphrase_proc. + + @contextmanager + def open_pipes(self, *, text=False): + rfd, wfd = os.pipe() + rfile = wfile = None + try: + rfile = open(rfd, "r" if text else "rb") + wfile = open(wfd, "w" if text else "wb") + except: + if rfile is not None: + rfile.close() + else: + os.close(rfd) + if wfile is not None: + wfile.close() + else: + os.close(wfd) + with rfile as r, wfile as w: + yield r, w + + @classmethod + def data_writer(cls, file, data): + file.write(data) + file.close() diff --git a/pushcert-mikrotik b/pushcert-mikrotik new file mode 100755 index 0000000..22f7217 --- /dev/null +++ b/pushcert-mikrotik @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +# dlitz 2025 + +import os +import shlex +import subprocess +import tempfile +from argparse import ArgumentParser +from pathlib import Path + +from pkcs12_export import PKCS12Exporter + + +def generate_random_passphrase(): + return os.urandom(64).hex() + + +def make_arg_parser(): + parser = ArgumentParser( + description="push TLS privkey & certificate to MikroTik RouterOS router" + ) + parser.add_argument( + "-k", "--privkey", type=Path, required=True, help="private key file" + ) + parser.add_argument("--cert", type=Path, required=True, help="certificate file") + parser.add_argument( + "--chain", type=Path, help="separate certificate chain file (optional)" + ) + parser.add_argument("--ssh-host", required=True, help="target ssh host") + return parser + + +def parse_args(): + parser = make_arg_parser() + args = parser.parse_args() + assert ":" not in args.ssh_host + return args, parser + + +def main(): + args, parser = parse_args() + + # TODO: Check certificate serial number before attempting to copy cert, and at end. + + privkey_data = args.privkey.read_text() + cert_data = args.cert.read_text() + chain_data = args.chain.read_text() if args.chain is not None else None + + key_passphrase = generate_random_passphrase() + + pkcs12_data = PKCS12Exporter().export_pkcs12( + privkey_data=privkey_data, + cert_data=cert_data, + chain_data=chain_data, + passphrase=key_passphrase, + ) + + with tempfile.NamedTemporaryFile(dir="/dev/shm") as tf: + tf.write(pkcs12_data) + tf.flush() + + ssh_options = [ + "-oBatchMode=yes", + "-oControlMaster=no", + ] + + cmd = ["scp", *ssh_options, "-q", tf.name, f"{args.ssh_host}:/cert-pusher-data.p12"] + # print("executing:", shlex.join(cmd)) + subprocess.run(cmd, check=True) + + # ros_command = f'/certificate import name=www_ssl_cert file-name=cert-pusher-data.p12 no-key-export=yes passphrase="{key_passphrase}"' + ros_command = f'/certificate import name=www_ssl_cert file-name=cert-pusher-data.p12 no-key-export=yes passphrase="{key_passphrase}"; /file remove [/file find name=cert-pusher-data.p12]' + cmd = [ + "ssh", + *ssh_options, + args.ssh_host, + ros_command, + ] + result = subprocess.check_output(cmd, text=True) + assert " files-imported: 1" in result + # print(result) + + + +if __name__ == "__main__": + main()