Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # pipedream add-package google-cloud-storage
- from google.cloud import storage
- import json
- from google.oauth2 import service_account
- import os
- import time
- def get_poll_blob(blob_name):
- json_acct_info = json.loads(os.environ['creds'])
- credentials = service_account.Credentials.from_service_account_info(
- json_acct_info)
- storage_client = storage.Client(credentials=credentials)
- bucket_name = 'poll_updates'
- bucket = storage_client.bucket(bucket_name)
- blob = bucket.blob(blob_name)
- return blob
- def get_name(user):
- if 'username' in user:
- return user['username']
- return user.get('first_name', '') + ' ' + user.get('last_name', '')
- def get_poll_answer(event):
- print(event)
- pa = event['poll_answer']
- print(pa['option_ids'])
- vote = {
- 'id': pa['user']['id'],
- 'user': get_name(pa['user']),
- 'vote': '-1' if not pa['option_ids'] else ",".join(map(lambda v: str(v), pa['option_ids'])),
- 'update_id': event['update_id'],
- }
- print('vote=', vote)
- return pa['poll_id'], vote
- def append_new_vote(blob, vote):
- contents = blob.download_as_string().decode('utf-8') if blob.exists() else '{}'
- cd = json.loads(contents)
- print(cd)
- old_vote = cd.get(str(vote['id']))
- print('old_vote=', vote['id'], old_vote)
- if not old_vote or old_vote['update_id'] <= vote['update_id']:
- cd[vote['id']] = vote
- blob.upload_from_string(json.dumps(cd))
- return cd
- def handlerExcept(pd: "pipedream"):
- poll, user_vote = get_poll_answer(pd.steps['trigger']['event'])
- blob = get_poll_blob(poll)
- contents = append_new_vote(blob, user_vote)
- return {"contents": contents}
- def handler(pd: "pipedream"):
- for i in range(10):
- try:
- handlerExcept(pd)
- return
- except Exception as e:
- print(e)
- time.sleep(15)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement