Last active
June 19, 2026 15:49
-
-
Save Decapitated/03dac4a735f4873cd7edc9fce02f99f1 to your computer and use it in GitHub Desktop.
HTTP Utilities
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @abstract | |
| class_name HTTPUtilities | |
## Starts connecting [param http_client] to [param url] and waits, one
## processed frame at a time, until the client is no longer resolving or
## connecting.
## Returns [code]true[/code] only when the client ends up in
## [constant HTTPClient.STATUS_CONNECTED]; returns [code]false[/code] if the
## connection attempt could not even be started.
static func connect_host(tree: SceneTree, http_client: HTTPClient, url: String) -> bool:
	if http_client.connect_to_host(url) != OK:
		return false

	# Pump the client until the resolve/connect phase is over.
	var status := http_client.get_status()
	while status == HTTPClient.STATUS_RESOLVING or status == HTTPClient.STATUS_CONNECTING:
		http_client.poll()
		await tree.process_frame
		status = http_client.get_status()

	return status == HTTPClient.STATUS_CONNECTED
## Builds a URL query string from [param params].
## Every key and value is URI-encoded, pairs are joined with [code]&[/code]
## and the result is prefixed with [code]?[/code].
## Returns an empty string when [param params] is empty.
static func get_params_string(params: Dictionary[String, String]) -> String:
	if params.is_empty():
		return ""

	var pairs := PackedStringArray()
	for key: String in params:
		pairs.append("%s=%s" % [key.uri_encode(), params[key].uri_encode()])
	return "?" + "&".join(pairs)
## Performs an HTTP GET request and returns the response body.
##
## [param tree] supplies the [signal SceneTree.process_frame] signal that is
## awaited while polling. [param http_client] is the client used for the
## request. [param url] is the absolute URL without a query string;
## [param params] is encoded and appended as the query string.
## [param headers] are passed to the request unchanged. When [param cache] is
## [code]true[/code] a previously stored body for the same full URL is
## returned without touching the network, and a successful response is stored.
##
## Returns [code]Result.OK[/code] holding the body as a [PackedByteArray]
## (or [code]null[/code] when the server sent no response), or
## [code]Result.ERROR[/code] with a message. Only status code 200 counts as
## success.
static func GET(tree: SceneTree, http_client: HTTPClient, url: String, headers: PackedStringArray = [], params: Dictionary[String, String] = {}, cache: bool = true) -> Result:
	var params_string := get_params_string(params)
	var full_url := "%s%s" % [url, params_string]

	# Return cached result.
	if cache:
		var cache_data = HTTPCache.get_request_data_GET(full_url)
		if cache_data != null:
			# Packed arrays are passed by reference: hand out a copy so the
			# caller cannot modify the bytes held by the cache.
			return Result.OK((cache_data as PackedByteArray).duplicate())

	var url_obj := Url.new(url)
	var connected := await connect_host(tree, http_client, url_obj.get_connection())
	if !connected:
		return Result.ERROR("Failed to connect: %s" % url)

	var path := url_obj.get_path()
	var err: int = http_client.request(HTTPClient.METHOD_GET, "%s%s" % [path, params_string], headers)
	if err != OK:
		return Result.ERROR("(%s) Failed to request: %s" % [err, full_url])

	# Keep polling for as long as the request is being processed.
	while http_client.get_status() == HTTPClient.STATUS_REQUESTING:
		http_client.poll()
		await tree.process_frame

	# Check if the request was successful. `err` is always OK here, so report
	# the client status instead.
	var status := http_client.get_status()
	if status != HTTPClient.STATUS_BODY and status != HTTPClient.STATUS_CONNECTED:
		return Result.ERROR("(%s) Request failed: %s" % [status, full_url])

	var code := http_client.get_response_code()
	if !http_client.has_response():
		if code != 200:
			return Result.ERROR("(%s) Request http error: %s" % [code, full_url])
		return Result.OK(null)

	# Read the body chunk by chunk.
	var rb := PackedByteArray()
	while http_client.get_status() == HTTPClient.STATUS_BODY:
		http_client.poll()
		var chunk := http_client.read_response_body_chunk()
		if chunk.is_empty():
			# Nothing available yet; give the engine a frame.
			await tree.process_frame
		else:
			# Append in place instead of rebuilding the buffer for every chunk.
			rb.append_array(chunk)

	if code != 200:
		return Result.ERROR("(%s) Request http error: %s\nResponse: %s" % [code, full_url, rb.get_string_from_utf8()])

	# Cache the request. A copy is stored so later changes to the returned
	# buffer do not affect the cache.
	if cache:
		HTTPCache.set_request_data_GET(full_url, rb.duplicate())

	# Done!
	return Result.OK(rb)
## Performs an HTTP POST request with a JSON body and returns the response
## body.
##
## [param tree] supplies the [signal SceneTree.process_frame] signal that is
## awaited while polling. [param http_client] is the client used for the
## request. [param body] is serialised with [method JSON.stringify].
## [param headers] are sent as given; a
## [code]Content-Type: application/json[/code] header is added when the caller
## did not supply a Content-Type. When [param cache] is [code]true[/code] a
## previously stored body for the same URL and serialised body is returned
## without touching the network, and a successful response is stored.
##
## Returns [code]Result.OK[/code] holding the body as a [PackedByteArray]
## (or [code]null[/code] when the server sent no response), or
## [code]Result.ERROR[/code] with a message. Only status code 200 counts as
## success.
static func POST(tree: SceneTree, http_client: HTTPClient, url: String, headers: PackedStringArray = [], body: Dictionary[String, Variant] = {}, cache: bool = true) -> Result:
	var body_string := JSON.stringify(body)

	# Return cached result.
	if cache:
		var cache_data = HTTPCache.get_request_data_POST(url, body_string)
		if cache_data != null:
			# Packed arrays are passed by reference: hand out a copy so the
			# caller cannot modify the bytes held by the cache.
			return Result.OK((cache_data as PackedByteArray).duplicate())

	# The body is always JSON, so declare it unless the caller already set a
	# Content-Type. Work on a copy: the caller's array must not be modified.
	var request_headers := headers.duplicate()
	var has_content_type := false
	for header in request_headers:
		if header.to_lower().begins_with("content-type:"):
			has_content_type = true
			break
	if !has_content_type:
		request_headers.append("Content-Type: application/json")

	var url_obj := Url.new(url)
	var connected := await connect_host(tree, http_client, url_obj.get_connection())
	if !connected:
		return Result.ERROR("Failed to connect: %s" % url)

	var path := url_obj.get_path()
	var err: int = http_client.request(HTTPClient.METHOD_POST, "%s" % path, request_headers, body_string)
	if err != OK:
		return Result.ERROR("(%s) Failed to request with body: %s \"%s\"" % [err, url, body_string])

	# Keep polling for as long as the request is being processed.
	while http_client.get_status() == HTTPClient.STATUS_REQUESTING:
		http_client.poll()
		await tree.process_frame

	# Check if the request was successful. `err` is always OK here, so report
	# the client status instead.
	var status := http_client.get_status()
	if status != HTTPClient.STATUS_BODY and status != HTTPClient.STATUS_CONNECTED:
		return Result.ERROR("(%s) Request failed: %s" % [status, url])

	var code := http_client.get_response_code()
	if !http_client.has_response():
		if code != 200:
			# Three arguments need three placeholders.
			return Result.ERROR("(%s) Request http error with body: %s \"%s\"" % [code, url, body_string])
		return Result.OK(null)

	# Read the body chunk by chunk.
	var rb := PackedByteArray()
	while http_client.get_status() == HTTPClient.STATUS_BODY:
		http_client.poll()
		var chunk := http_client.read_response_body_chunk()
		if chunk.is_empty():
			# Nothing available yet; give the engine a frame.
			await tree.process_frame
		else:
			# Append in place instead of rebuilding the buffer for every chunk.
			rb.append_array(chunk)

	if code != 200:
		return Result.ERROR("(%s) Request http error: %s\nResponse: %s" % [code, url, rb.get_string_from_utf8()])

	# Cache the request. A copy is stored so later changes to the returned
	# buffer do not affect the cache.
	if cache:
		HTTPCache.set_request_data_POST(url, body_string, rb.duplicate())

	# Done!
	return Result.OK(rb)
## Downloads [param url] with [method GET] and decodes the body into an
## [Image].
##
## The decoder is chosen from the file extension of the URL, ignoring any
## query string or fragment. When that extension is missing or not one of the
## supported formats, the format is detected from the first bytes of the
## download instead.
##
## Returns the decoded image, or [code]null[/code] (after printing a message)
## when the request fails, the body is empty, the format is unsupported or
## decoding fails.
static func get_image(tree: SceneTree, url: String) -> Image:
	var http_client := HTTPClient.new()
	var result := await GET(tree, http_client, url)
	if result.is_error():
		print(result.get_error())
		return null

	# GET may succeed with a null value when the server sent no response.
	var value = result.get_value()
	if !(value is PackedByteArray) or (value as PackedByteArray).is_empty():
		print("Failed to load image from %s" % url)
		return null
	var buffer := value as PackedByteArray

	# Drop "?query" and "#fragment" so ".../a.png?size=64" still yields "png".
	var extension := url.get_slice("?", 0).get_slice("#", 0).get_extension().to_lower()
	var known_extensions := ["bmp", "dds", "jpg", "jpeg", "ktx", "png", "svg", "tga", "webp"]
	# The signature checks read up to byte 11, so only sniff buffers that long.
	if !known_extensions.has(extension) and buffer.size() >= 12:
		var sniffed := get_image_format(buffer)
		if !sniffed.is_empty():
			extension = sniffed

	var image := Image.new()
	var err: int
	match extension:
		"bmp":
			err = image.load_bmp_from_buffer(buffer)
		"dds":
			err = image.load_dds_from_buffer(buffer)
		"jpg", "jpeg":
			err = image.load_jpg_from_buffer(buffer)
		"ktx":
			err = image.load_ktx_from_buffer(buffer)
		"png":
			err = image.load_png_from_buffer(buffer)
		"svg":
			err = image.load_svg_from_buffer(buffer)
		"tga":
			err = image.load_tga_from_buffer(buffer)
		"webp":
			err = image.load_webp_from_buffer(buffer)
		_:
			print("Unsupported image format: %s" % extension)
			err = ERR_FILE_UNRECOGNIZED

	if err != OK:
		print("Failed to load image from %s" % url)
		return null
	return image
## Guesses an image format from the leading signature bytes of [param buffer].
##
## Returns one of [code]"bmp"[/code], [code]"dds"[/code], [code]"jpg"[/code],
## [code]"ktx"[/code], [code]"png"[/code] or [code]"webp"[/code], or an empty
## string when no signature matches. Buffers that are empty or shorter than a
## signature simply do not match it.
static func get_image_format(buffer: PackedByteArray) -> String:
	# bmp: "BM"
	if _buffer_has_magic(buffer, 0, PackedByteArray([0x42, 0x4D])):
		return "bmp"
	# dds: "DDS " (the magic value 0x20534444 is stored little-endian)
	if _buffer_has_magic(buffer, 0, PackedByteArray([0x44, 0x44, 0x53, 0x20])):
		return "dds"
	# jpg: start-of-image marker followed by the next marker prefix
	if _buffer_has_magic(buffer, 0, PackedByteArray([0xFF, 0xD8, 0xFF])):
		return "jpg"
	# ktx: 12-byte "«KTX 20»\r\n\x1A\n" identifier
	if _buffer_has_magic(buffer, 0, PackedByteArray([
		0xAB, 0x4B, 0x54, 0x58, 0x20, 0x32, 0x30, 0xBB, 0x0D, 0x0A, 0x1A, 0x0A,
	])):
		return "ktx"
	# png: 8-byte PNG signature
	if _buffer_has_magic(buffer, 0, PackedByteArray([
		0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A,
	])):
		return "png"
	# webp: "RIFF" at offset 0 and "WEBP" at offset 8
	if _buffer_has_magic(buffer, 0, PackedByteArray([0x52, 0x49, 0x46, 0x46])) \
			and _buffer_has_magic(buffer, 8, PackedByteArray([0x57, 0x45, 0x42, 0x50])):
		return "webp"
	return ""


## Returns [code]true[/code] when [param buffer] contains exactly the bytes of
## [param magic] starting at [param offset]. Never reads out of bounds.
static func _buffer_has_magic(buffer: PackedByteArray, offset: int, magic: PackedByteArray) -> bool:
	if offset < 0 or buffer.size() < offset + magic.size():
		return false
	for i in magic.size():
		if buffer[offset + i] != magic[i]:
			return false
	return true
## Minimal URL splitter: breaks "protocol://host/path" into its three parts.
class Url:
	var _protocol: String
	var _host: String
	var _path: String

	## Parses [param url]. Everything before the first "://" is the protocol,
	## the text up to the next "/" is the host and the remainder (possibly
	## empty) is the path. Asserts when either split does not produce the
	## expected pieces.
	func _init(url: String) -> void:
		var scheme_and_rest := url.split("://", false, 1)
		assert(scheme_and_rest.size() == 2, "URL failed at protocol split: %s" % url)
		_protocol = scheme_and_rest[0]

		var host_and_path := scheme_and_rest[1].split("/", false, 1)
		assert(host_and_path.size() >= 1, "URL failed at host split: %s" % url)
		_host = host_and_path[0]
		# The path is optional; without one it stays an empty string.
		_path = host_and_path[1] if host_and_path.size() == 2 else ""

	## Rebuilds the URL as "protocol://host/path".
	func _to_string() -> String:
		return get_connection() + get_path()

	## Returns the part before "://".
	func get_protocol() -> String:
		return _protocol

	## Returns the host part.
	func get_host() -> String:
		return _host

	## Returns the path with a leading "/" ("/" alone when there is no path).
	func get_path() -> String:
		return "/" + _path

	## Returns "protocol://host", the string handed to the connection step.
	func get_connection() -> String:
		return _protocol + "://" + _host
## Mutable holder for a single value of any type.
class Ref:
	# The wrapped value; null until one is supplied.
	var _value: Variant

	## Creates the holder, optionally seeded with [param value].
	func _init(value: Variant = null) -> void:
		set_value(value)

	## Returns the value currently held.
	func get_value() -> Variant:
		return _value

	## Replaces the held value with [param value].
	func set_value(value: Variant) -> void:
		_value = value
## Outcome of an operation: either a value (success) or an error message.
class Result:
	var _value: Variant
	# null for a successful result, otherwise the error message.
	var _error: Variant

	## Builds a result. An empty [param error] marks the result as successful.
	## Prefer the [method OK] and [method ERROR] factories.
	func _init(value: Variant, error: String = "") -> void:
		_value = value
		_error = null if error.is_empty() else (error as Variant)

	## Creates a successful result carrying [param value].
	static func OK(value: Variant) -> Result:
		return Result.new(value)

	## Creates a failed result carrying the message [param error].
	static func ERROR(error: String) -> Result:
		return Result.new(null, error)

	## Returns [code]true[/code] when no error is stored.
	func is_ok() -> bool:
		return _error == null

	## Returns [code]true[/code] when an error is stored.
	func is_error() -> bool:
		return _error != null

	## Returns the carried value ([code]null[/code] for failed results).
	func get_value() -> Variant:
		return _value

	## Returns the error message, or an empty string for a successful result.
	func get_error() -> String:
		# Returning the stored null from a String-typed function is a runtime
		# type error, so map "no error" to an empty string.
		if _error == null:
			return ""
		return _error
## In-memory store for response bodies, shared through static variables.
## GET responses are keyed by URL; POST responses by URL and then by the
## request body string.
@abstract
class HTTPCache:
	static var _get_request_data: Dictionary[String, PackedByteArray] = {}
	static var _post_request_data: Dictionary[String, Dictionary] = {}

	## Returns the body stored for [param url], or [code]null[/code] if none.
	static func get_request_data_GET(url: String) -> Variant:
		if _get_request_data.has(url):
			return _get_request_data[url]
		return null

	## Stores [param data] as the body for [param url], replacing any previous
	## entry.
	static func set_request_data_GET(url: String, data: PackedByteArray) -> void:
		_get_request_data[url] = data

	## Returns the body stored for [param url] and [param body], or
	## [code]null[/code] if none.
	static func get_request_data_POST(url: String, body: String) -> Variant:
		if !_post_request_data.has(url):
			return null
		var by_body: Dictionary = _post_request_data[url]
		return by_body.get(body)

	## Stores [param data] as the body for the [param url] / [param body]
	## pair, creating the per-URL table on first use.
	static func set_request_data_POST(url: String, body: String, data: PackedByteArray) -> void:
		if !_post_request_data.has(url):
			_post_request_data[url] = {}
		# Dictionaries are shared by reference, so this writes into the cache.
		var by_body: Dictionary = _post_request_data[url]
		by_body[body] = data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Base URL that every request path is appended to.
const BASE_URL := "https://api.curseforge.com"

## Headers sent with every request: ask for JSON and pass the API key.
## API_KEY is not declared in this snippet and must be defined elsewhere in
## the script.
const HEADERS: PackedStringArray = [
	"Accept: application/json",
	"x-api-key: %s" % API_KEY,
]

## Scene tree handed to HTTPUtilities so it can await frames while polling.
var _tree: SceneTree
## Sends a GET request for [param path] (relative to BASE_URL) with the
## shared HEADERS and the optional query [param params], using a fresh
## [HTTPClient]. Returns the result produced by HTTPUtilities.GET.
func _GET(path: String, params: Dictionary[String, String] = {}) -> HTTPUtilities.Result:
	var client := HTTPClient.new()
	return await HTTPUtilities.GET(_tree, client, BASE_URL + path, HEADERS, params)
## Sends a POST request for [param path] (relative to BASE_URL) with the
## shared HEADERS and [param body] as the payload, using a fresh
## [HTTPClient]. Returns the result produced by HTTPUtilities.POST.
func _POST(path: String, body: Dictionary[String, Variant]) -> HTTPUtilities.Result:
	var client := HTTPClient.new()
	return await HTTPUtilities.POST(_tree, client, BASE_URL + path, HEADERS, body)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment