Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- """
- Created on Thu Jun 23 22:14:36 2016
- @author: Michael
- """
- #QUIZ: ITERATIVE PARSING
- import xml.etree.cElementTree as ET
- from pprint import pprint
# Module-level tally kept for backward compatibility with code that reads it.
# count_tags() overwrites it with the latest result on every call.
tag_names = {}


def count_tags(filename):
    """Count how many times each tag name occurs in an XML document.

    Args:
        filename: Path or file object, passed straight to ``ET.iterparse``.

    Returns:
        dict mapping each tag name to the number of elements carrying it.
        A fresh dict is built on every call, so repeated calls do not
        accumulate counts from earlier documents.
    """
    counts = {}
    # With no ``events`` argument iterparse reports only "end" events, so each
    # element is counted exactly once.
    for _event, element in ET.iterparse(filename):
        counts[element.tag] = counts.get(element.tag, 0) + 1

    # Mirror the result into the module-level dict for any code that reads it.
    tag_names.clear()
    tag_names.update(counts)
    return counts
def test():
    """Run count_tags on 'example.osm' and compare with the known tag counts."""
    known_counts = {
        'bounds': 1,
        'member': 3,
        'nd': 4,
        'node': 20,
        'osm': 1,
        'relation': 1,
        'tag': 7,
        'way': 1,
    }
    tags = count_tags('example.osm')
    pprint(tags)
    assert tags == known_counts


if __name__ == "__main__":
    test()
- #QUIZ: EXPLORING USERS
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import xml.etree.cElementTree as ET
- from pprint import pprint
- import re
def process_map(filename):
    """Collect the distinct ``uid`` values found on ``node`` elements.

    Args:
        filename: Path or file object, passed straight to ``ET.iterparse``.

    Returns:
        list of uid strings, each listed once, in order of first appearance.
        Nodes that carry no ``uid`` attribute are skipped.
    """
    users = []
    seen = set()  # constant-time membership test; ``users`` keeps the order
    for _event, element in ET.iterparse(filename):
        # Every node arrives as its own event, so there is no need to re-walk
        # the subtree of each element looking for nodes.
        if element.tag != "node":
            continue
        uid = element.attrib.get('uid')
        if uid is not None and uid not in seen:
            seen.add(uid)
            users.append(uid)
    return users
def test():
    """Run process_map on 'example.osm', show the users and check the count."""
    users = process_map('example.osm')
    pprint(users)
    user_total = len(users)
    print(user_total)
    assert user_total == 6


if __name__ == "__main__":
    test()
- #QUIZ: IMPROVING STREET NAMES
- import xml.etree.cElementTree as ET
- from collections import defaultdict
- import re
- from pprint import pprint
OSMFILE = "example.osm"

# Matches the trailing run of non-whitespace characters of a street name,
# starting at a word boundary (e.g. "St." in "West Lexington St.").
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

# All the expected street types (the correct ones)
expected = [
    "Alley", "Crescent", "Loop", "Street", "Avenue", "Plaza", "Boulevard",
    "Drive", "Court", "Terrace", "Walk", "Place", "Square", "Lane", "Road",
    "Parkway", "Highway", "Trail", "Commons", "Americas", "Center", "Circle",
    "Close", "East", "Expressway", "Extension", "Gardens", "Heights",
    "Island", "North", "West", "South", "Park", "Path", "Promenade", "Slip",
    "Row", "Rockaways", "Southwest", "Turnpike", "X", "Y", "Z",
]

# Misspelled or non-uniform street types mapped to their canonical form
mapping = {
    "St": "Street",
    "St.": "Street",
    "Ave": "Avenue",
    "Ave.": "Avenue",
    "ave": "Avenue",
    "Pkwy": "Parkway",
    "Blvd": "Boulevard",
    "Pl": "Place",
    "Hwy": "Highway",
    "Dr": "Drive",
    "Rd": "Road",
    "Avenue,": "Avenue",
    # "Plz" abbreviates Plaza ("Pl" is the abbreviation for Place).
    "Plz": "Plaza",
    "ST": "Street",
    "Streeet": "Street",
    "avenue": "Avenue",
    "Steet": "Street",
}
def audit_street_type(street_types, street_name):
    """File street_name under its trailing street type if that type is unexpected.

    Args:
        street_types: mapping of street type -> set of street names; a missing
            key must create an empty set (e.g. ``defaultdict(set)``).
        street_name: the full street name to inspect.

    Nothing is recorded when the pattern does not match or when the matched
    type is already listed in ``expected``.
    """
    found = street_type_re.search(street_name)
    if found is None:
        return
    kind = found.group()
    if kind in expected:
        return
    street_types[kind].add(street_name)
def is_street_name(elem):
    """Return True when elem's 'k' attribute is exactly "addr:street"."""
    key = elem.attrib['k']
    return key == "addr:street"
def audit(osmfile):
    """Group the street names in an OSM file by unexpected street type.

    Args:
        osmfile: path of the OSM XML file to scan.

    Returns:
        defaultdict(set) filled by ``audit_street_type``: street type ->
        set of street names, for every ``addr:street`` tag found under a
        node or way.
    """
    street_types = defaultdict(set)
    # The context manager closes the file even if parsing raises.
    with open(osmfile, "rb") as osm_file:
        # Child <tag> elements are only guaranteed to be attached once the
        # parent's "end" event fires; on "start" they may not be parsed yet.
        for _event, elem in ET.iterparse(osm_file, events=("end",)):
            if elem.tag in ("node", "way"):
                for tag in elem.iter("tag"):
                    if is_street_name(tag):
                        audit_street_type(street_types, tag.attrib['v'])
    return street_types
def update_name(name, mapping):
    """Return name with its trailing street type replaced by its canonical form.

    Args:
        name: the street name to clean.
        mapping: dict of non-uniform street type -> canonical street type.

    Returns:
        The name with only the trailing match of ``street_type_re`` swapped
        for ``mapping[street_type]``. The name is returned unchanged when the
        pattern does not match or the matched type is not a key of ``mapping``.
    """
    m = street_type_re.search(name)
    if m is None:
        return name
    street_type = m.group()
    if street_type not in mapping:
        return name
    # Splice by position instead of re.sub: the street type is not a regex
    # ("St." would match "Sta") and must not be replaced elsewhere in the
    # name (e.g. the "St" inside "Stanley St").
    return name[:m.start()] + mapping[street_type] + name[m.end():]
def test():
    """Audit OSMFILE and print every flagged street name beside its cleaned form."""
    st_types = audit(OSMFILE)
    # assert len(st_types) == 3
    # pprint(dict(st_types))
    for ways in st_types.values():
        for name in ways:
            better_name = update_name(name, mapping)
            print("%s => %s" % (name, better_name))
            # if name == "West Lexington St.":
            #     assert better_name == "West Lexington Street"
            # if name == "Baldwin Rd.":
            #     assert better_name == "Baldwin Road"


if __name__ == '__main__':
    test()
- #QUIZ: PREPARING FOR DATABASE
- import csv
- import codecs
- import re
- import xml.etree.cElementTree as ET
- import cerberus
- import schema
# Input: OSM file of Brooklyn
OSM_PATH = "brooklyn_new-york.osm"

# Output: the csv files that will be created
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

# LOWER_COLON: key starts with lowercase letters/underscores, a colon, then
# more lowercase letters/underscores.
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# PROBLEMCHARS: any single problematic character (punctuation or whitespace).
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Schema from the Udacity quiz "preparing for database" (saved locally)
SCHEMA = schema.schema
# All the expected (already correct) street types
expected = [
    "Alley", "Crescent", "Loop", "Street", "Avenue", "Plaza", "Boulevard",
    "Drive", "Court", "Terrace", "Walk", "Place", "Square", "Lane", "Road",
    "Parkway", "Highway", "Trail", "Commons", "Americas", "Center", "Circle",
    "Close", "East", "Expressway", "Extension", "Gardens", "Heights",
    "Island", "North", "West", "South", "Park", "Path", "Promenade", "Slip",
    "Row", "Rockaways", "Southwest", "Turnpike", "X", "Y", "Z",
]

# Problematic or non-uniform street types mapped to their canonical form
mapping = {
    "St": "Street",
    "St.": "Street",
    "Ave": "Avenue",
    "Ave.": "Avenue",
    "ave": "Avenue",
    "Pkwy": "Parkway",
    "Blvd": "Boulevard",
    "Pl": "Place",
    "Hwy": "Highway",
    "Dr": "Drive",
    "Rd": "Road",
    "Avenue,": "Avenue",
    # "Plz" abbreviates Plaza ("Pl" is the abbreviation for Place).
    "Plz": "Plaza",
    "ST": "Street",
    "Streeet": "Street",
    "avenue": "Avenue",
    "Steet": "Street",
}
# Column headers of each csv file.
# The field order here must match the column order in the sql table schema.
NODE_FIELDS = [
    'id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp',
]
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = [
    'id', 'user', 'uid', 'version', 'changeset', 'timestamp',
]
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']
def is_k_in_right_place(k):
    """Split a tag key at its first colon.

    Returns:
        tuple ``(rest, prefix)``: the text after the first colon, then the
        text before it. Later colons stay inside ``rest``.
    """
    colon_at = k.find(':')
    prefix = k[:colon_at]
    rest = k[colon_at + 1:]
    return rest, prefix
def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict.

    Args:
        element: the XML element to convert.
        node_attr_fields: attribute names copied from a ``node`` element.
        way_attr_fields: attribute names copied from a ``way`` element.
        problem_chars: compiled pattern; a child tag whose ``k`` attribute
            matches it is skipped.
        default_tag_type: ``type`` given to tags whose key does not match
            ``LOWER_COLON``.

    Returns:
        For a node: ``{'node': attribs, 'node_tags': tags}``.
        For a way: ``{'way': attribs, 'way_nodes': way_nodes, 'way_tags': tags}``.
        For any other element: ``None``.
        Each tag is a dict with keys ``id``, ``key``, ``value`` and ``type``;
        each way node is a dict with ``id``, ``node_id`` and ``position``.

    Raises:
        KeyError: if the element lacks ``id`` or one of the requested
            attributes, or a child tag lacks ``k``/``v``.

    The XML element itself is not modified; cleaned street names only appear
    in the returned tag dicts.
    """
    if element.tag == 'node':
        attr_fields = node_attr_fields
    elif element.tag == 'way':
        attr_fields = way_attr_fields
    else:
        # Only nodes and ways are shaped; anything else has no output form.
        return None

    attribs = {field: element.attrib[field] for field in attr_fields}
    element_id = element.attrib['id']

    # Secondary tags are handled the same way for both node and way elements.
    tags = []
    for tag in element.iter("tag"):
        key = tag.attrib['k']
        if problem_chars.search(key):
            continue
        value = tag.attrib['v']
        if is_street_name(tag):
            value = update_name(value, mapping)
        if LOWER_COLON.search(key):
            # "addr:street" -> key "street", type "addr"
            tag_key, tag_type = is_k_in_right_place(key)
        else:
            tag_key, tag_type = key, default_tag_type
        tags.append({
            'id': element_id,
            'key': tag_key,
            'value': value,
            'type': tag_type,
        })

    if element.tag == 'node':
        return {'node': attribs, 'node_tags': tags}

    # position is the zero-based index of the <nd> child within the way
    way_nodes = [
        {'id': element_id, 'node_id': nd.attrib['ref'], 'position': position}
        for position, nd in enumerate(element.iter("nd"))
    ]
    return {'way': attribs, 'way_nodes': way_nodes, 'way_tags': tags}
- # ================================================== #
- # Helper Functions #
- # ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield each fully parsed element whose tag is listed in ``tags``.

    The root element is cleared after every yielded element.
    """
    events = ET.iterparse(osm_file, events=('start', 'end'))
    # The first event delivers the document root.
    root = next(events)[1]
    for kind, node in events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()
def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema.

    The message is built from the first entry of ``validator.errors``: one
    "key: detail" line per error, with non-string details joined by ", ".
    """
    if validator.validate(element, schema) is True:
        return

    field, errors = next(iter(validator.errors.items()))
    lines = []
    for key, problem in errors.items():
        detail = problem if isinstance(problem, str) else ", ".join(problem)
        lines.append("{0}: {1}".format(key, detail))

    message_string = "\nElement of type '{0}' has the following errors:\n{1}"
    raise cerberus.ValidationError(
        message_string.format(field, "\n".join(lines))
    )
class UnicodeDictWriter(csv.DictWriter, object):
    """csv.DictWriter that UTF-8 encodes ``unicode`` values before writing."""

    def writerow(self, row):
        """Write one row; ``unicode`` values are encoded, others pass through."""
        encoded = {}
        for field, value in row.items():
            if isinstance(value, unicode):
                value = value.encode('utf-8')
            encoded[field] = value
        super(UnicodeDictWriter, self).writerow(encoded)

    def writerows(self, rows):
        """Write every row through ``writerow`` so each one gets encoded."""
        for single_row in rows:
            self.writerow(single_row)
# Trailing run of non-whitespace characters, starting at a word boundary
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)


def audit_street_type(street_types, street_name):
    """File street_name under its trailing street type if that type is unexpected.

    ``street_types`` must create an empty set for a missing key. Nothing is
    recorded when the pattern does not match or the type is in ``expected``.
    """
    found = street_type_re.search(street_name)
    if found is None:
        return
    kind = found.group()
    if kind in expected:
        return
    street_types[kind].add(street_name)
def is_street_name(elem):
    """Return True when elem's 'k' attribute is exactly "addr:street"."""
    key = elem.attrib['k']
    return key == "addr:street"
def audit(osmfile):
    """Group the street names in an OSM file by unexpected street type.

    Args:
        osmfile: path of the OSM XML file to scan.

    Returns:
        defaultdict(set) filled by ``audit_street_type``: street type ->
        set of street names, for every ``addr:street`` tag found under a
        node or way.
    """
    street_types = defaultdict(set)
    # The context manager closes the file even if parsing raises.
    with open(osmfile, "rb") as osm_file:
        # Child <tag> elements are only guaranteed to be attached once the
        # parent's "end" event fires; on "start" they may not be parsed yet.
        for _event, elem in ET.iterparse(osm_file, events=("end",)):
            if elem.tag in ("node", "way"):
                for tag in elem.iter("tag"):
                    if is_street_name(tag):
                        audit_street_type(street_types, tag.attrib['v'])
    return street_types
def update_name(name, mapping):
    """Return name with its trailing street type replaced by its canonical form.

    Args:
        name: the street name to clean.
        mapping: dict of non-uniform street type -> canonical street type.

    Returns:
        The name with only the trailing match of ``street_type_re`` swapped
        for ``mapping[street_type]``. The name is returned unchanged when the
        pattern does not match or the matched type is not a key of ``mapping``.
    """
    m = street_type_re.search(name)
    if m is None:
        return name
    street_type = m.group()
    if street_type not in mapping:
        return name
    # Splice by position instead of re.sub: the street type is not a regex
    # ("St." would match "Sta") and must not be replaced elsewhere in the
    # name (e.g. the "St" inside "Stanley St").
    return name[:m.start()] + mapping[street_type] + name[m.end():]
- # ================================================== #
- # Main Function #
- # ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s).

    Args:
        file_in: path or file object of the OSM XML input, handed to
            ``get_element``.
        validate: when ``True``, every shaped element is checked with
            ``validate_element`` before it is written.

    Writes NODES_PATH, NODE_TAGS_PATH, WAYS_PATH, WAY_NODES_PATH and
    WAY_TAGS_PATH, each starting with a header row.
    """
    # Python 2's csv module requires files opened with the 'b' flag; in text
    # mode the row terminator is doubled on platforms that translate newlines.
    with codecs.open(NODES_PATH, 'wb') as nodes_file, \
            codecs.open(NODE_TAGS_PATH, 'wb') as nodes_tags_file, \
            codecs.open(WAYS_PATH, 'wb') as ways_file, \
            codecs.open(WAY_NODES_PATH, 'wb') as way_nodes_file, \
            codecs.open(WAY_TAGS_PATH, 'wb') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        for writer in (nodes_writer, node_tags_writer, ways_writer,
                       way_nodes_writer, way_tags_writer):
            writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if not el:
                continue
            if validate is True:
                validate_element(el, validator)

            if element.tag == 'node':
                nodes_writer.writerow(el['node'])
                node_tags_writer.writerows(el['node_tags'])
            elif element.tag == 'way':
                ways_writer.writerow(el['way'])
                way_nodes_writer.writerows(el['way_nodes'])
                way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    process_map(OSM_PATH, validate=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement