Advertisement
Guest User

Untitled

a guest
Aug 31st, 2015
60
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.28 KB | None | 0 0
  1. import argparse
  2. import datetime
  3. import psycopg2
  4. import psycopg2.extras
  5. import sys
  6.  
  7. print datetime.datetime.now()
  8.  
  9. moments01_master = psycopg2.connect(
  10. "host=localhost port=5432 dbname=youversion user=dev"
  11. )
  12. moments01_slave = psycopg2.connect(
  13. "host=localhost port=5432 dbname=youversion user=dev"
  14. )
  15. moments02_master = psycopg2.connect(
  16. "host=localhost port=5432 dbname=youversion user=dev"
  17. )
  18. moments02_slave = psycopg2.connect(
  19. "host=localhost port=5432 dbname=youversion user=dev"
  20. )
  21. moments01_master.autocommit = True
  22. moments01_slave.autocommit = True
  23. moments02_master.autocommit = True
  24. moments02_slave.autocommit = True
  25.  
  26. psycopg2.extras.register_hstore(moments01_master)
  27. psycopg2.extras.register_hstore(moments01_slave)
  28. psycopg2.extras.register_hstore(moments02_master)
  29. psycopg2.extras.register_hstore(moments02_slave)
  30.  
  31. cursor01_master = moments01_master.cursor()
  32. cursor01_slave = moments01_slave.cursor()
  33. cursor02_master = moments02_master.cursor()
  34. cursor02_slave = moments02_slave.cursor()
  35.  
  36. parser = argparse.ArgumentParser(
  37. prog='add_hstore_data',
  38. description='Migrate the json data to hstore in moments',
  39. epilog=__doc__,
  40. formatter_class=argparse.RawTextHelpFormatter
  41. )
  42.  
  43. parser.add_argument('--start',
  44. dest='start',
  45. help='Shard to start with',
  46. required=True,
  47. type=int)
  48.  
  49. parser.add_argument('--end',
  50. dest='end',
  51. help='Shard to end with',
  52. required=True,
  53. type=int)
  54.  
  55. parser.add_argument('--batch',
  56. dest='batch',
  57. help='How many records to pull at once',
  58. required=False,
  59. type=int)
  60.  
  61. args = parser.parse_args()
  62.  
  63. _batch_size = args.batch if args.batch else 1000
  64.  
  65. if args.start > args.end:
  66. print "Start can't be greater than the end, duh"
  67. raise RuntimeError("Start can't be greater than the end, duh")
  68.  
  69. read_sql = 'select id, extras from yv_{0}.moments where extra is null and extras is not null'
  70.  
  71. for shard in range(args.start, args.end + 1):
  72. print "Migrating shard {0}...".format(shard)
  73. count = 0
  74.  
  75. if shard < 512:
  76. slave = cursor01_slave
  77. master = cursor01_master
  78. else:
  79. slave = cursor02_slave
  80. master = cursor02_master
  81.  
  82. slave.execute(read_sql.format(shard))
  83. rows = slave.fetchmany(_batch_size)
  84.  
  85. while rows:
  86. for row in rows:
  87. id = row[0]
  88. old_extras = row[1]
  89. hstore_extras = {}
  90.  
  91. for key, val in old_extras.iteritems():
  92. if val not in (None, '', []):
  93. if key == 'labels':
  94. val = ','.join(val)
  95.  
  96. if not isinstance(val, basestring):
  97. val = str(val)
  98.  
  99. hstore_extras[str(key)] = val
  100.  
  101. # print 'Updating row with id: {0}'.format(id)
  102.  
  103. update_sql = 'update yv_{0}.moments set extra = %s where id = %s'.format(shard)
  104.  
  105. # print master.mogrify(update_sql, {shard=shard, extra=hstore_extras, id=id})
  106. master.execute(update_sql, (hstore_extras, id))
  107. count += 1
  108.  
  109. rows = slave.fetchmany(_batch_size)
  110.  
  111. print "{0} records processed for shard {1}".format(count, shard)
  112.  
  113. print datetime.datetime.now()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement