Source code for SoftLayer.managers.messaging

"""
    SoftLayer.messaging
    ~~~~~~~~~~~~~~~~~~~
    Manager for the SoftLayer Message Queue service

    :copyright: (c) 2013, SoftLayer Technologies, Inc. All rights reserved.
    :license: MIT, see LICENSE for more details.
"""
import json
import requests.auth

from SoftLayer.consts import USER_AGENT
from SoftLayer.exceptions import Unauthenticated, SoftLayerError

# Known Message Queue hosts, keyed first by datacenter code and then by
# network type ('public' or 'private'). Values are bare hostnames;
# MessagingManager.get_endpoint() prepends the "https://" scheme.
ENDPOINTS = {
    "dal05": {
        "public": "dal05.mq.softlayer.net",
        "private": "dal05.mq.service.networklayer.com"
    }
}


class QueueAuth(requests.auth.AuthBase):
    """ Message Queue authentication for requests

    Attaches an ``X-Auth-Token`` header to every outgoing request, fetching
    a token from the auth endpoint first when none is held yet, and retries
    once when the service answers 503 or 401.

    :param endpoint: endpoint URL
    :param username: SoftLayer username
    :param api_key: SoftLayer API Key
    :param auth_token: (optional) Starting auth token
    """
    def __init__(self, endpoint, username, api_key, auth_token=None):
        self.endpoint = endpoint
        self.username = username
        self.api_key = api_key
        self.auth_token = auth_token

    def auth(self):
        """ Do Authentication

        POSTs the username/API key as ``X-Auth-User``/``X-Auth-Key`` headers
        to the auth endpoint and stores the ``X-Auth-Token`` response header
        on this instance.

        :raises Unauthenticated: if the auth endpoint response is not OK
        """
        headers = {
            'X-Auth-User': self.username,
            'X-Auth-Key': self.api_key
        }
        resp = requests.post(self.endpoint, headers=headers)
        if resp.ok:
            self.auth_token = resp.headers['X-Auth-Token']
        else:
            raise Unauthenticated("Error while authenticating: %s"
                                  % resp.status_code)

    def handle_error(self, r, **kwargs):
        """ Handle errors

        Response hook: retries the request once on 503, and re-authenticates
        then retries once on 401. Any other response is passed through.

        :param r: the response handed to the hook
        :param dict kwargs: send arguments supplied by requests to the hook;
                            forwarded to the retried send
        :returns: the retried response on 503/401, otherwise ``r``
        """
        # Drop the hook from this request first so it is only ever retried
        # once.
        r.request.deregister_hook('response', self.handle_error)

        if r.status_code == 401:
            # The token was rejected: fetch a fresh one before resending.
            self.auth()
            r.request.headers['X-Auth-Token'] = self.auth_token
        elif r.status_code != 503:
            return r

        # Read the failed response body so its connection can be reused.
        r.content
        # A response hook must *return* the new response for it to replace
        # the failed one; discarding it would make the retry a no-op.
        return r.connection.send(r.request, **kwargs)

    def __call__(self, r):
        """ Attach auth token to the request. Do authentication if an auth
            token isn't available

        :param r: the outgoing request being prepared
        :returns: the same request, with hook and token header attached
        """
        if not self.auth_token:
            self.auth()
        r.register_hook('response', self.handle_error)
        r.headers['X-Auth-Token'] = self.auth_token
        return r
class MessagingManager(object):
    """ Manage SoftLayer Message Queue

    :param client: API client; indexed by service name (``client['Account']``)
                   and expected to expose its credentials as ``client.auth``
    """
    def __init__(self, client):
        self.client = client

    def list_accounts(self, **kwargs):
        r""" List message queue accounts

        :param dict \*\*kwargs: response-level arguments (limit, offset, etc.)
        :returns: whatever ``Account.getMessageQueueAccounts`` returns
        """
        if 'mask' not in kwargs:
            # A tuple (not a set) keeps the default mask's field order
            # deterministic from run to run.
            items = ('id', 'name', 'status', 'nodes')
            kwargs['mask'] = "mask[%s]" % ','.join(items)

        return self.client['Account'].getMessageQueueAccounts(**kwargs)

    def get_endpoint(self, datacenter=None, network=None):
        """ Get a message queue endpoint based on datacenter/network type

        :param datacenter: datacenter code (defaults to 'dal05')
        :param network: network ('public' or 'private'; defaults to 'public')
        :returns: the endpoint URL, ``https://<host>``
        :raises TypeError: if the datacenter/network pair is not in ENDPOINTS
        """
        if datacenter is None:
            datacenter = 'dal05'
        if network is None:
            network = 'public'
        try:
            host = ENDPOINTS[datacenter][network]
            return "https://%s" % host
        except KeyError:
            # TypeError is kept (rather than a more fitting exception type)
            # because existing callers may already catch it.
            raise TypeError('Invalid endpoint %s/%s' % (datacenter, network))

    def get_endpoints(self):
        """ Get all known message queue endpoints

        :returns: the module-level ENDPOINTS mapping itself (not a copy)
        """
        return ENDPOINTS

    def get_connection(self, account_id, datacenter=None, network=None):
        """ Get connection to Message Queue Service

        :param account_id: Message Queue Account id
        :param datacenter: Datacenter code
        :param network: network ('public' or 'private')
        :returns: an authenticated MessagingConnection
        :raises SoftLayerError: if the client's auth object is missing or has
                                no ``username``/``api_key``
        """
        if not self.client.auth \
                or not getattr(self.client.auth, 'username', None) \
                or not getattr(self.client.auth, 'api_key', None):
            raise SoftLayerError(
                'Client instance auth must be BasicAuthentication.')

        client = MessagingConnection(
            account_id, endpoint=self.get_endpoint(datacenter, network))
        client.authenticate(self.client.auth.username,
                            self.client.auth.api_key)
        return client

    def ping(self, datacenter=None, network=None):
        """ Ping a message queue endpoint

        Issues a GET to ``<endpoint>/v1/ping``.

        :param datacenter: datacenter code
        :param network: network ('public' or 'private')
        :returns: True when the request succeeds; an HTTP error status makes
                  ``raise_for_status()`` raise instead
        """
        r = requests.get('%s/v1/ping' % self.get_endpoint(datacenter, network))
        r.raise_for_status()
        return True
class MessagingConnection(object):
    """ Message Queue Service Connection

    :param account_id: Message Queue Account id
    :param endpoint: Endpoint URL
    """
    def __init__(self, account_id, endpoint=None):
        self.account_id = account_id
        self.endpoint = endpoint
        # Set by authenticate(); passed as ``auth`` on every request.
        self.auth = None

    def _build_url(self, path):
        """ Build the URL of an account-scoped resource

        :param path: resource path below ``<endpoint>/v1/<account_id>/``
        :returns: the full URL as a string
        """
        # str() so that a non-string account id does not break the join.
        return '/'.join((self.endpoint, 'v1', str(self.account_id), path))

    def _make_request(self, method, path, **kwargs):
        r""" Make request. Generally not called directly

        :param method: HTTP Method
        :param path: resource Path
        :param dict \*\*kwargs: extra request arguments
        :returns: the response object
        :raises: whatever ``raise_for_status()`` raises on an HTTP error
                 status
        """
        headers = {
            'Content-Type': 'application/json',
            'User-Agent': USER_AGENT,
        }
        # Caller-supplied headers take precedence over the defaults.
        headers.update(kwargs.get('headers', {}))
        kwargs['headers'] = headers
        kwargs['auth'] = self.auth
        r = requests.request(method, self._build_url(path), **kwargs)
        r.raise_for_status()
        return r

    def authenticate(self, username, api_key, auth_token=None):
        """ Authenticate against the Message Queue service

        Builds a QueueAuth for this account's ``auth`` resource, performs
        the authentication immediately, and keeps the auth object for
        subsequent requests.

        :param username: SoftLayer username
        :param api_key: SoftLayer API Key
        :param auth_token: (optional) Starting auth token
        """
        auth = QueueAuth(self._build_url('auth'), username, api_key,
                         auth_token=auth_token)
        auth.auth()
        self.auth = auth

    def stats(self, period='hour'):
        """ Get account stats

        :param period: 'hour', 'day', 'week', 'month'
        :returns: the decoded JSON response
        """
        r = self._make_request('get', 'stats/%s' % period)
        return json.loads(r.content)

    # QUEUE METHODS

    def get_queues(self, tags=None):
        """ Get listing of queues

        :param list tags: (optional) list of tags to filter by
        :returns: the decoded JSON response
        """
        params = {}
        if tags:
            params['tags'] = ','.join(tags)
        r = self._make_request('get', 'queues', params=params)
        return json.loads(r.content)

    def create_queue(self, queue_name, **kwargs):
        r""" Create Queue

        :param queue_name: Queue Name
        :param dict \*\*kwargs: queue options
        :returns: the decoded JSON response
        """
        data = json.dumps(kwargs)
        r = self._make_request('put', 'queues/%s' % queue_name, data=data)
        return json.loads(r.content)

    def modify_queue(self, queue_name, **kwargs):
        r""" Modify Queue

        Delegates to create_queue(), which issues the same PUT.

        :param queue_name: Queue Name
        :param dict \*\*kwargs: queue options
        :returns: the decoded JSON response
        """
        return self.create_queue(queue_name, **kwargs)

    def get_queue(self, queue_name):
        """ Get queue details

        :param queue_name: Queue Name
        :returns: the decoded JSON response
        """
        r = self._make_request('get', 'queues/%s' % queue_name)
        return json.loads(r.content)

    def delete_queue(self, queue_name, force=False):
        """ Delete Queue

        :param queue_name: Queue Name
        :param force: (optional) Force queue to be deleted even if there
                      are pending messages
        :returns: True
        """
        params = {}
        if force:
            params['force'] = 1
        self._make_request('delete', 'queues/%s' % queue_name, params=params)
        return True

    def push_queue_message(self, queue_name, body, **kwargs):
        r""" Create Queue Message

        :param queue_name: Queue Name
        :param body: Message body
        :param dict \*\*kwargs: Message options
        :returns: the decoded JSON response
        """
        message = {'body': body}
        message.update(kwargs)
        r = self._make_request('post', 'queues/%s/messages' % queue_name,
                               data=json.dumps(message))
        return json.loads(r.content)

    def pop_message(self, queue_name, count=1):
        """ Pop message from a queue

        :param queue_name: Queue Name
        :param count: (optional) number of messages to retrieve
        :returns: the decoded JSON response
        """
        r = self._make_request('get', 'queues/%s/messages' % queue_name,
                               params={'batch': count})
        return json.loads(r.content)

    def delete_message(self, queue_name, message_id):
        """ Delete a message

        :param queue_name: Queue Name
        :param message_id: Message id
        :returns: True
        """
        self._make_request('delete', 'queues/%s/messages/%s'
                           % (queue_name, message_id))
        return True

    # TOPIC METHODS

    def get_topics(self, tags=None):
        """ Get listing of topics

        :param list tags: (optional) list of tags to filter by
        :returns: the decoded JSON response
        """
        params = {}
        if tags:
            params['tags'] = ','.join(tags)
        r = self._make_request('get', 'topics', params=params)
        return json.loads(r.content)

    def create_topic(self, topic_name, **kwargs):
        r""" Create Topic

        :param topic_name: Topic Name
        :param dict \*\*kwargs: Topic options
        :returns: the decoded JSON response
        """
        data = json.dumps(kwargs)
        r = self._make_request('put', 'topics/%s' % topic_name, data=data)
        return json.loads(r.content)

    def modify_topic(self, topic_name, **kwargs):
        r""" Modify Topic

        Delegates to create_topic(), which issues the same PUT.

        :param topic_name: Topic Name
        :param dict \*\*kwargs: Topic options
        :returns: the decoded JSON response
        """
        return self.create_topic(topic_name, **kwargs)

    def get_topic(self, topic_name):
        """ Get topic details

        :param topic_name: Topic Name
        :returns: the decoded JSON response
        """
        r = self._make_request('get', 'topics/%s' % topic_name)
        return json.loads(r.content)

    def delete_topic(self, topic_name, force=False):
        """ Delete Topic

        :param topic_name: Topic Name
        :param force: (optional) Force topic to be deleted even if there
                      are attached subscribers
        :returns: True
        """
        params = {}
        if force:
            params['force'] = 1
        self._make_request('delete', 'topics/%s' % topic_name, params=params)
        return True

    def push_topic_message(self, topic_name, body, **kwargs):
        r""" Create Topic Message

        :param topic_name: Topic Name
        :param body: Message body
        :param dict \*\*kwargs: Topic message options
        :returns: the decoded JSON response
        """
        message = {'body': body}
        message.update(kwargs)
        r = self._make_request('post', 'topics/%s/messages' % topic_name,
                               data=json.dumps(message))
        return json.loads(r.content)

    def get_subscriptions(self, topic_name):
        """ Listing of subscriptions on a topic

        :param topic_name: Topic Name
        :returns: the decoded JSON response
        """
        r = self._make_request('get', 'topics/%s/subscriptions' % topic_name)
        return json.loads(r.content)

    def create_subscription(self, topic_name, subscription_type, **kwargs):
        r""" Create Subscription

        :param topic_name: Topic Name
        :param subscription_type: type ('queue' or 'http')
        :param dict \*\*kwargs: Subscription options; sent as the
                                ``endpoint`` object of the request body
        :returns: the decoded JSON response
        """
        r = self._make_request(
            'post', 'topics/%s/subscriptions' % topic_name,
            data=json.dumps({
                'endpoint_type': subscription_type, 'endpoint': kwargs}))
        return json.loads(r.content)

    def delete_subscription(self, topic_name, subscription_id):
        """ Delete a subscription

        :param topic_name: Topic Name
        :param subscription_id: Subscription id
        :returns: True
        """
        self._make_request('delete', 'topics/%s/subscriptions/%s'
                           % (topic_name, subscription_id))
        return True
Read the Docs v: v3.0.2
Versions
latest
v3.0.2
v3.0.1
v3.0.0
v2.3.1
v2.3.0
Downloads
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.