123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- from __future__ import print_function
- import argparse
- import daemon
- import logging
- import sys
- import time
- import threading
- from ffstatus import \
- merge_alfred_batman, \
- ApiServer, \
- AlfredParser, BatmanParser, \
- DashingClient, GraphitePush, \
- FileStorage, RedisStorage
- from ffstatus.exceptions import SanityCheckError
- BATCAVE = 'Batman/Alfred Transmission Collection, Aggregation & Value Engine'
- DEFAULT_INTERVAL = 15
- def get_args():
- """Parse commandline arguments."""
- parser = argparse.ArgumentParser(description=BATCAVE)
- parser.add_argument('--logfile',
- help='path for log file')
- parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL,
- help='data poll interval')
- parser.add_argument('-v', '--verbose', action='store_true',
- help='increase output verbosity')
- parser.add_argument('-d', '--no-detach', action='store_true',
- help='Don\'t detach (daemonize) ourself')
- parser.add_argument('-n', '--no-send', action='store_true',
- help='Fetch data but don\'t send it')
- parser.add_argument('-A', '--alfred-json',
- help='executable path for alfred-json')
- parser.add_argument('-B', '--batadv-vis',
- help='executable path for batadv-vis')
- parser.add_argument('-C', '--batctl',
- help='executable path for batctl')
- parser.add_argument('-G', '--graphite-host',
- help='Graphite host')
- parser.add_argument('--graphite-port', type=int, default=2003,
- help='Graphite port')
- parser.add_argument('--dashing-url',
- help='Dashing URL')
- parser.add_argument('--graphite-whitelist',
- help='comma-separated list of node-ids to send to Graphite only')
- parser.add_argument('--dashing-token',
- help='Dashing\'s secret update token')
- parser.add_argument('--api-bind-host', default='',
- help='API-Server Hostname')
- parser.add_argument('--api-bind-port', type=int, default=8888,
- help='API-Server Port')
- parser.add_argument('-S', '--storage', default='.',
- help='Path where to store data or ' +
- '"redis:[<host>[:<port>[:<password>]]]')
- return parser.parse_args()
- def prepare_logging(args):
- """Configures Python's logging according to args."""
- logger = logging.getLogger()
- logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)
- fmt = logging.Formatter(
- '%(asctime)s [%(levelname)s] %(message)s',
- '%Y-%m-%d %H:%M:%S')
- if not args.logfile is None:
- file_handler = logging.FileHandler(args.logfile)
- file_handler.setFormatter(fmt)
- logger.addHandler(file_handler)
- if args.no_detach:
- console_handler = logging.StreamHandler(sys.stdout)
- console_handler.setFormatter(fmt)
- logger.addHandler(console_handler)
- return logger
- def main():
- args = get_args()
- if args.interval < 5:
- print('A poll interval lower than 5s is not supported.')
- sys.exit(1)
- shall_daemonize = not args.no_detach
- logger = prepare_logging(args)
- logger.info('Starting up')
- storage = None
- storage_target = args.storage
- if storage_target.startswith('redis:'):
- redis_opts = storage_target.split(':')
- redis_host = redis_opts[1] if len(redis_opts) > 1 else 'localhost'
- redis_port = int(redis_opts[2]) if len(redis_opts) > 2 else 6379
- redis_pass = redis_opts[3] if len(redis_opts) > 3 else None
- storage = RedisStorage(redis_host, redis_port, redis_pass)
- else:
- storage = FileStorage(args.storage)
- storage.open()
- logger.info('Storage: ' + str(storage))
- alfred = AlfredParser()
- batman = BatmanParser()
- dashing = None
- if args.dashing_url is not None:
- dashing = DashingClient(args.dashing_url, args.dashing_token)
- graphite = None
- if args.graphite_host is not None:
- graphite = GraphitePush(args.graphite_host, args.graphite_port)
- if args.graphite_whitelist is not None:
- graphite.whitelist = args.graphite_whitelist.split(',')
- if args.no_send:
- if graphite is not None:
- graphite.dont_send = True
- if not args.alfred_json is None:
- alfred.alfred_json = args.alfred_json
- if not args.batadv_vis is None:
- batman.batadv_vis = args.batadv_vis
- if not args.batctl is None:
- batman.batctl = args.batctl
- logger.debug('Configured A.L.F.R.E.D. source: %s', alfred)
- logger.debug('Configured B.A.T.M.A.N. source: %s', batman)
- logger.debug('Configured Dashing: %s', dashing)
- logger.debug('Configured Graphite: %s', graphite)
- # execute sanitycheck() where possible
- for i in [('AlfredParser', alfred), ('BatmanParser', batman)]:
- try:
- i[1].sanitycheck()
- except SanityCheckError as err:
- logger.critical(i[0] + '.sanitycheck() failed: ' + str(err))
- print('FAILED SANITY CHECK: ' + str(err))
- sys.exit(1)
- server = ApiServer((args.api_bind_host, args.api_bind_port), storage)
- server_thread = threading.Thread(target=server.serve_forever)
- server_thread.daemon = True # exit thread when main thread terminates
- server_thread.start()
- logger.info('Started server: ' + str(server))
- if shall_daemonize:
- streams = [handler.stream for handler in logger.handlers
- if isinstance(handler, logging.FileHandler)]
- daemon_context = daemon.DaemonContext(
- files_preserve=streams,
- )
- daemon_context.open()
- if graphite is not None:
- storage.metric_handler = graphite.handle_metric
- while True:
- try:
- now = int(time.time())
- logger.debug('Step 1/3: Fetching data ...')
- alfreddata = alfred.fetch()
- batmandata = batman.fetch()
- newdata = merge_alfred_batman(alfreddata, batmandata)
- logger.debug(
- 'Fetched data: %d ALFRED with %d BATMAN makes %d total',
- len(alfreddata), len(batmandata), len(newdata))
- logger.debug('Step 2/3: Pushing update data ...')
- if graphite is not None:
- graphitedata = graphite.push(newdata, timestamp=now)
- logger.info(
- 'Sent %d lines to Graphite.',
- graphitedata.count('\n')+1)
- if dashing is not None:
- dashing.push(newdata)
- logger.debug('Step 3/3: Merging current data ...')
- storage.merge_new_data(newdata)
- storage.save()
- logger.debug('I have data for %d nodes.', storage.status['nodes'])
- except Exception as err:
- import traceback
- logger.error(str(err) + "\n" + traceback.format_exc())
- logger.debug('Sleeping for {0} seconds'.format(args.interval))
- time.sleep(args.interval)
- storage.close()
- logger.info('Shut down.')
- if __name__ == '__main__':
- main()
|