|
@@ -0,0 +1,216 @@
|
|
|
|
+#!/usr/bin/python3
|
|
|
|
+# Copyright (C) 2021 Philipp Fromme
|
|
|
|
+#
|
|
|
|
+# This program is free software: you can redistribute it and/or modify
|
|
|
|
+# it under the terms of the GNU General Public License as published by
|
|
|
|
+# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
+# (at your option) any later version.
|
|
|
|
+#
|
|
|
|
+# This program is distributed in the hope that it will be useful,
|
|
|
|
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
+# GNU General Public License for more details.
|
|
|
|
+#
|
|
|
|
+# You should have received a copy of the GNU General Public License
|
|
|
|
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
+
|
|
|
|
+import argparse
|
|
|
|
+import os
|
|
|
|
+import re
|
|
|
|
+import subprocess
|
|
|
|
+import sys
|
|
|
|
+import time
|
|
|
|
+from enum import Enum
|
|
|
|
+
|
|
|
|
+class State(Enum):
|
|
|
|
+ OK = 0
|
|
|
|
+ WARNING = 1
|
|
|
|
+ CRITICAL = 2
|
|
|
|
+ UNKNOWN = 3
|
|
|
|
+
|
|
|
|
+def match_regex(string, regex_list):
|
|
|
|
+ for regex in regex_list:
|
|
|
|
+ r = re.compile(regex)
|
|
|
|
+ if bool(r.search(string)):
|
|
|
|
+ return True
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+def is_pgp(file):
|
|
|
|
+ output = subprocess.check_output(["/usr/bin/file", "-b", "--mime-type", file])
|
|
|
|
+ output = output.decode().rstrip()
|
|
|
|
+ pgp_types = ["application/pgp-keys", "application/x-gnupg-keyring"]
|
|
|
|
+ if output in pgp_types:
|
|
|
|
+ return True
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+def get_file_list(directory):
|
|
|
|
+ path_list = []
|
|
|
|
+ if not os.path.isdir(directory):
|
|
|
|
+ return None
|
|
|
|
+ for dirpath, dirnames, filenames in os.walk(directory):
|
|
|
|
+ for f in filenames:
|
|
|
|
+ file_path = os.path.join(dirpath, f)
|
|
|
|
+ if is_pgp(file_path):
|
|
|
|
+ path_list.append(file_path)
|
|
|
|
+ return path_list
|
|
|
|
+
|
|
|
|
+def parse_gpg(output):
|
|
|
|
+ lines = output.split('\n')
|
|
|
|
+ keys = {}
|
|
|
|
+ for line in lines:
|
|
|
|
+ elements = line.split(':')
|
|
|
|
+ # see https://github.com/gpg/gnupg/blob/master/doc/DETAILS
|
|
|
|
+ # for colon listings format
|
|
|
|
+ record = elements[0]
|
|
|
|
+ if record == 'pub':
|
|
|
|
+ added = False
|
|
|
|
+ length = elements[2]
|
|
|
|
+ algorithm = elements[3]
|
|
|
|
+ issue_date = elements[5]
|
|
|
|
+ expiry_date = elements[6]
|
|
|
|
+ if expiry_date:
|
|
|
|
+ expiry_date = int(expiry_date)
|
|
|
|
+ else:
|
|
|
|
+ expiry_date = None
|
|
|
|
+ elif record == 'uid':
|
|
|
|
+ userid = elements[9]
|
|
|
|
+ if not added:
|
|
|
|
+ keys[userid] = (length, algorithm, issue_date, expiry_date)
|
|
|
|
+ added = True
|
|
|
|
+ return keys
|
|
|
|
+
|
|
|
|
+def check_expiry(key, warning, critical):
|
|
|
|
+ current_time = int(time.time())
|
|
|
|
+ state = State.OK
|
|
|
|
+ expiry_date = key[3]
|
|
|
|
+ if expiry_date:
|
|
|
|
+ expired = expiry_date - current_time
|
|
|
|
+ if expired <= critical:
|
|
|
|
+ state = State.CRITICAL
|
|
|
|
+ elif expired <= warning:
|
|
|
|
+ state = State.WARNING
|
|
|
|
+ return state
|
|
|
|
+
|
|
|
|
+def verbose_print(details):
|
|
|
|
+ hr_format = "%d-%m-%Y"
|
|
|
|
+ for uid, fields in details.items():
|
|
|
|
+ if len(details) > 1:
|
|
|
|
+ print(" {}:".format(uid))
|
|
|
|
+ print(" Algorithm: {}".format(fields[1]))
|
|
|
|
+ print(" Length: {}".format(fields[0]))
|
|
|
|
+ issue_date = int(fields[2])
|
|
|
|
+ issue_date_hr = time.strftime(hr_format, time.localtime(issue_date))
|
|
|
|
+ print(" Issue Date: {} - {}".format(issue_date, issue_date_hr))
|
|
|
|
+ expiry_date = fields[3]
|
|
|
|
+ if expiry_date:
|
|
|
|
+ expiry_date_hr = time.strftime(hr_format, time.localtime(expiry_date))
|
|
|
|
+ expiry_state = "{} - {}".format(expiry_date, expiry_date_hr)
|
|
|
|
+ else:
|
|
|
|
+ expiry_state = "None"
|
|
|
|
+ print(" Expiry Date: {}".format(expiry_state))
|
|
|
|
+
|
|
|
|
+def main():
|
|
|
|
+ parser = argparse.ArgumentParser(description="Check for expiring pgp keys")
|
|
|
|
+ parser.add_argument("-w", "--warning", help="Warning threshold, default "
|
|
|
|
+ "604800 seconds (one week)", nargs="?", type=int,
|
|
|
|
+ default=604800)
|
|
|
|
+ parser.add_argument("-c", "--critical", help="Critical threshold, default 0 seconds",
|
|
|
|
+ nargs="?", type=int, default=0)
|
|
|
|
+ parser.add_argument("-d", "--dirs", help="Directories to check for pgp keys in",
|
|
|
|
+ nargs='+', default=["/etc/apt/trusted.gpg.d/"])
|
|
|
|
+ parser.add_argument("-s", "--sort", help="Sort by expiry date, "
|
|
|
|
+ "sorts by path otherwise", action="store_true")
|
|
|
|
+ parser.add_argument("-v", "--verbose", help="Verbose output", action="store_true")
|
|
|
|
+ parser.add_argument("-e", "--expiring", help="Only show expiring keys, "
|
|
|
|
+ "requires -v/--verbose to actually do something", action="store_true")
|
|
|
|
+ parser.add_argument("-i", "--ignore", help="Regular expressions separated "
|
|
|
|
+ "by space matching file paths to ignore", nargs="*")
|
|
|
|
+ args = parser.parse_args()
|
|
|
|
+
|
|
|
|
+ state = State.OK
|
|
|
|
+
|
|
|
|
+ crit_list = []
|
|
|
|
+ warn_list = []
|
|
|
|
+ unkn_list = []
|
|
|
|
+ err_list = []
|
|
|
|
+
|
|
|
|
+ files = []
|
|
|
|
+
|
|
|
|
+ verbose_info = {}
|
|
|
|
+ closest_expiry = {}
|
|
|
|
+
|
|
|
|
+ for directory in args.dirs:
|
|
|
|
+ file_list = get_file_list(directory)
|
|
|
|
+ if file_list is None:
|
|
|
|
+ state = State.UNKNOWN
|
|
|
|
+ unkn_list.append(directory)
|
|
|
|
+ continue
|
|
|
|
+ files = [*files, *file_list]
|
|
|
|
+ files = set(files)
|
|
|
|
+
|
|
|
|
+ for file_path in files:
|
|
|
|
+ if args.ignore:
|
|
|
|
+ if match_regex(file_path, args.ignore):
|
|
|
|
+ continue
|
|
|
|
+ try:
|
|
|
|
+ command = ["/usr/bin/gpg", "--dry-run", "--import-options",
|
|
|
|
+ "import-show", "--import", "--batch", "--quiet",
|
|
|
|
+ "--no-keyring", "--trust-model", "always",
|
|
|
|
+ "--with-colons", file_path]
|
|
|
|
+ output = subprocess.check_output(command, stderr=subprocess.STDOUT)
|
|
|
|
+ except subprocess.CalledProcessError as e:
|
|
|
|
+ unkn_list.append(file_path)
|
|
|
|
+ err_list.append(e.output.decode().rstrip())
|
|
|
|
+ state = State.UNKNOWN
|
|
|
|
+ continue
|
|
|
|
+ keys = parse_gpg(output.decode())
|
|
|
|
+ for key in keys:
|
|
|
|
+ expiry_state = check_expiry(keys[key], args.warning, args.critical)
|
|
|
|
+ if expiry_state == State.CRITICAL:
|
|
|
|
+ if state != State.UNKNOWN:
|
|
|
|
+ state = expiry_state
|
|
|
|
+ crit_list.append(file_path)
|
|
|
|
+ elif expiry_state == State.WARNING:
|
|
|
|
+ if state == State.OK:
|
|
|
|
+ state = expiry_state
|
|
|
|
+ warn_list.append(file_path)
|
|
|
|
+ if args.verbose:
|
|
|
|
+ verbose_info[file_path] = keys
|
|
|
|
+ for key in keys:
|
|
|
|
+ expiry_date = keys[key][3]
|
|
|
|
+ if not expiry_date:
|
|
|
|
+ expiry_date = 3000000000
|
|
|
|
+ if file_path in closest_expiry:
|
|
|
|
+ if closest_expiry[file_path] > expiry_date:
|
|
|
|
+ closest_expiry[file_path] = expiry_date
|
|
|
|
+ else:
|
|
|
|
+ closest_expiry[file_path] = expiry_date
|
|
|
|
+
|
|
|
|
+ output = "{} -".format(state.name)
|
|
|
|
+ if crit_list:
|
|
|
|
+ output += " Critical: [ {} ]".format(", ".join(x for x in crit_list))
|
|
|
|
+ if warn_list:
|
|
|
|
+ output += " Warning: [ {} ]".format(", ".join(x for x in warn_list))
|
|
|
|
+ if unkn_list:
|
|
|
|
+ output += " Unknown: [ {} ]".format(", ".join(x for x in unkn_list))
|
|
|
|
+ if err_list:
|
|
|
|
+ output += " Error: [ {} ]".format(", ".join(x for x in err_list))
|
|
|
|
+ if not (crit_list or warn_list or unkn_list or err_list):
|
|
|
|
+ output += " All keys ok"
|
|
|
|
+ print(output)
|
|
|
|
+
|
|
|
|
+ if args.verbose:
|
|
|
|
+ sorted_keys = sorted(verbose_info.keys(), key=lambda file_path:
|
|
|
|
+ (closest_expiry[file_path], file_path)
|
|
|
|
+ if args.sort else file_path)
|
|
|
|
+ for file_path in sorted_keys:
|
|
|
|
+ cat_tupl = (*crit_list, *warn_list, *unkn_list)
|
|
|
|
+ if file_path in cat_tupl or not args.expiring:
|
|
|
|
+ print()
|
|
|
|
+ print("{}:".format(file_path))
|
|
|
|
+ verbose_print(verbose_info[file_path])
|
|
|
|
+
|
|
|
|
+ sys.exit(state.value)
|
|
|
|
+
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
+ main()
|