Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import os
- import re
- import sys
- import logging
- import httplib
- import inspect
- import argparse
- import requests
- import json
- import pprint
- import time
logger = None


def configure_logger(name=None, debug=False):
    """Create the module-wide ``logger`` and attach a stream handler to it.

    name  -- logger name; falls back to this module's ``__name__``.
    debug -- when true the logger runs at DEBUG level and HTTP connection
             tracing is switched on; otherwise the logger runs at INFO.
    """
    global logger
    logger = logging.getLogger(name or __name__)
    logger.setLevel(logging.DEBUG if debug else logging.INFO)

    # The handler lets every record through; filtering is left to the
    # logger level chosen above.
    stream = logging.StreamHandler()
    stream.setLevel(logging.DEBUG)
    stream.setFormatter(
        logging.Formatter("[%(asctime)s %(name)s] %(message)s", datefmt='%c'))
    logger.addHandler(stream)

    if not debug:
        return
    try:
        import http.client as http_client
    except ImportError:
        # Python 2
        import httplib as http_client
    http_client.HTTPConnection.debuglevel = 1
def which(program):
    """Locate an executable and return its path, or None when not found.

    ``program`` is returned unchanged when it already carries a directory
    component and names an executable file. Otherwise, on macOS two
    application-bundle locations under /Applications are probed, and finally
    every directory listed in the PATH environment variable.
    """
    def is_qualified_exe(fpath):
        # A bare name (no directory part) is never accepted as-is; it has to
        # be resolved through one of the candidate directories first.
        has_directory = bool(os.path.split(fpath)[0])
        return has_directory and os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    if is_qualified_exe(program):
        return program

    if sys.platform == "darwin":
        bg1 = "/Applications/%s/%s.app/Contents/MacOS/%s" % (program, program, program)
        bg2 = "/Applications/%s.app/Contents/MacOS/%s" % (program, program)
        for best_guess in (bg1, bg2):
            if is_qualified_exe(best_guess):
                return best_guess

    # PATH may be absent altogether; treat that as an empty search path
    # instead of failing with KeyError.
    for path in os.environ.get("PATH", "").split(os.pathsep):
        best_guess = os.path.join(path.strip('"'), program)
        if is_qualified_exe(best_guess):
            return best_guess
    return None
def touch(fname, times=None):
    """Create ``fname`` if needed and set its access/modification times.

    ``times`` is handed straight to ``os.utime``; None means "now".
    Existing content is kept because the file is opened in append mode.
    """
    handle = open(fname, 'a')
    try:
        os.utime(fname, times)
    finally:
        handle.close()
class TransformPipeline(object):
    """Chain file-format processors to turn an input file into a target file.

    Every processor exposes an ``extmap`` shaped like
    ``{input_ext: {output_ext: callable}}``. The pipeline merges those maps
    and searches them for a sequence of conversions leading from the input
    extension to the requested output extension.
    """

    def __init__(self, processors=None, **kw):
        """Instantiate the processors and merge their extension maps.

        processors -- optional list of processor classes; when empty, every
                      PrintProcess subclass found in this module is used.
        kw         -- forwarded to each processor constructor; ``output_dir``
                      (default '.') is where intermediate files are placed.
        """
        self.output_dir = kw.get("output_dir", '.')
        self.init_kw = kw
        self.processors = self._scan_for_processors(processors)
        self.extmap = {}
        for proc in self.processors:
            self.extmap.update(proc.extmap)

    def _scan_for_processors(self, processors):
        """Return processor instances, discovering the classes if none are given."""
        processors = processors or list()
        if not processors:
            for (name, cls) in list(globals().items()):
                if not (inspect.isclass(cls) and issubclass(cls, PrintProcess)):
                    continue
                # The two base classes carry no conversions of their own.
                if cls in (PrintProcess, PrintCommand):
                    continue
                processors.append(cls)
        return [proc(**self.init_kw) for proc in processors]

    def get_pipeline(self, inputfn, outputfn, level=1):
        """Return the conversion steps from ``inputfn`` to ``outputfn``.

        The result is a list of ``(callable, input_path, output_path)`` tuples
        in execution order, or an empty list when no route exists.
        Intermediate files are placed in ``self.output_dir``. ``level`` is the
        recursion depth and is informational only.
        """
        inputext = os.path.splitext(inputfn)[-1]
        outputext = os.path.splitext(outputfn)[-1]
        if inputext not in self.extmap:
            return []
        conversions = self.extmap[inputext]
        if outputext in conversions:
            # we're done!
            return [(conversions[outputext], inputfn, outputfn)]

        instem = os.path.splitext(os.path.split(inputfn)[-1])[0]
        for midext in conversions:
            midfn = os.path.abspath(os.path.join(self.output_dir, instem + midext))
            tail = self.get_pipeline(midfn, outputfn, level=level + 1)
            if tail:
                # Stop at the first route found; carrying on would let a
                # later dead-end branch overwrite this result.
                return [(conversions[midext], inputfn, midfn)] + tail
        return []

    def run_pipeline(self, pipeline, **kw):
        """Execute each step and raise RuntimeError if its output is still stale."""
        for (method, inputfn, outputfn) in pipeline:
            method(inputfn, outputfn, **kw)
            # __self__ is the bound instance on Python 2.6+ and Python 3
            # alike (im_self exists on Python 2 only).
            if method.__self__.stale(inputfn, outputfn):
                msg = "%s -> %s failed (stale)" % (inputfn, outputfn)
                raise RuntimeError(msg)

    def transform(self, inputfn, outputfn, **kw):
        """Build and run the pipeline; raise RuntimeError if no route exists."""
        pipeline = self.get_pipeline(inputfn, outputfn)
        if not pipeline:
            raise RuntimeError("Could not transform %s -> %s" % (inputfn, outputfn))
        self.run_pipeline(pipeline, **kw)
class PrintProcess(object):
    """Base class for pipeline processors.

    Conversions are declared by naming methods ``transform_<in>_to_<out>``;
    the ``extmap`` property turns those names into a lookup table.
    Subclasses provide ``stale(inputfn, outputfn)``, which ``transform``
    relies on.
    """

    FunctionTemplate = re.compile("transform_(.+)_to_(.+)")

    def __init__(self, *args, **kw):
        # A phony process is always run, whether or not its output is stale.
        self.phony = kw.get("phony", False)
        self._extmap = None

    @property
    def extmap(self):
        """Return ``{'.in': {'.out': bound_method}}`` built from method names."""
        if self._extmap is None:
            # Assign before inspecting: inspect.getmembers() reads every
            # attribute, this property included, so the cache must already
            # be non-None to avoid re-entering this branch.
            self._extmap = {}
            for (name, member) in inspect.getmembers(self):
                if not inspect.ismethod(member):
                    continue
                match = self.FunctionTemplate.match(name)
                if not match:
                    continue
                (ext1, ext2) = match.groups()
                self._extmap.setdefault('.' + ext1, {})['.' + ext2] = member
        return self._extmap

    def transform(self, inputfn, outputfn, **args):
        """Run ``_transform`` when needed and return the resulting staleness."""
        if self.phony or self.stale(inputfn, outputfn):
            self._transform(inputfn, outputfn, **args)
        return self.stale(inputfn, outputfn)

    def _transform(self, inputfn, outputfn, **args):
        """Default action: just touch the output file."""
        touch(outputfn)
class PrintCommand(PrintProcess):
    """Processor that works by running an external command-line program."""

    # Name of the executable to look up; subclasses override this.
    Command = "__command__"
    # Resolved path of the executable, filled in lazily by ``command``.
    CommandPath = None

    @property
    def command(self, hint=None):
        """Return the resolved executable path, or None if it cannot be found.

        ``hint`` is kept for signature compatibility only; a property getter
        is never called with arguments, so ``Command`` is always the name
        that gets looked up.
        """
        if not self.CommandPath:
            hint = hint or self.Command
            self.CommandPath = which(hint)
            if not self.CommandPath:
                print("Could not find %s" % hint)
        return self.CommandPath

    def stale(self, inputfn, outputfn):
        """Return True when the output is missing or older than the input."""
        if os.path.exists(inputfn) and os.path.exists(outputfn):
            return os.path.getmtime(inputfn) > os.path.getmtime(outputfn)
        return True

    def run(self, args):
        """Run the command with ``args`` (a list of argument strings).

        Raises RuntimeError when the executable cannot be located, instead of
        failing later while joining None into the command line.
        """
        executable = self.command
        if not executable:
            raise RuntimeError("Cannot run '%s': executable not found" % self.Command)
        cmd = ' '.join([executable] + list(args))
        msg = "running: %s" % cmd
        logger.debug(msg)
        # NOTE(review): the command line goes through the shell unquoted, so
        # file names containing spaces or shell metacharacters will break it.
        os.system(cmd)
class Cura_API(PrintCommand):
    """Processor that turns STL files into G-code by running ``cura``."""

    Command = "cura"

    def _transform(self, inputfn, outputfn, **kw):
        """Build the Cura command line and run it.

        Recognised keywords:
        profile -- when set, passed to Cura with ``-i``.
        slice   -- when true (the default), ``-s`` is added.
        """
        args = []
        profile = kw.get("profile", None)
        if profile:
            # The value lives in kw; ``args`` is the list being built.
            args.append("-i %s" % profile)
        if kw.get("slice", True):
            args.append("-s")
        args.append("-o %s %s" % (outputfn, inputfn))
        msg = "generating '%s' with Cura" % outputfn
        logger.info(msg)
        self.run(args)

    def transform_stl_to_gcode(self, *args, **kw):
        self.transform(*args, **kw)
class OpenSCAD_API(PrintCommand):
    """Processor that exports DXF, PNG or STL files by running ``openscad``."""

    Command = "openscad"

    def _transform(self, inputfn, outputfn, **kw):
        """Assemble the OpenSCAD command line and run it.

        The ``make`` keyword (default True) controls whether ``-m make`` is
        placed in front of the output/input arguments.
        """
        use_make = kw.get("make", True)
        args = ["-m make"] if use_make else []
        args.append("-o %s %s" % (outputfn, inputfn))
        logger.info("generating '%s' with OpenSCAD" % outputfn)
        self.run(args)

    # Each conversion goes through the shared transform(); the method names
    # are what registers the .scad -> .dxf/.png/.stl routes.
    def transform_scad_to_dxf(self, *args, **kw):
        self.transform(*args, **kw)

    def transform_scad_to_png(self, *args, **kw):
        self.transform(*args, **kw)

    def transform_scad_to_stl(self, *args, **kw):
        self.transform(*args, **kw)
class OctoPrint_API(PrintProcess):
    """Processor and thin client for the OctoPrint REST API.

    As a processor it uploads, selects and prints G-code files; it also
    offers direct helpers for files, printer state and job control.
    """

    def __init__(self, *args, **kw):
        """Set up the client.

        Required keyword: ``config``, a mapping holding ``OctoPrint_URL`` and
        ``OctoAPI_KEY``. Optional keyword ``sdcard`` selects the "sdcard"
        file location instead of "local".
        """
        # Force phony=True without clashing with a caller-supplied value.
        init_kw = dict(kw, phony=True)
        super(OctoPrint_API, self).__init__(*args, **init_kw)
        self._config = kw["config"]
        self._session = None
        self.location = "sdcard" if kw.get("sdcard", False) else "local"

    @property
    def key(self):
        return self.config["OctoAPI_KEY"]

    @property
    def url(self):
        """Base API URL, always ending in ``/api/``."""
        # The slash-stripped URL is written back into the config, so a
        # config saved afterwards carries the normalised value.
        self.config["OctoPrint_URL"] = self.config["OctoPrint_URL"].rstrip('/')
        return (self.config["OctoPrint_URL"] + "/api/")

    @property
    def session(self):
        """Lazily created requests session carrying the API key header."""
        if not self._session:
            self._session = requests.Session()
            self._session.headers["X-Api-Key"] = self.key
            self._session.keep_alive = False
        return self._session

    @property
    def config(self):
        return self._config

    def get(self, url, **args):
        """GET ``url`` (relative to the API base) with ``args`` as query parameters."""
        msg = "GET %s %s" % (url, args)
        logger.debug(msg)
        return self.session.get(self.url + url, params=args)

    def post(self, url, files=None, data=None):
        """POST to ``url``; ``files`` is sent as an upload, ``data`` as JSON."""
        msg = "POST %s args=%s files=%s" % (url, data, 0 if not files else len(files))
        logger.debug(msg)
        kw = {}
        if files:
            kw["files"] = files
        if data is not None:
            kw["data"] = json.dumps(data)
            kw["headers"] = {"content-type": "application/json"}
        return self.session.post(self.url + url, **kw)

    def delete(self, url, **args):
        """DELETE ``url`` with ``args`` as query parameters."""
        msg = "DELETE %s %s" % (url, args)
        logger.debug(msg)
        return self.session.delete(self.url + url, params=args)

    ## utils
    def check_response(self, res, code, error=False, **kw):
        """Validate a response against the expected status ``code``.

        On a mismatch, raise RuntimeError with the response text when
        ``error`` is true; otherwise log it and return None. On a match,
        return the decoded JSON body, or the response object itself when the
        body is not JSON.
        """
        if res.status_code != code:
            if error:
                raise RuntimeError(res.text)
            msg = "response code=%s: %s" % (res.status_code, res.text)
            logger.debug(msg)
            return None
        try:
            res = res.json()
        except ValueError:
            pass
        return res

    def stale(self, inputfn, outputfn, filename=None, **kw):
        """Return True when the server copy is missing or older than ``inputfn``.

        ``outputfn`` is unused; the comparison is between the local
        modification time and the ``date`` field of the server's file info
        (presumably a Unix timestamp -- confirm against the API).
        """
        filename = filename or os.path.split(inputfn)[-1]
        file_info = self.get_file(filename, error=False)
        if not file_info:
            return True
        return os.path.getmtime(inputfn) > file_info["date"]

    ## high level
    def has_file(self, filename, **kw):
        """Return True when the server reports information for ``filename``."""
        return bool(self.get_file(filename, **kw))

    def get_file(self, filename, error=False, **kw):
        """Fetch the server's information about ``filename`` (expects 200)."""
        url = '/'.join(("files", self.location, filename))
        res = self.get(url, **kw)
        return self.check_response(res, 200, error=error, **kw)

    def post_file(self, path, filename, upload=False, error=True, data=None, **kw):
        """Upload ``path`` as ``filename``, or issue a command on ``filename``.

        upload -- send the file content; otherwise ``data`` is posted to the
                  file's own URL as a command.
        error  -- raise RuntimeError on an unexpected status (the default).
        """
        params = {}
        if upload:
            url = "files/%s" % self.location
            # Binary mode uploads the bytes unchanged; the handle is closed
            # as soon as the content has been read.
            with open(path, "rb") as fh:
                filedata = fh.read()
            params["files"] = {"file": (filename, filedata)}
            # An upload is answered with 201 Created.
            expected = 201
        else:
            url = "files/%s/%s" % (self.location, filename)
            # A file command is answered with 204 No Content.
            expected = 204
        if data is not None:
            params["data"] = data
        res = self.post(url, **params)
        return self.check_response(res, expected, error=error, **kw)

    def delete_file(self, filename, **kw):
        """Delete ``filename`` on the server (expects 204)."""
        url = "files/%s/%s" % (self.location, filename)
        res = self.delete(url)
        return self.check_response(res, 204, **kw)

    def get_state(self, **kw):
        """Return the printer state (expects 200)."""
        res = self.get("printer")
        return self.check_response(res, 200, **kw)

    def pause_job(self):
        """Send the pause command for the current job (expects 204)."""
        res = self.post("control/job", data={'command': 'pause'})
        return self.check_response(res, 204)

    ## transform API
    def _transform(self, inputfn, outputfn, upload=False, select=False, _print=False, **kw):
        """Upload the file if needed, then optionally select and print it.

        The upload happens when requested explicitly or when the local file
        exists and is newer than the server copy. ``_print`` implies
        ``select``.
        """
        filename = os.path.split(inputfn)[-1]
        if os.path.exists(inputfn):
            upload = upload or self.stale(inputfn, outputfn, filename=filename, **kw)
        select = bool(select or _print)

        actions = []
        if upload:
            actions.append("uploading")
        if select:
            actions.append("selecting")
        if _print:
            actions.append("printing")
        msg = "%s %s on OctoPrint" % (str.join(', ', actions), filename)
        logger.info(msg)

        if upload:
            # Replace any existing copy rather than uploading on top of it.
            if self.has_file(filename):
                self.delete_file(filename)
            self.post_file(inputfn, filename=filename, upload=True)
            # Short pause after the upload before sending a command.
            time.sleep(1)
        if select:
            params = {"command": "select"}
            if _print:
                params["print"] = True
            self.post_file(inputfn, filename=filename, data=params)

    def transform_gcode_to_print(self, inputfn, outputfn, **kw):
        kw["_print"] = True
        kw["select"] = True
        self.transform(inputfn, outputfn, **kw)

    def transform_gcode_to_upload(self, inputfn, outputfn, **kw):
        self.transform(inputfn, outputfn, **kw)

    def transform_gcode_to_select(self, inputfn, outputfn, **kw):
        kw["select"] = True
        self.transform(inputfn, outputfn, **kw)
def load_config(fn, error=True):
    """Read the JSON configuration file ``fn`` and return its content.

    When the file cannot be read or does not contain valid JSON, raise
    RuntimeError if ``error`` is true, otherwise return an empty dict.
    """
    try:
        with open(fn) as fh:
            return json.loads(fh.read())
    except (IOError, OSError, ValueError):
        # IOError/OSError: missing or unreadable file; ValueError: bad JSON.
        if error:
            msg = "Missing config, please run '%s init' first" % sys.argv[0]
            raise RuntimeError(msg)
        return {}
def save_config(fn, config):
    """Write ``config`` to ``fn`` as indented JSON with sorted keys."""
    msg = "Saving config file to '%s'" % fn
    print(msg)
    # The with-block guarantees the data is flushed and the file closed.
    with open(fn, 'w') as fh:
        fh.write(json.dumps(config, sort_keys=True, indent=4))
def get_cli():
    """Parse the command line and return the argparse namespace.

    After parsing, ``config`` defaults to ``~/.octocmd.conf``. For the
    file-based commands, ``output_dir`` defaults to the directory of the
    input file (or '.') and is created when it does not exist.
    """
    # global
    parser = argparse.ArgumentParser(description='octoprint command')
    parser.add_argument('-d', '--debug', default=False, action="store_true", help='enable debugging')
    parser.add_argument('-c', '--config', help='path to octocmd configuration file')
    parser.add_argument('-s', '--sdcard', default=False, action="store_true", help='Use sdcard instead of local storage')
    subparsers = parser.add_subparsers(help='sub-command help', dest="mode")

    # Commands that take no further arguments.
    plain_commands = (
        ('init', 'initialize the configuration'),
        ('status', 'printer status'),
        ('pause', 'pause current job'),
    )
    for (command, summary) in plain_commands:
        subparsers.add_parser(command, help=summary)

    # Commands that work on a file all share one set of options.
    file_commands = (
        ('print', 'print a file'),
        ('select', 'select a file'),
        ('upload', 'upload a file'),
    )
    for (command, summary) in file_commands:
        sub = subparsers.add_parser(command, help=summary)
        sub.add_argument('-S', '--openscad_exe', help='path to OpenSCAD')
        sub.add_argument('-C', '--cura_exe', help='path to Cura')
        sub.add_argument('-F', '--cura_profile', help='path to Cura profile, used for slicing')
        sub.add_argument('-o', '--output_dir', help='output directory')
        sub.add_argument('-P', '--export_png', default=False, action="store_true", help='export image of object with OpenSCAD')
        sub.add_argument('-D', '--export_dxf', default=False, action="store_true", help='export DXF of object with OpenSCAD')
        sub.add_argument('input_filename', nargs=1, help='The filename to process')

    args = parser.parse_args()

    # post
    if not args.config:
        args.config = os.path.join(os.path.expanduser("~"), ".octocmd.conf")
    # Only the file-based commands define output_dir.
    if hasattr(args, "output_dir"):
        if not args.output_dir:
            args.output_dir = os.path.split(args.input_filename[0])[0] or '.'
        if not os.path.exists(args.output_dir):
            os.mkdir(args.output_dir)
    return args
def process_status(args):
    """Fetch the printer state from OctoPrint and print it as a text report."""
    op = OctoPrint_API(**args.__dict__)
    state = op.get_state(error=True)

    def indent(depth):
        return ' ' * depth

    def format_status(obj, level=0):
        # Nested dicts become "key: ..." lines indented by their level;
        # scalar entries are collected and emitted as "k=v, k=v".
        report = ''
        # pending[0] is the level the scalars were collected at,
        # pending[1:] are the "key=value" strings themselves.
        pending = []
        for key in obj:
            if key == "text":
                continue
            val = obj[key]
            if type(val) != dict:
                if not pending:
                    pending.append(level)
                pending.append('%s=%s' % (key, val))
                continue
            # Flush scalars gathered so far before starting a nested entry.
            if pending:
                report += indent(pending[0]) + str.join(', ', pending[1:]) + '\n'
            report += '%s%s: ' % (indent(level), key)
            if "text" in val:
                report += '%s\n' % val["text"]
            elif any([type(item) == dict for item in val.values()]):
                report += '\n'
            report += format_status(val, level + 1)
            pending = []
        # Trailing scalars continue the line opened by the parent's "key: ".
        if pending:
            report += str.join(', ', pending[1:]) + '\n'
        return report

    print(format_status(state))
def process_init(args):
    """Interactively ask for the OctoPrint URL and API key, then save them.

    Values from an existing config file are offered as defaults. Each
    candidate configuration is tested against the server and the user
    decides whether to keep it; the prompts repeat until one is accepted.
    """
    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3

    def test_config(config):
        # A missing URL or an unreachable/invalid server counts as "does not
        # work" rather than aborting the whole dialogue.
        if not config.get("OctoPrint_URL"):
            return False
        try:
            res = OctoPrint_API(config=config).get_state()
        except requests.exceptions.RequestException:
            return False
        return res is not None

    cache = load_config(args.config, error=False)
    complete = False
    while not complete:
        msg = "OctoPrint URL%s: " % ("[%(OctoPrint_URL)s]" % cache if 'OctoPrint_URL' in cache else '')
        url = read_line(msg) or cache.get("OctoPrint_URL", None)
        msg = "OctoPrint API Key%s: " % ("[%(OctoAPI_KEY)s]" % cache if 'OctoAPI_KEY' in cache else '')
        apikey = read_line(msg) or cache.get("OctoAPI_KEY", None)
        cache = {"OctoAPI_KEY": apikey, "OctoPrint_URL": url}
        if test_config(cache):
            question = "This configuration works, keep it? [Y/n]: "
            default_answer = 'y'
        else:
            question = "This configuration does not work, keep it? [y/N]: "
            default_answer = 'n'
        # The default guarantees a non-empty answer.
        answer = read_line(question).strip() or default_answer
        complete = (answer[0].lower() == 'y')
    save_config(args.config, cache)
def process_target(args):
    """Run the transform pipeline for the input file of a file-based command.

    The main target is ``<stem>.<mode>`` (mode being print, upload or
    select). ``--export_png`` and ``--export_dxf`` each add a further target
    with the matching extension.
    """
    infn = args.input_filename[0]
    stem = os.path.splitext(os.path.split(infn)[-1])[0]
    targets = [stem + "." + args.mode]
    if args.export_png:
        targets.append(stem + ".png")
    # Honour the DXF flag the same way as the PNG one.
    if args.export_dxf:
        targets.append(stem + ".dxf")
    pipe = TransformPipeline(**args.__dict__)
    for targetfn in targets:
        pipe.transform(infn, targetfn)
def process(args):
    """Dispatch to the handler for ``args.mode``.

    Every mode except ``init`` replaces ``args.config`` (a path) with the
    loaded configuration before talking to OctoPrint.
    """
    if args.mode == "status":
        args.config = load_config(args.config)
        process_status(args)
    elif args.mode == "pause":
        # The API client needs the loaded configuration, not its path.
        args.config = load_config(args.config)
        op = OctoPrint_API(**args.__dict__)
        op.pause_job()
    elif args.mode in ("print", "upload", "select"):
        args.config = load_config(args.config)
        process_target(args)
    elif args.mode == "init":
        process_init(args)
if __name__ == "__main__":
    # Script entry point: parse the command line, set up logging under the
    # "octocmd" name, then dispatch to the selected command.
    args = get_cli()
    configure_logger("octocmd", debug=args.debug)
    process(args)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement