Last active
April 6, 2022 15:34
-
-
Save Ran-Xing/b5363ac76c553505e7bce98c1c7cf162 to your computer and use it in GitHub Desktop.
python api Detect interface usage times
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: UTF-8 -*- | |
import json | |
import os | |
import socket | |
def server_init(): | |
HOST, PORT = '0.0.0.0', 9999 | |
listen_socket = socket.socket(socket.AF_INET, | |
socket.SOCK_STREAM) | |
listen_socket.setsockopt(socket.SOL_SOCKET, | |
socket.SO_REUSEADDR, 1) | |
listen_socket.bind((HOST, PORT)) | |
listen_socket.listen(1) | |
print('Serving HTTP on port %s ...' % PORT) | |
return listen_socket | |
def server(extended): | |
client_connection, client_address = extended.accept() | |
try: | |
URL_PATH = client_connection.recv(1024).decode("utf-8").split("\r\n")[0].split(' ')[1] | |
except UnicodeDecodeError: | |
URL_PATH = "Error" | |
if URL_PATH == '/wechaty': | |
client_connection.send(b"""HTTP/1.1 200 OK\n\n{"status": "ok"}""") | |
client_connection.close() | |
readNum(client_address, "Success") | |
else: | |
client_connection.close() | |
readNum(client_address, "Errors") | |
def readNum(ipAddr, status): | |
def IPNum(fJson): | |
if ipAddr[0] in fJson[status]: | |
fJson[status][ipAddr[0]] += 1 | |
else: | |
# 如果没有找到相应的IP地址,则为第一次,所以添加 1 | |
fJson[status][ipAddr[0]] = 1 | |
if os.path.exists('log.json'): | |
with open('log.json', 'r') as readfile: | |
fJson = json.load(readfile) | |
readfile.close() | |
fJson["Nums"] += 1 | |
IPNum(fJson) | |
else: | |
fJson = {"Nums": 1, "Success": {ipAddr[0]: 0}, "Errors": {ipAddr[0]: 0}} | |
fJson[status][ipAddr[0]] = 1 | |
print('IP: %s:%s 请求状态: %s 请求次数: %s Api 接收到的总次数:%s' % ( | |
ipAddr[0], ipAddr[1], status, fJson[status][ipAddr[0]], fJson["Nums"])) | |
with open('log.json', "w") as outfile: | |
json.dump(fJson, outfile) | |
outfile.close() | |
if __name__ == '__main__': | |
listen_socket = server_init() | |
while True: | |
server(listen_socket) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment