← Back to articles

Rebuilding my portfolio...

2022-12-29 · 9 min read

...with a custom HTTP server written from scratch in Python!

Obviously, I designed the new site first - written in pure HTML and CSS, none of that extra frontend framework nonsense. (Then again, this blog is written in Next.js, so I can't really complain about using a framework. Also, that reminds me - I have to update this blog sometime soon to Next.js 13! I haven't really taken a deeper look at it yet but it's definitely on my TODO.)

Here's how it ended up looking:

Portfolio sitePortfolio site

You'll notice that it's pretty similar to this site - it makes use of borders and CSS transforms in a way I find fun. Plus, a couple of extra features:

  • Posts, baby! Yes, the posts are being stored in a folder on the backend and then being served on the front end, using a custom templating engine (of sorts, haha).
  • Ability to read environment variables. Currently, this is being used to cURL the Spotify API (with an OAuth token) to get what I'm currently listening to.

Of course, the design isn't the coolest part, considering that I've probably iterated on it multiple times (here's a version one, and a prospective version). No, what's cool about it is that it's written from scratch in Python, using only a couple of the default, built-in libraries:

  • json, to read environment variables from .env
  • http, for a list of all the possible HTTP status codes
  • mimetype, to determine the MIME type of a file
  • os, to load environment variables directly from the system
  • urllib to make API requests to the Spotify API
  • socket, which actually powers the HTTP server by listening to client requests.
  • threading, to listen to multiple client requests at a time.

In fact, you know what, let me just show you the code:

from http import HTTPStatus
from os import environ, listdir, path
from urllib import request, parse
from urllib.request import Request, urlopen
import json, mimetypes, socket, threading

# Load environment variables
env = {}
    with open(".env") as file:
        for line in file.readlines():
            key = ""
            for char in line:
                if char == "=":
                key += char
            env[key] = line.lstrip(f"{key}=").strip()
    # .env doesn't exist, so just read from system environment variables
    env = dict(environ)

# Make sure all necessary environment variables exist
if not env.get("PORT"):
    raise Exception("Please include the PORT environment variable inside .env")
if not env.get("SPOTIFY_ENCODED_TOKEN"):
    raise Exception(
        "Please include the SPOTIFY_ENCODED_TOKEN environment variable inside .env"
if not env.get("SPOTIFY_REFRESH_TOKEN"):
    raise Exception(
        "Please include the SPOTIFY_REFRESH_TOKEN environment variable inside .env"

# Functions for templating data
def listening_to():
    """Get what I'm listening to on Spotify."""
    data = parse.urlencode(
        {"grant_type": "refresh_token", "refresh_token": env["SPOTIFY_REFRESH_TOKEN"]}

    req = Request("https://accounts.spotify.com/api/token", method="POST", data=data)
    req.add_header("Content-Type", "application/x-www-form-urlencoded")
    req.add_header("Authorization", f"Basic {env['SPOTIFY_ENCODED_TOKEN']}")
    res = json.loads(request.urlopen(req).read())

    # Once we get the Spotify refresh token, use it to get what I'm currently listening to
    req = Request(
        "https://api.spotify.com/v1/me/player/currently-playing", method="GET"
    req.add_header("Accept", "application/json")
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", f"Bearer {res.get('access_token')}")
    res = request.urlopen(req).read()

    song_str = "nothing"
    if len(res):
        data = json.loads(res)
        song_str = f"{data.get('item').get('name')} by {', '.join([artist.get('name') for artist in data.get('item').get('artists')])}"
    return {
        "status_code": 200,
        "content_type": "application/json",
        "body": json.dumps({"value": song_str}),

def get_posts():
    """Get the posts I've made, stored in /posts."""
    posts = []
    for file in listdir("./posts"):
        with open(f"./posts/{file}") as f:
            posts.append({"date": file.strip(".md"), "content": f.read()})
    return json.dumps(
        sorted(posts, key=lambda x: x["date"].split("-")[::-1], reverse=True)

data = {"index.html": {"posts": get_posts()}}

class TCPServer:
    host = ""
    port = int(env["PORT"])
    max_connections = 5  # Max connections in queue

    def start(cls):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((cls.host, cls.port))

        print("Listening on port", cls.port)

        while True:
            # Accept any new connection
            conn, addr = s.accept()
            threading.Thread(target=cls.handle, args=(conn, addr)).start()

    def handle(cls, conn, addr):
        data = conn.recv(1024)
        response = cls.handle_request(data)

    def handle_request(self, data):
        """Handle incoming data."""

class HTTPServer(TCPServer):
    private_files = ["..", "main.py"]  # Add some sense of security
    request_methods = ["GET"]
    api_routes = {"spotify": listening_to}
    status_codes = {}
    for enum in HTTPStatus:
        status_codes[enum.value] = str(enum).lstrip("HTTPStatus.").replace("_", " ")

    def handle_request(cls, data):
        request = HTTPRequest(data, cls.request_methods, cls.private_files)

        if request.valid:
            handler = getattr(cls, f"handle_{request.method}")
            response = handler(request)
            return response
            response_line = cls.response_line(status_code=200)
            response_headers = cls.response_headers({"Content-Type": "text/html"})
            response_body = b"Invalid request method"
            return b"".join([response_line, response_headers, b"\r\n", response_body])

    def response_line(cls, status_code):
        reason = cls.status_codes.get(status_code)
        line = f"HTTP/1.1 {status_code} {reason}\r\n"
        return line.encode()

    def response_headers(cls, headers):
        res = []
        for header in headers.keys():
            res.append(f"{header}: {headers[header]}\r\n".encode())
        return b"".join(res)

    def render_html(cls, filename):
        response_line = cls.response_line(status_code=200)
        response_headers = cls.response_headers({"Content-Type": "text/html"})
        with open(filename, "rb") as file:
            response_body = file.read()
            for key in data.get(filename, {}).keys():
                # Loop through each key, replacing with appropriate value in file
                # In a more complex app, this would probably be a function of its own
                response_body = response_body.replace(
                    f"${{{key}}}".encode(), data[filename][key].encode()
        res = b"".join([response_line, response_headers, b"\r\n", response_body])
        return res

    def handle_GET(cls, request):
        filename = request.uri.strip("/")
        if not len(filename):
            filename = "index.html"
        if path.exists(filename):
            content_type = mimetypes.guess_type(filename)[0] or "text/html"
            if content_type == "text/html":
                return cls.render_html(filename)

            response_line = cls.response_line(status_code=200)
            response_headers = cls.response_headers({"Content-Type": content_type})
            with open(filename, "rb") as file:
                response_body = file.read()
        elif filename in list(cls.api_routes.keys()):
            response = cls.api_routes[filename]()
            response_line = cls.response_line(status_code=response["status_code"])
            response_headers = cls.response_headers(
                {"Content-Type": response["content_type"]}
            response_body = response["body"].encode()
            response_line = cls.response_line(status_code=404)
            response_headers = cls.response_headers({"Content-Type": "text/plain"})
            response_body = b"404 Not Found"

        return b"".join([response_line, response_headers, b"\r\n", response_body])

class HTTPRequest:
    request_methods = ["GET", "POST", "PUT", "DELETE"]  # By default, CRUD

    def __init__(self, data, request_methods=request_methods, private_files=[]):
        # The first line of an HTTP request has four parts:
        # Request method
        # URI
        # HTTP version
        # Line break
        self.method = None
        self.uri = None
        self.http_version = "1.1"
        self.valid = True  # By default, valid request
        self.request_methods = request_methods
        self.private_files = private_files


    def private_route(self):
        uri = self.uri.strip("/")
        for private_file in self.private_files:
            if uri.startswith(private_file):
                return True
        return False

    def parse(self, data):
        lines = data.split(b"\r\n")
        request_line = lines[0]

        words = request_line.split(b" ")

        self.method = words[0].decode()
        if len(words) > 1:
            self.uri = words[1].decode()
        if len(words) > 2:
            self.http_version = words[2]

        if self.method not in self.request_methods or self.private_route():
            # Not valid request anymore
            self.valid = False

if __name__ == "__main__":
    server = HTTPServer()

Only 230 lines of code! I think it's pretty cool. There are a couple of sections. First up, I load my environment variables. There are only three of them:

  • PORT, just the port receiving requests (Railway and most providers already provide this)

Afterward, I have two functions, listening_to() and get_posts(), which are basically functions that get the data I'll be either plugging into the / route or using in the API routes.

Then, I have three classes. The first one happens to be TCPServer, a basic implementation of a barebones TCP server. It listens for requests coming through the predefined port, holding up to five connections in the queue. Technically, there probably won't be more than five connections at a time thanks to the use of threads, where each client connection waits sixty seconds before timing out. The handle class method calls handle_request, which has no code inside because we actually use HTTPServer, which inherits from TCPServer.

In HTTPServer, I just define a couple of static variables that are important later on:

  • private_files, which prevents users from exiting the folder the application is running in/downloading the main server code
  • request_methods, which is just a list of accepted request methods. I haven't technically turned this into a full-fledged HTTP server (though that would be cool - maybe I could add a Contact Me form or something that would directly email me), so only GET methods are supported.
  • api_routes, which basically contains other API routes (currently only /spotify) that might be useful.
  • status_codes, a dictionary of all the possible status codes based on the ones provided by the http module.

handle_request is the base method. For context, each HTTP response has to have the following components:

HTTP/1.1 200 OK          # <- The HTTP version, the status code, and the status description
Content-Type: text/html  # ... The headers
                         # ... A blank line
<!DOCTYPE html>          # ... The content itself

So HTTPServer divides that up into a couple of steps. First, we create an object based on the HTTPRequest class, which basically contains values useful for describing an HTTP request. The class (which comes after the HTTPServer class) gives us a couple of pieces of useful info:

  • The method
  • The URI
  • The HTTP version
  • Whether or not the request is valid (i.e., it's not valid if the method is not in the provided list of request methods, or if the route is private)

Then, we use getattr to get the name of the method to call (handle_<request method>), which is super useful.

Since we're only dealing with GET requests, this means that handle_GET is going to end up doing most of the work. It gets the name of the file being requested. If the route happens to be /, this file is going to be index.html. If the file exists, handle_GET will:

  1. Try to determine the MIME type of the file being requested.
  2. If the MIME type is text/html, then we can use our basic templating class method, render_html, to plug in the templating data
  3. Otherwise, just return the file, using the class methods response_line and response_headers to generate the appropriate description and headers for the response.

If the file doesn't exist or the route isn't an API route, we do the same thing, but instead of returning a 200 OK, we return 404 Not Found.

However, if it is an API route, we simply run the function linked to that API route inside api_routes. Since I only have /spotify, the only function that will technically run is going to be listening_to(), which returns a dictionary like this:

    "status_code": 200,
    "content_type": "application/json",
    "content": json.dumps({
        "value": "Style by Taylor Swift"

So we basically form the HTTP response based on this information. That's how API routes work, in essence!

And that's basically it! Running python3 main.py or a similar command will whip the basic, barebones HTTP server.

I have no idea why exactly I decided to write this, but I learned a bunch about TCP/IP and HTTP, so. To be honest, I'm not sure if the multithreading actually works (LOL), but oh well. One thing I've noticed is that my programming skills have really improved overall. I haven't used Python for a hot minute and was able to actually use it, with @classmethod and all that jazz.

Signing off,