Think of uwsgi as a set of worker processes that take incoming HTTP requests, run Python code (Django views), and then return HTTP responses. Here is a great video that explains WSGI https://youtu.be/UklcIDgHtwQ
POST request to this endpoint
https://localhost:8043/api/v2/job_templates/7/launch/
re_path(r'^(?P<pk>[0-9]+)/launch/$', JobTemplateLaunch.as_view(), name='job_template_launch')
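To make the URL pattern a bit more concrete, here is a small sketch using only the standard re module. It assumes the /api/v2/job_templates/ prefix has already been consumed by the enclosing URL configuration, and match_launch is a hypothetical helper that exists only for this illustration (it is not part of AWX or Django).

```python
import re

# Same pattern string as the re_path() above. How the URL prefix gets
# stripped before this pattern is applied is an assumption of this sketch.
LAUNCH_PATTERN = re.compile(r'^(?P<pk>[0-9]+)/launch/$')


def match_launch(path_suffix):
    """Return the captured primary key as a string, or None if no match."""
    match = LAUNCH_PATTERN.match(path_suffix)
    return match.group('pk') if match else None


print(match_launch('7/launch/'))
print(match_launch('abc/launch/'))
```

The named group pk is what ends up identifying the job template (7 in our demo request). Note that the captured value is a string, not an integer.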
uwsgi worker eventually runs this view
class JobTemplateLaunch(RetrieveAPIView):
    def post(self, request, *args, **kwargs):
        new_job = obj.create_unified_job(**serializer.validated_data)
this is where we go from a template to a specific job: it creates an unsaved Job object from the Demo Job Template instance, copying over important launch variables
unified_job = copy_model_by_class(self, unified_job_class, fields, validated_kwargs)
in copy_model_by_class, Class2 could be Job, ProjectUpdate, InventoryUpdate, etc. For our demo job, it will be Job
return Class2(**new_kwargs)
save to the database! Interestingly, the job is initially saved with the "new" status
unified_job.save()
result = new_job.signal_start(**passwords)
change status to "pending" and schedule a task manager to run asap
def signal_start(self, **kwargs):
    if self.dependencies_processed:
        ScheduleTaskManager().schedule()
ScheduleTaskManager().schedule() has some nuance to it. For example, there is special code to make sure we don't schedule a bunch of task managers to run at once. You can read more about it in our docs here https://github.com/ansible/awx/blob/devel/docs/task_manager_system.md#bulk-reschedule
📞 signal_start is a good example of inter-process communication. The uwsgi process in the web container is telling the dispatch service running in the task container to start a task! Neat!
The dispatcher processes are designed to run arbitrary python code. These include background tasks like task managers and job reapers, but they also run the jobs themselves! These dispatch processes are the heart of our app and where a ton of important stuff occurs.
inside the task container (i.e. tools_awx_1) run the following to see a list of running dispatcher processes
ps axf
There is a single parent dispatcher process and a few child dispatcher workers (linux forks). The parent process receives incoming work, and then assigns the work to one of the worker processes.
Task manager deserves its own code walk-through, but let's look at some key parts that correspond to running the demo job.
load all jobs in the database with the "pending", "waiting", or "running" status, which includes our not-yet-run demo job (still "pending")
def _schedule(self):
    self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True))
Here we do the actual load from the database
def get_tasks(self, filter_args):
    qs = (
        UnifiedJob.objects.filter(**filter_args)
        .exclude(launch_type='sync')
        .exclude(polymorphic_ctype_id=wf_approval_ctype_id)
        .order_by('created')
        .prefetch_related('dependent_jobs')
    )
Glossing over some detail, here is a chain of method calls (all in task_manager.py) to handle our job.
self.process_tasks()
\
self.process_pending_tasks(pending_tasks)
\
self.start_task(task, self.controlplane_ig, execution_instance)
|
task.status = 'waiting'
This part signals the parent dispatcher process to begin running the Job.
task_cls = task._get_task_class()  # this is RunJob
task_cls.apply_async(
    [task.pk],
    opts,
    queue=task.get_queue_name(),  # controller node that is assigned to this task
    uuid=task.celery_task_id,
    callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
    errbacks=[{'task': handle_work_error.name, 'kwargs': {'task_actual': task_actual}}],
)
📞 One dispatch worker is telling the parent dispatch process to begin work on another task!
RunJob inherits from the PublisherMixin class.
class PublisherMixin(object):
    @classmethod
    def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
        task_id = uuid or str(uuid4())  # fall back to a fresh ID when none is passed in
        ..
        obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()}
        ..
        with pg_bus_conn() as conn:
            conn.notify(queue, json.dumps(obj))
here is a real example of "obj". This is the type of message the parent dispatcher "knows" how to process.
{
    'args': [85],
    'callbacks': [{'kwargs': {'task_actual': {'id': 85, 'type': 'job'}},
                   'task': 'awx.main.tasks.system.handle_work_success'}],
    'errbacks': [{'kwargs': {'task_actual': {'id': 85, 'type': 'job'}},
                  'task': 'awx.main.tasks.system.handle_work_error'}],
    'guid': '86784dd5b51e464e9dc811ae55377e03',
    'kwargs': {},
    'task': 'awx.main.tasks.jobs.RunJob',
    'time_pub': 1674803091.1054723,
    'uuid': '4899f503-1556-4631-8596-8c10bfddcb23'
}
conn.notify uses the postgres notify system to publish JSON data that contains basic information on running the job.
def notify(self, channel, payload):
    with self.conn.cursor() as cur:
        cur.execute('SELECT pg_notify(%s, %s);', (channel, payload))
This message will be picked up in the parent dispatcher process
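To get a feel for the publish / consume round trip without a database, here is a minimal sketch. InMemoryBus is a hypothetical stand-in for pg_bus_conn(), build_message is a hypothetical helper, and the 'awx_1' channel name is assumed for illustration. The one real-world detail it mimics is that Postgres NOTIFY payloads must stay under 8000 bytes in the default configuration, which is why the message only carries IDs and names rather than the job itself.

```python
import json
import time
import uuid
from collections import defaultdict, deque


class InMemoryBus:
    """Hypothetical stand-in for pg_bus_conn(); no database involved."""

    def __init__(self):
        self.channels = defaultdict(deque)

    def notify(self, channel, payload):
        # Postgres requires NOTIFY payloads shorter than 8000 bytes by default.
        if len(payload.encode('utf-8')) >= 8000:
            raise ValueError('payload too large for pg_notify')
        self.channels[channel].append(payload)

    def events(self, channel):
        # Drain queued payloads for one channel, oldest first.
        while self.channels[channel]:
            yield self.channels[channel].popleft()


def build_message(task_name, args):
    """Build a dict shaped like the "obj" example above (subset of keys)."""
    return {'uuid': str(uuid.uuid4()), 'args': args, 'kwargs': {}, 'task': task_name, 'time_pub': time.time()}


bus = InMemoryBus()
sent = build_message('awx.main.tasks.jobs.RunJob', [85])
bus.notify('awx_1', json.dumps(sent))          # publisher side (task manager)
received = [json.loads(p) for p in bus.events('awx_1')]  # consumer side (dispatcher)
print(received[0]['task'], received[0]['args'])
```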
The dispatcher runs as a service on bootstrap. That is, the service is always on and starts when the task container starts.
entry point for dispatcher is run_dispatcher.py
queues = ['tower_broadcast_all', get_local_queuename()]
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4))
consumer.run()
in run() is where we read the message that was sent from the task manager process (in the apply_async)
with pg_bus_conn(new_connection=True) as conn:
    for e in conn.events(yield_timeouts=True):
        if e is not None:
            self.process_task(json.loads(e.payload))
in process_task()
body for Demo Job Template
{'args': [22],
 'callbacks': [{'kwargs': {'task_actual': {'id': 22, 'type': 'job'}},
                'task': 'awx.main.tasks.system.handle_work_success'}],
 'errbacks': [{'args': ['fe1cf03e-2111-4334-a1f2-440e468c6161'],
               'kwargs': {'subtasks': [{'id': 22, 'type': 'job'}]},
               'task': 'awx.main.tasks.system.handle_work_error'}],
 'guid': '1f06e078f43d4871ac954ba99307f97c',
 'kwargs': {},
 'task': 'awx.main.tasks.jobs.RunJob',
 'uuid': 'fe1cf03e-2111-4334-a1f2-440e468c6161'}
self.pool.write(queue, body)
in write()
self.workers[queue_actual].put(body) # an MPQueue object
class BaseWorker(object):
    def read(self, queue):
        return queue.get(block=True, timeout=1)
read is being called continuously in work_loop()
body = self.read(queue)
..
self.perform_work(body, *args)
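The read-with-timeout loop can be sketched as follows. This is a simplified illustration, not the dispatcher's code: it uses queue.Queue and a thread as stand-ins for the MPQueue object and the forked worker process, and stop_event, results, and the 0.1 second timeout are assumptions made for the demo.

```python
import queue
import threading


def work_loop(work_queue, results, stop_event):
    """Poll the queue with a timeout so the loop can notice shutdown."""
    while not stop_event.is_set():
        try:
            body = work_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            continue  # nothing to do yet; poll again
        results.append(body['task'])  # stand-in for perform_work(body)
        work_queue.task_done()


work_queue = queue.Queue()
results = []
stop_event = threading.Event()
worker = threading.Thread(target=work_loop, args=(work_queue, results, stop_event))
worker.start()

# The "parent" side: hand one message to the worker.
work_queue.put({'task': 'awx.main.tasks.jobs.RunJob', 'args': [22]})
work_queue.join()   # wait until the worker has handled the message
stop_event.set()
worker.join()
print(results)
```

The timeout on get() is the interesting design choice: a worker that blocked forever on an empty queue could never check whether it has been asked to stop.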
in TaskWorker().perform_work
result = self.run_callable(body)
in run_callable()
_call = TaskWorker.resolve_callable(task)
This parses body['task'] ("awx.main.tasks.jobs.RunJob") into a real Python callable object
_call = _call().run
instantiate the RunJob class and call .run()
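Turning a dotted string into a callable can be sketched with importlib. This is a generic illustration of the technique and not necessarily how TaskWorker.resolve_callable is written; the function below is a hypothetical reimplementation, and standard-library targets are used so it runs without AWX installed.

```python
import importlib


def resolve_callable(dotted_path):
    """Split 'package.module.name' and import the named attribute."""
    module_name, _, attr_name = dotted_path.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# A plain function target.
dumps = resolve_callable('json.dumps')
print(dumps({'id': 22}))

# A class target: calling it creates an instance, mirroring _call() above.
ordered_dict_cls = resolve_callable('collections.OrderedDict')
instance = ordered_dict_cls()
print(type(instance).__name__)
```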
- Job object (pending) saved in database during the HTTP request / response
- Task manager loads this object from database, switches status to waiting and submits the job to the dispatcher parent process via PG_NOTIFY
- Parent dispatcher reads this message and hands it to a worker, which resolves the callable (RunJob class) and runs it
in RunJob.run()
load job from database and change status to running
self.instance = self.model.objects.get(pk=pk)
..
self.update_model(pk, status='running', start_args='')
The private data dir is where we collect all essential files needed to run the playbook. This includes project playbooks (hello_world.yml), host information, and extra variables.
With settings.AWX_CLEANUP_PATHS = False, you can see the private data dirs for jobs that have already run in tools_awx_1 by running
$ ls -d /tmp/awx_*
/tmp/awx_183_sc94vsqs
It begins by calling build_private_data_dir
self is type RunJob and self.instance is type Job
private_data_dir = self.build_private_data_dir(self.instance)
in build_private_data_dir, create the directory on the filesystem
path = tempfile.mkdtemp(prefix=JOB_FOLDER_PREFIX % instance.pk, dir=settings.AWX_ISOLATION_BASE_PATH)
e.g. for demo job this makes
/tmp/awx_22_mst0n_rj
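Here is a small sketch of how a directory name like that comes about. The 'awx_%s_' value for JOB_FOLDER_PREFIX is an assumption inferred from the example paths in this walkthrough, and a throwaway temp directory stands in for settings.AWX_ISOLATION_BASE_PATH.

```python
import os
import shutil
import tempfile

JOB_FOLDER_PREFIX = 'awx_%s_'   # assumed value, inferred from the paths shown above


def build_private_data_dir(job_pk, base_path):
    """Create a uniquely named directory for one job under base_path."""
    return tempfile.mkdtemp(prefix=JOB_FOLDER_PREFIX % job_pk, dir=base_path)


base_path = tempfile.mkdtemp()            # stand-in for AWX_ISOLATION_BASE_PATH
path = build_private_data_dir(22, base_path)
dir_name = os.path.basename(path)
was_created = os.path.isdir(path)
print(dir_name.startswith('awx_22_'), was_created)
shutil.rmtree(base_path)                  # clean up the demo directories
```

mkdtemp() appends a random suffix (the mst0n_rj part in the example), so relaunching the same job never collides with an older directory, and the directory is created accessible only to the creating user.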
build_project_dir()
\
sync_and_copy()
\
sync_and_copy_without_lock()
❗ Assume project sync is not needed on this run
We need to copy files from a locally checked out project directory to the private data dir
in sync_and_copy_without_lock()
RunProjectUpdate.make_local_copy(project, private_data_dir)
in make_local_copy()
project_path = project.get_project_path(check_if_exists=False)
destination_folder = os.path.join(job_private_data_dir, 'project')
shutil.copytree(project_path, destination_folder, ignore=shutil.ignore_patterns('.git'), symlinks=True)
e.g. for demo job project_path is
/var/lib/awx/projects/_6__demo_project
this directory contains our hello_world.yml Ansible playbook
shutil.copytree() will do a recursive copy of the project files into the private data dir
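The effect of the ignore pattern is easy to try out on a throwaway directory. In this sketch the directory names and file contents are made up for the demo; only the copytree() call shape is taken from make_local_copy() above.

```python
import os
import shutil
import tempfile

root = tempfile.mkdtemp()

# Fake "checked out" project containing a .git directory and one playbook.
project_path = os.path.join(root, '_6__demo_project')
os.makedirs(os.path.join(project_path, '.git'))
with open(os.path.join(project_path, '.git', 'HEAD'), 'w') as f:
    f.write('ref: refs/heads/main\n')
with open(os.path.join(project_path, 'hello_world.yml'), 'w') as f:
    f.write('- name: Hello World Sample\n')

# Fake private data dir; the 'project' subfolder must not exist yet.
private_data_dir = os.path.join(root, 'awx_22_demo')
os.makedirs(private_data_dir)
destination_folder = os.path.join(private_data_dir, 'project')

# Same call shape as make_local_copy(): skip .git, keep symlinks as symlinks.
shutil.copytree(project_path, destination_folder, ignore=shutil.ignore_patterns('.git'), symlinks=True)

copied = sorted(os.listdir(destination_folder))
print(copied)
shutil.rmtree(root)
```

Leaving out .git keeps the copy small, since the job only needs the working tree and not the repository history.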
The next handful of lines build up the private data dir, filling it in with various extra_vars, arguments, settings, the inventory host file, etc.
here is the private data dir after it is built up, and right before running the job
$ tree /tmp/awx_22_mst0n_rj
├── cp
├── env
│   ├── cmdline
│   ├── extravars
│   └── settings
├── inventory
│   └── hosts
└── project
    ├── hello_world.yml
    └── README.md
env/cmdline - extra arguments to pass to ansible-playbook command
-u admin
env/extravars - these are variables that can be referenced from inside the playbook itself
!unsafe 'awx_job_id': 20
!unsafe 'awx_job_launch_type': !unsafe 'relaunch'
!unsafe 'tower_job_id': 20
!unsafe 'tower_job_launch_type': !unsafe 'relaunch'
!unsafe 'awx_user_id': 2
!unsafe 'awx_user_name': !unsafe 'sbf'
!unsafe 'awx_user_email': !unsafe '[email protected]'
!unsafe 'awx_user_first_name': !unsafe ''
!unsafe 'awx_user_last_name': !unsafe ''
!unsafe 'tower_user_id': 2
!unsafe 'tower_user_name': !unsafe 'sbf'
!unsafe 'tower_user_email': !unsafe '[email protected]'
!unsafe 'tower_user_first_name': !unsafe ''
!unsafe 'tower_user_last_name': !unsafe ''
!unsafe 'awx_inventory_id': 3
!unsafe 'awx_inventory_name': !unsafe 'sbf'
!unsafe 'tower_inventory_id': 3
!unsafe 'tower_inventory_name': !unsafe 'sbf'
!unsafe 'awx_project_revision': !unsafe '347e44fea036c94d5f60e544de006453ee5c71ad'
!unsafe 'awx_project_scm_branch': !unsafe ''
!unsafe 'tower_project_revision': !unsafe '347e44fea036c94d5f60e544de006453ee5c71ad'
!unsafe 'tower_project_scm_branch': !unsafe ''
!unsafe 'awx_job_template_id': 7
!unsafe 'awx_job_template_name': !unsafe 'Demo Job Template'
!unsafe 'tower_job_template_id': 7
!unsafe 'tower_job_template_name': !unsafe 'Demo Job Template'
env/settings - these settings control Runner itself; they are described in more detail here https://ansible-runner.readthedocs.io/en/stable/intro/?highlight=job_timeout#env-settings-settings-for-runner-itself
{"job_timeout": 0, "suppress_ansible_output": true, "suppress_output_file": true}
inventory/hosts - hosts file. We load the hosts from the inventory and compose a python script (that just prints JSON) containing host information
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# (JSON pretty-printed here for readability)
print('''{
    "all": {
        "hosts": [
            "localhost"
        ],
        "children": [
            "test"
        ]
    },
    "_meta": {
        "hostvars": {
            "localhost": {
                "ansible_connection": "local",
                "ansible_python_interpreter": "{{ ansible_playbook_python }}",
                "remote_tower_enabled": "true",
                "remote_host_enabled": "true",
                "remote_host_id": 1
            }
        }
    }
}''')
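A script like this can be generated with plain string formatting. The sketch below is one way to do it under stated assumptions: the script_data contents are a trimmed-down version of the example above, the %r formatting is a choice made for this illustration, and runpy is used only so the demo can execute the generated file in-process and check what it prints.

```python
import contextlib
import io
import json
import os
import runpy
import shutil
import tempfile

script_data = {
    'all': {'hosts': ['localhost']},
    '_meta': {'hostvars': {'localhost': {'ansible_connection': 'local'}}},
}

# %r embeds the JSON text as a Python string literal inside the script.
script_text = '#! /usr/bin/env python3\n# -*- coding: utf-8 -*-\nprint(%r)\n' % json.dumps(script_data)

workdir = tempfile.mkdtemp()
hosts_path = os.path.join(workdir, 'hosts')
with open(hosts_path, 'w') as f:
    f.write(script_text)
os.chmod(hosts_path, 0o700)   # inventory scripts need to be executable

# Run the generated script in-process and capture what it prints.
captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    runpy.run_path(hosts_path)
parsed = json.loads(captured.getvalue())
print(parsed['all']['hosts'])
shutil.rmtree(workdir)
```

Since the script just prints a fixed JSON document, ansible-playbook sees the same host data every time it evaluates the inventory during the run.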
project/hello_world.yml
- name: Hello World Sample
hosts: all
tasks:
- name: Hello Message
debug:
msg: "Hello World!"
In addition to the private data dir, there are in-memory runner_params that are defined
{'container_image': 'quay.io/ansible/awx-ee:latest',
'container_options': ['--user=root',
'--network',
'slirp4netns:enable_ipv6=true'],
'envvars': {'ANSIBLE_COLLECTIONS_PATHS': '/runner/requirements_collections:~/.ansible/collections:/usr/share/ansible/collections',
'ANSIBLE_FORCE_COLOR': 'True',
'ANSIBLE_HOST_KEY_CHECKING': 'False',
'ANSIBLE_INVENTORY_UNPARSED_FAILED': 'True',
'ANSIBLE_PARAMIKO_RECORD_HOST_KEYS': 'False',
'ANSIBLE_RETRY_FILES_ENABLED': 'False',
'ANSIBLE_ROLES_PATH': '/runner/requirements_roles:~/.ansible/roles:/usr/share/ansible/roles:/etc/ansible/roles',
'ANSIBLE_SSH_CONTROL_PATH_DIR': '/runner/cp',
'AWX_HOST': 'https://towerhost',
'AWX_PRIVATE_DATA_DIR': '/tmp/awx_198_6l7jb_yk',
'INVENTORY_ID': '1',
'JOB_ID': '198',
'MAX_EVENT_RES': '700000',
'PROJECT_REVISION': '347e44fea036c94d5f60e544de006453ee5c71ad'},
'fact_cache_type': '',
'ident': 198,
'inventory': '/tmp/awx_198_6l7jb_yk/inventory/hosts',
'passwords': {'BECOME password.*:\\s*?$': '',
'Bad passphrase, try again for .*:\\s*?$': '',
'DOAS password.*:\\s*?$': '',
'DZDO password.*:\\s*?$': '',
'ENABLE password.*:\\s*?$': '',
'Enter passphrase for .*:\\s*?$': '',
'KSU password.*:\\s*?$': '',
'MACHINECTL password.*:\\s*?$': '',
'PBRUN password.*:\\s*?$': '',
'PFEXEC password.*:\\s*?$': '',
'PMRUN password.*:\\s*?$': '',
'Password:\\s*?$': '',
'RUNAS password.*:\\s*?$': '',
'SESU password.*:\\s*?$': '',
'SSH password:\\s*?$': '',
'SU password.*:\\s*?$': '',
'SUDO password.*:\\s*?$': '',
'Vault password:\\s*?$': '',
'doas password.*:\\s*?$': '',
'dzdo password.*:\\s*?$': '',
'enable password.*:\\s*?$': '',
'ksu password.*:\\s*?$': '',
'machinectl password.*:\\s*?$': '',
'pbrun password.*:\\s*?$': '',
'pfexec password.*:\\s*?$': '',
'pmrun password.*:\\s*?$': '',
'runas password.*:\\s*?$': '',
'sesu password.*:\\s*?$': '',
'su password.*:\\s*?$': '',
'sudo password.*:\\s*?$': ''},
'playbook': 'hello_world.yml',
'private_data_dir': '/tmp/awx_198_6l7jb_yk',
'process_isolation': True,
'process_isolation_executable': 'podman',
'suppress_env_files': True}
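The keys of the 'passwords' mapping are regular expressions for prompts and the values are the responses to send. A rough sketch of that lookup is shown below. response_for is a hypothetical helper written for this illustration (it is not runner's actual matching code), and the non-empty secrets are made up, since the real run above has empty strings.

```python
import re

# A few entries copied from the 'passwords' mapping above; the responses
# are invented for the demo.
passwords = {
    r'SSH password:\s*?$': 'ssh-secret',
    r'BECOME password.*:\s*?$': 'become-secret',
    r'Vault password:\s*?$': '',
}


def response_for(output_line, prompt_map):
    """Return the response for the first pattern that matches, else None."""
    for pattern, response in prompt_map.items():
        if re.search(pattern, output_line):
            return response
    return None


print(response_for('SSH password: ', passwords))
print(response_for('BECOME password[sudo via root]: ', passwords))
print(response_for('PLAY [Hello World Sample]', passwords))
```

The trailing `\s*?$` in each pattern means a prompt only matches when it sits at the end of the output, which avoids reacting to the word "password" in the middle of ordinary playbook output.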
Ansible runner can easily be invoked by doing something like this
ansible-runner run /path/to/private_data_dir -p hello_world.yml
However, in AWX we need a way to "ship" the private data dir over to a remote location (a k8s pod, or an execution node), run the ansible playbook, and stream the ansible results back to the AWX control plane.
To facilitate this, ansible runner has three special commands: transmit, worker, and process. They are designed to work in a Unix pipe chain. Read about it here https://ansible-runner.readthedocs.io/en/stable/remote_jobs/?highlight=worker#remote-job-execution
ansible-runner transmit | ansible-runner worker | ansible-runner process
This means the output (stdout) of transmit gets piped into the input (stdin) of worker, and worker's stdout gets piped into the stdin of process.
You can conceptually replace the pipe symbols above ("|") with receptor. Receptor's job is to take stdin from AWX, run the work elsewhere, and then stream the results back to AWX.
So, AWX control plane is going to take care of the transmit and process. Receptor is going to take care of worker.
transmit() | receptorctl work submit
receptorctl work results | processor()
An AWXReceptorJob object is instantiated and .run() is called. params are the runner_params above.
receptor_job = AWXReceptorJob(self, params)
res = receptor_job.run()
in run()
receptor_ctl = get_receptor_ctl()
..
res = self._run_internal(receptor_ctl)
receptorctl is a utility library that can be installed via pip and allows commands to be issued to a local receptor daemon via unix sockets
grab a pair of sockets in _run_internal()
sockin, sockout = socket.socketpair()
..
transmitter_future = executor.submit(self.transmit, sockin)
Note: A Unix socket pair is a typical way to allow a parent and child process to communicate. The parent process creates the pair of sockets, forks to create a child process, and passes one socket to the child. The pair are linked up so that they can be written to and read from by the two processes as a means of communication. In our code, transmit runs in a separate thread from submit_work.
transmit will compress (tar) up the private_data_dir and write it to the sockin Unix socket.
in transmit()
ansible_runner.interface.run(streamer='transmit', _output=_socket.makefile('wb'), **self.runner_params)
submit the work to receptor, with the payload set to whatever transmit outputs.
result = receptor_ctl.submit_work(payload=sockout.makefile('rb'), **work_submit_kw)
at the same time, receptor will read from sockout and save the tar contents to a file on disk
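The writer-thread / reader pattern over a socket pair can be tried in isolation. In this sketch the payload is just repeated bytes standing in for the archived private data dir, the transmit function is a simplified stand-in for the real one, and the main thread plays the part of the consumer that reads from sockout.

```python
import socket
from concurrent.futures import ThreadPoolExecutor


def transmit(sock, payload):
    """Write the payload into one end of the pair, then signal end-of-stream."""
    with sock.makefile('wb') as stream:
        stream.write(payload)
    sock.shutdown(socket.SHUT_WR)   # lets the reader see EOF
    sock.close()


sockin, sockout = socket.socketpair()
payload = b'pretend this is the archived private data dir\n' * 1000

with ThreadPoolExecutor(max_workers=1) as executor:
    transmitter_future = executor.submit(transmit, sockin, payload)
    # The main thread reads from the other end while the writer is still writing.
    with sockout.makefile('rb') as reader:
        received = reader.read()
    transmitter_future.result()   # re-raise any exception from the writer thread

sockout.close()
print(len(received) == len(payload))
```

Running the writer in its own thread matters here: the socket buffer is finite, so a large payload would block the writer until someone reads from the other end.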
receptor itself maintains a directory to represent the work
in tools_awx_1
$ cd /tmp/receptor/awx_1/aohbnpsM
aohbnpsM above is the receptor work unit ID, which can be found on the job API
files in this directory
status
stdin
stdout
stdin contains all of the necessary input that ansible-runner worker expects
Once receptor gets the full stdin, it will call ansible-runner worker and pass the file contents to it. This is where the job actually runs, that is, where ansible-playbook commands will eventually be called.
Remember that ansible-runner worker is receiving the project files in a compressed file. It will unpack this tar to the directory specified by AWX_PRIVATE_DATA_DIR in the runner params.
Note that ansible-playbook isn't run directly. Instead, ansible-runner worker will tell podman to start a container from the job's execution environment image. You can get the exact podman command that is run from the job_args field in the API
e.g.
podman run --rm --tty --interactive
--workdir /runner/project
-v /tmp/awx_191_96fykhja/:/runner/:Z
--env-file /tmp/awx_191_96fykhja/artifacts/191/env.list
--quiet
--name ansible_runner_191
--user=root
--network slirp4netns:enable_ipv6=true
quay.io/ansible/awx-ee:latest
ansible-playbook
-u admin -i /runner/inventory/hosts -e @/runner/env/extravars hello_world.yml
Note the ansible-playbook command that is run! Finally!
While the playbook is running, receptor is live-streaming the results back to AWX.
This is handled by calling get_work_results()
resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True)
this will open a socket to start reading data back from receptor.
Just like how transmit ran in a separate thread, process will also run in a separate thread.
processor_future = executor.submit(self.processor, resultfile)
processor will handle each line of stdout from ansible runner
in processor(). We set up special callbacks to handle certain types of lines that are emitted from ansible runner
return ansible_runner.interface.run(
    streamer='process',
    quiet=True,
    _input=resultfile,
    event_handler=self.task.runner_callback.event_handler,
    finished_callback=self.task.runner_callback.finished_callback,
    status_handler=self.task.runner_callback.status_handler,
    **self.runner_params,
)
For example,
{"status":
handled by status_handler
{"uuid":
handled by event_handler
{"eof": true}
handled by finished_callback
Ultimately the job is done whenever processor finishes, that is, whenever it gets the EOF signal.
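The line-by-line routing described above can be sketched like this. It is a simplified model and not ansible-runner's implementation: process_stream is a hypothetical function, the callback signatures are reduced to a single dict argument, and the sample lines are invented.

```python
import io
import json


def process_stream(stream, status_handler, event_handler, finished_callback):
    """Read newline-delimited JSON and route each line by the key it carries."""
    for raw_line in stream:
        line = raw_line.strip()
        if not line:
            continue
        data = json.loads(line)
        if 'status' in data:
            status_handler(data)
        elif 'uuid' in data:
            event_handler(data)
        elif data.get('eof'):
            finished_callback(data)
            break   # nothing is read after the EOF marker


seen = []
stream = io.StringIO(
    '{"status": "running"}\n'
    '{"uuid": "abc", "event": "playbook_on_start"}\n'
    '{"status": "successful"}\n'
    '{"eof": true}\n'
    '{"uuid": "ignored"}\n'
)
process_stream(
    stream,
    status_handler=lambda d: seen.append(('status', d['status'])),
    event_handler=lambda d: seen.append(('event', d['uuid'])),
    finished_callback=lambda d: seen.append(('finished', None)),
)
print(seen)
```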
_run_internal() returns whatever the processor thread returns
return res
get the status from res
status = res.status
here is where the "successful" status is saved. Recall self.instance is the Job object.
self.instance = self.update_model(pk, status=status, select_for_update=True, **self.runner_callback.get_delayed_update_fields())
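As a recap, the statuses our demo job passed through can be written down as a tiny state table. This only models the happy path seen in this walkthrough; other outcomes such as failed, error, or canceled are deliberately left out, and LIFECYCLE, ALLOWED, and advance are names invented for this sketch.

```python
# Statuses in the order this walkthrough encounters them, with where each is set.
LIFECYCLE = [
    ('new', 'unified_job.save() during the launch request'),
    ('pending', 'signal_start()'),
    ('waiting', 'task manager start_task()'),
    ('running', 'RunJob.run() before building the private data dir'),
    ('successful', 'RunJob.run() after the processor returns'),
]

# Map each status to the one that follows it on the happy path.
ALLOWED = {current: nxt for (current, _), (nxt, _) in zip(LIFECYCLE, LIFECYCLE[1:])}


def advance(status):
    """Return the next status on the happy path, or raise if there is none."""
    if status not in ALLOWED:
        raise ValueError('no further transition from %r' % status)
    return ALLOWED[status]


status = 'new'
history = [status]
while status in ALLOWED:
    status = advance(status)
    history.append(status)
print(' -> '.join(history))
```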
Processor is handling each event that is coming back from ansible-runner via the event_handler() callback. This is the code path that eventually saves events into the database.
The event handling is fairly complex and involves the callback receiver daemon, so I'll save that for a future topic to cover in more detail.
In RunJob you'll also find a few other hooks
- pre_run_hook
- post_run_hook
- final_run_hook
these are methods that are called at various points in the job's life to do extra things, such as pre-flight checks and handling the fact cache. Not a ton of things are happening there, but it is good to know they are part of the job run.
- Websockets (i.e. how you are able to view stdout live in the UI)
- Callback receiver (how events make it into the database)
- Touched on a bit here, but could use more detail:
  - Task manager
  - Dispatcher
  - Ansible runner
  - Receptor
    - Netceptor (the networking portion of Receptor)
    - Workceptor (Job handling portion of Receptor)
- AWX Operator (i.e. how the app is deployed to a Kubernetes cluster)
- Bootstrapping AWX (i.e. how AWX app starts up, including services, etc)
- Project Updates, Inventory Updates, and Project Syncs
- Subsystem metrics