Source code for gosmart.status

from __future__ import print_function

# This file is part of the Go-Smart Simulation Architecture (GSSA).
# Go-Smart is an EU-FP7 project, funded by the European Commission.
#
# Copyright (C) 2013-  NUMA Engineering Ltd. (see AUTHORS file)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import socket
import os


[docs]class StatusUpdater: """Pass back /short/ information to the client.""" def __init__(self, update_socket_location=None): if update_socket_location is None: if 'GSSA_STATUS_SOCKET' in os.environ: update_socket_location = os.environ['GSSA_STATUS_SOCKET'] elif os.path.exists('/shared/update.sock'): update_socket_location = '/shared/update.sock' self._update_socket = None self._update_socket_location = update_socket_location
[docs] def connect(self): """Attempts to connect to the known update socket. This should be the only egress from the container, other than implied communication through the filesystem. """ if self._update_socket_location is None or not os.path.exists(self._update_socket_location): return False self._update_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._update_socket.connect(self._update_socket_location)
[docs] def status(self, message, percentage=None): """Pass a status message to the Glossia server (and on to any client) with an optional percentage (float: 0-100).""" percentage_string = b'' if percentage is not None: try: percentage = float(percentage) except ValueError: pass else: percentage_string = b'%lf|' % percentage message = b'%s%s' % (percentage_string, message) if self._update_socket is None: print(message) else: self._update_socket.sendall(message)