Advertisement
Guest User

Untitled

a guest
Aug 18th, 2019
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.04 KB | None | 0 0
  1. from pathlib import Path
  2. from typing import Union, Optional
  3. import shutil
  4. import os
  5. import json
  6. import sqlite3
  7. import logging
  8.  
  9. Pathish = Union[Path, str]
  10.  
  11.  
  12. class Task:
  13. """Abstract class for qcp Tasks. Should not be instantiated directly."""
  14.  
  15. def __init__(self) -> None:
  16. self.type = 0
  17.  
  18. def run(self) -> None:
  19. """Run the Task"""
  20. pass
  21.  
  22. @staticmethod
  23. def from_dict(x, validate: bool = False) -> "Task":
  24. """Create a Task of the appropriate subclass from a python dict"""
  25. task_type = x["type"]
  26.  
  27. if task_type == -1:
  28. return KillTask()
  29. elif task_type == 0:
  30. return Task()
  31. elif task_type == 1:
  32. return EchoTask(x["msg"])
  33. elif task_type == 2:
  34. return FileTask(x["src"], validate=validate)
  35. elif task_type == 3:
  36. return DeleteTask(x["src"], validate=validate)
  37. elif task_type == 4:
  38. return CopyTask(x["src"], x["dst"], validate=validate)
  39. elif task_type == 5:
  40. return MoveTask(x["src"], x["dst"], validate=validate)
  41. elif task_type == 6:
  42. raise NotImplementedError
  43. else:
  44. raise ValueError
  45.  
  46. def __repr__(self) -> str:
  47. return 'NULL'
  48.  
  49. def __eq__(self, other) -> bool:
  50. return self.__dict__ == other.__dict__
  51.  
  52. def __ne__(self, other) -> bool:
  53. return self.__dict__ != other.__dict__
  54.  
  55.  
  56. class KillTask(Task):
  57. """Kill the qcp server"""
  58. def __init__(self) -> None:
  59. self.type = -1
  60. super().__init__()
  61.  
  62. def run(self) -> None:
  63. raise NotImplementedError
  64.  
  65. def __repr__(self) -> str:
  66. return 'KILL'
  67.  
  68.  
  69. class EchoTask(Task):
  70. """Log a message"""
  71. def __init__(self, msg: str) -> None:
  72. super().__init__()
  73. self.msg = msg
  74. self.type = 1
  75.  
  76. def run(self) -> None:
  77. print(self.msg)
  78.  
  79. def __repr__(self) -> str:
  80. return f'Echo: "{self.msg}"'
  81.  
  82.  
  83. class FileTask(Task):
  84. """Abstract class for all file-based tasks"""
  85. def __init__(self, src: Pathish, validate: bool = True) -> None:
  86. super().__init__()
  87. self.validate = validate
  88. self.src = Path(src).as_posix()
  89. self.type = 2
  90. if validate:
  91. self.__validate__()
  92.  
  93. def __validate__(self) -> None:
  94. if not Path(self.src).exists():
  95. raise FileNotFoundError(f'{self.src} does not exist')
  96. elif not (Path(self.src).is_dir() or Path(self.src).is_file()):
  97. raise TypeError(f'{self.src} is neither a file nor directory')
  98.  
  99.  
  100. class DeleteTask(FileTask):
  101. """Delete a file"""
  102. def __init__(self, src: Pathish, validate: bool = True) -> None:
  103. super().__init__(src=src, validate=validate)
  104. self.type = 3
  105.  
  106. def run(self) -> None:
  107. os.unlink(self.src)
  108.  
  109. def __repr__(self) -> str:
  110. return f'DEL {self.src}'
  111.  
  112.  
  113. class CopyTask(FileTask):
  114. """Copy a file"""
  115. def __init__(self, src: Pathish, dst: Pathish, validate: bool = True) -> None:
  116. super().__init__(src=src, validate=False)
  117. self.dst = Path(dst).as_posix()
  118. self.type = 4
  119. self.validate = validate
  120. if validate:
  121. self.__validate__()
  122.  
  123. def __repr__(self) -> str:
  124. return f'COPY {self.src} -> {self.dst}'
  125.  
  126. def __validate__(self) -> None:
  127. super().__validate__()
  128. if Path(self.dst).exists():
  129. raise FileExistsError
  130.  
  131. def run(self) -> None:
  132. self.__validate__()
  133. shutil.copy(self.src, self.dst)
  134.  
  135.  
  136. class MoveTask(CopyTask):
  137. """Move a file"""
  138. def __init__(self, src: Pathish, dst: Pathish, validate: bool = True) -> None:
  139. super().__init__(src=src, dst=dst, validate=validate)
  140. self.type = 5
  141.  
  142. def run(self) -> None:
  143. super().__validate__()
  144. shutil.move(self.src, self.dst)
  145.  
  146. def __repr__(self) -> str:
  147. return f'MOVE {self.src} -> {self.dst}'
  148.  
  149.  
  150. class TaskQueueElement:
  151. """An enqueued Task"""
  152.  
  153. task = None #: A Task
  154. status = None #: Status of the queued Task
  155. priority = None #: Priority of the queued Task
  156.  
  157. def __init__(self, task: Task, priority: 1) -> None:
  158. self.task = task
  159. self.priority = priority
  160.  
  161. def __lt__(self, other) -> bool:
  162. return self.priority < other.priority
  163.  
  164. def __gt__(self, other) -> bool:
  165. return self.priority > other.priority
  166.  
  167. def __eq__(self, other) -> bool:
  168. return self.__dict__ == other.__dict__
  169.  
  170. def __ne__(self, other) -> bool:
  171. return self.__dict__ != other.__dict__
  172.  
  173.  
  174. class TaskQueue:
  175. """A prioritzed queue for tasks"""
  176. def __init__(self, path: Pathish = 'qcp.db') -> None:
  177. """
  178. Instantiate a TaskQueue
  179.  
  180. :param path: Path to store the persistent queue
  181. :type path: Path or str
  182. """
  183.  
  184. self.con = sqlite3.connect(path, isolation_level="EXCLUSIVE")
  185. self.path = Path(path)
  186.  
  187. cur = self.con.cursor()
  188. cur.execute("""
  189. CREATE TABLE IF NOT EXISTS tasks (
  190. priority INTEGER,
  191. task TEXT,
  192. status INTEGER,
  193. owner INTEGER
  194. )
  195. """)
  196. self.con.commit()
  197.  
  198. @property
  199. def n_total(self) -> int:
  200. """Count of all tasks in queue (including failed and completed)"""
  201. cur = self.con.cursor()
  202. return cur.execute("SELECT COUNT(1) from tasks").fetchall()[0][0]
  203.  
  204. @property
  205. def n_pending(self) -> int:
  206. """Number of pending tasks"""
  207. cur = self.con.cursor()
  208. return cur.execute("SELECT COUNT(1) FROM tasks WHERE status = 0").fetchall()[0][0]
  209.  
  210. @property
  211. def n_running(self) -> int:
  212. """Count of currently running tasks"""
  213. cur = self.con.cursor()
  214. return cur.execute("SELECT COUNT(1) FROM tasks WHERE status = 1").fetchall()[0][0]
  215.  
  216. @property
  217. def n_done(self) -> int:
  218. """count of completed tasks"""
  219. cur = self.con.cursor()
  220. return cur.execute("SELECT COUNT(1) from tasks WHERE status = 2").fetchall()[0][0]
  221.  
  222. @property
  223. def n_failed(self) -> int:
  224. """count of completed tasks"""
  225. cur = self.con.cursor()
  226. return cur.execute("SELECT COUNT(1) from tasks WHERE status = -1").fetchall()[0][0]
  227.  
  228. def put(self, task: "Task", priority: Optional[int] = None) -> None:
  229. """
  230. Enqueue a task
  231.  
  232. :param task: Task to be added to the queue
  233. :type task: Task
  234. :param priority: (optional) priority for executing `task` (tasks with lower priority will be executed earlier)
  235. :type priority: int
  236. """
  237.  
  238. cur = self.con.cursor()
  239. cur.execute(
  240. "INSERT INTO tasks (priority, task, status) VALUES (?, ?, ?)", (priority, json.dumps(task.__dict__), 0)
  241. )
  242. self.con.commit()
  243.  
  244. def pop(self) -> "Task":
  245. """
  246. Retrieves Task object and sets status of Task in database to "in progress" (1)
  247.  
  248. :raises AlreadyUnderEvaluationError: If trying to pop a tasks that is already being processed (i.e. if a race
  249. condition occurs if the queue is processed in parallel)
  250. """
  251. cur = self.con.cursor()
  252. cur.execute("SELECT _ROWID_ from tasks WHERE status = 0 ORDER BY priority LIMIT 1")
  253. oid = cur.fetchall()[0][0].__str__()
  254. self.mark_running(oid, id(self))
  255.  
  256. cur.execute("SELECT owner, task FROM tasks WHERE _ROWID_ = ?", oid)
  257. record = cur.fetchall()[0]
  258. if record[0] != id(self):
  259. raise AlreadyUnderEvaluationError
  260.  
  261. task = Task.from_dict(json.loads(record[1]))
  262. task.oid = oid
  263. return task
  264.  
  265. def peek(self) -> "Task":
  266. """
  267. Retrieves Task object without changing its status in the queue
  268. """
  269. cur = self.con.cursor()
  270. cur.execute("SELECT * from tasks ORDER BY priority LIMIT 1")
  271. record = cur.fetchall()[0]
  272. oid = record[0].__str__()
  273. task = Task.from_dict(json.loads(record[1]), validate=False)
  274. task.oid = oid
  275. return task
  276.  
  277. def print(self, n: int = 10) -> None:
  278. """
  279. Print an overview of the queue
  280.  
  281. :param n: number of tasks to preview
  282. :type n: int
  283. """
  284. assert isinstance(n, int) and n > 0
  285. cur = self.con.cursor()
  286. cur.execute("SELECT status, task from tasks ORDER BY priority LIMIT ?", (str(n), ))
  287. records = cur.fetchall()
  288. for record in records:
  289. print(f"[{record[0]}] {Task.from_dict(json.loads(record[1]))}")
  290.  
  291. def mark_pending(self, oid: int) -> None:
  292. """
  293. Mark the operation with the _ROWID_ `oid` as "pending" (0)
  294.  
  295. :param oid: ID of the task to mark
  296. :type oid: int
  297. """
  298. cur = self.con.cursor()
  299. cur.execute("UPDATE tasks SET status = 0, owner = NULL where _ROWID_ = ?", (oid, ))
  300. self.con.commit()
  301.  
  302. def mark_running(self, oid: int, owner: int) -> None:
  303. """Mark the operation with the _ROWID_ `oid` as "running" (1). The "owner" Id is to ensure no two processes
  304. are trying to execute the same operation
  305.  
  306. :param oid: ID of the task to mark
  307. :type oid: int
  308. :param owner: Id of the process that is handling the operation
  309. :type owner: int
  310. """
  311. cur = self.con.cursor()
  312. cur.execute("UPDATE tasks SET status = 1, owner = ? where _ROWID_ = ?", (owner, oid))
  313. self.con.commit()
  314.  
  315. def mark_done(self, oid: int) -> None:
  316. """
  317. Mark the operation with the _ROWID_ `oid` as "done" (2)
  318. :param oid: ID of the task to mark
  319. :type oid: int
  320. """
  321. cur = self.con.cursor()
  322. cur.execute("UPDATE tasks SET status = 2, owner = NULL where _ROWID_ = ?", (oid, ))
  323. self.con.commit()
  324.  
  325. def mark_failed(self, oid: int) -> None:
  326. """
  327. Mark the operation with the _ROWID_ `oid` as "failed" (-1)
  328.  
  329. :param oid: ID of the task to mark
  330. :type oid: int
  331. """
  332. cur = self.con.cursor()
  333. cur.execute("UPDATE tasks SET status = -1, owner = NULL where _ROWID_ = ?", (oid, ))
  334. self.con.commit()
  335.  
  336. def run(self) -> None:
  337. """Execute all pending tasks"""
  338. if self.n_pending < 1:
  339. logging.getLogger().warn("Queue is empty")
  340.  
  341. while self.n_pending > 0:
  342. op = self.pop()
  343. op.run()
  344. self.mark_done(op.oid)
  345.  
  346.  
  347. class AlreadyUnderEvaluationError(Exception):
  348. """This Task is already being processed by a different worker"""
  349. pass
  350.  
  351. import tasks
  352. import pytest
  353.  
  354. def test_TaskQueue(tmp_path):
  355. """TaskQueue can queue and execute tasks"""
  356. src = tmp_path.joinpath("foo")
  357. src.touch()
  358.  
  359. q = tasks.TaskQueue(tmp_path.joinpath("qcp.db"))
  360. q.put(tasks.CopyTask(src, tmp_path.joinpath("copied_file")))
  361. q.run()
  362. assert tmp_path.joinpath("copied_file").is_file()
  363. q.put(tasks.MoveTask(tmp_path.joinpath("copied_file"), tmp_path.joinpath("moved_file")))
  364. q.run()
  365. assert not tmp_path.joinpath("copied_file").is_file()
  366. assert tmp_path.joinpath("moved_file").is_file()
  367. q.put(tasks.DeleteTask(tmp_path.joinpath("moved_file")))
  368. q.run()
  369. assert not tmp_path.joinpath("moved_file").is_file()
  370. assert src.is_file()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement