Source code for ducky.cpu

import functools
import sys

from six import iterkeys, itervalues, iteritems
from six.moves import range

from functools import partial

from . import registers
from .. import profiler

from ..interfaces import IMachineWorker, ISnapshotable
from ..mm import UINT8_FMT, UINT16_FMT, UINT32_FMT, PAGE_SIZE, PAGE_MASK, PAGE_SHIFT, PageTableEntry, UINT64_FMT
from .registers import Registers, REGISTER_NAMES, FLAGS
from .instructions import DuckyInstructionSet
from ..errors import AccessViolationError, InvalidResourceError
from ..util import LRUCache, Flags
from ..snapshot import SnapshotNode
from ..devices import IRQList

#: Default IVT address
DEFAULT_IVT_ADDRESS = 0x00000000

#: Default PT address
DEFAULT_PT_ADDRESS = 0x00010000

#: Default size of core instruction cache, in instructions.
DEFAULT_CORE_INST_CACHE_SIZE = 256

[docs]class CPUState(SnapshotNode):
[docs] def get_core_states(self): return [__state for __name, __state in iteritems(self.get_children()) if __name.startswith('core')]
[docs] def get_core_state_by_id(self, coreid): return self.get_children()['core{}'.format(coreid)]
[docs]class CPUCoreState(SnapshotNode): def __init__(self): super(CPUCoreState, self).__init__('cpuid', 'coreid', 'registers', 'exit_code', 'alive', 'running', 'idle', 'ivt_address', 'pt_address', 'pt_enabled', 'flags')
[docs]class InterruptVector(object): """ Interrupt vector table entry. """ SIZE = 8 def __init__(self, ip = 0x0000, sp = 0x0000): self.ip = ip self.sp = sp def __repr__(self): return '<InterruptVector: ip=%s, sp=%s>' % (UINT32_FMT(self.ip), UINT32_FMT(self.sp))
[docs]class CPUException(Exception): """ Base class for CPU-related exceptions. :param string msg: message describing exceptional state. :param ducky.cpu.CPUCore core: CPU core that raised exception, if any. :param u32_t ip: address of an instruction that caused exception, if any. """ def __init__(self, msg, core = None, ip = None): super(CPUException, self).__init__(msg) self.core = core self.ip = ip
[docs]class InvalidOpcodeError(CPUException): """ Raised when unknown or invalid opcode is found in instruction. :param int opcode: wrong opcode. """ def __init__(self, opcode, *args, **kwargs): super(InvalidOpcodeError, self).__init__('Invalid opcode: opcode={}'.format(opcode), *args, **kwargs) self.opcode = opcode
[docs]class InvalidInstructionSetError(CPUException): """ Raised when switch to unknown or invalid instruction set is requested. :param int inst_set: instruction set id. """ def __init__(self, inst_set, *args, **kwargs): super(InvalidInstructionSetError, self).__init__('Invalid instruction set requested: inst_set={}'.format(inst_set), *args, **kwargs) self.inst_set = inst_set
[docs]def do_log_cpu_core_state(core, logger = None, disassemble = True, inst_set = None): """ Log state of a CPU core. Content of its registers, and other interesting or useful internal variables are logged. :param ducky.cpu.CPUCore core: core whose state should be logged. :param logger: called for each line of output to actualy log it. By default, core's :py:meth:`ducky.cpu.CPUCore.DEBUG` method is used. """ logger = logger or core.DEBUG inst_set = inst_set or core.instruction_set for i in range(0, Registers.REGISTER_SPECIAL, 4): regs = [(i + j) for j in range(0, 4) if (i + j) < Registers.REGISTER_SPECIAL] s = ['r{:02d}={}'.format(reg, UINT32_FMT(core.registers.map[reg].value)) for reg in regs] logger(' '.join(s)) logger('fp=%s sp=%s ip=%s', UINT32_FMT(core.registers.fp.value), UINT32_FMT(core.registers.sp.value), UINT32_FMT(core.registers.ip.value)) logger('flags=%s', core.flags.to_string()) logger('cnt=%s, alive=%s, running=%s, idle=%s, exit=%i', core.registers.cnt.value, core.alive, core.running, core.idle, core.exit_code) if hasattr(core, 'math_coprocessor'): for index, v in enumerate(core.math_coprocessor.registers.stack): logger('MC: %02i: %s', index, UINT64_FMT(v.value)) if hasattr(core, 'control_coprocessor'): cp = core.control_coprocessor logger('CC: cr0=%s, cr1=%s, cr2=%s, cr3=%s', UINT32_FMT(cp.read_cr0()), UINT32_FMT(cp.read_cr1()), UINT32_FMT(cp.read_cr2()), UINT32_FMT(cp.read_cr3())) if disassemble is True: if core.current_instruction is not None: inst = inst_set.disassemble_instruction(core.LOGGER, core.current_instruction) logger('inst-set=%02d current=%s', inst_set.instruction_set_id, inst) else: logger('inst-set=%02d current=<none>', inst_set.instruction_set_id) else: logger('inst-set=%02d current=<unknown>', inst_set.instruction_set_id) for index, frame in enumerate(core.backtrace()): logger('Frame #%i: %s', index, frame)
[docs]def log_cpu_core_state(*args, **kwargs): """ This is a wrapper for ducky.cpu.do_log_cpu_core_state function. Its main purpose is to be removed when debug mode is not set, therefore all debug calls of ducky.cpu.do_log_cpu_core_state will disappear from code, making such code effectively "quiet". """ do_log_cpu_core_state(*args, **kwargs)
[docs]class StackFrame(object): def __init__(self, fp): super(StackFrame, self).__init__() self.FP = fp self.IP = None def __getattribute__(self, name): if name == 'address': return self.FP return super(StackFrame, self).__getattribute__(name) def __repr__(self): return '<StackFrame: FP={}, IP={}>'.format(UINT32_FMT(self.FP), UINT32_FMT(self.IP if self.IP is not None else 0))
[docs]class CoreFlags(Flags): _flags = ['privileged', 'hwint_allowed', 'equal', 'zero', 'overflow', 'sign'] _labels = 'PHEZOS'
[docs]class InstructionCache(LRUCache): """ Simple instruction cache class, based on LRU dictionary, with a limited size. :param ducky.cpu.CPUCore core: CPU core that owns this cache. :param int size: maximal number of entries this cache can store. """ def __init__(self, mmu, size, *args, **kwargs): super(InstructionCache, self).__init__(mmu.core.cpu.machine.LOGGER, size, *args, **kwargs) self.mmu = mmu self.core = mmu.core if mmu.core.cpu.machine.config.getbool('machine', 'jit', False): self.get_object = self.get_object_jit
[docs] def get_object(self, addr): """ Read instruction from memory. This method is responsible for the real job of fetching instructions and filling the cache. :param u24 addr: absolute address to read from :return: instruction :rtype: ``InstBinaryFormat_Master`` """ core = self.core inst, desc, opcode = core.instruction_set.decode_instruction(core.LOGGER, core.MEM_IN32(addr, not_execute = False), core = core) return inst, opcode, partial(desc.execute, core, inst)
[docs] def get_object_jit(self, addr): """ Read instruction from memory. This method is responsible for the real job of fetching instructions and filling the cache. :param u24 addr: absolute address to read from :return: instruction :rtype: ``InstBinaryFormat_Master`` """ core = self.core inst, desc, opcode = core.instruction_set.decode_instruction(core.LOGGER, core.MEM_IN32(addr, not_execute = False), core = core) fn = desc.jit(core, inst) if fn is None: return inst, opcode, partial(desc.execute, core, inst) return inst, opcode, fn
[docs]class MMU(ISnapshotable): """ Memory management unit (aka MMU) provides a single point handling all core's memory operations. All memory reads and writes must go through this unit, which is then responsible for all translations, access control, and caching. :param ducky.cpu.CPUCore core: parent core. :param ducky.mm.MemoryController memory_controller: memory controller that provides access to the main memory. """ def __init__(self, core, memory_controller): super(MMU, self).__init__() config = core.cpu.machine.config self.core = core self.memory = memory_controller self.force_aligned_access = config.getbool('memory', 'force-aligned-access', default = False) self.pt_address = config.getint('cpu', 'pt-address', DEFAULT_PT_ADDRESS) self.pt_enabled = False self.pte_cache = {} self.DEBUG = core.DEBUG self.instruction_cache = InstructionCache(self, config.getint('cpu', 'inst-cache', default = DEFAULT_CORE_INST_CACHE_SIZE)) self.set_access_methods() def __debug_wrapper_read(self, reader, *args, **kwargs): self.core.debug.pre_memory(args[0], read = True) if not self.core.running: return value = reader(*args, **kwargs) self.core.debug.post_memory(args[0], read = True) return value def __debug_wrapper_write(self, writer, *args, **kwargs): self.core.debug.pre_memory(args[0], read = False) if not self.core.running: return writer(*args, **kwargs) self.core.debug.post_memory(args[0], read = False)
[docs] def set_access_methods(self): """ Set parent core's memory-access methods to proper shortcuts. """ self.DEBUG('MMU.set_access_methods') self.core.MEM_IN8 = self.full_read_u8 self.core.MEM_IN16 = self.full_read_u16 self.core.MEM_IN32 = self.full_read_u32 self.core.MEM_OUT8 = self.full_write_u8 self.core.MEM_OUT16 = self.full_write_u16 self.core.MEM_OUT32 = self.full_write_u32 if self.core.debug is not None: self.core.MEM_IN8 = partial(self.__debug_wrapper_read, self.core.MEM_IN8) self.core.MEM_IN16 = partial(self.__debug_wrapper_read, self.core.MEM_IN16) self.core.MEM_IN32 = partial(self.__debug_wrapper_read, self.core.MEM_IN32) self.core.MEM_OUT8 = partial(self.__debug_wrapper_write, self.core.MEM_OUT8) self.core.MEM_OUT16 = partial(self.__debug_wrapper_write, self.core.MEM_OUT16) self.core.MEM_OUT32 = partial(self.__debug_wrapper_write, self.core.MEM_OUT32)
[docs] def reset(self): self.instruction_cache.clear() self.pt_enabled = False self.pte_cache = {}
[docs] def halt(self): pass
[docs] def release_ptes(self): self.DEBUG('MMU.release_ptes') self.pte_cache = {}
[docs] def get_pte(self, addr): """ Find out PTE for particular physical address. If PTE is not in internal PTE cache, it is fetched from PTE table. :param u24 addr: memory address. """ pg_index = (addr & PAGE_MASK) >> PAGE_SHIFT self.DEBUG('MMU.get_pte: addr=%s, pte-address=%s', UINT32_FMT(addr), UINT32_FMT(self.pt_address + pg_index)) if pg_index not in self.pte_cache: self.pte_cache[pg_index] = pte = PageTableEntry.from_int(self.memory.read_u8(self.pt_address + pg_index)) else: pte = self.pte_cache[pg_index] self.DEBUG(' pte=%s (%s)', pte.to_string(), pte.to_int()) return pte
[docs] def check_access(self, access, addr, align = None): """ Check attempted access against PTE. Be aware that each check can be turned off by configuration file. :param access: ``read``, ``write`` or ``execute``. :param u24 addr: memory address. :param int align: if set, operation is expected to be aligned to this boundary. :raises ducky.errors.AccessViolationError: when access is denied. """ self.DEBUG('MMU.check_access: access=%s, addr=%s', access, UINT32_FMT(addr)) if self.force_aligned_access and align is not None and addr % align: raise AccessViolationError('Not allowed to access unaligned memory: access=%s, address=%s, align=%s' % (access, UINT32_FMT(addr), align)) pg_index = (addr & PAGE_MASK) >> PAGE_SHIFT if self.core.privileged or self.pt_enabled is not True: return self.memory.get_page(pg_index) pte = self.get_pte(addr) if getattr(pte, access) == 1: return self.memory.get_page(pg_index) raise AccessViolationError('Not allowed to access memory: access=%s, address=%s, pte=%s' % (access, UINT32_FMT(addr), pte.to_string()))
[docs] def full_read_u8(self, addr): self.DEBUG('MMU.raw_read_u8: addr=%s', UINT32_FMT(addr)) return self.check_access('read', addr).read_u8(addr & (PAGE_SIZE - 1))
[docs] def full_read_u16(self, addr): self.DEBUG('MMU.raw_read_u16: addr=%s', UINT32_FMT(addr)) return self.check_access('read', addr, align = 2).read_u16(addr & (PAGE_SIZE - 1))
[docs] def full_read_u32(self, addr, not_execute = True): self.DEBUG('MMU.raw_read_u32: addr=%s', UINT32_FMT(addr)) pg = self.check_access('read', addr, align = 4) if not_execute is not True: pg = self.check_access('execute', addr) return pg.read_u32(addr & (PAGE_SIZE - 1))
[docs] def full_write_u8(self, addr, value): self.DEBUG('MMU.raw_write_u8: addr=%s, value=%s', UINT32_FMT(addr), UINT8_FMT(value)) return self.check_access('write', addr).write_u8(addr & (PAGE_SIZE - 1), value)
[docs] def full_write_u16(self, addr, value): self.DEBUG('MMU.raw_write_u16: addr=%s, value=%s', UINT32_FMT(addr), UINT16_FMT(value)) return self.check_access('write', addr).write_u16(addr & (PAGE_SIZE - 1), value)
[docs] def full_write_u32(self, addr, value): self.DEBUG('MMU.raw_write_u32: addr=%s, value=%s', UINT32_FMT(addr), UINT32_FMT(value)) return self.check_access('write', addr).write_u32(addr & (PAGE_SIZE - 1), value)
[docs]class CPUCore(ISnapshotable, IMachineWorker): """ This class represents the main workhorse, one of CPU cores. Reads instructions, executes them, has registers, caches, handles interrupts, ... :param int coreid: id of this core. Usually, it's its serial number but it has no special meaning. :param ducky.cpu.CPU cpu: CPU that owns this core. :param ducky.mm.MemoryController memory_controller: use this controller to access main memory. """ def __init__(self, coreid, cpu, memory_controller): super(CPUCore, self).__init__() config = cpu.machine.config self.cpuid = '#{}:#{}'.format(cpu.id, coreid) self.cpuid_prefix = self.cpuid + ':' def __log(logger, *args, **kwargs): args = ('%s ' + args[0],) + (self.cpuid_prefix,) + args[1:] logger(*args) def __log_exception(logger_fn, exc): self.cpu.machine.LOGGER.exception('Exception raised in CPU core') do_log_cpu_core_state(self, logger = logger_fn, disassemble = False if isinstance(exc, InvalidOpcodeError) else True) self.LOGGER = cpu.machine.LOGGER self.DEBUG = lambda *args, **kwargs: __log(self.cpu.machine.DEBUG, *args, **kwargs) self.INFO = lambda *args, **kwargs: __log(self.cpu.machine.INFO, *args, **kwargs) self.WARN = lambda *args, **kwargs: __log(self.cpu.machine.WARN, *args, **kwargs) self.ERROR = lambda *args, **kwargs: __log(self.cpu.machine.ERROR, *args, **kwargs) self.EXCEPTION = functools.partial(__log_exception, self.ERROR) self.id = coreid self.cpu = cpu self.debug = None self.mmu = MMU(self, memory_controller) self.registers = registers.RegisterSet() self.privileged = True self.hwint_allowed = False self.arith_equal = False self.arith_zero = False self.arith_overflow = False self.arith_sign = False self.ivt_address = config.getint('cpu', 'ivt-address', DEFAULT_IVT_ADDRESS) self.instruction_set = DuckyInstructionSet self.instruction_set_stack = [] self.current_ip = None self.current_instruction = None self.alive = False self.running = False self.idle = False self.core_profiler = profiler.STORE.get_core_profiler(self) if profiler.STORE.is_cpu_enabled() else None self.exit_code = 0 self.frames = [] self.check_frames = cpu.machine.config.getbool('cpu', 'check-frames', default = False) self.coprocessors = {} if self.cpu.machine.config.getbool('cpu', 'math-coprocessor', False): from .coprocessor import math_copro self.math_coprocessor = self.coprocessors['math'] = math_copro.MathCoprocessor(self) if config.getbool('cpu', 'control-coprocessor', True): from .coprocessor import control self.control_coprocessor = self.coprocessors['control'] = control.ControlCoprocessor(self)
[docs] def has_coprocessor(self, name): return hasattr(self, '{}_coprocessor'.format(name))
def __repr__(self): return '#{}:#{}'.format(self.cpu.id, self.id)
[docs] def save_state(self, parent): self.DEBUG('save_state') state = parent.add_child('core{}'.format(self.id), CPUCoreState()) state.cpuid = self.cpu.id state.coreid = self.id state.flags = self.flags.to_int() state.registers = [] for i, reg in enumerate(REGISTER_NAMES): state.registers.append(int(self.registers.map[reg].value)) state.ivt_address = self.ivt_address state.pt_address = self.mmu.pt_address state.pt_enabled = self.mmu.pt_enabled state.exit_code = self.exit_code state.idle = self.idle state.alive = self.alive state.running = self.running if self.has_coprocessor('math'): self.math_coprocessor.save_state(state)
[docs] def load_state(self, state): self.flags = CoreFlags.from_int(state.flags) for i, reg in enumerate(REGISTER_NAMES): self.registers.map[reg].value = state.registers[i] self.ivt_address = state.ivt_address self.mmu.pt_address = state.pt_address self.mmu.pt_enabled = state.pt_enabled self.exit_code = state.exit_code self.idle = state.idle self.alive = state.alive self.running = state.running if self.has_coprocessor('math'): self.math_coprocessor.load_state(state.get_children()['math_coprocessor'])
[docs] def init_debug_set(self): if self.debug is None: from .. import debugging self.debug = debugging.DebuggingSet(self) self.mmu.set_access_methods()
[docs] def REG(self, reg): return self.registers.map[reg]
[docs] def IP(self): return self.registers.ip
[docs] def SP(self): return self.registers.sp
[docs] def FP(self): return self.registers.fp
[docs] def reset(self, new_ip = 0x00000000): """ Reset core's state. All registers are set to zero, all flags are set to zero, except ``HWINT`` flag which is set to one, and ``IP`` is set to requested value. :param u32_t new_ip: new ``IP`` value, defaults to zero """ self.instruction_set = DuckyInstructionSet self.instruction_set_stack = [] for reg in registers.RESETABLE_REGISTERS: self.REG(reg).value = 0 self.flags = CoreFlags.create(privileged = True) self.registers.ip.value = new_ip self.mmu.reset()
[docs] def backtrace(self): bt = [] if self.check_frames: for frame in self.frames: bt.append(repr(frame)) return bt bt = [] for frame_index, frame in enumerate(self.frames): ip = self.mmu.memory.read_u32(frame.address + 4) bt.append(ip) ip = self.registers.ip.value - 4 bt.append(ip) return bt
[docs] def raw_push(self, val): """ Push value on stack. ``SP`` is decremented by four, and value is written at this new address. :param u32 val: value to be pushed """ self.DEBUG("raw_push: sp=%s, value=%s", UINT32_FMT(self.registers.sp.value), UINT32_FMT(val)) self.registers.sp.value -= 4 self.MEM_OUT32(self.registers.sp.value, val)
[docs] def raw_pop(self): """ Pop value from stack. 4 byte number is read from address in ``SP``, then ``SP`` is incremented by four. :return: popped value :rtype: ``u32`` """ ret = self.MEM_IN32(self.registers.sp.value) self.registers.sp.value += 4 return ret
[docs] def push(self, *regs): for reg_id in regs: value = self.flags.to_int() if reg_id == FLAGS else self.registers.map[reg_id].value self.DEBUG('push: %s (%s) at %s', reg_id, UINT32_FMT(value), UINT32_FMT(self.registers.sp.value - 4)) self.raw_push(value)
[docs] def pop(self, *regs): for reg_id in regs: value = self.raw_pop() if reg_id == FLAGS: self.flags = CoreFlags.from_int(value) else: self.registers.map[reg_id].value = value self.DEBUG('pop: %s (%s) from %s', reg_id, UINT32_FMT(value), UINT32_FMT(self.registers.sp.value - 4))
[docs] def create_frame(self): """ Create new call stack frame. Push ``IP`` and ``FP`` registers and set ``FP`` value to ``SP``. """ self.DEBUG('create_frame') self.push(Registers.IP, Registers.FP) self.registers.fp.value = self.registers.sp.value if self.check_frames: self.frames.append(StackFrame(self.registers.fp.value))
[docs] def destroy_frame(self): """ Destroy current call frame. Pop ``FP`` and ``IP`` from stack, by popping ``FP`` restores previous frame. :raises CPUException: if current frame does not match last created frame. """ self.DEBUG('destroy_frame') if self.check_frames: if self.frames[-1].FP != self.registers.sp.value: raise CPUException('Leaving frame with wrong SP: IP={}, saved SP={}, current SP={}'.format(UINT32_FMT(self.registers.ip.value), UINT32_FMT(self.frames[-1].FP), UINT32_FMT(self.registers.sp.value))) self.frames.pop() self.pop(Registers.FP, Registers.IP)
def __load_interrupt_vector(self, index): self.DEBUG('load_interrupt_vector: ivt=%s, index=%i', UINT32_FMT(self.ivt_address), index) desc = InterruptVector() vector_address = self.ivt_address + index * InterruptVector.SIZE desc.ip = self.MEM_IN32(vector_address) desc.sp = self.MEM_IN32(vector_address + 4) return desc def __enter_interrupt(self, index): """ Prepare CPU for handling interrupt routine. New stack is allocated, content fo registers is saved onto this new stack, and new call frame is created on this stack. CPU is switched into privileged mode. ``CS`` and ``IP`` are set to values, stored in interrupt descriptor table at specified offset. :param u24 table_address: address of interrupt descriptor table :param int index: interrupt number, its index into IDS """ self.DEBUG('__enter_interrupt: index=%i', index) if index >= IRQList.IRQ_COUNT: raise InvalidResourceError('Interrupt index out of range: index=%d' % index) iv = self.__load_interrupt_vector(index) self.DEBUG('__enter_interrupt: desc=%s', iv) old_SP = self.registers.sp.value self.registers.sp.value = iv.sp self.raw_push(old_SP) self.push(FLAGS) self.create_frame() self.privileged = True self.registers.ip.value = iv.ip if self.check_frames: self.frames[-1].IP = iv.ip self.instruction_set_stack.append(self.instruction_set) self.instruction_set = DuckyInstructionSet
[docs] def exit_interrupt(self): """ Restore CPU state after running a interrupt routine. Call frame is destroyed, registers are restored, stack is returned back to memory pool. """ self.DEBUG('exit_interrupt') self.destroy_frame() self.pop(FLAGS) old_SP = self.raw_pop() self.registers.sp.value = old_SP self.instruction_set = self.instruction_set_stack.pop(0)
[docs] def do_int(self, index): """ Handle software interrupt. Real software interrupts cause CPU state to be saved and new stack and register values are prepared by ``__enter_interrupt`` method, virtual interrupts are simply triggered without any prior changes of CPU state. :param int index: interrupt number """ self.DEBUG('do_int: %s', index) if index in self.cpu.machine.virtual_interrupts: self.DEBUG('do_int: calling virtual interrupt') self.cpu.machine.virtual_interrupts[index].run(self) self.DEBUG('do_int: virtual interrupt finished') else: self.__enter_interrupt(index) self.DEBUG('do_int: CPU state prepared to handle interrupt')
def __do_irq(self, index): """ Handle hardware interrupt. CPU state is saved and prepared for interrupt routine by calling ``__enter_interrupt`` method. Receiving of next another interrupts is prevented by clearing ``HWINT`` flag, and ``idle`` flag is set to ``False``. """ self.DEBUG('__do_irq: %s', index) self.__enter_interrupt(index) self.hwint_allowed = False self.change_runnable_state(idle = False) self.DEBUG('__do_irq: CPU state prepared to handle IRQ') log_cpu_core_state(self, inst_set = self.instruction_set_stack[-1])
[docs] def irq(self, index): try: self.__do_irq(index) except (CPUException, ZeroDivisionError, AccessViolationError) as e: e.exc_stack = sys.exc_info() self.die(e)
def __get_flags(self): return CoreFlags.create(privileged = self.privileged, hwint_allowed = self.hwint_allowed, equal = self.arith_equal, zero = self.arith_zero, overflow = self.arith_overflow, sign = self.arith_sign) def __set_flags(self, flags): self.privileged = flags.privileged self.hwint_allowed = flags.hwint_allowed self.arith_equal = flags.equal self.arith_zero = flags.zero self.arith_overflow = flags.overflow self.arith_sign = flags.sign flags = property(__get_flags, __set_flags)
[docs] def check_protected_ins(self): """ Raise ``AccessViolationError`` if core is not running in privileged mode. This method should be used by instruction handlers that require privileged mode, e.g. protected instructions. :raises AccessViolationError: if the core is not in privileged mode """ if not self.privileged: raise AccessViolationError('Instruction not allowed in unprivileged mode: inst={}'.format(self.current_instruction))
[docs] def check_protected_port(self, port): if port not in self.cpu.machine.ports: raise InvalidResourceError('Unhandled port: port={}'.format(UINT16_FMT(port))) if self.privileged: return if self.cpu.machine.ports[port].is_port_protected(port): raise AccessViolationError('Access to port not allowed in unprivileged mode: inst={}, port={}'.format(self.current_instruction, port))
[docs] def step(self): """ Perform one "step" - fetch next instruction, increment IP, and execute instruction's code (see inst_* methods) """ self.DEBUG('----- * ----- * ----- * ----- * ----- * ----- * ----- * -----') has_debug = self.debug is not None if has_debug: self.debug.pre_step() if not self.running: return # Read next instruction self.DEBUG('"FETCH" phase') ip = self.registers.ip self.current_ip = ip.value self.DEBUG('fetch instruction: ip=%s', UINT32_FMT(ip.value)) try: self.current_instruction, opcode, execute = self.mmu.instruction_cache[ip.value] ip.value += 4 self.DEBUG('"EXECUTE" phase: %s %s', UINT32_FMT(self.current_ip), self.instruction_set.disassemble_instruction(self.LOGGER, self.current_instruction)) log_cpu_core_state(self) execute() except (InvalidOpcodeError, AccessViolationError, InvalidResourceError) as e: self.die(e) return cnt = self.registers.cnt cnt.value += 1 self.DEBUG('"SYNC" phase:') log_cpu_core_state(self) if self.core_profiler is not None: self.core_profiler.take_sample() if has_debug: self.debug.post_step()
[docs] def change_runnable_state(self, alive = None, running = None, idle = None): old_state = self.alive and self.running and not self.idle if alive is not None: self.alive = alive if running is not None: self.running = running if idle is not None: self.idle = idle new_state = self.alive and self.running and not self.idle if old_state != new_state: if new_state is True: self.cpu.machine.reactor.task_runnable(self) else: self.cpu.machine.reactor.task_suspended(self)
[docs] def suspend(self): self.DEBUG('CPUCore.suspend') self.change_runnable_state(running = False) self.cpu.machine.events.trigger('on-core-suspend', self)
[docs] def wake_up(self): self.DEBUG('CPUCore.wake_up') self.change_runnable_state(running = True) self.cpu.machine.events.trigger('on-core-running', self)
[docs] def die(self, exc): self.DEBUG('CPUCore.die') self.exit_code = 1 self.EXCEPTION(exc) self.halt()
[docs] def halt(self): self.DEBUG('CPUCore.halt') self.cpu.machine.events.trigger('on-core-suspended', self) self.cpu.machine.events.trigger('on-core-halted', self) self.mmu.halt() self.change_runnable_state(alive = False, running = False) log_cpu_core_state(self) self.cpu.machine.reactor.remove_task(self) self.cpu.machine.tenh('%r: CPU core halted', self)
[docs] def run(self): try: self.step() except (CPUException, ZeroDivisionError, AccessViolationError) as e: e.exc_stack = sys.exc_info() self.die(e) except Exception as e: e.exc_stack = sys.exc_info() self.die(e)
[docs] def boot(self): self.DEBUG('CPUCore.boot') from ..boot import DEFAULT_BOOTLOADER_ADDRESS self.reset(new_ip = DEFAULT_BOOTLOADER_ADDRESS) log_cpu_core_state(self) self.cpu.machine.reactor.add_task(self) self.change_runnable_state(alive = True, running = True) self.cpu.machine.events.trigger('on-core-alive', self) self.cpu.machine.events.trigger('on-core-running', self) if self.core_profiler is not None: self.core_profiler.enable() self.cpu.machine.tenh('%r: CPU core is up', self) if self.mmu.instruction_cache is not None: self.cpu.machine.tenh('%r: %d IC slots', self, self.mmu.instruction_cache.size) self.cpu.machine.tenh('%r: check-frames: %s', self, 'yes' if self.check_frames else 'no') if self.coprocessors: self.cpu.machine.tenh('%r: coprocessor: %s', self, ' '.join(sorted(iterkeys(self.coprocessors))))
[docs]class CPU(ISnapshotable, IMachineWorker): def __init__(self, machine, cpuid, memory_controller, cores = 1): super(CPU, self).__init__() self.cpuid_prefix = '#{}:'.format(cpuid) def __log(logger, *args): args = ('%s ' + args[0],) + (self.cpuid_prefix,) + args[1:] logger(*args) self.DEBUG = lambda *args: __log(self.machine.DEBUG, *args) self.INFO = lambda *args: __log(self.machine.INFO, *args) self.WARN = lambda *args: __log(self.machine.WARN, *args) self.ERROR = lambda *args: __log(self.machine.ERROR, *args) self.EXCEPTION = lambda *args: __log(self.machine.EXCEPTION, *args) self.machine = machine self.id = cpuid self.cores = [] self.living_cores = [] self.halted_cores = [] self.running_cores = [] self.suspended_cores = [] for i in range(0, cores): __core = CPUCore(i, self, memory_controller) self.cores.append(__core) self.halted_cores.append(__core) self.suspended_cores.append(__core) self.machine.console.register_commands([ ('sc', cmd_set_core), ('st', cmd_core_state), ('cont', cmd_cont), ('step', cmd_step), ]) def __repr__(self): return '#%i' % self.id
[docs] def save_state(self, parent): state = parent.add_child('cpu{}'.format(self.id), CPUState()) for core in self.cores: core.save_state(state)
[docs] def load_state(self, state): for core_state in itervalues(state.get_children()): self.cores[core_state.coreid].load_state(core_state)
[docs] def on_core_alive(self, core): """ Triggered when one of cores goes alive. """ self.DEBUG('%s.on_core_alive: core=%s', self.__class__.__name__, core) if core.cpu != self: return self.halted_cores.remove(core) self.living_cores.append(core)
[docs] def on_core_halted(self, core): """ Signal CPU that one of cores is no longer alive. """ self.DEBUG('%s.on_core_halted: core=%s', self.__class__.__name__, core) if core.cpu != self: return self.living_cores.remove(core) self.halted_cores.append(core)
[docs] def on_core_running(self, core): """ Signal CPU that one of cores is now running. """ self.DEBUG('%s.on_core_running: core=%s', self.__class__.__name__, core) if core.cpu != self: return self.suspended_cores.remove(core) self.running_cores.append(core)
[docs] def on_core_suspended(self, core): """ Signal CPU that one of cores is now suspended. """ self.DEBUG('%s.on_core_suspended: core=%s', self.__class__.__name__, core) if core.cpu != self: return self.running_cores.remove(core) self.suspended_cores.append(core)
[docs] def suspend(self): self.DEBUG('CPU.suspend') for core in self.running_cores: core.suspend()
[docs] def wake_up(self): self.DEBUG('CPU.wake_up') for core in self.suspended_cores: core.wake_up()
[docs] def die(self, exc): self.DEBUG('CPU.die') self.EXCEPTION(exc) self.halt()
[docs] def halt(self): self.DEBUG('CPU.halt') for core in self.living_cores: core.halt() self.machine.events.remove_listener('on-core-alive', self.on_core_alive) self.machine.events.remove_listener('on-core-halted', self.on_core_halted) self.machine.events.remove_listener('on-core-running', self.on_core_running) self.machine.events.remove_listener('on-core-suspended', self.on_core_suspended) self.machine.tenh('%r: CPU halted', self)
[docs] def boot(self): self.DEBUG('CPU.boot') self.machine.events.add_listener('on-core-alive', self.on_core_alive) self.machine.events.add_listener('on-core-halted', self.on_core_halted) self.machine.events.add_listener('on-core-running', self.on_core_running) self.machine.events.add_listener('on-core-suspended', self.on_core_suspended) for core in self.cores: core.boot() self.machine.tenh('%r: CPU is up', self)
[docs]def cmd_set_core(console, cmd): """ Set core address of default core used by control commands: sc <coreid> """ M = console.master.machine try: core = M.core(cmd[1]) except InvalidResourceError: console.writeln('go away') return console.default_core = core console.writeln('# OK: default core is %s', core.cpuid)
[docs]def cmd_cont(console, cmd): """ Continue execution until next breakpoint is reached: cont """ if console.default_core is None: console.writeln('# ERR: no core selected') return if console.default_core.running: console.writeln('# ERR: core is not suspended') return console.default_core.wake_up() console.writeln('# OK')
[docs]def cmd_step(console, cmd): """ Step one instruction forward """ if console.default_core is None: console.writeln('# ERR: no core selected') return if console.default_core.running: console.writeln('# ERR: core is not suspended') return console.default_core.run() console.writeln('# OK')
[docs]def cmd_next(console, cmd): """ Proceed to the next instruction in the same stack frame. """ core = console.default_core if hasattr(console, 'default_core') else console.machine.cpus[0].cores[0] if not core.is_suspended(): return def __ip_addr(offset = 0): return core.registers.ip.value + offset try: inst = core.instruction_set.decode_instruction(core.LOGGER, core.memory.read_u32(__ip_addr())) if inst.opcode == core.instruction_set.opcodes.CALL: from ..debugging import add_breakpoint add_breakpoint(core, core.registers.ip.value + 4, ephemeral = True) core.wake_up() else: core.step() core.check_for_events() log_cpu_core_state(core, logger = core.INFO) except CPUException as e: core.die(e)
[docs]def cmd_core_state(console, cmd): """ Print core state """ M = console.master.machine core = console.default_core if console.default_core is not None else M.cpus[0].cores[0] do_log_cpu_core_state(core, logger = functools.partial(console.log, core.INFO))
[docs]def cmd_bt(console, cmd): """ Print current backtrace """ M = console.master.machine core = console.default_core if console.default_core is not None else M.cpus[0].cores[0] table = [ ['Index', 'symbol', 'offset', 'ip'] ] for index, (ip, symbol, offset) in enumerate(core.backtrace()): table.append([index, symbol, UINT32_FMT(offset), UINT32_FMT(ip)]) console.table(table)