Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Compare sitemaps by turning <url>s flat.
- # Dumb text manipulation.
- #
- # Since this is a throwaway script, I indulge in some crude FP stuff here.
- # and_then = fmap; boolean short-circuiting `and` is the sequence operation.
- import collections
- import os
- import re
- from lxml import etree
class Right(object):
    """Success half of the Either pair: wraps a value and is always truthy."""

    __slots__ = ['value']

    def __init__(self, value):
        self.value = value

    def __len__(self):
        # A length of 1 makes bool(self) True, so `and` chains keep going.
        return 1

    def __repr__(self):
        return 'Right(%r)' % (self.value,)

    def bind(self, func):
        """Call func on the wrapped value and return its result as-is."""
        return func(self.value)

    def and_then(self, func):
        """Call func on the wrapped value and wrap the result in a Right."""
        mapped = func(self.value)
        return Right(mapped)

    def and_return(self, value):
        """Drop the wrapped value and return Right(value) instead."""
        return Right(value)
class Left(object):
    """Failure half of the Either pair: wraps a value and is always falsy.

    Every chaining method ignores its arguments and returns this same
    instance, so the first failure passes through a chain of
    bind / and_then / and_return calls unchanged.
    """

    __slots__ = ['value']

    def __init__(self, value):
        self.value = value

    def __len__(self):
        # A length of 0 makes bool(self) False, so `and` chains stop here.
        return 0

    def __repr__(self):
        return 'Left(%s)' % repr(self.value)

    # Each method is defined exactly once, as a named function; any number of
    # positional arguments is accepted and ignored.
    def bind(self, *_ignored_args):
        """Return self; the callback is never invoked."""
        return self

    def and_then(self, *_ignored_args):
        """Return self; the callback is never invoked."""
        return self

    def and_return(self, *_ignored_args):
        """Return self; the replacement value is discarded."""
        return self
def from_equality(value, expected):
    """Return Right(value) if value == expected, else Left((value, expected))."""
    matches = value == expected
    return Right(value) if matches else Left((value, expected))
def from_value(value):
    """Wrap value: None becomes Left(None), anything else Right(value)."""
    if value is None:
        return Left(value)
    return Right(value)
def read_expecting(input_stream, prefix):
    """Read one line and check that, once stripped, it starts with prefix.

    Returns Right(stripped_line) on a match and Left(stripped_line) otherwise.
    """
    stripped = input_stream.readline().strip()
    if stripped.startswith(prefix):
        return Right(stripped)
    return Left(stripped)
def copy_expecting(input_stream, output_stream, prefix):
    """Read a line that must start with prefix and write it to output_stream.

    The line is written stripped, with no newline appended. On a match the
    result is a Right holding whatever output_stream.write returned; on a
    mismatch nothing is written and the Left from read_expecting is returned.
    """
    line_or_failure = read_expecting(input_stream, prefix)
    return line_or_failure.and_then(output_stream.write)
def flatten_url(input_stream, output_stream):
    """Collapse one <url> element (one tag per line) onto one output line.

    Expects, in order, lines starting with <url>, <loc>, <lastmod>,
    <changefreq>, <priority> and </url>. The four inner lines are copied to
    output_stream and a newline is written after </url>. Stops at the first
    line that does not match and returns the Left holding that line;
    otherwise returns a Right.
    """
    def skip(prefix):
        return read_expecting(input_stream, prefix)

    def emit(prefix):
        return copy_expecting(input_stream, output_stream, prefix)

    steps = (
        (skip, '<url>'),
        (emit, '<loc>'),
        (emit, '<lastmod>'),
        (emit, '<changefreq>'),
        (emit, '<priority>'),
        (skip, '</url>'),
    )
    outcome = None
    for action, prefix in steps:
        outcome = action(prefix)
        if not outcome:
            # First failure wins; later steps must not consume more input.
            return outcome
    return outcome.and_return('\n').and_then(output_stream.write)
def flatten_url_without_lastmod(input_stream, output_stream):
    """Collapse one <url> element onto one output line, dropping <lastmod>.

    Expects the same six lines as flatten_url; the <lastmod> line must be
    present but is read without being copied. Stops at the first line that
    does not match and returns the Left holding that line; otherwise writes
    a trailing newline and returns a Right.
    """
    def skip(prefix):
        return read_expecting(input_stream, prefix)

    def emit(prefix):
        return copy_expecting(input_stream, output_stream, prefix)

    steps = (
        (skip, '<url>'),
        (emit, '<loc>'),
        (skip, '<lastmod>'),  # Consumed but deliberately not written.
        (emit, '<changefreq>'),
        (emit, '<priority>'),
        (skip, '</url>'),
    )
    outcome = None
    for action, prefix in steps:
        outcome = action(prefix)
        if not outcome:
            # First failure wins; later steps must not consume more input.
            return outcome
    return outcome.and_return('\n').and_then(output_stream.write)
def read_eof(input_stream):
    """Return Right('') if the stream is exhausted, else Left((line, ''))."""
    # readline() yields '' only at end of file; a blank line is still '\n'.
    next_line = input_stream.readline()
    return from_equality(next_line, '')
def repeat_while_right(func, *args):
    """Call func(*args) repeatedly until it returns a falsy result.

    The value carried by that final (failing) result is re-wrapped in a
    Right: running out of repetitions is the expected way for the loop to
    end, so the caller receives it as a success and inspects the value.
    """
    outcome = func(*args)
    while outcome:
        outcome = func(*args)
    return Right(outcome.value)
def flatten_file(input_stream, output_stream, url_flattener):
    """Flatten a whole sitemap file, one output line per <url> element.

    Checks the XML declaration and the <urlset> opening tag, applies
    url_flattener until it fails, requires the line it failed on to be
    </urlset>, and finally requires end of file. Returns
    Right('Completed successfully') or the Left from the first failed step.
    """
    declaration = read_expecting(input_stream, "<?xml")
    if not declaration:
        return declaration

    urlset_open = read_expecting(
        input_stream,
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">')
    if not urlset_open:
        return urlset_open

    # The line on which the flattener gave up must be the closing tag.
    stop_line = repeat_while_right(url_flattener, input_stream, output_stream)
    urlset_close = stop_line.bind(
        lambda line: from_equality(line, '</urlset>'))
    if not urlset_close:
        return urlset_close

    return read_eof(input_stream).and_return('Completed successfully')
def run_file_lastmod(in_name, out_name):
    """Flatten the sitemap file in_name into out_name, keeping <lastmod>."""
    with open(in_name) as source:
        with open(out_name, 'w') as sink:
            outcome = flatten_file(source, sink, flatten_url)
    return outcome
def run_file_no_lastmod(in_name, out_name):
    """Flatten the sitemap file in_name into out_name, dropping <lastmod>."""
    with open(in_name) as source:
        with open(out_name, 'w') as sink:
            outcome = flatten_file(source, sink, flatten_url_without_lastmod)
    return outcome
# Prefix -> namespace URI map passed to findall() for sitemap index lookups.
XMLNS = dict(NS='http://www.sitemaps.org/schemas/sitemap/0.9')
def run_sitemap(sitemap_path, out_name, use_lastmod=True):
    """Flatten every part listed in a sitemap index into a single file.

    Parses sitemap.xml inside sitemap_path, and for each <sitemap>/<loc>
    entry opens the file with the same base name inside sitemap_path and
    flattens it into out_name. Each part name is shown on stdout as a
    progress indicator.

    Args:
        sitemap_path: directory holding sitemap.xml and the part files.
        out_name: path of the flattened output file (overwritten).
        use_lastmod: keep <lastmod> lines when True, drop them when False.

    Returns:
        Right('Wrote <out_name> successfully') if every part flattened
        cleanly, otherwise the Left produced by the first part that failed.
    """
    # Function-scope import: the file's top-level imports do not include sys.
    import sys

    root = etree.parse(os.path.join(sitemap_path, 'sitemap.xml')).getroot()
    result = Right('Just started')
    flattener = flatten_url if use_lastmod else flatten_url_without_lastmod
    with open(out_name, 'w') as output_stream:
        for loc in root.findall('NS:sitemap/NS:loc', XMLNS):
            part_name = os.path.join(sitemap_path, os.path.basename(loc.text))
            # Explicit write instead of the Python 2 print statement, which
            # Python 3 would parse as a tuple expression. '\r' rewinds the
            # cursor so the next name overwrites this one.
            sys.stdout.write('%20s\r' % part_name)
            # No newline is written, so flush to make the progress visible.
            sys.stdout.flush()
            with open(part_name) as input_stream:
                # Once result is a Left, `and` skips flattening later parts.
                result = result and flatten_file(
                    input_stream, output_stream, flattener)
    # Finish the progress line (the original bare `print`).
    sys.stdout.write('\n')
    return result.and_return('Wrote %s successfully' % out_name)
def extract_diff(diff_fname):
    """Count the URLs mentioned in a diff of two flattened sitemaps.

    Each line is tried against two patterns, in order:
      * 'restaurant': a line starting with '<' or '>' that contains
        '/<digits>-<name>/menu/'; the counted item is '<digits>-<name>'.
      * 'other': a line of the form '< <loc>URL</loc>' (or with '>');
        the counted item is URL.
    Lines matching neither pattern are ignored.

    Args:
        diff_fname: path of the diff file to read.

    Returns:
        A defaultdict mapping direction + kind (e.g. '<restaurant',
        '>other') to a collections.Counter of the captured items.
    """
    rx_target = re.compile(r'^([<>]).*/(\d+-[^/]+)/menu/')
    # NOTE(review): the (?!/menu/) lookahead sits directly before '</loc>',
    # so it can never reject anything. The pattern is left unchanged to keep
    # the counts identical; confirm whether /menu/ URLs were meant to be
    # excluded here.
    rx_other = re.compile(r'^([<>]) <loc>(.*)(?!/menu/)</loc>')
    patterns = (('restaurant', rx_target), ('other', rx_other))

    result = collections.defaultdict(collections.Counter)
    with open(diff_fname) as input_stream:
        for line in input_stream:
            for kind, pattern in patterns:
                match = pattern.search(line)
                if match is None:
                    continue
                # Unpack in the body: tuple parameters such as
                # `lambda (a, b): ...` are a syntax error on Python 3.
                direction, url = match.groups()
                result[direction + kind][url] += 1
                break  # First matching pattern wins.
    return result
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement