Source code for pymod.amssubscription

[docs]class AmsSubscription(object): """Abstraction of AMS subscription Subscription represents stream of messages that can be pulled from AMS service or pushed to some receiver. Supported methods are wrappers around methods defined in client class with preconfigured subscription name. """ def _build_name(self, fullname): return fullname.split('/projects/{0}/subscriptions/'.format(self.init.project))[1]
[docs] def __init__(self, fullname, topic, pushconfig, ackdeadline, init): self.acls = None self.init = init self.fullname = fullname self.topic = self.init.topics[topic] self.push_endpoint = '' self.retry_policy_type = '' self.retry_policy_period = '' if pushconfig['pushEndpoint']: self.push_endpoint = pushconfig['pushEndpoint'] self.retry_policy_type = pushconfig['retryPolicy']['type'] self.retry_policy_period = pushconfig['retryPolicy']['period'] self.ackdeadline = ackdeadline self.name = self._build_name(self.fullname)
[docs] def delete(self): """Delete subscription""" return self.init.delete_sub(self.name)
[docs] def pushconfig(self, push_endpoint=None, retry_policy_type='linear', retry_policy_period=300, **reqkwargs): """Configure Push mode parameters of subscription. When push_endpoint is defined, subscription will automatically start to send messages to it. Kwargs: push_endpoint (str): URL of remote endpoint that will receive messages retry_policy_type (str): retry_policy_period (int): reqkwargs: keyword argument that will be passed to underlying python-requests library call. """ return self.init.pushconfig_sub(self.name, push_endpoint=push_endpoint, retry_policy_type=retry_policy_type, retry_policy_period=retry_policy_period, **reqkwargs)
[docs] def pull(self, num=1, retry=0, retrysleep=60, retrybackoff=None, return_immediately=False, **reqkwargs): """Pull messages from subscription Kwargs: num (int): Number of messages to pull retry: int. Number of request retries before giving up. Default is 0 meaning no further request retry will be made after first unsuccesfull request. retrysleep: int. Static number of seconds to sleep before next request attempt retrybackoff: int. Backoff factor to apply between each request attempts return_immediately (boolean): If True and if stream of messages is empty, subscriber call will not block and wait for messages reqkwargs: keyword argument that will be passed to underlying python-requests library call. Return: [(ackId, AmsMessage)]: List of tuples with ackId and AmsMessage instance """ return self.init.pull_sub(self.name, num=num, return_immediately=return_immediately, **reqkwargs)
[docs] def pullack(self, num=1, retry=0, retrysleep=60, retrybackoff=None, return_immediately=False, **reqkwargs): """Pull messages from subscription and acknownledge them in one call. If succesfull subscription pull immediately follows with failed acknownledgment (e.g. network hiccup just before acknowledgement of received messages), consume cycle will reset and start from begginning with new subscription pull. Kwargs: num (int): Number of messages to pull retry: int. Number of request retries before giving up. Default is 0 meaning no further request retry will be made after first unsuccesfull request. retrysleep: int. Static number of seconds to sleep before next request attempt retrybackoff: int. Backoff factor to apply between each request attempts return_immediately (boolean): If True and if stream of messages is empty, subscriber call will not block and wait for messages reqkwargs: keyword argument that will be passed to underlying python-requests library call. Return: [AmsMessage1, AmsMessage2]: List of AmsMessage instances """ return self.init.pullack_sub(self.name, num=num, return_immediately=return_immediately, **reqkwargs)
[docs] def time_to_offset(self, timestamp, **reqkwargs): """ Retrieve the closest(greater than) available offset to the given timestamp. Args: timestamp(datetime.datetime): The timestamp of the offset we are looking for. Kwargs: reqkwargs: keyword argument that will be passed to underlying python-requests library call. """ return self.init.time_to_offset_sub(self.name, timestamp, **reqkwargs)
[docs] def offsets(self, offset='all', move_to=0, **reqkwargs): """ Retrieve the positions of min, max and current offsets or move current offset to new one. Args: offset (str): The name of the offset. If not specified, it will return all three of them as a dict. Values that can be specified are 'max', 'min', 'current' and 'all'. move_to (int): New position for current offset. Kwargs: reqkwargs: keyword argument that will be passed to underlying python-requests library call. Return: dict: A dictionary containing all 3 offsets. If move_to is specified, current offset will be moved and updated. int: The value of the specified offset. """ avail_offsets = set(['max', 'min', 'current']) if (offset == 'all' or offset in avail_offsets) and move_to == 0: return self.init.getoffsets_sub(self.name, offset, **reqkwargs) elif move_to != 0: _ = self.init.modifyoffset_sub(self.name, move_to, **reqkwargs) return self.init.getoffsets_sub(self.name, offset, **reqkwargs)
[docs] def acl(self, users=None, **reqkwargs): """Set or get ACLs assigned to subscription Kwargs: users (list): If list of users is specified, give those user access to subscription. Empty list will reset access permission. reqkwargs: keyword argument that will be passed to underlying python-requests library call. """ if users is None: return self.init.getacl_sub(self.name, **reqkwargs) else: return self.init.modifyacl_sub(self.name, users, **reqkwargs)
[docs] def ack(self, ids, retry=0, retrysleep=60, retrybackoff=None, **reqkwargs): """Acknowledge receive of messages Kwargs: ids (list): A list of ackIds of the messages to acknowledge. retry: int. Number of request retries before giving up. Default is 0 meaning no further request retry will be made after first unsuccesfull request. retrysleep: int. Static number of seconds to sleep before next request attempt retrybackoff: int. Backoff factor to apply between each request attempts """ return self.init.ack_sub(self.name, ids, **reqkwargs)