SHARE
TWEET

Untitled

a guest Aug 18th, 2019 91 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from pathlib import Path
  2. from typing import Union, Optional
  3. import shutil
  4. import os
  5. import json
  6. import sqlite3    
  7. import logging
  8.  
  9. Pathish = Union[Path, str]
  10.  
  11.  
  12. class Task:
  13.     """Abstract class for qcp Tasks. Should not be instantiated directly."""
  14.  
  15.     def __init__(self) -> None:
  16.         self.type = 0
  17.  
  18.     def run(self) -> None:
  19.         """Run the Task"""
  20.         pass
  21.  
  22.     @staticmethod
  23.     def from_dict(x, validate: bool = False) -> "Task":
  24.         """Create a Task of the appropriate subclass from a python dict"""
  25.         task_type = x["type"]
  26.  
  27.         if task_type == -1:
  28.             return KillTask()
  29.         elif task_type == 0:
  30.             return Task()
  31.         elif task_type == 1:
  32.             return EchoTask(x["msg"])
  33.         elif task_type == 2:
  34.             return FileTask(x["src"], validate=validate)
  35.         elif task_type == 3:
  36.             return DeleteTask(x["src"], validate=validate)
  37.         elif task_type == 4:
  38.             return CopyTask(x["src"], x["dst"], validate=validate)
  39.         elif task_type == 5:
  40.             return MoveTask(x["src"], x["dst"], validate=validate)
  41.         elif task_type == 6:
  42.             raise NotImplementedError
  43.         else:
  44.             raise ValueError
  45.  
  46.     def __repr__(self) -> str:
  47.         return 'NULL'
  48.  
  49.     def __eq__(self, other) -> bool:
  50.         return self.__dict__ == other.__dict__
  51.  
  52.     def __ne__(self, other) -> bool:
  53.         return self.__dict__ != other.__dict__
  54.  
  55.  
  56. class KillTask(Task):
  57.     """Kill the qcp server"""
  58.     def __init__(self) -> None:
  59.         self.type = -1
  60.         super().__init__()
  61.  
  62.     def run(self) -> None:
  63.         raise NotImplementedError
  64.  
  65.     def __repr__(self) -> str:
  66.         return 'KILL'
  67.  
  68.  
  69. class EchoTask(Task):
  70.     """Log a message"""
  71.     def __init__(self,  msg: str) -> None:
  72.         super().__init__()
  73.         self.msg = msg
  74.         self.type = 1
  75.  
  76.     def run(self) -> None:
  77.         print(self.msg)
  78.  
  79.     def __repr__(self) -> str:
  80.         return f'Echo: "{self.msg}"'
  81.  
  82.  
  83. class FileTask(Task):
  84.     """Abstract class for all file-based tasks"""
  85.     def __init__(self, src: Pathish, validate: bool = True) -> None:
  86.         super().__init__()
  87.         self.validate = validate
  88.         self.src = Path(src).as_posix()
  89.         self.type = 2
  90.         if validate:
  91.             self.__validate__()
  92.  
  93.     def __validate__(self) -> None:
  94.         if not Path(self.src).exists():
  95.             raise FileNotFoundError(f'{self.src} does not exist')
  96.         elif not (Path(self.src).is_dir() or Path(self.src).is_file()):
  97.             raise TypeError(f'{self.src} is neither a file nor directory')
  98.  
  99.  
  100. class DeleteTask(FileTask):
  101.     """Delete a file"""
  102.     def __init__(self, src: Pathish, validate: bool = True) -> None:
  103.         super().__init__(src=src, validate=validate)
  104.         self.type = 3
  105.  
  106.     def run(self) -> None:
  107.         os.unlink(self.src)
  108.  
  109.     def __repr__(self) -> str:
  110.         return f'DEL {self.src}'
  111.  
  112.  
  113. class CopyTask(FileTask):
  114.     """Copy a file"""
  115.     def __init__(self, src: Pathish, dst: Pathish, validate: bool = True) -> None:
  116.         super().__init__(src=src, validate=False)
  117.         self.dst = Path(dst).as_posix()
  118.         self.type = 4
  119.         self.validate = validate
  120.         if validate:
  121.             self.__validate__()
  122.  
  123.     def __repr__(self) -> str:
  124.         return f'COPY {self.src} -> {self.dst}'
  125.  
  126.     def __validate__(self) -> None:
  127.         super().__validate__()
  128.         if Path(self.dst).exists():
  129.             raise FileExistsError
  130.  
  131.     def run(self) -> None:
  132.         self.__validate__()
  133.         shutil.copy(self.src, self.dst)
  134.  
  135.  
  136. class MoveTask(CopyTask):
  137.     """Move a file"""
  138.     def __init__(self, src: Pathish, dst: Pathish, validate: bool = True) -> None:
  139.         super().__init__(src=src, dst=dst, validate=validate)
  140.         self.type = 5
  141.  
  142.     def run(self) -> None:
  143.         super().__validate__()
  144.         shutil.move(self.src, self.dst)
  145.  
  146.     def __repr__(self) -> str:
  147.         return f'MOVE {self.src} -> {self.dst}'
  148.  
  149.  
  150. class TaskQueueElement:
  151.     """An enqueued Task"""
  152.  
  153.     task = None  #: A Task
  154.     status = None  #: Status of the queued Task
  155.     priority = None  #: Priority of the queued Task
  156.  
  157.     def __init__(self, task: Task, priority: 1) -> None:
  158.         self.task = task
  159.         self.priority = priority
  160.  
  161.     def __lt__(self, other) -> bool:
  162.         return self.priority < other.priority
  163.  
  164.     def __gt__(self, other) -> bool:
  165.         return self.priority > other.priority
  166.  
  167.     def __eq__(self, other) -> bool:
  168.         return self.__dict__ == other.__dict__
  169.  
  170.     def __ne__(self, other) -> bool:
  171.         return self.__dict__ != other.__dict__
  172.  
  173.  
  174. class TaskQueue:
  175.     """A prioritzed queue for tasks"""
  176.     def __init__(self, path: Pathish = 'qcp.db') -> None:
  177.         """
  178.         Instantiate a TaskQueue
  179.  
  180.         :param path: Path to store the persistent queue
  181.         :type path: Path or str
  182.         """
  183.  
  184.         self.con = sqlite3.connect(path, isolation_level="EXCLUSIVE")
  185.         self.path = Path(path)
  186.  
  187.         cur = self.con.cursor()
  188.         cur.execute("""
  189.            CREATE TABLE IF NOT EXISTS tasks (
  190.               priority INTEGER,
  191.               task TEXT,
  192.               status INTEGER,
  193.               owner INTEGER              
  194.             )              
  195.         """)
  196.         self.con.commit()
  197.  
  198.     @property
  199.     def n_total(self) -> int:
  200.         """Count of all tasks in queue (including failed and completed)"""
  201.         cur = self.con.cursor()
  202.         return cur.execute("SELECT COUNT(1) from tasks").fetchall()[0][0]
  203.  
  204.     @property
  205.     def n_pending(self) -> int:
  206.         """Number of pending tasks"""
  207.         cur = self.con.cursor()
  208.         return cur.execute("SELECT COUNT(1) FROM tasks WHERE status = 0").fetchall()[0][0]
  209.  
  210.     @property
  211.     def n_running(self) -> int:
  212.         """Count of currently running tasks"""
  213.         cur = self.con.cursor()
  214.         return cur.execute("SELECT COUNT(1) FROM tasks WHERE status = 1").fetchall()[0][0]
  215.  
  216.     @property
  217.     def n_done(self) -> int:
  218.         """count of completed tasks"""
  219.         cur = self.con.cursor()
  220.         return cur.execute("SELECT COUNT(1) from tasks WHERE status = 2").fetchall()[0][0]
  221.  
  222.     @property
  223.     def n_failed(self) -> int:
  224.         """count of completed tasks"""
  225.         cur = self.con.cursor()
  226.         return cur.execute("SELECT COUNT(1) from tasks WHERE status = -1").fetchall()[0][0]
  227.  
  228.     def put(self, task: "Task", priority: Optional[int] = None) -> None:
  229.         """
  230.         Enqueue a task
  231.  
  232.         :param task: Task to be added to the queue
  233.         :type task: Task
  234.         :param priority: (optional) priority for executing `task` (tasks with lower priority will be executed earlier)
  235.         :type priority: int
  236.         """
  237.  
  238.         cur = self.con.cursor()
  239.         cur.execute(
  240.             "INSERT INTO tasks (priority, task, status) VALUES (?, ?, ?)", (priority, json.dumps(task.__dict__), 0)
  241.         )
  242.         self.con.commit()
  243.  
  244.     def pop(self) -> "Task":
  245.         """
  246.         Retrieves Task object and sets status of Task in database to "in progress" (1)
  247.  
  248.         :raises AlreadyUnderEvaluationError: If trying to pop a tasks that is already being processed  (i.e. if a race
  249.         condition occurs if the queue is processed in parallel)
  250.         """
  251.         cur = self.con.cursor()
  252.         cur.execute("SELECT _ROWID_ from tasks WHERE status = 0 ORDER BY priority LIMIT 1")
  253.         oid = cur.fetchall()[0][0].__str__()
  254.         self.mark_running(oid, id(self))
  255.  
  256.         cur.execute("SELECT owner, task FROM tasks WHERE _ROWID_ = ?", oid)
  257.         record = cur.fetchall()[0]
  258.         if record[0] != id(self):
  259.             raise AlreadyUnderEvaluationError
  260.  
  261.         task = Task.from_dict(json.loads(record[1]))
  262.         task.oid = oid
  263.         return task
  264.  
  265.     def peek(self) -> "Task":
  266.         """
  267.         Retrieves Task object without changing its status in the queue
  268.         """
  269.         cur = self.con.cursor()
  270.         cur.execute("SELECT * from tasks ORDER BY priority LIMIT 1")
  271.         record = cur.fetchall()[0]
  272.         oid = record[0].__str__()
  273.         task = Task.from_dict(json.loads(record[1]), validate=False)
  274.         task.oid = oid
  275.         return task
  276.  
  277.     def print(self, n: int = 10) -> None:
  278.         """
  279.         Print an overview of the queue
  280.  
  281.         :param n: number of tasks to preview
  282.         :type n: int
  283.         """
  284.         assert isinstance(n, int) and n > 0
  285.         cur = self.con.cursor()
  286.         cur.execute("SELECT status, task from tasks ORDER BY priority LIMIT ?", (str(n), ))
  287.         records = cur.fetchall()
  288.         for record in records:
  289.             print(f"[{record[0]}] {Task.from_dict(json.loads(record[1]))}")
  290.  
  291.     def mark_pending(self, oid: int) -> None:
  292.         """
  293.         Mark the operation with the _ROWID_ `oid` as "pending" (0)
  294.  
  295.         :param oid: ID of the task to mark
  296.         :type oid: int
  297.         """
  298.         cur = self.con.cursor()
  299.         cur.execute("UPDATE tasks SET status = 0, owner = NULL where _ROWID_ = ?", (oid, ))
  300.         self.con.commit()
  301.  
  302.     def mark_running(self, oid: int, owner: int) -> None:
  303.         """Mark the operation with the _ROWID_ `oid` as "running" (1). The "owner" Id is to ensure no two processes
  304.         are trying to execute the same operation
  305.  
  306.         :param oid: ID of the task to mark
  307.         :type oid: int
  308.         :param owner: Id of the process that is handling the operation
  309.         :type owner: int
  310.         """
  311.         cur = self.con.cursor()
  312.         cur.execute("UPDATE tasks SET status = 1, owner = ? where _ROWID_ = ?", (owner, oid))
  313.         self.con.commit()
  314.  
  315.     def mark_done(self, oid: int) -> None:
  316.         """
  317.         Mark the operation with the _ROWID_ `oid` as "done" (2)
  318.         :param oid: ID of the task to mark
  319.         :type oid: int
  320.         """
  321.         cur = self.con.cursor()
  322.         cur.execute("UPDATE tasks SET status = 2, owner = NULL where _ROWID_ = ?", (oid, ))
  323.         self.con.commit()
  324.  
  325.     def mark_failed(self, oid: int) -> None:
  326.         """
  327.         Mark the operation with the _ROWID_ `oid` as "failed" (-1)
  328.  
  329.         :param oid: ID of the task to mark
  330.         :type oid: int
  331.         """
  332.         cur = self.con.cursor()
  333.         cur.execute("UPDATE tasks SET status = -1, owner = NULL where _ROWID_ = ?", (oid, ))
  334.         self.con.commit()
  335.  
  336.     def run(self) -> None:
  337.         """Execute all pending tasks"""
  338.         if self.n_pending < 1:
  339.             logging.getLogger().warn("Queue is empty")
  340.  
  341.         while self.n_pending > 0:
  342.             op = self.pop()
  343.             op.run()
  344.             self.mark_done(op.oid)
  345.  
  346.  
  347. class AlreadyUnderEvaluationError(Exception):
  348.     """This Task is already being processed by a different worker"""
  349.     pass
  350.      
  351. import tasks
  352. import pytest
  353.  
  354. def test_TaskQueue(tmp_path):
  355.     """TaskQueue can queue and execute tasks"""
  356.     src = tmp_path.joinpath("foo")
  357.     src.touch()
  358.  
  359.     q = tasks.TaskQueue(tmp_path.joinpath("qcp.db"))
  360.     q.put(tasks.CopyTask(src, tmp_path.joinpath("copied_file")))
  361.     q.run()
  362.     assert tmp_path.joinpath("copied_file").is_file()
  363.     q.put(tasks.MoveTask(tmp_path.joinpath("copied_file"), tmp_path.joinpath("moved_file")))
  364.     q.run()
  365.     assert not tmp_path.joinpath("copied_file").is_file()
  366.     assert tmp_path.joinpath("moved_file").is_file()
  367.     q.put(tasks.DeleteTask(tmp_path.joinpath("moved_file")))
  368.     q.run()
  369.     assert not tmp_path.joinpath("moved_file").is_file()
  370.     assert src.is_file()
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top