Advertisement
Guest User

turbo_project_pm280220

a guest
Feb 28th, 2020
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.75 KB | None | 0 0
  1. 1 #!/usr/bin/env python3.5
  2. 2 # -*- coding: utf-8 -*-
  3. 3 #
  4. 4 # TURBO PROJECT: PRODUCER
  5. 5 #
  6. 6
  7. 7 import sys
  8. 8 import cv2 as cv
  9. 9 from kafka import KafkaProducer
  10. 10
  11. 11 URL = 'http://10.4.110.31/mjpg/video.mjpg'
  12. 12 KAFKA_PORT = 6667
  13. 13
  14. 14
  def main(args):
      """Return the process exit status for the producer script.

      The streaming work is done by the ``__main__`` guard before this
      function is called; this function only supplies the value handed to
      ``sys.exit``.

      Args:
          args: Command-line argument list (``sys.argv``); currently unused.

      Returns:
          Always 0.
      """
      return 0
  17. 17
  18. 18
  if __name__ == '__main__':
      # Script entry point: read frames from the MJPEG stream at URL, convert
      # each one to grayscale and publish its raw bytes to the Kafka topic
      # 'gdm410s03' until the stream ends or the user presses Ctrl-C.
      cap = cv.VideoCapture(URL)
      producer = KafkaProducer(bootstrap_servers='rt321vmw102.gtr.tp:%d' % KAFKA_PORT)

      if not cap.isOpened():
          print("Error opening video stream or file")

      sent = 0  # number of frames actually handed to the producer

      try:
          while cap.isOpened():
              ret, frame = cap.read()

              # Check the read status before touching the frame: a failed
              # read does not yield a usable image to convert.
              if not ret:
                  break

              gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
              producer.send('gdm410s03', gray.tobytes())

              # Count and report only after the frame has been queued.
              sent += 1
              # flush=True so the '\r' progress line is shown immediately,
              # as the consumer script does for its own progress line.
              print('\r%d frame%s sent' % (sent, 's' if sent > 1 else ''),
                    end='', flush=True)
      except KeyboardInterrupt:
          # Ctrl-C is the normal way to stop streaming; fall through to cleanup.
          pass
      finally:
          cap.release()
          cv.destroyAllWindows()
          # Push out any frames still queued in the producer before exiting,
          # then shut it down.
          producer.flush()
          producer.close()

      sys.exit(main(sys.argv))
  48.  
  49. 1 #!/usr/bin/env python3
  50. 2 # -*- coding: utf-8 -*-
  51. 3 #
  52. 4 # TURBO PROJECT: CONSUMER
  53. 5 #
  54. 6
  55. 7 import sys
  56. 8 import cv2 as cv
  57. 9 from cv2 import data
  58. 10 import numpy as np
  59. 11 from pyspark import SparkConf, SparkContext
  60. 12 from kafka import KafkaConsumer
  61. 13
  62. 14 KAFKA_PORT = 6667
  63. 15 MODEL_NAME = 'haarcascade_frontalface_default.xml'
  64. 16 FRAME_SHAPE = (480, 640)
  65. 17 BATCH_SIZE = 64
  66. 18
  def face_recog(frame):
      """Detect faces in *frame* and return the cropped face regions.

      A Haar cascade classifier is loaded from OpenCV's bundled cascade
      directory (``data.haarcascades + MODEL_NAME``) on every call and run
      over the frame.

      Args:
          frame: Image array to search. It is sliced along two axes below, so
              it is presumably a 2-D grayscale image -- TODO confirm against
              the caller.

      Returns:
          A list with one sub-array of *frame* per detected face; empty when
          no face is detected.
      """
      model = cv.CascadeClassifier(data.haarcascades + MODEL_NAME)
      # Positional arguments after the image are scaleFactor=1.1 and
      # minNeighbors=4.
      faces = model.detectMultiScale(frame, 1.1, 4)
      # Each detection is an (x, y, w, h) box; crop rows y..y+h, cols x..x+w.
      return [frame[y:y + h, x:x + w] for x, y, w, h in faces]
  72. 24
  73. 25
  def main(args):
      """Return the process exit status for the consumer script.

      Placeholder entry point: it performs no work of its own.

      Args:
          args: Command-line argument list; currently unused.

      Returns:
          Always 0.
      """
      return 0
  76. 28
  77. 29
  78. 30 if __name__ == '__main__':
  79. 31 confpm = SparkConf().setAppName("TURBO PROJECT").setMaster("yarn")
  80. 32 sc = SparkContext(conf=confpm)
  81. 33
  82. 34 consumer = KafkaConsumer(bootstrap_servers='rt321vmw102.gtr.tp:%d' % KAFKA_PORT,
  83. 35 value_deserializer=lambda msg: np.frombuffer(msg, dtype=np.uin
  84. 36
  85. 37 consumer.subscribe(['gdm410s03'])
  86. 38
  87. 39 frames = []
  88. 40
  89. 41 for msg in consumer:
  90. 42 frames.append(msg.value)
  91. 43 l = len(frames)
  92. 44 print('\r%d frame%s received' % (l, 's' if l > 1 else ''), end='', flush=True)
  93. 45 if l == BATCH_SIZE:
  94. 46 frames_rdd = sc.parallelize(frames)
  95. 47 faces = frames_rdd.flatMap(face_recog)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement