Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- import sys
- import time
- import collections
- sys.path.append('/usr/lib/python3.6/site-packages/')
- sys.path.append('/usr/lib64/python3.6/site-packages')
- import psycopg2
- import pygrametl
- from pygrametl.datasources import CSVSource, HashJoiningSource
- from pygrametl.tables import Dimension, FactTable, CachedDimension
# --- Connections and source files -----------------------------------------
import os

# Connection string for the data warehouse. The historical hard-coded value is
# kept as the default; set FKLUBDW_DSN to keep credentials out of the source.
dw_string = os.environ.get(
    'FKLUBDW_DSN',
    "host='127.0.0.1' dbname='fklubdw' user='dwuser' password='12345'")
dw_pgconn = psycopg2.connect(dw_string)
# The pygrametl tables declared later pass no connection explicitly; they
# presumably pick this wrapper up as pygrametl's default target connection.
dw_conn_wrapper = pygrametl.ConnectionWrapper(connection=dw_pgconn)

# Directory holding the exported FKlub source CSV files.
SOURCE_DIR = '/home/dwuser/fklubdw/FKlubSourceData'
# Buffer size passed to open() for every source file.
CSV_BUFFER_SIZE = 16384


def _open_csv(filename):
    """Open a ';'-delimited CSV file from SOURCE_DIR.

    Returns a ``(file_object, CSVSource)`` pair. The file object is bound at
    module level by the caller so it stays open while the load loops later
    in the script iterate the source; it is released at interpreter exit.
    """
    handle = open(os.path.join(SOURCE_DIR, filename), 'r', CSV_BUFFER_SIZE)
    return handle, CSVSource(handle, delimiter=';')


membersfile, members = _open_csv('member.csv')
productsfile, products = _open_csv('product.csv')
roomsfile, rooms = _open_csv('room.csv')
categoriesfile, categories = _open_csv('category.csv')
salefile, sales = _open_csv('sale.csv')
productcategoriesfile, productcategories = _open_csv('product_categories.csv')
# Sources for payment.csv, oldprice.csv and product_room.csv were sketched
# out earlier but are not loaded yet.
# --- Warehouse tables ------------------------------------------------------
# None of these tables receives an explicit connection argument.

# Fact table referencing every dimension below; its 'timestamp' column is
# filled with the key returned by time_dimension.ensure() during the load.
fact_table = FactTable(
    name='facttable',
    measures=['price'],
    keyrefs=[
        'product_id',
        'category_id',
        'room_id',
        'member_id',
        'timestamp',
    ],
)

# Members are looked up by their own key column (uncached Dimension).
member_dimension = Dimension(
    name='member',
    key='member_id',
    lookupatts=['member_id'],
    attributes=[
        'active', 'year', 'gender', 'want_spam', 'balance', 'undo_count',
    ],
)

# Products, rooms and categories use CachedDimension, each looked up by its
# key column.
product_dimension = CachedDimension(
    name='product',
    key='product_id',
    lookupatts=['product_id'],
    attributes=[
        'name', 'price', 'active', 'deactivate_date', 'quantity',
        'alcohol_content_ml', 'start_date',
    ],
)

room_dimension = CachedDimension(
    name='room',
    key='room_id',
    lookupatts=['room_id'],
    attributes=['name', 'description'],
)

category_dimension = CachedDimension(
    name='category',
    key='category_id',
    lookupatts=['category_id'],
    attributes=['name'],
)

# Calendar dimension; no lookupatts is given, so lookups fall back to the
# library default for Dimension.
time_dimension = Dimension(
    name='time',
    key='time_id',
    attributes=['day', 'month', 'year', 'quarter'],
)

# Not enabled yet:
#   datastatus dimension -- key 'id', attribute 'status'
#   audit dimension      -- key 'id', attributes 'start_time', 'end_time',
#                           'processed_number', 'rejected_number',
#                           'ETL_version'
def member_transformer(row):
    """Turn the member row's 'active' and 'want_spam' text flags into bools.

    The row is modified in place: a value of 'f' becomes False and any other
    value becomes True. 'active' is converted before 'want_spam'. Returns None.
    """
    for flag in ('active', 'want_spam'):
        row[flag] = row[flag] != 'f'
def product_transform(row):
    """Normalise a product row in place before it is inserted.

    * An empty 'alcohol_content_ml' becomes 0; any other value is kept as-is.
    * 'deactivate_date' and 'start_date' are each replaced by 0 when the
      source field is the empty string and by 1 otherwise.

    Returns None.
    """
    if row['alcohol_content_ml'] == '':
        row['alcohol_content_ml'] = 0
    # NOTE(review): the actual dates are discarded in favour of 0/1 flags;
    # confirm the warehouse columns are meant to hold flags, not dates.
    for column in ('deactivate_date', 'start_date'):
        row[column] = 0 if row[column] == '' else 1
def split_timestamp(row):
    """Split row['timestamp'] into 'year', 'month', 'day' and 'quarter'.

    Expects a '-'-separated date 'YYYY-MM-DD', optionally followed by a space
    and a time part (e.g. 'YYYY-MM-DD HH:MM:SS'); the time part is ignored.
    The row is modified in place: 'year', 'month' and 'day' are set to the
    corresponding substrings, and 'quarter' to an int computed from the
    month (1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4). Returns None.

    A malformed timestamp raises (IndexError for missing date fields,
    ValueError for a non-numeric month) rather than being silently skipped.
    """
    # Split at most twice so any '-' inside the time part stays in parts[2].
    parts = row['timestamp'].split('-', 2)
    year, month = parts[0], parts[1]
    row['year'] = year
    row['month'] = month
    # Drop everything after the first space (the time of day).
    row['day'] = parts[2].partition(' ')[0]
    # Calendar quarter; the old '(month % 3) + 1' mapped January to 2.
    row['quarter'] = (int(month) - 1) // 3 + 1
# --- Load ------------------------------------------------------------------
# The source CSVs carry their key in an 'id' column; it is copied into each
# dimension's key column before insertion.
try:
    for row in members:
        row['member_id'] = row['id']
        member_transformer(row)
        member_dimension.insert(row)

    for row in categories:
        row['category_id'] = row['id']
        category_dimension.insert(row)

    for row in products:
        row['product_id'] = row['id']
        product_transform(row)
        product_dimension.insert(row)

    for row in rooms:
        row['room_id'] = row['id']
        room_dimension.insert(row)

    # Join each sale with the product_categories rows for its product.
    # NOTE(review): if HashJoiningSource yields one row per matching category,
    # a product in several categories produces several fact rows, each with
    # the full sale price -- confirm this is intended.
    salesCategory = HashJoiningSource(sales, 'product_id',
                                      productcategories, 'product_id')
    for row in salesCategory:
        # Sales with member_id '0' are not loaded; skip them before doing any
        # per-row work.
        if row['member_id'] == '0':
            continue
        split_timestamp(row)
        # Replace source ids with the keys the dimensions return.
        row['member_id'] = member_dimension.lookup(row)
        row['product_id'] = product_dimension.lookup(row)
        row['room_id'] = room_dimension.lookup(row)
        row['category_id'] = category_dimension.lookup(row)
        # ensure() returns the key of the matching time member, adding it if
        # needed; that key is stored in the fact's 'timestamp' column.
        row['timestamp'] = time_dimension.ensure(row)
        fact_table.insert(row)

    dw_conn_wrapper.commit()
finally:
    # Always release the connection; if loading failed before the commit,
    # the uncommitted work is presumably rolled back by the database.
    dw_conn_wrapper.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement