Changeset 1017
- Timestamp:
- 03/26/06 16:14:11
- Files:
-
- trunk/cherrypy/test/helper.py (modified) (5 diffs)
- trunk/cherrypy/test/modpy.py (added)
- trunk/cherrypy/test/static/dirback.jpg (added)
- trunk/cherrypy/test/test.py (modified) (9 diffs)
- trunk/cherrypy/test/test_baseurl_filter.py (modified) (2 diffs)
- trunk/cherrypy/test/test_cache_filter.py (modified) (2 diffs)
- trunk/cherrypy/test/test_combinedfilters.py (modified) (2 diffs)
- trunk/cherrypy/test/test_config.py (modified) (4 diffs)
- trunk/cherrypy/test/test_core.py (modified) (22 diffs)
- trunk/cherrypy/test/test_custom_filters.py (modified) (4 diffs)
- trunk/cherrypy/test/test_decodingencoding_filter.py (modified) (2 diffs)
- trunk/cherrypy/test/test_gzip_filter.py (modified) (2 diffs)
- trunk/cherrypy/test/test_http.py (modified) (3 diffs)
- trunk/cherrypy/test/test_logdebuginfo_filter.py (modified) (2 diffs)
- trunk/cherrypy/test/test_objectmapping.py (modified) (2 diffs)
- trunk/cherrypy/test/test_response_headers_filter.py (modified) (2 diffs)
- trunk/cherrypy/test/test_session_filter.py (modified) (3 diffs)
- trunk/cherrypy/test/test_sessionauthenticate_filter.py (modified) (3 diffs)
- trunk/cherrypy/test/test_static_filter.py (modified) (3 diffs)
- trunk/cherrypy/test/test_virtualhost_filter.py (modified) (2 diffs)
- trunk/cherrypy/test/test_wsgiapp_filter.py (modified) (2 diffs)
- trunk/cherrypy/test/test_xmlrpc_filter.py (modified) (3 diffs)
- trunk/cherrypy/test/webtest.py (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/cherrypy/test/helper.py
r1014 r1017 5 5 6 6 Usage: 7 Each individual test_*.py module imports this module (helper),8 usually to make an instance of CPWebCase, and then call testmain().9 10 The CP test suite script (test.py) imports this module and calls11 run_test_suite, possibly more than once. CP applications may also12 import test.py (to use TestHarness), which then calls helper.py.7 Each individual test_*.py module imports this module (helper), 8 usually to make an instance of CPWebCase, and then call testmain(). 9 10 The CP test suite script (test.py) imports this module and calls 11 run_test_suite, possibly more than once. CP applications may also 12 import test.py (to use TestHarness), which then calls helper.py. 13 13 """ 14 14 … … 18 18 # or vice-versa, unless you *really* know what you're doing. 19 19 20 import os, os.path21 20 import re 22 import socket23 import StringIO24 21 import sys 25 22 import thread 26 import threading27 import time28 import types29 23 30 24 import cherrypy 31 from cherrypy import _cpwsgi32 25 from cherrypy.lib import httptools 33 26 import webtest 34 35 for _x in dir(cherrypy):36 y = getattr(cherrypy, _x)37 if isinstance(y, types.ClassType) and issubclass(y, cherrypy.Error):38 webtest.ignored_exceptions.append(y)39 40 41 def onerror():42 """Assign to _cp_on_error to enable webtest server-side debugging."""43 handled = webtest.server_error()44 if not handled:45 cherrypy._cputil._cp_on_error()46 47 48 def error_middleware(environ, start_response):49 started = [False]50 def start(s, h, exc=None):51 started[0] = True52 start_response(s, h, exc)53 54 try:55 for chunk in _cpwsgi.wsgiApp(environ, start):56 yield chunk57 except (KeyboardInterrupt, SystemExit):58 raise59 except Exception, x:60 # We should only reach this point if server.throw_errors is True.61 if not started[0]:62 start_response("500 Server Error", [])63 yield "THROWN ERROR: %s" % x.__class__.__name__64 65 66 class TestWSGI(_cpwsgi.WSGIServer):67 """Wrapper for WSGI server so we can test thrown errors."""68 69 def __init__(self):70 _cpwsgi.WSGIServer.__init__(self)71 self.mount_points = [(base, error_middleware)72 for base, wsgiapp in self.mount_points]73 27 74 28 … … 83 37 sys.exit() 84 38 85 def _getRequest(self, url, headers, method, body):86 # Like getPage, but for serverless requests.87 webtest.ServerError.on = False88 self.url = url89 90 requestLine = "%s %s HTTP/1.1" % (method.upper(), url)91 headers = webtest.cleanHeaders(headers, method, body,92 self.HOST, self.PORT)93 if body is not None:94 body = StringIO.StringIO(body)95 96 request = cherrypy.server.request((self.HOST, self.PORT), self.HOST, "http")97 response = request.run(requestLine, headers, body)98 99 self.status = response.status100 self.headers = response.header_list101 102 # Build a list of request cookies from the previous response cookies.103 self.cookies = [('Cookie', v) for k, v in self.headers104 if k.lower() == 'set-cookie']105 106 try:107 newbody = []108 for chunk in response.body:109 newbody.append(chunk)110 request.close()111 except Exception, ex:112 if cherrypy.config.get("stream_response", False):113 try:114 request.close()115 except:116 cherrypy.log(cherrypy._cputil.formatExc())117 # Pass the error through118 raise ex119 120 s, h, b = cherrypy._cputil.bareError()121 # Don't reset status or headers; we're emulating an error which122 # occurs after status and headers have been written to the client.123 for chunk in b:124 newbody.append(chunk)125 self.body = "".join(newbody)126 127 if webtest.ServerError.on:128 self.tearDown()129 raise webtest.ServerError()130 131 39 def tearDown(self): 132 40 pass 133 41 134 def getPage(self, url, headers=None, method="GET", body=None): 135 """Open the url with debugging support. Return status, headers, body.""" 136 # Install a custom error handler, so errors in the server will: 137 # 1) show server tracebacks in the test output, and 138 # 2) stop the HTTP request (if any) and ignore further assertions. 139 cherrypy.root._cp_on_error = onerror 140 # Backward compatibility: 141 cherrypy.root._cpOnError = onerror 142 42 def getPage(self, url, headers=None, method="GET", body=None, protocol="HTTP/1.1"): 43 """Open the url. Return status, headers, body.""" 143 44 if self.mount_point: 144 45 url = httptools.urljoin(self.mount_point, url) 145 46 146 if cherrypy.server.httpserver is None: 147 self._getRequest(url, headers, method, body) 148 else: 149 webtest.WebCase.getPage(self, url, headers, method, body) 47 webtest.WebCase.getPage(self, url, headers, method, body, protocol) 150 48 151 49 def assertErrorPage(self, status, message=None, pattern=''): … … 201 99 setConfig(conf) 202 100 cherrypy.server.start_with_callback(_run_test_suite_thread, 203 args = (moduleNames, conf), server_class = server) 101 args = (moduleNames, conf), 102 server_class = server) 204 103 205 104 def _run_test_suite_thread(moduleNames, conf): … … 212 111 setConfig(conf) 213 112 113 m = __import__(testmod, globals(), locals()) 114 setup = getattr(m, "setup_server", None) 115 if setup: 116 setup() 214 117 suite = CPTestLoader.loadTestsFromName(testmod) 215 118 CPTestRunner.run(suite) 216 119 thread.interrupt_main() 217 120 218 def testmain( server=None, conf=None):121 def testmain(conf=None, *args, **kwargs): 219 122 """Run __main__ as a test module, with webtest debugging.""" 220 123 if conf is None: 221 124 conf = {} 222 125 setConfig(conf) 223 cherrypy.server.start_with_callback(_test_main_thread, 224 server_class = server) 126 cherrypy.server.start_with_callback(_test_main_thread, *args, **kwargs) 225 127 226 128 def _test_main_thread(): trunk/cherrypy/test/test.py
r1005 r1017 18 18 19 19 class TestHarness(object): 20 21 20 """A test harness for the CherryPy framework and CherryPy applications.""" 22 21 23 # The first server in the list is the default server. 24 available_servers = {'serverless': (0, "Serverless", None), 25 'wsgi': (1, "Native WSGI Server", 26 "cherrypy.test.helper.TestWSGI"), 22 def __init__(self, tests=None, server=None, protocol="HTTP/1.1", port=8000): 23 """Constructor to populate the TestHarness instance. 24 25 tests should be a list of module names (strings). 26 """ 27 self.protocol = protocol 28 self.port = port 29 self.server = server 30 self.tests = tests or [] 31 32 def run(self, conf=None): 33 """Run the test harness.""" 34 import cherrypy 35 v = sys.version.split()[0] 36 print "Python version used to run this test script:", v 37 print "CherryPy version", cherrypy.__version__ 38 print 39 40 if conf is None: 41 conf = {'server.socket_host': '127.0.0.1', 42 'server.socket_port': self.port, 43 'server.thread_pool': 10, 44 'server.log_to_screen': False, 45 'server.environment': "production", 46 'server.show_tracebacks': True, 47 } 48 elif isinstance(conf, basestring): 49 conf = cherrypy.config.dict_from_config_file(conf) 50 51 conf['server.protocol_version'] = self.protocol 52 self._run(conf) 53 54 def _run(self, conf): 55 # helper must be imported lazily so the coverage tool 56 # can run against module-level statements within cherrypy. 57 # Also, we have to do a relative import here, not 58 # "from cherrypy.test import helper", because the latter 59 # would stick a second instance of webtest in sys.modules, 60 # and we wouldn't be able to globally override the port anymore. 61 import helper 62 webtest.WebCase.PORT = self.port 63 print 64 print "Running tests:", self.server 65 helper.run_test_suite(self.tests, self.server, conf) 66 67 68 class CommandLineParser(object): 69 available_servers = {'wsgi': "cherrypy._cpwsgi.WSGIServer", 70 'modpy': "modpy", 27 71 } 28 72 default_server = "wsgi" 29 73 30 def __init__(self, available_tests ):74 def __init__(self, available_tests, args=sys.argv[1:]): 31 75 """Constructor to populate the TestHarness instance. 32 76 33 77 available_tests should be a list of module names (strings). 34 """35 self.available_tests = available_tests36 37 self.cover = False38 self.profile = False39 self.protocol = "HTTP/1.0"40 self.basedir = None41 self.PORT = 800042 43 self.servers = []44 self.tests = []45 46 def load(self, args=sys.argv[1:]):47 """Populate a TestHarness from sys.argv.48 78 49 79 args defaults to sys.argv[1:], but you can provide a different 50 80 set of args if you like. 51 81 """ 52 53 longopts = ['cover', 'profile', '1.1', 'help', 54 'basedir=', 'all', 'port='] 82 self.available_tests = available_tests 83 self.cover = False 84 self.profile = False 85 self.server = None 86 self.port = 8080 87 self.protocol = "HTTP/1.1" 88 89 longopts = ['cover', 'profile', '1.1', 'help', 'basedir=', 'port=', 90 'server='] 55 91 longopts.extend(self.available_servers) 56 92 longopts.extend(self.available_tests) … … 62 98 sys.exit(2) 63 99 64 self.cover = False65 self.profile = False66 self.protocol = "HTTP/1.0"67 self.basedir = None68 69 self.servers = []70 100 self.tests = [] 71 101 … … 78 108 elif o == "--profile": 79 109 self.profile = True 80 elif o == "--1. 1":81 self.protocol = "HTTP/1. 1"110 elif o == "--1.0": 111 self.protocol = "HTTP/1.0" 82 112 elif o == "--basedir": 83 113 self.basedir = a 84 elif o == "--all":85 self.servers = self.available_servers.keys()86 114 elif o == "--port": 87 self.PORT = int(a) 115 self.port = int(a) 116 elif o == "--server": 117 if a in self.available_servers: 118 a = self.available_servers[a] 119 self.server = a 88 120 else: 89 121 o = o[2:] 90 if o in self.available_servers and o not in self.servers: 91 self.servers.append(o) 92 elif o in self.available_tests and o not in self.tests: 122 if o in self.available_tests and o not in self.tests: 93 123 self.tests.append(o) 94 124 … … 99 129 sys.exit(2) 100 130 101 if not self.server s:102 self.server s =[self.default_server]131 if not self.server: 132 self.server = self.available_servers[self.default_server] 103 133 104 134 if not self.tests: … … 110 140 print """CherryPy Test Program 111 141 Usage: 112 test.py --server s* --1.1 --cover --basedir=path --profile --tests**142 test.py --server=* --1.1 --cover --basedir=path --profile --tests** 113 143 114 144 """ 115 145 print ' * servers:' 116 s = [(val, name) for name, val in self.available_servers.iteritems()] 117 s.sort() 118 for val, name in s: 146 for name, val in self.available_servers.iteritems(): 119 147 if name == self.default_server: 120 print ' -- ' + name, '(default)'148 print ' --%s: %s (default)' % (name, val) 121 149 else: 122 print ' -- ' + name123 124 print """ --all (runs all servers in order)150 print ' --%s: %s' % (name, val) 151 152 print """ 125 153 126 154 --1.1: use HTTP/1.1 servers instead of default HTTP/1.0 … … 220 248 def run(self, conf=None): 221 249 """Run the test harness.""" 222 self.load()223 224 250 # Start the coverage tool before importing cherrypy, 225 251 # so module-level global statements are covered. … … 227 253 self.start_coverage() 228 254 229 import cherrypy230 v = sys.version.split()[0]231 print "Python version used to run this test script:", v232 print "CherryPy version", cherrypy.__version__233 print234 235 if conf is None:236 conf = {'server.socket_host': '127.0.0.1',237 'server.socket_port': self.PORT,238 'server.thread_pool': 10,239 'server.log_to_screen': False,240 'server.environment': "production",241 'server.show_tracebacks': True,242 }243 elif isinstance(conf, basestring):244 conf = cherrypy.config.dict_from_config_file(conf)245 246 conf['server.protocol_version'] = self.protocol247 248 255 if self.profile: 249 256 conf['profiling.on'] = True 250 257 251 self._run_all_servers(conf) 258 if self.server == 'modpy': 259 import modpy 260 modpy.ModPythonTestHarness(self.tests, self.server, 261 self.protocol, self.port).run(conf) 262 else: 263 TestHarness(self.tests, self.server, 264 self.protocol, self.port).run(conf) 252 265 253 266 if self.profile: … … 259 272 if self.cover: 260 273 self.stop_coverage() 261 262 def _run_all_servers(self, conf): 263 # helper must be imported lazily so the coverage tool 264 # can run against module-level statements within cherrypy. 265 # Also, we have to do a relative import here, not 266 # "from cherrypy.test import helper", because the latter 267 # would stick a second instance of webtest in sys.modules, 268 # and we wouldn't be able to globally override the port anymore. 269 import helper 270 s = [self.available_servers[name] for name in self.servers] 271 s.sort() 272 webtest.WebCase.PORT = self.PORT 273 for priority, name, cls in s: 274 print 275 print "Running tests:", name 276 helper.run_test_suite(self.tests, cls, conf) 277 278 279 class CPTestHarness(TestHarness): 280 281 def _run_all_servers(self, conf): 282 # helper must be imported lazily so the coverage tool 283 # can run against module-level statements within cherrypy. 284 # Also, we have to do a relative import here, not 285 # "from cherrypy.test import helper", because the latter 286 # would stick a second instance of webtest in sys.modules, 287 # and we wouldn't be able to globally override the port anymore. 288 import helper, test_states 289 s = [self.available_servers[name] for name in self.servers] 290 s.sort() 291 webtest.WebCase.PORT = self.PORT 292 for priority, name, cls in s: 293 print 294 print "Running tests:", name 295 reload(test_states) 296 test_states.run(cls, conf) 297 helper.run_test_suite(self.tests, cls, conf) 274 298 275 299 276 def prefer_parent_path(): … … 320 297 'test_gzip_filter', 321 298 'test_logdebuginfo_filter', 299 'test_objectmapping', 322 300 'test_response_headers_filter', 323 'test_objectmapping',324 301 'test_static_filter', 325 'test_tutorials',302 ## 'test_tutorials', 326 303 'test_virtualhost_filter', 327 304 'test_session_filter', 328 305 'test_sessionauthenticate_filter', 306 ## 'test_states', 329 307 'test_xmlrpc_filter', 330 308 'test_wsgiapp_filter', 331 309 ] 332 C PTestHarness(testList).run()310 CommandLineParser(testList).run() 333 311 334 312 print trunk/cherrypy/test/test_baseurl_filter.py
r910 r1017 5 5 6 6 7 class Root: 8 def index(self): 9 raise cherrypy.HTTPRedirect('dummy') 10 index.exposed = True 7 def setup_server(): 8 class Root: 9 def index(self): 10 raise cherrypy.HTTPRedirect('dummy') 11 index.exposed = True 12 13 cherrypy.tree.mount(Root()) 14 cherrypy.config.update({ 15 'server.environment': 'production', 16 'server.log_to_screen': False, 17 'base_url_filter.on': True, 18 'base_url_filter.base_url': 'http://www.mydomain.com' 19 }) 11 20 12 cherrypy.root = Root()13 cherrypy.config.update({14 'server.environment': 'production',15 'server.log_to_screen': False,16 'base_url_filter.on': True,17 'base_url_filter.base_url': 'http://www.mydomain.com'18 })19 21 20 22 import helper … … 29 31 30 32 if __name__ == '__main__': 33 setup_server() 31 34 helper.testmain() 32 trunk/cherrypy/test/test_cache_filter.py
r906 r1017 6 6 7 7 8 class Root: 9 def __init__(self): 10 cherrypy.counter = 0 11 12 def index(self): 13 cherrypy.counter += 1 14 msg = "visit #%s" % cherrypy.counter 15 return msg 16 index.exposed = True 8 def setup_server(): 9 class Root: 10 def __init__(self): 11 cherrypy.counter = 0 12 13 def index(self): 14 cherrypy.counter += 1 15 msg = "visit #%s" % cherrypy.counter 16 return msg 17 index.exposed = True 17 18 18 cherrypy.root = Root()19 cherrypy.config.update({20 'server.log_to_screen': False,21 'server.environment': 'production',22 'cache_filter.on': True,23 })19 cherrypy.root = Root() 20 cherrypy.config.update({ 21 'server.log_to_screen': False, 22 'server.environment': 'production', 23 'cache_filter.on': True, 24 }) 24 25 25 26 … … 35 36 36 37 if __name__ == '__main__': 38 setup_server() 37 39 helper.testmain() 38 40 trunk/cherrypy/test/test_combinedfilters.py
r856 r1017 7 7 europoundUnicode = u'\x80\xa3' 8 8 9 class Root: 10 def index(self): 11 yield u"Hello," 12 yield u"world" 13 yield europoundUnicode 14 index.exposed = True 9 def setup_server(): 10 class Root: 11 def index(self): 12 yield u"Hello," 13 yield u"world" 14 yield europoundUnicode 15 index.exposed = True 15 16 16 cherrypy.root = Root()17 cherrypy.config.update({18 'server.log_to_screen': False,19 'server.environment': 'production',20 'gzip_filter.on': True,21 'encoding_filter.on': True,22 })17 cherrypy.root = Root() 18 cherrypy.config.update({ 19 'server.log_to_screen': False, 20 'server.environment': 'production', 21 'gzip_filter.on': True, 22 'encoding_filter.on': True, 23 }) 23 24 24 25 import helper … … 38 39 39 40 if __name__ == '__main__': 41 setup_server() 40 42 helper.testmain() trunk/cherrypy/test/test_config.py
r1004 r1017 7 7 8 8 9 class Root: 10 def index(self, key): 11 return cherrypy.config.get(key, "None") 12 index.exposed = True 13 global_ = index 14 xyz = index 9 def setup_server(): 10 11 class Root: 12 def index(self, key): 13 return cherrypy.config.get(key, "None") 14 index.exposed = True 15 global_ = index 16 xyz = index 17 18 class Foo: 19 def index(self, key): 20 return cherrypy.config.get(key, "None") 21 index.exposed = True 22 bar = index 23 nex = index 24 25 class Env: 26 def index(self, key): 27 return str(cherrypy.config.get(key, "None")) 28 index.exposed = True 29 prod = index 30 embed = index 31 32 def wrong(self): 33 conf = "\n[global]\nserver.environment = production\n" 34 cherrypy.config.update(file=StringIO.StringIO(conf)) 35 wrong.exposed=True 36 37 cherrypy.tree.mount(Root()) 38 cherrypy.root.foo = Foo() 39 40 cherrypy.config.update({ 41 'global': {'server.log_to_screen': False, 42 'server.environment': 'production', 43 'server.show_tracebacks': True, 44 }, 45 '/': { 46 'foo': 'this', 47 'bar': 'that', 48 }, 49 '/foo': { 50 'foo': 'this2', 51 'baz': 'that2', 52 }, 53 '/foo/bar': { 54 'foo': 'this3', 55 'bax': 'this4', 56 }, 57 }) 15 58 16 class Foo: 17 def index(self, key): 18 return cherrypy.config.get(key, "None") 19 index.exposed = True 20 bar = index 21 nex = index 59 _env_conf = {'/': {'server.environment': 'development'}, 60 '/prod': {'server.environment': 'production'}, 61 '/embed': {'server.environment': 'embedded'}, 62 } 63 cherrypy.tree.mount(Env(), "/env", _env_conf) 22 64 23 class Env: 24 def index(self, key): 25 return str(cherrypy.config.get(key, "None")) 26 index.exposed = True 27 prod = index 28 embed = index 65 # Shortcut syntax--should get put in the "global" bucket 66 cherrypy.config.update({'luxuryyacht': 'throatwobblermangrove'}) 29 67 30 cherrypy.tree.mount(Root())31 cherrypy.root.foo = Foo()32 68 33 cherrypy.config.update({ 34 'global': {'server.log_to_screen': False, 35 'server.environment': 'production', 36 'server.show_tracebacks': True, 37 }, 38 '/': { 39 'foo': 'this', 40 'bar': 'that', 41 }, 42 '/foo': { 43 'foo': 'this2', 44 'baz': 'that2', 45 }, 46 '/foo/bar': { 47 'foo': 'this3', 48 'bax': 'this4', 49 }, 50 }) 51 52 _env_conf = {'/': {'server.environment': 'development'}, 53 '/prod': {'server.environment': 'production'}, 54 '/embed': {'server.environment': 'embedded'}, 55 } 56 cherrypy.tree.mount(Env(), "/env", _env_conf) 57 58 # Shortcut syntax--should get put in the "global" bucket 59 cherrypy.config.update({'luxuryyacht': 'throatwobblermangrove'}) 69 # Client-side code # 60 70 61 71 import helper … … 65 75 def testConfig(self): 66 76 tests = [ 67 ('*', 'luxuryyacht', 'throatwobblermangrove'),77 ## ('*', 'luxuryyacht', 'throatwobblermangrove'), 68 78 ('/', 'nex', 'None'), 69 79 ('/', 'foo', 'this'), … … 83 93 84 94 def testUnrepr(self): 85 self.assertRaises(cherrypy.WrongConfigValue, cherrypy.config.update,86 file=StringIO.StringIO("""87 [global] 88 server.environment = production 89 """))95 err = ('WrongConfigValue: ("section: ' 96 "'global', option: 'server.environment', value: 'production'" 97 '''", 'UnknownType', ('production',))''') 98 self.getPage("/env/wrong") 99 self.assertErrorPage(500, pattern=err) 90 100 91 101 def testEnvironments(self): … … 105 115 106 116 if __name__ == '__main__': 117 setup_server() 107 118 helper.testmain() trunk/cherrypy/test/test_core.py
r1010 r1017 7 7 from cherrypy.lib import cptools, httptools 8 8 import types 9 9 10 import os 10 11 localDir = os.path.dirname(__file__) 11 12 13 class Root: 14 15 def index(self): 16 return "hello" 17 index.exposed = True 18 19 def andnow(self): 20 return "the larch" 21 andnow.exposed = True 22 23 def global_(self): 24 pass 25 global_.exposed = True 26 27 cherrypy.root = Root() 28 29 30 class TestType(type): 31 """Metaclass which automatically exposes all functions in each subclass, 32 and adds an instance of the subclass as an attribute of cherrypy.root. 33 """ 34 def __init__(cls, name, bases, dct): 35 type.__init__(name, bases, dct) 36 for value in dct.itervalues(): 37 if isinstance(value, types.FunctionType): 38 value.exposed = True 39 setattr(cherrypy.root, name.lower(), cls()) 40 class Test(object): 41 __metaclass__ = TestType 42 43 44 class Params(Test): 45 46 def index(self, thing): 47 return repr(thing) 48 49 def ismap(self, x, y): 50 return "Coordinates: %s, %s" % (x, y) 51 52 def default(self, *args, **kwargs): 53 return "args: %s kwargs: %s" % (args, kwargs) 54 55 56 class Status(Test): 57 58 def index(self): 59 return "normal" 60 61 def blank(self): 62 cherrypy.response.status = "" 63 64 # According to RFC 2616, new status codes are OK as long as they 65 # are between 100 and 599. 66 67 # Here is an illegal code... 68 def illegal(self): 69 cherrypy.response.status = 781 70 return "oops" 71 72 # ...and here is an unknown but legal code. 73 def unknown(self): 74 cherrypy.response.status = "431 My custom error" 75 return "funky" 76 77 # Non-numeric code 78 def bad(self): 79 cherrypy.response.status = "error" 80 return "bad news" 81 82 83 class Redirect(Test): 84 85 class Error: 86 def _cp_on_error(self): 87 raise cherrypy.HTTPRedirect("/errpage") 88 89 def index(self): 90 raise NameError() 91 index.exposed = True 92 error = Error() 93 94 def index(self): 95 return "child" 96 97 def by_code(self, code): 98 raise cherrypy.HTTPRedirect("somewhere else", code) 99 100 def nomodify(self): 101 raise cherrypy.HTTPRedirect("", 304) 102 103 def proxy(self): 104 raise cherrypy.HTTPRedirect("proxy", 305) 105 106 def stringify(self): 107 return str(cherrypy.HTTPRedirect("/")) 108 109 110 class LoginFilter: 111 112 def before_main(self): 113 if cherrypy.config.get("auth.on", False): 114 if not getattr(cherrypy.request, "login", None): 115 raise cherrypy.InternalRedirect("/internalredirect/login") 116 117 class InternalRedirect(Test): 118 119 _cp_filters = [LoginFilter()] 120 121 def index(self): 122 raise cherrypy.InternalRedirect("/") 123 124 def petshop(self, user_id): 125 if user_id == "parrot": 126 # Trade it for a slug when redirecting 127 raise cherrypy.InternalRedirect('/image/getImagesByUser', 128 "user_id=slug") 129 elif user_id == "terrier": 130 # Trade it for a fish when redirecting 131 raise cherrypy.InternalRedirect('/image/getImagesByUser', 132 {"user_id": "fish"}) 133 else: 134 raise cherrypy.InternalRedirect('/image/getImagesByUser') 135 136 def secure(self): 137 return "Welcome!" 138 139 def login(self): 140 return "Please log in" 141 142 143 class Image(Test): 144 145 def getImagesByUser(self, user_id): 146 return "0 images for %s" % user_id 147 148 149 class Flatten(Test): 150 151 def as_string(self): 152 return "content" 153 154 def as_list(self): 155 return ["con", "tent"] 156 157 def as_yield(self): 158 yield "content" 159 160 def as_dblyield(self): 161 yield self.as_yield() 162 163 def as_refyield(self): 164 for chunk in self.as_yield(): 165 yield chunk 166 167 168 class Error(Test): 169 170 def custom(self): 171 raise cherrypy.HTTPError(404, "No, <b>really</b>, not found!") 172 173 def noexist(self): 174 raise cherrypy.HTTPError(404, "No, <b>really</b>, not found!") 175 176 def page_method(self): 177 raise ValueError() 178 179 def page_yield(self): 180 yield "howdy" 181 raise ValueError() 182 183 def page_streamed(self): 184 yield "word up" 185 raise ValueError() 186 yield "very oops" 187 188 def cause_err_in_finalize(self): 189 # Since status must start with an int, this should error. 190 cherrypy.response.status = "ZOO OK" 191 192 def log_unhandled(self): 193 raise ValueError() 194 195 def rethrow(self): 196 """Test that an error raised here will be thrown out to the server.""" 197 raise ValueError() 198 199 200 class Ranges(Test): 201 202 def get_ranges(self): 203 h = cherrypy.request.headers.get('Range') 204 return repr(httptools.getRanges(h, 8)) 205 206 def slice_file(self): 207 path = os.path.join(os.getcwd(), os.path.dirname(__file__)) 208 return cptools.serveFile(os.path.join(path, "static/index.html")) 209 210 211 class Expect(Test): 212 213 def expectation_failed(self): 214 expect = cherrypy.request.headers.elements("Expect") 215 if expect and expect[0].value != '100-continue': 216 raise cherrypy.HTTPError(400) 217 raise cherrypy.HTTPError(417, 'Expectation Failed') 218 219 class Headers(Test): 220 221 def doubledheaders(self): 222 # From http://www.cherrypy.org/ticket/165: 223 # "header field names should not be case sensitive sayes the rfc. 224 # if i set a headerfield in complete lowercase i end up with two 225 # header fields, one in lowercase, the other in mixed-case." 226 227 # Set the most common headers 228 hMap = cherrypy.response.headers 229 hMap['content-type'] = "text/html" 230 hMap['content-length'] = 18 231 hMap['server'] = 'CherryPy headertest' 232 hMap['location'] = ('%s://127.0.0.1:%s/headers/' 233 % (cherrypy.request.remote_port, 234 cherrypy.request.scheme)) 235 236 # Set a rare header for fun 237 hMap['Expires'] = 'Thu, 01 Dec 2194 16:00:00 GMT' 238 239 return "double header test" 240 241 242 class HeaderElements(Test): 243 244 def get_elements(self, headername): 245 e = cherrypy.request.headers.elements(headername) 246 return "\n".join([str(x) for x in e]) 247 12 log_file = os.path.join(localDir, "error.log") 13 log_access_file = os.path.join(localDir, "access.log") 248 14 249 15 defined_http_methods = ("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", 250 16 "TRACE", "CONNECT") 251 class Method(Test): 252 253 def index(self): 254 m = cherrypy.request.method 255 if m in defined_http_methods: 256 return m 257 258 if m == "LINK": 259 raise cherrypy.HTTPError(405) 260 else: 261 raise cherrypy.HTTPError(501) 262 263 def parameterized(self, data): 264 return data 265 266 def request_body(self): 267 # This should be a file object (temp file), 268 # which CP will just pipe back out if we tell it to. 269 return cherrypy.request.body 270 271 class Divorce: 272 """HTTP Method handlers shouldn't collide with normal method names. 273 For example, a GET-handler shouldn't collide with a method named 'get'. 274 275 If you build HTTP method dispatching into CherryPy, rewrite this class 276 to use your new dispatch mechanism and make sure that: 277 "GET /divorce HTTP/1.1" maps to divorce.index() and 278 "GET /divorce/get?ID=13 HTTP/1.1" maps to divorce.get() 279 """ 280 281 documents = {} 282 283 def index(self): 284 yield "<h1>Choose your document</h1>\n" 285 yield "<ul>\n" 286 for id, contents in self.documents: 287 yield (" <li><a href='/divorce/get?ID=%s'>%s</a>: %s</li>\n" 288 % (id, id, contents)) 289 yield "</ul>" 290 index.exposed = True 291 292 def get(self, ID): 293 return ("Divorce document %s: %s" % 294 (ID, self.documents.get(ID, "empty"))) 295 get.exposed = True 296 297 cherrypy.root.divorce = Divorce() 298 299 300 class Cookies(Test): 301 302 def single(self, name): 303 cookie = cherrypy.request.simple_cookie[name] 304 cherrypy.response.simple_cookie[name] = cookie.value 305 306 def multiple(self, names): 307 for name in names: 17 18 19 def setup_server(): 20 class Root: 21 22 def index(self): 23 return "hello" 24 index.exposed = True 25 26 def andnow(self): 27 return "the larch" 28 andnow.exposed = True 29 30 def global_(self): 31 pass 32 global_.exposed = True 33 34 def delglobal(self): 35 del self.__class__.__dict__['global_'] 36 delglobal.exposed = True 37 38 def defct(self, newct): 39 newct = "text/%s" % newct 40 cherrypy.config.update({'server.default_content_type': newct}) 41 defct.exposed = True 42 43 def upload(self, file): 44 return "Size: %s" % len(file.file.read()) 45 upload.exposed = True 46 47 cherrypy.root = Root() 48 49 50 class TestType(type): 51 """Metaclass which automatically exposes all functions in each subclass, 52 and adds an instance of the subclass as an attribute of cherrypy.root. 53 """ 54 def __init__(cls, name, bases, dct): 55 type.__init__(name, bases, dct) 56 for value in dct.itervalues(): 57 if isinstance(value, types.FunctionType): 58 value.exposed = True 59 setattr(cherrypy.root, name.lower(), cls()) 60 class Test(object): 61 __metaclass__ = TestType 62 63 64 class Params(Test): 65 66 def index(self, thing): 67 return repr(thing) 68 69 def ismap(self, x, y): 70 return "Coordinates: %s, %s" % (x, y) 71 72 def default(self, *args, **kwargs): 73 return "args: %s kwargs: %s" % (args, kwargs) 74 75 76 class Status(Test): 77 78 def index(self): 79 return "normal" 80 81 def blank(self): 82 cherrypy.response.status = "" 83 84 # According to RFC 2616, new status codes are OK as long as they 85 # are between 100 and 599. 86 87 # Here is an illegal code... 88 def illegal(self): 89 cherrypy.response.status = 781 90 return "oops" 91 92 # ...and here is an unknown but legal code. 93 def unknown(self): 94 cherrypy.response.status = "431 My custom error" 95 return "funky" 96 97 # Non-numeric code 98 def bad(self): 99 cherrypy.response.status = "error" 100 return "bad news" 101 102 103 class Redirect(Test): 104 105 class Error: 106 def _cp_on_error(self): 107 raise cherrypy.HTTPRedirect("/errpage") 108 109 def index(self): 110 raise NameError() 111 index.exposed = True 112 error = Error() 113 114 def index(self): 115 return "child" 116 117 def by_code(self, code): 118 raise cherrypy.HTTPRedirect("somewhere else", code) 119 120 def nomodify(self): 121 raise cherrypy.HTTPRedirect("", 304) 122 123 def proxy(self): 124 raise cherrypy.HTTPRedirect("proxy", 305) 125 126 def stringify(self): 127 return str(cherrypy.HTTPRedirect("/")) 128 129 130 class LoginFilter: 131 132 def before_main(self): 133 if cherrypy.config.get("auth.on", False): 134 if not getattr(cherrypy.request, "login", None): 135 raise cherrypy.InternalRedirect("/internalredirect/login") 136 137 class InternalRedirect(Test): 138 139 _cp_filters = [LoginFilter()] 140 141 def index(self): 142 raise cherrypy.InternalRedirect("/") 143 144 def petshop(self, user_id): 145 if user_id == "parrot": 146 # Trade it for a slug when redirecting 147 raise cherrypy.InternalRedirect('/image/getImagesByUser', 148 "user_id=slug") 149 elif user_id == "terrier": 150 # Trade it for a fish when redirecting 151 raise cherrypy.InternalRedirect('/image/getImagesByUser', 152 {"user_id": "fish"}) 153 else: 154 raise cherrypy.InternalRedirect('/image/getImagesByUser') 155 156 def secure(self): 157 return "Welcome!" 158 159 def login(self): 160 return "Please log in" 161 162 163 class Image(Test): 164 165 def getImagesByUser(self, user_id): 166 return "0 images for %s" % user_id 167 168 169 class Flatten(Test): 170 171 def as_string(self): 172 return "content" 173 174 def as_list(self): 175 return ["con", "tent"] 176 177 def as_yield(self): 178 yield "content" 179 180 def as_dblyield(self): 181 yield self.as_yield() 182 183 def as_refyield(self): 184 for chunk in self.as_yield(): 185 yield chunk 186 187 188 class Error(Test): 189 190 def custom(self): 191 raise cherrypy.HTTPError(404, "No, <b>really</b>, not found!") 192 193 def noexist(self): 194 raise cherrypy.HTTPError(404, "No, <b>really</b>, not found!") 195 196 def page_method(self): 197 raise ValueError() 198 199 def page_yield(self): 200 yield "howdy" 201 raise ValueError() 202 203 def page_streamed(self): 204 yield "word up" 205 raise ValueError() 206 yield "very oops" 207 208 def cause_err_in_finalize(self): 209 # Since status must start with an int, this should error. 210 cherrypy.response.status = "ZOO OK" 211 212 def log_unhandled(self): 213 raise ValueError() 214 215 def rethrow(self): 216 """Test that an error raised here will be thrown out to the server.""" 217 raise ValueError() 218 219 220 class Ranges(Test): 221 222 def get_ranges(self): 223 h = cherrypy.request.headers.get('Range') 224 return repr(httptools.getRanges(h, 8)) 225 226 def slice_file(self): 227 path = os.path.join(os.getcwd(), os.path.dirname(__file__)) 228 return cptools.serveFile(os.path.join(path, "static/index.html")) 229 230 231 class Expect(Test): 232 233 def expectation_failed(self): 234 expect = cherrypy.request.headers.elements("Expect") 235 if expect and expect[0].value != '100-continue': 236 raise cherrypy.HTTPError(400) 237 raise cherrypy.HTTPError(417, 'Expectation Failed') 238 239 class Headers(Test): 240 241 def doubledheaders(self): 242 # From http://www.cherrypy.org/ticket/165: 243 # "header field names should not be case sensitive sayes the rfc. 244 # if i set a headerfield in complete lowercase i end up with two 245 # header fields, one in lowercase, the other in mixed-case." 246 247 # Set the most common headers 248 hMap = cherrypy.response.headers 249 hMap['content-type'] = "text/html" 250 hMap['content-length'] = 18 251 hMap['server'] = 'CherryPy headertest' 252 hMap['location'] = ('%s://127.0.0.1:%s/headers/' 253 % (cherrypy.request.remote_port, 254 cherrypy.request.scheme)) 255 256 # Set a rare header for fun 257 hMap['Expires'] = 'Thu, 01 Dec 2194 16:00:00 GMT' 258 259 return "double header test" 260 261 262 class HeaderElements(Test): 263 264 def get_elements(self, headername): 265 e = cherrypy.request.headers.elements(headername) 266 return "\n".join([str(x) for x in e]) 267 268 269 class Method(Test): 270 271 def index(self): 272 m = cherrypy.request.method 273 if m in defined_http_methods: 274 return m 275 276 if m == "LINK": 277 raise cherrypy.HTTPError(405) 278 else: 279 raise cherrypy.HTTPError(501) 280 281 def parameterized(self, data): 282 return data 283 284 def request_body(self): 285 # This should be a file object (temp file), 286 # which CP will just pipe back out if we tell it to. 287 return cherrypy.request.body 288 289 class Divorce: 290 """HTTP Method handlers shouldn't collide with normal method names. 291 For example, a GET-handler shouldn't collide with a method named 'get'. 292 293 If you build HTTP method dispatching into CherryPy, rewrite this class 294 to use your new dispatch mechanism and make sure that: 295 "GET /divorce HTTP/1.1" maps to divorce.index() and 296 "GET /divorce/get?ID=13 HTTP/1.1" maps to divorce.get() 297 """ 298 299 documents = {} 300 301 def index(self): 302 yield "<h1>Choose your document</h1>\n" 303 yield "<ul>\n" 304 for id, contents in self.documents: 305 yield (" <li><a href='/divorce/get?ID=%s'>%s</a>: %s</li>\n" 306 % (id, id, contents)) 307 yield "</ul>" 308 index.exposed = True 309 310 def get(self, ID): 311 return ("Divorce document %s: %s" % 312 (ID, self.documents.get(ID, "empty"))) 313 get.exposed = True 314 315 cherrypy.root.divorce = Divorce() 316 317 318 class Cookies(Test): 319 320 def single(self, name): 308 321 cookie = cherrypy.request.simple_cookie[name] 309 322 cherrypy.response.simple_cookie[name] = cookie.value 310 311 class MaxRequestSize(Test): 312 313 def index(self): 314 return "OK" 315 316 def upload(self, file): 317 return "Size: %s" % len(file.file.read()) 318 319 320 class ThreadLocal(Test): 321 322 def index(self): 323 existing = repr(getattr(cherrypy.request, "asdf", None)) 324 cherrypy.request.asdf = "rassfrassin" 325 return existing 326 327 328 log_file = os.path.join(localDir, "error.log") 329 log_access_file = os.path.join(localDir, "access.log") 330 331 cherrypy.config.update({ 332 'global': {'server.log_to_screen': False, 333 'server.environment': 'production', 334 'server.show_tracebacks': True, 335 }, 336 '/flatten': { 337 'server.log_file': log_file, 338 'server.log_access_file': log_access_file, 339 }, 340 '/params': { 341 'server.log_file': log_file, 342 }, 343 '/internalredirect/secure': { 344 'auth.on': True, 345 }, 346 '/error': { 347 'server.log_file': log_file, 348 'server.log_tracebacks': True, 349 }, 350 '/error/page_streamed': { 351 'stream_response': True, 352 }, 353 '/error/cause_err_in_finalize': { 354 'server.show_tracebacks': False, 355 }, 356 '/error/custom': { 357 'error_page.404': os.path.join(localDir, "static/index.html"), 358 }, 359 '/error/noexist': { 360 'error_page.404': "nonexistent.html", 361 }, 362 '/error/log_unhandled': { 363 'server.log_tracebacks': False, 364 'server.log_unhandled_tracebacks': True, 365 }, 366 '/error/rethrow': { 367 'server.throw_errors': True, 368 }, 369 }) 370 323 324 def multiple(self, names): 325 for name in names: 326 cookie = cherrypy.request.simple_cookie[name] 327 cherrypy.response.simple_cookie[name] = cookie.value 328 329 330 class ThreadLocal(Test): 331 332 def index(self): 333 existing = repr(getattr(cherrypy.request, "asdf", None)) 334 cherrypy.request.asdf = "rassfrassin" 335 return existing 336 337 cherrypy.config.update({ 338 'global': { 339 'server.log_to_screen': False, 340 'server.environment': 'production', 341 'server.show_tracebacks': True, 342 'server.max_request_body_size': 200, 343 'server.max_request_header_size': 500, 344 }, 345 '/flatten': { 346 'server.log_file': log_file, 347 'server.log_access_file': log_access_file, 348 }, 349 '/params': { 350 'server.log_file': log_file, 351 }, 352 '/internalredirect/secure': { 353 'auth.on': True, 354 }, 355 '/error': { 356 'server.log_file': log_file, 357 'server.log_tracebacks': True, 358 }, 359 '/error/page_streamed': { 360 'stream_response': True, 361 }, 362 '/error/cause_err_in_finalize': { 363 'server.show_tracebacks': False, 364 }, 365 '/error/custom': { 366 'error_page.404': os.path.join(localDir, "static/index.html"), 367 }, 368 '/error/noexist': { 369 'error_page.404': "nonexistent.html", 370 }, 371 '/error/log_unhandled': { 372 'server.log_tracebacks': False, 373 'server.log_unhandled_tracebacks': True, 374 }, 375 '/error/rethrow': { 376 'server.throw_errors': True, 377 }, 378 }) 379 380 381 # Client-side code # 371 382 372 383 import helper … … 407 418 self.getPage("/status/") 408 419 self.assertBody('normal') 409 self.assertStatus( '200 OK')420 self.assertStatus(200) 410 421 411 422 self.getPage("/status/blank") 412 423 self.assertBody('') 413 self.assertStatus( '200 OK')424 self.assertStatus(200) 414 425 415 426 self.getPage("/status/illegal") 416 self.assertStatus( '500 Internal error')427 self.assertStatus(500) 417 428 msg = "Illegal response status from server (out of range)." 418 429 self.assertErrorPage(500, msg) … … 420 431 self.getPage("/status/unknown") 421 432 self.assertBody('funky') 422 self.assertStatus( '431 My custom error')433 self.assertStatus(431) 423 434 424 435 self.getPage("/status/bad") 425 self.assertStatus( '500 Internal error')436 self.assertStatus(500) 426 437 msg = "Illegal response status from server (non-numeric)." 427 438 self.assertErrorPage(500, msg) … … 433 444 self.getPage("/flatten/as_string") 434 445 self.assertBody('content') 435 self.assertStatus( '200 OK')446 self.assertStatus(200) 436 447 437 448 self.getPage("/flatten/as_yield") 438 449 self.assertBody('content') 439 self.assertStatus( '200 OK')450 self.assertStatus(200) 440 451 441 452 data = open(log_access_file, "rb").readlines() … … 492 503 self.getPage("/redirect/") 493 504 self.assertBody('child') 494 self.assertStatus( '200 OK')505 self.assertStatus(200) 495 506 496 507 # Test that requests for index methods without a trailing slash … … 514 525 self.getPage("/redirect/by_code?code=300") 515 526 self.assertMatchesBody(r"<a href='(.*)somewhere else'>\1somewhere else</a>") 516 self.assertStatus( '300 Multiple Choices')527 self.assertStatus(300) 517 528 518 529 self.getPage("/redirect/by_code?code=301") 519 530 self.assertMatchesBody(r"<a href='(.*)somewhere else'>\1somewhere else</a>") 520 self.assertStatus( '301 Moved Permanently')531 self.assertStatus(301) 521 532 522 533 self.getPage("/redirect/by_code?code=302") 523 534 self.assertMatchesBody(r"<a href='(.*)somewhere else'>\1somewhere else</a>") 524 self.assertStatus( '302 Found')535 self.assertStatus(302) 525 536 526 537 self.getPage("/redirect/by_code?code=303") 527 538 self.assertMatchesBody(r"<a href='(.*)somewhere else'>\1somewhere else</a>") 528 self.assertStatus( '303 See Other')539 self.assertStatus(303) 529 540 530 541 self.getPage("/redirect/by_code?code=307") 531 542 self.assertMatchesBody(r"<a href='(.*)somewhere else'>\1somewhere else</a>") 532 self.assertStatus( '307 Temporary Redirect')543 self.assertStatus(307) 533 544 534 545 self.getPage("/redirect/nomodify") 535 546 self.assertBody('') 536 self.assertStatus( '304 Not modified')547 self.assertStatus(304) 537 548 538 549 self.getPage("/redirect/proxy") 539 550 self.assertBody('') 540 self.assertStatus( '305 Use Proxy')551 self.assertStatus(305) 541 552 542 553 # InternalRedirect 543 554 self.getPage("/internalredirect/") 544 555 self.assertBody('hello') 545 self.assertStatus( '200 OK')556 self.assertStatus(200) 546 557 547 558 self.getPage("/internalredirect/petshop?user_id=Sir-not-appearing-in-this-film") 548 559 self.assertBody('0 images for Sir-not-appearing-in-this-film') 549 self.assertStatus( '200 OK')560 self.assertStatus(200) 550 561 551 562 self.getPage("/internalredirect/petshop?user_id=parrot") 552 563 self.assertBody('0 images for slug') 553 self.assertStatus( '200 OK')564 self.assertStatus(200) 554 565 555 566 self.getPage("/internalredirect/petshop?user_id=terrier") 556 567 self.assertBody('0 images for fish') 557 self.assertStatus( '200 OK')568 self.assertStatus(200) 558 569 559 570 self.getPage("/internalredirect/secure") 560 571 self.assertBody('Please log in') 561 self.assertStatus( '200 OK')572 self.assertStatus(200) 562 573 563 574 # HTTPRedirect on error … … 567 578 568 579 # Make sure str(HTTPRedirect()) works. 569 self.getPage("/redirect/stringify") 570 self.assertStatus('200 OK') 571 protocol = cherrypy.config.get('server.protocol_version') 572 if protocol == "HTTP/1.1": 573 self.assertBody("(['http://127.0.0.1:%s/'], 303)" % self.PORT) 574 else: 575 self.assertBody("(['http://127.0.0.1:%s/'], 302)" % self.PORT) 580 self.getPage("/redirect/stringify", protocol="HTTP/1.0") 581 self.assertStatus(200) 582 self.assertBody("(['http://127.0.0.1:%s/'], 302)" % self.PORT) 583 self.getPage("/redirect/stringify", protocol="HTTP/1.1") 584 self.assertStatus(200) 585 self.assertBody("(['http://127.0.0.1:%s/'], 303)" % self.PORT) 576 586 577 587 def testFlatten(self): … … 584 594 def testErrorHandling(self): 585 595 self.getPage("/error/missing") 586 self.assertStatus( "404 Not Found")596 self.assertStatus(404) 587 597 self.assertErrorPage(404, "The path '/error/missing' was not found.") 588 598 … … 597 607 self.assertErrorPage(500, pattern=valerr) 598 608 599 import cherrypy 600 # stream_response should be True for this path. 601 if cherrypy.server.httpserver is None: 602 self.assertRaises(ValueError, self.getPage, 603 "/error/page_streamed") 604 else: 605 self.getPage("/error/page_streamed") 606 # Because this error is raised after the response body has 607 # started, the status should not change to an error status. 608 self.assertStatus("200 OK") 609 self.assertBody("word upUnrecoverable error in the server.") 609 self.getPage("/error/page_streamed") 610 # Because this error is raised after the response body has 611 # started, the status should not change to an error status. 612 self.assertStatus(200) 613 self.assertBody("word upUnrecoverable error in the server.") 610 614 611 615 # No traceback should be present … … 618 622 # Test custom error page. 619 623 self.getPage("/error/custom") 620 self.assertStatus( "404 Not Found")624 self.assertStatus(404) 621 625 self.assertEqual(len(self.body), 513) 622 626 self.assertBody("Hello, world\r\n" + (" " * 499)) … … 625 629 # Note that the message is escaped for HTML (ticket #310). 626 630 self.getPage("/error/noexist") 627 self.assertStatus( "404 Not Found")631 self.assertStatus(404) 628 632 msg = ("No, <b>really</b>, not found!<br />" 629 633 "In addition, the custom error page failed:\n<br />" 630 634 "[Errno 2] No such file or directory: 'nonexistent.html'") 631 635 self.assertInBody(msg) 632 633 # Test server.throw_errors (ticket #186). 634 s = cherrypy.server.httpserver 635 if s: 636 self.getPage("/error/rethrow") 637 self.assertBody("THROWN ERROR: ValueError") 638 else: 639 self.assertRaises(ValueError, self.getPage, "/error/rethrow") 636 ## 637 ## # Test server.throw_errors (ticket #186). 638 ## self.getPage("/error/rethrow") 639 ## self.assertBody("THROWN ERROR: ValueError") 640 640 641 641 def testRanges(self): 642 protocol = cherrypy.config.get('server.protocol_version') 643 if protocol == "HTTP/1.1": 644 self.getPage("/ranges/get_ranges", [('Range', 'bytes=3-6')]) 645 self.assertBody("[(3, 7)]") 646 647 # Test multiple ranges and a suffix-byte-range-spec, for good measure. 648 self.getPage("/ranges/get_ranges", [('Range', 'bytes=2-4,-1')]) 649 self.assertBody("[(2, 5), (7, 8)]") 650 651 # Get a partial file. 652 self.getPage("/ranges/slice_file", [('Range', 'bytes=2-5')]) 653 self.assertStatus("206 Partial Content") 654 self.assertHeader("Content-Type", "text/html") 655 self.assertHeader("Content-Range", "bytes 2-5/14") 656 self.assertBody("llo,") 657 658 # What happens with overlapping ranges (and out of order, too)? 659 self.getPage("/ranges/slice_file", [('Range', 'bytes=4-6,2-5')]) 660 self.assertStatus("206 Partial Content") 661 ct = "" 662 for k, v in self.headers: 663 if k.lower() == "content-type": 664 ct = v 665 break 666 expected_type = "multipart/byteranges; boundary=" 667 self.assert_(ct.startswith(expected_type)) 668 boundary = ct[len(expected_type):] 669 expected_body = """--%s 642 self.getPage("/ranges/get_ranges", [('Range', 'bytes=3-6')]) 643 self.assertBody("[(3, 7)]") 644 645 # Test multiple ranges and a suffix-byte-range-spec, for good measure. 646 self.getPage("/ranges/get_ranges", [('Range', 'bytes=2-4,-1')]) 647 self.assertBody("[(2, 5), (7, 8)]") 648 649 # Get a partial file. 650 self.getPage("/ranges/slice_file", [('Range', 'bytes=2-5')]) 651 self.assertStatus(206) 652 self.assertHeader("Content-Type", "text/html") 653 self.assertHeader("Content-Range", "bytes 2-5/14") 654 self.assertBody("llo,") 655 656 # What happens with overlapping ranges (and out of order, too)? 657 self.getPage("/ranges/slice_file", [('Range', 'bytes=4-6,2-5')]) 658 self.assertStatus(206) 659 ct = "" 660 for k, v in self.headers: 661 if k.lower() == "content-type": 662 ct = v 663 break 664 expected_type = "multipart/byteranges; boundary=" 665 self.assert_(ct.startswith(expected_type)) 666 boundary = ct[len(expected_type):] 667 expected_body = """--%s 670 668 Content-type: text/html 671 669 Content-range: bytes 4-6/14 … … 678 676 llo, 679 677 --%s""" % (boundary, boundary, boundary) 680 self.assertBody(expected_body) 681 self.assertHeader("Content-Length") 682 683 # Test "416 Requested Range Not Satisfiable" 684 self.getPage("/ranges/slice_file", [('Range', 'bytes=2300-2900')]) 685 self.assertStatus("416 Requested Range Not Satisfiable") 686 self.assertHeader("Content-Range", "bytes */14") 687 else: 688 self.getPage("/ranges/slice_file", [('Range', 'bytes=2-5')]) 689 self.assertStatus("200 OK") 690 self.assertBody("Hello, world\r\n") 678 self.assertBody(expected_body) 679 self.assertHeader("Content-Length") 680 681 # Test "416 Requested Range Not Satisfiable" 682 self.getPage("/ranges/slice_file", [('Range', 'bytes=2300-2900')]) 683 self.assertStatus(416) 684 self.assertHeader("Content-Range", "bytes */14") 685 686 # Test Range behavior with HTTP/1.0 request 687 self.getPage("/ranges/slice_file", [('Range', 'bytes=2-5')], protocol="HTTP/1.0") 688 self.assertStatus(200) 689 self.assertBody("Hello, world\r\n") 691 690 692 691 def testExpect(self): … … 696 695 697 696 self.getPage("/expect/expectation_failed", [('Content-Length', '200'), e]) 698 self.assertStatus( '417 Expectation Failed')697 self.assertStatus(417) 699 698 700 699 def testHeaderElements(self): … … 702 701 h = [('Accept', 'audio/*; q=0.2, audio/basic')] 703 702 self.getPage("/headerelements/get_elements?headername=Accept", h) 704 self.assertStatus( "200 OK")703 self.assertStatus(200) 705 704 self.assertBody("audio/basic\n" 706 705 "audio/*;q=0.2") … … 708 707 h = [('Accept', 'text/plain; q=0.5, text/html, text/x-dvi; q=0.8, text/x-c')] 709 708 self.getPage("/headerelements/get_elements?headername=Accept", h) 710 self.assertStatus( "200 OK")709 self.assertStatus(200) 711 710 self.assertBody("text/x-c\n" 712 711 "text/html\n" … … 717 716 h = [('Accept', 'text/*, text/html, text/html;level=1, */*')] 718 717 self.getPage("/headerelements/get_elements?headername=Accept", h) 719 self.assertStatus( "200 OK")718 self.assertStatus(200) 720 719 self.assertBody("text/html;level=1\n" 721 720 "text/html\n" … … 762 761 # HEAD requests should not return any body. 763 762 if m == "HEAD": 764 m = "" 765 766 self.assertBody(m) 763 self.assertBody("") 764 elif m == "TRACE": 765 # Some HTTP servers (like modpy) have their own TRACE support 766 self.assertEqual(self.body[:5], "TRACE") 767 else: 768 self.assertBody(m) 767 769 768 770 # Request a PUT method with a form-urlencoded body … … 781 783 # Request a disallowed method 782 784 self.getPage("/method/", method="LINK") 783 self.assertStatus( "405 Method Not Allowed")785 self.assertStatus(405) 784 786 785 787 # Request an unknown method 786 788 self.getPage("/method/", method="SEARCH") 787 self.assertStatus( "501 Not Implemented")789 self.assertStatus(501) 788 790 789 791 # Request the OPTIONS method with a Request-URI of "*". 790 792 self.getPage("*", method="OPTIONS") 791 self.assertStatus( "200 OK")793 self.assertStatus(200) 792 794 # Content-Length header required for OPTIONS with no response body. 793 795 # See http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9.2 … … 796 798 # Now be really dastardly and delete our custom global_ handler, 797 799 # to see if the default one works. 798 del Root.global_800 self.getPage("/delglobal") 799 801 self.getPage("*", method="OPTIONS") 800 self.assertStatus( "200 OK")802 self.assertStatus(200) 801 803 self.assertHeader("Allow", 'HEAD, GET, POST, PUT, OPTIONS') 802 804 … … 807 809 self.getPage("/divorce/get?ID=13") 808 810 self.assertBody('Divorce document 13: empty') 809 self.assertStatus( '200 OK')811 self.assertStatus(200) 810 812 self.getPage("/divorce/", method="GET") 811 813 self.assertBody('<h1>Choose your document</h1>\n<ul>\n</ul>') 812 self.assertStatus( '200 OK')814 self.assertStatus(200) 813 815 814 816 def testFavicon(self): … … 834 836 835 837 def testMaxRequestSize(self): 836 self.getPage("/maxrequestsize/index") 837 self.assertBody("OK") 838 839 s = cherrypy.server.httpserver 840 if s: 841 cherrypy.config.update({'server.max_request_header_size': 10}) 842 self.getPage("/maxrequestsize/index") 843 self.assertStatus("413 Request Entity Too Large") 844 self.assertInBody("Request Entity Too Large") 845 cherrypy.config.update({'server.max_request_header_size': 0}) 846 847 # Test for http://www.cherrypy.org/ticket/421 848 # (Incorrect border condition in readline of SizeCheckWrapper). 849 # This hangs in rev 891 and earlier. 850 lines256 = "x" * 248 851 self.getPage("/maxrequestsize/index", 852 headers=[('Host', '127.0.0.1:%s' % self.PORT), 853 ('From', lines256)]) 838 self.getPage("/", headers=[('From', "x" * 500)]) 839 self.assertStatus(413) 840 self.assertInBody("Request Entity Too Large") 841 842 # Test for http://www.cherrypy.org/ticket/421 843 # (Incorrect border condition in readline of SizeCheckWrapper). 844 # This hangs in rev 891 and earlier. 845 lines256 = "x" * 248 846 self.getPage("/", 847 headers=[('Host', '127.0.0.1:%s' % self.PORT), 848 ('From', lines256)]) 854 849 855 850 # Test upload 856 h = [("Content-type", "multipart/form-data; boundary=x"), 857 ("Content-Length", "110")] 858 b = """--x 851 body = """--x 859 852 Content-Disposition: form-data; name="file"; filename="hello.txt" 860 853 Content-Type: text/plain 861 854 862 hello 855 %s 863 856 --x-- 864 857 """ 865 self.getPage('/maxrequestsize/upload', h, "POST", b) 866 self.assertBody('Size: 5') 867 868 s = cherrypy.server.httpserver 869 if s: 870 cherrypy.config.update({ 871 '%s/maxrequestsize' % self.prefix(): { 872 'server.max_request_body_size': 3}}) 873 self.getPage('/maxrequestsize/upload', h, "POST", b) 874 self.assertStatus("413 Request Entity Too Large") 875 self.assertInBody("Request Entity Too Large") 858 b = body % ("x" * 96) 859 h = [("Content-type", "multipart/form-data; boundary=x"), 860 ("Content-Length", len(b))] 861 self.getPage('/upload', h, "POST", b) 862 self.assertBody('Size: 96') 863 864 b = body % ("x" * 200) 865 h = [("Content-type", "multipart/form-data; boundary=x"), 866 ("Content-Length", len(b))] 867 self.getPage('/upload', h, "POST", b) 868 self.assertStatus(413) 869 self.assertInBody("Request Entity Too Large") 876 870 877 871 def testEmptyThreadlocals(self): … … 881 875 results.append(self.body) 882 876 self.assertEqual(results, ["None"] * 20) 883 877 884 878 def testDefaultContentType(self): 885 879 self.getPage('/') 886 880 self.assertHeader('Content-Type', 'text/html') 887 888 cherrypy.config.update({'server.default_content_type': 'text/plain'}) 881 self.getPage('/defct/plain') 889 882 self.getPage('/') 890 883 self.assertHeader('Content-Type', 'text/plain') 891 892 def testTreeBackwardCompatibility(self): 893 self.assertEqual(cherrypy.tree.mount_points, {"/": cherrypy.root}) 884 self.getPage('/defct/html') 894 885 895 886 896 887 if __name__ == '__main__': 888 setup_server() 897 889 helper.testmain() trunk/cherrypy/test/test_custom_filters.py
r917 r1017 10 10 11 11 12 class Numerify(BaseFilter):13 14 def on_start_resource(self):15 m = cherrypy.config.get("numerify_filter.map", {})16 cherrypy.request.numerify_map = m.items()17 18 def before_finalize(self):19 if not cherrypy.config.get("numerify_filter.on", False):20 return21 22 def number_it(body):23 for chunk in body:24 for k, v in cherrypy.request.numerify_map:25 chunk = chunk.replace(k, v)26 yield chunk27 cherrypy.response.body = number_it(cherrypy.response.body)28 29 30 12 class AccessFilter(BaseFilter): 31 13 … … 38 20 39 21 40 # It's not mandatory to inherit from BaseFilter. 41 class NadsatFilter: 42 43 def before_finalize(self): 44 self.ended = False 45 def nadsat_it_up(body): 46 for chunk in body: 47 chunk = chunk.replace("good", "horrorshow") 48 chunk = chunk.replace("piece", "lomtick") 49 yield chunk 50 cherrypy.response.body = nadsat_it_up(cherrypy.response.body) 51 52 def on_end_request(self): 53 # This runs after the request has been completely written out. 54 cherrypy.response.body = "razdrez" 55 self.ended = True 56 57 58 59 class Root: 60 def index(self): 61 return "Howdy earth!" 62 index.exposed = True 63 64 cherrypy.root = Root() 65 66 67 class TestType(type): 68 """Metaclass which automatically exposes all functions in each subclass, 69 and adds an instance of the subclass as an attribute of cherrypy.root. 70 """ 71 def __init__(cls, name, bases, dct): 72 type.__init__(name, bases, dct) 73 for value in dct.itervalues(): 74 if isinstance(value, types.FunctionType): 75 value.exposed = True 76 setattr(cherrypy.root, name.lower(), cls()) 77 class Test(object): 78 __metaclass__ = TestType 79 80 81 class CPFilterList(Test): 82 83 # METHOD ONE: 84 # Use _cp_filters (old name: _cpFilterList) 85 _cp_filters = [NadsatFilter()] 86 87 def index(self): 88 return "A good piece of cherry pie" 89 90 def err(self): 91 raise ValueError() 92 93 def errinstream(self): 94 raise ValueError() 95 yield "confidential" 96 97 def restricted(self): 98 return "Welcome!" 99 100 def err_in_onstart(self): 101 return "success!" 102 103 104 cherrypy.config.update({ 105 'global': { 106 # METHOD TWO: 107 # Declare a classname in server.input_filters. 108 'server.input_filters': ["cherrypy.test.test_custom_filters.AccessFilter"], 109 'server.log_to_screen': False, 110 'server.environment': 'production', 111 'server.show_tracebacks': True, 112 }, 113 '/cpfilterlist': { 114 'numerify_filter.on': True, 115 'numerify_filter.map': {"pie": "3.14159"} 116 }, 117 '/cpfilterlist/restricted': { 118 'access_filter.on': True, 119 'server.show_tracebacks': False, 120 }, 121 '/cpfilterlist/errinstream': { 122 'stream_response': True, 123 }, 124 '/cpfilterlist/err_in_onstart': { 125 # Because this isn't a dict, on_start_resource will error. 126 'numerify_filter.map': "pie->3.14159" 127 }, 128 }) 129 130 # METHOD THREE: 131 # Insert a class directly into the filters.output_filters chain. 132 # You can also insert a string, but we're effectively testing 133 # using-a-string via the config file. 134 filters.input_filters.insert(0, Numerify) 135 filters.output_filters.insert(0, Numerify) 136 137 # We have to call filters.init() here (if we want methods #2 and #3 138 # to work), because the test suite may already have run server.start() 139 # (which is where filters.init() is usually called). 140 filters.init() 22 def setup_server(): 23 24 class Numerify(BaseFilter): 25 26 def on_start_resource(self): 27 m = cherrypy.config.get("numerify_filter.map", {}) 28 cherrypy.request.numerify_map = m.items() 29 30 def before_finalize(self): 31 if not cherrypy.config.get("numerify_filter.on", False): 32 return 33 34 def number_it(body): 35 for chunk in body: 36 for k, v in cherrypy.request.numerify_map: 37 chunk = chunk.replace(k, v) 38 yield chunk 39 cherrypy.response.body = number_it(cherrypy.response.body) 40 41 42 # It's not mandatory to inherit from BaseFilter. 43 class NadsatFilter: 44 45 def __init__(self): 46 self.counter = 0 47 self.ended = {} 48 49 def before_main(self): 50 cherrypy.request.counter = self.counter = self.counter + 1 51 self.ended[cherrypy.request.counter] = False 52 53 def before_finalize(self): 54 def nadsat_it_up(body): 55 for chunk in body: 56 chunk = chunk.replace("good", "horrorshow") 57 chunk = chunk.replace("piece", "lomtick") 58 yield chunk 59 cherrypy.response.body = nadsat_it_up(cherrypy.response.body) 60 61 def on_end_request(self): 62 # This runs after the request has been completely written out. 63 cherrypy.response.body = "razdrez" 64 self.ended[cherrypy.request.counter] = True 65 66 67 68 class Root: 69 def index(self): 70 return "Howdy earth!" 71 index.exposed = True 72 73 cherrypy.root = Root() 74 75 76 class TestType(type): 77 """Metaclass which automatically exposes all functions in each subclass, 78 and adds an instance of the subclass as an attribute of cherrypy.root. 79 """ 80 def __init__(cls, name, bases, dct): 81 type.__init__(name, bases, dct) 82 for value in dct.itervalues(): 83 if isinstance(value, types.FunctionType): 84 value.exposed = True 85 setattr(cherrypy.root, name.lower(), cls()) 86 class Test(object): 87 __metaclass__ = TestType 88 89 90 class CPFilterList(Test): 91 92 # METHOD ONE: 93 # Use _cp_filters (old name: _cpFilterList) 94 _cp_filters = [NadsatFilter()] 95 96 def index(self): 97 return "A good piece of cherry pie" 98 99 def ended(self, id): 100 return repr(self._cp_filters[0].ended[int(id)]) 101 102 def err(self): 103 raise ValueError() 104 105 def errinstream(self): 106 raise ValueError() 107 yield "confidential" 108 109 def restricted(self): 110 return "Welcome!" 111 112 def err_in_onstart(self): 113 return "success!" 114 115 116 cherrypy.config.update({ 117 'global': { 118 # METHOD TWO: 119 # Declare a classname in server.input_filters. 120 'server.input_filters': ["cherrypy.test.test_custom_filters.AccessFilter"], 121 'server.log_to_screen': False, 122 'server.environment': 'production', 123 'server.show_tracebacks': True, 124 }, 125 '/cpfilterlist': { 126 'numerify_filter.on': True, 127 'numerify_filter.map': {"pie": "3.14159"} 128 }, 129 '/cpfilterlist/restricted': { 130 'access_filter.on': True, 131 'server.show_tracebacks': False, 132 }, 133 '/cpfilterlist/errinstream': { 134 'stream_response': True, 135 }, 136 '/cpfilterlist/err_in_onstart': { 137 # Because this isn't a dict, on_start_resource will error. 138 'numerify_filter.map': "pie->3.14159" 139 }, 140 }) 141 142 # METHOD THREE: 143 # Insert a class directly into the filters.output_filters chain. 144 # You can also insert a string, but we're effectively testing 145 # using-a-string via the config file. 146 filters.input_filters.insert(0, Numerify) 147 filters.output_filters.insert(0, Numerify) 148 149 # We have to call filters.init() here (if we want methods #2 and #3 150 # to work), because the test suite may already have run server.start() 151 # (which is where filters.init() is usually called). 152 filters.init() 153 154 155 # Client-side code # 141 156 142 157 import helper … … 146 161 147 162 def testCPFilterList(self): 148 # Lazily import _nf, since filters.__init__ will reimport this module.149 _nf = cherrypy.root.cpfilterlist._cp_filters[0]150 151 163 self.getPage("/cpfilterlist/") 152 164 # If body is "razdrez", then on_end_request is being called too early. 153 165 self.assertBody("A horrorshow lomtick of cherry 3.14159") 154 166 # If this fails, then on_end_request isn't being called at all. 155 self.assertEqual(_nf.ended, True) 156 157 ignore = helper.webtest.ignored_exceptions 158 ignore.append(ValueError) 159 try: 160 valerr = '\n raise ValueError()\nValueError' 161 self.getPage("/cpfilterlist/err") 162 # If body is "razdrez", then on_end_request is being called too early. 163 self.assertErrorPage(500, pattern=valerr) 164 # If this fails, then on_end_request isn't being called at all. 165 self.assertEqual(_nf.ended, True) 166 167 # If body is "razdrez", then on_end_request is being called too early. 168 if cherrypy.server.httpserver is None: 169 self.assertRaises(ValueError, self.getPage, 170 "/cpfilterlist/errinstream") 171 else: 172 self.getPage("/cpfilterlist/errinstream") 173 # Because this error is raised after the response body has 174 # started, the status should not change to an error status. 175 self.assertStatus("200 OK") 176 self.assertBody("Unrecoverable error in the server.") 177 # If this fails, then on_end_request isn't being called at all. 178 self.assertEqual(_nf.ended, True) 179 finally: 180 ignore.pop() 167 self.getPage("/cpfilterlist/ended/1") 168 self.assertBody("True") 169 170 valerr = '\n raise ValueError()\nValueError' 171 self.getPage("/cpfilterlist/err") 172 # If body is "razdrez", then on_end_request is being called too early. 173 self.assertErrorPage(500, pattern=valerr) 174 # If this fails, then on_end_request isn't being called at all. 175 self.getPage("/cpfilterlist/ended/3") 176 self.assertBody("True") 177 178 # If body is "razdrez", then on_end_request is being called too early. 179 self.getPage("/cpfilterlist/errinstream") 180 # Because this error is raised after the response body has 181 # started, the status should not change to an error status. 182 self.assertStatus("200 OK") 183 self.assertBody("Unrecoverable error in the server.") 184 # If this fails, then on_end_request isn't being called at all. 185 self.getPage("/cpfilterlist/ended/5") 186 self.assertBody("True") 181 187 182 188 # Test the config method. … … 191 197 # but because that failure is logged and passed over, the error 192 198 # page we obtain in the user agent should be from before_finalize. 193 ignore = helper.webtest.ignored_exceptions 194 ignore.append(AttributeError) 195 try: 196 self.getPage("/cpfilterlist/err_in_onstart") 197 self.assertErrorPage(500) 198 self.assertInBody("AttributeError: 'Request' object has no " 199 "attribute 'numerify_map'") 200 finally: 201 ignore.pop() 199 self.getPage("/cpfilterlist/err_in_onstart") 200 self.assertErrorPage(500) 201 self.assertInBody("AttributeError: 'Request' object has no " 202 "attribute 'numerify_map'") 202 203 203 204 204 205 if __name__ == '__main__': 206 setup_server() 205 207 helper.testmain() 206 208 trunk/cherrypy/test/test_decodingencoding_filter.py
r935 r1017 8 8 sing16 = sing.encode('utf-16') 9 9 10 class Root:11 def index(self, param):12 assert param == europoundUnicode13 yield europoundUnicode14 index.exposed = True15 16 def mao_zedong(self):17 return sing18 mao_zedong.exposed = True19 10 20 cherrypy.root = Root() 21 cherrypy.config.update({ 22 'server.log_to_screen': False, 23 'server.environment': 'production', 24 'encoding_filter.on': True, 25 'decoding_filter.on': True 26 }) 11 def setup_server(): 12 class Root: 13 def index(self, param): 14 assert param == europoundUnicode 15 yield europoundUnicode 16 index.exposed = True 17 18 def mao_zedong(self): 19 return sing 20 mao_zedong.exposed = True 21 22 cherrypy.root = Root() 23 cherrypy.config.update({ 24 'server.log_to_screen': False, 25 'server.environment': 'production', 26 'encoding_filter.on': True, 27 'decoding_filter.on': True 28 }) 27 29 28 30 … … 64 66 65 67 if __name__ == "__main__": 68 setup_server() 66 69 helper.testmain() trunk/cherrypy/test/test_gzip_filter.py
r917 r1017 5 5 import cherrypy 6 6 7 class Root: 8 def index(self): 9 yield "Hello, world" 10 index.exposed = True 11 12 def noshow(self): 13 # Test for ticket #147, where yield showed no exceptions (content- 14 # encoding was still gzip even though traceback wasn't zipped). 15 raise IndexError() 16 yield "Here be dragons" 17 noshow.exposed = True 18 19 def noshow_stream(self): 20 # Test for ticket #147, where yield showed no exceptions (content- 21 # encoding was still gzip even though traceback wasn't zipped). 22 raise IndexError() 23 yield "Here be dragons" 24 noshow_stream.exposed = True 7 def setup_server(): 8 class Root: 9 def index(self): 10 yield "Hello, world" 11 index.exposed = True 12 13 def noshow(self): 14 # Test for ticket #147, where yield showed no exceptions (content- 15 # encoding was still gzip even though traceback wasn't zipped). 16 raise IndexError() 17 yield "Here be dragons" 18 noshow.exposed = True 19 20 def noshow_stream(self): 21 # Test for ticket #147, where yield showed no exceptions (content- 22 # encoding was still gzip even though traceback wasn't zipped). 23 raise IndexError() 24 yield "Here be dragons" 25 noshow_stream.exposed = True 25 26 26 cherrypy.root = Root()27 cherrypy.config.update({28 'global': {'server.log_to_screen': False,29 'server.environment': 'production',30 'server.show_tracebacks': True,31 'gzip_filter.on': True,32 },33 '/noshow_stream': {'stream_response': True},34 })27 cherrypy.root = Root() 28 cherrypy.config.update({ 29 'global': {'server.log_to_screen': False, 30 'server.environment': 'production', 31 'server.show_tracebacks': True, 32 'gzip_filter.on': True, 33 }, 34 '/noshow_stream': {'stream_response': True}, 35 }) 35 36 36 37 … … 61 62 62 63 self.getPage('/', headers=[("Accept-Encoding", "*;q=0")]) 63 self.assertStatus( "406 Not Acceptable")64 self.assertStatus(406) 64 65 self.assertNoHeader("Vary") 65 66 self.assertErrorPage(406, "identity, gzip") 66 67 67 68 # Test for ticket #147 68 helper.webtest.ignored_exceptions.append(IndexError) 69 try: 70 self.getPage('/noshow', headers=[("Accept-Encoding", "gzip")]) 71 self.assertNoHeader('Content-Encoding') 72 self.assertStatus('500 Internal error') 73 self.assertErrorPage(500, pattern="IndexError\n") 74 75 # In this case, there's nothing we can do to deliver a 76 # readable page, since 1) the gzip header is already set, 77 # and 2) we may have already written some of the body. 78 # The fix is to never stream yields when using gzip. 79 if cherrypy.server.httpserver is None: 80 self.assertRaises(IndexError, self.getPage, 81 '/noshow_stream', 82 [("Accept-Encoding", "gzip")]) 83 else: 84 self.getPage('/noshow_stream', 85 headers=[("Accept-Encoding", "gzip")]) 86 self.assertHeader('Content-Encoding', 'gzip') 87 self.assertMatchesBody(r"Unrecoverable error in the server.$") 88 finally: 89 helper.webtest.ignored_exceptions.pop() 69 self.getPage('/noshow', headers=[("Accept-Encoding", "gzip")]) 70 self.assertNoHeader('Content-Encoding') 71 self.assertStatus(500) 72 self.assertErrorPage(500, pattern="IndexError\n") 73 74 # In this case, there's nothing we can do to deliver a 75 # readable page, since 1) the gzip header is already set, 76 # and 2) we may have already written some of the body. 77 # The fix is to never stream yields when using gzip. 78 self.getPage('/noshow_stream', 79 headers=[("Accept-Encoding", "gzip")]) 80 self.assertHeader('Content-Encoding', 'gzip') 81 self.assertMatchesBody(r"Unrecoverable error in the server.$") 90 82 91 83 92 84 if __name__ == "__main__": 85 setup_server() 93 86 helper.testmain() trunk/cherrypy/test/test_http.py
r1012 r1017 3 3 Some of these tests check timeouts, etcetera, and therefore take a long 4 4 time to run. Therefore, this module should probably not be included in 5 the 'comprehensive' test suite (test.py). Note, however, that we default 6 to the builtin HTTP server (most tests default to 'serverless'). 5 the 'comprehensive' test suite (test.py). 7 6 """ 8 7 … … 19 18 data = object() 20 19 21 class Root: 22 def index(self, *args, **kwargs): 23 cherrypy.thread_data.thing = data 24 return "Hello world!" 25 index.exposed = True 26 cherrypy.tree.mount(Root()) 20 def setup_server(): 21 class Root: 22 def index(self, *args, **kwargs): 23 cherrypy.thread_data.thing = data 24 return "Hello world!" 25 index.exposed = True 26 cherrypy.tree.mount(Root()) 27 27 28 cherrypy.config.update({ 29 'global': {'server.log_to_screen': False, 30 'server.environment': 'production', 31 'server.show_tracebacks': True, 32 }, 33 }) 28 cherrypy.config.update({ 29 'global': {'server.log_to_screen': False, 30 'server.environment': 'production', 31 'server.show_tracebacks': True, 32 }, 33 }) 34 34 35 35 36 import helper … … 74 75 75 76 if __name__ == '__main__': 76 helper.testmain(server="cherrypy._cpwsgi.WSGIServer") 77 setup_server() 78 helper.testmain() trunk/cherrypy/test/test_logdebuginfo_filter.py
r967 r1017 4 4 import cherrypy 5 5 6 class Root:7 def index(self):8 yield "Hello, world"9 index.exposed = True10 6 11 def bug326(self, file): 12 return "OK" 13 bug326.exposed = True 7 def setup_server(): 8 class Root: 9 def index(self): 10 yield "Hello, world" 11 index.exposed = True 14 12 15 cherrypy.root = Root() 13 def bug326(self, file): 14 return "OK" 15 bug326.exposed = True 16 16 17 cherrypy.config.update({ 17 cherrypy.root = Root() 18 19 cherrypy.config.update({ 18 20 'server.log_to_screen': False, 19 21 'server.environment': 'production', 20 22 'log_debug_info_filter.on': True, 21 }) 23 '/bug326': { 24 'server.max_request_body_size': 300, 25 'server.environment': 'development', 26 } 27 }) 22 28 23 29 … … 35 41 36 42 def testBug326(self): 37 from cherrypy import _cpwsgi 38 s = cherrypy.server.httpserver 39 if s and isinstance(s, _cpwsgi.WSGIServer): 40 h = [("Content-type", "multipart/form-data; boundary=x"), 41 ("Content-Length", "110")] 42 b = """--x 43 b = """--x 43 44 Content-Disposition: form-data; name="file"; filename="hello.txt" 44 45 Content-Type: text/plain 45 46 46 hello 47 %s 47 48 --x-- 48 """ 49 cherrypy.config.update({ 50 ('%s/bug326' % self.prefix()): { 51 'server.max_request_body_size': 3, 52 'server.environment': 'development', 53 } 54 }) 55 ignore = helper.webtest.ignored_exceptions 56 ignore.append(AttributeError) 57 try: 58 self.getPage('/bug326', h, "POST", b) 59 self.assertStatus("413 Request Entity Too Large") 60 finally: 61 ignore.pop() 49 """ % ("x" * 300) 50 h = [("Content-type", "multipart/form-data; boundary=x"), 51 ("Content-Length", len(b))] 52 self.getPage('/bug326', h, "POST", b) 53 self.assertStatus("413 Request Entity Too Large") 54 62 55 63 56 if __name__ == "__main__": 57 setup_server() 64 58 helper.testmain() trunk/cherrypy/test/test_objectmapping.py
r968 r1017 5 5 6 6 7 class Root:8 def index(self, name="world"):9 return name10 index.exposed = True11 12 def default(self, *params):13 return "default:" + repr(params)14 default.exposed = True15 16 def other(self):17 return "other"18 other.exposed = True19 20 def extra(self, *p):21 return repr(p)22 extra.exposed = True23 24 def redirect(self):25 raise cherrypy.HTTPRedirect('dir1/', 302)26 redirect.exposed = True27 28 def notExposed(self):29 return "not exposed"30 31 def confvalue(self):32 return cherrypy.config.get("user")33 confvalue.exposed = True34 35 def mapped_func(self, ID=None):36 return "ID is %s" % ID37 mapped_func.exposed = True38 setattr(Root, "Von B\xfclow", mapped_func)39 40 41 class Exposing:42 def base(self):43 return "expose works!"44 cherrypy.expose(base)45 cherrypy.expose(base, "1")46 cherrypy.expose(base, "2")47 48 class ExposingNewStyle(object):49 def base(self):50 return "expose works!"51 cherrypy.expose(base)52 cherrypy.expose(base, "1")53 cherrypy.expose(base, "2")54 55 56 57 class Dir1:58 def index(self):59 return "index for dir1"60 index.exposed = True61 62 def myMethod(self):63 return "myMethod from dir1, object Path is:" + repr(cherrypy.request.object_path)64 myMethod.exposed = True65 66 def default(self, *params):67 return "default for dir1, param is:" + repr(params)68 default.exposed = True69 70 71 class Dir2:72 def index(self):73 return "index for dir2, path is:" + cherrypy.request.path74 index.exposed = True75 76 def mount_point(self):77 return cherrypy.tree.mount_point()78 mount_point.exposed = True79 80 def tree_url(self):81 return cherrypy.tree.url("/extra")82 tree_url.exposed = True83 84 def posparam(self, *vpath):85 return "/".join(vpath)86 posparam.exposed = True87 88 89 class Dir3:90 def default(self):91 return "default for dir3, not exposed"92 93 94 class Dir4:95 def index(self):96 return "index for dir4, not exposed"97 98 class DefNoIndex:99 def default(self, *args):100 return "defnoindex:" + repr(args)101 default.exposed = True102 103 104 Root.exposing = Exposing()105 Root.exposingnew = ExposingNewStyle()106 Root.dir1 = Dir1()107 Root.dir1.dir2 = Dir2()108 Root.dir1.dir2.dir3 = Dir3()109 Root.dir1.dir2.dir3.dir4 = Dir4()110 Root.defnoindex = DefNoIndex()111 112 113 7 mount_points = ["/", "/users/fred/blog", "/corp/blog"] 114 for url in mount_points: 115 conf = {'user': url.split("/")[-2]} 116 cherrypy.tree.mount(Root(), url, {'/': conf}) 117 118 cherrypy.config.update({ 119 'server.log_to_screen': False, 120 'server.environment': "production", 121 }) 122 123 124 class Isolated: 125 def index(self): 126 return "made it!" 127 index.exposed = True 128 129 cherrypy.tree.mount(Isolated(), "/isolated") 8 9 def setup_server(): 10 class Root: 11 def index(self, name="world"): 12 return name 13 index.exposed = True 14 15 def default(self, *params): 16 return "default:" + repr(params) 17 default.exposed = True 18 19 def other(self): 20 return "other" 21 other.exposed = True 22 23 def extra(self, *p): 24 return repr(p) 25 extra.exposed = True 26 27 def redirect(self): 28 raise cherrypy.HTTPRedirect('dir1/', 302) 29 redirect.exposed = True 30 31 def notExposed(self): 32 return "not exposed" 33 34 def confvalue(self): 35 return cherrypy.config.get("user") 36 confvalue.exposed = True 37 38 def mapped_func(self, ID=None): 39 return "ID is %s" % ID 40 mapped_func.exposed = True 41 setattr(Root, "Von B\xfclow", mapped_func) 42 43 44 class Exposing: 45 def base(self): 46 return "expose works!" 47 cherrypy.expose(base) 48 cherrypy.expose(base, "1") 49 cherrypy.expose(base, "2") 50 51 class ExposingNewStyle(object): 52 def base(self): 53 return "expose works!" 54 cherrypy.expose(base) 55 cherrypy.expose(base, "1") 56 cherrypy.expose(base, "2") 57 58 59 60 class Dir1: 61 def index(self): 62 return "index for dir1" 63 index.exposed = True 64 65 def myMethod(self): 66 return "myMethod from dir1, object Path is:" + repr(cherrypy.request.object_path) 67 myMethod.exposed = True 68 69 def default(self, *params): 70 return "default for dir1, param is:" + repr(params) 71 default.exposed = True 72 73 74 class Dir2: 75 def index(self): 76 return "index for dir2, path is:" + cherrypy.request.path 77 index.exposed = True 78 79 def mount_point(self): 80 return cherrypy.tree.mount_point() 81 mount_point.exposed = True 82 83 def tree_url(self): 84 return cherrypy.tree.url("/extra") 85 tree_url.exposed = True 86 87 def posparam(self, *vpath): 88 return "/".join(vpath) 89 posparam.exposed = True 90 91 92 class Dir3: 93 def default(self): 94 return "default for dir3, not exposed" 95 96 97 class Dir4: 98 def index(self): 99 return "index for dir4, not exposed" 100 101 class DefNoIndex: 102 def default(self, *args): 103 return "defnoindex:" + repr(args) 104 default.exposed = True 105 106 107 Root.exposing = Exposing() 108 Root.exposingnew = ExposingNewStyle() 109 Root.dir1 = Dir1() 110 Root.dir1.dir2 = Dir2() 111 Root.dir1.dir2.dir3 = Dir3() 112 Root.dir1.dir2.dir3.dir4 = Dir4() 113 Root.defnoindex = DefNoIndex() 114 115 116 for url in mount_points: 117 conf = {'user': url.split("/")[-2]} 118 cherrypy.tree.mount(Root(), url, {'/': conf}) 119 120 cherrypy.config.update({ 121 'server.log_to_screen': False, 122 'server.environment': "production", 123 }) 124 125 126 class Isolated: 127 def index(self): 128 return "made it!" 129 index.exposed = True 130 131 cherrypy.tree.mount(Isolated(), "/isolated") 130 132 131 133 … … 245 247 246 248 if __name__ == "__main__": 249 setup_server() 247 250 helper.testmain() trunk/cherrypy/test/test_response_headers_filter.py
r910 r1017 5 5 from cherrypy._cputil import headers 6 6 7 class Root:8 def index(self):9 yield "Hello, world"10 index = headers([("Content-Language", "en-GB"),11 ('Content-Type', 'text/plain')])(index)12 index.exposed = True13 14 def other(self):15 return "salut"16 other.exposed = True17 7 18 cherrypy.root = Root() 19 cherrypy.config.update({ 20 '/other': { 21 'response_headers_filter.on': True, 22 'response_headers_filter.headers': [("Content-Language", "fr"), 23 ('Content-Type', 'text/plain')] 24 }, 25 }) 8 def setup_server(): 9 class Root: 10 def index(self): 11 yield "Hello, world" 12 index = headers([("Content-Language", "en-GB"), 13 ('Content-Type', 'text/plain')])(index) 14 index.exposed = True 15 16 def other(self): 17 return "salut" 18 other.exposed = True 19 20 cherrypy.root = Root() 21 cherrypy.config.update({ 22 '/other': { 23 'response_headers_filter.on': True, 24 'response_headers_filter.headers': [("Content-Language", "fr"), 25 ('Content-Type', 'text/plain')] 26 }, 27 }) 26 28 27 29 … … 44 46 45 47 if __name__ == "__main__": 48 setup_server() 46 49 helper.testmain() trunk/cherrypy/test/test_session_filter.py
r856 r1017 4 4 import cherrypy, os 5 5 6 class Root: 7 def testGen(self): 8 counter = cherrypy.session.get('counter', 0) + 1 9 cherrypy.session['counter'] = counter 10 yield str(counter) 11 testGen.exposed = True 12 def testStr(self): 13 counter = cherrypy.session.get('counter', 0) + 1 14 cherrypy.session['counter'] = counter 15 return str(counter) 16 testStr.exposed = True 17 18 cherrypy.root = Root() 19 cherrypy.config.update({ 20 'server.log_to_screen': False, 21 'server.environment': 'production', 22 'session_filter.on': True, 23 'session_filter.storage_type' : 'file', 24 'session_filter.storage_path' : '.', 25 }) 6 7 def setup_server(): 8 class Root: 9 def testGen(self): 10 counter = cherrypy.session.get('counter', 0) + 1 11 cherrypy.session['counter'] = counter 12 yield str(counter) 13 testGen.exposed = True 14 15 def testStr(self): 16 counter = cherrypy.session.get('counter', 0) + 1 17 cherrypy.session['counter'] = counter 18 return str(counter) 19 testStr.exposed = True 20 21 def setsessiontype(self, newtype): 22 cherrypy.config.update({'session_filter.storage_type': newtype}) 23 setsessiontype.exposed = True 24 25 cherrypy.root = Root() 26 cherrypy.config.update({ 27 'server.log_to_screen': False, 28 'server.environment': 'production', 29 'session_filter.on': True, 30 'session_filter.storage_type' : 'file', 31 'session_filter.storage_path' : '.', 32 }) 26 33 27 34 import helper … … 36 43 self.getPage('/testStr', self.cookies) 37 44 self.assertBody('3') 38 cherrypy.config.update({ 39 'session_filter.storage_type' : 'file', 40 }) 45 self.getPage('/setsessiontype/file') 41 46 self.getPage('/testStr') 42 47 self.assertBody('1') … … 50 55 if fname.startswith('session-'): 51 56 os.unlink(fname) 52 53 57 54 58 if __name__ == "__main__": 59 setup_server() 55 60 helper.testmain() 56 61 trunk/cherrypy/test/test_sessionauthenticate_filter.py
r871 r1017 4 4 import cherrypy 5 5 6 class Test: 7 def index(self): 8 return "Hi, you are logged in" 9 index.exposed = True 6 def setup_server(): 7 class Test: 8 def index(self): 9 return "Hi, you are logged in" 10 index.exposed = True 11 12 cherrypy.root = Test() 13 14 cherrypy.config.update({ 15 'server.log_to_screen': False, 16 'server.environment': 'production', 17 'session_filter.on': True, 18 '/': {'session_authenticate_filter.on':True}, 19 }) 10 20 11 cherrypy.root = Test()12 21 13 cherrypy.config.update({14 'server.log_to_screen': False,15 'server.environment': 'production',16 'session_filter.on': True,17 })18 19 cherrypy.config.update({'/':20 {'session_authenticate_filter.on':True,21 }22 })23 22 import helper 24 23 … … 26 25 27 26 def testSessionAuthenticateFilter(self): 28 protocol = cherrypy.config.get('server.protocol_version')29 27 # request a page and check for login form 30 28 self.getPage('/') 31 29 self.assertInBody('<form method="post" action="do_login">') 32 30 33 # setup credentials 31 # setup credentials 34 32 login_body = 'login=login&password=password&from_page=/' 35 33 36 34 # attempt a login 37 35 self.getPage('/do_login', method='POST', body=login_body) 38 if protocol == 'HTTP/1.0': 39 self.assertStatus('302 Found') 40 else: 41 self.assertStatus('303 See Other') 36 self.assert_(self.status in ('302 Found', '303 See Other')) 42 37 43 38 # get the page now that we are logged in … … 47 42 # do a logout 48 43 self.getPage('/do_logout', self.cookies) 49 if protocol == 'HTTP/1.0': 50 self.assertStatus('302 Found') 51 else: 52 self.assertStatus('303 See Other') 44 self.assert_(self.status in ('302 Found', '303 See Other')) 53 45 54 46 # verify we are logged out 55 47 self.getPage('/', self.cookies) 56 48 self.assertInBody('<form method="post" action="do_login">') 57 58 49 50 59 51 if __name__ == "__main__": 52 setup_server() 60 53 helper.testmain() 61 54 trunk/cherrypy/test/test_static_filter.py
r921 r1017 4 4 import os 5 5 curdir = os.path.join(os.getcwd(), os.path.dirname(__file__)) 6 import threading 6 7 7 8 import cherrypy … … 9 10 10 11 11 class Root: 12 pass 12 def setup_server(): 13 class Root: 14 pass 13 15 14 class Static:15 16 def index(self):17 return 'You want the Baron? You can have the Baron!'18 index.exposed = True19 20 def dynamic(self):21 return "This is a DYNAMIC page"22 dynamic.exposed = True16 class Static: 17 18 def index(self): 19 return 'You want the Baron? You can have the Baron!' 20 index.exposed = True 21 22 def dynamic(self): 23 return "This is a DYNAMIC page" 24 dynamic.exposed = True 23 25 24 26 25 cherrypy.root = Root()26 cherrypy.root.static = Static()27 cherrypy.root = Root() 28 cherrypy.root.static = Static() 27 29 28 cherrypy.config.update({29 'global': {30 'static_filter.on': False,31 'server.log_to_screen': False,32 'server.environment': 'production',33 },34 '/static': {35 'static_filter.on': True,36 'static_filter.dir': 'static',37 'static_filter.root': curdir,38 },39 '/style.css': {40 'static_filter.on': True,41 'static_filter.file': 'style.css',42 'static_filter.root': curdir,43 },44 '/docroot': {45 'static_filter.on': True,46 'static_filter.root': curdir,47 'static_filter.dir': 'static',48 'static_filter.index': 'index.html',49 },50 '/error': {51 'static_filter.on': True,52 'server.show_tracebacks': True,53 },54 })30 cherrypy.config.update({ 31 'global': { 32 'static_filter.on': False, 33 'server.log_to_screen': False, 34 'server.environment': 'production', 35 }, 36 '/static': { 37 'static_filter.on': True, 38 'static_filter.dir': 'static', 39 'static_filter.root': curdir, 40 }, 41 '/style.css': { 42 'static_filter.on': True, 43 'static_filter.file': 'style.css', 44 'static_filter.root': curdir, 45 }, 46 '/docroot': { 47 'static_filter.on': True, 48 'static_filter.root': curdir, 49 'static_filter.dir': 'static', 50 'static_filter.index': 'index.html', 51 }, 52 '/error': { 53 'static_filter.on': True, 54 'server.show_tracebacks': True, 55 }, 56 }) 55 57 56 58 import helper … … 114 116 115 117 # Test up-level security 116 self.getPage("/static/../style.css") 117 self.assertStatus('403 Forbidden') 118 118 self.getPage("/static/../../test/style.css") 119 self.assertStatus((400, 403)) 120 121 # Test modified-since on a reasonably-large file 122 self.getPage("/static/dirback.jpg") 123 self.assertStatus("200 OK") 124 ims = ("If-Modified-Since", "Wed, 13 Jul 2005 16:19:55 GMT") 125 self.getPage("/static/dirback.jpg", headers=[ims]) 126 self.assertStatus("304 Not Modified") 127 ## 128 ## # Test lots of requests for the same file (no If-Mod). 129 ## ts = [] 130 ## for x in xrange(500): 131 ## t = threading.Thread(target=self.getPage, 132 ## args=("/static/dirback.jpg",)) 133 ## ts.append(t) 134 ## t.start() 135 ## for t in ts: 136 ## t.join() 137 ## 119 138 120 139 if __name__ == "__main__": 140 setup_server() 121 141 helper.testmain() trunk/cherrypy/test/test_virtualhost_filter.py
r883 r1017 5 5 import cherrypy 6 6 7 class Root: 8 def index(self): 9 return "Hello, world" 10 index.exposed = True 11 12 def dom4(self): 13 return "Under construction" 14 dom4.exposed = True 7 def setup_server(): 8 class Root: 9 def index(self): 10 return "Hello, world" 11 index.exposed = True 12 13 def dom4(self): 14 return "Under construction" 15 dom4.exposed = True 15 16 16 class VHost:17 def __init__(self, sitename):18 self.sitename = sitename19 20 def index(self):21 return "Welcome to %s" % self.sitename22 index.exposed = True17 class VHost: 18 def __init__(self, sitename): 19 self.sitename = sitename 20 21 def index(self): 22 return "Welcome to %s" % self.sitename 23 index.exposed = True 23 24 24 25 25 cherrypy.root = Root()26 cherrypy.root.mydom2 = VHost("Domain 2")27 cherrypy.root.mydom3 = VHost("Domain 3")26 cherrypy.root = Root() 27 cherrypy.root.mydom2 = VHost("Domain 2") 28 cherrypy.root.mydom3 = VHost("Domain 3") 28 29 29 cherrypy.config.update({30 'server.logToScreen': False,31 'server.environment': 'production',32 'virtual_host_filter.on': True,33 'virtual_host_filter.www.mydom2.com': '/mydom2',34 'virtual_host_filter.www.mydom3.com': '/mydom3',35 'virtual_host_filter.www.mydom4.com': '/dom4',36 })30 cherrypy.config.update({ 31 'server.logToScreen': False, 32 'server.environment': 'production', 33 'virtual_host_filter.on': True, 34 'virtual_host_filter.www.mydom2.com': '/mydom2', 35 'virtual_host_filter.www.mydom3.com': '/mydom3', 36 'virtual_host_filter.www.mydom4.com': '/dom4', 37 }) 37 38 38 39 import helper … … 55 56 56 57 if __name__ == "__main__": 58 setup_server() 57 59 helper.testmain() trunk/cherrypy/test/test_wsgiapp_filter.py
r1008 r1017 2 2 test.prefer_parent_path() 3 3 4 import os5 curdir = os.path.join(os.getcwd(), os.path.dirname(__file__))6 4 7 import cherrypy 8 from cherrypy.filters.wsgiappfilter import WSGIAppFilter 9 from cherrypy.lib.cptools import WSGIApp 5 def setup_server(): 6 import os 7 curdir = os.path.join(os.getcwd(), os.path.dirname(__file__)) 10 8 11 def test_app(environ, start_response): 12 status = '200 OK' 13 response_headers = [('Content-type', 'text/plain')] 14 start_response(status, response_headers) 15 yield 'Hello, world!\n' 16 yield 'This is a wsgi app running within CherryPy!\n\n' 17 keys = environ.keys() 18 keys.sort() 19 for k in keys: 20 yield '%s: %s\n' % (k,environ[k]) 9 import cherrypy 10 from cherrypy.filters.wsgiappfilter import WSGIAppFilter 11 from cherrypy.lib.cptools import WSGIApp 21 12 22 class Root: 23 def index(self): 24 return "I'm a regular CherryPy page handler!" 25 index.exposed = True 13 def test_app(environ, start_response): 14 status = '200 OK' 15 response_headers = [('Content-type', 'text/plain')] 16 start_response(status, response_headers) 17 yield 'Hello, world!\n' 18 yield 'This is a wsgi app running within CherryPy!\n\n' 19 keys = environ.keys() 20 keys.sort() 21 for k in keys: 22 yield '%s: %s\n' % (k,environ[k]) 23 24 class Root: 25 def index(self): 26 return "I'm a regular CherryPy page handler!" 27 index.exposed = True 26 28 27 29 28 class HostedWSGI(object):29 _cp_filters = [WSGIAppFilter(test_app),]30 class HostedWSGI(object): 31 _cp_filters = [WSGIAppFilter(test_app),] 30 32 31 33 32 cherrypy.tree.mount(Root(), '') 33 cherrypy.tree.mount(HostedWSGI(), '/hosted/app0') 34 cherrypy.tree.mount(WSGIApp(test_app), '/hosted/app1') 34 conf = {'server.log_to_screen': False, 35 'server.environment': 'production', 36 'server.show_tracebacks': True, 37 } 38 cherrypy.tree.mount(Root(), '/', conf) 39 conf0 = {'/static': {'static_filter.on': True, 40 'static_filter.root': curdir, 41 'static_filter.dir': 'static', 42 }} 43 cherrypy.tree.mount(HostedWSGI(), '/hosted/app0', conf0) 44 cherrypy.tree.mount(WSGIApp(test_app), '/hosted/app1') 45 35 46 36 47 import helper 37 38 cherrypy.config.update({39 'global': {'server.log_to_screen': False,40 'server.environment': 'production',41 'server.show_tracebacks': True,42 'server.socket_host': helper.CPWebCase.HOST,43 'server.socket_port': helper.CPWebCase.PORT,44 },45 '/hosted/app0/static' : {46 'static_filter.on':True,47 'static_filter.root': curdir,48 'static_filter.dir': 'static',49 },50 51 })52 48 53 49 … … 78 74 79 75 if __name__ == '__main__': 80 from cherrypy import _cpwsgi 81 server_class = _cpwsgi.WSGIServer 82 helper.testmain(server_class) 76 setup_server() 77 helper.testmain() 83 78 trunk/cherrypy/test/test_xmlrpc_filter.py
r943 r1017 1 1 import test 2 2 test.prefer_parent_path() 3 4 3 import xmlrpclib 5 import cherrypy6 7 class Root:8 def index(self):9 return "I'm a standard index!"10 index.exposed = True11 4 12 5 13 class XmlRpc: 14 def return_single_item_list(self): 15 return [42] 16 return_single_item_list.exposed = True 17 18 def return_string(self): 19 return "here is a string" 20 return_string.exposed = True 21 22 def return_tuple(self): 23 return ('here', 'is', 1, 'tuple') 24 return_tuple.exposed = True 25 26 def return_dict(self): 27 return dict(a=1, b=2, c=3) 28 return_dict.exposed = True 29 30 def return_composite(self): 31 return dict(a=1,z=26), 'hi', ['welcome', 'friend'] 32 return_composite.exposed = True 6 def setup_server(): 7 import cherrypy 33 8 34 def return_int(self): 35 return 42 36 return_int.exposed = True 9 class Root: 10 def index(self): 11 return "I'm a standard index!" 12 index.exposed = True 37 13 38 def return_float(self):39 return 3.1440 return_float.exposed = True41 14 42 def return_datetime(self): 43 return xmlrpclib.DateTime((2003, 10, 7, 8, 1, 0, 1, 280, -1)) 44 return_datetime.exposed = True 15 class XmlRpc: 16 def return_single_item_list(self): 17 return [42] 18 return_single_item_list.exposed = True 19 20 def return_string(self): 21 return "here is a string" 22 return_string.exposed = True 23 24 def return_tuple(self): 25 return ('here', 'is', 1, 'tuple') 26 return_tuple.exposed = True 27 28 def return_dict(self): 29 return dict(a=1, b=2, c=3) 30 return_dict.exposed = True 31 32 def return_composite(self): 33 return dict(a=1,z=26), 'hi', ['welcome', 'friend'] 34 return_composite.exposed = True 45 35 46 def return_boolean(self):47 return True48 return_boolean.exposed = True36 def return_int(self): 37 return 42 38 return_int.exposed = True 49 39 50 def test_argument_passing(self, num):51 return num * 252 test_argument_passing.exposed = True40 def return_float(self): 41 return 3.14 42 return_float.exposed = True 53 43 54 cherrypy.root = Root() 55 cherrypy.root.xmlrpc = XmlRpc() 44 def return_datetime(self): 45 return xmlrpclib.DateTime((2003, 10, 7, 8, 1, 0, 1, 280, -1)) 46 return_datetime.exposed = True 47 48 def return_boolean(self): 49 return True 50 return_boolean.exposed = True 51 52 def test_argument_passing(self, num): 53 return num * 2 54 test_argument_passing.exposed = True 55 56 cherrypy.root = Root() 57 cherrypy.root.xmlrpc = XmlRpc() 58 cherrypy.config.update({ 59 'server.log_to_screen': False, 60 'server.environment': 'production', 61 'server.show_tracebacks': True, 62 '/xmlrpc': {'xmlrpc_filter.on': True}, 63 }) 56 64 57 65 58 66 import helper 59 60 cherrypy.config.update({61 'global': {'server.log_to_screen': False,62 'server.environment': 'production',63 'server.show_tracebacks': True,64 'server.socket_host': helper.CPWebCase.HOST,65 'server.socket_port': helper.CPWebCase.PORT,66 },67 '/xmlrpc': {'xmlrpc_filter.on':True},68 })69 70 71 class ServerlessProxy(object):72 """An xmlrpc proxy for the test suite's 'serverless' tests."""73 def __init__(self, webcase, url):74 self._webcase = webcase75 self.url = url76 77 def __getattr__(self, attr, *args):78 def xmlrpc_method(*args):79 args = tuple(args)80 body = xmlrpclib.dumps(args, attr)81 cl = len(body)82 headers = [('Content-Type', 'text/xml'), ('Content-Length', str(cl))]83 self._webcase.getPage(self.url, headers=headers,84 method="POST", body=body)85 p, u = xmlrpclib.getparser()86 for line in self._webcase.body:87 p.feed(line)88 p.close()89 response = u.close()90 if len(response) == 1:91 response = response[0]92 return response93 return xmlrpc_method94 95 67 96 68 class XmlRpcFilterTest(helper.CPWebCase): … … 99 71 # load the appropriate xmlrpc proxy 100 72 url = 'http://localhost:%s/xmlrpc/' % (self.PORT) 101 if cherrypy.server.httpserver is None: 102 proxy = ServerlessProxy(self, url) 103 else: 104 proxy = xmlrpclib.ServerProxy(url) 73 proxy = xmlrpclib.ServerProxy(url) 105 74 106 75 # begin the tests ... … … 120 89 121 90 # Test an error in the page handler (should raise an xmlrpclib.Fault) 122 ignore = helper.webtest.ignored_exceptions123 ignore.append(TypeError)124 91 try: 125 try: 126 proxy.test_argument_passing({}) 127 except Exception, x: 128 self.assertEqual(x.__class__, xmlrpclib.Fault) 129 self.assertEqual(x.faultString, ("unsupported operand type(s) " 130 "for *: 'dict' and 'int'")) 131 else: 132 self.fail("Expected xmlrpclib.Fault") 133 finally: 134 ignore.pop() 92 proxy.test_argument_passing({}) 93 except Exception, x: 94 self.assertEqual(x.__class__, xmlrpclib.Fault) 95 self.assertEqual(x.faultString, ("unsupported operand type(s) " 96 "for *: 'dict' and 'int'")) 97 else: 98 self.fail("Expected xmlrpclib.Fault") 135 99 136 100 137 101 if __name__ == '__main__': 138 from cherrypy import _cpwsgi 139 server_class = _cpwsgi.WSGIServer 140 helper.testmain(server_class) 102 setup_server() 103 helper.testmain() 141 104 trunk/cherrypy/test/webtest.py
r874 r1017 142 142 HTTP_CONN=httplib.HTTPConnection 143 143 144 def getPage(self, url, headers=None, method="GET", body=None ):144 def getPage(self, url, headers=None, method="GET", body=None, protocol="HTTP/1.1"): 145 145 """Open the url with debugging support. Return status, headers, body.""" 146 146 ServerError.on = False … … 148 148 self.url = url 149 149 result = openURL(url, headers, method, body, self.HOST, self.PORT, 150 self.HTTP_CONN )150 self.HTTP_CONN, protocol) 151 151 self.status, self.headers, self.body = result 152 152 … … 249 249 msg = 'Status (%s) != %s' % (`self.status`, `status`) 250 250 self._handlewebError(msg) 251 elif isinstance(status, int): 252 code = int(self.status[:3]) 253 if code != status: 254 if msg is None: 255 msg = 'Status (%s) != %s' % (`self.status`, `status`) 256 self._handlewebError(msg) 251 257 else: 252 if not self.status in status: 258 # status is a tuple or list. 259 match = False 260 for s in status: 261 if isinstance(s, basestring): 262 if self.status == s: 263 match = True 264 break 265 elif int(self.status[:3]) == s: 266 match = True 267 break 268 if not match: 253 269 if msg is None: 254 270 msg = 'Status (%s) not in %s' % (`self.status`, `status`) … … 339 355 340 356 def openURL(url, headers=None, method="GET", body=None, 341 host="127.0.0.1", port=8000, http_conn=httplib.HTTPConnection): 357 host="127.0.0.1", port=8000, http_conn=httplib.HTTPConnection, 358 protocol="HTTP/1.1"): 342 359 """Open the given HTTP resource and return status, headers, and body.""" 343 360 … … 350 367 try: 351 368 conn = http_conn(host, port) 352 conn.putrequest(method.upper(), url) 369 conn._http_vsn_str = protocol 370 conn._http_vsn = int("".join([x for x in protocol if x.isdigit()])) 371 conn.putrequest(method.upper(), url, 372 skip_host=True, skip_accept_encoding=True) 353 373 354 374 for key, value in headers:

