I'm trying to implement an upload feature to the basic http.server Python module.
So far, I've created a new class named SimpleHTTPRequestHandlerWithUpload
which inherits from SimpleHTTPRequestHandler
and added an upload section to list_directory()
. The next step would be creating a do_POST()
method, which handles the request and saves the file inside the current working directory. However, I have no idea how to do this. I looked at UniIsland's code on GitHub but I can't understand what he did and the code is very old. I also read this question and tried to implement it in my code.
It kind of works, but the file is "littered" with headers. This does not pose a big problem on txt files, but it corrupts all of the other file extensions.
I'd like to know how to remove the headers, save the uploaded file inside the current working directory with its original name and check if the upload was successful or not.
This is my code:
__version__ = '0.1'
import http.server
import html
import io
import os
import socket # For gethostbyaddr()
import sys
import urllib.parse
import contextlib
from http import HTTPStatus
class SimpleHTTPRequestHandlerWithUpload(http.server.SimpleHTTPRequestHandler):
server_version = 'SimpleHTTPWithUpload/' + __version__
def do_POST(self):
"""Serve a POST request."""
data = self.rfile.read(int(self.headers['content-length']))
with open('file.txt', 'wb') as file:
file.write(data)
r = []
enc = sys.getfilesystemencoding()
r.append('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
r.append("<html>\n<title>Upload Result Page</title>\n")
r.append("<body>\n<h2>Upload Result Page</h2>\n")
r.append("</body>\n</html>")
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "text/html")
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
if f:
self.copyfile(f, self.wfile)
f.close()
def list_directory(self, path):
"""Helper to produce a directory listing (absent index.html).
Return value is either a file object, or None (indicating an
error). In either case, the headers are sent, making the
interface the same as for send_head().
"""
try:
list = os.listdir(path)
except OSError:
self.send_error(
HTTPStatus.NOT_FOUND,
'No permission to list directory')
return None
list.sort(key=lambda a: a.lower())
r = []
try:
displaypath = urllib.parse.unquote(self.path,
errors='surrogatepass')
except UnicodeDecodeError:
displaypath = urllib.parse.unquote(path)
displaypath = html.escape(displaypath, quote=False)
enc = sys.getfilesystemencoding()
title = 'Directory listing for %s' % displaypath
r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
'"http://www.w3.org/TR/html4/strict.dtd">')
r.append('<html>\n<head>')
r.append('<meta http-equiv="Content-Type" '
'content="text/html; charset=%s">' % enc)
r.append('<title>%s</title>\n</head>' % title)
r.append('<body>\n<h1>%s</h1>' % title)
r.append('<hr>\n<ul>')
for name in list:
fullname = os.path.join(path, name)
displayname = linkname = name
# Append / for directories or @ for symbolic links
if os.path.isdir(fullname):
displayname = name + '/'
linkname = name + '/'
if os.path.islink(fullname):
displayname = name + '@'
# Note: a link to a directory displays with @ and links with /
r.append('<li><a href="%s">%s</a></li>' % (urllib.parse.quote(linkname, errors='surrogatepass'),
html.escape(displayname, quote=False)))
r.append('</ul>\n<hr>\n')
r.append('<form id="upload" enctype="multipart/form-data" method="post" action="#">\n'
'<input id="fileupload" name="file" type="file" />\n'
'<input type="submit" value="Submit" id="submit" />\n'
'</form>')
r.append('\n<hr>\n</body>\n</html>\n')
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'text/html; charset=%s' % enc)
self.send_header('Content-Length', str(len(encoded)))
self.end_headers()
return f
if __name__ == '__main__':
class DualStackServer(http.server.ThreadingHTTPServer):
def server_bind(self):
# suppress exception when protocol is IPv4
with contextlib.suppress(Exception):
self.socket.setsockopt(
socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
return super().server_bind()
http.server.test(
HandlerClass=SimpleHTTPRequestHandlerWithUpload,
ServerClass=DualStackServer
)
If you want to test it, just run the script on your machine, open a web browser on a different machine and type in the address bar <IP_ADDRESS_1>:8000
where IP_ADDRESS_1
is the IP of the machine you're running the code on.
Please, tell me if there's something wrong with it other than the do_POST()
method. I'm a new Python programmer and I'm trying to improve my software design skills in general. Thank you!
EDIT: I figured out how to remove the headers and save the file with its original name. However, the script hangs on data = self.rfile.readlines()
until I close the browser tab and then works well. I don't know what to do. It seems I have to send some sort of EOF to notify readlines()
that I'm finished sending the file but I have no clue how to do it. I also can't figure out how to check if the file has been uploaded successfully or not. Any help is appreciated!
Updated do_POST()
method:
def do_POST(self):
"""Serve a POST request."""
data = self.rfile.readlines()
filename = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', str(data[1]))
if len(filename) == 1:
filename = ''.join(filename)
else:
return
data = data[4:-2]
data = b''.join(data)
with open(filename, 'wb') as file:
file.write(data)
r = []
enc = sys.getfilesystemencoding()
r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
'"http://www.w3.org/TR/html4/strict.dtd">')
r.append('<html>\n<title>Upload result page</title>\n')
r.append('<body>\n<h2>Upload result page</h2>\n')
r.append('</body>\n</html>')
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'text/html')
self.send_header('Content-Length', str(len(encoded)))
self.end_headers()
if f:
self.copyfile(f, self.wfile)
f.close()
I managed to solve all of my problems. I posted my code on GitHub, for anyone interested.