Source code for ducky.devices.tty

"""
Very simple character device that just "prints" characters on the screen.
It does not care about dimensions of the display, it kknow only how to
"print" characters. Suited for the most basic output possible - just "print"
chars by writing to this device, and you'll get this written into a stream
attached to the frontend (``stdout``, file, ...).
"""

from . import IOProvider, DeviceFrontend, DeviceBackend
from ..errors import InvalidResourceError
from ..mm import UINT16_FMT, UINT8_FMT
from ..interfaces import IReactorTask

DEFAULT_PORT_RANGE = 0x200

[docs]class FrontendFlushTask(IReactorTask): def __init__(self, frontend, queue, stream): super(FrontendFlushTask, self).__init__() self._machine = frontend.machine self._queue = queue self._stream = stream
[docs] def set_output(self, stream): self._stream = stream
[docs] def run(self): self._machine.DEBUG('%s.run', self.__class__.__name__) b = self._queue.read_out() if b is None: self._machine.DEBUG('%s.run: no events', self.__class__.__name__) return self._machine.DEBUG('%s.run: event=%r', self.__class__.__name__, b) self._stream.write([b])
[docs]class Frontend(DeviceFrontend): def __init__(self, machine, name): super(Frontend, self).__init__(machine, self.__class__, name) self._comm_queue = machine.comm_channel.get_queue(name) self._stream = None self._flush_task = None self.set_backend(machine.get_device_by_name(name)) self.backend.set_frontend(self) @staticmethod
[docs] def create_from_config(machine, config, section): slave = config.get(section, 'slave', default = section) return Frontend(machine, slave)
[docs] def boot(self): super(Frontend, self).boot() self._flush_task = FrontendFlushTask(self, self._comm_queue, self._stream) self.machine.reactor.add_task(self._flush_task) self.machine.reactor.task_runnable(self._flush_task) self.backend.boot()
[docs] def halt(self): self.backend.halt() self.machine.reactor.remove_task(self._flush_task) super(Frontend, self).halt()
[docs] def set_output(self, stream): self.machine.DEBUG('%s.set_output: stream=%s', self.__class__.__name__, stream) self._stream = stream if self._flush_task is not None: self._flush_task.set_output(stream)
[docs] def flush(self): while not self._comm_queue.is_empty_out(): self._flush_task.run()
[docs] def close(self, allow = False): if allow is True: self._stream.allow_close = True self._stream.close()
[docs] def tenh_enable(self): if self._stream is not None: self._stream.allow_close = False
[docs]class Backend(IOProvider, DeviceBackend): def __init__(self, machine, name, stream = None, port = None, *args, **kwargs): super(Backend, self).__init__(machine, 'output', name, *args, **kwargs) self.port = port or DEFAULT_PORT_RANGE self.ports = [port] self.comm_queue = machine.comm_channel.create_queue(name) @staticmethod
[docs] def create_from_config(machine, config, section): return Backend(machine, section, port = config.getint(section, 'port', DEFAULT_PORT_RANGE))
def __repr__(self): return 'basic tty on [%s] as %s' % (', '.join([UINT16_FMT(port) for port in self.ports]), self.name)
[docs] def write_u8(self, port, value): self.machine.DEBUG('%s.write_u8: port=%s, value=%s', self.__class__.__name__, UINT16_FMT(port), UINT8_FMT(value)) if port not in self.ports: raise InvalidResourceError('Unhandled port: %s' % UINT16_FMT(port)) self.comm_queue.write_out(value)
[docs] def tenh(self, s, *args): self.machine.DEBUG('%s.tenh: s="%s", args=%s', self.__class__.__name__, s, args) s = s % args for c in s: self.comm_queue.write_out(ord(c))
[docs] def tenh_enable(self): self.frontend.tenh_enable()
[docs] def tenh_flush_stream(self): self.frontend.flush()
[docs] def tenh_close_stream(self): self.frontend.close(allow = True)
[docs] def boot(self): self.machine.DEBUG('%s.boot', self.__class__.__name__) for port in self.ports: self.machine.register_port(port, self) self.machine.tenh('hid: %s', self)
[docs] def halt(self): self.machine.DEBUG('%s.halt', self.__class__.__name__) for port in self.ports: self.machine.unregister_port(port)