import json
import logging
import logging.handlers
import requests
import socket
import sys
import datetime
import time
from .amsexceptions import (AmsServiceException, AmsConnectionException,
AmsMessageException, AmsException,
AmsTimeoutException, AmsBalancerException)
from .amsmsg import AmsMessage
from .amstopic import AmsTopic
from .amssubscription import AmsSubscription
from .amsuser import AmsUser, AmsUserPage, AmsUserProject
# OrderedDict lives in the standard library since Python 2.7; older
# interpreters need the external ``ordereddict`` backport. Only a failed
# import should trigger the fallback, so catch ImportError specifically
# instead of swallowing every exception (including KeyboardInterrupt).
try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict
log = logging.getLogger(__name__)
class AmsHttpRequests(object):
    """Class encapsulates the HTTP methods used by ArgoMessagingService.

    Each method represents an HTTP request made to AMS with the help of the
    requests library. Service error handling is implemented according to the
    HTTP status codes returned by the service and by the load balancer.
    """

    def __init__(self, endpoint, authn_port, token="", cert="", key=""):
        """Store connection settings, build the route tables, resolve the token.

        Args:
            endpoint (str): AMS host used as ``{0}`` in every route URL.
            authn_port (int): Port used by the ``auth_x509`` route.
            token (str): AMS token. When empty, one is requested with cert/key.
            cert (str): Path to a certificate file.
            key (str): Path to the key file associated with the certificate.
        """
        self.endpoint = endpoint
        self.authn_port = authn_port
        self.token = token
        # Route list: route name -> [HTTP verb, URL template]
        self.routes = {
            # topic api calls
            "topic_list": ["get", "https://{0}/v1/projects/{1}/topics"],
            "topic_get": ["get", "https://{0}/v1/projects/{1}/topics/{2}"],
            "topic_publish": ["post", "https://{0}/v1/projects/{1}/topics/{2}:publish"],
            "topic_create": ["put", "https://{0}/v1/projects/{1}/topics/{2}"],
            "topic_delete": ["delete", "https://{0}/v1/projects/{1}/topics/{2}"],
            "topic_getacl": ["get", "https://{0}/v1/projects/{1}/topics/{2}:acl"],
            "topic_modifyacl": ["post", "https://{0}/v1/projects/{1}/topics/{2}:modifyAcl"],
            # subscription api calls
            "sub_create": ["put", "https://{0}/v1/projects/{1}/subscriptions/{2}"],
            "sub_delete": ["delete", "https://{0}/v1/projects/{1}/subscriptions/{2}"],
            "sub_list": ["get", "https://{0}/v1/projects/{1}/subscriptions"],
            "sub_get": ["get", "https://{0}/v1/projects/{1}/subscriptions/{2}"],
            "sub_pull": ["post", "https://{0}/v1/projects/{1}/subscriptions/{2}:pull"],
            "sub_ack": ["post", "https://{0}/v1/projects/{1}/subscriptions/{2}:acknowledge"],
            "sub_pushconfig": ["post", "https://{0}/v1/projects/{1}/subscriptions/{2}:modifyPushConfig"],
            "sub_getacl": ["get", "https://{0}/v1/projects/{1}/subscriptions/{2}:acl"],
            "sub_modifyacl": ["post", "https://{0}/v1/projects/{1}/subscriptions/{2}:modifyAcl"],
            "sub_offsets": ["get", "https://{0}/v1/projects/{1}/subscriptions/{2}:offsets"],
            "sub_mod_offset": ["post", "https://{0}/v1/projects/{1}/subscriptions/{2}:modifyOffset"],
            "sub_timeToOffset": ["get", "https://{0}/v1/projects/{1}/subscriptions/{2}:timeToOffset?time={3}"],
            # miscellaneous api calls about metrics,version,status
            "api_status": ["get", "https://{0}/v1/status"],
            "api_metrics": ["get", "https://{0}/v1/metrics"],
            "api_va_metrics": ["get", "https://{0}/v1/metrics/va_metrics"],
            "api_version": ["get", "https://{0}/v1/version"],
            "api_usage_report": ["get", "https://{0}/v1/users/usageReport"],
            # user api calls
            "user_create": ["post", "https://{0}/v1/users/{1}"],
            "user_update": ["put", "https://{0}/v1/users/{1}"],
            "user_get": ["get", "https://{0}/v1/users/{1}"],
            "user_get_by_token": ["get", "https://{0}/v1/users:byToken/{1}"],
            "user_get_by_uuid": ["get", "https://{0}/v1/users:byUUID/{1}"],
            "user_get_profile": ["get", "https://{0}/v1/users/profile"],
            "users_list": ["get", "https://{0}/v1/users"],
            "user_delete": ["delete", "https://{0}/v1/users/{1}"],
            "user_refresh_token": ["post", "https://{0}/v1/users/{1}:refreshToken"],
            # project api calls
            "project_add_member": ["post", "https://{0}/v1/projects/{1}/members/{2}:add"],
            "project_get_member": ["get", "https://{0}/v1/projects/{1}/members/{2}"],
            "project_create_member": ["post", "https://{0}/v1/projects/{1}/members/{2}"],
            "project_remove_member": ["post", "https://{0}/v1/projects/{1}/members/{2}:remove"],
            "project_create": ["post", "https://{0}/v1/projects/{1}"],
            "project_update": ["put", "https://{0}/v1/projects/{1}"],
            "project_get": ["get", "https://{0}/v1/projects/{1}"],
            "project_delete": ["delete", "https://{0}/v1/projects/{1}"],
            "auth_x509": ["get", "https://{0}:{1}/v1/service-types/ams/hosts/{0}:authx509"],
        }
        # HTTP error status codes returned by AMS according to:
        # http://argoeu.github.io/messaging/v1/api_errors/
        # Only the status-code set (index 1) is consulted by _make_request();
        # the verbs are kept in sync with self.routes for readability.
        self.ams_errors_route = {
            "topic_create": ["put", set([409, 401, 403])],
            "topic_list": ["get", set([400, 401, 403, 404])],
            "topic_delete": ["delete", set([401, 403, 404])],
            "topic_get": ["get", set([404, 401, 403])],
            "topic_modifyacl": ["post", set([400, 401, 403, 404])],
            "topic_publish": ["post", set([413, 401, 403])],
            "sub_create": ["put", set([400, 409, 408, 401, 403])],
            "sub_get": ["get", set([404, 401, 403])],
            "sub_mod_offset": ["post", set([400, 401, 403, 404])],
            "sub_ack": ["post", set([408, 400, 401, 403, 404])],
            "sub_pushconfig": ["post", set([400, 401, 403, 404])],
            "sub_pull": ["post", set([400, 401, 403, 404])],
            "sub_timeToOffset": ["get", set([400, 401, 403, 404, 409])],
            "user_create": ["post", set([400, 401, 403, 404, 409])],
            "user_update": ["put", set([400, 401, 403, 404, 409])],
            "user_get": ["get", set([400, 401, 403, 404])],
            "user_get_by_token": ["get", set([400, 401, 403, 404])],
            "user_get_by_uuid": ["get", set([400, 401, 403, 404])],
            "user_get_profile": ["get", set([400, 401, 403, 404])],
            "users_list": ["get", set([401, 403])],
            "user_delete": ["delete", set([401, 403, 404])],
            "user_refresh_token": ["post", set([401, 403, 404])],
            "api_status": ["get", set([401, 403])],
            "api_metrics": ["get", set([401, 403])],
            "api_va_metrics": ["get", set([400, 401, 403, 404])],
            "api_version": ["get", set([401, 403])],
            "api_usage_report": ["get", set([400, 401, 403])],
            "project_add_member": ["post", set([400, 401, 403, 404, 409])],
            "project_get_member": ["get", set([400, 401, 403, 404])],
            "project_create_member": ["post", set([400, 401, 403, 404, 409])],
            "project_remove_member": ["post", set([401, 403, 404])],
            "project_create": ["post", set([400, 401, 403, 409])],
            "project_update": ["put", set([400, 401, 403, 404, 409])],
            "project_get": ["get", set([401, 403, 404])],
            "project_delete": ["delete", set([401, 403, 404])],
            "auth_x509": ["get", set([400, 401, 403, 404])]}
        # https://cbonte.github.io/haproxy-dconv/1.8/configuration.html#1.3
        self.balancer_errors_route = {"sub_ack": ["post", set([500, 502, 503, 504])],
                                      "sub_pull": ["post", set([500, 502, 503, 504])],
                                      "topic_publish": ["post", set([500, 502, 503, 504])]}
        # determine the token to be used
        self.assign_token(token, cert, key)

    def assign_token(self, token, cert, key):
        """Assign a token to the ams object.

        A non-empty ``token`` is kept as is (it was stored by ``__init__``).
        Otherwise the token is retrieved through ``auth_via_cert``.

        Args:
            token(str): a valid ams token
            cert(str): a path to a valid certificate file
            key(str): a path to the associated key file for the provided certificate

        Raises:
            AmsServiceException: the x509 mapping failed, or neither a token
                nor a certificate was provided.
        """
        # check if a token has been provided
        if token != "":
            return
        try:
            # otherwise use the provided certificate to retrieve it
            self.token = self.auth_via_cert(cert, key)
        except AmsServiceException as e:
            # if the request sent to authn didn't contain an x509 cert, there
            # was also no token provided when initializing the object, since
            # authn is only tried when no token was provided
            if e.msg == 'While trying the [auth_x509]: No certificate provided.':
                refined_msg = "No certificate provided. No token provided."
                errormsg = self._error_dict(refined_msg, e.code)
                raise AmsServiceException(json=errormsg, request="auth_x509")
            raise

    def auth_via_cert(self, cert, key, **reqkwargs):
        """Retrieve an ams token based on the provided certificate.

        Args:
            cert(str): a path to a valid certificate file
            key(str): a path to the associated key file for the provided certificate

        Kwargs:
            reqkwargs: keyword argument that will be passed to underlying
                       python-requests library call.

        Returns:
            The value of the ``token`` field of the response.

        Raises:
            AmsServiceException: no certificate was given or the response
                carries no ``token`` field.
        """
        if cert == "" and key == "":
            errord = self._error_dict("No certificate provided.", 400)
            raise AmsServiceException(json=errord, request="auth_x509")
        # add the certificate tuple needed by the requests library while
        # keeping every option the caller supplied (timeout, verify, ...)
        reqkwargs["cert"] = (cert, key)
        route = self.routes["auth_x509"]
        # Compose url
        url = route[1].format(self.endpoint, self.authn_port)
        method = getattr(self, 'do_{0}'.format(route[0]))
        r = method(url, "auth_x509", **reqkwargs)
        # if the `token` field was not found in the response, raise an error
        if "token" not in r:
            errord = self._error_dict("Token was not found in the response. Response: " + str(r), 500)
            raise AmsServiceException(json=errord, request="auth_x509")
        return r["token"]

    def _error_dict(self, response_content, status):
        """Decode a response body as JSON, wrapping non-JSON text in an error dict.

        Args:
            response_content: str or bytes body; falsy values yield ``{}``.
            status: status code stored in the fallback error dict.

        Returns:
            The parsed JSON value, ``{}`` for an empty body, or
            ``{'error': {'code': status, 'message': <text>}}`` when the body
            is not valid JSON.
        """
        if isinstance(response_content, bytes):
            try:
                # decode on every interpreter so the fallback message is text
                response_content = response_content.decode('utf-8')
            except UnicodeDecodeError:
                # keep the raw bytes; json.loads may still cope with them
                pass
        try:
            return json.loads(response_content) if response_content else {}
        except ValueError:
            return {'error': {'code': status, 'message': response_content}}

    def _gen_backoff_time(self, try_number, backoff_factor):
        """Yield ``try_number`` exponentially growing sleep intervals.

        The i-th value (i starting at 0) is ``backoff_factor * 2 ** (i - 1)``,
        so the first interval is half of ``backoff_factor``.
        """
        for i in range(0, try_number):
            yield backoff_factor * (2 ** (i - 1))

    def _retry_make_request(self, url, body=None, route_name=None, retry=0,
                            retrysleep=60, retrybackoff=None, **reqkwargs):
        """Wrapper around _make_request() that decides whether the request
        should be retried or not.

        Two request retry modes are available:
            1) static sleep - fixed amount of seconds to sleep between
               request attempts
            2) backoff - each next sleep before request attempt is
               exponentially longer

        If enabled, the request will be retried on the exceptions
        _make_request() raises for:
            * timeouts from AMS (HTTP 408) or load balancer (HTTP 408 and 504)
            * the other load balancer errors listed for the route
            * connection related problems in the lower network layers

        Default behaviour is no retry attempts. If both retry and
        retrybackoff are enabled, retrybackoff takes precedence. In both
        modes the sleep happens between a failed attempt and the next one;
        the last failure is re-raised without a further sleep.

        Args:
            url: str. The final messaging service endpoint
            body: dict. Payload of the request
            route_name: str. The name of the route to follow selected from the route list
            retry: int. Number of request retries before giving up. Default
                        is 0 meaning no further request retry will be made
                        after first unsuccessful request.
            retrysleep: int. Static number of seconds to sleep before next
                        request attempt
            retrybackoff: int. Backoff factor to apply between each request
                        attempts
            reqkwargs: keyword argument that will be passed to underlying
                       python-requests library call.
        """
        attempt = 1
        timeout = reqkwargs.get('timeout', 0)
        if retrybackoff:
            delays = list(self._gen_backoff_time(retry, retrybackoff))
            while True:
                try:
                    return self._make_request(url, body, route_name, **reqkwargs)
                except (AmsBalancerException, AmsConnectionException,
                        AmsTimeoutException) as e:
                    if attempt > len(delays):
                        # every retry has been used up
                        raise
                    sleep_secs = delays[attempt - 1]
                    time.sleep(sleep_secs)
                    if timeout:
                        log.warning(
                            'Backoff retry #{0} after {1} seconds, connection timeout set to {2} seconds - {3}: {4}'.format(
                                attempt, sleep_secs, timeout, self.endpoint, e))
                    else:
                        log.warning(
                            'Backoff retry #{0} after {1} seconds - {2}: {3}'.format(attempt, sleep_secs, self.endpoint,
                                                                                     e))
                attempt += 1
        else:
            while attempt <= retry + 1:
                try:
                    return self._make_request(url, body, route_name, **reqkwargs)
                except (AmsBalancerException, AmsConnectionException, AmsTimeoutException) as e:
                    if attempt == retry + 1:
                        raise
                    time.sleep(retrysleep)
                    if timeout:
                        log.warning(
                            'Retry #{0} after {1} seconds, connection timeout set to {2} seconds - {3}: {4}'.format(
                                attempt, retrysleep, timeout, self.endpoint, e))
                    else:
                        log.warning(
                            'Retry #{0} after {1} seconds - {2}: {3}'.format(attempt, retrysleep, self.endpoint, e))
                attempt += 1

    def _make_request(self, url, body=None, route_name=None, **reqkwargs):
        """Common method for PUT, GET, POST, DELETE HTTP requests with
        appropriate service error handling by differing between AMS and load
        balancer erroneous behaviour.

        Returns:
            The decoded response body for HTTP 200, ``{}`` when it is empty.

        Raises:
            AmsServiceException: HTTP 401/403, a status listed for the route
                in ``ams_errors_route``, or any other unexpected status.
            AmsTimeoutException: HTTP 408, or 504 on a balancer-tracked route.
            AmsBalancerException: a status listed for the route in
                ``balancer_errors_route``.
            AmsConnectionException: connection-level failure.
        """
        verb = self.routes[route_name][0]
        decoded = None
        try:
            # populate all requests with the x-api-key header
            # except the authn mapping call
            if route_name != "auth_x509":
                # work on a copy so the caller's headers dict is not modified
                headers = dict(reqkwargs.get("headers") or {})
                headers["x-api-key"] = self.token
                reqkwargs["headers"] = headers
            reqmethod = getattr(requests, verb)
            r = reqmethod(url, data=body, **reqkwargs)
            content = r.content
            status_code = r.status_code
            # not every route has an entry in the error tables; a missing
            # entry simply means "no route specific codes"
            ams_codes = self.ams_errors_route.get(route_name, (None, ()))[1]
            balancer_codes = self.balancer_errors_route.get(route_name, (None, ()))[1]
            if status_code == 200:
                decoded = self._error_dict(content, status_code)
            # handle authnz related errors for all calls
            elif status_code == 401 or status_code == 403:
                raise AmsServiceException(json=self._error_dict(content,
                                                                status_code),
                                          request=route_name)
            elif status_code == 408 or (status_code == 504 and route_name in
                                        self.balancer_errors_route):
                raise AmsTimeoutException(json=self._error_dict(content,
                                                                status_code),
                                          request=route_name)
            # handle errors from AMS
            elif status_code in ams_codes:
                raise AmsServiceException(json=self._error_dict(content,
                                                                status_code),
                                          request=route_name)
            # handle errors coming from load balancer
            elif status_code in balancer_codes:
                raise AmsBalancerException(json=self._error_dict(content,
                                                                 status_code),
                                           request=route_name)
            # handle any other erroneous behaviour by raising exception
            else:
                raise AmsServiceException(json=self._error_dict(content,
                                                                status_code),
                                          request=route_name)
        except (requests.exceptions.ConnectionError,
                requests.exceptions.ReadTimeout,
                socket.error) as e:
            raise AmsConnectionException(e, route_name)
        else:
            return decoded if decoded else {}

    def do_get(self, url, route_name, **reqkwargs):
        """Method supports all the GET requests.

        Used for (topics, subscriptions, users, messages).

        Args:
            url: str. The final messaging service endpoint
            route_name: str. The name of the route to follow selected from the route list
            reqkwargs: keyword argument that will be passed on; ``retry``,
                       ``retrysleep`` and ``retrybackoff`` are consumed by
                       _retry_make_request(), the rest reaches python-requests.
        """
        return self._retry_make_request(url, body=None,
                                        route_name=route_name, **reqkwargs)

    def do_put(self, url, body, route_name, **reqkwargs):
        """Method supports all the PUT requests.

        Used for (topics, subscriptions, messages).

        Args:
            url: str. The final messaging service endpoint
            body: Payload sent with the PUT request.
            route_name: str. The name of the route to follow selected from
                        the route list
            reqkwargs: keyword argument that will be passed on; ``retry``,
                       ``retrysleep`` and ``retrybackoff`` are consumed by
                       _retry_make_request(), the rest reaches python-requests.
        """
        return self._retry_make_request(url, body=body,
                                        route_name=route_name, **reqkwargs)

    def do_post(self, url, body, route_name, retry=0, retrysleep=60,
                retrybackoff=None, **reqkwargs):
        """Method supports all the POST requests.

        Used for (topics, subscriptions, users, messages).

        Args:
            url: str. The final messaging service endpoint
            body: Payload sent with the POST request.
            route_name: str. The name of the route to follow selected from
                        the route list
            retry, retrysleep, retrybackoff: retry options handed to
                        _retry_make_request().
            reqkwargs: keyword argument that will be passed to underlying
                       python-requests library call.
        """
        return self._retry_make_request(url, body=body,
                                        route_name=route_name, retry=retry,
                                        retrysleep=retrysleep,
                                        retrybackoff=retrybackoff,
                                        **reqkwargs)

    def do_delete(self, url, route_name, retry=0, retrysleep=60,
                  retrybackoff=None, **reqkwargs):
        """Delete method that is used to make the appropriate request.

        Used for (topics, subscriptions).

        Args:
            url: str. The final messaging service endpoint
            route_name: str. The name of the route to follow selected from the route list
            retry, retrysleep, retrybackoff: retry options handed to
                        _retry_make_request().
            reqkwargs: keyword argument that will be passed to underlying python-requests library call.
        """
        return self._retry_make_request(url, body=None,
                                        route_name=route_name, retry=retry,
                                        retrysleep=retrysleep,
                                        retrybackoff=retrybackoff,
                                        **reqkwargs)
[docs]class ArgoMessagingService(AmsHttpRequests):
"""Class is entry point for client code.
Class abstract Argo Messaging Service by covering all available HTTP API
calls that are wrapped in series of methods.
"""
def __init__(self, endpoint, token="", project="", cert="", key="", authn_port=8443):
    """Initialise the HTTP layer and the per-instance state.

    Args:
        endpoint (str): AMS host.
        token (str): AMS token; forwarded to the base class.
        project (str): Project name used when composing URLs.
        cert (str): Path to a certificate file; forwarded to the base class.
        key (str): Path to the certificate key file; forwarded to the base class.
        authn_port (int): Port forwarded to the base class.
    """
    super(ArgoMessagingService, self).__init__(endpoint, authn_port, token, cert, key)
    # Containers for subscription and topic objects, in insertion order
    self.subs = OrderedDict()
    self.topics = OrderedDict()
    # Default pull options, stored as strings
    self.pullopts = {"maxMessages": "1", "returnImmediately": "false"}
    self.project = project
def _create_sub_obj(self, s, topic):
    """Build an AmsSubscription from dict ``s`` and store it under its name.

    ``s`` must provide the keys 'name', 'pushConfig' and
    'ackDeadlineSeconds'; ``topic`` is handed to AmsSubscription unchanged.
    """
    sub_name = s['name']
    self.subs[sub_name] = AmsSubscription(sub_name, topic,
                                          s['pushConfig'],
                                          s['ackDeadlineSeconds'],
                                          init=self)
def _delete_sub_obj(self, s):
del self.subs[s['name']]
def _create_topic_obj(self, t):
    """Build an AmsTopic from dict ``t`` and store it under ``t['name']``."""
    topic_name = t['name']
    self.topics[topic_name] = AmsTopic(topic_name, init=self)
def _delete_topic_obj(self, t):
del self.topics[t['name']]
def getacl_topic(self, topic, **reqkwargs):
    """Fetch the access control list of a topic and cache it on the topic object.

    Args:
        topic (str): The topic name.

    Kwargs:
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        The response dict, or an empty list when the response is empty.
    """
    # Makes sure the topic object is present in self.topics
    topic_obj = self.get_topic(topic, retobj=True, **reqkwargs)
    verb, url_template = self.routes["topic_getacl"]
    url = url_template.format(self.endpoint, self.project, topic)
    send = getattr(self, 'do_{0}'.format(verb))
    response = send(url, "topic_getacl", **reqkwargs)
    acls = response['authorized_users'] if response else []
    self.topics[topic_obj.fullname].acls = acls
    return response if response else []
def modifyacl_topic(self, topic, users, **reqkwargs):
    """Replace the access control list of a topic.

    Args:
        topic (str): The topic name.
        users (list): List of users that will have access to topic.
                      Empty list of users will reset access control list.

    Kwargs:
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        True once the request succeeded and the cached topic was updated.
    """
    # Makes sure the topic object is present in self.topics
    topic_obj = self.get_topic(topic, retobj=True, **reqkwargs)
    verb, url_template = self.routes["topic_modifyacl"]
    url = url_template.format(self.endpoint, self.project, topic)
    send = getattr(self, 'do_{0}'.format(verb))
    payload = json.dumps({"authorized_users": users})
    response = send(url, payload, "topic_modifyacl", **reqkwargs)
    if response is None:
        return None
    self.topics[topic_obj.fullname].acls = users
    return True
