# vi: ts=4 expandtab
import abc
import logging
import json
import oauthlib.oauth1 as oauth1
from cloudinit.registry import DictRegistry
from cloudinit import url_helper
from cloudinit import util
class ReportingHandler(object):
@abc.abstractmethod
def publish_event(self, event):
raise NotImplementedError
class LogHandler(ReportingHandler):
"""Publishes events to the cloud-init log at the ``INFO`` log level."""
def publish_event(self, event):
"""Publish an event to the ``INFO`` log level."""
logger = logging.getLogger(
'.'.join(['cloudinit', 'reporting', event.event_type, event.name]))
logger.info(event.as_string())
class WebHookHandler(ReportingHandler):
def __init__(self, endpoint, consumer_key=None, token_key=None,
token_secret=None, consumer_secret=None, timeout=None,
self.retries):
super(WebHookHandler, self).__init__()
if any(consumer_key, token_key, token_secret, consumer_secret):
self.oauth_helper = url_helper.OauthHelper(
consumer_key=consumer_key, token_key=token_key,
token_secret=token_secret, consumer_secret=consumer_secret)
else:
self.oauth_helper = None
self.endpoint = endpoint
self.timeout = timeout
self.retries = retries
self.ssl_details = util.fetch_ssl_details()
def publish_event(self, event):
if self.oauth_helper:
read_url = self.oauth_helper.read_url
else:
read_url = url_helper.read_url
return read_url(
self.endpoint, data=event.as_json_data(),
timeout=self.timeout,
retries=self.retries, self.ssl_details)
available_handlers = DictRegistry()
available_handlers.register_item('log', LogHandler)