Source code for configmodel.MixinDelayedWrite
# -*- coding: utf-8 -*-
import atexit
import threading
import time
[docs]
class InterruptibleTimer:
    """
    Performs a callback once after a specified timeout.

    A worker thread is started by the constructor and sleeps until the
    deadline stored in ``end_time``. The deadline can be moved (earlier or
    later) with restart(), and the pending callback can be dropped with
    cancel(). The callback is invoked on the worker thread while
    ``self.lock`` is held, so it must not call cancel() on its own timer
    (the lock is not reentrant).

    Once the callback has fired or the timer has been cancelled the worker
    thread ends; a finished timer cannot be revived, create a new one.
    """

    def __init__(self, timeout_seconds, callback):
        """
        Start the timer.

        :param timeout_seconds: delay in seconds before the callback fires
        :param callback: callable without arguments, invoked at most once
        """
        self.callback = callback
        self.thread = threading.Thread(target=self._target)
        self.event = threading.Event()
        self.lock = threading.Lock()
        self.end_time = time.time() + timeout_seconds
        # register atexit handler to make sure that changes are committed at exit
        # NOTE(review): the worker is a non-daemon thread; CPython is documented
        # to join such threads before atexit handlers run, so this hook may
        # only run once the worker has already finished - confirm intent.
        atexit.register(self._on_exit)
        # start timer
        self.thread.start()

    def _target(self):
        """
        Worker loop: sleep until the deadline, then fire the callback.

        The event is only a wake-up signal. State (``end_time``,
        ``callback``) is always written before the event is set and re-read
        here after the event is cleared, so no update can be missed.
        """
        try:
            while True:
                remaining = self.end_time - time.time()
                self.event.wait(max(0.0, remaining))
                # consume the wake-up before looking at the state again
                self.event.clear()
                # deadline reached (naturally, via restart() or via _on_exit())
                if self.end_time <= time.time():
                    self._fire_callback()
                    break
                # woken early: stop if the timer was cancelled, else sleep again
                with self.lock:
                    if self.callback is None:
                        break
        finally:
            # a finished timer no longer needs the exit hook; dropping it
            # lets the timer object be garbage collected
            atexit.unregister(self._on_exit)

    def _fire_callback(self):
        """
        Invoke the callback at most once.

        The callback reference is cleared even when the callback raises, so
        the timer is always marked as finished afterwards.
        """
        with self.lock:
            if self.callback is not None:
                try:
                    self.callback()
                finally:
                    self.callback = None

    def restart(self, timeout_seconds):
        """
        Move the deadline to ``timeout_seconds`` from now.

        Has no effect on a timer that has already fired or was cancelled.

        :param timeout_seconds: new delay in seconds, counted from now
        """
        self.end_time = time.time() + timeout_seconds
        # wake the worker so it recomputes its sleep from the new deadline;
        # without this a shortened timeout would only be noticed after the
        # previously computed (longer) sleep had elapsed
        self.event.set()

    def cancel(self):
        """
        Drop the pending callback and let the worker thread end.
        """
        with self.lock:
            self.callback = None
        self.event.set()

    def _on_exit(self):
        """
        Fire immediately and wait for the worker thread to finish.
        """
        self.end_time = 0
        self.event.set()
        self.thread.join()
[docs]
class MixinDelayedWrite:
    """
    Mixin for delayed write.

    Derived classes implement _commit_delayed_write() and call
    _restart_delayed_timer() after every change. With delayed write
    disabled (or a non-positive delay) the commit happens immediately in
    the calling thread; otherwise it is performed by an InterruptibleTimer
    callback, and every further change made while a timer is pending
    pushes the commit back by ``delay_seconds``.
    """
    DEFAULT_DELAY_SECONDS = 1.0

    def __init__(self, delayed_write_enabled=False, delay_seconds=DEFAULT_DELAY_SECONDS):
        """
        :param delayed_write_enabled: commit after a delay instead of immediately
        :param delay_seconds: delay in seconds between the last change and the commit
        """
        self._delayed_write_enabled = delayed_write_enabled
        self._delay_seconds = delay_seconds
        # pending timer, or None when no delayed commit is scheduled
        self._timer = None

    def _set_delayed_write(self, delayed_write_enabled, delay_seconds=DEFAULT_DELAY_SECONDS):
        """
        Set delayed write.

        Only stores the new settings; a timer that is already pending is
        left untouched.

        :param delayed_write_enabled: commit after a delay instead of immediately
        :param delay_seconds: delay in seconds between the last change and the commit
        """
        self._delayed_write_enabled = delayed_write_enabled
        self._delay_seconds = delay_seconds

    def _restart_delayed_timer(self):
        """
        Schedule a commit for the latest change.

        Commits immediately when delayed write is off or the delay is not
        positive. Otherwise starts a new timer, or postpones the pending one.
        """
        if not self._delayed_write_enabled or self._delay_seconds <= 0:
            # fire immediately
            self._commit_delayed_write()
            return
        # read the attribute once: _on_timer_expired() may reset it to None
        # between a check and a later use
        pending = self._timer
        if pending is None:
            self._timer = InterruptibleTimer(self._delay_seconds, self._on_timer_expired)
        else:
            pending.restart(self._delay_seconds)

    def _on_timer_expired(self):
        """
        Timer callback: forget the finished timer, then commit.

        The timer reference is cleared before committing because a timer
        fires only once; keeping it would make the next
        _restart_delayed_timer() call restart() on a finished timer and the
        change would never be committed. Clearing it first also means a
        change made while the commit is running schedules a fresh timer.
        """
        self._timer = None
        self._commit_delayed_write()

    def _commit_delayed_write(self):
        """
        Commit delayed write.
        This method should be implemented in derived classes.

        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()