Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # coding: utf-8
- import sendgrid
- import json
- import os
- # Class for processing list for campaign in SendGrid service.
- class SendGridCampaignListProcessor:
- def __init__(self, list_id, api_key=os.environ.get('SENDGRID_API_KEY')):
- self.apikey = api_key
- self.list_id = list_id
- self.sg = None
- def init_sendgrid(self):
- """
- Init SendGrid API object.
- :return: True if API key exists.
- """
- if not self.apikey:
- return False
- self.sg = sendgrid.SendGridAPIClient(apikey=self.apikey)
- return True
- def add_recipient(self, *args, **kwargs):
- """
- Add new recipient for campaign.
- At the moment this API method limit is 3 requests in 2 seconds.
- :param args:
- :param kwargs: recipient info fields. Filed 'email' is required.
- :return: recipient id.
- """
- if 'email' not in kwargs:
- print('Error. Email field is missing.')
- return None
- if not self.sg:
- print('Error. Looks like API client not initialized.')
- return None
- data = [kwargs]
- response = self.sg.client.contactdb.recipients.post(request_body=data)
- if response.status_code != 201:
- print('Error. Request failed.')
- return None
- if response.body:
- response_result = json.loads(response.body.decode('utf-8'))
- if 'error_count' in response_result and response_result['error_count'] > 0:
- for error in response_result['errors']:
- print(error['message'])
- return None
- if 'persisted_recipients' not in response_result or not response_result['persisted_recipients']:
- print('Something wrong. Looks like recipient was not added.')
- return None
- return response_result['persisted_recipients'][0]
- return None
- def add_recipient_to_list(self, recipient_id):
- """
- Add exist recipient to list.
- At the moment this API method limit is 1000 requests per second.
- :param recipient_id: recipient's id, string.
- :return: True is requests status is success.
- """
- if not recipient_id:
- print('Error. Recipient id is missed.')
- return False
- if not self.list_id:
- print('Error. Wrong list id.')
- return False
- if not self.sg:
- print('Error. Looks like API client not initialized.')
- return False
- response = self.sg.client.contactdb.lists._(self.list_id).recipients._(recipient_id).post()
- if response.status_code != 201:
- print('Error. Request failed.')
- return False
- if response.body:
- response_result = json.loads(response.body.decode('utf-8'))
- if response_result and 'error_count' in response_result and response_result['error_count'] > 0:
- for error in response_result['errors']:
- print(error['message'])
- return False
- return True
- def delete_recipients_from_list(self, recipient_ids):
- """
- Delete recepients from list.
- :param recipient_ids: recipient's id, string or list of recipient ids.
- :return: True if request status is success.
- """
- if not recipient_ids:
- print('Error. Recipient ids are missed.')
- return False
- if not self.list_id:
- print('Error. Wrong list id.')
- return False
- if not self.sg:
- print('Error. Looks like API client not initialized.')
- return False
- data = recipient_ids if isinstance(recipient_ids, list) else [recipient_ids]
- response = self.sg.client.contactdb.recipients.delete(request_body=data)
- if response.status_code != 204:
- print('Error. Request failed.')
- return False
- if response.body:
- response_result = json.loads(response.body.decode('utf-8'))
- if response_result and 'error_count' in response_result and response_result['error_count'] > 0:
- for error in response_result['errors']:
- print(error['message'])
- return False
- return True
- if __name__ == '__main__':
- sgcl_proc = SendGridCampaignListProcessor(
- list_id=11,
- api_key=''
- )
- sgcl_proc.init_sendgrid()
- recipient = sgcl_proc.add_recipient(first_name='Vasily', last_name='Pupkin', email='lol@test.com')
- if not recipient:
- print('Error. Recipient was not added to SendGrid service.')
- exit(0)
- print('Recipient is added to SendGrid service.')
- if sgcl_proc.add_recipient_to_list(recipient):
- print('Recipient is added to list.')
- if sgcl_proc.delete_recipients_from_list(recipient):
- print('Recipient is deleted from list.')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement