Guest User

Untitled

a guest
Jul 21st, 2018
106
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.27 KB | None | 0 0
  1. import os
  2. import sqlite3
  3. import json
  4.  
  5. from biothings import config
  6. from biothings.utils.hub_db import IDatabase
  7. from biothings.utils.dotfield import parse_dot_fields
  8. from biothings.utils.dataload import update_dict_recur
  9. from biothings.utils.common import json_serial
  10.  
  11. def get_hub_db_conn():
  12. return Database()
  13.  
  14. def get_src_dump():
  15. db = Database()
  16. return db[db.CONFIG.DATA_SRC_DUMP_COLLECTION]
  17.  
  18. def get_src_master():
  19. db = Database()
  20. return db[db.CONFIG.DATA_SRC_MASTER_COLLECTION]
  21.  
  22. def get_src_build():
  23. db = Database()
  24. return db[db.CONFIG.DATA_SRC_BUILD_COLLECTION]
  25.  
  26. def get_src_build_config():
  27. db = Database()
  28. return db[db.CONFIG.DATA_SRC_BUILD_CONFIG_COLLECTION]
  29.  
  30. def get_source_fullname(col_name):
  31. """
  32. Assuming col_name is a collection created from an upload process,
  33. find the main source & sub_source associated.
  34. """
  35. src_dump = get_src_dump()
  36. info = None
  37. for doc in src_dump.find():
  38. if col_name in doc.get("upload",{}).get("jobs",{}).keys():
  39. info = doc
  40. if info:
  41. name = info["_id"]
  42. if name != col_name:
  43. # col_name was a sub-source name
  44. return "%s.%s" % (name,col_name)
  45. else:
  46. return name
  47.  
  48. class Database(IDatabase):
  49.  
  50. def __init__(self):
  51. super(Database,self).__init__()
  52. self.name = self.CONFIG.DATA_HUB_DB_DATABASE
  53. if not os.path.exists(self.CONFIG.HUB_DB_BACKEND["sqlite_db_folder"]):
  54. os.makedirs(self.CONFIG.HUB_DB_BACKEND["sqlite_db_folder"])
  55. self.dbfile = os.path.join(self.CONFIG.HUB_DB_BACKEND["sqlite_db_folder"],self.name)
  56. self.cols = {}
  57.  
  58. @property
  59. def address(self):
  60. return self.dbfile
  61.  
  62. def get_conn(self):
  63. return sqlite3.connect(self.dbfile)
  64.  
  65. def collection_names(self):
  66. tables = self.get_conn().execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
  67. return [name[0] for name in tables]
  68.  
  69. def create_collection(self,colname):
  70. return self[colname]
  71.  
  72. def create_if_needed(self,table):
  73. existings = [tname[0] for tname in self.get_conn().execute("SELECT name FROM sqlite_master WHERE type='table' and " + \
  74. "name = ?",(table,)).fetchall()]
  75. if not table in existings:
  76. # TODO: injection...
  77. self.get_conn().execute("CREATE TABLE %s (_id TEXT PRIMARY KEY, document TEXT)" % table).fetchone()
  78.  
  79. def __getitem__(self, colname):
  80. if not colname in self.cols:
  81. self.create_if_needed(colname)
  82. self.cols[colname] = Collection(colname,self)
  83. return self.cols[colname]
  84.  
  85.  
  86. class Collection(object):
  87.  
  88. def __init__(self, colname, db):
  89. self.colname = colname
  90. self.db = db
  91.  
  92. def get_conn(self):
  93. return sqlite3.connect(self.db.dbfile)
  94.  
  95. @property
  96. def name(self):
  97. return self.colname
  98.  
  99. @property
  100. def database(self):
  101. return self.db
  102.  
  103. def find_one(self,*args,**kwargs):
  104. if args and len(args) == 1 and type(args[0]) == dict:
  105. if len(args[0]) == 1 and "_id" in args[0]:
  106. strdoc = self.get_conn().execute("SELECT document FROM %s WHERE _id = ?" % self.colname,(args[0]["_id"],)).fetchone()
  107. if strdoc:
  108. return json.loads(strdoc[0])
  109. else:
  110. return None
  111. else:
  112. return self.find(*args,find_one=True)
  113. elif args or kwargs:
  114. raise NotImplementedError("find(): %s %s" % (repr(args),repr(kwargs)))
  115. else:
  116. return self.find(find_one=True)
  117.  
  118. def find(self,*args,**kwargs):
  119. results = []
  120. if args and len(args) == 1 and type(args[0]) == dict and len(args[0]) > 0:
  121. # it's key/value search, let's iterate
  122. for doc in self.get_conn().execute("SELECT document FROM %s" % self.colname).fetchall():
  123. found = False
  124. doc = json.loads(doc[0])
  125. for k,v in args[0].items():
  126. if k in doc:
  127. if doc[k] == v:
  128. found = True
  129. else:
  130. found = False
  131. break
  132. if found:
  133. if "find_one" in kwargs:
  134. return doc
  135. else:
  136. results.append(doc)
  137. return results
  138. elif not args or len(args) == 1 and len(args[0]) == 0:
  139. # nothing or empty dict
  140. return [json.loads(doc[0]) for doc in \
  141. self.get_conn().execute("SELECT document FROM %s" % self.colname).fetchall()]
  142. else:
  143. raise NotImplementedError("find: args=%s kwargs=%s" % (repr(args),repr(kwargs)))
  144.  
  145. def insert_one(self,doc):
  146. assert "_id" in doc
  147. with self.get_conn() as conn:
  148. conn.execute("INSERT INTO %s (_id,document) VALUES (?,?)" % self.colname, \
  149. (doc["_id"],json.dumps(doc,default=json_serial))).fetchone()
  150. conn.commit()
  151.  
  152. def update_one(self,query,what):
  153. assert len(what) == 1 and ("$set" in what or \
  154. "$unset" in what or "$push" in what), "$set/$unset/$push operators not found"
  155. doc = self.find_one(query)
  156. if doc:
  157. if "$set" in what:
  158. # parse_dot_fields uses json.dumps internally, we can to make
  159. # sure everything is serializable first
  160. what = json.loads(json.dumps(what,default=json_serial))
  161. what = parse_dot_fields(what["$set"])
  162. doc = update_dict_recur(doc,what)
  163. elif "$unset" in what:
  164. for keytounset in what["$unset"].keys():
  165. doc.pop(keytounset,None)
  166. elif "$push" in what:
  167. for listkey,elem in what["$push"].items():
  168. assert not "." in listkey, "$push not supported for nested keys: %s" % listkey
  169. doc.setdefault(listkey,[]).append(elem)
  170.  
  171. self.save(doc)
  172.  
  173. def update(self,query,what):
  174. docs = self.find(query)
  175. for doc in docs:
  176. self.update_one({"_id":doc["_id"]},what)
  177.  
  178. def save(self,doc):
  179. if self.find_one({"_id":doc["_id"]}):
  180. with self.get_conn() as conn:
  181. conn.execute("UPDATE %s SET document = ? WHERE _id = ?" % self.colname,
  182. (json.dumps(doc,default=json_serial),doc["_id"]))
  183. conn.commit()
  184. else:
  185. self.insert_one(doc)
  186.  
  187. def replace_one(self,query,doc):
  188. orig = self.find_one(query)
  189. if orig:
  190. with self.get_conn() as conn:
  191. conn.execute("UPDATE %s SET document = ? WHERE _id = ?" % self.colname,
  192. (json.dumps(doc,default=json_serial),orig["_id"]))
  193. conn.commit()
  194.  
  195. def remove(self,query):
  196. docs = self.find(query)
  197. with self.get_conn() as conn:
  198. for doc in docs:
  199. conn.execute("DELETE FROM %s WHERE _id = ?" % self.colname,(doc["_id"],)).fetchone()
  200. conn.commit()
  201.  
  202. def count(self):
  203. return self.get_conn().execute("SELECT count(_id) FROM %s" % self.colname).fetchone()[0]
  204.  
  205. def __getitem__(self, _id):
  206. return self.find_one({"_id":_id})
  207.  
  208. def __getstate__(self):
  209. self.__dict__.pop("db",None)
  210. return self.__dict__
Add Comment
Please, Sign In to add comment