Guest User

Untitled

a guest
Jan 30th, 2019
169
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.31 KB | None | 0 0
  1. ### MySQL DB info ###
  2. #import MySQLdb
  3. #conn = MySQLdb.connect(host="localhost", # your host, usually localhost
  4. # user="john", # your username
  5. # passwd="megajonhy", # your password
  6. # db="jonhydb") # name of the data base
  7.  
  8.  
  9. ### PostgreSQL DB info ###
  10. import psycopg2
  11. import psycopg2.extras
  12. postgresql_table_name = ""
  13. conn = psycopg2.connect("dbname=<DB_NAME> user=<USER_NAME>" +
  14. "password=<USER_PASSWORD> host=<POSTGRESQL_HOST>")
  15.  
  16. ### InfluxDB info ####
  17. from influxdb import InfluxDBClient
  18. influx_db_name = ""
  19. influxClient = InfluxDBClient("<INFLUX_HOST>", "<INFLUX_PORT>")
  20. influxClient.delete_database(influx_db_name)
  21. influxClient.create_database(influx_db_name)
  22.  
  23. # dictates how columns will be mapped to key/fields in InfluxDB
  24. schema = {
  25. "time_column": "", # the column that will be used as the time stamp in influx
  26. "columns_to_fields" : ["",...], # columns that will map to fields
  27. "columns_to_tags" : ["",...], # columns that will map to tags
  28. "table_name_to_measurement" : "", # table name that will be mapped to measurement
  29. }
  30.  
  31. '''
  32. Generates an collection of influxdb points from the given SQL records
  33. '''
  34. def generate_influx_points(records):
  35. influx_points = []
  36. for record in records:
  37. tags = {}, fields = {}
  38. for tag_label in schema['columns_to_tags']:
  39. tags[tag_label] = record[tag_label]
  40. for field_label in schema['columns_to_fields']:
  41. fields[field_label] = record[field_label]
  42. influx_points.append({
  43. "measurement": schema['table_name_to_measurement'],
  44. "tags": tags,
  45. "time": record[schema['time_column']],
  46. "fields": fields
  47. })
  48. return influx_points
  49.  
  50.  
  51.  
  52. # query relational DB for all records
  53. curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
  54. # curr = conn.cursor(dictionary=True)
  55. curr.execute("SELECT * FROM " + schema['table_name_to_measurement'] + "ORDER BY " + schema['column_to_time'] + " DESC;")
  56. row_count = 0
  57. # process 1000 records at a time
  58. while True:
  59. print("Processing row #" + (row_count + 1))
  60. selected_rows = curr.fetchmany(1000)
  61. influxClient.write_points(generate_influx_points(selected_rows))
  62. row_count += 1000
  63. if len(selected_rows) < 1000:
  64. break
  65. conn.close()
Add Comment
Please, Sign In to add comment