|
@@ -17,114 +17,138 @@ from ffstatus import \
|
|
|
Storage
|
|
|
from ffstatus.exceptions import SanityCheckError
|
|
|
|
|
|
+BATCAVE = 'Batman/Alfred Transmission Collection, Aggregation & Value Engine'
|
|
|
DEFAULT_INTERVAL = 15
|
|
|
|
|
|
-parser = argparse.ArgumentParser(description='Batman/Alfred Transmission Collection, Aggregation & Value Engine')
|
|
|
-parser.add_argument('--logfile', help='path for log file')
|
|
|
-parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL, help='data poll interval')
|
|
|
-parser.add_argument('-v', '--verbose', action='store_true', help='increase output verbosity')
|
|
|
-parser.add_argument('-d', '--no-detach', action='store_true', help='Don\'t detach (daemonize) ourself')
|
|
|
-parser.add_argument('-n', '--no-send', action='store_true', help='Fetch data but don\'t send it')
|
|
|
-parser.add_argument('-A', '--alfred-json', help='executable path for alfred-json')
|
|
|
-parser.add_argument('-B', '--batadv-vis', help='executable path for batadv-vis')
|
|
|
-parser.add_argument('-G', '--graphite-host', help='Graphite host')
|
|
|
-parser.add_argument('--graphite-port', type=int, default=2003, help='Graphite port')
|
|
|
-parser.add_argument('--dashing-url', help='Dashing URL')
|
|
|
-parser.add_argument('--dashing-token', help='Dashing\'s secret update token')
|
|
|
-parser.add_argument('--api-bind-host', default='', help='API-Server Hostname')
|
|
|
-parser.add_argument('--api-bind-port', type=int, default=8888, help='API-Server Port')
|
|
|
-parser.add_argument('-S', '--storage-dir', default='.', help='Path where to store data')
|
|
|
-args = parser.parse_args()
|
|
|
-
|
|
|
-if args.interval < 5:
|
|
|
- print('A poll interval lower than 5s is not supported.')
|
|
|
- sys.exit(1)
|
|
|
-
|
|
|
-shall_daemonize = not args.no_detach
|
|
|
-
|
|
|
-logger = logging.getLogger()
|
|
|
-logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)
|
|
|
-
|
|
|
-if not args.logfile is None:
|
|
|
- fh = logging.FileHandler(args.logfile)
|
|
|
- fh.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S'))
|
|
|
- logger.addHandler(fh)
|
|
|
-
|
|
|
-if args.no_detach:
|
|
|
- ch = logging.StreamHandler(sys.stdout)
|
|
|
- ch.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S'))
|
|
|
- logger.addHandler(ch)
|
|
|
-
|
|
|
-logger.info('Starting up')
|
|
|
-
|
|
|
-storage = Storage(args.storage_dir)
|
|
|
-storage.open()
|
|
|
-logger.info('Storage: ' + str(storage))
|
|
|
-
|
|
|
-a = AlfredParser()
|
|
|
-b = BatmanParser()
|
|
|
-d = DashingClient(args.dashing_url, args.dashing_token) if not args.dashing_url is None else None
|
|
|
-g = GraphitePush(args.graphite_host, args.graphite_port) if not args.graphite_host is None else None
|
|
|
-
|
|
|
-if args.no_send:
|
|
|
- if not g is None: g.dont_send = True
|
|
|
-
|
|
|
-if not args.alfred_json is None: a.alfred_json = args.alfred_json
|
|
|
-if not args.batadv_vis is None: b.batadv_vis = args.batadv_vis
|
|
|
-
|
|
|
-logger.debug('Configured A.L.F.R.E.D. source: ' + str(a))
|
|
|
-logger.debug('Configured B.A.T.M.A.N. source: ' + str(b))
|
|
|
-logger.debug('Configured Dashing: ' + str(d))
|
|
|
-logger.debug('Configured Graphite: ' + str(g))
|
|
|
-
|
|
|
-# execute sanitycheck() where possible
|
|
|
-for i in [('AlfredParser', a), ('BatmanParser', b)]:
|
|
|
- try:
|
|
|
- i[1].sanitycheck()
|
|
|
- except SanityCheckError as err:
|
|
|
- logger.critical(i[0] + '.sanitycheck() failed: ' + str(err))
|
|
|
- print('FAILED SANITY CHECK: ' + str(err))
|
|
|
- sys.exit(1)
|
|
|
-
|
|
|
-server = ApiServer((args.api_bind_host, args.api_bind_port), storage)
|
|
|
-server_thread = threading.Thread(target=server.serve_forever)
|
|
|
-server_thread.daemon = True # exit thread when main thread terminates
|
|
|
-server_thread.start()
|
|
|
-logger.info('Started server: ' + str(server))
|
|
|
-
|
|
|
-if shall_daemonize:
|
|
|
- daemon_context = daemon.DaemonContext(
|
|
|
- files_preserve=[fh.stream],
|
|
|
- )
|
|
|
-
|
|
|
- daemon_context.open()
|
|
|
-
|
|
|
-while True:
|
|
|
- try:
|
|
|
- ts = int(time.time())
|
|
|
- logger.debug('Step 1/3: Fetching data ...')
|
|
|
- alfreddata = a.fetch()
|
|
|
- batmandata = b.fetch()
|
|
|
- newdata = merge_alfred_batman(alfreddata, batmandata)
|
|
|
- logger.debug('Fetched data: {0} ALFRED with {1} BATMAN makes {2} total'.format(len(alfreddata), len(batmandata), len(newdata)))
|
|
|
|
|
|
- logger.debug('Step 2/3: Pushing update data ...')
|
|
|
- if not g is None:
|
|
|
- graphitedata = g.push(newdata, ts=ts)
|
|
|
- logger.info('Sent ' + str(graphitedata.count('\n')+1) + ' lines to Graphite.')
|
|
|
- if not d is None:
|
|
|
- d.push(newdata)
|
|
|
-
|
|
|
- logger.debug('Step 3/3: Merging current data ...')
|
|
|
- storage.merge_new_data(newdata)
|
|
|
-
|
|
|
- logger.debug('I have data for %d nodes.', len(storage.data))
|
|
|
-
|
|
|
- except Exception as err:
|
|
|
- logger.error(str(err))
|
|
|
-
|
|
|
- logger.debug('Sleeping for {0} seconds'.format(args.interval))
|
|
|
- time.sleep(args.interval)
|
|
|
+def get_args():
|
|
|
+ parser = argparse.ArgumentParser(description=BATCAVE)
|
|
|
+ parser.add_argument('--logfile',
|
|
|
+ help='path for log file')
|
|
|
+ parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL,
|
|
|
+ help='data poll interval')
|
|
|
+ parser.add_argument('-v', '--verbose', action='store_true',
|
|
|
+ help='increase output verbosity')
|
|
|
+ parser.add_argument('-d', '--no-detach', action='store_true',
|
|
|
+ help='Don\'t detach (daemonize) ourself')
|
|
|
+ parser.add_argument('-n', '--no-send', action='store_true',
|
|
|
+ help='Fetch data but don\'t send it')
|
|
|
+ parser.add_argument('-A', '--alfred-json',
|
|
|
+ help='executable path for alfred-json')
|
|
|
+ parser.add_argument('-B', '--batadv-vis',
|
|
|
+ help='executable path for batadv-vis')
|
|
|
+ parser.add_argument('-G', '--graphite-host',
|
|
|
+ help='Graphite host')
|
|
|
+ parser.add_argument('--graphite-port', type=int, default=2003,
|
|
|
+ help='Graphite port')
|
|
|
+ parser.add_argument('--dashing-url',
|
|
|
+ help='Dashing URL')
|
|
|
+ parser.add_argument('--dashing-token',
|
|
|
+ help='Dashing\'s secret update token')
|
|
|
+ parser.add_argument('--api-bind-host', default='',
|
|
|
+ help='API-Server Hostname')
|
|
|
+ parser.add_argument('--api-bind-port', type=int, default=8888,
|
|
|
+ help='API-Server Port')
|
|
|
+ parser.add_argument('-S', '--storage-dir', default='.',
|
|
|
+ help='Path where to store data')
|
|
|
+ return parser.parse_args()
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ args = get_args()
|
|
|
+
|
|
|
+ if args.interval < 5:
|
|
|
+ print('A poll interval lower than 5s is not supported.')
|
|
|
+ sys.exit(1)
|
|
|
|
|
|
-storage.close()
|
|
|
-logger.info('Shut down.')
|
|
|
+ shall_daemonize = not args.no_detach
|
|
|
+
|
|
|
+ logger = logging.getLogger()
|
|
|
+ logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)
|
|
|
+
|
|
|
+ if not args.logfile is None:
|
|
|
+ fh = logging.FileHandler(args.logfile)
|
|
|
+ fh.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S'))
|
|
|
+ logger.addHandler(fh)
|
|
|
+
|
|
|
+ if args.no_detach:
|
|
|
+ ch = logging.StreamHandler(sys.stdout)
|
|
|
+ ch.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S'))
|
|
|
+ logger.addHandler(ch)
|
|
|
+
|
|
|
+ logger.info('Starting up')
|
|
|
+
|
|
|
+ storage = Storage(args.storage_dir)
|
|
|
+ storage.open()
|
|
|
+ logger.info('Storage: ' + str(storage))
|
|
|
+
|
|
|
+ a = AlfredParser()
|
|
|
+ b = BatmanParser()
|
|
|
+ d = DashingClient(args.dashing_url, args.dashing_token) if not args.dashing_url is None else None
|
|
|
+ g = GraphitePush(args.graphite_host, args.graphite_port) if not args.graphite_host is None else None
|
|
|
+
|
|
|
+ if args.no_send:
|
|
|
+ if not g is None: g.dont_send = True
|
|
|
+
|
|
|
+ if not args.alfred_json is None: a.alfred_json = args.alfred_json
|
|
|
+ if not args.batadv_vis is None: b.batadv_vis = args.batadv_vis
|
|
|
+
|
|
|
+ logger.debug('Configured A.L.F.R.E.D. source: ' + str(a))
|
|
|
+ logger.debug('Configured B.A.T.M.A.N. source: ' + str(b))
|
|
|
+ logger.debug('Configured Dashing: ' + str(d))
|
|
|
+ logger.debug('Configured Graphite: ' + str(g))
|
|
|
+
|
|
|
+ # execute sanitycheck() where possible
|
|
|
+ for i in [('AlfredParser', a), ('BatmanParser', b)]:
|
|
|
+ try:
|
|
|
+ i[1].sanitycheck()
|
|
|
+ except SanityCheckError as err:
|
|
|
+ logger.critical(i[0] + '.sanitycheck() failed: ' + str(err))
|
|
|
+ print('FAILED SANITY CHECK: ' + str(err))
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ server = ApiServer((args.api_bind_host, args.api_bind_port), storage)
|
|
|
+ server_thread = threading.Thread(target=server.serve_forever)
|
|
|
+ server_thread.daemon = True # exit thread when main thread terminates
|
|
|
+ server_thread.start()
|
|
|
+ logger.info('Started server: ' + str(server))
|
|
|
+
|
|
|
+ if shall_daemonize:
|
|
|
+ daemon_context = daemon.DaemonContext(
|
|
|
+ files_preserve=[fh.stream],
|
|
|
+ )
|
|
|
+
|
|
|
+ daemon_context.open()
|
|
|
+
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ ts = int(time.time())
|
|
|
+ logger.debug('Step 1/3: Fetching data ...')
|
|
|
+ alfreddata = a.fetch()
|
|
|
+ batmandata = b.fetch()
|
|
|
+ newdata = merge_alfred_batman(alfreddata, batmandata)
|
|
|
+ logger.debug('Fetched data: {0} ALFRED with {1} BATMAN makes {2} total'.format(len(alfreddata), len(batmandata), len(newdata)))
|
|
|
+
|
|
|
+ logger.debug('Step 2/3: Pushing update data ...')
|
|
|
+ if not g is None:
|
|
|
+ graphitedata = g.push(newdata, ts=ts)
|
|
|
+ logger.info('Sent ' + str(graphitedata.count('\n')+1) + ' lines to Graphite.')
|
|
|
+ if not d is None:
|
|
|
+ d.push(newdata)
|
|
|
+
|
|
|
+ logger.debug('Step 3/3: Merging current data ...')
|
|
|
+ storage.merge_new_data(newdata)
|
|
|
+
|
|
|
+ logger.debug('I have data for %d nodes.', len(storage.data))
|
|
|
+
|
|
|
+ except Exception as err:
|
|
|
+ logger.error(str(err))
|
|
|
+
|
|
|
+ logger.debug('Sleeping for {0} seconds'.format(args.interval))
|
|
|
+ time.sleep(args.interval)
|
|
|
+
|
|
|
+ storage.close()
|
|
|
+ logger.info('Shut down.')
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ main()
|