Skip to content

Instantly share code, notes, and snippets.

@githubutilities
Last active August 17, 2021 05:47
Show Gist options
  • Select an option

  • Save githubutilities/53a446f3857613553079 to your computer and use it in GitHub Desktop.

Select an option

Save githubutilities/53a446f3857613553079 to your computer and use it in GitHub Desktop.
Socket Programming in Python

Socket Programming in Python

  • SimpleSocketServer.py -- socket server running in a single thread
  • ThreadedSocketServer.py -- socket server using multiple threads (one per client)
  • PollingSocketServer.py -- socket server using the select() function

Using telnet socket client to debug your program

Run `telnet localhost 8080` and type anything you want. To exit, press the telnet escape character (Ctrl+] by default) and then type `q` (short for `quit`) at the `telnet>` prompt.

Reference

# Module metadata. This section appears to be the select()-based server that
# the gist page lists as PollingSocketServer.py -- TODO confirm the file name.
__author__ = 'will'
__version__ = "0.1"
# Names exported by a star-import of this module.
__all__ = ["start_socket_server"]
import select
import socket
def start_socket_server(host='127.0.0.1', port=8080):
    """Run a single-threaded TCP server that multiplexes clients with select().

    Every chunk received from a client is answered with the chunk's length
    and contents.  The call blocks forever; it only leaves by raising (for
    example ``KeyboardInterrupt`` or a ``socket.error`` from ``bind``), and
    every socket it opened is closed on the way out.

    Args:
        host: Interface address to bind to.  Defaults to the loopback
            address.
        port: TCP port to listen on.  Defaults to 8080; 0 lets the operating
            system pick a free port.

    Raises:
        socket.error: If the listening socket cannot be set up, or if
            ``select``/``accept`` fails.
    """
    buffer_size = 1024
    max_queue_size = 10

    # Maps each connected client socket to the address accept() reported for
    # it, so log lines always name the right peer.
    clients = {}

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Allow an immediate restart while old connections sit in TIME_WAIT.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(max_queue_size)
        print('Chat Server listening on port '
              + str(server_socket.getsockname()[1]))

        while True:
            watched = [server_socket] + list(clients)
            readable, _, _ = select.select(watched, [], [])
            for sock in readable:
                # A readable listening socket means a new connection.
                if sock is server_socket:
                    client, addr = server_socket.accept()
                    clients[client] = addr
                    print("Client %s:%s connected" % addr)
                    continue

                # Otherwise a client sent data or went away.
                try:
                    data = sock.recv(buffer_size)
                    if data:
                        # Build the reply as bytes so it concatenates with
                        # the raw payload on both Python 2 and Python 3.
                        reply = (str(len(data)).encode('ascii')
                                 + b' bytes received\n'
                                 + b'Data contents: ' + data + b'\n')
                        # sendall() keeps writing until the whole reply is
                        # out; send() may stop after a partial write.
                        sock.sendall(reply)
                except socket.error:
                    # A failed read/write is handled like a disconnect.
                    data = b''

                if not data:
                    # An empty read means the peer closed the connection.
                    # Stop watching the socket, otherwise select() reports
                    # it readable forever and the loop spins.
                    print("Client %s:%s is offline" % clients.pop(sock))
                    sock.close()
    finally:
        for client in clients:
            client.close()
        server_socket.close()
# Start the server only when this file is executed directly, not on import.
if __name__ == "__main__":
    start_socket_server()
# Module metadata. This section appears to be the single-threaded server that
# the gist page lists as SimpleSocketServer.py -- TODO confirm the file name.
__author__ = 'will'
__version__ = "0.1"
# Names exported by a star-import of this module.
__all__ = ["start_socket_server"]
import sys
import socket
def start_socket_server(host='', port=8080):
    """Run a minimal single-threaded TCP server that logs each connection.

    The server binds, listens and then accepts clients one at a time.  Each
    accepted connection is only logged and then closed right away, so no
    descriptor is left for the garbage collector.  The call blocks forever;
    it only leaves by raising, and the listening socket is closed on the way
    out.

    Args:
        host: Interface address to bind to.  The default ``''`` binds to all
            interfaces.
        port: TCP port to listen on.  Defaults to 8080; 0 lets the operating
            system pick a free port.

    Raises:
        SystemExit: With status 1 if the socket cannot be bound.
    """
    max_queue_num = 10

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print('Socket created')
    try:
        # Allow an immediate restart while old connections sit in TIME_WAIT.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            s.bind((host, port))
        except socket.error as msg:
            # errno/strerror work on Python 2 and 3; indexing the exception
            # object only works on Python 2.
            print('Bind failed. Error Code: ' + str(msg.errno)
                  + ' Message: ' + str(msg.strerror))
            # Non-zero status so callers and shells can detect the failure.
            sys.exit(1)
        print('Socket bind completed')

        s.listen(max_queue_num)
        print('Socket listening on ' + str(s.getsockname()[1]))

        while True:
            conn, addr = s.accept()
            try:
                print('Socket connected with ' + addr[0] + ':' + str(addr[1]))
            finally:
                # This server does nothing else with the client.
                conn.close()
    finally:
        # Reached on any exit from the loop (e.g. Ctrl+C) or a failed bind.
        s.close()
# Launch the server when run as a script; importing the module has no effect.
if __name__ == "__main__":
    start_socket_server()
# Module metadata. This section appears to be the thread-per-client server
# that the gist page lists as ThreadedSocketServer.py -- TODO confirm the name.
__author__ = 'will'
__version__ = "0.1"
# Names exported by a star-import of this module; client_thread is not listed.
__all__ = ["start_socket_server"]
import sys
import socket
import thread
def client_thread(conn, addr):
    """Serve one connected client until it disconnects.

    Sends a greeting, then answers every chunk received with its byte count
    and contents.  The connection is always closed before returning, also
    when a send or receive fails.

    Args:
        conn: Connected stream socket for the client.
        addr: ``(host, port)`` pair of the client, used only for log output.
    """
    buffer_size = 1024
    peer = addr[0] + ':' + str(addr[1])
    try:
        # sendall() keeps writing until everything is out; send() may stop
        # after a partial write.
        conn.sendall(
            b'You have successfully connected to ThreadedSocketServer\n')
        while True:
            data = conn.recv(buffer_size)
            if not data:
                # An empty read means the client closed its side.
                break
            # Build the reply as bytes so it concatenates with the raw
            # payload on both Python 2 and Python 3.
            reply = (str(len(data)).encode('ascii') + b' bytes received\n'
                     + b'Data contents: ' + data)
            conn.sendall(reply)
    except socket.error as err:
        # A reset or broken connection ends this client only; report it
        # instead of letting the thread die with a traceback.
        print('connection to ' + peer + ' failed: ' + str(err))
    finally:
        conn.close()
    print('connection to ' + peer + ' closed')
def start_socket_server(host='', port=8080):
    """Run a TCP server that hands every client to its own thread.

    The server binds, listens and then accepts clients forever; each accepted
    connection is served by ``client_thread`` in a separate daemon thread.
    The call only leaves by raising, and the listening socket is closed on
    the way out.

    Args:
        host: Interface address to bind to.  The default ``''`` binds to all
            interfaces.
        port: TCP port to listen on.  Defaults to 8080; 0 lets the operating
            system pick a free port.

    Raises:
        SystemExit: With status 1 if the socket cannot be bound.
    """
    # Imported here because the file's import block only provides the
    # low-level ``thread`` module.
    import threading

    max_queue_num = 10

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print('Socket created')
    try:
        # Allow an immediate restart while old connections sit in TIME_WAIT.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            s.bind((host, port))
        except socket.error as msg:
            # errno/strerror work on Python 2 and 3; indexing the exception
            # object only works on Python 2.
            print('Bind failed. Error Code: ' + str(msg.errno)
                  + ' Message: ' + str(msg.strerror))
            # Non-zero status so callers and shells can detect the failure.
            sys.exit(1)
        print('Socket bind completed')

        s.listen(max_queue_num)
        print('Socket listening on ' + str(s.getsockname()[1]))

        while True:
            conn, addr = s.accept()
            print('Socket connected with ' + addr[0] + ':' + str(addr[1]))
            worker = threading.Thread(target=client_thread, args=(conn, addr))
            # Daemon threads do not keep the process alive once the main
            # thread stops.
            worker.daemon = True
            try:
                worker.start()
            except Exception:
                # The thread could not be started, so nobody owns the
                # connection; close it here before propagating the error.
                conn.close()
                raise
    finally:
        # Reached on any exit from the loop (e.g. Ctrl+C) or a failed bind.
        s.close()
# Run the threaded server only when this file is the program entry point.
if __name__ == "__main__":
    start_socket_server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment