Streamline extraction display, increase queue size

This commit is contained in:
RichardG867
2022-04-26 20:46:34 -03:00
parent 61876d032f
commit eb208a2da2
2 changed files with 20 additions and 17 deletions

View File

@@ -26,11 +26,11 @@ DEFAULT_REMOTE_PORT = 8620
# Extraction module.
def extract_dir(file_extractors, dir_number_path, next_dir_number_path, scan_dir_path, scan_file_names):
def extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, scan_dir_path, scan_file_names):
"""Process a given directory for extraction."""
# Determine the destination subdirectory.
dest_subdir = scan_dir_path[len(dir_number_path):]
dest_subdir = scan_dir_path[subdir_trim_index:]
while dest_subdir[:len(os.sep)] == os.sep:
dest_subdir = dest_subdir[len(os.sep):]
@@ -79,7 +79,7 @@ def extract_dir(file_extractors, dir_number_path, next_dir_number_path, scan_dir
# Handle the line break ourselves, since Python prints the main
# body and line break separately, causing issues when multiple
# threads/processes are printing simultaneously.
print('{0} => {1}{2}\n'.format(file_path, extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='')
print('{0} => {1}{2}\n'.format(file_path[path_trim_index:], extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='')
break
# Remove destination directories if they were created but are empty.
@@ -124,12 +124,17 @@ def extract_process(queue, dir_number_path, next_dir_number_path, debug):
for extractor in file_extractors:
extractor.debug_print = dummy_func
# Cache trim index values for determining a file's relative paths.
dir_number_path = dir_number_path.rstrip(os.sep)
subdir_trim_index = len(dir_number_path)
path_trim_index = len(os.path.dirname(dir_number_path)) + len(os.sep)
# Receive work from the queue.
while True:
item = queue.get()
if item == None: # special item to stop the loop
break
extract_dir(file_extractors, dir_number_path, next_dir_number_path, *item)
extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item)
def extract(dir_path, _, options):
"""Main function for extraction."""
@@ -168,7 +173,7 @@ def extract(dir_path, _, options):
# Start multiprocessing pool.
print('Starting extraction on directory {0}'.format(dir_number), end='', flush=True)
queue_size = options['threads'] + len(options['remote_servers'])
queue = multiprocessing.Queue(maxsize=queue_size)
queue = multiprocessing.Queue(maxsize=queue_size * 4)
initargs = (queue, dir_number_path, next_dir_number_path, options['debug'])
mp_pool = multiprocessing.Pool(options['threads'], initializer=extract_process, initargs=initargs)
print(flush=True)
@@ -214,9 +219,7 @@ def extract(dir_path, _, options):
mp_pool.close()
mp_pool.join()
# Stop remote clients and wait for them to finish.
for client in remote_clients:
client.close()
# Wait for remote clients to finish.
for client in remote_clients:
client.join()
@@ -395,16 +398,16 @@ def analyze_dir(formatter, scan_base, file_analyzers, scan_dir_path, scan_file_n
found_flag_file = True
while found_flag_file:
# Find archive indicator.
archive_idx = scan_file_path_full.rfind(':' + os.sep)
if archive_idx == -1:
archive_index = scan_file_path_full.rfind(':' + os.sep)
if archive_index == -1:
break
# Check if a combined or header flag file exists.
found_flag_file = False
for flag_file in (':combined:', ':header:'):
if os.path.exists(os.path.join(scan_file_path_full[:archive_idx] + ':', flag_file)):
if os.path.exists(os.path.join(scan_file_path_full[:archive_index] + ':', flag_file)):
# Trim the directory off.
scan_file_path_full = scan_file_path_full[:archive_idx]
scan_file_path_full = scan_file_path_full[:archive_index]
found_flag_file = True
break
@@ -583,7 +586,7 @@ def analyze(dir_path, formatter_args, options):
dir_path = dir_path[:-1]
# Start multiprocessing pool.
queue = multiprocessing.Queue(maxsize=options['threads'])
queue = multiprocessing.Queue(maxsize=options['threads'] * 4)
mp_pool = multiprocessing.Pool(options['threads'], initializer=analyze_process, initargs=(queue, formatter, dir_path, options['debug']))
if os.path.isdir(dir_path):
@@ -763,7 +766,7 @@ class RemoteServerClient:
if line[0:1] in b'xa':
# Start multiprocessing pool.
print(self.addr, 'Starting pool for', (line[0] == b'x') and 'extraction' or 'analysis')
self.queue = multiprocessing.Queue(maxsize=self.options['threads'])
self.queue = multiprocessing.Queue(maxsize=self.options['threads'] * 4)
if line[0:1] == b'x':
func = extract_process
else:

View File

@@ -2438,9 +2438,9 @@ class VMExtractor(ArchiveExtractor):
fn = in_f.read(12) # filename
if fn == None:
break
idx = fn.find(b'\x00')
if idx > -1:
fn = fn[:idx]
nul_index = fn.find(b'\x00')
if nul_index > -1:
fn = fn[:nul_index]
if len(fn) == 0:
break
fn = fn.decode('cp437', 'ignore')