Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- import datetime
- import psycopg2
- import psycopg2.extras
- import sys
# Migrate the JSON ``extras`` column of every ``yv_<shard>.moments`` table into
# the hstore ``extra`` column. Rows are read through the "slave" cursor and
# written through the "master" cursor of the cluster that owns the shard.
#
# The code below is valid on both Python 2.7 and Python 3.

# ``basestring`` only exists on Python 2; fall back to ``str`` on Python 3.
try:
    string_types = basestring
except NameError:
    string_types = str

# One DSN per endpoint so each can be pointed at a different host.
MOMENTS01_MASTER_DSN = "host=localhost port=5432 dbname=youversion user=dev"
MOMENTS01_SLAVE_DSN = "host=localhost port=5432 dbname=youversion user=dev"
MOMENTS02_MASTER_DSN = "host=localhost port=5432 dbname=youversion user=dev"
MOMENTS02_SLAVE_DSN = "host=localhost port=5432 dbname=youversion user=dev"

# Shards below this number use the moments01 pair, all others moments02.
FIRST_MOMENTS02_SHARD = 512

# Used when --batch is omitted (or given as 0).
DEFAULT_BATCH_SIZE = 1000

READ_SQL = 'select id, extras from yv_{0}.moments where extra is null and extras is not null'
UPDATE_SQL = 'update yv_{0}.moments set extra = %s where id = %s'


def parse_args(argv=None):
    """Parse and validate the command line.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The argparse namespace with ``start``, ``end`` and ``batch``
        (``batch`` is ``None`` when the option is not given).

    Raises:
        RuntimeError: If ``--start`` is greater than ``--end``.
    """
    parser = argparse.ArgumentParser(
        prog='add_hstore_data',
        description='Migrate the json data to hstore in moments',
        epilog=__doc__,
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('--start',
                        dest='start',
                        help='Shard to start with',
                        required=True,
                        type=int)
    parser.add_argument('--end',
                        dest='end',
                        help='Shard to end with',
                        required=True,
                        type=int)
    parser.add_argument('--batch',
                        dest='batch',
                        help='How many records to pull at once',
                        required=False,
                        type=int)
    args = parser.parse_args(argv)
    if args.start > args.end:
        print("Start can't be greater than the end, duh")
        raise RuntimeError("Start can't be greater than the end, duh")
    return args


def json_to_hstore(old_extras):
    """Convert one decoded ``extras`` value into an hstore-ready dict.

    Entries whose value is ``None``, ``''`` or ``[]`` are dropped. The
    ``labels`` entry is joined with commas, and every remaining non-string
    value is passed through ``str()``.

    Args:
        old_extras: The decoded ``extras`` value; assumed to be a dict
            (it is iterated as key/value pairs).

    Returns:
        A new dict mapping ``str(key)`` to a string value.
    """
    hstore_extras = {}
    for key, val in old_extras.items():
        if val in (None, '', []):
            continue
        # Only join real sequences: joining a plain string would put a comma
        # between each of its characters ("abc" -> "a,b,c").
        if key == 'labels' and not isinstance(val, string_types):
            val = ','.join(val)
        if not isinstance(val, string_types):
            val = str(val)
        # NOTE(review): on Python 2, str() of a non-ASCII unicode key raises
        # UnicodeEncodeError -- confirm keys are always ASCII.
        hstore_extras[str(key)] = val
    return hstore_extras


def migrate_shard(shard, slave, master, batch_size):
    """Copy ``extras`` into ``extra`` for every pending row of one shard.

    Args:
        shard: Shard number; selects the ``yv_<shard>`` schema.
        slave: Cursor used to read the pending rows.
        master: Cursor used to write the converted rows.
        batch_size: Number of rows requested per ``fetchmany`` call.

    Returns:
        The number of rows updated.
    """
    # The statement only depends on the shard, so build it once per shard
    # instead of once per row.
    update_sql = UPDATE_SQL.format(shard)
    count = 0
    # NOTE(review): with psycopg2's default (client-side) cursor the whole
    # result set is presumably transferred on execute(); batch_size then only
    # controls chunking on the client. A named cursor would be needed for
    # server-side batching -- confirm before running on very large shards.
    slave.execute(READ_SQL.format(shard))
    rows = slave.fetchmany(batch_size)
    while rows:
        for row_id, old_extras in rows:
            master.execute(update_sql, (json_to_hstore(old_extras), row_id))
            count += 1
        rows = slave.fetchmany(batch_size)
    return count


def main(argv=None):
    """Run the migration for the shard range given on the command line."""
    # Parse first so --help and invalid arguments never touch the database.
    args = parse_args(argv)
    batch_size = args.batch if args.batch else DEFAULT_BATCH_SIZE

    print(datetime.datetime.now())

    connections = []

    def open_cursor(dsn):
        # Autocommit connection with hstore adaptation registered.
        connection = psycopg2.connect(dsn)
        connections.append(connection)
        connection.autocommit = True
        psycopg2.extras.register_hstore(connection)
        return connection.cursor()

    try:
        cursor01_master = open_cursor(MOMENTS01_MASTER_DSN)
        cursor01_slave = open_cursor(MOMENTS01_SLAVE_DSN)
        cursor02_master = open_cursor(MOMENTS02_MASTER_DSN)
        cursor02_slave = open_cursor(MOMENTS02_SLAVE_DSN)

        for shard in range(args.start, args.end + 1):
            print("Migrating shard {0}...".format(shard))
            if shard < FIRST_MOMENTS02_SHARD:
                slave, master = cursor01_slave, cursor01_master
            else:
                slave, master = cursor02_slave, cursor02_master
            count = migrate_shard(shard, slave, master, batch_size)
            print("{0} records processed for shard {1}".format(count, shard))
    finally:
        # Release every connection that was opened, even after a failure.
        for connection in connections:
            connection.close()

    print(datetime.datetime.now())


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement