basestorage.py
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. from __future__ import print_function, unicode_literals
  4. import logging
  5. import re
  6. import time
  7. import ffstatus
  8. from .exceptions import VpnKeyFormatError
  9. def sanitize_node(data, include_raw_data=False):
  10. """
  11. Filters potentially harmful entries from the node's data.
  12. """
  13. export = ffstatus.dict_merge({}, data)
  14. if export is None:
  15. return {}
  16. # remove fields from output: __RAW__
  17. if '__RAW__' in export and not include_raw_data:
  18. del export['__RAW__']
  19. return export
  20. class BaseStorage(object):
  21. """
  22. Provides operations on the storage data.
  23. This class gets subclassed to actually write the data
  24. to a file, database, whatever.
  25. """
  26. DATAKEY_VPN = '__VPN__'
  27. FIELDKEY_UPDATED = '__UPDATED__'
  28. metric_handler = None
  29. def open(self):
  30. """
  31. When overridden in a subclass,
  32. opens the persistent storage.
  33. """
  34. pass
  35. def save(self):
  36. """
  37. When overriden in a subclass,
  38. stores the data to a persistent storage.
  39. """
  40. pass
  41. def close(self):
  42. """
  43. When overridden in a subclass,
  44. closes the persistent storage.
  45. """
  46. pass
  47. @property
  48. def status(self):
  49. """Gets status information on the storage."""
  50. nodes = 0
  51. nodes_active = 0
  52. gateways = 0
  53. gateways_active = 0
  54. sum_clients = 0
  55. clients = set()
  56. for node in self.get_nodes():
  57. nodetype = node.get('type', 'node')
  58. if nodetype == 'gateway':
  59. gateways += 1
  60. if self.get_nodestatus(node=node) == 'active':
  61. gateways_active += 1
  62. continue
  63. nodes += 1
  64. nodemacs = [x for x in node.get('macs', [])]
  65. if 'mac' in node:
  66. nodemacs.append(node['mac'])
  67. if self.get_nodestatus(node=node) == 'active':
  68. nodes_active += 1
  69. sum_clients += node.get('clientcount', 0)
  70. for client in node.get('clients', []):
  71. if client in nodemacs:
  72. continue
  73. clients.add(client)
  74. return {
  75. 'clients_sum': sum_clients,
  76. 'clients_unique': len(clients),
  77. 'gateways': gateways,
  78. 'gateways_active': gateways_active,
  79. 'nodes': nodes,
  80. 'nodes_active': nodes_active,
  81. 'now': int(time.time()),
  82. }
  83. def __merge_alias_node(self, item, alias):
  84. # start by using standard dict_merge()
  85. update = ffstatus.dict_merge(item, alias, overwrite_lists=False)
  86. # extract some fields for further inspection
  87. update_macs = update.get('macs', []) or []
  88. # field 'node_id': keep original value
  89. update['node_id'] = item['node_id']
  90. # field 'mac': keep original value
  91. if 'mac' in item:
  92. update['mac'] = item['mac']
  93. if 'mac' in alias:
  94. update_macs.append(alias['mac'])
  95. # field 'macs': get rid of duplicates and primary mac
  96. primary_mac = update.get('mac')
  97. macs = []
  98. for x in update_macs:
  99. if x != primary_mac and x not in macs:
  100. macs.append(x)
  101. update['macs'] = update_macs = macs
  102. # field 'type': keep special node type
  103. item_type = item.get('type', 'node')
  104. if item_type != 'node' and update.get('type', 'node') == 'node':
  105. update['type'] = item_type
  106. return update
  107. def __send_metric(self, key, value, ts=None):
  108. if ts is None:
  109. ts = time.time()
  110. func = self.metric_handler
  111. if func is None or not callable(func):
  112. # no handler -> do nothing
  113. return
  114. func(self, key, value, ts)
  115. def merge_new_data(self, newdata):
  116. """Updates data in the storage by merging the new data."""
  117. ts = time.time()
  118. if newdata is None or not isinstance(newdata, dict):
  119. raise ValueError("Expected a dict as new data.")
  120. # keep a list of aliased nodes so they can be removed from the result
  121. aliased_nodes = {}
  122. # start merge on a copy of the current data
  123. current = {}
  124. for node in self.get_nodes():
  125. item_id = node['node_id']
  126. current[item_id] = ffstatus.dict_merge(node, {})
  127. current[item_id]['aliases'] = []
  128. current[item_id]['clients.bak'] = current[item_id].get('clients', [])
  129. current[item_id]['clients'] = []
  130. current[item_id]['neighbours'] = {}
  131. current[item_id]['type'] = 'node'
  132. if not item_id in newdata:
  133. continue
  134. if not '__RAW__' in current[item_id]:
  135. current[item_id]['__RAW__'] = {}
  136. if '__RAW__' in newdata[item_id]:
  137. for key in newdata[item_id]['__RAW__']:
  138. if key in current[item_id]['__RAW__']:
  139. del current[item_id]['__RAW__'][key]
  140. # merge the dictionaries
  141. updated = {}
  142. for itemid in newdata:
  143. if not itemid in current:
  144. # new element which did not exist in storage before, that's easy
  145. updated[itemid] = newdata[itemid]
  146. else:
  147. # merge the old and new element
  148. update = ffstatus.dict_merge(current[itemid], newdata[itemid])
  149. updated[itemid] = update
  150. for alias_id in updated[itemid].get('aliases', []):
  151. if alias_id in aliased_nodes:
  152. aliased_nodes[alias_id].append(itemid)
  153. else:
  154. aliased_nodes[alias_id] = [itemid]
  155. # merge aliased nodes
  156. for alias_id in aliased_nodes:
  157. if len(aliased_nodes[alias_id]) != 1:
  158. logging.warn("Node '%s' is aliased by multiple nodes: %s",
  159. alias_id, aliased_nodes[alias_id])
  160. continue
  161. # target's id is the single entry of the alias list
  162. item_id = aliased_nodes[alias_id][0]
  163. if alias_id == item_id:
  164. # the node has itself as alias -> remove the alias entry
  165. if alias_id in updated and 'aliases' in updated[alias_id]:
  166. updated[alias_id]['aliases'].remove(alias_id)
  167. logging.debug("Removed self-alias of '%s'.", alias_id)
  168. continue
  169. # look for alias node
  170. alias = updated.get(alias_id, current.get(alias_id))
  171. if alias is None:
  172. # no alias node present already, as we're trying to achieve here
  173. continue
  174. # look for target node
  175. item = updated.get(item_id, current.get(item_id))
  176. if item is None:
  177. logging.warn("Alias node '%s' is missing its target '%s.",
  178. alias_id, item_id)
  179. continue
  180. # ensure both target and alias node have 'node_id' field set
  181. if not 'node_id' in item:
  182. item['node_id'] = item_id
  183. alias['node_id'] = alias_id
  184. # merge data
  185. update = self.__merge_alias_node(item, alias)
  186. updated[item_id] = update
  187. logging.debug("Merged alias '%s' into '%s'.", alias_id, item_id)
  188. # mark alias node for deletion
  189. updated[alias_id] = None
  190. # sanitize each item's data
  191. for itemid in updated:
  192. if itemid.startswith('__'):
  193. continue
  194. item = updated[itemid]
  195. # delete node if requested
  196. if item is None:
  197. self.set_node_data(itemid, None)
  198. continue
  199. # ensure 'node_id' is set
  200. if not 'node_id' in item:
  201. item['node_id'] = itemid
  202. # remove node's MACs from clients list
  203. item_mac = item.get('mac') or ffstatus.guess_mac_from_nodeid(itemid)
  204. clients = [x for x in item.get('clients', [])]
  205. if item_mac is not None and item_mac in clients:
  206. clients.remove(item_mac)
  207. for mac in item.get('macs', []):
  208. if mac in clients:
  209. clients.remove(mac)
  210. # set clientcount
  211. item['clientcount'] = len(clients)
  212. # compute client delta
  213. prev_clients = item.get('clients.bak', [])
  214. new_clients = item.get('clients', [])
  215. if 'clients.bak' in item:
  216. del(item['clients.bak'])
  217. diff_added = [x for x in new_clients if x not in prev_clients]
  218. diff_removed = [x for x in prev_clients if x not in new_clients]
  219. self.__send_metric("nodes." + item_mac + ".clients_added",
  220. len(diff_added), ts)
  221. self.__send_metric("nodes." + item_mac + ".clients_removed",
  222. len(diff_removed), ts)
  223. # finally, set each new data
  224. self.set_node_data(itemid, item)
  225. def get_nodes(self, sortby=None, include_raw_data=False):
  226. """Gets a list of all known nodes."""
  227. nodes = self.get_all_nodes_raw()
  228. sorted_ids = [x for x in nodes]
  229. if sortby is not None:
  230. if sortby == 'name':
  231. sortkey = lambda x: nodes[x]['hostname'].lower()
  232. sorted_ids = sorted(sorted_ids, key=sortkey)
  233. elif sortby == 'id':
  234. sorted_ids = sorted(sorted_ids)
  235. result = []
  236. for nodeid in sorted_ids:
  237. if nodeid.startswith('__'):
  238. continue
  239. node = sanitize_node(nodes[nodeid], include_raw_data)
  240. result.append(node)
  241. return result
  242. def find_node(self, rawid, include_raw_data=False, search_aliases=True):
  243. """
  244. Fetch node data by given id.
  245. If necessary, look through node aliases.
  246. """
  247. # try direct match, first
  248. node = self.get_node(rawid)
  249. if node is not None:
  250. return sanitize_node(node, include_raw_data=include_raw_data)
  251. # look through all nodes
  252. found = None
  253. nodes = self.get_all_nodes_raw()
  254. for nodeid in nodes:
  255. node = nodes[nodeid]
  256. # if we have a direct hit, return it immediately
  257. if nodeid == rawid:
  258. return sanitize_node(node, include_raw_data=include_raw_data)
  259. # search through aliases
  260. if search_aliases and rawid in node.get('aliases', []):
  261. found = node
  262. # return found node
  263. if not found is None:
  264. return sanitize_node(found, include_raw_data=include_raw_data)
  265. else:
  266. return None
  267. def find_node_by_mac(self, mac):
  268. """Fetch node data by given MAC address."""
  269. needle = mac.lower()
  270. # iterate over all nodes
  271. for node in self.get_nodes():
  272. # check node's primary MAC
  273. if 'mac' in node and needle == node['mac'].lower():
  274. return sanitize_node(node)
  275. # check alias MACs
  276. if 'macs' in node:
  277. haystack = [x.lower() for x in node['macs']]
  278. if mac in haystack:
  279. return sanitize_node(node)
  280. # MAC address not found
  281. return None
  282. def get_nodestatus(self, rawid=None, node=None):
  283. """Determine node's status."""
  284. # search node by the given id
  285. if node is None and not rawid is None:
  286. node = self.find_node(rawid)
  287. # handle unknown nodes
  288. if node is None:
  289. return None
  290. # check that the last batadv update is noted in the data
  291. updated = node.get(self.FIELDKEY_UPDATED, None)
  292. if updated is None:
  293. return 'unknown'
  294. u = updated.get('batadv', updated.get('batctl'))
  295. if u is None:
  296. return 'unknown'
  297. # make decision based on time of last batadv update
  298. diff = time.time() - u
  299. if diff < 150:
  300. return 'active'
  301. elif diff < 300:
  302. return 'stale'
  303. else:
  304. return 'offline'
  305. def set_node_data(self, key, data):
  306. """
  307. Overwrite data for the node with the given key.
  308. Specifying 'None' as data effectively means deleting the key.
  309. """
  310. raise NotImplementedError("set_node_data was not overridden")
  311. def check_vpn_key(self, key):
  312. if key is None or re.match(r'^[a-fA-F0-9]+$', key) is None:
  313. raise VpnKeyFormatError(key)
  314. def get_vpn_keys(self):
  315. """Gets a list of VPN keys."""
  316. raise NotImplementedError("get_vpn_keys was not overriden")
  317. def get_vpn_item(self, key, create=False):
  318. self.check_vpn_key(key)
  319. raise NotImplementedError("store_vpn_item was not overriden")
  320. def store_vpn_item(self, key, data):
  321. raise NotImplementedError("store_vpn_item was not overriden")
  322. def resolve_vpn_remotes(self):
  323. """Iterates all remotes and resolves IP blocks not yet resolved."""
  324. vpn = self.get_vpn_keys()
  325. init_vpn_cache = {}
  326. for key in vpn:
  327. entry = self.get_vpn_item(key)
  328. entry_modified = False
  329. for mode in entry:
  330. if not isinstance(entry[mode], dict):
  331. continue
  332. for gateway in entry[mode]:
  333. if not isinstance(entry[mode][gateway], dict):
  334. continue
  335. item = entry[mode][gateway]
  336. if 'remote' in item and not 'remote_raw' in item:
  337. item['remote_raw'] = item['remote']
  338. resolved = None
  339. if item['remote'] in init_vpn_cache:
  340. resolved = init_vpn_cache[item['remote']]
  341. else:
  342. resolved = ffstatus.resolve_ipblock(item['remote'])
  343. init_vpn_cache[item['remote']] = resolved
  344. if resolved is not None:
  345. logging.info(
  346. 'Resolved VPN entry \'%s\' to net \'%s\'.',
  347. item['remote'],
  348. resolved['name'],
  349. )
  350. if resolved is not None:
  351. item['remote'] = resolved
  352. entry_modified = True
  353. if entry_modified:
  354. self.store_vpn_item(key, entry)
  355. def get_vpn_gateways(self):
  356. gateways = set()
  357. vpn = self.get_vpn_keys()
  358. for key in vpn:
  359. entry = self.get_vpn_item(key)
  360. for conntype in entry:
  361. for gateway in entry[conntype]:
  362. gateways.add(gateway)
  363. return sorted(gateways)
  364. def get_vpn_connections(self):
  365. conntypes = ['active', 'last']
  366. result = []
  367. vpnkeys = self.get_vpn_keys()
  368. for key in vpnkeys:
  369. vpn_entry = self.get_vpn_item(key)
  370. if not isinstance(vpn_entry, dict):
  371. continue
  372. item = {
  373. 'key': key,
  374. 'count': {},
  375. 'remote': {},
  376. }
  377. names = set()
  378. for conntype in conntypes:
  379. item['count'][conntype] = 0
  380. item['remote'][conntype] = {}
  381. if conntype in vpn_entry:
  382. for gateway in vpn_entry[conntype]:
  383. if 'remote' in vpn_entry[conntype][gateway]:
  384. remote = vpn_entry[conntype][gateway]['remote']
  385. if remote is None or remote == '':
  386. continue
  387. item['count'][conntype] += 1
  388. item['remote'][conntype][gateway] = remote
  389. if 'peer' in vpn_entry[conntype][gateway]:
  390. names.add(vpn_entry[conntype][gateway]['peer'])
  391. item['names'] = sorted(names)
  392. item['online'] = item['count']['active'] > 0
  393. result.append(item)
  394. return result
  395. def log_vpn_connect(self, key, peername, remote, gateway, timestamp):
  396. item = self.get_vpn_item(key, create=True)
  397. # resolve remote addr to its netblock
  398. remote_raw = remote
  399. remote_resolved = None
  400. if remote is not None:
  401. remote_resolved = ffstatus.resolve_ipblock(remote)
  402. if remote_resolved is not None:
  403. logging.debug('Resolved IP \'{0}\' to block \'{1}\'.'.format(
  404. remote, remote_resolved['name'],
  405. ))
  406. remote = remote_resolved
  407. # store connection info
  408. item['active'][gateway] = {
  409. 'establish': timestamp,
  410. 'peer': peername,
  411. 'remote': remote,
  412. 'remote_raw': remote_raw,
  413. }
  414. self.store_vpn_item(key, item)
  415. def log_vpn_disconnect(self, key, gateway, timestamp):
  416. item = self.get_vpn_item(key, create=True)
  417. active = {}
  418. if gateway in item['active']:
  419. active = item['active'][gateway]
  420. del item['active'][gateway]
  421. active['disestablish'] = timestamp
  422. item['last'][gateway] = active
  423. self.store_vpn_item(key, item)