Last active
November 13, 2017 16:40
-
-
Save qrkourier/f7f3cc93e0cf0191c6682a85a1ae11ab to your computer and use it in GitHub Desktop.
put metrics in Elasticsearch from the Nicehash API and the Nvidia driver (e.g., GPU temperature)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: '3.2' | |
services: | |
elasticsearch: | |
image: "docker.elastic.co/elasticsearch/elasticsearch:5.5.2" | |
environment: | |
- http.host=0.0.0.0 | |
- transport.host=127.0.0.1 | |
ports: | |
- "9200:9200" | |
kibana: | |
image: "docker.elastic.co/kibana/kibana:5.5.2" | |
environment: | |
SERVER_NAME: localhost | |
ELASTICSEARCH_URL: http://elasticsearch:9200 | |
ports: | |
- "5601:5601" | |
links: | |
- elasticsearch |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys, json, requests | |
from ratelimit import rate_limited | |
from elasticsearch import Elasticsearch,helpers | |
from datetime import datetime | |
from itertools import chain | |
def main(): | |
nicehash = dict() | |
# store the time in UTC so that Kibana can always display the correct | |
# offset based on the browser's TZ | |
timestamp = datetime.utcnow() | |
# decorate the api calling method with a rate limit of 1 per 3s | |
@rate_limited(3) | |
def call_api(url): | |
try: | |
response = requests.get(url) | |
except requests.ConnectionError as e: | |
raise(e) | |
return response | |
workerAddress = "3FRTZmp2uP1QXApbCzys6auMsNfQ5co6sw" | |
# define the URLs corresponding to each Elasticsearch data type | |
nicehash['urls'] = { | |
'simplemultialgo': "https://api.nicehash.com/api?method=simplemultialgo.info", | |
'stats': "https://api.nicehash.com/api?method=stats.provider&addr="+workerAddress | |
} | |
nicehash['regions'] = ["eu,","us","hk","jp"] | |
# schema | |
indexMappings = { | |
"settings" : { | |
"number_of_shards" : 1 | |
}, | |
"mappings" : { | |
"simplemultialgo" : { | |
"properties" : { | |
"algo" : { "type" : "long" }, | |
"name" : { "type" : "keyword" }, | |
"paying" : { "type" : "float" } | |
} | |
}, | |
"stats" : { | |
"properties" : { | |
"name" : { "type" : "keyword" }, | |
"accepted_speed" : { "type" : "float" }, | |
"rejected_speed" : { "type" : "float" }, | |
"balance" : { "type" : "float" } | |
} | |
} | |
} | |
} | |
# connector | |
es = Elasticsearch( | |
['localhost'], | |
http_auth=('elastic', 'changeme'), | |
port=9200, | |
use_ssl=False | |
) | |
# create the index if necessary | |
index = es.indices.create( | |
index="nicehash", | |
body=indexMappings, | |
ignore=400 | |
) | |
# container for the datasets that will be sent to Elasticsearch via the | |
# bulk helper | |
nicehash['batches'] = dict() | |
# container for the JSON-formatted responses | |
nicehash['responses'] = dict() | |
# container for the iteration generator objects for each Elasticsearch type | |
nicehash['generators'] = dict() | |
for key in nicehash['urls'].keys(): | |
response = call_api(nicehash['urls'][key]) | |
nicehash['responses'][key] = json.loads(response.text) | |
# initialize a list to house the dicts that will be sent to | |
# Elasticsearch as a batch via the bulk helper | |
nicehash['batches'][key] = list() | |
# extract the interesting bits from the response into the batch and | |
# insert a timestamp in each doc in the batch | |
nicehash['batches'][key] = nicehash['responses'][key]['result'][key] | |
for doc in nicehash['batches'][key]: | |
doc['timestamp'] = timestamp | |
# initialize a dict to house mapping of algorithms' names to identifying integer | |
algorithms = dict() | |
# iterate over the returned list of dicts representing the various algorithms | |
# supported by the Nicehash platform to populate the algorithms lookup dict | |
# and make any necessary adjustments to the data to finalize the Elasticsearch batch | |
for algorithm in nicehash['batches']['simplemultialgo']: | |
algorithms[algorithm['algo']] = algorithm['name'] | |
#if algorithm['name'] == "equihash": | |
# algorithm['paying'] = float(algorithm['paying']) / 1000 | |
# iterate over the list of dicts representing statistics per algorithm for | |
# the specified worker | |
for algorithm in nicehash['batches']['stats']: | |
# resolve the algorithm's integer id to the friendly name | |
algorithm['name'] = algorithms[algorithm['algo']] | |
# define a generator as a template for the documents that will be fed to | |
# the ES bulk helper | |
for key in nicehash['urls'].keys(): | |
nicehash['generators'][key] = ({ | |
'_index': "nicehash", | |
'_type': key, | |
'_source': source } | |
for source in chain(nicehash['batches'][key]) | |
) | |
# for doc in nicehash['generators'][key]: | |
# print(doc) | |
# pass the instance of Elasticsearch and a generator of all temps | |
try: | |
result = helpers.bulk(es,nicehash['generators'][key]) | |
except: | |
raise | |
print(result) | |
if __name__ == '__main__': | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#import sys, json, requests | |
from elasticsearch import Elasticsearch,helpers | |
from datetime import datetime | |
from itertools import chain | |
import psutil | |
from pynvml import * | |
def main(): | |
# initialize a list of dicts like { label: "core 1", celsius: 78 } | |
temps = list() | |
# store the time in UTC so that Kibana can always display the correct | |
# offset based on the browser's TZ | |
timestamp = datetime.utcnow() | |
# iterate over sensors_temperatures and push a dict of the label and current | |
# temperature in celsius on a list "cputemps" | |
if not hasattr(psutil, "sensors_temperatures"): | |
sys.exit("platform not supported") | |
sensors = psutil.sensors_temperatures() | |
if not sensors: | |
sys.exit("can't read any temperature") | |
for name, entries in sensors.items(): | |
for entry in entries: | |
myLabel = entry.label or name | |
if myLabel == "pch_skylake": | |
myLabel = "PCIe" | |
elif myLabel == "Package id 0": | |
continue | |
elif myLabel == "acpitz": | |
myLabel = "CPU socket" | |
elif myLabel == "iwlwifi": | |
myLabel = "WiFi adapter" | |
temps.append({ | |
'timestamp': timestamp, | |
'label': myLabel, | |
'celsius': entry.current | |
}) | |
# iterate over GPU devices and push a dict of device name and current | |
# temperature in celsius on a list "gputemps" | |
nvmlInit() | |
gputemps = list() | |
deviceCount = nvmlDeviceGetCount() | |
for device in range(deviceCount): | |
handle = nvmlDeviceGetHandleByIndex(device) | |
gpuname = nvmlDeviceGetName(handle) | |
celsius = str(nvmlDeviceGetTemperature(handle,0)) | |
#print "Device", device, ":", gpuname, celsius+"C" | |
temps.append({ | |
'timestamp': timestamp, | |
'label': gpuname, | |
'celsius': celsius | |
}) | |
# schema | |
indexMappings = { | |
'settings' : { | |
'number_of_shards' : 1 | |
}, | |
'mappings' : { | |
'celsius' : { | |
'properties' : { | |
'timestamp' : { 'type' : "date" }, | |
'label' : { 'type' : "keyword" }, | |
'celsius' : { 'type' : "integer" } | |
}, | |
} | |
} | |
} | |
# connector | |
es = Elasticsearch( | |
['localhost'], | |
http_auth=('elastic', 'changeme'), | |
port=9200, | |
use_ssl=False | |
) | |
# create the index if necessary | |
index = es.indices.create( | |
index="temps", | |
body=indexMappings, | |
ignore=400 | |
) | |
# define a generator as a template for the documents that will be fed to | |
# the ES bulk helper | |
tempsGen = ({ | |
'_index': "temps", | |
'_type': "celsius", | |
'_source': temp } | |
for temp in temps | |
) | |
# for doc in tempsGen: | |
# pass the instance of Elasticsearch and a generator of all temps | |
try: | |
# result = es.index(doc) | |
result = helpers.bulk(es,tempsGen) | |
except: | |
raise | |
print(result) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment