Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class FacebookRequest:
- """
- Represents an API request
- """
- def __init__(
- self,
- node_id,
- method,
- endpoint,
- api=None,
- param_checker=TypeChecker({}, {}),
- target_class=None,
- api_type=None,
- allow_file_upload=False,
- response_parser=None,
- include_summary=True,
- api_version=None,
- ):
- """
- Args:
- node_id: The node id to perform the api call.
- method: The HTTP method of the call.
- endpoint: The edge of the api call.
- api (optional): The FacebookAdsApi object.
- param_checker (optional): Parameter checker.
- target_class (optional): The return class of the api call.
- api_type (optional): NODE or EDGE type of the call.
- allow_file_upload (optional): Whether the call allows upload.
- response_parser (optional): An ObjectParser to parse response.
- include_summary (optional): Include "summary".
- api_version (optional): API version.
- """
- self._api = api or FacebookAdsApi.get_default_api()
- self._node_id = node_id
- self._method = method
- self._endpoint = endpoint.replace('/', '')
- self._path = (node_id, endpoint.replace('/', ''))
- self._param_checker = param_checker
- self._target_class = target_class
- self._api_type = api_type
- self._allow_file_upload = allow_file_upload
- self._response_parser = response_parser
- self._include_summary = include_summary
- self._api_version = api_version
- self._params = {}
- self._fields = []
- self._file_params = {}
- self._file_counter = 0
- self._accepted_fields = []
- if target_class is not None:
- self._accepted_fields = target_class.Field.__dict__.values()
- def add_file(self, file_path):
- if not self._allow_file_upload:
- api_utils.warning('Endpoint ' + self._endpoint + ' cannot upload files')
- file_key = 'source' + str(self._file_counter)
- if os.path.isfile(file_path):
- self._file_params[file_key] = file_path
- self._file_counter += 1
- else:
- raise FacebookBadParameterError(
- 'Cannot find file ' + file_path + '!',
- )
- return self
- def add_files(self, files):
- if files is None:
- return self
- for file_path in files:
- self.add_file(file_path)
- return self
- def add_field(self, field):
- if field not in self._fields:
- self._fields.append(field)
- if field not in self._accepted_fields:
- api_utils.warning(self._endpoint + ' does not allow field ' + field)
- return self
- def add_fields(self, fields):
- if fields is None:
- return self
- for field in fields:
- self.add_field(field)
- return self
- def add_param(self, key, value):
- if not self._param_checker.is_valid_pair(key, value):
- api_utils.warning('value of ' + key + ' might not be compatible. ' +
- ' Expect ' + self._param_checker.get_type(key) + '; ' +
- ' got ' + str(type(value)))
- if self._param_checker.is_file_param(key):
- self._file_params[key] = value
- else:
- self._params[key] = self._extract_value(value)
- return self
- def add_params(self, params):
- if params is None:
- return self
- for key in params.keys():
- self.add_param(key, params[key])
- return self
- def get_fields(self):
- return list(self._fields)
- def get_params(self):
- return copy.deepcopy(self._params)
- def execute(self):
- params = copy.deepcopy(self._params)
- if self._api_type == "EDGE" and self._method == "GET":
- cursor = Cursor(
- target_objects_class=self._target_class,
- params=params,
- fields=self._fields,
- include_summary=self._include_summary,
- api=self._api,
- node_id=self._node_id,
- endpoint=self._endpoint,
- )
- cursor.load_next_page()
- return cursor
- if self._fields:
- params['fields'] = ','.join(self._fields)
- with open_files(self._file_params) as files:
- response = self._api.call(
- method=self._method,
- path=(self._path),
- params=params,
- files=files,
- api_version=self._api_version,
- )
- if response.error():
- raise response.error()
- if self._response_parser:
- return self._response_parser.parse_single(response.json())
- else:
- return response
- def add_to_batch(self, batch, success=None, failure=None):
- batch.add_request(self, success, failure)
- def _extract_value(self, value):
- if hasattr(value, 'export_all_data'):
- return value.export_all_data()
- elif isinstance(value, list):
- return [self._extract_value(item) for item in value]
- elif isinstance(value, dict):
- return dict((self._extract_value(k), self._extract_value(v))
- for (k, v) in value.items())
- else:
- return value
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement