Source code for firetower.classifier

import difflib
import json
import regex

from logbook import Logger

log = Logger('Firetower-classifier')

# Names of the comparison strategies. The position of each name is the
# integer code that Levenshtein.str_ratio returns next to its ratio
# (0 -> 'none', 1 -> 'ratio', 2 -> 'quick_ratio', 3 -> 'real_quick_ratio').
CLASSIFIER_TYPES = (
    'none',
    'ratio',
    'quick_ratio',
    'real_quick_ratio',
)

def longest_common_substr(s1, s2):
    """Return the longest contiguous substring shared by two strings.

    Args:
        s1: str, first string to compare.
        s2: str, second string to compare.

    Returns:
        str, the longest run of characters that appears contiguously in
        both inputs, or the empty string when they share nothing. When
        several common substrings tie for the maximum length, the one
        that ends earliest in the longer input (``s1`` when the lengths
        are equal) is returned.

    Influenced by:
        http://en.wikibooks.org/wiki/Algorithm_implementation/Strings/Longest_common_substring#Python
    """
    # Keep the longer string on the outer loop; the result is sliced from it.
    if len(s1) < len(s2):
        s1, s2 = s2, s1

    # Only the previous row of the dynamic-programming table is ever read,
    # so two rows sized by the shorter string are enough.
    width = 1 + len(s2)
    previous = [0] * width
    longest, x_longest = 0, 0
    for x, outer_char in enumerate(s1, 1):
        current = [0] * width
        for y, inner_char in enumerate(s2, 1):
            if outer_char == inner_char:
                # Extend the common run that ended at the previous
                # character of both strings.
                run = previous[y - 1] + 1
                current[y] = run
                if run > longest:
                    longest, x_longest = run, x
        previous = current
    return s1[x_longest - longest:x_longest]
class Classifier(object):
    """Common base type for the message classifiers in this module."""
    pass


class NaiveBayes(Classifier):
    """Placeholder classifier; it defines no behaviour of its own."""
    pass


class Regex(Classifier):
    """Classifier that tests an error signature against a category regex."""

    # Cache of pattern string -> compiled pattern. It lives on the class and
    # is mutated in place, so every Regex instance shares the same cache.
    re_map = {}

    def _get_compiled_re(self, re):
        """Return the compiled form of a pattern, compiling it on first use.

        Args:
            re: str, the regular expression source.

        Returns:
            The compiled pattern object stored in ``re_map`` for ``re``.
        """
        if re not in self.re_map:
            self.re_map[re] = regex.compile(re)
        return self.re_map[re]

    def check_message(self, cat, error, default_thresh):
        """Report whether the category's regex matches the error signature.

        Args:
            cat: category object; its ``regex`` attribute is read.
            error: dict, error message being processed; ``error['sig']``
                is searched.
            default_thresh: unused here; accepted so the signature matches
                Levenshtein.check_message.

        Returns:
            bool, False when the category has no regex, otherwise whether
            the pattern is found anywhere in the signature.
        """
        pattern = cat.regex
        if not pattern:
            return False
        compiled = self._get_compiled_re(pattern)
        return compiled.search(error['sig']) is not None


class Levenshtein(Classifier):
    """Classifier that compares signatures with difflib similarity ratios."""

    def _halve_ratio_dist(self, ratio):
        """Dynamically close gap between any ratio and 1.00.

        Args:
            ratio: float, ratio in question.

        Return:
            float, modified to be closer to 1.00.
        """
        # Divide by a float so an integer ratio is not truncated on Python 2.
        return ratio + (1 - ratio) / 2.0

    def str_len_ratio(self, cat_str, sig_str, str_len_thresh=0.8):
        """Compare lengths to see if we should go with more complex analysis.

        Args:
            cat_str: str, the category we're going to compare against.
            sig_str: str, the string we're pulling from incoming event.
            str_len_thresh: float, the shorter/longer length ratio must be
                strictly greater than this to pass.

        Returns:
            bool, True if the lengths are within length tolerance.
        """
        cat_str_len = len(cat_str)
        sig_str_len = len(sig_str)
        longer = max(cat_str_len, sig_str_len)
        if longer == 0:
            # Two empty strings have identical lengths; avoid dividing by 0.
            ratio = 1.0
        else:
            ratio = float(min(cat_str_len, sig_str_len)) / longer
        return ratio > str_len_thresh

    def str_ratio(self, exemplar_str, sig_str,
                  small_sig_size=100, medium_sig_size=2000):
        """Return the ratio of similarity between two strings.

        The comparison method gets cheaper (and less exact) as the
        signature grows.

        Args:
            exemplar_str: str, basis of comparison within an existing
                category.
            sig_str: str, signature string we're trying to compare.
            small_sig_size: int, signatures shorter than this use
                ``ratio()``.
            medium_sig_size: int, signatures shorter than this (but not
                small) use ``quick_ratio()``; anything else uses
                ``real_quick_ratio()``.

        Returns:
            tuple of float, ratio of similarity, and int, index into
            CLASSIFIER_TYPES naming the method used:
            {none, ratio, quick_ratio, real_quick_ratio}. ``(0.0, 0)`` is
            returned when the lengths differ too much to bother comparing.
        """
        if not self.str_len_ratio(exemplar_str, sig_str):
            log.debug('Ratio was too far off')
            return (0.0, 0)

        sig_len = len(sig_str)
        seq = difflib.SequenceMatcher(None, exemplar_str, sig_str)
        if sig_len < small_sig_size:
            log.debug('Small signature found, using ratio()')
            return (seq.ratio(), 1)
        if sig_len < medium_sig_size:
            log.debug('Medium signature found, using quick_ratio()')
            return (seq.quick_ratio(), 2)
        log.debug('Large signature found, using real_quick_ratio()')
        return (seq.real_quick_ratio(), 3)

    def is_similar(self, golden, sig_str, thresh, is_custom):
        """Returns True if similarity is larger than thresh.

        Args:
            golden: str, known category's signature.
            sig_str: str, unknown payload's signature.
            thresh: float, threshold to decide if we have a match.
            is_custom: boolean, does category have a custom threshold.

        Returns:
            bool, True if similar.
        """
        ratio, class_type = self.str_ratio(golden, sig_str)
        # Make the default ratio closer to 1.00 the less accurate the
        # classification algorithm we use. Do this multiple times if
        # we use less accuracy in the algorithm.
        if class_type and not is_custom:
            # Probably too clever: not super happy with this implementation.
            for _half in range(class_type):
                thresh = self._halve_ratio_dist(thresh)
            log.debug('Used this default threshold %.4f' % thresh)
        return ratio > thresh

    def check_message(self, cat, error, default_thresh):
        """Compare error with messages from a category.

        Args:
            cat: category object to compare the error against; its
                ``threshold`` and ``signature`` attributes are read.
            error: dict, error message we're processing; ``error['sig']``
                is the signature compared.
            default_thresh: float, the ratio of similarity needed to match
                when the category has no threshold of its own.

        Returns:
            bool, True if the error is similar enough to the category.
        """
        sig = error['sig']
        custom_thresh = cat.threshold
        is_custom = custom_thresh is not None
        thresh = custom_thresh if is_custom else default_thresh
        exemplar_str = cat.signature
        # is_similar expects the category's signature first and the
        # incoming signature second.
        return self.is_similar(exemplar_str, sig, thresh, is_custom=is_custom)

Project Versions