Created
October 23, 2014 08:37
-
-
Save vadmium/675e87a148cba14b9e78 to your computer and use it in GitHub Desktop.
Py Socks UDP example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
from sys import stdin, stderr | |
from socket import SOCK_DGRAM | |
from urllib.parse import SplitResult | |
import socket | |
import sys | |
import selectors | |
import net | |
class main:
    """Relay standard input/output over a UDP socket, optionally via SOCKS5.

    Constructing an instance runs the whole program: it opens a datagram
    socket, binds and connects it, then loops until ``send`` signals end of
    input by raising ``StopIteration``.

    Parameters:
        target: Remote address, passed through ``net.parse_addr(target, 0)``
            (project helper; exact accepted syntax not visible here).
        proxy: Optional SOCKS5 proxy address, parsed with ``net.parse_addr``.
            When given, the socket class comes from the ``socks`` module.
        bind: Local address to bind to, parsed with
            ``net.parse_addr(bind, 0)``.
    """

    def __init__(self, target, *, proxy=None, bind=""):
        target = net.parse_addr(target, 0)
        bind = net.parse_addr(bind, 0)
        if proxy is None:
            socketclass = socket.socket
        else:
            proxy = net.parse_addr(proxy)
            # Imported lazily so the "socks" package is only required when a
            # proxy is actually requested.
            from socks import socksocket as socketclass, PROXY_TYPE_SOCKS5

        with socketclass(type=SOCK_DGRAM) as self.sock:
            if proxy:
                self.sock.set_proxy(PROXY_TYPE_SOCKS5, *proxy)
            self.sock.bind(bind)
            local = net.format_addr(self.sock.getsockname())
            if proxy:
                relay = net.format_addr(self.sock.get_proxy_peername())
                msg = "Locally bound to {}, relay {}".format(local, relay)
                print(msg, file=stderr)
            else:
                print("Bound to {}".format(local), file=stderr)
            self.sock.connect(target)

            # Switch both standard streams to their underlying binary
            # buffers so packets are passed through as raw bytes.
            self.input = stdin.detach()
            self.output = sys.stdout
            # The text wrapper is unusable once detached; clear the global
            # reference first (presumably so nothing touches it afterwards).
            sys.stdout = None
            self.output = self.output.detach()

            # The selector is used as a context manager so that it is closed
            # on every exit path, including an exception from a handler.
            with selectors.SelectSelector() as sel:
                sel.register(self.input, selectors.EVENT_READ, self.send)
                sel.register(self.sock, selectors.EVENT_READ, self.recv)
                try:
                    while True:
                        for (key, _) in sel.select():
                            # key.data is the handler registered above.
                            key.data()
                except StopIteration:
                    # Raised by send() once standard input is exhausted.
                    pass

    def send(self):
        """Read standard input and send what was read as one datagram.

        ``read()`` is called without a size, so it returns only at end of
        file. The result is passed to ``sock.send`` even when it is empty,
        and the byte count is reported on standard error.

        Raises:
            StopIteration: After sending, when the read returned no data;
                this is what terminates the loop in ``__init__``.
        """
        packet = self.input.read()
        sent = self.sock.send(packet)
        print("Bytes sent: {}".format(sent), file=stderr)
        if not packet:
            raise StopIteration()

    def recv(self):
        """Receive one datagram and copy its payload to standard output.

        Up to 0x10000 bytes are read. The payload length and the formatted
        sender address are reported on standard error, then the payload is
        written to the binary standard output and flushed immediately.
        """
        (packet, address) = self.sock.recvfrom(0x10000)
        address = net.format_addr(address)
        msg = "Bytes received: {}, from {}:".format(len(packet), address)
        print(msg, file=stderr)
        self.output.write(packet)
        self.output.flush()
if __name__ == "__main__":
    # Script entry point: the command-line handling is delegated entirely to
    # the project-local "clifunc" module, imported only when run as a script.
    # NOTE(review): clifunc.run() presumably maps command-line arguments onto
    # the parameters of main() -- confirm against the clifunc module.
    import clifunc

    clifunc.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment