Last active
August 6, 2017 21:36
-
-
Save itdaniher/bd1c66db41259eac438b95045ad99598 to your computer and use it in GitHub Desktop.
log all the ports!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aioudp | |
import asyncio | |
async def in_or_nothing(reader, duration=1):
    """Read once from ``reader``, giving up after ``duration`` seconds.

    Args:
        reader: Object exposing an awaitable ``read(n)`` method, or ``None``
            for an empty slot.
        duration: Timeout in seconds handed to ``asyncio.wait_for``.

    Returns:
        Whatever ``reader.read(128)`` produced, or ``None`` when ``reader`` is
        ``None`` or the read did not complete within ``duration``.
    """
    # Identity test: a reader with a permissive ``__eq__`` must not be
    # mistaken for the "no reader" placeholder.
    if reader is None:
        return None
    try:
        return await asyncio.wait_for(reader.read(128), duration)
    except asyncio.TimeoutError:
        return None
async def tcp_logger(reader, writer):
    """Print the first bytes a TCP client sends, then close the connection.

    Passed to ``asyncio.start_server`` as the per-connection callback.

    Args:
        reader: Stream reader for the accepted connection.
        writer: Stream writer for the accepted connection; always closed
            before this coroutine returns or raises.

    Prints ``'tcp'`` followed by ``(local_port, (data_or_None, peer))`` where
    ``peer`` is the first two fields of the socket's peer name.
    """
    try:
        peer = writer.get_extra_info('peername')[0:2]
        port = writer.get_extra_info('sockname')[1]
        resp = await in_or_nothing(reader)
    finally:
        # Release the socket even when the lookup or the read fails,
        # otherwise every failed connection leaks a file descriptor.
        writer.close()
    print('tcp', (port, (resp, peer)))
class Logger(object):
    """Open UDP and TCP listeners on a range of ports and print what arrives."""

    def __init__(self, loop=None):
        """Remember the event loop and create empty listener registries.

        Args:
            loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.
        """
        if loop is None:
            loop = asyncio.get_event_loop()
        self.loop = loop
        # Index 0 is a placeholder: ports start at 1, so listeners_udp[port]
        # is the endpoint (or None) for that port.
        self.listeners_udp = [None]
        self.listeners_tcp = []

    async def monitor_udp(self, max_port=1024):
        """Bind UDP ports ``1 .. max_port - 1`` and print received datagrams.

        Runs forever. Each round polls every endpoint once and prints
        ``'udp', (port, result)`` for each endpoint that returned something.

        Args:
            max_port: Exclusive upper bound of the port range.
        """
        for port in range(1, max_port):
            try:
                listener = await aioudp.open_local_endpoint(port=port, loop=self.loop)
            except OSError as e:
                print(e)
                # Keep the slot so list index and port number stay aligned;
                # in_or_nothing() skips None readers.
                listener = None
            self.listeners_udp.append(listener)
        while True:
            results = await asyncio.gather(*[in_or_nothing(l) for l in self.listeners_udp])
            for port, result in enumerate(results):
                if result is not None:
                    print('udp', (port, result))

    async def monitor_tcp(self, max_port=1024):
        """Start a TCP server on each of the ports ``1 .. max_port - 1``.

        Ports that cannot be bound are reported with ``print`` and skipped.
        Started servers are collected in ``self.listeners_tcp``.

        Args:
            max_port: Exclusive upper bound of the port range.
        """
        for port in range(1, max_port):
            try:
                listener = await asyncio.start_server(tcp_logger, port=port, reuse_port=True)
            except OSError as e:
                print(e)
            else:
                self.listeners_tcp.append(listener)
if __name__ == "__main__":
    # Script entry point: watch UDP and TCP ports until interrupted.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Schedule on this loop explicitly and keep the task objects referenced;
    # the event loop only holds weak references to tasks.
    tasks = [
        loop.create_task(Logger(loop).monitor_udp()),
        loop.create_task(Logger(loop).monitor_tcp()),
    ]
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the logger.
        pass
    finally:
        for task in tasks:
            task.cancel()
        # Let the cancellations unwind before the loop is torn down.
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        loop.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Provide high-level UDP endpoints for asyncio""" | |
__all__ = ['open_local_endpoint', 'open_remote_endpoint'] | |
import asyncio | |
import warnings | |
class DatagramEndpointProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that relays transport events to an endpoint object."""

    def __init__(self, endpoint):
        """Keep a reference to the endpoint that will receive the events."""
        self._endpoint = endpoint

    # Protocol methods

    def connection_made(self, transport):
        """Hand the new transport over to the endpoint."""
        owner = self._endpoint
        owner._transport = transport

    def connection_lost(self, exc):
        """Warn when the connection ended with an error, then close the endpoint."""
        if exc is not None:
            warnings.warn('Endpoint lost the connection: %r' % (exc,))
        self._endpoint.close()

    # Datagram protocol methods

    def datagram_received(self, data, addr):
        """Pass an incoming datagram and its sender address to the endpoint."""
        deliver = self._endpoint.feed_datagram
        deliver(data, addr)

    def error_received(self, exc):
        """Report a send/receive error as a warning."""
        warnings.warn('Endpoint received an error: %r' % (exc,))
class Endpoint:
    """Queue-backed UDP endpoint.

    Incoming datagrams are pushed into an ``asyncio.Queue`` by
    ``feed_datagram`` and handed out by ``read``. A ``(None, None)`` item in
    the queue is the "closed" sentinel that wakes up a pending reader.
    """

    def __init__(self, queue_size=None):
        """Create the endpoint.

        Args:
            queue_size: Maximum number of buffered datagrams; ``None`` is
                mapped to ``0``, which ``asyncio.Queue`` treats as unbounded.
        """
        if queue_size is None:
            queue_size = 0
        self._queue = asyncio.Queue(queue_size)
        self._closed = False
        self._transport = None

    def _ensure_open(self):
        """Raise ``IOError`` if the endpoint has been closed."""
        if self._closed:
            raise IOError("Endpoint is closed")

    # Protocol callbacks

    def feed_datagram(self, data, addr):
        """Buffer one datagram; warn and drop it when the queue is full."""
        try:
            self._queue.put_nowait((data, addr))
        except asyncio.QueueFull:
            warnings.warn('Endpoint queue is full')

    def close(self):
        """Close the endpoint and its transport; later calls are no-ops."""
        # The transport's connection_lost() calls back into close(); without
        # this guard each close would queue a second sentinel.
        if self._closed:
            return
        self._closed = True
        # A reader can only be blocked while the queue is empty, so the
        # wake-up sentinel is needed only in that case.
        if self._queue.empty():
            self.feed_datagram(None, None)
        if self._transport:
            self._transport.close()

    # User methods

    def write(self, data, addr):
        """Send ``data`` to ``addr``.

        Raises:
            IOError: If the endpoint is closed.
        """
        self._ensure_open()
        self._transport.sendto(data, addr)

    async def read(self, n=0):
        """Wait for the next datagram and return ``(data, addr)``.

        Args:
            n: Ignored; accepted so the method can be called like a stream
                reader's ``read(n)``.

        Raises:
            IOError: If the endpoint is closed, or gets closed while waiting.
        """
        self._ensure_open()
        data, addr = await self._queue.get()
        if data is None:
            raise IOError("Endpoint is closed")
        return data, addr

    def abort(self):
        """Abort the transport.

        Raises:
            IOError: If the endpoint is closed.
        """
        self._ensure_open()
        self._transport.abort()

    # Properties

    @property
    def address(self):
        """Socket name of the underlying transport's socket."""
        # Use the public transport API instead of the private ``_sock``.
        return self._transport.get_extra_info('socket').getsockname()

    @property
    def closed(self):
        """``True`` once ``close()`` has been called."""
        return self._closed
class LocalEndpoint(Endpoint):
    """Endpoint type created by ``open_local_endpoint``; adds nothing to ``Endpoint``."""
class RemoteEndpoint(Endpoint):
    """Endpoint type created by ``open_remote_endpoint``.

    ``write`` takes no address (``None`` is passed to the base class) and
    ``read`` returns only the payload.
    """

    def write(self, data):
        """Send ``data`` through the transport without an explicit address."""
        super().write(data, None)

    async def read(self, n=0):
        """Wait for the next datagram and return its payload.

        Args:
            n: Ignored; kept so the signature matches ``Endpoint.read`` and
                callers that pass a size (``reader.read(128)``) keep working.
        """
        data, _addr = await super().read(n)
        return data
async def open_datagram_endpoint(host='0.0.0.0', port=0, *,
                                 endpoint_factory=Endpoint,
                                 remote=False, loop=None,
                                 **kwargs):
    """Create a datagram endpoint wired to a new transport.

    Args:
        host: Address half of the ``(host, port)`` pair.
        port: Port half of the ``(host, port)`` pair.
        endpoint_factory: Zero-argument callable building the endpoint.
        remote: When true the pair is passed as ``remote_addr``, otherwise
            as ``local_addr``.
        loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.
        **kwargs: Forwarded to ``loop.create_datagram_endpoint``.

    Returns:
        The endpoint produced by ``endpoint_factory``.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    if remote:
        kwargs['remote_addr'] = (host, port)
    else:
        kwargs['local_addr'] = (host, port)

    endpoint = endpoint_factory()

    def make_protocol():
        return DatagramEndpointProtocol(endpoint)

    await loop.create_datagram_endpoint(make_protocol, **kwargs)
    return endpoint
async def open_local_endpoint(host='0.0.0.0', port=0, *,
                              queue_size=None, loop=None, **kwargs):
    """Open a ``LocalEndpoint`` using ``(host, port)`` as the local address.

    ``queue_size`` is given to the ``LocalEndpoint`` constructor; ``loop``
    and any extra keyword arguments go to ``open_datagram_endpoint``.
    """
    def build_endpoint():
        return LocalEndpoint(queue_size)

    endpoint = await open_datagram_endpoint(
        host, port,
        endpoint_factory=build_endpoint,
        remote=False,
        loop=loop,
        **kwargs
    )
    return endpoint
async def open_remote_endpoint(host='0.0.0.0', port=0, *,
                               queue_size=None, loop=None, **kwargs):
    """Open a ``RemoteEndpoint`` using ``(host, port)`` as the remote address.

    ``queue_size`` is given to the ``RemoteEndpoint`` constructor; ``loop``
    and any extra keyword arguments go to ``open_datagram_endpoint``.
    """
    def build_endpoint():
        return RemoteEndpoint(queue_size)

    endpoint = await open_datagram_endpoint(
        host, port,
        endpoint_factory=build_endpoint,
        remote=True,
        loop=loop,
        **kwargs
    )
    return endpoint
if __name__ == '__main__':

    async def main():
        """Send one datagram from a remote endpoint to a local one and return it."""
        local = await open_local_endpoint()
        try:
            remote = await open_remote_endpoint(*local.address)
            try:
                remote.write(b'Hey Hey, My My')
                return await local.read()
            finally:
                remote.close()
        finally:
            local.close()
            # Yield once so the transports can run their close callbacks
            # before the loop is shut down.
            await asyncio.sleep(0)

    # Create the loop explicitly: get_event_loop() outside a coroutine is
    # deprecated when no loop is set.
    loop = asyncio.new_event_loop()
    try:
        data, addr = loop.run_until_complete(main())
    finally:
        loop.close()
    message = "Got {data!r} from {addr[0]} port {addr[1]}"
    print(message.format(data=data.decode(), addr=addr))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment