123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- #
- # FFHO netfilter helper functions
- #
- import ipaddress
- def generate_service_rules (services, acls, af):
- rules = []
- for srv in services:
- rule = ""
- comment = srv['descr']
- acl_comment = ""
- src_prefixes = []
- # If there are no DST IPs set at all or DST IPs for this AF set, we have a rule to build,
- # if this is NOT the case, there is no rule for this AF to generate, carry on.
- if not ((not srv['ips']['4'] and not srv['ips']['6']) or srv['ips'][str(af)]):
- continue
- # Is/are IP(s) set for this service?
- if srv['ips'][str(af)]:
- rule += "ip" if af == 4 else "ip6"
- dst_ips = srv['ips'][str(af)]
- if len (dst_ips) == 1:
- rule += " daddr %s " % dst_ips[0]
- else:
- rule += " daddr { %s } " % ", ".join (dst_ips)
- # ACLs defined for this service?
- if srv['acl']:
- srv_acl = sorted (srv['acl'])
- for ace in srv_acl:
- ace_pfx = (acls[ace][af])
- # Many entries
- if type (ace_pfx) == list:
- src_prefixes.extend (ace_pfx)
- else:
- src_prefixes.append (ace_pfx)
- acl_comment = "acl: %s" % ", ".join (srv_acl)
- # Additional prefixes defined for this service?
- if srv['additional_prefixes']:
- add_pfx = []
- # Additional prefixes are given as a space separated list
- for entry in srv['additional_prefixes'].split ():
- # Strip commas and spaces, just in case
- pfx_str = entry.strip (" ,")
- pfx_obj = ipaddress.ip_network (pfx_str)
- # We only care for additional pfx for this AF
- if pfx_obj.version != af:
- continue
- add_pfx.append (pfx_str)
- if add_pfx:
- src_prefixes.extend (add_pfx)
- if acl_comment:
- acl_comment += ", "
- acl_comment += "additional pfx"
- # Combine ACL + additional prefixes (if any)
- if src_prefixes:
- rule += "ip" if af == 4 else "ip6"
- if len (src_prefixes) > 1:
- rule += " saddr { %s } " % ", ".join (src_prefixes)
- else:
- rule += " saddr %s " % src_prefixes[0]
- if acl_comment:
- comment += " (%s)" % acl_comment
- # Multiple ports?
- if len (srv['ports']) > 1:
- ports = "{ %s }" % ", ".join (map (str, sorted (srv['ports'])))
- else:
- ports = srv['ports'][0]
- rule += "%s dport %s counter accept comment \"%s\"" % (srv['proto'], ports, comment)
- rules.append (rule)
- return rules
|