# This file is part of the Go-Smart Simulation Architecture (GSSA).
# Go-Smart is an EU-FP7 project, funded by the European Commission.
#
# Copyright (C) 2013- NUMA Engineering Ltd. (see AUTHORS file)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from autobahn.asyncio.wamp import ApplicationSession
import uuid
from lxml import etree as ET
import asyncio
import os
import tarfile
import tempfile
import logging
import stat
logger = logging.getLogger(__name__)
# This should be adjusted when this issue resolution hits PIP: https://github.com/tavendo/AutobahnPython/issues/332
# http://stackoverflow.com/questions/28293198/calling-a-remote-procedure-from-a-subscriber-and-resolving-the-asyncio-promise
[docs]def wrapped_coroutine(f):
def wrapper(*args, **kwargs):
coro = f(*args, **kwargs)
asyncio.async(coro)
return wrapper
# endSO
# This is the application object for the shell GSSA client
[docs]class GoSmartSimulationClientComponent(ApplicationSession):
# Accept arguments from the command line
def __init__(self, x, gssa_file, subdirectory, output_files, tmp_transferrer='/tmp', input_files=None, definition_files=None, skip_clean=False, server=None):
ApplicationSession.__init__(self, x)
self._gssa = ET.parse(gssa_file)
self._definition_files = definition_files
self._input_files = input_files
self._server = server
self._tmp_transferrer = tmp_transferrer
# We tar the definition files into one object for transferring and add
# it to the definition node
if self._definition_files is not None:
self._definition_tmp = tempfile.NamedTemporaryFile(suffix='.tar.gz', dir=self._tmp_transferrer)
definition_tar = tarfile.open(fileobj=self._definition_tmp, mode='w:gz')
for definition_file in self._definition_files:
definition_tar.add(definition_file, os.path.basename(definition_file))
logger.debug("Added [%s]" % os.path.basename(definition_file))
definition_tar.close()
self._definition_tmp.flush()
# Note that this makes the file global readable - we assume the
# parent of the tmp directory is used to control permissions
os.chmod(self._definition_tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)
logger.debug("Made temporary tar at %s" % self._definition_tmp.name)
definition_node = self._gssa.find('.//definition')
location_remote = os.path.join('/tmp', 'gssa-transferrer', os.path.basename(self._definition_tmp.name))
definition_node.set('location', location_remote)
# Do the same with the input surfaces
if self._input_files is not None:
self._input_tmp = tempfile.NamedTemporaryFile(suffix='.tar.gz', dir=self._tmp_transferrer)
input_tar = tarfile.open(fileobj=self._input_tmp, mode='w:gz')
for input_file in self._input_files:
input_tar.add(input_file, os.path.basename(input_file))
logger.debug("Added [%s]" % os.path.basename(input_file))
input_tar.close()
self._input_tmp.flush()
# Note that this makes the file global readable - we assume the
# parent of the tmp directory is used to control permissions
os.chmod(self._input_tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)
logger.debug("Made temporary tar at %s" % self._input_tmp.name)
input_node = ET.SubElement(self._gssa.find('.//transferrer'), 'input')
location_remote = os.path.join('/tmp', 'gssa-transferrer', os.path.basename(self._input_tmp.name))
input_node.set('location', location_remote)
# Generate a simulation ID
self._guid = uuid.uuid1()
self._subdirectory = subdirectory
self._output_files = output_files
self._skip_clean = skip_clean
[docs] def make_call(self, suffix):
"""Make a call ``com.gosmartsimulation.DEST.suffix``.
If this object has a default server set, address it,
otherwise we call whichever
one got the full namespace.
"""
if self._server:
return "com.gosmartsimulation.%s.%s" % (self._server, suffix)
else:
return "com.gosmartsimulation.%s" % suffix
@asyncio.coroutine
[docs] def onJoin(self, details):
logger.debug("session ready")
# Run the simulation
guid = str(self._guid)
gssa = ET.tostring(self._gssa, encoding="unicode")
yield from self.call(self.make_call('init'), guid)
logger.debug("Initiated...")
yield from self.call(self.make_call('update_settings_xml'), guid, gssa)
logger.debug("Sent XML...")
yield from self.call(self.make_call('finalize'), guid, self._subdirectory)
logger.debug("Finalized settings...")
yield from self.call(self.make_call('start'), guid)
logger.debug("Started...")
# Listen for responses from the server
self.subscribe(self.onComplete, self.make_call('complete'))
self.subscribe(self.onFail, self.make_call('fail'))
@wrapped_coroutine
@asyncio.coroutine
[docs] def onComplete(self, guid, success, directory, time, validation):
logger.debug("Complete")
# Once we have completed, print the validation if available
if validation:
logger.debug("Validation: %s" % repr(validation))
logger.debug("Requesting files")
# Request files from the tmp transferrer
files = yield from self.call(self.make_call('request_files'), guid, {
f: os.path.join('/tmp', 'gssa-transferrer', f) for f in self._output_files
})
logger.debug(files)
yield from self.finalize(guid)
# NB: this is not currently hooked in (see
# self.subscribe(self.onComplete...)
# for example)
@wrapped_coroutine
@asyncio.coroutine
[docs] def onStatus(self, guid, message, directory, time, validation):
percentage, state = message
# Print each status message to the command line
progress = "%.2lf" % percentage if percentage else '##'
logger.debug("%s [%r] ---- %s%%: %s" % (id, time, progress, state['message']))
@wrapped_coroutine
@asyncio.coroutine
[docs] def onFail(self, guid, message, directory, time, validation):
logger.warning("Failed - %s" % message)
yield from self.finalize(guid)
# Tidy up, if needs be
[docs] def finalize(self, guid):
if not self._skip_clean:
yield from self.call(self.make_call('clean'), guid)
self.shutdown()
else:
logger.info("Skipping clean-up")
[docs] def shutdown(self):
"""Start the exiting steps."""
self.leave()
@wrapped_coroutine
@asyncio.coroutine
[docs] def onLeave(self, details):
self.disconnect()