def getacl_sub(self, sub, **reqkwargs):
    """Fetch the access control list of a subscription and cache it on the
    subscription object.

    Args:
        sub (str): The subscription name.

    Kwargs:
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        The response dict, or an empty list when the response is empty.
    """
    # Makes sure the subscription object is present in self.subs
    sub_obj = self.get_sub(sub, retobj=True, **reqkwargs)
    verb, url_template = self.routes["sub_getacl"]
    url = url_template.format(self.endpoint, self.project, sub)
    send = getattr(self, 'do_{0}'.format(verb))
    response = send(url, "sub_getacl", **reqkwargs)
    acls = response['authorized_users'] if response else []
    self.subs[sub_obj.fullname].acls = acls
    return response if response else []
def getoffsets_sub(self, sub, offset='all', **reqkwargs):
    """Retrieve the offsets reported by the service for a subscription.

    Args:
        sub (str): The subscription name.
        offset(str): Key of a single offset to return. With the default
                     'all' the whole response dict is returned.

    Kwargs:
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Raises:
        AmsServiceException: ``offset`` is not a key of the response.
    """
    verb, url_template = self.routes["sub_offsets"]
    url = url_template.format(self.endpoint, self.project, sub)
    send = getattr(self, 'do_{0}'.format(verb))
    response = send(url, "sub_offsets", **reqkwargs)
    if offset == 'all':
        return response
    try:
        return response[offset]
    except KeyError as e:
        errormsg = {'error': {'message': str(e) + " is not valid offset position"}}
        raise AmsServiceException(json=errormsg, request="sub_offsets")
def time_to_offset_sub(self, sub, timestamp, **reqkwargs):
    """Retrieve the closest(greater than) available offset to the given timestamp.

    The timestamp is sent as ``YYYY-MM-DDTHH:MM:SS.mmmZ`` (millisecond
    precision, 'T' separator) regardless of whether it carries
    microseconds.

    Args:
        sub (str): The subscription name.
        timestamp(datetime.datetime): The timestamp of the offset we are looking for.
            Any other type results in an empty ``time`` query value.
            NOTE(review): the date/time fields are sent as-is with a 'Z'
            suffix; timezone-aware values are not converted to UTC.

    Kwargs:
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        The value of the ``offset`` field of the response.
    """
    route = self.routes["sub_timeToOffset"]
    method = getattr(self, 'do_{0}'.format(route[0]))
    time_in_string = ""
    if isinstance(timestamp, datetime.datetime):
        # one format for both cases; microseconds are truncated to milliseconds
        time_in_string = "{0}.{1:03d}Z".format(
            timestamp.strftime("%Y-%m-%dT%H:%M:%S"),
            timestamp.microsecond // 1000)
    # Compose url
    url = route[1].format(self.endpoint, self.project, sub, time_in_string)
    r = method(url, "sub_timeToOffset", **reqkwargs)
    return r["offset"]
def modifyoffset_sub(self, sub, move_to, **reqkwargs):
    """Move the current offset of a subscription.

    Args:
        sub (str): The subscription name.
        move_to(int): Position to move the offset. Non-int values are
                      converted with ``int()``.

    Kwargs:
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        The response of the service.
    """
    verb, url_template = self.routes["sub_mod_offset"]
    send = getattr(self, 'do_{0}'.format(verb))
    target = move_to if isinstance(move_to, int) else int(move_to)
    url = url_template.format(self.endpoint, self.project, sub)
    payload = json.dumps({"offset": target})
    return send(url, payload, "sub_mod_offset", **reqkwargs)
def modifyacl_sub(self, sub, users, **reqkwargs):
    """Replace the access control list of a subscription.

    Args:
        sub (str): The subscription name.
        users (list): List of users that will have access to subscription.
                      Empty list of users will reset access control list.

    Kwargs:
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        True once the request succeeded and the cached subscription was updated.
    """
    # Makes sure the subscription object is present in self.subs
    sub_obj = self.get_sub(sub, retobj=True, **reqkwargs)
    verb, url_template = self.routes["sub_modifyacl"]
    url = url_template.format(self.endpoint, self.project, sub)
    send = getattr(self, 'do_{0}'.format(verb))
    payload = json.dumps({"authorized_users": users})
    response = send(url, payload, "sub_modifyacl", **reqkwargs)
    if response is None:
        return None
    self.subs[sub_obj.fullname].acls = users
    return True
def pushconfig_sub(self, sub, push_endpoint=None, retry_policy_type='linear', retry_policy_period=300, **reqkwargs):
    """Modify push configuration of given subscription.

    A falsy ``push_endpoint`` sends an empty ``pushConfig``.

    Args:
        sub: shortname of subscription
        push_endpoint: URL of remote endpoint that should receive messages in push subscription mode
        retry_policy_type: value sent as ``retryPolicy.type``
        retry_policy_period: value sent as ``retryPolicy.period``
        reqkwargs: keyword argument that will be passed to underlying python-requests library call.

    Returns:
        The response of the service.
    """
    push_config = {}
    if push_endpoint:
        push_config = {"pushEndpoint": push_endpoint,
                       "retryPolicy": {"type": retry_policy_type,
                                       "period": retry_policy_period}}
    payload = json.dumps({"pushConfig": push_config})
    verb, url_template = self.routes["sub_pushconfig"]
    url = url_template.format(self.endpoint, self.project, sub)
    send = getattr(self, 'do_{0}'.format(verb))
    response = send(url, payload, "sub_pushconfig", **reqkwargs)
    # Mirror the new settings on the cached subscription object, if any
    full_name = '/projects/{0}/subscriptions/{1}'.format(self.project, sub)
    cached = self.subs.get(full_name, False)
    if cached:
        cached.push_endpoint = push_endpoint
        cached.retry_policy_type = retry_policy_type
        cached.retry_policy_period = retry_policy_period
    return response
def iter_subs(self, topic=None, **reqkwargs):
    """Iterate over AmsSubscription objects.

    The subscription list is refreshed through ``list_subs`` first and the
    iteration runs over a copy of the container.

    Args:
        topic: Iterate over subscriptions only associated to this topic name
        reqkwargs: keyword arguments forwarded to ``list_subs``.
    """
    self.list_subs(**reqkwargs)
    snapshot = self.subs.copy()
    try:
        # Python 2 dictionaries expose a lazy itervalues()
        subscriptions = snapshot.itervalues()
    except AttributeError:
        subscriptions = snapshot.values()
    for subscription in subscriptions:
        if not topic or topic == subscription.topic.name:
            yield subscription
def iter_topics(self, **reqkwargs):
    """Iterate over AmsTopic objects.

    The topic list is refreshed through ``list_topics`` first and the
    iteration runs over a copy of the container.
    """
    self.list_topics(**reqkwargs)
    snapshot = self.topics.copy()
    try:
        # Python 2 dictionaries expose a lazy itervalues()
        topic_objects = snapshot.itervalues()
    except AttributeError:
        topic_objects = snapshot.values()
    for topic_obj in topic_objects:
        yield topic_obj
def list_topics(self, **reqkwargs):
    """List the topics of a selected project.

    Every listed topic that is not yet known gets an object created
    through ``_create_topic_obj``.

    Args:
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call

    Returns:
        The response dict, or an empty list when the response is empty.
    """
    route = self.routes["topic_list"]
    # Compose url
    url = route[1].format(self.endpoint, self.project)
    method = getattr(self, 'do_{0}'.format(route[0]))
    r = method(url, "topic_list", **reqkwargs)
    # An empty response has no 'topics' key; treat it as "no topics"
    # so the empty-result return below is actually reachable.
    for t in (r.get('topics') or []):
        if t['name'] not in self.topics:
            self._create_topic_obj(t)
    return r if r else []
def has_topic(self, topic, **reqkwargs):
    """Inspect if topic already exists or not.

    Args:
        topic: str. Topic name
        reqkwargs: keyword arguments forwarded to ``get_topic``.

    Returns:
        True when ``get_topic`` succeeds, False when it fails with an
        AmsServiceException whose code is 404. Any other error propagates.
    """
    try:
        self.get_topic(topic, **reqkwargs)
    except AmsServiceException as exc:
        if exc.code != 404:
            raise
        return False
    return True
def get_topic(self, topic, retobj=False, **reqkwargs):
    """Get the details of a selected topic.

    A topic object is created through ``_create_topic_obj`` when the
    returned name is not yet known.

    Args:
        topic: str. Topic name.
        retobj: Controls whether method should return AmsTopic object
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        The stored topic object when ``retobj`` is true, else the response dict.
    """
    verb, url_template = self.routes["topic_get"]
    url = url_template.format(self.endpoint, self.project, topic)
    send = getattr(self, 'do_{0}'.format(verb))
    response = send(url, "topic_get", **reqkwargs)
    full_name = response['name']
    if full_name not in self.topics:
        self._create_topic_obj(response)
    if not retobj:
        return response
    return self.topics[response['name']]
def publish(self, topic, msg, retry=0, retrysleep=60, retrybackoff=None, **reqkwargs):
    """Publish a message or list of messages to a selected topic.

    If enabled (retry > 0), multiple topic publishes will be tried in
    case of problems/glitches with the AMS service. retry* options are
    eventually passed to _retry_make_request()

    Args:
        topic (str): Topic name.
        msg (list): A list with one or more messages to send.
                    Each message is represented as AmsMessage object or python
                    dictionary with at least data or one attribute key defined.
                    Both kinds may be mixed in one list. A single message
                    that is not a list is wrapped into one.

    Kwargs:
        retry: int. Number of request retries before giving up. Default
                    is 0 meaning no further request retry will be made
                    after first unsuccessful request.
        retrysleep: int. Static number of seconds to sleep before next
                    request attempt
        retrybackoff: int. Backoff factor to apply between each request
                    attempts
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Return:
        dict: Dictionary with messageIds of published messages

    Raises:
        AmsMessageException: the messages cannot be serialised to JSON.
    """
    if not isinstance(msg, list):
        msg = [msg]
    # Convert element by element so that a list mixing AmsMessage objects
    # and plain dictionaries is accepted, as the documentation promises.
    payload = [m.dict() if isinstance(m, AmsMessage) else m for m in msg]
    try:
        msg_body = json.dumps({"messages": payload})
    except TypeError as e:
        raise AmsMessageException(e)
    route = self.routes["topic_publish"]
    # Compose url
    url = route[1].format(self.endpoint, self.project, topic)
    method = getattr(self, 'do_{0}'.format(route[0]))
    return method(url, msg_body, "topic_publish", retry=retry,
                  retrysleep=retrysleep, retrybackoff=retrybackoff,
                  **reqkwargs)
def list_subs(self, **reqkwargs):
    """Lists all subscriptions in a project with a GET request.

    Unknown topics and subscriptions found in the response get objects
    created through ``_create_topic_obj`` / ``_create_sub_obj``.

    Args:
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        The response dict, or an empty list when the response is empty.
    """
    route = self.routes["sub_list"]
    # Compose url
    url = route[1].format(self.endpoint, self.project)
    method = getattr(self, 'do_{0}'.format(route[0]))
    r = method(url, "sub_list", **reqkwargs)
    # An empty response has no 'subscriptions' key; treat it as "no
    # subscriptions" so the empty-result return below is reachable.
    for s in (r.get('subscriptions') or []):
        if s['topic'] not in self.topics:
            self._create_topic_obj({'name': s['topic']})
        if s['name'] not in self.subs:
            self._create_sub_obj(s, self.topics[s['topic']].fullname)
    return r if r else []
def get_sub(self, sub, retobj=False, **reqkwargs):
    """Get the details of a subscription.

    Objects for the subscription and its topic are created through
    ``_create_sub_obj`` / ``_create_topic_obj`` when not yet known.

    Args:
        sub: str. The subscription name.
        retobj: Controls whether method should return AmsSubscription object
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        The stored subscription object when ``retobj`` is true, else the
        response dict.
    """
    verb, url_template = self.routes["sub_get"]
    url = url_template.format(self.endpoint, self.project, sub)
    send = getattr(self, 'do_{0}'.format(verb))
    response = send(url, "sub_get", **reqkwargs)
    topic_name = response['topic']
    if topic_name not in self.topics:
        self._create_topic_obj({'name': topic_name})
    if response['name'] not in self.subs:
        self._create_sub_obj(response, self.topics[topic_name].fullname)
    if not retobj:
        return response
    return self.subs[response['name']]
def has_sub(self, sub, **reqkwargs):
    """Inspect if subscription already exists or not.

    Args:
        sub: str. The subscription name.
        reqkwargs: keyword arguments forwarded to ``get_sub``.

    Returns:
        True when ``get_sub`` succeeds, False when it fails with an
        AmsServiceException whose code is 404. Any other error propagates.
    """
    try:
        self.get_sub(sub, **reqkwargs)
    except AmsServiceException as exc:
        if exc.code != 404:
            raise
        return False
    return True
def pull_sub(self, sub, num=1, return_immediately=False, retry=0,
             retrysleep=60, retrybackoff=None, **reqkwargs):
    """This function consumes messages from a subscription in a project
    with a POST request.

    If enabled (retry > 0), multiple subscription pulls will be tried in
    case of problems/glitches with the AMS service. retry* options are
    eventually passed to _retry_make_request()

    The request body is built from a copy of ``self.pullopts`` with
    ``maxMessages`` and ``returnImmediately`` overridden for this call, so
    the stored pull options are left untouched even when the request fails.

    Args:
        sub: str. The subscription name.
        num: int. The number of messages to pull.
        return_immediately: bool. Sent as ``returnImmediately``
                    ("true"/"false").
        retry, retrysleep, retrybackoff: retry options, see above.
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        list of ``(ackId, AmsMessage)`` tuples; empty when the response
        carries no messages.
    """
    # Per-call options; values are strings like the ones set_pullopt() stores
    opts = dict(self.pullopts)
    opts['maxMessages'] = str(num)
    opts['returnImmediately'] = str(return_immediately).lower()
    msg_body = json.dumps(opts)
    route = self.routes["sub_pull"]
    # Compose url
    url = route[1].format(self.endpoint, self.project, sub)
    method = getattr(self, 'do_{0}'.format(route[0]))
    r = method(url, msg_body, "sub_pull", retry=retry,
               retrysleep=retrysleep, retrybackoff=retrybackoff,
               **reqkwargs)
    # an empty response means there is nothing to consume
    msgs = r.get('receivedMessages') or []
    return [(m['ackId'], AmsMessage(b64enc=False, **m['message'])) for m in msgs]
def ack_sub(self, sub, ids, **reqkwargs):
    """Acknowledgment of received messages.

    Messages retrieved from a pull subscription can be acknowledged by
    sending message with an array of ackIDs. The service will retrieve
    the ackID corresponding to the highest message offset and will
    consider that message and all previous messages as acknowledged by
    the consumer.

    Args:
        sub: str. The subscription name.
        ids: list(str). A list of ids of the messages to acknowledge.
        reqkwargs: keyword argument that will be passed to underlying python-requests library call.

    Returns:
        True when the request did not raise.
    """
    payload = json.dumps({"ackIds": ids})
    verb, url_template = self.routes["sub_ack"]
    url = url_template.format(self.endpoint, self.project, sub)
    send = getattr(self, 'do_{0}'.format(verb))
    send(url, payload, "sub_ack", **reqkwargs)
    return True
def pullack_sub(self, sub, num=1, return_immediately=False, retry=0,
                retrysleep=60, retrybackoff=None, **reqkwargs):
    """Pull messages from subscription and acknowledge them in one call.

    If enabled (retry > 0), multiple subscription pulls will be tried in
    case of problems/glitches with the AMS service. retry* options are
    eventually passed to _retry_make_request().

    If a successful subscription pull is immediately followed by a failed
    acknowledgment (e.g. network hiccup just before acknowledgement of
    received messages), the consume cycle will reset and start from the
    beginning with a new subscription pull. This ensures that the ack
    deadline time window is moved to a new start period, that is the time
    when the second pull was initiated. The cycle repeats for as long as
    the acknowledgment keeps failing with an AmsException.

    Args:
        sub: str. The subscription name.
        num: int. The number of messages to pull.
        return_immediately, retry, retrysleep, retrybackoff: forwarded to
                   ``pull_sub``.
        reqkwargs: keyword argument that will be passed to underlying
                   python-requests library call.

    Returns:
        list of the pulled messages (possibly empty).
    """
    while True:
        pulled = self.pull_sub(sub, num,
                               return_immediately=return_immediately,
                               retry=retry,
                               retrysleep=retrysleep,
                               retrybackoff=retrybackoff,
                               **reqkwargs)
        ack_ids = []
        messages = []
        for ack_id, message in pulled:
            ack_ids.append(ack_id)
            messages.append(message)
        if not (messages and ack_ids):
            # nothing was pulled, so there is nothing to acknowledge
            return messages
        try:
            self.ack_sub(sub, ack_ids, **reqkwargs)
        except AmsException as e:
            log.warning('Continuing with sub_pull after sub_ack: {0}'.format(e))
            continue
        return messages
def set_pullopt(self, key, value):
    """Set a pull option.

    Args:
        key: str. The name of the pull option (ex. maxMessages,
             returnImmediately). Messaging specific names are allowed.
        value: str or int. The value for the option; it is stored as its
               ``str()`` representation.
    """
    as_text = str(value)
    self.pullopts.update({key: as_text})
def get_pullopt(self, key):
    """Look up the current value of a pull option.

    Args:
        key: str. The name of the pull option (ex. maxMessages,
             returnImmediately). Messaging specific names are allowed.

    Returns:
        The stored value of the pull option (values written through
        ``set_pullopt`` are strings).

    Raises:
        KeyError: if no option with that name is stored.
    """
    options = self.pullopts
    return options[key]
def create_user(self, user, **reqkwargs):
    """Create a new user through the ``user_create`` route.

    Args:
        user: AmsUser. The user to be created; its ``name`` goes into the
              URL and its ``to_json()`` output is sent as the body.
        reqkwargs: keyword arguments passed through to the underlying
                   request helper (python-requests options).

    Return:
        object (AmsUser) built from the service response.

    Raises:
        ValueError: if ``user`` is not an AmsUser instance.
    """
    if not isinstance(user, AmsUser):
        raise ValueError("user has to be of type AmsUser")

    route = self.routes["user_create"]
    target = route[1].format(self.endpoint, user.name)
    send = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    response = send(target, user.to_json(), "user_create", **reqkwargs)
    return AmsUser().load_from_dict(response)
def update_user(self, name,
                first_name="",
                last_name="",
                description="",
                organization="",
                username="",
                email="",
                service_roles=None,
                projects=None,
                **reqkwargs):
    """
    Update the user identified by ``name`` through the ``user_update`` route.

    Only fields that were actually supplied are sent: string fields left
    at ``""`` and ``service_roles``/``projects`` left at ``None`` (or
    passed empty) are omitted from the request body.

    :param name: (str) the username of the user to be updated
    :param first_name: (str) the first name of the user
    :param last_name: (str) the last name of the user
    :param description: (str) user description
    :param organization: (str) user organisation
    :param username: (str) new username (sent as ``name`` in the body)
    :param email: (str) the email of the user
    :param service_roles: (str[]) new service roles
    :param projects: (AmsUserProject[]) new projects and roles
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: (AmsUser) the ams user
    """
    body = {}
    string_fields = (
        ("first_name", first_name),
        ("last_name", last_name),
        ("description", description),
        ("organization", organization),
        ("name", username),
        ("email", email),
    )
    for field, value in string_fields:
        if value != "":
            body[field] = value

    # Truthiness instead of len(): the defaults are None, and len(None)
    # raised TypeError whenever a caller omitted either argument.
    if service_roles:
        body["service_roles"] = service_roles
    if projects:
        body["projects"] = [{"project": x.project, "roles": x.roles} for x in projects]

    route = self.routes["user_update"]
    url = route[1].format(self.endpoint, name)
    method = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    r = method(url, json.dumps(body), "user_update", **reqkwargs)
    return AmsUser().load_from_dict(r)
def get_user(self, name, **reqkwargs):
    """
    Retrieve a user by username through the ``user_get`` route.

    :param name: (str) the username of the user to be retrieved
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: (AmsUser) the ams user built from the service response
    """
    route_name = "user_get"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint, name)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    response = fetch(target, route_name, **reqkwargs)
    return AmsUser().load_from_dict(response)
def get_user_by_token(self, token, **reqkwargs):
    """
    Retrieve the user that owns ``token`` through the
    ``user_get_by_token`` route.

    :param token: (str) the token of the user to be retrieved; it is
                  substituted into the request URL
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: (AmsUser) the ams user built from the service response
    """
    route_name = "user_get_by_token"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint, token)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    response = fetch(target, route_name, **reqkwargs)
    return AmsUser().load_from_dict(response)
def get_user_by_uuid(self, uuid, **reqkwargs):
    """
    Retrieve a user by uuid through the ``user_get_by_uuid`` route.

    :param uuid: (str) the uuid of the user to be retrieved
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: (AmsUser) the ams user built from the service response
    """
    route_name = "user_get_by_uuid"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint, uuid)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    response = fetch(target, route_name, **reqkwargs)
    return AmsUser().load_from_dict(response)
def get_user_profile(self, **reqkwargs):
    """
    Retrieve the profile of the requesting user through the
    ``user_get_profile`` route. No user identifier is put in the URL;
    presumably the service resolves the user from the client's token.

    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: (AmsUser) the ams user built from the service response
    """
    route_name = "user_get_profile"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    response = fetch(target, route_name, **reqkwargs)
    return AmsUser().load_from_dict(response)
def list_users(self, details=True, page_size=0, next_page_token="", **reqkwargs):
    """
    Retrieve one page of users through the ``users_list`` route.

    The pagination and detail options are sent as query parameters; any
    ``params`` entry supplied in ``reqkwargs`` is replaced by them.

    :param details: (bool) whether to include project details per user
    :param page_size: (int) size of each page
    :param next_page_token: (str) next page token in case of paginated retrieval
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: (AmsUserPage) a page containing AmsUser objects and pagination details
    """
    query = {
        "details": details,
        "pageSize": page_size,
        "nextPageToken": next_page_token
    }
    route = self.routes["users_list"]
    target = route[1].format(self.endpoint)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    reqkwargs["params"] = query
    # Exceptions raised by the request helper propagate unchanged.
    response = fetch(target, "users_list", **reqkwargs)

    page_users = [AmsUser().load_from_dict(entry) for entry in response["users"]]
    return AmsUserPage(
        users=page_users,
        total_size=response["totalSize"],
        next_page_token=response["nextPageToken"]
    )
def delete_user(self, name, **reqkwargs):
    """
    Delete a user by username through the ``user_delete`` route.

    :param name: (str) the username of the user to be deleted
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: None; the response of the request helper is discarded
    """
    route_name = "user_delete"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint, name)
    remove = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    remove(target, route_name, **reqkwargs)
def refresh_user_token(self, name, **reqkwargs):
    """
    Refresh the token of a user through the ``user_refresh_token`` route.
    The request is sent with no body (``None``).

    :param name: (str) the username of the user whose token is refreshed
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: (AmsUser) the ams user built from the service response
    """
    route_name = "user_refresh_token"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint, name)
    send = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    response = send(target, None, route_name, **reqkwargs)
    return AmsUser().load_from_dict(response)
def add_project_member(self, username, project=None, roles=None, **reqkwargs):
    """
    Assign an existing user to a project through the
    ``project_add_member`` route.

    :param (str) username: the name of user
    :param (str) project: the name of the project. If no project is
                          supplied, the client's own project is used instead
    :param (str[]) roles: project roles for the user; anything that is not
                          a list (including None) is replaced by an empty list
    :param reqkwargs: keyword argument that will be passed to underlying
                      python-requests library call.
    :return: (AmsUser) the assigned user object
    """
    # None is not a list either, so a single isinstance check covers both.
    if not isinstance(roles, list):
        roles = []
    if project is None:
        project = self.project

    payload = json.dumps({"roles": roles})

    route = self.routes["project_add_member"]
    target = route[1].format(self.endpoint, project, username)
    send = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    response = send(target, payload, "project_add_member", **reqkwargs)
    return AmsUser().load_from_dict(response)
def create_project_member(self, username, project=None, roles=None, email=None, **reqkwargs):
    """
    Create a new user under the given project through the
    ``project_create_member`` route.

    :param (str) username: the name of the user (used in the request URL)
    :param (str) project: the name of the project. If no project is
                          supplied, the client's own project is used instead
    :param (str[]) roles: project roles for the user; anything that is not
                          a list (including None) is replaced by an empty list
    :param (str) email: the email of the user; only set when not None
    :param reqkwargs: keyword argument that will be passed to underlying
                      python-requests library call.
    :return: (AmsUser) the assigned user object
    """
    # None is not a list either, so a single isinstance check covers both.
    if not isinstance(roles, list):
        roles = []
    if project is None:
        project = self.project

    membership = AmsUserProject(project=project, roles=roles)
    new_user = AmsUser(projects=[membership])
    if email is not None:
        new_user.email = email

    route = self.routes["project_create_member"]
    target = route[1].format(self.endpoint, project, username)
    send = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    response = send(target, new_user.to_json(), "project_create_member", **reqkwargs)
    return AmsUser().load_from_dict(response)
def get_project_member(self, username, project=None, **reqkwargs):
    """
    Retrieve a project member by username through the
    ``project_get_member`` route.

    :param username: (str) the username of the user to be retrieved
    :param project: (str) the name of the project the user belongs to. If no
                    project is supplied, the client's own project is used instead
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: (AmsUser) the ams user built from the service response
    """
    target_project = self.project if project is None else project

    route = self.routes["project_get_member"]
    target = route[1].format(self.endpoint, target_project, username)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    response = fetch(target, "project_get_member", **reqkwargs)
    return AmsUser().load_from_dict(response)
def remove_project_member(self, username, project, **reqkwargs):
    """
    Remove an existing user from a project through the
    ``project_remove_member`` route. The request is sent with no body.

    :param username: (str) the name of user
    :param project: (str) the name of the project
    :param reqkwargs: keyword argument that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    route_name = "project_remove_member"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint, project, username)
    send = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    return send(target, None, route_name, **reqkwargs)
def create_project(self, name, description, **reqkwargs):
    """
    Create a new project through the ``project_create`` route. The
    description is sent as the JSON body; the name goes into the URL.

    :param name: (str) the name of the project
    :param description: (str) the description of the project
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    payload = json.dumps({"description": description})

    route = self.routes["project_create"]
    target = route[1].format(self.endpoint, name)
    send = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    return send(target, payload, "project_create", **reqkwargs)
def update_project(self, name, description="", updated_name="", **reqkwargs):
    """
    Update an existing project through the ``project_update`` route.
    Only values different from ``""`` are included in the request body.

    :param name: (str) the current name of the project
    :param description: (str) the new description of the project
    :param updated_name: (str) the new name of the project
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    changes = {}
    for field, value in (("description", description), ("name", updated_name)):
        if value != "":
            changes[field] = value

    route = self.routes["project_update"]
    target = route[1].format(self.endpoint, name)
    send = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    return send(target, json.dumps(changes), "project_update", **reqkwargs)
def get_project(self, name, **reqkwargs):
    """
    Retrieve a project by name through the ``project_get`` route.

    :param name: (str) the name of the project
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    route_name = "project_get"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint, name)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    return fetch(target, route_name, **reqkwargs)
def delete_project(self, name, **reqkwargs):
    """
    Delete a project by name through the ``project_delete`` route.

    :param name: (str) the name of the project
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    route_name = "project_delete"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint, name)
    remove = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    return remove(target, route_name, **reqkwargs)
def create_sub(self, sub, topic, ackdeadline=10, push_endpoint=None,
               retry_policy_type='linear', retry_policy_period=300, retobj=False, **reqkwargs):
    """Create a new subscription in the project with a PUT request.

    The topic is first resolved through ``get_topic``; its full name
    (without surrounding slashes) is sent in the request body. When a
    push endpoint is given, the push configuration is applied with a
    follow-up ``pushconfig_sub`` call and mirrored into the returned
    dict. The subscription is also registered in the local ``subs``
    cache if it is not there yet.

    Args:
        sub: str. The subscription name.
        topic: str. The topic name.
        ackdeadline: int. Custom "ack" deadline (in seconds) of the
                     subscription. If the consumer does not acknowledge
                     a message in this time, the message is sent again.
                     Defaults to 10 seconds.
        push_endpoint: URL of remote endpoint that should receive
                       messages in push subscription mode.
        retry_policy_type: forwarded to ``pushconfig_sub`` when
                           ``push_endpoint`` is set.
        retry_policy_period: forwarded to ``pushconfig_sub`` when
                             ``push_endpoint`` is set.
        retobj: Controls whether the cached AmsSubscription object is
                returned instead of the response dict.
        reqkwargs: keyword arguments passed through to the underlying
                   request helpers (python-requests options).
    """
    topic_obj = self.get_topic(topic, retobj=True, **reqkwargs)
    payload = json.dumps({"topic": topic_obj.fullname.strip('/'),
                          "ackDeadlineSeconds": ackdeadline})

    route = self.routes["sub_create"]
    sub_url = route[1].format(self.endpoint, self.project, sub)
    send = getattr(self, 'do_{0}'.format(route[0]))
    created = send(sub_url, payload, "sub_create", **reqkwargs)

    if push_endpoint:
        self.pushconfig_sub(sub, push_endpoint, retry_policy_type, retry_policy_period, **reqkwargs)
        # Reflect the push settings in the dict handed back to the caller.
        retry_policy = {"type": retry_policy_type, "period": retry_policy_period}
        created['pushConfig'] = {"pushEndpoint": push_endpoint,
                                 "retryPolicy": retry_policy}

    if created['name'] not in self.subs:
        self._create_sub_obj(created, topic_obj.fullname)

    if not retobj:
        return created
    return self.subs[created['name']]
def delete_sub(self, sub, **reqkwargs):
    """Delete a subscription of the project with a DELETE request.

    After the request, the subscription is also dropped from the local
    ``subs`` cache when it is present there.

    Args:
        sub: str. The subscription name.
        reqkwargs: keyword arguments passed through to the underlying
                   request helper (python-requests options).

    Returns:
        Whatever the request helper returns.
    """
    route = self.routes["sub_delete"]
    sub_url = route[1].format(self.endpoint, self.project, sub)
    remove = getattr(self, 'do_{0}'.format(route[0]))
    response = remove(sub_url, "sub_delete", **reqkwargs)

    # Cache keys are the full resource names of the subscriptions.
    cache_key = "/projects/{0}/subscriptions/{1}".format(self.project, sub)
    if cache_key in self.subs:
        self._delete_sub_obj({'name': cache_key})

    return response
def topic(self, topic, **reqkwargs):
    """Return an AmsTopic object for the named topic, creating it if needed.

    Thin wrapper around ``has_topic``, ``get_topic`` and ``create_topic``:
    an existing topic is fetched, a missing one is created. Either way
    the call is made with ``retobj=True``.

    Args:
        topic (str): The topic name

    Kwargs:
        reqkwargs: keyword arguments passed through to the underlying
                   calls (python-requests options).

    Return:
        object (AmsTopic)
    """
    # Exceptions raised by the underlying calls propagate unchanged.
    exists = self.has_topic(topic, **reqkwargs)
    obtain = self.get_topic if exists else self.create_topic
    return obtain(topic, retobj=True, **reqkwargs)
def create_topic(self, topic, retobj=False, **reqkwargs):
    """Create a topic in the project with a PUT request.

    The request is sent with an empty body. The created topic is also
    registered in the local ``topics`` cache if it is not there yet.

    Args:
        topic: str. The topic name.
        retobj: Controls whether the cached AmsTopic object is returned
                instead of the response dict.
        reqkwargs: keyword arguments passed through to the underlying
                   request helper (python-requests options).
    """
    route = self.routes["topic_create"]
    topic_url = route[1].format(self.endpoint, self.project, topic)
    send = getattr(self, 'do_{0}'.format(route[0]))
    created = send(topic_url, '', "topic_create", **reqkwargs)

    if created['name'] not in self.topics:
        self._create_topic_obj(created)

    if not retobj:
        return created
    return self.topics[created['name']]
def delete_topic(self, topic, **reqkwargs):
    """Delete a topic of the project with a DELETE request.

    After the request, the topic is also dropped from the local
    ``topics`` cache when it is present there.

    Args:
        topic: str. The topic name.
        reqkwargs: keyword arguments passed through to the underlying
                   request helper (python-requests options).

    Returns:
        Whatever the request helper returns.
    """
    route = self.routes["topic_delete"]
    topic_url = route[1].format(self.endpoint, self.project, topic)
    remove = getattr(self, 'do_{0}'.format(route[0]))
    response = remove(topic_url, "topic_delete", **reqkwargs)

    # Cache keys are the full resource names of the topics.
    cache_key = "/projects/{0}/topics/{1}".format(self.project, topic)
    if cache_key in self.topics:
        self._delete_topic_obj({'name': cache_key})

    return response
def status(self, **reqkwargs):
    """
    Retrieve the status of the service through the ``api_status`` route.

    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    route_name = "api_status"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    return fetch(target, route_name, **reqkwargs)
def metrics(self, **reqkwargs):
    """
    Retrieve the operational metrics of the service through the
    ``api_metrics`` route.

    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    route_name = "api_metrics"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    return fetch(target, route_name, **reqkwargs)
def version(self, **reqkwargs):
    """
    Retrieve the version information of the service through the
    ``api_version`` route.

    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    route_name = "api_version"
    route = self.routes[route_name]
    target = route[1].format(self.endpoint)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    # Exceptions raised by the request helper propagate unchanged.
    return fetch(target, route_name, **reqkwargs)
def va_metrics(self, projects=None, start_date=None, end_date=None, **reqkwargs):
    """
    Retrieve va report metrics for the given projects and time period
    through the ``api_va_metrics`` route.

    The filters are sent as query parameters; any ``params`` entry
    supplied in ``reqkwargs`` is replaced by them. Arguments of an
    unexpected type are silently ignored.

    :param projects: (str[]) filter based on the given projects; sent as a
                     comma separated string when it is a non-empty list
    :param start_date: (datetime.datetime) time period starting date,
                       sent formatted as YYYY-MM-DD
    :param end_date: (datetime.datetime) time period end date,
                     sent formatted as YYYY-MM-DD
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    day_format = "%Y-%m-%d"
    query = {}
    # isinstance() is False for None, so no separate None check is needed.
    if isinstance(start_date, datetime.datetime):
        query["start_date"] = start_date.strftime(day_format)
    if isinstance(end_date, datetime.datetime):
        query["end_date"] = end_date.strftime(day_format)
    if isinstance(projects, list) and len(projects) > 0:
        query["projects"] = ",".join(projects)

    route = self.routes["api_va_metrics"]
    target = route[1].format(self.endpoint)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    reqkwargs["params"] = query
    # Exceptions raised by the request helper propagate unchanged.
    return fetch(target, "api_va_metrics", **reqkwargs)
def usage_report(self, projects=None, start_date=None, end_date=None, **reqkwargs):
    """
    Retrieve va report metrics for the given projects and time period,
    alongside the service's operational metrics, through the
    ``api_usage_report`` route.

    Per the service contract, the api call covers the projects that the
    requesting user is a project admin for. The filters are sent as
    query parameters; any ``params`` entry supplied in ``reqkwargs`` is
    replaced by them. Arguments of an unexpected type are silently ignored.

    :param projects: (str[]) filter based on the given projects; sent as a
                     comma separated string when it is a non-empty list
    :param start_date: (datetime.datetime) time period starting date,
                       sent formatted as YYYY-MM-DD
    :param end_date: (datetime.datetime) time period end date,
                     sent formatted as YYYY-MM-DD
    :param reqkwargs: keyword arguments that will be passed to underlying
                      python-requests library call.
    :return: whatever the request helper returns
    """
    day_format = "%Y-%m-%d"
    query = {}
    # isinstance() is False for None, so no separate None check is needed.
    if isinstance(start_date, datetime.datetime):
        query["start_date"] = start_date.strftime(day_format)
    if isinstance(end_date, datetime.datetime):
        query["end_date"] = end_date.strftime(day_format)
    if isinstance(projects, list) and len(projects) > 0:
        query["projects"] = ",".join(projects)

    route = self.routes["api_usage_report"]
    target = route[1].format(self.endpoint)
    fetch = getattr(self, 'do_{0}'.format(route[0]))
    reqkwargs["params"] = query
    # Exceptions raised by the request helper propagate unchanged.
    return fetch(target, "api_usage_report", **reqkwargs)