import sys import os import imp import json import threading import socket import random import time import elasticsearch from datetime import datetime import logging.config, logging.handlers class ScriptHandler(logging.Handler): """ A logging handler which behaves the way scripts should: error msgs to stderr and normal msgs to stdout. """ @classmethod def factory(cls, **args): return cls() def emit(self, record): try: f = sys.stderr if record.levelno >= logging.WARN else sys.stdout f.write(self.format(record) + '\n') f.flush() except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record) class WebHandler(logging.Handler): """ A handler class which allows pages to act the way scripts do: errors to one file and msgs to another. """ @classmethod def factory(cls, **args): return cls(args['info'], args['err']) def __init__(self, info_path, err_path): self.info_stream = open(info_path, 'a') self.err_stream = open(err_path, 'a') logging.Handler.__init__(self) def emit(self, record): try: f = record.levelno >= logging.WARN and self.err_stream or self.info_stream f.write(self.format(record) + '\n') f.flush() except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record) def flush(self): self.info_stream.flush() self.err_stream.flush() def close(self): self.info_stream.close() self.err_stream.close() logging.Handler.close(self) class RedisHandler(logging.Handler): """ Emit logs to a Redis server using LPUSH. """ @classmethod def factory(cls, **args): key = args.pop('key', None) if not key: raise ValueError('RedisHandler requires `key`') return cls(key, **args) def __init__(self, key, host='localhost', port=6379): self.key = key self.client = redis.StrictRedis(host=host, port=port) logging.Handler.__init__(self) def emit(self, record): """ Synchronously LPUSH message to Redis. NOTE: If given a model object as an argument, then add it as keyword arg as well. Assuming the msg makes it to something like ElasticSearch, then this enables more powerful filtering. Examples: "user:X", "order:Y" """ d = dict(record.__dict__) for arg in d['args']: if hasattr(arg, '_kind'): s = str(arg) if hasattr(arg, 'id') and s != arg.id: s = "{}:{}".format(arg.id, s) d[arg._kind] = s if 'type' not in d: d['type'] = d.get('name') exc = d.pop('exc_info', None) if exc and exc[0]: # Could be (None, None, None) exc_type, exc_val, tb = exc d['exc'] = dict( type = exc_type.__name__, val = exc_val, tb = [], ) while tb: f = tb.tb_frame d['exc']['tb'].append("{}:{}:{}".format(f.f_code.co_filename, tb.tb_lineno, f.f_code.co_name)) tb = tb.tb_next del tb # Can lead to a memory leak if don't delete? d['message'] = d.pop('msg', '') % d.pop('args', ()) self.client.lpush(self.key, json.dumps(d, default=str)) class ElasticSearchHandler(logging.Handler): """ Emit logs to an ElasticSearch index. """ @classmethod def factory(cls, **args): index = args.pop('index', None) if not index: raise ValueError('ElasticSearchHandler requires `index`') return cls(index, **args) def __init__(self, index, host='localhost', port=9200): self.index = index self.client = elasticsearch.Elasticsearch([dict(host=host, port=port)]) logging.Handler.__init__(self) def emit(self, record): """ Index record to ElasticSearch. NOTE: If given a model object as an argument, then add it as keyword arg as well. Assuming the msg makes it to something like ElasticSearch, then this enables more powerful filtering. Examples: "user:X", "order:Y" """ d = dict(record.__dict__) for arg in d['args']: if hasattr(arg, '_kind'): s = str(arg) if hasattr(arg, 'id') and s != arg.id: s = "{}:{}".format(arg.id, s) d[arg._kind] = s if 'type' not in d: d['type'] = d.get('name') exc = d.pop('exc_info', None) if exc and exc[0]: # Could be (None, None, None) exc_type, exc_val, tb = exc d['exc'] = dict( type = exc_type.__name__, val = exc_val, tb = [], ) while tb: f = tb.tb_frame d['exc']['tb'].append("{}:{}:{}".format(f.f_code.co_filename, tb.tb_lineno, f.f_code.co_name)) tb = tb.tb_next del tb # Can lead to a memory leak if don't delete? d['message'] = d.pop('msg', '') % d.pop('args', ()) d['@timestamp'] = datetime.utcfromtimestamp(d.pop('created')) self.client.index(index=self.index, doc_type=d['type'], body=d) class ContextFilter(logging.Filter): """ A logging filter for storing & emitting thread-local attributes. """ def __init__(self, name, **defaults): super().__init__(name) self.tlocal = threading.local() self.defaults = defaults @property def _data(self): """ Return thread-local data if defined. Otherwise, initialize thread-local data with defaults. """ if not hasattr(self.tlocal, 'data'): self.tlocal.data = self.defaults return self.tlocal.data def filter(self, record): """ Attach collected data to the record. """ record.__dict__.update(self._data) return True # Emulate a dictionary def update(self, *args, **kwargs): self._data.update(*args, **kwargs) def __contains__(self, name): return name in self._data def __getitem__(self, name): return self._data[name] def __setitem__(self, name, val): self._data[name] = val web_filter = ContextFilter( 'web', hostname = socket.gethostname(), user = None, method = None, path = None, status = None, ip = None, size = None, times = dict(), ) LOG_CFG = { 'version': 1, 'disable_existing_loggers': False, 'handlers': { 'elasticsearch': { '()': ElasticSearchHandler.factory, 'host': 'localhost', 'index': 'desertpy', 'level': 'INFO', }, 'script' : { '()': ScriptHandler.factory, 'formatter': 'script', 'level': 'INFO', }, 'stdout' : { 'class': 'logging.StreamHandler', 'formatter': 'stdout', 'stream': 'ext://sys.stdout', 'level': 'INFO', }, 'syslog-bin' : { 'class': 'logging.handlers.SysLogHandler', 'formatter': 'syslog-bin', }, 'syslog-web' : { 'class': 'logging.handlers.SysLogHandler', 'formatter': 'syslog-web', }, }, 'root': { 'handlers': ['stdout'], 'level': 'INFO', }, 'loggers': { 'orvant.web': { 'handlers': ['elasticsearch'], 'level': 'INFO', }, }, 'formatters': { 'script': { 'format': '%(levelname)s SCRIPT[%(name)s] %(message)s' }, 'stdout': { 'format': '%(levelname)s STDOUT[%(name)s] %(message)s' }, 'syslog-bin': { 'format': '%(type)s: %(message)s' }, 'syslog-web': { 'format': '%(type)s %(ip)s %(user)s %(method)s %(path)s %(status)s %(size)s %(message)s' }, }, } def show_effective_level(log): levels = {0: 'NOTSET', 10: 'DEBUG', 20: 'INFO', 30: 'WARN', 40: 'ERROR', 50: 'FATAL'} print("LEVEL[{}]: {}".format(log.name, levels.get(log.getEffectiveLevel()))) logging.config.dictConfig(LOG_CFG) root = logging.getLogger() orvant = logging.getLogger('orvant') web = logging.getLogger('orvant.web') web.addFilter(web_filter) test1 = logging.getLogger('orvant.web.test1') show_effective_level(root) show_effective_level(orvant) show_effective_level(web) show_effective_level(test1) for i in root.handlers: print("HANDLER[root]:", i.name) for i in orvant.handlers: print("HANDLER[orvant]:", i.name) for i in web.handlers: print("HANDLER[orvant.web]:", i.name) for i in test1.handlers: print("HANDLER[orvant.web.test1]:", i.name) for i in root.filters: print("FILTER[root]:", i.name) for i in orvant.filters: print("FILTER[orvant]:", i.name) for i in web.filters: print("FILTER[orvant.web]:", i.name) for i in test1.filters: print("FILTER[orvant.web.test1]:", i.name) logging.getLogger('elasticsearch').setLevel(logging.WARN) from flask import Flask, request app = Flask(__name__) def get_filter(log_name, filter_name): """ Return filter by name, first by looking at the logger's filters and then each handler's filters. """ log = logging.getLogger(log_name) for filter in log.filters: if filter.name == filter_name: return filter for handler in log.handlers: for filter in handler.filters: if filter.name == filter_name: return filter class timed(object): """ Time wrapped function and store in given log context filter. """ def __init__(self, metric_name, log_name, filter_name): self.metric_name = metric_name self.log_name = log_name self.filter_name = filter_name def __call__(self, func): def wrapper(*args, **kwds): start = time.time() try: return func(*args, **kwds) finally: filter = get_filter(self.log_name, self.filter_name) filter['times'][self.metric_name] = round(1000 * (time.time() - start)) return wrapper @timed('auth', 'orvant.web', 'web') def authenticate(): user = random.choice(('larry', 'moe', 'curly', 'jabroney', 'jones')) get_filter('orvant.web', 'web').update(user = user) time.sleep(0.5 * random.random()) return user @timed('validate', 'orvant.web', 'web') def validate(): time.sleep(0.5 * random.random()) @timed('render', 'orvant.web', 'web') def render(): time.sleep(0.5 * random.random()) @timed('total', 'orvant.web.test1', 'web') @app.route('/t1_') def test1_endpoint(path): authenticate() validate() render() return reply('Great Success!', 'orvant.web.test1') @timed('total', 'orvant.web.test2', 'web') @app.route('/t2_') def test2_endpoint(path): authenticate() validate() render() return reply('Great Success!', 'orvant.web.test2') def reply(body, channel): "Common per-response processing" filter = get_filter('orvant.web', 'web') filter.update( method = request.method, path = request.path, status = random.choice(10*[200] + 1*[302] + 1*[304] + 1*[400] + 1*[404] + 1*[500] + 1*[503]), ip = request.remote_addr or "10.1.1.{}".format(random.choice(range(255))), size = round((1 << 20) * random.random()), ) logging.getLogger(channel).info("{} {} {}".format(filter['user'], request.method, request.path)) return body def get(path): "Simulate a GET to our web app" return app.test_client().get(path) logging.getLogger('orvant.web.test1').addFilter(web_filter) logging.getLogger('orvant.web.test2').addFilter(web_filter) for i in range(500): get("/t{}_{}".format( random.choice((1, 2)), random.choice(('foo', 'bar', 'baz', 'big', 'bam', 'boozle'))))