Advertisement
Guest User

Untitled

a guest
Feb 6th, 2017
186
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.28 KB | None | 0 0
  class HiveConnection(object):
      """Wrapper around a Hive database connection.

      The connection is opened when the object is constructed and is
      re-opened on demand by methods decorated with
      ``hadoop_connection_handler``.
      """

      # Upper bound on consecutive connection attempts, so an unreachable
      # server raises instead of retrying without end.
      MAX_CONNECT_ATTEMPTS = 3

      def __init__(self, host, user, password, port=10000,
                   auth_mechanism='PLAIN'):
          """Instantiate a HiveConnection object and open the connection.

          Args:
              host: [str] Host name handed to ``connect``.
              user: [str] User name handed to ``connect``.
              password: [str] Password handed to ``connect``.
              port: [int] Port handed to ``connect``.
              auth_mechanism: [str] Authentication mechanism handed to
                  ``connect``.
          """
          self.host = host
          self.user = user
          self.password = password
          self.port = port
          self.auth_mechanism = auth_mechanism
          self._connection = self._connect()

      def _connect(self):
          """Start the connection to database.

          Returns:
              Whatever ``connect`` returns for the stored credentials.

          Raises:
              TTransportException: If every one of ``MAX_CONNECT_ATTEMPTS``
                  attempts fails; the last error is re-raised.
          """
          last_error = None
          for _ in range(self.MAX_CONNECT_ATTEMPTS):
              try:
                  return connect(host=self.host, port=self.port,
                                 user=self.user, password=self.password,
                                 auth_mechanism=self.auth_mechanism)
              except TTransportException as error:
                  print('Failed attempt to connect')
                  # ``error`` is unbound when the except block exits, so keep
                  # a reference for the final re-raise.
                  last_error = error
          raise last_error

      def _disconnect(self):
          """Close connection to database and forget it.

          Resetting ``_connection`` to ``None`` is what lets
          ``hadoop_connection_handler`` notice that a reconnect is needed.
          Calling this when no connection is held is a no-op.
          """
          if self._connection is None:
              return
          try:
              self._connection.close()
          finally:
              self._connection = None

      def hadoop_connection_handler(function):
          """Start a database connection if not already open.

          Decorator for methods that use ``self._connection``: when the
          instance holds no connection, a new one is created with
          ``_connect`` and stored before the wrapped method runs.
          """
          @wraps(function)
          def wrapper(inst, *args, **kwargs):
              if not inst._connection:
                  # Store the new connection; calling _connect() alone would
                  # discard it.
                  inst._connection = inst._connect()
              return function(inst, *args, **kwargs)
          return wrapper

      @hadoop_connection_handler
      def read(self, query):
          """Execute a query to pull the data.

          Column names come from ``self._columns(query)``.
          NOTE(review): ``_columns`` is not defined in the visible part of
          this class - confirm it is provided elsewhere.

          Args:
              query: [str] Query to pull the data.

          Returns:
              A list of namedtuple (`Record`), one per fetched row.
          """
          columns = self._columns(query)
          cursor = self._connection.cursor()
          try:
              cursor.execute(query)
              Record = namedtuple("Record", columns)
              # Build a real list: ``map`` would be a one-shot iterator on
              # Python 3.
              return [Record._make(row) for row in cursor.fetchall()]
          finally:
              # Release the cursor even when execute/fetch raises.
              cursor.close()
  53.  
  54. from unittest.mock import patch, MagicMock
  55. from nose.tools import assert_equal, raises
  56.  
  57. from services.db_connections import HiveConnection
  58.  
  59.  
  class TestHiveConnection:
      """Tests for the reconnect behaviour of HiveConnection."""

      def setUp(self):
          """Build the HiveConnection instance used by each test."""
          credentials = {
              'user': 'username',
              'password': 'password',
              'host': 'myhost.net',
          }
          self.hive = HiveConnection(**credentials)

      def test_reconnect(self):
          """After a disconnect, reading should trigger a new connection
          attempt through ``_connect``.
          """
          # Replace _connect with a spy so the call can be observed.
          connect_spy = MagicMock()
          self.hive._connect = connect_spy
          self.hive._disconnect()
          self.hive.read('SELECT * FROM database.table 1')
          assert_equal(connect_spy.called, True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement