Guest User

Untitled

a guest
Dec 14th, 2017
100
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.95 KB | None | 0 0
  1. import threading
  2.  
  3. import mysql.connector
  4.  
  5. import apache_beam as beam
  6.  
  7. class CloudSQLSource(beam.io.iobase.BoundedSource):
  8.  
  9. def __init__(self, node):
  10. sql = mysql.connector.connect(user='', password='',
  11. host='127.0.0.1',
  12. database='', port=3306)
  13.  
  14. cursor = sql.cursor(dictionary=True)
  15. self._node = node
  16. cursor.execute(
  17. "SELECT COUNT(*) FROM `TABLE`")
  18. result = cursor.fetchone()
  19. self._count = result['COUNT(*)']
  20.  
  21. cursor.close()
  22. sql.close()
  23.  
  24. def estimate_size(self):
  25. return self._count
  26.  
  27. def get_range_tracker(self, start_position, stop_position):
  28. logging.warning('------ get range tracker ---------')
  29. import apache_beam as local_beam
  30. print start_position, stop_position
  31. if start_position is None:
  32. start_position = 0
  33. if stop_position is None:
  34. stop_position = self._count
  35.  
  36. return local_beam.io.range_trackers.OffsetRangeTracker(start_position, stop_position)
  37.  
  38. def read(self, range_tracker):
  39. logging.warning('------------ read _------------------')
  40. start = range_tracker.start_position()
  41. if range_tracker.start_position() is None:
  42. start = 0
  43.  
  44. count = range_tracker.stop_position()
  45. if count is None:
  46. count = self._count
  47. count = count - start
  48.  
  49. print start
  50. print count
  51.  
  52. if not range_tracker.try_claim(start):
  53. logging.warning('claim error')
  54. return
  55.  
  56. sql = mysql.connector.connect(user='', password='',
  57. host='127.0.0.1',
  58. database='', port=3306)
  59. cursor = sql.cursor(dictionary=True)
  60.  
  61. cursor.execute(
  62. 'SELECT * FROM `TABLE` LIMIT %s, %s ', (start, count))
  63.  
  64. results = cursor.fetchall()
  65. for row in results:
  66. yield row
  67.  
  68. cursor.close()
  69. sql.close()
  70.  
  71. def split(self, desired_bundle_size, start_position=None,
  72. stop_position=None):
  73.  
  74. logging.warning('----- split -------')
  75. logging.warning('%s %s %s', desired_bundle_size,
  76. start_position, stop_position)
  77.  
  78. import apache_beam as local_beam
  79.  
  80. if start_position is None:
  81. start_position = 0
  82. if stop_position is None:
  83. stop_position = self._count
  84.  
  85. bundle_start = start_position
  86. while bundle_start < self._count:
  87. bundle_stop = max(self._count, bundle_start + desired_bundle_size)
  88. yield local_beam.io.iobase.SourceBundle(weight=(bundle_stop - bundle_start),
  89. source=self,
  90. start_position=bundle_start,
  91. stop_position=bundle_stop)
  92. bundle_start = bundle_stop
Add Comment
Please, Sign In to add comment