Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import hmac
- import config
- import base64
- import urllib.parse
- import urllib
- import collections
- import requests
- import calendar
- import pprint
- import hashlib
- import time
- import string
- import random
streamUrl = "https://stream.twitter.com/1.1/statuses/filter.json?"


class GenertateAuthHeader():
    """Build an OAuth 1.0a (HMAC-SHA1) ``Authorization`` header.

    Credentials are read from the ``config`` module (``twitterCKey``,
    ``twitterCSecret``, ``twitterAToken``) and the signed URL defaults to the
    module-level ``streamUrl``.  ``generateAuthorizationHeader`` is the only
    method a caller normally needs; the other methods are its building
    blocks.
    """

    @staticmethod
    def _percentEncode(value):
        """Percent-encode ``value``, leaving only RFC 3986 unreserved chars.

        ``safe="~"`` keeps ``~`` literal on every Python 3 version and, unlike
        the default ``safe="/"``, makes ``/`` come out as ``%2F``.
        """
        return urllib.parse.quote(str(value), safe="~")

    def _normalizeParams(self, *paramSets):
        """Return the sorted, encoded ``k=v&k=v`` string used for signing.

        Each argument may be a mapping or an iterable of ``(key, value)``
        pairs; empty/``None`` arguments are skipped.  A list or tuple value
        contributes one pair per element.
        """
        encoded = []
        for params in paramSets:
            if not params:
                continue
            items = params.items() if hasattr(params, "items") else params
            for key, value in items:
                values = value if isinstance(value, (list, tuple)) else (value,)
                for item in values:
                    encoded.append(
                        (self._percentEncode(key), self._percentEncode(item))
                    )
        # Pairs are sorted *after* encoding: by name, then by value.
        encoded.sort()
        return "&".join(name + "=" + value for name, value in encoded)

    def generateOauthNonce(self):
        """Return a random 32-character alphanumeric nonce."""
        alphabet = string.ascii_letters + string.digits
        rng = random.SystemRandom()  # OS entropy rather than the default PRNG
        return "".join(rng.choice(alphabet) for _ in range(32))

    def createSignatureString(self, baseString, signingKey):
        """Return the base64-encoded HMAC-SHA1 of ``baseString``.

        ``signingKey`` may be ``bytes`` or ``str``.
        """
        if isinstance(signingKey, str):
            signingKey = signingKey.encode("utf-8")
        digest = hmac.new(signingKey, baseString.encode("utf-8"), hashlib.sha1).digest()
        return base64.b64encode(digest).decode("ascii")

    def createSignatureBaseString(self, urlEncodedUrl="", urlOauthEncodedHeader=""):
        """Concatenate two already-encoded fragments (kept for compatibility)."""
        return urlEncodedUrl + urlOauthEncodedHeader

    def createUrlString(self, httpMethod="POST", params="", url=None):
        """Return the signature base string ``METHOD&url&params``.

        ``params`` is the normalized (sorted, individually encoded) parameter
        string.  ``url`` defaults to ``streamUrl``; its query string and
        fragment are dropped before encoding.  Each of the three parts is
        percent-encoded exactly once and the joining ``&`` stays literal.
        """
        if url is None:
            url = streamUrl
        baseUrl = url.split("#", 1)[0].split("?", 1)[0]
        return "&".join([
            httpMethod.upper(),
            self._percentEncode(baseUrl),
            self._percentEncode(params),
        ])

    def createUnorderedDictToOrdered(self, inDict):
        """Return ``inDict`` as an ``OrderedDict`` sorted by key."""
        return collections.OrderedDict(sorted(inDict.items()))

    def createSigningKey(self, consumerSecret=None, tokenSecret=None):
        """Return ``<encoded consumer secret>&<encoded token secret>``.

        Both arguments default to values from ``config``.
        """
        if consumerSecret is None:
            consumerSecret = config.twitterCSecret
        if tokenSecret is None:
            # NOTE(review): OAuth 1.0a builds the signing key from the access
            # token *secret*.  The original code reads config.twitterAToken
            # here; confirm that attribute really holds the secret, or pass
            # tokenSecret explicitly, otherwise the signature will be rejected.
            tokenSecret = config.twitterAToken
        return self._percentEncode(consumerSecret) + "&" + self._percentEncode(tokenSecret)

    def createAuthDict(self):
        """Return the ``oauth_*`` parameters (without signature), sorted by key.

        Values are stored raw; they are percent-encoded where they are used
        so that nothing gets encoded twice.  Every call produces a fresh nonce
        and timestamp, so one request must use a single call's result for both
        the signature and the header.
        """
        authParams = {
            'oauth_consumer_key': config.twitterCKey,
            'oauth_nonce': self.generateOauthNonce(),
            'oauth_signature_method': 'HMAC-SHA1',
            'oauth_timestamp': str(int(time.time())),
            'oauth_token': config.twitterAToken,
            'oauth_version': '1.0'
        }
        return self.createUnorderedDictToOrdered(authParams)

    def generateSignatureString(self, authDict=None, requestParams=None,
                                httpMethod="POST", url=None):
        """Return the base64 ``oauth_signature`` for one request.

        ``authDict`` is the ``oauth_*`` parameter set that will be sent in the
        header (a new one is created when omitted).  ``requestParams`` are the
        query/body parameters of the request; they are part of the signed
        data.  ``url`` defaults to ``streamUrl``.
        """
        if authDict is None:
            authDict = self.createAuthDict()
        oauthParams = [
            (key, value) for key, value in authDict.items()
            if key != "oauth_signature"
        ]
        paramString = self._normalizeParams(oauthParams, requestParams)
        baseString = self.createUrlString(httpMethod, paramString, url)
        return self.createSignatureString(baseString, self.createSigningKey())

    def generateAuthorizationHeader(self, requestParams=None, httpMethod="POST", url=None):
        """Return the complete ``Authorization`` header value, signature included.

        Pass the request's query/body parameters as ``requestParams`` so they
        are covered by the signature.
        """
        # One parameter set for both signing and sending: the nonce and
        # timestamp in the header must be the ones that were signed.
        authDict = self.createAuthDict()
        authDict["oauth_signature"] = self.generateSignatureString(
            authDict, requestParams, httpMethod, url
        )
        fields = [
            '{}="{}"'.format(self._percentEncode(key), self._percentEncode(value))
            for key, value in sorted(authDict.items())
        ]
        return "OAuth " + ", ".join(fields)


if __name__ == '__main__':
    data = {
        'language': config.twitterSearchLanguage,
        'track': config.twitterSearchTerms
    }
    authHeader = GenertateAuthHeader()
    # The parameters sent with the request must also be signed.
    authorizationHeader = authHeader.generateAuthorizationHeader(requestParams=data)
    header = {
        'Accept': '*/*',
        'Connection': 'Keep-Alive',
        'Content-Type': 'application/x-www-form-urlencoded',
        'Authorization': authorizationHeader,
        'Host': 'stream.twitter.com'
    }
    print(header)
    # stream=True: without it requests tries to download the whole body
    # before returning, which never finishes on a streaming endpoint.
    # timeout: abort instead of hanging forever if the connection stalls.
    r = requests.post(streamUrl, headers=header, params=data, stream=True, timeout=90)
    try:
        print(r.url)
        print(r)
        if r.status_code != 200:
            # Error responses have a finite body, so reading it is safe.
            print(r.text)
        else:
            for line in r.iter_lines():
                if line:  # skip blank keep-alive lines
                    print(line.decode('utf-8'))
    finally:
        r.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement