]> git.llucax.com Git - software/pymin.git/blobdiff - pymin/dispatcher.py
Catch exception when trying to write /proc/.../ip_forward in the config file.
[software/pymin.git] / pymin / dispatcher.py
index 39b4b75cf6522f64e78b2d097422a2bfc69285e1..0d28cb337bf1686d53f46446cdef03ec2272c2bc 100644 (file)
@@ -7,7 +7,11 @@ It's based on Zope or Cherrypy dispatching (but implemented from the scratch)
 and translates commands to functions/objects/methods.
 """
 
-__ALL__ = ('Error', 'HandlerError', 'CommandNotFoundError', 'Handler',
+import re
+import inspect
+import logging ; log = logging.getLogger('pymin.dispatcher')
+
+__all__ = ('Error', 'HandlerError', 'CommandNotFoundError', 'Handler',
             'Dispatcher', 'handler', 'is_handler', 'get_help')
 
 class Error(RuntimeError):
@@ -52,6 +56,51 @@ class CommandError(Error):
     def __unicode__(self):
         return u'Error in command "%s".' % u' '.join(self.command)
 
+class WrongArgumentsError(CommandError):
+    r"""WrongArgumentsError(handler, error) -> WrongArgumentsError instance.
+
+    This exception is raised when an empty command string is received.
+    """
+
+    def __init__(self, handler, error):
+        r"Initialize the object, see class documentation for more info."
+        self.handler = handler
+        self.error = error
+
+    args_re = re.compile(r'\w+\(\) takes (.+) (\d+) \w+ \((\d+) given\)')
+
+    extra_kw_re = re.compile(r'\w+\(\) got an unexpected keyword argument (.+)')
+
+    dup_kw_re = re.compile(r'\w+\(\) got multiple values for keyword argument '
+                            r"'(.+)'")
+
+    def format(self):
+        r"format() -> unicode - Format a TypeError to adapt it to a command."
+        m = self.args_re.match(unicode(self.error))
+        if m:
+            (quant, n_ok, n_bad)  = m.groups()
+            n_ok = int(n_ok)
+            n_bad = int(n_bad)
+            n_ok -= 1
+            n_bad -= 1
+            pl = ''
+            if n_ok != 1:
+                pl = 's'
+            return u'takes %s %s argument%s, %s given' \
+                        % (quant, n_ok, pl, n_bad)
+        m = self.extra_kw_re.match(unicode(self.error))
+        if m:
+            (kw,)  = m.groups()
+            return u'got an unexpected keyword argument %s' % kw
+        m = self.dup_kw_re.match(unicode(self.error))
+        if m:
+            (kw,)  = m.groups()
+            return u'got multiple values for argument %s' % kw
+        return u'got wrong arguments'
+
+    def __unicode__(self):
+        return u'Command "%s" %s.' % (self.handler.__name__, self.format())
+
 class CommandNotSpecifiedError(CommandError):
     r"""CommandNotSpecifiedError() -> CommandNotSpecifiedError instance.
 
@@ -146,13 +195,52 @@ def handler(help):
 
     help - Help string for the handler.
     """
-    def wrapper(f):
-        if not help:
-            raise TypeError("'help' should not be empty")
-        f._dispatcher_handler = True
-        f.handler_help = help
-        return f
-    return wrapper
+    if not help:
+        raise TypeError("'help' should not be empty")
+    def make_wrapper(f):
+        log.debug('handler(): Decorating %s()', f.__name__)
+        # Here comes the tricky part:
+        # We need to make our wrapped function to accept any number of
+        # positional and keyword arguments, but checking for the correct
+        # arguments and raising an exception in case the arguments doesn't
+        # match.
+        # So we create a dummy function, with the same signature as the
+        # wrapped one, so we can check later (at "dispatch-time") if the
+        # real function call will be successful. If the dummy function don't
+        # raise a TypeError, the arguments are just fine.
+        env = dict()
+        argspec = inspect.getargspec(f)
+        signature = inspect.formatargspec(*argspec)
+        # The dummy function
+        exec "def f%s: pass" % signature in env
+        signature_check = env['f']
+        # The wrapper to check the signature at "dispatch-time"
+        def wrapper(*args, **kwargs):
+            # First we check if the arguments passed are OK.
+            try:
+                signature_check(*args, **kwargs)
+            except TypeError, e:
+                # If not, we raise an appropriate error.
+                raise WrongArgumentsError(f, e)
+            # If they are fine, we call the real function
+            return f(*args, **kwargs)
+        # Some flag to mark our handlers for simple checks
+        wrapper._dispatcher_handler = True
+        # The help string we asked for in the first place =)
+        wrapper.handler_help = help
+        # We store the original signature for better help generation
+        wrapper.handler_argspec = argspec
+        # And some makeup, to make our wrapper look like the original function
+        wrapper.__name__ = f.__name__
+        wrapper.__dict__.update(f.__dict__)
+        # We add a hint in the documentation
+        wrapper.__doc__ = "Pymin handler with signature: %s%s" \
+                            % (wrapper.__name__, signature)
+        if f.__doc__ is not None:
+            wrapper.__doc__ += "\n\n" + f.__doc__
+        wrapper.__module__ = f.__module__
+        return wrapper
+    return make_wrapper
 
 def is_handler(handler):
     r"is_handler(handler) -> bool :: Tell if a object is a handler."
@@ -186,17 +274,31 @@ class Handler:
             d = dict()
             for a in dir(self):
                 h = getattr(self, a)
+                if a == 'parent': continue # Skip parents in SubHandlers
                 if is_handler(h) or isinstance(h, Handler):
                     d[a] = h.handler_help
             return d
         # A command was specified
+        if command == 'parent': # Skip parents in SubHandlers
+            raise HelpNotFoundError(command)
         if not hasattr(self, command.encode('utf-8')):
             raise HelpNotFoundError(command)
         handler = getattr(self, command.encode('utf-8'))
-        if not is_handler(handler) and not hasattr(handler):
+        if not is_handler(handler) and not hasattr(handler, 'handler_help'):
             raise HelpNotFoundError(command)
         return handler.handler_help
 
+    def handle_timer(self):
+        r"""handle_timer() -> None :: Do periodic tasks.
+
+        By default we do nothing but calling handle_timer() on subhandlers.
+        """
+        for a in dir(self):
+            if a == 'parent': continue # Skip parents in SubHandlers
+            h = getattr(self, a)
+            if isinstance(h, Handler):
+                h.handle_timer()
+
 def parse_command(command):
     r"""parse_command(command) -> (args, kwargs) :: Parse a command.
 
@@ -223,6 +325,8 @@ def parse_command(command):
     arguments is preserved and if there are multiple keyword arguments with
     the same key, the last value is the winner (all other values are lost).
 
+    The command should be a unicode string.
+
     Examples:
 
     >>> parse_command('hello world')
@@ -250,6 +354,12 @@ def parse_command(command):
     ([u'=hello'], {})
     >>> parse_command(r'\thello')
     ([u'\thello'], {})
+    >>> parse_command(r'hello \n')
+    ([u'hello', u'\n'], {})
+    >>> parse_command(r'hello \nmundo')
+    ([u'hello', u'\nmundo'], {})
+    >>> parse_command(r'test \N')
+    ([u'test', None], {})
     >>> parse_command(r'\N')
     ([None], {})
     >>> parse_command(r'none=\N')
@@ -274,9 +384,23 @@ def parse_command(command):
     escape = False
     keyword = None
     state = SEP
+    def register_token(buff, keyword, seq, dic):
+        if buff == r'\N':
+            buff = None
+        if keyword is not None:
+            dic[keyword.encode('utf-8')] = buff
+            keyword = None
+        else:
+            seq.append(buff)
+        buff = u''
+        return (buff, keyword)
     for n, c in enumerate(command):
         # Escaped character
         if escape:
+            # Not yet registered the token
+            if state == SEP and buff:
+                (buff, keyword) = register_token(buff, keyword, seq, dic)
+                state = TOKEN
             for e in escaped_chars:
                 if c == e:
                     buff += eval(u'"\\' + e + u'"')
@@ -301,14 +425,7 @@ def parse_command(command):
                     keyword = buff
                     buff = u''
                     continue
-                if buff == r'\N':
-                    buff = None
-                if keyword is not None: # Value found
-                    dic[str(keyword)] = buff
-                    keyword = None
-                else: # Normal parameter found
-                    seq.append(buff)
-                buff = u''
+                (buff, keyword) = register_token(buff, keyword, seq, dic)
             state = TOKEN
         # Getting a token
         if state == TOKEN:
@@ -350,12 +467,7 @@ def parse_command(command):
         raise ParseError(command,
                         u'keyword argument (%s) without value' % keyword)
     if buff:
-        if buff == r'\N':
-            buff = None
-        if keyword is not None:
-            dic[str(keyword)] = buff
-        else:
-            seq.append(buff)
+        register_token(buff, keyword, seq, dic)
     return (seq, dic)
 
 class Dispatcher:
@@ -391,6 +503,7 @@ class Dispatcher:
 
         See Dispatcher class documentation for more info.
         """
+        log.debug(u'Dispatcher(%r)', root)
         self.root = root
 
     def dispatch(self, route):
@@ -402,28 +515,51 @@ class Dispatcher:
 
         route - *unicode* string with the command route.
         """
+        log.debug('Dispatcher.dispatch(%r)', route)
         command = list()
         (route, kwargs) = parse_command(route)
+        log.debug(u'Dispatcher.dispatch: route=%r, kwargs=%r', route, kwargs)
         if not route:
+            log.debug(u'Dispatcher.dispatch: command not specified')
             raise CommandNotSpecifiedError()
         handler = self.root
         while not is_handler(handler):
+            log.debug(u'Dispatcher.dispatch: handler=%r, route=%r',
+                        handler, route)
             if len(route) is 0:
                 if isinstance(handler, Handler):
+                    log.debug(u'Dispatcher.dispatch: command is a handler')
                     raise CommandIsAHandlerError(command)
+                log.debug(u'Dispatcher.dispatch: command not found')
                 raise CommandNotFoundError(command)
             command.append(route[0])
+            log.debug(u'Dispatcher.dispatch: command=%r', command)
+            if route[0] == 'parent':
+                log.debug(u'Dispatcher.dispatch: is parent => not found')
+                raise CommandNotFoundError(command)
             if not hasattr(handler, route[0].encode('utf-8')):
                 if isinstance(handler, Handler) and len(command) > 1:
+                    log.debug(u'Dispatcher.dispatch: command not in handler')
                     raise CommandNotInHandlerError(command)
+                log.debug(u'Dispatcher.dispatch: command not found')
                 raise CommandNotFoundError(command)
             handler = getattr(handler, route[0].encode('utf-8'))
             route = route[1:]
-        return handler(*route, **kwargs)
+        log.debug(u'Dispatcher.dispatch: %r is a handler, calling it with '
+                    u'route=%r, kwargs=%r', handler, route, kwargs)
+        r = handler(*route, **kwargs)
+        log.debug(u'Dispatcher.dispatch: handler returned %s', r)
+        return r
 
 
 if __name__ == '__main__':
 
+    logging.basicConfig(
+        level   = logging.DEBUG,
+        format  = '%(asctime)s %(levelname)-8s %(message)s',
+        datefmt = '%H:%M:%S',
+    )
+
     @handler(u"test: Print all the arguments, return nothing")
     def test_func(*args):
         print 'func:', args
@@ -438,8 +574,8 @@ if __name__ == '__main__':
         def cmd1(self, *args):
             print 'class.cmd1:', args
         @handler(u"cmd2: Print all the arguments, return nothing")
-        def cmd2(self, *args):
-            print 'class.cmd2:', args
+        def cmd2(self, arg1, arg2):
+            print 'class.cmd2:', arg1, arg2
         subclass = TestClassSubHandler()
 
     class RootHandler(Handler):
@@ -448,29 +584,74 @@ if __name__ == '__main__':
 
     d = Dispatcher(RootHandler())
 
-    d.dispatch(r'''func arg1 arg2 arg3 "fourth 'argument' with \", a\ttab and\n\\n"''')
-    print 'inst commands:', tuple(d.dispatch('inst commands'))
-    print 'inst help:', d.dispatch('inst help')
-    d.dispatch('inst cmd1 arg1 arg2 arg3 arg4')
-    d.dispatch('inst cmd2 arg1 arg2')
-    print 'inst subclass help:', d.dispatch('inst subclass help')
-    d.dispatch('inst subclass subcmd arg1 arg2 arg3 arg4 arg5')
+    r = d.dispatch(r'''func arg1 arg2 arg3 "fourth 'argument' with \", a\ttab and\n\\n"''')
+    assert r is None
+    r = list(d.dispatch('inst commands'))
+    r.sort()
+    assert r == ['cmd1', 'cmd2', 'commands', 'help']
+    print 'inst commands:', r
+    r = d.dispatch('inst help')
+    assert r == {
+                'commands': u'List available commands',
+                'subclass': u'Undocumented handler',
+                'cmd1': u'cmd1: Print all the arguments, return nothing',
+                'cmd2': u'cmd2: Print all the arguments, return nothing',
+                'help': u'Show available commands with their help'
+    }
+    print 'inst help:', r
+    r = d.dispatch('inst cmd1 arg1 arg2 arg3 arg4')
+    assert r is None
+    r = d.dispatch('inst cmd2 arg1 arg2')
+    assert r is None
+    r = d.dispatch('inst subclass help')
+    assert r == {
+                'subcmd': u'subcmd: Print all the arguments, return nothing',
+                'commands': u'List available commands',
+                'help': u'Show available commands with their help'
+    }
+    print 'inst subclass help:', r
+    r = d.dispatch('inst subclass subcmd arg1 arg2 arg3 arg4 arg5')
+    assert r is None
     try:
         d.dispatch('')
+        assert False, 'It should raised a CommandNotSpecifiedError'
     except CommandNotSpecifiedError, e:
         print 'Not found:', e
     try:
         d.dispatch('sucutrule piquete culete')
+        assert False, 'It should raised a CommandNotFoundError'
     except CommandNotFoundError, e:
         print 'Not found:', e
     try:
         d.dispatch('inst cmd3 arg1 arg2 arg3')
+        assert False, 'It should raised a CommandNotInHandlerError'
     except CommandNotInHandlerError, e:
         print 'Not found:', e
     try:
         d.dispatch('inst')
+        assert False, 'It should raised a CommandIsAHandlerError'
     except CommandIsAHandlerError, e:
         print 'Not found:', e
+    try:
+        d.dispatch('inst cmd2 "just one arg"')
+        assert False, 'It should raised a WrongArgumentsError'
+    except WrongArgumentsError, e:
+        print 'Bad arguments:', e
+    try:
+        d.dispatch('inst cmd2 arg1 arg2 "an extra argument"')
+        assert False, 'It should raised a WrongArgumentsError'
+    except WrongArgumentsError, e:
+        print 'Bad arguments:', e
+    try:
+        d.dispatch('inst cmd2 arg1 arg2 arg3="unexpected keyword arg"')
+        assert False, 'It should raised a WrongArgumentsError'
+    except WrongArgumentsError, e:
+        print 'Bad arguments:', e
+    try:
+        d.dispatch('inst cmd2 arg1 arg2 arg2="duplicated keyword arg"')
+        assert False, 'It should raised a WrongArgumentsError'
+    except WrongArgumentsError, e:
+        print 'Bad arguments:', e
     print
     print
 
@@ -501,6 +682,12 @@ if __name__ == '__main__':
     assert p == ([u'=hello'], {}), p
     p = parse_command(r'\thello')
     assert p == ([u'\thello'], {}), p
+    p = parse_command(r'hello \n')
+    assert p == ([u'hello', u'\n'], {}), p
+    p = parse_command(r'hello \nmundo')
+    assert p == ([u'hello', u'\nmundo'], {}), p
+    p = parse_command(r'test \N')
+    assert p == ([u'test', None], {}), p
     p = parse_command(r'\N')
     assert p == ([None], {}), p
     p = parse_command(r'none=\N')
@@ -513,14 +700,12 @@ if __name__ == '__main__':
     assert p == ([u'\\None'], {}), p
     try:
         p = parse_command('hello=')
+        assert False, p + ' should raised a ParseError'
     except ParseError, e:
         pass
-    else:
-        assert False, p + ' should raised a ParseError'
     try:
         p = parse_command('"hello')
+        assert False, p + ' should raised a ParseError'
     except ParseError, e:
         pass
-    else:
-        assert False, p + ' should raised a ParseError'