Files
browser/http_client/connection.py

246 lines
9.2 KiB
Python

import socket, logging, ssl, threading, os, ujson, time
from http_client.html_parser import HTML, CSSParser, Element, tree_to_list, get_inline_styles
def resolve_url(scheme, host, port, path, url):
    """Resolve a possibly-relative *url* against the page it was found on.

    Args:
        scheme: Scheme of the current page (e.g. ``"http"``).
        host: Host of the current page.
        port: Port of the current page; always written into the result when
            the host of the current page is reused.
        path: Path of the current page (e.g. ``"/a/b/index.html"``).
        url: The link to resolve. May be absolute (``scheme://...``),
            scheme-relative (``//host/...``), host-relative (``/x``) or
            relative to the directory of *path* (``x``, ``../x``).

    Returns:
        An absolute URL string.
    """
    # A URL is only absolute when "://" directly follows a scheme; a "://"
    # that appears later (e.g. inside a query string such as
    # "/go?u=http://x") must not make a relative link look absolute.
    scheme_part, separator, _ = url.partition("://")
    if separator and not any(ch in scheme_part for ch in "/?#"):
        return url

    if not url.startswith("/"):
        # Directory of the current page; "" when the path has no slash at all
        # (rpartition never raises, unlike unpacking rsplit).
        base_dir = path.rpartition("/")[0]
        while url.startswith("../"):
            url = url[3:]
            # Climb one level, but never above the root directory.
            if "/" in base_dir:
                base_dir = base_dir.rpartition("/")[0]
        url = f"{base_dir}/{url}"

    if url.startswith("//"):
        # Scheme-relative link: only the scheme is inherited.
        return f"{scheme}:{url}"
    return f"{scheme}://{host}:{port}{url}"
class HTTPClient():
    """Small HTTP/1.0 client that fetches a page, parses it and loads its CSS.

    A request is started with :meth:`get_request`; the response is read on a
    daemon thread by :meth:`receive_response`, which then calls :meth:`parse`.
    When parsing is finished ``needs_render`` is set to ``True`` and the
    parsed tree / style rules are available in ``nodes`` / ``css_rules``.
    """

    # Maximum number of consecutive redirects followed before giving up.
    MAX_REDIRECTS = 4

    # Attributes that describe the page being rendered. Stylesheet fetches go
    # through get_request() and overwrite them, so parse() snapshots them
    # before fetching stylesheets and restores them afterwards.
    _PAGE_STATE = (
        "scheme",
        "host",
        "port",
        "path",
        "content_response",
        "view_source",
        "request_headers",
        "response_explanation",
        "response_headers",
        "response_http_version",
        "response_status",
    )

    def __init__(self):
        self.scheme = "http"
        self.host = ""
        self.path = ""
        self.port = 0
        self.request_headers = {}
        self.response_explanation = None
        self.response_headers = {}
        self.response_http_version = None
        self.response_status = None  # kept as the string from the status line
        self.nodes = []
        self.css_rules = []
        self.content_response = ""
        self.view_source = False
        self.redirect_count = 0
        self.needs_render = False
        self.socket = None
        # Set once the network fetch started by get_request() has finished
        # (successfully or not), including any redirects it followed.
        self._response_done = threading.Event()

    def file_request(self, url):
        """Load the content of a ``file://`` URL into ``content_response``."""
        with open(url.split("file://", 1)[1], "r") as file:
            self.content_response = file.read()

    def get_request(self, url, request_headers, css=False):
        """Start a GET request for *url*.

        The response is read on a background daemon thread. For a page
        request (``css=False``) a cached parse result in ``html_cache`` is
        used instead of the network when one exists.

        Args:
            url: Absolute URL, optionally prefixed with ``view-source:``.
            request_headers: Mapping of header names to values. It is copied,
                never modified; a ``Host`` header is added to the copy when
                the mapping does not contain one.
            css: ``True`` when fetching a stylesheet; the response is then
                only stored in ``content_response`` and not parsed as HTML.

        Raises:
            ValueError: If *url* has no ``://`` or a non-numeric port.
            OSError: If the connection cannot be established.
        """
        self._response_done.clear()
        in_flight = False
        try:
            in_flight = self._start_request(url, request_headers, css)
        finally:
            # Whoever waits on the event must be released on every path that
            # does not hand the work over to a receive thread.
            if not in_flight:
                self._response_done.set()

    def _start_request(self, url, request_headers, css):
        """Parse *url*, send the request; return True if a receive thread runs."""
        view_source_prefix = "view-source:"
        self.view_source = url.startswith(view_source_prefix)
        if self.view_source:
            url = url[len(view_source_prefix):]

        self.scheme, url_parts = url.split("://", 1)
        self.host, _, rest = url_parts.partition("/")
        self.path = f"/{rest}"
        if ":" in self.host:
            self.host, port = self.host.split(":", 1)
            self.port = int(port)
        else:
            self.port = 80 if self.scheme == "http" else 443

        # Work on a copy: writing "Host" into the caller's dict would make
        # every later request (other sites, redirects, stylesheets on a CDN)
        # send the first host's name.
        headers = dict(request_headers)
        if not any(name.lower() == "host" for name in headers):
            headers["Host"] = self.host
        self.request_headers = headers
        self.response_explanation = None
        self.response_headers = {}
        self.response_http_version = None
        self.response_status = None
        self.content_response = ""

        # The HTML cache only applies to pages, never to stylesheet fetches.
        if not css and os.path.exists(f"html_cache/{self._cache_filename()}"):
            threading.Thread(target=self.parse, daemon=True).start()
            return False

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.connect((self.host, self.port))
            if self.scheme == "https":
                ctx = ssl.create_default_context()
                try:
                    self.socket = ctx.wrap_socket(self.socket, server_hostname=self.host)
                except ssl.SSLCertVerificationError:
                    logging.debug(f"Invalid SSL cert for {self.host}:{self.port}{self.path}")
                    self._close_socket()
                    return False
            request_header_lines = '\r\n'.join(
                f"{header_name}: {header_value}"
                for header_name, header_value in self.request_headers.items()
            )
            request = f"GET {self.path} HTTP/1.0\r\n{request_header_lines}\r\n\r\n"
            logging.debug(f"Sending Request:\n{request}")
            # sendall: plain send() may transmit only part of the request.
            self.socket.sendall(request.encode())
        except BaseException:
            self._close_socket()
            raise
        threading.Thread(target=self.receive_response, daemon=True, args=(css,)).start()
        return True

    def receive_response(self, css=False):
        """Read the response from ``self.socket`` and act on it.

        Stores the decoded body in ``content_response``, follows redirects
        (at most ``MAX_REDIRECTS`` in a row) and, for page requests, calls
        :meth:`parse`. Intended to run on the thread started by
        :meth:`get_request`.

        Args:
            css: ``True`` when the response is a stylesheet, in which case
                :meth:`parse` is not called.
        """
        sock = self.socket
        buffer = bytearray()
        body_start = None  # index of the first body byte once headers are parsed
        content_length = None
        try:
            while True:
                data = sock.recv(2048)
                if not data:
                    logging.debug("Connection closed by peer.")
                    break
                buffer.extend(data)
                if body_start is None:
                    header_end_index = buffer.find(b"\r\n\r\n")
                    if header_end_index == -1:
                        continue  # headers are still incomplete
                    self._parse_headers(buffer[:header_end_index].decode('latin-1'))
                    body_start = header_end_index + 4  # +4 for the \r\n\r\n
                    content_length = self._declared_content_length()
                if content_length is not None and len(buffer) - body_start >= content_length:
                    break
        except Exception as e:
            # Top level of the receive thread: log instead of dying silently.
            logging.error(f"Error receiving messages: {e}")
        finally:
            if sock is not None:
                sock.close()
            self.socket = None

        if body_start is None:
            logging.error("Connection ended before complete response headers were received.")
            self._response_done.set()
            return

        # Decode the whole body at once so that multi-byte characters split
        # across recv() chunks are not lost. Assuming body is UTF-8.
        self.content_response = buffer[body_start:].decode('utf-8', errors='ignore')

        try:
            status = int(self.response_status)
        except (TypeError, ValueError):
            logging.error(f"Unusable response status: {self.response_status!r}")
            self._response_done.set()
            return

        if 300 <= status < 400:
            self._follow_redirect(css)
            return

        self.redirect_count = 0
        self._response_done.set()
        if not css:
            self.parse()

    def _follow_redirect(self, css):
        """Issue the follow-up request for a 3xx response, within the limit."""
        location_header = self._response_header("Location")
        if self.redirect_count >= self.MAX_REDIRECTS or not location_header:
            logging.debug(f"Not following redirect from {self.host}{self.path} (limit reached or no Location header)")
            # Reset so the next navigation starts with a fresh budget.
            self.redirect_count = 0
            self._response_done.set()
            return
        self.redirect_count += 1
        target = resolve_url(self.scheme, self.host, self.port, self.path, location_header)
        try:
            self.get_request(target, self._headers_without_host(), css)
        except (OSError, ValueError) as e:
            # get_request() has already released anyone waiting for the result.
            logging.error(f"Error following redirect to {target}: {e}")

    def _parse_headers(self, header_data):
        """Parse the status line and header lines of a response.

        Fills ``response_http_version``, ``response_status`` (as a string),
        ``response_explanation`` and ``response_headers``. Header names are
        stored exactly as received. On a malformed status line nothing is
        changed; malformed header lines are logged and skipped.
        """
        lines = header_data.splitlines()
        if not lines:
            logging.debug("Received empty header data.")
            return
        response_status_line = lines[0]
        try:
            self.response_http_version, self.response_status, *explanation_parts = response_status_line.split(" ", 2)
            self.response_explanation = " ".join(explanation_parts)
        except ValueError:
            logging.error(f"Error parsing status line: {response_status_line}")
            return
        headers = {}
        for line in lines[1:]:
            if not line:
                break
            try:
                header_name, value = line.split(":", 1)
                headers[header_name.strip()] = value.strip()
            except ValueError:
                logging.error(f"Error parsing header line: {line}")
        self.response_headers = headers

    def parse(self):
        """Build ``nodes`` and ``css_rules`` for the current page.

        The parsed HTML is read from / written to ``html_cache`` and each
        linked stylesheet from / to ``css_cache``. Stylesheets that are not
        cached are fetched with :meth:`get_request`; a stylesheet that cannot
        be fetched is skipped. Sets ``needs_render`` when done.
        """
        self.css_rules = []
        page_state = {name: getattr(self, name) for name in self._PAGE_STATE}

        html_cache_path = f"html_cache/{self._cache_filename()}"
        if os.path.exists(html_cache_path):
            with open(html_cache_path, "r") as file:
                self.nodes = HTML.from_json(ujson.load(file))
        else:
            self.nodes = HTML(self.content_response).parse()
            os.makedirs("html_cache", exist_ok=True)
            with open(html_cache_path, "w") as file:
                json_list = HTML.to_json(self.nodes)
                file.write(ujson.dumps(json_list))

        css_links = [
            node.attributes["href"]
            for node in tree_to_list(self.nodes, [])
            if isinstance(node, Element)
            and node.tag == "link"
            and node.attributes.get("rel") == "stylesheet"
            and "href" in node.attributes
        ]
        try:
            for css_link in css_links:
                self.css_rules.extend(self._load_stylesheet(page_state, css_link))
            self.css_rules.extend(get_inline_styles(self.nodes))
        finally:
            # Stylesheet requests overwrote the page's URL and response state.
            for name, value in page_state.items():
                setattr(self, name, value)
        self.needs_render = True

    def _load_stylesheet(self, page_state, css_link):
        """Return the rules of one linked stylesheet, using the CSS cache."""
        scheme = page_state["scheme"]
        host = page_state["host"]
        port = page_state["port"]
        path = page_state["path"]
        # The page's own URL is part of the name so that e.g. /styles.css is
        # not shared between different websites. The page values are used
        # (not self.*) because earlier stylesheet fetches changed self.*.
        css_cache_path = f"css_cache/{scheme}_{host}_{port}_{path.replace('/', '_')}_{css_link.replace('/', '_')}.json"
        if os.path.exists(css_cache_path):
            with open(css_cache_path, "r") as file:
                return CSSParser.from_json(ujson.load(file))

        css_url = resolve_url(scheme, host, port, path, css_link)
        self.content_response = ""
        try:
            self.get_request(css_url, self._headers_without_host(), css=True)
        except (OSError, ValueError) as e:
            logging.error(f"Error requesting stylesheet {css_url}: {e}")
            return []
        # Block until the receive thread has stored the complete body.
        self._response_done.wait()
        if not self.content_response:
            logging.debug(f"No content received for stylesheet {css_url}")
            return []
        rules = CSSParser(self.content_response).parse()
        os.makedirs("css_cache", exist_ok=True)
        with open(css_cache_path, "w") as file:
            ujson.dump(CSSParser.to_json(rules), file)
        return rules

    def _cache_filename(self):
        """Return the HTML cache file name for the current URL."""
        return f"{self.scheme}_{self.host}_{self.port}_{self.path.replace('/', '_')}.json"

    def _response_header(self, name):
        """Return a response header by case-insensitive name, or None."""
        wanted = name.lower()
        for header_name, value in self.response_headers.items():
            if header_name.lower() == wanted:
                return value
        return None

    def _declared_content_length(self):
        """Return the Content-Length as a non-negative int, or None."""
        content_length_header = self._response_header("Content-Length")
        if not content_length_header:
            return None
        try:
            content_length = int(content_length_header)
        except ValueError:
            content_length = -1
        if content_length < 0:
            logging.debug(f"Invalid Content-Length header: {content_length_header}")
            return None
        return content_length

    def _headers_without_host(self):
        """Return a copy of the request headers without any Host header.

        Used for follow-up requests (redirects, stylesheets) so that Host is
        derived again from the URL actually being requested.
        """
        return {
            name: value
            for name, value in self.request_headers.items()
            if name.lower() != "host"
        }

    def _close_socket(self):
        """Close and forget the current socket, if any."""
        if self.socket is not None:
            self.socket.close()
            self.socket = None