"""
Keyboard controller - provides events for pressed and released keys.
"""
import enum
import io
from collections import deque
from . import DeviceFrontend, DeviceBackend, MMIOMemoryPage
from ..errors import InvalidResourceError
from ..mm import UINT8_FMT, addr_to_page, UINT32_FMT, u32_t
from ..hdt import HDTEntry_Device
# IRQ number the keyboard backend falls back to when none is configured.
DEFAULT_IRQ = 0x01
# MMIO address of the keyboard controller used when none is configured.
DEFAULT_MMIO_ADDRESS = 0x8000
class KeyboardPorts(enum.IntEnum):
    """
    Offsets of the keyboard controller's ports, as compared against the
    ``offset`` argument of ``KeyboardMMIOMemoryPage.read_u8``.
    """

    # Status port.
    STATUS = 0x00

    # Data port - reading it yields the next input byte.
    DATA = 0x01

    # Shares its value with ``DATA`` and is therefore an enum *alias* of it
    # (``KeyboardPorts.LAST is KeyboardPorts.DATA``); presumably marks the
    # highest valid port offset - confirm before relying on it.
    LAST = 0x01
class HDTEntry_Keyboard(HDTEntry_Device):
    """
    HDT entry describing the keyboard controller.

    Extends the common device entry header with a single field,
    ``mmio_address``, holding the address of the controller's MMIO page.

    :param logger: logger used to report the values stored in the entry.
    :param config: machine configuration; queried with
        ``getint(section, 'mmio-address', DEFAULT_MMIO_ADDRESS)``.
    :param section: name of the config section describing the device.
    """

    _fields_ = HDTEntry_Device.ENTRY_HEADER + [
        ('mmio_address', u32_t)
    ]

    def __init__(self, logger, config, section):
        super(HDTEntry_Keyboard, self).__init__(logger, section, 'Virtual keyboard controller')

        # Fall back to the default address when the section does not set one.
        self.mmio_address = config.getint(section, 'mmio-address', DEFAULT_MMIO_ADDRESS)

        logger.debug('%s: mmio-address=%s', self.__class__.__name__, UINT32_FMT(self.mmio_address))
class KeyboardMMIOMemoryPage(MMIOMemoryPage):
    """
    Memory page exposing the keyboard controller's ports to byte-sized reads.
    """

    def read_u8(self, offset):
        """
        Read one byte from the port at ``offset``.

        :param offset: offset of the port, compared against
            :py:class:`KeyboardPorts` members.
        :returns: ``0x00`` for the status port; for the data port the next
            input byte, or ``0xFF`` when the device has no input to offer;
            ``0x00`` (after logging a warning) for any other offset.
        """

        self.DEBUG('%s.read_u8: offset=%s', self.__class__.__name__, UINT8_FMT(offset))

        if offset == KeyboardPorts.STATUS:
            return 0x00

        if offset == KeyboardPorts.DATA:
            b = self._device._read_char()

            if b is None:
                # No byte available - 0xFF is the "no input" marker handed
                # to the reader.
                self.DEBUG('%s.read_u8: empty input, signal it downstream', self.__class__.__name__)
                return 0xFF

            self.DEBUG('%s.read_u8: input byte is %i', self.__class__.__name__, b)
            return b

        # Offset does not match any known port.
        self.WARN('%s.read_u8: attempt to read raw offset: offset=%s', self.__class__.__name__, UINT8_FMT(offset))
        return 0x00
class ControlMessages(enum.IntEnum):
    """
    Special values that may appear in the keyboard backend's key queue in
    place of an input byte.
    """

    # When ``Backend._read_char`` pops this value, it halts the machine
    # instead of returning a byte.
    HALT = 1025

    # Presumably the lowest value reserved for control messages, chosen above
    # the byte range so it cannot collide with real input - TODO confirm.
    CONTROL_MESSAGE_FIRST = 1024
class Frontend(DeviceFrontend):
    """
    Keyboard frontend - collects input streams and drives the backend device
    of the same name.

    :param machine: machine the frontend belongs to.
    :param name: name of the backend device this frontend is attached to; it
        is also the name of the communication queue the frontend looks up.
    """

    def __init__(self, machine, name):
        super(Frontend, self).__init__(machine, 'input', name)

        self._comm_queue = machine.comm_channel.get_queue(name)

        # Streams registered via enqueue_stream().
        self._streams = []
        # NOTE(review): presumably the stream currently being read - it is
        # only initialized here, confirm against the input handling code.
        self._stream = None

        self.backend = machine.get_device_by_name(name)

    @staticmethod
    def create_from_config(machine, config, section):
        """
        Create a frontend for the device described by a config section.

        :param machine: machine the frontend will belong to.
        :param config: machine configuration.
        :param section: name of the config section; its ``slave`` option
            names the backend device, defaulting to the section name itself.
        :returns: new :py:class:`Frontend` instance.
        """

        slave = config.get(section, 'slave', default=section)

        return Frontend(machine, slave)

    def boot(self):
        """
        Boot the frontend: open the input first, then boot the backend.
        """

        super(Frontend, self).boot()

        self._open_input()
        self.backend.boot()

    def halt(self):
        """
        Halt the frontend: close the input, halt the backend, and only then
        halt the frontend base class.
        """

        self._close_input()
        self.backend.halt()

        super(Frontend, self).halt()

    def enqueue_stream(self, stream):
        """
        Append an input stream to the list of streams of this keyboard.

        :param stream: input stream; must report poll support via
            ``has_poll_support()``.
        :raises InvalidResourceError: when the stream does not support
            polling.
        """

        self.machine.DEBUG('%s.enqueue_stream: stream=%s', self.__class__.__name__, stream)

        if not stream.has_poll_support():
            raise InvalidResourceError('Keyboard stream must support polling')

        self._streams.append(stream)
class Backend(DeviceBackend):
    """
    Keyboard backend - exposes queued input to the machine through an MMIO
    page.

    :param machine: machine the device belongs to.
    :param name: name of the device; also used as the name of the
        communication queue the backend creates.
    :param mmio_address: address of the MMIO page; ``None`` selects
        ``DEFAULT_MMIO_ADDRESS``.
    :param irq: IRQ number of the device; ``None`` selects ``DEFAULT_IRQ``.
    """

    def __init__(self, machine, name, mmio_address=None, irq=None):
        super(Backend, self).__init__(machine, 'input', name)

        # Test against None explicitly: a plain ``value or DEFAULT`` would
        # silently replace a legitimately configured 0 with the default.
        self._mmio_address = DEFAULT_MMIO_ADDRESS if mmio_address is None else mmio_address
        # Created in boot(), once the machine's memory is available.
        self._mmio_page = None
        self.irq = DEFAULT_IRQ if irq is None else irq

        self._comm_queue = machine.comm_channel.create_queue(name)
        # Input values waiting to be read through the data port.
        self._key_queue = deque()

    @staticmethod
    def create_from_config(machine, config, section):
        """
        Create a backend from a config section.

        :param machine: machine the device will belong to.
        :param config: machine configuration; its ``mmio-address`` and
            ``irq`` options are read, with module defaults as fallbacks.
        :param section: name of the config section, used as the device name.
        :returns: new :py:class:`Backend` instance.
        """

        return Backend(machine, section,
                       mmio_address=config.getint(section, 'mmio-address', DEFAULT_MMIO_ADDRESS),
                       irq=config.getint(section, 'irq', DEFAULT_IRQ))

    @staticmethod
    def create_hdt_entries(logger, config, section):
        """
        Create HDT entries describing this device.

        :returns: list with a single :py:class:`HDTEntry_Keyboard`.
        """

        return [HDTEntry_Keyboard(logger, config, section)]

    def __repr__(self):
        return 'basic keyboard controller on [%s] as %s' % (UINT32_FMT(self._mmio_address), self.name)

    def boot(self):
        """
        Create the device's MMIO page and register it with machine memory.
        """

        self.machine.DEBUG('%s.boot', self.__class__.__name__)

        self._mmio_page = KeyboardMMIOMemoryPage(self, self.machine.memory, addr_to_page(self._mmio_address))
        self.machine.memory.register_page(self._mmio_page)

        self.machine.tenh('hid: %s', self)

    def halt(self):
        """
        Unregister the device's MMIO page from machine memory.
        """

        self.machine.DEBUG('%s.halt', self.__class__.__name__)

        self.machine.memory.unregister_page(self._mmio_page)

    def _read_char(self):
        """
        Fetch the next input value from the key queue.

        When the queue is empty, pending input events are processed first to
        give it a chance to fill up.

        :returns: next queued value, or ``None`` when there is no input, or
            when the value was ``ControlMessages.HALT`` - in which case the
            machine is halted as well.
        """

        q = self._key_queue

        if not q:
            self._process_input_events()

        if not q:
            return None

        b = q.popleft()

        if b == ControlMessages.HALT:
            # Control message, not a key - consume it and stop the machine.
            self.machine.halt()
            return None

        return b