#!/usr/bin/python3 # Copyright (C) 2021 Philipp Fromme # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import argparse import os import re import subprocess import sys import time from enum import Enum class State(Enum): OK = 0 WARNING = 1 CRITICAL = 2 UNKNOWN = 3 def match_regex(string, regex_list): for regex in regex_list: r = re.compile(regex) if bool(r.search(string)): return True return False def is_pgp(file): output = subprocess.check_output(["/usr/bin/file", "-b", "--mime-type", file]) output = output.decode().rstrip() pgp_types = ["application/pgp-keys", "application/x-gnupg-keyring"] if output in pgp_types: return True return False def get_file_list(directory): path_list = [] if not os.path.isdir(directory): return None for dirpath, dirnames, filenames in os.walk(directory): for f in filenames: file_path = os.path.join(dirpath, f) if is_pgp(file_path): path_list.append(file_path) return path_list def parse_gpg(output): lines = output.split('\n') keys = {} for line in lines: elements = line.split(':') # see https://github.com/gpg/gnupg/blob/master/doc/DETAILS # for colon listings format record = elements[0] if record == 'pub': added = False length = elements[2] algorithm = elements[3] issue_date = elements[5] expiry_date = elements[6] if expiry_date: expiry_date = int(expiry_date) else: expiry_date = None elif record == 'uid': userid = elements[9] if not added: keys[userid] = (length, algorithm, issue_date, expiry_date) added = True return keys def check_expiry(key, warning, critical): current_time = int(time.time()) state = State.OK expiry_date = key[3] if expiry_date: expired = expiry_date - current_time if expired <= critical: state = State.CRITICAL elif expired <= warning: state = State.WARNING return state def verbose_print(details): hr_format = "%d-%m-%Y" for uid, fields in details.items(): if len(details) > 1: print(" {}:".format(uid)) print(" Algorithm: {}".format(fields[1])) print(" Length: {}".format(fields[0])) issue_date = int(fields[2]) issue_date_hr = time.strftime(hr_format, time.localtime(issue_date)) print(" Issue Date: {} - {}".format(issue_date, issue_date_hr)) expiry_date = fields[3] if expiry_date: expiry_date_hr = time.strftime(hr_format, time.localtime(expiry_date)) expiry_state = "{} - {}".format(expiry_date, expiry_date_hr) else: expiry_state = "None" print(" Expiry Date: {}".format(expiry_state)) def main(): parser = argparse.ArgumentParser(description="Check for expiring pgp keys") parser.add_argument("-w", "--warning", help="Warning threshold, default " "604800 seconds (one week)", nargs="?", type=int, default=604800) parser.add_argument("-c", "--critical", help="Critical threshold, default 0 seconds", nargs="?", type=int, default=0) parser.add_argument("-d", "--dirs", help="Directories to check for pgp keys in", nargs='+', default=["/etc/apt/trusted.gpg.d/"]) parser.add_argument("-s", "--sort", help="Sort by expiry date, " "sorts by path otherwise", action="store_true") parser.add_argument("-v", "--verbose", help="Verbose output", action="store_true") parser.add_argument("-e", "--expiring", help="Only show expiring keys, " "requires -v/--verbose to actually do something", action="store_true") parser.add_argument("-i", "--ignore", help="Regular expressions separated " "by space matching file paths to ignore", nargs="*") args = parser.parse_args() state = State.OK crit_list = [] warn_list = [] unkn_list = [] err_list = [] files = [] verbose_info = {} closest_expiry = {} for directory in args.dirs: file_list = get_file_list(directory) if file_list is None: state = State.UNKNOWN unkn_list.append(directory) continue files = [*files, *file_list] files = set(files) for file_path in files: if args.ignore: if match_regex(file_path, args.ignore): continue try: command = ["/usr/bin/gpg", "--dry-run", "--import-options", "import-show", "--import", "--batch", "--quiet", "--no-keyring", "--trust-model", "always", "--with-colons", file_path] output = subprocess.check_output(command, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as e: unkn_list.append(file_path) err_list.append(e.output.decode().rstrip()) state = State.UNKNOWN continue keys = parse_gpg(output.decode()) for key in keys: expiry_state = check_expiry(keys[key], args.warning, args.critical) if expiry_state == State.CRITICAL: if state != State.UNKNOWN: state = expiry_state crit_list.append(file_path) elif expiry_state == State.WARNING: if state == State.OK: state = expiry_state warn_list.append(file_path) if args.verbose: verbose_info[file_path] = keys for key in keys: expiry_date = keys[key][3] if not expiry_date: expiry_date = 3000000000 if file_path in closest_expiry: if closest_expiry[file_path] > expiry_date: closest_expiry[file_path] = expiry_date else: closest_expiry[file_path] = expiry_date output = "{} -".format(state.name) if crit_list: output += " Critical: [ {} ]".format(", ".join(x for x in crit_list)) if warn_list: output += " Warning: [ {} ]".format(", ".join(x for x in warn_list)) if unkn_list: output += " Unknown: [ {} ]".format(", ".join(x for x in unkn_list)) if err_list: output += " Error: [ {} ]".format(", ".join(x for x in err_list)) if not (crit_list or warn_list or unkn_list or err_list): output += " All keys ok" print(output) if args.verbose: sorted_keys = sorted(verbose_info.keys(), key=lambda file_path: (closest_expiry[file_path], file_path) if args.sort else file_path) for file_path in sorted_keys: cat_tupl = (*crit_list, *warn_list, *unkn_list) if file_path in cat_tupl or not args.expiring: print() print("{}:".format(file_path)) verbose_print(verbose_info[file_path]) sys.exit(state.value) if __name__ == "__main__": main()