Source code for ducky.devices.keyboard

"""
Keyboard controller - provides events for pressed and released keys.
"""

import enum
import io

from . import IRQProvider, IOProvider, DeviceFrontend, DeviceBackend, IRQList
from ..errors import InvalidResourceError
from ..mm import UINT16_FMT

DEFAULT_IRQ = 0x01
DEFAULT_PORT_RANGE = 0x100


[docs]class ControlMessages(enum.IntEnum): HALT = 1025 CONTROL_MESSAGE_FIRST = 1024
[docs]class Frontend(DeviceFrontend): def __init__(self, machine, name): super(Frontend, self).__init__(machine, 'input', name) self._comm_queue = machine.comm_channel.get_queue(name) self._streams = [] self._stream = None self.backend = machine.get_device_by_name(name) @staticmethod
[docs] def create_from_config(machine, config, section): slave = config.get(section, 'slave', default = section) return Frontend(machine, slave)
[docs] def boot(self): super(Frontend, self).boot() self._open_input() self.backend.boot()
[docs] def halt(self): self._close_input() self.backend.halt() super(Frontend, self).halt()
[docs] def enqueue_stream(self, stream): self.machine.DEBUG('%s.enqueue_input: stream=%s', self.__class__.__name__, stream) if not stream.has_poll_support(): raise InvalidResourceError('Keyboard stream must support polling') self._streams.append(stream)
def _close_input(self): self.machine.DEBUG('%s._close_input: input=%s', self.__class__.__name__, self._stream) if self._stream is None: return self._stream.unregister_with_reactor(self.machine.reactor) self._stream = None def _open_input(self): self.machine.DEBUG('%s._open_input', self.__class__.__name__) self._close_input() if not self._streams: self.machine.DEBUG('%s._open_input: no additional input streams', self.__class__.__name__) self._comm_queue.write_in(ControlMessages.HALT) # if not self.queue or self.queue[-1] != ControlMessages.HALT: # self.machine.DEBUG('signal halt') # self.queue.append(ControlMessages.HALT) return self._stream = self._streams.pop(0) self.machine.DEBUG('%s._open_input: stream=%r', self.__class__.__name__, self._stream) self._stream.register_with_reactor(self.machine.reactor, on_read = self._handle_raw_input, on_error = self._handle_input_error) def _handle_input_error(self): self.machine.DEBUG('%s._handle_input_error') self._open_input() def _handle_raw_input(self): self.machine.DEBUG('%s._handle_raw_input', self.__class__.__name__) assert self._stream is not False buff = self._stream.read(size = io.DEFAULT_BUFFER_SIZE) self.machine.DEBUG('%s._handle_raw_input: buff=%s (%s)', self.__class__.__name__, buff, type(buff)) if buff is None: self.machine.DEBUG('%s._handle_raw_input: nothing to do, no input', self.__class__.__name__) return if not buff: # EOF self._open_input() return self.machine.DEBUG('%s._handle_raw_input: adding %i chars', self.__class__.__name__, len(buff)) self._comm_queue.write_in(buff) self.machine.trigger_irq(self.backend)
[docs]class Backend(IRQProvider, IOProvider, DeviceBackend): def __init__(self, machine, name, port = None, irq = None): super(Backend, self).__init__(machine, 'input', name) self.port = port or DEFAULT_PORT_RANGE self.ports = [port] self.irq = irq or DEFAULT_IRQ self._comm_queue = machine.comm_channel.create_queue(name) self._key_queue = [] @staticmethod
[docs] def create_from_config(machine, config, section): return Backend(machine, section, port = config.getint(section, 'port', DEFAULT_PORT_RANGE), irq = config.getint(section, 'irq', IRQList.KEYBOARD))
def __repr__(self): return 'basic keyboard controller on [%s] as %s' % (', '.join([UINT16_FMT(port) for port in self.ports]), self.name)
[docs] def boot(self): self.machine.DEBUG('%s.boot', self.__class__.__name__) for port in self.ports: self.machine.register_port(port, self) self.machine.tenh('hid: %s', self)
[docs] def halt(self): self.machine.DEBUG('%s.halt', self.__class__.__name__) for port in self.ports: self.machine.unregister_port(port)
def _process_input_event(self, e): self.machine.DEBUG('%s.__process_input_event: e=%r', self.__class__.__name__, e) if isinstance(e, (list, bytearray, bytes)): for key in e: self._key_queue.append(key) elif isinstance(e, ControlMessages): self._key_queue.append(e) else: raise InvalidResourceError('Unknown message: e=%s, type=%s' % (e, type(e))) def _process_input_events(self): self.machine.DEBUG('%s.__process_input_events', self.__class__.__name__) while True: e = self._comm_queue.read_in() if e is None: return self._process_input_event(e) def _read_char(self): self.machine.DEBUG('%s._read_char', self.__class__.__name__) def __do_read_char(): try: b = self._key_queue.pop(0) except IndexError: self.machine.DEBUG('%s._read_char: no available chars in queue', self.__class__.__name__) return None return b def __process_char(b): self.machine.DEBUG('%s._read_char: queue now has %i bytes', self.__class__.__name__, len(self._key_queue)) if b == ControlMessages.HALT: self.machine.DEBUG('%s._read_char: planned halt, execute', self.__class__.__name__) self.machine.halt() return None self.machine.DEBUG('%s._read_char: c=%s ()', self.__class__.__name__, b) return b b = __do_read_char() if b is not None: return __process_char(b) self._process_input_events() b = __do_read_char() if b is None: return None return __process_char(b)
[docs] def read_u8(self, port): self.machine.DEBUG('%s.read_u8: port=%s', self.__class__.__name__, UINT16_FMT(port)) if port not in self.ports: raise InvalidResourceError('Unhandled port: %s' % UINT16_FMT(port)) b = self._read_char() if not b: self.machine.DEBUG('%s.read_u8: empty input, signal it downstream', self.__class__.__name__) return 0xFF self.machine.DEBUG('%s.read_u8: input byte is %i', self.__class__.__name__, b) return b