You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
401 lines
18 KiB
Python
401 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
werkzeug.testsuite.formparser
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Tests the form parsing facilities.
|
|
|
|
:copyright: (c) 2013 by Armin Ronacher.
|
|
:license: BSD, see LICENSE for more details.
|
|
"""
|
|
|
|
from __future__ import with_statement
|
|
|
|
import unittest
|
|
from os.path import join, dirname
|
|
|
|
from werkzeug.testsuite import WerkzeugTestCase
|
|
|
|
from werkzeug import formparser
|
|
from werkzeug.test import create_environ, Client
|
|
from werkzeug.wrappers import Request, Response
|
|
from werkzeug.exceptions import RequestEntityTooLarge
|
|
from werkzeug.datastructures import MultiDict
|
|
from werkzeug.formparser import parse_form_data
|
|
from werkzeug._compat import BytesIO
|
|
|
|
|
|
@Request.application
|
|
def form_data_consumer(request):
|
|
result_object = request.args['object']
|
|
if result_object == 'text':
|
|
return Response(repr(request.form['text']))
|
|
f = request.files[result_object]
|
|
return Response(b'\n'.join((
|
|
repr(f.filename).encode('ascii'),
|
|
repr(f.name).encode('ascii'),
|
|
repr(f.content_type).encode('ascii'),
|
|
f.stream.read()
|
|
)))
|
|
|
|
|
|
def get_contents(filename):
|
|
with open(filename, 'rb') as f:
|
|
return f.read()
|
|
|
|
|
|
class FormParserTestCase(WerkzeugTestCase):
|
|
|
|
def test_limiting(self):
|
|
data = b'foo=Hello+World&bar=baz'
|
|
req = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='application/x-www-form-urlencoded',
|
|
method='POST')
|
|
req.max_content_length = 400
|
|
self.assert_strict_equal(req.form['foo'], u'Hello World')
|
|
|
|
req = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='application/x-www-form-urlencoded',
|
|
method='POST')
|
|
req.max_form_memory_size = 7
|
|
self.assert_raises(RequestEntityTooLarge, lambda: req.form['foo'])
|
|
|
|
req = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='application/x-www-form-urlencoded',
|
|
method='POST')
|
|
req.max_form_memory_size = 400
|
|
self.assert_strict_equal(req.form['foo'], u'Hello World')
|
|
|
|
data = (b'--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\n'
|
|
b'Hello World\r\n'
|
|
b'--foo\r\nContent-Disposition: form-field; name=bar\r\n\r\n'
|
|
b'bar=baz\r\n--foo--')
|
|
req = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; boundary=foo',
|
|
method='POST')
|
|
req.max_content_length = 4
|
|
self.assert_raises(RequestEntityTooLarge, lambda: req.form['foo'])
|
|
|
|
req = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; boundary=foo',
|
|
method='POST')
|
|
req.max_content_length = 400
|
|
self.assert_strict_equal(req.form['foo'], u'Hello World')
|
|
|
|
req = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; boundary=foo',
|
|
method='POST')
|
|
req.max_form_memory_size = 7
|
|
self.assert_raises(RequestEntityTooLarge, lambda: req.form['foo'])
|
|
|
|
req = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; boundary=foo',
|
|
method='POST')
|
|
req.max_form_memory_size = 400
|
|
self.assert_strict_equal(req.form['foo'], u'Hello World')
|
|
|
|
def test_parse_form_data_put_without_content(self):
|
|
# A PUT without a Content-Type header returns empty data
|
|
|
|
# Both rfc1945 and rfc2616 (1.0 and 1.1) say "Any HTTP/[1.0/1.1] message
|
|
# containing an entity-body SHOULD include a Content-Type header field
|
|
# defining the media type of that body." In the case where either
|
|
# headers are omitted, parse_form_data should still work.
|
|
env = create_environ('/foo', 'http://example.org/', method='PUT')
|
|
del env['CONTENT_TYPE']
|
|
del env['CONTENT_LENGTH']
|
|
|
|
stream, form, files = formparser.parse_form_data(env)
|
|
self.assert_strict_equal(stream.read(), b'')
|
|
self.assert_strict_equal(len(form), 0)
|
|
self.assert_strict_equal(len(files), 0)
|
|
|
|
def test_parse_form_data_get_without_content(self):
|
|
env = create_environ('/foo', 'http://example.org/', method='GET')
|
|
del env['CONTENT_TYPE']
|
|
del env['CONTENT_LENGTH']
|
|
|
|
stream, form, files = formparser.parse_form_data(env)
|
|
self.assert_strict_equal(stream.read(), b'')
|
|
self.assert_strict_equal(len(form), 0)
|
|
self.assert_strict_equal(len(files), 0)
|
|
|
|
def test_large_file(self):
|
|
data = b'x' * (1024 * 600)
|
|
req = Request.from_values(data={'foo': (BytesIO(data), 'test.txt')},
|
|
method='POST')
|
|
# make sure we have a real file here, because we expect to be
|
|
# on the disk. > 1024 * 500
|
|
self.assert_true(hasattr(req.files['foo'].stream, u'fileno'))
|
|
# close file to prevent fds from leaking
|
|
req.files['foo'].close()
|
|
|
|
def test_streaming_parse(self):
|
|
data = b'x' * (1024 * 600)
|
|
class StreamMPP(formparser.MultiPartParser):
|
|
def parse(self, file, boundary, content_length):
|
|
i = iter(self.parse_lines(file, boundary, content_length))
|
|
one = next(i)
|
|
two = next(i)
|
|
return self.cls(()), {'one': one, 'two': two}
|
|
class StreamFDP(formparser.FormDataParser):
|
|
def _sf_parse_multipart(self, stream, mimetype,
|
|
content_length, options):
|
|
form, files = StreamMPP(
|
|
self.stream_factory, self.charset, self.errors,
|
|
max_form_memory_size=self.max_form_memory_size,
|
|
cls=self.cls).parse(stream, options.get('boundary').encode('ascii'),
|
|
content_length)
|
|
return stream, form, files
|
|
parse_functions = {}
|
|
parse_functions.update(formparser.FormDataParser.parse_functions)
|
|
parse_functions['multipart/form-data'] = _sf_parse_multipart
|
|
class StreamReq(Request):
|
|
form_data_parser_class = StreamFDP
|
|
req = StreamReq.from_values(data={'foo': (BytesIO(data), 'test.txt')},
|
|
method='POST')
|
|
self.assert_strict_equal('begin_file', req.files['one'][0])
|
|
self.assert_strict_equal(('foo', 'test.txt'), req.files['one'][1][1:])
|
|
self.assert_strict_equal('cont', req.files['two'][0])
|
|
self.assert_strict_equal(data, req.files['two'][1])
|
|
|
|
|
|
class MultiPartTestCase(WerkzeugTestCase):
|
|
|
|
def test_basic(self):
|
|
resources = join(dirname(__file__), 'multipart')
|
|
client = Client(form_data_consumer, Response)
|
|
|
|
repository = [
|
|
('firefox3-2png1txt', '---------------------------186454651713519341951581030105', [
|
|
(u'anchor.png', 'file1', 'image/png', 'file1.png'),
|
|
(u'application_edit.png', 'file2', 'image/png', 'file2.png')
|
|
], u'example text'),
|
|
('firefox3-2pnglongtext', '---------------------------14904044739787191031754711748', [
|
|
(u'accept.png', 'file1', 'image/png', 'file1.png'),
|
|
(u'add.png', 'file2', 'image/png', 'file2.png')
|
|
], u'--long text\r\n--with boundary\r\n--lookalikes--'),
|
|
('opera8-2png1txt', '----------zEO9jQKmLc2Cq88c23Dx19', [
|
|
(u'arrow_branch.png', 'file1', 'image/png', 'file1.png'),
|
|
(u'award_star_bronze_1.png', 'file2', 'image/png', 'file2.png')
|
|
], u'blafasel öäü'),
|
|
('webkit3-2png1txt', '----WebKitFormBoundaryjdSFhcARk8fyGNy6', [
|
|
(u'gtk-apply.png', 'file1', 'image/png', 'file1.png'),
|
|
(u'gtk-no.png', 'file2', 'image/png', 'file2.png')
|
|
], u'this is another text with ümläüts'),
|
|
('ie6-2png1txt', '---------------------------7d91b03a20128', [
|
|
(u'file1.png', 'file1', 'image/x-png', 'file1.png'),
|
|
(u'file2.png', 'file2', 'image/x-png', 'file2.png')
|
|
], u'ie6 sucks :-/')
|
|
]
|
|
|
|
for name, boundary, files, text in repository:
|
|
folder = join(resources, name)
|
|
data = get_contents(join(folder, 'request.txt'))
|
|
for filename, field, content_type, fsname in files:
|
|
response = client.post('/?object=' + field, data=data, content_type=
|
|
'multipart/form-data; boundary="%s"' % boundary,
|
|
content_length=len(data))
|
|
lines = response.get_data().split(b'\n', 3)
|
|
self.assert_strict_equal(lines[0], repr(filename).encode('ascii'))
|
|
self.assert_strict_equal(lines[1], repr(field).encode('ascii'))
|
|
self.assert_strict_equal(lines[2], repr(content_type).encode('ascii'))
|
|
self.assert_strict_equal(lines[3], get_contents(join(folder, fsname)))
|
|
response = client.post('/?object=text', data=data, content_type=
|
|
'multipart/form-data; boundary="%s"' % boundary,
|
|
content_length=len(data))
|
|
self.assert_strict_equal(response.get_data(), repr(text).encode('utf-8'))
|
|
|
|
def test_ie7_unc_path(self):
|
|
client = Client(form_data_consumer, Response)
|
|
data_file = join(dirname(__file__), 'multipart', 'ie7_full_path_request.txt')
|
|
data = get_contents(data_file)
|
|
boundary = '---------------------------7da36d1b4a0164'
|
|
response = client.post('/?object=cb_file_upload_multiple', data=data, content_type=
|
|
'multipart/form-data; boundary="%s"' % boundary, content_length=len(data))
|
|
lines = response.get_data().split(b'\n', 3)
|
|
self.assert_strict_equal(lines[0],
|
|
repr(u'Sellersburg Town Council Meeting 02-22-2010doc.doc').encode('ascii'))
|
|
|
|
def test_end_of_file(self):
|
|
# This test looks innocent but it was actually timeing out in
|
|
# the Werkzeug 0.5 release version (#394)
|
|
data = (
|
|
b'--foo\r\n'
|
|
b'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n'
|
|
b'Content-Type: text/plain\r\n\r\n'
|
|
b'file contents and no end'
|
|
)
|
|
data = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; boundary=foo',
|
|
method='POST')
|
|
self.assert_true(not data.files)
|
|
self.assert_true(not data.form)
|
|
|
|
def test_broken(self):
|
|
data = (
|
|
'--foo\r\n'
|
|
'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n'
|
|
'Content-Transfer-Encoding: base64\r\n'
|
|
'Content-Type: text/plain\r\n\r\n'
|
|
'broken base 64'
|
|
'--foo--'
|
|
)
|
|
_, form, files = formparser.parse_form_data(create_environ(data=data,
|
|
method='POST', content_type='multipart/form-data; boundary=foo'))
|
|
self.assert_true(not files)
|
|
self.assert_true(not form)
|
|
|
|
self.assert_raises(ValueError, formparser.parse_form_data,
|
|
create_environ(data=data, method='POST',
|
|
content_type='multipart/form-data; boundary=foo'),
|
|
silent=False)
|
|
|
|
def test_file_no_content_type(self):
|
|
data = (
|
|
b'--foo\r\n'
|
|
b'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n\r\n'
|
|
b'file contents\r\n--foo--'
|
|
)
|
|
data = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; boundary=foo',
|
|
method='POST')
|
|
self.assert_equal(data.files['test'].filename, 'test.txt')
|
|
self.assert_strict_equal(data.files['test'].read(), b'file contents')
|
|
|
|
def test_extra_newline(self):
|
|
# this test looks innocent but it was actually timeing out in
|
|
# the Werkzeug 0.5 release version (#394)
|
|
data = (
|
|
b'\r\n\r\n--foo\r\n'
|
|
b'Content-Disposition: form-data; name="foo"\r\n\r\n'
|
|
b'a string\r\n'
|
|
b'--foo--'
|
|
)
|
|
data = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; boundary=foo',
|
|
method='POST')
|
|
self.assert_true(not data.files)
|
|
self.assert_strict_equal(data.form['foo'], u'a string')
|
|
|
|
def test_headers(self):
|
|
data = (b'--foo\r\n'
|
|
b'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n'
|
|
b'X-Custom-Header: blah\r\n'
|
|
b'Content-Type: text/plain; charset=utf-8\r\n\r\n'
|
|
b'file contents, just the contents\r\n'
|
|
b'--foo--')
|
|
req = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; boundary=foo',
|
|
method='POST')
|
|
foo = req.files['foo']
|
|
self.assert_strict_equal(foo.mimetype, 'text/plain')
|
|
self.assert_strict_equal(foo.mimetype_params, {'charset': 'utf-8'})
|
|
self.assert_strict_equal(foo.headers['content-type'], foo.content_type)
|
|
self.assert_strict_equal(foo.content_type, 'text/plain; charset=utf-8')
|
|
self.assert_strict_equal(foo.headers['x-custom-header'], 'blah')
|
|
|
|
def test_nonstandard_line_endings(self):
|
|
for nl in b'\n', b'\r', b'\r\n':
|
|
data = nl.join((
|
|
b'--foo',
|
|
b'Content-Disposition: form-data; name=foo',
|
|
b'',
|
|
b'this is just bar',
|
|
b'--foo',
|
|
b'Content-Disposition: form-data; name=bar',
|
|
b'',
|
|
b'blafasel',
|
|
b'--foo--'
|
|
))
|
|
req = Request.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; '
|
|
'boundary=foo', method='POST')
|
|
self.assert_strict_equal(req.form['foo'], u'this is just bar')
|
|
self.assert_strict_equal(req.form['bar'], u'blafasel')
|
|
|
|
def test_failures(self):
|
|
def parse_multipart(stream, boundary, content_length):
|
|
parser = formparser.MultiPartParser(content_length)
|
|
return parser.parse(stream, boundary, content_length)
|
|
self.assert_raises(ValueError, parse_multipart, BytesIO(), b'broken ', 0)
|
|
|
|
data = b'--foo\r\n\r\nHello World\r\n--foo--'
|
|
self.assert_raises(ValueError, parse_multipart, BytesIO(data), b'foo', len(data))
|
|
|
|
data = b'--foo\r\nContent-Disposition: form-field; name=foo\r\n' \
|
|
b'Content-Transfer-Encoding: base64\r\n\r\nHello World\r\n--foo--'
|
|
self.assert_raises(ValueError, parse_multipart, BytesIO(data), b'foo', len(data))
|
|
|
|
data = b'--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\nHello World\r\n'
|
|
self.assert_raises(ValueError, parse_multipart, BytesIO(data), b'foo', len(data))
|
|
|
|
x = formparser.parse_multipart_headers(['foo: bar\r\n', ' x test\r\n'])
|
|
self.assert_strict_equal(x['foo'], 'bar\n x test')
|
|
self.assert_raises(ValueError, formparser.parse_multipart_headers,
|
|
['foo: bar\r\n', ' x test'])
|
|
|
|
def test_bad_newline_bad_newline_assumption(self):
|
|
class ISORequest(Request):
|
|
charset = 'latin1'
|
|
contents = b'U2vlbmUgbORu'
|
|
data = b'--foo\r\nContent-Disposition: form-data; name="test"\r\n' \
|
|
b'Content-Transfer-Encoding: base64\r\n\r\n' + \
|
|
contents + b'\r\n--foo--'
|
|
req = ISORequest.from_values(input_stream=BytesIO(data),
|
|
content_length=len(data),
|
|
content_type='multipart/form-data; boundary=foo',
|
|
method='POST')
|
|
self.assert_strict_equal(req.form['test'], u'Sk\xe5ne l\xe4n')
|
|
|
|
def test_empty_multipart(self):
|
|
environ = {}
|
|
data = b'--boundary--'
|
|
environ['REQUEST_METHOD'] = 'POST'
|
|
environ['CONTENT_TYPE'] = 'multipart/form-data; boundary=boundary'
|
|
environ['CONTENT_LENGTH'] = str(len(data))
|
|
environ['wsgi.input'] = BytesIO(data)
|
|
stream, form, files = parse_form_data(environ, silent=False)
|
|
rv = stream.read()
|
|
self.assert_equal(rv, b'')
|
|
self.assert_equal(form, MultiDict())
|
|
self.assert_equal(files, MultiDict())
|
|
|
|
|
|
class InternalFunctionsTestCase(WerkzeugTestCase):
|
|
|
|
def test_line_parser(self):
|
|
assert formparser._line_parse('foo') == ('foo', False)
|
|
assert formparser._line_parse('foo\r\n') == ('foo', True)
|
|
assert formparser._line_parse('foo\r') == ('foo', True)
|
|
assert formparser._line_parse('foo\n') == ('foo', True)
|
|
|
|
def test_find_terminator(self):
|
|
lineiter = iter(b'\n\n\nfoo\nbar\nbaz'.splitlines(True))
|
|
find_terminator = formparser.MultiPartParser()._find_terminator
|
|
line = find_terminator(lineiter)
|
|
self.assert_equal(line, b'foo')
|
|
self.assert_equal(list(lineiter), [b'bar\n', b'baz'])
|
|
self.assert_equal(find_terminator([]), b'')
|
|
self.assert_equal(find_terminator([b'']), b'')
|
|
|
|
|
|
def suite():
|
|
suite = unittest.TestSuite()
|
|
suite.addTest(unittest.makeSuite(FormParserTestCase))
|
|
suite.addTest(unittest.makeSuite(MultiPartTestCase))
|
|
suite.addTest(unittest.makeSuite(InternalFunctionsTestCase))
|
|
return suite
|