Last active
February 24, 2025 23:41
-
-
Save twobob/84df3714f9a19f3c0a4fea434d3e5d90 to your computer and use it in GitHub Desktop.
telnet - call dave
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Feature-complete Telnet client with a curses-based text UI. | |
Adjusted to: | |
- Automatically try "pdp1173.com" on port 23 and log in as "guest". | |
- If that fails, try "davepl.dyndns.org" on port 23 with "guest". | |
- Repeat this connection attempt loop forever until ESC is pressed to quit. | |
- Once connected and logged in, display remote output and allow typing commands. | |
- Typing 'exit' closes the current session and resumes connection attempts. | |
- Pressing ESC at any point quits the program immediately. | |
Enhancements: | |
- Improved progress messages and error insights. | |
- For each step (connection and login prompt), two methods are used: | |
1. Primary method (direct telnetlib connection and reading). | |
2. Fallback method (raw socket connection and stimulation via newline). | |
- This ensures the problem is on the remote end rather than the local side. | |
""" | |
import curses | |
import telnetlib | |
import socket | |
import select | |
import time | |
HOSTS = [ | |
("pdp1173.com", 23), | |
("davepl.dyndns.org", 23), | |
] | |
def attempt_connection(stdscr): | |
""" | |
Attempt to connect to the hosts in HOSTS list in order. | |
Wait for the login prompt, then send 'guest' as the username. | |
Returns: | |
- A connected telnetlib.Telnet object on success. | |
- "ESC" if ESC was pressed. | |
- None if all attempts fail. | |
""" | |
stdscr.clear() | |
stdscr.addstr(0, 0, "Starting connection attempts...\n") | |
stdscr.refresh() | |
for host, port in HOSTS: | |
# Check for ESC before attempting a host connection | |
for _ in range(20): # check over 1 second (20*0.05 sec) | |
stdscr.nodelay(True) | |
if stdscr.getch() == 27: # ESC | |
stdscr.addstr("ESC pressed. Aborting connection attempts.\n") | |
stdscr.refresh() | |
return "ESC" | |
time.sleep(0.05) | |
stdscr.nodelay(False) | |
stdscr.clear() | |
stdscr.addstr(0, 0, f"Attempting to connect to {host}:{port}...\n") | |
stdscr.refresh() | |
connection = None | |
error_messages = [] | |
# --- First Connection Attempt: Using telnetlib.Telnet --- | |
try: | |
stdscr.addstr("Trying direct telnet connection...\n") | |
stdscr.refresh() | |
tn = telnetlib.Telnet(host, port, timeout=5) | |
connection = tn | |
stdscr.addstr("Direct telnet connection successful.\n") | |
stdscr.refresh() | |
except Exception as e: | |
error_messages.append(f"Direct telnet error: {e}") | |
stdscr.addstr(f"Direct connection failed: {e}\n") | |
stdscr.refresh() | |
time.sleep(0.5) | |
# --- Second Connection Attempt: Using socket.create_connection --- | |
if connection is None: | |
try: | |
stdscr.addstr("Trying socket-based connection...\n") | |
stdscr.refresh() | |
s = socket.create_connection((host, port), timeout=5) | |
tn = telnetlib.Telnet() | |
tn.sock = s | |
connection = tn | |
stdscr.addstr("Socket connection successful.\n") | |
stdscr.refresh() | |
except Exception as e: | |
error_messages.append(f"Socket connection error: {e}") | |
stdscr.addstr(f"Socket connection failed: {e}\n") | |
stdscr.refresh() | |
time.sleep(0.5) | |
if connection is None: | |
stdscr.addstr(f"Failed to connect to {host}:{port}.\nErrors: {' | '.join(error_messages)}\n") | |
stdscr.refresh() | |
time.sleep(1) | |
continue | |
# Set non-blocking mode for the connection's socket (if possible) | |
try: | |
connection.sock.setblocking(False) | |
except Exception as e: | |
stdscr.addstr(f"Warning: Could not set non-blocking mode: {e}\n") | |
stdscr.refresh() | |
# --- Wait for Login Prompt --- | |
stdscr.addstr("Waiting for 'login:' prompt...\n") | |
stdscr.refresh() | |
start_time = time.time() | |
buffer = b"" | |
login_received = False | |
prompt_attempts = 0 | |
while time.time() - start_time < 10: | |
rlist, _, _ = select.select([connection.fileno()], [], [], 0.1) | |
for fd in rlist: | |
if fd == connection.fileno(): | |
try: | |
data = connection.read_eager() | |
if data: | |
buffer += data | |
if b"login:" in buffer.lower(): | |
login_received = True | |
break | |
except EOFError as e: | |
stdscr.addstr("Connection closed unexpectedly while waiting for login prompt.\n") | |
stdscr.addstr(f"EOFError: {e}\n") | |
stdscr.refresh() | |
connection.close() | |
login_received = False | |
break | |
except Exception as e: | |
stdscr.addstr(f"Error reading from connection: {e}\n") | |
stdscr.refresh() | |
if login_received: | |
break | |
prompt_attempts += 1 | |
# Every ~2 seconds, stimulate the remote side by sending a newline | |
if prompt_attempts % 20 == 0: | |
try: | |
connection.write(b"\n") | |
stdscr.addstr("No login prompt detected. Sent newline to stimulate response.\n") | |
stdscr.refresh() | |
except Exception as e: | |
stdscr.addstr(f"Error sending newline: {e}\n") | |
stdscr.refresh() | |
# Check for ESC during waiting period | |
stdscr.nodelay(True) | |
if stdscr.getch() == 27: | |
stdscr.addstr("ESC pressed during login prompt wait. Aborting.\n") | |
stdscr.refresh() | |
connection.close() | |
return "ESC" | |
stdscr.nodelay(False) | |
if not login_received: | |
stdscr.addstr("Login prompt not received.\nBuffer content received:\n") | |
try: | |
stdscr.addstr(buffer.decode('utf-8', errors='replace') + "\n") | |
except Exception: | |
stdscr.addstr(str(buffer) + "\n") | |
stdscr.addstr(f"Closing connection to {host}:{port} and trying next host.\n") | |
stdscr.refresh() | |
connection.close() | |
time.sleep(1) | |
continue | |
stdscr.addstr(f"Login prompt received from {host}:{port}. Sending username 'guest'.\n") | |
stdscr.refresh() | |
try: | |
connection.write(b"guest\n") | |
except Exception as e: | |
stdscr.addstr(f"Error sending username: {e}\n") | |
stdscr.refresh() | |
connection.close() | |
continue | |
return connection | |
return None | |
def main(stdscr): | |
curses.cbreak() | |
curses.noecho() | |
stdscr.keypad(True) | |
curses.start_color() | |
while True: | |
tn = attempt_connection(stdscr) | |
if tn == "ESC": | |
break | |
if tn is None: | |
stdscr.addstr("All connection attempts failed. Retrying...\n") | |
stdscr.refresh() | |
time.sleep(1) | |
continue | |
# Connection is established and login prompt was answered. | |
stdscr.clear() | |
stdscr.refresh() | |
curses.curs_set(1) | |
height, width = stdscr.getmaxyx() | |
output_height = height - 2 | |
if output_height < 1: | |
output_height = 1 | |
output_win = curses.newwin(output_height, width, 0, 0) | |
input_win = curses.newwin(1, width, output_height, 0) | |
output_win.scrollok(True) | |
line_buffer = "" | |
def redraw_windows(): | |
nonlocal height, width, output_height | |
height, width = stdscr.getmaxyx() | |
output_height = height - 2 | |
if output_height < 1: | |
output_height = 1 | |
output_win.resize(output_height, width) | |
input_win.resize(1, width) | |
input_win.mvwin(output_height, 0) | |
stdscr.refresh() | |
output_win.refresh() | |
input_win.refresh() | |
telnet_fd = tn.get_socket().fileno() | |
try: | |
tn.sock.setblocking(False) | |
except Exception as e: | |
output_win.addstr(f"Warning: Could not set non-blocking mode: {e}\n") | |
output_win.refresh() | |
session_active = True | |
while session_active: | |
if curses.is_term_resized(height, width): | |
redraw_windows() | |
input_win.clear() | |
input_win.addstr(0, 0, line_buffer) | |
input_win.move(0, len(line_buffer)) | |
input_win.refresh() | |
# --- Read Remote Data (try two methods) --- | |
rlist, _, _ = select.select([telnet_fd], [], [], 0.05) | |
if telnet_fd in rlist: | |
try: | |
data = tn.read_eager() | |
except EOFError: | |
output_win.addstr("\n*** Connection closed by remote host ***\n") | |
output_win.refresh() | |
session_active = False | |
break | |
except Exception as e: | |
# Fallback: try a raw socket recv | |
try: | |
data = tn.sock.recv(1024) | |
except Exception as e2: | |
output_win.addstr(f"\n*** Error reading data: {e}; fallback failed: {e2} ***\n") | |
output_win.refresh() | |
session_active = False | |
break | |
if data: | |
try: | |
text = data.decode('utf-8', errors='replace') | |
except Exception: | |
text = data.decode('latin1', errors='replace') | |
for line_part in text.split('\n'): | |
output_win.addstr(line_part + "\n") | |
output_win.refresh() | |
# --- Process User Input --- | |
curses.halfdelay(1) | |
try: | |
c = input_win.getch() | |
except Exception: | |
c = -1 | |
curses.cbreak() | |
if c != -1: | |
if c == 10: # Enter key | |
cmd = line_buffer.strip() | |
if cmd.lower() == "exit": | |
session_active = False | |
break | |
try: | |
tn.write(line_buffer.encode('utf-8') + b"\n") | |
except Exception as e: | |
output_win.addstr(f"\n*** Error sending command: {e} ***\n") | |
output_win.refresh() | |
line_buffer = "" | |
elif c in (263, curses.KEY_BACKSPACE): | |
if line_buffer: | |
line_buffer = line_buffer[:-1] | |
elif c == 3: # Ctrl-C | |
session_active = False | |
break | |
elif c == 330: # Delete key | |
if line_buffer: | |
line_buffer = line_buffer[:-1] | |
elif c == curses.KEY_RESIZE: | |
redraw_windows() | |
elif c == 27: # ESC key: quit entire program immediately | |
tn.close() | |
return | |
elif 32 <= c < 127: | |
line_buffer += chr(c) | |
tn.close() | |
if __name__ == '__main__': | |
curses.wrapper(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment