Initial commit
This commit is contained in:
116
pkcs12_export.py
Normal file
116
pkcs12_export.py
Normal file
@@ -0,0 +1,116 @@
|
||||
#!python3
|
||||
# dlitz 2025
|
||||
|
||||
import fcntl
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from contextlib import ExitStack, contextmanager
|
||||
|
||||
|
||||
class PKCS12Exporter:
|
||||
|
||||
openssl_prog = "openssl"
|
||||
|
||||
def export_pkcs12(self, privkey_data, cert_data, chain_data, passphrase):
|
||||
assert re.search(r"^-----BEGIN (.*) PRIVATE KEY-----\n", privkey_data, re.M)
|
||||
assert re.search(r"^-----BEGIN CERTIFICATE-----\n", cert_data, re.M)
|
||||
assert "PRIVATE KEY" not in cert_data
|
||||
assert chain_data is None or "PRIVATE KEY" not in chain_data
|
||||
|
||||
fullchain_data = cert_data + "\n" + (chain_data or "") + "\n"
|
||||
|
||||
all_data = privkey_data + "\n" + fullchain_data
|
||||
|
||||
with self.pipe_with_buffered_data(passphrase, text=True) as passphrase_r:
|
||||
cmd = [
|
||||
self.openssl_prog,
|
||||
"pkcs12",
|
||||
"-export",
|
||||
"-passout",
|
||||
f"fd:{passphrase_r.fileno():d}",
|
||||
# "-macalg", "SHA256",
|
||||
# "-keypbe", "AES-256-CBC",
|
||||
# "-certpbe", "NONE",
|
||||
]
|
||||
return subprocess.check_output(
|
||||
cmd, pass_fds=[passphrase_r.fileno()], input=all_data.encode()
|
||||
)
|
||||
|
||||
def open_pipes(self, *, text=False, encoding=None, errors=None):
|
||||
rfd, wfd = os.pipe()
|
||||
rfile = wfile = None
|
||||
try:
|
||||
rfile = open(rfd, "r" if text else "rb", encoding=encoding, errors=errors)
|
||||
wfile = open(wfd, "w" if text else "wb", encoding=encoding, errors=errors)
|
||||
return rfile, wfile
|
||||
except:
|
||||
if rfile is not None:
|
||||
rfile.close()
|
||||
else:
|
||||
os.close(rfd)
|
||||
if wfile is not None:
|
||||
wfile.close()
|
||||
else:
|
||||
os.close(wfd)
|
||||
raise
|
||||
|
||||
def pipe_with_buffered_data(self, data, *, text=False, encoding=None, errors=None):
|
||||
# Open a pipe and place a small string into its buffer, then return a file
|
||||
# open for reading the pipe.
|
||||
rfile, wfile = self.open_pipes(text=text)
|
||||
try:
|
||||
if text:
|
||||
bdata = data.encode(encoding=wfile.encoding, errors=wfile.errors)
|
||||
else:
|
||||
bdata = data
|
||||
pipe_buffer_size = fcntl.fcntl(wfile.fileno(), fcntl.F_GETPIPE_SZ)
|
||||
if pipe_buffer_size < len(bdata):
|
||||
pipe_buffer_size = fcntl.fcntl(
|
||||
wfile.fileno(), fcntl.F_SETPIPE_SZ, len(bdata)
|
||||
)
|
||||
assert pipe_buffer_size >= len(bdata)
|
||||
wfile.write(data)
|
||||
wfile.close()
|
||||
return rfile
|
||||
except:
|
||||
wfile.close()
|
||||
rfile.close()
|
||||
raise
|
||||
|
||||
if __name__ == '__main__':
|
||||
from argparse import ArgumentParser
|
||||
from pathlib import Path
|
||||
import getpass
|
||||
import sys
|
||||
|
||||
parser = ArgumentParser(
|
||||
description="push TLS privkey & certificate to MikroTik RouterOS router"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k", "--privkey", type=Path, required=True, help="private key file"
|
||||
)
|
||||
parser.add_argument("--cert", type=Path, required=True, help="certificate file")
|
||||
parser.add_argument(
|
||||
"--chain", type=Path, help="separate certificate chain file (optional)"
|
||||
)
|
||||
parser.add_argument("-o", "--output", type=Path, help="output file")
|
||||
args = parser.parse_args()
|
||||
|
||||
privkey_data = args.privkey.read_text()
|
||||
cert_data = args.cert.read_text()
|
||||
chain_data = args.chain.read_text() if args.chain is not None else None
|
||||
|
||||
key_passphrase = getpass.getpass("set the passphrase:")
|
||||
|
||||
pkcs12_data = PKCS12Exporter().export_pkcs12(
|
||||
privkey_data=privkey_data,
|
||||
cert_data=cert_data,
|
||||
chain_data=chain_data,
|
||||
passphrase=key_passphrase,
|
||||
)
|
||||
|
||||
if args.output:
|
||||
args.output.write_bytes(pkcs12_data)
|
||||
else:
|
||||
sys.stdout.buffer.write(pkcs12_data)
|
||||
Reference in New Issue
Block a user