From 3ab0f5c82206e826fbc330176ddd2c5e285fa418 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Sun, 15 May 2022 14:37:04 -0300 Subject: [PATCH] Add abort-on-ENOSPC command line flag --- biostools/__main__.py | 66 +++++++++++++++++++++++++++++++++---------- biostools/util.py | 22 ++++++++++----- 2 files changed, 66 insertions(+), 22 deletions(-) diff --git a/biostools/__main__.py b/biostools/__main__.py index 37da4e0..4f23cc1 100644 --- a/biostools/__main__.py +++ b/biostools/__main__.py @@ -16,7 +16,7 @@ # Copyright 2021 RichardG. # -import getopt, os, pickle, multiprocessing, re, socket, subprocess, sys, threading +import errno, getopt, os, pickle, multiprocessing, re, socket, subprocess, sys, threading from . import analyzers, extractors, formatters, util # Constants. @@ -69,7 +69,12 @@ def extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_nu # Run the extractor. try: extractor_result = extractor.extract(file_path, file_data, dest_dir, dest_dir_0) - except: + except Exception as e: + if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC: + # Abort on no space if requested. + print('{0} => aborting extraction due to disk space\n'.format(file_path[path_trim_index:]), end='') + raise + # Log an error. util.log_traceback('extracting', file_path) continue @@ -89,7 +94,7 @@ def extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_nu # Remove this directory if it ends up empty. util.rmdirs(scan_dir_path) -def extract_process(queue, dir_number_path, next_dir_number_path, debug): +def extract_process(queue, abort_flag, dir_number_path, next_dir_number_path, options): """Main loop for the extraction multiprocessing pool.""" # Set up extractors. @@ -119,11 +124,14 @@ def extract_process(queue, dir_number_path, next_dir_number_path, debug): ] # Disable debug mode on extractors. - if not debug: + if not options['debug']: dummy_func = lambda self, *args: None for extractor in file_extractors: extractor.debug_print = dummy_func + # Raise exceptions on no space if requested. + util.raise_enospc = options['enospc'] + # Cache trim index values for determining a file's relative paths. dir_number_path = dir_number_path.rstrip(os.sep) subdir_trim_index = len(dir_number_path) @@ -134,7 +142,16 @@ def extract_process(queue, dir_number_path, next_dir_number_path, debug): item = queue.get() if item == None: # special item to stop the loop break - extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item) + elif abort_flag.value: + continue + try: + extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item) + except Exception as e: + if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC: + # Abort all threads if ENOSPC was raised. + abort_flag.value = 1 + continue + raise def extract(dir_path, _, options): """Main function for extraction.""" @@ -173,8 +190,9 @@ def extract(dir_path, _, options): # Start multiprocessing pool. print('Starting extraction on directory {0}'.format(dir_number), end='', flush=True) queue_size = options['threads'] + len(options['remote_servers']) - queue = multiprocessing.Queue(maxsize=queue_size * 4) - initargs = (queue, dir_number_path, next_dir_number_path, options['debug']) + queue = multiprocessing.Queue(maxsize=queue_size * 8) + abort_flag = multiprocessing.Value('B', 0) + initargs = (queue, abort_flag, dir_number_path, next_dir_number_path, options) mp_pool = multiprocessing.Pool(options['threads'], initializer=extract_process, initargs=initargs) print(flush=True) @@ -195,6 +213,8 @@ def extract(dir_path, _, options): if len(scan_file_names) > 0: found_any_files = True queue.put((scan_dir_path, scan_file_names)) + if abort_flag.value: # stop feeding queue if a thread abort was requested + break # Stop if no files are left. if not found_any_files: @@ -223,6 +243,10 @@ def extract(dir_path, _, options): for client in remote_clients: client.join() + # Abort extraction if a thread abort was requested. + if abort_flag.value: + return 1 + # Create 0 directory if it doesn't exist. print('Merging directories:', end=' ') merge_dest_path = os.path.join(dir_path, '0') @@ -494,7 +518,7 @@ def analyze_dir(formatter, scan_base, file_analyzers, scan_dir_path, scan_file_n # Output the results. formatter.output_row(fields) -def analyze_process(queue, formatter, scan_base, debug): +def analyze_process(queue, formatter, scan_base, options): """Main loop for the analysis multiprocessing pool.""" # Set up analyzers. @@ -543,7 +567,7 @@ def analyze_process(queue, formatter, scan_base, debug): ] # Disable debug mode on analyzers. - if not debug: + if not options['debug']: dummy_func = lambda self, *args: None for analyzer in file_analyzers: analyzer.debug_print = dummy_func @@ -586,8 +610,8 @@ def analyze(dir_path, formatter_args, options): dir_path = dir_path[:-1] # Start multiprocessing pool. - queue = multiprocessing.Queue(maxsize=options['threads'] * 4) - mp_pool = multiprocessing.Pool(options['threads'], initializer=analyze_process, initargs=(queue, formatter, dir_path, options['debug'])) + queue = multiprocessing.Queue(maxsize=options['threads'] * 8) + mp_pool = multiprocessing.Pool(options['threads'], initializer=analyze_process, initargs=(queue, formatter, dir_path, options)) if os.path.isdir(dir_path): # Scan directory structure. @@ -611,13 +635,22 @@ def analyze(dir_path, formatter_args, options): # Remote server module. +class DummyAbortFlag: + def __init__(self): + self.value = False + class RemoteClient: """State and functions for communicating with a remote server.""" def __init__(self, addr, action, initargs): # Initialize state. self.action = action - self.initargs = initargs[1:] + if isinstance(initargs[0], multiprocessing.Value): + self.initargs = (DummyAbortFlag(),) + initargs[2:] + self.abort_flag = initargs[0] + else: + self.initargs = initargs[1:] + self.abort_flag = DummyAbortFlag() self.queue = initargs[0] self.sock = self.f = None @@ -703,7 +736,7 @@ class RemoteClient: # Read queue item. item = self.queue.get() - if item == None: # special item to stop the loop + if item == None or self.abort_flag.value: # special item to stop the loop self.close() break @@ -766,7 +799,7 @@ class RemoteServerClient: if line[0:1] in b'xa': # Start multiprocessing pool. print(self.addr, 'Starting pool for', (line[0] == b'x') and 'extraction' or 'analysis') - self.queue = multiprocessing.Queue(maxsize=self.options['threads'] * 4) + self.queue = multiprocessing.Queue(maxsize=self.options['threads'] * 8) if line[0:1] == b'x': func = extract_process else: @@ -844,6 +877,7 @@ def main(): options = { 'array': False, 'debug': False, + 'enospc': False, 'format': 'csv', 'headers': True, 'hyperlink': False, @@ -868,6 +902,7 @@ def main(): options['hyperlink'] = True elif opt in ('-n', '--no-headers'): options['headers'] = False + options['enospc'] = True elif opt in ('-r', '--array'): options['array'] = True elif opt in ('-t', '--threads'): @@ -908,11 +943,12 @@ Usage: docker run -v directory:/bios biostools [-d] [-f output_format] [-h] [-n] ''' else: usage = ''' -Usage: python3 -m biostools [-d] [-t threads] -x directory +Usage: python3 -m biostools [-d] [-n] [-t threads] -x directory python3 -m biostools [-d] [-f output_format] [-h] [-n] [-r] [-t threads] -a directory|single_file [formatter_options] -x Extract archives and BIOS images recursively in the given directory + -n Abort extraction if disk space runs out. -a Analyze extracted BIOS images in the given directory, or a single extracted file (extracting with -x first is recommended)''' diff --git a/biostools/util.py b/biostools/util.py index 7034b16..4cc9740 100644 --- a/biostools/util.py +++ b/biostools/util.py @@ -15,7 +15,7 @@ # # Copyright 2021 RichardG. # -import multiprocessing, os, math, random, re, shutil, traceback, urllib.request +import errno, multiprocessing, os, math, random, re, shutil, traceback, urllib.request from biostools.pciutil import * date_pattern_mmddyy = re.compile('''(?P[0-9]{2})/(?P[0-9]{2})/(?P[0-9]{2,4})''') @@ -29,6 +29,8 @@ base62 = digits + lowercase + uppercase random_name_symbols = lowercase + digits + fn_symbols + uppercase random_name_nosymbols = lowercase + digits + uppercase +raise_enospc = False + _error_log_lock = multiprocessing.Lock() @@ -147,7 +149,9 @@ def hardlink_or_copy(src, dest): except: try: shutil.copy2(src, dest) - except: + except Exception as e: + if raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC: + raise return False return True @@ -204,7 +208,9 @@ def rmdirs(dir_path): dir_path = os.path.dirname(dir_path) except OSError: break - except: + except Exception as e: + if raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC: + raise continue return removed_count @@ -218,8 +224,9 @@ def remove_all(files, func=lambda x: x): for subfile in file: try: os.remove(subfile) - except: - pass + except Exception as e: + if raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC: + raise def remove_extension(file_name): """Remove file_name's extension, if one is present.""" @@ -233,6 +240,7 @@ def try_makedirs(dir_path): """Try to create dir_path. Returns True if successful, False if not.""" try: os.makedirs(dir_path) - except: - pass + except Exception as e: + if raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC: + raise return os.path.isdir(dir_path)