Guest User

Untitled

a guest
May 26th, 2018
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.67 KB | None | 0 0
  1. import os
  2. from datetime import datetime
  3.  
  4. # create data folder to download stock data to
  5. cur_dir = os.getcwd()
  6.  
  7. data_dir = os.path.join(cur_dir, 'data')
  8. archive_dir = os.path.join(cur_dir, 'archive')
  9.  
  10. if not os.path.exists(data_dir):
  11. os.makedirs('data')
  12.  
  13. if not os.path.exists(archive_dir):
  14. os.makedirs('archive')
  15.  
  16. import pandas as pd
  17.  
  18. pd.set_option('display.width', 600)
  19. pd.set_option('display.max_columns', 500)
  20. import glob
  21. import os
  22. import numpy as np
  23.  
  24. from cassandra import ConsistencyLevel
  25. from cassandra.cluster import Cluster
  26. from cassandra.query import BatchStatement
  27.  
  28. cluster = Cluster(port=9042)
  29. session = cluster.connect()
  30. key_space = 'ticker_plant'
  31. cql_path = r'examples/cassandra_schema.cql'
  32.  
  33. def create_key_space_and_tables(key_space, cql_path):
  34. q = "CREATE KEYSPACE {key_space} WITH replication = ".format(key_space=key_space)
  35. q = q + " {'class':'SimpleStrategy', 'replication_factor':1};"
  36. cluster.connect().execute(q)
  37.  
  38. with open(cql_path) as cql_file:
  39. for stmt in cql_file.read().split(";"):
  40. if len(stmt.strip()) > 0:
  41. print(stmt)
  42. create_schema(key_space, stmt)
  43.  
  44. def create_schema(key_space, sql_statement):
  45. q = str(sql_statement)
  46. sess = cluster.connect()
  47. sess.set_keyspace(key_space)
  48. sess.execute(q)
  49.  
  50. def drop_keyspace(key_space):
  51. session.execute("DROP KEYSPACE " + key_space)
  52.  
  53. def insert_quote_data(df):
  54. try:
  55. tablename = 'datapoint'
  56. for dd in np.array_split(df, len(df) / 100):
  57. batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
  58. for index, row in dd.iterrows():
  59. query = 'INSERT INTO ' + tablename + '({0}) values ({1})'
  60. query_in = query.format(','.join(df.columns), ','.join('?' * len(df.columns)))
  61. insert_data = session.prepare(query_in)
  62. batch.add(insert_data, row.to_dict())
  63. session.execute(batch)
  64. return True
  65. except:
  66. return False
  67.  
  68. def setup_cassandra():
  69. try:
  70. drop_keyspace(key_space)
  71. except:
  72. pass
  73.  
  74. create_key_space_and_tables(key_space, cql_path)
  75.  
  76. session.set_keyspace(key_space)
  77.  
  78. files = glob.glob(os.path.join(data_dir, "*intra*"))
  79.  
  80. for file in files:
  81. print(file)
  82. df = pd.read_csv(file).iloc[:, 1:]
  83. df = df.assign(date=pd.to_datetime(df['date']))
  84. df = df.rename(columns={'datetime_capture': 'event_time'})
  85. df = df.assign(event_time=df['date'])
  86. df = df.assign(date=df['date'].apply(lambda x: x.date()))
  87. df = df.assign(exchange='US')
  88. df.columns = map(str.lower, df.columns)
  89. result = insert_quote_data(df)
  90. if result:
  91. import shutil
  92. shutil.move(file, file.replace('data', 'archive'))
Add Comment
Please, Sign In to add comment