Advertisement
Guest User

Untitled

a guest
Jun 15th, 2018
109
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.84 KB | None | 0 0
import os
import queue
import threading
import time
from datetime import datetime, timedelta

import praw
from sqlalchemy import create_engine, Column, DateTime, ForeignKey, Integer, MetaData, Numeric, String, Table

  7. REDDIT = praw.Reddit(client_id='id',
  8.                      client_secret='secret',
  9.                      password='nope',
  10.                      user_agent='testscript by /u/fakebot3',
  11.                      username='user')
  12. METADATA = MetaData()
  13. SUBMISSIONS = Table('submissions', METADATA,
  14.         Column('article_id', String, primary_key=True), #reddit_id
  15.         Column('author', String),
  16.         Column('title', String),
  17.         Column('domain', String),
  18.         Column('url', String),
  19.         Column('created', DateTime),
  20. )
  21. UPVOTE_DATA = Table('upvote_data', METADATA,
  22.     Column('id', Integer, primary_key=True),
  23.     Column('article_id', None, ForeignKey('submissions.article_id')),
  24.     Column('time', DateTime),
  25.     Column('upvotes', Integer),
  26.     Column('upvote_ratio', Numeric),
  27. )
  28.  
  29.  
  30. def data_gen(subreddit, q):
  31.     for submission in subreddit.stream.submissions():
  32.         # Filter out the older 'new' submissions so we don't create a ton of threads
  33.         # for submissions we don't care about
  34.         created = datetime.utcfromtimestamp(submission.created_utc)
  35.         if timedelta.total_seconds(datetime.utcnow() - created) < 300:
  36.             q.put(submission)
  37.             print('Starting to track ', submission.title)
  38.  
  39. def scheduler(q, conn):
  40.     # This function takes submission queue and db connection
  41.     # We create a tracker queue to share between submission trackers
  42.     tracker_q = queue.Queue()
  43.     while True:
  44.         try:
  45.             submission = q.get(False)
  46.             # Insert the initial submission
  47.             ins = SUBMISSIONS.insert().values(
  48.                 article_id=submission.id,
  49.                 author=str(submission.author),
  50.                 title=submission.title,
  51.                 domain=submission.domain,
  52.                 url=submission.url,
  53.                 created=datetime.utcfromtimestamp(submission.created_utc)
  54.             )
  55.             result = conn.execute(ins)
  56.             # Spin off the submission/tracker thread
  57.             submission_thread = threading.Thread(target=tracker, args=(submission, conn, tracker_q))
  58.             submission_thread.start()
  59.         except queue.Empty:
  60.             data = None
  61.         # Ricky bobby doesn't like to go fast
  62.         time.sleep(1)
  63.  
  64. def tracker(submission, conn, tracker_q):
  65.     # Empty records we will append to with each iteration
  66.     # Once our records are full, we instert them into the DB
  67.     records = []
  68.     counter = 0
  69.     while True:
  70.         submission = REDDIT.submission(id=submission.id)
  71.         data = dict(
  72.             article_id=submission.id,
  73.             time=datetime.now(),
  74.             upvotes=submission.ups,
  75.             upvote_ratio=submission.upvote_ratio
  76.         )
  77.         records.append(data)
  78.         # Once we reach 15 records, insert them (every half hour)
  79.         if len(records) > 14:
  80.             print('Record insterted!')
  81.             conn.execute(UPVOTE_DATA.insert(), records)
  82.             records.clear()
  83.             counter += 1
  84.             # After we insert 32 times (16 hours) break the loop
  85.             if counter == 32:
  86.                 break
  87.         time.sleep(120)
  88.  
  89. if __name__ == '__main__':
  90.     engine = create_engine('postgresql+psycopg2://postgres:password@localhost:5432/threadingtests', echo=True)
  91.     METADATA.create_all(engine)    
  92.     conn = engine.connect()
  93.  
  94.     # The submission queue
  95.     sub_q = queue.Queue()
  96.     subreddit = REDDIT.subreddit('politics')
  97.  
  98.     # Our two main threads
  99.     thread1 = threading.Thread(target=data_gen, args=(subreddit, sub_q))
  100.     thread2 = threading.Thread(target=scheduler, args=(sub_q, conn))
  101.  
  102.     print('starting')
  103.     thread1.start()
  104.     thread2.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement