Source code for ducky.debugging

"""
Virtual machine debugging tools - break points, watch points, etc.

Create "point" that's triggered when a condition is satisfied (e.g.
processor executes instruction on specified address, memory at
specified address was modified, etc. Then, create "action" (e.g.
suspend core), and bind both pieces together - when point gets
triggered, execute list of actions.
"""

from six import itervalues

from .util import str2int, UINT8_FMT, UINT16_FMT, UINT32_FMT
from .errors import InvalidResourceError

[docs]class Action(object): """ Base class of all debugging actions. :param logging.Logger logger: logger instance used for logging. """ def __init__(self, logger): self.logger = logger
[docs] def act(self, core, point): """ This method is called when "action" is executed. Implement it in child classes to give child actions a functionality. :param ducky.cpu.CPUCore core: CPU core where point was triggered. :param ducky.debugging.Point point: point that was triggered. """ raise NotImplementedError()
def __repr__(self): raise NotImplementedError()
[docs]class Point(object): """ Base class of all debugging points. :param ducky.debugging.DebuggingSet debugging_set: debugging set this point belongs to. :param bool active: if not ``True``, point is not active and will not trigger. :param int countdown: if greater than zero, point has to trigger ``countdown`` times before its actions are executed for the first time. """ def __init__(self, debugging_set, active = True, countdown = 0): super(Point, self).__init__() self.active = active self.countdown = countdown self.debugging_set = debugging_set self.actions = []
[docs] def is_triggered(self, core, *args, **kwargs): """ Test point's condition. :param ducky.cpu.CPUCore core: core requesting the test. :rtype: bool :returns: ``True`` if condition is satisfied. """ raise NotImplementedError()
def __repr__(self): raise NotImplementedError()
[docs]class SuspendCoreAction(Action): """ If executed, this action will suspend the CPU core that triggered its parent point. """
[docs] def act(self, core, point): core.suspend()
def __repr__(self): return '<SuspendCoreAction>' @staticmethod
[docs] def create_from_config(debugging_set, config, section): return SuspendCoreAction(debugging_set.core.LOGGER)
[docs]class LogValueAction(Action): """ This is the base class for actions that log a numerical values. :param logging.Logger logger: logger instance used for logging. :param int size: size of logged number, in bytes. """ def __init__(self, logger, size): super(LogValueAction, self).__init__(logger) self.size = size if size == 4: self.formatter = UINT32_FMT elif size == 2: self.formatter = UINT16_FMT else: self.formatter = UINT8_FMT
[docs] def get_values(self, core, point): """ Prepare dictionary with values for message that will be shown to the user. :param ducky.cpu.CPUCore core: core point was triggered on. :param ducky.debugging.Point point: triggered point. :rtype: dict :returns: dictionary that will be passed to message ``format()`` method. """ raise NotImplementedError()
[docs] def get_message(self, core, point): """ Return message that, formatted with output of ``get_values()``, will be shown to user. :param ducky.cpu.CPUCore core: core point was triggered on. :param ducky.debugging.Point point: triggered point. :rtype: string :returns: information message. """ raise NotImplementedError()
[docs] def act(self, core, point): data = self.get_values(core, point) data.update({ 'watchpoint': repr(point), 'ip': UINT32_FMT(core.registers.ip), 'value': self.formatter(data['value']) }) self.logger.info(self.get_message(core, point).format(**data))
[docs]class LogMemoryContentAction(LogValueAction): """ When triggered, logs content of a specified location in memory. :param logging.Logger logger: logger instance used for logging. :param u32_t address: memory location. :param int size: size of logged number, in bytes. """ def __init__(self, logger, address, size): super(LogMemoryContentAction, self).__init__(logger, size) self.address = address if self.size == 4: self.reader = 'read_u32' elif self.size == 2: self.reader = 'read_u16' else: self.reader = 'read_u8' def __repr__(self): return '<LogMemoryContentAction: address=%s, size=%s>' % (UINT32_FMT(self.address), self.size)
[docs] def get_values(self, core, point): reader = getattr(core.mmu.memory, self.reader) return { 'address': UINT32_FMT(self.address), 'value': reader(self.address), }
[docs] def get_message(self, core, point): return 'memory: IP={ip}, {address}={value}'
@staticmethod
[docs] def create_from_config(debugging_set, config, section): _get, _getbool, _getint = config.create_getters(section) return LogMemoryContentAction(debugging_set.core.LOGGER, _getint('address'), _getint('size', 4))
[docs]class LogRegisterContentAction(LogValueAction): """ When triggered, logs content of a specified register. :param logging.Logger logger: logger instance used for logging. :param list registers: list of register names. """ def __init__(self, logger, registers): super(LogRegisterContentAction, self).__init__(logger, 4) self.registers = [r.strip() for r in registers.split(',')] self.formatter = UINT32_FMT def __repr__(self): return '<LogRegisterContentAction: registers=%s>' % ','.join(self.registers)
[docs] def get_values(self, core, point): values = {'value': 0} for r in self.registers: values[r] = r values[r + '_value'] = UINT32_FMT(getattr(core.registers, r).value) return values
[docs] def get_message(self, core, point): return 'register: IP={ip}, %s' % ', '.join(['{%s}={%s_value}' % (r, r) for r in self.registers])
@staticmethod
[docs] def create_from_config(debugging_set, config, section): _get, _getbool, _getint = config.create_getters(section) return LogRegisterContentAction(debugging_set.core.LOGGER, _get('registers'))
[docs]class BreakPoint(Point): def __init__(self, debugging_set, ip, *args, **kwargs): super(BreakPoint, self).__init__(debugging_set, *args, **kwargs) self.ip = ip
[docs] def is_triggered(self, core): core.DEBUG('core IP=%s, self IP=%s', core.IP(), self.ip) return core.IP() == self.ip
def __repr__(self): return '<BreakPoint: IP=%s>' % UINT32_FMT(self.ip) @staticmethod
[docs] def create_from_config(debugging_set, config, section): _get, _getbool, _getint = config.create_getters(section) return BreakPoint(debugging_set, _getint('address'), active = _getbool('active', True), countdown = _getint('countdown', 0))
[docs]class MemoryWatchPoint(Point): def __init__(self, debugging_set, address, read, *args, **kwargs): super(MemoryWatchPoint, self).__init__(debugging_set, *args, **kwargs) self.address = address self.read = read
[docs] def is_triggered(self, core, address = None, read = None): core.DEBUG('%s.is_triggered: address=%s, read=%s, self.address=%s, self.read=%s', self.__class__.__name__, UINT32_FMT(address), read, UINT32_FMT(self.address), self.read) if self.read is None: return address == self.address if self.read != read: return False return address == self.address
def __repr__(self): return '<MemoryWatchPoint: address=%s>' % UINT32_FMT(self.address) @staticmethod
[docs] def create_from_config(debugging_set, config, section): _get, _getbool, _getint = config.create_getters(section) return MemoryWatchPoint(debugging_set, _getint('address'), _getbool('read', None), active = _getbool('active', True), countdown = _getint('countdown', 0))
[docs]class DebuggingSet(object): def __init__(self, core): super(DebuggingSet, self).__init__() self.core = core self.points = [] self.triggered_points = [] C = core.cpu.machine.console console_commands = [ ('bp-list', cmd_bp_list), ('bp-break', cmd_bp_add_breakpoint), ('bp-mwatch', cmd_bp_add_memory_watchpoint), ('bp-active', cmd_bp_active) ] for name, handler in console_commands: if C.is_registered_command(name): continue C.register_command(name, handler) for chain in ('step', 'memory'): setattr(self, 'triggered_%s' % chain, []) for stage in ('pre', 'post'): setattr(self, 'chain_%s_%s' % (stage, chain), [])
[docs] def add_point(self, p, chain): self.core.DEBUG('adding point %s to chain %s', p, chain) getattr(self, 'chain_' + chain.replace('-', '_')).append(p)
[docs] def remove_point(self, p, chain): self.core.DEBUG('removing point %s from chain %s', p, chain) getattr(self, 'chain_' + chain.replace('-', '_')).remove(p)
def __check_chain(self, stage, chain, clean_triggered = False, *args, **kwargs): D = self.core.DEBUG D('__check_chain: stage=%s, chain=%s, clean_triggered=%s', stage, chain, clean_triggered) triggered = getattr(self, 'triggered_' + chain) chain = getattr(self, 'chain_%s_%s' % (stage, chain)) D('__check_chain: before check: chain=%s, triggered=%s', str(chain), str(triggered)) triggered_in_loop = 0 for p in chain: D(repr(p)) if not p.active: D('inactive, not evaluating') continue if not p.is_triggered(self.core, *args, **kwargs): D('not triggered, skipping') continue if p in triggered: D('already triggered by this step, ignore') continue if p.countdown > 0: p.countdown -= 1 if p.countdown != 0: D('countdown %i, skip for now', p.countdown) continue self.core.INFO('Breakpoint triggered: %s', p) triggered.append(p) triggered_in_loop += 1 for action in p.actions: action.act(self.core, p) D('__check_chain: after check: chain=%s, triggered=%s', str(chain), str(triggered)) if clean_triggered is True: triggered[:] = [] D('__check_chain: after cleanup: chain=%s, triggered=%s', str(chain), str(triggered)) return triggered_in_loop > 0
[docs] def pre_step(self): return self.__check_chain('pre', 'step')
[docs] def post_step(self): return self.__check_chain('post', 'step', clean_triggered = True)
[docs] def pre_memory(self, address = None, read = None): return self.__check_chain('pre', 'memory', address = address, read = read)
[docs] def post_memory(self, address = None, read = None): return self.__check_chain('post', 'memory', clean_triggered = True, address = address, read = read)
[docs]def cmd_bp_list(console, cmd): """ List existing breakpoints """ points = [ ['Point', 'Active', 'Countdown', 'Core'] ] for point in itervalues(Point.points): points.append([ repr(point), '*' if point.active else '', point.countdown, point.debugging_set.core.cpuid_prefix, ]) console.table(points)
[docs]def cmd_bp_add_breakpoint(console, cmd): """ Create new breakpoint: bp-break <#cpuid:#coreid> <address> [active] [countdown] """ try: core = console.master.machine.core(cmd[1]) except InvalidResourceError: console.write('go away') return ip = str2int(cmd[2]) active = True if len(cmd) >= 3 and cmd[2] == 'yes' else False countdown = str2int(cmd[3]) if len(cmd) >= 4 else 0 core.init_debug_set() point = core.debug.create_point(BreakPoint, ip, active = active, countdown = countdown) console.writeln('# OK: %s', point)
[docs]def cmd_bp_add_memory_watchpoint(console, cmd): """ Create new memory watchpoint: bp-mwatch <#cpuid:#coreid> <address> [rw] [active] [countdown]' """ try: core = console.master.machine.core(cmd[1]) except InvalidResourceError: console.write('go away') return address = str2int(cmd[2]) access = cmd[3] if len(cmd) >= 4 else 'r' active = True if len(cmd) >= 5 and cmd[4] == 'yes' else False countdown = str2int(cmd[5]) if len(cmd) >= 6 else 0 core.init_debug_set() point = core.debug.create_point(MemoryWatchPoint, address, access, active = active, countdown = countdown) console.writeln('# OK: %s', point)
[docs]def cmd_bp_remove(console, cmd): """ Remove breakpoint: bp-remove <id> """ point = Point.points.get(int(cmd[1])) if point is None: console.writeln('go away') return point.debugging_set.remove_point(point) console.writeln('# OK')
[docs]def cmd_bp_active(console, cmd): """ Toggle "active" flag for a breakpoint: bp-active <id> """ point = Point.points.get(int(cmd[1])) if point is None: console.writeln('go away') return point.active = not point.active console.writeln('# OK: %s', point)