123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526 |
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- from __future__ import print_function, unicode_literals
- import logging
- import re
- import time
- import ffstatus
- from .exceptions import VpnKeyFormatError
def sanitize_node(data, include_raw_data=False):
    """
    Return a copy of the node's data with potentially harmful
    entries filtered out.
    """
    # Deep-merge into a fresh dict so the stored node stays untouched.
    cleaned = ffstatus.dict_merge({}, data)
    # Strip the raw import payload unless the caller explicitly wants it.
    if not include_raw_data:
        cleaned.pop('__RAW__', None)
    return cleaned
class BaseStorage(object):
    """
    Provides operations on the storage data.

    This class gets subclassed to actually write the data
    to a file, database, whatever.
    """

    # storage key under which the VPN data lives
    DATAKEY_VPN = '__VPN__'
    # per-node field holding timestamps of the last data updates
    FIELDKEY_UPDATED = '__UPDATED__'
    # optional callable(storage, key, value, timestamp) receiving metrics
    metric_handler = None

    def open(self):
        """
        When overridden in a subclass,
        opens the persistent storage.
        """
        pass

    def save(self):
        """
        When overridden in a subclass,
        stores the data to a persistent storage.
        """
        pass

    def close(self):
        """
        When overridden in a subclass,
        closes the persistent storage.
        """
        pass

    @property
    def status(self):
        """Gets status information on the storage."""
        nodes = 0
        nodes_active = 0
        gateways = 0
        gateways_active = 0
        sum_clients = 0
        clients = set()
        for node in self.get_nodes():
            nodetype = node.get('type', 'node')
            if nodetype == 'gateway':
                gateways += 1
                if self.get_nodestatus(node=node) == 'active':
                    gateways_active += 1
                continue
            nodes += 1
            nodemacs = list(node.get('macs', []))
            if 'mac' in node:
                nodemacs.append(node['mac'])
            if self.get_nodestatus(node=node) == 'active':
                nodes_active += 1
            sum_clients += node.get('clientcount', 0)
            # count unique clients, ignoring the node's own interfaces
            for client in node.get('clients', []):
                if client in nodemacs:
                    continue
                clients.add(client)
        return {
            'clients_sum': sum_clients,
            'clients_unique': len(clients),
            'gateways': gateways,
            'gateways_active': gateways_active,
            'nodes': nodes,
            'nodes_active': nodes_active,
            'now': int(time.time()),
        }

    def __merge_alias_node(self, item, alias):
        """
        Merge an alias node's data into its target node,
        keeping the target's identity fields intact.
        """
        # start by using standard dict_merge()
        update = ffstatus.dict_merge(item, alias, overwrite_lists=False)

        # extract some fields for further inspection
        update_macs = update.get('macs', []) or []

        # field 'node_id': keep original value
        update['node_id'] = item['node_id']

        # field 'mac': keep original value, remember the alias' mac
        if 'mac' in item:
            update['mac'] = item['mac']
            if 'mac' in alias:
                update_macs.append(alias['mac'])

        # field 'macs': get rid of duplicates and the primary mac
        primary_mac = update.get('mac')
        macs = []
        for candidate in update_macs:
            if candidate != primary_mac and candidate not in macs:
                macs.append(candidate)
        update['macs'] = macs

        # field 'type': keep special node type
        item_type = item.get('type', 'node')
        if item_type != 'node' and update.get('type', 'node') == 'node':
            update['type'] = item_type

        return update

    def __send_metric(self, key, value, ts=None):
        """Forward a metric datapoint to the configured handler, if any."""
        if ts is None:
            ts = time.time()
        func = self.metric_handler
        if func is None or not callable(func):
            # no handler -> do nothing
            return
        func(self, key, value, ts)

    def merge_new_data(self, newdata):
        """Updates data in the storage by merging the new data."""
        ts = time.time()

        if newdata is None or not isinstance(newdata, dict):
            raise ValueError("Expected a dict as new data.")

        # keep a list of aliased nodes so they can be removed from the result
        aliased_nodes = {}

        # start merge on a copy of the current data
        current = {}
        for node in self.get_nodes():
            item_id = node['node_id']
            current[item_id] = ffstatus.dict_merge(node, {})
            current[item_id]['aliases'] = []
            current[item_id]['clients.bak'] = current[item_id].get('clients', [])
            current[item_id]['clients'] = []
            current[item_id]['neighbours'] = {}
            current[item_id]['type'] = 'node'
            if item_id not in newdata:
                continue
            if '__RAW__' not in current[item_id]:
                current[item_id]['__RAW__'] = {}
            # raw sections present in the new data replace the stored ones
            if '__RAW__' in newdata[item_id]:
                for key in newdata[item_id]['__RAW__']:
                    if key in current[item_id]['__RAW__']:
                        del current[item_id]['__RAW__'][key]

        # merge the dictionaries
        updated = {}
        for itemid in newdata:
            if itemid not in current:
                # new element which did not exist in storage before, that's easy
                updated[itemid] = newdata[itemid]
            else:
                # merge the old and new element
                updated[itemid] = ffstatus.dict_merge(
                    current[itemid], newdata[itemid])
            # remember which node(s) claim each alias id
            for alias_id in updated[itemid].get('aliases', []):
                if alias_id in aliased_nodes:
                    aliased_nodes[alias_id].append(itemid)
                else:
                    aliased_nodes[alias_id] = [itemid]

        # merge aliased nodes
        for alias_id in aliased_nodes:
            if len(aliased_nodes[alias_id]) != 1:
                logging.warning("Node '%s' is aliased by multiple nodes: %s",
                                alias_id, aliased_nodes[alias_id])
                continue

            # target's id is the single entry of the alias list
            item_id = aliased_nodes[alias_id][0]

            if alias_id == item_id:
                # the node has itself as alias -> remove the alias entry
                if alias_id in updated and 'aliases' in updated[alias_id]:
                    updated[alias_id]['aliases'].remove(alias_id)
                    logging.debug("Removed self-alias of '%s'.", alias_id)
                continue

            # look for alias node
            alias = updated.get(alias_id, current.get(alias_id))
            if alias is None:
                # no alias node present already, as we're trying to achieve here
                continue

            # look for target node
            item = updated.get(item_id, current.get(item_id))
            if item is None:
                # BUGFIX: format string previously had a mismatched quote
                logging.warning("Alias node '%s' is missing its target '%s'.",
                                alias_id, item_id)
                continue

            # ensure both target and alias node have 'node_id' field set
            if 'node_id' not in item:
                item['node_id'] = item_id
            alias['node_id'] = alias_id

            # merge data
            updated[item_id] = self.__merge_alias_node(item, alias)
            logging.debug("Merged alias '%s' into '%s'.", alias_id, item_id)

            # mark alias node for deletion
            updated[alias_id] = None

        # sanitize each item's data
        for itemid in updated:
            if itemid.startswith('__'):
                continue
            item = updated[itemid]

            # delete node if requested
            if item is None:
                self.set_node_data(itemid, None)
                continue

            # ensure 'node_id' is set
            if 'node_id' not in item:
                item['node_id'] = itemid

            # remove node's MACs from clients list
            item_mac = item.get('mac') or ffstatus.guess_mac_from_nodeid(itemid)
            clients = list(item.get('clients', []))
            if item_mac is not None and item_mac in clients:
                clients.remove(item_mac)
            for mac in item.get('macs', []):
                if mac in clients:
                    clients.remove(mac)

            # set clientcount
            item['clientcount'] = len(clients)

            # compute client delta
            prev_clients = item.get('clients.bak', [])
            new_clients = item.get('clients', [])
            if 'clients.bak' in item:
                del item['clients.bak']
            diff_added = [x for x in new_clients if x not in prev_clients]
            diff_removed = [x for x in prev_clients if x not in new_clients]
            # BUGFIX: guard against item_mac being None, which previously
            # raised TypeError when building the metric key
            if item_mac is not None:
                self.__send_metric("nodes." + item_mac + ".clients_added",
                                   len(diff_added), ts)
                self.__send_metric("nodes." + item_mac + ".clients_removed",
                                   len(diff_removed), ts)

            # finally, set each new data
            self.set_node_data(itemid, item)

    def get_nodes(self, sortby=None, include_raw_data=False):
        """Gets a list of all known nodes."""
        nodes = self.get_all_nodes_raw()
        # BUGFIX: filter special '__'-prefixed entries *before* sorting,
        # and tolerate nodes without a 'hostname' field, so sorting by
        # name cannot raise KeyError
        sorted_ids = [x for x in nodes if not x.startswith('__')]
        if sortby is not None:
            if sortby == 'name':
                sortkey = lambda x: nodes[x].get('hostname', '').lower()
                sorted_ids = sorted(sorted_ids, key=sortkey)
            elif sortby == 'id':
                sorted_ids = sorted(sorted_ids)
        return [sanitize_node(nodes[nodeid], include_raw_data)
                for nodeid in sorted_ids]

    def find_node(self, rawid, include_raw_data=False, search_aliases=True):
        """
        Fetch node data by given id.
        If necessary, look through node aliases.
        """
        # try direct match, first
        node = self.get_node(rawid)
        if node is not None:
            return sanitize_node(node, include_raw_data=include_raw_data)

        # look through all nodes
        found = None
        nodes = self.get_all_nodes_raw()
        for nodeid in nodes:
            node = nodes[nodeid]
            # if we have a direct hit, return it immediately
            if nodeid == rawid:
                return sanitize_node(node, include_raw_data=include_raw_data)
            # search through aliases
            if search_aliases and rawid in node.get('aliases', []):
                found = node

        # return found node
        if found is not None:
            return sanitize_node(found, include_raw_data=include_raw_data)
        return None

    def find_node_by_mac(self, mac):
        """Fetch node data by given MAC address (case-insensitive)."""
        needle = mac.lower()
        # iterate over all nodes
        for node in self.get_nodes():
            # check node's primary MAC
            if 'mac' in node and needle == node['mac'].lower():
                return sanitize_node(node)
            # check alias MACs
            if 'macs' in node:
                haystack = [x.lower() for x in node['macs']]
                # BUGFIX: compare the lowercased needle (was the raw input,
                # which broke case-insensitive matching)
                if needle in haystack:
                    return sanitize_node(node)
        # MAC address not found
        return None

    def get_nodestatus(self, rawid=None, node=None):
        """Determine node's status: 'active', 'stale', 'offline', 'unknown',
        or None if the node cannot be found."""
        # search node by the given id
        if node is None and rawid is not None:
            node = self.find_node(rawid)

        # handle unknown nodes
        if node is None:
            return None

        # check that the last batadv update is noted in the data
        updated = node.get(self.FIELDKEY_UPDATED, None)
        if updated is None:
            return 'unknown'
        last_seen = updated.get('batadv', updated.get('batctl'))
        if last_seen is None:
            return 'unknown'

        # make decision based on time of last batadv update
        diff = time.time() - last_seen
        if diff < 150:
            return 'active'
        elif diff < 300:
            return 'stale'
        else:
            return 'offline'

    def set_node_data(self, key, data):
        """
        Overwrite data for the node with the given key.
        Specifying 'None' as data effectively means deleting the key.
        """
        raise NotImplementedError("set_node_data was not overridden")

    def check_vpn_key(self, key):
        """Raise VpnKeyFormatError unless key is a plain hex string."""
        if key is None or re.match(r'^[a-fA-F0-9]+$', key) is None:
            raise VpnKeyFormatError(key)

    def get_vpn_keys(self):
        """Gets a list of VPN keys."""
        raise NotImplementedError("get_vpn_keys was not overridden")

    def get_vpn_item(self, key, create=False):
        """Fetch (or, with create=True, initialize) the VPN entry for key."""
        self.check_vpn_key(key)
        # BUGFIX: message previously named store_vpn_item
        raise NotImplementedError("get_vpn_item was not overridden")

    def store_vpn_item(self, key, data):
        """Persist the VPN entry for the given key."""
        raise NotImplementedError("store_vpn_item was not overridden")

    def resolve_vpn_remotes(self):
        """Iterates all remotes and resolves IP blocks not yet resolved."""
        vpn = self.get_vpn_keys()
        # cache resolved addresses so each remote is looked up only once
        init_vpn_cache = {}
        for key in vpn:
            entry = self.get_vpn_item(key)
            entry_modified = False
            for mode in entry:
                if not isinstance(entry[mode], dict):
                    continue
                for gateway in entry[mode]:
                    if not isinstance(entry[mode][gateway], dict):
                        continue
                    item = entry[mode][gateway]
                    # only touch entries whose raw remote wasn't kept yet
                    if 'remote' in item and 'remote_raw' not in item:
                        item['remote_raw'] = item['remote']
                        resolved = None
                        if item['remote'] in init_vpn_cache:
                            resolved = init_vpn_cache[item['remote']]
                        else:
                            resolved = ffstatus.resolve_ipblock(item['remote'])
                            init_vpn_cache[item['remote']] = resolved
                            if resolved is not None:
                                logging.info(
                                    'Resolved VPN entry \'%s\' to net \'%s\'.',
                                    item['remote'],
                                    resolved['name'],
                                )
                        if resolved is not None:
                            item['remote'] = resolved
                            entry_modified = True
            if entry_modified:
                self.store_vpn_item(key, entry)

    def get_vpn_gateways(self):
        """Return a sorted list of all gateways seen in the VPN data."""
        gateways = set()
        vpn = self.get_vpn_keys()
        for key in vpn:
            entry = self.get_vpn_item(key)
            for conntype in entry:
                for gateway in entry[conntype]:
                    gateways.add(gateway)
        return sorted(gateways)

    def get_vpn_connections(self):
        """Return a list of per-key VPN connection summaries."""
        conntypes = ['active', 'last']
        result = []
        vpnkeys = self.get_vpn_keys()
        for key in vpnkeys:
            vpn_entry = self.get_vpn_item(key)
            if not isinstance(vpn_entry, dict):
                continue
            item = {
                'key': key,
                'count': {},
                'remote': {},
            }
            names = set()
            for conntype in conntypes:
                item['count'][conntype] = 0
                item['remote'][conntype] = {}
                if conntype in vpn_entry:
                    for gateway in vpn_entry[conntype]:
                        # NOTE(review): nesting of the 'peer' check was
                        # reconstructed from flattened source - it is only
                        # reached for entries that carry a usable 'remote'
                        if 'remote' in vpn_entry[conntype][gateway]:
                            remote = vpn_entry[conntype][gateway]['remote']
                            if remote is None or remote == '':
                                continue
                            item['count'][conntype] += 1
                            item['remote'][conntype][gateway] = remote
                            if 'peer' in vpn_entry[conntype][gateway]:
                                names.add(vpn_entry[conntype][gateway]['peer'])
            item['names'] = sorted(names)
            item['online'] = item['count']['active'] > 0
            result.append(item)
        return result

    def log_vpn_connect(self, key, peername, remote, gateway, timestamp):
        """Record a VPN connect event for the given key/gateway."""
        item = self.get_vpn_item(key, create=True)

        # resolve remote addr to its netblock
        remote_raw = remote
        remote_resolved = None
        if remote is not None:
            remote_resolved = ffstatus.resolve_ipblock(remote)
        if remote_resolved is not None:
            logging.debug('Resolved IP \'%s\' to block \'%s\'.',
                          remote, remote_resolved['name'])
            remote = remote_resolved

        # store connection info
        item['active'][gateway] = {
            'establish': timestamp,
            'peer': peername,
            'remote': remote,
            'remote_raw': remote_raw,
        }

        self.store_vpn_item(key, item)

    def log_vpn_disconnect(self, key, gateway, timestamp):
        """Record a VPN disconnect event, moving the active entry to 'last'."""
        item = self.get_vpn_item(key, create=True)

        active = {}
        if gateway in item['active']:
            active = item['active'][gateway]
            del item['active'][gateway]
        active['disestablish'] = timestamp
        item['last'][gateway] = active

        self.store_vpn_item(key, item)
|