Skip to content

Instantly share code, notes, and snippets.

@nicolargo
Last active August 29, 2015 14:27
Show Gist options
  • Save nicolargo/91a1df2471d8f4b608ce to your computer and use it in GitHub Desktop.
Save nicolargo/91a1df2471d8f4b608ce to your computer and use it in GitHub Desktop.
Docker stats getter in a thread
import threading
import docker
import time
class ThreadStats(threading.Thread):
    """Thread that keeps the most recent stats sample of one container.

    The thread iterates over the stream returned by
    ``client.stats(container['Id'], decode=True)`` and stores each item it
    receives in :attr:`stats`, replacing the previous one.
    """

    def __init__(self, client, container):
        """Open the stats stream for *container*.

        Args:
            client: Object exposing ``stats(container_id, decode=True)`` and
                returning an iterable of samples.
            container: Mapping describing the container; its ``'Id'`` entry
                is passed to ``client.stats``.
        """
        super(ThreadStats, self).__init__()
        # Do NOT call this attribute ``_stop``: on Python 3 versions where
        # ``threading.Thread`` defines a private ``_stop()`` method, shadowing
        # it with an Event makes ``join()``/``is_alive()`` fail with
        # "TypeError: 'Event' object is not callable".
        self._stop_event = threading.Event()
        self.container = container
        self.__stats_stream = client.stats(self.container['Id'], decode=True)
        self.stats = None

    def run(self):
        """Consume the stats stream until it ends or a stop is requested.

        The stop flag is only checked after a sample has been received, so
        the loop exits on the first sample that follows a call to
        :meth:`stop` (or when the stream is exhausted).
        """
        for sample in self.__stats_stream:
            self.stats = sample
            if self.stopped():
                break

    @property
    def stats(self):
        """Latest sample read from the stream, or ``None`` if none yet."""
        return self._stats

    @stats.setter
    def stats(self, value):
        self._stats = value

    def stop(self, timeout=None):
        """Request the thread to stop and wait for it to finish.

        Args:
            timeout: Forwarded to ``threading.Thread.join``; ``None`` waits
                without a time limit.
        """
        self._stop_event.set()
        super(ThreadStats, self).join(timeout)

    def stopped(self):
        """Return ``True`` once :meth:`stop` has been called."""
        return self._stop_event.is_set()
def main():
    """Print the network stats of every container twice, 3 seconds apart.

    One :class:`ThreadStats` is started per container returned by
    ``client.containers()``. All started threads are stopped on exit, even
    if an error occurs while starting threads or printing.
    """
    client = docker.Client(base_url='unix://var/run/docker.sock')
    print('Docker version: {}'.format(client.version()))

    thread_list = []
    try:
        for c in client.containers():
            t = ThreadStats(client, c)
            t.start()
            # Track the thread only once it is running, so the cleanup below
            # never tries to join a thread that was not started.
            thread_list.append(t)

        for _ in range(2):
            time.sleep(3)
            for t in thread_list:
                # Read once: the worker thread may replace the value between
                # a None check and the lookup.
                stats = t.stats
                if stats is None:
                    # No sample has arrived yet; indexing None would raise.
                    print('Container {}: no stats received yet'.format(
                        t.container['Id']))
                else:
                    print(stats['network'])
    finally:
        # Stop threads
        for t in thread_list:
            t.stop()
# Run the demo only when the file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment