Created
May 13, 2016 14:36
-
-
Save nopper/2551d5c15310558aed4c0d01d647a201 to your computer and use it in GitHub Desktop.
Tmux helper for fabric
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fabric.api import * | |
class Tmux(object): | |
"""Tmux helper for fabric""" | |
def __init__(self, session_name, run_cmd=run): | |
self.session_name = session_name | |
self.run_cmd = run_cmd | |
self.create_session() | |
def create_session(self): | |
with settings(warn_only=True): | |
test = self.run_cmd('tmux has-session -t %s' % self.session_name) | |
if test.failed: | |
self.run_cmd('tmux new-session -d -s %s' % self.session_name) | |
self.run_cmd( | |
'tmux set-option -t %s -g allow-rename off' % self.session_name) | |
def recreate(self): | |
self.kill_session() | |
self.create_session() | |
def kill_session(self): | |
self.run_cmd('tmux kill-session -t %s' % self.session_name) | |
def command(self, command, pane=0): | |
self.run_cmd('tmux send-keys -t %s:%s "%s" ENTER' % ( | |
self.session_name, pane, command)) | |
def new_window(self, name): | |
self.run_cmd('tmux new-window -t %s -n %s' % (self.session_name, name)) | |
def find_window(self, name): | |
with settings(warn_only=True): | |
test = self.run_cmd('tmux list-windows -t %s | grep \'%s\'' % (self.session_name, name)) | |
return not test.failed | |
def rename_window(self, new_name, old_name=None): | |
if old_name is None: | |
self.run_cmd('tmux rename-window %s' % new_name) | |
else: | |
self.run_cmd('tmux rename-window -t %s %s' % (old_name, new_name)) | |
def wait_for(self, signal_name): | |
self.run_cmd('tmux wait-for %s' % signal_name) | |
def run_singleton(self, command, orig_name): | |
run_name = "run/%s" % orig_name | |
done_name = "done/%s" % orig_name | |
# If the program is running we wait to be finished. | |
if self.find_window(run_name): | |
self.wait_for(run_name) | |
# If the program is not running we create a window with done_name | |
if not self.find_window(done_name): | |
self.new_window(done_name) | |
self.rename_window(run_name, done_name) | |
# Check that we can execute the commands in the correct window | |
assert self.find_window(run_name) | |
rename_window_cmd = 'tmux rename-window -t %s %s' % ( | |
run_name, done_name) | |
signal_cmd = 'tmux wait-for -S %s' % run_name | |
expanded_command = '%s ; %s ; %s' % ( | |
command, rename_window_cmd, signal_cmd) | |
self.command(expanded_command, run_name) | |
self.wait_for(run_name) | |
@task | |
def test(): | |
t = Tmux('session', run_cmd=local) | |
t.run_singleton('sleep 10', 'sleeping') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment