Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import csv
import datetime
import logging
import unicodedata

import dateparser
from django.http import HttpResponse
class LinkedinScraping:
    """Filter and score LinkedIn profile rows read from a CSV export.

    Every ``check_profiles_for_*`` method reads ``csv_file`` with
    ``csv.DictReader`` and appends the rows it accepts to
    ``accepted_profiles``; ``generate_csv`` writes that list out as a
    downloadable CSV response.

    The CSV is expected to use the column names the methods look up:
    ``Skills``, ``Summary``, ``Location`` and the numbered
    ``Organization N`` / ``Organization Title N`` /
    ``Organization Description N`` / ``Organization Start N`` /
    ``Organization End N`` families.
    """

    # Lower-cased "Organization End N" values meaning the job is ongoing.
    _PRESENT_MARKERS = ('present', 'obecnie')
    # Lower-cased "Organization End N" values meaning "ended within the year".
    _UNDER_A_YEAR_MARKERS = ('less than a year', 'mniej niż rok')

    def __init__(self, csv_file=None, accepted_profiles=None, required_skills=None,
                 nice_to_haves=None, last_workplace=None, location=None):
        """Store the CSV source and the filter criteria.

        Args:
            csv_file: Iterable of CSV text lines (e.g. an open text file).
            accepted_profiles: Existing list to keep appending to; a new
                list is created when omitted.
            required_skills: Skill or list of skills, each optionally
                followed by ``_<years>`` (e.g. ``"python_2"``).
            nice_to_haves: Skill or list of skills, each optionally
                followed by ``:<years>`` (e.g. ``"python:2"``).
            last_workplace: Workplace name or list of names.
            location: Location text to look for in the ``Location`` column.
        """
        self.csv_file = csv_file
        self.accepted_profiles = [] if accepted_profiles is None else accepted_profiles
        self.required_skills = self._as_list(required_skills)
        self.nice_to_haves = self._as_list(nice_to_haves)
        self.last_workplace = self._as_list(last_workplace)
        self.location = location
        # Aliases rewritten to the spelling used in profiles.
        self.mapping = {
            'cpp': 'c++'}
        # Generic job-title words that mark a position as a programming job.
        self.keywords = [
            "programista", "inżynier oprogramowania", "projektant oprogramowania", "developer",
            "engineer", "programmer", "software developer", "software engineer"]
        self.profile_scores = {}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_list(value):
        """Return ``value`` as a list: scalars are wrapped, empties become ``[]``."""
        if isinstance(value, list):
            return value
        if not value:
            return []
        if isinstance(value, (tuple, set, frozenset)):
            return list(value)
        return [value]

    def _read_rows(self):
        """Return a ``csv.DictReader`` over ``self.csv_file``."""
        return csv.DictReader(self.csv_file, delimiter=',')

    def _split_requirement(self, requirement, separator):
        """Split ``"skill<separator>years"`` into ``(mapped skill, years)``.

        The years part is only honoured when the text splits into exactly
        two pieces; otherwise the threshold is 0.
        """
        parts = requirement.split(separator)
        skill = parts[0].strip().lower()
        required_years = float(parts[1]) if len(parts) == 2 else 0
        return self.mapping.get(skill, skill), required_years

    @staticmethod
    def _parse_most_endorsed_skills(raw_skills, limit=5):
        """Parse a ``Skills`` cell into its ``limit`` most endorsed entries.

        The cell is a comma-separated list of ``"name : count"`` items in
        which a name may itself contain commas.  Pieces are accumulated
        until one ends in `` : <digits>``, which closes the entry.  A
        trailing piece that never gets a count is discarded.

        Returns:
            List of ``{"skill": str, "endorsements": int}`` dicts (names
            lower-cased), sorted by endorsements, highest first.
        """
        if not raw_skills:
            return []
        entries = []
        pending = []
        for piece in raw_skills.lower().strip().strip('"').strip().split(','):
            piece = piece.strip().strip('"').strip()
            if not piece:
                continue
            pending.append(piece)
            name, separator, count = piece.rpartition(' : ')
            count = count.strip().strip('"').strip()
            if not (separator and count.isdecimal()):
                # Still inside a skill name that contains a comma.
                continue
            full_name = ', '.join(pending).rpartition(' : ')[0]
            entries.append(dict(skill=full_name.strip('"').strip(), endorsements=int(count)))
            pending = []
        entries.sort(key=lambda entry: entry['endorsements'], reverse=True)
        return entries[:limit]

    @staticmethod
    def _job_numbers(row):
        """Return the sorted digit suffixes of the row's job title/description columns."""
        numbers = set()
        for key in row:
            # DictReader stores surplus cells under a None key; skip it.
            if not isinstance(key, str):
                continue
            if 'Organization Title' in key or 'Organization Description' in key:
                numbers.add(''.join(ch for ch in key if ch.isdigit()))
        return sorted(numbers)

    def _matching_job_numbers(self, row, skill, skill_is_endorsed):
        """Find the jobs that count as experience with ``skill``.

        A job matches when the skill appears in its title or description.
        It also matches when the title or description contains one of
        ``self.keywords`` and the skill is among the most endorsed skills.

        Returns:
            ``(numbers, inferred)`` where ``inferred`` is True when at
            least one job matched only through the keyword rule.
        """
        matching = []
        inferred = False
        for number in self._job_numbers(row):
            title = (row.get('Organization Title {}'.format(number)) or '').lower()
            description = (row.get('Organization Description {}'.format(number)) or '').lower()
            if skill in title or skill in description:
                matching.append(number)
            elif skill_is_endorsed and any(
                    kw in title or kw in description for kw in self.keywords):
                inferred = True
                matching.append(number)
        return matching, inferred

    @staticmethod
    def _parse_date(text):
        """Parse an English or Polish date string; return None when it cannot be parsed."""
        if not text:
            return None
        # dateparser.parse signals an unparsable string by returning None;
        # the ValueError fallback is kept for the cases where it does raise.
        try:
            return dateparser.parse(text, date_formats=["%b %Y"], languages=['en', 'pl'])
        except ValueError:
            return dateparser.parse(text, date_formats=["%Y"], languages=['en', 'pl'])

    def _job_duration(self, start_text, end_text):
        """Return the ``timedelta`` between two lower-cased date cells, or None."""
        start = self._parse_date(start_text)
        if start is None:
            return None
        if end_text in self._PRESENT_MARKERS:
            end = datetime.datetime.today()
        elif end_text in self._UNDER_A_YEAR_MARKERS:
            # No end date is known, so the job is counted up to December
            # of the start year.
            end = datetime.datetime(start.year, 12, start.day, 0, 0)
        else:
            end = self._parse_date(end_text)
        if end is None:
            return None
        try:
            return end - start
        except TypeError:
            # One value is timezone-aware and the other naive.
            return None

    def _years_of_experience(self, row, job_numbers):
        """Sum the durations of the given jobs, in years of 365 days."""
        total_days = 0
        for number in job_numbers:
            start_text = (row.get('Organization Start {}'.format(number)) or '').lower()
            end_text = (row.get('Organization End {}'.format(number)) or '').lower()
            duration = self._job_duration(start_text, end_text)
            if duration is not None:
                total_days += duration.days
        return total_days / 365

    @staticmethod
    def _fold_to_ascii(text):
        """Lower-case ``text`` after dropping accents and other non-ASCII characters."""
        return unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode("utf-8").lower()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def check_profiles_for_skills(self, operator=None):
        """Accept the profiles that satisfy the required skills.

        For each required skill the experience in matching jobs is summed
        and compared with the skill's threshold.  A row is accepted when
        every skill passes and ``operator == 'AND'``, when any skill passes
        and ``operator == 'OR'``, or when exactly one skill is required and
        it passes.  Each row also gets an
        ``"Incomplete job experience information"`` field of ``"True"`` or
        ``"False"``, telling whether any job was matched only by keyword.

        Returns:
            ``self.accepted_profiles``.
        """
        for row in self._read_rows():
            most_endorsed = self._parse_most_endorsed_skills(row.get("Skills"))
            experience_inferred = False
            has_skills = []
            for requirement in self.required_skills:
                skill, required_years = self._split_requirement(requirement, '_')
                skill_is_endorsed = any(skill in entry["skill"] for entry in most_endorsed)
                job_numbers, inferred = self._matching_job_numbers(row, skill, skill_is_endorsed)
                experience_inferred = experience_inferred or inferred
                # NOTE(review): with no threshold this is 0 >= 0, so the skill
                # counts as present even when it was found nowhere -- confirm
                # that this is intended.
                has_skills.append(self._years_of_experience(row, job_numbers) >= required_years)

            row["Incomplete job experience information"] = "True" if experience_inferred else "False"

            if row in self.accepted_profiles:
                continue
            if ((operator == 'AND' and all(has_skills))
                    or (len(has_skills) == 1 and has_skills[0])
                    or (operator == 'OR' and any(has_skills))):
                self.accepted_profiles.append(row)
        return self.accepted_profiles

    def check_profiles_for_nice_to_haves(self):
        """Score every profile against the nice-to-have skills.

        Each skill is looked up in three places: the most endorsed skills,
        the ``Summary`` columns, and the jobs (with enough experience).
        The share of skills found in each place is combined into a
        weighted average -- skills 3, summary 1, jobs 6, or jobs 4 out of
        8 when a job was matched only by keyword -- and stored in the
        row's ``"Profile match percentage"`` field.  Every row is appended
        to ``accepted_profiles``.

        Returns:
            ``self.accepted_profiles``.
        """
        for row in self._read_rows():
            results = {"summary": [],
                       "title/description": [],
                       "skills": []}
            experience_inferred = False
            most_endorsed = self._parse_most_endorsed_skills(row.get("Skills"))

            for wish in self.nice_to_haves:
                skill, required_years = self._split_requirement(wish, ':')

                if any(skill in entry["skill"] for entry in most_endorsed):
                    results["skills"].append(skill)

                # Recorded once even when several Summary columns mention it.
                if any(isinstance(key, str) and 'Summary' in key
                       and skill in (value or '').lower()
                       for key, value in row.items()):
                    results["summary"].append(skill)

                job_numbers, inferred = self._matching_job_numbers(
                    row, skill, skill in results["skills"])
                experience_inferred = experience_inferred or inferred
                # NOTE(review): with no threshold this is 0 >= 0 and always
                # passes -- confirm that this is intended.
                if self._years_of_experience(row, job_numbers) >= required_years:
                    results["title/description"].append(skill)

            results_in_percents = {}
            for section_name, found_skills in results.items():
                if found_skills:
                    results_in_percents[section_name] = (
                        100 * float(len(found_skills)) / float(len(self.nice_to_haves)))
                else:
                    results_in_percents[section_name] = 0

            skills = 3 * results_in_percents["skills"]
            summary = 1 * results_in_percents["summary"]
            if experience_inferred:
                # Job evidence is partly guessed, so it weighs less.
                title_description = 4 * results_in_percents["title/description"]
                weighted_average_divisor = 8
            else:
                title_description = 6 * results_in_percents["title/description"]
                weighted_average_divisor = 10
            row["Profile match percentage"] = (
                (summary + title_description + skills) / weighted_average_divisor)
            self.accepted_profiles.append(row)
        return self.accepted_profiles

    def check_profiles_for_last_workplace(self):
        """Accept the profiles that do not currently work at a listed workplace.

        A row is rejected when one of its ``Organization N`` cells contains
        a name from ``last_workplace`` (case-insensitive) and the matching
        ``Organization End N`` is ``present`` / ``obecnie``.

        Returns:
            ``self.accepted_profiles``.
        """
        workplaces = [workplace.strip().lower() for workplace in self.last_workplace]
        for row in self._read_rows():
            matching_numbers = set()
            for key, value in row.items():
                # Only the short "Organization N" name columns are inspected.
                if not (isinstance(key, str) and key.startswith('Organization')
                        and len(key) <= 15 and key[-1].isdigit()):
                    continue
                cell = (value or '').lower()
                if any(workplace in cell for workplace in workplaces):
                    matching_numbers.add(''.join(ch for ch in key if ch.isdigit()))

            currently_employed_there = any(
                (row.get('Organization End {}'.format(number)) or '').lower()
                in self._PRESENT_MARKERS
                for number in matching_numbers)
            if not currently_employed_there:
                self.accepted_profiles.append(row)
        return self.accepted_profiles

    def check_profiles_for_location(self):
        """Accept the profiles whose ``Location`` contains ``self.location``.

        Both sides are compared lower-cased with accents removed, so
        ``"Krakow"`` and ``"Kraków"`` match each other.  An empty or
        missing ``location`` matches every row.

        Returns:
            ``self.accepted_profiles``.
        """
        wanted = self._fold_to_ascii(self.location or '')
        for row in self._read_rows():
            if wanted in self._fold_to_ascii(row.get('Location') or ''):
                self.accepted_profiles.append(row)
        return self.accepted_profiles

    def generate_csv(self):
        """Return an ``HttpResponse`` carrying the accepted profiles as a CSV attachment.

        The header is the union of all keys of the accepted profiles, in
        first-seen order.
        """
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename="accepted_profiles.csv"'

        fieldnames = []
        seen = set()
        for profile in self.accepted_profiles:
            for key in profile.keys():
                if key not in seen:
                    seen.add(key)
                    fieldnames.append(key)

        mismatched = sum(1 for profile in self.accepted_profiles if set(profile) != seen)
        if mismatched:
            logging.getLogger(__name__).warning(
                "%d accepted profile(s) lack some of the CSV columns; "
                "their missing cells are written empty.", mismatched)

        csv_writer = csv.DictWriter(response, fieldnames=fieldnames, delimiter=",")
        csv_writer.writeheader()
        for profile in self.accepted_profiles:
            csv_writer.writerow(profile)
        return response
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement