Skip to content

Instantly share code, notes, and snippets.

@ihincks
Last active January 9, 2018 20:32
Show Gist options
  • Save ihincks/b017b72d1d950db753b60b92e5200fbd to your computer and use it in GitHub Desktop.
Save ihincks/b017b72d1d950db753b60b92e5200fbd to your computer and use it in GitHub Desktop.
Quick code to talk to COM or TCPIP hardware devices
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import socket\n",
"import time"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"class TCPClient(object):\n",
" \"\"\"\n",
" Wraps socket.socket to make it a bit more suitable for receiving and\n",
" transmitting terminated strings.\n",
" \"\"\"\n",
" \n",
" TERMINATOR = '\\r\\n'\n",
" INPUT_TIMEOUT = 10\n",
"\n",
" def __init__(self, server_ip, server_port):\n",
" self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\n",
" self._server_ip = server_ip\n",
" self._server_port = server_port\n",
" # Keep our own receive buffer so that bytes_available can be implemented;\n",
" # socket.socket does not provide such a property.\n",
" self._buffer = ''\n",
"\n",
" def connect(self):\n",
" self._socket.connect((self._server_ip, self._server_port))\n",
" self._socket.setblocking(0)\n",
"\n",
" def close(self):\n",
" self._socket.close()\n",
"\n",
" def _fill_buffer(self):\n",
" is_complete = False\n",
" while not is_complete:\n",
" try:\n",
" self._buffer += self._socket.recv(8192)\n",
" time.sleep(0.01)\n",
" except:\n",
" is_complete = True\n",
"\n",
"\n",
" @property\n",
" def bytes_available(self):\n",
" self._fill_buffer()\n",
" return len(self._buffer)\n",
"\n",
" def flush_input(self):\n",
" self._fill_buffer()\n",
" self._buffer = ''\n",
"\n",
" def write_string(self, string):\n",
" self._socket.send(string + TCPClient.TERMINATOR)\n",
"\n",
" def read_string(self):\n",
" self._fill_buffer()\n",
" message, sep, end = self._buffer.partition(TCPClient.TERMINATOR)\n",
" self._buffer = end\n",
" return message + sep\n",
"\n",
" def read_string_until_terminator(self, timeout=None):\n",
" timeout = self.INPUT_TIMEOUT if timeout is None else timeout\n",
" message = self.read_string()\n",
" t = time.time()\n",
" while len(message) == 0 or ord(message[-1]) != ord(TCPClient.TERMINATOR):\n",
" message += self.read_string()\n",
" time.sleep(0.01)\n",
" if timeout > 0 and time.time() - t > timeout:\n",
" raise IOError('TCP input stream timeout waiting for terminator. So far we have: \\n\"{}\"'.format(message))\n",
" return message"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"client = TCPClient('192.168.1.120',12321)\n",
"client.connect()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"client.write_string('IDN')\n",
"print client.read_string_until_terminator()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"client.close()"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [conda env:birb]",
"language": "python",
"name": "conda-env-birb-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment