Last active
October 3, 2022 14:51
-
-
Save vsajip/2c1b0c9f6b3e264e41b549d1cc805f8e to your computer and use it in GitHub Desktop.
Test files for CPython gh-84532
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import logging | |
import logging.handlers | |
import time | |
# Reference instant for all trace output: local midnight of the day the
# script was started, expressed as a POSIX timestamp.
d0 = datetime.date.today()
dt0 = datetime.datetime(year=d0.year, month=d0.month, day=d0.day).timestamp()


def message(fmt, *args, **kwargs):
    """Print *fmt* prefixed with the number of seconds elapsed since ``dt0``.

    The prefix is zero-padded to 12 characters with microsecond precision.
    Any extra positional and keyword arguments are handed to ``print``
    unchanged, so ``message('got: ', value)`` and ``message(..., file=f)``
    both work.
    """
    elapsed = time.time() - dt0
    print('%012.6f %s' % (elapsed, fmt), *args, **kwargs)
class LogHandler(logging.handlers.SocketHandler):
    """SocketHandler variant that sends plain text lines and traces its I/O.

    Differences from the base class visible in this block:

    * ``makePickle`` emits the formatted message as bytes plus a newline
      instead of a pickled record.
    * ``send`` reports every connection attempt, send and failure through
      ``message`` so the timing of each step can be observed.
    """

    def __init__(self, *args, bufsize=0, **kwargs):
        """Initialise the handler.

        :param args: positional arguments forwarded to ``SocketHandler``
                     (host, port).
        :param bufsize: stored on the instance as ``self.bufsize``; nothing
                        in this class reads it yet.
        :param kwargs: keyword arguments forwarded to ``SocketHandler``.
        """
        super().__init__(*args, **kwargs)
        # Honour the caller's value (the previous code always stored 0).
        self.bufsize = bufsize
        self.buffer = []

    def makePickle(self, record):
        """Return the record's message encoded as bytes, newline-terminated."""
        return record.getMessage().encode() + b'\n'

    def send(self, s):
        """Send the bytes *s* to the socket, (re)connecting first if needed.

        ``sendall`` is used, so partial sends are retried by the socket
        layer.  Any exception raised while sending is reported, and the
        socket is closed and dropped so that the next call tries to
        reconnect.  No exception propagates to the caller.
        """
        if self.sock is None:
            message('Client trying to create socket')
            self.createSocket()
            if self.sock:
                message('Client created socket')
        # self.sock can be None either because we haven't reached the retry
        # time yet, or because we have reached the retry time and retried,
        # but are still unable to connect.
        if not self.sock:
            # Use message() like every other trace line so this failure
            # carries a timestamp too.
            message('Client failed to send %s (no socket)' % s)
            return
        try:
            message('Client sending %s' % s)
            self.sock.sendall(s)
            message('Client sent %s' % s)
        except Exception as e:  # pragma: no cover
            message('Client failed to send %s (%s)' % (s, e))
            self.sock.close()
            self.sock = None  # so we can call createSocket next time
# Driver: route root-logger DEBUG output through LogHandler to the local
# test server on port 8010.
logger = logging.getLogger()
handler = LogHandler('127.0.0.1', 8010)
handler.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
# closeOnError is an attribute of SocketHandler, not of Logger; setting it
# on the logger (as before) had no effect.
handler.closeOnError = True

# Emit five messages.  The first is followed by a long pause, the rest by
# short ones (presumably so the server side can drop the connection in
# between -- confirm against the server script in use).
for text, pause in (
    ('Message #1', 5),
    ('Message #2', 0.2),
    ('Message #3', 0.2),
    ('Message #4', 0.2),
    ('Message #5', 0.2),
):
    logger.debug(text)
    time.sleep(pause)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import socket | |
import time | |
# Reference instant for all trace output: local midnight of the day the
# script was started, expressed as a POSIX timestamp.
d0 = datetime.date.today()
dt0 = datetime.datetime(year=d0.year, month=d0.month, day=d0.day).timestamp()

# Endpoint of the test server, and the client's current connection
# (None while disconnected).
address = ('127.0.0.1', 8010)
sock = None


def message(fmt, *args, **kwargs):
    """Print *fmt* prefixed with the number of seconds elapsed since ``dt0``.

    The prefix is zero-padded to 12 characters with microsecond precision.
    Any extra positional and keyword arguments are handed to ``print``
    unchanged.
    """
    elapsed = time.time() - dt0
    print('%012.6f %s' % (elapsed, fmt), *args, **kwargs)
