Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- # Define your item pipelines here
- #
- # Don't forget to add your pipeline to the ITEM_PIPELINES setting
- # See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
- import string
- import scrapy
- import urllib.parse
- import re
- from scrapy.pipelines.images import ImagesPipeline
- import ntpath
- import os
- from PIL import Image, ImageDraw, ImageFont
- from scrapy.utils.project import get_project_settings
- import sqlite3
- import json
- from pathlib import Path
- import unicodedata
# Characters allowed to survive filename sanitising: '-', '_', '.', '(', ')',
# space, plus ASCII letters and digits.
valid_filename_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
# Upper bound on the length of a sanitised filename; clean_filename()
# truncates anything longer than this.
char_limit = 255
class ShopsCrawlPipeline:
    """Pass-through item pipeline: items leave exactly as they arrived."""

    def process_item(self, item, spider):
        """Return ``item`` unchanged; ``spider`` is accepted but not used."""
        return item
class ImgPipeline(ImagesPipeline):
    """Images pipeline that files every download under a per-SKU directory.

    Each image of an item is stored at ``<sku>/<sku>_<idx>.jpg`` (relative to
    the images store), where ``sku`` is the sanitised ``item['sku']`` and
    ``idx`` is the position of the URL inside ``item['image_urls']``.
    """

    CONVERTED_ORIGINAL = re.compile('^full/[0-9,a-f]+.jpg$')
    counter = 0

    def get_media_requests(self, item, info):
        """Build one download request per URL listed in ``item['image_urls']``.

        The item and the URL's index travel in ``request.meta`` so that
        ``file_path`` can derive the target filename later.

        Returns an empty list when the item has no (or an empty/None)
        ``image_urls`` entry.
        """
        urls = item.get('image_urls') or []
        return [
            scrapy.Request(url, meta={'item': item, 'idx': idx}, dont_filter=True)
            for idx, url in enumerate(urls)
        ]

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the store-relative path ``<sku>/<sku>_<idx>.jpg`` for a request.

        ``item`` is the keyword-only argument newer Scrapy versions pass; when
        it is not supplied the item attached to ``request.meta`` is used.

        Side effects: the per-SKU directory is created under the store's base
        directory, and a file already present at the target path is deleted.
        NOTE(review): the deletion presumably exists to force a fresh download
        on every crawl -- confirm this is still wanted.
        """
        if item is None:
            item = request.meta['item']

        sku = self.clean_filename(filename=item['sku'])
        filename = '{}_{}{}'.format(sku, request.meta['idx'], '.jpg')

        # Join with os.path.join so the path is correct whether or not the
        # configured base directory ends with a separator.
        sku_dir = os.path.join(self.store.basedir, sku)
        Path(sku_dir).mkdir(parents=True, exist_ok=True)

        stored_image = os.path.join(sku_dir, filename)
        if os.path.isfile(stored_image):
            os.remove(stored_image)

        return sku + '/' + filename

    def decorate_url(self, urla):
        """Wrap ``urla`` in the textual form ``["<urla>"]``."""
        return '["{}"]'.format(urla)

    def item_completed(self, results, item, info):
        """Record the stored paths of successful downloads in ``item['images']``.

        ``results`` is iterated as ``(ok, data)`` pairs; only entries whose
        ``ok`` flag is truthy contribute their ``data['path']``.
        """
        item['images'] = [data['path'] for ok, data in results if ok]
        return item

    @staticmethod
    def clean_filename(filename, whitelist=valid_filename_chars, replace=' '):
        """Reduce ``filename`` to a safe ASCII name.

        Steps, in order:
          1. every character listed in ``replace`` becomes ``'_'``;
          2. the text is NFKD-normalised and non-ASCII code points are dropped;
          3. characters not in ``whitelist`` are removed;
          4. the result is truncated to ``char_limit`` characters (a warning
             is printed when truncation happens).
        """
        for unwanted in replace:
            filename = filename.replace(unwanted, '_')

        # NFKD splits accented letters into base letter + combining mark; the
        # ASCII encode with 'ignore' then discards the non-ASCII remainder.
        ascii_only = unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode()

        cleaned_filename = ''.join(c for c in ascii_only if c in whitelist)
        if len(cleaned_filename) > char_limit:
            print("Warning, filename truncated because it was over {}. Filenames may no longer be unique".format(
                char_limit))
        return cleaned_filename[:char_limit]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement