Skip to content

Instantly share code, notes, and snippets.

@kittinan
Last active October 9, 2024 06:09
Show Gist options
  • Save kittinan/9f10bb3ae7bef351b50b5d49677d3895 to your computer and use it in GitHub Desktop.
Save kittinan/9f10bb3ae7bef351b50b5d49677d3895 to your computer and use it in GitHub Desktop.
# pip install asyncio websockets
import asyncio
import binascii
import random
import string
import struct
import websockets
def generate_client_identifier(prefix="popdeng0.", length=22):
# Generate a random string of hexadecimal characters to complete the client identifier
suffix_length = length - len(prefix)
suffix = "".join(random.choices(string.hexdigits.lower(), k=suffix_length))
return prefix + suffix
def create_mqtt_connect_packet(client_identifier):
# 102200044d5154540402003c0016706f7064656e67302e65383839323964303533646136
# Fixed header
packet_type_and_flags = 0x10 # CONNECT packet type (1) and flags (0000)
# Protocol Name (MQTT)
protocol_name = "MQTT".encode("utf-8") # Convert "MQTT" to bytes
protocol_name_length = struct.pack(
">H", len(protocol_name)
) # Length of protocol name (2 bytes)
# Protocol Level (for MQTT 3.1.1, it's 4)
protocol_level = struct.pack("B", 4)
# Connect flags (Clean Session = 1, all others are 0)
connect_flags = struct.pack("B", 2)
# Keep Alive (2 bytes) - 60 seconds
keep_alive = struct.pack(">H", 60)
# Client Identifier
client_id_bytes = client_identifier.encode("utf-8")
client_id_length = struct.pack(
">H", len(client_id_bytes)
) # Length of client identifier (2 bytes)
# Variable header (Protocol Name, Protocol Level, Connect Flags, Keep Alive)
variable_header = (
protocol_name_length
+ protocol_name
+ protocol_level
+ connect_flags
+ keep_alive
)
# Payload (Client Identifier)
payload = client_id_length + client_id_bytes
# Remaining Length (Variable header + Payload)
remaining_length = len(variable_header) + len(payload)
# Use MQTT's Remaining Length encoding (variable-length format)
remaining_length_encoded = b""
while remaining_length > 0:
encoded_byte = remaining_length % 128
remaining_length //= 128
# If there are more data to encode, set the continuation bit (0x80)
if remaining_length > 0:
encoded_byte |= 0x80
remaining_length_encoded += struct.pack("B", encoded_byte)
# Fixed header + remaining length + variable header + payload
packet = (
struct.pack("B", packet_type_and_flags)
+ remaining_length_encoded
+ variable_header
+ payload
)
return packet
async def send_messages():
uri = "wss://test.mosquitto.org:8081/mqtt"
# Define headers that can be set manually
extra_headers = [
("Pragma", "no-cache"),
("Cache-Control", "no-cache"),
(
"User-Agent",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
),
("Accept-Encoding", "gzip, deflate, br, zstd"),
("Accept-Language", "en-US,en;q=0.9,th-TH;q=0.8,th;q=0.7"),
]
# Set the 'Origin' header
origin = "https://popdeng.click"
# Specify the subprotocol to set 'Sec-WebSocket-Protocol' header
subprotocols = ["mqtt"]
async with websockets.connect(
uri, extra_headers=extra_headers, origin=origin, subprotocols=subprotocols
) as websocket:
# Connection successful
print(f"Connected successfully to {uri}")
client_identity = generate_client_identifier()
# client_identity = "pdeng0.06e3fcc3eedeb"
print("client_identity: {}".format(client_identity))
init_message = create_mqtt_connect_packet(client_identity)
print("init_message: {}".format(init_message))
# init_message = binascii.unhexlify(init_message_hex)
await websocket.send(init_message)
print("Sent initial message to initiate connection.")
# Initialize counter
counter = 0
# Infinite loop to send binary message every 20ms
message_hex = "3025000e706f7064656e672f636c69636b737b22636f756e7472795f636f6465223a225448227d"
message = binascii.unhexlify(message_hex)
while True:
await websocket.send(message)
counter += 1
print(f"Sent message {counter}")
await asyncio.sleep(0.02) # Sleep for 20ms
asyncio.run(send_messages())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment