mirror of
https://gitlab.com/xCrystal/pokecrystal-board.git
synced 2024-11-16 11:27:33 -08:00
581 lines
16 KiB
Python
581 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Pokemon Crystal data de/compression.
|
|
"""
|
|
|
|
"""
|
|
A rundown of Pokemon Crystal's compression scheme:
|
|
|
|
Control commands occupy bits 5-7.
|
|
Bits 0-4 serve as the first parameter <n> for each command.
|
|
"""
|
|
lz_commands = {
|
|
'literal': 0, # n values for n bytes
|
|
'iterate': 1, # one value for n bytes
|
|
'alternate': 2, # alternate two values for n bytes
|
|
'blank': 3, # zero for n bytes
|
|
}
|
|
|
|
"""
|
|
Repeater commands repeat any data that was just decompressed.
|
|
They take an additional signed parameter <s> to mark a relative starting point.
|
|
These wrap around (positive from the start, negative from the current position).
|
|
"""
|
|
lz_commands.update({
|
|
'repeat': 4, # n bytes starting from s
|
|
'flip': 5, # n bytes in reverse bit order starting from s
|
|
'reverse': 6, # n bytes backwards starting from s
|
|
})
|
|
|
|
"""
|
|
The long command is used when 5 bits aren't enough. Bits 2-4 contain a new control code.
|
|
Bits 0-1 are appended to a new byte as 8-9, allowing a 10-bit parameter.
|
|
"""
|
|
lz_commands.update({
|
|
'long': 7, # n is now 10 bits for a new control code
|
|
})
|
|
max_length = 1 << 10 # can't go higher than 10 bits
|
|
lowmax = 1 << 5 # standard 5-bit param
|
|
|
|
"""
|
|
If 0xff is encountered instead of a command, decompression ends.
|
|
"""
|
|
lz_end = 0xff
|
|
|
|
|
|
bit_flipped = [
|
|
sum(((byte >> i) & 1) << (7 - i) for i in xrange(8))
|
|
for byte in xrange(0x100)
|
|
]
|
|
|
|
|
|
class Compressed:
|
|
|
|
"""
|
|
Usage:
|
|
lz = Compressed(data).output
|
|
or
|
|
lz = Compressed().compress(data)
|
|
or
|
|
c = Compressed()
|
|
c.data = data
|
|
lz = c.compress()
|
|
|
|
There are some issues with reproducing the target compressor.
|
|
Some notes are listed here:
|
|
- the criteria for detecting a lookback is inconsistent
|
|
- sometimes lookbacks that are mostly 0s are pruned, sometimes not
|
|
- target appears to skip ahead if it can use a lookback soon, stopping the current command short or in some cases truncating it with literals.
|
|
- this has been implemented, but the specifics are unknown
|
|
- self.min_scores: It's unknown if blank's minimum score should be 1 or 2. Most likely it's 1, with some other hack to account for edge cases.
|
|
- may be related to the above
|
|
- target does not appear to compress backwards
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
self.min_scores = {
|
|
'blank': 1,
|
|
'iterate': 2,
|
|
'alternate': 3,
|
|
'repeat': 3,
|
|
'reverse': 3,
|
|
'flip': 3,
|
|
}
|
|
|
|
self.preference = [
|
|
'repeat',
|
|
'blank',
|
|
'flip',
|
|
'reverse',
|
|
'iterate',
|
|
'alternate',
|
|
#'literal',
|
|
]
|
|
|
|
self.lookback_methods = 'repeat', 'reverse', 'flip'
|
|
|
|
self.__dict__.update({
|
|
'data': None,
|
|
'commands': lz_commands,
|
|
'debug': False,
|
|
'literal_only': False,
|
|
})
|
|
|
|
self.arg_names = 'data', 'commands', 'debug', 'literal_only'
|
|
|
|
self.__dict__.update(kwargs)
|
|
self.__dict__.update(dict(zip(self.arg_names, args)))
|
|
|
|
if self.data is not None:
|
|
self.compress()
|
|
|
|
def compress(self, data=None):
|
|
if data is not None:
|
|
self.data = data
|
|
|
|
self.data = list(bytearray(self.data))
|
|
|
|
self.indexes = {}
|
|
self.lookbacks = {}
|
|
for method in self.lookback_methods:
|
|
self.lookbacks[method] = {}
|
|
|
|
self.address = 0
|
|
self.end = len(self.data)
|
|
self.output = []
|
|
self.literal = None
|
|
|
|
while self.address < self.end:
|
|
|
|
if self.score():
|
|
self.do_literal()
|
|
self.do_winner()
|
|
|
|
else:
|
|
if self.literal == None:
|
|
self.literal = self.address
|
|
self.address += 1
|
|
|
|
self.do_literal()
|
|
|
|
self.output += [lz_end]
|
|
return self.output
|
|
|
|
def reset_scores(self):
|
|
self.scores = {}
|
|
self.offsets = {}
|
|
self.helpers = {}
|
|
for method in self.min_scores.iterkeys():
|
|
self.scores[method] = 0
|
|
|
|
def bit_flip(self, byte):
|
|
return bit_flipped[byte]
|
|
|
|
def do_literal(self):
|
|
if self.literal != None:
|
|
length = abs(self.address - self.literal)
|
|
start = min(self.literal, self.address + 1)
|
|
self.helpers['literal'] = self.data[start:start+length]
|
|
self.do_cmd('literal', length)
|
|
self.literal = None
|
|
|
|
def score(self):
|
|
self.reset_scores()
|
|
|
|
map(self.score_literal, ['iterate', 'alternate', 'blank'])
|
|
|
|
for method in self.lookback_methods:
|
|
self.scores[method], self.offsets[method] = self.find_lookback(method, self.address)
|
|
|
|
self.stop_short()
|
|
|
|
return any(
|
|
score
|
|
> self.min_scores[method] + int(score > lowmax)
|
|
for method, score in self.scores.iteritems()
|
|
)
|
|
|
|
def stop_short(self):
|
|
"""
|
|
If a lookback is close, reduce the scores of other commands.
|
|
"""
|
|
best_method, best_score = max(
|
|
self.scores.items(),
|
|
key = lambda x: (
|
|
x[1],
|
|
-self.preference.index(x[0])
|
|
)
|
|
)
|
|
for method in self.lookback_methods:
|
|
min_score = self.min_scores[method]
|
|
for address in xrange(self.address+1, self.address+best_score):
|
|
length, index = self.find_lookback(method, address)
|
|
if length > max(min_score, best_score):
|
|
# BUG: lookbacks can reduce themselves. This appears to be a bug in the target also.
|
|
for m, score in self.scores.items():
|
|
self.scores[m] = min(score, address - self.address)
|
|
|
|
|
|
def read(self, address=None):
|
|
if address is None:
|
|
address = self.address
|
|
if 0 <= address < len(self.data):
|
|
return self.data[address]
|
|
return None
|
|
|
|
def find_all_lookbacks(self):
|
|
for method in self.lookback_methods:
|
|
for address, byte in enumerate(self.data):
|
|
self.find_lookback(method, address)
|
|
|
|
def find_lookback(self, method, address=None):
|
|
"""Temporarily stubbed, because the real function doesn't run in polynomial time."""
|
|
return 0, None
|
|
|
|
def broken_find_lookback(self, method, address=None):
|
|
if address is None:
|
|
address = self.address
|
|
|
|
existing = self.lookbacks.get(method, {}).get(address)
|
|
if existing != None:
|
|
return existing
|
|
|
|
lookback = 0, None
|
|
|
|
# Better to not carelessly optimize at the moment.
|
|
"""
|
|
if address < 2:
|
|
return lookback
|
|
"""
|
|
|
|
byte = self.read(address)
|
|
if byte is None:
|
|
return lookback
|
|
|
|
direction, mutate = {
|
|
'repeat': ( 1, int),
|
|
'reverse': (-1, int),
|
|
'flip': ( 1, self.bit_flip),
|
|
}[method]
|
|
|
|
# Doesn't seem to help
|
|
"""
|
|
if mutate == self.bit_flip:
|
|
if byte == 0:
|
|
self.lookbacks[method][address] = lookback
|
|
return lookback
|
|
"""
|
|
|
|
data_len = len(self.data)
|
|
is_two_byte_index = lambda index: int(index < address - 0x7f)
|
|
|
|
for index in self.get_indexes(mutate(byte)):
|
|
|
|
if index >= address:
|
|
break
|
|
|
|
old_length, old_index = lookback
|
|
if direction == 1:
|
|
if old_length > data_len - index: break
|
|
else:
|
|
if old_length > index: continue
|
|
|
|
if self.read(index) in [None]: continue
|
|
|
|
length = 1 # we know there's at least one match, or we wouldn't be checking this index
|
|
while 1:
|
|
this_byte = self.read(address + length)
|
|
that_byte = self.read(index + length * direction)
|
|
if that_byte == None or this_byte != mutate(that_byte):
|
|
break
|
|
length += 1
|
|
|
|
score = length - is_two_byte_index(index)
|
|
old_score = old_length - is_two_byte_index(old_index)
|
|
if score >= old_score or (score == old_score and length > old_length):
|
|
# XXX maybe avoid two-byte indexes when possible
|
|
if score >= lookback[0] - is_two_byte_index(lookback[1]):
|
|
lookback = length, index
|
|
|
|
self.lookbacks[method][address] = lookback
|
|
return lookback
|
|
|
|
def get_indexes(self, byte):
|
|
if not self.indexes.has_key(byte):
|
|
self.indexes[byte] = []
|
|
index = -1
|
|
while 1:
|
|
try:
|
|
index = self.data.index(byte, index + 1)
|
|
except ValueError:
|
|
break
|
|
self.indexes[byte].append(index)
|
|
return self.indexes[byte]
|
|
|
|
def score_literal(self, method):
|
|
address = self.address
|
|
|
|
compare = {
|
|
'blank': [0],
|
|
'iterate': [self.read(address)],
|
|
'alternate': [self.read(address), self.read(address + 1)],
|
|
}[method]
|
|
|
|
# XXX may or may not be correct
|
|
if method == 'alternate' and compare[0] == 0:
|
|
return
|
|
|
|
length = 0
|
|
while self.read(address + length) == compare[length % len(compare)]:
|
|
length += 1
|
|
|
|
self.scores[method] = length
|
|
self.helpers[method] = compare
|
|
|
|
def do_winner(self):
|
|
winners = filter(
|
|
lambda (method, score):
|
|
score
|
|
> self.min_scores[method] + int(score > lowmax),
|
|
self.scores.iteritems()
|
|
)
|
|
winners.sort(
|
|
key = lambda (method, score): (
|
|
-(score - self.min_scores[method] - int(score > lowmax)),
|
|
self.preference.index(method)
|
|
)
|
|
)
|
|
winner, score = winners[0]
|
|
|
|
length = min(score, max_length)
|
|
self.do_cmd(winner, length)
|
|
self.address += length
|
|
|
|
def do_cmd(self, cmd, length):
|
|
start_address = self.address
|
|
|
|
cmd_length = length - 1
|
|
|
|
output = []
|
|
|
|
if length > lowmax:
|
|
output.append(
|
|
(self.commands['long'] << 5)
|
|
+ (self.commands[cmd] << 2)
|
|
+ (cmd_length >> 8)
|
|
)
|
|
output.append(
|
|
cmd_length & 0xff
|
|
)
|
|
else:
|
|
output.append(
|
|
(self.commands[cmd] << 5)
|
|
+ cmd_length
|
|
)
|
|
|
|
self.helpers['blank'] = [] # quick hack
|
|
output += self.helpers.get(cmd, [])
|
|
|
|
if cmd in self.lookback_methods:
|
|
offset = self.offsets[cmd]
|
|
# Negative offsets are one byte.
|
|
# Positive offsets are two.
|
|
if 0 < start_address - offset - 1 <= 0x7f:
|
|
offset = (start_address - offset - 1) | 0x80
|
|
output += [offset]
|
|
else:
|
|
output += [offset / 0x100, offset % 0x100] # big endian
|
|
|
|
if self.debug:
|
|
print ' '.join(map(str, [
|
|
cmd, length, '\t',
|
|
' '.join(map('{:02x}'.format, output)),
|
|
self.data[start_address:start_address+length] if cmd in self.lookback_methods else '',
|
|
]))
|
|
|
|
self.output += output
|
|
|
|
|
|
|
|
class Decompressed:
|
|
"""
|
|
Interpret and decompress lz-compressed data, usually 2bpp.
|
|
"""
|
|
|
|
"""
|
|
Usage:
|
|
data = Decompressed(lz).output
|
|
or
|
|
data = Decompressed().decompress(lz)
|
|
or
|
|
d = Decompressed()
|
|
d.lz = lz
|
|
data = d.decompress()
|
|
|
|
To decompress from offset 0x80000 in a rom:
|
|
data = Decompressed(rom, start=0x80000).output
|
|
"""
|
|
|
|
lz = None
|
|
start = 0
|
|
commands = lz_commands
|
|
debug = False
|
|
|
|
arg_names = 'lz', 'start', 'commands', 'debug'
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.__dict__.update(dict(zip(self.arg_names, args)))
|
|
self.__dict__.update(kwargs)
|
|
|
|
self.command_names = dict(map(reversed, self.commands.items()))
|
|
self.address = self.start
|
|
|
|
if self.lz is not None:
|
|
self.decompress()
|
|
|
|
if self.debug: print self.command_list()
|
|
|
|
|
|
def command_list(self):
|
|
"""
|
|
Print a list of commands that were used. Useful for debugging.
|
|
"""
|
|
|
|
text = ''
|
|
|
|
output_address = 0
|
|
for name, attrs in self.used_commands:
|
|
length = attrs['length']
|
|
address = attrs['address']
|
|
offset = attrs['offset']
|
|
direction = attrs['direction']
|
|
|
|
text += '{2:03x} {0}: {1}'.format(name, length, output_address)
|
|
text += '\t' + ' '.join(
|
|
'{:02x}'.format(int(byte))
|
|
for byte in self.lz[ address : address + attrs['cmd_length'] ]
|
|
)
|
|
|
|
if offset is not None:
|
|
repeated_data = self.output[ offset : offset + length * direction : direction ]
|
|
if name == 'flip':
|
|
repeated_data = map(bit_flipped.__getitem__, repeated_data)
|
|
text += ' [' + ' '.join(map('{:02x}'.format, repeated_data)) + ']'
|
|
|
|
text += '\n'
|
|
output_address += length
|
|
|
|
return text
|
|
|
|
|
|
def decompress(self, lz=None):
|
|
|
|
if lz is not None:
|
|
self.lz = lz
|
|
|
|
self.lz = bytearray(self.lz)
|
|
|
|
self.used_commands = []
|
|
self.output = []
|
|
|
|
while 1:
|
|
|
|
cmd_address = self.address
|
|
self.offset = None
|
|
self.direction = None
|
|
|
|
if (self.byte == lz_end):
|
|
self.next()
|
|
break
|
|
|
|
self.cmd = (self.byte & 0b11100000) >> 5
|
|
|
|
if self.cmd_name == 'long':
|
|
# 10-bit length
|
|
self.cmd = (self.byte & 0b00011100) >> 2
|
|
self.length = (self.next() & 0b00000011) * 0x100
|
|
self.length += self.next() + 1
|
|
else:
|
|
# 5-bit length
|
|
self.length = (self.next() & 0b00011111) + 1
|
|
|
|
self.__class__.__dict__[self.cmd_name](self)
|
|
|
|
self.used_commands += [(
|
|
self.cmd_name,
|
|
{
|
|
'length': self.length,
|
|
'address': cmd_address,
|
|
'offset': self.offset,
|
|
'cmd_length': self.address - cmd_address,
|
|
'direction': self.direction,
|
|
}
|
|
)]
|
|
|
|
# Keep track of the data we just decompressed.
|
|
self.compressed_data = self.lz[self.start : self.address]
|
|
|
|
|
|
@property
|
|
def byte(self):
|
|
return self.lz[ self.address ]
|
|
|
|
def next(self):
|
|
byte = self.byte
|
|
self.address += 1
|
|
return byte
|
|
|
|
@property
|
|
def cmd_name(self):
|
|
return self.command_names.get(self.cmd)
|
|
|
|
|
|
def get_offset(self):
|
|
|
|
if self.byte >= 0x80: # negative
|
|
# negative
|
|
offset = self.next() & 0x7f
|
|
offset = len(self.output) - offset - 1
|
|
else:
|
|
# positive
|
|
offset = self.next() * 0x100
|
|
offset += self.next()
|
|
|
|
self.offset = offset
|
|
|
|
|
|
def literal(self):
|
|
"""
|
|
Copy data directly.
|
|
"""
|
|
self.output += self.lz[ self.address : self.address + self.length ]
|
|
self.address += self.length
|
|
|
|
def iterate(self):
|
|
"""
|
|
Write one byte repeatedly.
|
|
"""
|
|
self.output += [self.next()] * self.length
|
|
|
|
def alternate(self):
|
|
"""
|
|
Write alternating bytes.
|
|
"""
|
|
alts = [self.next(), self.next()]
|
|
self.output += [ alts[x & 1] for x in xrange(self.length) ]
|
|
|
|
def blank(self):
|
|
"""
|
|
Write zeros.
|
|
"""
|
|
self.output += [0] * self.length
|
|
|
|
def flip(self):
|
|
"""
|
|
Repeat flipped bytes from output.
|
|
|
|
Example: 11100100 -> 00100111
|
|
"""
|
|
self._repeat(table=bit_flipped)
|
|
|
|
def reverse(self):
|
|
"""
|
|
Repeat reversed bytes from output.
|
|
"""
|
|
self._repeat(direction=-1)
|
|
|
|
def repeat(self):
|
|
"""
|
|
Repeat bytes from output.
|
|
"""
|
|
self._repeat()
|
|
|
|
def _repeat(self, direction=1, table=None):
|
|
self.get_offset()
|
|
self.direction = direction
|
|
# Note: appends must be one at a time (this way, repeats can draw from themselves if required)
|
|
for i in xrange(self.length):
|
|
byte = self.output[ self.offset + i * direction ]
|
|
self.output.append( table[byte] if table else byte )
|