Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ### MySQL DB info ###
- #import MySQLdb
- #conn = MySQLdb.connect(host="localhost", # your host, usually localhost
- # user="john", # your username
- # passwd="megajonhy", # your password
- # db="jonhydb") # name of the data base
- ### PostgreSQL DB info ###
- import psycopg2
- import psycopg2.extras
- postgresql_table_name = ""
- conn = psycopg2.connect("dbname=<DB_NAME> user=<USER_NAME>" +
- "password=<USER_PASSWORD> host=<POSTGRESQL_HOST>")
- ### InfluxDB info ####
- from influxdb import InfluxDBClient
- influx_db_name = ""
- influxClient = InfluxDBClient("<INFLUX_HOST>", "<INFLUX_PORT>")
- influxClient.delete_database(influx_db_name)
- influxClient.create_database(influx_db_name)
- # dictates how columns will be mapped to key/fields in InfluxDB
- schema = {
- "time_column": "", # the column that will be used as the time stamp in influx
- "columns_to_fields" : ["",...], # columns that will map to fields
- "columns_to_tags" : ["",...], # columns that will map to tags
- "table_name_to_measurement" : "", # table name that will be mapped to measurement
- }
- '''
- Generates an collection of influxdb points from the given SQL records
- '''
- def generate_influx_points(records):
- influx_points = []
- for record in records:
- tags = {}, fields = {}
- for tag_label in schema['columns_to_tags']:
- tags[tag_label] = record[tag_label]
- for field_label in schema['columns_to_fields']:
- fields[field_label] = record[field_label]
- influx_points.append({
- "measurement": schema['table_name_to_measurement'],
- "tags": tags,
- "time": record[schema['time_column']],
- "fields": fields
- })
- return influx_points
- # query relational DB for all records
- curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
- # curr = conn.cursor(dictionary=True)
- curr.execute("SELECT * FROM " + schema['table_name_to_measurement'] + "ORDER BY " + schema['column_to_time'] + " DESC;")
- row_count = 0
- # process 1000 records at a time
- while True:
- print("Processing row #" + (row_count + 1))
- selected_rows = curr.fetchmany(1000)
- influxClient.write_points(generate_influx_points(selected_rows))
- row_count += 1000
- if len(selected_rows) < 1000:
- break
- conn.close()
Add Comment
Please, Sign In to add comment