Skip to content

Instantly share code, notes, and snippets.

@draganHR
Created November 9, 2014 09:55
Show Gist options
  • Save draganHR/7807f108b11785fe443d to your computer and use it in GitHub Desktop.
Save draganHR/7807f108b11785fe443d to your computer and use it in GitHub Desktop.
Run test elasticsearch server
from elasticsearch import Elasticsearch, TransportError
from nose.plugins import Plugin
import os
import shutil
import subprocess
import tempfile
import socket
from time import sleep
class ElasticsearchServer(Plugin):
"""
Nose plugin that starts elasticsearch server.
"""
enabled = True
name = 'elasticsearchserver'
home = '/usr/share/elasticsearch'
cmd = os.path.join(home, 'bin/elasticsearch')
_process = None
_tempdir = None
_host = '127.0.0.1'
_port = 9201
_timeout = 5
def configure(self, options, conf):
super(ElasticsearchServer, self).configure(options, conf)
def begin(self):
# Check if port is free
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((self._host, self._port))
if result == 0:
raise Exception("Port %s is open" % self._port)
# Create temp dirs
self._tempdir = tempfile.mkdtemp(prefix='nose-elasticsearchserver-')
for dirname in ('data', 'work', 'logs'):
os.makedirs(os.path.join(self._tempdir, dirname))
# Start process
self._process = subprocess.Popen([
self.cmd,
'-Des.cluster.name=test-cluster',
'-Des.node.name=test-node',
'-Des.node.master=true',
'-Des.node.data=true',
'-Des.network.host=' + self._host,
'-Des.http.port=' + str(self._port),
'-Des.path.data=' + os.path.join(self._tempdir, 'data'),
'-Des.path.work=' + os.path.join(self._tempdir, 'data'),
'-Des.path.logs=' + os.path.join(self._tempdir, 'data'),
'-Des.discovery.zen.ping.multicast.enabled=false'
])
# Wait for elasticsearch to become available
connection_errors = 0
client = Elasticsearch(['%s:%s' % (self._host, self._port)], timeout=self._timeout)
while connection_errors < 30:
try:
client.cluster.health(wait_for_status='green', request_timeout=20)
except TransportError:
connection_errors += 1
sleep(1)
else:
break
self.setup_indices()
def setup_indices(self, client):
pass
def finalize(self, result):
self._process.terminate()
self._process.communicate()
shutil.rmtree(self._tempdir)
#!/bin/sh
rm -rf /tmp/elasticsearch-test
mkdir -p /tmp/elasticsearch-test/data
mkdir -p /tmp/elasticsearch-test/work
mkdir -p /tmp/elasticsearch-test/logs
/usr/share/elasticsearch/bin/elasticsearch \
-Des.cluster.name=test-cluster \
-Des.node.name=test-node \
-Des.node.data=true \
-Des.node.master=true \
-Des.network.host=127.0.0.1 \
-Des.http.port=9201 \
-Des.path.data=/tmp/elasticsearch-test/data \
-Des.path.work=/tmp/elasticsearch-test/work \
-Des.path.logs=/tmp/elasticsearch-test/logs \
-Des.discovery.zen.ping.multicast.enabled=false
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment