"""
Very simple character device that just "prints" characters on the screen.
It does not care about dimensions of the display, it knows only how to
"print" characters. Suited for the most basic output possible - just "print"
chars by writing to this device, and you'll get this written into a stream
attached to the frontend (``stdout``, file, ...).
"""
import enum
from . import DeviceFrontend, DeviceBackend, MMIOMemoryPage
from ..mm import UINT8_FMT, addr_to_page, UINT32_FMT, u32_t
from ..interfaces import IReactorTask
from ..hdt import HDTEntry_Device
# Default address of the TTY's MMIO page, used when the configuration has no
# ``mmio-address`` option and when no address is passed to the backend.
DEFAULT_MMIO_ADDRESS = 0x8200
class TTYPorts(enum.IntEnum):
    """Offsets of the TTY's ports within its MMIO page."""

    # A byte written to this offset is queued for output.
    DATA = 0
class TTYMMIOMemoryPage(MMIOMemoryPage):
    """MMIO page that hands bytes written to the data port over to the TTY."""

    def write_u8(self, offset, value):
        """
        Handle a one-byte write into the page.

        A write to ``TTYPorts.DATA`` puts ``value`` on the device's
        communication queue and calls the frontend's ``wakeup_flush()``.
        A write to any other offset only logs a warning.

        :param offset: offset of the written byte within the page.
        :param value: the byte being written.
        """

        cls_name = self.__class__.__name__
        self.DEBUG('%s.write_u8: offset=%s, value=%s', cls_name, UINT8_FMT(offset), UINT8_FMT(value))

        # Guard clause: only the data port is writable.
        if offset != TTYPorts.DATA:
            self.WARN('%s.write_u8: attempt to write to a virtual page: offset=%s', cls_name, UINT8_FMT(offset))
            return

        device = self._device
        device.comm_queue.write_out(value)
        device.frontend.wakeup_flush()
class HDTEntry_TTY(HDTEntry_Device):
    """HDT entry for the TTY: the common device header plus its MMIO address."""

    _fields_ = HDTEntry_Device.ENTRY_HEADER + [
        ('mmio_address', u32_t),
    ]

    def __init__(self, logger, config, section):
        """
        Fill the entry from the device's configuration section.

        :param logger: logger used to report the resulting address.
        :param config: configuration object; its ``mmio-address`` option is
            read, falling back to ``DEFAULT_MMIO_ADDRESS``.
        :param section: name of the device's configuration section.
        """

        super(HDTEntry_TTY, self).__init__(logger, section, 'Virtual TTY')

        configured_address = config.getint(section, 'mmio-address', DEFAULT_MMIO_ADDRESS)
        self.mmio_address = configured_address

        # Log the value as stored in the entry, not the raw configured one.
        logger.debug('%s: mmio-address=%s', self.__class__.__name__, UINT32_FMT(self.mmio_address))
class FrontendFlushTask(IReactorTask):
    """Reactor task that moves queued output into a stream, one item per run."""

    def __init__(self, frontend, queue, stream):
        """
        :param frontend: frontend owning this task; asked to suspend the task
            when the queue runs dry.
        :param queue: communication queue the output is read from.
        :param stream: stream the output is written to.
        """

        super(FrontendFlushTask, self).__init__()

        self._frontend = frontend
        self._machine = frontend.machine
        self._queue = queue
        self._stream = stream

    def set_output(self, stream):
        """Replace the stream this task writes to."""

        self._stream = stream

    def run(self):
        """
        Process at most one queued item.

        When the queue yields ``None`` the frontend's ``sleep_flush()`` is
        called; otherwise the item is written to the stream as a
        one-element list.
        """

        debug = self._machine.DEBUG
        cls_name = self.__class__.__name__

        debug('%s.run', cls_name)

        item = self._queue.read_out()

        if item is not None:
            debug('%s.run: event=%r', cls_name, item)
            self._stream.write([item])
            return

        debug('%s.run: no events', cls_name)
        self._frontend.sleep_flush()
class Frontend(DeviceFrontend):
    """
    Frontend of the TTY: owns the output stream and the reactor task that
    drains the backend's communication queue into that stream.
    """

    def __init__(self, machine, name):
        """
        :param machine: machine this frontend belongs to.
        :param name: name used to look up both the communication queue and
            the backend device.
        """

        super(Frontend, self).__init__(machine, self.__class__, name)

        self._comm_queue = machine.comm_channel.get_queue(name)

        # Both stay ``None`` until ``set_output()`` and ``boot()`` are called.
        self._stream = None
        self._flush_task = None

        # Link frontend and backend to each other.
        self.set_backend(machine.get_device_by_name(name))
        self.backend.set_frontend(self)

    @staticmethod
    def create_from_config(machine, config, section):
        """
        Create a frontend for the device named by the section's ``slave``
        option, defaulting to the section name itself.
        """

        slave = config.get(section, 'slave', default = section)
        return Frontend(machine, slave)

    def boot(self):
        """
        Create the flush task for the current stream, register it with the
        reactor, and boot the backend.
        """

        super(Frontend, self).boot()

        self._flush_task = FrontendFlushTask(self, self._comm_queue, self._stream)
        self.machine.reactor.add_task(self._flush_task)

        self.backend.boot()

    def halt(self):
        """Halt the backend, then remove the flush task from the reactor."""

        self.backend.halt()
        self.machine.reactor.remove_task(self._flush_task)

        super(Frontend, self).halt()

    def set_output(self, stream):
        """Set the output stream, also updating the flush task if it exists."""

        self.machine.DEBUG('%s.set_output: stream=%s', self.__class__.__name__, stream)

        self._stream = stream

        if self._flush_task is not None:
            self._flush_task.set_output(stream)

    def flush(self):
        """
        Run the flush task until the output side of the queue is empty.

        Does nothing when the flush task does not exist yet, matching
        ``wakeup_flush()`` and ``sleep_flush()``.
        """

        # Without this guard a non-empty queue before ``boot()`` would raise
        # AttributeError on ``None.run()``.
        if self._flush_task is None:
            return

        while not self._comm_queue.is_empty_out():
            self._flush_task.run()

    def wakeup_flush(self):
        """Mark the flush task as runnable; no-op when there is no task."""

        if self._flush_task is None:
            return

        self.machine.reactor.task_runnable(self._flush_task)

    def sleep_flush(self):
        """Mark the flush task as suspended; no-op when there is no task."""

        if self._flush_task is None:
            return

        self.machine.reactor.task_suspended(self._flush_task)

    def close(self, allow = False):
        """
        Close the output stream.

        Does nothing when no stream has been set, matching ``tenh_enable()``.

        :param allow: when ``True``, the stream's ``allow_close`` flag is set
            to ``True`` before the stream is closed.
        """

        # Without this guard, closing before ``set_output()`` would raise
        # AttributeError on ``None.close()``.
        if self._stream is None:
            return

        if allow is True:
            self._stream.allow_close = True

        self._stream.close()

    def tenh_enable(self):
        """Clear the stream's ``allow_close`` flag, if a stream is set."""

        if self._stream is not None:
            self._stream.allow_close = False
class Backend(DeviceBackend):
    """
    Backend of the TTY: exposes an MMIO page to the machine and queues
    output for the frontend.
    """

    def __init__(self, machine, name, stream = None, mmio_address = None, *args, **kwargs):
        """
        :param machine: machine this device belongs to.
        :param name: device name; also the name of its communication queue.
        :param stream: accepted for interface compatibility, not used here.
        :param mmio_address: address of the MMIO page. ``None`` selects
            ``DEFAULT_MMIO_ADDRESS``; any other value, including ``0``, is
            used as given.
        """

        super(Backend, self).__init__(machine, 'output', name, *args, **kwargs)

        # Compare with ``None`` explicitly: ``mmio_address or DEFAULT`` would
        # replace an explicit address of 0 with the default, disagreeing with
        # the address HDTEntry_TTY records for the same configuration.
        self._mmio_address = DEFAULT_MMIO_ADDRESS if mmio_address is None else mmio_address

        # Created in ``boot()``.
        self._mmio_page = None

        self.comm_queue = machine.comm_channel.create_queue(name)

    @staticmethod
    def create_from_config(machine, config, section):
        """
        Create a backend named after ``section``, with the MMIO address read
        from its ``mmio-address`` option (default ``DEFAULT_MMIO_ADDRESS``).
        """

        return Backend(machine, section,
                       mmio_address = config.getint(section, 'mmio-address', DEFAULT_MMIO_ADDRESS))

    @staticmethod
    def create_hdt_entries(logger, config, section):
        """Return a list with the single HDT entry describing this device."""

        return [HDTEntry_TTY(logger, config, section)]

    def __repr__(self):
        return 'basic tty on [%s] as %s' % (UINT32_FMT(self._mmio_address), self.name)

    def tenh(self, s, *args):
        """
        Format ``s`` with ``args`` and queue it for output, one character at
        a time, then wake the frontend's flush task once.

        :param s: ``%``-style format string.
        :param args: values substituted into ``s``.
        """

        self.machine.DEBUG('%s.tenh: s="%s", args=%s', self.__class__.__name__, s, args)

        s = s % args

        # The queue carries character codes, not characters.
        for c in s:
            self.comm_queue.write_out(ord(c))

        self.frontend.wakeup_flush()

    def tenh_enable(self):
        """Forward to the frontend's ``tenh_enable()``."""

        self.frontend.tenh_enable()

    def tenh_flush_stream(self):
        """Ask the frontend to flush all queued output."""

        self.frontend.flush()

    def tenh_close_stream(self):
        """Ask the frontend to close its stream, explicitly allowing the close."""

        self.frontend.close(allow = True)

    def boot(self):
        """Create the MMIO page, register it with memory, and announce the device."""

        self.machine.DEBUG('%s.boot', self.__class__.__name__)

        self._mmio_page = TTYMMIOMemoryPage(self, self.machine.memory, addr_to_page(self._mmio_address))
        self.machine.memory.register_page(self._mmio_page)

        self.machine.tenh('hid: %s', self)

    def halt(self):
        """Unregister the MMIO page from the machine's memory."""

        self.machine.DEBUG('%s.halt', self.__class__.__name__)

        self.machine.memory.unregister_page(self._mmio_page)