Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- 1 #!/usr/bin/env python3.5
- 2 # -*- coding: utf-8 -*-
- 3 #
- 4 # TURBO PROJECT: PRODUCER
- 5 #
- 6
- 7 import sys
- 8 import cv2 as cv
- 9 from kafka import KafkaProducer
- 10
- 11 URL = 'http://10.4.110.31/mjpg/video.mjpg'
- 12 KAFKA_PORT = 6667
- 13
- 14
def main(args):
    """Return the process exit status for the producer script.

    ``args`` is the command-line argument list (``sys.argv`` at the call
    site); it is currently unused. Always returns 0.
    """
    return 0
- 17
- 18
if __name__ == '__main__':
    # Read frames from the video stream at URL, convert each one to
    # grayscale and publish its raw bytes to the Kafka topic 'gdm410s03'.
    cap = cv.VideoCapture(URL)
    producer = KafkaProducer(bootstrap_servers='rt321vmw102.gtr.tp:%d' % KAFKA_PORT)

    if not cap.isOpened():
        print("Error opening video stream or file")

    i = 0  # number of frames actually handed to the producer

    try:
        while cap.isOpened():
            ret, frame = cap.read()

            # Check the read status BEFORE touching the frame: on a failed
            # read there is no image to convert, so stop streaming instead
            # of passing an invalid frame to cvtColor.
            if not ret:
                break

            gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
            producer.send('gdm410s03', gray.tobytes())

            # Count and report only frames that were really sent.
            i += 1
            # flush=True so the '\r' progress line shows up immediately
            # (same as the consumer's progress output).
            print('\r%d frame%s sent' % (i, 's' if i > 1 else ''), end='', flush=True)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop streaming; fall through to cleanup.
        pass
    finally:
        cap.release()
        cv.destroyAllWindows()
        # send() only queues records; flush and close so queued frames are
        # delivered before the process exits.
        producer.flush()
        producer.close()

    sys.exit(main(sys.argv))
- 1 #!/usr/bin/env python3
- 2 # -*- coding: utf-8 -*-
- 3 #
- 4 # TURBO PROJECT: CONSUMER
- 5 #
- 6
- 7 import sys
- 8 import cv2 as cv
- 9 from cv2 import data
- 10 import numpy as np
- 11 from pyspark import SparkConf, SparkContext
- 12 from kafka import KafkaConsumer
- 13
- 14 KAFKA_PORT = 6667
- 15 MODEL_NAME = 'haarcascade_frontalface_default.xml'
- 16 FRAME_SHAPE = (480, 640)
- 17 BATCH_SIZE = 64
- 18
def face_recog(frame):
    """Detect faces in ``frame`` and return the cropped face regions.

    Loads the Haar cascade named by ``MODEL_NAME`` from OpenCV's bundled
    ``data.haarcascades`` directory, runs ``detectMultiScale`` on ``frame``
    and returns one slice of ``frame`` per detected ``(x, y, w, h)`` box.

    Args:
        frame: image to search.
            NOTE(review): presumably a 2-D grayscale uint8 array - confirm
            against the caller.

    Returns:
        A list of ``frame[y:y+h, x:x+w]`` slices; empty when no face is found.

    Raises:
        RuntimeError: if the cascade file could not be loaded.
    """
    # NOTE(review): the classifier is rebuilt on every call; presumably so
    # nothing unpicklable is captured when this runs inside Spark - confirm
    # before hoisting it.
    model = cv.CascadeClassifier(data.haarcascades + MODEL_NAME)
    if model.empty():
        # Fail with a clear message instead of an opaque OpenCV assertion
        # from detectMultiScale on an unloaded classifier.
        raise RuntimeError('could not load cascade: %s' % (data.haarcascades + MODEL_NAME))

    faces = model.detectMultiScale(frame, scaleFactor=1.1, minNeighbors=4)
    return [frame[y:y + h, x:x + w] for x, y, w, h in faces]
- 24
- 25
def main(args):
    """Return the process exit status for the consumer script.

    ``args`` is the command-line argument list; it is currently unused.
    Always returns 0.
    """
    return 0
- 28
- 29
- 30 if __name__ == '__main__':
- 31 confpm = SparkConf().setAppName("TURBO PROJECT").setMaster("yarn")
- 32 sc = SparkContext(conf=confpm)
- 33
- 34 consumer = KafkaConsumer(bootstrap_servers='rt321vmw102.gtr.tp:%d' % KAFKA_PORT,
- 35 value_deserializer=lambda msg: np.frombuffer(msg, dtype=np.uin
- 36
- 37 consumer.subscribe(['gdm410s03'])
- 38
- 39 frames = []
- 40
- 41 for msg in consumer:
- 42 frames.append(msg.value)
- 43 l = len(frames)
- 44 print('\r%d frame%s received' % (l, 's' if l > 1 else ''), end='', flush=True)
- 45 if l == BATCH_SIZE:
- 46 frames_rdd = sc.parallelize(frames)
- 47 faces = frames_rdd.flatMap(face_recog)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement