"""
:py:class:`ducky.machine.Machine` is *the* virtual machine. Each instance
represents a self-contained virtual machine, with all its devices, memory, CPUs
and other necessary properties.
"""
import itertools
import collections
import os
import sys
import time
from six import iteritems, itervalues
from collections import defaultdict, OrderedDict, deque
from . import mm
from . import snapshot
from . import __version__
from .interfaces import IMachineWorker, ISnapshotable, IReactorTask
from .console import ConsoleMaster
from .errors import InvalidResourceError, ExceptionList
from .log import create_logger
from .reactor import Reactor
from .snapshot import SnapshotNode
from .util import F
from .boot import ROMLoader
from functools import partial
class MachineState(SnapshotNode):
    """
    Snapshot node holding machine-level state.

    The node is created with ``nr_cpus`` and ``nr_cores`` passed to the
    :py:class:`SnapshotNode` constructor (presumably the names of fields stored
    in the node - confirm against ``SnapshotNode``). States of individual CPUs
    are looked up among its children under names beginning with ``cpu``.
    """

    def __init__(self):
        super(MachineState, self).__init__('nr_cpus', 'nr_cores')

    def get_cpu_states(self):
        """
        Collect all child states whose name starts with ``cpu``.

        :rtype: list
        :returns: child states, in the order the children are iterated.
        """

        cpu_states = []

        for child_name, child_state in iteritems(self.get_children()):
            if child_name.startswith('cpu'):
                cpu_states.append(child_state)

        return cpu_states

    def get_cpu_state_by_id(self, cpuid):
        """
        Return child state stored under the name ``cpu<cpuid>``.

        :param cpuid: id of the CPU, formatted into the child name.
        :raises KeyError: when the children mapping has no such entry (assuming
          ``get_children()`` returns a plain mapping).
        """

        children = self.get_children()
        child_name = 'cpu{}'.format(cpuid)

        return children[child_name]
class CommQueue(object):
    """
    Pair of FIFO queues - inbound and outbound - attached to a channel.

    :param channel: channel this queue belongs to; it is only stored.
    """

    def __init__(self, channel):
        self.channel = channel

        self.queue_in = deque()
        self.queue_out = deque()

    def is_empty_out(self):
        """
        :rtype: bool
        :returns: ``True`` when the outbound queue holds no items.
        """

        return len(self.queue_out) == 0

    def is_empty_in(self):
        """
        :rtype: bool
        :returns: ``True`` when the inbound queue holds no items.
        """

        return len(self.queue_in) == 0

    def write_out(self, o):
        """
        Append ``o`` to the end of the outbound queue.
        """

        self.queue_out.append(o)

    def write_in(self, o):
        """
        Append ``o`` to the end of the inbound queue.
        """

        self.queue_in.append(o)

    def read_out(self):
        """
        Remove and return the oldest outbound item.

        :returns: the item, or ``None`` when the outbound queue is empty.
        """

        try:
            return self.queue_out.popleft()

        except IndexError:
            # popleft() on an empty deque
            return None

    def read_in(self):
        """
        Remove and return the oldest inbound item.

        :returns: the item, or ``None`` when the inbound queue is empty.
        """

        try:
            return self.queue_in.popleft()

        except IndexError:
            # popleft() on an empty deque
            return None
class CommChannel(object):
    """
    Registry of named :py:class:`CommQueue` instances.

    :param machine: machine this channel belongs to; it is only stored.
    """

    def __init__(self, machine):
        self.machine = machine

        # queue name => CommQueue
        self._queues = {}

    def create_queue(self, name):
        """
        Create a new queue and register it under ``name``. A queue already
        registered under the same name is replaced.

        :param name: key the queue is stored under.
        :rtype: CommQueue
        :returns: newly created queue.
        """

        new_queue = self._queues[name] = CommQueue(self)

        return new_queue

    def get_queue(self, name):
        """
        Return queue registered under ``name``.

        :raises KeyError: when no such queue is registered.
        """

        return self._queues[name]

    def unregister_queue(self, name):
        """
        Forget queue registered under ``name``.

        :raises KeyError: when no such queue is registered.
        """

        del self._queues[name]
class IRQRouterTask(IReactorTask):
    """
    This task is responsible for distributing triggered IRQs between CPU cores.

    When an IRQ is triggered, its slot in this task's queue
    (:py:attr:`ducky.machine.IRQRouterTask.queue`) is set to ``True``. Each run
    walks the queue, and for every triggered IRQ picks the first living core
    that allows hardware interrupts and hands the IRQ over by calling the
    core's ``irq`` method - the core then takes responsibility for executing
    the interrupt routine.

    :param ducky.machine.Machine machine: machine this task belongs to.
    """

    def __init__(self, machine):
        self.machine = machine

        # One boolean flag per IRQ number: True means "triggered, not yet
        # delivered to any core".
        self.queue = [False] * ExceptionList.COUNT

    def run(self):
        """
        Deliver pending IRQs to cores, and suspend the task once nothing is
        left pending.
        """

        machine = self.machine

        machine.DEBUG('irq: router has %i waiting irqs', self.queue.count(True))

        for irq, triggered in enumerate(self.queue):
            if triggered is not True:
                continue

            machine.DEBUG('irq: triggered %i', irq)

            for core in machine.living_cores:
                if core.hwint_allowed is True:
                    machine.DEBUG('irq: interrupt %s', core.cpuid)

                    # Clear the flag before the core is notified.
                    self.queue[irq] = False
                    core.irq(irq)
                    break

                machine.DEBUG('irq: %s hwint not allowed', core.cpuid)

            else:
                # No core accepted this IRQ - stop routing for this run, the
                # remaining flags stay set.
                break

        if not any(self.queue):
            machine.reactor.task_suspended(self)
class HaltMachineTask(IReactorTask):
    """
    Reactor task that halts the machine when it is run.

    :param machine: machine to halt.
    """

    def __init__(self, machine):
        self.machine = machine

    def run(self):
        """
        Call ``halt()`` of the machine this task was created for.
        """

        machine = self.machine
        machine.halt()
class EventBus(object):
    """
    Simple publish/subscribe dispatcher.

    Listeners are kept per event, in the order they were registered. Each
    listener is stored together with positional and keyword arguments given
    at registration time.

    :param machine: machine this bus belongs to; its ``DEBUG`` method is used
      for logging.
    """

    def __init__(self, machine):
        super(EventBus, self).__init__()

        self.machine = machine

        # event => OrderedDict(callback => (args, kwargs))
        self.listeners = defaultdict(OrderedDict)

    def add_listener(self, event, callback, *args, **kwargs):
        """
        Register ``callback`` for ``event``.

        Registering the same callback for the same event again replaces its
        stored arguments.

        :param event: event identifier.
        :param callback: callable invoked when the event is triggered.
        :param args: positional arguments passed to ``callback`` before those
          given to :py:meth:`trigger`.
        :param kwargs: keyword arguments passed to ``callback``; keyword
          arguments given to :py:meth:`trigger` override them.
        """

        self.machine.DEBUG('%s.add_listener: event=%s, callback=%s, args=%s, kwargs=%s', self.__class__.__name__, event, callback, args, kwargs)

        self.listeners[event][callback] = (args, kwargs)

    def remove_listener(self, event, callback):
        """
        Unregister ``callback`` from ``event``.

        :raises KeyError: when ``callback`` is not registered for ``event``.
        """

        self.machine.DEBUG('%s.remove_listener: event=%s, callback=%s', self.__class__.__name__, event, callback)

        del self.listeners[event][callback]

    def trigger(self, event, *args, **kwargs):
        """
        Call all listeners registered for ``event``, in registration order.

        Each listener receives its registration-time positional arguments
        followed by ``args``, and its registration-time keyword arguments
        updated with ``kwargs``.

        :param event: event identifier.
        """

        self.machine.DEBUG('%s.trigger: event=%s, args=%s, kwargs=%s', self.__class__.__name__, event, args, kwargs)

        # Iterate over a snapshot, so listeners may add or remove listeners of
        # this very event without breaking the iteration.
        for listener, (_args, _kwargs) in list(self.listeners[event].items()):
            merged_kwargs = _kwargs.copy()
            merged_kwargs.update(kwargs)

            # Pass the merged arguments - the registration-time ones must not
            # be dropped.
            listener(*(_args + args), **merged_kwargs)
class Machine(ISnapshotable, IMachineWorker):
    """
    Virtual machine itself.

    An instance owns the reactor, console, event bus, communication channel,
    CPUs, memory controller and devices. The expected life cycle, as far as
    this class shows, is :py:meth:`hw_setup`, :py:meth:`boot`, :py:meth:`run`
    and finally :py:meth:`halt`.

    :param logger: logger to use. When not set, a new one is created by
      ``create_logger()``.
    :param stdin: input stream, defaults to ``sys.stdin``.
    :param stdout: output stream, defaults to ``sys.stdout``.
    :param stderr: error stream, defaults to ``sys.stderr``.
    """

    def core(self, cid):
        """
        Find CPU core by its string id.

        :param string cid: id of searched CPU core, in the form `#<cpuid>:#<coreid>`.
        :rtype: :py:class:`ducky.cpu.CPUCore`
        :returns: found core
        :raises ducky.errors.InvalidResourceError: when no such core exists.
        """

        for cpu in self.cpus:
            for core in cpu.cores:
                if '#%i:#%i' % (cpu.id, core.id) == cid:
                    return core

        raise InvalidResourceError(F('No such CPU core: cid={cid}', cid=cid))

    def __init__(self, logger=None, stdin=None, stdout=None, stderr=None):
        self.stdin = stdin or sys.stdin
        self.stdout = stdout or sys.stdout
        self.stderr = stderr or sys.stderr

        self.reactor = Reactor(self)

        # Setup logging
        self.LOGGER = logger or create_logger()
        self.DEBUG = self.LOGGER.debug
        self.INFO = self.LOGGER.info
        self.WARN = self.LOGGER.warning
        self.ERROR = self.LOGGER.error
        self.EXCEPTION = self.LOGGER.exception

        # "tenh" output: resolved lazily by the first tenh() call.
        self._tenh = None
        self._tenh_device = None
        self._tenh_enabled = False

        self.console = ConsoleMaster(self)
        self.console.register_command('halt', cmd_halt)
        self.console.register_command('boot', cmd_boot)
        self.console.register_command('run', cmd_run)
        self.console.register_command('snap', cmd_snapshot)

        self.irq_router_task = IRQRouterTask(self)
        self.reactor.add_task(self.irq_router_task)

        # Made runnable by on_core_halted() once no living core is left.
        self.check_living_cores_task = HaltMachineTask(self)
        self.reactor.add_task(self.check_living_cores_task)

        self.comm_channel = CommChannel(self)
        self.events = EventBus(self)

        self.living_cores = []

        self.running = False
        self.halted = False

        self.cpus = []
        self.memory = None

        # device class => {device name => device}
        self.devices = collections.defaultdict(dict)

        self.last_state = None

    @property
    def cores(self):
        """
        Get list of all cores in the machine.

        :rtype: list
        :returns: `list` of :py:class:`ducky.cpu.CPUCore` instances
        """

        return list(itertools.chain.from_iterable(cpu.cores for cpu in self.cpus))

    def _master_devices(self):
        """
        Iterate over all devices that do not report themselves as slaves.

        Devices are visited class by class. The list of non-slave devices of
        a class is evaluated when that class is reached, before any of its
        devices is handed to the caller.
        """

        for devs in itervalues(self.devices):
            for dev in [dev for dev in itervalues(devs) if not dev.is_slave()]:
                yield dev

    def on_core_alive(self, core):
        """
        Signal machine that one of CPU cores is now alive.
        """

        self.living_cores.append(core)

    def on_core_halted(self, core):
        """
        Signal machine that one of CPU cores is no longer alive. When no
        living core remains, the task halting the machine is made runnable.
        """

        self.living_cores.remove(core)

        if not self.living_cores:
            self.reactor.task_runnable(self.check_living_cores_task)

    def get_device_by_name(self, name, klass=None):
        """
        Get device by its name and class.

        :param string name: name of the device.
        :param string klass: if set, search only devices with this class.
        :rtype: :py:class:`ducky.devices.Device`
        :returns: found device
        :raises ducky.errors.InvalidResourceError: when no such device exists
        """

        self.DEBUG('get_device_by_name: name=%s, klass=%s', name, klass)

        for dev_klass, devs in iteritems(self.devices):
            if klass and dev_klass != klass:
                continue

            if name in devs:
                return devs[name]

        raise InvalidResourceError(F('No such device: name={name}, klass={klass}', name=name, klass=klass))

    def get_storage_by_id(self, sid):
        """
        Get storage by its id.

        :param int sid: id of storage caller is looking for.
        :rtype: :py:class:`ducky.devices.Device`
        :returns: found device.
        :raises ducky.errors.InvalidResourceError: when no such storage exists.
        """

        self.DEBUG('get_storage_by_id: id=%s', sid)
        self.DEBUG('storages: %s', self.devices['storage'])

        for dev in itervalues(self.devices['storage']):
            if dev.sid == sid:
                return dev

        raise InvalidResourceError(F('No such storage: sid={sid:d}', sid=sid))

    def save_state(self, parent):
        """
        Store state of the machine, its CPUs and memory, as a ``machine``
        child of ``parent``.

        :param parent: snapshot node the machine state is attached to.
        """

        state = parent.add_child('machine', MachineState())

        state.nr_cpus = self.nr_cpus
        state.nr_cores = self.nr_cores

        for cpu in self.cpus:
            cpu.save_state(state)

        self.memory.save_state(state)

    def load_state(self, state):
        """
        Restore machine, CPUs and memory from ``state``. A CPU whose state is
        missing is reported by a warning and left untouched.

        :param state: machine state, as created by :py:meth:`save_state`.
        """

        self.nr_cpus = state.nr_cpus
        self.nr_cores = state.nr_cores

        children = state.get_children()

        for cpu in self.cpus:
            cpu_state = children.get('cpu{}'.format(cpu.id))

            if cpu_state is None:
                self.WARN('State of CPU #%i not found!', cpu.id)
                continue

            cpu.load_state(cpu_state)

        self.memory.load_state(state.get_children()['memory'])

    def setup_devices(self):
        """
        Create devices described by machine configuration.

        Sections without ``klass`` or ``driver`` are reported as errors and
        skipped, as are - silently - sections with ``enabled`` set to false.
        Created device is stored in :py:attr:`devices` under its class and
        section name. If the section has a ``master`` option, its value is
        assigned to the device's ``master`` attribute.
        """

        from .devices import get_driver

        for section in self.config.iter_devices():
            _get, _getbool, _ = self.config.create_getters(section)

            klass = _get('klass', None)
            driver = _get('driver', None)

            if not klass or not driver:
                self.ERROR('Unknown class or driver of device %s: klass=%s, driver=%s', section, klass, driver)
                continue

            if _getbool('enabled', True) is not True:
                self.DEBUG('Device %s disabled', section)
                continue

            dev = get_driver(driver).create_from_config(self, self.config, section)
            self.devices[klass][section] = dev

            master = _get('master', None)

            if master is not None:
                dev.master = master

    def hw_setup(self, machine_config):
        """
        Build the "hardware" - memory controller, devices, ROM loader and CPUs
        - according to the configuration.

        :param machine_config: machine configuration. It is also stored as
          ``config`` attribute for later use.
        """

        self.config = machine_config

        self._tenh_enabled = machine_config.getbool('machine', 'tenh-enabled', False)

        self.nr_cpus = self.config.getint('machine', 'cpus')
        self.nr_cores = self.config.getint('machine', 'cores')

        # self.evt_address = machine_config.getint('cpu', 'evt-address', DEFAULT_EVT_ADDRESS)
        # self.pt_address = machine_config.getint('cpu', 'pt-address', DEFAULT_PT_ADDRESS)

        self.memory = mm.MemoryController(self, size=machine_config.getint('memory', 'size', 0x1000000))

        self.setup_devices()

        self.rom_loader = ROMLoader(self)

        from .cpu import CPU

        for cpuid in range(0, self.nr_cpus):
            self.cpus.append(CPU(self, cpuid, self.memory, cores=self.nr_cores))

    @property
    def exit_code(self):
        """
        Highest ``exit_code`` among all cores of the machine.

        :raises ValueError: when the machine has no cores.
        """

        return max([core.exit_code for core in self.cores])

    def trigger_irq(self, handler):
        """
        Mark IRQ of ``handler`` as triggered, and make IRQ router task runnable.

        :param handler: IRQ source; its ``irq`` attribute is used as an index
          into the router's queue.
        """

        self.DEBUG('Machine.trigger_irq: handler=%s', handler)

        self.irq_router_task.queue[handler.irq] = True
        self.reactor.task_runnable(self.irq_router_task)

    def _do_tenh(self, printer, s, *args):
        """
        Send message to ``printer`` - indented by one space and terminated by
        CR LF - and log it as INFO as well.
        """

        printer(' ' + s + '\r\n', *args)
        self.INFO(s, *args)

    def tenh(self, s, *args):
        """
        Print a message intended for the user of the VM.

        When ``tenh-enabled`` is not set, message is just logged as INFO.
        Otherwise, the first call searches ``output`` devices for one having
        ``tenh`` attribute; if there is such device, it is enabled by its
        ``tenh_enable`` method, and messages are sent to both the device and
        the log. With no such device, messages are logged as INFO only.

        :param string s: message, with ``%``-style placeholders.
        :param args: values for the placeholders.
        """

        if not self._tenh_enabled:
            self.INFO(s, *args)
            return

        if self._tenh is None:
            for device in itervalues(self.devices['output']):
                if hasattr(device, 'tenh'):
                    self._tenh_device = device
                    self._tenh = partial(self._do_tenh, device.tenh)

                    device.tenh_enable()
                    break

            else:
                # No capable output device - fall back to plain logging.
                self._tenh = self.INFO

        self._tenh(s, *args)

    def boot(self):
        """
        Boot the machine: announce the VM, subscribe to core life cycle events,
        and boot memory, console, non-slave devices, ROM loader and CPUs, in
        this order.
        """

        self.tenh('Ducky VM, version %s', __version__)
        self.tenh('Running on %s', sys.version.replace('\n', ' '))

        if self.config.getbool('machine', 'jit', False) is True:
            self.tenh('JIT enabled')

        self.DEBUG('Machine.boot')

        self.events.add_listener('on-core-alive', self.on_core_alive)
        self.events.add_listener('on-core-halted', self.on_core_halted)

        self.memory.boot()
        self.console.boot()

        for dev in self._master_devices():
            dev.boot()

        self.rom_loader.boot()

        for cpu in self.cpus:
            cpu.boot()

        self.running = True

    def run(self):
        """
        Start non-slave devices and CPUs, and run the reactor until it returns.
        ``start_time`` and ``end_time`` attributes frame the reactor run.
        """

        self.DEBUG('Machine.run')

        for dev in self._master_devices():
            dev.run()

        for cpu in self.cpus:
            cpu.run()

        self.start_time = self.end_time = time.time()

        self.reactor.run()

        self.end_time = time.time()

    def suspend(self):
        """
        Suspend all CPUs.
        """

        self.DEBUG('Machine.suspend')

        for cpu in self.cpus:
            cpu.suspend()

    def wake_up(self):
        """
        Wake up all CPUs.
        """

        self.DEBUG('Machine.wake_up')

        for cpu in self.cpus:
            cpu.wake_up()

    def die(self, exc):
        """
        Log ``exc`` and halt the machine.

        :param exc: exception that caused the machine to die.
        """

        self.DEBUG('Machine.die: exc=%s', exc)

        self.EXCEPTION(exc)

        self.halt()

    def halt(self):
        """
        Halt the machine: capture its state, halt CPUs, non-slave devices,
        ROM loader, memory and console, remove machine's tasks from the
        reactor, and unsubscribe from core life cycle events.
        """

        self.DEBUG('Machine.halt')

        self.capture_state()

        for cpu in self.cpus:
            cpu.halt()

        for dev in self._master_devices():
            dev.halt()

        self.rom_loader.halt()
        self.memory.halt()
        self.console.halt()

        self.reactor.remove_task(self.irq_router_task)
        self.reactor.remove_task(self.check_living_cores_task)

        self.events.remove_listener('on-core-alive', self.on_core_alive)
        self.events.remove_listener('on-core-halted', self.on_core_halted)

        self.tenh('Halted.')

        # tenh() leaves _tenh_device unset when tenh is enabled but no output
        # device offers it - there is no stream to flush or close then.
        if self._tenh_enabled is True and self._tenh_device is not None:
            self._tenh_device.tenh_flush_stream()
            self._tenh_device.tenh_close_stream()

        self.running = False
        self.halted = True

    def capture_state(self, suspend=False):
        """
        Capture current state of the VM, and store it in its `last_state` attribute.

        :param bool suspend: if `True`, suspend VM before taking snapshot.
        :returns: captured state.
        """

        self.last_state = snapshot.VMState.capture_vm_state(self, suspend=suspend)

        return self.last_state
def cmd_boot(console, cmd):
    """
    Console command: boot the machine, then remove the ``boot`` command from
    the machine's console.

    :param console: console the command was entered on; the machine is reached
      through ``console.master.machine``.
    :param cmd: the command as received; not used.
    """

    machine = console.master.machine

    machine.boot()
    machine.console.unregister_command('boot')
def cmd_run(console, cmd):
    """
    Console command: run the machine, and once ``run()`` returns, remove the
    ``run`` command from the machine's console.

    :param console: console the command was entered on; the machine is reached
      through ``console.master.machine``.
    :param cmd: the command as received; not used.
    """

    machine = console.master.machine

    machine.run()
    machine.console.unregister_command('run')
def cmd_halt(console, cmd):
    """
    Console command: halt the machine, and log that the user asked for it.

    :param console: console the command was entered on; the machine is reached
      through ``console.master.machine``.
    :param cmd: the command as received; not used.
    """

    machine = console.master.machine

    machine.halt()
    machine.INFO('VM halted by user')
def cmd_snapshot(console, cmd):
    """
    Console command: capture state of the machine, and save it into a file
    named ``ducky-core.<pid>``, where ``<pid>`` is the id of the current
    process. The file name is reported to both the log and the console.

    :param console: console the command was entered on; the machine is reached
      through ``console.master.machine``.
    :param cmd: the command as received; not used.
    """

    machine = console.master.machine

    filename = 'ducky-core.{}'.format(os.getpid())

    vm_state = snapshot.VMState.capture_vm_state(machine)
    vm_state.save(filename)

    machine.INFO('Snapshot saved as %s', filename)
    console.writeln('Snapshot saved as %s', filename)