Source code for ducky.devices.rtc

import time
import datetime

from six.moves import range

from . import Device, IRQProvider, IOProvider, IRQList
from ..errors import InvalidResourceError
from ..mm import u8_t, UINT16_FMT
from ..reactor import RunInIntervalTask
from ..util import F

DEFAULT_IRQ  = 0x00
DEFAULT_FREQ = 100
DEFAULT_PORT_RANGE = 0x300
PORT_RANGE = 0x0007

[docs]class RTCTask(RunInIntervalTask): def __init__(self, machine, rtc): super(RTCTask, self).__init__(10, self.on_tick) self.machine = machine self.rtc = rtc self.stamp = 0 self.tick = 0 self.update_tick()
[docs] def update_tick(self): self.tick = 1.0 / float(self.rtc.frequency) self.machine.DEBUG('rtc: new frequency: %i => %f' % (self.rtc.frequency, self.tick))
[docs] def on_tick(self, task): stamp = time.time() diff = stamp - self.stamp self.machine.DEBUG('rtc: tick: stamp=%s, last=%s, diff=%s, tick=%s, ?=%s' % (stamp, self.stamp, diff, self.tick, diff < self.tick)) if diff < self.tick: return self.machine.DEBUG('rtc: trigger irq') self.stamp = stamp self.machine.trigger_irq(self.rtc)
[docs]class RTC(IRQProvider, IOProvider, Device): def __init__(self, machine, name, frequency = None, port = None, irq = None, *args, **kwargs): super(RTC, self).__init__(machine, 'rtc', name, *args, **kwargs) self.frequency = frequency or DEFAULT_FREQ self.port = port or DEFAULT_PORT_RANGE self.ports = list(range(port, port + PORT_RANGE)) self.irq = irq or DEFAULT_IRQ self.timer_task = RTCTask(machine, self) if self.frequency >= 256: raise InvalidResourceError('Maximum RTC ticks per second is 255') @staticmethod
[docs] def create_from_config(machine, config, section): return RTC(machine, section, frequency = config.getint(section, 'frequency', DEFAULT_FREQ), port = config.getint(section, 'port', DEFAULT_PORT_RANGE), irq = config.getint(section, 'irq', IRQList.TIMER))
[docs] def boot(self): self.machine.DEBUG('RTC.boot') for port in self.ports: self.machine.register_port(port, self) self.machine.reactor.add_task(self.timer_task) self.machine.reactor.task_runnable(self.timer_task) now = datetime.datetime.now() self.machine.tenh('RTC: time %02i:%02i:%02i, date: %02i/%02i/%02i', now.hour, now.minute, now.second, now.day, now.month, now.year - 2000)
[docs] def halt(self): for port in self.ports: self.machine.unregister_port(port) self.machine.reactor.remove_task(self.timer_task)
[docs] def read_u8(self, port): if port not in self.ports: raise InvalidResourceError('Unhandled port: %s' % UINT16_FMT(port)) port -= self.port if port == 0x0000: return u8_t(self.frequency).value now = datetime.datetime.now() if port == 0x0001: return u8_t(now.second).value if port == 0x0002: return u8_t(now.minute).value if port == 0x0003: return u8_t(now.hour).value if port == 0x0004: return u8_t(now.day).value if port == 0x0005: return u8_t(now.month).value if port == 0x0006: return u8_t(now.year - 2000).value
[docs] def write_u8(self, port, value): if port not in self.ports: raise InvalidResourceError(F('Unhandled port: {port:S}', port = port)) if port != self.ports[0]: raise InvalidResourceError(F('Unable to write to read-only port: port={port:S}, value={value:B}', port = port, value = value)) self.frequency = value self.timer_task.update_tick()