Source code for pymod.amsmsg

import sys
import json
import inspect
from base64 import b64encode, b64decode
try:
    from collections.abc import Callable
except ImportError:
    from collections import Callable
from .amsexceptions import AmsMessageException

[docs]class AmsMessage(Callable): """Abstraction of AMS Message AMS Message is constituted from mandatory data field and arbitrary number of attributes. Data is Base64 encoded prior dispatching message to AMS service and Base64 decoded when it is being pulled from service. """
[docs] def __init__(self, b64enc=True, attributes='', data=None, messageId='', publishTime=''): self._attributes = attributes self._messageId = messageId self._publishTime = publishTime self.set_data(data, b64enc)
def __call__(self, **kwargs): if 'attributes' not in kwargs: kwargs.update({'attributes': self._attributes}) self.__init__(b64enc=True, **kwargs) return self.dict() def _has_dataattr(self): if not getattr(self, '_data', False) and not self._attributes: raise AmsMessageException('At least data field or one attribute needs to be defined') return True
[docs] def set_attr(self, key, value): """Set attributes of message Args: key (str): Key of the attribute value (str): Value of the attribute """ self._attributes.update({key: value})
[docs] def set_data(self, data, b64enc=True): """Set data of message Default behaviour is to Base64 encode data prior sending it by the publisher. Method is also used internally when pull from subscription is being made by subscriber and in that case, data is already Base64 encoded Args: data (str): Data of the message Kwargs: b64enc (bool): Control whether data should be Base64 encoded """ if b64enc and data: try: if sys.version_info < (3, ): self._data = b64encode(data) else: if isinstance(data, bytes): self._data = str(b64encode(data), 'utf-8') else: self._data = str(b64encode(bytearray(data, 'utf-8')), 'utf-8') except Exception as e: raise AmsMessageException('b64encode() {0}'.format(str(e))) elif data: self._data = data
[docs] def dict(self): """Construct python dict from message""" if self._has_dataattr(): d = dict() for attr in ['attributes', 'data', 'messageId', 'publishTime']: if getattr(self, '_{0}'.format(attr), False): v = eval('self._{0}'.format(attr)) d.update({attr: v}) return d
[docs] def get_data(self): """Fetch the data of the message and Base64 decode it""" if self._has_dataattr(): try: return b64decode(self._data) except Exception as e: raise AmsMessageException('b64decode() {0}'.format(str(e)))
[docs] def get_msgid(self): """Fetch the message id of the message""" return self._messageId
[docs] def get_publishtime(self): """Fetch the publish time of the message set by the AMS service""" return self._publishTime
[docs] def get_attr(self): """Fetch all attributes of the message""" if self._has_dataattr(): return self._attributes
[docs] def json(self): """Return JSON interpretation of the message""" return json.dumps(self.dict())
def __str__(self): return str(self.dict())