#!/usr/bin/python3 -u
#
# 86Box		A hypervisor and IBM PC system emulator that specializes in
#		running old operating systems and software designed for IBM
#		PC systems and compatibles from 1981 through fairly recent
#		system designs based on the PCI bus.
#
#		This file is part of the 86Box BIOS Tools distribution.
#
#		Main BIOS extractor and analyzer program.
#
#
#
# Authors:	RichardG, <richardg867@gmail.com>
#
#		Copyright 2021 RichardG.
#
import errno, getopt, multiprocessing, multiprocessing.sharedctypes, os, pickle, re, socket, subprocess, sys, threading
from . import analyzers, extractors, formatters, util
# Constants.
ANALYZER_MAX_CACHE_MB = 512
DEFAULT_REMOTE_PORT = 8620
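# Note: ANALYZER_MAX_CACHE_MB bounds how much file data analyze_files() keeps
# in memory per scanned directory; anything past the quota is read on demand.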
# Extraction module.
def extract_file(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, scan_dir_path, scan_file_name):
"""Process a given file for extraction."""
# Build source file path.
file_path = os.path.join(scan_dir_path, scan_file_name)
# Remove links.
if os.path.islink(file_path):
try:
os.remove(file_path)
except:
try:
os.rmdir(file_path)
except:
pass
return
# Read header.
try:
f = open(file_path, 'rb')
file_data = f.read(32782) # upper limit set by ISOExtractor
f.close()
except:
# The file might have been removed after the fact by an extractor.
return
# Come up with a destination directory for this file.
dest_subdir = scan_dir_path[subdir_trim_index:]
while dest_subdir[:len(os.sep)] == os.sep:
dest_subdir = dest_subdir[len(os.sep):]
dest_file_path = os.path.join(dest_subdir, scan_file_name + ':')
dest_dir = os.path.join(next_dir_number_path, dest_file_path)
dest_dir_0 = os.path.join(os.path.dirname(next_dir_number_path), '0', dest_file_path)
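	# Extractors receive two candidate destinations named after the source
	# file plus a ':' suffix: dest_dir in the next directory number (output
	# that should go through another extraction pass) and dest_dir_0 in the
	# final merge directory 0; which one is used is up to each extractor.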
# Run through file extractors until one succeeds.
for extractor in file_extractors:
# Run the extractor.
try:
extractor_result = extractor.extract(file_path, file_data, dest_dir, dest_dir_0)
except extractors.MultifileStaleException:
# This file has gone missing between the multi-file lock being
# requested and successfully acquired. Stop extraction immediately.
break
except Exception as e:
if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
# Abort on no space if requested.
print('{0} => aborting extraction due to disk space\n'.format(file_path[path_trim_index:]), end='')
raise
# Log an error.
util.log_traceback('extracting', file_path)
continue
finally:
if extractor.multifile_locked:
extractor.multifile_locked = False
extractor.multifile_lock.release()
# Check if the extractor produced any results.
if extractor_result:
# Handle the line break ourselves, since Python prints the main
# body and line break separately, causing issues when multiple
# threads/processes are printing simultaneously.
print('{0} => {1}{2}\n'.format(file_path[path_trim_index:], extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='')
break
# Remove destination directories if they were created but are empty.
for to_remove in (dest_dir, dest_dir_0):
util.rmdirs(to_remove)
def extract_process(queue, abort_flag, multifile_lock, dir_number_path, next_dir_number_path, options):
"""Main loop for the extraction multiprocessing pool."""
# Set up extractors.
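	# The order of this list appears significant: DiscardExtractor runs first
	# to drop known-uninteresting files, and the heuristic MBRUnsafeExtractor
	# runs last as a catch-all.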
image_extractor = extractors.ImageExtractor()
if options['unpack-only']:
file_extractors = []
else:
file_extractors = [
extractors.DiscardExtractor(),
]
file_extractors += [
extractors.ISOExtractor(),
extractors.VMExtractor(),
extractors.PEExtractor(),
extractors.UnshieldExtractor(),
extractors.ASTExtractor(),
extractors.FATExtractor(),
extractors.MBRSafeExtractor(),
extractors.TarExtractor(),
extractors.ArchiveExtractor(),
extractors.CPUZExtractor(),
extractors.HexExtractor(),
image_extractor,
extractors.ApricotExtractor(),
extractors.IntelNewExtractor(),
]
if not options['unpack-only']:
file_extractors += [
extractors.DellExtractor(),
]
file_extractors += [
extractors.IntelExtractor(),
extractors.OCFExtractor(),
extractors.OMFExtractor(),
extractors.TrimondExtractor(),
extractors.InterleaveExtractor(),
]
if not options['unpack-only']:
file_extractors += [
extractors.BIOSExtractor(),
extractors.UEFIExtractor(),
]
file_extractors += [
extractors.MBRUnsafeExtractor(),
]
# Disable debug mode and add a reference to some common objects on all extractors.
dummy_func = lambda self, *args: None
for extractor in file_extractors:
extractor.multifile_lock = multifile_lock
extractor.image_extractor = image_extractor
if not options['debug']:
extractor.debug = False
extractor.debug_print = dummy_func
# Raise exceptions on no space if requested.
util.raise_enospc = options['enospc']
# Cache trim index values for determining a file's relative paths.
dir_number_path = dir_number_path.rstrip(os.sep)
subdir_trim_index = len(dir_number_path)
path_trim_index = len(os.path.dirname(dir_number_path)) + len(os.sep)
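	# For example (assuming dir_number_path is '/bios/1'), a file at
	# '/bios/1/sub/file.bin' yields the destination subdirectory 'sub' via
	# subdir_trim_index and the log path '1/sub/file.bin' via path_trim_index.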
# Receive work from the queue.
while True:
item = queue.get()
		if item is None: # special item to stop the loop
break
elif abort_flag.value:
continue
try:
extract_file(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item)
except Exception as e:
if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
# Abort all threads if ENOSPC was raised.
abort_flag.value = 1
continue
raise
def extract(dir_path, _, options):
"""Main function for extraction."""
# Check if the structure is correct.
if not os.path.exists(os.path.join(dir_path, '1')):
print('Incorrect directory structure. All data to unpack should be located inside', file=sys.stderr)
print('a directory named 1 in turn located inside the given directory.', file=sys.stderr)
return 2
# Check if bios_extract is there.
if not os.path.exists(os.path.abspath(os.path.join('bios_extract', 'bios_extract'))):
print('bios_extract binary not found, did you compile it?', file=sys.stderr)
return 3
# Open devnull file for shell command output.
devnull = open(os.devnull, 'wb')
# Recurse through directory numbers.
dir_number = 1
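	# Each pass extracts every file in directory N into directory N+1; passes
	# repeat until one finds no more files, after which all numbered
	# directories are hard-linked together into directory 0 further below.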
while True:
dir_number_path = os.path.join(dir_path, str(dir_number))
next_dir_number_path = os.path.join(dir_path, str(dir_number + 1))
# Fix permissions on extracted archives.
print('Fixing up directory {0}:'.format(dir_number), end=' ', flush=True)
try:
print('chown', end=' ', flush=True)
subprocess.run(['chown', '-hR', '--reference=' + dir_path, '--', dir_number_path], stdout=devnull, stderr=subprocess.STDOUT)
print('chmod', end=' ', flush=True)
subprocess.run(['chmod', '-R', 'u+rwx', '--', dir_number_path], stdout=devnull, stderr=subprocess.STDOUT) # execute for listing directories
except:
pass
print()
# Start multiprocessing pool.
print('Starting extraction on directory {0}'.format(dir_number), end='', flush=True)
queue_size = options['threads'] + len(options['remote_servers'])
queue = multiprocessing.Queue(maxsize=queue_size * 8)
abort_flag = multiprocessing.Value('B', 0)
initargs = (queue, abort_flag, multiprocessing.Lock(), dir_number_path, next_dir_number_path, options)
mp_pool = multiprocessing.Pool(options['threads'], initializer=extract_process, initargs=initargs)
print(flush=True)
# Start remote clients.
remote_clients = []
for remote_server in options['remote_servers']:
remote_clients.append(RemoteClient(remote_server, 'x', initargs))
# Create next directory.
if not os.path.isdir(next_dir_number_path):
os.makedirs(next_dir_number_path)
# Scan directory structure.
found_any_files = False
for scan_dir_path, scan_dir_names, scan_file_names in os.walk(dir_number_path):
for scan_file_name in scan_file_names:
found_any_files = True
queue.put((scan_dir_path, scan_file_name))
if abort_flag.value: # stop feeding queue if a thread abort was requested
break
# Stop if no files are left.
if not found_any_files:
			# Remove this directory and the next directory if they're empty.
try:
os.rmdir(dir_number_path)
dir_number -= 1
except:
pass
try:
os.rmdir(next_dir_number_path)
except:
pass
break
# Increase number.
dir_number += 1
# Stop multiprocessing pool and wait for its workers to finish.
for _ in range(queue_size):
queue.put(None)
mp_pool.close()
mp_pool.join()
# Wait for remote clients to finish.
for client in remote_clients:
client.join()
# Abort extraction if a thread abort was requested.
if abort_flag.value:
return 1
# Create 0 directory if it doesn't exist.
print('Merging directories:', end=' ')
merge_dest_path = os.path.join(dir_path, '0')
if not os.path.isdir(merge_dest_path):
os.makedirs(merge_dest_path)
# Merge all directories into the 0 directory.
removals = []
for merge_dir_name in range(1, dir_number + 1):
merge_dir_path = os.path.join(dir_path, str(merge_dir_name))
if not os.path.isdir(merge_dir_path):
continue
print(merge_dir_name, end=' ')
subprocess.run(['cp', '-rlaT', merge_dir_path, merge_dest_path], stdout=devnull, stderr=subprocess.STDOUT)
removals.append(subprocess.Popen(['rm', '-rf', merge_dir_path], stdout=devnull, stderr=subprocess.STDOUT))
for removal in removals:
removal.wait()
# Clean up.
devnull.close()
print()
return 0
# Analysis module.
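# File name pattern for PCI option ROM IDs extracted from AMI BIOSes by
# bios_extract, e.g. "amipci_8086_1237.rom" for vendor 8086, device 1237.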
amipci_pattern = re.compile('''amipci_([0-9A-F]{4})_([0-9A-F]{4})\\.rom$''')
def analyze_files(formatter, scan_base, file_analyzers, scan_dir_path, scan_file_names):
"""Process the given files for analysis."""
# Set up caches.
files_flags = {}
files_data = {}
combined_oroms = []
header_data = None
	# In combined mode (enabled by InterleaveExtractor and BIOSExtractor), we
	# handle all files in the directory as a single large blob, to avoid any ambiguity.
combined = ':combined:' in scan_file_names
if combined:
files_data[''] = b''
	# Sort file names for better predictability. The key= function forces
	# "original.tm1" to sort after "original.tmp", in case the Award
	# identification data spans both files. (AOpen AX6B(+) R2.00)
if len(scan_file_names) > 1:
scan_file_names.sort(key=lambda fn: (fn == 'original.tm1') and 'original.tmq' or fn)
# Read files into the cache.
	cache_quota = ANALYZER_MAX_CACHE_MB * 1048576 # MiB to bytes
for scan_file_name in scan_file_names:
# Read up to 16 MB as a safety net.
file_data = util.read_complement(os.path.join(scan_dir_path, scan_file_name))
# Write data to cache.
if scan_file_name == ':header:':
header_data = file_data
elif combined and scan_file_name != ':combined:':
files_data[''] += file_data
# Add PCI option ROM IDs extracted from AMI BIOSes by bios_extract, since the ROM might not
# contain a valid PCI header to begin with. (Apple PC Card with OPTi Viper and AMIBIOS 6)
match = amipci_pattern.match(scan_file_name)
if match:
combined_oroms.append((int(match.group(1), 16), int(match.group(2), 16)))
else:
files_data[scan_file_name] = file_data
# Stop reading if the cache has gotten too big.
cache_quota -= len(file_data)
if cache_quota <= 0:
break
# Prepare combined-mode analysis.
if combined:
# Set interleaved flag on de-interleaved blobs.
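		# The size of the ':combined:' flag file appears to encode the
		# interleave width: 2 bytes for a plain interleave, larger sizes
		# for wider ones (producing flags such as "Interleaved4").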
try:
flag_size = os.path.getsize(os.path.join(scan_dir_path, ':combined:'))
if flag_size >= 2:
combined = 'Interleaved'
if flag_size > 2:
combined += str(flag_size)
except:
pass
# Commit to only analyzing the large blob.
scan_file_names = ['']
elif header_data:
# Remove header flag file from list.
scan_file_names.remove(':header:')
# Analyze each file.
for scan_file_name in scan_file_names:
# Read file from cache if possible.
scan_file_path = os.path.join(scan_dir_path, scan_file_name)
file_data = files_data.get(scan_file_name, None)
		if file_data is None:
# Read up to 16 MB as a safety net.
file_data = util.read_complement(scan_file_path)
# Check for an analyzer which can handle this file.
analyzer_file_path = combined and scan_dir_path or scan_file_path
bonus_analyzer_metadata = bonus_analyzer_oroms = None
bonus_analyzer_dmi = False
file_analyzer = None
strings = None
for analyzer in file_analyzers:
# Reset this analyzer.
analyzer.reset()
analyzer._file_path = scan_file_path
analyzer._bonus_dmi = bonus_analyzer_dmi
# Check if the analyzer can handle this file.
try:
analyzer_result = analyzer.can_handle(analyzer_file_path, file_data, header_data)
except:
# Log an error.
util.log_traceback('searching for analyzers for', os.path.join(scan_dir_path, scan_file_name))
continue
# Move on if the analyzer responded negatively.
if not analyzer_result:
# Extract metadata and option ROMs from the bonus analyzer.
				if bonus_analyzer_metadata is None:
bonus_analyzer_metadata = analyzer.metadata
bonus_analyzer_oroms = analyzer.oroms
bonus_analyzer_dmi = 'DMI' in (key for key, value in bonus_analyzer_metadata)
continue
		# Run strings on the file data if the analyzer requests it (computed only once per file).
if analyzer.can_analyze():
if not strings:
try:
strings = subprocess.run(['strings', '-n8'], input=file_data, stdout=subprocess.PIPE).stdout.decode('ascii', 'ignore').split('\n')
except:
util.log_traceback('running strings on', os.path.join(scan_dir_path, scan_file_name))
continue
# Analyze each string.
try:
for string in strings:
analyzer.analyze_line(string)
except analyzers.AbortAnalysisError:
# Analysis aborted.
pass
except:
# Log an error.
util.log_traceback('analyzing', os.path.join(scan_dir_path, scan_file_name))
continue
# Take this analyzer if it produced a version.
if analyzer.version:
# Clean up version field if an unknown version was returned.
if analyzer.version == '?':
analyzer.version = ''
# Stop looking for analyzers.
file_analyzer = analyzer
break
# Did any analyzer successfully handle this file?
if not file_analyzer:
# Treat this as a standalone PCI option ROM file if BonusAnalyzer found any.
if bonus_analyzer_oroms:
bonus_analyzer_metadata = []
file_analyzer = file_analyzers[0]
else:
# Move on to the next file if nothing else.
continue
# Add interleaved flag to metadata.
if type(combined) == str:
bonus_analyzer_metadata.append(('ROM', combined))
# Clean up the file path.
scan_file_path_full = os.path.join(scan_dir_path, scan_file_name)
# Remove combined directories from the path.
found_flag_file = True
while found_flag_file:
# Find archive indicator.
archive_index = scan_file_path_full.rfind(':' + os.sep)
if archive_index == -1:
break
# Check if a combined or header flag file exists.
found_flag_file = False
for flag_file in (':combined:', ':header:'):
if os.path.exists(os.path.join(scan_file_path_full[:archive_index] + ':', flag_file)):
# Trim the directory off.
scan_file_path_full = scan_file_path_full[:archive_index]
found_flag_file = True
break
scan_file_path = scan_file_path_full[len(scan_base) + len(os.sep):]
# Remove root extraction directory.
slash_index = scan_file_path.find(os.sep)
if slash_index == 1 and scan_file_path[0] == '0':
scan_file_path = scan_file_path[2:]
# De-duplicate and sort metadata and option ROMs.
metadata = list(set('[{0}] {1}'.format(key, value.replace('\n', '\n' + (' ' * (len(key) + 3)))).strip() for key, value in (analyzer.metadata + bonus_analyzer_metadata)))
metadata.sort()
oroms = list(set(combined_oroms + analyzer.oroms + bonus_analyzer_oroms))
oroms.sort()
# Add names to option ROMs.
previous_vendor = previous_device = None
for x in range(len(oroms)):
if oroms[x][0] == -1 and type(oroms[x][1]) == str: # generic ROM
# Format string.
oroms[x] = '[{1}] {2}'.format(*oroms[x]).replace('\n', '\n' + (' ' * (len(oroms[x][1]) + 3)))
elif len(oroms[x]) == 2: # PCI ROM
# Get vendor and device IDs and names.
vendor_id, device_id = oroms[x]
vendor, device = util.get_pci_id(vendor_id, device_id)
				# Skip valid vendor IDs associated with a bogus device ID.
if device == '[Unknown]' and device_id == 0x0000:
oroms[x] = None
continue
# Clean up IDs.
vendor = util.clean_vendor(vendor).strip()
device = util.clean_device(device, vendor).strip()
# De-duplicate vendor names.
if vendor == previous_vendor and vendor != '[Unknown]':
if device == previous_device:
previous_device, device = device, ''
previous_vendor, vendor = vendor, '\u2196' # up-left arrow
else:
previous_device = device
previous_vendor, vendor = vendor, ' ' * len(vendor)
else:
previous_device = device
previous_vendor = vendor
# Format string.
oroms[x] = '[{0:04x}:{1:04x}] {2} {3}'.format(vendor_id, device_id, vendor, device)
else: # PnP ROM
# Get PnP ID, vendor name and device name.
device_id, vendor, device = oroms[x]
# Extract ASCII letters from the PnP ID.
pnp_id = ''.join(chr(0x40 + (letter & 0x1f)) for letter in (device_id >> 26, device_id >> 21, device_id >> 16))
# Add the numeric part of the PnP ID.
pnp_id += format(device_id & 0xffff, '04x').upper()
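				# Worked example (assumed value): for device_id 0x41D00C01, the
				# three 5-bit fields decode to 0x10/0x0e/0x10 = 'P'/'N'/'P' and
				# the low 16 bits format as '0C01', yielding "PNP0C01".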
# Clean up vendor and device names.
vendor_device = ((vendor or '') + '\n' + (device or '')).replace('\r', '')
vendor_device = '\n'.join(x.strip() for x in vendor_device.split('\n') if x.strip())
# Format string.
oroms[x] = '[{0}] {1}'.format(pnp_id, vendor_device.replace('\n', '\n' + (' ' * (len(pnp_id) + 3))))
# Remove bogus option ROM device ID entries.
while None in oroms:
oroms.remove(None)
# Add file name in single-file analysis.
if not scan_dir_path and not scan_file_path:
scan_file_path = os.path.basename(scan_base)
# Collect the analyzer's results.
fields = [((type(field) == str) and field.replace('\t', ' ').strip() or field) for field in [
scan_file_path,
file_analyzer.vendor,
file_analyzer.version,
formatter.split_if_required('\n', file_analyzer.string),
formatter.split_if_required('\n', file_analyzer.signon),
formatter.join_if_required('\n', metadata),
formatter.join_if_required('\n', oroms),
]]
# Output the results.
formatter.output_row(fields)
def analyze_process(queue, formatter, scan_base, options):
"""Main loop for the analysis multiprocessing pool."""
# Set up analyzers.
file_analyzers = [
analyzers.BonusAnalyzer(), # must be the first one
analyzers.AwardPowerAnalyzer(), # must run before AwardAnalyzer
analyzers.ToshibaAnalyzer(), # must run before AwardAnalyzer
analyzers.AwardAnalyzer(), # must run before PhoenixAnalyzer
analyzers.QuadtelAnalyzer(), # must run before PhoenixAnalyzer
analyzers.PhoenixAnalyzer(), # must run before AMIDellAnalyzer and AMIIntelAnalyzer
analyzers.AMIUEFIAnalyzer(), # must run before AMIAnalyzer
analyzers.AMIAnalyzer(), # must run before AMIIntelAnalyzer
analyzers.AMIIntelAnalyzer(),
analyzers.MRAnalyzer(),
# less common BIOSes with no dependencies on the common part begin here #
analyzers.AcerAnalyzer(),
analyzers.AcerMultitechAnalyzer(),
analyzers.AmproAnalyzer(),
analyzers.AmstradAnalyzer(),
analyzers.CDIAnalyzer(),
analyzers.CentralPointAnalyzer(),
analyzers.ChipsAnalyzer(),
analyzers.CommodoreAnalyzer(),
analyzers.CompaqAnalyzer(),
analyzers.CopamAnalyzer(),
analyzers.CorebootAnalyzer(),
analyzers.DTKAnalyzer(),
analyzers.GeneralSoftwareAnalyzer(),
analyzers.IBMSurePathAnalyzer(),
analyzers.IBMAnalyzer(),
analyzers.ICLAnalyzer(),
analyzers.InsydeAnalyzer(),
analyzers.IntelUEFIAnalyzer(),
analyzers.JukoAnalyzer(),
analyzers.MylexAnalyzer(),
analyzers.OlivettiAnalyzer(),
analyzers.PhilipsAnalyzer(),
analyzers.PromagAnalyzer(),
analyzers.SchneiderAnalyzer(),
analyzers.SystemSoftAnalyzer(),
analyzers.TandonAnalyzer(),
analyzers.TinyBIOSAnalyzer(),
analyzers.WhizproAnalyzer(),
analyzers.ZenithAnalyzer(),
]
# Disable debug mode on all analyzers.
if not options['debug']:
dummy_func = lambda self, *args: None
for analyzer in file_analyzers:
analyzer.debug_print = dummy_func
analyzer.debug = False
# Receive work from the queue.
while True:
item = queue.get()
		if item is None: # special item to stop the loop
break
analyze_files(formatter, scan_base, file_analyzers, *item)
def analyze(dir_path, formatter_args, options):
"""Main function for analysis."""
# Initialize output formatter.
output_formats = {
'csv': (formatters.XSVFormatter, ','),
'scsv': (formatters.XSVFormatter, ';'),
'json': formatters.JSONObjectFormatter,
'jsontable': formatters.JSONTableFormatter,
}
formatter = output_formats.get(options['format'], None)
if not formatter:
raise Exception('unknown output format ' + options['format'])
if type(formatter) == tuple:
formatter = formatter[0](*formatter[1:], sys.stdout, options, formatter_args)
else:
formatter = formatter(sys.stdout, options, formatter_args)
# Begin output.
formatter.begin()
formatter.output_headers(['File', 'Vendor', 'Version', 'String', 'Sign-on', 'Metadata', 'ROMs'], options.get('headers'))
# Remove any trailing slash from the root path, as the output path cleanup
# functions rely on it not being present.
if dir_path[-len(os.sep):] == os.sep:
dir_path = dir_path[:-len(os.sep)]
elif dir_path[-1:] == '/':
dir_path = dir_path[:-1]
# Start multiprocessing pool.
queue = multiprocessing.Queue(maxsize=options['threads'] * 8)
mp_pool = multiprocessing.Pool(options['threads'], initializer=analyze_process, initargs=(queue, formatter, dir_path, options))
if os.path.isdir(dir_path):
# Scan directory structure.
for scan_dir_path, scan_dir_names, scan_file_names in os.walk(dir_path):
if ':combined:' in scan_file_names or ':header:' in scan_file_names: # combined mode: process entire directory at once
queue.put((scan_dir_path, scan_file_names))
else: # regular mode: process individual files
for scan_file_name in scan_file_names:
queue.put((scan_dir_path, [scan_file_name]))
else:
# Scan single file.
queue.put(('', [dir_path]))
# Stop multiprocessing pool and wait for its workers to finish.
for _ in range(options['threads']):
queue.put(None)
mp_pool.close()
mp_pool.join()
# End output.
formatter.end()
return 0
# Remote server module.
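# A note on the wire protocol implemented below: the client opens with an
# action line ('x' for extraction, 'a' for analysis) followed by the pickled
# pool initializer arguments; 'q' lines carry a directory path plus
# NUL-separated file names to queue; 'j' asks the server to drain its pool
# and close. The server acknowledges each command by echoing its first byte.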
class DummyAbortFlag:
def __init__(self):
self.value = False
class RemoteClient:
"""State and functions for communicating with a remote server."""
def __init__(self, addr, action, initargs):
# Initialize state.
self.action = action
		if isinstance(initargs[1], multiprocessing.sharedctypes.SynchronizedBase):
			# Extraction: the abort flag and multi-file lock cannot be pickled,
			# so send placeholders; the server re-creates the lock on its end.
			self.initargs = (DummyAbortFlag(), None) + initargs[3:]
			self.abort_flag = initargs[1]
else:
self.initargs = initargs[1:]
self.abort_flag = DummyAbortFlag()
self.queue = initargs[0]
self.sock = self.f = None
self.queue_lock = threading.Lock()
self.write_lock = threading.Lock()
self.close_event = threading.Event()
self.close_event.clear()
		# Parse address:port, with the port falling back to the default.
		addr_split = addr.split(':')
		self.addr = addr_split[0]
		if len(addr_split) > 1:
			self.port = int(addr_split[1])
		else:
			self.port = DEFAULT_REMOTE_PORT
# Start client thread.
self.queue_thread = None
self.client_thread = threading.Thread(target=self.client_thread_func)
self.client_thread.daemon = True
self.client_thread.start()
def client_thread_func(self):
"""Thread function for a remote client."""
# Connect to server.
print('Connecting to {0}:{1}\n'.format(self.addr, self.port), end='')
self.sock = socket.create_connection((self.addr, self.port))
self.f = self.sock.makefile('rwb')
print('Connected to {0}:{1}\n'.format(self.addr, self.port), end='')
		# Ask the server to start its multiprocessing pool.
self.f.write((self.action + '\n').encode('utf8', 'ignore'))
self.f.write(pickle.dumps(self.initargs))
self.f.flush()
# Read responses from server.
while True:
try:
line = self.f.readline().rstrip(b'\r\n')
except:
break
if not line:
break
if line[0:1] in b'xa':
# Multiprocessing pool started, now start the queue thread.
self.queue_thread = threading.Thread(target=self.queue_thread_func)
self.queue_thread.daemon = True
self.queue_thread.start()
elif line[0:1] == b'q':
# Allow queue thread to proceed.
try:
self.queue_lock.release()
except:
pass
elif line[0:1] == b'j':
# We're done.
self.close_event.set()
break
# Close connection.
try:
self.f.close()
except:
pass
try:
self.sock.close()
except:
pass
print('Disconnected from {0}:{1}\n'.format(self.addr, self.port), end='')
def queue_thread_func(self):
"""Thread function to remove items from the local
queue and push them to the remote server's queue."""
while True:
# Wait for the queue to be available.
self.queue_lock.acquire()
# Read queue item.
item = self.queue.get()
			if item is None or self.abort_flag.value: # special item to stop the loop
self.close()
break
# Send queue item to server.
			scan_dir_path, scan_file_names = item
			if isinstance(scan_file_names, str): # extraction items carry a single file name
				scan_file_names = [scan_file_names]
with self.write_lock:
self.f.write(b'q' + scan_dir_path.encode('utf8', 'ignore'))
for scan_file_name in scan_file_names:
self.f.write(b'\x00' + scan_file_name.encode('utf8', 'ignore'))
self.f.write(b'\n')
self.f.flush()
def close(self):
"""Close connection to the server."""
# Write stop message.
with self.write_lock:
try:
self.f.write(b'j\n')
self.f.flush()
except:
return
def join(self):
"""Wait for the server connection to be closed."""
self.close_event.wait()
class RemoteServerClient:
"""State and functions for communicating with remote clients."""
def __init__(self, accept, options):
# Initialize state.
self.sock, self.addr = accept
self.options = options
		self.queue = self.mp_pool = None
		self.extract = False
self.write_lock = threading.Lock()
self.queue_lock = threading.Lock()
self.f = self.sock.makefile('rwb')
# Start client thread.
self.client_thread = threading.Thread(target=self.client_thread_func)
self.client_thread.daemon = True
self.client_thread.start()
def client_thread_func(self):
"""Thread function for a remote client."""
print(self.addr, 'New connection')
# Parse commands.
while True:
try:
line = self.f.readline().rstrip(b'\r\n')
except:
break
if not line:
break
			if line[0:1] in b'xa':
				# Start multiprocessing pool.
				self.extract = (line[0:1] == b'x')
				print(self.addr, 'Starting pool for', self.extract and 'extraction' or 'analysis')
				self.queue = multiprocessing.Queue(maxsize=self.options['threads'] * 8)
				initargs = pickle.load(self.f)
				if self.extract:
					# Re-create the multi-file lock, which the client could not pickle.
					func = extract_process
					initargs = initargs[:1] + (multiprocessing.Lock(),) + initargs[2:]
				else:
					func = analyze_process
				self.mp_pool = multiprocessing.Pool(self.options['threads'], initializer=func, initargs=(self.queue,) + initargs)
			elif line[0:1] == b'q':
				# Add directory to queue.
				file_list = [item.decode('utf8', 'ignore') for item in line[1:].split(b'\x00')]
				if self.options['debug']:
					print(self.addr, 'Queuing', file_list[0], 'with', len(file_list) - 1, 'files')
				if self.queue:
					if self.extract:
						# The extraction queue takes individual (directory, file name) pairs.
						for scan_file_name in file_list[1:]:
							self.queue.put((file_list[0], scan_file_name))
					else:
						self.queue.put((file_list[0], file_list[1:]))
				else:
					print(self.addr, 'Attempted queuing with no queue')
elif line[0:1] == b'j':
# Stop multiprocessing pool and wait for its workers to finish.
print(self.addr, 'Waiting for pool')
if self.mp_pool and self.queue:
for _ in range(self.options['threads']):
self.queue.put(None)
self.mp_pool.close()
self.mp_pool.join()
self.mp_pool = None
else:
print(self.addr, 'Attempted pool wait with no pool/queue')
# Write acknowledgement.
with self.write_lock:
self.f.write(line[0:1] + b'\n')
self.f.flush()
# Stop if requested by the client.
if line[0:1] == b'j':
break
# Close connection.
print(self.addr, 'Closing connection')
try:
self.f.close()
except:
pass
try:
self.sock.close()
except:
pass
if self.mp_pool:
self.mp_pool.close()
self.mp_pool.join()
def remote_server(dir_path, formatter_args, options):
	"""Main function for the remote server."""
	# Create server and listen for connections.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('', options['remote_port']))
server.listen(5)
print('Listening on port', options['remote_port'])
# Receive connections.
try:
while True:
RemoteServerClient(server.accept(), options)
except KeyboardInterrupt:
pass
# Close server.
print('Closing server')
server.close()
return 0
def main():
# Set default options.
mode = None
options = {
'array': False,
'debug': False,
'enospc': False,
'format': 'csv',
'headers': True,
'hyperlink': False,
'threads': 0,
'unpack-only': False,
'docker-usage': False,
'remote_servers': [],
'remote_port': 0,
}
# Parse arguments.
	args, remainder = getopt.gnu_getopt(sys.argv[1:], 'xadf:hnrt:u', ['extract', 'analyze', 'debug', 'format=', 'hyperlink', 'no-headers', 'array', 'threads=', 'unpack-only', 'remote=', 'remote-server', 'docker-usage'])
for opt, arg in args:
if opt in ('-x', '--extract'):
mode = extract
elif opt in ('-a', '--analyze'):
mode = analyze
elif opt in ('-d', '--debug'):
options['debug'] = True
elif opt in ('-f', '--format'):
options['format'] = arg.lower()
elif opt in ('-h', '--hyperlink'):
options['hyperlink'] = True
elif opt in ('-n', '--no-headers'):
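			# -n doubles as "abort on no space" in extraction mode and
			# "no column headers" in analysis mode (see usage text).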
options['headers'] = False
options['enospc'] = True
elif opt in ('-r', '--array'):
options['array'] = True
elif opt in ('-t', '--threads'):
try:
options['threads'] = int(arg)
except:
pass
elif opt in ('-u', '--unpack-only'):
options['unpack-only'] = True
elif opt == '--remote':
options['remote_servers'].append(arg)
elif opt == '--remote-server':
mode = remote_server
try:
options['remote_port'] = int(remainder[0])
except:
pass
			remainder.append(None) # dummy argument so the mode handler is invoked
elif opt == '--docker-usage':
options['docker-usage'] = True
if len(remainder) > 0:
# Set default numeric options.
if options['threads'] <= 0:
options['threads'] = options['debug'] and 1 or (os.cpu_count() or 4)
if options['remote_port'] <= 0:
options['remote_port'] = DEFAULT_REMOTE_PORT
# Run mode handler.
if mode:
return mode(remainder[0], remainder[1:], options)
# Print usage.
if options['docker-usage']:
usage = '''
Usage: docker run -v directory:/bios biostools [-d] [-f output_format] [-h] [-n] [-r] [formatter_options]
Archives and BIOS images in the directory mounted to /bios will be
extracted and analyzed.
'''
else:
usage = '''
Usage: python3 -m biostools [-d] [-n] [-t threads] [-u] -x directory
python3 -m biostools [-d] [-f output_format] [-h] [-n] [-r] [-t threads]
-a directory|single_file [formatter_options]
  -x	Extract archives and BIOS images recursively in the given directory.
  -n	Abort extraction if disk space runs out.
  -u	Extract archives only, don't extract BIOS images.
  -a	Analyze extracted BIOS images in the given directory, or a single
	extracted file (extracting with -x first is recommended).'''
usage += '''
-f Output format:
csv Comma-separated values with quotes (default)
scsv Semicolon-separated values with quotes
json JSON object array
jsontable JSON table
-h Generate download links for file paths representing HTTP URLs.
csv/scsv: The Excel HYPERLINK formula is used; if you have
non-English Excel, you must provide your language's
HYPERLINK formula name in formatter_options.
-n csv/scsv/jsontable: Don't output column headers.
-r json/jsontable: Output multi-value cells as arrays.
Common options (applicable to both -x and -a modes):
-d Enable debug output.
-t Set number of threads to use.
'''
print(usage, file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())