From 8900f19bab4b1fdc1100f5fa60df1e16cd3341d5 Mon Sep 17 00:00:00 2001 From: RichardG867 Date: Mon, 16 May 2022 23:21:37 -0300 Subject: [PATCH] Add Compaq SOFTPAQ and ROMPAQ extraction --- biostools/extractors.py | 167 ++++++++++++++++++++++++++++++++++------ biostools/util.py | 4 +- 2 files changed, 147 insertions(+), 24 deletions(-) diff --git a/biostools/extractors.py b/biostools/extractors.py index 1952beb..e6d5bde 100644 --- a/biostools/extractors.py +++ b/biostools/extractors.py @@ -2379,9 +2379,11 @@ class VMExtractor(ArchiveExtractor): b'''Disk eXPress Self-Extracting Diskette Image|''' # HP DXP b'''(\\x00Diskette Image Decompression Utility\\.\\x00)|''' # NEC in-house b'''(Copyright Daniel Valot |\\x00ARDI - \\x00)|''' # IBM ARDI - b'''(Ready to build distribution image with the following attributes:)''' # Zenith in-house + b'''(Ready to build distribution image with the following attributes:)|''' # Zenith in-house + b'''(Error reading the Softpaq File information)''' # Compaq Softpaq ) self._eti_pattern = re.compile(b'''[0-9\\.\\x00]{10}[0-9]{2}/[0-9]{2}/[0-9]{2}\\x00{2}[0-9]{2}:[0-9]{2}:[0-9]{2}\\x00{3}''') + self._rompaq_pattern = re.compile(b'''[\\x00-\\xFF]{12}[A-Z0-9]{7}\\x00[0-9]{2}/[0-9]{2}/[0-9]{2}\\x00''') # Filename sanitization pattern. self._dos_fn_pattern = re.compile('''[\\x00-\\x1F\\x7F-\\xFF\\\\/:\\*\\?"<>\\|]''') @@ -2427,6 +2429,32 @@ class VMExtractor(ArchiveExtractor): extractor_kwargs['match'] = match elif self._eti_pattern.match(file_header): extractor = self._extract_eti + elif self._rompaq_pattern.match(file_header): + # The ROMPAQ format appears to be version specific in some way. + # We will only extract files that have a ROMPAQ.EXE next to them. + dir_path = os.path.dirname(file_path) + rompaq_path = None + for file_in_dir in os.listdir(dir_path): + + if file_in_dir.lower() == 'rompaq.exe': + rompaq_path = os.path.join(dir_path, file_in_dir) + break + + # Now look for a PKLITE-decompressed ROMPAQ.EXE. + dest_parent_dir = os.path.dirname(dest_dir) + if not rompaq_path and os.path.isdir(dest_parent_dir): + for file_in_dir in os.listdir(dest_parent_dir): + if file_in_dir.lower() == 'rompaq.exe:': + rompaq_path = os.path.join(dest_parent_dir, file_in_dir, file_in_dir[:-1]) + if os.path.exists(rompaq_path): + break + else: + rompaq_path = None + + # Enter ROMPAQ mode if the EXE was found. + if rompaq_path: + extractor = self._extract_rompaq + extractor_kwargs['rompaq_path'] = rompaq_path # Stop if no case was found. if not extractor: @@ -2440,11 +2468,13 @@ class VMExtractor(ArchiveExtractor): self.debug_print('Running', extractor.__name__) return extractor(file_path, file_header, dest_dir, dest_dir_0, **extractor_kwargs) - def _run_qemu(self, hdd=None, hdd_snapshot=True, floppy=None, floppy_snapshot=True, vvfat=None, boot='c'): + def _run_qemu(self, hdd=None, hdd_snapshot=True, floppy=None, floppy_snapshot=True, vvfat=None, boot='c', monitor_cmd=None, monitor_flag_file=None): # Build QEMU arguments. args = [self._qemu_path, '-m', '32', '-boot', boot] if not self.debug: args += ['-display', 'none', '-vga', 'none'] + if monitor_cmd: + args += ['-monitor', 'stdio'] for drive, drive_snapshot, drive_if in ((floppy, floppy_snapshot, 'floppy'), (hdd, hdd_snapshot, 'ide')): # Don't add this drive if an image was not specified. if not drive: @@ -2462,8 +2492,23 @@ class VMExtractor(ArchiveExtractor): args += ['-drive', 'if=ide,driver=vvfat,rw=on,dir=' + vvfat.replace(',', ',,')] # regular vvfat syntax can't handle : in path # Run QEMU. + self.debug_print('Running QEMU with args:', args) try: - subprocess.run(args, timeout=60, input=None, stdout=self._devnull, stderr=subprocess.STDOUT) + proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=self._devnull, stderr=subprocess.STDOUT) + + # Wait for flag file if one was specified. + if monitor_flag_file: + spins = 0 + while not os.path.exists(monitor_flag_file) and spins < 60: + time.sleep(1) + spins += 1 + if spins < 60: + self.debug_print('Monitor flag file found') + else: + self.debug_print('Monitor flag file timed out') + + # Send monitor command if one was specified, and wait for the QEMU process. + proc.communicate(input=monitor_cmd, timeout=60) except: self.debug_print('Running QEMU failed (timed out?)') @@ -2479,6 +2524,7 @@ class VMExtractor(ArchiveExtractor): image_path = os.path.join(dest_dir, util.random_name(8) + '.img') shutil.copy2(file_path, exe_path) shutil.copy2(os.path.join(self._dep_dir, floppy_media), image_path) + flag_name = flag_path = None # Create batch file for calling the executable. bat_path = os.path.join(dest_dir, 'autoexec.bat') @@ -2498,10 +2544,15 @@ class VMExtractor(ArchiveExtractor): f.write(b'echo.|') elif match.group(4): # Zenith in-house f.write(b'a:\r\n') + elif match.group(5): # Compaq Softpaq + # Create flag file for sending the monitor commands. + flag_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.dat' + flag_path = os.path.join(dest_dir, flag_name) + f.write(b'echo. >d:\\' + flag_name.encode('cp437', 'ignore') + b'\r\n') f.write(b'd:' + exe_name.encode('cp437', 'ignore')) if match.group(1): # FastPacket f.write(b' /b a:\r\n') - elif match.group(3): # ARDI + elif match.group(3) or match.group(5): # ARDI or Compaq Softpaq f.write(b'\r\n') elif match.group(4): # Zenith in-house f.write(b' 1: + if type(ret) == str and len(os.listdir(dest_dir)) > 2: # Remove original file. try: os.remove(file_path) @@ -2666,13 +2731,20 @@ class VMExtractor(ArchiveExtractor): if not os.path.exists(unpacked_path): return False + # Strip any remains of LZEXE off the executable to avoid infinite loops. + try: + f = open(unpacked_path, 'r+b') + f.seek(28) + if f.read(4) == b'LZ91': + f.seek(28) + f.write(b'\xFF') + f.close() + except: + pass + # Read unpacked file. decomp_file_data = util.read_complement(unpacked_path) - # Strip any remains of LZEXE to avoid infinite loops. - if decomp_file_data[28:32] == b'LZ91': - decomp_file_data = decomp_file_data[:28] + b'\xFF' + decomp_file_data[29:] - # Run this same extractor with detectors pointed at the unpacked data. ret = self.extract(file_path, decomp_file_data, dest_dir, dest_dir_0) @@ -2692,16 +2764,11 @@ class VMExtractor(ArchiveExtractor): return ret - # Keep the unpacked file around (with a dummy - # header file) for other extractors to process. + # Keep the unpacked file around for other extractors to process. try: shutil.move(exe_path, os.path.join(dest_dir, os.path.basename(file_path))) except: pass - try: - open(os.path.join(dest_dir, ':header:'), 'wb').close() - except: - pass return dest_dir def _extract_pklite(self, file_path, file_header, dest_dir, dest_dir_0): @@ -2749,13 +2816,20 @@ class VMExtractor(ArchiveExtractor): # Remove batch file. util.remove_all((bat_path,)) + # Strip any remains of PKLITE off the executable to avoid infinite loops. + try: + f = open(exe_path, 'r+b') + f.seek(30) + if f.read(6) == b'PKLITE': + f.seek(30) + f.write(b'\xFF') + f.close() + except: + pass + # Read unpacked file. decomp_file_data = util.read_complement(exe_path) - # Strip any remains of PKLITE to avoid infinite loops. - if decomp_file_data[30:36] == b'PKLITE': - decomp_file_data = decomp_file_data[:30] + b'\xFF' + decomp_file_data[31:] - # Run this same extractor with detectors pointed at the unpacked data. ret = self.extract(file_path, decomp_file_data, dest_dir, dest_dir_0) @@ -2775,14 +2849,61 @@ class VMExtractor(ArchiveExtractor): return ret - # Keep the unpacked file around (with a dummy - # header file) for other extractors to process. + # Keep the unpacked file around for other extractors to process. try: shutil.move(exe_path, os.path.join(dest_dir, os.path.basename(file_path))) except: pass + return dest_dir + + def _extract_rompaq(self, file_path, file_header, dest_dir, dest_dir_0, *, rompaq_path): + """Extract Compaq ROMPAQ-compressed BIOS images using a ROMPAQ.EXE provided next to the image.""" + + # Copy original file and ROMPAQ.EXE to the destination directory. + # Also determine output file name. + rom_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.bin' + rom_path = os.path.join(dest_dir, rom_name) + exe_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.exe' + exe_path = os.path.join(dest_dir, exe_name) + try: + shutil.copy2(file_path, rom_path) + shutil.copy2(rompaq_path, exe_path) + except: + return True + + # Set a name for the unpacked file. + unpacked_name = 'rompaq.bin' + unpacked_path = os.path.join(dest_dir, unpacked_name) + + # Create batch file for extraction. + bat_path = os.path.join(dest_dir, 'autoexec.bat') + f = open(bat_path, 'wb') + f.write(b'd:\r\n' + exe_name.encode('cp437', 'ignore') + b' /D ' + rom_name.encode('cp437', 'ignore') + b' ' + unpacked_name.encode('cp437', 'ignore') + b'\r\n') + f.close() + + # Run QEMU. + self._run_qemu(hdd=os.path.join(self._dep_dir, 'freedos.img'), vvfat=dest_dir) + + # Remove temporary files. + util.remove_all((bat_path, rom_path, exe_path)) + + # Stop if unpacking was not successful. + if not os.path.exists(unpacked_path): + unpacked_path = os.path.join(dest_dir, unpacked_name.upper()) # just in case + if not os.path.exists(unpacked_path): + return False + + # Remove original file. + try: + os.remove(file_path) + except: + pass + + # Create dummy header file. try: open(os.path.join(dest_dir, ':header:'), 'wb').close() except: pass + + # Return destination directory path. return dest_dir diff --git a/biostools/util.py b/biostools/util.py index 4cc9740..0f68107 100644 --- a/biostools/util.py +++ b/biostools/util.py @@ -219,7 +219,9 @@ def remove_all(files, func=lambda x: x): func can return a string or iterable object.""" for file in files: file = func(file) - if type(file) == str: + if not file: + continue + elif type(file) == str: file = [file] for subfile in file: try: