How a job runs

Controller components diagram

https://lucid.app/lucidchart/647bb2fa-3820-458a-bd2c-3f0f8520add7/edit?page=-8ZpAu3F4v8S#


Web Container (uwsgi process)

what is uwsgi

Think of uwsgi as processes that take incoming HTTP requests, run special Python code (Django views), and then return HTTP responses. Here is a great video that explains WSGI: https://youtu.be/UklcIDgHtwQ
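
To make that concrete, here is a toy WSGI callable (not AWX code): a uwsgi worker invokes a Python callable like this for every request, and in AWX that callable is Django's WSGI handler, which routes the request to views such as JobTemplateLaunch.

# minimal stand-in for what a uwsgi worker calls on each request (illustrative only)
def application(environ, start_response):
    body = b'{"detail": "hello from a wsgi app"}'
    start_response('200 OK', [('Content-Type', 'application/json'),
                              ('Content-Length', str(len(body)))])
    return [body]

if __name__ == '__main__':
    # quick local test without uwsgi
    from wsgiref.simple_server import make_server
    make_server('127.0.0.1', 8000, application).serve_forever()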

from url to Django View

POST request to this endpoint https://localhost:8043/api/v2/job_templates/7/launch/

↘️

re_path(r'^(?P<pk>[0-9]+)/launch/$', JobTemplateLaunch.as_view(), name='job_template_launch')

uwsgi worker eventually runs this view

↘️

class JobTemplateLaunch(RetrieveAPIView):
  def post(self, request, *args, **kwargs):
    new_job = obj.create_unified_job(**serializer.validated_data)

from job template to job object

This is where we go from template to specific job: create_unified_job creates an unsaved Job object from the Demo Job Template instance, copying over important launch variables.

↘️

unified_job = copy_model_by_class(self, unified_job_class, fields, validated_kwargs)

in copy_model_by_class, Class2 could be Job, ProjectUpdate, InventoryUpdate, etc. For our demo job, it will be Job

↘️

return Class2(**new_kwargs)
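
To illustrate the copy idea in isolation (plain classes here, not AWX's Django models; the field names are made up): read the shared fields off the template, apply any launch-time overrides, and construct an unsaved job.

# simplified stand-in for copy_model_by_class (illustrative only)
class JobTemplate:
    def __init__(self, name, playbook):
        self.name, self.playbook = name, playbook

class Job:
    def __init__(self, name=None, playbook=None, status='new'):
        self.name, self.playbook, self.status = name, playbook, status

def copy_model_by_class(template, job_class, fields, validated_kwargs):
    new_kwargs = {f: getattr(template, f) for f in fields}  # copy shared fields
    new_kwargs.update(validated_kwargs)                      # launch-time overrides win
    return job_class(**new_kwargs)                           # unsaved instance

job = copy_model_by_class(JobTemplate('Demo Job Template', 'hello_world.yml'),
                          Job, ['name', 'playbook'], {})
print(job.status)  # 'new' -- nothing is persisted until unified_job.save()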

saving job to database and signaling start

save to the database! Interestingly, the job is initially saved with the "new" status

↘️

unified_job.save()

↘️

result = new_job.signal_start(**passwords)

change status to "pending" and schedule a task manager to run asap

↘️

def signal_start(self, **kwargs):
  if self.dependencies_processed:
    ScheduleTaskManager().schedule()

schedule_task_manager() has some nuance to it. For example, there is special code to make sure we don't schedule a bunch of task managers to run at once. You can read more about it in our docs here https://github.com/ansible/awx/blob/devel/docs/task_manager_system.md#bulk-reschedule
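
As a toy illustration of that "don't pile up schedules" idea (this is not AWX's actual bulk-reschedule code), many schedule() calls made in a short window can be collapsed into a single task manager pass:

# hedged sketch: collapse repeated schedule() calls into one run
import threading
import time

class DebouncedScheduler:
    def __init__(self, fn, delay=0.1):
        self.fn = fn
        self.delay = delay
        self._lock = threading.Lock()
        self._pending = False

    def schedule(self):
        with self._lock:
            if self._pending:          # a run is already queued; do nothing
                return
            self._pending = True
        threading.Timer(self.delay, self._run).start()

    def _run(self):
        with self._lock:
            self._pending = False
        self.fn()

def run_task_manager():
    print('task manager pass at', time.time())

scheduler = DebouncedScheduler(run_task_manager)
for _ in range(50):        # fifty signal_start() calls...
    scheduler.schedule()   # ...still result in a single task manager pass
time.sleep(0.5)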

📞 signal_start is good example of inter-process communication. The uwsgi process in the web container is telling the dispatch service running in the task container to start a task! Neat!

Task container (dispatcher process)

Dispatcher responsibilities

The dispatcher processes are designed to run arbitrary python code. These include background tasks like task managers and job reapers, but they also run the jobs themselves! These dispatch processes are the heart of our app and where a ton of important stuff occurs.

inside the task container (i.e. tools_awx_1) run the following to see a list of running dispatcher processes

ps axf

There is a single parent dispatcher process and a few child dispatcher workers (linux forks). The parent process receives incoming work, and then assigns the work to one of the worker processes.
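
Here is a toy model of that parent/worker split (not the AWX dispatcher itself): the parent assigns incoming message bodies to worker processes over per-worker queues, much like AutoscalePool and BaseWorker do.

# hedged sketch of a parent process feeding forked workers via queues
import multiprocessing as mp

def work_loop(queue):
    while True:
        body = queue.get()       # block until the parent assigns work
        if body is None:         # sentinel: shut down
            break
        print(f"{mp.current_process().name} running {body['task']} args={body['args']}")

if __name__ == '__main__':
    queues = [mp.Queue() for _ in range(4)]
    workers = [mp.Process(target=work_loop, args=(q,), name=f'worker-{i}')
               for i, q in enumerate(queues)]
    for w in workers:
        w.start()

    # parent: hand incoming work to the worker queues round-robin
    for i, task in enumerate(['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate']):
        queues[i % len(queues)].put({'task': task, 'args': [i]})

    for q in queues:
        q.put(None)
    for w in workers:
        w.join()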

Task manager deserves its own code walk-through, but let's look at some key parts that correspond to running the demo job.

Task manager entry point

load all jobs in the database with the "pending" status, which includes our not-yet-run demo job

↘️

def _schedule(self):
  self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True))

Here we do the actual load from the database

↘️

def get_tasks(self, filter_args):
    qs = (
        UnifiedJob.objects.filter(**filter_args)
        .exclude(launch_type='sync')
        .exclude(polymorphic_ctype_id=wf_approval_ctype_id)
        .order_by('created')
        .prefetch_related('dependent_jobs')
    )

Glossing over some detail, here is a chain of method calls (all in task_manager.py) to handle our job.

self.process_tasks()
 \
   self.process_pending_tasks(pending_tasks)
    \
     self.start_task(task, self.controlplane_ig, execution_instance)
     |
     task.status = 'waiting'

This part signals the parent dispatcher process to begin running the Job.

↘️

task_cls = task._get_task_class() # this is RunJob
task_cls.apply_async(
    [task.pk],
    opts,
    queue=task.get_queue_name(), # controller node that is assigned to this task
    uuid=task.celery_task_id,
    callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
    errbacks=[{'task': handle_work_error.name, 'kwargs': {'task_actual': task_actual}}],
)

📞 One dispatch worker is telling the parent dispatch process to begin work on another task!

RunJob inherits from the PublisherMixin class.

↘️

class PublisherMixin(object):
  def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
    obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()}
    with pg_bus_conn() as conn:
      conn.notify(queue, json.dumps(obj))

here is a real example of "obj". This is the type of message the parent dispatcher "knows" how to process.

{
'args': [85],
'callbacks': [{'kwargs': {'task_actual': {'id': 85, 'type': 'job'}},
               'task': 'awx.main.tasks.system.handle_work_success'}],
'errbacks': [{'kwargs': {'task_actual': {'id': 85, 'type': 'job'}},
              'task': 'awx.main.tasks.system.handle_work_error'}],
'guid': '86784dd5b51e464e9dc811ae55377e03',
'kwargs': {},
'task': 'awx.main.tasks.jobs.RunJob',
'time_pub': 1674803091.1054723,
'uuid': '4899f503-1556-4631-8596-8c10bfddcb23'
}

conn.notify uses the postgres notify system to publish JSON data that contains basic information on running the job.

↘️

def notify(self, channel, payload):
  with self.conn.cursor() as cur:
    cur.execute('SELECT pg_notify(%s, %s);', (channel, payload))

This message will be picked up in the parent dispatcher process
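
For reference, here is the bare LISTEN/NOTIFY round trip with psycopg2, outside of AWX's pg_bus_conn wrapper (the connection string and channel name below are made up):

# hedged sketch of postgres NOTIFY/LISTEN with psycopg2
import json
import select
import psycopg2

conn = psycopg2.connect('dbname=awx user=awx')   # hypothetical DSN
conn.set_session(autocommit=True)

with conn.cursor() as cur:
    cur.execute('LISTEN demo_channel;')           # subscribe to a channel
    cur.execute('SELECT pg_notify(%s, %s);',      # publish a message to it
                ('demo_channel', json.dumps({'task': 'RunJob', 'args': [22]})))

# wait until a notification arrives, then drain it
if select.select([conn], [], [], 5) != ([], [], []):
    conn.poll()
    while conn.notifies:
        note = conn.notifies.pop(0)
        print(note.channel, json.loads(note.payload))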

🚧 Detour - need to learn a bit about how the dispatcher works 🚧

The dispatcher runs as a service on bootstrap. That is, the service is always on and starts when the task container starts.

entry point for dispatcher is run_dispatcher.py

↘️

queues = ['tower_broadcast_all', get_local_queuename()]
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4))
consumer.run()

in run() is where we read the message that the task manager process sent (via apply_async)

↘️

with pg_bus_conn(new_connection=True) as conn:
    for e in conn.events(yield_timeouts=True):
        if e is not None:
            self.process_task(json.loads(e.payload))

conn.events()

↘️

for e in conn.events():
  self.process_task(json.loads(e.payload))

in process_task()

body for Demo Job Template

{'args': [22],
 'callbacks': [{'kwargs': {'task_actual': {'id': 22, 'type': 'job'}},
                'task': 'awx.main.tasks.system.handle_work_success'}],
 'errbacks': [{'args': ['fe1cf03e-2111-4334-a1f2-440e468c6161'],
               'kwargs': {'subtasks': [{'id': 22, 'type': 'job'}]},
               'task': 'awx.main.tasks.system.handle_work_error'}],
 'guid': '1f06e078f43d4871ac954ba99307f97c',
 'kwargs': {},
 'task': 'awx.main.tasks.jobs.RunJob',
 'uuid': 'fe1cf03e-2111-4334-a1f2-440e468c6161'}

↘️

self.pool.write(queue, body)

in write()

↘️

self.workers[queue_actual].put(body) # an MPQueue object

↘️

class BaseWorker(object):
  def read(self, queue):
    return queue.get(block=True, timeout=1)

read is being called continuously in work_loop()

↘️

body = self.read(queue)
..
self.perform_work(body, *args)

in TaskWorker().perform_work

↘️

result = self.run_callable(body)

in run_callable() ↘️

_call = TaskWorker.resolve_callable(task)

This parses the body['task'] ("awx.main.tasks.jobs.RunJob") into a real python callable object

↘️

_call = _call().run

instantiate the RunJob class and call .run()
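
resolve_callable is essentially an import by dotted path; a standalone approximation (assuming the usual module.path.ClassName convention) looks like this:

# hedged sketch of resolving 'awx.main.tasks.jobs.RunJob' into a callable
import importlib

def resolve_callable(task_name):
    module_path, _, attr = task_name.rpartition('.')
    module = importlib.import_module(module_path)   # e.g. awx.main.tasks.jobs
    return getattr(module, attr)                    # e.g. the RunJob class

# task_cls = resolve_callable('awx.main.tasks.jobs.RunJob')
# task_cls().run(22)   # roughly what _call = _call().run followed by _call(*args) amounts to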

Recap

  • Job object (pending) saved in database during the HTTP request / response
  • Task manager loads this object from database, switches status to waiting and submits the job to the dispatcher parent process via PG_NOTIFY
  • Parent dispatcher reads this message, resolves the callable (the RunJob class), and runs the callable

Task container (dispatcher process)

Change job status to running

in RunJob.run()

load job from database and change status to running

↘️

self.instance = self.model.objects.get(pk=pk)
..
self.update_model(pk, status='running', start_args='')

Create an empty private data dir

The private data dir is where we collect all essential files needed to run the playbook. This includes project playbooks (hello_world.yml), host information, and extra variables.

with settings.AWX_CLEANUP_PATHS = False you can see the private data dirs for jobs that have already run in tools_awx_1 by running

$ ls -d /tmp/awx_*
/tmp/awx_183_sc94vsqs

It begins by calling build_private_data_dir

self is type RunJob and self.instance is type Job

↘️

private_data_dir = self.build_private_data_dir(self.instance)

in build_private_data_dir, create the directory on the filesystem

↘️

path = tempfile.mkdtemp(prefix=JOB_FOLDER_PREFIX % instance.pk, dir=settings.AWX_ISOLATION_BASE_PATH)

e.g. for demo job this makes

/tmp/awx_22_mst0n_rj

Copy project files into the private data dir

↘️

build_project_dir()
\
 sync_and_copy()
  \
   sync_and_copy_without_lock()

❗ Assume project sync is not needed on this run

We need to copy files from a locally checked out project directory to the private data dir

in sync_and_copy_without_lock()

↘️

RunProjectUpdate.make_local_copy(project, private_data_dir)

in make_local_copy()

↘️

project_path = project.get_project_path(check_if_exists=False)
destination_folder = os.path.join(job_private_data_dir, 'project')
shutil.copytree(project_path, destination_folder, ignore=shutil.ignore_patterns('.git'), symlinks=True)

e.g. for demo job project_path is

/var/lib/awx/projects/_6__demo_project

This directory contains our hello_world.yml Ansible playbook. shutil.copytree() will do a recursive copy of the project files into the private data dir.

Copy extra stuff into private data dir

The next handful of lines build up the private data dir, filling it in with various extra_vars, arguments, settings, the inventory host file, etc.

↘️

here is the private data dir after it is built up, and right before running the job

$ tree /tmp/awx_22_mst0n_rj
├── cp
├── env
│ ├── cmdline
│ ├── extravars
│ └── settings
├── inventory
│ └── hosts
└── project
  ├── hello_world.yml
  └── README.md

env/cmdline - extra arguments to pass to ansible-playbook command

-u admin

env/extravars - these are variables that can be referenced from inside the playbook itself

!unsafe 'awx_job_id': 20
!unsafe 'awx_job_launch_type': !unsafe 'relaunch'
!unsafe 'tower_job_id': 20
!unsafe 'tower_job_launch_type': !unsafe 'relaunch'
!unsafe 'awx_user_id': 2
!unsafe 'awx_user_name': !unsafe 'sbf'
!unsafe 'awx_user_email': !unsafe '[email protected]'
!unsafe 'awx_user_first_name': !unsafe ''
!unsafe 'awx_user_last_name': !unsafe ''
!unsafe 'tower_user_id': 2
!unsafe 'tower_user_name': !unsafe 'sbf'
!unsafe 'tower_user_email': !unsafe '[email protected]'
!unsafe 'tower_user_first_name': !unsafe ''
!unsafe 'tower_user_last_name': !unsafe ''
!unsafe 'awx_inventory_id': 3
!unsafe 'awx_inventory_name': !unsafe 'sbf'
!unsafe 'tower_inventory_id': 3
!unsafe 'tower_inventory_name': !unsafe 'sbf'
!unsafe 'awx_project_revision': !unsafe '347e44fea036c94d5f60e544de006453ee5c71ad'
!unsafe 'awx_project_scm_branch': !unsafe ''
!unsafe 'tower_project_revision': !unsafe '347e44fea036c94d5f60e544de006453ee5c71ad'
!unsafe 'tower_project_scm_branch': !unsafe ''
!unsafe 'awx_job_template_id': 7
!unsafe 'awx_job_template_name': !unsafe 'Demo Job Template'
!unsafe 'tower_job_template_id': 7
!unsafe 'tower_job_template_name': !unsafe 'Demo Job Template'

env/settings - these settings control runner itself; more detail is described here https://ansible-runner.readthedocs.io/en/stable/intro/?highlight=job_timeout#env-settings-settings-for-runner-itself

{"job_timeout": 0, "suppress_ansible_output": true, "suppress_output_file": true}

inventory/hosts - hosts file. We load the hosts from the inventory and compose a python script (that just prints JSON) containing host information

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
print('''{
  "all": {
    "hosts": [
      "localhost"
    ],
    "children": [
      "test"
    ]
  },
  "_meta": {
    "hostvars": {
      "localhost": {
        "ansible_connection": "local",
        "ansible_python_interpreter": "{{ ansible_playbook_python }}",
        "remote_tower_enabled": "true",
        "remote_host_enabled": "true",
        "remote_host_id": 1
      }
    }
  }
}''')

project/hello_world.yml

- name: Hello World Sample
  hosts: all
  tasks:
    - name: Hello Message
      debug:
        msg: "Hello World!"

In addition to the private data dir, in-memory runner_params are also defined

{'container_image': 'quay.io/ansible/awx-ee:latest',
 'container_options': ['--user=root',
                       '--network',
                       'slirp4netns:enable_ipv6=true'],
 'envvars': {'ANSIBLE_COLLECTIONS_PATHS': '/runner/requirements_collections:~/.ansible/collections:/usr/share/ansible/collections',
             'ANSIBLE_FORCE_COLOR': 'True',
             'ANSIBLE_HOST_KEY_CHECKING': 'False',
             'ANSIBLE_INVENTORY_UNPARSED_FAILED': 'True',
             'ANSIBLE_PARAMIKO_RECORD_HOST_KEYS': 'False',
             'ANSIBLE_RETRY_FILES_ENABLED': 'False',
             'ANSIBLE_ROLES_PATH': '/runner/requirements_roles:~/.ansible/roles:/usr/share/ansible/roles:/etc/ansible/roles',
             'ANSIBLE_SSH_CONTROL_PATH_DIR': '/runner/cp',
             'AWX_HOST': 'https://towerhost',
             'AWX_PRIVATE_DATA_DIR': '/tmp/awx_198_6l7jb_yk',
             'INVENTORY_ID': '1',
             'JOB_ID': '198',
             'MAX_EVENT_RES': '700000',
             'PROJECT_REVISION': '347e44fea036c94d5f60e544de006453ee5c71ad'},
 'fact_cache_type': '',
 'ident': 198,
 'inventory': '/tmp/awx_198_6l7jb_yk/inventory/hosts',
 'passwords': {'BECOME password.*:\\s*?$': '',
               'Bad passphrase, try again for .*:\\s*?$': '',
               'DOAS password.*:\\s*?$': '',
               'DZDO password.*:\\s*?$': '',
               'ENABLE password.*:\\s*?$': '',
               'Enter passphrase for .*:\\s*?$': '',
               'KSU password.*:\\s*?$': '',
               'MACHINECTL password.*:\\s*?$': '',
               'PBRUN password.*:\\s*?$': '',
               'PFEXEC password.*:\\s*?$': '',
               'PMRUN password.*:\\s*?$': '',
               'Password:\\s*?$': '',
               'RUNAS password.*:\\s*?$': '',
               'SESU password.*:\\s*?$': '',
               'SSH password:\\s*?$': '',
               'SU password.*:\\s*?$': '',
               'SUDO password.*:\\s*?$': '',
               'Vault password:\\s*?$': '',
               'doas password.*:\\s*?$': '',
               'dzdo password.*:\\s*?$': '',
               'enable password.*:\\s*?$': '',
               'ksu password.*:\\s*?$': '',
               'machinectl password.*:\\s*?$': '',
               'pbrun password.*:\\s*?$': '',
               'pfexec password.*:\\s*?$': '',
               'pmrun password.*:\\s*?$': '',
               'runas password.*:\\s*?$': '',
               'sesu password.*:\\s*?$': '',
               'su password.*:\\s*?$': '',
               'sudo password.*:\\s*?$': ''},
 'playbook': 'hello_world.yml',
 'private_data_dir': '/tmp/awx_198_6l7jb_yk',
 'process_isolation': True,
 'process_isolation_executable': 'podman',
 'suppress_env_files': True}

🚧 Detour - need to learn a bit about how ansible-runner works 🚧

Ansible runner can easily be invoked by doing something like this: ansible-runner run /path/to/private_data_dir -p hello_world.yml

However, in AWX we need a way to "ship" the private data dir over to a remote location (a k8s pod, or an execution node), run the ansible playbook, and stream the ansible results back to the AWX control plane.

Ansible runner chain

To facilitate this need, ansible runner has three special commands: transmit, worker, and process. They are designed to work in a unix pipe chain. Read about it here: https://ansible-runner.readthedocs.io/en/stable/remote_jobs/?highlight=worker#remote-job-execution

ansible-runner transmit | ansible-runner worker | ansible-runner process

This means the output (stdout) of transmit gets piped into the input (stdin) of worker, and worker's stdout gets piped into the stdin of process.
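
As a hedged sketch, the same chain can be wired up locally through ansible-runner's Python API (the transmit and process keyword names match the AWX calls shown later in this walkthrough; the worker stage's _input/_output wiring is my assumption based on ansible-runner's remote-jobs docs):

# run transmit -> worker -> process in one script, using in-memory buffers as the "pipes"
import io
import ansible_runner

private_data_dir = '/tmp/demo_private_data_dir'   # hypothetical, pre-populated dir

transmit_buf = io.BytesIO()
ansible_runner.interface.run(streamer='transmit', _output=transmit_buf,
                             private_data_dir=private_data_dir, playbook='hello_world.yml')

transmit_buf.seek(0)
worker_buf = io.BytesIO()
ansible_runner.interface.run(streamer='worker', _input=transmit_buf, _output=worker_buf,
                             private_data_dir=private_data_dir)

worker_buf.seek(0)
ansible_runner.interface.run(streamer='process', _input=worker_buf,
                             private_data_dir=private_data_dir)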

AWX + Receptor achieves the above chain

You can conceptually replace the pipe symbols above ("|") with receptor. Receptor's job is to take stdin from AWX, run the work elsewhere, and then stream the results back to AWX.

So, the AWX control plane is going to take care of transmit and process. Receptor is going to take care of worker.

transmit() | receptorctl work submit
receptorctl work results | processor()

Instantiate the AWXReceptorJob object

An AWXReceptorJob object is instantiated and .run() is called. params is the runner_params dict shown above.

↘️

receptor_job = AWXReceptorJob(self, params)
res = receptor_job.run()

in run()

↘️

receptor_ctl = get_receptor_ctl()
..
res = self._run_internal(receptor_ctl)

receptorctl is a utility library that can be installed via pip and allows commands to be issued to a local receptor daemon via unix sockets

Transmit and work submit

grab a pair of sockets in _run_internal()

↘️

sockin, sockout = socket.socketpair()
..
transmitter_future = executor.submit(self.transmit, sockin)

Note: A unix socket pair is a typical way to allow a parent and child process to communicate. The parent process creates the pair of sockets, forks to create a child process, and passes one socket to the child. The pair are linked so that the two processes can write to and read from them as a means of communication. In our code, transmit runs in a separate thread from work_submit.
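
A tiny self-contained illustration of that pattern (threads instead of processes, and a fake payload instead of the real tarball): one side writes into sockin while the other side reads everything from sockout.

# hedged sketch of the socketpair + background-writer pattern
import socket
import threading

sockin, sockout = socket.socketpair()

def transmit(sock):
    with sock.makefile('wb') as f:
        f.write(b'pretend this is the tarred private_data_dir')
    sock.close()                       # EOF so the reader knows we're done

writer = threading.Thread(target=transmit, args=(sockin,))
writer.start()

with sockout.makefile('rb') as f:
    payload = f.read()                 # this is what receptor would consume
print(len(payload), 'bytes received')
writer.join()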

transmit will compress (tar) the private_data_dir and write it to the sockin unix socket. in transmit()

↘️

ansible_runner.interface.run(streamer='transmit', _output=_socket.makefile('wb'), **self.runner_params)

submit the work to receptor, with the payload set to whatever transmit outputs.

↘️

result = receptor_ctl.submit_work(payload=sockout.makefile('rb'), **work_submit_kw)

at the same time, receptor will read from sockout and save the tar contents to a file on disk

Receptor work unit on disk

receptor itself maintains a directory to represent the work in tools_awx_1

$ cd /tmp/receptor/awx_1/aohbnpsM

aohbnpsM above is the receptor work unit ID, which can be found on the job API

files in this directory

status
stdin
stdout

stdin contains all of the necessary input that ansible-runner worker expects

Ansible runner worker

Once receptor has the full stdin, it calls ansible-runner worker and passes the file contents to it. This is where the job actually runs; that is, ansible-playbook will eventually be called.

Remember that ansible-runner worker receives the project files as a compressed file. It will unpack this tar into the directory specified by AWX_PRIVATE_DATA_DIR in the runner params.

Note, ansible-playbook isn't run directly. Instead ansible-runner worker will tell podman to start a container from the job's execution environment image. You can get the exact podman command that is run from the job_args field in the API, e.g.

podman run --rm --tty --interactive \
  --workdir /runner/project \
  -v /tmp/awx_191_96fykhja/:/runner/:Z \
  --env-file /tmp/awx_191_96fykhja/artifacts/191/env.list \
  --quiet \
  --name ansible_runner_191 \
  --user=root \
  --network slirp4netns:enable_ipv6=true \
  quay.io/ansible/awx-ee:latest \
  ansible-playbook \
  -u admin -i /runner/inventory/hosts -e @/runner/env/extravars hello_world.yml

Note the ansible-playbook command that is run! Finally!

Getting ansible runner results back to AWX control plane

While the playbook is running, receptor is live-streaming the results back to AWX.

This is handled by calling get_work_results()

↘️

resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True)

this will open a socket to start reading data back from receptor.

Just like how transmit ran in a separate thread, process will also run in a separate thread.

↘️

processor_future = executor.submit(self.processor, resultfile)

processor will handle each line of stdout from ansible runner

in processor(), we set up special callbacks to handle certain types of lines emitted from ansible runner

↘️

return ansible_runner.interface.run(
    streamer='process',
    quiet=True,
    _input=resultfile,
    event_handler=self.task.runner_callback.event_handler,
    finished_callback=self.task.runner_callback.finished_callback,
    status_handler=self.task.runner_callback.status_handler,
    **self.runner_params,
)

For example, {"status": handled by status_handler

{"uuid": handled by event_handler

{"eof": true} handled by finished_callback

Ultimately the job is done whenever processor finishes, that is, whenever it gets the EOF signal.

_run_internal() returns whatever the processor thread returns

↘️

return res

get the status from res

↘️

status = res.status

here is where the "successful" status is saved. Recall self.instance is the Job object.

↘️

self.instance = self.update_model(pk, status=status, select_for_update=True, **self.runner_callback.get_delayed_update_fields())

❔ What about the events? ❔

Processor is handling each event that is coming back from ansible-runner via the event_handler() callback. This is the code path that eventually saves events into the database.

The event handling is fairly complex and involves the callback receiver daemon, so I'll save that for a future topic to cover in more detail.

Other parts of the job lifecycle

In RunJob you'll also find a few other hooks

  • pre_run_hook
  • post_run_hook
  • final_run_hook

these are methods called at various points in the job's life to do extra things, such as pre-flight checks and handling the fact cache. Not a ton happens there, but it's good to know they are an important part of the job run.

Future topics to cover

  • Websockets (i.e. how you are able to view stdout live in the UI)
  • Callback receiver (how events make it into the database)
  • Touched on a bit here, but could use more detail:
    • Task manager
    • Dispatcher
    • Ansible runner
  • Receptor
    • Netceptor (the networking portion of Receptor)
    • Workceptor (Job handling portion of Receptor)
  • AWX Operator (i.e. how the app is deployed to a Kubernetes cluster)
  • Bootstrapping AWX (i.e. how AWX app starts up, including services, etc)
  • Project Updates, Inventory Updates, and Project Syncs
  • Subsystem metrics