123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
#!/usr/bin/python
from __future__ import print_function

import argparse
import logging
import os
import sys
import threading
import time

import daemon

from ffstatus import *
# Default number of seconds between two poll cycles (overridable via --interval).
DEFAULT_INTERVAL = 15
# Smallest poll interval (in seconds) the script accepts.
MINIMUM_INTERVAL = 5

# Shared layout for the file handler and the console handler.
LOG_FORMAT = '%(asctime)s [%(levelname)s] %(message)s'
LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

# Per-node lists that are emptied before fresh data for that node is merged in.
VOLATILE_KEYS = ('aliases', 'clients', 'neighbours')

# Root logger: the handlers set up in configure_logging() are attached to it.
logger = logging.getLogger()


def build_parser():
    """Return the argparse parser describing all command line options."""
    parser = argparse.ArgumentParser(description='Batman/Alfred Transmission Collection, Aggregation & Value Engine')
    parser.add_argument('--logfile', help='path for log file')
    parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL, help='data poll interval')
    parser.add_argument('-v', '--verbose', action='store_true', help='increase output verbosity')
    parser.add_argument('-d', '--no-detach', action='store_true', help='Don\'t detach (daemonize) ourself')
    parser.add_argument('-n', '--no-send', action='store_true', help='Fetch data but don\'t send it')
    parser.add_argument('-A', '--alfred-json', help='executable path for alfred-json')
    parser.add_argument('-B', '--batadv-vis', help='executable path for batadv-vis')
    parser.add_argument('-G', '--graphite-host', help='Graphite host')
    parser.add_argument('--graphite-port', type=int, default=2003, help='Graphite port')
    parser.add_argument('--dashing-url', help='Dashing URL')
    parser.add_argument('--dashing-token', help='Dashing\'s secret update token')
    parser.add_argument('--api-bind-host', default='', help='API-Server Hostname')
    parser.add_argument('--api-bind-port', type=int, default=8888, help='API-Server Port')
    parser.add_argument('-S', '--storage-dir', default='.', help='Path where to store data')
    return parser


def configure_logging(args):
    """Set the root logger's level and attach the requested handlers.

    A file handler is added when --logfile is given, a stdout handler when
    --no-detach is given.

    Returns the file handler, or None when no log file was requested.
    """
    logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)
    formatter = logging.Formatter(LOG_FORMAT, LOG_DATE_FORMAT)

    file_handler = None
    if args.logfile is not None:
        file_handler = logging.FileHandler(args.logfile)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    if args.no_detach:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    return file_handler


def files_to_preserve(file_handler):
    """Return the list of streams that must stay open while daemonizing.

    The list is empty when there is no file handler; the original code
    dereferenced the handler unconditionally and crashed with a NameError
    whenever the daemon was started without --logfile.
    """
    if file_handler is None:
        return []
    return [file_handler.stream]


def clear_stale_fields(current, newdata):
    """Prepare the stored node data (in place) for merging with new data.

    For every node id present in both dicts, the lists named in
    VOLATILE_KEYS are emptied, a '__RAW__' dict is ensured to exist, and
    every '__RAW__' key that the new data provides is removed from the
    stored '__RAW__' dict. Nodes missing from `newdata` are left untouched.
    """
    for node_id in current:
        if node_id not in newdata:
            continue
        node = current[node_id]
        for key in VOLATILE_KEYS:
            node[key] = []
        if '__RAW__' not in node:
            node['__RAW__'] = {}
        if '__RAW__' in newdata[node_id]:
            for key in newdata[node_id]['__RAW__']:
                if key in node['__RAW__']:
                    del node['__RAW__'][key]


def count_clients(item):
    """Return the number of clients of a node, excluding the node's own MACs.

    Works on a copy of item['clients'], so `item` itself is not modified.
    One list entry is dropped for item['mac'] and one for each entry of
    item['macs'], when present in the client list.
    """
    clients = list(item['clients']) if 'clients' in item else []
    if 'mac' in item and item['mac'] in clients:
        clients.remove(item['mac'])
    if 'macs' in item:
        for mac in item['macs']:
            if mac in clients:
                clients.remove(mac)
    return len(clients)


def update_client_counts(data):
    """Store a 'clientcount' on every item whose id does not start with '__'."""
    for itemid in data:
        if itemid.startswith('__'):
            continue
        data[itemid]['clientcount'] = count_clients(data[itemid])


def start_api_server(bind_address, storage):
    """Create the API server and run serve_forever() in a background thread.

    Returns the server object.
    """
    server = ApiServer(bind_address, storage)
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True  # exit thread when main thread terminates
    server_thread.start()
    logger.info('Started server: ' + str(server))
    return server


def run_cycle(alfred, batman, graphite, dashing, storage, no_send):
    """Run one fetch / push / merge cycle.

    `graphite` and `dashing` may be None when the respective sink is not
    configured. With `no_send` set, nothing is pushed to Dashing; Graphite
    is expected to honour its own `dont_send` attribute set by the caller.
    """
    ts = int(time.time())

    logger.debug('Step 1/3: Fetching data ...')
    alfreddata = alfred.fetch()
    batmandata = batman.fetch()
    newdata = merge_alfred_batman(alfreddata, batmandata)
    logger.debug('Fetched data: {0} ALFRED with {1} BATMAN makes {2} total'.format(len(alfreddata), len(batmandata), len(newdata)))

    logger.debug('Step 2/3: Pushing update data ...')
    if graphite is not None:
        graphitedata = graphite.push(newdata, ts=ts)
        logger.info('Sent ' + str(graphitedata.count('\n') + 1) + ' lines to Graphite.')
    if dashing is not None:
        if no_send:
            # --no-send promises "fetch but don't send"; this used to apply
            # to Graphite only.
            logger.debug('Not pushing to Dashing (--no-send).')
        else:
            dashing.push(newdata)

    logger.debug('Step 3/3: Merging current data ...')
    # Merging with an empty dict presumably yields a working copy of the
    # stored data -- verify against ffstatus.dict_merge.
    temp = dict_merge(storage.data, {})
    clear_stale_fields(temp, newdata)
    storage.data = dict_merge(temp, newdata)

    # sanitize each item's data
    update_client_counts(storage.data)

    logger.debug('I have data for ' + str(len(storage.data)) + ' nodes.')
    storage.save()


def main():
    """Parse the command line, set everything up and poll until terminated."""
    args = build_parser().parse_args()

    if args.interval < MINIMUM_INTERVAL:
        print('A poll interval lower than 5s is not supported.')
        sys.exit(1)

    shall_daemonize = not args.no_detach
    file_handler = configure_logging(args)

    logger.info('Starting up')

    # Resolve the storage path now: daemonizing changes the working
    # directory, which would silently redirect a relative path such as the
    # default '.'.
    storage = Storage(os.path.abspath(args.storage_dir))
    logger.info('Storage: ' + str(storage))

    a = AlfredParser()
    b = BatmanParser()
    d = DashingClient(args.dashing_url, args.dashing_token) if args.dashing_url is not None else None
    g = GraphitePush(args.graphite_host, args.graphite_port) if args.graphite_host is not None else None

    if args.no_send and g is not None:
        g.dont_send = True
    if args.alfred_json is not None:
        a.alfred_json = args.alfred_json
    if args.batadv_vis is not None:
        b.batadv_vis = args.batadv_vis

    logger.debug('Configured A.L.F.R.E.D. source: ' + str(a))
    logger.debug('Configured B.A.T.M.A.N. source: ' + str(b))
    logger.debug('Configured Dashing: ' + str(d))
    logger.debug('Configured Graphite: ' + str(g))

    # Run the sanity checks while still attached to the terminal so that the
    # failure message reaches the user.
    for name, source in [('AlfredParser', a), ('BatmanParser', b)]:
        try:
            source.sanitycheck()
        except Exception as err:
            logger.critical(name + '.sanitycheck() failed: ' + str(err))
            print('FAILED SANITY CHECK: ' + str(err))
            sys.exit(1)

    if shall_daemonize:
        daemon_context = daemon.DaemonContext(
            files_preserve=files_to_preserve(file_handler),
        )
        daemon_context.open()

    # The API server is started only after detaching: a thread started
    # earlier does not exist in the forked daemon process, and its listening
    # socket is not among the preserved files.
    start_api_server((args.api_bind_host, args.api_bind_port), storage)

    try:
        while True:
            try:
                run_cycle(a, b, g, d, storage, args.no_send)
            except Exception as err:
                # Keep polling after a failed cycle, but record the traceback.
                logger.exception('%s', err)

            logger.debug('Sleeping for {0} seconds'.format(args.interval))
            time.sleep(args.interval)
    except KeyboardInterrupt:
        logger.info('Interrupted.')
    finally:
        # Also reached on SystemExit, so the storage is closed on a regular
        # termination instead of never (the loop itself has no exit).
        storage.close()
        logger.info('Shut down.')


if __name__ == '__main__':
    main()
|