# Whoever made these, THANKS
# https://machinelearningmastery.com/use-word-embedding-layers-deep-learning-keras/
# https://github.com/haccer/tweep
from bs4 import BeautifulSoup  # web scraping 101
from time import gmtime, strftime  # timestamps
import datetime  # another time tool
import json  # web data container
import queue  # speed web scraping (Python 3 module name; declared but never used below)
import requests  # standard web fare

# The original paste used 'agent' without ever defining it; a minimal
# User-Agent header is assumed here so requests.get(..., headers=agent) works.
agent = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}

q = queue.Queue()
class tweep:
    def __init__(self, u, s=None, year=None):
        self.min = -1  # pagination cursor; -1 means "first page"
        self.author, self.search, self.year = u, s, year
        self.feed = [-1]  # non-empty sentinel so main() enters its loop
        self.tweets = 0
        self.tweet_urls = []
        self.result = []

    def get_url(self):
        url_1 = "https://twitter.com/search?f=tweets&vertical=default&lang=en&q="
        url_2 = "https://twitter.com/i/search/timeline?f=tweets&vertical=default" + \
                "&lang=en&include_available_features=1&include_entities=1&reset_error_state=false&src=typd"
        url = url_1 if self.min == -1 else "{0}&max_position={1.min}&q=".format(url_2, self)
        if self.author is not None:
            url += "from%3A{0.author}".format(self)
        if self.search is not None:
            search = self.search.replace(' ', '%20').replace('#', '%23')
            url += "%20{}".format(search)
        if self.year is not None:
            url += "%20until%3A{0.year}-1-1".format(self)
        return url
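    # For illustration (hypothetical handle), tweep("jack").get_url() on the
    # first page builds:
    #   https://twitter.com/search?f=tweets&vertical=default&lang=en&q=from%3Ajack
    # Later pages switch to the /i/search/timeline endpoint with a
    # max_position=TWEET-<lastid>-<firstid> cursor appended.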
    def get_feed(self):
        r = requests.get(self.get_url(), headers=agent)
        self.feed = []
        try:
            if self.min == -1:
                html = r.text  # first page is plain HTML
            else:
                json_response = json.loads(r.text)  # later pages wrap the HTML in JSON
                html = json_response['items_html']
            soup = BeautifulSoup(html, "lxml")
            self.feed = soup.find_all('li', 'js-stream-item')
            lastid = self.feed[-1]['data-item-id']
            firstid = self.feed[0]['data-item-id']
            if self.min == -1:
                self.min = "TWEET-{}-{}".format(lastid, firstid)
            else:
                minsplit = json_response['min_position'].split('-')
                minsplit[1] = lastid  # advance the cursor to the oldest tweet seen
                self.min = "-".join(minsplit)
        except Exception:
            pass  # an empty or unparsable page leaves self.feed == [], which ends main()
        return self.feed
    def get_tweets(self):  # whoever wrote this is killing me
        for tweet in self.get_feed():
            self.tweets += 1
            tweetid = tweet['data-item-id']
            datestamp = tweet.find('a', 'tweet-timestamp')['title'].rpartition(' - ')[-1]
            date = datetime.datetime.strptime(datestamp, '%d %b %Y').strftime('%Y-%m-%d')
            # data-time is epoch seconds; the timedelta trick keeps only the HH:MM:SS part
            timestamp = str(datetime.timedelta(seconds=int(tweet.find('span', '_timestamp')['data-time']))).rpartition(', ')[-1]
            time = datetime.datetime.strptime(timestamp, '%H:%M:%S').strftime('%H:%M:%S')
            # .encode('utf8') dropped: under Python 3 it yields bytes and str.replace then fails
            username = tweet.find('span', 'username').text.replace('@', '')
            timezone = strftime("%Z", gmtime())
            text = tweet.find('p', 'tweet-text').text.replace('\n', ' ')
            # append one record per tweet; the original += flattened every field
            # into a single list, which broke the i[4]/i[5] indexing in dig()
            self.result.append([tweetid, date, time, timezone, username, text])
    def main(self):
        # self.feed starts as the [-1] sentinel, so the loop runs until a fetch
        # returns no tweets; the original "self.tweets < float('inf')" test was
        # always true and is dropped
        while len(self.feed) > 0:
            self.get_tweets()
        return self.result
def dig(username: str, tweet_count: int):
    # keep only tweets authored by the user themselves (record index 4 is the username)
    return [i for i in tweep(username).main() if i[4] == username][:tweet_count]

def dig_stream(username: str, tweet_count: int):
    # concatenate the tweet texts (record index 5) into one long document string
    return "".join([i[5] for i in dig(username, tweet_count)])
from keras.preprocessing.text import one_hot  # text to array
from keras.preprocessing.sequence import pad_sequences  # padding
from keras.models import Sequential  # Layered Neural Network
from keras.layers import Dense, Flatten  # standard fare
from keras.layers.embeddings import Embedding  # vectorizing
from typing import List  # because safety first
import numpy as np  # Keras wants array labels, not plain lists
def ShitFilter(positive: List[str], negative: List[str], tweet_count: int):
    docs = [dig_stream(i, tweet_count) for i in (positive + negative)]  # data
    labels = np.array([0] * len(positive) + [1] * len(negative))  # expected output
    vocab = 10000000  # a really bad estimate of unique words
    max_len = tweet_count * 60  # 280 characters = 50~60 words
    dimensions = 300  # common word-embedding dimension
    iterations = 14  # it's a lucky number, what do you expect?
    post_docs = pad_sequences([one_hot(d, vocab) for d in docs],
                              maxlen=max_len, padding='post')  # word array building
    # Sequential.add() returns None, so the original chained calls left model
    # as None; build the network one statement at a time instead
    model = Sequential()
    model.add(Embedding(vocab, dimensions, input_length=max_len))
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))  # output
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['acc'])
    print(model.summary())  # summarize the model
    model.fit(post_docs, labels, epochs=iterations, verbose=0)  # fit the model
    loss, accuracy = model.evaluate(post_docs, labels, verbose=0)  # evaluate the model
    print('Accuracy: %f' % (accuracy * 100))  # the original computed but never reported this
    return model  # well that is how we are ending this
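# A minimal end-to-end sketch: the handles below are placeholders, and the whole
# pipeline assumes Twitter's legacy search HTML is still being served.
if __name__ == '__main__':
    clf = ShitFilter(positive=['good_account_1', 'good_account_2'],  # hypothetical handles
                     negative=['bad_account_1', 'bad_account_2'],
                     tweet_count=50)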