# Source code for rdflib.plugins.stores.concurrent
from threading import Lock
class ResponsibleGenerator:
    """An iterator wrapper that runs a cleanup callback when it is done.

    The wrapper forwards iteration to ``gen`` and calls ``cleanup`` exactly
    once: either as soon as ``gen`` is exhausted, or, if iteration is
    abandoned early, when the wrapper itself is finalized.

    :param gen: the iterator whose items are passed through unchanged.
    :param cleanup: zero-argument callable invoked once when iteration is
        finished or the wrapper is discarded.
    """

    __slots__ = ["cleanup", "gen", "_cleaned_up"]

    def __init__(self, gen, cleanup):
        self.cleanup = cleanup
        self.gen = gen
        # Set last so a half-initialised instance never looks "armed".
        self._cleaned_up = False

    def _run_cleanup(self):
        """Invoke ``cleanup`` unless it has already been invoked."""
        # The default of True covers an instance whose __init__ never ran
        # (e.g. wrong argument count): its slots are unset, so there is
        # nothing to clean up and __del__ must not raise AttributeError.
        if getattr(self, "_cleaned_up", True):
            return
        # Flip the flag first so a failing callback is not retried later.
        self._cleaned_up = True
        self.cleanup()

    def __del__(self):
        # Fallback for consumers that stop iterating before exhaustion.
        self._run_cleanup()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self.gen)
        except StopIteration:
            # Release promptly on exhaustion instead of depending on when
            # the interpreter decides to finalize this object.
            self._run_cleanup()
            raise
class ConcurrentStore:
    """Wrap a store so it can be modified while ``triples`` is being iterated.

    While at least one ``triples`` iteration is in progress, calls to
    ``add`` and ``remove`` are recorded in pending lists instead of being
    applied to the wrapped store. When the last active iteration finishes,
    the pending removes and then the pending adds are applied.

    :param store: the wrapped store; it must provide ``add``, ``remove``,
        ``triples`` and ``__len__``.
    """

    def __init__(self, store):
        self.store = store

        # number of calls to visit still in progress
        self.__visit_count = 0

        # lock for locking down the indices
        self.__lock = Lock()

        # lists for keeping track of added and removed triples while
        # we wait for the lock
        self.__pending_removes = []
        self.__pending_adds = []

    def add(self, triple):
        """Add ``triple`` now, or queue it if a read is in progress."""
        # Unpacking validates that ``triple`` has exactly three members.
        s, p, o = triple
        # NOTE(review): the visit count is read without holding the lock,
        # as in the original code; confirm whether that race is acceptable.
        if self.__visit_count == 0:
            self.store.add((s, p, o))
        else:
            self.__pending_adds.append((s, p, o))

    def remove(self, triple):
        """Remove ``triple`` now, or queue the removal if a read is in progress."""
        s, p, o = triple
        if self.__visit_count == 0:
            self.store.remove((s, p, o))
        else:
            self.__pending_removes.append((s, p, o))

    def triples(self, triple):
        """Yield the triples matching the ``(su, pr, ob)`` pattern.

        Triples from the wrapped store are yielded unless they are queued
        for removal. Afterwards, any triples still queued for addition are
        yielded if they match the pattern, where ``None`` in a position
        matches anything.
        """
        su, pr, ob = triple
        g = self.store.triples((su, pr, ob))
        pending_removes = self.__pending_removes
        self.__begin_read()
        # ResponsibleGenerator calls __end_read once this read is over,
        # which is what eventually flushes the pending lists.
        for s, p, o in ResponsibleGenerator(g, self.__end_read):
            if (s, p, o) not in pending_removes:
                yield s, p, o

        for s, p, o in self.__pending_adds:
            if (
                (su is None or su == s)
                and (pr is None or pr == p)
                and (ob is None or ob == o)
            ):
                yield s, p, o

    def __len__(self):
        """Return the length reported by the wrapped store."""
        return len(self.store)

    def __begin_read(self):
        """Register one more read in progress."""
        with self.__lock:
            self.__visit_count += 1

    def __end_read(self):
        """Unregister a read and, if it was the last one, flush pending edits.

        The lock is held through a ``with`` block so it is released even if
        the wrapped store raises while the pending lists are being applied.
        """
        with self.__lock:
            self.__visit_count -= 1
            if self.__visit_count == 0:
                pending_removes = self.__pending_removes
                while pending_removes:
                    s, p, o = pending_removes.pop()
                    try:
                        self.store.remove((s, p, o))
                    except Exception:
                        # Best effort: a failed removal is reported and
                        # skipped, but interpreter-exit signals such as
                        # KeyboardInterrupt are no longer swallowed.
                        print(s, p, o, "Not in store to remove")
                pending_adds = self.__pending_adds
                while pending_adds:
                    s, p, o = pending_adds.pop()
                    self.store.add((s, p, o))