| | 8 | import mimetypes |
|---|
| | 9 | |
|---|
| | 10 | |
|---|
| | 11 | def encode_multipart_formdata(files): |
|---|
| | 12 | """Return (content_type, body) ready for httplib.HTTP instance. |
|---|
| | 13 | |
|---|
| | 14 | files: a sequence of (name, filename, value) tuples for multipart uploads. |
|---|
| | 15 | """ |
|---|
| | 16 | BOUNDARY = '________ThIs_Is_tHe_bouNdaRY_$' |
|---|
| | 17 | L = [] |
|---|
| | 18 | for key, filename, value in files: |
|---|
| | 19 | L.append('--' + BOUNDARY) |
|---|
| | 20 | L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % |
|---|
| | 21 | (key, filename)) |
|---|
| | 22 | ct = mimetypes.guess_type(filename)[0] or 'application/octet-stream' |
|---|
| | 23 | L.append('Content-Type: %s' % ct) |
|---|
| | 24 | L.append('') |
|---|
| | 25 | L.append(value) |
|---|
| | 26 | L.append('--' + BOUNDARY + '--') |
|---|
| | 27 | L.append('') |
|---|
| | 28 | body = '\r\n'.join(L) |
|---|
| | 29 | content_type = 'multipart/form-data; boundary=%s' % BOUNDARY |
|---|
| | 30 | return content_type, body |
|---|
| | 39 | |
|---|
| | 40 | def post_multipart(self, file): |
|---|
| | 41 | """Return a summary ("a * 1000000\nb * 1000000") of the uploaded file.""" |
|---|
| | 42 | contents = file.file.read() |
|---|
| | 43 | summary = [] |
|---|
| | 44 | curchar = "" |
|---|
| | 45 | count = 0 |
|---|
| | 46 | for c in contents: |
|---|
| | 47 | if c == curchar: |
|---|
| | 48 | count += 1 |
|---|
| | 49 | else: |
|---|
| | 50 | if count: |
|---|
| | 51 | summary.append("%s * %d" % (curchar, count)) |
|---|
| | 52 | count = 1 |
|---|
| | 53 | curchar = c |
|---|
| | 54 | if count: |
|---|
| | 55 | summary.append("%s * %d" % (curchar, count)) |
|---|
| | 56 | return ", ".join(summary) |
|---|
| | 57 | post_multipart.exposed = True |
|---|
| | 75 | |
|---|
| | 76 | def test_post_multipart(self): |
|---|
| | 77 | alphabet = "abcdefghijklmnopqrstuvwxyz" |
|---|
| | 78 | # generate file contents for a large post |
|---|
| | 79 | contents = "".join([c * 1000000 for c in alphabet]) |
|---|
| | 80 | |
|---|
| | 81 | # encode as multipart form data |
|---|
| | 82 | files=[('file', 'file.txt', contents)] |
|---|
| | 83 | content_type, body = encode_multipart_formdata(files) |
|---|
| | 84 | |
|---|
| | 85 | # post file |
|---|
| | 86 | if self.scheme == 'https': |
|---|
| | 87 | c = httplib.HTTPS('127.0.0.1:%s' % self.PORT) |
|---|
| | 88 | else: |
|---|
| | 89 | c = httplib.HTTP('127.0.0.1:%s' % self.PORT) |
|---|
| | 90 | c.putrequest('POST', '/post_multipart') |
|---|
| | 91 | c.putheader('Content-Type', content_type) |
|---|
| | 92 | c.putheader('Content-Length', str(len(body))) |
|---|
| | 93 | c.endheaders() |
|---|
| | 94 | c.send(body) |
|---|
| | 95 | |
|---|
| | 96 | errcode, errmsg, headers = c.getreply() |
|---|
| | 97 | self.assertEqual(errcode, 200) |
|---|
| | 98 | |
|---|
| | 99 | response_body = c.file.read() |
|---|
| | 100 | self.assertEquals(", ".join(["%s * 1000000" % c for c in alphabet]), |
|---|
| | 101 | response_body) |
|---|