Files
86Box-bios-tools/biostools/extractors.py
2025-05-13 17:35:01 -03:00

3241 lines
103 KiB
Python

#!/usr/bin/python3
#
# 86Box A hypervisor and IBM PC system emulator that specializes in
# running old operating systems and software designed for IBM
# PC systems and compatibles from 1981 through fairly recent
# system designs based on the PCI bus.
#
# This file is part of the 86Box BIOS Tools distribution.
#
# BIOS and archive extraction classes.
#
#
#
# Authors: RichardG, <richardg867@gmail.com>
#
# Copyright 2021 RichardG.
#
import array, codecs, datetime, io, itertools, math, os, re, shutil, socket, struct, subprocess, sys, time, zlib
try:
import PIL.Image
except ImportError:
PIL = lambda x: x
PIL.Image = None
from . import util
class MultifileStaleException(Exception):
"""Exception raised by Extractor.multifile_lock_acquire() if the
file has gone missing after the multi-file lock was acquired."""
pass
class Extractor:
def __init__(self):
self.debug = True
self.multifile_locked = False
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
"""Extract the given file into one of the destination directories:
dest_dir allows extracted files to be reprocessed in the next run,
while dest_dir_0 does not. This must return either:
- False if this extractor can't handle the given file
- True if this extractor can handle the given file, but no output was produced
- a string with the produced output file/directory path"""
raise NotImplementedError()
def debug_print(self, *args):
"""Print a log line if debug output is enabled."""
print(self.__class__.__name__ + ':', *args, file=sys.stderr)
def multifile_lock_acquire(self, file_path):
"""Acquire the global multi-file lock. The lock is automatically released
by the main module after extract() returns or raises an exception."""
self.multifile_lock.acquire()
self.multifile_locked = True
# Raise the special exception if another extractor already processed this file.
try:
return os.path.getsize(file_path)
except:
raise MultifileStaleException()
class ApricotExtractor(Extractor):
"""Extract Apricot BIOS recovery files. Only one instance of this format
(Trimond Trent) has been observed, let us know if you find any other!"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Apricot version signature.
self._apricot_pattern = re.compile(b'''@\\(#\\)Apricot ''')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this isn't a slightly-bigger-than-power-of-two file.
# The only observed file has a 2071-byte header.
try:
file_size = os.path.getsize(file_path)
except:
return False
if file_size < 4096:
return False
pow2 = 1 << math.floor(math.log2(file_size))
if file_size <= pow2 or file_size > pow2 + 4096:
return False
# Look for the Apricot signature as a safety net.
if not self._apricot_pattern.search(file_header):
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Separate payload and header.
try:
# Open Apricot file.
in_f = open(file_path, 'rb')
# Read header.
header = in_f.read(file_size - pow2)
# Copy payload.
try:
out_f = open(os.path.join(dest_dir, 'apricot.bin'), 'wb')
data = b' '
while data:
data = in_f.read(1048576)
out_f.write(data)
out_f.close()
except:
in_f.close()
return True
# Write header.
try:
out_f = open(os.path.join(dest_dir, ':header:'), 'wb')
out_f.write(header)
out_f.close()
except:
pass
# Remove Apricot file.
in_f.close()
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir
class ArchiveExtractor(Extractor):
"""Extract known archive types."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Known signatures for archive files.
self._signature_pattern = re.compile(
b'''PK(?:00PK)?\\x03\\x04|''' # zip
b'''Rar!\\x1A\\x07|''' # rar
b'''7z\\xBC\\xAF\\x27\\x1C|''' # 7z
b'''MSCF|''' # cab
b'''\\x1F\\x8B|''' # gzip
b'''BZh|''' # bzip2
b'''\\xFD7zXZ\\x00|''' # xz
b'''[\\x00-\\xFF]{2}-l(?:h[0467]|z4)-|''' # lha (methods supported by 7-Zip - HACK: except lh5 due to Award)
b'''ZOO''' # zoo
)
# /dev/null handle for suppressing output.
self._devnull = open(os.devnull, 'wb')
# 7-Zip has this annoying quirk where it scans the archive's parent
# directory structure before extracting the archive itself. This
# takes a very long time if any of the parent directories has a lot
# of files. Therefore, we try to find a location as close to / as
# possible, so we can symlink the archive there and make that parent
# scan as quick as possible. Igor recognizes this is an inefficiency
# in p7zip, but even the native Linux 7-Zip 21.07 still has it...?
dirs = []
my_file_path = os.path.abspath(__file__)
for dir_path in (os.path.dirname(my_file_path), os.getcwd(), '/tmp', '/run/user/' + str(hasattr(os, 'getuid') and os.getuid() or 0)):
# Get file count for all levels of the path.
levels = []
while True:
try:
list_len = len(os.listdir(dir_path))
except:
list_len = 2 ** 32
levels.append((dir_path, list_len))
parent_dir_path = os.path.dirname(dir_path)
if parent_dir_path == dir_path:
break
dir_path = parent_dir_path
# Go through levels in ascending (therefore closest to /) order.
levels.sort()
total_count = 0
for level_dir, level_count in levels:
total_count += level_count
dirs.append((level_dir, total_count))
# Remove duplicates and sort by total children count.
dirs = list(set(dirs))
dirs.sort(key=lambda x: (x[1], x[0]))
# See where we can create a symlink.
temp_file_name = 'biostools_{0}_{1}_{2}'.format(socket.gethostname(), hex(os.getpid())[2:], hex(id(self))[2:])
self._temp_paths = []
for dir_path, dir_children in dirs:
# Test symlink creation.
link_path = os.path.join(dir_path, temp_file_name)
try:
# Create symlink and check if it was actually created.
os.symlink(my_file_path, link_path)
if os.readlink(link_path) == my_file_path:
# Test passed, add to temporary path list.
self._temp_paths.append(link_path)
except:
pass
# Remove any created symlink.
try:
os.remove(link_path)
except:
pass
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
"""Extract an archive."""
# Stop if this is apparently not an archive.
match = self._signature_pattern.match(file_header)
if not match:
return False
# Do the actual extraction.
return self._extract_archive(file_path, dest_dir)
def _extract_archive(self, file_path, dest_dir, remove=True):
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Try creating temporary symlink with the archive's extension.
file_path_abs = os.path.abspath(file_path)
_, ext = os.path.splitext(file_path_abs)
link_path = file_path_abs
for temp_path in self._temp_paths:
temp_path_ext = temp_path + ext
try:
# Create symlink and check if it was actually created.
os.symlink(file_path_abs, temp_path_ext)
if os.readlink(temp_path_ext) == file_path_abs:
# Test passed, make this link the new path.
link_path = temp_path_ext
break
else:
# Remove link if it was created.
os.remove(temp_path_ext)
except:
pass
# Run 7z command to extract the archive.
# The dummy password prevents any password prompts from stalling 7z.
subprocess.run(['7z', 'x', '-y', '-aou', '-ppassword', '--', link_path], stdout=self._devnull, stderr=subprocess.STDOUT, cwd=dest_dir)
# Remove temporary symlink.
if link_path != file_path_abs:
while os.path.islink(link_path):
try:
os.remove(link_path)
except:
break
# Assume failure if nothing was extracted.
files_extracted = os.listdir(dest_dir)
if len(files_extracted) < 1:
self.debug_print('Extraction produced no files:', file_path)
return False
# Rename single file. (gzip/bzip2/etc.)
if len(files_extracted) == 1 and link_path != file_path_abs:
link_name = os.path.splitext(os.path.basename(link_path))[0]
if files_extracted[0][:len(link_name)] == link_name:
try:
shutil.move(os.path.join(dest_dir, files_extracted[0]), os.path.join(dest_dir, os.path.splitext(os.path.basename(file_path))[0] + files_extracted[0][len(link_name):]))
except:
pass
# Remove archive file.
if remove:
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir
class ASTExtractor(Extractor):
"""Extract AST BIOS flash floppy images. These appear to contain a specially
crafted FAT filesystem, likely with static sector offsets for the payload,
so we work on the entire image before FATExtractor has a chance to claim it."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# AST flash signature.
self._ast_start_pattern = re.compile(b'''This is a flash update from AST Research, Inc\\.''')
self._ast_payload_pattern = re.compile(b'''AST FLASH UPDATE''')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this file is too small.
try:
file_size = os.path.getsize(file_path)
except:
return False
if file_size <= 0x9083:
return False
# Look for the AST signatures.
if not self._ast_start_pattern.match(file_header[0x4200:0x422e]):
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Open AST image.
try:
with open(file_path, 'rb') as in_f:
# Skip the initial 72 sectors.
in_f.seek(0x9000)
# Copy payload.
header = b''
dest_file_path = os.path.join(dest_dir, 'ast.bin')
try:
with open(dest_file_path, 'wb') as out_f:
data = remaining = True
while data and remaining > 0:
payload_size = 15 * 512
if data == True:
# Check the header on the first payload sector.
header += in_f.read(0x83)
if not self._ast_payload_pattern.match(header[:0x10]):
raise Exception('missing header')
# Subtract header from payload.
remaining, = struct.unpack('<I', header[-5:-1])
payload_size -= 0x83
# Copy the next 15 sectors of payload.
data = in_f.read(min(payload_size, remaining))
out_f.write(data)
remaining -= len(data)
# Skip the next 3 blank sectors.
in_f.seek(3 * 512, 1)
except:
try:
os.remove(dest_file_path)
except:
pass
return True
# Write header.
try:
with open(os.path.join(dest_dir, ':header:'), 'wb') as out_f:
out_f.write(header)
except:
pass
except:
pass
# Remove AST image.
os.remove(file_path)
# Return destination directory path.
return dest_dir
class BIOSExtractor(Extractor):
"""Extract a bios_extract-compatible BIOS file."""
# BIOS entrypoint signatures (faster search)
_entrypoint_pattern = re.compile(
b'''\\xEA[\\x00-\\xFF]{2}\\x00\\xF0|''' # typical AMI/Award/Phoenix
b'''\\x0F\\x09\\xE9|''' # Intel AMIBIOS 6
b'''\\xE9[\\x00-\\xFF]{2}\\x00{5}''' # weird Intel (observed in SRSH4)
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Fallback BIOS signatures (slower search), based on bios_extract.c
self._signature_pattern = re.compile(
b'''AMI(?:BIOS(?: \\(C\\)1993 American Megatrends Inc.,| W 0[45]|C0[6789]|C\\x00{4})|BOOT ROM|EBBLK| Flash Utility for DOS Command mode\\.)|'''
b'''SUPER ROM|'''
b'''\\$ASUSAMI\\$|'''
b'''= Award Decompression Bios =|'''
b'''awardext.rom|'''
b'''Phoenix Technologies|'''
b'''IBM AT Compatible Phoenix NuBIOS|'''
b'''[\\xEE\\xFF]\\x88SYSBIOS|'''
b'''\\xEE\\x88\\x42IOS SCU'''
)
# Workaround for an annoying PhoenixNet entry type where the size field is wrong (compressed?)
fn = b'''[^\\x01-\\x1F\\x7F-\\xFF\\\\/:\\*\\?"<>\\|]'''
self._phoenixnet_workaround_pattern = re.compile(
fn + b'''(?:\\x00{7}|''' +
fn + b'''(?:\\x00{6}|''' +
fn + b'''(?:\\x00{5}|''' +
fn + b'''(?:\\x00{4}|''' +
fn + b'''(?:\\x00{3}|''' +
fn + b'''(?:\\x00{2}|''' +
fn + b'''(?:\\x00{1}|''' +
fn + b''')))))))''' +
fn + b'''(?:\\x00{2}|''' +
fn + b'''(?:\\x00{1}|''' +
fn + b'''))'''
)
# Path to the bios_extract utility.
self._bios_extract_path = os.path.abspath(os.path.join('bios_extract', 'bios_extract'))
if not os.path.exists(self._bios_extract_path):
self._bios_extract_path = None
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if bios_extract is not available.
if not self._bios_extract_path:
return False
# Read up to 16 MB as a safety net.
file_header += util.read_complement(file_path, file_header)
# Stop if no BIOS signatures are found.
if not BIOSExtractor._entrypoint_pattern.match(file_header[-16:]) and not self._signature_pattern.search(file_header):
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir_0):
return True
# Start bios_extract process.
file_path_abs = os.path.abspath(file_path)
try:
proc = subprocess.run([self._bios_extract_path, file_path_abs], timeout=30, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dest_dir_0)
except:
# Bad data can cause infinite loops.
proc = None
self.debug_print('Processing timed out on:', file_path)
if proc and proc.returncode not in (0, 1, 86):
self.debug_print('Bad return code:', proc.returncode)
# Assume failure if nothing was extracted. A lone remainder file also counts as a failure.
dest_dir_files = os.listdir(dest_dir_0)
num_files_extracted = len(dest_dir_files)
if num_files_extracted < 1:
self.debug_print('Extraction produced no files:', file_path)
return False
elif num_files_extracted == 1 and dest_dir_files[0] == 'remainder.rom':
# Remove remainder file so that the destination directory can be rmdir'd later.
self.debug_print('Extraction only produced remainder file:', file_path)
util.remove_all(dest_dir_files, lambda x: os.path.join(dest_dir_0, x))
return False
elif proc and proc.returncode == 86:
# We received the magic exit code that tells us the Intel pipeline found
# an option ROM but not the main body. This could indicate a non-Intel
# BIOS with LH5-compressed option ROMs. Check the files just in case.
have_intelopt = have_intelbody = False
for dest_dir_file in dest_dir_files:
if dest_dir_file[:9] in ('intelopt_', 'intelunk_'):
have_intelopt = True
elif dest_dir_file[:10] == 'intelbody_':
have_intelbody = True
break
if have_intelopt and not have_intelbody:
# Remove all files so that the destination directory can be rmdir'd later.
self.debug_print('Extraction produced Intel option ROM without main body:', file_path)
util.remove_all(dest_dir_files, lambda x: os.path.join(dest_dir_0, x))
return False
# A missing remainder.rom may indicate an extraction interrupted by a segfault
# or something else gone wrong. Copy the original file to its place for safety.
if 'remainder.rom' not in dest_dir_files:
self.debug_print('Creating remainder stand-in for:', file_path)
util.hardlink_or_copy(file_path, os.path.join(dest_dir_0, 'remainder.rom'))
# Remove extraneous files containing Intel body remains. (Batman's Revenge 04/15/1994)
if not proc or b'intelbody_' in proc.stdout:
intel_bodies = [dest_dir_file for dest_dir_file in dest_dir_files if dest_dir_file[:10] == 'intelbody_']
if len(intel_bodies) > 1:
# Get size for all body files.
for x in range(len(intel_bodies)):
try:
body_size = os.path.getsize(os.path.join(dest_dir_0, intel_bodies[x]))
except:
body_size = 0
intel_bodies[x] = (body_size, intel_bodies[x])
# Remove all but the largest body file.
intel_bodies.sort(reverse=True)
self.debug_print('Keeping Intel body file', intel_bodies[0], 'and discarding', intel_bodies[1:])
util.remove_all(intel_bodies[1:], lambda x: os.path.join(dest_dir_0, x[1]))
# Remove removed files from file list.
for _, body_name in intel_bodies[1:]:
dest_dir_files.remove(body_name)
# Extract Award BIOS PhoenixNet ROS filesystem.
if not proc or b'Found Award BIOS.' in proc.stdout:
for dest_dir_file in dest_dir_files:
# Read and check for ROS header.
dest_dir_file_path = os.path.join(dest_dir_0, dest_dir_file)
if not os.path.isfile(dest_dir_file_path):
continue
with open(dest_dir_file_path, 'rb') as in_f:
if in_f.read(3) == b'ROS':
self.debug_print('Extracting PhoenixNet ROS:', dest_dir_file)
# Create new destination directory for the expanded ROS.
dest_dir_ros = os.path.join(dest_dir_0, dest_dir_file + ':')
if util.try_makedirs(dest_dir_ros):
# Skip initial header.
in_f.seek(32)
# Parse file entries.
while True:
# Read file entry header.
header = in_f.read(32)
if len(header) != 32:
break
file_size, = struct.unpack('<H', header[10:12])
# Read data.
if header[28] & 0x10:
# Workaround for an annoying entry type where the size field is wrong (compressed?)
pos = in_f.tell()
data = in_f.read(65536 + 32)
match = self._phoenixnet_workaround_pattern.search(data)
if match:
file_size = match.start(0) - 17
in_f.seek(pos + file_size)
data = data[:file_size]
else:
in_f.seek(0, 2)
else:
data = in_f.read(file_size)
# Generate a file name.
file_name = (util.read_string(header[17:25]) + '.' + util.read_string(header[25:28])).replace('/', '\\')
# Write data.
if len(file_name) > 1:
self.debug_print('ROS file:', file_name)
with open(os.path.join(dest_dir_ros, file_name), 'wb') as out_f:
out_f.write(data)
# Run image converter on the destination directory.
self.image_extractor.convert_inline(os.listdir(dest_dir_ros), dest_dir_ros)
# Don't remove ROS as the analyzer uses it for PhoenixNet detection.
# Just remove the destination directory if it's empty.
util.rmdirs(dest_dir_ros)
# Convert any BIOS logo images in-line (to the same destination directory).
self.image_extractor.convert_inline(dest_dir_files, dest_dir_0)
# Create flag file on the destination directory for the analyzer to
# treat it as a big chunk of data.
open(os.path.join(dest_dir_0, ':combined:'), 'wb').close()
# Hardlink or copy any header file to extracted directory, to help with
# identifying Intel BIOSes. See AMIAnalyzer.can_handle for more information.
parent_header = os.path.join(os.path.dirname(file_path_abs), ':header:')
if os.path.exists(parent_header):
util.hardlink_or_copy(parent_header, os.path.join(dest_dir_0, ':header:'))
# Remove BIOS file.
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir_0
class CPUZExtractor(Extractor):
"""Extract CPU-Z BIOS dump reports."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Patterns for parsing a report hex dump.
self._cpuz_pattern = re.compile(b'''CPU-Z version\\t+([^\\r\\n]+)''')
self._hex_pattern = re.compile(b'''[0-9A-F]+\\t((?:[0-9A-F]{2} ){16})\\t''')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this is not a CPU-Z dump.
cpuz_match = self._cpuz_pattern.search(file_header)
if not cpuz_match:
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Read up to 16 MB as a safety net.
file_header += util.read_complement(file_path, file_header)
# Convert hex back to binary.
try:
f = open(os.path.join(dest_dir, 'cpuz.bin'), 'wb')
for match in self._hex_pattern.finditer(file_header):
f.write(codecs.decode(match.group(1).replace(b' ', b''), 'hex'))
f.close()
except:
return True
# Create header file with the CPU-Z version string.
try:
f = open(os.path.join(dest_dir, ':header:'), 'wb')
f.write(cpuz_match.group(1))
f.close()
except:
pass
# Remove report file.
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir
class DellExtractor(Extractor):
"""Extract Dell/Phoenix ROM BIOS PLUS images.
Based on dell_inspiron_1100_unpacker.py"""
def _memcpy(self, arr1, off1, arr2, off2, count):
while count:
if off1 < len(arr1):
try:
arr1[off1] = arr2[off2]
except:
break
elif off1 >= len(arr1):
while off1 >= len(arr1):
arr1.append(0xFF)
continue
else:
break
off1 += 1
off2 += 1
count -= 1
def _dell_unpack(self, indata):
srcoff = 0
dstoff = 0
src = bytearray(indata)
dst = bytearray()
inlen = len(indata)
while srcoff < inlen:
b = src[srcoff]
nibl, nibh = b & 0x0F, (b >> 4) & 0x0F
srcoff += 1
if nibl:
if nibl == 0xF:
al = src[srcoff]
ah = src[srcoff+1]
srcoff += 2
cx = nibh | (ah << 4)
count = (cx & 0x3F) + 2
delta = ((ah >> 2) << 8) | al
else:
count = nibl + 1
delta = (nibh << 8) | src[srcoff]
srcoff += 1
self._memcpy(dst, dstoff, dst, dstoff - delta - 1, count)
dstoff += count
elif nibh == 0x0E:
count = src[srcoff] + 1
srcoff += 1
self._memcpy(dst, dstoff, dst, dstoff - 1, count)
dstoff += count
else:
if nibh == 0x0F:
count = src[srcoff] + 15
srcoff += 1
else:
count = nibh + 1
self._memcpy(dst, dstoff, src, srcoff, count)
dstoff += count
srcoff += count
return dst
def _dell_unpack_alt(self, indata):
srcoff = 0
dstoff = 0
src = bytearray(indata)
dst = bytearray()
inlen = len(indata)
while srcoff < inlen:
b = src[srcoff]
nibl, nibh = b & 0x0F, (b >> 4) & 0x0F
srcoff += 1
if nibl:
count = nibl + 1
delta = (nibh << 8) | src[srcoff]
srcoff += 1
self._memcpy(dst, dstoff, dst, dstoff - delta - 1, count)
dstoff += count
else:
count = nibh + 1
self._memcpy(dst, dstoff, src, srcoff, count)
dstoff += count
srcoff += count
return dst
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Read up to 16 MB as a safety net.
file_header += util.read_complement(file_path, file_header)
# Stop if this is not the type of BIOS we're looking for.
copyright_string = b'\xF0\x00Copyright 1985-\x02\x04\xF0\x0F8 Phoenix Technologies Ltd.'
alt_mode = False
offset = file_header.find(copyright_string)
if offset < 5:
alt_mode = True
copyright_string = b'\xE0Copyright 1985-\x02\x04\xF08 Phoenix Techno\xD0logies Ltd.'
offset = file_header.find(copyright_string)
if offset < 2:
copyright_string = b'Copyright 1985-1988 Phoenix Technologies Ltd.'
offset = file_header.find(copyright_string)
if offset > 2 and (offset & 0xffff) == 0 and file_header[2] == 0xf0: # partial compression (OptiPlex 5xx)
offset = 2
else:
return False
# Determine the length format.
if alt_mode:
# 16-bit length (no module type prefix) with different compression.
length_size = 2
struct_format = '<0sH'
elif file_header[offset - 5] == 1:
# 32-bit length.
length_size = 5
struct_format = '<BI'
elif file_header[offset - 3] == 1:
# 16-bit length.
length_size = 3
struct_format = '<BH'
else:
# Unknown length format.
return False
offset -= length_size
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir_0):
return True
# Create flag file on the destination directory for the analyzer to
# treat it as a big chunk of data.
open(os.path.join(dest_dir_0, ':combined:'), 'wb').close()
# Extract any preceding data as EC code.
if offset > 0:
self.debug_print('Extracting', offset, 'bytes of EC code')
f = open(os.path.join(dest_dir_0, 'ec.bin'), 'wb')
f.write(file_header[:offset])
f.close()
# Extract modules.
file_size = len(file_header)
module_number = 0
while (offset + length_size) < file_size:
# Read module type and length.
module_type, module_length = struct.unpack(struct_format, file_header[offset:offset + length_size])
if (alt_mode and module_length in (0, 0xFFFF)) or module_type == 0xFF:
break
self.debug_print('Extracting module number', module_number, 'type', module_type, 'size', module_length)
offset += length_size
# Decompress data if required.
data = file_header[offset:offset + module_length]
if module_type != 0x0C:
try:
data = (alt_mode and self._dell_unpack_alt or self._dell_unpack)(data)
if len(data) == 0:
self.debug_print('Extraction produced blank output')
except:
self.debug_print('Extraction failed')
offset += module_length
# Write module.
f = open(os.path.join(dest_dir_0, 'module_{0:02}.bin'.format(module_number)), 'wb')
f.write(data)
f.close()
# Increase filename counter.
module_number += 1
# Extract remainder if applicable.
if offset < file_size:
try:
f = open(os.path.join(dest_dir_0, 'remainder.bin'), 'wb')
f.write(file_header[offset:])
f.close()
except:
pass
# Create header file with the copyright string, to tell the analyzer
# this BIOS went through this extractor.
try:
f = open(os.path.join(dest_dir_0, ':header:'), 'wb')
f.write(copyright_string)
f.close()
except:
pass
# Remove BIOS file.
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir_0
class DiscardExtractor(Extractor):
"""Detect and discard known non-useful file types."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# File signatures to discard.
self._signature_pattern = re.compile(
# images
b'''\\x0A[\\x00-\\x05][\\x00-\\x01][\\x01\\x02\\x04\\x08]|''' # PCX
b'''BM|''' # BMP
b'''\\xFF\\xD8\\xFF|''' # JPEG
b'''GIF8|''' # GIF
b'''\\x89PNG|''' # PNG
# documents
b'''%PDF|''' # PDF
b'''\\xD0\\xCF\\x11\\xE0\\xA1\\xB1\\x1A\\xE1|''' # Office (mszip)
b'''\\x3F\\x5F\\x03\\x00|''' # WinHelp
b'''<(?:\\![Dd][Oo][Cc][Tt][Yy][Pp][Ee]|[Hh][Tt][Mm][Ll])[ >]|''' # HTML (a cursory check ought not to upset anyone)
# executables
b'''(\\x7FELF)|''' # ELF
# reports
b'''CPU-Z TXT Report|\\s{7}File: A|-+\\[ AIDA32 |HWiNFO64 Version |3DMARK2001 PROJECT|Report Dr. Hardware|'''
b'''\\r\\n(?:\\s+HWiNFO v|\\r\\n\\s+\\r\\n\\s+Microsoft Diagnostics version )|'''
b'''SIV[^\\s]+ - System Information Viewer V|UID,Name,Score,'''
)
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Determine if this is a known non-useful file type through the signature pattern.
match = self._signature_pattern.match(file_header)
if match:
# Don't discard LinuxBIOS ELFs.
if match.group(1) and file_header[128:136] == b'ELFBoot\x00':
return False
# Remove file and stop.
try:
os.remove(file_path)
except:
pass
return True
# Not a known file type, cleared to go.
return False
class ImageExtractor(Extractor):
"""Extract BIOS logo images by converting them into PNG."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Standard EGA/VGA palette for v1 and palette-less v2 Award EPAs.
self._vga_palette = [
0x000000, 0x0000aa, 0x00aa00, 0x00aaaa, 0xaa0000, 0xaa00aa, 0xaa5500, 0xaaaaaa, 0x555555, 0x5555ff, 0x55ff55, 0x55ffff, 0xff5555, 0xff55ff, 0xffff55, 0xffffff,
0x000000, 0x101010, 0x202020, 0x353535, 0x454545, 0x555555, 0x656565, 0x757575, 0x8a8a8a, 0x9a9a9a, 0xaaaaaa, 0xbababa, 0xcacaca, 0xdfdfdf, 0xefefef, 0xffffff,
0x0000ff, 0x4100ff, 0x8200ff, 0xbe00ff, 0xff00ff, 0xff00be, 0xff0082, 0xff0041, 0xff0000, 0xff4100, 0xff8200, 0xffbe00, 0xffff00, 0xbeff00, 0x82ff00, 0x41ff00,
0x00ff00, 0x00ff41, 0x00ff82, 0x00ffbe, 0x00ffff, 0x00beff, 0x0082ff, 0x0041ff, 0x8282ff, 0x9e82ff, 0xbe82ff, 0xdf82ff, 0xff82ff, 0xff82df, 0xff82be, 0xff829e,
0xff8282, 0xff9e82, 0xffbe82, 0xffdf82, 0xffff82, 0xdfff82, 0xbeff82, 0x9eff82, 0x82ff82, 0x82ff9e, 0x82ffbe, 0x82ffdf, 0x82ffff, 0x82dfff, 0x82beff, 0x829eff,
0xbabaff, 0xcabaff, 0xdfbaff, 0xefbaff, 0xffbaff, 0xffbaef, 0xffbadf, 0xffbaca, 0xffbaba, 0xffcaba, 0xffdfba, 0xffefba, 0xffffba, 0xefffba, 0xdfffba, 0xcaffba,
0xbaffba, 0xbaffca, 0xbaffdf, 0xbaffef, 0xbaffff, 0xbaefff, 0xbadfff, 0xbacaff, 0x000071, 0x1c0071, 0x390071, 0x550071, 0x710071, 0x710055, 0x710039, 0x71001c,
0x710000, 0x711c00, 0x713900, 0x715500, 0x717100, 0x557100, 0x397100, 0x1c7100, 0x007100, 0x00711c, 0x007139, 0x007155, 0x007171, 0x005571, 0x003971, 0x001c71,
0x393971, 0x453971, 0x553971, 0x613971, 0x713971, 0x713961, 0x713955, 0x713945, 0x713939, 0x714539, 0x715539, 0x716139, 0x717139, 0x617139, 0x557139, 0x457139,
0x397139, 0x397145, 0x397155, 0x397161, 0x397171, 0x396171, 0x395571, 0x394571, 0x515171, 0x595171, 0x615171, 0x695171, 0x715171, 0x715169, 0x715161, 0x715159,
0x715151, 0x715951, 0x716151, 0x716951, 0x717151, 0x697151, 0x617151, 0x597151, 0x517151, 0x517159, 0x517161, 0x517169, 0x517171, 0x516971, 0x516171, 0x515971,
0x000041, 0x100041, 0x200041, 0x310041, 0x410041, 0x410031, 0x410020, 0x410010, 0x410000, 0x411000, 0x412000, 0x413100, 0x414100, 0x314100, 0x204100, 0x104100,
0x004100, 0x004110, 0x004120, 0x004131, 0x004141, 0x003141, 0x002041, 0x001041, 0x202041, 0x282041, 0x312041, 0x392041, 0x412041, 0x412039, 0x412031, 0x412028,
0x412020, 0x412820, 0x413120, 0x413920, 0x414120, 0x394120, 0x314120, 0x284120, 0x204120, 0x204128, 0x204131, 0x204139, 0x204141, 0x203941, 0x203141, 0x202841,
0x2d2d41, 0x312d41, 0x352d41, 0x3d2d41, 0x412d41, 0x412d3d, 0x412d35, 0x412d31, 0x412d2d, 0x41312d, 0x41352d, 0x413d2d, 0x41412d, 0x3d412d, 0x35412d, 0x31412d,
0x2d412d, 0x2d4131, 0x2d4135, 0x2d413d, 0x2d4141, 0x2d3d41, 0x2d3541, 0x2d3141, 0x000000, 0x000000, 0x000000, 0x000000, 0x000000, 0x000000, 0x000000, 0x000000
]
# Header pattern for common format images.
self._pil_pattern = re.compile(
b'''\\x0A[\\x00-\\x05][\\x00-\\x01][\\x01\\x02\\x04\\x08]|''' # PCX
b'''BM(?!\\x00{3})[\\x00-\\xFF]{3}\\x00[\\x00-\\xFF]{4}(?!\\x00{3})[\\x00-\\xFF]{3}\\x00|''' # BMP (limited to 16 MB size and start offset)
b'''\\xFF\\xD8\\xFF|''' # JPEG
b'''GIF8[79]a|''' # GIF
b'''\\x89PNG''' # PNG
)
def convert_inline(self, dest_dir_files, dest_dir_0):
# Detect and convert image files.
for dest_dir_file in dest_dir_files:
# Read 64 KB, which is enough to ascertain any potential logo type,
# even if embedded in the file. (Monorail SiS 550x: PCX in AMI module)
dest_dir_file_path = os.path.join(dest_dir_0, dest_dir_file)
if os.path.isdir(dest_dir_file_path) or dest_dir_file == ':header:':
continue
f = open(dest_dir_file_path, 'rb')
dest_dir_file_header = f.read(65536)
f.close()
# Run ImageExtractor.
image_dest_dir = dest_dir_file_path + ':'
if self.extract(dest_dir_file_path, dest_dir_file_header, image_dest_dir, image_dest_dir, any_offset=True):
# Remove destination directory if it was created but is empty.
util.rmdirs(image_dest_dir)
def extract(self, file_path, file_header, dest_dir, dest_dir_0, any_offset=False):
# Stop if PIL is not available or this file is too small.
if not PIL.Image or len(file_header) < 16:
return False
# Determine if this is an image, and which type it is.
func = None
image_data_offset = 0
if file_header[:4] == b'AWBM':
# Get width and height for a v2 EPA.
width, height = struct.unpack('<HH', file_header[4:8])
# Determine if this file is a 4-bit or 8-bit EPA according to the file size.
try:
file_size = os.path.getsize(file_path)
except:
file_size = len(file_header)
if file_size >= 8 + (width * height):
func = self._convert_epav2_8b
else:
func = self._convert_epav2_4b
elif file_header[:2] == b'PG':
# Get width and height for a Phoenix Graphics image.
width, height = struct.unpack('<HH', file_header[10:14])
if width == 0 and height == 0 and file_header[2:17] == b'\x09\x00\x00\x80\x02\x16\x00\x00\x00\x00\x00\x00\x00\x00\x0F':
# Ignore invalid image at the beginning of the compressed payload (Micronics Tigercat)
if not any_offset or os.path.basename(file_path) == 'remainder.rom':
return False
# Some HP 4.0R6 have a width and height of 0 on 640x480 images.
width = 640
height = 480
# Check if the file is actually paletted, as some
# images (PhoenixNet) are incorrectly set as paletted.
paletted = file_header[3] != 0
payload_size = math.ceil((width * height) / 2)
if paletted:
try:
file_size = os.path.getsize(file_path)
except:
file_size = len(file_header)
if file_size > 18 + payload_size:
palette_size, = struct.unpack('<H', file_header[10:12])
file_header += util.read_complement(file_path, file_header, max_size=len(file_header) + (4 * palette_size))
post_palette = file_header[12 + (4 * palette_size):16 + (4 * palette_size)]
if len(post_palette) == 4:
width, height = struct.unpack('<HH', post_palette)
payload_size = math.ceil((width * height) / 2)
if file_size >= 20 + (4 * palette_size) + payload_size:
# Special marker that the palette should be read.
width = -width
if width != 0 and height != 0:
func = self._convert_pgx
if not func:
# Determine if this file has valid dimensions and is the right size for a v1 EPA.
width, height = struct.unpack('BB', file_header[:2])
if width < 80 and height < 25 and len(file_header) == 72 + (15 * width * height):
func = self._convert_epav1
else:
# Determine if this is a common image format.
match = (any_offset and self._pil_pattern.search or self._pil_pattern.match)(file_header)
if match:
func = self._convert_pil
image_data_offset = match.start(0)
self.debug_print('PIL signature', match.group(0), 'on', file_path, '@', hex(image_data_offset))
else:
# Stop if this is not an image.
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir_0):
return True
# Read up to 16 MB (+ data offset) as a safety net.
max_size = 16777216 + image_data_offset
file_header += util.read_complement(file_path, file_header, max_size)
# Stop if the file was cut off, preventing parsing exceptions.
if len(file_header) == max_size:
return True
# Run extractor function, and stop if it was not successful.
self.debug_print('Calling', func.__name__, 'on', file_path)
ret = func(file_header, image_data_offset, width, height, dest_dir_0)
if not ret:
return True
# Remove original file if it's the entire image (with a maximum margin of 1 byte).
if ret == True or len(file_header) - ret <= 1:
try:
os.remove(file_path)
except:
pass
return dest_dir_0
def _convert_epav1(self, file_data, image_data_offset, width, height, dest_dir_0):
# Write file type as a header.
self._write_type(dest_dir_0, 'EPA v1')
# Fill color map.
color_map = []
index = 2
for x in range(width * height):
# Read character cell color information.
color = file_data[index]
index += 1
# Save RGB background and foreground color.
color_map.append((self._vga_palette[color & 0x0f], self._vga_palette[color >> 4]))
# Create output image.
image = PIL.Image.new('RGB', (width * 8, height * 14))
# Read image data.
for y in range(height):
for x in range(width):
# Determine foreground/background colors for this character cell.
fg_color, bg_color = color_map.pop(0)
# Read the 14 row bitmaps.
for cy in range(14):
# Stop row bitmap processing if the file is truncated.
if index >= len(file_data):
width = height = 0
break
# Read bitmap byte.
bitmap = file_data[index]
index += 1
# Parse the foreground/background bitmap.
for cx in range(8):
# Determine palette color and write pixel.
color = (bitmap & (1 << cx)) and fg_color or bg_color
image.putpixel(((x * 8) + (7 - cx), (y * 14) + cy),
((color >> 16) & 0xff, (color >> 8) & 0xff, color & 0xff))
# Stop column processing if the file is truncated.
if width == 0 or len(color_map) == 0:
break
# Stop row processing if the file is truncated.
if height == 0 or len(color_map) == 0:
break
# Save output image.
return self._save_image(image, dest_dir_0)
def _convert_epav2_4b(self, file_data, image_data_offset, width, height, dest_dir_0):
# Read palette if the file contains one, while
# writing the file type as a header accordingly.
palette = self._read_palette_epav2(file_data, -52, False)
if palette:
self._write_type(dest_dir_0, 'EPA v2 4-bit (with palette)')
else:
self._write_type(dest_dir_0, 'EPA v2 4-bit (without palette)')
# Use standard EGA palette.
palette = self._vga_palette
# Create output image.
image = PIL.Image.new('RGB', (width, height))
# Read image data.
index = 8
bitmap_width = math.ceil(width / 8)
for y in range(height):
for x in range(bitmap_width):
# Stop column processing if the file is truncated.
if index + x + (bitmap_width * 3) >= len(file_data):
index = 0
break
for cx in range(8):
# Skip this pixel if it's outside the image width.
output_x = (x * 8) + cx
if output_x >= width:
continue
# Read color values. Each bit is stored in a separate bitmap.
pixel = (file_data[index + x] >> (7 - cx)) & 1
pixel |= ((file_data[index + x + bitmap_width] >> (7 - cx)) & 1) << 1
pixel |= ((file_data[index + x + (bitmap_width * 2)] >> (7 - cx)) & 1) << 2
pixel |= ((file_data[index + x + (bitmap_width * 3)] >> (7 - cx)) & 1) << 3
# Determine palette color and write pixel.
if pixel > len(palette):
pixel = len(palette) - 1
color = palette[pixel]
image.putpixel((output_x, y),
((color >> 16) & 0xff, (color >> 8) & 0xff, color & 0xff))
# Stop row processing if the file is truncated.
if index == 0:
break
# Move on to the next set of 4 bitmaps.
index += bitmap_width * 4
# Save output image.
return self._save_image(image, dest_dir_0)
def _convert_epav2_8b(self, file_data, image_data_offset, width, height, dest_dir_0):
# Read palette if the file contains one, while
# writing the file type as a header accordingly.
palette = self._read_palette_epav2(file_data, -772)
if palette:
self._write_type(dest_dir_0, 'EPA v2 8-bit (with palette)')
else:
self._write_type(dest_dir_0, 'EPA v2 8-bit (without palette)')
# Use standard VGA palette.
palette = self._vga_palette
# Create output image.
image = PIL.Image.new('RGB', (width, height))
# Read image data.
index = 8
for y in range(height):
for x in range(width):
# Read pixel.
pixel = file_data[index]
index += 1
# Determine palette color and write pixel.
if pixel > len(palette):
pixel = len(palette) - 1
color = palette[pixel]
image.putpixel((x, y),
((color >> 16) & 0xff, (color >> 8) & 0xff, color & 0xff))
# Save output image.
return self._save_image(image, dest_dir_0)
def _convert_pgx(self, file_data, image_data_offset, width, height, dest_dir_0):
# Read palette if the file contains one, while
# writing the file type as a header accordingly.
if width < 0:
# Normalize width.
width = -width
# Read palette.
palette_size, = struct.unpack('<H', file_data[10:12])
palette = self._vga_palette[::] # start with standard EGA palette
palette_index = 0
index = 12
while palette_index < palette_size:
palette_color = file_data[index:index + 4]
if len(palette_color) != 4:
break
palette[palette_index], = struct.unpack('>I', palette_color) # shortcut to parse _RGB value
palette_index += 1
index += 4
self._write_type(dest_dir_0, 'PGX (with {0}-color palette)'.format(palette_size))
else:
# Use standard EGA palette.
palette = self._vga_palette
self._write_type(dest_dir_0, 'PGX (without palette)')
# Create output image.
image = PIL.Image.new('RGB', (width, height))
# Read image data. This looks a lot like EPA v2 4-bit but it's slightly different.
index = 18
bitmap_width = math.ceil(width / 8)
bitmap_size = height * bitmap_width
for y in range(height):
for x in range(bitmap_width):
# Stop column processing if the file is truncated.
if index + x + (bitmap_size * 3) >= len(file_data):
index = 0
break
for cx in range(8):
# Skip this pixel if it's outside the image width.
output_x = (x * 8) + cx
if output_x >= width:
continue
# Read color values. Each bit is stored in a separate bitmap.
pixel = (file_data[index + x] >> (7 - cx)) & 1
pixel |= ((file_data[index + x + bitmap_size] >> (7 - cx)) & 1) << 1
pixel |= ((file_data[index + x + (bitmap_size * 2)] >> (7 - cx)) & 1) << 2
pixel |= ((file_data[index + x + (bitmap_size * 3)] >> (7 - cx)) & 1) << 3
# Determine palette color and write pixel.
if pixel > len(palette):
pixel = len(palette) - 1
color = palette[pixel]
image.putpixel((output_x, y),
((color >> 16) & 0xff, (color >> 8) & 0xff, color & 0xff))
# Stop row processing if the file is truncated.
if index == 0:
break
# Move on to the next line in the 4 bitmaps.
index += bitmap_width
# Save output image.
return self._save_image(image, dest_dir_0)
def _convert_pil(self, file_data, image_data_offset, width, height, dest_dir_0):
# Load image.
try:
file_data_io = io.BytesIO(file_data[image_data_offset:])
image = PIL.Image.open(file_data_io)
# Don't save image if it's too small.
x, y = image.size
if (x * y) < 10000:
raise Exception('too small')
except:
self.debug_print('PIL open failed')
return False
# Write the file type as a header.
self._write_type(dest_dir_0, image.format)
# Save output image.
if image.format in ('GIF', 'PNG', 'JPEG'):
if image.format == 'JPEG':
ext = 'jpg'
else:
ext = image.format.lower()
try:
f = open(os.path.join(dest_dir_0, 'image.' + ext), 'wb')
f.write(file_data[image_data_offset:])
f.close()
return True
except:
self.debug_print('As-is copy failed')
return False
elif self._save_image(image, dest_dir_0):
return file_data_io.tell()
else:
return False
def _read_palette_epav2(self, file_data, rgbs_offset, rgb=True):
# Stop if this file has no palette.
if file_data[rgbs_offset:rgbs_offset + 4] != b'RGB ':
return None
# Read 6-bit palette entries, while converting to 8-bit.
palette = []
index = rgbs_offset + 4
while index < 0:
palette.append((file_data[index] << (rgb and 18 or 2)) |
(file_data[index + 1] << 10) |
(file_data[index + 2] << (rgb and 2 or 18)))
index += 3
return palette
def _save_image(self, image, dest_dir_0):
# Save image to destination directory.
image_path = os.path.join(dest_dir_0, 'image.png')
try:
image.save(image_path)
return True
except:
self.debug_print('PIL save failed')
# Clean up.
try:
os.remove(image_path)
except:
pass
try:
os.remove(os.path.join(dest_dir_0, ':header:'))
except:
pass
return False
def _write_type(self, dest_dir_0, identifier):
self.debug_print('Type:', identifier)
try:
f = open(os.path.join(dest_dir_0, ':header:'), 'w')
f.write(identifier)
f.close()
except:
pass
class FATExtractor(ArchiveExtractor):
"""Extract FAT disk images."""
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Determine if this is a FAT filesystem.
# Stop if this file is too small.
if len(file_header) < 512:
return False
# Stop if this doesn't appear to be a FAT filesystem.
if not self._is_fat(file_header):
# Check for 20-byte Unisys header followed by FAT filesystem.
# Only 4 samples (from the Aquanta line) were found, the header is identical across all of them.
if file_header[:20] == b'\x1A\x12\x34\x1A\x0E\x00\x00\x01\x01\x00\x04\x00\x02\x00\x12\x00\x02\x00\x50\x00' and self._is_fat(file_header[20:]):
self.debug_print('Unisys header found')
return self._extract_payload(file_path, dest_dir, 20, 'unisys.bin')
# Check for 4-byte AST header followed by FAT filesystem.
ast_size, unknown = struct.unpack('<HH', file_header[:4])
try:
file_size = os.path.getsize(file_path)
except:
file_size = 2 ** 32
if (ast_size * 512) <= (file_size - 4) and self._is_fat(file_header[4:]):
self.debug_print('AST size', hex(ast_size), 'sectors, unknown field', hex(unknown))
return self._extract_payload(file_path, dest_dir, 4, 'ast.bin')
return False
# Inject the 55 AA signature (expected by 7-Zip) on images that don't have it.
if file_header[510:512] != b'\x55\xAA':
try:
with open(file_path, 'r+b') as f:
f.seek(510)
f.write(b'\x55\xAA')
except:
pass
# Extract this as an archive.
return self._extract_archive(file_path, dest_dir)
def _is_fat(self, file_header):
# Check for bootstrap jump.
if (file_header[0] != 0xEB or file_header[2] != 0x90) and file_header[0] != 0xE9:
return False
# Check for media descriptor type.
if file_header[21] < 0xF0:
return False
return True
def _extract_payload(self, file_path, dest_dir, header_size, dest_file_name):
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Separate payload and header.
try:
# Open file.
with open(file_path, 'rb') as in_f:
# Read header.
header = in_f.read(header_size)
# Copy payload.
try:
with open(os.path.join(dest_dir, dest_file_name), 'wb') as out_f:
data = b' '
while data:
data = in_f.read(1048576)
out_f.write(data)
except:
return True
# Write header.
try:
with open(os.path.join(dest_dir, ':header:'), 'wb') as out_f:
out_f.write(header)
except:
pass
# Remove file.
os.remove(file_path)
except:
pass
# Return destination directory.
return dest_dir
class HexExtractor(Extractor):
"""Extract Intel HEX format ROMs."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Signatures for parsing a HEX.
self._hex_start_pattern = re.compile(b''':(?:[0-9A-F]{2}){1,}\\r?\\n''')
self._hex_eof_pattern = re.compile(b''':00[0-9A-F]{4}01[0-9A-F]{2}\\r?\\n?$''')
self._hex_data_pattern = re.compile(b''':([0-9A-F]{2})([0-9A-F]{4})00([0-9A-F]{2,})\\r?\\n''')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this is not a HEX.
if not self._hex_start_pattern.match(file_header):
return False
# Read up to 16 MB as a safety net.
file_header += util.read_complement(file_path, file_header)
# Stop if no EOF was found.
if not self._hex_eof_pattern.search(file_header):
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
try:
# Create destination file.
f = open(os.path.join(dest_dir, 'intelhex.bin'), 'wb')
# Extract data blocks.
for match in self._hex_data_pattern.finditer(file_header):
length, addr, data = match.groups()
# Move on to the next block if the data length doesn't match.
if ((len(data) >> 1) - 1) != int(length, 16):
continue
# Decode data.
data = codecs.decode(data[:-2], 'hex')
# Write data block at the specified address.
f.seek(int(addr, 16))
f.write(data)
# Finish destination file.
f.close()
except:
return True
# Create dummy header file.
try:
open(os.path.join(dest_dir, ':header:'), 'wb').close()
except:
pass
# Remove file.
try:
os.remove(file_path)
except:
pass
# Return destination directory.
return dest_dir
class ISOExtractor(ArchiveExtractor):
"""Extract ISO 9660 images."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Signature for identifying El Torito header data.
self._eltorito_pattern = re.compile(b'''\\x01\\x00\\x00\\x00[\\x00-\\xFF]{26}\\x55\\xAA\\x88\\x04[\\x00-\\xFF]{3}\\x00[\\x00-\\xFF]{2}([\\x00-\\xFF]{4})''')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this is not an ISO.
if file_header[32769:32774] != b'CD001' and file_header[32777:32782] != b'CDROM':
return False
# Extract this as an archive.
ret = self._extract_archive(file_path, dest_dir, remove=False)
# Some El Torito hard disk images have an MBR (Lenovo ThinkPad UEFI updaters).
# 7-Zip doesn't care about MBRs and just takes the El Torito sector count field
# for granted, even though it may be inaccurate. Try to detect such inaccuracies.
if type(ret) == str:
# Check what 7-Zip tried to extract, if anything.
elt_path = os.path.join(ret, '[BOOT]', 'Boot-HardDisk.img')
try:
elt_size = os.path.getsize(elt_path)
except:
elt_size = 0
# Does the size match known bad extractions?
if elt_size == 512:
# Read file.
try:
f = open(elt_path, 'rb')
data = f.read(512)
f.close()
except:
data = b''
# Check for MBR boot signature.
if data[-2:] == b'\x55\xAA':
# Read up to 16 MB of the ISO as a safety net.
file_header += util.read_complement(file_path, file_header)
# Look for El Torito data.
match = self._eltorito_pattern.search(file_header)
if match:
# Start a new El Torito extraction file.
out_f = open(elt_path, 'wb')
# Copy the entire ISO data starting from the boot offset.
# Parsing the MBR would have pitfalls of its own...
in_f = open(file_path, 'rb')
in_f.seek(struct.unpack('<I', match.group(1))[0] * 2048)
data = b' '
while data:
data = in_f.read(1048576)
out_f.write(data)
in_f.close()
# Finish new file.
out_f.close()
# Remove ISO file if it was successfully extracted.
if ret:
try:
os.remove(file_path)
except:
pass
return ret
class IntelExtractor(Extractor):
"""Extract Intel multi-part BIOS updates."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Fill a list of potential extensions for BIOS part files.
self._part_extensions = []
for base_extension in ('bio', 'bbo'):
# Produce all possible variants (ext, ex1-ex9, exa-) for this extension. While the boot blocks are
# always one file, they are technically able to form a chain, so count them in here for safety.
extension_chars = base_extension[-1] + '123456789abcdefghijklm'
for x in range(len(extension_chars)):
extension = base_extension[:2] + extension_chars[x]
self._part_extensions.append(extension)
# Add recovery boot block extension.
self._part_extensions.append('rcv')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this is not an Intel BIOS update.
if file_header[90:95] != b'FLASH' and file_header[602:607] != b'FLASH':
return False
# Stop if this is a boot block file, as those have a separate
# part count which breaks the main body part count check.
if file_header[40:50].upper() == b'BOOT BLOCK':
return True
# Stop if this file is too small (may be a copied header).
if len(file_header) <= 608:
return True
# Stop if this file has no extension.
file_name = os.path.basename(file_path)
if file_name[-4:-3] != '.':
return True
# Acquire the multi-file lock.
self.multifile_lock_acquire(file_path)
# Stop if this file has an irrelevant extension.
file_name_lower = file_name.lower()
if file_name_lower[-3:] not in self._part_extensions:
# Remove file.
try:
os.remove(file_path)
except:
pass
return True
# Scan this directory's contents.
dir_path = os.path.dirname(file_path)
dir_files = {}
for dir_file_name in os.listdir(dir_path):
dir_file_name_lower = dir_file_name.lower()
dir_file_path = os.path.join(dir_path, dir_file_name)
# Remove irrelevant files which lack an Intel header.
if dir_file_name_lower[-4:] in ('.lng', '.rec'):
try:
os.remove(dir_file_path)
except:
pass
continue
# Add to the file list.
dir_files[dir_file_name_lower] = dir_file_path
# Try to find matching parts in the same directory.
file_name_base = file_name[:-3]
file_name_base_lower = file_name_lower[:-3]
found_parts_main = []
found_parts_boot = []
have_bbo = False
have_rcv = False
largest_part_size = 0
# Try all part extensions.
for extension in self._part_extensions:
# Check if this part exists in the directory.
found_part_path = dir_files.get(file_name_base_lower + extension, None)
if found_part_path:
# Get the part's file size.
try:
found_part_size = os.path.getsize(found_part_path)
except:
continue
# Treat main and non-main body parts differently.
if extension[:2] == 'bi':
# Add part to the main body part list.
found_parts_main.append((found_part_path, found_part_size))
else:
# Add part to the boot block part list.
found_parts_boot.append((found_part_path, found_part_size))
# Flag the presence of main and recovery boot block files.
if extension[:2] == 'bb':
have_bbo = True
elif extension[:2] == 'rc':
have_rcv = True
# Update largest part size.
if found_part_size > largest_part_size:
largest_part_size = found_part_size
# Stop if no main body parts were found somehow.
if len(found_parts_main) == 0:
return True
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Determine header-related sizes and offsets.
start_offset = (file_header[90:95] != b'FLASH') and 512 or 0
version = util.read_string(file_header[start_offset + 112:]) # this assumes the chain name is less than 32 characters
header_size = 112 + len(version) + 1 # and the logical area name is less than 24 characters,
remaining = header_size & 31 # but there's no fast detection of Intel files otherwise
if remaining: # padded to 32 bytes
header_size += 32 - remaining
part_data_offset = start_offset + header_size
# Subtract header from largest part size.
largest_part_size -= part_data_offset
# Determine if this is an inverted BIOS. This is quite tricky, since
# the header data and presence of boot blocks in the main body isn't
# super accurate, so we make a best guess through the following rules:
if have_rcv and not have_bbo:
# Recovery boot block file present but no main boot block file present => inverted
invert = True
elif have_bbo and not have_rcv:
# Main boot block file present but no recovery boot block file present => non-inverted
invert = False
elif len(version) <= 16:
# Short version (first AMI and Phoenix runs) => inverted
invert = True
else:
# Long version (second AMI and Phoenix runs) => non-inverted
invert = have_rcv # backup check for AN430TX which is inverted but has rcv
# Join the part lists together.
found_parts = found_parts_main + found_parts_boot
# Create destination file.
dest_file_path = os.path.join(dest_dir, 'intel.bin')
out_f = open(dest_file_path, 'wb')
self.debug_print('Found', len(found_parts), 'parts, header size', header_size, 'bytes, largest part size', largest_part_size, 'bytes')
# Copy parts to the destination file.
bootblock_offset = None
end_offset = 0
while len(found_parts) > 0:
found_part_path, found_part_size = found_parts.pop(0)
try:
f = open(found_part_path, 'rb')
# Read and parse header if present.
if found_part_path[-4:].lower() == '.rcv':
header = b''
logical_area = dest_offset = 0
logical_area_size = data_length = found_part_size
else:
f.seek(start_offset)
header = f.read(part_data_offset)
logical_area, logical_area_size = struct.unpack('<BI', header[32:37])
dest_offset, data_length, _, last_part = struct.unpack('<IIBB', header[80:90])
# Update ROM end offset.
if logical_area_size > end_offset:
end_offset = logical_area_size
# Apply inversion if needed.
if invert:
dest_offset ^= 0x10000
# Determine the part's location.
if logical_area == 0:
# Place boot block at the end of the ROM. Usually, the last part is cut
# short and the boot block slots in at the end of the gap, but D845PT has
# a full 64 KB last part containing a copy of the BBO boot block data.
if bootblock_offset == None:
bootblock_offset = end_offset - data_length
if bootblock_offset < 0:
bootblock_offset = 0
dest_offset += bootblock_offset
out_f.seek(dest_offset)
# Copy data.
self.debug_print(data_length, 'bytes @', hex(dest_offset), '=>', found_part_path)
remaining = max(data_length, largest_part_size)
part_data = b' '
while part_data and remaining > 0:
part_data = f.read(min(remaining, 1048576))
out_f.write(part_data)
remaining -= len(part_data)
# Write padding.
if logical_area == 1 and last_part == 0xff:
if data_length <= 8192 and len(found_parts_boot) == 0:
# Workaround for JN440BX, which requires its final
# part (sized 8 KB) to be at the end of the image.
self.debug_print('> Final part non-padded')
remaining = 0
elif data_length == largest_part_size and ((dest_offset >> 16) & 1) == int(invert):
# Workaround for SE440BX-2 and SRMK2, which require a
# gap at the final 64 KB where the boot block goes.
if BIOSExtractor._entrypoint_pattern.match(part_data[-16:]):
# This does not apply to N440BX, which ends
# its parts with an entry point as expected.
self.debug_print('> Entry point found, not applying final part gap')
else:
self.debug_print('> Final part gap')
remaining += largest_part_size
elif logical_area == 0 and dest_offset == bootblock_offset:
# Don't pad a boot block insertion.
remaining = 0
if remaining > 0:
self.debug_print('> Adding', remaining, 'padding bytes')
while remaining > 0:
out_f.write(b'\xFF' * min(remaining, 1048576))
remaining -= 1048576
f.close()
# Update ROM end offset.
part_end_offset = out_f.tell()
if part_end_offset > end_offset:
end_offset = part_end_offset
# Remove part.
os.remove(found_part_path)
except:
import traceback
traceback.print_exc()
pass
out_f.close()
# Create new file with padding if the total size isn't a power of two.
if end_offset > 0:
padding_size = (1 << math.ceil(math.log2(end_offset))) - end_offset
if padding_size > 0:
try:
# Create a new file.
out_f = open(dest_file_path + '.padded', 'wb')
# Write padding.
self.debug_print('Adding', padding_size, 'bytes of initial padding')
while padding_size > 0:
out_f.write(b'\xFF' * min(padding_size, 1048576))
padding_size -= 1048576
# Write the original file contents.
f = open(dest_file_path, 'rb')
part_data = b' '
while part_data:
part_data = f.read(1048576)
out_f.write(part_data)
f.close()
out_f.close()
# Remove the old file.
try:
os.remove(dest_file_path)
except:
pass
# Move the new file into place.
shutil.move(dest_file_path + '.padded', dest_file_path)
except:
pass
# Copy the header to a file, so we can still get the BIOS version
# from it in case the payload cannot be decompressed successfully.
try:
out_f = open(os.path.join(dest_dir, ':header:'), 'wb')
out_f.write(file_header[start_offset:part_data_offset])
out_f.close()
except:
pass
# Return destination directory.
return dest_dir
class IntelNewExtractor(Extractor):
"""Extract newer Intel single-part BIOS updates."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# BIOS payload header. Checking the first bytes makes sure
# we don't accidentally parse $IBIOSI$ version strings.
self._ibi_pattern = re.compile(b'''\\$IBI[\\x00-\\x4E\\x50-\\xFF][\\x00-\\x52\\x54-\\xFF][\\x00-\\x23\\x25-\\xFF][\\x00-\\xFF]([\\x00-\\xFF]{8})''')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if the payload header couldn't be found.
match = self._ibi_pattern.search(file_header)
if not match:
return False
# Parse payload header and stop if the sizes appear to be wrong.
header_sizes = match.group(1)
if header_sizes == b'\xAA\x55\xAA\x55\x55\xAA\x55\xAA':
# This is a :header: file with an invalidated size field.
return True
header_size, payload_size = struct.unpack('<II', header_sizes)
self.debug_print('$IBI header at', hex(match.start(1)), 'declaring header size', header_size, 'and payload size', payload_size)
if header_size > 65536 or payload_size > 16777216:
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Separate payload and header.
try:
# Open update file.
in_f = open(file_path, 'rb')
# Copy payload.
payload_offset = match.start(0) + header_size
try:
in_f.seek(payload_offset)
out_f = open(os.path.join(dest_dir, 'intel.bin'), 'wb')
while payload_size > 0:
data = in_f.read(min(payload_size, 1048576))
out_f.write(data)
payload_size -= len(data)
out_f.close()
except:
in_f.close()
return True
# Copy header.
try:
out_f = open(os.path.join(dest_dir, ':header:'), 'wb')
# Copy data before the header size fields.
in_f.seek(0)
size = match.start(1)
while size > 0:
data = in_f.read(min(size, 1048576))
out_f.write(data)
size -= len(data)
# Invalidate the size fields so we don't process the header again.
out_f.write(b'\xAA\x55\xAA\x55\x55\xAA\x55\xAA')
# Write the rest of the header.
in_f.seek(match.end(1))
size = payload_offset - match.end(1)
while size > 0:
data = in_f.read(min(size, 1048576))
out_f.write(data)
size -= len(data)
# Copy data after the payload.
in_f.seek(payload_offset + payload_size)
data = b' '
while data:
data = in_f.read(1048576)
out_f.write(data)
out_f.close()
except:
pass
# Remove update file.
in_f.close()
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir
class InterleaveExtractor(Extractor):
"""Detect and de-interleave any interleaved ROMs."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# List of strings an interleaved BIOS might contain once deinterleaved.
self._deinterleaved_strings = [
b'ALL RIGHTS RESERVED',
b'All Rights Reserved',
b'Illegal Interrupt No.',
b'Phoenix Technologies Ltd.', # Phoenix
b' COPR. IBM 198', # IBM and Tandon
b'memory (parity error)',
b'Copyright COMPAQ Computer Corporation', # Compaq
b'Press any key when ready', # Access Methods
b'* AMPRO Little Board', # AMPRO
b'Philips ROM BIOS ', # Philips
b'The following POST errors have been ' # Acer
]
# Interleave the strings.
self._interleaved_odd = [string[1::2] for string in self._deinterleaved_strings]
self._interleaved_even = [string[::2] for string in self._deinterleaved_strings]
self._interleaved_q3 = [string[3::4] for string in self._deinterleaved_strings]
self._interleaved_q2 = [string[2::4] for string in self._deinterleaved_strings]
self._interleaved_q1 = [string[1::4] for string in self._deinterleaved_strings]
self._interleaved_q0 = [string[::4] for string in self._deinterleaved_strings]
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this was already deinterleaved.
dir_path, file_name = os.path.split(file_path)
if os.path.exists(os.path.join(dir_path, ':combined:')):
return False
# Read up to 128 KB.
file_header += util.read_complement(file_path, file_header, max_size=131072)
# Check for interleaved strings.
counterpart_string_sets = None
sets_2 = [self._interleaved_odd, self._interleaved_even]
sets_4 = [self._interleaved_q0, self._interleaved_q1, self._interleaved_q2, self._interleaved_q3]
for part_set in (sets_2, sets_4):
# Go through sets.
for counterpart_set in part_set:
# Go through strings.
for string in counterpart_set:
# Check if the string is present.
if string in file_header:
# Generate new string set list without this set.
counterpart_string_sets = [new_set for new_set in part_set if new_set != counterpart_set]
break
# Stop if a set was found.
if counterpart_string_sets:
break
if counterpart_string_sets:
break
# Stop if no interleaved strings could be found.
if not counterpart_string_sets:
return False
# Acquire the multi-file lock.
file_size = self.multifile_lock_acquire(file_path)
# Create temporary interleaved data array.
part_size = min(file_size, 16777216)
data = []
# Look for each counterpart.
dir_files = os.listdir(dir_path)
dir_files.sort()
counterpart_paths = [file_path]
for counterpart_string_set in counterpart_string_sets:
# Try to find this file's counterpart in the directory.
counterpart_candidates = []
file_size = os.path.getsize(file_path)
for file_in_dir in dir_files:
# Skip seen files.
file_in_dir_path = os.path.join(dir_path, file_in_dir)
if file_in_dir_path in counterpart_paths:
continue
# Skip any files which differ in size.
file_in_dir_size = 0
try:
file_in_dir_size = os.path.getsize(file_in_dir_path)
except:
continue
if file_in_dir_size != file_size:
continue
# Read up to 128 KB.
file_in_dir_data = util.read_complement(file_in_dir_path, max_size=131072)
if not file_in_dir_data:
continue
# Determine if this is a counterpart.
counterpart = False
for string in counterpart_string_set:
if string in file_in_dir_data:
counterpart = True
break
del file_in_dir_data
# Move on if this is not a counterpart.
if not counterpart:
continue
# Add to the list of candidates.
counterpart_candidates.append(file_in_dir)
# Find the closest counterpart candidate to this
# file, and stop if no counterpart was found.
counterpart_candidate = util.closest_prefix(file_name, counterpart_candidates, lambda x: util.remove_extension(x).lower())
if not counterpart_candidate:
return False
self.debug_print('Pairing', file_path, 'with', counterpart_candidate)
counterpart_path = os.path.join(dir_path, counterpart_candidate)
counterpart_paths.append(counterpart_path)
# Read into the data array.
f = open(counterpart_path, 'rb')
data.append(f.read(part_size))
f.close()
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Read this file into the data array.
f = open(file_path, 'rb')
data.insert(0, f.read(part_size))
f.close()
# Write all deinterleaved permutations, as some sets may
# contain the same interleaved string on more than one part.
file_counter = 0
part_count = len(data)
buf = bytearray(part_size * part_count)
for permutation in itertools.permutations(range(part_count)):
# Deinterleave from the array into the buffer.
data_offset = 0
for data_index in permutation:
buf[data_offset::part_count] = data[data_index]
data_offset += 1
# Write deinterleaved file.
f = open(os.path.join(dest_dir, 'deinterleaved_' + ''.join(util.base62[data_index] for data_index in permutation) + '.bin'), 'wb')
f.write(buf)
f.close()
file_counter += 1
# Save some memory. Might be placebo, but it doesn't hurt.
del buf
del data
# Move interleaved files to preserve them,
# as some sets may deinterleave incorrectly.
file_counter = 0
for counterpart_path in counterpart_paths:
# Move original file.
try:
shutil.move(counterpart_path, os.path.join(dest_dir, 'interleaved_' + util.base62[file_counter] + '.bin'))
except:
pass
file_counter += 1
# Remove the original file in case moving failed.
try:
os.remove(counterpart_path)
except:
pass
# Create flag file on the destination directory for the analyzer
# to treat it as a big chunk of data, combining all permutations.
f = open(os.path.join(dest_dir, ':combined:'), 'wb')
f.write(b'\x00' * part_count)
f.close()
# Return destination directory path.
return dest_dir
class MBRSafeExtractor(ArchiveExtractor):
"""Extract MBR disk images which appear to have a valid MBR."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Signature for identifying typical MBRs.
self._mbr_pattern = re.compile(b'''(?:Error loading|Missing) operating system''')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Extract this as an archive if MBR signatures are present.
if file_header[510:512] == b'\x55\xAA' and self._is_mbr(file_header):
return self._extract_archive(file_path, dest_dir)
# No MBR found.
return False
def _is_mbr(self, file_header):
# Helper function to determine if this *really* looks like some kind of MBR.
return self._mbr_pattern.search(file_header[:510])
class MBRUnsafeExtractor(MBRSafeExtractor):
"""Extract MBR disk images which have the MBR signature."""
def _is_mbr(self, file_header):
# Anything goes over here.
return True
class OCFExtractor(Extractor):
"""Extract Fujitsu/ICL OCF BIOS files."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# RLE header.
self._snipac_pattern = re.compile(b'''SNIPAC([0-9A-F]{2})([\\x00-\\xFF]{4})''')
def _expand_rle(self, match):
length, = struct.unpack('<I', match.group(2))
return bytes([int(match.group(1), 16)]) * length
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this is not an RLE compressed file.
if not self._snipac_pattern.search(file_header):
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Read up to 16 MB as a safety net.
file_header += util.read_complement(file_path, file_header)
# Decompress RLE data.
file_header = self._snipac_pattern.sub(self._expand_rle, file_header)
# Write decompressed data.
try:
out_f = open(os.path.join(dest_dir, 'ocf.bin'), 'wb')
out_f.write(file_header)
out_f.close()
except:
return True
# Create dummy header file.
try:
open(os.path.join(dest_dir, ':header:'), 'wb').close()
except:
pass
# Remove OCF file.
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir
class OMFExtractor(Extractor):
"""Extract Fujitsu/ICL OMF BIOS files."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Quick header signature for checking validity.
self._header_pattern = re.compile(
b'''[\\x00-\\xFF]''' # arbitrary byte (not always B2!)
b'''([\\x00-\\xFF]{4})''' # file size
b'''([\\x1A\\x20-\\x7E]{32})''' # file timestamp
b'''[\\x00-\\xFF]{14}''' # more fields
b'''([\\x00\\x20-\\x7E]{8})''' # file signature (can be all 00 on auxiliary files at least)
)
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this file is too small (may be a copied header).
if len(file_header) <= 112:
return False
# Stop if this is not an OMF file.
header_offset = 0
match = self._header_pattern.match(file_header)
if not match: # whatever has this offset header was lost to time
header_offset = 112
match = self._header_pattern.match(file_header[header_offset:header_offset + 112])
if not match:
return False
# Stop if the OMF size is invalid. Should catch other files that match the quick check.
try:
file_size = os.path.getsize(file_path)
except:
return False
omf_size, = struct.unpack('<I', match.group(1))
if omf_size > file_size:
return False
self.debug_print('Size', omf_size, 'timestamp', match.group(2), 'signature', match.group(3))
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Separate payload and header.
try:
# Open OMF file.
in_f = open(file_path, 'rb')
# Read header.
in_f.seek(header_offset)
header = in_f.read(112)
# Copy payload.
try:
out_f = open(os.path.join(dest_dir, 'omf.bin'), 'wb')
data = b' '
while data:
data = in_f.read(1048576)
out_f.write(data)
# Truncate payloads with an extra byte.
pos = out_f.tell()
if pos & 1:
out_f.truncate(pos - 1)
out_f.close()
except:
in_f.close()
return True
# Write header.
try:
out_f = open(os.path.join(dest_dir, ':header:'), 'wb')
out_f.write(header)
out_f.close()
except:
pass
# Remove OMF file.
in_f.close()
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir
class PEExtractor(ArchiveExtractor):
"""Extract PE executables."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Signatures for flash tool executables which may have an embedded ROM.
self._flashtool_pattern = re.compile(
b'''(?P<afuwin>Software\\\\AMI\\\\AFUWIN)|''' # AMIBIOS 8 AFUWIN (many ASRock)
b'''(?P<aopen>AOpen FLASH ROM Utility R)|''' # AOpen (AP61)
b'''Micro Firmware, Incorporated \\* |''' # Micro Firmware (Intel Monsoon surfaced so far)
b'''(?P<asus>ASUS Floppy Image Self-Extrator\\.)''' # ASUS floppy self-extractor (P4VP-MX)
)
# Path to the deark utility.
self._deark_path = os.path.abspath(os.path.join('deark', 'deark'))
if not os.path.exists(self._deark_path):
self._deark_path = None
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Determine if this is a PE/MZ.
if file_header[:2] != b'MZ':
return False
# Cover PKZIP self-extractor (with PKLITE-compressed stub)
# with an incorrect extension. This does appear to happen.
if file_header[30:36] != b'PKLITE' or b'PK\x03\x04' not in file_header:
# The MZ signature is way too short. Check extension as well to be safe.
# This also stops :header: files from being re-processed as flash tools.
if file_path[-4:].lower() not in ('.exe', '.dll', '.scr'):
return False
# Determine if this executable can be extracted with deark.
ret = self.extract_deark(file_path, file_header, dest_dir)
if ret:
return ret
# Cover Inno Setup installers.
if file_header[48:52] == b'Inno':
# Determine if this executable can be extracted with innoextract.
ret = self._extract_inno(file_path, file_header, dest_dir)
if ret:
return ret
# Read up to 16 MB as a safety net.
file_header += util.read_complement(file_path, file_header)
# Determine if this executable may have an embedded ROM.
match = self._flashtool_pattern.search(file_header)
if match:
# Extract embedded ROM and stop if extraction was successful.
ret = self._extract_flashtool(file_path, file_header, dest_dir, match)
if ret:
return ret
# Extract this as an archive.
return self._extract_archive(file_path, dest_dir)
def extract_deark(self, file_path, file_header, dest_dir, remove=True, delegated=False):
# Stop if deark is not available.
if not self._deark_path:
return False
# Determine if deark can extract this file as an executable, and stop if it can't.
file_path_abs = os.path.abspath(file_path)
if not delegated:
proc = subprocess.run([self._deark_path, '-opt', 'execomp', '-l', file_path_abs], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if proc.stdout[:12] != b'Module: exe\n' or proc.stdout[-16:] != b'\noutput.000.exe\n':
return False
self.debug_print('Using deark')
# Stop if this is a dry run.
if not dest_dir:
return True
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Run deark command to extract the executable.
orig_basename, orig_ext = os.path.splitext(os.path.basename(file_path_abs))
subprocess.run([self._deark_path, '-opt', 'execomp', '-o', orig_basename, file_path_abs], stdout=self._devnull, stderr=subprocess.STDOUT, cwd=dest_dir)
# Assume failure if nothing was extracted.
files_extracted = os.listdir(dest_dir)
if len(files_extracted) < 1:
self.debug_print('deark produced no files:', file_path)
return False
# Rename single file.
if len(files_extracted) == 1:
_, dest_ext = os.path.splitext(files_extracted[0])
if orig_ext.lower() == dest_ext.lower(): # keep casing if the extension hasn't changed
dest_ext = orig_ext
self.debug_print('Renaming', repr(files_extracted[0]), 'to', repr(orig_basename + dest_ext))
try:
new_fn = orig_basename + dest_ext
shutil.move(os.path.join(dest_dir, files_extracted[0]), os.path.join(dest_dir, new_fn))
if delegated:
files_extracted = [new_fn]
except:
pass
# Remove original file.
if remove:
try:
os.remove(file_path)
except:
pass
# Return destination directory path or first file path.
if delegated:
return os.path.join(dest_dir, files_extracted[0])
else:
return dest_dir
def _extract_flashtool(self, file_path, file_header, dest_dir, match):
# Determine embedded ROM start and end offsets.
dest_file_name = 'flashtool.bin'
if match.group('afuwin'):
# Look for markers and stop if one of them wasn't found.
rom_start_offset = file_header.find(b'_EMBEDDED_ROM_START_\x00')
if rom_start_offset == -1:
return False
rom_start_offset += 21
rom_end_offset = file_header.find(b'_EMBEDDED_ROM_END_\x00', rom_start_offset)
if rom_end_offset == -1:
return False
elif match.group('asus'):
# Change output file name.
dest_file_name = 'floppy.img'
# Extract zlib compressed data.
try:
file_header = file_header[:0xc000] + zlib.decompress(file_header[0xc000:])
except:
self.debug_print('ASUS zlib decompression failed')
return False
rom_start_offset = 0xc000
rom_end_offset = len(file_header)
else: # others
# Round ROM size down to a power of two.
try:
file_size = os.path.getsize(file_path)
except:
file_size = len(file_header)
rom_size = 1 << math.floor(math.log2(file_size))
rom_start_offset = file_size - rom_size # ROM located at the end of the file
rom_end_offset = file_size
# Adjust offsets if needed.
if match.group('aopen'):
# Stop if this file appears to be standalone AOFLASH.
if file_size < 32768:
return False
# Skip checksum word at the end. All files I've seen
# contain it, but check for its presence just in case.
remaining = file_size & 15
if remaining == 2:
rom_start_offset -= remaining
rom_end_offset -= remaining
# Skip BBOO data block. Same caveat as above.
rom_half_offset = int((rom_start_offset + rom_end_offset) / 2)
if file_header[rom_half_offset:rom_half_offset + 6] == b'*BBOO*':
rom_end_offset = rom_half_offset
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Extract ROM.
try:
f = open(os.path.join(dest_dir, dest_file_name), 'wb')
f.write(file_header[rom_start_offset:rom_end_offset])
f.close()
except:
return True
# Write data before and after the embedded ROM as a header.
try:
f = open(os.path.join(dest_dir, ':header:'), 'wb')
f.write(file_header[:rom_start_offset])
f.write(file_header[rom_end_offset:])
f.close()
except:
pass
# Remove file.
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir
def _extract_inno(self, file_path, file_header, dest_dir):
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Run innoextract command.
try:
subprocess.run(['innoextract', '-e', os.path.abspath(file_path)], stdout=self._devnull, stderr=subprocess.STDOUT, cwd=dest_dir)
except:
pass
# Assume failure if nothing was extracted.
files_extracted = os.listdir(dest_dir)
if len(files_extracted) < 1:
self.debug_print('Extraction produced no files:', file_path)
return False
# Remove file.
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir
class TarExtractor(ArchiveExtractor):
"""Extract tar archives."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 00 00 00 = POSIX tar
# 20 20 00 = GNU tar
# 00 30 30 = some other form of tar?
self._signature_pattern = re.compile(b'''ustar(?:\\x00(?:\\x00\\x00|\\x30\\x30)|\\x20\\x20\\x00)''')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Determine if this is a tar archive.
for offset in (0, 257):
if self._signature_pattern.match(file_header[offset:offset + 8]):
# Extract this as an archive.
return self._extract_archive(file_path, dest_dir)
# Not a tar archive.
return False
class TrimondExtractor(Extractor):
"""Extract Trimond/Mitsubishi BIOS updates."""
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Act only on files at least 128 KB with a chunk of 8-32 KB missing, as a
# safety margin since only 256-minus-16 KB images have been observed so far.
try:
file_size = os.path.getsize(file_path)
except:
return False
if file_size < 131072:
return False
pow2 = 1 << math.ceil(math.log2(file_size))
if pow2 - file_size not in (8192, 16384, 32768):
return False
# Acquire the multi-file lock.
self.multifile_lock_acquire(file_path)
# As a second safety layer, check for Trimond's flasher files.
dir_path, file_name = os.path.split(file_path)
dir_files = os.listdir(dir_path)
dir_files_lower = [filename.lower() for filename in dir_files]
if 'aflash.exe' not in dir_files_lower or 'cnv.exe' not in dir_files_lower or 'b.bat' not in dir_files_lower:
return False
# Look for other counterpart candidates.
counterpart_candidates = []
for counterpart_name in dir_files:
if counterpart_name == file_name:
continue
try:
counterpart_size = os.path.getsize(os.path.join(dir_path, counterpart_name))
except:
continue
# Must add up to the next power of two.
if (file_size + counterpart_size) == pow2:
counterpart_candidates.append(counterpart_name)
# Find the closest counterpart candidate to this
# file, and stop if no counterpart was found.
counterpart_candidate = util.closest_prefix(file_name, counterpart_candidates, lambda x: util.remove_extension(x).lower())
if not counterpart_candidate:
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Join both files together.
counterpart_path = os.path.join(dir_path, counterpart_candidate)
out_f = open(os.path.join(dest_dir, counterpart_candidate), 'wb')
in_f = open(file_path, 'rb')
data = b' '
while data:
data = in_f.read(1048576)
out_f.write(data)
in_f.close()
in_f = open(counterpart_path, 'rb')
data = b' '
while data:
data = in_f.read(1048576)
out_f.write(data)
in_f.close()
out_f.close()
# Create dummy header file on the destination directory.
try:
open(os.path.join(dest_dir, ':header:'), 'wb').close()
except:
pass
# Remove files.
try:
os.remove(file_path)
except:
pass
try:
os.remove(counterpart_path)
except:
pass
return dest_dir
class UEFIExtractor(Extractor):
"""Extract UEFI BIOS images."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Known UEFI signatures.
self._signature_pattern = re.compile(b'''EFI_|D(?:xe|XE)|P(?:ei|EI)|NVAR[\\x00-\\xFF]{7}(?:StdDefaults|PlatformLang|AMITSESetup|SecureBootSetup)\\x00''')
# Ignore padding and microcode files.
self._invalid_file_pattern = re.compile('''(?:Padding|Microcode)_''')
# Path to the UEFIExtract utility.
self._uefiextract_path = os.path.abspath('UEFIExtract')
if not os.path.exists(self._uefiextract_path):
self._uefiextract_path = None
# /dev/null handle for suppressing output.
self._devnull = open(os.devnull, 'wb')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if UEFIExtract is not available.
if not self._uefiextract_path:
return False
# Read up to 32 MB as a safety net.
file_header += util.read_complement(file_path, file_header, 33554432)
# Stop if no UEFI signatures are found.
if not self._signature_pattern.search(file_header):
return False
# Start UEFIExtract process.
file_path_abs = os.path.abspath(file_path)
try:
subprocess.run([self._uefiextract_path, file_path_abs, 'unpack'], timeout=30, stdout=self._devnull, stderr=subprocess.STDOUT)
except:
pass
# Remove report file.
try:
os.remove(file_path_abs + '.report.txt')
except:
pass
# Stop if the dump directory was somehow not created.
dump_dir = file_path_abs + '.dump'
if not os.path.isdir(dump_dir):
try:
os.remove(report_file)
except:
pass
return False
# Move dump directory over to the destination.
try:
# Move within the same filesystem.
os.rename(dump_dir, dest_dir_0)
if not os.path.isdir(dest_dir_0):
raise Exception()
except:
try:
# Move across filesystems.
shutil.move(dump_dir, dest_dir_0)
if not os.path.isdir(dest_dir_0):
raise Exception()
except:
# Remove left-overs and stop if the move failed.
for to_remove in (dump_dir, dest_dir_0):
try:
shutil.rmtree(to_remove)
except:
pass
return True
# Go through the dump, counting valid .bin files and removing .txt ones.
valid_file_count = 0
for scan_file_name in os.listdir(dest_dir_0):
if scan_file_name[-4:] == '.bin':
# Non-UEFI images will only produce padding and microcode files.
if not self._invalid_file_pattern.match(scan_file_name):
valid_file_count += 1
else:
try:
os.remove(os.path.join(dest_dir_0, scan_file_name))
except:
pass
# Assume failure if nothing valid was extracted.
# Actual UEFI images produce thousands of files, so 5 is a safe barrier.
if valid_file_count < 1:
return False
elif valid_file_count < 5:
# Remove left-overs and stop.
try:
shutil.rmtree(dest_dir_0)
except:
pass
return False
# Convert any BIOS logo images in-line (to the same destination directory).
self.image_extractor.convert_inline(os.listdir(dest_dir_0), dest_dir_0)
# Create header file with a dummy string, to tell the analyzer
# this BIOS went through this extractor.
try:
f = open(os.path.join(dest_dir_0, ':header:'), 'wb')
f.write(b'\x00\xFFUEFIExtract\xFF\x00')
f.close()
except:
pass
# Create flag file on the destination directory for the analyzer to
# treat it as a big chunk of data.
open(os.path.join(dest_dir_0, ':combined:'), 'wb').close()
# Remove BIOS file.
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir_0
class UnshieldExtractor(Extractor):
"""Extract InstallShield CAB archives."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# InstallShield CAB signature.
self._signature_pattern = re.compile(b'''ISc\\x28''')
# /dev/null handle for suppressing output.
self._devnull = open(os.devnull, 'wb')
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this is apparently not an InstallShield CAB.
match = self._signature_pattern.match(file_header)
if not match:
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Run unshield command.
try:
subprocess.run(['unshield', 'x', os.path.abspath(file_path)], stdout=self._devnull, stderr=subprocess.STDOUT, cwd=dest_dir)
except:
pass
# Assume failure if nothing was extracted.
files_extracted = os.listdir(dest_dir)
if len(files_extracted) < 1:
self.debug_print('Extraction produced no files:', file_path)
return False
# Remove archive file.
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir_0
class VMExtractor(PEExtractor):
"""Extract files which must be executed in a virtual machine."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Known signatures.
self._floppy_pattern = re.compile(
b'''(?P<fastpacket> FastPacket V[0-9])|''' # Siemens Nixdorf FastPacket
b''', Sydex, Inc\\. All Rights Reserved\\.|''' # IBM Sydex
b'''Disk eXPress Self-Extracting Diskette Image|''' # HP DXP
b'''(?P<nec>\\x00Diskette Image Decompression Utility(?: +v%s|\\.)\\x00)|''' # NEC in-house
b'''(?P<ardi>Copyright Daniel Valot |\\x00ARDI - \\x00)|''' # IBM ARDI
b'''(?P<zenith>Ready to build distribution image with the following attributes:)|''' # Zenith in-house
b'''(?P<softpaq>Error reading the Softpaq File information)|''' # Compaq Softpaq
b'''(?P<dell>Intel Flash Memory Update Utility|DELLXBIOS[\\x00-\\xFF]+;C_FILE_INFO)[\\x00-\\xFF]+<<NMSG>>''' # Dell in-house
)
self._eti_pattern = re.compile(b'''[0-9\\.\\x00]{10}[0-9]{2}/[0-9]{2}/[0-9]{2}\\x00{2}[0-9]{2}:[0-9]{2}:[0-9]{2}\\x00{3}''')
self._rompaq_pattern = re.compile(b'''[\\x00-\\xFF]{12}[A-Z0-9]{7}\\x00[0-9]{2}/[0-9]{2}/[0-9]{2}\\x00''')
# Filename sanitization pattern.
self._dos_fn_pattern = re.compile('''[\\x00-\\x1F\\x7F-\\xFF\\\\/:\\*\\?"<>\\|]''')
# /dev/null handle for suppressing output.
self._devnull = open(os.devnull, 'wb')
# Path to QEMU.
self._qemu_path = None
for path in ('qemu-system-i386', 'qemu-system-x86_64'):
try:
subprocess.run([path, '-version'], stdout=self._devnull, stderr=subprocess.STDOUT).check_returncode()
self._qemu_path = path
break
except:
pass
# Check for other dependencies.
self._dep_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(util.__file__))), 'vm')
for dep in ('floppy.144', 'freedos.img'):
if not os.path.exists(os.path.join(self._dep_dir, dep)):
self._qemu_path = None
break
def extract(self, file_path, file_header, dest_dir, dest_dir_0, *, allow_deark=True):
# Stop if QEMU or other dependencies are not available.
if not self._qemu_path:
return False
# Check for cases which require this extractor.
# All signatures should be within the first 32 KB or so.
extractor = None
extractor_kwargs = {}
if file_header[:2] == b'MZ' and b'PK\x03\x04' not in file_header: # skip ZIP self-extractors with compressed stubs
# Read up to 16 MB as a safety net.
file_header += util.read_complement(file_path, file_header)
match = self._floppy_pattern.search(file_header)
if match:
extractor = self._extract_floppy
extractor_kwargs['match'] = match
elif allow_deark and self.extract_deark(file_path, file_header, None): # avoid infinite loops
# Acquire the multi-file lock if this is a ROMPAQ.EXE.
# This is required for ROMPAQ extraction below.
if os.path.basename(file_path).lower() == 'rompaq.exe':
self.multifile_lock_acquire(file_path)
extractor = self._extract_deark
elif self._eti_pattern.match(file_header):
extractor = self._extract_eti
elif self._rompaq_pattern.match(file_header):
# Acquire the multi-file lock.
self.multifile_lock_acquire(file_path)
# The ROMPAQ format appears to be version specific in some way.
# We will only extract files that have a ROMPAQ.EXE next to them.
dir_path = os.path.dirname(file_path)
rompaq_path = None
for file_in_dir in os.listdir(dir_path):
if file_in_dir.lower() == 'rompaq.exe':
rompaq_path = os.path.join(dir_path, file_in_dir)
break
# Now look for a PKLITE-decompressed ROMPAQ.EXE.
dest_parent_dir = os.path.dirname(dest_dir)
if not rompaq_path and os.path.isdir(dest_parent_dir):
for file_in_dir in os.listdir(dest_parent_dir):
if file_in_dir.lower() == 'rompaq.exe:':
rompaq_path = os.path.join(dest_parent_dir, file_in_dir, file_in_dir[:-1])
if os.path.exists(rompaq_path):
break
else:
rompaq_path = None
# Enter ROMPAQ mode if the EXE was found.
if rompaq_path:
extractor = self._extract_rompaq
extractor_kwargs['rompaq_path'] = rompaq_path
# Stop if no case was found.
if not extractor:
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Run extractor.
self.debug_print('Running', extractor.__name__)
return extractor(file_path, file_header, dest_dir, dest_dir_0, **extractor_kwargs)
def _run_qemu(self, hdd=None, hdd_snapshot=True, floppy=None, floppy_snapshot=True, vvfat=None, boot='c', monitor_cmd=None, monitor_flag_file=None):
# Build QEMU arguments.
args = [self._qemu_path, '-m', '32', '-boot', boot]
if not self.debug or not os.getenv('DISPLAY'):
args += ['-display', 'none', '-vga', 'none']
if monitor_cmd:
args += ['-monitor', 'stdio']
for drive, drive_snapshot, drive_if in ((floppy, floppy_snapshot, 'floppy'), (hdd, hdd_snapshot, 'ide')):
# Don't add this drive if an image was not specified.
if not drive:
# Add dummy floppy to prevent errors if no floppy is specified.
if drive_if == 'floppy':
drive = os.path.join(self._dep_dir, 'floppy.144')
drive_snapshot = True
else:
continue
# Add drive.
args += ['-drive', 'if=' + drive_if + ',snapshot=' + (drive_snapshot and 'on' or 'off') + ',format=raw,file=' + drive.replace(',', ',,')]
if vvfat:
# Add vvfat if requested.
args += ['-drive', 'if=ide,driver=vvfat,rw=on,dir=' + vvfat.replace(',', ',,')] # regular vvfat syntax can't handle : in path
# Run QEMU.
self.debug_print('Running QEMU with args:', args)
proc = None
try:
if monitor_cmd and monitor_flag_file:
proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=self._devnull, stderr=subprocess.STDOUT)
# Wait for flag file if one was specified.
if monitor_flag_file:
spins = 0
while not os.path.exists(monitor_flag_file) and spins < 60:
time.sleep(1)
spins += 1
if spins < 60:
self.debug_print('Monitor flag file found')
else:
self.debug_print('Monitor flag file timed out')
# Send monitor command if one was specified, and wait for the QEMU process.
proc.communicate(input=monitor_cmd, timeout=60)
else:
subprocess.run(args, input=monitor_cmd, timeout=60, stdout=self._devnull, stderr=subprocess.STDOUT)
except:
self.debug_print('Running QEMU failed (timed out?)')
try:
proc.terminate()
except:
pass
def _extract_floppy(self, file_path, file_header, dest_dir, dest_dir_0, *, match):
"""Extract DOS-based floppy self-extractors."""
# Only support 1.44 MB floppies for now.
floppy_media = 'floppy.144'
# Copy original file and blank floppy image to the destination directory.
if match.group('dell'): # Dell in-house names the extracted file after the executable
exe_name = 'dell.exe'
else:
exe_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.exe'
exe_path = os.path.join(dest_dir, exe_name)
image_path = os.path.join(dest_dir, util.random_name(8) + '.img')
shutil.copy2(file_path, exe_path)
shutil.copy2(os.path.join(self._dep_dir, floppy_media), image_path)
flag_name = flag_path = None
# Create batch file for calling the executable.
bat_path = os.path.join(dest_dir, 'autoexec.bat')
f = open(bat_path, 'wb')
if match.group('nec'):
# This SFX has trouble with (at least) the FreeDOS memory manager.
# Work around that by moving config.sys out of the way to disable the
# memory manager, rebooting the system, then executing the SFX proper.
f.write(b'c:\r\n')
f.write(b'if not exist config.sys goto sfx\r\n')
f.write(b'move /y config.sys config.old\r\n')
f.write(b'echo o cf9 6|debug\r\n') # TRC reset
f.write(b'exit\r\n') # just in case
f.write(b':sfx\r\n')
f.write(b'move /y config.old config.sys\r\n') # just in case again (snapshot shouldn't persist changes)
f.write(b'echo 0|') # later revision prompts for standard or LS-120 drive
elif match.group('ardi'):
f.write(b'echo.|')
elif match.group('zenith') or match.group('dell'):
f.write(b'a:\r\n')
elif match.group('softpaq'):
# Create flag file for sending the monitor commands.
flag_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.dat'
flag_path = os.path.join(dest_dir, flag_name)
f.write(b'echo. >d:\\' + flag_name.encode('cp437', 'ignore') + b'\r\n')
f.write(b'd:' + exe_name.encode('cp437', 'ignore'))
if match.group('fastpacket'):
f.write(b' /b a:\r\n')
elif match.group('ardi') or match.group('softpaq'):
f.write(b'\r\n')
elif match.group('zenith'):
f.write(b' <c:\\agreed.txt\r\n')
elif match.group('dell'):
f.write(b' -writeromfile\r\n')
else:
f.write(b' a: <c:\\y.txt\r\n')
f.close()
# Assemble QEMU monitor commands for Compaq Softpaq.
monitor_cmd = None
if match.group('softpaq'):
monitor_cmd = (
b'sendkey pgdn\n'
b'sendkey a\n'
b'sendkey g\n'
b'sendkey r\n'
b'sendkey e\n'
b'sendkey e\n'
b'sendkey a\n'
b'sendkey kp_enter\n'
b'sendkey kp_enter\n'
b'sendkey kp_enter\n'
b'sendkey kp_enter\n'
b'sendkey kp_enter\n'
b'sendkey esc\n'
)
# Run QEMU.
self._run_qemu(hdd=os.path.join(self._dep_dir, 'freedos.img'), floppy=image_path, floppy_snapshot=False, vvfat=dest_dir, monitor_cmd=monitor_cmd, monitor_flag_file=flag_path)
# Detect and recover from a Softpaq crash.
if match.group('softpaq'):
temp_image_path = os.path.join(dest_dir, 'image')
if os.path.exists(temp_image_path) and os.path.getsize(temp_image_path) > 0:
try:
os.remove(image_path)
except:
pass
image_path = temp_image_path
# Remove temporary files. (exename.tmp = FastPacket)
util.remove_all((bat_path, exe_path, exe_path[:-3] + 'tmp', os.path.join(dest_dir, exe_name[:-3].upper() + 'TMP'), flag_path))
# Extract image as an archive.
ret = self._extract_archive(image_path, dest_dir, remove=False)
if type(ret) == str and len(os.listdir(dest_dir)) > 1:
# Remove original file.
try:
os.remove(file_path)
except:
pass
# Flag success.
ret = dest_dir
else:
ret = False
# Remove image.
try:
os.remove(image_path)
except:
pass
return ret
def _extract_eti(self, file_path, file_header, dest_dir, dest_dir_0):
"""Extract Evergreen ETI files."""
# Read ETI header.
in_f = open(file_path, 'rb')
header = in_f.read(0x1f)
# Parse creation date and time.
try:
date = header[10:18].decode('cp437', 'ignore')
time = header[20:28].decode('cp437', 'ignore')
dt = datetime.datetime.strptime(date + ' ' + time, '%m/%d/%y %H:%M:%S')
ctime = (dt - datetime.datetime(1970, 1, 1)).total_seconds()
except:
ctime = 0
# Start the extraction batch file.
bat_f = open(os.path.join(dest_dir, 'autoexec.bat'), 'wb')
bat_f.write(b'd:\r\n')
# Extract files into individual ETIs.
temp_files = ['autoexec.bat', 'contact.eti', 'contact.txt', 'prevlang.dat']
while True:
# Parse file header.
fn = in_f.read(12) # filename
if fn == None:
break
nul_index = fn.find(b'\x00')
if nul_index > -1:
fn = fn[:nul_index]
if len(fn) == 0:
break
fn = fn.decode('cp437', 'ignore')
self.debug_print('ETI file:', fn)
in_f.read(5) # rest of header
size = struct.unpack('<I', in_f.read(4))[0] # size
# Create filename for the individual ETI.
eti_name = temp_files[0] # dummy
while eti_name in temp_files:
eti_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.eti'
temp_files.append(eti_name)
# Sanitize extracted filename to not overwrite ourselves.
if fn.lower() in temp_files:
fn = fn[:-1] + '_'
fn = self._dos_fn_pattern.sub('_', fn)
# Add individual ETI to the batch file.
bat_f.write(b'del CONTACT.ETI CONTACT.TXT PREVLANG.DAT\r\n') # remove old files
bat_f.write(b'c:move /y ' + eti_name.encode('cp437', 'ignore') + b' CONTACT.ETI\r\n') # insert ourselves
bat_f.write(b'c:instl2o\r\n') # run hacked executable
bat_f.write(b'c:move /y CONTACT.TXT ' + fn.encode('cp437', 'ignore') + b'\r\n') # rename decompressed file
# Write individual ETI.
out_f = open(os.path.join(dest_dir, eti_name), 'wb')
out_f.write(header) # file header
out_f.write(b'\x00\x00\x00\xB3\xD2\x40\xC6') # single-file header
out_f.write(b'\xFF\xFF\xFF\x00') # unpacked size (unknown, assume 16 MB at most)
while size > 0:
data = in_f.read(min(size, 1048576))
out_f.write(data) # data
size -= len(data)
out_f.close()
# Finish the batch file.
bat_f.close()
# Run QEMU.
self._run_qemu(hdd=os.path.join(self._dep_dir, 'freedos.img'), vvfat=dest_dir)
# Remove temporary files.
util.remove_all(temp_files, lambda x: (os.path.join(dest_dir, x), os.path.join(dest_dir, x.upper())))
# Check if anything was extracted.
dest_dir_files = os.listdir(dest_dir)
if len(dest_dir_files) > 0:
# Remove original file.
try:
os.remove(file_path)
except:
pass
# Set timestamps if applicable.
if ctime > 0:
for fn in dest_dir_files:
try:
os.utime(os.path.join(dest_dir, fn), (ctime, ctime))
except:
pass
return dest_dir
else:
return True
def _extract_deark(self, file_path, file_header, dest_dir, dest_dir_0):
"""Extract compressed executables with deark and run them through the same pipeline.
This is required for the following self-extractors, which contain a compressed stub:
- Compaq Softpaq (PKLITE)
- Dell in-house (LZEXE with no LZ91 signature)
- NEC in-house (PKLITE)
- Siemens Nixdorf FastPacket (LZEXE)
- Zenith in-house (PKLITE)
The decompressed files cannot be executed (they're garbage), so the pipeline has to run
with the original file path, while file_header gets the decompressed executable's data."""
# Run deark extractor and stop if it wasn't successful.
unpacked_path = self.extract_deark(file_path, file_header, dest_dir, remove=False, delegated=True)
if type(unpacked_path) != str:
return unpacked_path
# Read unpacked file.
decomp_file_data = util.read_complement(unpacked_path)
# Run this same extractor with detectors pointed at the unpacked data.
ret = self.extract(file_path, decomp_file_data, dest_dir, dest_dir_0, allow_deark=False)
# Remove original file.
try:
os.remove(file_path)
except:
pass
# Stop if extraction was successful.
if ret:
# Remove unpacked file.
try:
os.remove(unpacked_path)
except:
pass
return ret
# Keep the unpacked file around for other extractors to process.
return dest_dir
def _extract_rompaq(self, file_path, file_header, dest_dir, dest_dir_0, *, rompaq_path):
"""Extract Compaq ROMPAQ-compressed BIOS images using a ROMPAQ.EXE provided next to the image."""
# Copy original file and ROMPAQ.EXE to the destination directory.
# Also determine output file name.
rom_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.bin'
rom_path = os.path.join(dest_dir, rom_name)
exe_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.exe'
exe_path = os.path.join(dest_dir, exe_name)
try:
shutil.copy2(file_path, rom_path)
shutil.copy2(rompaq_path, exe_path)
except:
return True
# Set a name for the unpacked file.
unpacked_name = 'rompaq.bin'
unpacked_path = os.path.join(dest_dir, unpacked_name)
# Create batch file for extraction.
bat_path = os.path.join(dest_dir, 'autoexec.bat')
f = open(bat_path, 'wb')
f.write(b'd:\r\n' + exe_name.encode('cp437', 'ignore') + b' /D ' + rom_name.encode('cp437', 'ignore') + b' ' + unpacked_name.encode('cp437', 'ignore') + b'\r\n')
f.close()
# Run QEMU.
self._run_qemu(hdd=os.path.join(self._dep_dir, 'freedos.img'), vvfat=dest_dir)
# Remove temporary files.
util.remove_all((bat_path, rom_path, exe_path))
# Stop if unpacking was not successful.
if not os.path.exists(unpacked_path):
unpacked_path = os.path.join(dest_dir, unpacked_name.upper()) # just in case
if not os.path.exists(unpacked_path):
return False
# Remove original file.
try:
os.remove(file_path)
except:
pass
# Create dummy header file.
try:
open(os.path.join(dest_dir, ':header:'), 'wb').close()
except:
pass
# Return destination directory path.
return dest_dir