From b2239d541455ec68b8490a3b0b4386e017c1019a Mon Sep 17 00:00:00 2001 From: Darsey Litzenberger Date: Thu, 19 Mar 2026 13:45:14 -0600 Subject: [PATCH] snapshot --- mtik_cert_pusher/__main__.py | 108 ++++++++++++++++++ mtik_cert_pusher/cert_pusher.py | 57 ++++++++++ mtik_cert_pusher/cert_util.py | 48 ++++++++ mtik_cert_pusher/pkcs12_export.py | 46 ++++++++ mtik_cert_pusher/routeros_ssh.py | 132 ++++++++++++++++++++++ mtik_cert_pusher/ssl_util.py | 180 ++++++++++++++++++++++++++++++ p12test.py | 43 +++++++ pkcs12_export.py | 116 ------------------- pushcert-mikrotik | 86 -------------- 9 files changed, 614 insertions(+), 202 deletions(-) create mode 100644 mtik_cert_pusher/__main__.py create mode 100644 mtik_cert_pusher/cert_pusher.py create mode 100644 mtik_cert_pusher/cert_util.py create mode 100644 mtik_cert_pusher/pkcs12_export.py create mode 100644 mtik_cert_pusher/routeros_ssh.py create mode 100644 mtik_cert_pusher/ssl_util.py create mode 100644 p12test.py delete mode 100644 pkcs12_export.py delete mode 100755 pushcert-mikrotik diff --git a/mtik_cert_pusher/__main__.py b/mtik_cert_pusher/__main__.py new file mode 100644 index 0000000..a43e530 --- /dev/null +++ b/mtik_cert_pusher/__main__.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# dlitz 2025 + +import sys +import os +import shlex +import subprocess +import tempfile +from argparse import ArgumentParser +from pathlib import Path + +from .cert_util import split_certs +from .ssl_util import SSLUtil +#from .cert_pusher import MTCertPusher +from .routeros_ssh import RouterOS, SSHConnector + +#def generate_random_passphrase(): +# return os.urandom(64).hex() + + +def make_arg_parser(): + if Path(sys.argv[0]).stem == '__main__': + prog = __package__ + else: + prog = None + parser = ArgumentParser( + prog=prog, + description="push TLS privkey & certificate to MikroTik RouterOS router" + ) + parser.add_argument( + "-k", "--privkey", type=Path, required=True, help="private key file" + ) + parser.add_argument("--cert", type=Path, required=True, help="certificate file") + parser.add_argument( + "--chain", type=Path, help="separate certificate chain file (optional)" + ) + parser.add_argument("--ssh-config", type=Path, help="ssh config file") + parser.add_argument("--ssh-host", required=True, help="target ssh host") + parser.add_argument("--ssh-user", help="target ssh user") + parser.add_argument("--ssh-port", type=int, help="target ssh port") + return parser + + +def parse_args(): + parser = make_arg_parser() + args = parser.parse_args() + assert ":" not in args.ssh_host + return args, parser + + +def main(): + args, parser = parse_args() + + # TODO: Check certificate serial number before attempting to copy cert, and at end. + + privkey_data = args.privkey.read_text() + cert_data = args.cert.read_text() + chain_data = args.chain.read_text() if args.chain is not None else None + + #key_passphrase = generate_random_passphrase() + #chain_certs = split_certs(chain_data) + + ssl_util = SSLUtil() + ssh_connector = SSHConnector(host=args.ssh_host, port=args.ssh_port, user=args.ssh_user, ssh_config_path=args.ssh_config) + ros_remote = RouterOS(connector=ssh_connector, ssl_util=ssl_util) + + #ros_ssh = RouterOS_SSH(host=args.ssh_host) + + ros_remote.install_key_and_certificates(key=privkey_data, cert=cert_data, chain=chain_data) + +# pkcs12_data = sslutil.export_pkcs12( +# privkey_data=privkey_data, +# cert_data=cert_data, +# chain_data=chain_data, +# passphrase=key_passphrase, +# ) + +# +# +# with tempfile.NamedTemporaryFile(dir="/dev/shm") as tf: +# tf.write(pkcs12_data) +# tf.flush() +# +# ssh_options = [ +# "-oBatchMode=yes", +# "-oControlMaster=no", +# ] +# +# cmd = ["scp", *ssh_options, "-q", tf.name, f"{args.ssh_host}:/cert-pusher-data.p12"] +# # print("executing:", shlex.join(cmd)) +# subprocess.run(cmd, check=True) +# +# # ros_command = f'/certificate import name=www_ssl_cert file-name=cert-pusher-data.p12 no-key-export=yes passphrase="{key_passphrase}"' +# ros_command = f'/certificate import name=www_ssl_cert file-name=cert-pusher-data.p12 no-key-export=yes passphrase="{key_passphrase}"; /file remove [/file find name=cert-pusher-data.p12]' +# cmd = [ +# "ssh", +# *ssh_options, +# args.ssh_host, +# ros_command, +# ] +# result = subprocess.check_output(cmd, text=True) +# assert " files-imported: 1" in result +# # print(result) + + + +if __name__ == "__main__": + main() diff --git a/mtik_cert_pusher/cert_pusher.py b/mtik_cert_pusher/cert_pusher.py new file mode 100644 index 0000000..3c8d01b --- /dev/null +++ b/mtik_cert_pusher/cert_pusher.py @@ -0,0 +1,57 @@ +#!python3 +# dlitz 2025-2026 + +import tempfile +import os + +from cryptography import x509 +from cryptography.hazmat.primitives.hashes import SHA256 +from cryptography.hazmat.primitives.serialization import ( + BestAvailableEncryption, + KeySerializationEncryption, + PrivateFormat, + load_pem_private_key, + load_pem_public_key, + pkcs12, +) + +from .cert_util import split_certs +from .routeros_ssh import RouterOS_SSH +from .ssl_util import SSLUtil + + +class MTCertPusher: + + temporary_directory = "/dev/shm" + + def __init__(self, ssl_util: SSLUtil, ros_ssh: RouterOS_SSH): + self.ssl_util = ssl_util + self.ros = ros_ssh + self.tempdir = tempfile.TemporaryDirectory(dir=self.temporary_directory) + + def __del__(self): + self.close() + + def close(self): + try: + tempdir = self.tempdir + except AttributeError: + pass + else: + self.tempdir.cleanup() + del self.tempdir + + def generate_random_pkcs12_passphrase(self): + return os.urandom(64).hex() + + def install_key_and_certificates( + self, key: str, cert: str, chain: str | None = None + ): + private_key_obj = load_pem_private_key(key.encode()) + cert_obj = x509.load_pem_x509_certificate(cert.encode()) + if cert_obj.public_key() != private_key_obj.public_key(): + raise ValueError("certificate does not match private key") + + passphrase = self.generate_random_pkcs12_passphrase() + p12 = self.ssl_util.create_pkcs12_from_key_and_certificates(key=key, cert=cert, passphrase=passphrase) + diff --git a/mtik_cert_pusher/cert_util.py b/mtik_cert_pusher/cert_util.py new file mode 100644 index 0000000..6896d4b --- /dev/null +++ b/mtik_cert_pusher/cert_util.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +# dlitz 2026 +import base64 +import re + +CERT_REGEXP = re.compile( + r""" + (?P + ^-----BEGIN\ CERTIFICATE-----\n + (?P + [0-9A-Za-z/+\n]+? # base64 characters and newlines + ={0,2}\n # base64 padding + ) + ^-----END\ CERTIFICATE-----(?:\n|\Z) + ) + """, + re.S | re.M | re.X, +) + + +def split_certs(pem_data: str, *, strict: bool = True) -> list[str]: + r = CERT_REGEXP + if not strict: + return [m["cert"] for m in r.finditer(pem_data)] + certs = [] + pos = 0 + for m in r.finditer(pem_data): + if strict and m.start() != pos: + raise ValueError( + f"certificate data contains extra junk at (position {pos})" + ) + cert = m["cert"] + if strict: + # Try decoding the base64 data + base64.b64decode(m["b64data"]) + certs.append(cert) + pos = m.end() + if strict and pos != len(pem_data): + raise ValueError(f"extra junk after certificate data (at position {pos})") + return certs + + +if __name__ == "__main__": + from pathlib import Path + + # print(split_certs(Path("threecerts.pem").read_text(), strict=False)) + # print(split_certs(Path("threecerts.pem").read_text(), strict=True)) + print(split_certs("-----BEGIN CERTIFICATE-----\nAAA=\n-----END CERTIFICATE-----\n")) diff --git a/mtik_cert_pusher/pkcs12_export.py b/mtik_cert_pusher/pkcs12_export.py new file mode 100644 index 0000000..a5346c8 --- /dev/null +++ b/mtik_cert_pusher/pkcs12_export.py @@ -0,0 +1,46 @@ +#!python3 +# dlitz 2025 + +import fcntl +import re +import subprocess +from contextlib import ExitStack, contextmanager + +from .ssl_util import SSLUtil + +if __name__ == '__main__': + from argparse import ArgumentParser + from pathlib import Path + import getpass + import sys + + parser = ArgumentParser( + description="push TLS privkey & certificate to MikroTik RouterOS router" + ) + parser.add_argument( + "-k", "--privkey", type=Path, required=True, help="private key file" + ) + parser.add_argument("--cert", type=Path, required=True, help="certificate file") + parser.add_argument( + "--chain", type=Path, help="separate certificate chain file (optional)" + ) + parser.add_argument("-o", "--output", type=Path, help="output file") + args = parser.parse_args() + + privkey_data = args.privkey.read_text() + cert_data = args.cert.read_text() + chain_data = args.chain.read_text() if args.chain is not None else None + + key_passphrase = getpass.getpass("set the passphrase:") + + pkcs12_data = SSLUtil().export_pkcs12( + privkey_data=privkey_data, + cert_data=cert_data, + chain_data=chain_data, + passphrase=key_passphrase, + ) + + if args.output: + args.output.write_bytes(pkcs12_data) + else: + sys.stdout.buffer.write(pkcs12_data) diff --git a/mtik_cert_pusher/routeros_ssh.py b/mtik_cert_pusher/routeros_ssh.py new file mode 100644 index 0000000..868bf6e --- /dev/null +++ b/mtik_cert_pusher/routeros_ssh.py @@ -0,0 +1,132 @@ +#!python3 +# dlitz 2026 + +import shlex +import subprocess +import tempfile +import json +import os +from pathlib import Path + +from .ssl_util import SSLUtil +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.serialization import load_pem_private_key + +class Connector: + pass + +class SSHConnector(Connector): + + ssh_executable = "ssh" + scp_executable = "scp" + temporary_directory = "/dev/shm" + common_args = ["-oBatchMode=yes", "-oControlMaster=no"] + + def __init__(self, host: str, *, user: str|None=None, port: int|None=None, ssh_config_path: str|None=None, extra_ssh_options=None, extra_ssh_args=None, extra_scp_args=None): + self.ssh_host = host + self.ssh_user = user + self.ssh_port = port + self.ssh_config_path = ssh_config_path + if extra_ssh_options is None: + extra_ssh_options = {} + assert isinstance(extra_ssh_options, dict), "extra_ssh_options should be a dict" + self.extra_ssh_options = extra_ssh_options + self.extra_ssh_args = extra_ssh_args or () + self.extra_scp_args = extra_scp_args or () + + def _ssh_option_args(self) -> list: + result = [] + if self.ssh_config_path: + result.extend(["-F", self.ssh_config_path]) + if self.ssh_user: + result.append("-oUser={self.ssh_user}") + if self.ssh_port: + result.append("-oPort={self.ssh_port:d}") + for k, v in self.extra_ssh_options.items(): + assert "=" not in k + assert k + result.append(f"-o{k}={v}") + return result + + def ssh_args(self, args, /) -> list: + return [ + self.ssh_executable, + *self.common_args, + *self._ssh_option_args(), + *self.extra_ssh_args, + *args, + ] + + def scp_args(self, args, /) -> list: + return [ + self.scp_executable, + *self.common_args, + *self._ssh_option_args(), + *self.extra_scp_args, + *args, + ] + + def invoke_remote_command(self, cmdline: str, text:bool=False) -> str: + cmd = self.ssh_args([self.ssh_host, cmdline]) + result = subprocess.check_output(cmd, text=text) + return result + + def copy_to_remote(self, content: bytes, remote_name: str): + with tempfile.NamedTemporaryFile(dir=self.temporary_directory) as tf: + tf.write(content) + tf.flush() + cmd = self.scp_args([ + "-q", + tf.name, + f"{self.ssh_host}:{remote_name}", + ]) + print("running: ", shlex.join(cmd)) + subprocess.run(cmd, check=True) + +class RouterOS: + def __init__(self, connector: Connector, ssl_util:SSLUtil): + self.connector = connector + self.ssl_util = ssl_util + + def get_certificate_info(self): + cmdline = ":put [:serialize to=json [/certificate print detail as-value]]" + result_json = self.connector.invoke_remote_command(cmdline) + return json.loads(result_json) + + def _generate_random_pkcs12_passphrase(self): + return os.urandom(32).hex() + + def install_key_and_certificates( + self, key: str, cert: str, chain: str, + ): + private_key_obj = load_pem_private_key(key.encode(), password=None) + cert_obj = x509.load_pem_x509_certificate(cert.encode()) + if cert_obj.public_key() != private_key_obj.public_key(): + raise ValueError("certificate does not match private key") + + fpr_hex = cert_obj.fingerprint(hashes.SHA256()).hex() + + pubkey_obj = cert_obj.public_key() + skid_obj = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj) + skid_hex = skid_obj.key_identifier.hex() + + remote_filename = f"{fpr_hex}.p12" + #remote_name = f"skid_{skid_hex}" + remote_name = f"fpr_{fpr_hex}" + + passphrase = self._generate_random_pkcs12_passphrase() + p12 = self.ssl_util.create_pkcs12_from_key_and_certificates(key=key, cert=cert, chain=chain, passphrase=passphrase) + + #Path("test.p12").write_bytes(p12) # DEBUG FIXME + + # Copy PKCS12 to remote + self.connector.copy_to_remote(p12, remote_filename) + cmdline = f'/certificate import no-key-export=yes trusted=no name="{remote_name}" file-name="{remote_filename}" passphrase="{passphrase}"' + #print(cmdline) + cmd_out = self.connector.invoke_remote_command(cmdline, text=True) + assert " files-imported: 1" in cmd_out, cmd_out + + # TODO: Check that certs are installed in remote; set cert + + #cmdline = "/ diff --git a/mtik_cert_pusher/ssl_util.py b/mtik_cert_pusher/ssl_util.py new file mode 100644 index 0000000..7fcf608 --- /dev/null +++ b/mtik_cert_pusher/ssl_util.py @@ -0,0 +1,180 @@ +#!python3 +# dlitz 2025-2026 +import contextlib +import fcntl +import os +import re +import subprocess + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.serialization import ( + BestAvailableEncryption, + Encoding, + KeySerializationEncryption, + PrivateFormat, + load_pem_private_key, + load_pem_public_key, + pkcs12, +) + + +def pipe_with_buffered_data(data, *, text=False, encoding=None, errors=None): + # Open a pipe and place a small string into its buffer, then return a file + # open for reading the pipe. + rfile, wfile = _open_pipes(text=text) + try: + if text: + bdata = data.encode(encoding=wfile.encoding, errors=wfile.errors) + else: + bdata = data + pipe_buffer_size = fcntl.fcntl(wfile.fileno(), fcntl.F_GETPIPE_SZ) + if pipe_buffer_size < len(bdata): + pipe_buffer_size = fcntl.fcntl( + wfile.fileno(), fcntl.F_SETPIPE_SZ, len(bdata) + ) + assert pipe_buffer_size >= len(bdata) + wfile.write(data) + wfile.close() + return rfile + except: + wfile.close() + rfile.close() + raise + + +def _open_pipes(*, text=False, encoding=None, errors=None): + rfd, wfd = os.pipe() + rfile = wfile = None + try: + rfile = open(rfd, "r" if text else "rb", encoding=encoding, errors=errors) + wfile = open(wfd, "w" if text else "wb", encoding=encoding, errors=errors) + return rfile, wfile + except: + if rfile is not None: + rfile.close() + else: + os.close(rfd) + if wfile is not None: + wfile.close() + else: + os.close(wfd) + raise + + +class ResultParseError(Exception): + pass + + +class SSLUtil: + + openssl_prog = "openssl" + + def cert_fingerprint_sha256(self, pem_cert: str) -> bytes: + """Return the SHA256 fingerprint of the certificate, as bytes.""" + cert_obj = x509.load_pem_x509_certificate(pem_cert.encode()) + result = cert_obj.fingerprint(hashes.SHA256()) + assert isinstance(result, bytes) and len(result) == 32, (result,) + return result + + def cert_serial(self, cert: str) -> int: + """Return the serial number of the certificate, as integer. Might be negative.""" + cert_obj = x509.load_pem_x509_certificate(cert.encode()) + result = cert_obj.serial_number + assert isinstance(result, int), ("serial number not integer?", result) + return result + + def cert_skid(self, pem_cert: str) -> bytes: + """Return the SubjectKeyIdentifier of the certificate, as bytes.""" + cert_obj = x509.load_pem_x509_certificate(pem_cert.encode()) + pubkey_obj = cert_obj.public_key() + skid_obj = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj) + assert skid_obj.digest == skid_obj.key_identifier + result = skid_obj.key_identifier + assert isinstance(result, bytes) + assert len(result) == 20 + return result + + def skid_from_pubkey(self, pubkey: str) -> bytes: + pubkey_obj = load_pem_public_key(pubkey.encode()) + result = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj).key_identifier + assert isinstance(result, bytes) + assert len(result) == 20 + return result + + def skid_from_private_key(self, private_key: str) -> bytes: + private_key_obj = load_pem_private_key(private_key.encode()) + pubkey_obj = private_key_obj.public_key() + result = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj).key_identifier + assert isinstance(result, bytes) + assert len(result) == 20 + return result + + def create_pkcs12_from_key_and_certificates( + self, + *, + name: str | None = None, + key: str, + cert: str, + chain: str | None = None, + passphrase: str, + ) -> bytes: + private_key_obj = load_pem_private_key(key.encode(), password=None) + cert_obj = x509.load_pem_x509_certificate(cert.encode()) + chain_objs = x509.load_pem_x509_certificates(chain.encode()) if chain else [] + + if name is None: + pubkey_obj = cert_obj.public_key() + skid_obj = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj) + skid = skid_obj.key_identifier.hex() + fingerprint = cert_obj.fingerprint(hashes.SHA256()).hex() + name = f"SKID:{skid} FPR:{fingerprint}" + + result = pkcs12.serialize_key_and_certificates( + name=name.encode(), + key=private_key_obj, + cert=cert_obj, + cas=chain_objs, + encryption_algorithm=self.pkcs12_encryption_algorithm(passphrase.encode()), + ) + assert isinstance(result, bytes) + assert result + return result + + # def export_pkcs12(self, privkey_data, cert_data, chain_data, passphrase): + # assert re.search( + # r"^-----BEGIN(?: (.*))? PRIVATE KEY-----\n", privkey_data, re.M + # ) + # assert re.search(r"^-----BEGIN CERTIFICATE-----\n", cert_data, re.M) + # assert "PRIVATE KEY" not in cert_data + # assert chain_data is None or "PRIVATE KEY" not in chain_data + # + # fullchain_data = cert_data + "\n" + (chain_data or "") + "\n" + # + # all_data = privkey_data + "\n" + fullchain_data + # + # with pipe_with_buffered_data(passphrase, text=True) as passphrase_r: + # cmd = [ + # self.openssl_prog, + # "pkcs12", + # "-export", + # "-passout", + # f"fd:{passphrase_r.fileno():d}", + # # "-macalg", "SHA256", + # # "-keypbe", "AES-256-CBC", + # # "-certpbe", "NONE", + # ] + # return subprocess.check_output( + # cmd, pass_fds=[passphrase_r.fileno()], input=all_data.encode() + # ) + + def pkcs12_encryption_algorithm( + self, passphrase: bytes + ) -> KeySerializationEncryption: + return ( + PrivateFormat.PKCS12.encryption_builder() + .key_cert_algorithm(pkcs12.PBES.PBESv2SHA256AndAES256CBC) + .kdf_rounds(20000) + .hmac_hash(hashes.SHA256()) + .build(passphrase) + ) diff --git a/p12test.py b/p12test.py new file mode 100644 index 0000000..5bdc628 --- /dev/null +++ b/p12test.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# dlitz 2026 + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.serialization import ( + Encoding, + load_pem_private_key, + load_pem_public_key, + PrivateFormat, +) +from cryptography.hazmat.primitives.serialization import ( + pkcs12, + BestAvailableEncryption, + KeySerializationEncryption, +) + +from pathlib import Path + +privkey_obj = load_pem_private_key(Path("x-private.pem").read_bytes(), password=None) +cert_obj = x509.load_pem_x509_certificate(Path("x-cert.pem").read_bytes()) +chain_objs = x509.load_pem_x509_certificates(Path("threecerts.pem").read_bytes()) + + +def pkcs12_pbes(password: bytes) -> KeySerializationEncryption: + return ( + PrivateFormat.PKCS12.encryption_builder() + .kdf_rounds(50000) + .key_cert_algorithm(pkcs12.PBES.PBESv2SHA256AndAES256CBC) + .hmac_hash(hashes.SHA256()) + .build(password) + ) + + +p12 = pkcs12.serialize_key_and_certificates( + name=b"friendly-name", + key=privkey_obj, + cert=cert_obj, + cas=chain_objs, + encryption_algorithm=pkcs12_pbes(b"secret"), +) + +Path("out.p12").write_bytes(p12) diff --git a/pkcs12_export.py b/pkcs12_export.py deleted file mode 100644 index 85466dd..0000000 --- a/pkcs12_export.py +++ /dev/null @@ -1,116 +0,0 @@ -#!python3 -# dlitz 2025 - -import fcntl -import os -import re -import subprocess -from contextlib import ExitStack, contextmanager - - -class PKCS12Exporter: - - openssl_prog = "openssl" - - def export_pkcs12(self, privkey_data, cert_data, chain_data, passphrase): - assert re.search(r"^-----BEGIN (.*) PRIVATE KEY-----\n", privkey_data, re.M) - assert re.search(r"^-----BEGIN CERTIFICATE-----\n", cert_data, re.M) - assert "PRIVATE KEY" not in cert_data - assert chain_data is None or "PRIVATE KEY" not in chain_data - - fullchain_data = cert_data + "\n" + (chain_data or "") + "\n" - - all_data = privkey_data + "\n" + fullchain_data - - with self.pipe_with_buffered_data(passphrase, text=True) as passphrase_r: - cmd = [ - self.openssl_prog, - "pkcs12", - "-export", - "-passout", - f"fd:{passphrase_r.fileno():d}", - # "-macalg", "SHA256", - # "-keypbe", "AES-256-CBC", - # "-certpbe", "NONE", - ] - return subprocess.check_output( - cmd, pass_fds=[passphrase_r.fileno()], input=all_data.encode() - ) - - def open_pipes(self, *, text=False, encoding=None, errors=None): - rfd, wfd = os.pipe() - rfile = wfile = None - try: - rfile = open(rfd, "r" if text else "rb", encoding=encoding, errors=errors) - wfile = open(wfd, "w" if text else "wb", encoding=encoding, errors=errors) - return rfile, wfile - except: - if rfile is not None: - rfile.close() - else: - os.close(rfd) - if wfile is not None: - wfile.close() - else: - os.close(wfd) - raise - - def pipe_with_buffered_data(self, data, *, text=False, encoding=None, errors=None): - # Open a pipe and place a small string into its buffer, then return a file - # open for reading the pipe. - rfile, wfile = self.open_pipes(text=text) - try: - if text: - bdata = data.encode(encoding=wfile.encoding, errors=wfile.errors) - else: - bdata = data - pipe_buffer_size = fcntl.fcntl(wfile.fileno(), fcntl.F_GETPIPE_SZ) - if pipe_buffer_size < len(bdata): - pipe_buffer_size = fcntl.fcntl( - wfile.fileno(), fcntl.F_SETPIPE_SZ, len(bdata) - ) - assert pipe_buffer_size >= len(bdata) - wfile.write(data) - wfile.close() - return rfile - except: - wfile.close() - rfile.close() - raise - -if __name__ == '__main__': - from argparse import ArgumentParser - from pathlib import Path - import getpass - import sys - - parser = ArgumentParser( - description="push TLS privkey & certificate to MikroTik RouterOS router" - ) - parser.add_argument( - "-k", "--privkey", type=Path, required=True, help="private key file" - ) - parser.add_argument("--cert", type=Path, required=True, help="certificate file") - parser.add_argument( - "--chain", type=Path, help="separate certificate chain file (optional)" - ) - parser.add_argument("-o", "--output", type=Path, help="output file") - args = parser.parse_args() - - privkey_data = args.privkey.read_text() - cert_data = args.cert.read_text() - chain_data = args.chain.read_text() if args.chain is not None else None - - key_passphrase = getpass.getpass("set the passphrase:") - - pkcs12_data = PKCS12Exporter().export_pkcs12( - privkey_data=privkey_data, - cert_data=cert_data, - chain_data=chain_data, - passphrase=key_passphrase, - ) - - if args.output: - args.output.write_bytes(pkcs12_data) - else: - sys.stdout.buffer.write(pkcs12_data) diff --git a/pushcert-mikrotik b/pushcert-mikrotik deleted file mode 100755 index 22f7217..0000000 --- a/pushcert-mikrotik +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python3 -# dlitz 2025 - -import os -import shlex -import subprocess -import tempfile -from argparse import ArgumentParser -from pathlib import Path - -from pkcs12_export import PKCS12Exporter - - -def generate_random_passphrase(): - return os.urandom(64).hex() - - -def make_arg_parser(): - parser = ArgumentParser( - description="push TLS privkey & certificate to MikroTik RouterOS router" - ) - parser.add_argument( - "-k", "--privkey", type=Path, required=True, help="private key file" - ) - parser.add_argument("--cert", type=Path, required=True, help="certificate file") - parser.add_argument( - "--chain", type=Path, help="separate certificate chain file (optional)" - ) - parser.add_argument("--ssh-host", required=True, help="target ssh host") - return parser - - -def parse_args(): - parser = make_arg_parser() - args = parser.parse_args() - assert ":" not in args.ssh_host - return args, parser - - -def main(): - args, parser = parse_args() - - # TODO: Check certificate serial number before attempting to copy cert, and at end. - - privkey_data = args.privkey.read_text() - cert_data = args.cert.read_text() - chain_data = args.chain.read_text() if args.chain is not None else None - - key_passphrase = generate_random_passphrase() - - pkcs12_data = PKCS12Exporter().export_pkcs12( - privkey_data=privkey_data, - cert_data=cert_data, - chain_data=chain_data, - passphrase=key_passphrase, - ) - - with tempfile.NamedTemporaryFile(dir="/dev/shm") as tf: - tf.write(pkcs12_data) - tf.flush() - - ssh_options = [ - "-oBatchMode=yes", - "-oControlMaster=no", - ] - - cmd = ["scp", *ssh_options, "-q", tf.name, f"{args.ssh_host}:/cert-pusher-data.p12"] - # print("executing:", shlex.join(cmd)) - subprocess.run(cmd, check=True) - - # ros_command = f'/certificate import name=www_ssl_cert file-name=cert-pusher-data.p12 no-key-export=yes passphrase="{key_passphrase}"' - ros_command = f'/certificate import name=www_ssl_cert file-name=cert-pusher-data.p12 no-key-export=yes passphrase="{key_passphrase}"; /file remove [/file find name=cert-pusher-data.p12]' - cmd = [ - "ssh", - *ssh_options, - args.ssh_host, - ros_command, - ] - result = subprocess.check_output(cmd, text=True) - assert " files-imported: 1" in result - # print(result) - - - -if __name__ == "__main__": - main()