mirror of
https://github.com/csd4ni3l/browser.git
synced 2025-11-05 04:57:57 +01:00
231 lines
8.6 KiB
Python
231 lines
8.6 KiB
Python
import socket, logging, ssl, threading, os, ujson, time
|
|
|
|
from http_client.html_parser import HTML, CSSParser, Element, tree_to_list, get_inline_styles
|
|
|
|
class HTTPClient():
|
|
def __init__(self):
|
|
self.scheme = "http"
|
|
self.host = ""
|
|
self.path = ""
|
|
self.port = 0
|
|
self.request_headers = {}
|
|
self.response_explanation = None
|
|
self.response_headers = {}
|
|
self.response_http_version = None
|
|
self.response_status = None
|
|
self.nodes = []
|
|
self.css_rules = []
|
|
self.content_response = ""
|
|
self.view_source = False
|
|
self.redirect_count = 0
|
|
self.needs_render = False
|
|
|
|
def file_request(self, url):
|
|
with open(url.split("file://", 1)[1], "r") as file:
|
|
self.content_response = file.read()
|
|
|
|
def get_request(self, url, request_headers, css=False):
|
|
if url.startswith("view-source:"):
|
|
url = url.split("view-source:")[1]
|
|
self.view_source = True
|
|
else:
|
|
self.view_source = False
|
|
|
|
self.scheme, url_parts = url.split("://", 1)
|
|
|
|
if "/" not in url_parts:
|
|
self.host = url_parts
|
|
self.path = "/"
|
|
else:
|
|
self.host, self.path = url_parts.split("/", 1)
|
|
self.path = f"/{self.path}"
|
|
|
|
if ":" in self.host:
|
|
self.host, port = self.host.split(":", 1)
|
|
self.port = int(port)
|
|
else:
|
|
self.port = 80 if self.scheme == "http" else 443
|
|
|
|
self.request_headers = request_headers
|
|
self.response_explanation = None
|
|
self.response_headers = {}
|
|
self.response_http_version = None
|
|
self.response_status = None
|
|
self.content_response = ""
|
|
|
|
if "Host" not in self.request_headers:
|
|
self.request_headers["Host"] = self.host
|
|
|
|
cache_filename = f"{self.scheme}_{self.host}_{self.port}_{self.path.replace('/', '_')}.json"
|
|
if os.path.exists(f"http_cache/{cache_filename}"):
|
|
threading.Thread(target=self.parse, daemon=True).start()
|
|
return
|
|
|
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.socket.connect((self.host, self.port))
|
|
|
|
if self.scheme == "https":
|
|
ctx = ssl.create_default_context()
|
|
try:
|
|
self.socket = ctx.wrap_socket(self.socket, server_hostname=self.host)
|
|
except ssl.SSLCertVerificationError:
|
|
logging.debug(f"Invalid SSL cert for {self.host}:{self.port}{self.path}")
|
|
return
|
|
|
|
request_header_lines = '\r\n'.join([f"{header_name}: {header_value}" for header_name, header_value in self.request_headers.items()])
|
|
request = f"GET {self.path} HTTP/1.0\r\n{request_header_lines}\r\n\r\n"
|
|
|
|
logging.debug(f"Sending Request:\n{request}")
|
|
|
|
self.socket.send(request.encode())
|
|
|
|
threading.Thread(target=self.receive_response, daemon=True, args=(css,)).start()
|
|
|
|
def receive_response(self, css=False):
|
|
buffer = b""
|
|
headers_parsed = False
|
|
content_length = None
|
|
|
|
while True:
|
|
try:
|
|
data = self.socket.recv(2048)
|
|
if not data:
|
|
logging.debug("Connection closed by peer.")
|
|
break
|
|
buffer += data
|
|
|
|
if not headers_parsed:
|
|
header_end_index = buffer.find(b"\r\n\r\n")
|
|
if header_end_index != -1: # not found
|
|
header_data = buffer[:header_end_index].decode('latin-1')
|
|
body_data = buffer[header_end_index + 4:] # +4 for the \r\n\r\n
|
|
|
|
self._parse_headers(header_data)
|
|
headers_parsed = True
|
|
|
|
content_length_header = self.response_headers.get("Content-Length")
|
|
if content_length_header:
|
|
try:
|
|
content_length = int(content_length_header)
|
|
except ValueError:
|
|
logging.debug(f"Invalid Content-Length header: {content_length_header}")
|
|
|
|
self.content_response = body_data.decode('utf-8', errors='ignore') # Assuming body is UTF-8
|
|
|
|
if content_length is not None and len(body_data) >= content_length:
|
|
break
|
|
elif content_length is None:
|
|
pass
|
|
else:
|
|
continue
|
|
else:
|
|
self.content_response += data.decode('utf-8', errors='ignore')
|
|
if content_length is not None and len(self.content_response.encode('utf-8')) >= content_length:
|
|
break
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error receiving messages: {e}")
|
|
break
|
|
|
|
self.socket.close()
|
|
self.socket = None
|
|
|
|
if 300 <= int(self.response_status) < 400:
|
|
if self.redirect_count >= 4:
|
|
return
|
|
|
|
location_header = self.response_headers["Location"]
|
|
if "http" in location_header or "https" in location_header:
|
|
self.get_request(location_header, self.request_headers)
|
|
else:
|
|
self.get_request(f"{self.scheme}://{self.host}{location_header}", self.request_headers)
|
|
else:
|
|
self.redirect_count = 0
|
|
|
|
if not css:
|
|
self.parse()
|
|
|
|
def _parse_headers(self, header_data):
|
|
lines = header_data.splitlines()
|
|
|
|
if not lines:
|
|
logging.debug("Received empty header data.")
|
|
return
|
|
|
|
response_status_line = lines[0]
|
|
try:
|
|
self.response_http_version, self.response_status, *explanation_parts = response_status_line.split(" ", 2)
|
|
self.response_explanation = " ".join(explanation_parts)
|
|
except ValueError:
|
|
logging.error(f"Error parsing status line: {response_status_line}")
|
|
return
|
|
|
|
headers = {}
|
|
for i in range(1, len(lines)):
|
|
line = lines[i]
|
|
if not line:
|
|
break
|
|
try:
|
|
header_name, value = line.split(":", 1)
|
|
headers[header_name.strip()] = value.strip()
|
|
except ValueError:
|
|
logging.error(f"Error parsing header line: {line}")
|
|
self.response_headers = headers
|
|
|
|
def parse(self):
|
|
self.css_rules = []
|
|
|
|
cache_filename = f"{self.scheme}_{self.host}_{self.port}_{self.path.replace('/', '_')}.json"
|
|
|
|
original_scheme = self.scheme
|
|
original_host = self.host
|
|
original_port = self.port
|
|
original_path = self.path
|
|
original_response = self.content_response
|
|
|
|
if cache_filename in os.listdir("http_cache"):
|
|
with open(f"http_cache/{cache_filename}", "r") as file:
|
|
self.nodes = HTML.from_json(ujson.load(file))
|
|
else:
|
|
self.nodes = HTML(self.content_response).parse()
|
|
with open(f"http_cache/{cache_filename}", "w") as file:
|
|
json_list = HTML.to_json(self.nodes)
|
|
file.write(ujson.dumps(json_list))
|
|
|
|
css_links = [
|
|
node.attributes["href"]
|
|
for node in tree_to_list(self.nodes, [])
|
|
if isinstance(node, Element)
|
|
and node.tag == "link"
|
|
and node.attributes.get("rel") == "stylesheet"
|
|
and "href" in node.attributes
|
|
]
|
|
|
|
for css_link in css_links:
|
|
self.content_response = ""
|
|
|
|
if "://" in css_link:
|
|
self.get_request(css_link, self.request_headers, True)
|
|
|
|
if not css_link.startswith("/"):
|
|
dir, _ = self.path.rsplit("/", 1)
|
|
css_link = dir + "/" + css_link
|
|
|
|
if css_link.startswith("//"):
|
|
self.get_request(self.scheme + ":" + css_link, self.request_headers, True)
|
|
else:
|
|
self.get_request(self.scheme + "://" + self.host + ":" + str(self.port) + css_link, self.request_headers, True)
|
|
|
|
while not self.content_response:
|
|
time.sleep(0.025)
|
|
|
|
self.css_rules.extend(CSSParser(self.content_response).parse())
|
|
|
|
self.css_rules.extend(get_inline_styles(self.nodes))
|
|
|
|
self.scheme = original_scheme
|
|
self.host = original_host
|
|
self.port = original_port
|
|
self.path = original_path
|
|
self.content_response = original_response
|
|
self.needs_render = True |