From 222dbee9aa988bef1b4582ac3f757a9de043a972 Mon Sep 17 00:00:00 2001 From: Darsey Litzenberger Date: Thu, 19 Mar 2026 19:05:42 -0600 Subject: [PATCH] First working version --- NOTES.txt | 10 - mtik_cert_pusher/__main__.py | 145 +++++++++------ mtik_cert_pusher/cert_pusher.py | 57 ------ mtik_cert_pusher/cert_util.py | 48 ----- mtik_cert_pusher/connector.py | 131 ++++++++++++++ mtik_cert_pusher/pkcs12_export.py | 46 ----- mtik_cert_pusher/routeros.py | 292 ++++++++++++++++++++++++++++++ mtik_cert_pusher/routeros_ssh.py | 132 -------------- mtik_cert_pusher/ssl_util.py | 180 ------------------ p12test.py | 43 ----- pkcs12_export_proc.py | 71 -------- 11 files changed, 508 insertions(+), 647 deletions(-) delete mode 100644 NOTES.txt delete mode 100644 mtik_cert_pusher/cert_pusher.py delete mode 100644 mtik_cert_pusher/cert_util.py create mode 100644 mtik_cert_pusher/connector.py delete mode 100644 mtik_cert_pusher/pkcs12_export.py create mode 100644 mtik_cert_pusher/routeros.py delete mode 100644 mtik_cert_pusher/routeros_ssh.py delete mode 100644 mtik_cert_pusher/ssl_util.py delete mode 100644 p12test.py delete mode 100644 pkcs12_export_proc.py diff --git a/NOTES.txt b/NOTES.txt deleted file mode 100644 index 973b37a..0000000 --- a/NOTES.txt +++ /dev/null @@ -1,10 +0,0 @@ -# Initial install (generated private key on pusher host) -cat fullchain privkey > all.pem -scp all.pem arachnia:/ -ssh arachnia "/certificate import name=www_ssl_cert file-name=all.pem no-key-export=yes; /ip service set www-ssl certificate=www_ssl_cert; /ip service set api-ssl certificate=www_ssl_cert" - -# Subsequent install (renewed on pusher host) -cat fullchain > renew.pem -scp renew.pem arachnia:/ -ssh arachnia "/certificate import name=www_ssl_cert file-name=renew.pem" -#ssh arachnia "/ip service set www-ssl certificate=www_ssl_cert" diff --git a/mtik_cert_pusher/__main__.py b/mtik_cert_pusher/__main__.py index a43e530..db27c4a 100644 --- a/mtik_cert_pusher/__main__.py +++ b/mtik_cert_pusher/__main__.py @@ -1,107 +1,132 @@ #!/usr/bin/env python3 # dlitz 2025 -import sys import os import shlex import subprocess +import sys import tempfile from argparse import ArgumentParser from pathlib import Path -from .cert_util import split_certs -from .ssl_util import SSLUtil -#from .cert_pusher import MTCertPusher -from .routeros_ssh import RouterOS, SSHConnector +from cryptography import x509 +from cryptography.hazmat.primitives.serialization import ( + load_pem_private_key, + load_pem_public_key, +) -#def generate_random_passphrase(): -# return os.urandom(64).hex() +from .connector import SSHConnector +from .routeros import RouterOS def make_arg_parser(): - if Path(sys.argv[0]).stem == '__main__': + if Path(sys.argv[0]).stem == "__main__": prog = __package__ else: prog = None parser = ArgumentParser( prog=prog, - description="push TLS privkey & certificate to MikroTik RouterOS router" + description="push TLS privkey & certificate to MikroTik RouterOS router", ) - parser.add_argument( + + subparsers = parser.add_subparsers(dest="subcommand") + + install_parser = subparsers.add_parser( + "install", help="push TLS privkey & certificate to MikroTik RouterOS router" + ) + install_parser.add_argument( "-k", "--privkey", type=Path, required=True, help="private key file" ) - parser.add_argument("--cert", type=Path, required=True, help="certificate file") - parser.add_argument( + install_parser.add_argument( + "--cert", type=Path, required=True, help="certificate file" + ) + install_parser.add_argument( "--chain", type=Path, help="separate certificate chain file (optional)" ) - parser.add_argument("--ssh-config", type=Path, help="ssh config file") - parser.add_argument("--ssh-host", required=True, help="target ssh host") - parser.add_argument("--ssh-user", help="target ssh user") - parser.add_argument("--ssh-port", type=int, help="target ssh port") + install_parser.add_argument("--ssh-config", type=Path, help="ssh config file") + install_parser.add_argument("--ssh-host", required=True, help="target ssh host") + install_parser.add_argument("--ssh-user", help="target ssh user") + install_parser.add_argument("--ssh-port", type=int, help="target ssh port") + + fingerprint_parser = subparsers.add_parser( + "fingerprint", aliases=["fpr"], help="calculate fingerprint of certificate(s)" + ) + fingerprint_parser.add_argument( + dest="files", + metavar="cert.pem", + nargs='+', + type=Path, + help="PEM certificate file to read", + ) + + skid_parser = subparsers.add_parser( + #"skid", help="calculate SubjectKeyIdentifier of certificate(s) or key(s)" + "skid", help="show the SubjectKeyIdentifier of certificate(s)" + ) + skid_parser.add_argument( + dest="files", + #metavar="file.pem", + metavar="cert.pem", + nargs='+', + type=Path, + #help="PEM file to read", + help="PEM certificate file to read", + ) + return parser def parse_args(): parser = make_arg_parser() args = parser.parse_args() - assert ":" not in args.ssh_host return args, parser def main(): args, parser = parse_args() - # TODO: Check certificate serial number before attempting to copy cert, and at end. + if args.subcommand == "install": + assert ":" not in args.ssh_host - privkey_data = args.privkey.read_text() - cert_data = args.cert.read_text() - chain_data = args.chain.read_text() if args.chain is not None else None + privkey_data = args.privkey.read_text() + cert_data = args.cert.read_text() + chain_data = args.chain.read_text() if args.chain is not None else None - #key_passphrase = generate_random_passphrase() - #chain_certs = split_certs(chain_data) + connector = SSHConnector( + host=args.ssh_host, + port=args.ssh_port, + user=args.ssh_user, + ssh_config_path=args.ssh_config, + ) + ros_remote = RouterOS(connector=connector) - ssl_util = SSLUtil() - ssh_connector = SSHConnector(host=args.ssh_host, port=args.ssh_port, user=args.ssh_user, ssh_config_path=args.ssh_config) - ros_remote = RouterOS(connector=ssh_connector, ssl_util=ssl_util) + fingerprint, host_cert_obj = ros_remote.install_key_and_certificates( + key=privkey_data, cert=cert_data, chain=chain_data + ) - #ros_ssh = RouterOS_SSH(host=args.ssh_host) + ros_remote.use_certificate(fingerprint) - ros_remote.install_key_and_certificates(key=privkey_data, cert=cert_data, chain=chain_data) -# pkcs12_data = sslutil.export_pkcs12( -# privkey_data=privkey_data, -# cert_data=cert_data, -# chain_data=chain_data, -# passphrase=key_passphrase, -# ) + elif args.subcommand in ("fingerprint", "fpr"): + for path in args.files: + try: + for cert_obj in x509.load_pem_x509_certificates(path.read_bytes()): + print(RouterOS.cert_fingerprint(cert_obj)) + except Exception as exc: + exc.add_note(f"path={path}") + raise -# -# -# with tempfile.NamedTemporaryFile(dir="/dev/shm") as tf: -# tf.write(pkcs12_data) -# tf.flush() -# -# ssh_options = [ -# "-oBatchMode=yes", -# "-oControlMaster=no", -# ] -# -# cmd = ["scp", *ssh_options, "-q", tf.name, f"{args.ssh_host}:/cert-pusher-data.p12"] -# # print("executing:", shlex.join(cmd)) -# subprocess.run(cmd, check=True) -# -# # ros_command = f'/certificate import name=www_ssl_cert file-name=cert-pusher-data.p12 no-key-export=yes passphrase="{key_passphrase}"' -# ros_command = f'/certificate import name=www_ssl_cert file-name=cert-pusher-data.p12 no-key-export=yes passphrase="{key_passphrase}"; /file remove [/file find name=cert-pusher-data.p12]' -# cmd = [ -# "ssh", -# *ssh_options, -# args.ssh_host, -# ros_command, -# ] -# result = subprocess.check_output(cmd, text=True) -# assert " files-imported: 1" in result -# # print(result) + elif args.subcommand in ("skid"): + for path in args.files: + try: + for cert_obj in x509.load_pem_x509_certificates(path.read_bytes()): + print(RouterOS.cert_skid(cert_obj)) + except Exception as exc: + exc.add_note(f"path={path}") + raise + else: + raise NotImplementedError(args.subcommand) if __name__ == "__main__": diff --git a/mtik_cert_pusher/cert_pusher.py b/mtik_cert_pusher/cert_pusher.py deleted file mode 100644 index 3c8d01b..0000000 --- a/mtik_cert_pusher/cert_pusher.py +++ /dev/null @@ -1,57 +0,0 @@ -#!python3 -# dlitz 2025-2026 - -import tempfile -import os - -from cryptography import x509 -from cryptography.hazmat.primitives.hashes import SHA256 -from cryptography.hazmat.primitives.serialization import ( - BestAvailableEncryption, - KeySerializationEncryption, - PrivateFormat, - load_pem_private_key, - load_pem_public_key, - pkcs12, -) - -from .cert_util import split_certs -from .routeros_ssh import RouterOS_SSH -from .ssl_util import SSLUtil - - -class MTCertPusher: - - temporary_directory = "/dev/shm" - - def __init__(self, ssl_util: SSLUtil, ros_ssh: RouterOS_SSH): - self.ssl_util = ssl_util - self.ros = ros_ssh - self.tempdir = tempfile.TemporaryDirectory(dir=self.temporary_directory) - - def __del__(self): - self.close() - - def close(self): - try: - tempdir = self.tempdir - except AttributeError: - pass - else: - self.tempdir.cleanup() - del self.tempdir - - def generate_random_pkcs12_passphrase(self): - return os.urandom(64).hex() - - def install_key_and_certificates( - self, key: str, cert: str, chain: str | None = None - ): - private_key_obj = load_pem_private_key(key.encode()) - cert_obj = x509.load_pem_x509_certificate(cert.encode()) - if cert_obj.public_key() != private_key_obj.public_key(): - raise ValueError("certificate does not match private key") - - passphrase = self.generate_random_pkcs12_passphrase() - p12 = self.ssl_util.create_pkcs12_from_key_and_certificates(key=key, cert=cert, passphrase=passphrase) - diff --git a/mtik_cert_pusher/cert_util.py b/mtik_cert_pusher/cert_util.py deleted file mode 100644 index 6896d4b..0000000 --- a/mtik_cert_pusher/cert_util.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python3 -# dlitz 2026 -import base64 -import re - -CERT_REGEXP = re.compile( - r""" - (?P - ^-----BEGIN\ CERTIFICATE-----\n - (?P - [0-9A-Za-z/+\n]+? # base64 characters and newlines - ={0,2}\n # base64 padding - ) - ^-----END\ CERTIFICATE-----(?:\n|\Z) - ) - """, - re.S | re.M | re.X, -) - - -def split_certs(pem_data: str, *, strict: bool = True) -> list[str]: - r = CERT_REGEXP - if not strict: - return [m["cert"] for m in r.finditer(pem_data)] - certs = [] - pos = 0 - for m in r.finditer(pem_data): - if strict and m.start() != pos: - raise ValueError( - f"certificate data contains extra junk at (position {pos})" - ) - cert = m["cert"] - if strict: - # Try decoding the base64 data - base64.b64decode(m["b64data"]) - certs.append(cert) - pos = m.end() - if strict and pos != len(pem_data): - raise ValueError(f"extra junk after certificate data (at position {pos})") - return certs - - -if __name__ == "__main__": - from pathlib import Path - - # print(split_certs(Path("threecerts.pem").read_text(), strict=False)) - # print(split_certs(Path("threecerts.pem").read_text(), strict=True)) - print(split_certs("-----BEGIN CERTIFICATE-----\nAAA=\n-----END CERTIFICATE-----\n")) diff --git a/mtik_cert_pusher/connector.py b/mtik_cert_pusher/connector.py new file mode 100644 index 0000000..baf3abe --- /dev/null +++ b/mtik_cert_pusher/connector.py @@ -0,0 +1,131 @@ +#!python3 +# dlitz 2026 + +import json +import os +import re +import shlex +import subprocess +import tempfile +from pathlib import Path, PosixPath + +remote_name_validation_regex = re.compile( + r""" + \A + [A-Za-z0-9_] + [A-Za-z0-9_\.\-]* + (?: + \. + [A-Za-z0-9_]+ + )? + \Z + """, + re.X, +) + + +def _private_opener(file, flags): + """Open a file with restrictive permission bits""" + return os.open(os.fsencode(file), flags, 0o600) + + +class Connector: + pass + + +class SSHConnector(Connector): + + ssh_executable = "ssh" + scp_executable = "scp" + temporary_directory = "/dev/shm" + common_args = ["-oBatchMode=yes", "-oControlMaster=no"] + + def __init__( + self, + host: str, + *, + user: str | None = None, + port: int | None = None, + ssh_config_path: str | None = None, + extra_ssh_options=None, + extra_ssh_args=None, + extra_scp_args=None, + ): + self.ssh_host = host + self.ssh_user = user + self.ssh_port = port + self.ssh_config_path = ssh_config_path + if extra_ssh_options is None: + extra_ssh_options = {} + assert isinstance(extra_ssh_options, dict), "extra_ssh_options should be a dict" + self.extra_ssh_options = extra_ssh_options + self.extra_ssh_args = extra_ssh_args or () + self.extra_scp_args = extra_scp_args or () + + def _ssh_option_args(self) -> list: + result = [] + if self.ssh_config_path: + result.extend(["-F", self.ssh_config_path]) + if self.ssh_user: + result.append("-oUser={self.ssh_user}") + if self.ssh_port: + result.append("-oPort={self.ssh_port:d}") + for k, v in self.extra_ssh_options.items(): + assert "=" not in k + assert k + result.append(f"-o{k}={v}") + return result + + def _ssh_args(self, args, /) -> list: + return [ + self.ssh_executable, + *self.common_args, + *self._ssh_option_args(), + *self.extra_ssh_args, + *args, + ] + + def _scp_args(self, args, /) -> list: + return [ + self.scp_executable, + *self.common_args, + *self._ssh_option_args(), + *self.extra_scp_args, + *args, + ] + + def invoke_remote_command( + self, cmdline: str, text: bool = False, capture: bool = False + ) -> str: + cmd = self._ssh_args([self.ssh_host, cmdline]) + #print("running: ", shlex.join(cmd)) + if capture: + return subprocess.check_output(cmd, text=text) + subprocess.run(cmd, check=True) + + def create_remote_files(self, content_by_name: dict, remote_directory: str): + if not content_by_name: + raise ValueError("require at least one file to copy") + with tempfile.TemporaryDirectory(dir=self.temporary_directory, prefix="mtik-connector-tmp") as td: + tempfile_paths = [] + + # Write the files to a temporary directory + for remote_name, content in content_by_name.items(): + assert isinstance(remote_name, str) + if not remote_name_validation_regex.fullmatch(remote_name): + raise ValueError(f"illegal remote filename: {remote_name!r}") + tempfile_path = Path(td, remote_name) + with open(tempfile_path, "wb", opener=_private_opener) as outfile: + outfile.write(content) + tempfile_paths.append(tempfile_path) + + # Copy them in a single scp command + cmd = self._scp_args( + [ + "-q", + *(str(p) for p in tempfile_paths), + f"{self.ssh_host}:{remote_directory}", + ] + ) + #print("running: ", shlex.join(cmd)) + subprocess.run(cmd, check=True) diff --git a/mtik_cert_pusher/pkcs12_export.py b/mtik_cert_pusher/pkcs12_export.py deleted file mode 100644 index a5346c8..0000000 --- a/mtik_cert_pusher/pkcs12_export.py +++ /dev/null @@ -1,46 +0,0 @@ -#!python3 -# dlitz 2025 - -import fcntl -import re -import subprocess -from contextlib import ExitStack, contextmanager - -from .ssl_util import SSLUtil - -if __name__ == '__main__': - from argparse import ArgumentParser - from pathlib import Path - import getpass - import sys - - parser = ArgumentParser( - description="push TLS privkey & certificate to MikroTik RouterOS router" - ) - parser.add_argument( - "-k", "--privkey", type=Path, required=True, help="private key file" - ) - parser.add_argument("--cert", type=Path, required=True, help="certificate file") - parser.add_argument( - "--chain", type=Path, help="separate certificate chain file (optional)" - ) - parser.add_argument("-o", "--output", type=Path, help="output file") - args = parser.parse_args() - - privkey_data = args.privkey.read_text() - cert_data = args.cert.read_text() - chain_data = args.chain.read_text() if args.chain is not None else None - - key_passphrase = getpass.getpass("set the passphrase:") - - pkcs12_data = SSLUtil().export_pkcs12( - privkey_data=privkey_data, - cert_data=cert_data, - chain_data=chain_data, - passphrase=key_passphrase, - ) - - if args.output: - args.output.write_bytes(pkcs12_data) - else: - sys.stdout.buffer.write(pkcs12_data) diff --git a/mtik_cert_pusher/routeros.py b/mtik_cert_pusher/routeros.py new file mode 100644 index 0000000..a631489 --- /dev/null +++ b/mtik_cert_pusher/routeros.py @@ -0,0 +1,292 @@ +#!python3 +# dlitz 2026 + +import json +import os +import re +import shlex +import subprocess +import tempfile +import textwrap +from collections import defaultdict +from dataclasses import dataclass +from io import StringIO +from pathlib import Path, PosixPath + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric.types import ( + CertificatePublicKeyTypes, + PrivateKeyTypes, + PublicKeyTypes, +) +from cryptography.hazmat.primitives.serialization import ( + BestAvailableEncryption, + Encoding, + PrivateFormat, + PublicFormat, + load_pem_private_key, +) + +from .connector import Connector + +import_result_regex = re.compile( + r""" + ^ + \s* + (?P + certificates-imported + | private-keys-imported + | files-imported + | decryption-failures + | keys-with-no-certificate + ) + :\s + (?P\d+) + \s* + $ + """, + re.M | re.X, +) + +json_list_regex = re.compile(r"^\[.*\]$") + +fingerprint_regex = re.compile(r"\A[0-9a-f]{64}\Z") + + +class CertInstallError(Exception): + pass + + +@dataclass +class InstallInfo: + skid_hex: str # SubjectKeyInfo digest (hexadecimal) + fingerprint_hex: str | None # fingerprint of cert + remote_name: str # remote name + remote_filename: str # remote filename + content: bytes # file content + install_cmd: str # command used to install + + +class RouterOS: + def __init__(self, connector: Connector): + self.connector = connector + + def get_certificate_list(self): + cmdline = ":put [:serialize to=json [/certificate print detail as-value]]" + result_json = self.connector.invoke_remote_command(cmdline, capture=True) + return json.loads(result_json) + + @classmethod + def cert_skid(cls, cert_obj) -> str: + skid_bin = cert_obj.extensions.get_extension_for_class( + x509.SubjectKeyIdentifier + ).value.key_identifier + return skid_bin.hex().upper() + + @classmethod + def cert_fingerprint(cls, cert_obj) -> str: + return cert_obj.fingerprint(hashes.SHA256()).hex() + + def _make_cert_installinfo(self, cert_obj, privkey_obj=None) -> InstallInfo: + # RouterOS indexes cert by their SubjectKeyIdentifier (SKID) so we'll + # use that for naming. + skid_hex = self.cert_skid(cert_obj) + fpr_hex = self.cert_fingerprint(cert_obj) + + remote_name = f"skid_{skid_hex}" + remote_filename = f"fpr_{fpr_hex}.pem" + + content = cert_obj.public_bytes(Encoding.PEM) + passphrase = None + if privkey_obj is not None: + assert ( + cert_obj.public_key() == privkey_obj.public_key() + ), "private key doesn't match certificate" + + passphrase = os.urandom(32).hex() + + encrypted_private_key = privkey_obj.private_bytes( + Encoding.PEM, + PrivateFormat.PKCS8, + BestAvailableEncryption(passphrase.encode()), + ) + + content = content.rstrip(b"\n") + b"\n" + encrypted_private_key + + install_cmd = f'/certificate import trusted=no no-key-export=yes name="{remote_name}" file-name="{remote_filename}"' + if passphrase is not None: + install_cmd += f' passphrase="{passphrase}"' + + return InstallInfo( + skid_hex=skid_hex, + fingerprint_hex=fpr_hex, + remote_name=remote_name, + remote_filename=remote_filename, + content=content, + install_cmd=install_cmd, + ) + + def _dedup_certs(self, cert_objs) -> list: + # Dedup certificates by fingerprint + cert_by_fpr = {} + for cert_obj in cert_objs: + fpr = self.cert_fingerprint(cert_obj) + cert_by_fpr[fpr] = cert_obj + + # Look for duplicate skids in different certs + cert_skids = {} + for cert_obj in cert_by_fpr.values(): + skid = self.cert_skid(cert_obj) + if skid in cert_skids: + raise ValueError(f"skid conflict: {skid!r}") + + return list(cert_by_fpr.values()) + + def install_key_and_certificates( + self, key: str, cert: str, chain: str | None = None + ): + if not chain: + chain = "" + private_key_obj = load_pem_private_key(key.encode(), None) + cert_objs = x509.load_pem_x509_certificates((cert + "\n" + chain).encode()) + cert_objs = self._dedup_certs(cert_objs) + + # Find the certificate that signs our private key + public_key_obj = private_key_obj.public_key() + (host_cert_obj,) = [ + cert_obj + for cert_obj in cert_objs + if cert_obj.public_key() == public_key_obj + ] + chain_cert_objs = [ + cert_obj + for cert_obj in cert_objs + if cert_obj.public_key() != public_key_obj + ] + + private_key_skid = self.cert_skid(host_cert_obj) + + # Build the list of InstallInfo objects, and sets of what we're + # expecting to find. + install_list = [self._make_cert_installinfo(host_cert_obj, private_key_obj)] + install_list += [ + self._make_cert_installinfo(cert_obj) for cert_obj in chain_cert_objs + ] + + expected_skids = {info.skid_hex for info in install_list} + expected_fingerprints = {info.fingerprint_hex for info in install_list} - {None} + + # Merge the commands into a single command-line, and + # add a command to get a list of installed certs at the end. + remote_commands = [] + # workaround for race condition where /certificate/import sometimes + # doesn't see the recently-uploaded file + remote_commands += [":delay 0.1"] + remote_commands += [info.install_cmd for info in install_list] + remote_commands += [ + ":put [:serialize to=json [/certificate print detail as-value]]" + ] + remote_cmdline = "\n".join(cmd for cmd in remote_commands) + + # Collect the files to upload + remote_files = {info.remote_filename: info.content for info in install_list} + expected_files_count = len(cert_objs) + 1 + + # Upload the files and run the command + self.connector.create_remote_files(remote_files, "") + cmd_output = self.connector.invoke_remote_command( + remote_cmdline, capture=True, text=True + ) + try: + import_result_counts, certlist_output = self._parse_cmd_output(cmd_output) + + if import_result_counts["decryption-failures"] != 0: + raise CertInstallError( + f"BUG: Private key decryption failed on install; {import_result_counts!r}" + ) + if import_result_counts["keys-with-no-certificate"] != 0: + raise CertInstallError( + f"BUG: No certificate for private key; {import_result_counts!r}" + ) + + found_fingerprints = set() + found_skids = set() + found_private_key_skids = set() + for cert_detail in certlist_output: + assert isinstance(cert_detail["fingerprint"], str) + assert isinstance(cert_detail["skid"], str) + assert isinstance(cert_detail["private-key"], bool) + skid = cert_detail["skid"].upper() + found_fingerprints.add(cert_detail["fingerprint"].lower()) + found_skids.add(skid) + if cert_detail["private-key"]: + found_private_key_skids.add(skid) + + if private_key_skid not in found_private_key_skids: + raise CertInstallError( + f"private-key for skid {private_key_skid} was not installed" + ) + missing_skids: set = expected_skids - found_skids + missing_fingerprints: set = expected_fingerprints - found_fingerprints + if missing_skids or missing_fingerprints: + raise CertInstallError( + f"some certs were not installed {missing_skids=!r}, {missing_fingerprints=!r}" + f",\n{expected_skids=!r}" + f",\n{found_skids=!r}" + f",\n{expected_fingerprints=!r}" + f",\n{found_fingerprints=!r}" + ) + + return self.cert_fingerprint(host_cert_obj), host_cert_obj + + except Exception as exc: + exc.add_note(f"remote_cmdline={remote_cmdline!r}") + exc.add_note(f"cmd_output={cmd_output!r}") + raise + + def use_certificate(self, fingerprint: str): + if not fingerprint_regex.fullmatch(fingerprint): + raise ValueError(f"illegal fingerprint {fingerprint!r}") + cmds = [ + f'/ip/service set api-ssl,www-ssl certificate=[/certificate find where fingerprint="{fingerprint}"]', + f':put [:serialize to=json value={{[/certificate get [/ip/service get api-ssl certificate] fingerprint],[/certificate get [/ip/service get www-ssl certificate] fingerprint]}}]' + ] + remote_cmdline = "\n".join(cmds) + raw_result = self.connector.invoke_remote_command( + remote_cmdline, text=True, capture=True + ) + result = json.loads(raw_result) + api_fingerprint = bytes.fromhex(result[0][0]).hex() + www_fingerprint = bytes.fromhex(result[0][1]).hex() + missing = [] + if api_fingerprint != fingerprint: + missing.append("api-ssl") + if www_fingerprint != fingerprint: + missing.append("www-ssl") + if missing: + raise CertInstallError( + f"certs didn't get installed to {','.join(missing)}:" + f" {api_fingerprint=!r}" + f", {www_fingerprint=!r}" + f", {fingerprint=!r}" + ) + + def _parse_cmd_output(self, cmd_output: str): + import_result_counts = defaultdict(int) + certlist_output = None + + for rawline in StringIO(cmd_output): + line = rawline.rstrip("\r\n") + + if m := import_result_regex.match(line): + import_result_counts[m["name"]] += int(m["count"]) + elif m := json_list_regex.match(line): + assert certlist_output is None, "certificate list json received twice?" + certlist_output = json.loads(line) + elif not line.strip(): + # blank line + pass + else: + raise ValueError(f"unable to parse output line: {line!r}") + return import_result_counts, certlist_output diff --git a/mtik_cert_pusher/routeros_ssh.py b/mtik_cert_pusher/routeros_ssh.py deleted file mode 100644 index 868bf6e..0000000 --- a/mtik_cert_pusher/routeros_ssh.py +++ /dev/null @@ -1,132 +0,0 @@ -#!python3 -# dlitz 2026 - -import shlex -import subprocess -import tempfile -import json -import os -from pathlib import Path - -from .ssl_util import SSLUtil -from cryptography import x509 -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.serialization import load_pem_private_key - -class Connector: - pass - -class SSHConnector(Connector): - - ssh_executable = "ssh" - scp_executable = "scp" - temporary_directory = "/dev/shm" - common_args = ["-oBatchMode=yes", "-oControlMaster=no"] - - def __init__(self, host: str, *, user: str|None=None, port: int|None=None, ssh_config_path: str|None=None, extra_ssh_options=None, extra_ssh_args=None, extra_scp_args=None): - self.ssh_host = host - self.ssh_user = user - self.ssh_port = port - self.ssh_config_path = ssh_config_path - if extra_ssh_options is None: - extra_ssh_options = {} - assert isinstance(extra_ssh_options, dict), "extra_ssh_options should be a dict" - self.extra_ssh_options = extra_ssh_options - self.extra_ssh_args = extra_ssh_args or () - self.extra_scp_args = extra_scp_args or () - - def _ssh_option_args(self) -> list: - result = [] - if self.ssh_config_path: - result.extend(["-F", self.ssh_config_path]) - if self.ssh_user: - result.append("-oUser={self.ssh_user}") - if self.ssh_port: - result.append("-oPort={self.ssh_port:d}") - for k, v in self.extra_ssh_options.items(): - assert "=" not in k - assert k - result.append(f"-o{k}={v}") - return result - - def ssh_args(self, args, /) -> list: - return [ - self.ssh_executable, - *self.common_args, - *self._ssh_option_args(), - *self.extra_ssh_args, - *args, - ] - - def scp_args(self, args, /) -> list: - return [ - self.scp_executable, - *self.common_args, - *self._ssh_option_args(), - *self.extra_scp_args, - *args, - ] - - def invoke_remote_command(self, cmdline: str, text:bool=False) -> str: - cmd = self.ssh_args([self.ssh_host, cmdline]) - result = subprocess.check_output(cmd, text=text) - return result - - def copy_to_remote(self, content: bytes, remote_name: str): - with tempfile.NamedTemporaryFile(dir=self.temporary_directory) as tf: - tf.write(content) - tf.flush() - cmd = self.scp_args([ - "-q", - tf.name, - f"{self.ssh_host}:{remote_name}", - ]) - print("running: ", shlex.join(cmd)) - subprocess.run(cmd, check=True) - -class RouterOS: - def __init__(self, connector: Connector, ssl_util:SSLUtil): - self.connector = connector - self.ssl_util = ssl_util - - def get_certificate_info(self): - cmdline = ":put [:serialize to=json [/certificate print detail as-value]]" - result_json = self.connector.invoke_remote_command(cmdline) - return json.loads(result_json) - - def _generate_random_pkcs12_passphrase(self): - return os.urandom(32).hex() - - def install_key_and_certificates( - self, key: str, cert: str, chain: str, - ): - private_key_obj = load_pem_private_key(key.encode(), password=None) - cert_obj = x509.load_pem_x509_certificate(cert.encode()) - if cert_obj.public_key() != private_key_obj.public_key(): - raise ValueError("certificate does not match private key") - - fpr_hex = cert_obj.fingerprint(hashes.SHA256()).hex() - - pubkey_obj = cert_obj.public_key() - skid_obj = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj) - skid_hex = skid_obj.key_identifier.hex() - - remote_filename = f"{fpr_hex}.p12" - #remote_name = f"skid_{skid_hex}" - remote_name = f"fpr_{fpr_hex}" - - passphrase = self._generate_random_pkcs12_passphrase() - p12 = self.ssl_util.create_pkcs12_from_key_and_certificates(key=key, cert=cert, chain=chain, passphrase=passphrase) - - #Path("test.p12").write_bytes(p12) # DEBUG FIXME - - # Copy PKCS12 to remote - self.connector.copy_to_remote(p12, remote_filename) - cmdline = f'/certificate import no-key-export=yes trusted=no name="{remote_name}" file-name="{remote_filename}" passphrase="{passphrase}"' - #print(cmdline) - cmd_out = self.connector.invoke_remote_command(cmdline, text=True) - assert " files-imported: 1" in cmd_out, cmd_out - - # TODO: Check that certs are installed in remote; set cert - - #cmdline = "/ diff --git a/mtik_cert_pusher/ssl_util.py b/mtik_cert_pusher/ssl_util.py deleted file mode 100644 index 7fcf608..0000000 --- a/mtik_cert_pusher/ssl_util.py +++ /dev/null @@ -1,180 +0,0 @@ -#!python3 -# dlitz 2025-2026 -import contextlib -import fcntl -import os -import re -import subprocess - -from cryptography import x509 -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.serialization import ( - BestAvailableEncryption, - Encoding, - KeySerializationEncryption, - PrivateFormat, - load_pem_private_key, - load_pem_public_key, - pkcs12, -) - - -def pipe_with_buffered_data(data, *, text=False, encoding=None, errors=None): - # Open a pipe and place a small string into its buffer, then return a file - # open for reading the pipe. - rfile, wfile = _open_pipes(text=text) - try: - if text: - bdata = data.encode(encoding=wfile.encoding, errors=wfile.errors) - else: - bdata = data - pipe_buffer_size = fcntl.fcntl(wfile.fileno(), fcntl.F_GETPIPE_SZ) - if pipe_buffer_size < len(bdata): - pipe_buffer_size = fcntl.fcntl( - wfile.fileno(), fcntl.F_SETPIPE_SZ, len(bdata) - ) - assert pipe_buffer_size >= len(bdata) - wfile.write(data) - wfile.close() - return rfile - except: - wfile.close() - rfile.close() - raise - - -def _open_pipes(*, text=False, encoding=None, errors=None): - rfd, wfd = os.pipe() - rfile = wfile = None - try: - rfile = open(rfd, "r" if text else "rb", encoding=encoding, errors=errors) - wfile = open(wfd, "w" if text else "wb", encoding=encoding, errors=errors) - return rfile, wfile - except: - if rfile is not None: - rfile.close() - else: - os.close(rfd) - if wfile is not None: - wfile.close() - else: - os.close(wfd) - raise - - -class ResultParseError(Exception): - pass - - -class SSLUtil: - - openssl_prog = "openssl" - - def cert_fingerprint_sha256(self, pem_cert: str) -> bytes: - """Return the SHA256 fingerprint of the certificate, as bytes.""" - cert_obj = x509.load_pem_x509_certificate(pem_cert.encode()) - result = cert_obj.fingerprint(hashes.SHA256()) - assert isinstance(result, bytes) and len(result) == 32, (result,) - return result - - def cert_serial(self, cert: str) -> int: - """Return the serial number of the certificate, as integer. Might be negative.""" - cert_obj = x509.load_pem_x509_certificate(cert.encode()) - result = cert_obj.serial_number - assert isinstance(result, int), ("serial number not integer?", result) - return result - - def cert_skid(self, pem_cert: str) -> bytes: - """Return the SubjectKeyIdentifier of the certificate, as bytes.""" - cert_obj = x509.load_pem_x509_certificate(pem_cert.encode()) - pubkey_obj = cert_obj.public_key() - skid_obj = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj) - assert skid_obj.digest == skid_obj.key_identifier - result = skid_obj.key_identifier - assert isinstance(result, bytes) - assert len(result) == 20 - return result - - def skid_from_pubkey(self, pubkey: str) -> bytes: - pubkey_obj = load_pem_public_key(pubkey.encode()) - result = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj).key_identifier - assert isinstance(result, bytes) - assert len(result) == 20 - return result - - def skid_from_private_key(self, private_key: str) -> bytes: - private_key_obj = load_pem_private_key(private_key.encode()) - pubkey_obj = private_key_obj.public_key() - result = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj).key_identifier - assert isinstance(result, bytes) - assert len(result) == 20 - return result - - def create_pkcs12_from_key_and_certificates( - self, - *, - name: str | None = None, - key: str, - cert: str, - chain: str | None = None, - passphrase: str, - ) -> bytes: - private_key_obj = load_pem_private_key(key.encode(), password=None) - cert_obj = x509.load_pem_x509_certificate(cert.encode()) - chain_objs = x509.load_pem_x509_certificates(chain.encode()) if chain else [] - - if name is None: - pubkey_obj = cert_obj.public_key() - skid_obj = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj) - skid = skid_obj.key_identifier.hex() - fingerprint = cert_obj.fingerprint(hashes.SHA256()).hex() - name = f"SKID:{skid} FPR:{fingerprint}" - - result = pkcs12.serialize_key_and_certificates( - name=name.encode(), - key=private_key_obj, - cert=cert_obj, - cas=chain_objs, - encryption_algorithm=self.pkcs12_encryption_algorithm(passphrase.encode()), - ) - assert isinstance(result, bytes) - assert result - return result - - # def export_pkcs12(self, privkey_data, cert_data, chain_data, passphrase): - # assert re.search( - # r"^-----BEGIN(?: (.*))? PRIVATE KEY-----\n", privkey_data, re.M - # ) - # assert re.search(r"^-----BEGIN CERTIFICATE-----\n", cert_data, re.M) - # assert "PRIVATE KEY" not in cert_data - # assert chain_data is None or "PRIVATE KEY" not in chain_data - # - # fullchain_data = cert_data + "\n" + (chain_data or "") + "\n" - # - # all_data = privkey_data + "\n" + fullchain_data - # - # with pipe_with_buffered_data(passphrase, text=True) as passphrase_r: - # cmd = [ - # self.openssl_prog, - # "pkcs12", - # "-export", - # "-passout", - # f"fd:{passphrase_r.fileno():d}", - # # "-macalg", "SHA256", - # # "-keypbe", "AES-256-CBC", - # # "-certpbe", "NONE", - # ] - # return subprocess.check_output( - # cmd, pass_fds=[passphrase_r.fileno()], input=all_data.encode() - # ) - - def pkcs12_encryption_algorithm( - self, passphrase: bytes - ) -> KeySerializationEncryption: - return ( - PrivateFormat.PKCS12.encryption_builder() - .key_cert_algorithm(pkcs12.PBES.PBESv2SHA256AndAES256CBC) - .kdf_rounds(20000) - .hmac_hash(hashes.SHA256()) - .build(passphrase) - ) diff --git a/p12test.py b/p12test.py deleted file mode 100644 index 5bdc628..0000000 --- a/p12test.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python3 -# dlitz 2026 - -from cryptography import x509 -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.serialization import ( - Encoding, - load_pem_private_key, - load_pem_public_key, - PrivateFormat, -) -from cryptography.hazmat.primitives.serialization import ( - pkcs12, - BestAvailableEncryption, - KeySerializationEncryption, -) - -from pathlib import Path - -privkey_obj = load_pem_private_key(Path("x-private.pem").read_bytes(), password=None) -cert_obj = x509.load_pem_x509_certificate(Path("x-cert.pem").read_bytes()) -chain_objs = x509.load_pem_x509_certificates(Path("threecerts.pem").read_bytes()) - - -def pkcs12_pbes(password: bytes) -> KeySerializationEncryption: - return ( - PrivateFormat.PKCS12.encryption_builder() - .kdf_rounds(50000) - .key_cert_algorithm(pkcs12.PBES.PBESv2SHA256AndAES256CBC) - .hmac_hash(hashes.SHA256()) - .build(password) - ) - - -p12 = pkcs12.serialize_key_and_certificates( - name=b"friendly-name", - key=privkey_obj, - cert=cert_obj, - cas=chain_objs, - encryption_algorithm=pkcs12_pbes(b"secret"), -) - -Path("out.p12").write_bytes(p12) diff --git a/pkcs12_export_proc.py b/pkcs12_export_proc.py deleted file mode 100644 index 829c9e3..0000000 --- a/pkcs12_export_proc.py +++ /dev/null @@ -1,71 +0,0 @@ -#!python3 -# dlitz 2025 - -from contextlib import ExitStack, contextmanager -import multiprocessing -import subprocess -import os -import re - -class PKCS12Exporter: - - openssl_prog = "openssl" - - def export_pkcs12(self, privkey_data, cert_data, chain_data, passphrase): - assert re.search(r"^-----BEGIN (.*) PRIVATE KEY-----\n", privkey_data, re.M) - assert re.search(r"^-----BEGIN CERTIFICATE-----\n", cert_data, re.M) - assert "PRIVATE KEY" not in cert_data - assert chain_data is None or "PRIVATE KEY" not in chain_data - - fullchain_data = cert_data + "\n" + (chain_data or "") + "\n" - - all_data = privkey_data + "\n" + fullchain_data - - with ExitStack() as stack: - passphrase_r, passphrase_w = stack.enter_context(self.open_pipes(text=True)) - - cmd = [ - self.openssl_prog, - "pkcs12", - "-export", - "-passout", f"fd:{passphrase_r.fileno():d}", - "-macalg", "SHA256", - "-keypbe", "AES-256-CBC", - "-certpbe", "NONE", - ] - - passphrase_proc = multiprocessing.Process(target=self.data_writer, args=(passphrase_w, passphrase)) - passphrase_proc.start() - passphrase_w.close() - - print(cmd) - #subprocess.run(["bash", "-x", "-c", "ls -l /dev/fd/; cat /dev/fd/5; cat /dev/fd/3"], pass_fds=(passphrase_r.fileno(), privkey_r.fileno())) - #subprocess.run(cmd, pass_fds=(passphrase_r.fileno(), privkey_r.fileno()), input=fullchain_data.encode()) - subprocess.run(cmd, pass_fds=(passphrase_r.fileno(), privkey_r.fileno()), input=all_data.encode()) - - passphrase_proc.terminate() - passphrase_proc. - - @contextmanager - def open_pipes(self, *, text=False): - rfd, wfd = os.pipe() - rfile = wfile = None - try: - rfile = open(rfd, "r" if text else "rb") - wfile = open(wfd, "w" if text else "wb") - except: - if rfile is not None: - rfile.close() - else: - os.close(rfd) - if wfile is not None: - wfile.close() - else: - os.close(wfd) - with rfile as r, wfile as w: - yield r, w - - @classmethod - def data_writer(cls, file, data): - file.write(data) - file.close()