Source code for firetower.category

import calendar
from collections import namedtuple
import hashlib
import simplejson as json
import time

from logbook import Logger

import classifier
import redis_util

# Module-level logger; Category.classify reports its timing through it.
log = Logger('Firetower-category')
# One bucketed data point, (timestamp, count), as built by
# TimeSeries.convert_ts_list.
TSTuple = namedtuple("TimeSeriesTuple", ("timestamp", "count"))

# Lengths of the basic time units, all expressed in seconds.
SECOND = 1
MINUTE = 60 * SECOND
HOUR = 60 * MINUTE
DAY = 24 * HOUR
WEEK = 7 * DAY

# Bucket name -> bucket width in seconds. TimeSeries.convert_ts_list looks a
# name up here to decide how coarsely to group time series entries.
TIME_SLICES = {
    "second": SECOND,
    "minute": MINUTE,
    "5_minute": 5 * MINUTE,
    "10_minute": 10 * MINUTE,
    "15_minute": 15 * MINUTE,
    "30_minute": 30 * MINUTE,
    "hour": HOUR,
    "3_hour": 3 * HOUR,
    "6_hour": 6 * HOUR,
    "12_hour": 12 * HOUR,
    "day": DAY,
    "week": WEEK,
}

class TimeSeries(object):
    """Read and archive the hit counts recorded for one category.

    Two Redis keys derived from the category id are used:

    * ``counter_{cat_id}``: hash of timestamp -> count, read and pruned by
      archive_cat_counts.
    * ``ts_{cat_id}``: sorted set scored by timestamp whose members are
      '{TIMESTAMP}:{COUNT}' strings.
    """

    def __init__(self, redis_conn, cat_id):
        """Create a time series instance for a category

        Args:
            redis_conn: redis.Redis connection.
            cat_id: str, identifier for category.

        This object exposes the time series data stored in the sorted set
        ts_{cat_id}.

        """
        self.cat_id = cat_id
        self.redis_conn = redis_conn

    def convert_ts_list(self, ts_list, time_slice=None):
        """Process lists from timeseries sorted sets.

        Args:
            ts_list: List of (member, score) tuples from the ts_ sorted set.
                Each member has the form '{TIMESTAMP}:{COUNT}' and each
                score is the timestamp.
            time_slice: Optional. Name of a key in TIME_SLICES. The value of
                that key sets the size of the bucket the time series data
                is sorted into. Defaults to minute buckets.

        Returns:
            list of TSTuple(timestamp, count), ordered by timestamp, one per
            bucket between the first and the last bucket that holds data.
            Buckets in that span without data get a count of 0. timestamp
            is the start of the bucket in seconds. An empty ts_list gives
            an empty list.

        Raises:
            KeyError: if time_slice is not a key of TIME_SLICES.

        The conversion is required because the TS sorted set doesn't hold
        the raw count: to make the values unique it stores them as
        '{TIMESTAMP}:{COUNT}'.

        """
        if time_slice is None:
            time_slice = "minute"
        # Width, in seconds, of the buckets the entries are grouped into.
        bucket_size = TIME_SLICES[time_slice]

        if not ts_list:
            return []

        slice_dict = {}
        for ts_entry in ts_list:
            member = ts_entry[0]
            if not isinstance(member, str):
                # NOTE(review): presumably only hit on Python 3, where the
                # redis client may hand back bytes - confirm against the
                # connection's decode settings.
                member = member.decode("utf-8")
            ts = int(ts_entry[1])
            count = int(member.split(":")[1])
            # Floor division keeps the bucket index an int on Python 2 and 3.
            key = ts // bucket_size
            slice_dict[key] = slice_dict.get(key, 0) + count

        # Walk every bucket from the first to the last one seen so that
        # gaps show up as explicit zero counts.
        first_key = min(slice_dict)
        last_key = max(slice_dict)
        return [
            TSTuple(key * bucket_size, slice_dict.get(key, 0))
            for key in range(first_key, last_key + 1)
        ]

    def all(self, time_slice=None):
        """Return all timeseries data for the category.

        Args:
            time_slice: Optional. Name of a key in TIME_SLICES, defaults to
                minute buckets.

        Returns:
            list of TSTuple(timestamp, count) as built by convert_ts_list:
            timestamp is the start of a bucket in seconds and count is the
            number of times the category appeared in that bucket.
        """
        return self.convert_ts_list(
            self.redis_conn.zrange(
                "ts_%s" % (self.cat_id), 0, -1, withscores=True
            ), time_slice
        )

    def range(self, start, end, time_slice=None):
        """Return all timeseries data for the category between start and end.

        start and end are assumed to be seconds since the epoch.

        Args:
            start: lowest score (timestamp) to include.
            end: highest score (timestamp) to include.
            time_slice: Optional. Name of a key in TIME_SLICES, defaults to
                minute buckets.

        Returns:
            list of TSTuple(timestamp, count) as built by convert_ts_list:
            timestamp is the start of a bucket in seconds and count is the
            number of times the category appeared in that bucket.
        """
        return self.convert_ts_list(
            self.redis_conn.zrevrangebyscore(
                "ts_%s" % (self.cat_id), end, start, withscores=True
            ),
            time_slice
        )

    @staticmethod
    def generate_ts_value(ts, count):
        """Turn a timestamp and cat count into a value for storage in a set"""
        return "%s:%s" % (ts, count)

    def archive_cat_counts(self, start_time, preserve=True):
        """Move every counter older than start_time into the ts sorted set.

        Args:
            start_time: datetime, utc timezone.
            preserve: boolean, if set will add to existing ts counts rather than
                over-writing them. Useful when archiving counts that already
                have entries in the ts set (e.g. backfilling)

        Each archived counter is written to ts_{cat_id} and then removed
        from the counter_{cat_id} hash.
        """
        ts_key = 'ts_%s' % (self.cat_id,)
        counter_key = 'counter_%s' % (self.cat_id,)
        check_mark = calendar.timegm(start_time.timetuple())
        now = calendar.timegm(time.gmtime())
        count_dict = self.redis_conn.hgetall(counter_key)

        counters_to_delete = []
        for raw_ts, raw_count in count_dict.items():
            # Hash fields and values come back as strings; compare and add
            # them as ints.
            ts = int(raw_ts)
            if ts >= check_mark:
                continue
            if ts == now:
                # Leave the counter for the current second untouched.
                continue
            new_value = int(raw_count)
            if preserve:
                existing_entry = self.range(ts, ts)
                if existing_entry:
                    # Fold in what is already archived for this second.
                    # range() sums every member stored at this score into
                    # a single bucket.
                    new_value += existing_entry[0].count
            # Members embed the count, so a different count is a different
            # member. Drop the old member(s) for this second first,
            # otherwise old and new would both be summed on the next read.
            self.redis_conn.zremrangebyscore(ts_key, ts, ts)
            self.redis_conn.zadd(
                ts_key, self.generate_ts_value(ts, new_value), ts
            )
            counters_to_delete.append(raw_ts)

        # Remove the counters from the 'counter' key.
        # We store longterm counters in the timeseries key (ts).
        for counter in counters_to_delete:
            self.redis_conn.hdel(counter_key, counter)


class Events(object):
    """Store and fetch the raw events recorded for one category.

    Events are kept as JSON strings in the sorted set data_{cat_id}, scored
    by their timestamp.
    """

    def __init__(self, redis_conn, cat_id):
        """Bind the event store to a category.

        Args:
            redis_conn: redis.Redis connection.
            cat_id: str, identifier for category.
        """
        self.cat_id = cat_id
        self.redis_conn = redis_conn

    def add_event(self, event, timestamp=None,
            event_increment=True, ts_increment=True):
        """Add an event to the event list.

        Args:
            event: the event dictionary to save into the event set. Its 'ts'
                key is set to the timestamp used (the dict is modified in
                place).
            timestamp: Optional time stamp to use when inserting the event.
                Defaults to now.
            event_increment: Optional. boolean - adds event to event series.
            ts_increment: Optional. boolean - Increments the counter for this category.

        """
        if timestamp is None:
            timestamp = int(time.time())

        event['ts'] = timestamp
        if event_increment:
            self.redis_conn.zadd("data_" + self.cat_id,  json.dumps(event), timestamp)
        if ts_increment:
            self.redis_conn.hincrby("counter_" + self.cat_id, timestamp, 1)

    def get_recent(self, num_events):
        """Return a list of recent events up to the num specified.

        Args:
            num_events: int, number of events to return.
        Returns:
            list, the most recent events up to num_events. Empty when
            num_events is zero or negative.
        """
        wanted = int(num_events)
        if wanted <= 0:
            # -0 == 0, so asking the sorted set for (-0, -1) would return
            # every stored event instead of none.
            return []
        return self.range(-wanted, -1)

    def recent_signatures(self, num_sigs):
        """Return the 'sig' value of each of the num_sigs most recent events."""
        events = self.get_recent(num_sigs)
        return [json.loads(event)['sig'] for event in events]

    def range(self, start, end):
        """Return a range of events

        Args:
            start: Start index to return from.
            end: End index to return to.

        Both start and end are inclusive.
        """
        return self.redis_conn.zrange("data_%s" % (self.cat_id,), start, end)

    def _backfill_timeseries(self, delete=False):
        """This is for pulling data out an event stream and putting in ts.

        Every stored event increments the counter_{cat_id} hash field for
        its timestamp by one. Nothing is written to ts_{cat_id} here.

        Args:
            delete: bool, whether or not to delete the archived ts.
        """
        # Not sure this is an ideal solution, since it depends on archiving
        # to work properly, which may be why we're backfilling in the
        # first place.
        events = self.redis_conn.zrange(
                "data_%s" % self.cat_id, 0, -1, withscores=True)
        cat_counter_id = "counter_%s" % (self.cat_id,)
        cat_ts_id = "ts_%s" % (self.cat_id,)
        if delete:
            self.redis_conn.delete(cat_ts_id)
        for _sig, ts in events:
            self.redis_conn.hincrby(cat_counter_id, int(ts), 1)

    def delete(self):
        """Delete an entire set of data"""
        self.redis_conn.delete("data_%s" % self.cat_id)


class Category(object):
    """A class to encapsulate operations involving categories and their metadata

    Metadata lives in the Redis hash named by CAT_META_HASH, under fields of
    the form '{cat_id}:{key}'.

    Exposes these read/write properties: signature, human_name, threshold,
    stdev, mean and regex.
    """

    CAT_META_HASH = "category_ids"

    SIGNATURE_KEY = "signature"
    HUMAN_NAME_KEY = "human_name"
    THRESHOLD_KEY = "threshold"
    STDEV_KEY = "stdev"
    MEAN_KEY = "mean"
    RE_KEY = "regex"

    # Field suffixes that recategorise removes from the metadata hash.
    keys = [
        CAT_META_HASH, SIGNATURE_KEY, HUMAN_NAME_KEY, THRESHOLD_KEY,
        STDEV_KEY, MEAN_KEY, RE_KEY
    ]

    def __init__(self, redis_conn, signature=None, cat_id=None, event=None):
        """Init.

        Args:
            redis_conn: redis.Redis instance.
            signature: str. When given, the category id is derived from it
                and cat_id is ignored.
            cat_id: str, used to fetch existing category if it exists.
            event: dict, added to existing set of events.
        """
        self.conn = redis_conn
        if signature:
            self.cat_id = redis_util.Redis.construct_cat_id(signature)
        elif cat_id:
            self.cat_id = cat_id
        else:
            self.cat_id = None

        # Without a category id there is nothing to read or write, so both
        # helpers stay None.
        self.timeseries, self.events = None, None
        if self.cat_id:
            self.timeseries = TimeSeries(self.conn, self.cat_id)
            self.events = Events(self.conn, self.cat_id)
        if event and self.events:
            self.events.add_event(event)

    def to_dict(self, num_recent=5):
        """Return a dictionary representation of this cats metadata

        Args:
            num_recent: int, number of recent categorizations to include.
                defaults to five.
        Returns:
            dict, containing string value of signature, human readable name
            of the category, the classification threshold, and a list of
            the most recent event signatures classified as this category.
        """
        return {
            self.SIGNATURE_KEY: self.signature,
            self.HUMAN_NAME_KEY: self.human_name,
            self.THRESHOLD_KEY: self.threshold,
            "recent_signatures": self.events.recent_signatures(num_recent)
        }

    def recategorise(self, default_threshold, archive_time=None):
        """WARNING: Will remove this category and re-sort its events

        Args:
            default_threshold: Default threshold to try and reclassify on.
                This takes the place of what is pulled from the config in
                firetower-server.
            archive_time: Optional. After reclassification if this is set
                everything before this time will be archived from this
                point back.
        """
        for key in self.keys:
            self.conn.hdel(self.CAT_META_HASH, "%s:%s" % (self.cat_id, key))

        # classify() iterates over its classifiers, so hand it a list.
        classifiers = [classifier.Levenshtein()]
        event_chunk = 1000
        curr_count = 0
        while True:
            # Event ranges are inclusive on both ends, hence the -1.
            events = self.events.range(
                curr_count, curr_count + event_chunk - 1)
            if not events:
                break
            for event in events:
                event_dict = json.loads(event)
                self.classify(
                    self.conn, classifiers, event_dict, default_threshold)
            curr_count += event_chunk

        if archive_time:
            for cat in self.get_all_categories(self.conn):
                cat.timeseries.archive_cat_counts(archive_time)

        self.events.delete()
        self.conn.delete("counter_%s" % self.cat_id)
        self.conn.delete("ts_%s" % self.cat_id)

    @classmethod
    def classify(cls, queue, classifiers, error, threshold):
        """Determine which category, if any, a signature belongs to.

        If it doesn't find a match, then it'll save the error into a new
        category, which subsequent errors are checked against.

        Args:
            queue: redis connection instance.
            classifiers: list of Classifier instances. A single classifier
                (any object with a check_message method) is accepted too.
            error: dict of json payload with a 'sig' key.
            threshold: float, classification threshold to match.
        Returns:
            Category, the matched category or the newly created one.
        """
        if hasattr(classifiers, "check_message"):
            # A bare classifier was passed instead of a list of them.
            classifiers = [classifiers]

        categories = cls.get_all_categories(queue)
        matched_cat = None
        start = time.time()
        for clf in classifiers:
            for cat in categories:
                if clf.check_message(cat, error, threshold):
                    matched_cat = cat
                    break
            if matched_cat is not None:
                # First hit wins; the remaining classifiers are skipped.
                break

        if matched_cat is not None:
            matched_cat.events.add_event(error)
        else:
            cat_sig = error['sig']
            matched_cat = cls.create(queue, cat_sig, event=error)
        end = time.time()
        log.info('Classification took %.2f seconds for matched category %s' % (
            end - start, matched_cat.cat_id))
        return matched_cat

    @classmethod
    def create(cls, redis_conn, signature, event=None):
        """Adds category metadata.

        Args:
            redis_conn: Redis connection to use.
            signature: Signature of the new category.
            event: Optional. Event to save under the new category.
        Returns:
            Category, instance bound to the new category id.

        The signature is added to the "categories" sorted set with score 0,
        and 3 metadata fields are set in the category hash:
            * signature is the full text of the original category.
            * human_name is the human readable name for this category
              (used for display purposes); it starts out as the category id.
            * threshold is the custom threshold for this category; it
              starts out as an empty string.

        The field names are in the form {category_hash}:{metadata_name}
        e.g. a909ede39c09d84ed1839c5ca0f9b9876113770b:signature
        """
        redis_conn.zadd("categories", signature, 0)
        cat_id = cls.construct_cat_id(signature)
        cat_fields = (
            (cls.SIGNATURE_KEY, signature),
            (cls.HUMAN_NAME_KEY, cat_id),
            (cls.THRESHOLD_KEY, ""),
        )
        for key, value in cat_fields:
            redis_conn.hset(cls.CAT_META_HASH, "%s:%s" % (cat_id, key), value)

        kwargs = {"cat_id": cat_id}
        if event:
            kwargs["event"] = event
        return cls(redis_conn, **kwargs)

    @classmethod
    def get_all_categories(cls, redis_conn):
        """Return a list of all currently defined categories"""
        # Every category has exactly one '{cat_id}:signature' field, so the
        # part in front of that suffix is the category id.
        suffix = ":" + cls.SIGNATURE_KEY
        hash_map = redis_conn.hgetall(cls.CAT_META_HASH)
        return [
            cls(redis_conn, cat_id=key[:-len(suffix)])
            for key in hash_map
            if key.endswith(suffix)
        ]

    @staticmethod
    def construct_cat_id(signature):
        """Create Category ID hash from a category signature.

        Args:
            signature: str, signature of category. Text is encoded as UTF-8
                before hashing; byte strings are hashed as they are.
        Returns:
            str, sha1 hash.
        """
        if not isinstance(signature, bytes):
            # sha1 works on bytes only.
            signature = signature.encode("utf-8")
        cat_id = hashlib.sha1()
        cat_id.update(signature)
        return cat_id.hexdigest()

    def _get_key(self, key):
        """Read the metadata field '{cat_id}:{key}' from the category hash."""
        return self.conn.hget(
            self.CAT_META_HASH, "%s:%s" % (self.cat_id, key)
        )

    def _set_key(self, key, value):
        """Write value to the metadata field '{cat_id}:{key}'."""
        return self.conn.hset(
            self.CAT_META_HASH, "%s:%s" % (self.cat_id, key), value
        )

    def _get_signature(self):
        return self._get_key(self.SIGNATURE_KEY)

    def _set_signature(self, value):
        return self._set_key(self.SIGNATURE_KEY, value)

    signature = property(_get_signature, _set_signature)

    def _get_human(self):
        return self._get_key(self.HUMAN_NAME_KEY)

    def _set_human(self, value):
        return self._set_key(self.HUMAN_NAME_KEY, value)

    human_name = property(_get_human, _set_human)

    def _get_threshold(self):
        return self._get_key(self.THRESHOLD_KEY)

    def _set_threshold(self, value):
        return self._set_key(self.THRESHOLD_KEY, value)

    threshold = property(_get_threshold, _set_threshold)

    def _get_stdev(self):
        return self._get_key(self.STDEV_KEY)

    def _set_stdev(self, value):
        # The value has to be forwarded; without it _set_key raises
        # TypeError on every assignment to stdev.
        return self._set_key(self.STDEV_KEY, value)

    stdev = property(_get_stdev, _set_stdev)

    def _get_mean(self):
        return self._get_key(self.MEAN_KEY)

    def _set_mean(self, value):
        return self._set_key(self.MEAN_KEY, value)

    mean = property(_get_mean, _set_mean)

    def _get_regex(self):
        return self._get_key(self.RE_KEY)

    def _set_regex(self, value):
        return self._set_key(self.RE_KEY, value)

    regex = property(_get_regex, _set_regex)

Project Versions