Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import subprocess
- import logging
- import dateutil.parser
- from datetime import datetime, timedelta
# Everything logged by this script is appended to intake.log at DEBUG level.
logging.basicConfig(
    filename="intake.log",
    filemode='a+',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

# Prefix of every intake invocation; get_command() appends flags after it.
ROOT_CMD = "npm run intake -- "

# Sub-commands that get_command() accepts.
COMMANDS = ['stream', 'download', 'createIndex']

# In-memory copy of the run history; None until load_history() fills it.
HISTORY = None
# Catalogue of node types, keyed by node name.
#   "tags"          - labels matched by download_by_tags() to select the node.
#   "relationships" - optional; relationship names processed together with
#                     the node in a second pass of download_by_tags().
NODES = {
    # --- transactional ---
    "Order": {"tags": ["transactional"]},
    "OrderItem": {
        "tags": ["transactional"],
        "relationships": ["HAS_ORDER_ITEM", "CONTAINS_VARIANT"],
    },
    # --- reviews ---
    "Review": {"tags": ["reviews"]},
    "Question": {"tags": ["reviews"]},
    "ClientResponse": {
        "tags": ["reviews"],
        "relationships": ["HAS_CLIENT_RESPONSE"],
    },
    "Answer": {
        "tags": ["reviews"],
        "relationships": ["HAS_ANSWER"],
    },
    # --- product / material ---
    "ABOM": {
        "tags": ["product", "material"],
        "relationships": [
            "HAS_PRIMARY_COLOR",
            "HAS_SECONDARY_COLOR",
            "HAS_LOGO_COLOR",
            "HAS_ARTICLE",
        ],
    },
    "Article": {
        "tags": ["product"],
        "relationships": [
            "INCLUDES_BOM_SECTION",
            "HAS_PRIMARY_COLOR",
            "HAS_SECONDARY_COLOR",
            "HAS_LOGO_COLOR",
            "HAS_ARTICLE",
        ],
    },
    "ArticleImageAsset": {
        "tags": ["product"],
        "relationships": ["HAS_ARTICLE_IMAGE"],
    },
    "BOMSection": {"tags": ["product", "material"]},
    "CBOM": {
        "tags": ["product", "material"],
        "relationships": [
            "INCLUDES_ABOM",
            "INCLUDES_COLOR",
            "INCLUDES_ARTICLE",
        ],
    },
    "Color": {"tags": ["product", "material"]},
    "Fabric": {"tags": ["product", "material"]},
    "FeaturesAndBenefits": {"tags": ["product", "test"]},
    "PlannedDevelopment": {"tags": ["product"]},
    "MiscPart": {"tags": ["product", "material"]},
    "ProductConfiguration": {"tags": ["product", "material"]},
    "ProductConfigurationChoice": {
        "tags": ["product", "material"],
        "relationships": [
            "HAS_PRODUCT_CONFIGURATION_CHOICE",
            "HAS_PLANNED_DEVELOPMENT",
        ],
    },
    "ProductPlan": {"tags": ["product", "material"]},
    "SeasonalPlan": {"tags": ["product"]},
    "SizeChart": {"tags": ["product"]},
    "SizeVariant": {"tags": ["product"]},
    "Style": {
        "tags": ["product"],
        "relationships": [
            "HAS_DIVISION",
            "HAS_FITTYPE",
            "HAS_GENDER",
            "HAS_SUBCATEGORY",
        ],
    },
    "RegionalPrice": {
        "tags": ["product", "pricing"],
        "relationships": ["HAS_REGIONAL_PRICE"],
    },
    "SubCategory": {"tags": ["product"]},
    "Royalty": {"tags": ["product"]},
    "Trim": {"tags": ["product", "material"]},
    "FitType": {"tags": ["product"]},
    "Gender": {"tags": ["product"]},
    "Division": {"tags": ["product"]},
}
# Relationships that are not listed under any NODES entry; download_by_tags()
# selects them by tag and processes them one at a time after the nodes.
RELATIONSHIPS = {
    "HAS_FEATURES_AND_BENEFITS": {"tags": ["product", "test"]},
    "HAS_REVIEW": {"tags": ["reviews"]},
    "HAS_SIZE_CHART": {"tags": ["product"]},
    "INCLUDES_MISC_PART": {"tags": ["product"]},
    "HAS_QUESTION": {"tags": ["reviews"]},
    "HAS_ROYALTY": {"tags": ["product"]},
    "INCLUDES_FABRIC": {"tags": ["product"]},
    "INCLUDES_TRIM": {"tags": ["product"]},
    "PLANNED_FOR_SEASON": {"tags": ["product"]},
}
# Dry-run switch: when true, run_command() prints the command line and
# reports success instead of executing it.
FAKE_RUN = False
def get_command(command=None, nodes=None, relationships=None):
    """Build the shell command line for one intake invocation.

    Args:
        command: One of ``COMMANDS``, or ``None`` to append no sub-command
            flag at all.
        nodes: Mapping keyed by node name, or any iterable of node names,
            rendered as ``--nodes=a,b``. Omitted when empty or ``None``.
        relationships: Mapping keyed by relationship name, or any iterable
            of names, rendered as ``--relationships=a,b``. Omitted when
            empty or ``None``.

    Returns:
        The command string, starting with ``ROOT_CMD``.

    Raises:
        ValueError: If ``command`` is neither ``None`` nor in ``COMMANDS``.
    """
    if command is not None and command not in COMMANDS:
        # The original raised a bare string, which Python turns into a
        # TypeError that hides the intended message.
        raise ValueError(
            "Command must be one of the following commands: " + ','.join(COMMANDS))

    parts = [ROOT_CMD]
    if nodes:
        # Iterating a dict yields its keys, so mappings and plain
        # sequences of names are handled the same way.
        parts.append(' --nodes=' + ','.join(nodes))
    if relationships:
        parts.append(' --relationships=' + ','.join(relationships))
    if command:
        parts.append(' --' + command)
    return ''.join(parts)
def run_command(cmd):
    """Execute ``cmd`` through the shell and report whether it succeeded.

    When ``FAKE_RUN`` is true the command is only printed and the call is
    reported as successful. Otherwise the command is run with stderr merged
    into stdout, and the combined output is written to the log.

    Args:
        cmd: Command line string, as produced by ``get_command``.

    Returns:
        ``True`` if the command exited with status 0 (or in a fake run),
        ``False`` if it exited with a non-zero status.
    """
    if FAKE_RUN:
        print(cmd)
        return True

    logger.info("#####################################")
    logger.info("Running '%s'", cmd)
    logger.info("#####################################")
    try:
        # NOTE: shell=True runs the string through the shell; cmd must only
        # ever be assembled from trusted values.
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
    except subprocess.CalledProcessError as exc:
        # Decode so the log shows the real text rather than a b'...' repr
        # with escaped newlines.
        failure_text = (exc.output or b"").decode("utf-8", errors="replace")
        logger.error(
            "Failed with return code %s:\n%s\n\n", exc.returncode, failure_text)
        return False

    logger.info("Succeeded: \n%s", output.decode("utf-8", errors="replace"))
    return True
def create_indices(nodes):
    """Run the ``createIndex`` intake command for ``nodes``.

    Returns whatever ``run_command`` reports for the generated command.
    """
    index_cmd = get_command("createIndex", nodes)
    return run_command(index_cmd)
def download_nodes(nodes, stream_to_db=False):
    """Run the intake command for ``nodes`` in ``stream`` or ``download`` mode.

    ``stream_to_db`` selects the ``stream`` sub-command; otherwise
    ``download`` is used. Returns the result of ``run_command``.
    """
    if stream_to_db:
        mode = "stream"
    else:
        mode = "download"
    node_cmd = get_command(mode, nodes=nodes)
    return run_command(node_cmd)
def download_relationships(relationships, nodes=None, stream_to_db=False):
    """Run the intake command for ``relationships`` (optionally scoped to ``nodes``).

    ``stream_to_db`` selects the ``stream`` sub-command; otherwise
    ``download`` is used. Returns the result of ``run_command``.
    """
    if stream_to_db:
        mode = "stream"
    else:
        mode = "download"
    relationship_cmd = get_command(mode, nodes=nodes, relationships=relationships)
    return run_command(relationship_cmd)
def upload_nodes(nodes):
    """Run the intake command for ``nodes`` with no sub-command flag.

    Returns the result of ``run_command``.
    """
    node_cmd = get_command(nodes=nodes)
    return run_command(node_cmd)
def upload_relationships(relationships, nodes=None):
    """Run the intake command for ``relationships`` with no sub-command flag.

    ``nodes`` is forwarded to ``get_command`` when given. Returns the result
    of ``run_command``.
    """
    relationship_cmd = get_command(nodes=nodes, relationships=relationships)
    return run_command(relationship_cmd)
def download_by_tags(tags, stream_to_db=False):
    """Download (or stream) every node and relationship carrying one of ``tags``.

    The work is done in three passes:

    1. each matching node on its own (creating its index first when
       streaming),
    2. the relationships listed under each matching node, once all nodes
       have been processed,
    3. each matching entry of ``RELATIONSHIPS``.

    Items that ``completed_recently`` reports as done are skipped; items that
    succeed are recorded with ``completed_now``. Progress and a final list of
    failed steps are printed to stdout.

    Args:
        tags: A single tag, or a list/tuple/set of tags.
        stream_to_db: Use the ``stream`` command (and create indices) instead
            of ``download``.
    """
    # A lone tag (e.g. a string) is wrapped; real collections are used as-is.
    if not isinstance(tags, (list, tuple, set, frozenset)):
        tags = [tags]
    wanted = set(tags)

    # Find the nodes and relationships to import (based on the tags given).
    nodes_to_import = {
        name: info for name, info in NODES.items()
        if not wanted.isdisjoint(info['tags'])
    }
    relationships_to_import = {
        name: info for name, info in RELATIONSHIPS.items()
        if not wanted.isdisjoint(info['tags'])
    }

    action = "Streaming" if stream_to_db else "Downloading"
    action_past = "streamed" if stream_to_db else "downloaded"
    failures = []

    # Pass 1: run the command for one node at a time.
    for name, info in nodes_to_import.items():
        print("###################################################")
        print("{0} {1}:".format(action, name))
        if completed_recently(name):
            print("Skipping {0} because it was uploaded recently".format(name))
            continue

        if stream_to_db:
            print(" > Creating index")
            if create_indices(nodes={name: info}):
                print(" >> Successfully created")
            else:
                # An index failure is recorded, but the node is still attempted.
                print(" >> Create index on {0} failed. See logs for more info".format(name))
                failures.append("Create Index: {0}".format(name))

        print(" > {0} node only".format(action))
        if download_nodes(nodes={name: info}, stream_to_db=stream_to_db):
            completed_now(name)
            print(" >> Successfully {0} {1}".format(action_past, name))
        else:
            print(" >> {0} {1} failed. See logs for more info".format(action, name))
            failures.append("{1}: {0}".format(action, name))

    # Pass 2: nested relationships can only be handled after all the connected
    # nodes have been created, hence the separate loop.
    for name, info in nodes_to_import.items():
        relationships = info.get('relationships')
        if not (isinstance(relationships, list) and relationships):
            continue
        history_key = name + "_relationships"
        if completed_recently(history_key):
            print("Skipping {0} relationships because they were uploaded recently".format(name))
            continue

        print(" > {0} relationships for {1} only".format(action, name))
        if download_relationships(nodes={name: info}, relationships=relationships,
                                  stream_to_db=stream_to_db):
            completed_now(history_key)
            print(" >> Successfully {0} relationships for {1}".format(action_past, name))
        else:
            print(" >> {0} relationships failed for {1}. See logs for more info".format(action, name))
            failures.append(
                "{0} relationships for {1}: {2}".format(action, name, ','.join(relationships)))

    # Pass 3: independent relationships.
    if relationships_to_import:
        print(" > {0} independent relationships".format(action))
    for name, info in relationships_to_import.items():
        if completed_recently(name):
            print("Skipping {0} because it was uploaded recently".format(name))
            continue

        print(" >> {0} {1} relationship".format(action, name))
        if download_relationships(relationships={name: info}, stream_to_db=stream_to_db):
            completed_now(name)
            print(" >> Successfully {0} relationship {1}".format(action_past, name))
        else:
            print(" >> {0} independent relationship {1} failed . See logs for more info".format(action, name))
            failures.append("{0} relationship: {1}".format(action, name))

    # Summary.
    print("-----------------")
    if failures:
        print("The following actions failed during execution. See logs for more information")
        for failure in failures:
            print("* " + failure)
    print("###############")
def load_history():
    """Return the run history, reading ``./intake_history.json`` on first use.

    The history maps a node/relationship name to ``{"date": <ISO timestamp>}``.
    The file is read at most once per process; afterwards the cached
    ``HISTORY`` dict is returned. A missing, unreadable-as-JSON, or non-dict
    file yields an empty history.
    """
    global HISTORY
    # Check against None, not falsiness: an empty history is a valid loaded
    # state and must not trigger another read.
    if HISTORY is None:
        try:
            # Read-only: opening with 'w+' here would truncate the file and
            # wipe the history of every previous run.
            with open('./intake_history.json', 'r') as fp:
                loaded = json.load(fp)
        except (FileNotFoundError, json.JSONDecodeError):
            loaded = {}
        HISTORY = loaded if isinstance(loaded, dict) else {}
    return HISTORY


def save_history():
    """Write the current history to ``./intake_history.json`` if it is non-empty."""
    history = load_history()
    if history:
        with open('./intake_history.json', 'w') as fp:
            json.dump(history, fp)


def completed_recently(node_or_relationship, max_age_days=7):
    """Tell whether ``node_or_relationship`` was completed within the window.

    Args:
        node_or_relationship: History key to look up.
        max_age_days: Size of the window in days (default 7).

    Returns:
        ``True`` if the history holds a parseable completion date for the key
        that is less than ``max_age_days`` old, ``False`` otherwise.
    """
    entry = load_history().get(node_or_relationship)
    if not entry:
        return False
    try:
        # The date lives under the item's own entry, not at the top level.
        completed_at = dateutil.parser.parse(entry['date'])
    except (KeyError, TypeError, ValueError):
        # Malformed entry: treat the item as not completed.
        return False
    return datetime.now() - completed_at < timedelta(days=max_age_days)


def completed_now(node_or_relationship):
    """Record the current time as the completion date of the item and save."""
    history = load_history()
    history[node_or_relationship] = {"date": datetime.now().isoformat()}
    save_history()
if __name__ == "__main__":
    # Script entry point: stream everything tagged "product", then exit with
    # status 0. SystemExit is raised directly because the bare ``exit`` helper
    # is only installed by the ``site`` module.
    download_by_tags(["product"], stream_to_db=True)
    raise SystemExit(0)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement