Download Install Tutorial Docs FAQ Tools WikiLicense Team IRC Planet Involvement Shop Book

Changeset 1752

Show
Ignore:
Timestamp:
10/16/07 15:06:41
Author:
fumanchu
Message:

Reworked WSPBus plugins; most now have a 'subscribe' method which attaches them to the bus in a separate step from init. This allows frameworks to create canonical plugin instances but delay or skip their subscription.

Some plugins' method names changed (from e.g. __call__ to start) to take advantage of a new SimplePlugin? base class. The daemonize func is now a Daemonizer class for this reason.

Also removed the SubscribedObject? class; it was too much magic. The few consumers of it in CherryPy revert to calling canonical plugin instances instead.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/cherrypy/__init__.py

    r1739 r1752  
    190190class _TimeoutMonitor(restsrv.plugins.Monitor): 
    191191     
    192     def __init__(self, engine, channel=None): 
     192    def __init__(self, engine): 
    193193        self.servings = [] 
    194         restsrv.plugins.Monitor.__init__(self, engine, self.run, channel
     194        restsrv.plugins.Monitor.__init__(self, engine, self.run
    195195     
    196196    def acquire(self): 
     
    207207        for req, resp in self.servings: 
    208208            resp.check_timeout() 
    209 _timeout_monitor = _TimeoutMonitor(engine, "CherryPy Timeout Monitor"
     209timeout_monitor = _TimeoutMonitor(engine
    210210 
    211211# Add an autoreloader (the 'engine' config namespace may detach/attach it). 
    212212engine.autoreload = restsrv.plugins.Autoreloader(engine) 
    213 restsrv.plugins.Reexec(engine) 
    214 _thread_manager = restsrv.plugins.ThreadManager(engine
     213restsrv.plugins.Reexec(engine).subscribe() 
     214restsrv.plugins.ThreadManager(engine).subscribe(
    215215 
    216216from cherrypy import _cpserver 
     
    240240     
    241241    engine.subscribe('start', server.quickstart) 
    242     restsrv.plugins.SignalHandler(engine) 
     242    restsrv.plugins.SignalHandler(engine).subscribe() 
    243243    engine.start() 
    244244    engine.block() 
  • trunk/cherrypy/_cpconfig.py

    r1690 r1752  
    147147    """ 
    148148    if isinstance(other, basestring): 
    149         cherrypy.engine.publish('Autoreloader', 'add()', other) 
     149        cherrypy.engine.autoreload.files.add(other) 
    150150     
    151151    # Load other into base 
     
    255255        if isinstance(config, basestring): 
    256256            # Filename 
    257             cherrypy.engine.publish('Autoreloader', 'add()', config) 
     257            cherrypy.engine.autoreload.files.add(config) 
    258258            config = _Parser().dict_from_file(config) 
    259259        elif hasattr(config, 'read'): 
     
    291291    if k == 'autoreload_on': 
    292292        if v: 
    293             engine.autoreload.attach() 
     293            engine.autoreload.subscribe() 
    294294        else: 
    295             engine.autoreload.detach() 
     295            engine.autoreload.unsubscribe() 
    296296    elif k == 'autoreload_frequency': 
    297         engine.publish('Autoreloader', 'frequency', v) 
     297        engine.autoreload.frequency = v 
    298298    elif k == 'autoreload_match': 
    299         engine.publish('Autoreloader', 'match', v) 
     299        engine.autoreload.match = v 
    300300    elif k == 'reload_files': 
    301         engine.publish('Autoreloader', 'files', v) 
     301        engine.autoreload.files = v 
    302302    elif k == 'deadlock_poll_freq': 
    303         engine.publish('CherryPy Timeout Monitor', 'frequency', v) 
     303        cherrypy.timeout_monitor.frequency = v 
    304304    elif k == 'reexec_retry': 
    305305        engine.publish('restart', 'retry', v) 
  • trunk/cherrypy/_cpserver.py

    r1691 r1752  
    9999        self.mgr.httpservers[httpserver] = bind_addr 
    100100        self.mgr.start() 
     101        cherrypy.engine.subscribe('stop', self.mgr.stop) 
    101102     
    102103    def httpserver_from_self(self, httpserver=None): 
  • trunk/cherrypy/_cptree.py

    r1671 r1752  
    101101        resp = self.response_class() 
    102102        cherrypy.serving.load(req, resp) 
    103         cherrypy.engine.publish('CherryPy Timeout Monitor', 'acquire()'
     103        cherrypy.timeout_monitor.acquire(
    104104        cherrypy.engine.publish('acquire_thread') 
    105105         
     
    110110        req = cherrypy.serving.request 
    111111         
    112         cherrypy.engine.publish('CherryPy Timeout Monitor', 'release()'
     112        cherrypy.timeout_monitor.release(
    113113         
    114114        try: 
  • trunk/cherrypy/lib/sessions.py

    r1736 r1752  
    115115            # so that tool config can be accessed inside the method. 
    116116            t = cherrypy.restsrv.plugins.Monitor( 
    117                     cherrypy.engine, self.clean_up, "CP Session Cleanup"
    118             t.frequency = self.clean_freq 
     117                cherrypy.engine, self.clean_up, self.clean_freq
     118            t.subscribe() 
    119119            cls.clean_thread = t 
    120120            t.start() 
  • trunk/cherrypy/restsrv/plugins.py

    r1746 r1752  
    1313 
    1414 
    15 class SubscribedObject(object): 
    16     """An object whose attributes are manipulable via publishing. 
    17      
    18     An instance of this class will subscribe to a channel. Messages 
    19     published to that channel should be one of three types: 
    20      
    21     getattr: 
    22         >>> values = bus.publish('thing', 'attr') 
    23         Note that the 'publish' method will return a list of values 
    24         (from potentially multiple subscribed objects). 
    25      
    26     setattr: 
    27         >>> bus.publish('thing', 'attr', value) 
    28      
    29     call an attribute: 
    30         >>> bus.publish('thing', 'attr()', *a, **kw) 
    31     """ 
    32      
    33     def __init__(self, bus, channel): 
    34         self.bus = bus 
    35         self.channel = channel 
    36         bus.subscribe(self.channel, self.handle) 
    37      
    38     def handle(self, attr, *args, **kwargs): 
    39         if attr.endswith("()"): 
    40             # Call 
    41             return getattr(self, attr[:-2])(*args, **kwargs) 
    42         else: 
    43             if args: 
    44                 # Set 
    45                 return setattr(self, attr, args[0]) 
    46             else: 
    47                 # Get 
    48                 return getattr(self, attr) 
     15class SimplePlugin(object): 
     16    """Plugin base class which auto-subscribes methods for known channels.""" 
     17     
     18    def subscribe(self): 
     19        """Register this monitor as a (multi-channel) listener on the bus.""" 
     20        for channel in self.bus.listeners: 
     21            method = getattr(self, channel, None) 
     22            if method is not None: 
     23                self.bus.subscribe(channel, method) 
     24     
     25    def unsubscribe(self): 
     26        """Unregister this monitor as a listener on the bus.""" 
     27        for channel in self.bus.listeners: 
     28            method = getattr(self, channel, None) 
     29            if method is not None: 
     30                self.bus.unsubscribe(channel, method) 
     31 
    4932 
    5033 
     
    5235    """Register bus channels (and listeners) for system signals. 
    5336     
    54     By default, instantiating this object registers the following signals 
     37    By default, instantiating this object subscribes the following signals 
    5538    and listeners: 
    5639     
     
    6952    def __init__(self, bus): 
    7053        self.bus = bus 
    71          
    7254        # Set default handlers 
    73         for sig, func in [('SIGTERM', bus.exit), 
    74                           ('SIGHUP', bus.restart), 
    75                           ('SIGUSR1', bus.graceful)]: 
     55        self.handlers = {'SIGTERM': self.bus.exit, 
     56                         'SIGHUP': self.bus.restart, 
     57                         'SIGUSR1': self.bus.graceful, 
     58                         } 
     59     
     60    def subscribe(self): 
     61        for sig, func in self.handlers.iteritems(): 
    7662            try: 
    7763                self.set_handler(sig, func) 
     
    8066     
    8167    def set_handler(self, signal, listener=None): 
    82         """Register a handler for the given signal (number or name). 
     68        """Subscribe a handler for the given signal (number or name). 
    8369         
    8470        If the optional 'listener' argument is provided, it will be 
    85         registered as a listener for the given signal's channel. 
     71        subscribed as a listener for the given signal's channel. 
    8672         
    8773        If the given signal name or number is not available on the current 
     
    10793     
    10894    def _handle_signal(self, signum=None, frame=None): 
    109         """Python signal handler (self.set_handler registers it for you).""" 
     95        """Python signal handler (self.set_handler subscribes it for you).""" 
    11096        self.bus.publish(self.signals[signum]) 
    11197 
    11298 
    113 class Reexec(SubscribedObject): 
     99class Reexec(SimplePlugin): 
    114100    """A process restarter (using execv) for the 'restart' WSPBus channel. 
    115101     
     
    122108        self.bus = bus 
    123109        self.retry = retry 
    124         bus.subscribe('restart', self) 
    125      
    126     def __call__(self): 
     110     
     111    def restart(self): 
    127112        """Re-execute the current process.""" 
    128113        args = sys.argv[:] 
     
    149134 
    150135 
    151 class DropPrivileges(SubscribedObject): 
     136class DropPrivileges(SimplePlugin): 
    152137    """Drop privileges. 
    153138     
     
    164149            os.umask 
    165150        except AttributeError: 
    166             def __call__(self): 
     151            def start(self): 
    167152                """Drop privileges. Not implemented on this platform.""" 
    168153                raise NotImplementedError 
     
    170155            umask = None 
    171156             
    172             def __call__(self): 
     157            def start(self): 
    173158                """Drop privileges. Windows version (umask only).""" 
    174159                if umask is not None: 
     
    181166        umask = None 
    182167         
    183         def __call__(self): 
     168        def start(self): 
    184169            """Drop privileges. UNIX version (uid, gid, and umask).""" 
    185170            if not (uid is None and gid is None): 
     
    214199                self.bus.log('umask old: %03o, new: %03o' % 
    215200                                (old_umask, umask)) 
    216     __call__.priority = 70 
    217  
    218  
    219 def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): 
     201    start.priority = 70 
     202 
     203 
     204class Daemonizer(SimplePlugin): 
    220205    """Daemonize the running script. 
    221206     
    222207    Use this with a Web Site Process Bus via: 
    223208         
    224         bus.subscribe('start', daemonize) 
    225      
    226     When this method returns, the process is completely decoupled from the 
    227     parent environment. 
    228     """ 
    229      
    230     # See http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 
    231     # (or http://www.faqs.org/faqs/unix-faq/programmer/faq/ section 1.7) 
    232     # and http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 
    233      
    234     # Finish up with the current stdout/stderr 
    235     sys.stdout.flush() 
    236     sys.stderr.flush() 
    237      
    238     # Do first fork. 
    239     try: 
    240         pid = os.fork() 
    241         if pid == 0: 
    242             # This is the child process. Continue. 
    243             pass 
    244         else: 
    245             # This is the first parent. Exit, now that we've forked. 
    246             sys.exit(0) 
    247     except OSError, exc: 
    248         # Python raises OSError rather than returning negative numbers. 
    249         sys.exit("%s: fork #1 failed: (%d) %s\n" 
    250                  % (sys.argv[0], exc.errno, exc.strerror)) 
    251      
    252     os.setsid() 
    253      
    254     # Do second fork 
    255     try: 
    256         pid = os.fork() 
    257         if pid > 0: 
    258             sys.exit(0) # Exit second parent 
    259     except OSError, exc: 
    260         sys.exit("%s: fork #2 failed: (%d) %s\n" 
    261                  % (sys.argv[0], exc.errno, exc.strerror)) 
    262      
    263     os.chdir("/") 
    264     os.umask(0) 
    265      
    266     si = open(stdin, "r") 
    267     so = open(stdout, "a+") 
    268     se = open(stderr, "a+", 0) 
    269  
    270     # os.dup2(fd,fd2) will close fd2 if necessary (so we don't explicitly close 
    271     # stdin,stdout,stderr): 
    272     # http://docs.python.org/lib/os-fd-ops.html  
    273     os.dup2(si.fileno(), sys.stdin.fileno()) 
    274     os.dup2(so.fileno(), sys.stdout.fileno()) 
    275     os.dup2(se.fileno(), sys.stderr.fileno()) 
    276 daemonize.priority = 10 
    277  
    278  
    279 class PIDFile(object): 
     209        Daemonizer(bus).subscribe() 
     210     
     211    When this component finishes, the process is completely decoupled from 
     212    the parent environment. 
     213    """ 
     214     
     215    def __init__(self, bus, stdin='/dev/null', stdout='/dev/null', 
     216                 stderr='/dev/null'): 
     217        self.bus = bus 
     218        self.stdin = stdin 
     219        self.stdout = stdout 
     220        self.stderr = stderr 
     221     
     222    def start(self): 
     223        # See http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 
     224        # (or http://www.faqs.org/faqs/unix-faq/programmer/faq/ section 1.7) 
     225        # and http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 
     226         
     227        # Finish up with the current stdout/stderr 
     228        sys.stdout.flush() 
     229        sys.stderr.flush() 
     230         
     231        # Do first fork. 
     232        try: 
     233            pid = os.fork() 
     234            if pid == 0: 
     235                # This is the child process. Continue. 
     236                pass 
     237            else: 
     238                # This is the first parent. Exit, now that we've forked. 
     239                sys.exit(0) 
     240        except OSError, exc: 
     241            # Python raises OSError rather than returning negative numbers. 
     242            sys.exit("%s: fork #1 failed: (%d) %s\n" 
     243                     % (sys.argv[0], exc.errno, exc.strerror)) 
     244         
     245        os.setsid() 
     246         
     247        # Do second fork 
     248        try: 
     249            pid = os.fork() 
     250            if pid > 0: 
     251                sys.exit(0) # Exit second parent 
     252        except OSError, exc: 
     253            sys.exit("%s: fork #2 failed: (%d) %s\n" 
     254                     % (sys.argv[0], exc.errno, exc.strerror)) 
     255         
     256        os.chdir("/") 
     257        os.umask(0) 
     258         
     259        si = open(stdin, "r") 
     260        so = open(stdout, "a+") 
     261        se = open(stderr, "a+", 0) 
     262 
     263        # os.dup2(fd,fd2) will close fd2 if necessary (so we don't explicitly close 
     264        # stdin,stdout,stderr): 
     265        # http://docs.python.org/lib/os-fd-ops.html  
     266        os.dup2(si.fileno(), sys.stdin.fileno()) 
     267        os.dup2(so.fileno(), sys.stdout.fileno()) 
     268        os.dup2(se.fileno(), sys.stderr.fileno()) 
     269         
     270        self.bus.log('Daemonized to PID: %s' % os.getpid()) 
     271    start.priority = 10 
     272 
     273 
     274class PIDFile(SimplePlugin): 
    280275    """Maintain a PID file via a WSPBus.""" 
    281276     
     
    283278        self.bus = bus 
    284279        self.pidfile = pidfile 
    285         bus.subscribe('start', self.start) 
    286         bus.subscribe('stop', self.stop) 
    287280     
    288281    def start(self): 
     
    310303 
    311304 
    312 class Monitor(SubscribedObject): 
     305class Monitor(SimplePlugin): 
    313306    """WSPBus listener to periodically run a callback in its own thread. 
    314307     
    315308    bus: a Web Site Process Bus object. 
    316309    callback: the function to call at intervals. 
    317     channel: optional. If provided, the name of the channel to use 
    318         for managing this object. Defaults to class.__name__, 
    319         so either provide a channel name or only use one instance 
    320         of any given subclass. 
    321      
    322310    frequency: the time in seconds between callback runs. 
    323311    """ 
     
    325313    frequency = 60 
    326314     
    327     def __init__(self, bus, callback, channel=None): 
     315    def __init__(self, bus, callback, frequency=60): 
     316        self.bus = bus 
    328317        self.callback = callback 
     318        self.frequency = frequency 
    329319        self.thread = None 
    330          
    331         if channel is None: 
    332             channel = self.__class__.__name__ 
    333         SubscribedObject.__init__(self, bus, channel) 
    334          
    335         self.listeners = [('start', self.start), 
    336                           ('stop', self.stop), 
    337                           ('graceful', self.graceful), 
    338                           ] 
    339         self.attach() 
    340      
    341     def attach(self): 
    342         """Register this monitor as a (multi-channel) listener on the bus.""" 
    343         for point, callback in self.listeners: 
    344             self.bus.subscribe(point, callback) 
    345      
    346     def detach(self): 
    347         """Unregister this monitor as a listener on the bus.""" 
    348         for point, callback in self.listeners: 
    349             self.bus.unsubscribe(point, callback) 
    350320     
    351321    def start(self): 
     
    353323        if self.frequency > 0: 
    354324            self.thread = PerpetualTimer(self.frequency, self.callback) 
    355             self.thread.setName("restsrv %s" % self.channel
     325            self.thread.setName("restsrv %s" % self.__class__.__name__
    356326            self.thread.start() 
    357327     
     
    376346    match = '.*' 
    377347     
    378     def __init__(self, bus): 
     348    def __init__(self, bus, frequency=1, match='.*'): 
    379349        self.mtimes = {} 
    380350        self.files = set() 
    381         Monitor.__init__(self, bus, self.run) 
    382      
    383     def add(self, filename): 
    384         """Add a file to monitor for changes.""" 
    385         self.files.add(filename) 
    386      
    387     def discard(self, filename): 
    388         """Remove a file to monitor for changes.""" 
    389         self.files.discard(filename) 
     351        self.match = match 
     352        Monitor.__init__(self, bus, self.run, frequency) 
    390353     
    391354    def start(self): 
    392355        """Start our own perpetual timer thread for self.run.""" 
    393356        self.mtimes = {} 
    394         self.files = set() 
    395357        Monitor.start(self) 
    396358     
     
    431393 
    432394 
    433 class ThreadManager(object): 
     395class ThreadManager(SimplePlugin): 
    434396    """Manager for HTTP request threads. 
    435397     
     
    450412        self.threads = {} 
    451413        self.bus = bus 
    452         bus.subscribe('acquire_thread', self.acquire) 
    453         bus.subscribe('release_thread', self.release) 
    454         bus.subscribe('stop', self.release_all) 
    455         bus.subscribe('graceful', self.release_all) 
    456      
    457     def acquire(self): 
     414     
     415    def acquire_thread(self): 
    458416        """Run 'start_thread' listeners for the current thread. 
    459417         
     
    469427            self.bus.publish('start_thread', i) 
    470428     
    471     def release(self): 
     429    def release_thread(self): 
    472430        """Release the current thread and run 'stop_thread' listeners.""" 
    473431        thread_ident = threading._get_ident() 
     
    476434            self.bus.publish('stop_thread', i) 
    477435     
    478     def release_all(self): 
     436    def stop(self): 
    479437        """Release all threads and run all 'stop_thread' listeners.""" 
    480438        for thread_ident, i in self.threads.iteritems(): 
    481439            self.bus.publish('stop_thread', i) 
    482440        self.threads.clear() 
     441    graceful = stop 
     442 
  • trunk/cherrypy/restsrv/servers.py

    r1691 r1752  
    3434        self.interrupt = None 
    3535     
     36    def subscribe(self): 
     37        self.engine.subscribe('start', self.start) 
     38        self.engine.subscribe('stop', self.stop) 
     39     
    3640    def start(self): 
    3741        """Start all registered HTTP servers.""" 
    3842        self.interrupt = None 
    3943        if not self.httpservers: 
    40             raise ValueError("No HTTP servers have been created. ") 
     44            raise ValueError("No HTTP servers have been created.") 
    4145        for httpserver in self.httpservers: 
    4246            self._start_http(httpserver) 
    43         self.engine.subscribe('stop', self.stop) 
    4447     
    4548    def _start_http(self, httpserver): 
  • trunk/cherrypy/restsrv/wspbus.py

    r1747 r1752  
    4242In general, there should only ever be a single Bus object per process. 
    4343Frameworks and site containers share a single Bus object by publishing 
    44 messages and registering (subscribing) listeners. 
     44messages and subscribing listeners. 
    4545 
    4646The Bus object works as a finite state machine which models the current 
     
    101101            del self._priorities[(channel, callback)] 
    102102     
    103     def register(self, plugin): 
    104         """Tells the plugin to attach all subscriptions to this bus.""" 
    105         plugin._attach(self) 
    106  
    107103    def publish(self, channel, *args, **kwargs): 
    108104        """Return output of all subscribers for the given channel.""" 

Hosted by WebFaction

Log in as guest/cpguest to create tickets