# batcave.py (5.5 KB)
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. from __future__ import print_function
  4. import argparse
  5. import daemon
  6. import logging
  7. import sys
  8. import time
  9. import threading
  10. from ffstatus import \
  11. dict_merge, merge_alfred_batman, \
  12. ApiServer, \
  13. AlfredParser, BatmanParser, \
  14. DashingClient, GraphitePush, \
  15. Storage
  16. from ffstatus.exceptions import SanityCheckError
  17. BATCAVE = 'Batman/Alfred Transmission Collection, Aggregation & Value Engine'
  18. DEFAULT_INTERVAL = 15
  19. def get_args():
  20. parser = argparse.ArgumentParser(description=BATCAVE)
  21. parser.add_argument('--logfile',
  22. help='path for log file')
  23. parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL,
  24. help='data poll interval')
  25. parser.add_argument('-v', '--verbose', action='store_true',
  26. help='increase output verbosity')
  27. parser.add_argument('-d', '--no-detach', action='store_true',
  28. help='Don\'t detach (daemonize) ourself')
  29. parser.add_argument('-n', '--no-send', action='store_true',
  30. help='Fetch data but don\'t send it')
  31. parser.add_argument('-A', '--alfred-json',
  32. help='executable path for alfred-json')
  33. parser.add_argument('-B', '--batadv-vis',
  34. help='executable path for batadv-vis')
  35. parser.add_argument('-G', '--graphite-host',
  36. help='Graphite host')
  37. parser.add_argument('--graphite-port', type=int, default=2003,
  38. help='Graphite port')
  39. parser.add_argument('--dashing-url',
  40. help='Dashing URL')
  41. parser.add_argument('--dashing-token',
  42. help='Dashing\'s secret update token')
  43. parser.add_argument('--api-bind-host', default='',
  44. help='API-Server Hostname')
  45. parser.add_argument('--api-bind-port', type=int, default=8888,
  46. help='API-Server Port')
  47. parser.add_argument('-S', '--storage-dir', default='.',
  48. help='Path where to store data')
  49. return parser.parse_args()
  50. def main():
  51. args = get_args()
  52. if args.interval < 5:
  53. print('A poll interval lower than 5s is not supported.')
  54. sys.exit(1)
  55. shall_daemonize = not args.no_detach
  56. logger = logging.getLogger()
  57. logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)
  58. if not args.logfile is None:
  59. fh = logging.FileHandler(args.logfile)
  60. fh.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S'))
  61. logger.addHandler(fh)
  62. if args.no_detach:
  63. ch = logging.StreamHandler(sys.stdout)
  64. ch.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S'))
  65. logger.addHandler(ch)
  66. logger.info('Starting up')
  67. storage = Storage(args.storage_dir)
  68. storage.open()
  69. logger.info('Storage: ' + str(storage))
  70. a = AlfredParser()
  71. b = BatmanParser()
  72. d = DashingClient(args.dashing_url, args.dashing_token) if not args.dashing_url is None else None
  73. g = GraphitePush(args.graphite_host, args.graphite_port) if not args.graphite_host is None else None
  74. if args.no_send:
  75. if not g is None: g.dont_send = True
  76. if not args.alfred_json is None: a.alfred_json = args.alfred_json
  77. if not args.batadv_vis is None: b.batadv_vis = args.batadv_vis
  78. logger.debug('Configured A.L.F.R.E.D. source: ' + str(a))
  79. logger.debug('Configured B.A.T.M.A.N. source: ' + str(b))
  80. logger.debug('Configured Dashing: ' + str(d))
  81. logger.debug('Configured Graphite: ' + str(g))
  82. # execute sanitycheck() where possible
  83. for i in [('AlfredParser', a), ('BatmanParser', b)]:
  84. try:
  85. i[1].sanitycheck()
  86. except SanityCheckError as err:
  87. logger.critical(i[0] + '.sanitycheck() failed: ' + str(err))
  88. print('FAILED SANITY CHECK: ' + str(err))
  89. sys.exit(1)
  90. server = ApiServer((args.api_bind_host, args.api_bind_port), storage)
  91. server_thread = threading.Thread(target=server.serve_forever)
  92. server_thread.daemon = True # exit thread when main thread terminates
  93. server_thread.start()
  94. logger.info('Started server: ' + str(server))
  95. if shall_daemonize:
  96. daemon_context = daemon.DaemonContext(
  97. files_preserve=[fh.stream],
  98. )
  99. daemon_context.open()
  100. while True:
  101. try:
  102. ts = int(time.time())
  103. logger.debug('Step 1/3: Fetching data ...')
  104. alfreddata = a.fetch()
  105. batmandata = b.fetch()
  106. newdata = merge_alfred_batman(alfreddata, batmandata)
  107. logger.debug('Fetched data: {0} ALFRED with {1} BATMAN makes {2} total'.format(len(alfreddata), len(batmandata), len(newdata)))
  108. logger.debug('Step 2/3: Pushing update data ...')
  109. if not g is None:
  110. graphitedata = g.push(newdata, ts=ts)
  111. logger.info('Sent ' + str(graphitedata.count('\n')+1) + ' lines to Graphite.')
  112. if not d is None:
  113. d.push(newdata)
  114. logger.debug('Step 3/3: Merging current data ...')
  115. storage.merge_new_data(newdata)
  116. logger.debug('I have data for %d nodes.', len(storage.data))
  117. except Exception as err:
  118. logger.error(str(err))
  119. logger.debug('Sleeping for {0} seconds'.format(args.interval))
  120. time.sleep(args.interval)
  121. storage.close()
  122. logger.info('Shut down.')
  123. if __name__ == '__main__':
  124. main()