Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
ERROR observed when running the script: the line `res = executor.map(run_single(), sorted(states))` raises `TypeError: run_single() missing 1 required positional argument: 'state'` — `run_single()` is being *called* with no arguments instead of being *passed* as the callable to `executor.map`.
- import datetime
- import os
- import time
- import xml.etree.ElementTree as ET
- import psycopg2
- import psycopg2.extras
- import requests
- from concurrent import futures
class Processor(object):
    """Stream rows for one US state out of Postgres, POST them in chunks to a
    batch API, poll the resulting job until it completes, and download the
    finished result archive.

    NOTE(review): the endpoint URLs below are placeholders ('api.com',
    'url that get status of job_id', ...) — they must be replaced with real
    URLs before this class can run against a live service.
    """

    # Postgres connection settings (placeholders).
    database = 'db'
    user = 'user'
    password = 'password'

    def __init__(self, state):
        # Fix: original wrote `slef.base_url` — a NameError on first run.
        self.base_url = 'api.com'
        self.state = state
        self.status = None
        self.fetch_size = 1000000  # rows pulled per fetchmany() round-trip
        self.job_id = ''
        # Fix: download_results() reads self.responses_folder, but the
        # original never defined it (AttributeError). Default to ./responses.
        self.responses_folder = os.path.join(os.getcwd(), 'responses')
        self.send_requests(self.state)

    def send_requests(self, state):
        """Fetch the state's rows in chunks of fetch_size and POST each chunk
        to the API, then hand the returned job id to check_jobs()."""
        # Fix: original built `dict(params)` from an undefined name `params`
        # (NameError). TODO(review): supply the real API request parameters.
        payload = {}
        # Fix: `database`/`user`/`password` were referenced as bare names;
        # they are class attributes and must be read via self.
        conn = psycopg2.connect(
            "dbname='%s' user='%s' host='pamsede06' password='%s'"
            % (self.database, self.user, self.password))
        try:
            # Named (server-side) cursor so fetchmany() streams rows instead
            # of materializing the whole result set client-side.
            cursor = conn.cursor('%s' % state,
                                 cursor_factory=psycopg2.extras.DictCursor)
            # Fix: parameterized query instead of %-interpolated SQL.
            cursor.execute("select * from table where state=%s limit 1",
                           (state,))
            while True:
                rows = cursor.fetchmany(self.fetch_size)
                if not rows:
                    break
                # Fix: the record separator was 'n' and the header ended in
                # 'countryn' — the '\n' escape was lost in the original.
                # Also build the chunk with join() instead of quadratic '+='.
                lines = []
                for row in rows:
                    try:
                        lines.append(
                            row[0] + '|' + row[1] + '|' + row[2] + '\n')
                    except (TypeError, IndexError) as err:
                        # Skip malformed rows, but say which one and why
                        # (original bare `except: pass` hid the cause).
                        print('>ERROR ->', row[0], err)
                header = 'recId|searchtext|country\n'
                resp = requests.post(self.base_url, params=payload,
                                     data=header + ''.join(lines))
                response = resp.text
                print('-> %s: response job_xml: %s' % (state, response))
                root = ET.fromstring(response)
                self.job_id = root.find('Response/MetaInfo/RequestId').text
                print('-> %s: response job_id: %s' % (state, self.job_id))
                self.check_jobs(state)
        except Exception as e:
            # Best-effort, matching the original's intent: log and move on.
            print(e)
        finally:
            # Fix: the original leaked the connection; always close it.
            conn.close()

    def check_jobs(self, state):
        """Poll the submitted job until its status is 'completed', then
        download the results."""
        print('->>> %s: Checking job %s <<<-' % (state, self.job_id))
        # Fix: the original recursed on every poll iteration, which can hit
        # RecursionError on long-running jobs; poll in a loop instead.
        while True:
            status = self.get_status(self.job_id)
            if status == 'completed':
                print('-> %s: status: %s, job_id: %s '
                      % (state, status, self.job_id))
                self.download_results(self.job_id)
                return
            time.sleep(4)  # original note: 480 for large million-row requests

    def get_status(self, job_id):
        """Return the job's status string parsed from the status endpoint's
        XML ('Response/Status')."""
        # NOTE(review): placeholder URL — must embed job_id for a real service.
        url_status = 'url that get status of job_id'
        req_status = requests.get(url_status)
        root = ET.fromstring(req_status.text)
        return root.find('Response/Status').text

    def download_results(self, job_id):
        """Stream the finished job's archive to
        <responses_folder>/<state>/<job_id>.zip."""
        # NOTE(review): placeholder URL — must embed job_id for a real service.
        url_download = 'url to download job_id'
        # Fix: log-message typo 'jod_id' -> 'job_id'.
        print('-> %s: downloading job_id: %s @ URL [%s]'
              % (self.state, job_id, url_download))
        r = requests.get(url_download, stream=True)
        # Create the per-state folder if it does not exist yet.
        download = os.path.join(self.responses_folder, self.state)
        if not os.path.exists(download):
            os.makedirs(download)
        save_as = os.path.join(download, job_id + '.zip')
        with open(save_as, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
        print('-> %s: downloaded job_id: %s @ folder [ %s ] '
              % (self.state, job_id, save_as))
        # Fix: the original called self.delete_results(job_id) here, but no
        # such method exists anywhere in the file (AttributeError). Re-add
        # the call once delete_results is actually implemented.
if __name__ == "__main__":
    states = ['AK', 'AL', 'AR']
    workers = 20

    def run_single(state):
        # Fix: the original instantiated `Geocoder`, which is not defined
        # anywhere in this file; the class defined above is Processor.
        Processor(state)

    # Fix 1: pass the callable itself — `run_single()` invoked it immediately
    # with no arguments, producing the TypeError quoted at the top of the
    # paste ("run_single() missing 1 required positional argument: 'state'").
    # Fix 2: drop the redundant outer `for state in states` loop, which
    # created a fresh pool per state and re-submitted every state each time.
    with futures.ThreadPoolExecutor(workers) as executor:
        # Drain the iterator so worker exceptions propagate instead of being
        # silently discarded.
        list(executor.map(run_single, sorted(states)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement