# Source code for gssa.server

# This file is part of the Go-Smart Simulation Architecture (GSSA).
# Go-Smart is an EU-FP7 project, funded by the European Commission.
#
# Copyright (C) 2013-  NUMA Engineering Ltd. (see AUTHORS file)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# This is a workaround for syntastic lack of Py3 recognition

import asyncio

import os
import socket
import multiprocessing
import logging
import functools
import tempfile
import time
from . import family as families

logger = logging.getLogger(__name__)

# Try to hook into vigilant if present
try:
    import StatsCore
    from StatsCore.SimpleTransports import UDPStatsTransport, TCPStatsTransport
    from configparser import RawConfigParser as CParser
    use_observant = True
except:
    use_observant = False

import gssa.transferrer
import gssa.comparator
import gssa.definition
import gssa.translator
import gssa.error
import gssa.config
import gssa.utils


def _threadsafe_call(function, *args, **kwargs):
    """Queue ``function(*args, **kwargs)`` on the current asyncio event loop.

    The loop is whatever ``asyncio.get_event_loop()`` returns in the calling
    thread. The call is registered with ``call_soon_threadsafe``, so it runs
    on a later loop iteration rather than immediately, and its return value
    is discarded.
    """
    target_loop = asyncio.get_event_loop()
    deferred = functools.partial(function, *args, **kwargs)
    target_loop.call_soon_threadsafe(deferred)


# Port on which a client-side temporary HTTP receiver is assumed to listen
# when a result/diagnostic bundle is requested without an explicit target.
# FIXME: 18103 should be made configurable
_default_client_port = 18103


class GoSmartSimulationServerComponent(object):
    """Core logic of the GSSA simulation server.

    This is a plain ``object`` subclass, not an Autobahn
    ``ApplicationSession``. It publishes through the ``publish_cb`` it is
    given, and its ``do*``/``onRequest*`` methods implement the
    ``com.gosmartsimulation.*`` endpoints named in their docstrings.
    NOTE(review): the WAMP session that routes RPCs/subscriptions to these
    methods is defined elsewhere; confirm against the caller.

    Simulations are tracked in ``self.current`` (upper-cased GUID ->
    definition) and mirrored to the database backend ``self._db``.
    """
    current = None
    client = None
    _db = None

    def _write_identity(self, identity):
        """Write ``identity`` to a file called ``identity`` in the CWD.

        This gives a directory-internal way to find out our ID (i.e. without
        looking at the name of the directory above).
        """
        with open("identity", "w") as f:
            f.write(identity)

    def __init__(self, server_id, database, publish_cb,
                 ignore_development=False, use_observant=use_observant,
                 simdata_path='/tmp'):
        """Set up the server and change into its working directory.

        :param server_id: ID of this server. A directory of this name is
            created under the CWD if missing, and becomes the new CWD.
        :param database: zero-argument callable returning the DB backend. It
            is called immediately, but the backend is installed through
            :py:meth:`setDatabase` on a later event-loop iteration.
        :param publish_cb: callable ``(topic, *args)`` used to publish events.
        :param ignore_development: ignore anything with a ``DEVELOPMENT``
            parameter set true.
        :param use_observant: hook into the vigilant stats daemon. Defaults
            to whether ``StatsCore`` could be imported.
        :param simdata_path: parent directory for per-simulation working
            directories.
        """
        # This forwards exceptions to the client
        self.traceback_app = True
        self.server_id = server_id
        self.current = {}
        self.publish = publish_cb

        # Flag that tells the server to ignore anything with a parameter
        # `DEVELOPMENT` true
        self._ignore_development = ignore_development
        self._simdata_path = simdata_path

        # If we are using vigilant, do the relevant set-up
        if use_observant:
            config = CParser()
            config.read(os.path.join(gssa.config.etc_location, 'vigilant.cfg'))

            lock = str(config.get('daemon', 'lock'))
            sock = str(config.get('daemon', 'sock'))
            transport_type = str(config.get('transport', 'type'))
            host = str(config.get('transport', 'host'))
            port = int(config.get('transport', 'port'))
            transport_means = UDPStatsTransport if transport_type == 'udp' else TCPStatsTransport
            transport = transport_means(host=host, port=port)

            self.client = StatsCore.attachOrCreateStatsDaemon(transport, pid=lock, sock=sock)
            self.client.postWatchPid('go-smart-simulation-server', os.getpid())

        # Create a directory to hold information specific to this server ID
        if not os.path.exists(server_id):
            logger.debug("Creating server ID directory")
            os.mkdir(server_id)

        logger.debug("Changing to server ID directory")

        # Use this as the working directory
        os.chdir(server_id)

        logger.debug("Storing identity (%s)" % server_id)
        self._write_identity(server_id)

        logger.debug("Requesting DB setup")
        # Flag this up to be done, but don't wait for it
        _threadsafe_call(self.setDatabase, database())

    @staticmethod
    def _schedule(coro, loop=None):
        """Wrap ``coro`` in a task scheduled on the event loop and return it.

        Uses ``asyncio.ensure_future`` where it exists (Python >= 3.4.4),
        otherwise its predecessor ``asyncio.async``. The old name is looked up
        with ``getattr`` because ``async`` is a reserved word from Python 3.7,
        where ``asyncio.async`` does not even parse.
        """
        ensure_future = getattr(asyncio, 'ensure_future', None)
        if ensure_future is None:
            ensure_future = getattr(asyncio, 'async')
        if loop is None:
            return ensure_future(coro)
        return ensure_future(coro, loop=loop)

    @asyncio.coroutine
    def _fetch_definition(self, guid, allow_many=False, resync=False):
        """Retrieve a definition, from the current set if possible, otherwise
        from persistent storage.

        ``guid`` is upper-cased. A value shorter than 32 characters is
        treated as a prefix and matched against both ``self.current`` and the
        database.

        :param allow_many: if True, a prefix matching several definitions
            returns a dict of them instead of raising.
        :param resync: if True and ``guid`` exactly matches a live
            definition, compare it with the DB copy and update the DB if they
            are inconsistent. Defaults to False to avoid unnecessary DB hits.
        :returns: ``(guid, definition)``; ``(guid, False)`` when nothing
            matches; or, only with ``allow_many``, a dict of matches.
        :raises RuntimeError: several definitions match and ``allow_many``
            is False.
        """
        guid = guid.upper()

        # We only resync DB if there is a full-length GUID match
        live_current = None
        if guid in self.current:
            if resync:
                live_current = self.current[guid]
            else:
                return guid, self.current[guid]

        guids = {}
        if len(guid) < 32:
            guids = {k: v for k, v in self.current.items() if k.startswith(guid)}
            if len(guids) > 1 and not allow_many:
                raise RuntimeError("More than one matching GUID")

        # The DB lookup runs as an event-loop callback; await its result
        fut = asyncio.Future()
        _threadsafe_call(lambda: fut.set_result(self._db.retrieve(guid)))
        definition = yield from fut

        if len(guid) < 32:
            # Prefix search: the DB gives a dict of matches. Live matches are
            # merged in, overriding DB entries with the same GUID.
            definition.update(guids)
            if len(definition) > 1:
                if allow_many:
                    return definition
                else:
                    raise RuntimeError("More than one matching GUID")
            elif not definition:
                return guid, False
            short_guid = guid
            guid, current = definition.popitem()
            logger.info("Matched {short_guid} to {guid}".format(short_guid=short_guid, guid=guid))
            if guid not in self.current:
                self.current[guid] = current
        elif definition:
            if live_current:
                # Must be awaited: merely calling a coroutine never runs it
                yield from self._resync(live_current, definition)
            else:
                self.current[guid] = definition
        elif not live_current:
            return guid, False
        # Otherwise the simulation is live but not (yet) in the DB: still
        # return the live definition rather than reporting it missing

        return guid, self.current[guid]

    @asyncio.coroutine
    def _resync(self, live_definition, db_definition):
        """Update the database based on the status of the live entry.

        If the two definitions' summaries differ, the live definition is
        written to the DB and its status re-recorded via
        :py:meth:`setStatus`. Both are scheduled on the event loop, not
        awaited.
        """
        live_summary = live_definition.summary()
        db_summary = db_definition.summary()

        if live_summary != db_summary:
            logger.warning("Definitions do not match!\n%s\n%s\n(updating)" % (live_summary, db_summary))

            # addOrUpdate belongs to the DB backend (cf. doFinalize), not to
            # this component
            _threadsafe_call(self._db.addOrUpdate, live_definition)

            # setStatus needs a string key. summary() gives exit_status as
            # None while unfinished, otherwise (as doRetrieveStatus assumes)
            # a sequence whose first item is the code.
            exit_status = live_summary['exit_status']
            if exit_status is None:
                status_key = 'IN_PROGRESS'
            elif isinstance(exit_status, (tuple, list)):
                status_key = exit_status[0]
            else:
                status_key = exit_status

            _threadsafe_call(
                self.setStatus,
                live_summary['guid'],
                status_key,
                live_summary['status']['message'],
                live_summary['status']['percentage'],
                live_summary['status']['timestamp']
            )

    @asyncio.coroutine
    def doSearch(self, guid, limit=None):
        """``com.gosmartsimulation.search`` Check for matching definitions

        :param guid: full GUID or GUID prefix.
        :param limit: if given (and non-zero), return at most this many
            matches. This is an arbitrary subset.
        :returns: dict of GUID -> definition summary (empty if none match).
        """
        logger.info('Searching for %s' % guid)
        definitions = yield from self._fetch_definition(guid, allow_many=True)

        # If one or zero results are available, they are returned as a GUID/def pair
        if isinstance(definitions, tuple):
            if definitions[1]:
                definitions = {definitions[0]: definitions[1]}
            else:
                logger.info('Found no matches')
                return {}

        # Reduce total number of definitions to a manageable level, if
        # requested (an arbitrary selection), before paying for summaries
        keys = list(definitions.keys())
        if limit:
            keys = keys[:limit]
        summaries = {k: definitions[k].summary() for k in keys}

        logger.info('Found %d matches' % len(summaries))

        return summaries

    @asyncio.coroutine
    def doApi(self):
        """``com.gosmartsimulation.api`` Find the current API version in use.

        The API version only needs to be bumped when backward-incompatible
        changes occur on either side.
        """
        return gssa.config.get_api_version()

    def setDatabase(self, database):
        """Install the database backend. Mainly for start-up.

        Marks everything in-progress in the DB as not-in-progress/unfinished,
        via the backend's ``markAllOld``.
        """
        self._db = database
        self._db.markAllOld()
        logger.debug("DB set up")

    @asyncio.coroutine
    def doInit(self, guid):
        """``com.gosmartsimulation.init`` Dummy call for the moment; always
        returns True.
        """
        return True

    @asyncio.coroutine
    def doClean(self, guid):
        """``com.gosmartsimulation.clean`` Remove anything in the simulation
        working directory, for instance.

        Returns False if the simulation is unknown, otherwise the result of
        the definition's ``clean()``.
        """
        guid, current = yield from self._fetch_definition(guid, resync=True)

        if not current:
            logger.warning("Definition %s not found" % guid)
            return False

        result = yield from current.clean()

        return result

    @asyncio.coroutine
    def doStart(self, guid):
        """``com.gosmartsimulation.start`` Execute the simulation in a coro.

        The simulation task is scheduled and this returns immediately: True
        if it was launched, False if the simulation is unknown.
        """
        guid, current = yield from self._fetch_definition(guid)

        if not current:
            logger.warning("Definition %s not found" % guid)
            return False

        loop = asyncio.get_event_loop()
        task = loop.create_task(self.doSimulate(guid))

        # Once the simulation has completed, we must handle it
        task.add_done_callback(
            lambda f: self._schedule(self._handle_simulation_done(f, guid=guid))
        )

        return True

    def doTmpValidation(self, guid, directory):
        """DEPRECATED: run ``elmer-libnuma`` validation on ``directory``.

        The validation coroutine is scheduled, not awaited. When it finishes,
        its result is stored against ``guid`` through the DB's
        ``updateValidation``. Always returns True.
        """
        # RMV: This is hacky
        loop = asyncio.get_event_loop()
        coro = families.register["elmer-libnuma"].validation(None, directory)
        try:
            task = loop.create_task(coro)
        except AttributeError:
            # Event loops predating create_task (Python < 3.4.2)
            task = self._schedule(coro, loop=loop)
        task.add_done_callback(lambda f: _threadsafe_call(self._db.updateValidation, guid, f.result()))
        return True

    @asyncio.coroutine
    def _handle_simulation_done(self, fut, guid):
        """Report the outcome of a finished :py:meth:`doSimulate` task.

        Each outcome is handled as follows:

        - ``True``: :py:meth:`eventComplete`.
        - an ``ErrorMessage``: :py:meth:`eventFail` with that message.
        - ``None``: the failure has already been reported, so nothing to do.
        - anything else, including the task raising: :py:meth:`eventFail`
          with ``E_UNKNOWN``.
        """
        try:
            # This should be the return value of the simulate call
            outcome = fut.result()
        except Exception:
            # Otherwise this would only surface as an unretrieved task
            # exception and clients would never hear of the failure
            logger.exception("Simulation task raised an exception [%s]" % guid)
            outcome = False

        logger.info("Simulation exited [%s]" % guid)

        guid, current = yield from self._fetch_definition(guid)

        if not current:
            logger.warning("Definition %s not found" % guid)
            return False

        if outcome is True:
            yield from self.eventComplete(guid)
        elif isinstance(outcome, gssa.error.ErrorMessage):
            logger.warning("Failed simulation in %s" % current.get_dir())
            yield from self.eventFail(guid, outcome)
        elif outcome is None:
            # None indicates we've dealt with failure (errored) already
            pass
        else:
            # We know this did not succeed, but not why it failed
            code = gssa.error.Error.E_UNKNOWN
            error_message = "Unknown error occurred"
            logger.warning("Failed simulation in %s" % current.get_dir())
            yield from self.eventFail(guid, gssa.error.makeError(code, error_message))

        logger.info("Finished simulation")

    @asyncio.coroutine
    def doUpdateFiles(self, guid, files):
        """``com.gosmartsimulation.update_files`` Add the passed files to the
        simulation's reference dictionary of required input files (available
        to be requested later).

        :param files: dict of local name -> remote location.
        :returns: True on success, False if the simulation is unknown or
            ``files`` is not a dict.
        """
        guid, current = yield from self._fetch_definition(guid)

        if not current or not isinstance(files, dict):
            return False

        logger.debug("Update Files")
        for local, remote in files.items():
            # Lazy %-formatting: same output, but no TypeError if a value
            # is not a string
            logger.debug("remote%s", remote)
            logger.debug("Local%s", local)
        current.update_files(files)

        return True

    @asyncio.coroutine
    def doLogs(self, guid, only=None):
        """``com.gosmartsimulation.logs`` Retrieve the container logs for a
        simulation.

        Returns False if the simulation is unknown. Any exception raised
        while fetching logs is logged and re-raised.
        """
        guid, current = yield from self._fetch_definition(guid)

        if not current:
            logger.warning("Simulation [%s] not found" % guid)
            return False

        try:
            result = yield from current.logs(only)
        except Exception:
            logger.exception("Problem retrieving simulation container logs")
            raise

        return result

    @asyncio.coroutine
    def doCancel(self, guid):
        """``com.gosmartsimulation.cancel`` Prematurely stop the running
        simulation.

        Returns False if the simulation is unknown. Any exception raised
        while cancelling is logged and re-raised.
        """
        guid, current = yield from self._fetch_definition(guid)

        if not current:
            logger.warning("Simulation [%s] not found" % guid)
            return False

        try:
            result = yield from current.cancel()
        except Exception:
            logger.exception("Problem cancelling simulation")
            raise

        return result

    @asyncio.coroutine
    def doRequestFiles(self, guid, files):
        """``com.gosmartsimulation.request_files`` Push the requested output
        files through the transferrer and return the list that was sent.
        """
        logger.info("Files requested for [%s]" % guid)
        result = yield from self._request_files(guid, files)
        return result

    @asyncio.coroutine
    def _request_bundle(self, guid, target, label, gather):
        """Gather an archive for a simulation and push it to ``target``.

        Shared by the result and diagnostic bundle endpoints. ``gather`` is
        called with the simulation definition and must return the archive
        path. If ``target`` is None, the gateway is assumed to run a temporary
        HTTP receiver on the default client port. Returns {} if the
        simulation is unknown.
        """
        guid, current = yield from self._fetch_definition(guid)

        if not current:
            logger.warning("Simulation [%s] not found" % guid)
            return {}

        logger.info("%s bundle requested for [%s]" % (label, guid))

        archive = gather(current)

        transferrer = None
        if target is None:
            gateway = gssa.utils.get_default_gateway()
            target = "http://%s:%d/receive" % (
                gateway,
                _default_client_port
            )
            transferrer = gssa.transferrer.transferrer_register['http']()

        files = {archive: target}

        result = yield from self._request_files(guid, files, transferrer=transferrer)
        return result

    @asyncio.coroutine
    def doRequestResults(self, guid, target):
        """``com.gosmartsimulation.request_results`` Push a bundle of output
        files through the transferrer.

        If target is None, assume gateway is running a temporary HTTP on
        default client port.

        FIXME: this should be made asynchronous!
        """
        result = yield from self._request_bundle(
            guid, target, "Result", lambda current: current.gather_results()
        )
        return result

    @asyncio.coroutine
    def doRequestDiagnostic(self, guid, target):
        """``com.gosmartsimulation.request_diagnostic`` Push a bundle of
        diagnostic files through the transferrer.

        If target is None, assume gateway is running a temporary HTTP on
        default client port.

        FIXME: this should be made asynchronous!
        """
        result = yield from self._request_bundle(
            guid, target, "Diagnostic", lambda current: current.gather_diagnostic()
        )
        return result

    @asyncio.coroutine
    def _request_files(self, guid, files, transferrer=None):
        """Helper routine as several endpoints involve returning file requests.

        Returns what the definition's ``push_files`` returns, or {} if the
        simulation is unknown, ``files`` is not a dict, or pushing fails
        (the failure is logged).
        """
        guid, current = yield from self._fetch_definition(guid)

        if not current or not isinstance(files, dict):
            return {}

        try:
            uploaded_files = current.push_files(files, transferrer=transferrer)
        except Exception:
            logger.exception("Problem pushing files")
            return {}

        logger.info("Files sent")

        return uploaded_files

    @asyncio.coroutine
    def doCompare(self, this_xml, that_xml):
        """``com.gosmartsimulation.compare`` Check whether two GSSA-XML files
        match and, if not, what their differences are.
        """
        comparator = gssa.comparator.Comparator(this_xml, that_xml)
        return comparator.diff()

    @asyncio.coroutine
    def doUpdateSettingsXml(self, guid, xml):
        """``com.gosmartsimulation.update_settings_xml`` Set the GSSA-XML for
        a given simulation.

        Creates a fresh working directory under the simdata path, stores a
        new unfinalized definition in ``self.current`` under the upper-cased
        GUID (replacing any previous one) and publishes an announce event.
        Exceptions are logged and re-raised.
        """
        guid = guid.upper()

        try:
            # Create a working directory for the simulation (this is needed even
            # if the tool runs elsewhere, as in the Docker case)
            tmpdir = tempfile.mkdtemp(prefix='%s/' % self._simdata_path)
            os.chmod(tmpdir, 0o770)
            logger.debug("Changed permissions")

            # Set up the translator to parse the standard bits of GSSA-XML
            translator = gssa.translator.GoSmartSimulationTranslator()

            self.current[guid] = gssa.definition.GoSmartSimulationDefinition(
                guid,
                xml,
                tmpdir,
                translator,
                finalized=False,
                ignore_development=self._ignore_development,
                update_status_callback=lambda p, m: self._schedule(self.updateStatus(guid, p, m))
            )

            # Announce that XML has been uploaded
            # TODO: why announce this? Surely the response is sufficient?
            self.publish(u'com.gosmartsimulation.announce', self.server_id, guid, [0, 'XML uploaded'], tmpdir, time.time())
        except Exception:
            logger.exception("Problem updating settings XML")
            raise

        logger.debug("XML set")

        return True

    @asyncio.coroutine
    def doSimulate(self, guid):
        """Start the simulation.

        This occurs in a separately scheduled coro from the RPC call, so the
        RPC will almost certainly have returned by the time this runs. It has
        no API endpoint; :py:func:`~gssa.server.doStart` is responsible for
        launching it asynchronously.

        :returns: the definition's ``simulate()`` result, or None if the
            failure has already been reported through :py:meth:`eventFail`.
        """
        guid, current = yield from self._fetch_definition(guid)

        if not current:
            yield from self.eventFail(guid, gssa.error.makeError(gssa.error.Error.E_CLIENT, "Not fully prepared before launching - no current simulation set"))
            # Nothing to run; None tells the completion handler that the
            # failure has been reported already
            return None

        success = None

        logger.debug("Running simulation in %s" % current.get_dir())

        # Inform the user that we got this far
        yield from self.updateStatus(guid, 0, "Starting simulation...")

        # Start the socket server before simulating
        yield from current.init_percentage_socket_server()

        # Start the simulation. If we get an error, then blame the server side -
        # it should have returned False
        # FIXME: so how exactly does any non E_SERVER message get sent back?
        try:
            success = yield from current.simulate()
        except Exception as e:
            logger.exception("Simulation failed! {exc}".format(exc=e))
            yield from self.eventFail(guid, gssa.error.makeError(gssa.error.Error.E_SERVER, "[%s] %s" % (type(e), str(e))))
            success = None

        return success

    @asyncio.coroutine
    def doFinalize(self, guid, client_directory_prefix):
        """``com.gosmartsimulation.finalize`` Do any remaining preparation
        before the simulation can start.

        Records the client's directory prefix, ensures the simulation is in
        the DB and returns the definition's ``finalize()`` result. Returns
        False if the simulation is unknown.
        """
        logger.debug("Converting the Xml")
        guid, current = yield from self._fetch_definition(guid)
        if not current:
            logger.warning("Simulation [%s] not found" % guid)
            return False

        current.set_remote_dir(client_directory_prefix)

        # Make sure the simulation is in the DB
        self._db.addOrUpdate(current)

        # Execute the finalization
        result = current.finalize()

        return result

    @asyncio.coroutine
    def doProperties(self, guid):
        """``com.gosmartsimulation.properties`` Return important server-side
        simulation properties.
        """
        result = yield from self.getProperties(guid)
        return result

    @asyncio.coroutine
    def getProperties(self, guid):
        """Server-specific properties for this simulation.

        At present, just the working directory location.

        :raises RuntimeError: the simulation is unknown.
        """
        guid, current = yield from self._fetch_definition(guid)

        if not current:
            raise RuntimeError("Simulation not found: %s" % guid)

        return {"location": current.get_dir()}

    @asyncio.coroutine
    def eventComplete(self, guid):
        """Called when simulation completes.

        Records success (via :py:meth:`setStatus`, scheduled on the event
        loop) and publishes a completion event:
        ``com.gosmartsimulation.complete``. If no definition is found, the
        event is still published, with no directory.
        """
        logger.debug("Completed [%s]" % guid)

        guid, current = yield from self._fetch_definition(guid)

        if current:
            current.set_exit_status(True)
            directory = current.get_dir()
        else:
            logger.warning("Tried to send simulation-specific completion event with no current simulation definition")
            directory = None

        # Record the finishing time, as we see it
        timestamp = time.time()
        logger.debug(timestamp)

        # Validation is currently disabled. When re-enabled, it should be
        # computed here (from the definition's validation()) and stored
        # through the DB's updateValidation.
        validation = None
        try:
            # Tell the database we have finished
            _threadsafe_call(self.setStatus, guid, "SUCCESS", "Success", "100", timestamp)
        except Exception:
            logger.exception("Problem with completion/validation")

        logger.info('Success [%s]' % guid)

        # Notify any subscribers
        self.publish(u'com.gosmartsimulation.complete', guid, gssa.error.makeError('SUCCESS', 'Success'), directory, timestamp, validation)

    @asyncio.coroutine
    def eventFail(self, guid, message):
        """Called when simulation fails.

        Records the failure (via :py:meth:`setStatus`, scheduled on the event
        loop) and publishes a failure event: ``com.gosmartsimulation.fail``.
        ``message`` is indexed by ``"code"`` and ``"message"``. If no
        definition is found, the event is still published, with no directory.
        """
        guid, current = yield from self._fetch_definition(guid)

        if current:
            current.set_exit_status(False, message)
            directory = current.get_dir()
        else:
            logger.warning("Tried to send simulation-specific failure event with no current simulation definition")
            directory = None

        # Record the failure time as we see it
        timestamp = time.time()

        try:
            # Update the database
            _threadsafe_call(self.setStatus, guid, message["code"], message["message"], None, timestamp)
        except Exception:
            logger.exception("Problem saving failure status")

        logger.warning('Failure [%s]: %s' % (guid, repr(message)))

        # Notify any subscribers
        self.publish(u'com.gosmartsimulation.fail', guid, message, directory, timestamp, None)

    @asyncio.coroutine
    def doRetrieveStatus(self, guid, allow_resync=True):
        """``com.gosmartsimulation.retrieve_status`` Get the latest status for
        a simulation.

        ``allow_resync`` permits the server to update the DB if it finds an
        inconsistency. Returns None if the simulation is not found; otherwise
        a dict in the same format as the fail/status/complete events.
        """
        # Get this from the DB, not current, as the DB should give a consistent
        # answer even after restart (unless marked unfinished)
        if allow_resync:
            guid, simulation = yield from self._fetch_definition(guid, resync=True)
        else:
            # GUIDs are stored upper-cased, as everywhere else in this class
            simulation = self._db.retrieve(guid.upper())

        if not simulation:
            logger.error('Simulation not found')
            return None

        try:
            summary = simulation.summary()
            exit_code = summary['exit_status']

            if exit_code is None:
                if summary['guid'] in self.current:
                    exit_code = ('IN_PROGRESS', '...')
                else:
                    exit_code = ('E_UNKNOWN', '...')

            # NB: makeError can return SUCCESS or IN_PROGRESS
            status = gssa.error.makeError(exit_code[0], summary['status']['message'])
            percentage = summary['status']['percentage']
            timestamp = summary['status']['timestamp']

            # This format matches the fail/status/complete events
            return {
                "server_id": self.server_id,
                "summary_id": summary['guid'],
                "exit_code": exit_code,
                "status": (percentage, status, timestamp),
                "directory": summary['directory']
            }
        except Exception:
            logger.exception('Could not show status')
            raise

    def onRequestAnnounce(self):
        """``com.gosmartsimulation.request_announce`` Release a status report
        on each simulation in the database, then an identify event.

        TODO: this gets unwieldy, perhaps it should have an earliest
        simulation timestamp argument?
        """
        # Go through /every/ simulation
        simulations = self._db.all()
        for simulation in simulations:
            exit_code = simulation['exit_code']

            # If it hasn't exited, it should be running...
            if exit_code is None:
                if simulation['guid'] in self.current:
                    exit_code = 'IN_PROGRESS'
                else:
                    exit_code = 'E_UNKNOWN'

            status = gssa.error.makeError(exit_code, simulation['status'])
            percentage = simulation['percentage']

            # Tell the world
            self.publish(u'com.gosmartsimulation.announce', self.server_id, simulation['guid'], (percentage, status), simulation['directory'], simulation['timestamp'], simulation['validation'])
            logger.debug("Announced: %s %s %r" % (simulation['guid'], simulation['directory'], simulation['validation'] is not None))

        # Follow up with an identify event
        self.onRequestIdentify()

    def setStatus(self, id, key, message, percentage, timestamp):
        """Record a status change in the database and on the filesystem.

        Because of both of those, this could be slow and so should always be
        run with call_soon_threadsafe.

        The filesystem copy is a ``last_message`` file in the simulation's
        directory containing the GUID, key, percentage (blank if none) and
        message, one per line.

        FIXME: we need some sort of rate limiting here, or producer-consumer
        pattern with ability to skip once getting behind
        """
        # Write this message to the database
        self._db.setStatus(id, key, message, percentage, timestamp)

        try:
            # Write the last message in a format that the status can be easily
            # re-read
            with open(os.path.join(self.current[id].get_dir(), 'last_message'), 'w') as f:
                f.write("%s\n" % id)
                f.write("%s\n" % key.strip())
                if percentage:
                    f.write("%lf\n" % float(percentage))
                else:
                    f.write("\n")

                if message:
                    f.write(message.strip())
        except (OSError, KeyError):
            # Either the directory has gone, or the simulation is not in
            # self.current (e.g. it is from a previous server process)
            logger.warning("Tried to update simulation status on filesystem but simulation gone.")

    @asyncio.coroutine
    def updateStatus(self, guid, percentage, message):
        """Update the status.

        Schedules :py:meth:`setStatus` on the event loop and publishes
        ``com.gosmartsimulation.status``. Messages for unknown simulations, or
        for simulations that have already exited, are logged and dropped.
        """
        progress = "%.2lf" % percentage if percentage else '##'

        guid, current = yield from self._fetch_definition(guid)

        if not current:
            # Nothing to attribute the status to (and no directory to report)
            logger.warning("Simulation [%s] not found" % guid)
            return

        directory = current.get_dir()

        if current.get_exit_status():
            logger.warning("Got status message [%s%%:%s] for [%s], which has already exited." % (progress, message, guid))
            return

        timestamp = time.time()

        # Write out to the command line for debug
        # TODO: switch to `logger` and `vigilant`
        logger.debug("%s [%r] ---- %s%%: %s" % (guid, timestamp, progress, message))

        try:
            # Call the setStatus method asynchronously
            _threadsafe_call(self.setStatus, guid, 'IN_PROGRESS', message, percentage, timestamp)
        except Exception:
            logger.exception("Problem saving status")
            directory = None

        # Publish a status update for the WAMP clients to see
        self.publish('com.gosmartsimulation.status', guid, (percentage, gssa.error.makeError('IN_PROGRESS', message)), directory, timestamp, None)

    def onRequestIdentify(self):
        """``com.gosmartsimulation.request_identify`` Publish basic server
        information.

        $$ score = \\#cores - \\#active\\_simulations $$

        This gives an availability estimate: the higher, the better.
        Exceptions are logged and re-raised.
        """
        # FIXME: this seems to give a wrong number consistently, needs checked
        try:
            active_simulations = self._db.active_count()
            score = multiprocessing.cpu_count() - active_simulations

            server_name = socket.gethostname()
            self.publish(
                u'com.gosmartsimulation.identify',
                self.server_id,
                server_name,
                score
            )
            logger.info("Announced score: %d [%s]" % (score, self.server_id))
        except Exception:
            logger.exception("Didn't send score!")
            raise