diff --git a/biostools/__main__.py b/biostools/__main__.py index eecd7d8..37da4e0 100644 --- a/biostools/__main__.py +++ b/biostools/__main__.py @@ -26,11 +26,11 @@ DEFAULT_REMOTE_PORT = 8620 # Extraction module. -def extract_dir(file_extractors, dir_number_path, next_dir_number_path, scan_dir_path, scan_file_names): +def extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, scan_dir_path, scan_file_names): """Process a given directory for extraction.""" # Determine the destination subdirectory. - dest_subdir = scan_dir_path[len(dir_number_path):] + dest_subdir = scan_dir_path[subdir_trim_index:] while dest_subdir[:len(os.sep)] == os.sep: dest_subdir = dest_subdir[len(os.sep):] @@ -79,7 +79,7 @@ def extract_dir(file_extractors, dir_number_path, next_dir_number_path, scan_dir # Handle the line break ourselves, since Python prints the main # body and line break separately, causing issues when multiple # threads/processes are printing simultaneously. - print('{0} => {1}{2}\n'.format(file_path, extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='') + print('{0} => {1}{2}\n'.format(file_path[path_trim_index:], extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='') break # Remove destination directories if they were created but are empty. @@ -124,12 +124,17 @@ def extract_process(queue, dir_number_path, next_dir_number_path, debug): for extractor in file_extractors: extractor.debug_print = dummy_func + # Cache trim index values for determining a file's relative paths. + dir_number_path = dir_number_path.rstrip(os.sep) + subdir_trim_index = len(dir_number_path) + path_trim_index = len(os.path.dirname(dir_number_path)) + len(os.sep) + # Receive work from the queue. while True: item = queue.get() if item == None: # special item to stop the loop break - extract_dir(file_extractors, dir_number_path, next_dir_number_path, *item) + extract_dir(file_extractors, subdir_trim_index, path_trim_index, next_dir_number_path, *item) def extract(dir_path, _, options): """Main function for extraction.""" @@ -168,7 +173,7 @@ def extract(dir_path, _, options): # Start multiprocessing pool. print('Starting extraction on directory {0}'.format(dir_number), end='', flush=True) queue_size = options['threads'] + len(options['remote_servers']) - queue = multiprocessing.Queue(maxsize=queue_size) + queue = multiprocessing.Queue(maxsize=queue_size * 4) initargs = (queue, dir_number_path, next_dir_number_path, options['debug']) mp_pool = multiprocessing.Pool(options['threads'], initializer=extract_process, initargs=initargs) print(flush=True) @@ -214,9 +219,7 @@ def extract(dir_path, _, options): mp_pool.close() mp_pool.join() - # Stop remote clients and wait for them to finish. - for client in remote_clients: - client.close() + # Wait for remote clients to finish. for client in remote_clients: client.join() @@ -395,16 +398,16 @@ def analyze_dir(formatter, scan_base, file_analyzers, scan_dir_path, scan_file_n found_flag_file = True while found_flag_file: # Find archive indicator. - archive_idx = scan_file_path_full.rfind(':' + os.sep) - if archive_idx == -1: + archive_index = scan_file_path_full.rfind(':' + os.sep) + if archive_index == -1: break # Check if a combined or header flag file exists. found_flag_file = False for flag_file in (':combined:', ':header:'): - if os.path.exists(os.path.join(scan_file_path_full[:archive_idx] + ':', flag_file)): + if os.path.exists(os.path.join(scan_file_path_full[:archive_index] + ':', flag_file)): # Trim the directory off. - scan_file_path_full = scan_file_path_full[:archive_idx] + scan_file_path_full = scan_file_path_full[:archive_index] found_flag_file = True break @@ -583,7 +586,7 @@ def analyze(dir_path, formatter_args, options): dir_path = dir_path[:-1] # Start multiprocessing pool. - queue = multiprocessing.Queue(maxsize=options['threads']) + queue = multiprocessing.Queue(maxsize=options['threads'] * 4) mp_pool = multiprocessing.Pool(options['threads'], initializer=analyze_process, initargs=(queue, formatter, dir_path, options['debug'])) if os.path.isdir(dir_path): @@ -763,7 +766,7 @@ class RemoteServerClient: if line[0:1] in b'xa': # Start multiprocessing pool. print(self.addr, 'Starting pool for', (line[0] == b'x') and 'extraction' or 'analysis') - self.queue = multiprocessing.Queue(maxsize=self.options['threads']) + self.queue = multiprocessing.Queue(maxsize=self.options['threads'] * 4) if line[0:1] == b'x': func = extract_process else: diff --git a/biostools/extractors.py b/biostools/extractors.py index 98b30e1..19440ab 100644 --- a/biostools/extractors.py +++ b/biostools/extractors.py @@ -2438,9 +2438,9 @@ class VMExtractor(ArchiveExtractor): fn = in_f.read(12) # filename if fn == None: break - idx = fn.find(b'\x00') - if idx > -1: - fn = fn[:idx] + nul_index = fn.find(b'\x00') + if nul_index > -1: + fn = fn[:nul_index] if len(fn) == 0: break fn = fn.decode('cp437', 'ignore')