Files
mtik-cert-pusher/mtik_cert_pusher/routeros_ssh.py
2026-03-19 13:45:14 -06:00

133 lines
4.5 KiB
Python

#!python3
# dlitz 2026
import shlex
import subprocess
import tempfile
import json
import os
from pathlib import Path
from .ssl_util import SSLUtil
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import load_pem_private_key
class Connector:
pass
class SSHConnector(Connector):
ssh_executable = "ssh"
scp_executable = "scp"
temporary_directory = "/dev/shm"
common_args = ["-oBatchMode=yes", "-oControlMaster=no"]
def __init__(self, host: str, *, user: str|None=None, port: int|None=None, ssh_config_path: str|None=None, extra_ssh_options=None, extra_ssh_args=None, extra_scp_args=None):
self.ssh_host = host
self.ssh_user = user
self.ssh_port = port
self.ssh_config_path = ssh_config_path
if extra_ssh_options is None:
extra_ssh_options = {}
assert isinstance(extra_ssh_options, dict), "extra_ssh_options should be a dict"
self.extra_ssh_options = extra_ssh_options
self.extra_ssh_args = extra_ssh_args or ()
self.extra_scp_args = extra_scp_args or ()
def _ssh_option_args(self) -> list:
result = []
if self.ssh_config_path:
result.extend(["-F", self.ssh_config_path])
if self.ssh_user:
result.append("-oUser={self.ssh_user}")
if self.ssh_port:
result.append("-oPort={self.ssh_port:d}")
for k, v in self.extra_ssh_options.items():
assert "=" not in k
assert k
result.append(f"-o{k}={v}")
return result
def ssh_args(self, args, /) -> list:
return [
self.ssh_executable,
*self.common_args,
*self._ssh_option_args(),
*self.extra_ssh_args,
*args,
]
def scp_args(self, args, /) -> list:
return [
self.scp_executable,
*self.common_args,
*self._ssh_option_args(),
*self.extra_scp_args,
*args,
]
def invoke_remote_command(self, cmdline: str, text:bool=False) -> str:
cmd = self.ssh_args([self.ssh_host, cmdline])
result = subprocess.check_output(cmd, text=text)
return result
def copy_to_remote(self, content: bytes, remote_name: str):
with tempfile.NamedTemporaryFile(dir=self.temporary_directory) as tf:
tf.write(content)
tf.flush()
cmd = self.scp_args([
"-q",
tf.name,
f"{self.ssh_host}:{remote_name}",
])
print("running: ", shlex.join(cmd))
subprocess.run(cmd, check=True)
class RouterOS:
def __init__(self, connector: Connector, ssl_util:SSLUtil):
self.connector = connector
self.ssl_util = ssl_util
def get_certificate_info(self):
cmdline = ":put [:serialize to=json [/certificate print detail as-value]]"
result_json = self.connector.invoke_remote_command(cmdline)
return json.loads(result_json)
def _generate_random_pkcs12_passphrase(self):
return os.urandom(32).hex()
def install_key_and_certificates(
self, key: str, cert: str, chain: str,
):
private_key_obj = load_pem_private_key(key.encode(), password=None)
cert_obj = x509.load_pem_x509_certificate(cert.encode())
if cert_obj.public_key() != private_key_obj.public_key():
raise ValueError("certificate does not match private key")
fpr_hex = cert_obj.fingerprint(hashes.SHA256()).hex()
pubkey_obj = cert_obj.public_key()
skid_obj = x509.SubjectKeyIdentifier.from_public_key(pubkey_obj)
skid_hex = skid_obj.key_identifier.hex()
remote_filename = f"{fpr_hex}.p12"
#remote_name = f"skid_{skid_hex}"
remote_name = f"fpr_{fpr_hex}"
passphrase = self._generate_random_pkcs12_passphrase()
p12 = self.ssl_util.create_pkcs12_from_key_and_certificates(key=key, cert=cert, chain=chain, passphrase=passphrase)
#Path("test.p12").write_bytes(p12) # DEBUG FIXME
# Copy PKCS12 to remote
self.connector.copy_to_remote(p12, remote_filename)
cmdline = f'/certificate import no-key-export=yes trusted=no name="{remote_name}" file-name="{remote_filename}" passphrase="{passphrase}"'
#print(cmdline)
cmd_out = self.connector.invoke_remote_command(cmdline, text=True)
assert " files-imported: 1" in cmd_out, cmd_out
# TODO: Check that certs are installed in remote; set cert
#cmdline = "/