Advertisement
Guest User

Untitled

a guest
Jun 16th, 2018
155
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.10 KB | None | 0 0
  1. from datetime import datetime, timedelta
  2. import praw
  3. from sqlalchemy import create_engine, Column, DateTime, ForeignKey, Integer, MetaData, Numeric, String, Table
  4. import threading, queue
  5. import time
  6.  
  7. REDDIT = praw.Reddit(client_id='id',
  8.                      client_secret='secret',
  9.                      password='pass',
  10.                      user_agent='testscript by /u/fakebot3',
  11.                      username='username')
  12. METADATA = MetaData()
  13. SUBMISSIONS = Table('submissions', METADATA,
  14.         Column('article_id', String, primary_key=True), #reddit_id
  15.         Column('author', String),
  16.         Column('title', String),
  17.         Column('domain', String),
  18.         Column('url', String),
  19.         Column('created', DateTime),
  20. )
  21. UPVOTE_DATA = Table('upvote_data', METADATA,
  22.     Column('id', Integer, primary_key=True),
  23.     Column('article_id', None, ForeignKey('submissions.article_id')),
  24.     Column('time', DateTime),
  25.     Column('upvotes', Integer),
  26.     Column('upvote_ratio', Numeric),
  27. )
  28.  
  29. def data_gen(subreddit, q):
  30.     for submission in subreddit.stream.submissions():
  31.         # Filter out the older 'new' submissions so we don't create a ton of threads
  32.         # for submissions we don't care about
  33.         created = datetime.utcfromtimestamp(submission.created_utc)
  34.         if timedelta.total_seconds(datetime.utcnow() - created) < 300:
  35.             q.put((track_submission, time.time(), 120, submission, 0))
  36.             print('Starting to track', submission.title)
  37.  
  38. def track_submission(submission, write_q):
  39.     submission = REDDIT.submission(id=submission.id)
  40.     # Insert the initial submission
  41.     data = dict(
  42.         article_id=submission.id,
  43.         time=datetime.now(),
  44.         upvotes=submission.ups,
  45.         upvote_ratio=submission.upvote_ratio
  46.     )
  47.     write_q.put(data)
  48.  
  49. def scheduler(submission_q, write_q, conn):
  50.     jobs = []
  51.     while 1:
  52.         time.sleep(1)
  53.         current_time = time.time()
  54.         completed_jobs = []
  55.         terminated_jobs = []
  56.         try:
  57.             # Make sure we put the job from the queue into a list
  58.             new_jobs = [submission_q.get(False)]
  59.             for job in new_jobs:
  60.                 # Insert the initial submission
  61.                 ins = SUBMISSIONS.insert().values(
  62.                     article_id=job[3].id,
  63.                     author=str(job[3].author),
  64.                     title=job[3].title,
  65.                     domain=job[3].domain,
  66.                     url=job[3].url,
  67.                     created=datetime.utcfromtimestamp(job[3].created_utc)
  68.                 )
  69.                 conn.execute(ins)
  70.                 ins = UPVOTE_DATA.insert().values(
  71.                     article_id=job[3].id,
  72.                     time=datetime.now(),
  73.                     upvotes=job[3].ups,
  74.                     upvote_ratio=job[3].upvote_ratio
  75.                 )
  76.                 conn.execute(ins)
  77.             jobs.extend(new_jobs)
  78.         except queue.Empty:
  79.             data = None
  80.             print('No', data)
  81.  
  82.         for job in jobs:
  83.             # We check to see if a jobs timer is up
  84.             if job[1] + job[2] < current_time:
  85.                 func = job[0]
  86.                 # We pass the submission from the job into the function
  87.                 # along with the write queue
  88.                 func(job[3], write_q)
  89.                 # Add to the list of completed jobs
  90.                 if REDDIT.submission(id=job[3].id).ups < 100 & job[4] > 59:
  91.                     print('Job terminated')
  92.                     terminated_jobs.append(job)
  93.                 else:
  94.                     completed_jobs.append(job)
  95.         # Build a new list of jobs minus the completed jobs
  96.         jobs = [job for job in jobs if job not in completed_jobs and job not in terminated_jobs]
  97.         # Rebuild the completed jobs with updated time and counter
  98.         updated_jobs = [(job[0], current_time, job[2], job[3], job[4] + 1) for job in completed_jobs]
  99.         # Combine the list of old jobs with the list of updated jobs
  100.         jobs.extend(updated_jobs)
  101.  
  102. def writer(write_q, conn):
  103.     records = []
  104.     while 1:
  105.         time.sleep(1)
  106.         try:
  107.             record = write_q.get(False)
  108.             records.append(record)
  109.         except queue.Empty:
  110.             data = None
  111.         if len(records) > 19:
  112.             conn.execute(UPVOTE_DATA.insert(), records)
  113.             print(len(records), 'inserted')
  114.             records.clear()
  115.  
  116. if __name__ == '__main__':
  117.     engine = create_engine('postgresql+psycopg2://postgres:password@localhost:5432/threadingtests', echo=True)
  118.     METADATA.create_all(engine)    
  119.     conn = engine.connect()
  120.  
  121.     submission_q = queue.Queue()
  122.     write_q = queue.Queue()
  123.     subreddit = REDDIT.subreddit('politics')
  124.     thread1 = threading.Thread(target=data_gen, args=(subreddit, submission_q))
  125.     thread2 = threading.Thread(target=scheduler, args=(submission_q, write_q, conn))
  126.     thread3 = threading.Thread(target=writer, args=(write_q, conn))
  127.  
  128.     #jobs = [(say_hello, time.time(), 1, 'yo'),(say_hello, time.time(), 2, 'hey'), (say_hello, time.time(), 10, 'bye')]
  129.     thread1.start()
  130.     thread2.start()
  131.     thread3.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement