Changeset 1752
- Timestamp:
- 10/16/07 15:06:41
- Files:
-
- trunk/cherrypy/__init__.py (modified) (3 diffs)
- trunk/cherrypy/_cpconfig.py (modified) (3 diffs)
- trunk/cherrypy/_cpserver.py (modified) (1 diff)
- trunk/cherrypy/_cptree.py (modified) (2 diffs)
- trunk/cherrypy/lib/sessions.py (modified) (1 diff)
- trunk/cherrypy/restsrv/plugins.py (modified) (20 diffs)
- trunk/cherrypy/restsrv/servers.py (modified) (1 diff)
- trunk/cherrypy/restsrv/wspbus.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/cherrypy/__init__.py
r1739 r1752 190 190 class _TimeoutMonitor(restsrv.plugins.Monitor): 191 191 192 def __init__(self, engine , channel=None):192 def __init__(self, engine): 193 193 self.servings = [] 194 restsrv.plugins.Monitor.__init__(self, engine, self.run , channel)194 restsrv.plugins.Monitor.__init__(self, engine, self.run) 195 195 196 196 def acquire(self): … … 207 207 for req, resp in self.servings: 208 208 resp.check_timeout() 209 _timeout_monitor = _TimeoutMonitor(engine, "CherryPy Timeout Monitor")209 timeout_monitor = _TimeoutMonitor(engine) 210 210 211 211 # Add an autoreloader (the 'engine' config namespace may detach/attach it). 212 212 engine.autoreload = restsrv.plugins.Autoreloader(engine) 213 restsrv.plugins.Reexec(engine) 214 _thread_manager = restsrv.plugins.ThreadManager(engine)213 restsrv.plugins.Reexec(engine).subscribe() 214 restsrv.plugins.ThreadManager(engine).subscribe() 215 215 216 216 from cherrypy import _cpserver … … 240 240 241 241 engine.subscribe('start', server.quickstart) 242 restsrv.plugins.SignalHandler(engine) 242 restsrv.plugins.SignalHandler(engine).subscribe() 243 243 engine.start() 244 244 engine.block() trunk/cherrypy/_cpconfig.py
r1690 r1752 147 147 """ 148 148 if isinstance(other, basestring): 149 cherrypy.engine. publish('Autoreloader', 'add()',other)149 cherrypy.engine.autoreload.files.add(other) 150 150 151 151 # Load other into base … … 255 255 if isinstance(config, basestring): 256 256 # Filename 257 cherrypy.engine. publish('Autoreloader', 'add()',config)257 cherrypy.engine.autoreload.files.add(config) 258 258 config = _Parser().dict_from_file(config) 259 259 elif hasattr(config, 'read'): … … 291 291 if k == 'autoreload_on': 292 292 if v: 293 engine.autoreload. attach()293 engine.autoreload.subscribe() 294 294 else: 295 engine.autoreload. detach()295 engine.autoreload.unsubscribe() 296 296 elif k == 'autoreload_frequency': 297 engine. publish('Autoreloader', 'frequency', v)297 engine.autoreload.frequency = v 298 298 elif k == 'autoreload_match': 299 engine. publish('Autoreloader', 'match', v)299 engine.autoreload.match = v 300 300 elif k == 'reload_files': 301 engine. publish('Autoreloader', 'files', v)301 engine.autoreload.files = v 302 302 elif k == 'deadlock_poll_freq': 303 engine.publish('CherryPy Timeout Monitor', 'frequency', v)303 cherrypy.timeout_monitor.frequency = v 304 304 elif k == 'reexec_retry': 305 305 engine.publish('restart', 'retry', v) trunk/cherrypy/_cpserver.py
r1691 r1752 99 99 self.mgr.httpservers[httpserver] = bind_addr 100 100 self.mgr.start() 101 cherrypy.engine.subscribe('stop', self.mgr.stop) 101 102 102 103 def httpserver_from_self(self, httpserver=None): trunk/cherrypy/_cptree.py
r1671 r1752 101 101 resp = self.response_class() 102 102 cherrypy.serving.load(req, resp) 103 cherrypy. engine.publish('CherryPy Timeout Monitor', 'acquire()')103 cherrypy.timeout_monitor.acquire() 104 104 cherrypy.engine.publish('acquire_thread') 105 105 … … 110 110 req = cherrypy.serving.request 111 111 112 cherrypy. engine.publish('CherryPy Timeout Monitor', 'release()')112 cherrypy.timeout_monitor.release() 113 113 114 114 try: trunk/cherrypy/lib/sessions.py
r1736 r1752 115 115 # so that tool config can be accessed inside the method. 116 116 t = cherrypy.restsrv.plugins.Monitor( 117 cherrypy.engine, self.clean_up, "CP Session Cleanup")118 t. frequency = self.clean_freq117 cherrypy.engine, self.clean_up, self.clean_freq) 118 t.subscribe() 119 119 cls.clean_thread = t 120 120 t.start() trunk/cherrypy/restsrv/plugins.py
r1746 r1752 13 13 14 14 15 class SubscribedObject(object): 16 """An object whose attributes are manipulable via publishing. 17 18 An instance of this class will subscribe to a channel. Messages 19 published to that channel should be one of three types: 20 21 getattr: 22 >>> values = bus.publish('thing', 'attr') 23 Note that the 'publish' method will return a list of values 24 (from potentially multiple subscribed objects). 25 26 setattr: 27 >>> bus.publish('thing', 'attr', value) 28 29 call an attribute: 30 >>> bus.publish('thing', 'attr()', *a, **kw) 31 """ 32 33 def __init__(self, bus, channel): 34 self.bus = bus 35 self.channel = channel 36 bus.subscribe(self.channel, self.handle) 37 38 def handle(self, attr, *args, **kwargs): 39 if attr.endswith("()"): 40 # Call 41 return getattr(self, attr[:-2])(*args, **kwargs) 42 else: 43 if args: 44 # Set 45 return setattr(self, attr, args[0]) 46 else: 47 # Get 48 return getattr(self, attr) 15 class SimplePlugin(object): 16 """Plugin base class which auto-subscribes methods for known channels.""" 17 18 def subscribe(self): 19 """Register this monitor as a (multi-channel) listener on the bus.""" 20 for channel in self.bus.listeners: 21 method = getattr(self, channel, None) 22 if method is not None: 23 self.bus.subscribe(channel, method) 24 25 def unsubscribe(self): 26 """Unregister this monitor as a listener on the bus.""" 27 for channel in self.bus.listeners: 28 method = getattr(self, channel, None) 29 if method is not None: 30 self.bus.unsubscribe(channel, method) 31 49 32 50 33 … … 52 35 """Register bus channels (and listeners) for system signals. 53 36 54 By default, instantiating this object registers the following signals37 By default, instantiating this object subscribes the following signals 55 38 and listeners: 56 39 … … 69 52 def __init__(self, bus): 70 53 self.bus = bus 71 72 54 # Set default handlers 73 for sig, func in [('SIGTERM', bus.exit), 74 ('SIGHUP', bus.restart), 75 ('SIGUSR1', bus.graceful)]: 55 self.handlers = {'SIGTERM': self.bus.exit, 56 'SIGHUP': self.bus.restart, 57 'SIGUSR1': self.bus.graceful, 58 } 59 60 def subscribe(self): 61 for sig, func in self.handlers.iteritems(): 76 62 try: 77 63 self.set_handler(sig, func) … … 80 66 81 67 def set_handler(self, signal, listener=None): 82 """ Registera handler for the given signal (number or name).68 """Subscribe a handler for the given signal (number or name). 83 69 84 70 If the optional 'listener' argument is provided, it will be 85 registered as a listener for the given signal's channel.71 subscribed as a listener for the given signal's channel. 86 72 87 73 If the given signal name or number is not available on the current … … 107 93 108 94 def _handle_signal(self, signum=None, frame=None): 109 """Python signal handler (self.set_handler registers it for you)."""95 """Python signal handler (self.set_handler subscribes it for you).""" 110 96 self.bus.publish(self.signals[signum]) 111 97 112 98 113 class Reexec(S ubscribedObject):99 class Reexec(SimplePlugin): 114 100 """A process restarter (using execv) for the 'restart' WSPBus channel. 115 101 … … 122 108 self.bus = bus 123 109 self.retry = retry 124 bus.subscribe('restart', self) 125 126 def __call__(self): 110 111 def restart(self): 127 112 """Re-execute the current process.""" 128 113 args = sys.argv[:] … … 149 134 150 135 151 class DropPrivileges(S ubscribedObject):136 class DropPrivileges(SimplePlugin): 152 137 """Drop privileges. 153 138 … … 164 149 os.umask 165 150 except AttributeError: 166 def __call__(self):151 def start(self): 167 152 """Drop privileges. Not implemented on this platform.""" 168 153 raise NotImplementedError … … 170 155 umask = None 171 156 172 def __call__(self):157 def start(self): 173 158 """Drop privileges. Windows version (umask only).""" 174 159 if umask is not None: … … 181 166 umask = None 182 167 183 def __call__(self):168 def start(self): 184 169 """Drop privileges. UNIX version (uid, gid, and umask).""" 185 170 if not (uid is None and gid is None): … … 214 199 self.bus.log('umask old: %03o, new: %03o' % 215 200 (old_umask, umask)) 216 __call__.priority = 70217 218 219 def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):201 start.priority = 70 202 203 204 class Daemonizer(SimplePlugin): 220 205 """Daemonize the running script. 221 206 222 207 Use this with a Web Site Process Bus via: 223 208 224 bus.subscribe('start', daemonize) 225 226 When this method returns, the process is completely decoupled from the 227 parent environment. 228 """ 229 230 # See http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 231 # (or http://www.faqs.org/faqs/unix-faq/programmer/faq/ section 1.7) 232 # and http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 233 234 # Finish up with the current stdout/stderr 235 sys.stdout.flush() 236 sys.stderr.flush() 237 238 # Do first fork. 239 try: 240 pid = os.fork() 241 if pid == 0: 242 # This is the child process. Continue. 243 pass 244 else: 245 # This is the first parent. Exit, now that we've forked. 246 sys.exit(0) 247 except OSError, exc: 248 # Python raises OSError rather than returning negative numbers. 249 sys.exit("%s: fork #1 failed: (%d) %s\n" 250 % (sys.argv[0], exc.errno, exc.strerror)) 251 252 os.setsid() 253 254 # Do second fork 255 try: 256 pid = os.fork() 257 if pid > 0: 258 sys.exit(0) # Exit second parent 259 except OSError, exc: 260 sys.exit("%s: fork #2 failed: (%d) %s\n" 261 % (sys.argv[0], exc.errno, exc.strerror)) 262 263 os.chdir("/") 264 os.umask(0) 265 266 si = open(stdin, "r") 267 so = open(stdout, "a+") 268 se = open(stderr, "a+", 0) 269 270 # os.dup2(fd,fd2) will close fd2 if necessary (so we don't explicitly close 271 # stdin,stdout,stderr): 272 # http://docs.python.org/lib/os-fd-ops.html 273 os.dup2(si.fileno(), sys.stdin.fileno()) 274 os.dup2(so.fileno(), sys.stdout.fileno()) 275 os.dup2(se.fileno(), sys.stderr.fileno()) 276 daemonize.priority = 10 277 278 279 class PIDFile(object): 209 Daemonizer(bus).subscribe() 210 211 When this component finishes, the process is completely decoupled from 212 the parent environment. 213 """ 214 215 def __init__(self, bus, stdin='/dev/null', stdout='/dev/null', 216 stderr='/dev/null'): 217 self.bus = bus 218 self.stdin = stdin 219 self.stdout = stdout 220 self.stderr = stderr 221 222 def start(self): 223 # See http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 224 # (or http://www.faqs.org/faqs/unix-faq/programmer/faq/ section 1.7) 225 # and http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 226 227 # Finish up with the current stdout/stderr 228 sys.stdout.flush() 229 sys.stderr.flush() 230 231 # Do first fork. 232 try: 233 pid = os.fork() 234 if pid == 0: 235 # This is the child process. Continue. 236 pass 237 else: 238 # This is the first parent. Exit, now that we've forked. 239 sys.exit(0) 240 except OSError, exc: 241 # Python raises OSError rather than returning negative numbers. 242 sys.exit("%s: fork #1 failed: (%d) %s\n" 243 % (sys.argv[0], exc.errno, exc.strerror)) 244 245 os.setsid() 246 247 # Do second fork 248 try: 249 pid = os.fork() 250 if pid > 0: 251 sys.exit(0) # Exit second parent 252 except OSError, exc: 253 sys.exit("%s: fork #2 failed: (%d) %s\n" 254 % (sys.argv[0], exc.errno, exc.strerror)) 255 256 os.chdir("/") 257 os.umask(0) 258 259 si = open(stdin, "r") 260 so = open(stdout, "a+") 261 se = open(stderr, "a+", 0) 262 263 # os.dup2(fd,fd2) will close fd2 if necessary (so we don't explicitly close 264 # stdin,stdout,stderr): 265 # http://docs.python.org/lib/os-fd-ops.html 266 os.dup2(si.fileno(), sys.stdin.fileno()) 267 os.dup2(so.fileno(), sys.stdout.fileno()) 268 os.dup2(se.fileno(), sys.stderr.fileno()) 269 270 self.bus.log('Daemonized to PID: %s' % os.getpid()) 271 start.priority = 10 272 273 274 class PIDFile(SimplePlugin): 280 275 """Maintain a PID file via a WSPBus.""" 281 276 … … 283 278 self.bus = bus 284 279 self.pidfile = pidfile 285 bus.subscribe('start', self.start)286 bus.subscribe('stop', self.stop)287 280 288 281 def start(self): … … 310 303 311 304 312 class Monitor(S ubscribedObject):305 class Monitor(SimplePlugin): 313 306 """WSPBus listener to periodically run a callback in its own thread. 314 307 315 308 bus: a Web Site Process Bus object. 316 309 callback: the function to call at intervals. 317 channel: optional. If provided, the name of the channel to use318 for managing this object. Defaults to class.__name__,319 so either provide a channel name or only use one instance320 of any given subclass.321 322 310 frequency: the time in seconds between callback runs. 323 311 """ … … 325 313 frequency = 60 326 314 327 def __init__(self, bus, callback, channel=None): 315 def __init__(self, bus, callback, frequency=60): 316 self.bus = bus 328 317 self.callback = callback 318 self.frequency = frequency 329 319 self.thread = None 330 331 if channel is None:332 channel = self.__class__.__name__333 SubscribedObject.__init__(self, bus, channel)334 335 self.listeners = [('start', self.start),336 ('stop', self.stop),337 ('graceful', self.graceful),338 ]339 self.attach()340 341 def attach(self):342 """Register this monitor as a (multi-channel) listener on the bus."""343 for point, callback in self.listeners:344 self.bus.subscribe(point, callback)345 346 def detach(self):347 """Unregister this monitor as a listener on the bus."""348 for point, callback in self.listeners:349 self.bus.unsubscribe(point, callback)350 320 351 321 def start(self): … … 353 323 if self.frequency > 0: 354 324 self.thread = PerpetualTimer(self.frequency, self.callback) 355 self.thread.setName("restsrv %s" % self. channel)325 self.thread.setName("restsrv %s" % self.__class__.__name__) 356 326 self.thread.start() 357 327 … … 376 346 match = '.*' 377 347 378 def __init__(self, bus ):348 def __init__(self, bus, frequency=1, match='.*'): 379 349 self.mtimes = {} 380 350 self.files = set() 381 Monitor.__init__(self, bus, self.run) 382 383 def add(self, filename): 384 """Add a file to monitor for changes.""" 385 self.files.add(filename) 386 387 def discard(self, filename): 388 """Remove a file to monitor for changes.""" 389 self.files.discard(filename) 351 self.match = match 352 Monitor.__init__(self, bus, self.run, frequency) 390 353 391 354 def start(self): 392 355 """Start our own perpetual timer thread for self.run.""" 393 356 self.mtimes = {} 394 self.files = set()395 357 Monitor.start(self) 396 358 … … 431 393 432 394 433 class ThreadManager( object):395 class ThreadManager(SimplePlugin): 434 396 """Manager for HTTP request threads. 435 397 … … 450 412 self.threads = {} 451 413 self.bus = bus 452 bus.subscribe('acquire_thread', self.acquire) 453 bus.subscribe('release_thread', self.release) 454 bus.subscribe('stop', self.release_all) 455 bus.subscribe('graceful', self.release_all) 456 457 def acquire(self): 414 415 def acquire_thread(self): 458 416 """Run 'start_thread' listeners for the current thread. 459 417 … … 469 427 self.bus.publish('start_thread', i) 470 428 471 def release (self):429 def release_thread(self): 472 430 """Release the current thread and run 'stop_thread' listeners.""" 473 431 thread_ident = threading._get_ident() … … 476 434 self.bus.publish('stop_thread', i) 477 435 478 def release_all(self):436 def stop(self): 479 437 """Release all threads and run all 'stop_thread' listeners.""" 480 438 for thread_ident, i in self.threads.iteritems(): 481 439 self.bus.publish('stop_thread', i) 482 440 self.threads.clear() 441 graceful = stop 442 trunk/cherrypy/restsrv/servers.py
r1691 r1752 34 34 self.interrupt = None 35 35 36 def subscribe(self): 37 self.engine.subscribe('start', self.start) 38 self.engine.subscribe('stop', self.stop) 39 36 40 def start(self): 37 41 """Start all registered HTTP servers.""" 38 42 self.interrupt = None 39 43 if not self.httpservers: 40 raise ValueError("No HTTP servers have been created. ")44 raise ValueError("No HTTP servers have been created.") 41 45 for httpserver in self.httpservers: 42 46 self._start_http(httpserver) 43 self.engine.subscribe('stop', self.stop)44 47 45 48 def _start_http(self, httpserver): trunk/cherrypy/restsrv/wspbus.py
r1747 r1752 42 42 In general, there should only ever be a single Bus object per process. 43 43 Frameworks and site containers share a single Bus object by publishing 44 messages and registering (subscribing)listeners.44 messages and subscribing listeners. 45 45 46 46 The Bus object works as a finite state machine which models the current … … 101 101 del self._priorities[(channel, callback)] 102 102 103 def register(self, plugin):104 """Tells the plugin to attach all subscriptions to this bus."""105 plugin._attach(self)106 107 103 def publish(self, channel, *args, **kwargs): 108 104 """Return output of all subscribers for the given channel."""

