def post(self, url, params=None):
"""Perform an authenticated POST against a MAAS endpoint.
:param url: MAAS endpoint
:param params: extra data sent with the HTTP request
"""
return requests.post(
url=self._generate_url(url),
headers=self._oauth_headers(),
data=params
def _generate_url(self, extra=u''):
parsed = urlparse(self._url)
api = u'/MAAS/api/%s/' % self.API_VERSION
base_url = u'{protocol}://{netloc}'.format(protocol=self.PROTOCOL,
netloc=parsed.netloc)
api_url = urljoin(base_url, api)
return urljoin(api_url, extra)
def _oauth_headers(self, additional_headers=None):
"""Return OAuth attributes for protected resources.
:returns: headers to be used in http requests
"""
consumer_key, key, secret = self.api_key.split(u':')
resource_tok_string = u'oauth_token_secret=%s&oauth_token=%s' % (
secret, key)
resource_token = oauth.OAuthToken.from_string(resource_tok_string)
consumer_token = oauth.OAuthConsumer(consumer_key, u'')
oauth_request = oauth.OAuthRequest.from_consumer_and_token(
consumer_token, token=resource_token,
http_url=self._generate_url(),
parameters={u'oauth_nonce': uuid.uuid4().hex})
oauth_request.sign_request(
oauth.OAuthSignatureMethod_PLAINTEXT(), consumer_token,
resource_token)
headers = oauth_request.to_header()
if additional_headers:
for key in additional_headers:
headers[key] = additional_headers[key]
return headers