"""
This module provides simple reactor core that runs each of registered tasks at
least once during one iteration of its internal loop.
There are two different kinds of objects that reactor manages:
- task - it's called periodicaly, at least once in each reactor loop iteration
- event - asynchronous events are queued and executed before running any tasks.
If there are no runnable tasks, reactor loop waits for incomming events.
"""
import collections
import errno
import select
from .interfaces import IReactorTask
FDCallbacks = collections.namedtuple('FDCallbacks', ['on_read', 'on_write', 'on_error'])
[docs]class CallInReactorTask(IReactorTask):
"""
This task request running particular function during the reactor loop. Useful
for planning future work, and for running tasks in reactor's thread.
:param fn: callback to fire.
:param args: arguments for callback.
:param kwargs: keyword arguments for callback.
"""
def __init__(self, fn, *args, **kwargs):
self.fn = fn
self.args = args
self.kwargs = kwargs
[docs] def run(self):
self.fn(*self.args, **self.kwargs)
[docs]class RunInIntervalTask(IReactorTask):
"""
This task will run its callback every ``ticks`` iterations of reactor's main loop.
:param int ticks: number of main loop iterations between two callback calls.
:param args: arguments for callback.
:param kwargs: keyword arguments for callback.
"""
def __init__(self, ticks, fn, *args, **kwargs):
self.ticks = ticks
self.counter = 0
self.fn = fn
self.args = args
self.kwargs = kwargs
[docs] def run(self):
self.counter += 1
if self.counter < self.ticks:
return
self.counter = 0
self.fn(self, *self.args, **self.kwargs)
[docs]class SelectTask(IReactorTask):
"""
Private task, serving as a single point where ``select`` syscall is being
executed. When a subsystem is interested in IO on a file descriptor, such
file descriptor should be set as non-blocking, and then registered with
reactor - it's not viable to place ``select`` calls everywhere in different
drivers. This task takes list of registered file descriptors, checks for
possible IO oportunities, and fires callbacks accordingly.
:param ducky.machine.Machine machine: VM this task (and reactor) belongs to.
:param dict fds: dictionary, where keys are descriptors, and values are lists
of their callbacks.
"""
def __init__(self, machine, fds, *args, **kwargs):
super(SelectTask, self).__init__(*args, **kwargs)
self.machine = machine
self.fds = fds
self.real_fds = {}
self.poll = select.poll()
[docs] def add_fd(self, fd, on_read = None, on_write = None, on_error = None):
"""
Register file descriptor with reactor. File descriptor will be checked for
read/write/error posibilities, and appropriate callbacks will be fired.
No arguments are passed to callbacks.
:param int fd: file descriptor.
:param on_read: function that will be called when file descriptor is available for
reading.
:param on_write: function that will be called when file descriptor is available for
write.
:param on_error: function that will be called when error state raised on file
descriptor.
"""
self.machine.DEBUG('%s.add_fd: fd=%s, on_read=%s, on_write=%s, on_error=%s', self.__class__.__name__, fd, on_read, on_write, on_error)
if not isinstance(fd, int):
fd = fd.fileno()
self.fds[fd] = FDCallbacks(on_read, on_write, on_error)
self.poll.register(fd, select.POLLERR | (select.POLLIN if on_read is not None else 0) | (select.POLLOUT if on_write is not None else 0))
[docs] def remove_fd(self, fd):
"""
Unregister file descriptor. It will no longer be checked by its main loop.
:param int fd: previously registered file descriptor.
"""
self.machine.DEBUG('%s.remove_fd: fd=%s', self.__class__.__name__, fd)
if not isinstance(fd, int):
fd = fd.fileno()
self.poll.unregister(fd)
del self.fds[fd]
[docs] def run(self):
self.machine.DEBUG('%s.run: fds=%s', self.__class__.__name__, self.fds.keys())
try:
events = self.poll.poll(0.1)
except select.error as e:
if e.args[0] == errno.EINTR:
self.machine.DEBUG('%s.run: interrupted syscall', self.__class__.__name__)
return
raise e
if not events:
return
self.machine.DEBUG('%s.run: events=%s', self.__class__.__name__, events)
for fd, events in events:
callbacks = self.fds[fd]
if events & select.POLLERR:
if callbacks.on_error is None:
self.machine.WARN(' unhandled error: fd=%s', fd)
continue
self.machine.DEBUG(' trigger err: fd=%s, handler=%s', fd, callbacks.on_error)
callbacks.on_error()
continue
if events & select.POLLIN and callbacks.on_read is not None:
self.machine.DEBUG(' trigger read: fd=%s, handler=%s', fd, callbacks.on_read)
callbacks.on_read()
if events & select.POLLOUT and callbacks.on_write is not None:
self.machine.DEBUG(' trigger err: fd=%s, handler=%s', fd, callbacks.on_write)
callbacks.on_write()
[docs]class Reactor(object):
"""
Main reactor class.
"""
def __init__(self, machine):
self.machine = machine
self.tasks = []
self.runnable_tasks = []
self.events = []
self.fds = {}
self.fds_task = SelectTask(self.machine, self.fds)
[docs] def add_task(self, task):
"""
Register task with reactor's main loop.
"""
self.machine.DEBUG('%s.add_task: task=%s', self.__class__.__name__, task)
self.tasks.append(task)
[docs] def remove_task(self, task):
"""
Unregister task, it will never be ran again.
"""
self.machine.DEBUG('%s.remove_task: task=%s', self.__class__.__name__, task)
self.task_suspended(task)
self.tasks.remove(task)
[docs] def task_runnable(self, task):
"""
If not yet marked as such, task is marked as runnable, and its ``run()``
method will be called every iteration of reactor's main loop.
"""
self.machine.DEBUG('%s.task_runnable: task=%s', self.__class__.__name__, task)
if task not in self.runnable_tasks:
self.runnable_tasks.append(task)
[docs] def task_suspended(self, task):
"""
If runnable, task is marked as suspended, not runnable, and it will no
longer be ran by reactor. It's still registered, so reactor's main loop
will not quit, and task can be later easily re-enabled by calling
:py:meth:`ducky.reactor.Reactor.task_runnable`.
"""
self.machine.DEBUG('%s.task_suspend: task=%s', self.__class__.__name__, task)
if task in self.runnable_tasks:
self.runnable_tasks.remove(task)
[docs] def add_event(self, event):
"""
Enqueue asynchronous event.
"""
self.events.append(event)
[docs] def add_call(self, fn, *args, **kwargs):
"""
Enqueue function call. Function will be called in reactor loop.
"""
self.add_event(CallInReactorTask(fn, *args, **kwargs))
[docs] def add_fd(self, fd, on_read = None, on_write = None, on_error = None):
"""
Register file descriptor with reactor. File descriptor will be checked for
read/write/error posibilities, and appropriate callbacks will be fired.
No arguments are passed to callbacks.
:param int fd: file descriptor.
:param on_read: function that will be called when file descriptor is available for
reading.
:param on_write: function that will be called when file descriptor is available for
write.
:param on_error: function that will be called when error state raised on file
descriptor.
"""
self.machine.DEBUG('%s.add_fd: fd=%s, on_read=%s, on_write=%s, on_error=%s', self.__class__.__name__, fd, on_read, on_write, on_error)
self.fds_task.add_fd(fd, on_read = on_read, on_write = on_write, on_error = on_error)
if len(self.fds) == 1:
self.add_task(self.fds_task)
self.task_runnable(self.fds_task)
[docs] def remove_fd(self, fd):
"""
Unregister file descriptor. It will no longer be checked by its main loop.
:param int fd: previously registered file descriptor.
"""
self.machine.DEBUG('Reactor.remove_fd: fd=%s', fd)
self.fds_task.remove_fd(fd)
if not self.fds:
self.remove_task(self.fds_task)
[docs] def run(self):
"""
Starts reactor loop. Enters endless loop, calling runnable tasks and events,
and - in case there are no runnable tasks - waits for new events.
When there are no tasks managed by reactor, loop quits.
"""
while True:
if not self.tasks:
break
if self.runnable_tasks:
for task in self.runnable_tasks:
task.run()
while self.events:
e = self.events.pop(0)
e.run()
else:
# This would be better with some sort of interruptible sleep...
# Maybe use an Event for that, and avoid using Queue when it's
# not necessary. But that needs more testing, and since I don't
# have much use for machine that's totally idle, that will come
# one day in the future
import time
time.sleep(0.01)