Speed up extraction with file-level parallelism where supported

This commit is contained in:
RichardG867
2022-05-18 22:01:44 -03:00
parent 5ee832b8b8
commit f7955bcdc7
2 changed files with 103 additions and 84 deletions

View File

@@ -26,75 +26,76 @@ DEFAULT_REMOTE_PORT = 8620
# Extraction module.
def extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, scan_dir_path, scan_file_names):
"""Process a given directory for extraction."""
def extract_file(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, scan_dir_path, scan_file_name):
"""Process a given file for extraction."""
# Determine the destination subdirectory.
# Build source file path.
file_path = os.path.join(scan_dir_path, scan_file_name)
# Remove links.
if os.path.islink(file_path):
try:
os.remove(file_path)
except:
try:
os.rmdir(file_path)
except:
pass
return
# Read header.
try:
f = open(file_path, 'rb')
file_data = f.read(32782) # upper limit set by ISOExtractor
f.close()
except:
# The file might have been removed after the fact by an extractor.
return
# Come up with a destination directory for this file.
dest_subdir = scan_dir_path[subdir_trim_index:]
while dest_subdir[:len(os.sep)] == os.sep:
dest_subdir = dest_subdir[len(os.sep):]
dest_file_path = os.path.join(dest_subdir, scan_file_name + ':')
dest_dir = os.path.join(next_dir_number_path, dest_file_path)
dest_dir_0 = os.path.join(os.path.dirname(next_dir_number_path), '0', dest_file_path)
# Iterate through files.
for scan_file_name in scan_file_names:
file_path = os.path.join(scan_dir_path, scan_file_name)
# Remove links.
if os.path.islink(file_path):
try:
os.remove(file_path)
except:
try:
os.rmdir(file_path)
except:
pass
continue
# Read header.
# Run through file extractors until one succeeds.
for extractor in file_extractors:
# Run the extractor.
try:
f = open(file_path, 'rb')
file_data = f.read(32782) # upper limit set by ISOExtractor
f.close()
except:
# Permission issues or after-the-fact removal of other files by
# extractors can cause this. Give up.
extractor_result = extractor.extract(file_path, file_data, dest_dir, dest_dir_0)
except extractors.MultifileStaleException:
# This file has gone missing between the multi-file lock being
# requested and successfully acquired. Stop extraction immediately.
break
except Exception as e:
if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
# Abort on no space if requested.
print('{0} => aborting extraction due to disk space\n'.format(file_path[path_trim_index:]), end='')
raise
# Log an error.
util.log_traceback('extracting', file_path)
continue
finally:
if extractor.multifile_locked:
extractor.multifile_locked = False
extractor.multifile_lock.release()
# Come up with a destination directory for this file.
dest_file_path = os.path.join(dest_subdir, scan_file_name + ':')
dest_dir = os.path.join(next_dir_number_path, dest_file_path)
dest_dir_0 = os.path.join(os.path.dirname(next_dir_number_path), '0', dest_file_path)
# Check if the extractor produced any results.
if extractor_result:
# Handle the line break ourselves, since Python prints the main
# body and line break separately, causing issues when multiple
# threads/processes are printing simultaneously.
print('{0} => {1}{2}\n'.format(file_path[path_trim_index:], extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='')
break
# Run through file extractors until one succeeds.
for extractor in file_extractors:
# Run the extractor.
try:
extractor_result = extractor.extract(file_path, file_data, dest_dir, dest_dir_0)
except Exception as e:
if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
# Abort on no space if requested.
print('{0} => aborting extraction due to disk space\n'.format(file_path[path_trim_index:]), end='')
raise
# Remove destination directories if they were created but are empty.
for to_remove in (dest_dir, dest_dir_0):
util.rmdirs(to_remove)
# Log an error.
util.log_traceback('extracting', file_path)
continue
# Check if the extractor produced any results.
if extractor_result:
# Handle the line break ourselves, since Python prints the main
# body and line break separately, causing issues when multiple
# threads/processes are printing simultaneously.
print('{0} => {1}{2}\n'.format(file_path[path_trim_index:], extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='')
break
# Remove destination directories if they were created but are empty.
for to_remove in (dest_dir, dest_dir_0):
util.rmdirs(to_remove)
# Remove this directory if it ends up empty.
util.rmdirs(scan_dir_path)
def extract_process(queue, abort_flag, dir_number_path, next_dir_number_path, options):
def extract_process(queue, abort_flag, multifile_lock, dir_number_path, next_dir_number_path, options):
"""Main loop for the extraction multiprocessing pool."""
# Set up extractors.
@@ -125,9 +126,10 @@ def extract_process(queue, abort_flag, dir_number_path, next_dir_number_path, op
extractors.MBRUnsafeExtractor(),
]
# Disable debug mode and add a reference to the image extractor on all extractors.
# Disable debug mode and add a reference to some common objects on all extractors.
dummy_func = lambda self, *args: None
for extractor in file_extractors:
extractor.multifile_lock = multifile_lock
extractor.image_extractor = image_extractor
if not options['debug']:
extractor.debug = False
@@ -149,7 +151,7 @@ def extract_process(queue, abort_flag, dir_number_path, next_dir_number_path, op
elif abort_flag.value:
continue
try:
extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item)
extract_file(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item)
except Exception as e:
if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
# Abort all threads if ENOSPC was raised.
@@ -196,7 +198,7 @@ def extract(dir_path, _, options):
queue_size = options['threads'] + len(options['remote_servers'])
queue = multiprocessing.Queue(maxsize=queue_size * 8)
abort_flag = multiprocessing.Value('B', 0)
initargs = (queue, abort_flag, dir_number_path, next_dir_number_path, options)
initargs = (queue, abort_flag, multiprocessing.Lock(), dir_number_path, next_dir_number_path, options)
mp_pool = multiprocessing.Pool(options['threads'], initializer=extract_process, initargs=initargs)
print(flush=True)
@@ -209,14 +211,12 @@ def extract(dir_path, _, options):
if not os.path.isdir(next_dir_number_path):
os.makedirs(next_dir_number_path)
# Scan directory structure. I really wanted this to have file-level
# granularity, but IntelExtractor and InterleaveBIOSExtractor
# both require directory-level granularity for inspecting other files.
# Scan directory structure.
found_any_files = False
for scan_dir_path, scan_dir_names, scan_file_names in os.walk(dir_number_path):
if len(scan_file_names) > 0:
for scan_file_name in scan_file_names:
found_any_files = True
queue.put((scan_dir_path, scan_file_names))
queue.put((scan_dir_path, scan_file_name))
if abort_flag.value: # stop feeding queue if a thread abort was requested
break

View File

@@ -23,9 +23,16 @@ except ImportError:
PIL.Image = None
from . import util
class MultifileStaleException(Exception):
    """Raised by Extractor.multifile_lock_acquire() when the target file
    disappeared between the multi-file lock being requested and the lock
    actually being acquired (i.e. another extractor already consumed it)."""
class Extractor:
def __init__(self):
self.debug = True
self.multifile_locked = False
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
"""Extract the given file into one of the destination directories:
@@ -40,6 +47,16 @@ class Extractor:
"""Print a log line if debug output is enabled."""
print(self.__class__.__name__ + ':', *args, file=sys.stderr)
def multifile_lock_acquire(self, file_path):
    """Take the global multi-file lock on behalf of this extractor.

    The caller (the main extraction module) releases the lock after
    extract() returns or raises, so this method only acquires it and
    flags the held state on the instance.

    Raises MultifileStaleException if file_path vanished while waiting
    for the lock, meaning a concurrent extractor already handled it.
    """
    lock = self.multifile_lock
    lock.acquire()
    self.multifile_locked = True
    # Guard: another worker may have consumed the file while we blocked.
    if not os.path.exists(file_path):
        raise MultifileStaleException()
class ApricotExtractor(Extractor):
"""Extract Apricot BIOS recovery files. Only one instance of this format
@@ -1473,11 +1490,12 @@ class IntelExtractor(Extractor):
try:
os.remove(file_path)
except:
import traceback
traceback.print_exc()
pass
return True
# Acquire the multi-file lock.
self.multifile_lock_acquire(file_path)
# Scan this directory's contents.
dir_path = os.path.dirname(file_path)
dir_files = {}
@@ -1866,6 +1884,9 @@ class InterleaveExtractor(Extractor):
if not counterpart_string_sets:
return False
# Acquire the multi-file lock.
self.multifile_lock_acquire(file_path)
# Create temporary interleaved data array.
part_size = min(os.path.getsize(file_path), 16777216)
data = []
@@ -1886,20 +1907,10 @@ class InterleaveExtractor(Extractor):
# Skip any files which differ in size.
file_in_dir_size = 0
for retry in range(10): # mergerfs hack
try:
file_in_dir_size = os.path.getsize(file_in_dir_path)
try:
if retry > 0:
with util._error_log_lock:
f = open('biostools_retry.log', 'a')
f.write('{0} => {1} (took {2} retries)\n'.format(file_path, file_in_dir, retry))
f.close()
except:
pass
break
except:
time.sleep(retry)
try:
file_in_dir_size = os.path.getsize(file_in_dir_path)
except:
continue
if file_in_dir_size != file_size:
continue
@@ -2240,6 +2251,9 @@ class TrimondExtractor(Extractor):
if pow2 - file_size not in (8192, 16384, 32768):
return False
# Acquire the multi-file lock.
self.multifile_lock_acquire(file_path)
# As a second safety layer, check for Trimond's flasher files.
dir_path, file_name = os.path.split(file_path)
dir_files = os.listdir(dir_path)
@@ -2489,6 +2503,9 @@ class VMExtractor(ArchiveExtractor):
if file_header[28:32] == b'LZ91':
extractor = self._extract_lzexe
elif file_header[30:36] == b'PKLITE':
# Acquire the multi-file lock. This is required for ROMPAQ below.
self.multifile_lock_acquire(file_path)
extractor = self._extract_pklite
else:
match = self._floppy_pattern.search(file_header)
@@ -2498,12 +2515,14 @@ class VMExtractor(ArchiveExtractor):
elif self._eti_pattern.match(file_header):
extractor = self._extract_eti
elif self._rompaq_pattern.match(file_header):
# Acquire the multi-file lock.
self.multifile_lock_acquire(file_path)
# The ROMPAQ format appears to be version specific in some way.
# We will only extract files that have a ROMPAQ.EXE next to them.
dir_path = os.path.dirname(file_path)
rompaq_path = None
for file_in_dir in os.listdir(dir_path):
if file_in_dir.lower() == 'rompaq.exe':
rompaq_path = os.path.join(dir_path, file_in_dir)
break