Advertisement
Guest User

Untitled

a guest
Apr 9th, 2017
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.42 KB | None | 0 0
  1. from __future__ import absolute_import, print_function, unicode_literals
  2.  
  3. from collections import Counter
  4. from streamparse.bolt import Bolt
  5.  
  6. import psycopg2
  7. from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
  8.  
  9. # Connect to the database
  10. #conn = psycopg2.connect(database="postgres", user="postgres", password="pass", host="localhost", port="1234")
  11.  
  12. #Create the Database
  13.  
  14. try:
  15. # CREATE DATABASE can't run inside a transaction
  16. conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
  17. cur = conn.cursor()
  18. cur.execute("CREATE DATABASE tcount")
  19. cur.close()
  20. conn.close()
  21. except:
  22. print "Could not create tcount"
  23.  
  24. #Connecting to tcount
  25.  
  26. conn = psycopg2.connect(database="tcount", user="postgres", password="pass", host="localhost", port="5432")
  27.  
  28. #Create a Table
  29. #The first step is to create a cursor.
  30.  
  31. cur = conn.cursor()
  32. cur.execute('''CREATE TABLE tweetwordcount
  33. (word TEXT PRIMARY KEY NOT NULL,
  34. count INT NOT NULL);''')
  35. conn.commit()
  36.  
  37. class WordCounter(Bolt):
  38.  
  39. def initialize(self, conf, ctx):
  40. self.counts = Counter()
  41.  
  42. def process(self, tup):
  43. word = tup.values[0]
  44.  
  45. #More psycopg2 code goes here to complete a map-reduce operation and update the table. Last, we
  46.  
  47. # Increment the local count
  48. self.counts[word] += 1
  49. self.emit([word, self.counts[word]])
  50.  
  51. # Log the count - just to see the topology running
  52. self.log('%s: %d' % (word, self.counts[word]))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement