Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from datetime import datetime, timedelta
- from sqlalchemy import create_engine, Column, DateTime, ForeignKey, Integer, MetaData, Numeric, String, Table
- import praw
- import time
- import threading, queue
# Reddit API client (praw).
# NOTE(review): credentials are hard-coded here — move them to environment
# variables or a config file before sharing or deploying this script.
REDDIT = praw.Reddit(client_id='id',
                     client_secret='secret',
                     password='nope',
                     user_agent='testscript by /u/fakebot3',
                     username='user')

# Shared SQLAlchemy metadata object that both tables below are registered on.
METADATA = MetaData()

# One row per tracked submission; article_id is the reddit submission id.
SUBMISSIONS = Table('submissions', METADATA,
                    Column('article_id', String, primary_key=True),  # reddit_id
                    Column('author', String),
                    Column('title', String),
                    Column('domain', String),
                    Column('url', String),
                    Column('created', DateTime),
                    )

# Time-series of vote samples: one row per 2-minute sample per submission,
# linked back to submissions via article_id.
UPVOTE_DATA = Table('upvote_data', METADATA,
                    Column('id', Integer, primary_key=True),
                    Column('article_id', None, ForeignKey('submissions.article_id')),
                    Column('time', DateTime),
                    Column('upvotes', Integer),
                    Column('upvote_ratio', Numeric),
                    )
def data_gen(subreddit, q):
    """Stream new submissions from *subreddit* and enqueue fresh ones on *q*.

    Only submissions created within the last 5 minutes are queued, so the
    backlog of older 'new' posts that the stream replays on startup doesn't
    spawn a tracker thread for submissions we don't care about.

    subreddit: a praw subreddit whose .stream.submissions() yields submissions.
    q:         queue.Queue consumed by scheduler().
    """
    for submission in subreddit.stream.submissions():
        created = datetime.utcfromtimestamp(submission.created_utc)
        # Call total_seconds() on the timedelta instance directly; the
        # original used the unbound-method form timedelta.total_seconds(x),
        # which obscures the intent.
        if (datetime.utcnow() - created).total_seconds() < 300:
            q.put(submission)
            print('Starting to track ', submission.title)
def scheduler(q, conn):
    """Consume submissions from *q*, persist them, and start a tracker thread.

    q:    queue.Queue of praw submissions fed by data_gen().
    conn: SQLAlchemy connection used for the insert and shared with every
          tracker thread. NOTE(review): a single connection shared across
          threads is not thread-safe in general — consider passing the engine
          and letting each thread check out its own connection.
    """
    # Tracker queue shared between submission trackers (part of tracker()'s
    # signature, though tracker() does not use it yet).
    tracker_q = queue.Queue()
    while True:
        try:
            # Block for up to a second instead of busy-polling with
            # q.get(False) + sleep(1): reacts immediately to new submissions
            # while still idling cheaply when the queue is empty.
            submission = q.get(timeout=1)
        except queue.Empty:
            continue
        # Record the submission's static fields before sampling its votes.
        ins = SUBMISSIONS.insert().values(
            article_id=submission.id,
            author=str(submission.author),
            title=submission.title,
            domain=submission.domain,
            url=submission.url,
            created=datetime.utcfromtimestamp(submission.created_utc)
        )
        conn.execute(ins)
        # Spin off the per-submission tracker thread.
        submission_thread = threading.Thread(target=tracker, args=(submission, conn, tracker_q))
        submission_thread.start()
def tracker(submission, conn, tracker_q):
    """Sample a submission's vote counts every 2 minutes for ~16 hours.

    Samples are buffered in memory and flushed to the DB in batches of 15
    (i.e. one insert every half hour); after 32 inserts (16 hours total) the
    thread exits.

    submission: the praw submission to track (re-fetched each iteration).
    conn:       SQLAlchemy connection used for the batch inserts.
    tracker_q:  accepted for interface compatibility; not used yet.
    """
    records = []       # buffered sample rows awaiting a batch insert
    insert_count = 0   # number of batches inserted so far
    while True:
        # Re-fetch so ups / upvote_ratio reflect the submission's current state.
        submission = REDDIT.submission(id=submission.id)
        records.append(dict(
            article_id=submission.id,
            time=datetime.now(),
            upvotes=submission.ups,
            upvote_ratio=submission.upvote_ratio
        ))
        # Flush every 15 samples (one batch per half hour). '>= 15' states
        # the threshold directly instead of the original '> 14'.
        if len(records) >= 15:
            print('Record inserted!')  # fixed typo: was 'Record insterted!'
            conn.execute(UPVOTE_DATA.insert(), records)
            records.clear()
            insert_count += 1
            # After 32 batches (16 hours) stop tracking this submission.
            if insert_count == 32:
                break
        time.sleep(120)
if __name__ == '__main__':
    # Local Postgres target; echo=True logs every SQL statement to stdout.
    # NOTE(review): DB credentials are hard-coded — move to config/env.
    engine = create_engine('postgresql+psycopg2://postgres:password@localhost:5432/threadingtests', echo=True)
    # Create the tables registered on METADATA if they don't exist yet.
    METADATA.create_all(engine)
    # Single connection shared by scheduler and all tracker threads.
    conn = engine.connect()
    # The submission queue: data_gen produces, scheduler consumes.
    sub_q = queue.Queue()
    subreddit = REDDIT.subreddit('politics')
    # Our two main threads: the stream reader and the scheduler.
    thread1 = threading.Thread(target=data_gen, args=(subreddit, sub_q))
    thread2 = threading.Thread(target=scheduler, args=(sub_q, conn))
    print('starting')
    thread1.start()
    thread2.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement