Advertisement
Guest User

Untitled

a guest
Feb 23rd, 2016
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.56 KB | None | 0 0
  1. #!/usr/bin/python
  2.  
  3. import json
  4. import psycopg2
  5. import sys
  6. import os
  7.  
  8. glob_single_id_found_counter = 0
  9.  
  10.  
  11. def write_to_database(order_id, product_id):
  12.     assert type(order_id) is str, "order_id var in sql function must be a string."
  13.     assert type(product_id) is str, "product_id var in sql function must be a string."
  14.     # Ask if ids are all same length ?
  15.     assert len(order_id) == 12, "order_id var in sql function must be 12 chars."
  16.     assert len(product_id) == 12, "order_id var in sql function must be 12 chars."
  17.  
  18.     try:
  19.         conn_data = "host='localhost'" \
  20.                     "dbname='postgres'" \
  21.                     "user='postgres'" \
  22.                     "password='123456'"
  23.  
  24.         conn = psycopg2.connect(conn_data)
  25.         cursor = conn.cursor()
  26.         cursor.execute("select orderId from my_table where orderId=%s;", (order_id,))
  27.         row_exist = cursor.fetchall()
  28.  
  29.         if not row_exist:
  30.             cursor.execute("INSERT INTO my_table VALUES (%s, %s)", (order_id, product_id,))
  31.             #print("\nTEST: orderId:", order_id,)
  32.             #print("TEST: productId:", product_id,)
  33.             print("was added to the database.")
  34.             conn.commit()
  35.  
  36.         cursor.close()
  37.         conn.close()
  38.  
  39.     except Exception as Error:
  40.         print(Error)
  41.         print("Unable to connect to the database.")
  42.         cursor.close()
  43.         conn.close()
  44.  
  45.  
  46. def watch_for_single_product_id(line):
  47.     if type(line) is int or len(line) == 0:
  48.         return False
  49.  
  50.     global glob_single_id_found_counter
  51.     # decode string-line to json
  52.     try:
  53.         line = (line.split("{"))[1].split("}")[0]
  54.         line = line.replace("u'", '"')
  55.         line = line.replace("'", '"')
  56.         line = "{" + line + "}"
  57.         line = json.loads(line)
  58.     except IndexError:
  59.         # line does not fit to our decoder
  60.         return False
  61.     # check for single productId
  62.     if (len(line["productId"])) == 1:
  63.         print("\norderId:\t", line["orderId"])
  64.         print("productId:\t", line["productId"][0])
  65.         glob_single_id_found_counter += 1
  66.         # pass values to sql function
  67.         write_to_database(line["orderId"], line["productId"][0])
  68.         return True
  69.  
  70.  
  71. def open_log(files):
  72.     global glob_single_id_found_counter
  73.     """
  74.    if os.path.exists(files):  # if no list then check for relative path
  75.        return True
  76.    else:
  77.        return False
  78.    """
  79.     if len(files) > 1:
  80.         for file in files:
  81.             print("\ninput:", file)
  82.             for line in open(files[0]):
  83.                 watch_for_single_product_id(line)
  84.     else:
  85.         print("\ninput:", files[0])
  86.         for line in open(files[0]):
  87.             watch_for_single_product_id(line)
  88.  
  89.     print("\nsingle productIds found:", glob_single_id_found_counter)
  90.     return True
  91.  
  92.  
  93. def test_watch_for_single_product_id():
  94.     good_test_line = """"2015-09-17 11:00:09,621 - adnymics.BrochureGeneration.JobGenerator - DEBUG: Checking args for (re-)generating brochure: {u'orderId': u'123456789000', u'productId': [u'123456789003']}"""""
  95.     bad_test_lines = (
  96.         "something something",
  97.         123456789,
  98.         "",
  99.     )
  100.  
  101.     assert watch_for_single_product_id(good_test_line) == True
  102.  
  103.     for line in bad_test_lines:
  104.         assert watch_for_single_product_id(line) == False
  105.  
  106. def test_open_log(tmpdir):
  107.     p = tmpdir.join("data.log")
  108.     p.write("""
  109.    "2015-09-17 11:00:09,621 - adnymics.BrochureGeneration.JobGenerator - DEBUG: Checking args for (re-)generating brochure: {u'orderId': u'123456789000', u'productId': [u'123456789001', u'123456789002', u'123456789003']}"
  110.    "2015-09-17 11:00:09,621 - adnymics.BrochureGeneration.JobGenerator - DEBUG: Checking args for (re-)generating brochure: {u'orderId': u'123456789000', u'productId': [u'123456789003']}"
  111.    """)
  112.     assert type(p.read()) is str, "log is not a string: "
  113.     #test = str(tmpdir)
  114.     #file = str(os.path.join(test, 'data.log')) # relative path
  115.     ##assert open_log(file) == True, "test"
  116.  
  117.  
  118. # ---------
  119. # SQL Debug
  120. #
  121.  
  122. def show_sql():
  123.     try:
  124.         conn_data = "host='localhost'" \
  125.                     "dbname='postgres'" \
  126.                     "user='postgres'" \
  127.                     "password='123456'"
  128.  
  129.         conn = psycopg2.connect(conn_data)
  130.         cursor = conn.cursor()
  131.         cursor.execute("""SELECT orderId, productId FROM my_table;""")
  132.         for row in cursor.fetchall():
  133.             print(row)
  134.  
  135.     except Exception as Error:
  136.         print(Error)
  137.  
  138. #
  139. # SQL Debug
  140. # ---------
  141.  
  142.  
  143. if __name__ == "__main__":
  144.     open_log(sys.argv[1:])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement