Add abort-on-ENOSPC command line flag

This commit is contained in:
RichardG867
2022-05-15 14:37:04 -03:00
parent 04770a0539
commit 3ab0f5c822
2 changed files with 66 additions and 22 deletions

View File

@@ -16,7 +16,7 @@
# Copyright 2021 RichardG.
#
import getopt, os, pickle, multiprocessing, re, socket, subprocess, sys, threading
import errno, getopt, os, pickle, multiprocessing, re, socket, subprocess, sys, threading
from . import analyzers, extractors, formatters, util
# Constants.
@@ -69,7 +69,12 @@ def extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_nu
# Run the extractor.
try:
extractor_result = extractor.extract(file_path, file_data, dest_dir, dest_dir_0)
except:
except Exception as e:
if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
# Abort on no space if requested.
print('{0} => aborting extraction due to disk space\n'.format(file_path[path_trim_index:]), end='')
raise
# Log an error.
util.log_traceback('extracting', file_path)
continue
@@ -89,7 +94,7 @@ def extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_nu
# Remove this directory if it ends up empty.
util.rmdirs(scan_dir_path)
def extract_process(queue, dir_number_path, next_dir_number_path, debug):
def extract_process(queue, abort_flag, dir_number_path, next_dir_number_path, options):
"""Main loop for the extraction multiprocessing pool."""
# Set up extractors.
@@ -119,11 +124,14 @@ def extract_process(queue, dir_number_path, next_dir_number_path, debug):
]
# Disable debug mode on extractors.
if not debug:
if not options['debug']:
dummy_func = lambda self, *args: None
for extractor in file_extractors:
extractor.debug_print = dummy_func
# Raise exceptions on no space if requested.
util.raise_enospc = options['enospc']
# Cache trim index values for determining a file's relative paths.
dir_number_path = dir_number_path.rstrip(os.sep)
subdir_trim_index = len(dir_number_path)
@@ -134,7 +142,16 @@ def extract_process(queue, dir_number_path, next_dir_number_path, debug):
item = queue.get()
if item == None: # special item to stop the loop
break
extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item)
elif abort_flag.value:
continue
try:
extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item)
except Exception as e:
if util.raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
# Abort all threads if ENOSPC was raised.
abort_flag.value = 1
continue
raise
def extract(dir_path, _, options):
"""Main function for extraction."""
@@ -173,8 +190,9 @@ def extract(dir_path, _, options):
# Start multiprocessing pool.
print('Starting extraction on directory {0}'.format(dir_number), end='', flush=True)
queue_size = options['threads'] + len(options['remote_servers'])
queue = multiprocessing.Queue(maxsize=queue_size * 4)
initargs = (queue, dir_number_path, next_dir_number_path, options['debug'])
queue = multiprocessing.Queue(maxsize=queue_size * 8)
abort_flag = multiprocessing.Value('B', 0)
initargs = (queue, abort_flag, dir_number_path, next_dir_number_path, options)
mp_pool = multiprocessing.Pool(options['threads'], initializer=extract_process, initargs=initargs)
print(flush=True)
@@ -195,6 +213,8 @@ def extract(dir_path, _, options):
if len(scan_file_names) > 0:
found_any_files = True
queue.put((scan_dir_path, scan_file_names))
if abort_flag.value: # stop feeding queue if a thread abort was requested
break
# Stop if no files are left.
if not found_any_files:
@@ -223,6 +243,10 @@ def extract(dir_path, _, options):
for client in remote_clients:
client.join()
# Abort extraction if a thread abort was requested.
if abort_flag.value:
return 1
# Create 0 directory if it doesn't exist.
print('Merging directories:', end=' ')
merge_dest_path = os.path.join(dir_path, '0')
@@ -494,7 +518,7 @@ def analyze_dir(formatter, scan_base, file_analyzers, scan_dir_path, scan_file_n
# Output the results.
formatter.output_row(fields)
def analyze_process(queue, formatter, scan_base, debug):
def analyze_process(queue, formatter, scan_base, options):
"""Main loop for the analysis multiprocessing pool."""
# Set up analyzers.
@@ -543,7 +567,7 @@ def analyze_process(queue, formatter, scan_base, debug):
]
# Disable debug mode on analyzers.
if not debug:
if not options['debug']:
dummy_func = lambda self, *args: None
for analyzer in file_analyzers:
analyzer.debug_print = dummy_func
@@ -586,8 +610,8 @@ def analyze(dir_path, formatter_args, options):
dir_path = dir_path[:-1]
# Start multiprocessing pool.
queue = multiprocessing.Queue(maxsize=options['threads'] * 4)
mp_pool = multiprocessing.Pool(options['threads'], initializer=analyze_process, initargs=(queue, formatter, dir_path, options['debug']))
queue = multiprocessing.Queue(maxsize=options['threads'] * 8)
mp_pool = multiprocessing.Pool(options['threads'], initializer=analyze_process, initargs=(queue, formatter, dir_path, options))
if os.path.isdir(dir_path):
# Scan directory structure.
@@ -611,13 +635,22 @@ def analyze(dir_path, formatter_args, options):
# Remote server module.
class DummyAbortFlag:
def __init__(self):
self.value = False
class RemoteClient:
"""State and functions for communicating with a remote server."""
def __init__(self, addr, action, initargs):
# Initialize state.
self.action = action
self.initargs = initargs[1:]
if isinstance(initargs[0], multiprocessing.Value):
self.initargs = (DummyAbortFlag(),) + initargs[2:]
self.abort_flag = initargs[0]
else:
self.initargs = initargs[1:]
self.abort_flag = DummyAbortFlag()
self.queue = initargs[0]
self.sock = self.f = None
@@ -703,7 +736,7 @@ class RemoteClient:
# Read queue item.
item = self.queue.get()
if item == None: # special item to stop the loop
if item == None or self.abort_flag.value: # special item to stop the loop
self.close()
break
@@ -766,7 +799,7 @@ class RemoteServerClient:
if line[0:1] in b'xa':
# Start multiprocessing pool.
print(self.addr, 'Starting pool for', (line[0] == b'x') and 'extraction' or 'analysis')
self.queue = multiprocessing.Queue(maxsize=self.options['threads'] * 4)
self.queue = multiprocessing.Queue(maxsize=self.options['threads'] * 8)
if line[0:1] == b'x':
func = extract_process
else:
@@ -844,6 +877,7 @@ def main():
options = {
'array': False,
'debug': False,
'enospc': False,
'format': 'csv',
'headers': True,
'hyperlink': False,
@@ -868,6 +902,7 @@ def main():
options['hyperlink'] = True
elif opt in ('-n', '--no-headers'):
options['headers'] = False
options['enospc'] = True
elif opt in ('-r', '--array'):
options['array'] = True
elif opt in ('-t', '--threads'):
@@ -908,11 +943,12 @@ Usage: docker run -v directory:/bios biostools [-d] [-f output_format] [-h] [-n]
'''
else:
usage = '''
Usage: python3 -m biostools [-d] [-t threads] -x directory
Usage: python3 -m biostools [-d] [-n] [-t threads] -x directory
python3 -m biostools [-d] [-f output_format] [-h] [-n] [-r] [-t threads]
-a directory|single_file [formatter_options]
-x Extract archives and BIOS images recursively in the given directory
-n Abort extraction if disk space runs out.
-a Analyze extracted BIOS images in the given directory, or a single
extracted file (extracting with -x first is recommended)'''

View File

@@ -15,7 +15,7 @@
#
# Copyright 2021 RichardG.
#
import multiprocessing, os, math, random, re, shutil, traceback, urllib.request
import errno, multiprocessing, os, math, random, re, shutil, traceback, urllib.request
from biostools.pciutil import *
date_pattern_mmddyy = re.compile('''(?P<month>[0-9]{2})/(?P<day>[0-9]{2})/(?P<year>[0-9]{2,4})''')
@@ -29,6 +29,8 @@ base62 = digits + lowercase + uppercase
random_name_symbols = lowercase + digits + fn_symbols + uppercase
random_name_nosymbols = lowercase + digits + uppercase
raise_enospc = False
_error_log_lock = multiprocessing.Lock()
@@ -147,7 +149,9 @@ def hardlink_or_copy(src, dest):
except:
try:
shutil.copy2(src, dest)
except:
except Exception as e:
if raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
raise
return False
return True
@@ -204,7 +208,9 @@ def rmdirs(dir_path):
dir_path = os.path.dirname(dir_path)
except OSError:
break
except:
except Exception as e:
if raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
raise
continue
return removed_count
@@ -218,8 +224,9 @@ def remove_all(files, func=lambda x: x):
for subfile in file:
try:
os.remove(subfile)
except:
pass
except Exception as e:
if raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
raise
def remove_extension(file_name):
"""Remove file_name's extension, if one is present."""
@@ -233,6 +240,7 @@ def try_makedirs(dir_path):
"""Try to create dir_path. Returns True if successful, False if not."""
try:
os.makedirs(dir_path)
except:
pass
except Exception as e:
if raise_enospc and getattr(e, 'errno', None) == errno.ENOSPC:
raise
return os.path.isdir(dir_path)