Advertisement
Guest User

Untitled

a guest
Dec 6th, 2019
164
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.31 KB | None | 0 0
import collections
import datetime
import os
import sys
import time

sys.path.append('/usr/lib/python3.6/site-packages/')
sys.path.append('/usr/lib64/python3.6/site-packages')

import psycopg2
import pygrametl

from pygrametl.datasources import CSVSource, HashJoiningSource
from pygrametl.tables import Dimension, FactTable, CachedDimension

  15. dw_string = "host='127.0.0.1' dbname='fklubdw' user='dwuser' password='12345'"
  16. dw_pgconn = psycopg2.connect(dw_string)
  17.  
  18.  
  19. dw_conn_wrapper = pygrametl.ConnectionWrapper(connection=dw_pgconn)
  20.  
  21. membersfile = csvfile= open("/home/dwuser/fklubdw/FKlubSourceData/member.csv", 'r', 16384)
  22. members = CSVSource(membersfile, delimiter=';')
  23.  
  24. productsfile = csvfile= open("/home/dwuser/fklubdw/FKlubSourceData/product.csv", 'r', 16384)
  25. products = CSVSource(productsfile, delimiter=';')
  26.  
  27. roomsfile = csvfile= open("/home/dwuser/fklubdw/FKlubSourceData/room.csv", 'r', 16384)
  28. rooms = CSVSource(roomsfile, delimiter=';')
  29.  
  30. categoriesfile = csvfile= open("/home/dwuser/fklubdw/FKlubSourceData/category.csv", 'r', 16384)
  31. categories = CSVSource(categoriesfile, delimiter=';')
  32.  
  33. #paymentsfile = csvfile= open("/home/dwuser/fklubdw/FKlubSourceData/payment.csv", 'r', 16384)
  34. #payments = CSVSource(paymentsfile, delimiter=';')
  35.  
  36. #oldpricefile = csvfile= open("/home/dwuser/fklubdw/FKlubSourceData/oldprice.csv", 'r', 16384)
  37. #oldprices = CSVSource(oldpricefile, delimiter=';')
  38.  
  39. #productroomfile = csvfile= open("/home/dwuser/fklubdw/FKlubSourceData/product_room.csv", 'r', 16384)
  40. #productroom = CSVSource(productroomfile, delimiter=';')
  41.  
  42. salefile = csvfile= open("/home/dwuser/fklubdw/FKlubSourceData/sale.csv", 'r', 16384)
  43. sales = CSVSource(salefile, delimiter=';')
  44.  
  45. productcategoriesfile = csvfile= open("/home/dwuser/fklubdw/FKlubSourceData/product_categories.csv", 'r', 16384)
  46. productcategories = CSVSource(productcategoriesfile, delimiter=';')
  47.  
  48. fact_table = FactTable(
  49. name='facttable',
  50. keyrefs=['product_id', 'category_id', 'room_id', 'member_id', 'timestamp'],
  51. measures=['price'])
  52.  
  53. member_dimension = Dimension(
  54. name='member',
  55. key='member_id',
  56. attributes=['active', 'year', 'gender', 'want_spam', 'balance', 'undo_count'],
  57. lookupatts=['member_id'])
  58.  
  59. product_dimension = CachedDimension(
  60. name='product',
  61. key='product_id',
  62. attributes=['name', 'price', 'active', 'deactivate_date', 'quantity', 'alcohol_content_ml', 'start_date'],
  63. lookupatts=['product_id'])
  64.  
  65. room_dimension = CachedDimension(
  66. name='room',
  67. key='room_id',
  68. attributes=['name', 'description'],
  69. lookupatts=['room_id'])
  70.  
  71. category_dimension = CachedDimension(
  72. name='category',
  73. key='category_id',
  74. attributes=['name'],
  75. lookupatts=['category_id'])
  76.  
  77. time_dimension = Dimension(
  78. name='time',
  79. key='time_id',
  80. attributes=['day', 'month', 'year', 'quarter'])
  81.  
  82. #datastatus_dimension = Dimension(
  83. # name='datastatus',
  84. # key='id',
  85. # attributes=['status'])
  86.  
  87. #audit_dimension = Dimension(
  88. # name='audit',
  89. # key='id',
  90. # attributes=['start_time', 'end_time', 'processed_number', 'rejected_number', 'ETL_version'])
  91.  
  92. def member_transformer(row):
  93. if row['active']=='f':
  94. row['active'] = False
  95. else:
  96. row['active'] = True
  97.  
  98. if row['want_spam']=='f':
  99. row['want_spam'] = False
  100. else:
  101. row['want_spam'] = True
  102.  
  103. def product_transform(row):
  104. if row['alcohol_content_ml'] == '':
  105. row['alcohol_content_ml'] = 0
  106. if row['deactivate_date'] == '':
  107. row['deactivate_date'] = 0
  108. else:
  109. row['deactivate_date'] = 1
  110. if row['start_date'] == '':
  111. row['start_date'] = 0
  112. else:
  113. row['start_date'] = 1
  114.  
  115. def split_timestamp(row):
  116. #"""Splits a timestamp containing a date into its three parts"""
  117.  
  118. # First the timestamp is extracted from the row dictionary
  119. timestamp = row['timestamp']
  120.  
  121. # Then the string is split on the / in the time stamp
  122. timestamp_split = timestamp.split('-')
  123.  
  124. # Finally each part is reassigned to the row dictionary. It can then be
  125. # accessed by the caller as the row is a reference to the dict object
  126. row['year'] = timestamp_split[0]
  127. row['month'] = timestamp_split[1]
  128.  
  129. daySplit = timestamp_split[2].split(' ')
  130. row['day'] = daySplit[0]
  131.  
  132. row['quarter'] = (int(timestamp_split[1]) % 3) + 1
  133.  
  134. for row in members:
  135. row['member_id'] = row['id']
  136. member_transformer(row)
  137. member_dimension.insert(row)
  138.  
  139. for row in categories:
  140. row['category_id'] = row['id']
  141. category_dimension.insert(row)
  142.  
  143. for row in products:
  144. row['product_id'] = row['id']
  145. product_transform(row)
  146. product_dimension.insert(row)
  147.  
  148. for row in rooms:
  149. row['room_id'] = row['id']
  150. room_dimension.insert(row)
  151.  
  152. salesCategory = HashJoiningSource(sales, 'product_id',
  153. productcategories, 'product_id')
  154.  
  155. for row in salesCategory:
  156. split_timestamp(row)
  157. if row['member_id'] == '0':
  158. continue
  159.  
  160. row['member_id'] = member_dimension.lookup(row)
  161. row['product_id'] = product_dimension.lookup(row)
  162. row['room_id'] = room_dimension.lookup(row)
  163. row['category_id'] = category_dimension.lookup(row)
  164. row['price'] = row['price']
  165. row['timestamp'] = time_dimension.ensure(row)
  166. fact_table.insert(row)
  167.  
  168. dw_conn_wrapper.commit()
  169. dw_conn_wrapper.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement