Skip to content

Instantly share code, notes, and snippets.

@belyaev-pa
Created April 29, 2019 16:21
Show Gist options
  • Select an option

  • Save belyaev-pa/03ef3141d5b673d0a571e6d9b950ed74 to your computer and use it in GitHub Desktop.

Select an option

Save belyaev-pa/03ef3141d5b673d0a571e6d9b950ed74 to your computer and use it in GitHub Desktop.
socket_client
import socket
import sys
import json
# Create a UDS socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = './uds_socket.s'
print(sys.stderr, 'connecting to %s' % server_address)
try:
sock.connect(server_address)
except socket.error, msg:
print(sys.stderr, msg)
sys.exit(1)
try:
# Send data
msg = dict(
job_id='1111111111111',
job_type='my_type',
manager_type='local',
arguments='log_txt_file=/home/pavel/test_log.txt'
)
message = json.dumps(msg)
print(sys.stderr, 'sending "%s"' % message)
sock.sendall(message)
amount_received = 0
amount_expected = len(message)
while amount_received < amount_expected:
data = sock.recv(16)
amount_received += len(data)
print(sys.stderr, 'received "%s"' % data)
finally:
print(sys.stderr, 'closing socket')
sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment