Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from contextlib import suppress
- import dateutil.parser as dateparser
- from pp_scraper import activity_logger
- from pp_scraper.spiders.BaseScraper import BaseScraperItemPipeline
- from scrapy.http import FormRequest
- from scrapy_splash import SplashRequest
# Landing page of the TForce Freight LTL tracking application. It is used as
# the spider's only start URL.
TFOFRE_BASE_URL = "https://www.tforcefreight.com/ltl/apps/Tracking"

# Lua script sent to Splash as ``lua_source``. It loads ``args.url``, waits
# 10 seconds, then returns the rendered HTML together with the cookies
# Splash holds at that point.
DEFAULT_SCRIPT = """
function main(splash, args)
    assert(splash:go(args.url))
    assert(splash:wait(10))
    return {
        html = splash:html(),
        cookies = splash:get_cookies(),
    }
end
"""
class TFOFREScraper(BaseScraperItemPipeline):
    """Scraper for the TForce Freight LTL tracking page.

    Request flow:

    1. ``start_requests`` loads ``TFOFRE_BASE_URL`` through Splash using
       ``DEFAULT_SCRIPT``.
    2. ``parse`` reads the ``__RequestVerificationToken`` hidden input from
       that page and submits the ``trackingByProForm`` form with
       ``self.parcel_id`` as the PRO number.
    3. ``track_parcel`` extracts the event rows and shipment details from
       the result page and passes them to ``self.save_result``.

    ``strategy``, ``parcel_id``, ``produce_request``, ``error_call_back``,
    ``save_result`` and ``get_country_name_from_country_code`` are not
    defined in this class; they are expected to come from
    ``BaseScraperItemPipeline``.
    """

    name = "TFOFREScraper"

    def __init__(self, name=None, **kwargs):
        """Initialise the base scraper and set the tracking start URL.

        Args:
            name: Forwarded unchanged to the base class initialiser.
            **kwargs: Forwarded unchanged to the base class initialiser.
        """
        super().__init__(name, **kwargs)
        self.start_urls = [TFOFRE_BASE_URL]
        # Kept as a shortcut; ``parse`` routes the form POST through it.
        self.ip_proxy = self.strategy.ip_proxy

    def start_requests(self):
        """Yield one Splash ``execute`` request per start URL.

        The Lua source is listed in ``cache_args`` in addition to ``args``.
        """
        for url in self.start_urls:
            yield self.produce_request(
                SplashRequest,
                url,
                args={"lua_source": DEFAULT_SCRIPT},
                endpoint="execute",
                callback=self.parse,
                errback=self.error_call_back,
                cache_args=["lua_source"],
            )

    def parse(self, response):
        """Submit the tracking form found on the rendered landing page.

        Args:
            response: Response for the landing page request.

        Returns:
            The request produced by ``self.ip_proxy.randomize_IP`` for the
            form POST, or the value of ``self.save_result([])`` when the
            verification token is missing or building the request fails.
        """
        token = response.css(
            "input[name=__RequestVerificationToken]::attr(value)"
        ).get()
        if not token:
            return self.save_result([])

        # The token is deliberately not printed or logged.
        try:
            return self.ip_proxy.randomize_IP(
                FormRequest.from_response(
                    response,
                    formid="trackingByProForm",
                    method="POST",
                    formdata={
                        "__RequestVerificationToken": token,
                        "ProNumbers": self.parcel_id,
                    },
                    callback=self.track_parcel,
                    errback=self.error_call_back,
                )
            )
        except Exception:
            # Best effort: any failure while building the request is
            # reported as an empty result instead of failing the crawl.
            activity_logger.info(f"{self.name} {self.parcel_id} no form found")
            return self.save_result([])

    def track_parcel(self, response):
        """Parse the tracking result page and save events plus details.

        Args:
            response: Response to the tracking form submission.

        Returns:
            The value returned by ``self.save_result``. It is called with an
            empty list when the page has no event rows, otherwise with the
            parsed events and the extra shipment fields as keyword
            arguments.
        """
        event_rows = response.css("#deliveredDetailsRow1 tbody > tr")
        if not event_rows:
            return self.save_result([])

        events = self._parse_events(event_rows)
        extra_info = self._parse_extra_info(response)
        return self.save_result(events, **extra_info)

    @staticmethod
    def _first_text(selector_root, query):
        """Return the stripped first match of ``query``, or "" if none."""
        return selector_root.css(query).get("").strip()

    def _parse_events(self, event_rows):
        """Build the list of event dicts from the tracking table rows.

        A row is skipped when its date/time cells cannot be parsed or when
        its event type cell is empty. ``event_location`` is only set when
        non-empty.
        """
        events = []
        for row in event_rows:
            try:
                timestamp_text = " ".join(
                    row.css(
                        "td:nth-child(2)::text, td:nth-child(3)::text"
                    ).getall()
                )
                event_time = dateparser.parse(timestamp_text).isoformat()
            except Exception:
                # Rows without a parseable date/time are not events.
                continue

            event_type = self._first_text(row, "td:nth-child(4)::text")
            if not event_type:
                continue

            event = {"event_time": event_time, "event_type": event_type}
            # NOTE(review): this takes the first text node of any <td> in the
            # row, presumably the location column. Confirm against the page.
            event_location = self._first_text(row, "td::text")
            if event_location:
                event["event_location"] = event_location
            events.append(event)
        return events

    def _parse_extra_info(self, response):
        """Collect the optional shipment-level fields from the result page.

        Only fields with a non-empty value are included in the returned
        dict.
        """
        extra_info = {}

        # The status text is read from a <label> if any match, else an <h3>.
        status_selector = "#trackingAccordion1 > div {}::text"
        parcel_carrier_status = (
            (
                response.css(status_selector.format("label"))
                or response.css(status_selector.format("h3"))
            )
            .get("")
            .strip()
        )
        if parcel_carrier_status:
            extra_info["parcel_carrier_status"] = parcel_carrier_status

        delivered_text = " ".join(
            response.css("#deliveredOnDate::text, #deliveredAtTime::text").getall()
        )
        # An absent or unparseable delivery date just leaves the key unset.
        with suppress(Exception):
            extra_info["delivered_date_carrier"] = dateparser.parse(
                delivered_text
            ).isoformat()

        name_selector = '#trackingAccordion1 div:contains("{}") + div label::text'
        text_fields = (
            ("parcel_received_by", "#deliveredSignedBy::text"),
            ("parcel_product", "#deliveredService::text"),
            ("parcel_to_name_carrier", name_selector.format("Ship To")),
            ("parcel_from_name_carrier", name_selector.format("Ship From")),
        )
        for key, query in text_fields:
            value = self._first_text(response, query)
            if value:
                extra_info[key] = value

        # NOTE(review): the "Orig" element feeds the *to* fields and the
        # "dest" element feeds the *from* fields. This mapping is kept as it
        # was; confirm against the live page that it is not swapped.
        extra_info.update(
            self._parse_location(
                self._first_text(response, "#lblOrigShipToCityState::text"), "to"
            )
        )
        extra_info.update(
            self._parse_location(
                self._first_text(response, "#lbldestShipToCityState::text"), "from"
            )
        )

        info_selector = '#deliveredDetailsRow1 div:contains("{}") + div label::text'
        pieces = self._first_text(response, info_selector.format("Pieces"))
        # isdecimal() rather than isdigit(): isdigit() also accepts
        # characters such as superscripts that int() cannot convert.
        if pieces.isdecimal():
            extra_info["parcel_piece_count_carrier"] = int(pieces)
        weight = self._first_text(response, info_selector.format("Weight"))
        if weight:
            extra_info["parcel_weight_carrier"] = weight

        return extra_info

    def _parse_location(self, location, direction):
        """Split a location string into locality and country fields.

        The string is split on whitespace. The last token is passed to
        ``get_country_name_from_country_code`` and everything except the
        last two tokens is joined back together as the locality.

        Args:
            location: Stripped location text; may be empty.
            direction: ``"to"`` or ``"from"``; selects the key names.

        Returns:
            A dict with ``parcel_<direction>_locality_carrier`` when the
            locality is non-empty, and ``parcel_<direction>_country_carrier``
            when the country lookup does not raise.
        """
        fields = {}
        tokens = location.split()
        if not tokens:
            return fields

        locality = " ".join(tokens[:-2])
        if locality:
            fields[f"parcel_{direction}_locality_carrier"] = locality
        # The lookup helper is defined elsewhere; tolerate any failure.
        with suppress(Exception):
            fields[
                f"parcel_{direction}_country_carrier"
            ] = self.get_country_name_from_country_code(tokens[-1])
        return fields
Advertisement
RAW Paste Data
Copied
Advertisement