Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from datetime import datetime, timedelta
- import praw
- from sqlalchemy import create_engine, Column, DateTime, ForeignKey, Integer, MetaData, Numeric, String, Table
- import threading, queue
- import time
# Module-level Reddit client used by the functions in this script.
# NOTE(review): these credential values look like placeholders; confirm real
# values are supplied before running.
REDDIT = praw.Reddit(
    client_id='id',
    client_secret='secret',
    password='pass',
    user_agent='testscript by /u/fakebot3',
    username='username',
)
# Registry that the table definitions in this file attach themselves to.
METADATA = MetaData()

# One row per tracked submission, keyed by the submission's Reddit id.
SUBMISSIONS = Table(
    'submissions',
    METADATA,
    Column('article_id', String, primary_key=True),  # reddit_id
    Column('author', String),
    Column('title', String),
    Column('domain', String),
    Column('url', String),
    Column('created', DateTime),
)
# One row per vote-count snapshot of a submission; many rows may point at the
# same submissions.article_id.
UPVOTE_DATA = Table(
    'upvote_data',
    METADATA,
    Column('id', Integer, primary_key=True),
    # The type is left as None so it is taken from the referenced column.
    Column('article_id', None, ForeignKey('submissions.article_id')),
    Column('time', DateTime),
    Column('upvotes', Integer),
    Column('upvote_ratio', Numeric),
)
def data_gen(subreddit, q, max_age_seconds=300, poll_interval=120):
    """Stream new submissions from *subreddit* and queue tracking jobs for them.

    Each job put on *q* is a tuple
    ``(callback, scheduled_at, interval_seconds, submission, run_count)``
    where ``callback`` is ``track_submission``, ``scheduled_at`` is the
    current epoch time and ``run_count`` starts at 0.

    Args:
        subreddit: Object exposing ``stream.submissions()``, an iterable of
            submissions with ``created_utc`` (epoch seconds) and ``title``.
        q: Queue that receives the job tuples.
        max_age_seconds: Only submissions younger than this are tracked.
        poll_interval: Seconds between polls, stored in the job tuple.
    """
    for submission in subreddit.stream.submissions():
        # The stream also yields older "new" submissions; skip those so we do
        # not schedule jobs for posts we do not care about.  Both operands are
        # epoch seconds, so no datetime conversion is needed.
        age_seconds = time.time() - submission.created_utc
        if age_seconds < max_age_seconds:
            q.put((track_submission, time.time(), poll_interval, submission, 0))
            print('Starting to track', submission.title)
def track_submission(submission, write_q):
    """Queue one vote-count snapshot of *submission* for the writer.

    Args:
        submission: Object with an ``id`` attribute identifying the post.
        write_q: Queue that receives a dict with the keys ``article_id``,
            ``time``, ``upvotes`` and ``upvote_ratio``.
    """
    # Look the submission up again by id -- presumably so the vote figures are
    # current rather than those held by the object passed in.
    latest = REDDIT.submission(id=submission.id)
    # NOTE(review): 'time' is local time (datetime.now()), while the
    # submissions.created column elsewhere in this file is stored as UTC.
    write_q.put({
        'article_id': latest.id,
        'time': datetime.now(),
        'upvotes': latest.ups,
        'upvote_ratio': latest.upvote_ratio,
    })
def _store_new_submission(submission, conn):
    """Insert a newly tracked submission and its first vote-count snapshot."""
    conn.execute(SUBMISSIONS.insert().values(
        article_id=submission.id,
        author=str(submission.author),
        title=submission.title,
        domain=submission.domain,
        url=submission.url,
        created=datetime.utcfromtimestamp(submission.created_utc),
    ))
    conn.execute(UPVOTE_DATA.insert().values(
        article_id=submission.id,
        time=datetime.now(),
        upvotes=submission.ups,
        upvote_ratio=submission.upvote_ratio,
    ))


def scheduler(submission_q, write_q, conn, min_upvotes=100, min_samples=60):
    """Run tracking jobs forever, once per second.

    A job is a tuple
    ``(callback, last_run, interval_seconds, submission, run_count)``.
    On every tick the scheduler:

    1. takes at most one new job from *submission_q*, stores the submission
       and its first snapshot through *conn*, and starts tracking it;
    2. runs ``callback(submission, write_q)`` for every job whose interval
       has elapsed;
    3. drops a job once it has run at least *min_samples* times while the
       submission still has fewer than *min_upvotes* upvotes, and otherwise
       reschedules it with an incremented run count.

    Args:
        submission_q: Queue of new job tuples.
        write_q: Queue handed to each job callback.
        conn: Database connection used for the initial inserts.
        min_upvotes: Upvote count below which a job may be dropped.
        min_samples: Number of completed runs after which a job may be dropped.
    """
    jobs = []
    while True:
        time.sleep(1)
        current_time = time.time()

        # Only the queue read can raise queue.Empty, so keep the try minimal.
        try:
            new_job = submission_q.get(False)
        except queue.Empty:
            # Debug output kept from the original implementation.
            print('No', None)
        else:
            _store_new_submission(new_job[3], conn)
            jobs.append(new_job)

        waiting = []      # jobs whose timer has not expired yet
        rescheduled = []  # jobs that ran this tick and will run again
        for job in jobs:
            func, last_run, interval, submission, run_count = job
            if last_run + interval >= current_time:
                waiting.append(job)
                continue

            func(submission, write_q)

            # Logical `and` is required here: `&` binds tighter than the
            # comparisons and would turn this into
            # `ups < (min_upvotes & run_count) > ...`.
            still_unpopular = REDDIT.submission(id=submission.id).ups < min_upvotes
            if still_unpopular and run_count >= min_samples:
                print('Job terminated')
                continue

            rescheduled.append(
                (func, current_time, interval, submission, run_count + 1)
            )

        # Not-yet-due jobs keep their order; jobs that just ran go to the back.
        jobs = waiting + rescheduled
def writer(write_q, conn, batch_size=20, poll_interval=1):
    """Persist queued vote-count snapshots in batches, forever.

    Every *poll_interval* seconds all records currently waiting on *write_q*
    are collected; once at least *batch_size* records have accumulated they
    are inserted into the ``upvote_data`` table with one call and the buffer
    is emptied.

    Args:
        write_q: Queue of dicts whose keys match the ``upvote_data`` columns.
        conn: Database connection used for the bulk insert.
        batch_size: Minimum number of buffered records before inserting.
        poll_interval: Seconds to sleep between queue polls.
    """
    pending = []
    while True:
        time.sleep(poll_interval)

        # Drain everything that is available right now; taking only one
        # record per tick lets the queue back up when snapshots arrive in
        # bursts.
        while True:
            try:
                pending.append(write_q.get(False))
            except queue.Empty:
                break

        if len(pending) >= batch_size:
            conn.execute(UPVOTE_DATA.insert(), pending)
            print(len(pending), 'inserted')
            pending.clear()
if __name__ == '__main__':
    # echo=True makes the engine log every SQL statement it emits.
    engine = create_engine(
        'postgresql+psycopg2://postgres:password@localhost:5432/threadingtests',
        echo=True,
    )
    METADATA.create_all(engine)

    # A SQLAlchemy Connection is not thread-safe, so the scheduler and the
    # writer each get their own connection instead of sharing one.
    scheduler_conn = engine.connect()
    writer_conn = engine.connect()

    submission_q = queue.Queue()  # data_gen  -> scheduler: new tracking jobs
    write_q = queue.Queue()       # scheduler -> writer: vote-count snapshots
    subreddit = REDDIT.subreddit('politics')

    workers = [
        threading.Thread(target=data_gen, args=(subreddit, submission_q)),
        threading.Thread(target=scheduler,
                         args=(submission_q, write_q, scheduler_conn)),
        threading.Thread(target=writer, args=(write_q, writer_conn)),
    ]
    for worker in workers:
        worker.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement