Source code for borg.locking

import json
import os
import socket
import time

from borg.helpers import Error, ErrorWithTraceback

ADD, REMOVE = 'add', 'remove'
SHARED, EXCLUSIVE = 'shared', 'exclusive'

# only determine the PID and hostname once.
# for FUSE mounts, we fork a child process that needs to release
# the lock made by the parent, so it needs to use the same PID for that.
_pid = os.getpid()
_hostname = socket.gethostname()


[docs]def get_id(): """Get identification tuple for 'us'""" thread_id = 0 return _hostname, _pid, thread_id
[docs]class TimeoutTimer: """ A timer for timeout checks (can also deal with no timeout, give timeout=None [default]). It can also compute and optionally execute a reasonable sleep time (e.g. to avoid polling too often or to support thread/process rescheduling). """ def __init__(self, timeout=None, sleep=None): """ Initialize a timer. :param timeout: time out interval [s] or None (no timeout) :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep) or None (autocompute: use 10% of timeout, or 1s for no timeout) """ if timeout is not None and timeout < 0: raise ValueError("timeout must be >= 0") self.timeout_interval = timeout if sleep is None: if timeout is None: sleep = 1.0 else: sleep = timeout / 10.0 self.sleep_interval = sleep self.start_time = None self.end_time = None def __repr__(self): return "<%s: start=%r end=%r timeout=%r sleep=%r>" % ( self.__class__.__name__, self.start_time, self.end_time, self.timeout_interval, self.sleep_interval)
[docs] def start(self): self.start_time = time.time() if self.timeout_interval is not None: self.end_time = self.start_time + self.timeout_interval return self
[docs] def sleep(self): if self.sleep_interval >= 0: time.sleep(self.sleep_interval)
[docs] def timed_out(self): return self.end_time is not None and time.time() >= self.end_time
[docs] def timed_out_or_sleep(self): if self.timed_out(): return True else: self.sleep() return False
[docs]class LockError(Error): """Failed to acquire the lock {}."""
[docs]class LockErrorT(ErrorWithTraceback): """Failed to acquire the lock {}."""
[docs]class LockTimeout(LockError): """Failed to create/acquire the lock {} (timeout)."""
[docs]class LockFailed(LockErrorT): """Failed to create/acquire the lock {} ({})."""
[docs]class NotLocked(LockErrorT): """Failed to release the lock {} (was not locked)."""
[docs]class NotMyLock(LockErrorT): """Failed to release the lock {} (was/is locked, but not by me)."""
[docs]class ExclusiveLock: """An exclusive Lock based on mkdir fs operation being atomic. If possible, try to use the contextmanager here like: with ExclusiveLock(...) as lock: ... This makes sure the lock is released again if the block is left, no matter how (e.g. if an exception occurred). """ def __init__(self, path, timeout=None, sleep=None, id=None): self.timeout = timeout self.sleep = sleep self.path = os.path.abspath(path) self.id = id or get_id() self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id) def __enter__(self): return self.acquire() def __exit__(self, *exc): self.release() def __repr__(self): return "<%s: %r>" % (self.__class__.__name__, self.unique_name)
[docs] def acquire(self, timeout=None, sleep=None): if timeout is None: timeout = self.timeout if sleep is None: sleep = self.sleep timer = TimeoutTimer(timeout, sleep).start() while True: try: os.mkdir(self.path) except FileExistsError: # already locked if self.by_me(): return self if timer.timed_out_or_sleep(): raise LockTimeout(self.path) except OSError as err: raise LockFailed(self.path, str(err)) from None else: with open(self.unique_name, "wb"): pass return self
[docs] def release(self): if not self.is_locked(): raise NotLocked(self.path) if not self.by_me(): raise NotMyLock(self.path) os.unlink(self.unique_name) os.rmdir(self.path)
[docs] def is_locked(self): return os.path.exists(self.path)
[docs] def by_me(self): return os.path.exists(self.unique_name)
[docs] def break_lock(self): if self.is_locked(): for name in os.listdir(self.path): os.unlink(os.path.join(self.path, name)) os.rmdir(self.path)
[docs]class LockRoster: """ A Lock Roster to track shared/exclusive lockers. Note: you usually should call the methods with an exclusive lock held, to avoid conflicting access by multiple threads/processes/machines. """ def __init__(self, path, id=None): self.path = path self.id = id or get_id()
[docs] def load(self): try: with open(self.path) as f: data = json.load(f) except (FileNotFoundError, ValueError): # no or corrupt/empty roster file? data = {} return data
[docs] def save(self, data): with open(self.path, "w") as f: json.dump(data, f)
[docs] def remove(self): try: os.unlink(self.path) except FileNotFoundError: pass
[docs] def get(self, key): roster = self.load() return set(tuple(e) for e in roster.get(key, []))
[docs] def modify(self, key, op): roster = self.load() try: elements = set(tuple(e) for e in roster[key]) except KeyError: elements = set() if op == ADD: elements.add(self.id) elif op == REMOVE: elements.remove(self.id) else: raise ValueError('Unknown LockRoster op %r' % op) roster[key] = list(list(e) for e in elements) self.save(roster)
[docs]class UpgradableLock: """ A Lock for a resource that can be accessed in a shared or exclusive way. Typically, write access to a resource needs an exclusive lock (1 writer, noone is allowed reading) and read access to a resource needs a shared lock (multiple readers are allowed). If possible, try to use the contextmanager here like: with UpgradableLock(...) as lock: ... This makes sure the lock is released again if the block is left, no matter how (e.g. if an exception occurred). """ def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None): self.path = path self.is_exclusive = exclusive self.sleep = sleep self.timeout = timeout self.id = id or get_id() # globally keeping track of shared and exclusive lockers: self._roster = LockRoster(path + '.roster', id=id) # an exclusive lock, used for: # - holding while doing roster queries / updates # - holding while the UpgradableLock itself is exclusive self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout) def __enter__(self): return self.acquire() def __exit__(self, *exc): self.release() def __repr__(self): return "<%s: %r>" % (self.__class__.__name__, self.id)
[docs] def acquire(self, exclusive=None, remove=None, sleep=None): if exclusive is None: exclusive = self.is_exclusive sleep = sleep or self.sleep or 0.2 if exclusive: self._wait_for_readers_finishing(remove, sleep) self._roster.modify(EXCLUSIVE, ADD) else: with self._lock: if remove is not None: self._roster.modify(remove, REMOVE) self._roster.modify(SHARED, ADD) self.is_exclusive = exclusive return self
def _wait_for_readers_finishing(self, remove, sleep): timer = TimeoutTimer(self.timeout, sleep).start() while True: self._lock.acquire() try: if remove is not None: self._roster.modify(remove, REMOVE) if len(self._roster.get(SHARED)) == 0: return # we are the only one and we keep the lock! # restore the roster state as before (undo the roster change): if remove is not None: self._roster.modify(remove, ADD) except: # avoid orphan lock when an exception happens here, e.g. Ctrl-C! self._lock.release() raise else: self._lock.release() if timer.timed_out_or_sleep(): raise LockTimeout(self.path)
[docs] def release(self): if self.is_exclusive: self._roster.modify(EXCLUSIVE, REMOVE) self._lock.release() else: with self._lock: self._roster.modify(SHARED, REMOVE)
[docs] def upgrade(self): if not self.is_exclusive: self.acquire(exclusive=True, remove=SHARED)
[docs] def downgrade(self): if self.is_exclusive: self.acquire(exclusive=False, remove=EXCLUSIVE)
[docs] def break_lock(self): self._roster.remove() self._lock.break_lock()