Skip to content

Instantly share code, notes, and snippets.

@judotens
Last active March 7, 2022 00:54
Show Gist options
  • Save judotens/946a90e3136488c6bd8885c8878cd08f to your computer and use it in GitHub Desktop.
Save judotens/946a90e3136488c6bd8885c8878cd08f to your computer and use it in GitHub Desktop.
Celery Run Bash Project
from __future__ import absolute_import
import os
from subprocess import Popen, PIPE
import datetime
import time
from celery import Celery
from celery import states
from os.path import dirname, join
# Optionally populate os.environ from a ".env" file located next to this
# module. python-dotenv is an optional dependency: when it is not installed
# the process environment is used as-is.
try:
    from dotenv import load_dotenv, find_dotenv
except ImportError:
    # Only a missing package is tolerated here; any other error is allowed
    # to surface instead of being silently swallowed.
    pass
else:
    here = dirname(__file__)
    dotenv_path = join(here, '.env')
    load_dotenv(dotenv_path)
# Runtime configuration. Every value can be overridden through an
# environment variable of the same name.
CELERY_APP = os.environ.get('CELERY_APP', 'bash')
# Environment values arrive as strings while the defaults are ints, so the
# numeric settings are normalised to int here.
CELERY_CONCURRENCY = int(os.environ.get('CELERY_CONCURRENCY', 3))
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
# Default number of seconds a command may run before it is killed.
BASH_TIMEOUT = int(os.environ.get('BASH_TIMEOUT', 1200))

# The same URL is used for both the message broker and the result backend.
app = Celery(CELERY_APP, broker=CELERY_BROKER_URL, backend=CELERY_BROKER_URL)
# Honour the configured concurrency instead of a hard-coded 3. The old-style
# Celery setting name is CELERYD_CONCURRENCY ("CELERY_CONCURRENCY" is not a
# recognised setting, so assigning it had no effect).
app.conf.CELERYD_CONCURRENCY = CELERY_CONCURRENCY
app.conf.CELERY_ACKS_LATE = True
app.conf.CELERYD_PREFETCH_MULTIPLIER = 1
def kill(proc_pid):
    """Forcefully kill a process together with all of its descendants.

    Descendants are killed first, then the process itself.

    Args:
        proc_pid: PID of the root process to kill.

    Raises:
        psutil.NoSuchProcess: if ``proc_pid`` does not refer to a running
            process (raised by psutil when looking up or killing the root).
    """
    # psutil is not imported at the top of this module, so import it here to
    # keep this helper usable (and the module importable without psutil).
    import psutil

    process = psutil.Process(proc_pid)
    # psutil >= 2.0 names this method children(); get_children() only exists
    # in older releases, so fall back to it when children() is missing.
    list_children = getattr(process, 'children', None)
    if list_children is None:
        list_children = process.get_children
    for proc in list_children(recursive=True):
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            # The child exited between being listed and being killed.
            pass
    process.kill()
@app.task
def run(cmd, maksi=None):
    """Run a shell command and return everything it wrote to stdout.

    NOTE(security): ``cmd`` is handed to the shell verbatim, so it must only
    ever come from trusted producers.

    Args:
        cmd: Shell command line to execute.
        maksi: Optional timeout in seconds. When missing, zero or negative,
            the module-level ``BASH_TIMEOUT`` is used instead.

    Returns:
        The raw bytes the command wrote to stdout.

    Raises:
        RuntimeError: if the command is still running when the timeout
            expires. The process is killed first. Raising (rather than
            returning ``None``) is what makes Celery record the task as
            failed; a manually set FAILURE state would be overwritten with
            SUCCESS as soon as the task function returned normally.
    """
    import tempfile

    # Use a separate local name for the effective timeout: assigning to
    # BASH_TIMEOUT here would make it function-local and break the default
    # path with UnboundLocalError.
    if maksi and int(maksi) > 0:
        waittime = int(maksi)
    else:
        waittime = int(BASH_TIMEOUT)

    # stdout goes to a temporary file instead of a pipe. A pipe that is only
    # read after the process exits blocks the child once the pipe buffer is
    # full, turning any chatty command into a guaranteed timeout.
    with tempfile.TemporaryFile() as output:
        # "exec" replaces the shell with the command itself, so popen.kill()
        # signals the command rather than an intermediate shell.
        popen = Popen("exec " + cmd, stdout=output, shell=True)
        deadline = time.time() + waittime
        while popen.poll() is None:
            if time.time() > deadline:
                popen.kill()
                popen.wait()  # reap the child so it does not linger as a zombie
                print("Process killed!")
                raise RuntimeError(
                    "Command timed out after %d seconds: %s" % (waittime, cmd)
                )
            time.sleep(1)
        print("Process finished!")
        output.seek(0)
        return output.read()
# Script entry point: when this file is executed directly (rather than
# imported), call app.start() on the Celery application defined above.
if __name__ == "__main__":
    app.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment