# batcave.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import print_function

import argparse
import logging
import sys
import threading
import time

import daemon

from ffstatus import \
    merge_alfred_batman, \
    ApiServer, \
    AlfredParser, BatmanParser, \
    DashingClient, GraphitePush, \
    FileStorage, RedisStorage
from ffstatus.exceptions import SanityCheckError
  17. BATCAVE = 'Batman/Alfred Transmission Collection, Aggregation & Value Engine'
  18. DEFAULT_INTERVAL = 15
  19. def get_args():
  20. """Parse commandline arguments."""
  21. parser = argparse.ArgumentParser(description=BATCAVE)
  22. parser.add_argument('--logfile',
  23. help='path for log file')
  24. parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL,
  25. help='data poll interval')
  26. parser.add_argument('-v', '--verbose', action='store_true',
  27. help='increase output verbosity')
  28. parser.add_argument('-d', '--no-detach', action='store_true',
  29. help='Don\'t detach (daemonize) ourself')
  30. parser.add_argument('-n', '--no-send', action='store_true',
  31. help='Fetch data but don\'t send it')
  32. parser.add_argument('-A', '--alfred-json',
  33. help='executable path for alfred-json')
  34. parser.add_argument('-B', '--batadv-vis',
  35. help='executable path for batadv-vis')
  36. parser.add_argument('-C', '--batctl',
  37. help='executable path for batctl')
  38. parser.add_argument('-G', '--graphite-host',
  39. help='Graphite host')
  40. parser.add_argument('--graphite-port', type=int, default=2003,
  41. help='Graphite port')
  42. parser.add_argument('--dashing-url',
  43. help='Dashing URL')
  44. parser.add_argument('--graphite-whitelist',
  45. help='comma-separated list of node-ids to send to Graphite only')
  46. parser.add_argument('--dashing-token',
  47. help='Dashing\'s secret update token')
  48. parser.add_argument('--api-bind-host', default='',
  49. help='API-Server Hostname')
  50. parser.add_argument('--api-bind-port', type=int, default=8888,
  51. help='API-Server Port')
  52. parser.add_argument('-S', '--storage', default='.',
  53. help='Path where to store data or ' +
  54. '"redis:[<host>[:<port>[:<password>]]]')
  55. return parser.parse_args()
  56. def prepare_logging(args):
  57. """Configures Python's logging according to args."""
  58. logger = logging.getLogger()
  59. logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)
  60. fmt = logging.Formatter(
  61. '%(asctime)s [%(levelname)s] %(message)s',
  62. '%Y-%m-%d %H:%M:%S')
  63. if not args.logfile is None:
  64. file_handler = logging.FileHandler(args.logfile)
  65. file_handler.setFormatter(fmt)
  66. logger.addHandler(file_handler)
  67. if args.no_detach:
  68. console_handler = logging.StreamHandler(sys.stdout)
  69. console_handler.setFormatter(fmt)
  70. logger.addHandler(console_handler)
  71. return logger
  72. def main():
  73. args = get_args()
  74. if args.interval < 5:
  75. print('A poll interval lower than 5s is not supported.')
  76. sys.exit(1)
  77. shall_daemonize = not args.no_detach
  78. logger = prepare_logging(args)
  79. logger.info('Starting up')
  80. storage = None
  81. storage_target = args.storage
  82. if storage_target.startswith('redis:'):
  83. redis_opts = storage_target.split(':')
  84. redis_host = redis_opts[1] if len(redis_opts) > 1 else 'localhost'
  85. redis_port = int(redis_opts[2]) if len(redis_opts) > 2 else 6379
  86. redis_pass = redis_opts[3] if len(redis_opts) > 3 else None
  87. storage = RedisStorage(redis_host, redis_port, redis_pass)
  88. else:
  89. storage = FileStorage(args.storage)
  90. storage.open()
  91. logger.info('Storage: ' + str(storage))
  92. alfred = AlfredParser()
  93. batman = BatmanParser()
  94. dashing = None
  95. if args.dashing_url is not None:
  96. dashing = DashingClient(args.dashing_url, args.dashing_token)
  97. graphite = None
  98. if args.graphite_host is not None:
  99. graphite = GraphitePush(args.graphite_host, args.graphite_port)
  100. if args.graphite_whitelist is not None:
  101. graphite.whitelist = args.graphite_whitelist.split(',')
  102. if args.no_send:
  103. if graphite is not None:
  104. graphite.dont_send = True
  105. if not args.alfred_json is None:
  106. alfred.alfred_json = args.alfred_json
  107. if not args.batadv_vis is None:
  108. batman.batadv_vis = args.batadv_vis
  109. if not args.batctl is None:
  110. batman.batctl = args.batctl
  111. logger.debug('Configured A.L.F.R.E.D. source: %s', alfred)
  112. logger.debug('Configured B.A.T.M.A.N. source: %s', batman)
  113. logger.debug('Configured Dashing: %s', dashing)
  114. logger.debug('Configured Graphite: %s', graphite)
  115. # execute sanitycheck() where possible
  116. for i in [('AlfredParser', alfred), ('BatmanParser', batman)]:
  117. try:
  118. i[1].sanitycheck()
  119. except SanityCheckError as err:
  120. logger.critical(i[0] + '.sanitycheck() failed: ' + str(err))
  121. print('FAILED SANITY CHECK: ' + str(err))
  122. sys.exit(1)
  123. server = ApiServer((args.api_bind_host, args.api_bind_port), storage)
  124. server_thread = threading.Thread(target=server.serve_forever)
  125. server_thread.daemon = True # exit thread when main thread terminates
  126. server_thread.start()
  127. logger.info('Started server: ' + str(server))
  128. if shall_daemonize:
  129. streams = [handler.stream for handler in logger.handlers
  130. if isinstance(handler, logging.FileHandler)]
  131. daemon_context = daemon.DaemonContext(
  132. files_preserve=streams,
  133. )
  134. daemon_context.open()
  135. if graphite is not None:
  136. storage.metric_handler = graphite.handle_metric
  137. while True:
  138. try:
  139. now = int(time.time())
  140. logger.debug('Step 1/3: Fetching data ...')
  141. alfreddata = alfred.fetch()
  142. batmandata = batman.fetch()
  143. newdata = merge_alfred_batman(alfreddata, batmandata)
  144. logger.debug(
  145. 'Fetched data: %d ALFRED with %d BATMAN makes %d total',
  146. len(alfreddata), len(batmandata), len(newdata))
  147. logger.debug('Step 2/3: Pushing update data ...')
  148. if graphite is not None:
  149. graphitedata = graphite.push(newdata, timestamp=now)
  150. logger.info(
  151. 'Sent %d lines to Graphite.',
  152. graphitedata.count('\n')+1)
  153. if dashing is not None:
  154. dashing.push(newdata)
  155. logger.debug('Step 3/3: Merging current data ...')
  156. storage.merge_new_data(newdata)
  157. storage.save()
  158. logger.debug('I have data for %d nodes.', storage.status['nodes'])
  159. except Exception as err:
  160. import traceback
  161. logger.error(str(err) + "\n" + traceback.format_exc())
  162. logger.debug('Sleeping for {0} seconds'.format(args.interval))
  163. time.sleep(args.interval)
  164. storage.close()
  165. logger.info('Shut down.')
  166. if __name__ == '__main__':
  167. main()