From f7955bcdc752d2da406a568a9f1485008fe7471e Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Wed, 18 May 2022 22:01:44 -0300 Subject: [PATCH] Speed up extraction with file-level parallelism where supported --- biostools/__main__.py | 134 ++++++++++++++++++++-------------------- biostools/extractors.py | 53 +++++++++++----- 2 files changed, 103 insertions(+), 84 deletions(-) diff --git a/biostools/__main__.py b/biostools/__main__.py index 4a78f13..1ff0502 100644 --- a/biostools/__main__.py +++ b/biostools/__main__.py @@ -26,75 +26,76 @@ DEFAULT_REMOTE_PORT = 8620 # Extraction module. -def extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, scan_dir_path, scan_file_names): - """Process a given directory for extraction.""" +def extract_file(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, scan_dir_path, scan_file_name): + """Process a given file for extraction.""" - # Determine the destination subdirectory. + # Build source file path. + file_path = os.path.join(scan_dir_path, scan_file_name) + + # Remove links. + if os.path.islink(file_path): + try: + os.remove(file_path) + except: + try: + os.rmdir(file_path) + except: + pass + return + + # Read header. + try: + f = open(file_path, 'rb') + file_data = f.read(32782) # upper limit set by ISOExtractor + f.close() + except: + # The file might have been removed after the fact by an extractor. + return + + # Come up with a destination directory for this file. dest_subdir = scan_dir_path[subdir_trim_index:] while dest_subdir[:len(os.sep)] == os.sep: dest_subdir = dest_subdir[len(os.sep):] + dest_file_path = os.path.join(dest_subdir, scan_file_name + ':') + dest_dir = os.path.join(next_dir_number_path, dest_file_path) + dest_dir_0 = os.path.join(os.path.dirname(next_dir_number_path), '0', dest_file_path) - # Iterate through files. - for scan_file_name in scan_file_names: - file_path = os.path.join(scan_dir_path, scan_file_name) - - # Remove links. - if os.path.islink(file_path): - try: - os.remove(file_path) - except: - try: - os.rmdir(file_path) - except: - pass - continue - - # Read header. + # Run through file extractors until one succeeds. + for extractor in file_extractors: + # Run the extractor. try: - f = open(file_path, 'rb') - file_data = f.read(32782) # upper limit set by ISOExtractor - f.close() - except: - # Permission issues or after-the-fact removal of other files by - # extractors can cause this. Give up. + extractor_result = extractor.extract(file_path, file_data, dest_dir, dest_dir_0) + except extractors.MultifileStaleException: + # This file has gone missing between the multi-file lock being + # requested and successfully acquired. Stop extraction immediately. + break + except Exception as e: + if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC: + # Abort on no space if requested. + print('{0} => aborting extraction due to disk space\n'.format(file_path[path_trim_index:]), end='') + raise + + # Log an error. + util.log_traceback('extracting', file_path) continue + finally: + if extractor.multifile_locked: + extractor.multifile_locked = False + extractor.multifile_lock.release() - # Come up with a destination directory for this file. - dest_file_path = os.path.join(dest_subdir, scan_file_name + ':') - dest_dir = os.path.join(next_dir_number_path, dest_file_path) - dest_dir_0 = os.path.join(os.path.dirname(next_dir_number_path), '0', dest_file_path) + # Check if the extractor produced any results. + if extractor_result: + # Handle the line break ourselves, since Python prints the main + # body and line break separately, causing issues when multiple + # threads/processes are printing simultaneously. + print('{0} => {1}{2}\n'.format(file_path[path_trim_index:], extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='') + break - # Run through file extractors until one succeeds. - for extractor in file_extractors: - # Run the extractor. - try: - extractor_result = extractor.extract(file_path, file_data, dest_dir, dest_dir_0) - except Exception as e: - if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC: - # Abort on no space if requested. - print('{0} => aborting extraction due to disk space\n'.format(file_path[path_trim_index:]), end='') - raise + # Remove destination directories if they were created but are empty. + for to_remove in (dest_dir, dest_dir_0): + util.rmdirs(to_remove) - # Log an error. - util.log_traceback('extracting', file_path) - continue - - # Check if the extractor produced any results. - if extractor_result: - # Handle the line break ourselves, since Python prints the main - # body and line break separately, causing issues when multiple - # threads/processes are printing simultaneously. - print('{0} => {1}{2}\n'.format(file_path[path_trim_index:], extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='') - break - - # Remove destination directories if they were created but are empty. - for to_remove in (dest_dir, dest_dir_0): - util.rmdirs(to_remove) - - # Remove this directory if it ends up empty. - util.rmdirs(scan_dir_path) - -def extract_process(queue, abort_flag, dir_number_path, next_dir_number_path, options): +def extract_process(queue, abort_flag, multifile_lock, dir_number_path, next_dir_number_path, options): """Main loop for the extraction multiprocessing pool.""" # Set up extractors. @@ -125,9 +126,10 @@ def extract_process(queue, abort_flag, dir_number_path, next_dir_number_path, op extractors.MBRUnsafeExtractor(), ] - # Disable debug mode and add a reference to the image extractor on all extractors. + # Disable debug mode and add a reference to some common objects on all extractors. dummy_func = lambda self, *args: None for extractor in file_extractors: + extractor.multifile_lock = multifile_lock extractor.image_extractor = image_extractor if not options['debug']: extractor.debug = False @@ -149,7 +151,7 @@ def extract_process(queue, abort_flag, dir_number_path, next_dir_number_path, op elif abort_flag.value: continue try: - extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item) + extract_file(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item) except Exception as e: if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC: # Abort all threads if ENOSPC was raised. @@ -196,7 +198,7 @@ def extract(dir_path, _, options): queue_size = options['threads'] + len(options['remote_servers']) queue = multiprocessing.Queue(maxsize=queue_size * 8) abort_flag = multiprocessing.Value('B', 0) - initargs = (queue, abort_flag, dir_number_path, next_dir_number_path, options) + initargs = (queue, abort_flag, multiprocessing.Lock(), dir_number_path, next_dir_number_path, options) mp_pool = multiprocessing.Pool(options['threads'], initializer=extract_process, initargs=initargs) print(flush=True) @@ -209,14 +211,12 @@ def extract(dir_path, _, options): if not os.path.isdir(next_dir_number_path): os.makedirs(next_dir_number_path) - # Scan directory structure. I really wanted this to have file-level - # granularity, but IntelExtractor and InterleaveBIOSExtractor - # both require directory-level granularity for inspecting other files. + # Scan directory structure. found_any_files = False for scan_dir_path, scan_dir_names, scan_file_names in os.walk(dir_number_path): - if len(scan_file_names) > 0: + for scan_file_name in scan_file_names: found_any_files = True - queue.put((scan_dir_path, scan_file_names)) + queue.put((scan_dir_path, scan_file_name)) if abort_flag.value: # stop feeding queue if a thread abort was requested break diff --git a/biostools/extractors.py b/biostools/extractors.py index 66c9554..d9c6836 100644 --- a/biostools/extractors.py +++ b/biostools/extractors.py @@ -23,9 +23,16 @@ except ImportError: PIL.Image = None from . import util + +class MultifileStaleException(Exception): + """Exception raised by Extractor.multifile_lock_acquire() if the + file has gone missing after the multi-file lock was acquired.""" + pass + class Extractor: def __init__(self): self.debug = True + self.multifile_locked = False def extract(self, file_path, file_header, dest_dir, dest_dir_0): """Extract the given file into one of the destination directories: @@ -40,6 +47,16 @@ class Extractor: """Print a log line if debug output is enabled.""" print(self.__class__.__name__ + ':', *args, file=sys.stderr) + def multifile_lock_acquire(self, file_path): + """Acquire the global multi-file lock. The lock is automatically released + by the main module after extract() returns or raises an exception.""" + self.multifile_lock.acquire() + self.multifile_locked = True + + # Raise the special exception if another extractor already processed this file. + if not os.path.exists(file_path): + raise MultifileStaleException() + class ApricotExtractor(Extractor): """Extract Apricot BIOS recovery files. Only one instance of this format @@ -1473,11 +1490,12 @@ class IntelExtractor(Extractor): try: os.remove(file_path) except: - import traceback - traceback.print_exc() pass return True + # Acquire the multi-file lock. + self.multifile_lock_acquire(file_path) + # Scan this directory's contents. dir_path = os.path.dirname(file_path) dir_files = {} @@ -1866,6 +1884,9 @@ class InterleaveExtractor(Extractor): if not counterpart_string_sets: return False + # Acquire the multi-file lock. + self.multifile_lock_acquire(file_path) + # Create temporary interleaved data array. part_size = min(os.path.getsize(file_path), 16777216) data = [] @@ -1886,20 +1907,10 @@ class InterleaveExtractor(Extractor): # Skip any files which differ in size. file_in_dir_size = 0 - for retry in range(10): # mergerfs hack - try: - file_in_dir_size = os.path.getsize(file_in_dir_path) - try: - if retry > 0: - with util._error_log_lock: - f = open('biostools_retry.log', 'a') - f.write('{0} => {1} (took {2} retries)\n'.format(file_path, file_in_dir, retry)) - f.close() - except: - pass - break - except: - time.sleep(retry) + try: + file_in_dir_size = os.path.getsize(file_in_dir_path) + except: + continue if file_in_dir_size != file_size: continue @@ -2240,6 +2251,9 @@ class TrimondExtractor(Extractor): if pow2 - file_size not in (8192, 16384, 32768): return False + # Acquire the multi-file lock. + self.multifile_lock_acquire(file_path) + # As a second safety layer, check for Trimond's flasher files. dir_path, file_name = os.path.split(file_path) dir_files = os.listdir(dir_path) @@ -2489,6 +2503,9 @@ class VMExtractor(ArchiveExtractor): if file_header[28:32] == b'LZ91': extractor = self._extract_lzexe elif file_header[30:36] == b'PKLITE': + # Acquire the multi-file lock. This is required for ROMPAQ below. + self.multifile_lock_acquire(file_path) + extractor = self._extract_pklite else: match = self._floppy_pattern.search(file_header) @@ -2498,12 +2515,14 @@ class VMExtractor(ArchiveExtractor): elif self._eti_pattern.match(file_header): extractor = self._extract_eti elif self._rompaq_pattern.match(file_header): + # Acquire the multi-file lock. + self.multifile_lock_acquire(file_path) + # The ROMPAQ format appears to be version specific in some way. # We will only extract files that have a ROMPAQ.EXE next to them. dir_path = os.path.dirname(file_path) rompaq_path = None for file_in_dir in os.listdir(dir_path): - if file_in_dir.lower() == 'rompaq.exe': rompaq_path = os.path.join(dir_path, file_in_dir) break