#!/usr/bin/env python
""":mod:`magic_image_crawler` --- Magic Image Crawler!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This small program crawls a URL and downloads only the most important
images on the page.  Trivial images such as buttons and banners are
ignored.

"""
import re
import collections
import itertools
import urlparse
import urllib2
import tempfile
import os.path
import lxml.etree
import lxml.html
import Image       # classic PIL imports; under Pillow these would be
import ImageChops  # ``from PIL import Image, ImageChops``

#: The :class:`~lxml.etree.XPath` that finds image URLs.
IMG_SRC_XPATH = lxml.etree.XPath("//img[@src]/@src")

#: The default minimum area (width * height) in pixels.
DEFAULT_MINAREA = 200 * 250

#: The default tolerance for treating pixels as background.
DEFAULT_BGTOLERANCE = 10
def image_urls(url, root_url=None):
    """Finds only the important image URLs in the given ``url``.

    :param url: the URL of the page to scan
    :type url: :class:`basestring`
    :param root_url: an optional URL of the home page, used to filter out
                     site-wide layout images
    :type root_url: :class:`basestring`
    :returns: a list of found image URLs
    :rtype: :class:`list`

    """
    doc = lxml.html.parse(url)
    charset = doc.xpath("//meta[@charset]/@charset")
    if charset:
        charset = charset[0]
    else:
        # XPath 1.0 has no lower-case(); register one so the http-equiv
        # attribute can be matched case-insensitively.
        ns = lxml.etree.FunctionNamespace(None)
        ns["lower-case"] = lambda dummy, seq: seq[0].lower() if seq else None
        charset = doc.xpath("//meta[lower-case(@http-equiv)='content-type']"
                            "/@content")
        if charset:
            m = re.search(r";\s*charset\s*=\s*(\S+)\s*", charset[0], re.I)
            charset = m.group(1) if m else "utf-8"
        else:
            charset = "utf-8"
    images = [urlparse.urljoin(url, src) for src in IMG_SRC_XPATH(doc)]
    # duplicated images are probably layout elements like buttons, so drop
    # them entirely.  (itertools.groupby would only catch *consecutive*
    # duplicates, so count occurrences instead.)
    counts = collections.Counter(images)
    images = (img for img in images if counts[img] < 2)
    # images that also appear on the root page are site-wide decoration.
    root_url = root_url or urlparse.urljoin(url, "/")
    root_doc = lxml.html.parse(root_url)
    root_images = frozenset(urlparse.urljoin(root_url, src)
                            for src in IMG_SRC_XPATH(root_doc))
    return [img.encode(charset) if isinstance(img, unicode) else img
            for img in images if img not in root_images]
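# A minimal usage sketch for image_urls(), assuming a reachable page; the
# URL below is a placeholder, not a real endpoint:
#
#     >>> urls = image_urls("http://example.com/gallery/123")
#     >>> for u in urls:                                  # doctest: +SKIP
#     ...     print u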
def download_urls(urls):
    """Downloads the files at the given URLs.  This is a generator function
    that yields file objects, each rewound to offset 0.

    :param urls: a list of URLs to download
    :type urls: iterable object
    :returns: downloaded files, one per URL
    :rtype: iterable object

    """
    for url in urls:
        u = urllib2.urlopen(url)
        f = tempfile.SpooledTemporaryFile()
        while True:
            chunk = u.read(0x1000)  # read in 4 KiB chunks
            if not chunk:
                break
            f.write(chunk)
        f.seek(0)
        u.close()
        yield f
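# A minimal usage sketch for download_urls(), assuming network access; the
# URL is a placeholder.  Files are fetched lazily, one per iteration:
#
#     for f in download_urls(["http://example.com/a.png"]):
#         data = f.read()   # the whole body; the file is already rewound
#         f.close()         # releases the spooled temporary buffer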
def split_image(image, minarea=DEFAULT_MINAREA,
                bgtolerance=DEFAULT_BGTOLERANCE, gap=1, recurse=3):
    """Does a zealous crop and splits the given ``image`` into several
    smaller images.

    :param image: a PIL image to split
    :param minarea: a minimum width * height size for filtering.
                    default is :const:`DEFAULT_MINAREA`
    :type minarea: :class:`int`, :class:`long`
    :param bgtolerance: a tolerance for treating background pixels.
                        default is :const:`DEFAULT_BGTOLERANCE`
    :type bgtolerance: :class:`int`
    :param gap: a minimum gap height in pixels. default is 1
    :type gap: :class:`int`
    :param recurse: a maximum recursion depth. default is 3
    :type recurse: :class:`int`
    :returns: a list of split images
    :rtype: iterable object

    """
    if not Image.isImageType(image):
        raise TypeError("image must be a PIL image object, not " +
                        repr(image))
    if gap < 1:
        raise ValueError("gap must be greater than 0")
    if image.mode != "RGB":
        image = image.convert("RGB")
    bgcolor = 255, 255, 255
    def _isbg(color):
        return sum(abs(a - b) for a, b in zip(color, bgcolor)) <= bgtolerance
    if bgtolerance < 1:
        _isbg = lambda color: color == bgcolor
    # zealous crop: trim the outer background border first.
    bg = Image.new("RGB", image.size, bgcolor)
    diff = ImageChops.difference(image, bg)
    bbox = diff.getbbox()
    if bbox is None:  # the image is entirely background
        return
    image = image.crop(bbox)
    pixels = image.load()
    width, height = image.size
    def _emit(inner_image):
        # recurse on the transposed image so that vertical gaps are found
        # as well; rotate back before yielding.
        if recurse > 0:
            for _i in split_image(inner_image.rotate(90), minarea=minarea,
                                  bgtolerance=bgtolerance,
                                  recurse=recurse - 1):
                yield _i.rotate(270)
        else:
            yield inner_image
    empty = 0  # number of consecutive background rows seen so far
    top = 0    # first row of the current slice
    for y in xrange(height):
        if all(_isbg(pixels[x, y]) for x in xrange(width)):
            empty += 1
            continue
        if empty >= gap:
            # a gap just ended at this row; emit the slice above it.
            if width * (y - empty - top) >= minarea:
                for _i in _emit(image.crop((0, top, width, y - empty))):
                    yield _i
            top = y
        empty = 0  # reset on every non-background row
    # emit the trailing slice, excluding any trailing background rows.
    bottom = height - empty if empty >= gap else height
    if width * (bottom - top) >= minarea:
        for _i in _emit(image.crop((0, top, width, bottom))):
            yield _i
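# A minimal usage sketch for split_image(), assuming "strip.png" exists
# (a placeholder path).  Tall comic-strip style images are cut wherever a
# horizontal band of background color appears:
#
#     im = Image.open("strip.png")
#     for i, panel in enumerate(split_image(im, gap=5)):
#         panel.save("panel-%03d.png" % i)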
def magic(url, root_url=None, save_path=".",
          minarea=DEFAULT_MINAREA, bgtolerance=DEFAULT_BGTOLERANCE):
    """Spells and does magic: crawls ``url``, downloads its important
    images, splits them, and saves the results under ``save_path``.

    :param url: the URL to crawl
    :type url: :class:`basestring`
    :param root_url: an optional URL of the related page or home page
    :type root_url: :class:`basestring`
    :param save_path: a path to save images into
    :type save_path: :class:`basestring`
    :param minarea: a minimum image area for filtering.
                    default is :const:`DEFAULT_MINAREA`
    :type minarea: :class:`int`, :class:`long`
    :param bgtolerance: a tolerance for treating background pixels.
                        default is :const:`DEFAULT_BGTOLERANCE`
    :type bgtolerance: :class:`int`
    :returns: a generator that saves images and yields their paths
    :rtype: iterable object

    """
    parsed_url = urlparse.urlparse(url)
    files = download_urls(image_urls(url, root_url))
    def _images():
        for f in files:
            try:
                im = Image.open(f)
            except IOError:  # not a decodable image; skip it
                continue
            if im.size[0] * im.size[1] >= minarea:
                yield split_image(im, minarea=minarea,
                                  bgtolerance=bgtolerance)
    # build a filename key from the digits in the URL path,
    # e.g. /post/123/45 becomes "123-45-".
    m = re.match(r"^https?://[^/]+/(.*)$", url, re.IGNORECASE)
    if m:
        key = "-".join(d.group(0) for d in re.finditer(r"\d+", m.group(1))) \
              + "-"
    else:
        key = ""
    fmt = ("{0}-" + key + "{1:03d}.png").format
    n = 1
    for image_set in _images():
        for im in image_set:
            # skip ahead past any file names that already exist.
            while True:
                path = os.path.join(save_path, fmt(parsed_url.hostname, n))
                if os.path.isfile(path):
                    n += 1
                else:
                    break
            im.save(path)
            n += 1
            yield path
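# A minimal usage sketch for magic(); the URL and directory below are
# placeholders.  Iterating the generator drives the whole crawl:
#
#     for path in magic("http://example.com/post/123", save_path="out"):
#         print path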
def _download(kwargs):
    """Worker: runs :func:`magic` for one URL and prints the saved paths.
    Also used as the task function for the :mod:`multiprocessing` pool.

    """
    result = magic(**kwargs)
    for path in result:
        print path
    return kwargs["url"]
def unique_everseen(iterable, key=None):
    """Lists unique elements, preserving order.  Remembers all elements
    ever seen.

    .. sourcecode:: pycon

       >>> list(unique_everseen('AAAABBBCCDAABBB'))
       ['A', 'B', 'C', 'D']
       >>> list(unique_everseen('ABBCcAD', str.lower))
       ['A', 'B', 'C', 'D']

    .. note:: Copied from :mod:`itertools` recipes.

    """
    seen = set()
    seen_add = seen.add
    if key is None:
        for element in itertools.ifilterfalse(seen.__contains__, iterable):
            seen_add(element)
            yield element
    else:
        for element in iterable:
            k = key(element)
            if k not in seen:
                seen_add(k)
                yield element
def main():
    import optparse
    import os
    import multiprocessing
    multiprocessing.freeze_support()
    parser = optparse.OptionParser(usage="%prog [options] URL...")
    parser.add_option("-r", "--root-url", metavar="URL", default=None,
                      help="an optional URL of the related page or home "
                           "page.")
    parser.add_option("-d", "--save-path", metavar="DIR", default=".",
                      help="a DIR path to save images into. [%default]")
    parser.add_option("-a", "--min-area",
                      type="int", metavar="AREA", default=DEFAULT_MINAREA,
                      help="a minimum image area for filtering. [%default]")
    parser.add_option("-t", "--bgtolerance", "--background-tolerance",
                      type="int", metavar="TOLERANCE",
                      default=DEFAULT_BGTOLERANCE,
                      help="a TOLERANCE for treating background pixels. "
                           "[%default]")
    parser.add_option("-w", "--workers", type="int", metavar="NUM", default=3,
                      help="the number of worker processes [%default]")
    options, urls = parser.parse_args()
    if not urls:
        parser.error("one or more URLs to crawl are required.")
    urls = itertools.chain.from_iterable(url.split() for url in urls)
    urls = list(unique_everseen(urls))  # remove duplicates
    if not os.path.isdir(options.save_path):
        os.makedirs(options.save_path)
    args = [("root_url", options.root_url),
            ("save_path", options.save_path),
            ("minarea", options.min_area),
            ("bgtolerance", options.bgtolerance)]
    pool_size = min(len(urls), options.workers)
    if pool_size < 2:
        # too few URLs to be worth a pool; run serially in this process.
        args = dict(args)
        for i, url in enumerate(urls):
            if not args.get("root_url") and len(urls) > 1:
                # use a sibling URL as the root page, so that images
                # shared between the pages can be filtered out.
                args["root_url"] = urls[0 if i > 0 else 1]
            _args = dict(args)
            _args["url"] = url
            _download(_args)
            print "[complete]", url
        return
    pool = multiprocessing.Pool(pool_size)
    argslist = []
    for i, url in enumerate(urls):
        _args = dict(args)
        _args["url"] = url
        if not _args.get("root_url"):
            _args["root_url"] = urls[0 if i > 0 else 1]
        argslist.append(_args)
    result = pool.imap_unordered(_download, argslist)
    for url in result:
        print "[complete]", url


if __name__ == "__main__":
    main()
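# Example invocation (the URL is a placeholder):
#
#     $ python magic_image_crawler.py -d out -w 4 http://example.com/post/1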