Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import psycopg2
- import configparser
- from urllib.parse import urlparse, uses_netloc
- config = configparser.ConfigParser()
- config.read('config.ini')
- connection_string = config['database']['postgres_connection']
- def connect_to_db(conn_str):
- uses_netloc.append("postgres")
- url = urlparse(conn_str)
- conn = psycopg2.connect(database=url.path[1:],
- user=url.username,
- password=url.password,
- host=url.hostname,
- port=url.port)
- return conn
- def initialize():
- with conn.cursor() as cursor:
- # foreign key for the productID, how?
- cursor.execute('CREATE TABLE IF NOT EXISTS Customers (id SERIAL UNIQUE, firstName text, lastName text, street text, city text, state text, zip int, CONSTRAINT nameConstraint UNIQUE (firstName, lastName));')
- cursor.execute('CREATE TABLE IF NOT EXISTS Products (id SERIAL UNIQUE, name text, price real, CONSTRAINT pNameConstraint UNIQUE (name));')
- cursor.execute('CREATE TABLE IF NOT EXISTS Orders (id SERIAL UNIQUE, CustomerId int, ProductID int, date text);')
- conn.commit()
- # displays customers on project1/templates/customers/index
- def get_customers():
- with conn.cursor() as cursor:
- cursor.execute('SELECT * FROM Customers;')
- customers = list()
- for result in cursor:
- customers.append({'id':result[0],'firstName':result[1],'lastName':result[2],'street':result[3],'city':result[4],'state':result[5],'zip':result[6]})
- return customers
- def get_customer(id):
- with conn.cursor() as cursor:
- cursor.execute('SELECT * FROM Customers WHERE id = %s',(id,))
- result = cursor.fetchone()
- return {'id':result[0],'firstName':result[1],'lastName':result[2],'street':result[3],'city':result[4],'state':result[5],'zip':result[6]}
- def upsert_customer(customer):
- with conn.cursor() as cursor:
- cursor.execute('INSERT INTO Customers(firstName, lastName, street, city, state, zip) VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (firstName, lastName) DO UPDATE SET street = excluded.street, city = excluded.city, state = excluded.state, zip = excluded.zip',
- (customer['firstName'],customer['lastName'],customer['street'],customer['city'],customer['state'],customer['zip']))
- conn.commit()
- def delete_customer(id):
- with conn.cursor() as cursor:
- cursor.execute('DELETE FROM Customers WHERE id = %s', (id,))
- conn.commit()
- # displays products on project1/templates/products/index
- def get_products():
- with conn.cursor() as cursor:
- cursor.execute('SELECT * FROM Products;')
- products = list()
- for result in cursor:
- products.append({'id':result[0],'name':result[1],'price':result[2]})
- return products
- # this works correctly now
- def get_product(id):
- with conn.cursor() as cursor:
- cursor.execute('SELECT * FROM Products WHERE id = %s',(id,))
- result = cursor.fetchone()
- return {'id':result[0],'name':result[1],'price':result[2]}
- # allows the price for a product to be changed. If the name is changed, the product is new (inserted).
- # the conflict could be changed to allow for a product to be renamed, but name seems pretty important for identifying a product
- # and could be part of the PK.
- def upsert_product(product):
- with conn.cursor() as cursor:
- cursor.execute('INSERT INTO Products(name, price) VALUES(%s, %s) ON CONFLICT (name) DO UPDATE SET price = excluded.price;', (product['name'], product['price']) )
- conn.commit()
- def delete_product(id):
- with conn.cursor() as cursor:
- cursor.execute('DELETE FROM Products WHERE id = %s',(id,))
- conn.commit()
- # needs to be joined with other tables, see index.html for orders
- def get_orders():
- with conn.cursor() as cursor:
- cursor.execute('SELECT * FROM Orders join customers on Orders.customerid = customers.id join products on orders.productid = products.id;')
- orders = list()
- for result in cursor:
- orders.append({'product': {'id': result[11],'name': result[12] , 'price': result[13]}, 'customer': {'id':result[4], 'firstName':result[5],
- 'lastName':result[6],
- 'street':result[7], 'city':result[8], 'state': result[9], 'zip': result[10]},
- 'id': result[0], 'customerid': result[1], 'productid':result[2], 'date': result[3]})
- return orders
- def get_order(id):
- with conn.cursor() as cursor:
- cursor.execute('SELECT * FROM Orders WHERE id = %s;',(id,))
- result = cursor.fetchone()
- return {'id':result[0], 'customerid':result[1], 'productId':result[2], 'date':result[3]}
- # should order be upsert, or just insert? There does not exist an edit feature on the page.
- def upsert_order(order):
- with conn.cursor() as cursor:
- cursor.execute('INSERT INTO Orders(customerid, productId, date) VALUES(%s, %s, %s);', (order['customerId'], order['productId'], order['date']) )
- conn.commit()
- def delete_order(id):
- with conn.cursor() as cursor:
- cursor.execute('DELETE FROM Orders WHERE id = %s;',(id,))
- conn.commit()
- # Return the customer, with a list of orders. Each order should have a product
- # property as well.
- # renders the added report page
- def customer_report(id):
- customer = get_customer(id)
- orders = get_orders()
- customer['orders'] = [o for o in orders if o['customerid'] == id]
- return customer
- # Return a list of products. For each product, build
- # create and populate a last_order_date, total_sales, and
- # gross_revenue property. Use JOIN and aggregation to avoid
- # accessing the database more than once, and retrieving unnecessary
- # information
- # appears to be working
- def sales_report():
- products = get_products()
- for product in products:
- orders = [o for o in get_orders() if o['productid'] == product['id']]
- orders = sorted(orders, key=lambda k: k['date'])
- product['last_order_date'] = orders[-1]['date']
- product['total_sales'] = len(orders)
- product['gross_revenue'] = product['price'] * product['total_sales']
- return products
- conn = connect_to_db(connection_string)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement