Advertisement
Guest User

Untitled

a guest
Oct 2nd, 2016
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.80 KB | None | 0 0
  1. ERROR:
  2. res = executor.map(run_single(), sorted(states))
  3. TypeError: run_single() missing 1 required positional argument: 'state'
  4.  
  5. import datetime
  6. import os
  7. import time
  8. import xml.etree.ElementTree as ET
  9. import psycopg2
  10. import psycopg2.extras
  11. import requests
  12. from concurrent import futures
  13.  
  14.  
  15. class Processor(object):
  16. database = 'db'
  17. user = 'user'
  18. password = 'password'
  19.  
  20. def __init__(self, state):
  21. slef.base_url = 'api.com'
  22. self.state = state
  23. self.status = None
  24. self.fetch_size = 1000000
  25. self.job_id = ''
  26.  
  27. self.send_requests(self.state)
  28.  
  29. def send_requests(self, state):
  30. payload = dict(params)
  31.  
  32. # connection to postgres db table , fetch data.
  33. conn = psycopg2.connect(
  34. "dbname='%s' user='%s' host='pamsede06' password='%s'" % (database, user, password))
  35. cursor = conn.cursor('%s' % state, cursor_factory=psycopg2.extras.DictCursor)
  36. sql = ("select * from table where state='%s' limit 1" % state)
  37. cursor.execute(sql)
  38.  
  39. try:
  40. # function to build/send requests fetching data by chunks of fetch_size limited.
  41. while True:
  42. fetchs = cursor.fetchmany(self.fetch_size)
  43. if len(fetchs) != 0:
  44. chunk = ''
  45. for fetch in fetchs:
  46. try:
  47. row = fetch[0] + '|' + fetch[1] + '|' + fetch[2] + 'n'
  48. chunk += row
  49. except:
  50. print('>ERROR ->', fetch[0])
  51. pass
  52. header = 'recId|searchtext|countryn'
  53. row = requests.post(self.base_url, params=payload, data=header + chunk)
  54. response = row.text
  55. print('-> %s: response job_xml: %s' % (state, response))
  56. root = ET.fromstring(response)
  57. self.job_id = root.find('Response/MetaInfo/RequestId').text
  58. print('-> %s: response job_id: %s' % (state, self.job_id))
  59. self.check_jobs(state)
  60. else:
  61. break
  62. except Exception as e:
  63. print(e)
  64. pass
  65.  
  66. # Function checking the status of the job_id if completed download() the results if not wait and retry.
  67. def check_jobs(self, state):
  68. print('->>> %s: Checking job %s <<<-' % (state, self.job_id))
  69. status = self.get_status(self.job_id)
  70. if status == 'completed':
  71. print('-> %s: status: %s, job_id: %s ' % (state, status, self.job_id))
  72. self.download_results(self.job_id)
  73. else:
  74. time.sleep(4) # 480 large million requests
  75. self.check_jobs(state)
  76.  
  77. # Function to retur status of job_id
  78. def get_status(self, job_id):
  79. url_status = 'url that get status of job_id'
  80. req_status = requests.get(url_status)
  81. root = ET.fromstring(req_status.text)
  82. status = root.find('Response/Status').text
  83. return status
  84.  
  85. # Function download the results
  86. def download_results(self, job_id):
  87. url_download = 'url to download job_id'
  88. print('-> %s: downloading jod_id: %s @ URL [%s]' % (self.state, job_id, url_download))
  89. r = requests.get(url_download, stream=True)
  90.  
  91. # create folder for state if not exists
  92. download = os.path.join(self.responses_folder, self.state)
  93. if not os.path.exists(download):
  94. os.makedirs(download)
  95.  
  96. # Save result to folder
  97. save_as = os.path.join(download, str(job_id + '.zip'))
  98. with open(save_as, 'wb') as f:
  99. for chunk in r.iter_content(chunk_size=1024):
  100. if chunk:
  101. f.write(chunk)
  102.  
  103. print('-> %s: downloaded job_id: %s @ folder [ %s ] ' % (self.state, job_id, save_as))
  104. self.delete_results(job_id)
  105.  
  106.  
  107. if __name__ == "__main__":
  108. states = ['AK', 'AL', 'AR']
  109. workers = 20
  110.  
  111. def run_single(state):
  112. Geocoder(state)
  113.  
  114. for state in states:
  115. with futures.ThreadPoolExecutor(workers) as executor:
  116. res = executor.map(run_single(), sorted(states))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement