Guard more extractor file operations to avoid race conditions caused by multi-file extractors

This commit is contained in:
RichardG867
2023-01-12 20:28:38 -03:00
parent 1e38537bfe
commit c32bed4301

View File

@@ -293,7 +293,11 @@ class ASTExtractor(Extractor):
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this file is too small.
if os.path.getsize(file_path) <= 0x9083:
try:
file_size = os.path.getsize(file_path)
except:
return False
if file_size <= 0x9083:
return False
# Look for the AST signatures.
@@ -305,48 +309,51 @@ class ASTExtractor(Extractor):
return True
# Open AST image.
with open(file_path, 'rb') as in_f:
# Skip the initial 72 sectors.
in_f.seek(0x9000)
try:
with open(file_path, 'rb') as in_f:
# Skip the initial 72 sectors.
in_f.seek(0x9000)
# Copy payload.
header = b''
dest_file_path = os.path.join(dest_dir, 'ast.bin')
try:
with open(dest_file_path, 'wb') as out_f:
data = remaining = True
while data and remaining > 0:
payload_size = 15 * 512
if data == True:
# Check the header on the first payload sector.
header += in_f.read(0x83)
if not self._ast_payload_pattern.match(header[:0x10]):
raise Exception('missing header')
# Subtract header from payload.
remaining, = struct.unpack('<I', header[-5:-1])
payload_size -= 0x83
# Copy the next 15 sectors of payload.
data = in_f.read(min(payload_size, remaining))
out_f.write(data)
remaining -= len(data)
# Skip the next 3 blank sectors.
in_f.seek(3 * 512, 1)
except:
# Copy payload.
header = b''
dest_file_path = os.path.join(dest_dir, 'ast.bin')
try:
os.remove(dest_file_path)
with open(dest_file_path, 'wb') as out_f:
data = remaining = True
while data and remaining > 0:
payload_size = 15 * 512
if data == True:
# Check the header on the first payload sector.
header += in_f.read(0x83)
if not self._ast_payload_pattern.match(header[:0x10]):
raise Exception('missing header')
# Subtract header from payload.
remaining, = struct.unpack('<I', header[-5:-1])
payload_size -= 0x83
# Copy the next 15 sectors of payload.
data = in_f.read(min(payload_size, remaining))
out_f.write(data)
remaining -= len(data)
# Skip the next 3 blank sectors.
in_f.seek(3 * 512, 1)
except:
try:
os.remove(dest_file_path)
except:
pass
return True
# Write header.
try:
with open(os.path.join(dest_dir, ':header:'), 'wb') as out_f:
out_f.write(header)
except:
pass
return True
# Write header.
try:
with open(os.path.join(dest_dir, ':header:'), 'wb') as out_f:
out_f.write(header)
except:
pass
except:
pass
# Remove AST image.
os.remove(file_path)
@@ -924,7 +931,11 @@ class ImageExtractor(Extractor):
width, height = struct.unpack('<HH', file_header[4:8])
# Determine if this file is a 4-bit or 8-bit EPA according to the file size.
if os.path.getsize(file_path) >= 8 + (width * height):
try:
file_size = os.path.getsize(file_path)
except:
file_size = len(file_header)
if file_size >= 8 + (width * height):
func = self._convert_epav2_8b
else:
func = self._convert_epav2_4b
@@ -941,7 +952,10 @@ class ImageExtractor(Extractor):
paletted = file_header[3] != 0
payload_size = math.ceil((width * height) / 2)
if paletted:
file_size = os.path.getsize(file_path)
try:
file_size = os.path.getsize(file_path)
except:
file_size = len(file_header)
if file_size > 18 + payload_size:
palette_size, = struct.unpack('<H', file_header[10:12])
file_header += util.read_complement(file_path, file_header, max_size=len(file_header) + (4 * palette_size))
@@ -957,9 +971,9 @@ class ImageExtractor(Extractor):
if width != 0 and height != 0:
func = self._convert_pgx
if not func:
# Determine if this file is the right size for a v1 EPA.
# Determine if this file has valid dimensions and is the right size for a v1 EPA.
width, height = struct.unpack('BB', file_header[:2])
if os.path.getsize(file_path) == 72 + (15 * width * height):
if width < 80 and height < 25 and len(file_header) == 72 + (15 * width * height):
func = self._convert_epav1
else:
# Determine if this is a common image format.
@@ -1468,9 +1482,12 @@ class ISOExtractor(ArchiveExtractor):
# Does the size match known bad extractions?
if elt_size == 512:
# Read file.
f = open(elt_path, 'rb')
data = f.read(512)
f.close()
try:
f = open(elt_path, 'rb')
data = f.read(512)
f.close()
except:
data = b''
# Check for MBR boot signature.
if data[-2:] == b'\x55\xAA':
@@ -2185,7 +2202,10 @@ class OMFExtractor(Extractor):
return False
# Stop if the OMF size is invalid. Should catch other files that match the quick check.
file_size = os.path.getsize(file_path)
try:
file_size = os.path.getsize(file_path)
except:
return False
omf_size, = struct.unpack('<I', match.group(1))
if omf_size > file_size:
return False