Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from pathlib import Path
- from typing import Union, Optional
- import shutil
- import os
- import json
- import sqlite3
- import logging
- Pathish = Union[Path, str]
class Task:
    """Base class for qcp Tasks.

    Concrete behaviour lives in the subclasses; the base class is a no-op
    task with ``type == 0``. It is not meant to be instantiated directly,
    although :meth:`from_dict` does return a plain ``Task`` for type ``0``.

    Every task carries an integer ``type`` attribute that identifies its
    class when the task is rebuilt by :meth:`from_dict`.
    """

    # Instances compare by value (see ``__eq__``) and are mutable, so they are
    # deliberately unhashable. Defining ``__eq__`` already implies this; it is
    # spelled out here for clarity.
    __hash__ = None

    def __init__(self) -> None:
        self.type = 0

    def run(self) -> None:
        """Run the Task (does nothing for the base class)."""
        pass

    @staticmethod
    def from_dict(x, validate: bool = False) -> "Task":
        """Create a Task of the appropriate subclass from a python dict

        :param x: mapping with a ``"type"`` key plus the keys required by the
            selected task class (``"msg"``, ``"src"``, ``"dst"``)
        :param validate: forwarded to the file-based tasks
        :type validate: bool
        :raises KeyError: if ``"type"`` or a required key is missing from `x`
        :raises NotImplementedError: for task type ``6``
        :raises ValueError: for any other unknown task type
        """
        task_type = x["type"]
        if task_type == -1:
            return KillTask()
        elif task_type == 0:
            return Task()
        elif task_type == 1:
            return EchoTask(x["msg"])
        elif task_type == 2:
            return FileTask(x["src"], validate=validate)
        elif task_type == 3:
            return DeleteTask(x["src"], validate=validate)
        elif task_type == 4:
            return CopyTask(x["src"], x["dst"], validate=validate)
        elif task_type == 5:
            return MoveTask(x["src"], x["dst"], validate=validate)
        elif task_type == 6:
            raise NotImplementedError(f"Task type {task_type!r} is not implemented")
        else:
            raise ValueError(f"Unknown task type: {task_type!r}")

    def __repr__(self) -> str:
        return 'NULL'

    def __eq__(self, other) -> bool:
        """Two tasks are equal when all of their instance attributes match."""
        if not isinstance(other, Task):
            # Let Python fall back to its default handling instead of failing
            # with AttributeError on objects that have no ``__dict__``.
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __ne__(self, other) -> bool:
        """Inverse of :meth:`__eq__`."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
class KillTask(Task):
    """Kill the qcp server"""

    def __init__(self) -> None:
        # The base initialiser sets ``type`` to 0, so it has to run *before*
        # the kill marker is assigned; otherwise the task would carry type 0
        # and be rebuilt as a plain ``Task`` by ``Task.from_dict``.
        super().__init__()
        self.type = -1

    def run(self) -> None:
        """Not implemented; always raises.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def __repr__(self) -> str:
        return 'KILL'
class EchoTask(Task):
    """Task that prints a fixed message when run."""

    def __init__(self, msg: str) -> None:
        """
        :param msg: text that :meth:`run` prints
        :type msg: str
        """
        super().__init__()
        # Task type identifier for echo tasks.
        self.type = 1
        self.msg = msg

    def __repr__(self) -> str:
        return f'Echo: "{self.msg}"'

    def run(self) -> None:
        """Print the stored message to standard output."""
        print(self.msg)
class FileTask(Task):
    """Common base for tasks that operate on a source path."""

    def __init__(self, src: Pathish, validate: bool = True) -> None:
        """
        :param src: source path; stored as a POSIX-style string
        :type src: Path or str
        :param validate: check the source path immediately when True
        :type validate: bool
        """
        super().__init__()
        self.validate = validate
        self.src = Path(src).as_posix()
        # Task type identifier for generic file tasks.
        self.type = 2
        if self.validate:
            self.__validate__()

    def __validate__(self) -> None:
        """Check that the source path exists and is a file or a directory.

        :raises FileNotFoundError: if the source path does not exist
        :raises TypeError: if the source is neither a file nor a directory
        """
        source = Path(self.src)
        if not source.exists():
            raise FileNotFoundError(f'{self.src} does not exist')
        if not source.is_dir() and not source.is_file():
            raise TypeError(f'{self.src} is neither a file nor directory')
class DeleteTask(FileTask):
    """Task that removes the source file."""

    def __init__(self, src: Pathish, validate: bool = True) -> None:
        """
        :param src: path of the file to delete
        :type src: Path or str
        :param validate: check the source path immediately when True
        :type validate: bool
        """
        super().__init__(src, validate)
        # Task type identifier for delete tasks.
        self.type = 3

    def __repr__(self) -> str:
        return f'DEL {self.src}'

    def run(self) -> None:
        """Unlink the source path."""
        os.unlink(self.src)
class CopyTask(FileTask):
    """Copy a file"""

    def __init__(self, src: Pathish, dst: Pathish, validate: bool = True) -> None:
        """
        :param src: path of the file to copy
        :type src: Path or str
        :param dst: destination path; stored as a POSIX-style string
        :type dst: Path or str
        :param validate: check source and destination immediately when True
        :type validate: bool
        """
        # Validation is deferred until ``dst`` is set, because this class'
        # ``__validate__`` inspects the destination as well.
        super().__init__(src=src, validate=False)
        self.dst = Path(dst).as_posix()
        self.type = 4
        self.validate = validate
        if validate:
            self.__validate__()

    def __repr__(self) -> str:
        return f'COPY {self.src} -> {self.dst}'

    def __validate__(self) -> None:
        """Check the source path and make sure the destination is free.

        :raises FileNotFoundError: if the source path does not exist
        :raises TypeError: if the source is neither a file nor a directory
        :raises FileExistsError: if the destination path already exists
        """
        super().__validate__()
        if Path(self.dst).exists():
            raise FileExistsError(f'{self.dst} already exists')

    def run(self) -> None:
        """Validate the paths, then copy the source to the destination.

        Validation happens on every run, regardless of the ``validate`` flag
        passed to the constructor.
        """
        self.__validate__()
        shutil.copy(self.src, self.dst)
class MoveTask(CopyTask):
    """Task that moves the source path to the destination."""

    def __init__(self, src: Pathish, dst: Pathish, validate: bool = True) -> None:
        """
        :param src: path to move
        :type src: Path or str
        :param dst: destination path
        :type dst: Path or str
        :param validate: check source and destination immediately when True
        :type validate: bool
        """
        super().__init__(src, dst, validate)
        # Task type identifier for move tasks.
        self.type = 5

    def __repr__(self) -> str:
        return f'MOVE {self.src} -> {self.dst}'

    def run(self) -> None:
        """Run the parent class' path checks, then move source to destination."""
        super().__validate__()
        shutil.move(self.src, self.dst)
class TaskQueueElement:
    """An enqueued Task

    Elements are ordered by ``priority`` and compare equal when all of their
    instance attributes match.
    """
    task = None  #: A Task
    status = None  #: Status of the queued Task
    priority = None  #: Priority of the queued Task

    # Value-based equality on a mutable object: deliberately unhashable.
    __hash__ = None

    def __init__(self, task: "Task", priority: int = 1) -> None:
        """
        :param task: the task being queued
        :type task: Task
        :param priority: priority used for ordering elements (defaults to 1)
        :type priority: int
        """
        self.task = task
        self.priority = priority

    def __lt__(self, other) -> bool:
        if not isinstance(other, TaskQueueElement):
            return NotImplemented
        return self.priority < other.priority

    def __gt__(self, other) -> bool:
        if not isinstance(other, TaskQueueElement):
            return NotImplemented
        return self.priority > other.priority

    def __eq__(self, other) -> bool:
        if not isinstance(other, TaskQueueElement):
            # Avoid AttributeError on objects without ``__dict__``.
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __ne__(self, other) -> bool:
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
class TaskQueue:
    """A prioritized queue for tasks, persisted in a SQLite database.

    Each row of the ``tasks`` table stores the JSON-encoded attributes of a
    task together with its priority, a status code and the owner id of the
    queue instance processing it. Status codes used by this class:

    * ``0``  -- pending
    * ``1``  -- running
    * ``2``  -- done
    * ``-1`` -- failed
    """

    def __init__(self, path: "Pathish" = 'qcp.db') -> None:
        """
        Instantiate a TaskQueue

        :param path: Path to store the persistent queue
        :type path: Path or str
        """
        self.con = sqlite3.connect(path, isolation_level="EXCLUSIVE")
        self.path = Path(path)
        cur = self.con.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                priority INTEGER,
                task TEXT,
                status INTEGER,
                owner INTEGER
            )
        """)
        self.con.commit()

    def _count(self, status: Optional[int] = None) -> int:
        """Count the rows with the given status, or all rows if it is None."""
        cur = self.con.cursor()
        if status is None:
            cur.execute("SELECT COUNT(1) FROM tasks")
        else:
            cur.execute("SELECT COUNT(1) FROM tasks WHERE status = ?", (status, ))
        return cur.fetchone()[0]

    @property
    def n_total(self) -> int:
        """Count of all tasks in queue (including failed and completed)"""
        return self._count()

    @property
    def n_pending(self) -> int:
        """Number of pending tasks"""
        return self._count(0)

    @property
    def n_running(self) -> int:
        """Count of currently running tasks"""
        return self._count(1)

    @property
    def n_done(self) -> int:
        """Count of completed tasks"""
        return self._count(2)

    @property
    def n_failed(self) -> int:
        """Count of failed tasks"""
        return self._count(-1)

    def put(self, task: "Task", priority: Optional[int] = None) -> None:
        """
        Enqueue a task

        The task is stored as the JSON encoding of its instance attributes
        with status "pending" (0).

        :param task: Task to be added to the queue
        :type task: Task
        :param priority: (optional) priority for executing `task` (tasks with lower priority will be executed earlier)
        :type priority: int
        """
        cur = self.con.cursor()
        cur.execute(
            "INSERT INTO tasks (priority, task, status) VALUES (?, ?, ?)", (priority, json.dumps(task.__dict__), 0)
        )
        self.con.commit()

    def pop(self) -> "Task":
        """
        Retrieves Task object and sets status of Task in database to "in progress" (1)

        The returned task carries the row id of its queue entry as the string
        attribute ``oid``.

        :raises IndexError: If there is no pending task in the queue
        :raises AlreadyUnderEvaluationError: If trying to pop a tasks that is already being processed (i.e. if a race
            condition occurs if the queue is processed in parallel)
        """
        cur = self.con.cursor()
        # The row id breaks ties so tasks of equal priority leave the queue in
        # insertion order.
        cur.execute("SELECT _ROWID_ FROM tasks WHERE status = 0 ORDER BY priority, _ROWID_ LIMIT 1")
        row = cur.fetchone()
        if row is None:
            raise IndexError("pop from a TaskQueue without pending tasks")
        rowid = row[0]
        self.mark_running(rowid, id(self))
        # Parameters must be passed as a sequence; a bare string would be
        # split into one binding per character and fail for row ids >= 10.
        cur.execute("SELECT owner, task FROM tasks WHERE _ROWID_ = ?", (rowid, ))
        owner, payload = cur.fetchone()
        if owner != id(self):
            raise AlreadyUnderEvaluationError
        task = Task.from_dict(json.loads(payload))
        task.oid = str(rowid)
        return task

    def peek(self) -> "Task":
        """
        Retrieves the next pending Task object without changing its status in the queue

        The returned task carries the row id of its queue entry as the string
        attribute ``oid``.

        :raises IndexError: If there is no pending task in the queue
        """
        cur = self.con.cursor()
        # Select the row id explicitly: ``SELECT *`` does not include it, so
        # the first column would be the priority rather than the task id.
        cur.execute("SELECT _ROWID_, task FROM tasks WHERE status = 0 ORDER BY priority, _ROWID_ LIMIT 1")
        row = cur.fetchone()
        if row is None:
            raise IndexError("peek at a TaskQueue without pending tasks")
        task = Task.from_dict(json.loads(row[1]), validate=False)
        task.oid = str(row[0])
        return task

    def print(self, n: int = 10) -> None:
        """
        Print an overview of the queue

        Each line shows the status code in brackets followed by the task.

        :param n: number of tasks to preview
        :type n: int
        """
        assert isinstance(n, int) and n > 0
        cur = self.con.cursor()
        cur.execute("SELECT status, task from tasks ORDER BY priority, _ROWID_ LIMIT ?", (n, ))
        records = cur.fetchall()
        for record in records:
            print(f"[{record[0]}] {Task.from_dict(json.loads(record[1]))}")

    def mark_pending(self, oid: int) -> None:
        """
        Mark the operation with the _ROWID_ `oid` as "pending" (0)

        :param oid: ID of the task to mark
        :type oid: int
        """
        cur = self.con.cursor()
        cur.execute("UPDATE tasks SET status = 0, owner = NULL where _ROWID_ = ?", (oid, ))
        self.con.commit()

    def mark_running(self, oid: int, owner: int) -> None:
        """Mark the operation with the _ROWID_ `oid` as "running" (1). The "owner" Id is to ensure no two processes
        are trying to execute the same operation

        :param oid: ID of the task to mark
        :type oid: int
        :param owner: Id of the process that is handling the operation
        :type owner: int
        """
        cur = self.con.cursor()
        cur.execute("UPDATE tasks SET status = 1, owner = ? where _ROWID_ = ?", (owner, oid))
        self.con.commit()

    def mark_done(self, oid: int) -> None:
        """
        Mark the operation with the _ROWID_ `oid` as "done" (2)

        :param oid: ID of the task to mark
        :type oid: int
        """
        cur = self.con.cursor()
        cur.execute("UPDATE tasks SET status = 2, owner = NULL where _ROWID_ = ?", (oid, ))
        self.con.commit()

    def mark_failed(self, oid: int) -> None:
        """
        Mark the operation with the _ROWID_ `oid` as "failed" (-1)

        :param oid: ID of the task to mark
        :type oid: int
        """
        cur = self.con.cursor()
        cur.execute("UPDATE tasks SET status = -1, owner = NULL where _ROWID_ = ?", (oid, ))
        self.con.commit()

    def run(self) -> None:
        """Execute all pending tasks

        Tasks are popped one at a time and marked "done" after a successful
        run. If a task raises, it is marked "failed" and the exception is
        re-raised, which stops processing of the remaining tasks.
        """
        if self.n_pending < 1:
            # ``Logger.warn`` is a deprecated alias that newer Python versions
            # no longer provide; ``warning`` is the supported spelling.
            logging.getLogger().warning("Queue is empty")
        while self.n_pending > 0:
            op = self.pop()
            try:
                op.run()
            except Exception:
                # Do not leave the task stuck in the "running" state.
                self.mark_failed(op.oid)
                raise
            self.mark_done(op.oid)
class AlreadyUnderEvaluationError(Exception):
    """Raised when a task is already being processed by a different worker."""
- import tasks
- import pytest
def test_TaskQueue(tmp_path):
    """A TaskQueue runs queued copy, move and delete tasks on real files."""
    src = tmp_path / "foo"
    copied = tmp_path / "copied_file"
    moved = tmp_path / "moved_file"
    src.touch()

    q = tasks.TaskQueue(tmp_path / "qcp.db")

    # Copy: the destination file appears.
    q.put(tasks.CopyTask(src, copied))
    q.run()
    assert copied.is_file()

    # Move: the copied file disappears and shows up under its new name.
    q.put(tasks.MoveTask(copied, moved))
    q.run()
    assert not copied.is_file()
    assert moved.is_file()

    # Delete: the moved file is removed, the original source is untouched.
    q.put(tasks.DeleteTask(moved))
    q.run()
    assert not moved.is_file()
    assert src.is_file()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement