Created
October 31, 2016 21:54
-
-
Save hetii/613e23daa7a402de7ccd7479a29e9660 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
`# This is simple bridge between PTY and TCP endpoint | |
# You can run it by executing following step in 3 terminals: | |
# 1. nc -l 8080 | |
# 2. python simpleclient.py | |
# 3. minicom -p /dev/pts/3 | |
from __future__ import print_function | |
from twisted.internet import reactor, protocol, inotify, fdesc | |
from twisted.python.util import println | |
from twisted.web.client import getPage | |
from twisted.python import filepath | |
import os, pty, array, fcntl | |
TCGETS2 = 0x802C542A | |
class PtyEndpoint: | |
old_buf = array.array('i', [0] * 64) | |
def __init__(self): | |
self.pty_master, self.pty_slave = pty.openpty() | |
self.pty_master_name = os.ttyname(self.pty_master) | |
self.pty_slave_name = os.ttyname(self.pty_slave) | |
print("Master name: %s" % self.pty_master_name) | |
print("Slave : %s" % self.pty_slave_name) | |
def fileno(self): | |
""" We want to select on FD 0 """ | |
return self.pty_master | |
def changeBaudrate(self): | |
buf = array.array('i', [0] * 64) | |
fcntl.ioctl(self.pty_slave, TCGETS2, buf) | |
if buf != self.old_buf: | |
self.old_buf = buf | |
rate = buf[9] | |
print ( "Slave name is: %s, baud-rate: %s" % (self.pty_slave_name, rate)) | |
print(self.transport.realAddress) | |
page="http://%s/console/baud?rate=%s/" % (self.transport.realAddress[0], rate) | |
getPage(page).addCallbacks(callback=lambda value:(println(value)), | |
errback=lambda error:(println("an error occurred", error), reactor.stop())) | |
def doRead(self): | |
"""Read data from PTY and write it to TCP endpoint.""" | |
data = os.read(self.pty_master, 1024) | |
print("doRead called with data: %s" % data) | |
self.changeBaudrate(); | |
self.transport.write(data) | |
def doWrite(self, data): | |
os.write(self.pty_master, data) | |
print("doWrite called, ", data) | |
def logPrefix(self): | |
return 'PtyStdIO' | |
def connectionMade(self): | |
print("PtyStdIO connectionMade") | |
def connectionLost(self, reason): | |
print("PtyStdIO connectionLost", reason) | |
class TcpEndpoint(protocol.Protocol): | |
def __init__(self, pty_instance): | |
self.pty_instance = pty_instance | |
def connectionMade(self): | |
self.pty_instance.transport = self.transport | |
print("TcpEndpoint connectionMade") | |
def dataReceived(self, data): | |
self.pty_instance.doWrite(data) | |
print("FROM TCP: %r" % data) | |
def connectionLost(self, reason): | |
print("TcpEndpoint connection lost") | |
class TCPFactory(protocol.ClientFactory): | |
protocol = TcpEndpoint | |
def buildProtocol(self, addr=None): | |
pty_instance = PtyEndpoint() | |
pty_protocol = self.protocol(pty_instance) | |
reactor.addReader(pty_instance) | |
return pty_protocol | |
def clientConnectionLost(self, conn, reason): | |
print("TCPFactory clientConnectionLost") | |
pass | |
def main(): | |
f = TCPFactory() | |
reactor.connectTCP("192.168.4.1", 23, f) | |
reactor.run() | |
# this only runs if the module was *not* imported | |
if __name__ == '__main__': | |
main() | |
` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment