mirror of
https://github.com/csd4ni3l/browser.git
synced 2026-01-01 04:03:43 +01:00
Initial version, i forgot to commit for a long time.
This commit is contained in:
166
http_client/connection.py
Normal file
166
http_client/connection.py
Normal file
@@ -0,0 +1,166 @@
|
||||
import socket, logging, ssl, threading, os
|
||||
|
||||
class HTTPClient():
    """Minimal HTTP/1.0 client used by the browser.

    ``get_request`` parses the URL, opens the connection and sends the
    request; the response is then read on a daemon thread by
    ``receive_response``, which sets ``needs_render`` once
    ``content_response`` holds the final (non-redirect) body.
    """

    # Maximum number of consecutive redirects followed for one navigation.
    MAX_REDIRECTS = 4

    def __init__(self):
        self.scheme = "http"
        self.host = ""
        self.path = ""
        self.port = 0
        self.request_headers = {}
        self.response_explanation = None
        self.response_headers = {}
        self.response_http_version = None
        self.response_status = None
        self.content_response = ""
        self.view_source = False
        self.redirect_count = 0
        self.needs_render = False

    def file_request(self, url):
        """Read a local ``file://`` URL into ``content_response``.

        Raises:
            IndexError: if ``url`` does not contain ``file://``.
            OSError: if the file cannot be opened.
        """
        with open(url.split("file://", 1)[1], "r") as file:
            self.content_response = file.read()

    def get_request(self, url, request_headers):
        """Send a GET request for ``url`` and start the reader thread.

        Args:
            url: ``scheme://host[:port][/path]``, optionally prefixed with
                ``view-source:``.
            request_headers: mapping of header name to value. It is copied,
                so the caller's mapping is never modified.

        If a cache file for the URL already exists under ``http_cache/`` no
        network traffic happens and ``needs_render`` is set immediately.

        Raises:
            ValueError: if ``url`` has no ``://`` or the port is not numeric.
            OSError: if connecting or sending fails (the socket is closed
                before the error propagates).
        """
        self.view_source = url.startswith("view-source:")
        if self.view_source:
            # Slice instead of split so a later "view-source:" inside the
            # URL cannot truncate it.
            url = url[len("view-source:"):]

        self.scheme, url_parts = url.split("://", 1)

        if "/" in url_parts:
            self.host, remainder = url_parts.split("/", 1)
            self.path = f"/{remainder}"
        else:
            self.host = url_parts
            self.path = "/"

        if ":" in self.host:
            self.host, port = self.host.split(":", 1)
            self.port = int(port)
        else:
            self.port = 80 if self.scheme == "http" else 443

        # Copy: adding "Host" to the caller's dict would pin the first host
        # for every later request made with the same dict.
        self.request_headers = dict(request_headers)
        self.response_explanation = None
        self.response_headers = {}
        self.response_http_version = None
        self.response_status = None
        self.content_response = ""

        if "Host" not in self.request_headers:
            self.request_headers["Host"] = self.host

        cache_filename = f"{self.scheme}_{self.host}_{self.port}_{self.path.replace('/', '_')}.json"
        if os.path.exists(f"http_cache/{cache_filename}"):
            self.needs_render = True
            return

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
            if self.scheme == "https":
                ctx = ssl.create_default_context()
                sock = ctx.wrap_socket(sock, server_hostname=self.host)
        except ssl.SSLCertVerificationError:
            logging.debug(f"Invalid SSL cert for {self.host}:{self.port}{self.path}")
            sock.close()
            return
        except OSError:
            sock.close()
            raise
        self.socket = sock

        request_header_lines = '\r\n'.join(
            f"{header_name}: {header_value}"
            for header_name, header_value in self.request_headers.items()
        )
        request = f"GET {self.path} HTTP/1.0\r\n{request_header_lines}\r\n\r\n"

        logging.debug(f"Sending Request:\n{request}")

        try:
            # sendall: send() may transmit only part of the request.
            self.socket.sendall(request.encode())
        except OSError:
            self.socket.close()
            raise

        threading.Thread(target=self.receive_response, daemon=True).start()

    def receive_response(self):
        """Read the response from ``self.socket`` and act on it.

        The socket is always closed afterwards. A 3xx response carrying a
        ``Location`` header is followed (at most ``MAX_REDIRECTS`` times in
        a row); any other parsed response sets ``needs_render``. If no valid
        status line was received nothing is rendered.
        """
        sock = self.socket  # a redirect replaces self.socket; keep our own reference
        try:
            self._read_response(sock)
        finally:
            sock.close()

        if self.response_status is None:
            logging.error("No valid HTTP response received.")
            return
        try:
            status = int(self.response_status)
        except ValueError:
            logging.error(f"Invalid status code: {self.response_status}")
            return

        location_header = self._get_response_header("Location")
        if 300 <= status < 400 and location_header:
            self._follow_redirect(location_header)
            return

        self.redirect_count = 0
        self.needs_render = True

    def _read_response(self, sock):
        """Receive headers and body from ``sock`` into the response fields."""
        buffer = b""
        body = None  # becomes a bytearray once the header block is complete
        content_length = None

        while True:
            try:
                data = sock.recv(2048)
            except OSError as e:
                logging.error(f"Error receiving messages: {e}")
                break

            if not data:
                logging.debug("Connection closed by peer.")
                break

            if body is None:
                buffer += data
                header_end_index = buffer.find(b"\r\n\r\n")
                if header_end_index == -1:
                    continue  # header block still incomplete
                self._parse_headers(buffer[:header_end_index].decode('latin-1'))
                body = bytearray(buffer[header_end_index + 4:])  # +4 for the \r\n\r\n
                content_length = self._content_length()
            else:
                body += data

            if content_length is not None and len(body) >= content_length:
                del body[content_length:]
                break

        if body is not None:
            # Decode once at the end so multi-byte characters split across
            # recv() chunks are not lost. Assuming body is UTF-8.
            self.content_response = bytes(body).decode('utf-8', errors='ignore')

    def _get_response_header(self, name):
        """Return the response header ``name`` (case-insensitive) or None."""
        wanted = name.casefold()
        for header_name, value in self.response_headers.items():
            if header_name.casefold() == wanted:
                return value
        return None

    def _content_length(self):
        """Return the declared body length, or None if absent or invalid."""
        content_length_header = self._get_response_header("Content-Length")
        if not content_length_header:
            return None
        try:
            content_length = int(content_length_header)
        except ValueError:
            content_length = -1
        if content_length < 0:
            logging.debug(f"Invalid Content-Length header: {content_length_header}")
            return None
        return content_length

    def _follow_redirect(self, location_header):
        """Issue a new request for the redirect target in ``location_header``."""
        from urllib.parse import urljoin

        if self.redirect_count >= self.MAX_REDIRECTS:
            logging.error(f"Too many redirects, stopping at {location_header}")
            self.redirect_count = 0
            return
        self.redirect_count += 1

        default_port = 80 if self.scheme == "http" else 443
        netloc = self.host if self.port == default_port else f"{self.host}:{self.port}"
        # urljoin resolves absolute, host-relative and path-relative targets.
        target = urljoin(f"{self.scheme}://{netloc}{self.path}", location_header)
        if self.view_source:
            target = f"view-source:{target}"

        # Drop Host so get_request derives it from the redirect target.
        headers = {
            header_name: value
            for header_name, value in self.request_headers.items()
            if header_name.casefold() != "host"
        }
        try:
            self.get_request(target, headers)
        except (OSError, ValueError) as e:
            logging.error(f"Error following redirect to {target}: {e}")
            self.redirect_count = 0

    def _parse_headers(self, header_data):
        """Parse the status line and header lines from ``header_data``.

        Sets ``response_http_version``, ``response_status`` (kept as a
        string), ``response_explanation`` and ``response_headers``. If the
        status line is malformed the response fields are left untouched.
        """
        lines = header_data.splitlines()

        if not lines:
            logging.debug("Received empty header data.")
            return

        response_status_line = lines[0]
        try:
            self.response_http_version, self.response_status, *explanation_parts = response_status_line.split(" ", 2)
            self.response_explanation = " ".join(explanation_parts)
        except ValueError:
            logging.error(f"Error parsing status line: {response_status_line}")
            return

        headers = {}
        for line in lines[1:]:
            if not line:
                break
            try:
                header_name, value = line.split(":", 1)
                headers[header_name.strip()] = value.strip()
            except ValueError:
                logging.error(f"Error parsing header line: {line}")
        self.response_headers = headers
|
||||
153
http_client/html_parser.py
Normal file
153
http_client/html_parser.py
Normal file
@@ -0,0 +1,153 @@
|
||||
# Void elements: they never have children and take no closing tag.
SELF_CLOSING_TAGS = (
    "area base br col embed hr img input "
    "link meta param source track wbr"
).split()

# Tags that make the parser open an implicit <head> instead of <body>.
HEAD_TAGS = (
    "base basefont bgsound noscript "
    "link meta title style script"
).split()
|
||||
|
||||
class Element:
    """A tag node of the parsed document tree."""

    def __init__(self, tag, attributes, parent):
        # tag: tag name; attributes: mapping of attribute name to value.
        self.tag = tag
        self.attributes = attributes
        self.parent = parent
        self.children = []

    def __repr__(self):
        """Return the opening-tag form, e.g. ``<a href="x">``."""
        rendered_attributes = "".join(
            " " + name + "=\"" + value + "\""
            for name, value in self.attributes.items()
        )
        return "<" + self.tag + rendered_attributes + ">"
|
||||
|
||||
class Text:
    """A run of character data in the parsed document tree."""

    def __init__(self, text, parent):
        self.parent = parent
        self.text = text
        # Kept so tree walkers can treat Text and Element nodes alike.
        self.children = []

    def __repr__(self):
        """Return the repr of the underlying text."""
        return repr(self.text)
|
||||
|
||||
class HTML():
    """Forgiving HTML parser producing a tree of ``Element``/``Text`` nodes.

    The document is parsed once on construction; ``parse()`` returns the
    root element. Missing ``<html>``, ``<head>`` and ``<body>`` tags are
    inserted implicitly and unclosed elements are closed at end of input.
    """

    def __init__(self, raw_html):
        self.raw_html = raw_html
        self.unfinished = []  # stack of currently open elements
        self._root = None

        self.parse()

    def parse(self):
        """Return the root element, parsing ``raw_html`` on first use.

        The tree is built once and cached, so ``HTML(text).parse()`` does
        not parse the document a second time.
        """
        cached_root = getattr(self, "_root", None)
        if cached_root is not None:
            return cached_root

        self.unfinished = []
        buffer = []
        in_tag = False
        for c in self.raw_html:
            if c == "<":
                in_tag = True
                if buffer:
                    # start of new tag means before everything was content/text
                    self.add_text("".join(buffer))
                buffer = []
            elif c == ">" and in_tag:
                in_tag = False
                # end of a tag means everything in-between was the tag
                self.add_tag("".join(buffer))
                buffer = []
            else:
                # Includes a stray ">" outside any tag, which is plain text.
                buffer.append(c)

        if not in_tag and buffer:
            self.add_text("".join(buffer))

        self._root = self.finish()
        return self._root

    def add_text(self, text):
        """Append a text node to the innermost open element.

        Whitespace-only text is ignored.
        """
        if text.isspace():
            return
        self.implicit_tags(None)
        parent = self.unfinished[-1]
        node = Text(text, parent)
        parent.children.append(node)

    def get_attributes(self, text):
        """Split the inside of a tag into ``(tag_name, attributes)``.

        Args:
            text: tag content without the angle brackets, e.g.
                ``a href="x" download``.

        Returns:
            The case-folded tag name (``""`` for an empty tag) and a dict of
            case-folded attribute names to values. Quoted values may contain
            spaces; attributes without a value map to ``""``. A trailing
            ``/`` (``<br/>``) is ignored.
        """
        import re

        text = text.strip()
        if len(text) > 1 and text.endswith("/"):
            text = text[:-1].rstrip()
        if not text:
            return "", {}

        parts = text.split(None, 1)
        tag = parts[0].casefold()
        remainder = parts[1] if len(parts) > 1 else ""

        attributes = {}
        attribute_pattern = r"""([^\s=]+)(?:\s*=\s*(?:"([^"]*)"|'([^']*)'|(\S+)))?"""
        for match in re.finditer(attribute_pattern, remainder):
            # Exactly one of the three value groups matches, or none at all.
            value = next((group for group in match.group(2, 3, 4) if group is not None), "")
            attributes[match.group(1).casefold()] = value

        return tag, attributes

    def add_tag(self, tag):
        """Process one tag: open an element, close one, or add a void element.

        Empty tags and tags starting with ``!`` (doctype, comments) are
        ignored. A closing tag closes the innermost open element regardless
        of its name.
        """
        tag, attributes = self.get_attributes(tag)

        if not tag or tag.startswith("!"):
            return

        self.implicit_tags(tag)

        if tag.startswith("/"):
            if len(self.unfinished) == 1:
                return  # the root stays open until finish()
            node = self.unfinished.pop()
            parent = self.unfinished[-1]
            parent.children.append(node)
        elif tag in SELF_CLOSING_TAGS:
            parent = self.unfinished[-1]
            node = Element(tag, attributes, parent)
            parent.children.append(node)
        else:
            parent = self.unfinished[-1] if self.unfinished else None
            node = Element(tag, attributes, parent)
            self.unfinished.append(node)

    def implicit_tags(self, tag):
        """Insert omitted ``<html>``, ``<head>``/``<body>`` and ``</head>`` tags.

        Args:
            tag: the tag about to be added, or None when text is being added.
        """
        while True:
            open_tags = [node.tag for node in self.unfinished]
            if open_tags == [] and tag != "html":
                self.add_tag("html")
            elif open_tags == ["html"] and tag not in ["head", "body", "/html"]:
                if tag in HEAD_TAGS:
                    self.add_tag("head")
                else:
                    self.add_tag("body")
            elif open_tags == ["html", "head"] and tag not in ["/head"] + HEAD_TAGS:
                self.add_tag("/head")
            else:
                break

    def finish(self):
        """Close every element still open and return the root element."""
        if not self.unfinished:
            self.implicit_tags(None)

        while len(self.unfinished) > 1:
            node = self.unfinished.pop()
            parent = self.unfinished[-1]
            parent.children.append(node)
        return self.unfinished.pop()

    @staticmethod
    def print_tree(node, indent=0):
        """Print ``node`` and its descendants, indented by depth."""
        print(" " * indent, node)
        for child in node.children:
            HTML.print_tree(child, indent + 2)

    @staticmethod
    def to_json(tree: "Element | Text"):
        """Convert a node tree into nested lists suitable for JSON.

        Text nodes become ``["text", text, children]`` and elements become
        ``["element", tag, attributes, children]``.
        """
        if isinstance(tree, Text):
            return ["text", tree.text, [HTML.to_json(child) for child in tree.children]]
        elif isinstance(tree, Element):
            return ["element", tree.tag, tree.attributes, [HTML.to_json(child) for child in tree.children]]

    @staticmethod
    def from_json(json_list, parent=None):
        """Rebuild a node tree from the structure produced by ``to_json``."""
        if json_list[0] == "text":
            text = Text(json_list[1], parent)
            text.children = [HTML.from_json(child, text) for child in json_list[2]]
            return text
        elif json_list[0] == "element":
            element = Element(json_list[1], json_list[2], parent)
            element.children = [HTML.from_json(child, element) for child in json_list[3]]
            return element
|
||||
300
http_client/renderer.py
Normal file
300
http_client/renderer.py
Normal file
@@ -0,0 +1,300 @@
|
||||
import arcade, arcade.gui, pyglet, os, ujson
|
||||
|
||||
from utils.constants import token_pattern, emoji_pattern
|
||||
|
||||
from http_client.connection import HTTPClient
|
||||
from http_client.html_parser import HTML, Text, Element
|
||||
|
||||
# Tags whose presence among a node's children makes that node lay out
# its children as stacked blocks.
BLOCK_ELEMENTS = (
    "html body article section nav aside "
    "h1 h2 h3 h4 h5 h6 hgroup header "
    "footer address p hr pre blockquote "
    "ol ul menu li dl dt dd figure "
    "figcaption main div table form fieldset "
    "legend details summary"
).split()

# Horizontal and vertical page margins / spacing steps.
HSTEP, VSTEP = 13, 18
|
||||
|
||||
class BlockLayout:
    """Layout object for one document node.

    Depending on ``layout_mode()`` the node is laid out either as a vertical
    stack of child ``BlockLayout`` objects ("block") or as wrapped lines of
    words ("inline"), whose positions are collected in ``display_list`` as
    ``(x, y, word, font)`` tuples.
    """

    # Fonts keyed by (size, weight, style, emoji). Shared by every layout
    # object so a font already loaded for one block is reused by the others.
    _shared_font_cache = {}

    def __init__(self, node, parent, previous):
        self.node = node
        self.parent = parent
        self.previous = previous  # preceding sibling layout object, or None

        self.children = []
        self.display_list = []
        self.line = []  # words of the line in progress: (rel_x, word, font)

        self.font_cache = BlockLayout._shared_font_cache

        self.x, self.y, self.width, self.height = None, None, None, None

    def paint(self):
        """Return the display list built by ``layout()``."""
        return self.display_list

    def layout_mode(self):
        """Return ``"inline"`` or ``"block"`` for this node.

        Text nodes are inline. An element with at least one block-level
        child element is a block; otherwise it is inline if it has any
        children and a block if it has none.
        """
        if isinstance(self.node, Text):
            return "inline"
        if any(
            isinstance(child, Element) and child.tag in BLOCK_ELEMENTS
            for child in self.node.children
        ):
            return "block"
        if self.node.children:
            return "inline"
        return "block"

    def layout(self):
        """Compute position and size of this object and its descendants."""
        self.x = self.parent.x
        self.width = self.parent.width

        # Stack directly below the previous sibling, or at the parent's top.
        if self.previous:
            self.y = self.previous.y + self.previous.height
        else:
            self.y = self.parent.y

        mode = self.layout_mode()
        if mode == "block":
            previous = None
            for child in self.node.children:
                child_layout = BlockLayout(child, self, previous)
                self.children.append(child_layout)
                previous = child_layout
        else:
            self.cursor_x = 0
            self.cursor_y = 0
            self.weight = "normal"
            self.style = "roman"
            self.size = 16

            self.line = []
            self.recurse(self.node)
            self.flush()

        for child in self.children:
            child.layout()

        if mode == "block":
            self.height = sum(child.height for child in self.children)
        else:
            self.height = self.cursor_y

    def ensure_font(self, size, weight, style, emoji):
        """Return the font for the given attributes, loading it on first use.

        ``emoji`` selects the "OpenMoji Color" font instead of "Roboto".
        """
        key = (size, weight, style, emoji)
        if key not in self.font_cache:
            font_name = "OpenMoji Color" if emoji else "Roboto"
            self.font_cache[key] = pyglet.font.load(font_name, size, weight, style == "italic")

        return self.font_cache[key]

    def word(self, word: str, emoji=False):
        """Add ``word`` to the current line, wrapping first if it does not fit."""
        font = self.ensure_font(self.size, self.weight, self.style, emoji)

        # NOTE(review): both branches of this conditional are the same string
        # in the source as received; kept unchanged.
        w = font.get_text_size(word + (" " if not emoji else " "))[0]

        if self.cursor_x + w > self.width:
            self.flush()

        self.line.append((self.cursor_x, word, font))
        self.cursor_x += w + font.get_text_size(" ")[0]

    def flush(self):
        """Move the words of the current line to ``display_list`` and start a new line.

        Words are aligned on a common baseline derived from the largest
        ascent on the line. Does nothing when the line is empty.
        """
        if not self.line:
            return

        fonts_on_line = [font for x, word, font in self.line]
        max_ascent = max(font.ascent for font in fonts_on_line)
        # NOTE(review): min() presumably because the font descents are
        # negative values - confirm against the font backend.
        max_descent = min(font.descent for font in fonts_on_line)

        baseline = self.cursor_y + 1.25 * max_ascent

        for rel_x, word, font in self.line:
            x = self.x + rel_x
            y = self.y + baseline - font.ascent
            self.display_list.append((x, y, word, font))

        self.cursor_x = 0
        self.line = []
        self.cursor_y = baseline + 1.25 * max_descent

    def recurse(self, tree):
        """Lay out the text below ``tree``, applying tag styling on the way."""
        if isinstance(tree, Text):
            # Text containing braces is skipped entirely.
            if "{" in tree.text or "}" in tree.text:
                return

            word_list = [match.group(0) for match in token_pattern.finditer(tree.text)]

            for word in word_list:
                if emoji_pattern.fullmatch(word):
                    self.word(word, emoji=True)
                else:
                    self.word(word)
        else:
            self.open_tag(tree.tag)
            for child in tree.children:
                self.recurse(child)
            self.close_tag(tree.tag)

    def open_tag(self, tag):
        """Apply the style change (or line break) caused by opening ``tag``."""
        if tag == "i":
            self.style = "italic"
        elif tag == "b":
            self.weight = "bold"
        elif tag == "small":
            self.size -= 2
        elif tag == "big":
            self.size += 4
        elif tag == "br":
            self.flush()

    def close_tag(self, tag):
        """Undo the style change of ``tag``; closing ``p`` ends the paragraph."""
        if tag == "i":
            self.style = "roman"
        elif tag == "b":
            self.weight = "normal"
        elif tag == "small":
            self.size += 2
        elif tag == "big":
            self.size -= 4
        elif tag == "p":
            self.flush()
            self.cursor_y += VSTEP
|
||||
|
||||
class DocumentLayout:
    """Root of the layout tree; owns the single top-level BlockLayout."""

    def __init__(self, node):
        self.children = []
        self.parent = None
        self.node = node

    def layout(self):
        """Lay out the whole document inside the current window's width."""
        root_block = BlockLayout(self.node, self, None)
        self.children.append(root_block)

        # Leave an HSTEP margin on both sides and a VSTEP margin on top.
        window_width = arcade.get_window().width
        self.width = window_width - 2 * HSTEP
        self.x, self.y = HSTEP, VSTEP

        root_block.layout()
        self.height = root_block.height
        self.display_list = root_block.display_list

    def paint(self):
        """The document itself draws nothing."""
        return []
|
||||
|
||||
def paint_tree(layout_object, display_list):
    """Append the paint output of ``layout_object`` and all descendants.

    Nodes are visited in pre-order (a node before its children, children
    left to right) and ``display_list`` is extended in place.
    """
    pending = [layout_object]
    while pending:
        current = pending.pop()
        display_list.extend(current.paint())
        # Reversed so the first child is popped, and therefore painted, first.
        pending.extend(reversed(current.children))
|
||||
|
||||
class Renderer():
    """Turns the HTTP client's response into pyglet text labels.

    ``update()`` rebuilds all labels whenever the client sets
    ``needs_render``; mouse-wheel scrolling moves the existing labels.
    """

    def __init__(self, http_client: HTTPClient, window: arcade.Window):
        self.content = ''
        self.request_scheme = 'http'

        self.http_client = http_client

        self.scroll_y = 0
        self.scroll_y_speed = 50
        self.allow_scroll = False
        # Lowest screen y reached by any label (<= 0 when content extends
        # below the window); its negation is the maximum scroll offset.
        self.smallest_y = 0

        self.text_labels: list[pyglet.text.Label] = []
        self.text_to_create = []

        self.window = window
        self.window.on_mouse_scroll = self.on_mouse_scroll
        self.window.on_resize = self.on_resize

        self.batch = pyglet.graphics.Batch()

    def _update_visibility(self, widget):
        """Hide ``widget`` if it reaches above 95% of the window height, else show it."""
        invisible = (widget.y + widget.content_height) > self.window.height * 0.95
        # Doing visible flag set manually since it takes a lot of time
        if widget.visible:
            if invisible:
                widget.visible = False
        elif not invisible:
            widget.visible = True

    def on_resize(self, width, height):
        """Window resize handler: refresh label visibility and request a re-render."""
        for widget in self.text_labels:
            self._update_visibility(widget)

        self.http_client.needs_render = True

    def on_mouse_scroll(self, x, y, scroll_x, scroll_y):
        """Mouse wheel handler: scroll the labels within the content range."""
        if not self.allow_scroll:
            return

        old_y = self.scroll_y
        self.scroll_y = max(0, min(self.scroll_y - (scroll_y * self.scroll_y_speed), -self.smallest_y))
        delta = self.scroll_y - old_y

        for widget in self.text_labels:
            widget.y += delta
            self._update_visibility(widget)

    def add_text(self, x, y, text, font, multiline=False):
        """Create a label for ``text``.

        Args:
            x: horizontal position in window coordinates.
            y: vertical layout offset, measured downwards from 95% of the
                window height.
            text: the string to display.
            font: font object providing ``name``, ``italic``, ``weight`` and ``size``.
            multiline: if True the label wraps within the window width.
        """
        label_y = (self.window.height * 0.95) - y

        extra_options = {}
        if multiline:
            # pyglet requires a width for multiline (word-wrapped) labels.
            extra_options["width"] = self.window.width - 2 * HSTEP

        label = pyglet.text.Label(
            text=text,
            font_name=font.name,
            italic=font.italic,
            weight=font.weight,
            font_size=font.size,
            multiline=multiline,
            color=arcade.color.BLACK,
            x=x,
            y=label_y,
            batch=self.batch,
            **extra_options
        )
        self.text_labels.append(label)

        # Track the lowest point in screen coordinates. The layout offset y
        # is never negative, so comparing it would leave smallest_y at 0 and
        # make scrolling impossible.
        bottom = label_y - label.content_height if multiline else label_y
        if bottom < self.smallest_y:
            self.smallest_y = bottom

    def update(self):
        """Rebuild every label if the HTTP client has new content to show.

        HTML responses are parsed into a node tree that is cached as JSON
        under ``http_cache/``; ``view-source:`` and ``file`` content is
        shown as plain text.
        """
        if not self.http_client.needs_render:
            return

        self.http_client.needs_render = False
        self.allow_scroll = True

        for label in self.text_labels:
            label.delete()

        self.text_labels.clear()
        self.smallest_y = 0
        # New labels are created at their unscrolled positions, so the
        # stored offset has to start from zero again.
        self.scroll_y = 0

        if self.http_client.view_source or self.http_client.scheme == "file":
            self.add_text(x=HSTEP, y=0, text=self.http_client.content_response, font=pyglet.font.load("Roboto", 16), multiline=True)
        elif self.http_client.scheme == "http" or self.http_client.scheme == "https":
            os.makedirs("http_cache", exist_ok=True)

            cache_filename = f"{self.http_client.scheme}_{self.http_client.host}_{self.http_client.port}_{self.http_client.path.replace('/', '_')}.json"
            cache_path = f"http_cache/{cache_filename}"

            if os.path.exists(cache_path):
                with open(cache_path, "r") as file:
                    self.nodes = HTML.from_json(ujson.load(file))
            else:
                self.nodes = HTML(self.http_client.content_response).parse()
                with open(cache_path, "w") as file:
                    json_list = HTML.to_json(self.nodes)
                    file.write(ujson.dumps(json_list))

            self.document = DocumentLayout(self.nodes)
            self.document.layout()
            self.display_list = []
            paint_tree(self.document, self.display_list)

            for x, y, text, font in self.display_list:
                self.add_text(x, y, text, font)
|
||||
Reference in New Issue
Block a user