"""
This file provides the code necessary to boot up a virtual machine with
the correct program running. This code may provide a slightly different
environment compared to real hardware, since e.g. external files can be
mmap-ed into VM's memory for writing.
"""
import importlib
import mmap
from functools import partial
from ctypes import sizeof
from six import PY2
from .interfaces import IMachineWorker
from .errors import InvalidResourceError
from .util import align, BinaryFile
from .mm import u8_t, u16_t, u32_t, UINT32_FMT, PAGE_SIZE, area_to_pages, PAGE_MASK, ExternalMemoryPage
from .mm.binary import SectionFlags, File
from .snapshot import SnapshotNode
from .hdt import HDT, HDTEntry_Argument, HDTEntry_Device
from .debugging import Point # noqa
#: By default, Hardware Description Table starts at this address after boot.
#: Used by ``ROMLoader.setup_hdt`` when ``machine.hdt-address`` is not set in
#: the VM configuration.
DEFAULT_HDT_ADDRESS = 0x00000100
#: By default, CPU starts executing instructions at this address after boot.
#: Used as the bootloader base address when none is passed to
#: ``ROMLoader.setup_bootloader`` or set in the ``bootloader`` config section.
DEFAULT_BOOTLOADER_ADDRESS = 0x00020000
class MMapMemoryPage(ExternalMemoryPage):
    """
    Memory page whose content lives in an external file made accessible by
    a ``mmap()`` call. Each page is a part of one
    :py:class:`ducky.boot.MMapArea` instance. When that area was opened as
    `shared`, modifications of the page content are propagated to the external
    file, and modifications of the file are visible in the page (as long as
    the page lies in the affected area).

    :param MMapArea area: area this page belongs to.
    """

    def __init__(self, area, *args, **kwargs):
        super(MMapMemoryPage, self).__init__(*args, **kwargs)

        self.area = area

        # Shadow the abstract ``get``/``put`` on this instance with the
        # accessors written for the running Python major version.
        variant = 'py2' if PY2 else 'py3'
        self.get = getattr(self, '_get_' + variant)
        self.put = getattr(self, '_put_' + variant)

    def get(self, offset):
        """
        Read one byte from page.

        Abstract placeholder: ``__init__`` replaces it on every instance with
        the accessor tailored for the Python version used.

        :param int offset: offset of the requested byte.
        :rtype: int
        """
        raise NotImplementedError()

    def put(self, offset, b):
        """
        Write one byte to page.

        Abstract placeholder: ``__init__`` replaces it on every instance with
        the accessor tailored for the Python version used.

        :param int offset: offset of the modified byte.
        :param int b: new value of the modified byte.
        """
        raise NotImplementedError()

    def _get_py2(self, offset):
        """
        Read one byte from page (Python 2 variant).

        The stored item is converted to an integer with ``ord()``.

        :param int offset: offset of the requested byte.
        :rtype: int
        """
        # ``self.data``/``self.offset`` are presumably set up by the parent
        # class from constructor arguments - confirm in ExternalMemoryPage.
        index = self.offset + offset
        return ord(self.data[index])

    def _put_py2(self, offset, b):
        """
        Write one byte to page (Python 2 variant).

        The integer value is converted with ``chr()`` before being stored.

        :param int offset: offset of the modified byte.
        :param int b: new value of the modified byte.
        """
        index = self.offset + offset
        self.data[index] = chr(b)

    def _get_py3(self, offset):
        """
        Read one byte from page (Python 3 variant).

        :param int offset: offset of the requested byte.
        :rtype: int
        """
        index = self.offset + offset
        return self.data[index]

    def _put_py3(self, offset, b):
        """
        Write one byte to page (Python 3 variant).

        :param int offset: offset of the modified byte.
        :param int b: new value of the modified byte.
        """
        index = self.offset + offset
        self.data[index] = b
class MMapAreaState(SnapshotNode):
    """
    Snapshot node declaring the fields tracked for one mmaped area:
    ``address``, ``size``, ``path`` and ``offset``.
    """

    def __init__(self):
        field_names = ('address', 'size', 'path', 'offset')
        super(MMapAreaState, self).__init__(*field_names)
class MMapArea(object):
    """
    Objects of this class represent one mmaped memory area each, to track this
    information for later use.

    :param ptr: ``mmap object``, as returned by :py:meth:`mmap.mmap` function.
    :param u32_t address: address of the first byte of an area in the memory.
    :param u32_t size: length of the area, in bytes.
    :param file_path: path to a source file.
    :param u32_t offset: offset of the first byte in the source file.
    :param int pages_start: first page of the area.
    :param int pages_cnt: number of pages in the area.
    :param mm.binary.SectionFlags flags: flags applied to this area. May be
      ``None`` when no flags were requested.
    """

    def __init__(self, ptr, address, size, file_path, offset, pages_start, pages_cnt, flags):
        super(MMapArea, self).__init__()

        self.ptr = ptr
        self.address = address
        self.size = size
        self.file_path = file_path
        self.offset = offset
        self.pages_start = pages_start
        self.pages_cnt = pages_cnt
        self.flags = flags

    def __repr__(self):
        # ``flags`` is optional - never let a missing value break repr(),
        # it is used mostly in debug and error messages.
        flags = self.flags.to_string() if self.flags is not None else '<none>'

        return '<MMapArea: address=%s, size=%s, filepath=%s, pages-start=%s, pages-cnt=%i, flags=%s>' % (UINT32_FMT(self.address), self.size, self.file_path, self.pages_start, self.pages_cnt, flags)

    def save_state(self, parent):
        """
        Snapshot hook. Currently a no-op: nothing is stored for the area.

        :param parent: parent snapshot node (unused).
        """
        pass

    def load_state(self, state):
        """
        Snapshot hook. Currently a no-op: nothing is restored for the area.

        :param state: saved state (unused).
        """
        pass
class ROMLoader(IMachineWorker):
    """
    This class provides methods for loading all necessary pieces into VM's
    memory. These methods are called in VM's `boot` phase.

    :param machine: virtual machine this loader works for. Its ``config``,
      ``memory``, ``LOGGER`` and ``DEBUG`` attributes are used.
    """

    def __init__(self, machine):
        self.machine = machine
        self.config = machine.config

        # path => [reference count, opened file object]
        self.opened_mmap_files = {}

        # area address => MMapArea
        self.mmap_areas = {}

        self.logger = self.machine.LOGGER
        self.DEBUG = self.machine.DEBUG

    def _get_mmap_fileno(self, file_path):
        """
        Get file descriptor of a file to be mmaped, and take one reference
        to it.

        The file is opened (mode ``r+b``) on the first request only, later
        requests for the same path share the opened file.

        :param str file_path: path of the file.
        :returns: file descriptor of the opened file.
        :rtype: int
        """

        if file_path not in self.opened_mmap_files:
            self.opened_mmap_files[file_path] = [0, open(file_path, 'r+b')]

        desc = self.opened_mmap_files[file_path]
        desc[0] += 1

        return desc[1].fileno()

    def _put_mmap_fileno(self, file_path):
        """
        Release one reference to a file previously acquired by
        :py:meth:`_get_mmap_fileno`. When the last reference is released,
        the file is closed and forgotten.

        :param str file_path: path of the file.
        :raises KeyError: when ``file_path`` is not tracked as opened.
        """

        desc = self.opened_mmap_files[file_path]
        desc[0] -= 1

        if desc[0] > 0:
            return

        desc[1].close()
        del self.opened_mmap_files[file_path]

    def mmap_area(self, file_path, address, size, offset = 0, flags = None, shared = False):
        """
        Assign set of memory pages to mirror external file, mapped into memory.

        :param string file_path: path of external file, whose content new area
          should reflect.
        :param u32_t address: address where new area should start.
        :param u32_t size: length of area, in bytes.
        :param int offset: starting point of the area in mmaped file.
        :param ducky.mm.binary.SectionFlags flags: specifies required flags for mmaped
          pages.
        :param bool shared: if ``True``, content of external file is mmaped as
          shared, i.e. all changes are visible to all processes, not only to the
          current ducky virtual machine.
        :returns: newly created mmap area.
        :rtype: ducky.boot.MMapArea
        :raises ducky.errors.InvalidResourceError: when ``size`` is not multiply of
          :py:data:`ducky.mm.PAGE_SIZE`, or when ``address`` is not multiply of
          :py:data:`ducky.mm.PAGE_SIZE`, or when any of pages in the affected area
          is already allocated.
        """

        # ``flags`` is optional, do not dereference it blindly.
        self.DEBUG('%s.mmap_area: file=%s, offset=%s, size=%s, address=%s, flags=%s, shared=%s', self.__class__.__name__, file_path, offset, size, UINT32_FMT(address), flags.to_string() if flags is not None else '<none>', shared)

        if size % PAGE_SIZE != 0:
            raise InvalidResourceError('Memory size must be multiple of PAGE_SIZE')

        if address % PAGE_SIZE != 0:
            raise InvalidResourceError('MMap area address must be multiple of PAGE_SIZE')

        mc = self.machine.memory

        pages_start, pages_cnt = area_to_pages(address, size)

        for i in range(pages_start, pages_start + pages_cnt):
            if i in mc.pages:
                # Only mmap-backed pages are known to carry ``area``, other
                # page classes may not - do not let the error report itself
                # fail with AttributeError.
                raise InvalidResourceError('MMap request overlaps with existing pages: page=%s, area=%s' % (mc.pages[i], getattr(mc.pages[i], 'area', None)))

        mmap_flags = mmap.MAP_SHARED if shared else mmap.MAP_PRIVATE

        # Always mmap as writable - VM will force read-only access using
        # page flags. But since it is possible to change page flags
        # in run-time, and request write access to areas originally
        # loaded as read-only, such write access would fail because
        # the underlying mmap area was mmaped as read-only only, and this
        # limitation is not possible to overcome.
        mmap_prot = mmap.PROT_READ | mmap.PROT_WRITE

        fileno = self._get_mmap_fileno(file_path)

        try:
            ptr = mmap.mmap(
                fileno,
                size,
                flags = mmap_flags,
                prot = mmap_prot,
                offset = offset)

        except Exception:
            # Mapping failed - give back the file reference taken above,
            # otherwise the file would stay opened forever.
            self._put_mmap_fileno(file_path)
            raise

        area = MMapArea(ptr, address, size, file_path, offset, pages_start, pages_cnt, flags)

        for i in range(pages_start, pages_start + pages_cnt):
            mc.register_page(MMapMemoryPage(area, mc, i, ptr, offset = (i - pages_start) * PAGE_SIZE))

        self.mmap_areas[area.address] = area

        return area

    def unmmap_area(self, mmap_area):
        """
        Tear down an area created by :py:meth:`mmap_area`: unregister its
        pages, forget the area, close the mapping and release the reference
        to the underlying file.

        :param MMapArea mmap_area: area to remove.
        """

        mc = self.machine.memory

        for pg in mc.get_pages(pages_start = mmap_area.pages_start, pages_cnt = mmap_area.pages_cnt):
            mc.unregister_page(pg)

        del self.mmap_areas[mmap_area.address]

        mmap_area.ptr.close()

        self._put_mmap_fileno(mmap_area.file_path)

    def setup_hdt(self):
        """
        Initialize memory area containing :ref:`HDT`.

        If VM config file specifies ``HDT`` image file, it is loaded, otherwise HDT
        is constructed for the actual configuration, and then it's copied into memory.

        :param u32_t machine.hdt-address: Base address of ``HDT`` in memory. If not
          set, :py:const:`ducky.boot.DEFAULT_HDT_ADDRESS` is used.
        :param str machine.hdt-image: ``HDT`` image to load. If not set, ``HDT`` is
          constructed for the actual VM's configuration.
        :raises ducky.errors.InvalidResourceError: when HDT address is not
          page-aligned.
        """

        self.DEBUG('%s.setup_hdt', self.__class__.__name__)

        hdt_address = self.config.getint('machine', 'hdt-address', DEFAULT_HDT_ADDRESS)

        # Any bit set outside of PAGE_MASK means the address does not sit on
        # a page boundary (presumably PAGE_MASK selects the page-number bits
        # - confirm against ducky.mm).
        if hdt_address & ~PAGE_MASK:
            raise InvalidResourceError('HDT address must be page-aligned: address=%s' % UINT32_FMT(hdt_address))

        self.DEBUG('HDT address=%s', UINT32_FMT(hdt_address))

        mc = self.machine.memory

        def _alloc_pages(size):
            # Reserve as many whole pages as needed to hold ``size`` bytes.
            pages = mc.alloc_pages(base = hdt_address, count = align(PAGE_SIZE, size) // PAGE_SIZE)
            self.machine.DEBUG('%s.setup_hdt: address=%s, size=%s (%s pages)', self.__class__.__name__, UINT32_FMT(hdt_address), size, len(pages))

        hdt_image = self.config.get('machine', 'hdt-image', None)

        if hdt_image is None:
            self.DEBUG('HDT image not specified, creating one')

            hdt = HDT(self.machine.LOGGER, config = self.config)
            hdt.create()

            _alloc_pages(len(hdt))

            # Every writer stores one field at ``address`` and returns the
            # address right behind it.
            def _write_field(writer_fn, size, address, field_value):
                writer_fn(address, field_value)
                return address + size

            def _write_array(max_length, address, field_value):
                for i in range(0, max_length):
                    mc.write_u8(address + i, field_value[i])

                return address + max_length

            def _write_struct(address, struct):
                self.DEBUG('__write_struct: address=%s, struct=%s (%s)', UINT32_FMT(address), struct, sizeof(struct))

                for n, t in struct._fields_:
                    address = writers[sizeof(t)](address, getattr(struct, n))

                return address

            # Writers are picked by the size of the field type: 1, 2 and 4
            # byte fields are written as integers, fields as long as one of
            # the name/ident limits are copied byte by byte.
            writers = {
                1: partial(_write_field, mc.write_u8, 1),
                2: partial(_write_field, mc.write_u16, 2),
                4: partial(_write_field, mc.write_u32, 4),
                HDTEntry_Argument.MAX_NAME_LENGTH: partial(_write_array, HDTEntry_Argument.MAX_NAME_LENGTH),
                HDTEntry_Device.MAX_NAME_LENGTH: partial(_write_array, HDTEntry_Device.MAX_NAME_LENGTH),
                HDTEntry_Device.MAX_IDENT_LENGTH: partial(_write_array, HDTEntry_Device.MAX_IDENT_LENGTH)
            }

            address = _write_struct(hdt_address, hdt.header)

            for entry in hdt.entries:
                address = _write_struct(address, entry)

        else:
            self.DEBUG('Loading HDT image %s', hdt_image)

            with BinaryFile.open(self.logger, hdt_image, 'r') as f_in:
                img = f_in.read()

            _alloc_pages(len(img))

            # Copy the image into memory, byte by byte.
            for i, b in enumerate(img):
                mc.write_u8(hdt_address + i, b)

    def setup_mmaps(self):
        """
        Create mmap area for every mmap section found in VM configuration.

        Options read from each section: ``file``, ``address``, ``size``,
        ``offset`` (default ``0``), ``access`` (default ``r``, any combination
        of ``r``, ``w`` and ``x``) and ``shared`` (default ``False``).
        """

        self.DEBUG('%s.setup_mmaps', self.__class__.__name__)

        for section in self.config.iter_mmaps():
            # NOTE(review): this relies on create_getters() returning
            # (get, getbool, getint) in this very order - confirm against
            # the config implementation.
            _get, _getbool, _getint = self.config.create_getters(section)

            access = _get('access', 'r')
            flags = SectionFlags.create(readable = 'r' in access, writable = 'w' in access, executable = 'x' in access)

            self.mmap_area(_get('file'), _getint('address'), _getint('size'), offset = _getint('offset', 0), flags = flags, shared = _getbool('shared', False))

    @staticmethod
    def _import_class(path):
        """
        Import module part of dotted ``path`` and return the attribute named
        by its last component, e.g. ``ducky.debugging.BreakPoint``.

        :param str path: dotted path, ``package.module.Name``.
        """

        module_name, _, class_name = path.rpartition('.')

        return getattr(importlib.import_module(module_name), class_name)

    def setup_debugging(self):
        """
        Create debugging points, and actions attached to them, for every
        breakpoint section found in VM configuration.

        Options read from each section: ``core`` (default ``#0:#0``),
        ``klass`` (default ``ducky.debugging.BreakPoint``), ``chain`` (default
        ``pre-step``) and ``actions``, a comma-separated list of names of
        config sections describing actions; each such section must provide
        its own ``klass`` option.
        """

        self.DEBUG('%s.setup_debugging', self.__class__.__name__)

        for section in self.config.iter_breakpoints():
            # Only the string getter is needed here.
            _get, _, _ = self.config.create_getters(section)

            core = self.machine.core(_get('core', '#0:#0'))
            core.init_debug_set()

            klass = self._import_class(_get('klass', 'ducky.debugging.BreakPoint'))

            p = klass.create_from_config(core.debug, self.config, section)
            core.debug.add_point(p, _get('chain', 'pre-step'))

            for action_section in _get('actions', '').split(','):
                action_section = action_section.strip()

                # Skip empty items, e.g. when no actions are set at all.
                if not action_section:
                    continue

                klass = self._import_class(self.config.get(action_section, 'klass'))

                a = klass.create_from_config(core.debug, self.config, action_section)
                p.actions.append(a)

    def setup_bootloader(self, filepath, base = None):
        """
        Load :term:`bootloader` into main memory.

        In the world of a real hardware, bootloader binary would be transformed
        into an :term:`image`, and then "burned" in some form into the memory -
        main, or some kind of ROM from which it'd be loaded into main memory at
        the very beginning of boot process.

        :param str filepath: path to bootloader binary.
        :param u32_t base: address of the first byte of bootloader in memory.
          By default, :py:const:`ducky.boot.DEFAULT_BOOTLOADER_ADDRESS` is used.
        """

        self.DEBUG('%s.setup_bootloader: filepath=%s, base=%s', self.__class__.__name__, filepath, UINT32_FMT(base) if base is not None else '<none>')

        base = DEFAULT_BOOTLOADER_ADDRESS if base is None else base

        mc = self.machine.memory

        with File.open(self.machine.LOGGER, filepath, 'r') as f:
            for section in f.sections:
                self.DEBUG('%s.setup_bootloader: section=%s, base=%s', self.__class__.__name__, section.name, UINT32_FMT(section.header.base))

                if section.header.flags.loadable != 1:
                    self.DEBUG('%s.setup_bootloader: section is not loadable', self.__class__.__name__)
                    continue

                # Section base is relative to the bootloader base address.
                section_base = base + section.header.base

                self.DEBUG('%s.setup_bootloader: place to %s', self.__class__.__name__, UINT32_FMT(section_base))

                pages_start, pages_cnt = area_to_pages(section_base, section.header.data_size)

                for i in range(pages_start, pages_start + pages_cnt):
                    mc.alloc_specific_page(i)

                if section.header.flags.bss == 1:
                    self.DEBUG('%s.setup_bootloader: BSS section, allocating pages is good enough', self.__class__.__name__)
                    continue

                # Copy section content into memory, byte by byte.
                for i, b in enumerate(section.payload):
                    mc.write_u8(section_base + i, b)

    def poke(self, address, value, length):
        """
        Write ``value`` into memory.

        :param u32_t address: address to write to.
        :param int value: value to write. It is passed through the
          fixed-width type matching ``length`` before being written.
        :param int length: width of the write in bytes: ``1`` or ``2``; any
          other value results in a 4 byte write.
        """

        self.DEBUG('%s.poke: addr=%s, value=%s, length=%s', self.__class__.__name__, UINT32_FMT(address), UINT32_FMT(value), length)

        if length == 1:
            self.machine.memory.write_u8(address, u8_t(value).value)

        elif length == 2:
            self.machine.memory.write_u16(address, u16_t(value).value)

        else:
            self.machine.memory.write_u32(address, u32_t(value).value)

    def boot(self):
        """
        Prepare VM's memory: set up HDT, mmap areas and debugging points, and
        - when VM configuration has ``bootloader`` section - load bootloader.
        """

        self.DEBUG('%s.boot', self.__class__.__name__)

        self.setup_hdt()
        self.setup_mmaps()
        self.setup_debugging()

        if self.config.has_section('bootloader'):
            self.setup_bootloader(self.config.get('bootloader', 'file'), base = self.config.getint('bootloader', 'base', DEFAULT_BOOTLOADER_ADDRESS))

    def halt(self):
        """
        Remove all mmap areas created by this loader.
        """

        self.DEBUG('%s.halt', self.__class__.__name__)

        # Iterate over a copy - unmmap_area() removes areas from the dict.
        for area in list(self.mmap_areas.values()):
            self.unmmap_area(area)