Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from __future__ import absolute_import, print_function, unicode_literals
- from collections import Counter
- from streamparse.bolt import Bolt
- import psycopg2
- from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
# --- One-time database bootstrap (runs at import time) ----------------------
#
# Step 1: make sure the "tcount" database exists. This is best-effort: the
# most common failure is that the database was already created by an earlier
# run, so a database error is reported and the script carries on.
admin_conn = None
try:
    # Connect to the default "postgres" maintenance database; a new database
    # has to be created from an existing one.
    # NOTE(review): port 5432 matches the tcount connection below -- confirm
    # it is right for this server.
    admin_conn = psycopg2.connect(
        database="postgres",
        user="postgres",
        password="pass",
        host="localhost",
        port="5432",
    )
    # CREATE DATABASE can't run inside a transaction
    admin_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    admin_cur = admin_conn.cursor()
    admin_cur.execute("CREATE DATABASE tcount")
    admin_cur.close()
except psycopg2.Error:
    # Only database errors are tolerated here; programming errors such as a
    # NameError are no longer silently swallowed.
    print("Could not create tcount")
finally:
    # Always release the maintenance connection, even when creation failed.
    if admin_conn is not None:
        admin_conn.close()

# Step 2: connect to tcount. `conn` and `cur` stay module-level names so the
# rest of the file can use them.
conn = psycopg2.connect(
    database="tcount",
    user="postgres",
    password="pass",
    host="localhost",
    port="5432",
)

# Step 3: create the word-count table. IF NOT EXISTS keeps the module
# importable on every run after the first one.
cur = conn.cursor()
cur.execute('''CREATE TABLE IF NOT EXISTS tweetwordcount
       (word TEXT PRIMARY KEY NOT NULL,
       count INT NOT NULL);''')
conn.commit()
class WordCounter(Bolt):
    """Bolt that keeps a running in-memory tally of the words it receives."""

    def initialize(self, conf, ctx):
        """Start with an empty tally; `conf` and `ctx` are not used here."""
        self.counts = Counter()

    def process(self, tup):
        """Count the word in the tuple's first value, then emit and log it.

        Emits ``[word, count]`` where ``count`` is the updated total for
        that word in this bolt instance.
        """
        token = tup.values[0]

        # TODO: psycopg2 code to write the count to the tweetwordcount table
        # belongs here; nothing is persisted by this method yet.

        # Bump the local tally and keep the new total for reuse below.
        seen = self.counts[token] + 1
        self.counts[token] = seen

        self.emit([token, seen])

        # Log the count - just to see the topology running
        self.log('%s: %d' % (token, seen))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement