Source code for gssa.definition

# This file is part of the Go-Smart Simulation Architecture (GSSA).
# Go-Smart is an EU-FP7 project, funded by the European Commission.
#
# Copyright (C) 2013-  NUMA Engineering Ltd. (see AUTHORS file)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import os
import shutil
import tarfile
import time
import logging

logger = logging.getLogger(__name__)

# Replace with better integrated approach!
import asyncio
from .transferrer import transferrer_register
from zope.interface.verify import verifyObject
from .transferrer import ITransferrer
from . import family as families

import lxml.etree


class GoSmartSimulationDefinition:
    """Routines for working with a single specific simulation."""

    _guid = None
    _dir = None
    _remote_dir = ''
    _finalized = False
    _files = None
    _exit_status = None
    _model_builder = None
    _shadowing = False
    _status = None

    def set_exit_status(self, success, message=None):
        """Set the status to be recorded in the DB."""
        self._exit_status = (success, message)

    def get_exit_status(self):
        return self._exit_status

    @asyncio.coroutine
    def _handle_percentage_connection(self, stream_reader, stream_writer):
        """Set up the percentage relay.

        Once we have a status connection from the simulation, feed messages
        back to the server.
        """
        logger.debug('Got percentage connection')

        while True:
            line = yield from stream_reader.readline()

            # Once we are out of data and the stream has closed, our job is done
            if not line:
                break

            # This is a very simplistic parsing approach, separating the string
            # based on the first pipe character. The percentage is first, the
            # status message the remainder.
            line = line.decode('utf-8').strip().split('|', maxsplit=1)
            percentage, message = (None, line[0]) if len(line) == 1 else line

            try:
                percentage = float(percentage)
            except (TypeError, ValueError):
                # TypeError covers lines with no percentage at all (None)
                percentage = None

            # Call the server's callback
            self._status = {'percentage': percentage, 'message': message, 'timestamp': time.time()}
            self._update_status_callback(percentage, message)

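    # Illustrative only (not part of the original source): the handler above
    # expects newline-terminated UTF-8 lines of the form "<percentage>|<message>",
    # where the percentage may be omitted. A simulation might report progress
    # over the Unix status socket roughly like this (the socket path shown is a
    # hypothetical stand-in for the location chosen by the model builder):
    #
    #     import socket
    #     s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    #     s.connect('/tmp/gssa-status.sock')
    #     s.sendall(b'42.5|Meshing complete\n')   # parsed as (42.5, 'Meshing complete')
    #     s.sendall(b'Solver starting\n')         # no pipe: parsed as (None, 'Solver starting')
    #     s.close()
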
    @asyncio.coroutine
    def init_percentage_socket_server(self):
        """Start up the status server."""
        if self._shadowing:
            logger.debug('No percentages: shadowing')
            self._percentage_socket_server = None
            return

        # Create the socket for the simulation to reach
        working_directory = self.get_dir()
        self._percentage_socket_location = self._model_builder.get_percentage_socket_location(working_directory)
        logger.debug('Status socket for %s : %s' % (self._guid, self._percentage_socket_location))

        try:
            # Start the socket server
            self._percentage_socket_server = yield from asyncio.start_unix_server(
                self._handle_percentage_connection,
                self._percentage_socket_location
            )
        except Exception as e:
            logger.debug('Could not connect to socket: %s' % str(e))
            self._percentage_socket_server = None

    def __init__(self, guid, xml_string, tmpdir, translator, finalized=False,
                 ignore_development=False, update_status_callback=None):
        self._guid = guid
        self._dir = tmpdir
        self._finalized = finalized
        self._files = {}
        self._translator = translator
        self._update_status_callback = update_status_callback
        self._ignore_development = ignore_development

        if not finalized:
            # Do first parse of the GSSA-XML
            try:
                self.create_xml_from_string(xml_string)
            except Exception as e:
                logger.error(e)

            # Create the input directory, ready for the STL surfaces
            input_dir = os.path.join(tmpdir, 'input')
            if not os.path.exists(input_dir):
                try:
                    os.mkdir(input_dir)
                except Exception:
                    logger.exception('Could not create input directory')

            # Write the GSSA-XML there for safekeeping
            with open(os.path.join(tmpdir, "original.xml"), "w") as f:
                f.write(xml_string)

            # Make a note of the client GUID, in case we need to track backwards
            with open(os.path.join(tmpdir, "guid"), "w") as f:
                f.write(guid)

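    # Illustrative only: a typical lifecycle for a definition, pieced together
    # from the methods in this class (the real orchestration lives in the
    # hosting server). `translator`, `on_status`, `remote_dir` and the pushed
    # file names are hypothetical stand-ins:
    #
    #     defn = GoSmartSimulationDefinition(guid, xml_string, tmpdir, translator,
    #                                        update_status_callback=on_status)
    #     defn.set_remote_dir(remote_dir)
    #     if defn.finalize():                        # parse GSSA-XML, pull input files
    #         yield from defn.init_percentage_socket_server()
    #         task = yield from defn.simulate()      # hand off to the model builder
    #         ...
    #         archive = defn.gather_results()
    #         defn.push_files({'results_archive.tgz': 'results_archive.tgz'})
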
    def summary(self):
        """Provide a handy synopsis of this definition."""
        return {
            'guid': self._guid,
            'directory': self._dir,
            'finalized': self._finalized,
            'exit_status': self._exit_status,
            'status': self._status
        }

    def get_remote_dir(self):
        """Get remote directory location.

        This directory indicates where on the client's system we should be
        pulling/pushing from/to.
        """
        return self._remote_dir

    def set_remote_dir(self, remote_dir):
        self._remote_dir = remote_dir

    def get_guid(self):
        """Return the simulation's GUID."""
        return self._guid

    def create_xml_from_string(self, xml):
        """Turn the string XML into an ElementTree object.

        Args:
            xml (str): string-version of GSSA-XML.
        """
        self._finalized = False

        try:
            self._xml = lxml.etree.fromstring(bytes(xml, 'utf-8'))
        except Exception as e:
            logger.exception('Could not create XML from input')
            raise e

        return True

    def update_files(self, files):
        """Add entries to the file map used by the transferrer."""
        self._files.update(files)

    def get_files(self):
        return self._files

    def finalize(self):
        """Trigger the heavy lifting of interpreting the GSSA-XML."""
        logger.debug("Finalize - Translating Called")

        if self._xml is None:
            return False

        try:
            logger.debug("Instantiating transferrer")

            # Discover what kind of transferrer (e.g. via /tmp, via SFTP) we
            # have been asked to use and create it
            transferrer_node = self._xml.find('transferrer')
            cls = transferrer_node.get('class')
            self._transferrer = transferrer_register[cls]()
            verifyObject(ITransferrer, self._transferrer)

            # Configure the transferrer from this node
            self._transferrer.configure_from_xml(transferrer_node)

            logger.debug("Starting to Translate")

            # Run the translator, which understands the higher-level, generic
            # concepts of the GSSA-XML
            family, numerical_model_node, parameters, algorithms = \
                self._translator.translate(self._xml)

            if family is None or family not in families.register:
                raise RuntimeError("Unknown family of models : %s" % family)

            # If we must ignore DEVELOPMENT='true' runs, and if this is one, then do so
            if self._ignore_development and 'DEVELOPMENT' in parameters and parameters['DEVELOPMENT']:
                self._shadowing = True
                logger.warning("Shadowing mode ON for this definition")
            else:
                files_required = self._translator.get_files_required()

                # Set up the model, most of the rest of the work is done here
                self._model_builder = families.register[family](files_required)
                self._model_builder.load_definition(numerical_model_node, parameters=parameters, algorithms=algorithms)

                self._files.update(files_required)

                self._transferrer.connect()
                # Pull down the input/definition files
                self._transferrer.pull_files(self._files, self.get_dir(), self.get_remote_dir())
                self._transferrer.disconnect()
        except Exception:
            logger.exception('Could not finalize set-up')
            return False

        self._finalized = True
        return True

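    # Illustrative only: finalize() expects a <transferrer> element in the
    # GSSA-XML whose `class` attribute names an entry in transferrer_register;
    # the element itself is then handed to that transferrer's
    # configure_from_xml(). The key "tmp" and the child content below are
    # hypothetical, not a documented schema:
    #
    #     <transferrer class="tmp">
    #         <!-- transferrer-specific configuration, read by configure_from_xml -->
    #     </transferrer>
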
    def finalized(self):
        return self._finalized

    def get_dir(self):
        """Return working directory."""
        return self._dir

    @asyncio.coroutine
    def clean(self):
        """Clean out the working directory."""
        yield from self._model_builder.clean()

        shutil.rmtree(self._dir)

        return True

    def gather_results(self):
        """Create a results archive."""
        output_directory = os.path.join(self.get_dir(), 'output')
        output_final_directory = os.path.join(self.get_dir(), 'output.final')

        result_files = {
            'output': output_directory,
            'output.final': output_final_directory,
            'original.xml': os.path.join(self.get_dir(), 'original.xml'),
            'guid': os.path.join(self.get_dir(), 'guid'),
        }

        return self._gather_files('results_archive.tgz', result_files)

    def gather_diagnostic(self):
        """Create a diagnostic archive."""
        input_directory = os.path.join(self.get_dir(), 'input')
        input_final_directory = os.path.join(self.get_dir(), 'input.final')
        output_directory = os.path.join(self.get_dir(), 'output')
        log_directory = os.path.join(output_directory, 'logs')

        diagnostic_files = {
            'input': input_directory,
            'input.final': input_final_directory,
            'logs': log_directory,
            'original.xml': os.path.join(self.get_dir(), 'original.xml'),
            'guid': os.path.join(self.get_dir(), 'guid'),
        }

        return self._gather_files('diagnostic_archive.tgz', diagnostic_files)

    def _gather_files(self, archive_name, files):
        """Turn a list of files into an archive."""
        missing_file = os.path.join(self.get_dir(), 'missing.txt')

        logger.debug("Creating tarfile")

        archive = os.path.join(self.get_dir(), archive_name)
        with tarfile.open(archive, mode='w:gz') as definition_tar:
            with open(missing_file, 'w') as missing:
                for f, loc in files.items():
                    try:
                        definition_tar.add(loc, arcname='%s/%s' % (self._guid, f))
                    except Exception as e:
                        missing.write("Missing %s : %s\n" % (f, str(e)))
            definition_tar.add(missing_file, arcname='%s/diagnostic_missing.txt' % self._guid)

        logger.debug("Created tarfile")

        return archive

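    # Illustrative only: given the arcnames used above, the archive produced by
    # gather_results() unpacks to roughly the layout below (contents of output/
    # and output.final/ depend entirely on the model that ran):
    #
    #     <guid>/output/...
    #     <guid>/output.final/...
    #     <guid>/original.xml
    #     <guid>/guid
    #     <guid>/diagnostic_missing.txt
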
    def push_files(self, files, transferrer=None):
        """Send back the results."""
        if self._shadowing:
            logger.warning("Not pushing files: shadowing mode ON for this definition")
            return {}

        if transferrer is None:
            transferrer = self._transferrer

        uploaded_files = {}

        for local, remote in files.items():
            path = os.path.join(self.get_dir(), local)

            if os.path.exists(path):
                uploaded_files[local] = remote
            else:
                logger.warning("Could not find %s for pushing" % path)

        transferrer.connect()
        transferrer.push_files(uploaded_files, self.get_dir(), self.get_remote_dir())
        transferrer.disconnect()

        return uploaded_files

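    # Illustrative only: push_files() maps *local* paths (relative to the working
    # directory) to *remote* paths (relative to the remote directory); any entry
    # whose local file is missing is skipped with a warning. The file names here
    # are hypothetical:
    #
    #     defn.push_files({
    #         'results_archive.tgz': 'results_archive.tgz',
    #         'output/final.vtu': 'results/final.vtu',
    #     })
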
    @asyncio.coroutine
    def logs(self, only=None):
        """Retrieve the simulation logs from the model builder (family)."""
        if not self._model_builder:
            return False

        logs = yield from self._model_builder.logs()

        return logs

    @asyncio.coroutine
    def cancel(self):
        """Send the cancel request to the model builder (family)."""
        if not self._model_builder:
            return False

        success = yield from self._model_builder.cancel()

        return success

    @asyncio.coroutine
    def simulate(self):
        """Tell the family to start simulating."""
        if self._shadowing:
            logger.warning("Not simulating: shadowing mode ON for this definition")
            raise RuntimeError("Failing here to leave simulation for external server control")

        # Get our asyncio task from the model builder
        task = yield from self._model_builder.simulate(self.get_dir())

        output_directory = os.path.join(self.get_dir(), 'output')
        if not os.path.exists(output_directory):
            os.mkdir(output_directory)

        # Get files output by the model into the output directory (I think this
        # is primarily useful for the Docker modules, say, where they are not
        # already there)
        self._model_builder.retrieve_files(output_directory)

        return task

    @asyncio.coroutine
    def validation(self):
        """DEPRECATED: run validation (requires third-party tool)."""
        if self._shadowing:
            logger.warning("Not validating: shadowing mode ON for this definition")
            return None

        # Run the validation step only
        task = yield from self._model_builder.validation(self.get_dir())

        return task