Basic Python HTTP Server
$begingroup$
There's not much to say: it's an extremely simple HTTP server in Python using the socket library, plus a few others to get the MIME type, etc.
I've also avoided the ../../ vulnerability, although some of the code in the send_file function seems a bit weak.
It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.
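To make the ../../ concern concrete: the check in send_file only looks at the first character of the file name, so a path such as images/../../secret.txt would not be caught by it. Below is a minimal sketch of a stricter check, not the code under review. The helper name `resolve_public_path` and the constant `PUBLIC_ROOT` are hypothetical, and the sketch assumes the served directory is ./public, as in the listing.

```python
import os

# Assumed document root, matching the './public/' prefix used in send_file
PUBLIC_ROOT = os.path.realpath('./public')


def resolve_public_path(requested, root=PUBLIC_ROOT):
    """Return the absolute path for `requested` inside `root`, or None if it escapes."""
    # Strip leading slashes so an absolute path cannot reset the join
    candidate = os.path.realpath(os.path.join(root, requested.lstrip('/')))
    # commonpath equals root only when candidate is root itself or lies below it
    if os.path.commonpath([root, candidate]) != root:
        return None
    return candidate
```

A caller could answer with 403 whenever the helper returns None and open the returned path otherwise. Because `os.path.realpath` also resolves symbolic links, a link inside ./public that points elsewhere would be rejected as well.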
import filetype
import socket
import _thread


class ServerSocket():
    ''' Receives connections, parses the request and ships it off to a handler
    '''

    def __init__(self, address, handler=None, *args, **kwargs):
        ''' Creates a server socket and defines a handler for the server
        '''
        self.socket = socket.socket()
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(address)
        self.handler_args = args
        self.handler_kwargs = kwargs
        if handler:
            self.handler = handler  # The custom handler
        else:
            self.handler = Handler  # The default handler

    def initialise(self, open_connections=5):
        ''' Initialises the server socket and has it listen for connections
        '''
        self.socket.listen(open_connections)
        self.listen()

    def parse(self, data):
        ''' Splits a packet into
        the request,
        the headers (which includes the request),
        and contents
        '''
        stringed = str(data, 'utf-8')
        split = stringed.split('\r\n\r\n')
        headers = split[0]
        if len(split) > 1:
            content = split[1]
        else:
            content = []
        request = headers.split(' ')[0]
        return request, headers, content

    def handle(self, client, address):
        ''' Parses the data and handles the request. It then closes the connection
        '''
        try:
            data = client.recv(1024)
        except ConnectionResetError:
            if self.handler_kwargs["logging"] is True:
                print(f'{address[0]} unexpectedly quit')
            client.close()
            return
        parsed = self.parse(data)
        handler = self.handler(self.handler_args, self.handler_kwargs)
        handler.handle(client, parsed, address)
        client.close()

    def listen(self):
        ''' Listens until a keyboard interrupt and handles each connection in a
        new thread
        '''
        try:
            while True:
                client_data = self.socket.accept()
                if self.handler_kwargs['logging'] is True:
                    print(f'Connection from {client_data[1][0]}')
                _thread.start_new_thread(self.handle, client_data)
        except KeyboardInterrupt:
            self.socket.close()


class Handler():
    ''' Handles requests from the Server Socket
    '''

    def __init__(self, args, kwargs):
        self.args = args
        self.kwargs = kwargs

    def set_status(self, code, message):
        ''' Used to add a status line:
        - 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
        '''
        self.reply_headers = [f'HTTP/1.0 {code} {message}']

    def set_header(self, header, content):
        ''' Defines a custom header and adds it to the response
        '''
        self.reply_headers += [f'{header}: {content}']

    def response(self, content):
        ''' Adds to the content of the response
        '''
        if type(content) == str:
            self.reply_content += content.split('\n')
        else:
            self.reply_content += [content]

    def calculate_content_length(self):
        ''' Calculates the content length and adds it to the header
        '''
        length = len(self.reply_content) * 2
        lengths = [len(line) for line in self.reply_content]
        length += sum(lengths)
        self.set_header('Content-Length', length)

    def get_type(self, file_name):
        return filetype.guess('./public/'+file_name)

    def extract_file_name(self, file_name=None):
        if file_name:
            f_name = file_name[1:]
        else:
            f_name = self.request_status.split(' ')[1][1:]
        return f_name

    def send_file(self, file_name=None):
        if file_name is None:
            file_name = self.extract_file_name()
        if file_name == '':
            file_name = 'index.html'
        elif file_name[0] in './':
            self.set_status(403, "Forbidden")
            self.set_header('Content-Type', 'text/html')
            self.reply_content = ['<p>Error 403: Forbidden</p>']
            return
        try:
            with open('./public/'+file_name, 'rb') as file:
                file_contents = file.read()
        except FileNotFoundError:
            self.set_status(404, 'Not Found')
            self.set_header('Content-Type', 'text/html')
            self.reply_content = ['<p>Error 404: File not found</p>']
            return
        file_type = self.get_type(file_name)
        if file_type is not None:
            self.set_header('Content-Type', file_type.MIME)
        elif file_name.split('.')[-1] == 'html':
            self.set_header('Content-Type', 'text/html')
        else:
            self.set_header('Content-Type', 'text/txt')
        self.response(file_contents)

    def get_request_address(self):
        return self.address

    def parse_headers(self, headers):
        t = {}
        for header in headers[1:]:
            t[header.split(': ')[0]] = header.split(': ')[1]
        return t

    def reply(self):
        ''' Assembles the response and sends it to the client
        '''
        if self.reply_headers[0][0:4] != "HTTP":
            self.set_status(200, 'OK')
            self.set_header('Content-Type', 'text/html')
            self.reply_content = ['<p>Response Status unspecified</p>']
        self.calculate_content_length()
        message = '\r\n'.join(self.reply_headers)
        message += '\r\n\r\n'
        try:
            message += '\r\n'.join(self.reply_content)
            message += '\r\n'
        except TypeError:
            message = bytes(message, 'utf-8')
            message += b'\r\n'.join(self.reply_content)
            message += b'\r\n'
        try:
            if type(message) == str:
                self.client.send(bytes(message, 'utf-8'))
            else:
                self.client.send(message)
        except:
            pass

    def handle(self, client, parsed_data, address):
        ''' Initialises variables and case-switches the request type to
        determine the handler function
        '''
        self.client = client
        self.address = address
        self.reply_headers = []
        self.reply_content = []
        self.headers = True
        self.request_status = parsed_data[1].split('\r\n')[0]
        request = parsed_data[0]
        headers = self.parse_headers(parsed_data[1].split('\r\n'))
        contents = parsed_data[2]
        if request == "GET":
            func = self.get
        elif request == "POST":
            func = self.post
        elif request == "HEAD":
            func = self.head
        elif request == "PUT":
            func = self.put
        elif request == "DELETE":
            func = self.delete
        elif request == "CONNECT":
            func = self.connect
        elif request == "OPTIONS":
            func = self.options
        elif request == "TRACE":
            func = self.trace
        elif request == "PATCH":
            func = self.patch
        else:
            func = self.default
        func(headers, contents)
        self.reply()

    def default(self, headers, contents):
        ''' If the request is not known, defaults to this
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Unknown Request Type</p>''')

    def get(self, headers, contents):
        ''' Override to customise handling of GET requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a GET Request</p>''')

    def post(self, headers, contents):
        ''' Override to customise handling of POST requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a POST Request</p>''')

    def head(self, headers, contents):
        ''' Override to customise handling of HEAD requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a HEAD Request</p>''')

    def put(self, headers, contents):
        ''' Override to customise handling of PUT requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a PUT Request</p>''')

    def delete(self, headers, contents):
        ''' Override to customise handling of DELETE requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a DELETE Request</p>''')

    def connect(self, headers, contents):
        ''' Override to customise handling of CONNECT requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a CONNECT Request</p>''')

    def options(self, headers, contents):
        ''' Override to customise handling of OPTIONS requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got an OPTIONS Request</p>''')

    def trace(self, headers, contents):
        ''' Override to customise handling of TRACE requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a TRACE Request</p>''')

    def patch(self, headers, contents):
        ''' Override to customise handling of PATCH requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a PATCH Request</p>''')


# =======================================================================
if __name__ == "__main__":
    import sys

    class CustomHandler(Handler):
        def get(self, headers, contents):
            self.set_status(200, 'OK')
            request_address = self.get_request_address()[0]
            file_name = self.extract_file_name()
            print(f'{request_address} -> {headers["Host"]}/{file_name}')
            self.send_file()

    def run():
        if len(sys.argv) == 2:
            port = int(sys.argv[1])
        else:
            port = 80
        try:
            print('Initialising...', end='')
            http_server = ServerSocket(
                ('0.0.0.0', port),
                CustomHandler,
                logging=True
            )
            print('Done')
            http_server.initialise()
        except Exception as e:
            print(f'{e}')

    run()

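As a point of comparison for calculate_content_length and reply, here is a minimal sketch of assembling a response so that Content-Length is the byte length of the encoded body rather than a count of characters and line breaks. The function `build_response` is a hypothetical helper, not part of the code above, and it assumes the whole body is available in memory.

```python
def build_response(code, reason, body, content_type='text/html'):
    """Assemble a complete HTTP/1.0 response as bytes."""
    # Encode text bodies first so the length is measured in bytes, not characters
    if isinstance(body, str):
        body = body.encode('utf-8')
    head = [
        f'HTTP/1.0 {code} {reason}',
        f'Content-Type: {content_type}',
        f'Content-Length: {len(body)}',
    ]
    # A blank line (CRLF CRLF) separates the headers from the body
    return '\r\n'.join(head).encode('ascii') + b'\r\n\r\n' + body
```

Working on bytes throughout means text and binary files go through the same path, and the result could be passed to `socket.sendall`, which keeps sending until the whole buffer has gone out.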
Thank you for taking the time.
python-3.x http server
$endgroup$
add a comment |
$begingroup$
There's not much to say, it's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...
I've also avoided the ../../
vulnerability, although some of the code in the send_file
function seems a bit weak.
It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.
import filetype
import socket
import _thread
class ServerSocket():
''' Recieves connections, parses the request and ships it off to a handler
'''
def __init__(self, address, handler=None, *args, **kwargs):
''' Creates a server socket and defines a handler for the server
'''
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(address)
self.handler_args = args
self.handler_kwargs = kwargs
if handler:
self.handler = handler # The custom handler
else:
self.handler = Handler # The default handler
def initialise(self, open_connections=5):
''' Initilises the server socket and has it listen for connections
'''
self.socket.listen(open_connections)
self.listen()
def parse(self, data):
''' Splits a packet into
the request,
the headers (which includes the request),
and contents
'''
stringed = str(data, 'utf-8')
split = stringed.split('rnrn')
headers = split[0]
if len(split) > 1:
content = split[1]
else:
content = []
request = headers.split(' ')[0]
return request, headers, content
def handle(self, client, address):
''' Parses the data and handles the request. It then closes the connection
'''
try:
data = client.recv(1024)
except ConnectionResetError:
if self.handler_kwargs["logging"] is True:
print(f'{address[0]} unexpectedly quit')
client.close()
return
parsed = self.parse(data)
handler = self.handler(self.handler_args, self.handler_kwargs)
handler.handle(client, parsed, address)
client.close()
def listen(self):
''' Listens until a keyboard interrupt and handles each connection in a
new thread
'''
try:
while True:
client_data = self.socket.accept()
if self.handler_kwargs['logging'] is True:
print(f'Connection from {client_data[1][0]}')
_thread.start_new_thread(self.handle, client_data)
except KeyboardInterrupt:
self.socket.close()
class Handler():
''' Handles requests from the Server Socket
'''
def __init__(self, args, kwargs):
self.args = args
self.kwargs = kwargs
def set_status(self, code, message):
''' Used to add a status line:
- 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
'''
self.reply_headers = [f'HTTP/1.0 {code} {message}']
def set_header(self, header, content):
''' Defines a custom header and adds it to the response
'''
self.reply_headers += [f'{header}: {content}']
def response(self, content):
''' Adds to the content of the response
'''
if type(content) == str:
self.reply_content += content.split('n')
else:
self.reply_content += [content]
def calculate_content_length(self):
''' Calculates the content length and adds it to the header
'''
length = len(self.reply_content) * 2
lengths = [len(line) for line in self.reply_content]
length += sum(lengths)
self.set_header('Content-Length', length)
def get_type(self, file_name):
return filetype.guess('./public/'+file_name)
def extract_file_name(self, file_name=None):
if file_name:
f_name = file_name[1:]
else:
f_name = self.request_status.split(' ')[1][1:]
return f_name
def send_file(self, file_name=None):
if file_name is None:
file_name = self.extract_file_name()
if file_name == '':
file_name = 'index.html'
elif file_name[0] in './':
self.set_status(403, "Forbidden")
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 403: Forbidden</p>']
return
try:
with open('./public/'+file_name, 'rb') as file:
file_contents = file.read()
except FileNotFoundError:
self.set_status(404, 'Not Found')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 404: File not found</p>']
return
file_type = self.get_type(file_name)
if file_type is not None:
self.set_header('Content-Type', file_type.MIME)
elif file_name.split('.')[-1] == 'html':
self.set_header('Content-Type', 'text/html')
else:
self.set_header('Content-Type', 'text/txt')
self.response(file_contents)
def get_request_address(self):
return self.address
def parse_headers(self, headers):
t = {}
for header in headers[1:]:
t[header.split(': ')[0]] = header.split(': ')[1]
return t
def reply(self):
''' Assembles the response and sends it to the client
'''
if self.reply_headers[0][0:4] != "HTTP":
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Response Status unspecified</p>']
self.calculate_content_length()
message = 'rn'.join(self.reply_headers)
message += 'rnrn'
try:
message += 'rn'.join(self.reply_content)
message += 'rn'
except TypeError:
message = bytes(message, 'utf-8')
message += b'rn'.join(self.reply_content)
message += b'rn'
try:
if type(message) == str:
self.client.send(bytes(message, 'utf-8'))
else:
self.client.send(message)
except:
pass
def handle(self, client, parsed_data, address):
''' Initialises variables and case-switches the request type to
determine the handler function
'''
self.client = client
self.address = address
self.reply_headers = []
self.reply_content = []
self.headers = True
self.request_status = parsed_data[1].split('rn')[0]
request = parsed_data[0]
headers = self.parse_headers(parsed_data[1].split('rn'))
contents = parsed_data[2]
if request == "GET":
func = self.get
elif request == "POST":
func = self.post
elif request == "HEAD":
func = self.head
elif request == "PUT":
func = self.put
elif request == "DELETE":
func = self.delete
elif request == "CONNECT":
func = self.connect
elif request == "OPTIONS":
func = self.options
elif request == "TRACE":
func = self.trace
elif request == "PATCH":
func = self.patch
else:
func = self.default
func(headers, contents)
self.reply()
def default(self, headers, contents):
''' If the request is not known, defaults to this
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Unknown Request Type</p>''')
def get(self, headers, contents):
''' Overwrite to customly handle GET requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a GET Request</p>''')
def post(self, headers, contents):
''' Overwrite to customly handle POST requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a POST Request</p>''')
def head(self, headers, contents):
''' Overwrite to customly handle HEAD requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a HEAD Request</p>''')
def put(self, headers, contents):
''' Overwrite to customly handle PUT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PUT Request</p>''')
def delete(self, headers, contents):
''' Overwrite to customly handle DELETE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a DELETE Request</p>''')
def connect(self, headers, contents):
''' Overwrite to customly handle CONNECT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a CONNECT Request</p>''')
def options(self, headers, contents):
''' Overwrite to customly handle OPTIONS requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got an OPTIONS Request</p>''')
def trace(self, headers, contents):
''' Overwrite to customly handle TRACE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a TRACE Request</p>''')
def patch(self, headers, contents):
''' Overwrite to customly handle PATCH requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PATCH Request</p>''')
# =======================================================================
if __name__ == "__main__":
import sys
class CustomHandler(Handler):
def get(self, headers, contents):
self.set_status(200, 'OK')
request_address = self.get_request_address()[0]
file_name = self.extract_file_name()
print(f'{request_address} -> {headers["Host"]}/{file_name}')
self.send_file()
def run():
if len(sys.argv) == 2:
port = int(sys.argv[1])
else:
port = 80
try:
print('Initialising...', end='')
http_server = ServerSocket(
('0.0.0.0', port),
CustomHandler,
logging=True
)
print('Done')
http_server.initialise()
except Exception as e:
print(f'{e}')
run()
Thank you for taking the time.
python-3.x http server
$endgroup$
add a comment |
$begingroup$
There's not much to say, it's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...
I've also avoided the ../../
vulnerability, although some of the code in the send_file
function seems a bit weak.
It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.
import filetype
import socket
import _thread
class ServerSocket():
''' Recieves connections, parses the request and ships it off to a handler
'''
def __init__(self, address, handler=None, *args, **kwargs):
''' Creates a server socket and defines a handler for the server
'''
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(address)
self.handler_args = args
self.handler_kwargs = kwargs
if handler:
self.handler = handler # The custom handler
else:
self.handler = Handler # The default handler
def initialise(self, open_connections=5):
''' Initilises the server socket and has it listen for connections
'''
self.socket.listen(open_connections)
self.listen()
def parse(self, data):
''' Splits a packet into
the request,
the headers (which includes the request),
and contents
'''
stringed = str(data, 'utf-8')
split = stringed.split('rnrn')
headers = split[0]
if len(split) > 1:
content = split[1]
else:
content = []
request = headers.split(' ')[0]
return request, headers, content
def handle(self, client, address):
''' Parses the data and handles the request. It then closes the connection
'''
try:
data = client.recv(1024)
except ConnectionResetError:
if self.handler_kwargs["logging"] is True:
print(f'{address[0]} unexpectedly quit')
client.close()
return
parsed = self.parse(data)
handler = self.handler(self.handler_args, self.handler_kwargs)
handler.handle(client, parsed, address)
client.close()
def listen(self):
''' Listens until a keyboard interrupt and handles each connection in a
new thread
'''
try:
while True:
client_data = self.socket.accept()
if self.handler_kwargs['logging'] is True:
print(f'Connection from {client_data[1][0]}')
_thread.start_new_thread(self.handle, client_data)
except KeyboardInterrupt:
self.socket.close()
class Handler():
''' Handles requests from the Server Socket
'''
def __init__(self, args, kwargs):
self.args = args
self.kwargs = kwargs
def set_status(self, code, message):
''' Used to add a status line:
- 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
'''
self.reply_headers = [f'HTTP/1.0 {code} {message}']
def set_header(self, header, content):
''' Defines a custom header and adds it to the response
'''
self.reply_headers += [f'{header}: {content}']
def response(self, content):
''' Adds to the content of the response
'''
if type(content) == str:
self.reply_content += content.split('n')
else:
self.reply_content += [content]
def calculate_content_length(self):
''' Calculates the content length and adds it to the header
'''
length = len(self.reply_content) * 2
lengths = [len(line) for line in self.reply_content]
length += sum(lengths)
self.set_header('Content-Length', length)
def get_type(self, file_name):
return filetype.guess('./public/'+file_name)
def extract_file_name(self, file_name=None):
if file_name:
f_name = file_name[1:]
else:
f_name = self.request_status.split(' ')[1][1:]
return f_name
def send_file(self, file_name=None):
if file_name is None:
file_name = self.extract_file_name()
if file_name == '':
file_name = 'index.html'
elif file_name[0] in './':
self.set_status(403, "Forbidden")
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 403: Forbidden</p>']
return
try:
with open('./public/'+file_name, 'rb') as file:
file_contents = file.read()
except FileNotFoundError:
self.set_status(404, 'Not Found')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 404: File not found</p>']
return
file_type = self.get_type(file_name)
if file_type is not None:
self.set_header('Content-Type', file_type.MIME)
elif file_name.split('.')[-1] == 'html':
self.set_header('Content-Type', 'text/html')
else:
self.set_header('Content-Type', 'text/txt')
self.response(file_contents)
def get_request_address(self):
return self.address
def parse_headers(self, headers):
t = {}
for header in headers[1:]:
t[header.split(': ')[0]] = header.split(': ')[1]
return t
def reply(self):
''' Assembles the response and sends it to the client
'''
if self.reply_headers[0][0:4] != "HTTP":
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Response Status unspecified</p>']
self.calculate_content_length()
message = 'rn'.join(self.reply_headers)
message += 'rnrn'
try:
message += 'rn'.join(self.reply_content)
message += 'rn'
except TypeError:
message = bytes(message, 'utf-8')
message += b'rn'.join(self.reply_content)
message += b'rn'
try:
if type(message) == str:
self.client.send(bytes(message, 'utf-8'))
else:
self.client.send(message)
except:
pass
def handle(self, client, parsed_data, address):
''' Initialises variables and case-switches the request type to
determine the handler function
'''
self.client = client
self.address = address
self.reply_headers = []
self.reply_content = []
self.headers = True
self.request_status = parsed_data[1].split('rn')[0]
request = parsed_data[0]
headers = self.parse_headers(parsed_data[1].split('rn'))
contents = parsed_data[2]
if request == "GET":
func = self.get
elif request == "POST":
func = self.post
elif request == "HEAD":
func = self.head
elif request == "PUT":
func = self.put
elif request == "DELETE":
func = self.delete
elif request == "CONNECT":
func = self.connect
elif request == "OPTIONS":
func = self.options
elif request == "TRACE":
func = self.trace
elif request == "PATCH":
func = self.patch
else:
func = self.default
func(headers, contents)
self.reply()
def default(self, headers, contents):
''' If the request is not known, defaults to this
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Unknown Request Type</p>''')
def get(self, headers, contents):
''' Overwrite to customly handle GET requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a GET Request</p>''')
def post(self, headers, contents):
''' Overwrite to customly handle POST requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a POST Request</p>''')
def head(self, headers, contents):
''' Overwrite to customly handle HEAD requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a HEAD Request</p>''')
def put(self, headers, contents):
''' Overwrite to customly handle PUT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PUT Request</p>''')
def delete(self, headers, contents):
''' Overwrite to customly handle DELETE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a DELETE Request</p>''')
def connect(self, headers, contents):
''' Overwrite to customly handle CONNECT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a CONNECT Request</p>''')
def options(self, headers, contents):
''' Overwrite to customly handle OPTIONS requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got an OPTIONS Request</p>''')
def trace(self, headers, contents):
''' Overwrite to customly handle TRACE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a TRACE Request</p>''')
def patch(self, headers, contents):
''' Overwrite to customly handle PATCH requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PATCH Request</p>''')
# =======================================================================
if __name__ == "__main__":
import sys
class CustomHandler(Handler):
def get(self, headers, contents):
self.set_status(200, 'OK')
request_address = self.get_request_address()[0]
file_name = self.extract_file_name()
print(f'{request_address} -> {headers["Host"]}/{file_name}')
self.send_file()
def run():
if len(sys.argv) == 2:
port = int(sys.argv[1])
else:
port = 80
try:
print('Initialising...', end='')
http_server = ServerSocket(
('0.0.0.0', port),
CustomHandler,
logging=True
)
print('Done')
http_server.initialise()
except Exception as e:
print(f'{e}')
run()
Thank you for taking the time.
python-3.x http server
$endgroup$
There's not much to say, it's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...
I've also avoided the ../../
vulnerability, although some of the code in the send_file
function seems a bit weak.
It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.
import filetype
import socket
import _thread
class ServerSocket():
''' Recieves connections, parses the request and ships it off to a handler
'''
def __init__(self, address, handler=None, *args, **kwargs):
''' Creates a server socket and defines a handler for the server
'''
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(address)
self.handler_args = args
self.handler_kwargs = kwargs
if handler:
self.handler = handler # The custom handler
else:
self.handler = Handler # The default handler
def initialise(self, open_connections=5):
''' Initilises the server socket and has it listen for connections
'''
self.socket.listen(open_connections)
self.listen()
def parse(self, data):
''' Splits a packet into
the request,
the headers (which includes the request),
and contents
'''
stringed = str(data, 'utf-8')
split = stringed.split('rnrn')
headers = split[0]
if len(split) > 1:
content = split[1]
else:
content = []
request = headers.split(' ')[0]
return request, headers, content
def handle(self, client, address):
''' Parses the data and handles the request. It then closes the connection
'''
try:
data = client.recv(1024)
except ConnectionResetError:
if self.handler_kwargs["logging"] is True:
print(f'{address[0]} unexpectedly quit')
client.close()
return
parsed = self.parse(data)
handler = self.handler(self.handler_args, self.handler_kwargs)
handler.handle(client, parsed, address)
client.close()
def listen(self):
''' Listens until a keyboard interrupt and handles each connection in a
new thread
'''
try:
while True:
client_data = self.socket.accept()
if self.handler_kwargs['logging'] is True:
print(f'Connection from {client_data[1][0]}')
_thread.start_new_thread(self.handle, client_data)
except KeyboardInterrupt:
self.socket.close()
class Handler():
''' Handles requests from the Server Socket
'''
def __init__(self, args, kwargs):
self.args = args
self.kwargs = kwargs
def set_status(self, code, message):
''' Used to add a status line:
- 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
'''
self.reply_headers = [f'HTTP/1.0 {code} {message}']
def set_header(self, header, content):
''' Defines a custom header and adds it to the response
'''
self.reply_headers += [f'{header}: {content}']
def response(self, content):
''' Adds to the content of the response
'''
if type(content) == str:
self.reply_content += content.split('n')
else:
self.reply_content += [content]
def calculate_content_length(self):
''' Calculates the content length and adds it to the header
'''
length = len(self.reply_content) * 2
lengths = [len(line) for line in self.reply_content]
length += sum(lengths)
self.set_header('Content-Length', length)
    def get_type(self, file_name):
        return filetype.guess('./public/'+file_name)

    def extract_file_name(self, file_name=None):
        if file_name:
            f_name = file_name[1:]
        else:
            f_name = self.request_status.split(' ')[1][1:]
        return f_name

    def send_file(self, file_name=None):
        if file_name is None:
            file_name = self.extract_file_name()
        if file_name == '':
            file_name = 'index.html'
        elif file_name[0] in './':
            # Reject names that start with '.' or '/'
            self.set_status(403, "Forbidden")
            self.set_header('Content-Type', 'text/html')
            self.reply_content = ['<p>Error 403: Forbidden</p>']
            return
        try:
            with open('./public/'+file_name, 'rb') as file:
                file_contents = file.read()
        except FileNotFoundError:
            self.set_status(404, 'Not Found')
            self.set_header('Content-Type', 'text/html')
            self.reply_content = ['<p>Error 404: File not found</p>']
            return
        file_type = self.get_type(file_name)
        if file_type is not None:
            self.set_header('Content-Type', file_type.MIME)
        elif file_name.split('.')[-1] == 'html':
            self.set_header('Content-Type', 'text/html')
        else:
            self.set_header('Content-Type', 'text/plain')
        self.response(file_contents)
    def get_request_address(self):
        return self.address

    def parse_headers(self, headers):
        t = {}
        for header in headers[1:]:
            # Skip blank or malformed lines that have no ': ' separator
            if ': ' not in header:
                continue
            name, value = header.split(': ', 1)
            t[name] = value
        return t

    def reply(self):
        ''' Assembles the response and sends it to the client
        '''
        if not self.reply_headers or self.reply_headers[0][0:4] != "HTTP":
            self.set_status(200, 'OK')
            self.set_header('Content-Type', 'text/html')
            self.reply_content = ['<p>Response Status unspecified</p>']
        self.calculate_content_length()
        message = '\r\n'.join(self.reply_headers)
        message += '\r\n\r\n'
        try:
            message += '\r\n'.join(self.reply_content)
            message += '\r\n'
        except TypeError:
            # reply_content holds bytes (e.g. from send_file), so build a bytes message
            message = bytes(message, 'utf-8')
            message += b'\r\n'.join(self.reply_content)
            message += b'\r\n'
        try:
            if isinstance(message, str):
                self.client.send(bytes(message, 'utf-8'))
            else:
                self.client.send(message)
        except OSError:
            # The client may already have disconnected; nothing more to do
            pass
    def handle(self, client, parsed_data, address):
        ''' Initialises variables and case-switches the request type to
            determine the handler function
        '''
        self.client = client
        self.address = address
        self.reply_headers = []
        self.reply_content = []
        self.headers = True
        self.request_status = parsed_data[1].split('\r\n')[0]
        request = parsed_data[0]
        headers = self.parse_headers(parsed_data[1].split('\r\n'))
        contents = parsed_data[2]
        if request == "GET":
            func = self.get
        elif request == "POST":
            func = self.post
        elif request == "HEAD":
            func = self.head
        elif request == "PUT":
            func = self.put
        elif request == "DELETE":
            func = self.delete
        elif request == "CONNECT":
            func = self.connect
        elif request == "OPTIONS":
            func = self.options
        elif request == "TRACE":
            func = self.trace
        elif request == "PATCH":
            func = self.patch
        else:
            func = self.default
        func(headers, contents)
        self.reply()
    def default(self, headers, contents):
        ''' If the request is not known, defaults to this
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Unknown Request Type</p>''')

    def get(self, headers, contents):
        ''' Override to customise handling of GET requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a GET Request</p>''')

    def post(self, headers, contents):
        ''' Override to customise handling of POST requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a POST Request</p>''')

    def head(self, headers, contents):
        ''' Override to customise handling of HEAD requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a HEAD Request</p>''')

    def put(self, headers, contents):
        ''' Override to customise handling of PUT requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a PUT Request</p>''')

    def delete(self, headers, contents):
        ''' Override to customise handling of DELETE requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a DELETE Request</p>''')

    def connect(self, headers, contents):
        ''' Override to customise handling of CONNECT requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a CONNECT Request</p>''')

    def options(self, headers, contents):
        ''' Override to customise handling of OPTIONS requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got an OPTIONS Request</p>''')

    def trace(self, headers, contents):
        ''' Override to customise handling of TRACE requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a TRACE Request</p>''')

    def patch(self, headers, contents):
        ''' Override to customise handling of PATCH requests
        '''
        self.set_status(200, 'OK')
        self.set_header('Content-Type', 'text/html')
        self.response('''<p>Successfully got a PATCH Request</p>''')
# =======================================================================
if __name__ == "__main__":
    import sys

    class CustomHandler(Handler):
        def get(self, headers, contents):
            self.set_status(200, 'OK')
            request_address = self.get_request_address()[0]
            file_name = self.extract_file_name()
            print(f'{request_address} -> {headers["Host"]}/{file_name}')
            self.send_file()

    def run():
        if len(sys.argv) == 2:
            port = int(sys.argv[1])
        else:
            port = 80
        try:
            print('Initialising...', end='')
            http_server = ServerSocket(
                ('0.0.0.0', port),
                CustomHandler,
                logging=True
            )
            print('Done')
            http_server.initialise()
        except Exception as e:
            print(f'{e}')

    run()
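For comparison with `parse_headers` and `calculate_content_length` above, here is a minimal standalone sketch of the same two steps: splitting header lines into a dict and assembling a response whose `Content-Length` is counted in bytes rather than characters. The helper names `parse_header_lines` and `build_response` are hypothetical and are not part of the server code; this is one possible approach under those assumptions, not a drop-in replacement.

```python
def parse_header_lines(lines):
    """Parse 'Name: value' lines into a dict, skipping lines without a colon.

    Hypothetical helper for illustration only.
    """
    headers = {}
    for line in lines:
        # partition splits on the first colon only, so values such as
        # 'localhost:8080' keep their own colons intact
        name, sep, value = line.partition(':')
        if not sep:
            continue  # blank or malformed line
        headers[name.strip()] = value.strip()
    return headers


def build_response(code, reason, body, content_type='text/html'):
    """Assemble a complete HTTP/1.0 response as bytes.

    Hypothetical helper for illustration only.
    """
    if isinstance(body, str):
        body = body.encode('utf-8')
    head = [
        f'HTTP/1.0 {code} {reason}',
        f'Content-Type: {content_type}',
        # Content-Length is the size of the encoded body in bytes
        f'Content-Length: {len(body)}',
    ]
    return '\r\n'.join(head).encode('ascii') + b'\r\n\r\n' + body
```

Encoding the body before measuring it means non-ASCII text and binary file contents are handled the same way, and the result can be passed to the socket in a single call.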
Thank you for taking the time.
python-3.x http server
asked 4 mins ago
LForchini
485