"""Methods for serialization of Task and Queue."""
import json
import shutil
from datetime import datetime, timezone
from functools import wraps
from os import PathLike
from typing import Optional
import attr
from xdg import xdg_data_home
from aside.models.models import Queue, QueueManager, Task
from ..boilerplate.observable import Event, EventType
__all__ = [
"Database",
]
[docs]def check_locking(func):
@wraps(func)
def checked(self: "Database", *args, **kwargs):
if self.locked:
return func(self, *args, **kwargs)
raise RuntimeError("Database is not controlling the lock!")
return checked
[docs]class Database:
"""Serialize into files on disk in tree-like directories."""
def __enter__(self):
"""Obtain lock on disk database representation."""
try:
(self.data_dir / ".lock").touch(exist_ok=False)
except FileExistsError as exist_err:
raise RuntimeError("Another aside instance is running!") from exist_err
self.locked = True
def __exit__(self, exc_type, exc_val, exc_tb):
"""Release lock on disk database representation."""
self.locked = False
(self.data_dir / ".lock").unlink()
[docs] def __init__(
self, queue_manager: QueueManager, data_dir: Optional[PathLike] = None
):
"""Set up data directory."""
self.locked = False
self.data_dir = (xdg_data_home() if data_dir is None else data_dir) / "aside"
self.data_dir.mkdir(exist_ok=True)
self.populate_manager_from_disk(queue_manager)
queue_manager.subscribe(self.observe_queue, "queues/[^/]*")
queue_manager.subscribe(self.observe_queue_metadata, "queues/[^/]*/[^/]*")
queue_manager.subscribe(self.observe_task, ".*/tasks/[^/]*")
queue_manager.subscribe(self.observe_task_metadata, ".*/tasks/[^/]*/[^/]*")
[docs] @check_locking
def observe_task(self, event: Event):
"""Observe tasks collection events raised by queue manager.
Regexp string for matching events: ``.*/tasks/[^/]*``
"""
task_uuid = event.split_attr_path[-1]
queue = event.get_nested_object(2)
if event.event_type is EventType.ADD:
self._dump_task(queue, event.get_nested_object())
elif event.event_type is EventType.DISCARD:
self._drop_task(queue, task_uuid)
else:
pass # pragma: no cover
[docs] @check_locking
def observe_queue(self, event: Event):
"""Observe queue collection events raised by queue manager.
Regexp string for matching events: ``queues/[^/]*``
"""
queue_uuid = event.split_attr_path[-1]
if event.event_type is EventType.ADD:
self._dump_queue(event.get_nested_object())
elif event.event_type is EventType.DISCARD:
self._drop_queue(queue_uuid)
else:
pass # pragma: no cover
@check_locking
def _dump_queue(self, queue: Queue):
self._dump_queue_metadata(queue)
queue_dir = self.data_dir / queue.uuid
queue_dir.mkdir(exist_ok=True)
for task in queue.tasks.keys():
self._dump_task(queue, queue.tasks[task])
@check_locking
def _dump_queue_metadata(self, queue: Queue):
metadata_dict = attr.asdict(
queue, filter=lambda x, _: x.name not in queue.__observable_attrs__
)
with (self.data_dir / f"{queue.uuid}.json").open("w") as out_file:
json.dump(metadata_dict, out_file, ensure_ascii=False, indent=4)
@check_locking
def _drop_queue(self, queue_uuid: str):
queue_dir = self.data_dir / queue_uuid
(self.data_dir / f"{queue_uuid}.json").unlink()
shutil.rmtree(queue_dir)
@check_locking
def _dump_task(self, queue: Queue, task: Task):
queue_path = self.data_dir / queue.uuid
queue_path.mkdir(exist_ok=True)
with (queue_path / f"{task.uuid}.json").open("w") as out_file:
attr_asdict = attr.asdict(task)
attr_asdict["deadline"] = (
attr_asdict["deadline"]
.astimezone(timezone.utc)
.strftime("%Y-%m-%dT%H:%M:%S")
)
json.dump(attr_asdict, out_file, ensure_ascii=False, indent=4)
@check_locking
def _drop_task(self, queue: Queue, task_uuid: str):
(self.data_dir / queue.uuid / f"{task_uuid}.json").unlink()
[docs] def populate_manager_from_disk(self, queue_manager: QueueManager):
"""Take queue manager and fill it with queues from disk."""
for queue_path in self.data_dir.glob("*.json"):
with (self.data_dir / queue_path).open("r") as readfile:
queue_metadata = json.load(readfile)
queue = Queue(uuid=queue_metadata["uuid"])
queue_manager.queues.add(queue)
for k in queue_metadata:
if not k == "uuid":
setattr(queue, k, queue_metadata[k])
self.populate_queue_from_disk(queue)
[docs] def populate_queue_from_disk(self, queue: Queue) -> None:
"""Read queue by id from disk."""
for task_path in (self.data_dir / queue.uuid).iterdir():
with task_path.open("r") as readfile:
task_metadata = json.load(readfile)
task_metadata["deadline"] = datetime.strptime(
task_metadata["deadline"], "%Y-%m-%dT%H:%M:%S"
).astimezone()
task = Task(uuid=task_metadata["uuid"])
queue.tasks.add(task)
for k in task_metadata:
if not k == "uuid":
setattr(task, k, task_metadata[k])