def make_socket(timeout=1):
    """Return a TCP socket connected to ``address``.

    :param timeout: socket timeout in seconds, applied before connecting.
    :raises Exception: whatever ``connect`` raised; the socket is closed and
                       the failure is reported via ``message`` before the
                       exception is re-raised.
    """
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn.settimeout(timeout)
    try:
        conn.connect(address)
    except Exception:
        # Don't leak the descriptor when the connection attempt fails.
        conn.close()
        message('Failed to connect to server')
        raise
    else:
        return conn
def create_socket():
    """Try to connect and store the result in the module-level ``sock``.

    On failure the error is reported via ``message`` and ``sock`` is left
    unchanged; nothing is raised.
    """
    global sock
    try:
        connected = make_socket()
    except Exception as exc:
        message('Client failed to create socket: %s' % exc)
    else:
        sock = connected
def send(s):
    """Send the bytes *s* over the module-level ``sock``, connecting if needed.

    If there is no socket, one connection attempt is made first.  Any
    exception raised while sending is reported, and the socket is closed and
    reset to None so that the next call tries to reconnect.  No exception
    propagates to the caller.
    """
    global sock
    if sock is None:
        message('Client trying to create socket')
        create_socket()
        if sock:
            message('Client created socket')
    if not sock:
        # Use message() like every other trace line so this failure carries
        # a timestamp too.
        message('Client failed to send %s (no socket)' % s)
        return
    try:
        message('Client sending %s' % s)
        sock.sendall(s)
        message('Client sent %s' % s)
    except Exception as e:
        message('Client failed to send %s (%s)' % (s, e))
        sock.close()
        sock = None  # so we can call create_socket next time
def log(s):
    """Encode the text *s*, append a newline, and pass it to ``send``."""
    payload = s.encode() + b'\n'
    send(payload)
# Driver: connect once up front, then emit five messages.  The first is
# followed by a 5 second pause, the others by 0.2 second pauses.
create_socket()
for text, pause in (
    ('Message #1', 5),
    ('Message #2', 0.2),
    ('Message #3', 0.2),
    ('Message #4', 0.2),
    ('Message #5', 0.2),
):
    log(text)
    time.sleep(pause)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import socket | |
import time | |
# Reference instant for all trace output: local midnight of the day the
# script was started, expressed as a POSIX timestamp.
d0 = datetime.date.today()
dt0 = datetime.datetime(year=d0.year, month=d0.month, day=d0.day).timestamp()


def message(fmt, *args, **kwargs):
    """Print *fmt* prefixed with the number of seconds elapsed since ``dt0``.

    The prefix is zero-padded to 12 characters with microsecond precision.
    Any extra positional and keyword arguments are handed to ``print``
    unchanged, so ``message('got: ', value)`` prints the value after the
    text.
    """
    elapsed = time.time() - dt0
    print('%012.6f %s' % (elapsed, fmt), *args, **kwargs)
class Timer:
    """Deadline that starts counting when the instance is created.

    Elapsed time is measured with ``time.monotonic()`` so that wall-clock
    adjustments cannot shorten or lengthen the timeout, and so that the
    deprecated ``datetime.utcnow()`` is not needed.
    """

    def __init__(self, timeout):
        """Start the timer.

        :param timeout: duration in seconds (int or float).
        """
        self.timeout = timeout
        # Monotonic clock reading taken at creation; only differences
        # between such readings are meaningful.
        self.start = time.monotonic()

    def is_timeout(self):
        """Return True once strictly more than ``timeout`` seconds have passed."""
        return time.monotonic() - self.start > self.timeout
# Test server: accept one client at a time on 127.0.0.1:8010, echo what it
# receives to the trace output, and drop the connection once the
# per-connection timer has expired.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('127.0.0.1', 8010))
    s.listen()
    while True:
        conn, addr = s.accept()
        message('Server got connection from: ', addr)
        timeout = Timer(timeout=2)
        # recv() blocks (no timeout is set on conn), so the timer is only
        # consulted after each read returns.
        while not timeout.is_timeout():
            data = conn.recv(1024)
            if not data:
                # An empty read means the peer closed the connection.
                # Looping again would return immediately every time and
                # spin the CPU until the timer expires, so stop reading.
                break
            message('Server got message: ', data)
        try:
            conn.shutdown(socket.SHUT_RD)
            message('Server shut down reading on socket')
        except OSError:
            # Best effort: shutdown fails if the peer is already gone.
            pass
        conn.close()
        message('Server closed socket')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment