Skip to content

Instantly share code, notes, and snippets.

@pellaeon
Created September 23, 2022 12:21
Show Gist options
  • Save pellaeon/3925b0fd2d8939e12b38325d16c0003b to your computer and use it in GitHub Desktop.
Save pellaeon/3925b0fd2d8939e12b38325d16c0003b to your computer and use it in GitHub Desktop.
Frida child-gating and spawn-gating example
"""
This PoC is based on the example from https://frida.re/news/#child-gating
and aims to instrument child processes along with the main one.
"""
from __future__ import print_function
import frida
from frida_tools.application import Reactor
import threading
class Application(object):
    """Child-gating demo: instrument a spawned app and its child processes.

    The target package is spawned on the USB device, a script hooking
    ``open`` is loaded into it, and child gating is enabled on the session
    so that every ``child-added`` notification leads to the same
    instrumentation of the child. ``run()`` returns once no sessions remain.
    """

    # Agent source passed to session.create_script(); it sends one message
    # per intercepted open() call carrying the path argument.
    # NOTE(review): relies on Memory.readUtf8String and
    # Module.findExportByName(null, ...) -- confirm both are still provided
    # by the Frida version in use.
    _AGENT_SOURCE = """
Interceptor.attach(Module.findExportByName(null, 'open'), {
onEnter: function (args) {
send({
type: 'open',
path: Memory.readUtf8String(args[0])
});
}
});
"""

    def __init__(self, package_name="com.example.app"):
        """Connect to the USB device and subscribe to child notifications.

        Args:
            package_name: Identifier of the app to spawn in ``_start``.
        """
        self._package_name = package_name
        self._stop_requested = threading.Event()
        # The reactor's main loop blocks until _stop_if_idle() sets the event.
        self._reactor = Reactor(
            run_until_return=lambda _: self._stop_requested.wait())
        self._device = frida.get_usb_device(timeout=5)
        self._sessions = set()
        # Hand the notification over to the reactor so that all session
        # bookkeeping happens in reactor-scheduled callbacks.
        self._device.on(
            "child-added",
            lambda child: self._reactor.schedule(
                lambda: self._on_delivered(child)))

    def run(self):
        """Schedule the initial spawn and run the reactor until stopped."""
        self._reactor.schedule(lambda: self._start())
        self._reactor.run()

    def _start(self):
        """Spawn the target app and instrument its main process."""
        pid = self._device.spawn(self._package_name)
        print("✔ spawn({})".format(self._package_name))
        self._instrument(pid)

    def _stop_if_idle(self):
        """Request shutdown when no instrumented session is left."""
        if not self._sessions:
            self._stop_requested.set()

    def _instrument(self, pid):
        """Attach to *pid*, enable child gating, load the agent and resume.

        Args:
            pid: Process id of a process that is waiting to be resumed.
        """
        print("✔ attach(pid={})".format(pid))
        session = self._device.attach(pid)
        session.on(
            "detached",
            lambda reason: self._reactor.schedule(
                lambda: self._on_detached(pid, session, reason)))
        print("✔ enable_child_gating()")
        session.enable_child_gating()
        print("✔ create_script()")
        script = session.create_script(self._AGENT_SOURCE)
        script.on(
            "message",
            lambda message, data: self._reactor.schedule(
                lambda: self._on_message(pid, message)))
        print("✔ load()")
        script.load()
        print("✔ resume(pid={})".format(pid))
        self._device.resume(pid)
        self._sessions.add(session)

    def _on_delivered(self, child):
        """Instrument a child process reported by child gating."""
        print("⚡ child-added: {}".format(child))
        self._instrument(child.pid)

    def _on_detached(self, pid, session, reason):
        """Forget a detached session and schedule an idle check."""
        print("⚡ detached: pid={}, reason='{}'"
              .format(pid, reason))
        # discard(): the session is only registered at the very end of
        # _instrument(), so a detach may arrive for one that never got there.
        self._sessions.discard(session)
        # Delayed re-check; presumably leaves time for a pending child
        # notification to register a new session before shutting down.
        self._reactor.schedule(self._stop_if_idle, delay=0.5)

    def _on_message(self, pid, message):
        """Print a message received from the agent running in *pid*.

        Error messages have no ``payload`` entry, so they are reported via
        their ``description`` instead of being indexed like ``send`` output.
        """
        if message.get("type") == "error":
            print("⚡ error: pid={}, description={}"
                  .format(pid, message.get("description")))
            return
        print("⚡ message: pid={}, payload={}"
              .format(pid, message.get("payload")))
# Only start instrumenting when executed as a script, not when imported.
if __name__ == "__main__":
    app = Application()
    app.run()
"""
This PoC is based on the example from https://frida.re/news/#child-gating
and is modified to use spawn-gating to catch any newly spawned process on the device.
Background:
1. Android apps can declare in AndroidManifest.xml to put some components
(such as a `Service`) into a separate process using the `android:process`
directive. See: https://developer.android.com/guide/topics/manifest/service-element
2. In such cases, the new process will be spawned by the system, instead of
by the app's main process. I.e. the new process will have `zygote` as
its parent process.
3. Thus, child-gating is not able to catch these new processes.
The logic to catch those processes is simply to catch all newly spawned
processes on the system, then filter them by identifier, which should start
with the app's package name.
Ref:
- https://github.com/frida/frida/issues/602
- https://gist.github.com/oleavr/ae7bcbbb9179852a4731
- https://developer.android.com/guide/components/processes-and-threads
- https://developer.android.com/guide/components/activities/process-lifecycle
"""
from __future__ import print_function
import frida
from frida_tools.application import Reactor
import threading
# Package name of the target app: it is spawned at start-up, and newly
# spawned processes are matched against it as an identifier prefix.
app_package_name = "com.example.app"
class Application(object):
    """Spawn-gating demo: instrument an app and its separately spawned processes.

    Spawn gating is enabled on the USB device, the target package is
    spawned and instrumented, and every ``spawn-added`` notification whose
    identifier starts with the package name is instrumented the same way.
    Spawns that do not match are resumed untouched. ``run()`` returns once
    no sessions remain.
    """

    # Agent source passed to session.create_script(); it sends one message
    # per intercepted open() call carrying the path argument.
    # NOTE(review): relies on Memory.readUtf8String and
    # Module.findExportByName(null, ...) -- confirm both are still provided
    # by the Frida version in use.
    _AGENT_SOURCE = """
Interceptor.attach(Module.findExportByName(null, 'open'), {
onEnter: function (args) {
send({
type: 'open',
path: Memory.readUtf8String(args[0])
});
}
});
"""

    def __init__(self, package_name=None):
        """Connect to the USB device and turn on spawn gating.

        Args:
            package_name: Identifier of the app to spawn and the prefix used
                to select gated spawns. Defaults to the module-level
                ``app_package_name``.
        """
        if package_name is None:
            package_name = app_package_name
        self._package_name = package_name
        self._stop_requested = threading.Event()
        # The reactor's main loop blocks until _stop_if_idle() sets the event.
        self._reactor = Reactor(
            run_until_return=lambda _: self._stop_requested.wait())
        self._device = frida.get_usb_device(timeout=5)
        print("✔ enable_spawn_gating()")
        self._device.enable_spawn_gating()
        self._sessions = set()
        # Hand the notification over to the reactor so that all session
        # bookkeeping happens in reactor-scheduled callbacks.
        self._device.on(
            "spawn-added",
            lambda child: self._reactor.schedule(
                lambda: self._on_delivered(child)))

    def run(self):
        """Schedule the initial spawn and run the reactor until stopped."""
        self._reactor.schedule(lambda: self._start())
        self._reactor.run()

    def _start(self):
        """Spawn the target app and instrument its main process."""
        pid = self._device.spawn(self._package_name)
        print("✔ spawn({})".format(self._package_name))
        self._instrument(pid)

    def _stop_if_idle(self):
        """Request shutdown when no instrumented session is left."""
        if not self._sessions:
            self._stop_requested.set()

    def _instrument(self, pid):
        """Attach to *pid*, load the agent into it and resume it.

        Args:
            pid: Process id of a process that is waiting to be resumed.
        """
        print("✔ attach(pid={})".format(pid))
        session = self._device.attach(pid)
        session.on(
            "detached",
            lambda reason: self._reactor.schedule(
                lambda: self._on_detached(pid, session, reason)))
        print("✔ create_script()")
        script = session.create_script(self._AGENT_SOURCE)
        script.on(
            "message",
            lambda message, data: self._reactor.schedule(
                lambda: self._on_message(pid, message)))
        print("✔ load()")
        script.load()
        print("✔ resume(pid={})".format(pid))
        self._device.resume(pid)
        self._sessions.add(session)

    def _on_delivered(self, child):
        """Handle a gated spawn: instrument it if it is ours, else resume it.

        Args:
            child: Spawn notification exposing ``pid`` and ``identifier``.
        """
        print("⚡ spawn-added: {}".format(child))
        identifier = child.identifier or ""
        if identifier.startswith(self._package_name):
            self._instrument(child.pid)
        else:
            # Gated spawns wait for an explicit resume; without this every
            # unrelated process started on the device would stay suspended.
            self._device.resume(child.pid)

    def _on_detached(self, pid, session, reason):
        """Forget a detached session and schedule an idle check."""
        print("⚡ detached: pid={}, reason='{}'"
              .format(pid, reason))
        # discard(): the session is only registered at the very end of
        # _instrument(), so a detach may arrive for one that never got there.
        self._sessions.discard(session)
        # Delayed re-check; presumably leaves time for a pending spawn
        # notification to register a new session before shutting down.
        self._reactor.schedule(self._stop_if_idle, delay=0.5)

    def _on_message(self, pid, message):
        """Print a message received from the agent running in *pid*.

        Error messages have no ``payload`` entry, so they are reported via
        their ``description`` instead of being indexed like ``send`` output.
        """
        if message.get("type") == "error":
            print("⚡ error: pid={}, description={}"
                  .format(pid, message.get("description")))
            return
        print("⚡ message: pid={}, payload={}"
              .format(pid, message.get("payload")))
# Only start instrumenting when executed as a script, not when imported.
if __name__ == "__main__":
    app = Application()
    app.run()
@koonkoon
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment