###############################################################################
##
## Copyright (C) 2011-2013, NYU-Poly.
## Copyright (C) 2006-2011, University of Utah.
## All rights reserved.
## Contact: contact@vistrails.org
##
## This file is part of VisTrails.
##
## "Redistribution and use in source and binary forms, with or without
## modification, are permitted provided that the following conditions are met:
##
## - Redistributions of source code must retain the above copyright notice,
## this list of conditions and the following disclaimer.
## - Redistributions in binary form must reproduce the above copyright
## notice, this list of conditions and the following disclaimer in the
## documentation and/or other materials provided with the distribution.
## - Neither the name of the University of Utah nor the names of its
## contributors may be used to endorse or promote products derived from
## this software without specific prior written permission.
##
## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
## AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
## THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
## PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
## CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
## EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
## PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
## OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
## WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
## OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
## ADVISED OF THE POSSIBILITY OF SUCH DAMAGE."
##
###############################################################################
""" This is the application for vistrails when running as a server. """
import Queue
import base64
import hashlib
import inspect
import sys
import logging
import logging.handlers
import os
import re
import shutil
import subprocess
import tempfile
import time
import traceback
import urllib
import xmlrpclib
import ConfigParser
from PyQt4 import QtGui, QtCore
import SocketServer
from SimpleXMLRPCServer import SimpleXMLRPCServer
from datetime import date, datetime
from time import strptime
from vistrails.core.configuration import get_vistrails_configuration
from vistrails.core.application import VistrailsApplicationInterface
import vistrails.gui.theme
import vistrails.core.application
from vistrails.gui import qt
from vistrails.core.db.locator import DBLocator, ZIPFileLocator, FileLocator
from vistrails.core.db import io
import vistrails.core.db.action
from vistrails.core.utils import InstanceObject
from vistrails.core.vistrail.vistrail import Vistrail
from vistrails.core import command_line
from vistrails.core import system
from vistrails.core.modules.module_registry import get_module_registry as module_registry
from vistrails.core import interpreter
from vistrails.core.packagemanager import get_package_manager
import vistrails.core.system
import vistrails.db.services.io
import gc
import vistrails.core.requirements
import vistrails.core.console_mode
from vistrails.db.versions import currentVersion
ElementTree = vistrails.core.system.get_elementtree_library()
################################################################################
class StoppableXMLRPCServer(SimpleXMLRPCServer):
"""This class allows a server to be stopped by a external request"""
#accessList contains the list of ip addresses and hostnames that can send
#request to this server. Change this according to your server
global accessList
allow_reuse_address = True
def __init__(self, addr, logger):
self.logger = logger
SimpleXMLRPCServer.__init__(self, addr)
def serve_forever(self):
self.stop = False
while not self.stop:
self.handle_request()
def verify_request(self, request, client_address):
if client_address[0] in accessList:
self.logger.info("Request from %s allowed!"%str(client_address))
return 1
else:
self.logger.info("Request from %s denied!"%str(client_address))
return 0
################################################################################
class ThreadedXMLRPCServer(SocketServer.ThreadingMixIn,
StoppableXMLRPCServer): pass
"""This is a multithreaded version of the RPC Server. For each request, the
server will spawn a thread. Notice that these threads cannot use any Qt
related objects because they won't be in the main thread."""
################################################################################
class RequestHandler(object):
"""This class will handle all the requests sent to the server.
Add new methods here and they will be exposed through the XML-RPC interface
"""
def __init__(self, logger, instances):
self.server_logger = logger
self.instances = instances
self.proxies_queue = None
self.instantiate_proxies()
#proxies
def instantiate_proxies(self):
"""instantiate_proxies() -> None
If this server started other instances of VisTrails, this will create
the client proxies to connect to them.
"""
if len(self.instances) > 0:
self.proxies_queue = Queue.Queue()
for uri in self.instances:
try:
proxy = xmlrpclib.ServerProxy(uri)
self.proxies_queue.put(proxy)
self.server_logger.info("Instantiated client for %s" % uri)
except Exception, e:
self.server_logger.error("Error when instantiating proxy %s" % uri)
self.server_logger.error(str(e))
#utils
def memory_usage(self):
"""memory_usage() -> dict
Memory usage of the current process in kilobytes. We plan to
use this to clear cache on demand later.
I believe this works on Linux only.
"""
status = None
result = {'peak': 0, 'rss': 0}
try:
# This will only work on systems with a /proc file system
# (like Linux).
status = open('/proc/self/status')
for line in status:
parts = line.split()
key = parts[0][2:-1].lower()
if key in result:
result[key] = int(parts[1])
finally:
if status is not None:
status.close()
return result
def path_exists_and_not_empty(self, path):
"""path_exists_and_not_empty(path:str) -> boolean
Returns True if given path exists and it's not empty, otherwise returns
False.
"""
if os.path.exists(path):
n = 0
for root, dirs, file_names in os.walk(path):
n += len(file_names)
if n > 0:
return True
return False
def try_ping(self):
return 1
#crowdlabs
def get_wf_modules(self, host, port, db_name, vt_id, version):
"""get_wf_modules(host:str, port:int, db_name:str, vt_id:int,
version:int) -> (return_status, list of dict)
Returns a list of information about the modules used in a workflow
in a list of dictionaries. The dictionary has the following keys:
name, package, documentation.
"""
self.server_logger.info("Request: get_wf_modules(%s,%s,%s,%s,%s)" % \
(host, port, db_name, vt_id, version))
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
v = locator.load().vistrail
p = v.getPipeline(long(version))
if p:
result = []
for module in p.module_list:
descriptor = module_registry().get_descriptor_by_name(
module.package,
module.name,
module.namespace)
documentation = descriptor.module_documentation(module)
result.append({'name':module.name,
'package':module.package,
'documentation':documentation})
return (result, 1)
else:
result = "Pipeline was not materialized"
self.server_logger.error(str(result))
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
result = str(e)
self.server_logger.error(result)
self.server_logger.error(traceback.format_exc())
return (result, 0)
def get_packages(self):
"""get_packages()-> dict
This returns a dictionary with all the packages available in the
VisTrails registry.
The keys are the package identifier and for each identifier there's a
dictionary with modules and description.
"""
self.server_logger.info("Request: get_packages()")
try:
package_dic = {}
for package in module_registry().package_list:
package_dic[package.identifier] = {}
package_dic[package.identifier]['modules'] = []
for module in package._db_module_descriptors:
documentation = inspect.getdoc(module.module)
if documentation:
documentation = re.sub('^ *\n', '', documentation.rstrip())
else:
documentation = "(No documentation available)"
package_dic[package.identifier]['modules'].append({'name':module.name,
'package':module.package,
'documentation':documentation})
package_dic[package.identifier]['description'] = \
package.description if package.description else "(No description available)"
return (package_dic, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_server_packages(self, codepath=None, status=None):
"""get_server_packages()-> dict
This returns a dictionary with all the packages to vistrails with status indicating wether it is loaded.
It is also possible to enable/disable a package by passing a package codepath and the desired status on/off
The keys are the package identifier.
"""
self.server_logger.info("Request: get_server_packages()")
messages = []
if self.proxies_queue is not None:
# collect all proxies:
proxies = []
while len(proxies) < len(self.instances):
self.server_logger.info("Proxies: %s Instances: %s" % (len(proxies), len(self.instances)))
if self.proxies_queue.empty():
for p in proxies:
self.proxies_queue.put(p)
return [[[],
"Not all vistrail instances are free, please try again."], 1]
proxies.append(self.proxies_queue.get())
for proxy in proxies:
try:
if codepath and status is not None:
result, s = proxy.get_server_packages(codepath, status)
else:
result, s = proxy.get_server_packages()
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
if s == 0:
messages.append('An error occurred: %s' % result)
else:
messages.append(result[1])
self.proxies_queue.put(proxy)
try:
pkg_manager = get_package_manager()
message = ''
if codepath and status is not None:
if int(status):
# Try to enable package
try:
pkg_manager.late_enable_package(codepath)
message = "Successfully enabled package '%s'" % codepath
except Exception, e:
message = "Could not enable package '%s': %s %s" % \
(codepath, str(e), traceback.format_exc())
else:
# Try to disable package
if codepath in ["basic_modules", 'abstraction']:
message = "Package '%s' cannot be disabled" % codepath
elif not pkg_manager.can_be_disabled(
pkg_manager.get_package_by_codepath(codepath).identifier):
message = "Package '%s' cannot be disabled because other packages depends on it." % codepath
else:
try:
pkg_manager.remove_package(codepath)
message = "Successfully disabled package '%s'" % codepath
except Exception, e:
message = "Could not disable package '%s': %s %s" % \
(codepath, str(e), traceback.format_exc())
packages = []
enabled_pkgs = sorted(pkg_manager.enabled_package_list())
enabled_pkg_dict = dict([(pkg.codepath, pkg) for
pkg in enabled_pkgs])
for pkg in sorted([pkg.codepath for pkg in enabled_pkgs]):
packages.append([pkg, True])
available_pkg_names = [pkg for pkg in
sorted(pkg_manager.available_package_names_list())
if pkg not in enabled_pkg_dict]
for pkg in available_pkg_names:
packages.append([pkg, False])
if codepath and messages:
# we are the main instance so assemble messages
message = ''.join(["Main instance: %s" % message] + \
["
Instance %s: %s" % (i+1, m) for i, m in
zip(xrange(len(messages)), messages)])
return [[packages, message], 1]
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def add_vt_to_db(self, host, port, db_name, user, vt_filepath, filename,
repository_vt_id, repository_creator, is_local=True):
"""add_vt_to_db(host:str, port:int, db_name:str, user:str,
vt_filepath:str(or datastream), filename:str,
repository_vt_id:int, repository_creator:str) ->
(return_status, int)
This will add a vistrail in vt_filepath to the the database. If running
on a remote machine, vt_filepath will contain vt file data stream.
Before adding it it will annotate the vistrail with the
repository_vt_id and repository_creator.
"""
try:
if is_local:
locator = ZIPFileLocator(vt_filepath).load()
else:
# vt_filepath contains vt file datastream
# write to tmp file, read into FileLocator
# TODO: can we just read the file stream directly in?
(fd, fname) = tempfile.mkstemp(prefix='vt_tmp',
suffix='.vt')
os.close(fd)
try:
vt_file = open(fname, "wb")
vt_file.write(vt_filepath.data)
vt_file.close()
locator = ZIPFileLocator(fname).load()
finally:
os.unlink(fname)
# set some crowdlabs id info
if repository_vt_id != -1:
vistrail = locator.vistrail
vistrail.set_annotation('repository_vt_id', repository_vt_id)
vistrail.set_annotation('repository_creator', repository_creator)
db_locator = DBLocator(host=host, port=int(port), database=db_name,
name=filename, user=db_write_user, passwd=db_write_pass)
db_locator.save_as(locator)
return (db_locator.obj_id, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def merge_vt(self, host, port, db_name, user, new_vt_filepath,
old_db_vt_id, is_local=True):
"""
Merge new_vt (new_vt_filepath) with current vt (old_db_vt_id)
new_vt_filepath is either filepath to vt, or datastream of vt,
depending on if the server is on a remote machine
"""
self.server_logger.info("Request: merge_vt(%s,%s,%s,%s,%s,%s,%s)" % \
(host, port, db_name, user, new_vt_filepath,
old_db_vt_id, is_local))
try:
tmp_file = None
if is_local:
new_locator = ZIPFileLocator(new_vt_filepath)
else:
# vt_filepath contains vt file datastream
# write to tmp file, read into FileLocator
# TODO: can we just read the file stream directly in?
(fd, tmp_file) = tempfile.mkstemp(prefix='vt_tmp',
suffix='.vt')
os.close(fd)
vt_file = open(tmp_file, "wb")
vt_file.write(new_vt_filepath.data)
vt_file.close()
new_locator = ZIPFileLocator(tmp_file)
new_bundle = new_locator.load()
new_locator.save(new_bundle)
old_db_locator = DBLocator(host=host, port=int(port), database=db_name,
obj_id=int(old_db_vt_id), user=db_write_user, passwd=db_write_pass)
old_db_bundle = old_db_locator.load()
vistrails.db.services.vistrail.merge(old_db_bundle, new_bundle, 'vistrails')
old_db_locator.save(old_db_bundle)
new_locator.save(old_db_bundle)
if tmp_file is not None:
os.unlink(tmp_file)
return (1, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def remove_vt_from_db(self, host, port, db_name, user, vt_id):
"""remove_vt_from_db(host:str, port:int, db_name:str, user:str,
vt_id:int) -> (return_status, 0 or 1)
Remove a vistrail from the repository
"""
config = {}
config['host'] = host
config['port'] = int(port)
config['db'] = db_name
config['user'] = db_write_user
config['passwd'] = db_write_pass
try:
conn = vistrails.db.services.io.open_db_connection(config)
vistrails.db.services.io.delete_entity_from_db(conn,'vistrail', vt_id)
vistrails.db.services.io.close_db_connection(conn)
return (1, 1)
except Exception, e:
self.server_logger.error(str(e))
if conn:
vistrails.db.services.io.close_db_connection(conn)
return (str(e), 0)
def get_runnable_workflows(self, host, port, db_name, vt_id):
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
(vistrail, _, _, _) = io.load_vistrail(locator)
# get server packages
local_packages = [x.identifier for x in \
module_registry().package_list]
runnable_workflows = []
py_source_workflows = []
local_data_modules = ['File', 'FileSink', 'Path']
# find runnable workflows
for version_id, version_tag in vistrail.get_tagMap().iteritems():
pipeline = vistrail.getPipeline(version_id)
workflow_packages = set()
on_repo = True
has_python_source = False
for module in pipeline.module_list:
# count modules that use data unavailable to web repo
if module.name[-6:] == 'Reader' or \
module.name in local_data_modules:
has_accessible_data = False
for edge in pipeline.graph.edges_to(module.id):
# TODO check for RepoSync checksum param
if pipeline.modules[edge[0]].name in \
['HTTPFile', 'RepoSync']:
has_accessible_data = True
if not has_accessible_data:
on_repo = False
elif module.name == "PythonSource":
has_python_source = True
# get packages used in tagged versions of this VisTrail
workflow_packages.add(module.package)
# ensure workflow doesn't use unsupported packages
if not filter(lambda p: p not in local_packages,
workflow_packages):
if has_python_source and on_repo and \
version_id not in py_source_workflows:
py_source_workflows.append(version_id)
elif not has_python_source and on_repo and \
version_id not in runnable_workflows:
runnable_workflows.append(version_id)
return ((runnable_workflows, py_source_workflows), 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
return (str(e), 0)
#medleys
def executeMedley(self, xml_medley, extra_info=None):
self.server_logger.info("executeMedley request received")
try:
self.server_logger.info(xml_medley)
xml_string = xml_medley.replace('\\"','"')
root = ElementTree.fromstring(xml_string)
try:
medley = MedleySimpleGUI.from_xml(root)
except:
#even if this error occurred there's still a chance of
# recovering from it... (the server can find cached images)
self.server_logger.error("couldn't instantiate medley")
self.server_logger.debug("%s medley: %s"%(medley._type, medley._name))
result = ""
subdir = hashlib.sha224(xml_string).hexdigest()
path_to_images = \
os.path.join(media_dir, 'medleys/images', subdir)
if (not self.path_exists_and_not_empty(path_to_images) and
self.proxies_queue is not None):
#this server can send requests to other instances
proxy = self.proxies_queue.get()
try:
self.server_logger.info("Sending request to %s" % proxy)
if extra_info is not None:
result = proxy.executeMedley(xml_medley, extra_info)
else:
result = proxy.executeMedley(xml_medley)
self.proxies_queue.put(proxy)
self.server_logger.info("returning %s"% result)
return result
except Exception, e:
self.server_logger.error(str(e))
return (str(e), 0)
if extra_info is None:
extra_info = {}
if extra_info.has_key('pathDumpCells'):
if extra_info['pathDumpCells']:
extra_path = extra_info['pathDumpCells']
else:
extra_info['pathDumpCells'] = path_to_images
if not self.path_exists_and_not_empty(extra_info['pathDumpCells']):
if not os.path.exists(extra_info['pathDumpCells']):
os.mkdir(extra_info['pathDumpCells'])
if medley._type == 'vistrail':
locator = DBLocator(host=db_host,
port=3306,
database='vistrails',
user=db_write_user,
passwd=db_write_pass,
obj_id=medley._vtid,
obj_type=None,
connection_id=None)
workflow = medley._version
sequence = False
for (k,v) in medley._alias_list.iteritems():
if v._component._seq == True:
sequence = True
val = XMLObject.convert_from_str(v._component._minVal,
v._component._spec)
maxval = XMLObject.convert_from_str(v._component._maxVal,
v._component._spec)
#making sure the filenames are generated in order
mask = '%s'
if isinstance(maxval, (int, long)):
mask = '%0' + str(len(v._component._maxVal)) + 'd'
while val <= maxval:
s_alias = "%s=%s$&$" % (k,val)
for (k2,v2) in medley._alias_list.iteritems():
if k2 != k and v2._component._val != '':
s_alias += "%s=%s$&$" % (k2,v2._component._val)
if s_alias != '':
s_alias = s_alias[:-3]
self.server_logger.info("Aliases: %s" % s_alias)
try:
gc.collect()
results = \
vistrails.core.console_mode.run_and_get_results( \
[(locator,int(workflow))],
s_alias,
extra_info=extra_info)
self.server_logger.info("Memory usage: %s"% self.memory_usage())
interpreter.cached.CachedInterpreter.flush()
except Exception, e:
self.server_logger.error(str(e))
return (str(e), 0)
ok = True
for r in results:
(objs, errors, _) = (r.objects, r.errors, r.executed)
for e in errors.itervalues():
self.server_logger.error("Module failed: %s"% str(e))
for i in objs.iterkeys():
if errors.has_key(long(i)):
ok = False
result += str(errors[i])
if ok:
self.server_logger.info("renaming files")
for root, dirs, file_names in os.walk(extra_info['pathDumpCells']):
break
s = []
for f in file_names:
if f.lower().endswith(".png"):
fmask = "%s_"+mask+"%s"
os.renames(os.path.join(root,f),
os.path.join(root,"%s" % f[:-4],
fmask% (f[:-4],val,f[-4:])))
if val < maxval:
val += XMLObject.convert_from_str(v._component._stepSize,
v._component._spec)
if val > maxval:
val = maxval
else:
break
if not sequence:
s_alias = ''
for (k,v) in medley._alias_list.iteritems():
if v._component._val != '':
s_alias += "%s=%s$&$" % (k,v._component._val)
if s_alias != '':
s_alias = s_alias[:-3]
self.server_logger.info("Not sequence aliases: %s"% s_alias)
try:
results = \
vistrails.core.console_mode.run_and_get_results( \
[(locator,int(workflow))],
s_alias,
extra_info=extra_info)
except Exception, e:
self.server_logger.error(str(e))
return (str(e), 0)
ok = True
for r in results:
(objs, errors, _) = (r.objects, r.errors, r.executed)
for e in errors.itervalues():
self.server_logger.error(str(e))
for i in objs.iterkeys():
if errors.has_key(long(i)):
ok = False
result += str(errors[i])
self.server_logger.info( "success? %s"% ok)
elif medley._type == 'visit':
cur_dir = os.getcwd()
os.chdir(self.temp_configuration.spreadsheetDumpCells)
if medley._id == 6:
session_file = 'crotamine.session'
elif medley._id == 7:
session_file = '1NTS.session'
else:
session_file = 'head.session'
session_file = '/server/code/visit/saved_sessions/' + session_file
self.server_logger.info("session_file: %s" % session_file)
ok = os.system('/server/code/visit/vistrails_plugin/visit/render-session.sh ' + \
session_file) == 0
self.server_logger.info( "success? %s" % ok)
os.chdir(cur_dir)
else:
self.server_logger.info("Found cached images.")
ok = True
if ok:
s = []
self.server_logger.info("images path: %s"%extra_info['pathDumpCells'])
for root, dirs, file_names in os.walk(extra_info['pathDumpCells']):
sub = []
#n = len(file_names)
#print "%s file(s) generated" % n
file_names.sort()
for f in file_names:
sub.append(os.path.join(root[root.find(subdir):],
f))
s.append(";".join(sub))
result = ":::".join(s)
# FIXME: copy images to extra_path
self.server_logger.info("returning %s" % result)
return (result, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
return (str(e), 0)
#vistrails
def run_from_db(self, host, port, db_name, vt_id, path_to_figures,
version=None, pdf=False, vt_tag='', build_always=False,
parameters='', is_local=True):
self.server_logger.info("Request: run_vistrail_from_db(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)" % \
(host, port, db_name, vt_id,
path_to_figures, version, pdf,
vt_tag, build_always, parameters, is_local))
self.server_logger.info("path_exists_and_not_empty? %s" % self.path_exists_and_not_empty(path_to_figures))
self.server_logger.info("build_always? %s" % build_always)
self.server_logger.info(str(self.proxies_queue))
if not is_local:
# use same hashing as on crowdlabs webserver
dest_version = "%s_%s_%d_%d_%d" % (host, db_name, int(port), int(vt_id), int(version))
dest_version = hashlib.sha1(dest_version).hexdigest()
path_to_figures = os.path.join(media_dir, "wf_execution", dest_version)
if ((not self.path_exists_and_not_empty(path_to_figures) or
build_always) and self.proxies_queue is not None):
self.server_logger.info("will forward request")
#this server can send requests to other instances
proxy = self.proxies_queue.get()
try:
self.server_logger.info("Sending request to %s" % proxy)
result = proxy.run_from_db(host, port, db_name, vt_id,
path_to_figures, version, pdf, vt_tag,
build_always, parameters, is_local)
self.proxies_queue.put(proxy)
self.server_logger.info("returning %s" % result)
return result
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
return (str(e), 0)
extra_info = {}
extra_info['pathDumpCells'] = path_to_figures
self.server_logger.debug(path_to_figures)
extra_info['pdf'] = pdf
self.server_logger.debug("pdf: %s" % pdf)
# execute workflow
ok = True
if (not self.path_exists_and_not_empty(extra_info['pathDumpCells'])
or build_always):
if os.path.exists(extra_info['pathDumpCells']):
shutil.rmtree(extra_info['pathDumpCells'])
os.mkdir(extra_info['pathDumpCells'])
result = ''
if vt_tag !='':
version = vt_tag;
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_write_user,
passwd=db_write_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
results = []
try:
results = \
vistrails.core.console_mode.run_and_get_results([(locator,
int(version))],
parameters,
update_vistrail=True,
extra_info=extra_info,
reason="Server Pipeline Execution")
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
ok = True
for r in results:
(objs, errors, _) = (r.objects, r.errors, r.executed)
for i in objs.iterkeys():
if errors.has_key(i):
ok = False
result += str(errors[i])
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
if ok:
if is_local:
return (1, 1)
else:
# TODO pdf version
images = [im for im in os.listdir(path_to_figures) if im[-3:] == "png"]
results = {}
for image in images:
handler = open(os.path.join(path_to_figures, image), "rb")
image_data = handler.read()
handler.close()
results[image] = xmlrpclib.Binary(image_data)
return (results, 1)
else:
self.server_logger.error(result)
return (result, 0)
def get_package_list(self):
""" get_package_list() -> str
Returns a list of supported packages identifiers delimited by || """
self.server_logger.info("Request: get_package_list()")
try:
packages = [x.identifier for x in module_registry().package_list]
return (packages, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_wf_datasets(self, host, port, db_name, vt_id, version):
self.server_logger.info("Request: get_wf_datasets(%s,%s,%s,%s,%s)" % \
(host, port, db_name, vt_id, version))
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
v = locator.load().vistrail
p = v.getPipeline(long(version))
if p:
result = []
for module in p.module_list:
if module.name == "RepoSync":
for function in module.functions:
if function.name == 'checksum':
result.append(function.parameters[0].value())
return (result, 1)
else:
result = "Pipeline was not materialized"
self.server_logger.error(result)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
result = str(e)
self.server_logger.error(result)
self.server_logger.error(traceback.format_exc())
return (result, 0)
def get_tag_version(self, host, port, db_name, vt_id, vt_tag):
self.server_logger.info("Request: get_tag_version(%s,%s,%s,%s,%s)" % \
(host, port, db_name, vt_id, vt_tag))
version = -1
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
(v, _ , _, _) = io.load_vistrail(locator)
if v.has_tag_str(vt_tag):
version = v.get_tag_str(vt_tag).action_id
self.server_logger.info("Answer: %s" % version)
return (version, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_vt_xml(self, host, port, db_name, vt_id):
self.server_logger.info("Request: get_vt_xml(%s,%s,%s,%s)" % \
(host, port, db_name, vt_id))
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
(v, _ , _, _) = io.load_vistrail(locator)
result = io.serialize(v)
return (result, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_wf_xml(self, host, port, db_name, vt_id, version):
self.server_logger.info("Request: get_wf_xml(%s,%s,%s,%s,%s)" % \
(host, port, db_name, vt_id, version))
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
(v, _ , _, _) = io.load_vistrail(locator)
p = v.getPipeline(long(version))
if p:
result = io.serialize(p)
self.server_logger.info("success")
return (result, 1)
else:
result = "Pipeline was not materialized"
self.server_logger.info(result)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
result = str(e)
self.server_logger.error(result)
self.server_logger.error(traceback.format_exc())
return (result, 0)
def get_wf_graph_pdf(self, host, port, db_name, vt_id, version, is_local=True):
"""get_wf_graph_pdf(host:str, port:int, db_name:str, vt_id:int,
version:int) -> str
Returns the relative url to the generated PDF
"""
self.server_logger.info("get_wf_graph_pdf(%s,%s,%s,%s,%s) request received" % \
(host, port, db_name, vt_id, version))
try:
vt_id = long(vt_id)
version = long(version)
subdir = 'workflows'
filepath = os.path.join(media_dir, 'graphs', subdir)
base_fname = "graph_%s_%s.pdf" % (vt_id, version)
filename = os.path.join(filepath,base_fname)
if ((not os.path.exists(filepath) or
os.path.exists(filepath) and not os.path.exists(filename))
and self.proxies_queue is not None):
#this server can send requests to other instances
proxy = self.proxies_queue.get()
try:
result = proxy.get_wf_graph_pdf(host,port,db_name, vt_id, version, is_local)
self.proxies_queue.put(proxy)
self.server_logger.info("get_wf_graph_pdf returning %s"% result)
return result
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
if not os.path.exists(filepath):
os.mkdir(filepath)
if not os.path.exists(filename):
from vistrails.gui.vistrail_controller import VistrailController
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
(v, abstractions , thumbnails, mashups) = io.load_vistrail(locator)
controller = VistrailController(v, locator, abstractions,
thumbnails, mashups)
controller.change_selected_version(version)
controller.updatePipelineScene()
controller.current_pipeline_scene.saveToPDF(filename)
else:
self.server_logger.info("found cached pdf: %s" % filename)
if is_local:
return (os.path.join(subdir,base_fname), 1)
else:
f = open(filename, 'rb')
contents = f.read()
f.close()
return (xmlrpclib.Binary(contents), 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error("Error when saving pdf: %s" % str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_wf_graph_png(self, host, port, db_name, vt_id, version, is_local=True):
"""get_wf_graph_png(host:str, port:int, db_name:str, vt_id:int,
version:int) -> str
Returns the relative url to the generated image
"""
self.server_logger.info("get_wf_graph_png(%s,%s,%s,%s,%s) request received" % \
(host, port, db_name, vt_id, version))
try:
vt_id = long(vt_id)
version = long(version)
subdir = 'workflows'
filepath = os.path.join(media_dir, 'graphs', subdir)
base_fname = "graph_%s_%s.png" % (vt_id, version)
filename = os.path.join(filepath,base_fname)
if ((not os.path.exists(filepath) or
os.path.exists(filepath) and not os.path.exists(filename))
and self.proxies_queue is not None):
#this server can send requests to other instances
proxy = self.proxies_queue.get()
try:
self.server_logger.info("Sending request to %s" % proxy)
result = proxy.get_wf_graph_png(host, port, db_name, vt_id, version, is_local)
self.proxies_queue.put(proxy)
self.server_logger.info("returning %s" % result)
return result
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
#if it gets here, this means that we will execute on this instance
if not os.path.exists(filepath):
os.mkdir(filepath)
if not os.path.exists(filename):
from vistrails.gui.vistrail_controller import VistrailController
locator = DBLocator(host=host,
port=port,
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
(v, abstractions , thumbnails, mashups) = io.load_vistrail(locator)
controller = VistrailController(v, locator, abstractions,
thumbnails, mashups)
controller.change_selected_version(version)
controller.updatePipelineScene()
controller.current_pipeline_scene.saveToPNG(filename)
else:
self.server_logger.info("found cached image: %s" % filename)
if is_local:
return (os.path.join(subdir,base_fname), 1)
else:
f = open(filename, 'rb')
contents = f.read()
f.close()
return (xmlrpclib.Binary(contents), 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error("Error when saving png %s" % str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def _is_image_stale(self, filename, host, port, db_name, vt_id):
statinfo = os.stat(filename)
image_time = datetime.fromtimestamp(statinfo.st_mtime)
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
vt_mod_time = locator.get_db_modification_time()
self.server_logger.info("image time: %s, vt time: %s"%(image_time,
vt_mod_time))
if image_time < vt_mod_time:
return True
else:
return False
def get_vt_graph_png(self, host, port, db_name, vt_id, is_local=True):
"""get_vt_graph_png(host:str, port: str, db_name: str, vt_id:str) -> str
Returns the relative url of the generated image
"""
self.server_logger.info("get_vt_graph_png(%s, %s, %s, %s)" % (host, port, db_name, vt_id))
try:
vt_id = long(vt_id)
subdir = 'vistrails'
filepath = os.path.join(media_dir, 'graphs', subdir)
base_fname = "graph_%s.png" % (vt_id)
filename = os.path.join(filepath,base_fname)
if ((not os.path.exists(filepath) or
(os.path.exists(filepath) and not os.path.exists(filename)) or
self._is_image_stale(filename, host, port, db_name, vt_id)) and
self.proxies_queue is not None):
#this server can send requests to other instances
proxy = self.proxies_queue.get()
try:
self.server_logger.info("Sending request to %s" % proxy)
result = proxy.get_vt_graph_png(host, port, db_name, vt_id, is_local)
self.proxies_queue.put(proxy)
self.server_logger.info("returning %s" % result)
return result
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
#if it gets here, this means that we will execute on this instance
if (not os.path.exists(filepath) or
(os.path.exists(filepath) and not os.path.exists(filename)) or
self._is_image_stale(filename, host, port, db_name, vt_id)):
from vistrails.gui.vistrail_controller import VistrailController
if os.path.exists(filepath):
shutil.rmtree(filepath)
os.mkdir(filepath)
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
(v, abstractions , thumbnails, mashups) = io.load_vistrail(locator)
controller = VistrailController(v, locator, abstractions,
thumbnails, mashups)
from vistrails.gui.version_view import QVersionTreeView
version_view = QVersionTreeView()
version_view.scene().setupScene(controller)
version_view.scene().saveToPNG(filename)
del version_view
else:
self.server_logger.info("Found cached image: %s" % filename)
if is_local:
return (os.path.join(subdir,base_fname), 1)
else:
f = open(filename, 'rb')
contents = f.read()
f.close()
return (xmlrpclib.Binary(contents), 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error("Error when saving png: %s" % str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_vt_graph_pdf(self, host, port, db_name, vt_id, is_local=True):
"""get_vt_graph_pdf(host:str, port: str, db_name: str, vt_id:str) -> str
Returns the relative url of the generated image
"""
self.server_logger.info("get_vt_graph_pdf(%s, %s, %s, %s)" % (host, port, db_name, vt_id))
try:
vt_id = long(vt_id)
subdir = 'vistrails'
filepath = os.path.join(media_dir, 'graphs', subdir)
base_fname = "graph_%s.pdf" % (vt_id)
filename = os.path.join(filepath,base_fname)
if ((not os.path.exists(filepath) or
(os.path.exists(filepath) and not os.path.exists(filename)) or
self._is_image_stale(filename, host, port, db_name, vt_id)) and
self.proxies_queue is not None):
#this server can send requests to other instances
proxy = self.proxies_queue.get()
try:
self.server_logger.info("Sending request to %s" % proxy)
result = proxy.get_vt_graph_pdf(host, port, db_name, vt_id, is_local)
self.proxies_queue.put(proxy)
self.server_logger.info("returning %s" % result)
return result
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
#if it gets here, this means that we will execute on this instance
if (not os.path.exists(filepath) or
(os.path.exists(filepath) and not os.path.exists(filename)) or
self._is_image_stale(filename, host, port, db_name, vt_id)):
from vistrails.gui.vistrail_controller import VistrailController
if os.path.exists(filepath):
shutil.rmtree(filepath)
os.mkdir(filepath)
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
(v, abstractions , thumbnails, mashups) = io.load_vistrail(locator)
controller = VistrailController(v, locator, abstractions,
thumbnails, mashups)
from vistrails.gui.version_view import QVersionTreeView
version_view = QVersionTreeView()
version_view.scene().setupScene(controller)
version_view.scene().saveToPDF(filename)
del version_view
else:
self.server_logger.info("Found cached pdf: %s" % filename)
if is_local:
return (os.path.join(subdir,base_fname), 1)
else:
f = open(filename, 'rb')
contents = f.read()
f.close()
return (xmlrpclib.Binary(contents), 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error("Error when saving pdf: %s" % str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_vt_zip(self, host, port, db_name, vt_id):
"""get_vt_zip(host:str, port: str, db_name: str, vt_id:str) -> str
Returns a .vt file encoded as base64 string
"""
self.server_logger.info("Request: get_vt_zip(%s,%s,%s,%s)" % \
(host, port, db_name, vt_id))
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
save_bundle = locator.load()
#annotate the vistrail
save_bundle.vistrail.update_checkout_version('vistrails')
#create temporary file
(fd, name) = tempfile.mkstemp(prefix='vt_tmp',
suffix='.vt')
os.close(fd)
try:
fileLocator = FileLocator(name)
fileLocator.save(save_bundle)
contents = open(name).read()
result = base64.b64encode(contents)
finally:
os.unlink(name)
return (result, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_wf_vt_zip(self, host, port, db_name, vt_id, version):
"""get_wf_vt_zip(host:str, port:str, db_name:str, vt_id:str,
version:str) -> str
Returns a vt file containing the single workflow defined by version
encoded as base64 string
"""
self.server_logger.info("Request: get_wf_vt_zip(%s,%s,%s,%s,%s)" % \
(host, port, db_name, vt_id, version))
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
(v, _ , _, _) = io.load_vistrail(locator)
p = v.getPipeline(long(version))
if p:
vistrail = Vistrail()
action_list = []
for module in p.module_list:
action_list.append(('add', module))
for connection in p.connection_list:
action_list.append(('add', connection))
action = vistrails.core.db.action.create_action(action_list)
vistrail.add_action(action, 0L)
vistrail.addTag("Imported workflow", action.id)
if not vistrail.db_version:
vistrail.db_version = currentVersion
pipxmlstr = io.serialize(vistrail)
result = base64.b64encode(pipxmlstr)
return (result, 1)
else:
result = "Error: Pipeline was not materialized"
self.server_logger.info(result)
return (result, 0)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.info(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_db_vt_list(self, host, port, db_name):
self.server_logger.info("Request: get_db_vistrail_list(%s,%s,%s)" % \
(host, port, db_name))
config = {}
config['host'] = host
config['port'] = int(port)
config['db'] = db_name
config['user'] = db_read_user
config['passwd'] = db_read_pass
try:
rows = io.get_db_vistrail_list(config)
self.server_logger.info("returning %s" % str(rows))
return (rows, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
return (str(e), 0)
def get_db_vt_list_xml(self, host, port, db_name):
self.server_logger.info("Request: get_db_vistrail_list(%s,%s,%s)" % \
(host, port, db_name))
config = {}
config['host'] = host
config['port'] = int(port)
config['db'] = db_name
config['user'] = db_read_user
config['passwd'] = db_read_pass
try:
rows = io.get_db_vistrail_list(config)
result = ''
for (id, name, mod_time) in rows:
result += ''%(id,name,mod_time)
result += ''
return (result, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
def get_vt_tagged_versions(self, host, port, db_name, vt_id, is_local=True):
self.server_logger.info("Request: get_vt_tagged_versions(%s,%s,%s,%s,%s)" % \
(host, port, db_name, vt_id, is_local))
try:
locator = DBLocator(host=host,
port=int(port),
database=db_name,
user=db_read_user,
passwd=db_read_pass,
obj_id=int(vt_id),
obj_type=None,
connection_id=None)
result = []
v = locator.load().vistrail
for elem, tag in v.get_tagMap().iteritems():
action_map = v.actionMap[long(elem)]
if v.get_thumbnail(elem):
thumbnail_fname = os.path.join(
get_vistrails_configuration().thumbs.cacheDirectory,
v.get_thumbnail(elem))
else:
thumbnail_fname = ""
if not thumbnail_fname or is_local:
result.append({'id': elem, 'name': tag,
'notes': v.get_notes(elem) or '',
'user':action_map.user or '',
'date':action_map.date,
'thumbnail': thumbnail_fname})
else:
handler = open(thumbnail_fname, "rb")
thumbnail_data = handler.read()
handler.close()
result.append({'id': elem, 'name': tag,
'notes': v.get_notes(elem) or '',
'user':action_map.user or '',
'date':action_map.date,
'thumbnail': xmlrpclib.Binary(thumbnail_data)})
return (result, 1)
except xmlrpclib.ProtocolError, err:
err_msg = ("A protocol error occurred\n"
"URL: %s\n"
"HTTP/HTTPS headers: %s\n"
"Error code: %d\n"
"Error message: %s\n") % (err.url, err.headers,
err.errcode, err.errmsg)
self.server_logger.error(err_msg)
return (str(err), 0)
except Exception, e:
self.server_logger.error(str(e))
self.server_logger.error(traceback.format_exc())
return (str(e), 0)
################################################################################
# Some Medley code
class XMLObject(object):
@staticmethod
def convert_from_str(value, type):
def bool_conv(x):
s = str(x).upper()
if s == 'TRUE':
return True
if s == 'FALSE':
return False
if value is not None:
if type == 'str':
return str(value)
elif value.strip() != '':
if type == 'long':
return long(value)
elif type == 'float':
return float(value)
elif type == 'int':
return int(value)
elif type == 'bool':
return bool_conv(value)
elif type == 'date':
return date(*strptime(value, '%Y-%m-%d')[0:3])
elif type == 'datetime':
return datetime(*strptime(value, '%Y-%m-%d %H:%M:%S')[0:6])
return None
@staticmethod
def convert_to_str(value,type):
if value is not None:
if type == 'date':
return value.isoformat()
elif type == 'datetime':
return value.strftime('%Y-%m-%d %H:%M:%S')
else:
return str(value)
return ''
################################################################################
class MedleySimpleGUI(XMLObject):
def __init__(self, id, name, vtid=None, version=None, alias_list=None,
t='vistrail', has_seq=None):
self._id = id
self._name = name
self._version = version
self._alias_list = alias_list
self._vtid = vtid
self._type = t
if has_seq == None:
self._has_seq = False
if isinstance(self._alias_list, dict):
for v in self._alias_list.itervalues():
if v._component._seq == True:
self._has_seq = True
else:
self._has_seq = has_seq
def to_xml(self, node=None):
"""to_xml(node: ElementTree.Element) -> ElementTree.Element
writes itself to xml
"""
if node is None:
node = ElementTree.Element('medley_simple_gui')
#set attributes
node.set('id', self.convert_to_str(self._id,'long'))
node.set('version', self.convert_to_str(self._version,'long'))
node.set('vtid', self.convert_to_str(self._vtid,'long'))
node.set('name', self.convert_to_str(self._name,'str'))
node.set('type', self.convert_to_str(self._type,'str'))
node.set('has_seq', self.convert_to_str(self._has_seq,'bool'))
for (k,v) in self._alias_list.iteritems():
child_ = ElementTree.SubElement(node, 'alias')
v.to_xml(child_)
return node
@staticmethod
def from_xml(node):
if node.tag != 'medley_simple_gui':
print "node.tag != 'medley_simple_gui'"
return None
#read attributes
data = node.get('id', None)
id = MedleySimpleGUI.convert_from_str(data, 'long')
data = node.get('name', None)
name = MedleySimpleGUI.convert_from_str(data, 'str')
data = node.get('version', None)
version = MedleySimpleGUI.convert_from_str(data, 'long')
data = node.get('vtid', None)
vtid = MedleySimpleGUI.convert_from_str(data, 'long')
data = node.get('type', None)
type = MedleySimpleGUI.convert_from_str(data, 'str')
data = node.get('has_seq', None)
seq = ComponentSimpleGUI.convert_from_str(data, 'bool')
alias_list = {}
for child in node.getchildren():
if child.tag == "alias":
alias = AliasSimpleGUI.from_xml(child)
alias_list[alias._name] = alias
return MedleySimpleGUI(id=id, name=name, vtid=vtid, version=version,
alias_list=alias_list, t=type, has_seq=seq)
################################################################################
class AliasSimpleGUI(XMLObject):
def __init__(self, id, name, component=None):
self._id = id
self._name = name
self._component = component
def to_xml(self, node=None):
"""to_xml(node: ElementTree.Element) -> ElementTree.Element
writes itself to xml
"""
if node is None:
node = ElementTree.Element('alias')
#set attributes
node.set('id', self.convert_to_str(self._id,'long'))
node.set('name', self.convert_to_str(self._name,'str'))
child_ = ElementTree.SubElement(node, 'component')
self._component.to_xml(child_)
return node
@staticmethod
def from_xml(node):
if node.tag != 'alias':
return None
#read attributes
data = node.get('id', None)
id = AliasSimpleGUI.convert_from_str(data, 'long')
data = node.get('name', None)
name = AliasSimpleGUI.convert_from_str(data, 'str')
for child in node.getchildren():
if child.tag == "component":
component = ComponentSimpleGUI.from_xml(child)
alias = AliasSimpleGUI(id,name,component)
return alias
################################################################################
class ComponentSimpleGUI(XMLObject):
def __init__(self, id, pos, ctype, spec, val=None, minVal=None, maxVal=None,
stepSize=None, strvalueList="", parent=None, seq=False,
widget="text"):
"""ComponentSimpleGUI()
widget can be: text, slider, combobox, numericstepper, checkbox
"""
self._id = id
self._pos = pos
self._spec = spec
self._ctype = ctype
self._val = val
self._minVal = minVal
self._maxVal = maxVal
self._stepSize = stepSize
self._strvaluelist = strvalueList
self._parent = parent
self._seq = seq
self._widget = widget
def _get_valuelist(self):
data = self._strvaluelist.split(',')
result = []
for d in data:
result.append(urllib.unquote_plus(d))
return result
def _set_valuelist(self, valuelist):
q = []
for v in valuelist:
q.append(urllib.quote_plus(v))
self._strvaluelist = ",".join(q)
_valueList = property(_get_valuelist,_set_valuelist)
def to_xml(self, node=None):
"""to_xml(node: ElementTree.Element) -> ElementTree.Element
writes itself to xml
"""
if node is None:
node = ElementTree.Element('component')
#set attributes
node.set('id', self.convert_to_str(self._id,'long'))
node.set('pos', self.convert_to_str(self._pos,'long'))
node.set('spec', self.convert_to_str(self._spec,'str'))
node.set('ctype', self.convert_to_str(self._ctype,'str'))
node.set('val', self.convert_to_str(self._val, 'str'))
node.set('minVal', self.convert_to_str(self._minVal,'str'))
node.set('maxVal', self.convert_to_str(self._maxVal,'str'))
node.set('stepSize', self.convert_to_str(self._stepSize,'str'))
node.set('valueList',self.convert_to_str(self._strvaluelist,'str'))
node.set('parent', self.convert_to_str(self._parent,'str'))
node.set('seq', self.convert_to_str(self._seq,'bool'))
node.set('widget',self.convert_to_str(self._widget,'str'))
return node
@staticmethod
def from_xml(node):
if node.tag != 'component':
return None
#read attributes
data = node.get('id', None)
id = ComponentSimpleGUI.convert_from_str(data, 'long')
data = node.get('pos', None)
pos = ComponentSimpleGUI.convert_from_str(data, 'long')
data = node.get('ctype', None)
ctype = ComponentSimpleGUI.convert_from_str(data, 'str')
data = node.get('spec', None)
spec = ComponentSimpleGUI.convert_from_str(data, 'str')
data = node.get('val', None)
val = ComponentSimpleGUI.convert_from_str(data, 'str')
val = val.replace("<", "<")
val = val.replace(">", ">")
val = val.replace("&","&")
data = node.get('minVal', None)
minVal = ComponentSimpleGUI.convert_from_str(data, 'str')
data = node.get('maxVal', None)
maxVal = ComponentSimpleGUI.convert_from_str(data, 'str')
data = node.get('stepSize', None)
stepSize = ComponentSimpleGUI.convert_from_str(data, 'str')
data = node.get('valueList', None)
values = ComponentSimpleGUI.convert_from_str(data, 'str')
values = values.replace("<", "<")
values = values.replace(">", ">")
values = values.replace("&","&")
data = node.get('parent', None)
parent = ComponentSimpleGUI.convert_from_str(data, 'str')
data = node.get('seq', None)
seq = ComponentSimpleGUI.convert_from_str(data, 'bool')
data = node.get('widget', None)
widget = ComponentSimpleGUI.convert_from_str(data, 'str')
component = ComponentSimpleGUI(id=id,
pos=pos,
ctype=ctype,
spec=spec,
val=val,
minVal=minVal,
maxVal=maxVal,
stepSize=stepSize,
strvalueList=values,
parent=parent,
seq=seq,
widget=widget)
return component
################################################################################
################################################################################
class VistrailsServerSingleton(VistrailsApplicationInterface,
QtGui.QApplication):
"""
VistrailsServerSingleton is the singleton of the application,
there will be only one instance of the application during VisTrails
"""
def __call__(self):
""" __call__() -> VistrailsServerSingleton
Return self for calling method
"""
if not self._initialized:
self.init()
return self
def __init__(self):
QtGui.QApplication.__init__(self, sys.argv)
VistrailsApplicationInterface.__init__(self)
if QtCore.QT_VERSION < 0x40200: # 0x40200 = 4.2.0
raise vistrails.core.requirements.MissingRequirement("Qt version >= 4.2")
self.rpcserver = None
self.pingserver = None
self.images_url = "http://vistrails.sci.utah.edu/medleys/images/"
self.temp_xml_rpc_options = InstanceObject(server=None,
port=None,
log_file=None)
qt.allowQObjects()
def is_running_gui(self):
return True
def make_logger(self, filename, label):
"""self.make_logger(filename:str) -> logger. Creates a logging object to
be used for the server so we can log requests in file f."""
logger = logging.getLogger("VistrailsRPC[%s]"%label)
handler = logging.handlers.RotatingFileHandler(filename, maxBytes = 1024*1024,
backupCount=5)
handler.setFormatter(logging.Formatter('%(name)s - %(asctime)s %(levelname)-8s %(message)s'))
handler.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
return logger
def load_config(self, filename):
""" Load in server parameters from config file.
If parameters are missing, write them in and raise error.
If file doesn't exist, create one and raise error. """
global accessList, db_host, db_read_user, db_read_pass, db_write_user, db_write_pass, media_dir, script_file, virtual_display
accessList = []
db_host = ''
db_read_user = ''
db_read_pass = ''
db_write_user = ''
db_write_pass = ''
media_dir = ''
script_file = ''
virtual_display = ''
config = ConfigParser.ConfigParser()
file_opened = config.read(filename)
has_changed = False
if not file_opened:
# new file to store default (empt) config
new_filename = os.path.join(system.vistrails_root_directory(), 'server.cfg')
# load or create access fields
if config.has_option("access", "permitted_addresses"):
read_accessList = config.get("access", "permitted_addresses")
accessList = [elem.strip() for elem in read_accessList.split(',')]
else:
if not config.has_section("access"):
config.add_section("access")
config.set("access", "permitted_addresses", "localhost,")
accessList = []
has_changed = True
# load or create database fields
if not config.has_section("database"):
config.add_section("database")
has_changed = True
if config.has_option("database", "host"):
db_host = config.get("database", "host")
else:
config.set("database", "host", "")
has_changed = True
if config.has_option("database", "read_user"):
db_read_user = config.get("database", "read_user")
else:
config.set("database", "read_user", "")
has_changed = True
if config.has_option("database", "read_password"):
db_read_pass = config.get("database", "read_password")
else:
config.set("database", "read_password", "")
has_changed = True
if config.has_option("database", "write_user"):
db_write_user = config.get("database", "write_user")
else:
config.set("database", "write_user", "")
has_changed = True
if config.has_option("database", "write_password"):
db_write_pass = config.get("database", "write_password")
else:
config.set("database", "write_password", "")
has_changed = True
if not config.has_section("media"):
config.add_section("media")
has_changed = True
if config.has_option("media", "media_dir"):
media_dir = config.get("media", "media_dir")
if not os.path.exists(media_dir):
raise ValueError("media_dir %s doesn't exist." % media_dir)
if not config.has_section("script"):
config.add_section("script")
has_changed = True
if config.has_option("script", "script_file"):
script_file = config.get("script", "script_file")
if not os.path.exists(script_file):
raise ValueError("script_file %s doesn't exist." % script_file)
else:
config.set("script", "script_file", "")
has_changed = True
if config.has_option("script", "virtual_display"):
virtual_display = config.get("script", "virtual_display")
if virtual_display == "":
virtual_display = "0"
# check if all required parameters are present
missing_req_fields = [y for (x,y) in ((db_host,"host"),
(db_read_user,"read_user"),
(db_write_user,"write_user"),
(media_dir,"media_dir"),
(script_file,"script_file"),
(accessList,"permission_addresses")) if not x]
if missing_req_fields:
self.server_logger.error(("Following required parameters where missing "
"from %s config file: %s ") % \
(filename, ", ".join(missing_req_fields)))
if not has_changed:
raise ValueError("Following required parameters where missing from %s config file: %s " %
(filename, ", ".join(missing_req_fields)))
if has_changed:
# save changes to passed config file
if file_opened:
config.write(open(filename, "wb"))
self.server_logger.error("Invalid config file, the missing fields have been "
"added to your config, please populate them")
raise RuntimeError("Invalid config file, the missing fields have been "
"added to your config, please populate them")
else:
# save changes to default config file
config.write(open(new_filename, "wb"))
self.server_logger.error("Config file %s doesn't exist. Creating new file at %s. "
"Please populated it with the correct values and use it" %
(filename, new_filename))
raise RuntimeError("Config file %s doesn't exist. Creating new file at %s. "
"Please populate it with the correct values and use it" %
(filename, new_filename))
def init(self, optionsDict=None):
""" init(optionDict: dict) -> boolean
Create the application with a dict of settings
"""
VistrailsApplicationInterface.init(self,optionsDict)
self.vistrailsStartup.init()
self.server_logger = self.make_logger(self.temp_xml_rpc_options.log_file,
self.temp_xml_rpc_options.port)
self.load_config(self.temp_xml_rpc_options.config_file)
self.start_other_instances(self.temp_xml_rpc_options.instances)
self._python_environment = self.vistrailsStartup.get_python_environment()
self._initialized = True
return True
def start_other_instances(self, number):
global virtual_display, script_file
self.others = []
host = self.temp_xml_rpc_options.server
port = self.temp_xml_rpc_options.port
virt_disp = int(virtual_display)
for x in xrange(number):
port += 1 # each instance needs one port space for now
#later we might need 2 (normal requests and status requests)
virt_disp += 1
args = [script_file,":%s"%virt_disp,host,str(port),'0', '0']
try:
p = subprocess.Popen(args)
time.sleep(20)
self.others.append("http://%s:%s"%(host,port))
except Exception, e:
self.server_logger.error(("Couldn't start the instance on display:"
"%s port: %s") % (virtual_display, port))
self.server_logger.error(str(e))
def stop_other_instances(self):
script = os.path.join(system.vistrails_root_directory(), "stop_vistrails_server.py")
for o in self.others:
args = ['python', script, o]
try:
subprocess.Popen(args)
time.sleep(15)
except Exception, e:
self.server_logger.error("Couldn't stop instance: %s" % o)
self.server_logger.error(str(e))
def run_server(self):
"""run_server() -> None
This will run forever until the server receives a quit request, done
via xml-rpc.
"""
self.server_logger.info("Server is running on http://%s:%s"%(self.temp_xml_rpc_options.server,
self.temp_xml_rpc_options.port))
if self.temp_xml_rpc_options.multithread:
self.rpcserver = ThreadedXMLRPCServer((self.temp_xml_rpc_options.server,
self.temp_xml_rpc_options.port),
self.server_logger)
self.server_logger.info(" multithreaded instance")
else:
self.rpcserver = StoppableXMLRPCServer((self.temp_xml_rpc_options.server,
self.temp_xml_rpc_options.port),
self.server_logger)
"""
self.pingserver = StoppableXMLRPCServer((self.temp_xml_rpc_options.server,
self.temp_xml_rpc_options.port-1),
self.server_logger)
"""
self.server_logger.info(" singlethreaded instance")
#self.rpcserver.register_introspection_functions()
self.rpcserver.register_instance(RequestHandler(self.server_logger,
self.others))
if self.pingserver:
self.pingserver.register_instance(RequestHandler(self.server_logger, []))
self.server_logger.info("Status XML RPC Server is listening on http://%s:%s"% \
(self.temp_xml_rpc_options.server,
self.temp_xml_rpc_options.port-1))
self.pingserver.register_function(self.quit_server, "quit")
self.pingserver.serve_forever()
self.pingserver.serve_close()
self.rpcserver.register_function(self.quit_server, "quit")
self.server_logger.info("Vistrails XML RPC Server is listening on http://%s:%s"% \
(self.temp_xml_rpc_options.server,
self.temp_xml_rpc_options.port))
self.rpcserver.serve_forever()
self.rpcserver.server_close()
return 0
def quit_server(self):
result = "Vistrails XML RPC Server is quitting."
self.stop_other_instances()
self.server_logger.info(result)
self.rpcserver.stop = True
return result
def setupOptions(self, args=None):
""" setupOptions() -> None
Check and store all command-line arguments
"""
add = command_line.CommandLineParser.add_option
add("-T", "--xml_rpc_server", action="store", dest="rpcserver",
help="hostname or ip address where this xml rpc server will work")
add("-R", "--xml_rpc_port", action="store", type="int", default=8080,
dest="rpcport", help="database port")
add("-L", "--xml_rpc_log_file", action="store", dest="rpclogfile",
default=os.path.join(system.vistrails_root_directory(),
'rpcserver.log'),
help="log file for XML RPC server")
add("-O", "--xml_rpc_instances", action="store", type='int', default=0,
dest="rpcinstances",
help="number of other instances that vistrails should start")
add("-M", "--multithreaded", action="store_true",
default = None, dest='multithread',
help="server will start a thread for each request")
add("-C", "--config-file", action="store", dest = "rpcconfig",
default=os.path.join(system.vistrails_root_directory(),
'server.cfg'),
help="config file for server connection options")
VistrailsApplicationInterface.setupOptions(self, args)
def readOptions(self):
""" readOptions() -> None
Read arguments from the command line
"""
get = command_line.CommandLineParser().get_option
self.temp_xml_rpc_options = InstanceObject(server=get('rpcserver'),
port=get('rpcport'),
log_file=get('rpclogfile'),
instances=get('rpcinstances'),
multithread=get('multithread'),
config_file=get('rpcconfig'))
VistrailsApplicationInterface.readOptions(self)
# The initialization must be explicitly signalled. Otherwise, any
# modules importing vis_application will try to initialize the entire
# app.
def start_server(optionsDict=None):
"""Initializes the application singleton."""
global VistrailsServer
if VistrailsServer:
print "Server already started."
return
VistrailsServer = VistrailsServerSingleton()
vistrails.gui.theme.initializeCurrentTheme()
vistrails.core.application.set_vistrails_application(VistrailsServer)
x = VistrailsServer.init(optionsDict)
if x == True:
return 0
else:
return 1
VistrailsServer = None
def stop_server():
"""Stop and finalize the application singleton."""
global VistrailsServer
VistrailsServer.save_configuration()
VistrailsServer.destroy()
VistrailsServer.deleteLater()