Created
January 2, 2024 17:53
-
-
Save progandy/87a6f4c7cc6349092731107017a3768c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
capo-browser | |
Copyright (C) 2024 ProgAndy | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation, either version 3 of the License, or | |
(at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program. If not, see <http://www.gnu.org/licenses/>. | |
""" | |
""" | |
# Portal Browser for Captive Portal Detection | |
This script launches a minimal web browser inside a user namespace, configuring specific nameservers | |
to ensure accurate captive portal detection. It automatically detects the presence of a captive portal | |
using the provided check URL and opens a minimal browser window accordingly. | |
## Dependencies: | |
- Python 3.8+ | |
- Either GTK or Qt. Possible combinations are: | |
- PyGObject, GTK3, WebKit2Gtk 4.0 | |
- PyGObject, GTK3, WebKit2Gtk 4.1 | |
- PyGObject, GTK4, WebKitGtk 6.0 | |
- PySide2, Qt5, QtWebEngine for Gt5 | |
- PySide6, Qt6, QtWebEngine for Qt6 | |
- POSIX sh with | |
- unshare | |
- mount | |
- Linux kernel with support for unprivileged user namespaces and mount namespace | |
- libc able to parse resolv.conf (directly or configurable with nsswitch.conf) | |
""" | |
import gi | |
import sys | |
import os | |
import tempfile | |
import argparse | |
from urllib.parse import urlparse | |
import shlex | |
import ipaddress | |
import http.client | |
import socket | |
from enum import Enum | |
class PortalDetectionProvider(Enum): | |
FIREFOX = ("http://detectportal.firefox.com/success.txt", "success") | |
EDGE = ("http://www.msftconnecttest.com/connecttest.txt", "Microsoft Connect Test") | |
CHROME = ("http://www.gstatic.com/generate_204", None) | |
APPLE = ("http://captive.apple.com/hotspot-detect.html", "Success") | |
ANDROID = ("http://connectivitycheck.android.com/generate_204", None) | |
KUKETZ = ("http://captiveportal.kuketz.de", None) | |
GRAPHENEOS = ("http://connectivitycheck.grapheneos.network/generate_204", None) | |
GNOME = ("http://nmcheck.gnome.org/check_network_status.txt", "NetworkManager is online") | |
ARCHLINUX = ("http://ping.archlinux.org/", "This domain is used for connectivity checking (captive portal detection).") | |
STEAM = ("http://test.steampowered.com/", "success") | |
STEAM204 = ("http://test.steampowered.com/204", None) | |
class CaptivePortalChecker: | |
def __init__(self, provider_or_url, text=None): | |
if provider_or_url.lower() in [entity.name.lower() for entity in PortalDetectionProvider]: | |
url, default_text = self._get_portal_detection_info(provider_or_url) | |
print(f"Using provider {provider_or_url}") | |
else: | |
url, default_text = provider_or_url, text | |
print(f"Detecion url: {url}") | |
print(f"Detection method: status {'204' if default_text is None else '400 with text [' + default_text + ']'}") | |
self.url = url | |
self.text = default_text | |
def _get_portal_detection_info(self, provider_name): | |
entity = PortalDetectionProvider[provider_name.upper()] | |
return entity.value | |
def check_captive_portal(self): | |
parsed_url = urlparse(self.url) | |
def perform_check(protocol): | |
connection_cls = http.client.HTTPSConnection if protocol == 'https' else http.client.HTTPConnection | |
try: | |
path = parsed_url.path | |
if parsed_url.query: | |
path += '?' + parsed_url.query | |
connection = connection_cls(parsed_url.netloc, timeout=5) | |
connection.request("GET", path) | |
response = connection.getresponse() | |
if self.text is None and response.status == 204: | |
print(f"No captive portal detected via {protocol}") | |
return False | |
if self.text is not None and response.status == 200 and self.text.strip() == response.read().decode("utf-8").strip(): | |
print(f"No captive portal detected via {protocol}") | |
return False | |
location_header = response.getheader('Location') | |
if location_header: | |
print(f"Redirect detected via {protocol}. Target URL: {location_header}") | |
return location_header | |
print(f"Captive portal detected via {protocol}") | |
return True | |
except Exception as e: | |
print(f"Error: {e} via {protocol}") | |
return None | |
# Perform HTTPS check | |
result_https = perform_check("https") | |
if result_https is False: | |
return False | |
# Perform HTTP check only if HTTPS check finds a portal | |
result_http = perform_check("http") | |
return result_http | |
@staticmethod | |
def add_argument_group(parser): | |
group = parser.add_argument_group('Captive Portal Detection Options', description='Perform connectivity checks for predefined entities or custom URLs.') | |
group.add_argument('--check', choices=[entity.name.lower() for entity in PortalDetectionProvider], | |
help='Provider for connectivity check (choose one)', default=PortalDetectionProvider.GNOME.name.lower()) | |
group.add_argument('--check-url', help='Custom URL for connectivity check, must be http', metavar='CUSTOM_URL') | |
group.add_argument('--check-text', help='Expected text for text check (optional)', dest='check_text', metavar='EXPECTED_TEXT') | |
return group | |
class GObjectIntrospectionHelper: | |
""" | |
Helper class for GObject Introspection related operations. | |
""" | |
@staticmethod | |
def version_available(namespace, version): | |
""" | |
Check if a specific version of a GObject Introspection module is available. | |
Parameters: | |
- namespace (str): The namespace of the GObject Introspection module. | |
- version (str): The version of the GObject Introspection module. | |
Returns: | |
- bool: True if the version is available, False otherwise. | |
""" | |
repository = gi.Repository.get_default() | |
if repository: | |
for vers in repository.enumerate_versions(namespace): | |
if vers == version: | |
return True | |
return False | |
class GTKBrowserApp: | |
""" | |
GTK3 Browser Application. | |
""" | |
def __init__(self, url, gtk_version, webkit_version): | |
gi.require_version("Gtk", gtk_version) | |
if gtk_version == "3.0": | |
gi.require_version("WebKit2", webkit_version) | |
from gi.repository import Gtk, WebKit2 | |
else: | |
gi.require_version("WebKit", "6.0") | |
from gi.repository import Gtk, WebKit | |
class BrowserAppGtk(Gtk.Application): | |
"""GTK Browser Application""" | |
def __init__(self): | |
"""Initialize the GTK browser application.""" | |
super().__init__(application_id="org.example.portalbrowser", flags=0) | |
self.connect("activate", self.on_activate) | |
self.url = url | |
def on_activate(self, app): | |
"""Handler for the 'activate' signal.""" | |
self.win = Gtk.ApplicationWindow(application=app) | |
self.win.set_title("Captive Portal Browser") | |
self.view = self.create_webview() | |
if gtk_version == "3.0": | |
self.win.add(self.view) | |
self.win.show_all() | |
else: | |
self.win.set_child(self.view) | |
self.win.present() | |
def create_webview(self): | |
"""Create and configure the WebKit2.WebView.""" | |
wk = WebKit2 if gtk_version == "3.0" else WebKit | |
view = wk.WebView() | |
context = view.get_context() | |
context.set_cache_model(wk.CacheModel.DOCUMENT_VIEWER) | |
view.load_uri(self.url) | |
return view | |
self.app = BrowserAppGtk() | |
def run(self): | |
"""Run the GTK3 browser application.""" | |
self.app.run() | |
class QtBrowserApp: | |
""" | |
Qt Browser Application. | |
""" | |
def __init__(self, url): | |
try: | |
from PySide6.QtWidgets import QApplication, QMainWindow | |
from PySide6.QtWebEngineWidgets import QWebEngineView | |
except ImportError: | |
from PySide2.QtWidgets import QApplication, QMainWindow | |
from PySide2.QtWebEngineWidgets import QWebEngineView | |
self.app = QApplication(sys.argv) | |
self.win = QMainWindow() | |
self.win.setWindowTitle("Captive Portal Browser") | |
self.view = QWebEngineView() | |
self.view.setUrl(url) | |
self.win.setCentralWidget(self.view) | |
self.win.show() | |
def run(self): | |
"""Run the Qt browser application.""" | |
sys.exit(self.app.exec()) | |
class Utils: | |
""" | |
Class for running the script inside a user namespace. | |
""" | |
@staticmethod | |
def run_inside_userns(args): | |
"""Run the script inside a user namespace.""" | |
print("Inside user namespace") | |
checker = CaptivePortalChecker(args.check_url or args.check, text=args.check_text) | |
if checker.check_captive_portal(): | |
print("Captive portal detected.") | |
#url = "http://www.gstatic.com/generate_204" | |
url = args.check_url | |
print(url) | |
Utils.create_minimal_browser(url) | |
else: | |
print("No captive portal detected.") | |
sys.exit() | |
@staticmethod | |
def ip_string(address): | |
""" | |
Check if the given string is a valid IP address. | |
Parameters: | |
- address (str): The IP address to validate. | |
Returns: | |
- bool: True if the address is a valid IP, False otherwise. | |
""" | |
try: | |
ipaddress.ip_address(address) | |
return address | |
except ValueError: | |
raise argparse.ArgumentTypeError(f"{address} is not a valid ip") | |
@staticmethod | |
def check_captive_portal(check_url): | |
""" | |
Check for a captive portal using the provided check URL. | |
Parameters: | |
- check_url (str): The URL used for portal detection. | |
Returns: | |
- bool: True if a captive portal is detected, False otherwise. | |
""" | |
parsed_url = urlparse(check_url) | |
try: | |
conn = http.client.HTTPConnection(parsed_url.netloc, timeout=5) | |
query_string = parsed_url.query | |
path = parsed_url.path + '?' + query_string if query_string else parsed_url.path | |
conn.request("GET", path) | |
response = conn.getresponse() | |
return response.status != 204 | |
except (socket.timeout, socket.error): | |
return False | |
finally: | |
conn.close() | |
def create_minimal_browser(url): | |
""" | |
Create a minimal browser window based on the available libraries. | |
""" | |
if GObjectIntrospectionHelper.version_available("Gtk", "4.0") and GObjectIntrospectionHelper.version_available("WebKit", "6.0"): | |
gtk4_browser = GTKBrowserApp(url, "4.0", "6.0") | |
gtk4_browser.run() | |
elif GObjectIntrospectionHelper.version_available("Gtk", "3.0") and ( | |
GObjectIntrospectionHelper.version_available("WebKit2", "4.1") or GObjectIntrospectionHelper.version_available("WebKit2", "4.0")): | |
webkit_version = "4.1" if GObjectIntrospectionHelper.version_available("WebKit2", "4.1") else "4.0" | |
gtk3_browser = GTKBrowserApp(url, "3.0", webkit_version) | |
gtk3_browser.run() | |
else: | |
qt_browser = QtBrowserApp(url) | |
qt_browser.run() | |
class DefaultHelpFormatter(argparse.HelpFormatter): | |
def _get_help_string(self, action): | |
if action.default is not argparse.SUPPRESS and action.default is not None: | |
default_str = f" (default: {action.default})" | |
else: | |
default_str = "" | |
return super()._get_help_string(action) + default_str | |
def parse_command_line_args(): | |
""" | |
Parse and validate command-line arguments. | |
Returns: | |
- str: The primary nameserver IP. | |
- bool: Whether to run inside a user namespace. | |
- str: The URL used for portal detection. | |
""" | |
parser = argparse.ArgumentParser(formatter_class=DefaultHelpFormatter, description="Portal Browser with Captive Portal Detection") | |
CaptivePortalChecker.add_argument_group(parser) | |
parser.add_argument("nameservers", type=Utils.ip_string, nargs="+", help="Nameservers to be used for captive portal detection") | |
parser.add_argument("--ns", action="store_true", help=argparse.SUPPRESS) # Hiding the --ns argument | |
return parser.parse_args() | |
def setup_userns(primary_nameserver, secondary_nameserver, check_url): | |
""" | |
Sets up the user namespace and runs the browser in it. | |
Parameters: | |
- primary_nameserver (str): The primary nameserver IP. | |
- secondary_nameserver (str): The secondary nameserver IP (if provided, otherwise None). | |
- check_url (str): The URL used for portal detection. | |
""" | |
with tempfile.TemporaryDirectory() as mnt_namespace: | |
print("Directory for namespace resolver overrides:", mnt_namespace) | |
resolv_conf_path = os.path.join(mnt_namespace, "resolv.conf") | |
with open(resolv_conf_path, "w") as resolv_conf: | |
resolv_conf.write(f"nameserver {primary_nameserver}\n") | |
if secondary_nameserver: | |
resolv_conf.write(f"nameserver {secondary_nameserver}\n") | |
nsswitch_conf_path = os.path.join(mnt_namespace, "nsswitch.conf") | |
with open(nsswitch_conf_path, "w") as nsswitch_conf: | |
nsswitch_conf.write("passwd: files\n") | |
nsswitch_conf.write("group: files\n") | |
nsswitch_conf.write("hosts: files dns\n") | |
# Include check_url in the command line arguments passed to the new process | |
os.execlp("unshare", "unshare", "-rm", "sh", "-c", | |
f"mount --bind {shlex.quote(resolv_conf_path)} /etc/resolv.conf" | |
f" && mount --bind {shlex.quote(nsswitch_conf_path)} /etc/nsswitch.conf" | |
f" && unshare -u --map-group={os.getgid()} --map-user={os.getuid()}" + | |
f" {shlex.quote(sys.executable)} {' '.join(map(shlex.quote, sys.argv))} --ns" | |
f" ; rm -r {shlex.quote(mnt_namespace)}") | |
def main(): | |
""" | |
Main entry point for the script. | |
""" | |
args = parse_command_line_args() | |
if args.ns: | |
Utils.run_inside_userns(args) | |
elif args.nameservers: | |
setup_userns(args.nameservers[0], args.nameservers[1] if len(args.nameservers) > 1 else None, args.check_url) | |
else: | |
print("Unexpected error in command-line argument parsing.") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment