Skip to content

Instantly share code, notes, and snippets.

@twobob
Last active February 24, 2025 23:41
Show Gist options
  • Save twobob/84df3714f9a19f3c0a4fea434d3e5d90 to your computer and use it in GitHub Desktop.
Save twobob/84df3714f9a19f3c0a4fea434d3e5d90 to your computer and use it in GitHub Desktop.
telnet - call dave
#!/usr/bin/env python3
"""
Feature-complete Telnet client with a curses-based text UI.
Adjusted to:
- Automatically try "pdp1173.com" on port 23 and log in as "guest".
- If that fails, try "davepl.dyndns.org" on port 23 with "guest".
- Repeat this connection attempt loop forever until ESC is pressed to quit.
- Once connected and logged in, display remote output and allow typing commands.
- Typing 'exit' closes the current session and resumes connection attempts.
- Pressing ESC at any point quits the program immediately.
Enhancements:
- Improved progress messages and error insights.
- For each step (connection and login prompt), two methods are used:
1. Primary method (direct telnetlib connection and reading).
2. Fallback method (raw socket connection and stimulation via newline).
- This helps confirm that a failure originates on the remote end rather than on the local side.
"""
import curses
import telnetlib
import socket
import select
import time
# Telnet servers to try, in list order, as (hostname, port) pairs.
HOSTS = [
("pdp1173.com", 23),
("davepl.dyndns.org", 23),
]
def _esc_pressed(stdscr):
    """Poll the keyboard once without blocking; return True if ESC was read.

    Any other pending key is consumed and discarded. Blocking mode is
    restored on the way out so callers never inherit a non-blocking window.
    """
    stdscr.nodelay(True)
    try:
        return stdscr.getch() == 27  # 27 == ESC
    finally:
        stdscr.nodelay(False)


def _open_connection(stdscr, host, port):
    """Open a Telnet connection to host:port, reporting progress on stdscr.

    Tries telnetlib.Telnet directly first, then falls back to a raw socket
    wrapped in an empty Telnet object. Returns the Telnet object, or None
    when both methods fail (after printing the collected errors).
    """
    errors = []

    # --- First attempt: telnetlib opens the socket itself ---
    stdscr.addstr("Trying direct telnet connection...\n")
    stdscr.refresh()
    try:
        connection = telnetlib.Telnet(host, port, timeout=5)
    except OSError as e:
        errors.append(f"Direct telnet error: {e}")
        stdscr.addstr(f"Direct connection failed: {e}\n")
        stdscr.refresh()
        time.sleep(0.5)
    else:
        stdscr.addstr("Direct telnet connection successful.\n")
        stdscr.refresh()
        return connection

    # --- Second attempt: plain socket handed to an unconnected Telnet ---
    stdscr.addstr("Trying socket-based connection...\n")
    stdscr.refresh()
    try:
        sock = socket.create_connection((host, port), timeout=5)
    except OSError as e:
        errors.append(f"Socket connection error: {e}")
        stdscr.addstr(f"Socket connection failed: {e}\n")
        stdscr.refresh()
        time.sleep(0.5)
    else:
        connection = telnetlib.Telnet()
        connection.sock = sock
        stdscr.addstr("Socket connection successful.\n")
        stdscr.refresh()
        return connection

    stdscr.addstr(f"Failed to connect to {host}:{port}.\nErrors: {' | '.join(errors)}\n")
    stdscr.refresh()
    time.sleep(1)
    return None


def _wait_for_login(stdscr, connection):
    """Wait up to 10 seconds for a 'login:' prompt on connection.

    Returns (outcome, received) where received is every byte read so far
    and outcome is one of:
    - "login":   the prompt was seen (case-insensitive match).
    - "esc":     ESC was pressed; the connection has been closed.
    - "closed":  the remote end closed the connection; it has been closed.
    - "timeout": no prompt arrived in time; the connection is still open.
    """
    received = b""
    polls = 0
    deadline = time.monotonic() + 10
    while time.monotonic() < deadline:
        readable, _, _ = select.select([connection.fileno()], [], [], 0.1)
        if readable:
            try:
                data = connection.read_eager()
            except EOFError as e:
                stdscr.addstr("Connection closed unexpectedly while waiting for login prompt.\n")
                stdscr.addstr(f"EOFError: {e}\n")
                stdscr.refresh()
                connection.close()
                # Stop here: close() drops the socket, so polling
                # connection.fileno() again would raise.
                return "closed", received
            except OSError as e:
                stdscr.addstr(f"Error reading from connection: {e}\n")
                stdscr.refresh()
            else:
                if data:
                    received += data
                    if b"login:" in received.lower():
                        return "login", received

        polls += 1
        # Every 20th poll (roughly 2 seconds when idle), send a newline to
        # stimulate a remote side that is waiting for input.
        if polls % 20 == 0:
            try:
                connection.write(b"\n")
            except OSError as e:
                stdscr.addstr(f"Error sending newline: {e}\n")
            else:
                stdscr.addstr("No login prompt detected. Sent newline to stimulate response.\n")
            stdscr.refresh()

        if _esc_pressed(stdscr):
            stdscr.addstr("ESC pressed during login prompt wait. Aborting.\n")
            stdscr.refresh()
            connection.close()
            return "esc", received
    return "timeout", received


def attempt_connection(stdscr):
    """
    Attempt to connect to the hosts in HOSTS list in order.
    Wait for the login prompt, then send 'guest' as the username.

    Args:
        stdscr: curses window used for progress output and ESC polling.

    Returns:
        - A connected telnetlib.Telnet object on success.
        - "ESC" if ESC was pressed.
        - None if all attempts fail.
    """
    # Let progress output scroll; without this, addstr() raises curses.error
    # once the messages (or a long remote banner) reach the bottom of the screen.
    stdscr.scrollok(True)
    stdscr.clear()
    stdscr.addstr(0, 0, "Starting connection attempts...\n")
    stdscr.refresh()

    for host, port in HOSTS:
        # Give the user about one second (20 * 0.05 s) to abort with ESC
        # before each host is tried.
        for _ in range(20):
            if _esc_pressed(stdscr):
                stdscr.addstr("ESC pressed. Aborting connection attempts.\n")
                stdscr.refresh()
                return "ESC"
            time.sleep(0.05)

        stdscr.clear()
        stdscr.addstr(0, 0, f"Attempting to connect to {host}:{port}...\n")
        stdscr.refresh()

        connection = _open_connection(stdscr, host, port)
        if connection is None:
            continue

        # Set non-blocking mode for the connection's socket (if possible)
        try:
            connection.sock.setblocking(False)
        except OSError as e:
            stdscr.addstr(f"Warning: Could not set non-blocking mode: {e}\n")
            stdscr.refresh()

        stdscr.addstr("Waiting for 'login:' prompt...\n")
        stdscr.refresh()
        outcome, received = _wait_for_login(stdscr, connection)
        if outcome == "esc":
            return "ESC"
        if outcome != "login":
            stdscr.addstr("Login prompt not received.\nBuffer content received:\n")
            # Drop carriage returns: curses clears to end-of-line when it
            # draws "\n", so a CR LF pair would erase the line just shown.
            dump = received.decode('utf-8', errors='replace').replace("\r", "")
            stdscr.addstr(dump + "\n")
            stdscr.addstr(f"Closing connection to {host}:{port} and trying next host.\n")
            stdscr.refresh()
            connection.close()  # harmless if the connection is already closed
            time.sleep(1)
            continue

        stdscr.addstr(f"Login prompt received from {host}:{port}. Sending username 'guest'.\n")
        stdscr.refresh()
        try:
            connection.write(b"guest\n")
        except OSError as e:
            stdscr.addstr(f"Error sending username: {e}\n")
            stdscr.refresh()
            connection.close()
            continue
        return connection
    return None
def _run_session(stdscr, tn):
    """Run one interactive session over the connected Telnet object tn.

    Remote output is shown in a scrolling window; a one-line window at the
    bottom holds the command being typed. The caller owns tn and closes it.

    Returns:
        True if ESC was pressed (quit the whole program), False if the
        session simply ended ('exit', Ctrl-C, remote close, or read error).
    """
    stdscr.clear()
    stdscr.refresh()
    curses.curs_set(1)
    height, width = stdscr.getmaxyx()
    output_height = max(1, height - 2)
    output_win = curses.newwin(output_height, width, 0, 0)
    input_win = curses.newwin(1, width, output_height, 0)
    output_win.scrollok(True)
    # keypad is a per-window setting. Without it Backspace/Delete/arrow keys
    # arrive as raw escape sequences whose leading ESC (27) would be taken
    # as the "quit program" key.
    input_win.keypad(True)
    # getch() waits at most 100 ms and then returns -1, so the loop keeps
    # servicing the socket while the user is idle.
    input_win.timeout(100)
    line_buffer = ""

    def redraw_windows():
        """Re-fit both windows to the current terminal size."""
        nonlocal height, width, output_height
        height, width = stdscr.getmaxyx()
        output_height = max(1, height - 2)
        output_win.resize(output_height, width)
        input_win.resize(1, width)
        input_win.mvwin(output_height, 0)
        stdscr.refresh()
        output_win.refresh()
        input_win.refresh()

    telnet_fd = tn.get_socket().fileno()
    try:
        tn.sock.setblocking(False)
    except Exception as e:
        output_win.addstr(f"Warning: Could not set non-blocking mode: {e}\n")
        output_win.refresh()

    while True:
        if curses.is_term_resized(height, width):
            redraw_windows()

        # Show only the tail of the line that fits. Writing into the last
        # cell of a window makes addstr() raise curses.error, so one column
        # is kept free for the cursor.
        columns = input_win.getmaxyx()[1] - 1
        visible = line_buffer[-columns:] if columns > 0 else ""
        input_win.erase()
        input_win.addstr(0, 0, visible)
        input_win.move(0, len(visible))
        input_win.refresh()

        # --- Read Remote Data (try two methods) ---
        readable, _, _ = select.select([telnet_fd], [], [], 0.05)
        if readable:
            try:
                data = tn.read_eager()
            except EOFError:
                output_win.addstr("\n*** Connection closed by remote host ***\n")
                output_win.refresh()
                return False
            except Exception as e:
                # Fallback: try a raw socket recv
                try:
                    data = tn.sock.recv(1024)
                except Exception as e2:
                    output_win.addstr(f"\n*** Error reading data: {e}; fallback failed: {e2} ***\n")
                    output_win.refresh()
                    return False
            if data:
                # Write the text as received instead of appending "\n" to
                # every chunk. Carriage returns are dropped because curses
                # clears to end-of-line on "\n", so CR LF would wipe the
                # line that was just drawn.
                text = data.decode('utf-8', errors='replace').replace("\r", "")
                output_win.addstr(text)
                output_win.refresh()

        # --- Process User Input ---
        c = input_win.getch()
        if c == -1:
            continue
        if c in (10, 13, curses.KEY_ENTER):  # Enter key
            if line_buffer.strip().lower() == "exit":
                return False
            try:
                tn.write(line_buffer.encode('utf-8') + b"\n")
            except Exception as e:
                output_win.addstr(f"\n*** Error sending command: {e} ***\n")
                output_win.refresh()
            line_buffer = ""
        elif c in (curses.KEY_BACKSPACE, 127, 8, curses.KEY_DC):
            # Backspace (as a key code, DEL, or Ctrl-H) and Delete both
            # remove the last character; the cursor is always at line end.
            line_buffer = line_buffer[:-1]
        elif c == 3:  # Ctrl-C, when the terminal delivers it as a character
            return False
        elif c == curses.KEY_RESIZE:
            redraw_windows()
        elif c == 27:  # ESC key: quit entire program immediately
            return True
        elif 32 <= c < 127:  # printable ASCII
            line_buffer += chr(c)


def main(stdscr):
    """Connect, run an interactive session, and repeat until ESC is pressed.

    Args:
        stdscr: the root curses window supplied by curses.wrapper().

    Each pass calls attempt_connection(); on failure it retries after a
    short pause, on success it runs a session. Ending a session resumes the
    connection attempts, while ESC returns from main.
    """
    curses.cbreak()
    curses.noecho()
    stdscr.keypad(True)
    curses.start_color()
    while True:
        tn = attempt_connection(stdscr)
        if tn == "ESC":
            break
        if tn is None:
            stdscr.addstr("All connection attempts failed. Retrying...\n")
            stdscr.refresh()
            time.sleep(1)
            continue
        # Connection is established and login prompt was answered.
        try:
            if _run_session(stdscr, tn):
                return
        finally:
            # Close the connection however the session ends, including on
            # an exception such as KeyboardInterrupt.
            tn.close()
# Script entry point: curses.wrapper sets up the terminal, passes the root
# window to main, and restores the terminal when main returns or raises.
if __name__ == "__main__":
    curses.wrapper(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment