Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- import cv2
- REMAINING = 'remaining'
- PROCESSED = 'processed'
- def get_chars(stream, needle, buffer=None, chunksize=1024, mode=REMAINING):
- """
- Read chunks from stream and stop when needle was found.
- Return needle and all characters after it if mode is 'remaining'.
- Return all characters before needle and needle if mode is 'processed'.
- Note that no more chunks are read when needle was found. Therefore,
- characters might remain untouched at the stream. These characters are
- not included in the result.
- """
- if not needle:
- raise ValueError('needle must not be empty')
- if mode not in (REMAINING, PROCESSED):
- raise ValueError('unknown mode')
- empty = type(needle)()
- if not buffer:
- buffer = empty
- processed = []
- bufsize = len(needle) - 1
- chunks = iter(lambda: stream.read(chunksize), empty)
- for chunk in chunks:
- haystack = buffer + chunk
- index = haystack.find(needle)
- if index >= 0:
- if mode == PROCESSED:
- tail = haystack[:index + len(needle)]
- return empty.join(processed) + tail
- return haystack[index:]
- buffer = haystack[-bufsize:]
- if mode == PROCESSED:
- chars = haystack[:-bufsize]
- processed.append(chars)
- return empty
def get_image_data(stream, start_mark=b'\xff\xd8', stop_mark=b'\xff\xd9'):
    """
    Extract one frame delimited by start_mark and stop_mark from stream.

    The defaults are the JPEG start-of-image and end-of-image markers.
    Everything in the stream before start_mark is skipped. The returned
    bytes begin with start_mark and end with stop_mark, so the result is a
    complete frame that can be handed to a decoder.

    Data is read from the stream in chunks (see get_chars). Whatever
    follows stop_mark inside the chunk that contained it is dropped and is
    not available to a later call.

    Raises IOError('missing start mark') or IOError('missing stop mark')
    if the stream ends before the respective marker was found.
    """
    data = get_chars(stream, start_mark, mode=REMAINING)
    if not data:
        raise IOError('missing start mark')
    # Hand only the bytes *after* the start mark to the second search so
    # the stop mark is looked for strictly behind it.
    buf = data[len(start_mark):]
    data = get_chars(stream, stop_mark, buffer=buf, mode=PROCESSED)
    if not data:
        raise IOError('missing stop mark')
    # The start mark was cut off above; put it back so the frame is complete.
    return start_mark + data
def get_image(stream):
    """
    Read the next image from stream and decode it with OpenCV.

    The raw bytes come from get_image_data(stream), which raises IOError
    when no complete frame could be read. The result is whatever
    cv2.imdecode returns for a colour decode -- NOTE(review): OpenCV
    documents that as None when the data cannot be decoded; callers may
    want to check for that.
    """
    data = get_image_data(stream)
    # np.frombuffer replaces the deprecated binary mode of np.fromstring;
    # it creates a read-only view on the bytes without copying them.
    buf = np.frombuffer(data, dtype=np.uint8)
    # cv2.IMREAD_COLOR is the named constant for the former magic flag 1.
    return cv2.imdecode(buf, cv2.IMREAD_COLOR)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement