Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import os
from datetime import datetime

# Working folders, resolved against the current working directory:
#   data/    - stock data files waiting to be loaded into Cassandra
#   archive/ - files moved out of data/ after a successful load
cur_dir = os.getcwd()
data_dir = os.path.join(cur_dir, 'data')
archive_dir = os.path.join(cur_dir, 'archive')
# Create the computed paths (not bare relative names) so they always agree
# with data_dir/archive_dir; exist_ok removes the exists-then-create race.
os.makedirs(data_dir, exist_ok=True)
os.makedirs(archive_dir, exist_ok=True)
import pandas as pd

# Widen console output so wide quote frames print without wrapping/elision.
pd.options.display.width = 600
pd.options.display.max_columns = 500
import glob
import os
import numpy as np
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

# Keyspace to (re)build and the CQL file holding its table definitions.
key_space = 'ticker_plant'
cql_path = 'examples/cassandra_schema.cql'

# Module-wide connection used by the helpers below. Note this connects at
# import time; no contact points are passed, so the driver default is used.
cluster = Cluster(port=9042)
session = cluster.connect()
def create_key_space_and_tables(key_space, cql_path):
    """Create ``key_space`` and execute every statement from ``cql_path`` in it.

    The keyspace is created with SimpleStrategy and replication_factor 1.
    The CQL file is split on ';' and each non-blank piece is printed and then
    executed with ``key_space`` as the active keyspace.

    :param key_space: name of the keyspace to create (interpolated verbatim
        into the CQL, so it must be a trusted identifier).
    :param cql_path: path to a file of ';'-separated CQL statements.
    """
    q = "CREATE KEYSPACE {key_space} WITH replication = ".format(key_space=key_space)
    q = q + " {'class':'SimpleStrategy', 'replication_factor':1};"
    # Read the schema before touching the cluster so a missing/unreadable
    # file does not leave a half-created keyspace behind.
    # NOTE: naive split - a ';' inside a CQL string literal or comment would
    # break a statement apart.
    with open(cql_path, encoding='utf-8') as cql_file:
        statements = [stmt for stmt in cql_file.read().split(";") if stmt.strip()]
    # One session for the whole DDL run (instead of a fresh cluster.connect()
    # per statement), shut down afterwards so its connections are released.
    ddl_session = cluster.connect()
    try:
        ddl_session.execute(q)
        ddl_session.set_keyspace(key_space)
        for stmt in statements:
            print(stmt)
            ddl_session.execute(str(stmt))
    finally:
        ddl_session.shutdown()
def create_schema(key_space, sql_statement):
    """Execute one CQL statement with ``key_space`` as the active keyspace.

    A dedicated session is opened for the call and always shut down
    afterwards, so repeated calls do not accumulate open sessions.

    :param key_space: keyspace to switch the session to before executing.
    :param sql_statement: CQL statement; converted with ``str()`` first.
    """
    q = str(sql_statement)
    sess = cluster.connect()
    try:
        sess.set_keyspace(key_space)
        sess.execute(q)
    finally:
        sess.shutdown()
def drop_keyspace(key_space):
    """Drop ``key_space`` through the module-level session.

    No ``IF EXISTS`` clause is used, so dropping a missing keyspace is
    expected to surface as a driver error; callers decide how to handle it.
    """
    statement = " ".join(["DROP KEYSPACE", key_space])
    session.execute(statement)
def insert_quote_data(df, batch_size=100):
    """Insert every row of ``df`` into the ``datapoint`` table in batches.

    The DataFrame's column names are used verbatim as the target column
    names, with one ``?`` bind marker per column.

    :param df: DataFrame whose columns match columns of ``datapoint``.
    :param batch_size: maximum number of rows sent per BatchStatement
        (default 100).
    :return: True when every batch executed, or when ``df`` has no rows;
        False if any error occurred while preparing or executing.
    :raises ValueError: if ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError('batch_size must be >= 1, got {!r}'.format(batch_size))
    if len(df) == 0:
        # Nothing to insert. (np.array_split(df, 0) used to raise here, so
        # such files were reported as failed on every run.)
        return True
    try:
        tablename = 'datapoint'
        query = 'INSERT INTO ' + tablename + '({0}) values ({1})'
        query_in = query.format(','.join(df.columns), ','.join('?' * len(df.columns)))
        # The statement is identical for every row: prepare it once.
        insert_data = session.prepare(query_in)
        # Fixed-size slices; the old len(df) / 100 section count was 0 for
        # frames under 100 rows, which made np.array_split raise.
        for start in range(0, len(df), batch_size):
            chunk = df.iloc[start:start + batch_size]
            batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
            # to_dict('records') keeps each column's own type (iterrows
            # coerces a row to one common dtype).
            for record in chunk.to_dict('records'):
                batch.add(insert_data, record)
            session.execute(batch)
        return True
    except Exception as exc:
        # Keep the True/False contract callers rely on, but do not fail silently.
        print('insert_quote_data failed: {!r}'.format(exc))
        return False
def setup_cassandra():
    """Rebuild the keyspace and load every intraday CSV found in ``data_dir``.

    Steps:
    1. drop ``key_space`` (best effort);
    2. recreate it from ``cql_path`` and switch the module session to it;
    3. for each ``*intra*`` file in ``data_dir``, load and reshape it, insert
       it, and on success move it into ``archive_dir``.
    """
    import shutil

    try:
        drop_keyspace(key_space)
    except Exception:
        # Best effort: the keyspace may not exist yet (e.g. first run).
        pass
    create_key_space_and_tables(key_space, cql_path)
    session.set_keyspace(key_space)
    files = glob.glob(os.path.join(data_dir, "*intra*"))
    for path in files:
        print(path)
        df = _load_quote_file(path)
        if insert_quote_data(df):
            # Same file name, archive folder. The old path.replace('data',
            # 'archive') rewrote every 'data' substring in the full path
            # (parent folders, file names) and could target a missing dir.
            shutil.move(path, os.path.join(archive_dir, os.path.basename(path)))


def _load_quote_file(path):
    """Read one intraday CSV and reshape it into the ``datapoint`` layout."""
    # Drop the first CSV column (presumably an index written by to_csv - confirm).
    df = pd.read_csv(path).iloc[:, 1:]
    df = df.assign(date=pd.to_datetime(df['date']))
    # NOTE(review): the renamed 'datetime_capture' column is overwritten by
    # the next line with the parsed 'date'; kept as-is, confirm intent.
    df = df.rename(columns={'datetime_capture': 'event_time'})
    df = df.assign(event_time=df['date'])
    df = df.assign(date=df['date'].apply(lambda x: x.date()))
    df = df.assign(exchange='US')
    df.columns = [name.lower() for name in df.columns]
    return df
Add Comment
Please, Sign In to add comment