Basic Python HTTP Server The 2019 Stack Overflow Developer Survey Results Are In ...

Presidential Pardon

One-dimensional Japanese puzzle

How to make Illustrator type tool selection automatically adapt with text length

Why can't devices on different VLANs, but on the same subnet, communicate?

Did the new image of black hole confirm the general theory of relativity?

Using dividends to reduce short term capital gains?

My body leaves; my core can stay

Can the DM override racial traits?

Drawing vertical/oblique lines in Metrical tree (tikz-qtree, tipa)

Can a flute soloist sit?

How to determine omitted units in a publication

Mortgage adviser recommends a longer term than necessary combined with overpayments

What is the padding with red substance inside of steak packaging?

How to support a colleague who finds meetings extremely tiring?

How to politely respond to generic emails requesting a PhD/job in my lab? Without wasting too much time

Make it rain characters

Why don't hard Brexiteers insist on a hard border to prevent illegal immigration after Brexit?

Nested ellipses in tikzpicture: Chomsky hierarchy

What do I do when my TA workload is more than expected?

What happens to a Warlock's expended Spell Slots when they gain a Level?

What other Star Trek series did the main TNG cast show up in?

Huge performance difference of the command find with and without using %M option to show permissions

Fixing different display colors within string

What information about me do stores get via my credit card?



Basic Python HTTP Server



The 2019 Stack Overflow Developer Survey Results Are In
Unicorn Meta Zoo #1: Why another podcast?
Announcing the arrival of Valued Associate #679: Cesar ManaraNode.JS HTTP server displaying GoogleConcise HTTP serverPython HTTP ServerNodeJS static file HTTP server“Two way” HTTP ClientBasic multithread server codeBasic web server in Python 3Very basic C++ HTTP ParserNode.js HTTP server with request loggingA simple HTTP server implementation in Java





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty{ margin-bottom:0;
}







0












$begingroup$


There's not much to say, it's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...



I've also avoided the ../../ vulnerability, although some of the code in the send_file function seems a bit weak.



It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.



import filetype
import socket
import _thread


class ServerSocket():
''' Recieves connections, parses the request and ships it off to a handler
'''
def __init__(self, address, handler=None, *args, **kwargs):
''' Creates a server socket and defines a handler for the server
'''
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(address)

self.handler_args = args
self.handler_kwargs = kwargs

if handler:
self.handler = handler # The custom handler
else:
self.handler = Handler # The default handler

def initialise(self, open_connections=5):
''' Initilises the server socket and has it listen for connections
'''
self.socket.listen(open_connections)
self.listen()

def parse(self, data):
''' Splits a packet into
the request,
the headers (which includes the request),
and contents
'''
stringed = str(data, 'utf-8')
split = stringed.split('rnrn')
headers = split[0]
if len(split) > 1:
content = split[1]
else:
content = []
request = headers.split(' ')[0]

return request, headers, content

def handle(self, client, address):
''' Parses the data and handles the request. It then closes the connection
'''
try:
data = client.recv(1024)
except ConnectionResetError:
if self.handler_kwargs["logging"] is True:
print(f'{address[0]} unexpectedly quit')
client.close()
return
parsed = self.parse(data)
handler = self.handler(self.handler_args, self.handler_kwargs)
handler.handle(client, parsed, address)
client.close()

def listen(self):
''' Listens until a keyboard interrupt and handles each connection in a
new thread
'''
try:
while True:
client_data = self.socket.accept()
if self.handler_kwargs['logging'] is True:
print(f'Connection from {client_data[1][0]}')
_thread.start_new_thread(self.handle, client_data)
except KeyboardInterrupt:
self.socket.close()


class Handler():
''' Handles requests from the Server Socket
'''
def __init__(self, args, kwargs):
self.args = args
self.kwargs = kwargs

def set_status(self, code, message):
''' Used to add a status line:
- 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
'''
self.reply_headers = [f'HTTP/1.0 {code} {message}']

def set_header(self, header, content):
''' Defines a custom header and adds it to the response
'''
self.reply_headers += [f'{header}: {content}']

def response(self, content):
''' Adds to the content of the response
'''
if type(content) == str:
self.reply_content += content.split('n')
else:
self.reply_content += [content]

def calculate_content_length(self):
''' Calculates the content length and adds it to the header
'''
length = len(self.reply_content) * 2
lengths = [len(line) for line in self.reply_content]
length += sum(lengths)
self.set_header('Content-Length', length)

def get_type(self, file_name):
return filetype.guess('./public/'+file_name)

def extract_file_name(self, file_name=None):
if file_name:
f_name = file_name[1:]
else:
f_name = self.request_status.split(' ')[1][1:]
return f_name

def send_file(self, file_name=None):
if file_name is None:
file_name = self.extract_file_name()

if file_name == '':
file_name = 'index.html'
elif file_name[0] in './':
self.set_status(403, "Forbidden")
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 403: Forbidden</p>']
return

try:
with open('./public/'+file_name, 'rb') as file:
file_contents = file.read()
except FileNotFoundError:
self.set_status(404, 'Not Found')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 404: File not found</p>']
return
file_type = self.get_type(file_name)
if file_type is not None:
self.set_header('Content-Type', file_type.MIME)
elif file_name.split('.')[-1] == 'html':
self.set_header('Content-Type', 'text/html')
else:
self.set_header('Content-Type', 'text/txt')
self.response(file_contents)

def get_request_address(self):
return self.address

def parse_headers(self, headers):
t = {}
for header in headers[1:]:
t[header.split(': ')[0]] = header.split(': ')[1]
return t

def reply(self):
''' Assembles the response and sends it to the client
'''
if self.reply_headers[0][0:4] != "HTTP":
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Response Status unspecified</p>']

self.calculate_content_length()

message = 'rn'.join(self.reply_headers)
message += 'rnrn'
try:
message += 'rn'.join(self.reply_content)
message += 'rn'
except TypeError:
message = bytes(message, 'utf-8')
message += b'rn'.join(self.reply_content)
message += b'rn'

try:
if type(message) == str:
self.client.send(bytes(message, 'utf-8'))
else:
self.client.send(message)
except:
pass

def handle(self, client, parsed_data, address):
''' Initialises variables and case-switches the request type to
determine the handler function
'''
self.client = client
self.address = address
self.reply_headers = []
self.reply_content = []
self.headers = True
self.request_status = parsed_data[1].split('rn')[0]
request = parsed_data[0]
headers = self.parse_headers(parsed_data[1].split('rn'))
contents = parsed_data[2]

if request == "GET":
func = self.get
elif request == "POST":
func = self.post
elif request == "HEAD":
func = self.head
elif request == "PUT":
func = self.put
elif request == "DELETE":
func = self.delete
elif request == "CONNECT":
func = self.connect
elif request == "OPTIONS":
func = self.options
elif request == "TRACE":
func = self.trace
elif request == "PATCH":
func = self.patch
else:
func = self.default

func(headers, contents)
self.reply()

def default(self, headers, contents):
''' If the request is not known, defaults to this
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Unknown Request Type</p>''')

def get(self, headers, contents):
''' Overwrite to customly handle GET requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a GET Request</p>''')

def post(self, headers, contents):
''' Overwrite to customly handle POST requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a POST Request</p>''')

def head(self, headers, contents):
''' Overwrite to customly handle HEAD requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a HEAD Request</p>''')

def put(self, headers, contents):
''' Overwrite to customly handle PUT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PUT Request</p>''')

def delete(self, headers, contents):
''' Overwrite to customly handle DELETE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a DELETE Request</p>''')

def connect(self, headers, contents):
''' Overwrite to customly handle CONNECT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a CONNECT Request</p>''')

def options(self, headers, contents):
''' Overwrite to customly handle OPTIONS requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got an OPTIONS Request</p>''')

def trace(self, headers, contents):
''' Overwrite to customly handle TRACE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a TRACE Request</p>''')

def patch(self, headers, contents):
''' Overwrite to customly handle PATCH requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PATCH Request</p>''')


# =======================================================================

if __name__ == "__main__":
import sys

class CustomHandler(Handler):
def get(self, headers, contents):
self.set_status(200, 'OK')
request_address = self.get_request_address()[0]
file_name = self.extract_file_name()
print(f'{request_address} -> {headers["Host"]}/{file_name}')
self.send_file()

def run():
if len(sys.argv) == 2:
port = int(sys.argv[1])
else:
port = 80

try:
print('Initialising...', end='')
http_server = ServerSocket(
('0.0.0.0', port),
CustomHandler,
logging=True
)
print('Done')
http_server.initialise()
except Exception as e:
print(f'{e}')

run()



Thank you for taking the time.









share









$endgroup$



















    0












    $begingroup$


    There's not much to say, it's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...



    I've also avoided the ../../ vulnerability, although some of the code in the send_file function seems a bit weak.



    It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.



    import filetype
    import socket
    import _thread


    class ServerSocket():
    ''' Recieves connections, parses the request and ships it off to a handler
    '''
    def __init__(self, address, handler=None, *args, **kwargs):
    ''' Creates a server socket and defines a handler for the server
    '''
    self.socket = socket.socket()
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.bind(address)

    self.handler_args = args
    self.handler_kwargs = kwargs

    if handler:
    self.handler = handler # The custom handler
    else:
    self.handler = Handler # The default handler

    def initialise(self, open_connections=5):
    ''' Initilises the server socket and has it listen for connections
    '''
    self.socket.listen(open_connections)
    self.listen()

    def parse(self, data):
    ''' Splits a packet into
    the request,
    the headers (which includes the request),
    and contents
    '''
    stringed = str(data, 'utf-8')
    split = stringed.split('rnrn')
    headers = split[0]
    if len(split) > 1:
    content = split[1]
    else:
    content = []
    request = headers.split(' ')[0]

    return request, headers, content

    def handle(self, client, address):
    ''' Parses the data and handles the request. It then closes the connection
    '''
    try:
    data = client.recv(1024)
    except ConnectionResetError:
    if self.handler_kwargs["logging"] is True:
    print(f'{address[0]} unexpectedly quit')
    client.close()
    return
    parsed = self.parse(data)
    handler = self.handler(self.handler_args, self.handler_kwargs)
    handler.handle(client, parsed, address)
    client.close()

    def listen(self):
    ''' Listens until a keyboard interrupt and handles each connection in a
    new thread
    '''
    try:
    while True:
    client_data = self.socket.accept()
    if self.handler_kwargs['logging'] is True:
    print(f'Connection from {client_data[1][0]}')
    _thread.start_new_thread(self.handle, client_data)
    except KeyboardInterrupt:
    self.socket.close()


    class Handler():
    ''' Handles requests from the Server Socket
    '''
    def __init__(self, args, kwargs):
    self.args = args
    self.kwargs = kwargs

    def set_status(self, code, message):
    ''' Used to add a status line:
    - 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
    '''
    self.reply_headers = [f'HTTP/1.0 {code} {message}']

    def set_header(self, header, content):
    ''' Defines a custom header and adds it to the response
    '''
    self.reply_headers += [f'{header}: {content}']

    def response(self, content):
    ''' Adds to the content of the response
    '''
    if type(content) == str:
    self.reply_content += content.split('n')
    else:
    self.reply_content += [content]

    def calculate_content_length(self):
    ''' Calculates the content length and adds it to the header
    '''
    length = len(self.reply_content) * 2
    lengths = [len(line) for line in self.reply_content]
    length += sum(lengths)
    self.set_header('Content-Length', length)

    def get_type(self, file_name):
    return filetype.guess('./public/'+file_name)

    def extract_file_name(self, file_name=None):
    if file_name:
    f_name = file_name[1:]
    else:
    f_name = self.request_status.split(' ')[1][1:]
    return f_name

    def send_file(self, file_name=None):
    if file_name is None:
    file_name = self.extract_file_name()

    if file_name == '':
    file_name = 'index.html'
    elif file_name[0] in './':
    self.set_status(403, "Forbidden")
    self.set_header('Content-Type', 'text/html')
    self.reply_content = ['<p>Error 403: Forbidden</p>']
    return

    try:
    with open('./public/'+file_name, 'rb') as file:
    file_contents = file.read()
    except FileNotFoundError:
    self.set_status(404, 'Not Found')
    self.set_header('Content-Type', 'text/html')
    self.reply_content = ['<p>Error 404: File not found</p>']
    return
    file_type = self.get_type(file_name)
    if file_type is not None:
    self.set_header('Content-Type', file_type.MIME)
    elif file_name.split('.')[-1] == 'html':
    self.set_header('Content-Type', 'text/html')
    else:
    self.set_header('Content-Type', 'text/txt')
    self.response(file_contents)

    def get_request_address(self):
    return self.address

    def parse_headers(self, headers):
    t = {}
    for header in headers[1:]:
    t[header.split(': ')[0]] = header.split(': ')[1]
    return t

    def reply(self):
    ''' Assembles the response and sends it to the client
    '''
    if self.reply_headers[0][0:4] != "HTTP":
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.reply_content = ['<p>Response Status unspecified</p>']

    self.calculate_content_length()

    message = 'rn'.join(self.reply_headers)
    message += 'rnrn'
    try:
    message += 'rn'.join(self.reply_content)
    message += 'rn'
    except TypeError:
    message = bytes(message, 'utf-8')
    message += b'rn'.join(self.reply_content)
    message += b'rn'

    try:
    if type(message) == str:
    self.client.send(bytes(message, 'utf-8'))
    else:
    self.client.send(message)
    except:
    pass

    def handle(self, client, parsed_data, address):
    ''' Initialises variables and case-switches the request type to
    determine the handler function
    '''
    self.client = client
    self.address = address
    self.reply_headers = []
    self.reply_content = []
    self.headers = True
    self.request_status = parsed_data[1].split('rn')[0]
    request = parsed_data[0]
    headers = self.parse_headers(parsed_data[1].split('rn'))
    contents = parsed_data[2]

    if request == "GET":
    func = self.get
    elif request == "POST":
    func = self.post
    elif request == "HEAD":
    func = self.head
    elif request == "PUT":
    func = self.put
    elif request == "DELETE":
    func = self.delete
    elif request == "CONNECT":
    func = self.connect
    elif request == "OPTIONS":
    func = self.options
    elif request == "TRACE":
    func = self.trace
    elif request == "PATCH":
    func = self.patch
    else:
    func = self.default

    func(headers, contents)
    self.reply()

    def default(self, headers, contents):
    ''' If the request is not known, defaults to this
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Unknown Request Type</p>''')

    def get(self, headers, contents):
    ''' Overwrite to customly handle GET requests
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Successfully got a GET Request</p>''')

    def post(self, headers, contents):
    ''' Overwrite to customly handle POST requests
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Successfully got a POST Request</p>''')

    def head(self, headers, contents):
    ''' Overwrite to customly handle HEAD requests
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Successfully got a HEAD Request</p>''')

    def put(self, headers, contents):
    ''' Overwrite to customly handle PUT requests
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Successfully got a PUT Request</p>''')

    def delete(self, headers, contents):
    ''' Overwrite to customly handle DELETE requests
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Successfully got a DELETE Request</p>''')

    def connect(self, headers, contents):
    ''' Overwrite to customly handle CONNECT requests
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Successfully got a CONNECT Request</p>''')

    def options(self, headers, contents):
    ''' Overwrite to customly handle OPTIONS requests
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Successfully got an OPTIONS Request</p>''')

    def trace(self, headers, contents):
    ''' Overwrite to customly handle TRACE requests
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Successfully got a TRACE Request</p>''')

    def patch(self, headers, contents):
    ''' Overwrite to customly handle PATCH requests
    '''
    self.set_status(200, 'OK')
    self.set_header('Content-Type', 'text/html')
    self.response('''<p>Successfully got a PATCH Request</p>''')


    # =======================================================================

    if __name__ == "__main__":
    import sys

    class CustomHandler(Handler):
    def get(self, headers, contents):
    self.set_status(200, 'OK')
    request_address = self.get_request_address()[0]
    file_name = self.extract_file_name()
    print(f'{request_address} -> {headers["Host"]}/{file_name}')
    self.send_file()

    def run():
    if len(sys.argv) == 2:
    port = int(sys.argv[1])
    else:
    port = 80

    try:
    print('Initialising...', end='')
    http_server = ServerSocket(
    ('0.0.0.0', port),
    CustomHandler,
    logging=True
    )
    print('Done')
    http_server.initialise()
    except Exception as e:
    print(f'{e}')

    run()



    Thank you for taking the time.









    share









    $endgroup$















      0












      0








      0





      $begingroup$


      There's not much to say, it's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...



      I've also avoided the ../../ vulnerability, although some of the code in the send_file function seems a bit weak.



      It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.



      import filetype
      import socket
      import _thread


      class ServerSocket():
      ''' Recieves connections, parses the request and ships it off to a handler
      '''
      def __init__(self, address, handler=None, *args, **kwargs):
      ''' Creates a server socket and defines a handler for the server
      '''
      self.socket = socket.socket()
      self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
      self.socket.bind(address)

      self.handler_args = args
      self.handler_kwargs = kwargs

      if handler:
      self.handler = handler # The custom handler
      else:
      self.handler = Handler # The default handler

      def initialise(self, open_connections=5):
      ''' Initilises the server socket and has it listen for connections
      '''
      self.socket.listen(open_connections)
      self.listen()

      def parse(self, data):
      ''' Splits a packet into
      the request,
      the headers (which includes the request),
      and contents
      '''
      stringed = str(data, 'utf-8')
      split = stringed.split('rnrn')
      headers = split[0]
      if len(split) > 1:
      content = split[1]
      else:
      content = []
      request = headers.split(' ')[0]

      return request, headers, content

      def handle(self, client, address):
      ''' Parses the data and handles the request. It then closes the connection
      '''
      try:
      data = client.recv(1024)
      except ConnectionResetError:
      if self.handler_kwargs["logging"] is True:
      print(f'{address[0]} unexpectedly quit')
      client.close()
      return
      parsed = self.parse(data)
      handler = self.handler(self.handler_args, self.handler_kwargs)
      handler.handle(client, parsed, address)
      client.close()

      def listen(self):
      ''' Listens until a keyboard interrupt and handles each connection in a
      new thread
      '''
      try:
      while True:
      client_data = self.socket.accept()
      if self.handler_kwargs['logging'] is True:
      print(f'Connection from {client_data[1][0]}')
      _thread.start_new_thread(self.handle, client_data)
      except KeyboardInterrupt:
      self.socket.close()


      class Handler():
      ''' Handles requests from the Server Socket
      '''
      def __init__(self, args, kwargs):
      self.args = args
      self.kwargs = kwargs

      def set_status(self, code, message):
      ''' Used to add a status line:
      - 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
      '''
      self.reply_headers = [f'HTTP/1.0 {code} {message}']

      def set_header(self, header, content):
      ''' Defines a custom header and adds it to the response
      '''
      self.reply_headers += [f'{header}: {content}']

      def response(self, content):
      ''' Adds to the content of the response
      '''
      if type(content) == str:
      self.reply_content += content.split('n')
      else:
      self.reply_content += [content]

      def calculate_content_length(self):
      ''' Calculates the content length and adds it to the header
      '''
      length = len(self.reply_content) * 2
      lengths = [len(line) for line in self.reply_content]
      length += sum(lengths)
      self.set_header('Content-Length', length)

      def get_type(self, file_name):
      return filetype.guess('./public/'+file_name)

      def extract_file_name(self, file_name=None):
      if file_name:
      f_name = file_name[1:]
      else:
      f_name = self.request_status.split(' ')[1][1:]
      return f_name

      def send_file(self, file_name=None):
      if file_name is None:
      file_name = self.extract_file_name()

      if file_name == '':
      file_name = 'index.html'
      elif file_name[0] in './':
      self.set_status(403, "Forbidden")
      self.set_header('Content-Type', 'text/html')
      self.reply_content = ['<p>Error 403: Forbidden</p>']
      return

      try:
      with open('./public/'+file_name, 'rb') as file:
      file_contents = file.read()
      except FileNotFoundError:
      self.set_status(404, 'Not Found')
      self.set_header('Content-Type', 'text/html')
      self.reply_content = ['<p>Error 404: File not found</p>']
      return
      file_type = self.get_type(file_name)
      if file_type is not None:
      self.set_header('Content-Type', file_type.MIME)
      elif file_name.split('.')[-1] == 'html':
      self.set_header('Content-Type', 'text/html')
      else:
      self.set_header('Content-Type', 'text/txt')
      self.response(file_contents)

      def get_request_address(self):
      return self.address

      def parse_headers(self, headers):
      t = {}
      for header in headers[1:]:
      t[header.split(': ')[0]] = header.split(': ')[1]
      return t

      def reply(self):
      ''' Assembles the response and sends it to the client
      '''
      if self.reply_headers[0][0:4] != "HTTP":
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.reply_content = ['<p>Response Status unspecified</p>']

      self.calculate_content_length()

      message = 'rn'.join(self.reply_headers)
      message += 'rnrn'
      try:
      message += 'rn'.join(self.reply_content)
      message += 'rn'
      except TypeError:
      message = bytes(message, 'utf-8')
      message += b'rn'.join(self.reply_content)
      message += b'rn'

      try:
      if type(message) == str:
      self.client.send(bytes(message, 'utf-8'))
      else:
      self.client.send(message)
      except:
      pass

      def handle(self, client, parsed_data, address):
      ''' Initialises variables and case-switches the request type to
      determine the handler function
      '''
      self.client = client
      self.address = address
      self.reply_headers = []
      self.reply_content = []
      self.headers = True
      self.request_status = parsed_data[1].split('rn')[0]
      request = parsed_data[0]
      headers = self.parse_headers(parsed_data[1].split('rn'))
      contents = parsed_data[2]

      if request == "GET":
      func = self.get
      elif request == "POST":
      func = self.post
      elif request == "HEAD":
      func = self.head
      elif request == "PUT":
      func = self.put
      elif request == "DELETE":
      func = self.delete
      elif request == "CONNECT":
      func = self.connect
      elif request == "OPTIONS":
      func = self.options
      elif request == "TRACE":
      func = self.trace
      elif request == "PATCH":
      func = self.patch
      else:
      func = self.default

      func(headers, contents)
      self.reply()

      def default(self, headers, contents):
      ''' If the request is not known, defaults to this
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Unknown Request Type</p>''')

      def get(self, headers, contents):
      ''' Overwrite to customly handle GET requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a GET Request</p>''')

      def post(self, headers, contents):
      ''' Overwrite to customly handle POST requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a POST Request</p>''')

      def head(self, headers, contents):
      ''' Overwrite to customly handle HEAD requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a HEAD Request</p>''')

      def put(self, headers, contents):
      ''' Overwrite to customly handle PUT requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a PUT Request</p>''')

      def delete(self, headers, contents):
      ''' Overwrite to customly handle DELETE requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a DELETE Request</p>''')

      def connect(self, headers, contents):
      ''' Overwrite to customly handle CONNECT requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a CONNECT Request</p>''')

      def options(self, headers, contents):
      ''' Overwrite to customly handle OPTIONS requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got an OPTIONS Request</p>''')

      def trace(self, headers, contents):
      ''' Overwrite to customly handle TRACE requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a TRACE Request</p>''')

      def patch(self, headers, contents):
      ''' Overwrite to customly handle PATCH requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a PATCH Request</p>''')


      # =======================================================================

      if __name__ == "__main__":
      import sys

      class CustomHandler(Handler):
      def get(self, headers, contents):
      self.set_status(200, 'OK')
      request_address = self.get_request_address()[0]
      file_name = self.extract_file_name()
      print(f'{request_address} -> {headers["Host"]}/{file_name}')
      self.send_file()

      def run():
      if len(sys.argv) == 2:
      port = int(sys.argv[1])
      else:
      port = 80

      try:
      print('Initialising...', end='')
      http_server = ServerSocket(
      ('0.0.0.0', port),
      CustomHandler,
      logging=True
      )
      print('Done')
      http_server.initialise()
      except Exception as e:
      print(f'{e}')

      run()



      Thank you for taking the time.









      share









      $endgroup$




      There's not much to say, it's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...



      I've also avoided the ../../ vulnerability, although some of the code in the send_file function seems a bit weak.



      It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.



      import filetype
      import socket
      import _thread


      class ServerSocket():
      ''' Recieves connections, parses the request and ships it off to a handler
      '''
      def __init__(self, address, handler=None, *args, **kwargs):
      ''' Creates a server socket and defines a handler for the server
      '''
      self.socket = socket.socket()
      self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
      self.socket.bind(address)

      self.handler_args = args
      self.handler_kwargs = kwargs

      if handler:
      self.handler = handler # The custom handler
      else:
      self.handler = Handler # The default handler

      def initialise(self, open_connections=5):
      ''' Initilises the server socket and has it listen for connections
      '''
      self.socket.listen(open_connections)
      self.listen()

      def parse(self, data):
      ''' Splits a packet into
      the request,
      the headers (which includes the request),
      and contents
      '''
      stringed = str(data, 'utf-8')
      split = stringed.split('rnrn')
      headers = split[0]
      if len(split) > 1:
      content = split[1]
      else:
      content = []
      request = headers.split(' ')[0]

      return request, headers, content

      def handle(self, client, address):
      ''' Parses the data and handles the request. It then closes the connection
      '''
      try:
      data = client.recv(1024)
      except ConnectionResetError:
      if self.handler_kwargs["logging"] is True:
      print(f'{address[0]} unexpectedly quit')
      client.close()
      return
      parsed = self.parse(data)
      handler = self.handler(self.handler_args, self.handler_kwargs)
      handler.handle(client, parsed, address)
      client.close()

      def listen(self):
      ''' Listens until a keyboard interrupt and handles each connection in a
      new thread
      '''
      try:
      while True:
      client_data = self.socket.accept()
      if self.handler_kwargs['logging'] is True:
      print(f'Connection from {client_data[1][0]}')
      _thread.start_new_thread(self.handle, client_data)
      except KeyboardInterrupt:
      self.socket.close()


      class Handler():
      ''' Handles requests from the Server Socket
      '''
      def __init__(self, args, kwargs):
      self.args = args
      self.kwargs = kwargs

      def set_status(self, code, message):
      ''' Used to add a status line:
      - 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
      '''
      self.reply_headers = [f'HTTP/1.0 {code} {message}']

      def set_header(self, header, content):
      ''' Defines a custom header and adds it to the response
      '''
      self.reply_headers += [f'{header}: {content}']

      def response(self, content):
      ''' Adds to the content of the response
      '''
      if type(content) == str:
      self.reply_content += content.split('n')
      else:
      self.reply_content += [content]

      def calculate_content_length(self):
      ''' Calculates the content length and adds it to the header
      '''
      length = len(self.reply_content) * 2
      lengths = [len(line) for line in self.reply_content]
      length += sum(lengths)
      self.set_header('Content-Length', length)

      def get_type(self, file_name):
      return filetype.guess('./public/'+file_name)

      def extract_file_name(self, file_name=None):
      if file_name:
      f_name = file_name[1:]
      else:
      f_name = self.request_status.split(' ')[1][1:]
      return f_name

      def send_file(self, file_name=None):
      if file_name is None:
      file_name = self.extract_file_name()

      if file_name == '':
      file_name = 'index.html'
      elif file_name[0] in './':
      self.set_status(403, "Forbidden")
      self.set_header('Content-Type', 'text/html')
      self.reply_content = ['<p>Error 403: Forbidden</p>']
      return

      try:
      with open('./public/'+file_name, 'rb') as file:
      file_contents = file.read()
      except FileNotFoundError:
      self.set_status(404, 'Not Found')
      self.set_header('Content-Type', 'text/html')
      self.reply_content = ['<p>Error 404: File not found</p>']
      return
      file_type = self.get_type(file_name)
      if file_type is not None:
      self.set_header('Content-Type', file_type.MIME)
      elif file_name.split('.')[-1] == 'html':
      self.set_header('Content-Type', 'text/html')
      else:
      self.set_header('Content-Type', 'text/txt')
      self.response(file_contents)

      def get_request_address(self):
      return self.address

      def parse_headers(self, headers):
      t = {}
      for header in headers[1:]:
      t[header.split(': ')[0]] = header.split(': ')[1]
      return t

      def reply(self):
      ''' Assembles the response and sends it to the client
      '''
      if self.reply_headers[0][0:4] != "HTTP":
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.reply_content = ['<p>Response Status unspecified</p>']

      self.calculate_content_length()

      message = 'rn'.join(self.reply_headers)
      message += 'rnrn'
      try:
      message += 'rn'.join(self.reply_content)
      message += 'rn'
      except TypeError:
      message = bytes(message, 'utf-8')
      message += b'rn'.join(self.reply_content)
      message += b'rn'

      try:
      if type(message) == str:
      self.client.send(bytes(message, 'utf-8'))
      else:
      self.client.send(message)
      except:
      pass

      def handle(self, client, parsed_data, address):
      ''' Initialises variables and case-switches the request type to
      determine the handler function
      '''
      self.client = client
      self.address = address
      self.reply_headers = []
      self.reply_content = []
      self.headers = True
      self.request_status = parsed_data[1].split('rn')[0]
      request = parsed_data[0]
      headers = self.parse_headers(parsed_data[1].split('rn'))
      contents = parsed_data[2]

      if request == "GET":
      func = self.get
      elif request == "POST":
      func = self.post
      elif request == "HEAD":
      func = self.head
      elif request == "PUT":
      func = self.put
      elif request == "DELETE":
      func = self.delete
      elif request == "CONNECT":
      func = self.connect
      elif request == "OPTIONS":
      func = self.options
      elif request == "TRACE":
      func = self.trace
      elif request == "PATCH":
      func = self.patch
      else:
      func = self.default

      func(headers, contents)
      self.reply()

      def default(self, headers, contents):
      ''' If the request is not known, defaults to this
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Unknown Request Type</p>''')

      def get(self, headers, contents):
      ''' Overwrite to customly handle GET requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a GET Request</p>''')

      def post(self, headers, contents):
      ''' Overwrite to customly handle POST requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a POST Request</p>''')

      def head(self, headers, contents):
      ''' Overwrite to customly handle HEAD requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a HEAD Request</p>''')

      def put(self, headers, contents):
      ''' Overwrite to customly handle PUT requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a PUT Request</p>''')

      def delete(self, headers, contents):
      ''' Overwrite to customly handle DELETE requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a DELETE Request</p>''')

      def connect(self, headers, contents):
      ''' Overwrite to customly handle CONNECT requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a CONNECT Request</p>''')

      def options(self, headers, contents):
      ''' Overwrite to customly handle OPTIONS requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got an OPTIONS Request</p>''')

      def trace(self, headers, contents):
      ''' Overwrite to customly handle TRACE requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a TRACE Request</p>''')

      def patch(self, headers, contents):
      ''' Overwrite to customly handle PATCH requests
      '''
      self.set_status(200, 'OK')
      self.set_header('Content-Type', 'text/html')
      self.response('''<p>Successfully got a PATCH Request</p>''')


      # =======================================================================

      if __name__ == "__main__":
      import sys

      class CustomHandler(Handler):
      def get(self, headers, contents):
      self.set_status(200, 'OK')
      request_address = self.get_request_address()[0]
      file_name = self.extract_file_name()
      print(f'{request_address} -> {headers["Host"]}/{file_name}')
      self.send_file()

      def run():
      if len(sys.argv) == 2:
      port = int(sys.argv[1])
      else:
      port = 80

      try:
      print('Initialising...', end='')
      http_server = ServerSocket(
      ('0.0.0.0', port),
      CustomHandler,
      logging=True
      )
      print('Done')
      http_server.initialise()
      except Exception as e:
      print(f'{e}')

      run()



      Thank you for taking the time.







      python-3.x http server





      share












      share










      share



      share










      asked 4 mins ago









      LForchiniLForchini

      485




      485






















          0






          active

          oldest

          votes












          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "196"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: false,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: null,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f217358%2fbasic-python-http-server%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          0






          active

          oldest

          votes








          0






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes
















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Code Review Stack Exchange!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          Use MathJax to format equations. MathJax reference.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f217358%2fbasic-python-http-server%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          is 'sed' thread safeWhat should someone know about using Python scripts in the shell?Nexenta bash script uses...

          How do i solve the “ No module named 'mlxtend' ” issue on Jupyter?

          Pilgersdorf Inhaltsverzeichnis Geografie | Geschichte | Bevölkerungsentwicklung | Politik | Kultur...