Source code for blockade.libs.indicators

#!/usr/bin/env python
"""Indicator interface for blockade."""

__author__ = 'Brandon Dixon'
__version__ = '1.0.0'

import math
import time
from blockade.api import Client
from blockade.common.utils import (cache_items, check_whitelist,
                                   clean_indicators, hash_values, prune_cached)


[docs]class IndicatorClient(Client): """Client to interface with indicators for blockade.""" def __init__(self, *args, **kwargs): """Setup the primary client instance.""" super(IndicatorClient, self).__init__(*args, **kwargs)
[docs] def add_indicators(self, indicators=list(), private=False, tags=list()): """Add indicators to the remote instance.""" if len(indicators) == 0: raise Exception("No indicators were identified.") self.logger.debug("Checking {} indicators".format(len(indicators))) cleaned = clean_indicators(indicators) self.logger.debug("Cleaned {} indicators".format(len(cleaned))) whitelisted = check_whitelist(cleaned) self.logger.debug("Non-whitelisted {} indicators".format(len(whitelisted))) indicators = prune_cached(whitelisted) hashed = hash_values(indicators) self.logger.debug("Non-cached {} indicators".format(len(indicators))) self.logger.debug("Processing {} indicators".format(len(indicators))) request_count = int(math.ceil(len(indicators)/100.0)) if request_count == 0: mesg = "[!] No indicators were left to process after " mesg += "cleaning, whitelisting and checking the cache." return {'message': mesg} stats = {'success': 0, 'failure': 0, 'requests': request_count, 'written': 0} mesg = "{} indicators found, making {} requests" self.logger.debug(mesg.format(len(indicators), request_count)) if private: indicators = hashed if type(tags) == str: tags = [t.strip().lower() for t in tags.split(',')] start, end = (0, 100) for i, idx in enumerate(range(0, request_count)): if idx > 0: time.sleep(3) # Ensure we never trip the limit self.logger.debug("Waiting 3 seconds before next request.") to_send = {'indicators': indicators[start:end], 'tags': tags} r = self._send_data('POST', 'admin', 'add-indicators', to_send) start, end = (end, end + 100) if not r['success']: stats['failure'] += 1 continue stats['success'] += 1 stats['written'] += r['writeCount'] cache_items(to_send['indicators']) msg = "" msg += "{written} indicators written using {requests} requests: " msg += "{success} success, {failure} failure" stats['message'] = msg.format(**stats) return stats
[docs] def get_indicators(self): """List indicators available on the remote instance.""" response = self._get('', 'get-indicators') response['message'] = "%i indicators:\n%s" % ( len(response['indicators']), "\n".join(response['indicators']) ) return response