X-Git-Url: https://git.llucax.com/software/pymin.git/blobdiff_plain/8d52115e230ebf5dd657f60021e27362ca8ed99e..5d2a72486a8ef72659df813e1c0feed41da3a667:/pymin/services/firewall/__init__.py?ds=sidebyside diff --git a/pymin/services/firewall/__init__.py b/pymin/services/firewall/__init__.py index c1b6e61..0ab2b4c 100644 --- a/pymin/services/firewall/__init__.py +++ b/pymin/services/firewall/__init__.py @@ -4,16 +4,24 @@ # of using script templates. from os import path +from formencode import Invalid +from formencode.validators import OneOf, CIDR, Int import logging ; log = logging.getLogger('pymin.services.firewall') -from pymin.seqtools import Sequence +from pymin.item import Item +from pymin.validatedclass import Field from pymin.dispatcher import Handler, handler, HandlerError from pymin.services.util import Restorable, ConfigWriter, ServiceHandler, \ TransactionalHandler, ListSubHandler __all__ = ('FirewallHandler',) -class Rule(Sequence): +class UpOneOf(OneOf): + def validate_python(self, value, state): + value = value.upper() + return OneOf.validate_python(self, value, state) + +class Rule(Item): r"""Rule(chain, target[, src[, dst[, ...]]]) -> Rule instance. chain - INPUT, OUTPUT or FORWARD. @@ -24,37 +32,23 @@ class Rule(Sequence): src_port - Source port (only for UDP or TCP protocols). dst_port - Destination port (only for UDP or TCP protocols). """ - - def __init__(self, chain, target, src=None, dst=None, protocol=None, - src_port=None, dst_port=None): - r"Initialize object, see class documentation for details." - self.chain = chain - self.target = target - self.src = src - self.dst = dst - self.protocol = protocol - # TODO Validate that src_port and dst_port could be not None only - # if the protocol is UDP or TCP - self.src_port = src_port - self.dst_port = dst_port - - def update(self, chain=None, target=None, src=None, dst=None, protocol=None, - src_port=None, dst_port=None): - r"update([chain[, ...]]) -> Update the values of a rule (see Rule doc)." - if chain is not None: self.chain = chain - if target is not None: self.target = target - if src is not None: self.src = src - if dst is not None: self.dst = dst - if protocol is not None: self.protocol = protocol - # TODO Validate that src_port and dst_port could be not None only - # if the protocol is UDP or TCP - if src_port is not None: self.src_port = src_port - if dst_port is not None: self.dst_port = dst_port - - def as_tuple(self): - r"Return a tuple representing the rule." - return (self.chain, self.target, self.src, self.dst, self.protocol, - self.src_port, self.dst_port) + chain = Field(UpOneOf(['INPUT', 'OUTPUT', 'FORWARD'], not_empty=True)) + target = Field(UpOneOf(['ACCEPT', 'REJECT', 'DROP'], not_empty=True)) + src = Field(CIDR(if_empty=None, if_missing=None)) + dst = Field(CIDR(if_empty=None, if_missing=None)) + protocol = Field(UpOneOf(['ICMP', 'UDP', 'TCP', 'ALL'], if_missing=None)) + src_port = Field(Int(min=0, max=65535, if_empty=None, if_missing=None)) + dst_port = Field(Int(min=0, max=65535, if_empty=None, if_missing=None)) + def chained_validator(self, fields, state): + errors = dict() + if fields['protocol'] not in ('TCP', 'UDP'): + for name in ('src_port', 'dst_port'): + if fields[name] is not None: + errors[name] = u"Should be None if protocol " \ + "(%(protocol)s) is not TCP or UDP" % fields + if errors: + raise Invalid(u"You can't specify any ports if the protocol " + u'is not TCP or UDP', fields, state, error_dict=errors) class RuleHandler(ListSubHandler): r"""RuleHandler(parent) -> RuleHandler instance :: Handle a list of rules. @@ -130,11 +124,11 @@ if __name__ == '__main__': dump() - fw_handler.rule.add('input','drop','icmp') + fw_handler.rule.add('input', 'drop', protocol='icmp') fw_handler.rule.update(0, dst='192.168.0.188/32') - fw_handler.rule.add('output','accept', '192.168.1.0/24') + fw_handler.rule.add('output', 'accept', '192.168.1.0/24') fw_handler.commit()