-
-
Save thalljiscience/53de670a8422bb3d92f1eea8f05ad4d5 to your computer and use it in GitHub Desktop.
""" Get sequencing run information for a MinION position and run_id""" | |
import argparse | |
import logging | |
import sys | |
import os | |
from minknow_api.manager import Manager | |
from minknow_api.tools import protocols | |
def parse_args(): | |
"""Build and execute a command line argument for querying run info | |
Returns: | |
Parsed arguments to be used when querying run info. | |
""" | |
parser = argparse.ArgumentParser( | |
description=""" | |
Query run info for a running MinKNOW instance. | |
""" | |
) | |
parser.add_argument( | |
"--host", | |
default="localhost", | |
help="IP address of the machine running MinKNOW (defaults to localhost)", | |
) | |
parser.add_argument( | |
"--port", | |
help="Port to connect to on host (defaults to standard MinKNOW port based on tls setting)", | |
) | |
parser.add_argument( | |
"--no-tls", help="Disable tls connection", default=False, action="store_true" | |
) | |
parser.add_argument("--verbose", action="store_true", help="Enable debug logging") | |
parser.add_argument( | |
"--position", | |
help="position on the machine (or MinION serial number) to run the protocol at", | |
) | |
parser.add_argument( | |
"--run_id", | |
help="The run_id for to query", | |
) | |
args = parser.parse_args() | |
return args | |
def is_position_selected(position, args): | |
# First check for name match: | |
if args.position == position.name: | |
return True | |
return False | |
def main(): | |
os.environ['MINKNOW_TRUSTED_CA'] = "/opt/ont/minknow/conf/rpc-certs/ca.crt" | |
# Parse arguments: | |
args = parse_args() | |
# Specify --verbose on the command line to get extra details about | |
if args.verbose: | |
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) | |
# Construct a manager using the host + port provided: | |
manager = Manager(host=args.host, port=args.port, use_tls=not args.no_tls) | |
# Find positions: | |
positions = manager.flow_cell_positions() | |
filtered_positions = list( | |
filter(lambda pos: is_position_selected(pos, args), positions) | |
) | |
# At least one position needs to be selected: | |
if not filtered_positions: | |
print( | |
"No positions selected - specify `--position` or `--flow-cell-id`" | |
) | |
sys.exit(1) | |
for pos in filtered_positions: | |
# Connect to the sequencing position: | |
position_connection = pos.connect() | |
# Check if a flowcell is available for sequencing | |
flow_cell_info = position_connection.device.get_flow_cell_info() | |
if not flow_cell_info.has_flow_cell: | |
print("No flow cell present in position %s" % pos) | |
sys.exit(1) | |
# Query run info | |
print("Getting run info for %s positions" % len(filtered_positions)) | |
for pos in filtered_positions: | |
# Connect to the sequencing position: | |
position_connection = pos.connect() | |
# Now query the run_id: | |
print("Getting info for %s" % args.run_id) | |
result = position_connection.protocol.get_run_info( | |
run_id=args.run_id | |
) | |
print("Run state: %s" % result.state) | |
if not result.end_time.seconds: | |
print("Protocol is running") | |
print(result); # Print the entire return stream | |
# Individual fields can be reported as below | |
""" | |
print("Run state: %s" % result.state) | |
print("Protocol ID: %s" % result.protocol_id) | |
print("Start Time: %s" % result.start_time) | |
print("End Time: %s" % result.end_time) | |
print("Run IDs: %s" % result.acquisition_run_ids) | |
print("User Info: %s" % result.user_info) | |
print("Device: %s" % result.device) | |
print("Flow cell: %s" % result.flow_cell) | |
print("Meta Info: %s" % result.meta_info) | |
""" | |
if __name__ == "__main__": | |
main() |
"""Get run acquisition information for a MinION position and acquisition_run_id""" | |
import argparse | |
import logging | |
import sys | |
import os | |
import grpc | |
from minknow_api.manager import Manager | |
import minknow_api.statistics_pb2 | |
def parse_args(): | |
"""Build and execute a command line argument for querying run info | |
Returns: | |
Parsed arguments to be used when querying run info. | |
""" | |
parser = argparse.ArgumentParser( | |
description=""" | |
Query run info for a running MinKNOW instance. | |
""" | |
) | |
parser.add_argument( | |
"--host", | |
default="localhost", | |
help="IP address of the machine running MinKNOW (defaults to localhost)", | |
) | |
parser.add_argument( | |
"--port", | |
help="Port to connect to on host (defaults to standard MinKNOW port based on tls setting)", | |
) | |
parser.add_argument( | |
"--no-tls", help="Disable tls connection", default=False, action="store_true" | |
) | |
parser.add_argument("--verbose", action="store_true", help="Enable debug logging") | |
parser.add_argument( | |
"--position", | |
help="position on the machine (or MinION serial number) to run the protocol at", | |
) | |
parser.add_argument( | |
"--run_id", | |
help="The run_id for to query", | |
) | |
args = parser.parse_args() | |
return args | |
def main(): | |
os.environ['MINKNOW_TRUSTED_CA'] = "/opt/ont/minknow/conf/rpc-certs/ca.crt" | |
args = parse_args() | |
# Specify --verbose on the command line to get extra details about | |
if args.verbose: | |
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) | |
# Construct a manager using the host + port provided: | |
manager = Manager(host=args.host, port=args.port, use_tls=not args.no_tls) | |
# Find positions: | |
positions = manager.flow_cell_positions() | |
filtered_positions = list(filter(lambda pos: pos.name == args.position, positions)) | |
# At least one position needs to be selected: | |
if not filtered_positions: | |
print( | |
"No positions selected - specify `--position`" | |
) | |
sys.exit(1) | |
for pos in filtered_positions: | |
# Connect to the sequencing position: | |
position_connection = pos.connect() | |
# Check if a flowcell is available for sequencing | |
flow_cell_info = position_connection.device.get_flow_cell_info() | |
if not flow_cell_info.has_flow_cell: | |
print("No flow cell present in position %s" % pos) | |
sys.exit(1) | |
# Query run info | |
print("Getting run info for %s positions" % len(filtered_positions)) | |
for pos in filtered_positions: | |
# Connect to the sequencing position: | |
position_connection = pos.connect() | |
# Now query the run_id: | |
print("Getting info for %s" % args.run_id) | |
result = position_connection.acquisition.get_acquisition_info( | |
run_id=args.run_id | |
) | |
print(result) | |
if __name__ == "__main__": | |
main() |
"""Check and see if there are run_ids on a host and MinION position""" | |
import argparse | |
import logging | |
import sys | |
import os | |
from minknow_api.manager import Manager | |
from minknow_api.tools import protocols | |
def parse_args(): | |
"""Build and execute a command line argument for listing run_ids | |
Returns: | |
Parsed arguments to be used when stopping a protocol. | |
""" | |
parser = argparse.ArgumentParser( | |
description=""" | |
List active protocol runs. | |
""" | |
) | |
parser.add_argument( | |
"--host", | |
default="localhost", | |
help="IP address of the machine running MinKNOW (defaults to localhost)", | |
) | |
parser.add_argument( | |
"--port", | |
help="Port to connect to on host (defaults to standard MinKNOW port based on tls setting)", | |
) | |
parser.add_argument( | |
"--no-tls", help="Disable tls connection", default=False, action="store_true" | |
) | |
parser.add_argument("--verbose", action="store_true", help="Enable debug logging") | |
parser.add_argument( | |
"--position", | |
help="position on the machine (or MinION serial number)", | |
) | |
args = parser.parse_args() | |
return args | |
def is_position_selected(position, args): | |
# First check for name match: | |
if args.position == position.name: | |
return True | |
return False | |
def main(): | |
os.environ['MINKNOW_TRUSTED_CA'] = "/opt/ont/minknow/conf/rpc-certs/ca.crt" | |
# Parse arguments to be passed to running protocols: | |
args = parse_args() | |
# Specify --verbose on the command line to get extra details about | |
if args.verbose: | |
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) | |
# Construct a manager using the host + port provided: | |
manager = Manager(host=args.host, port=args.port, use_tls=not args.no_tls) | |
# Find which positions we are going to list protocols from: | |
positions = manager.flow_cell_positions() | |
filtered_positions = list( | |
filter(lambda pos: is_position_selected(pos, args), positions) | |
) | |
# At least one position needs to be selected: | |
if not filtered_positions: | |
print( | |
"No positions selected - specify `--position` or `--flow-cell-id`" | |
) | |
sys.exit(1) | |
protocol_identifiers = {} | |
for pos in filtered_positions: | |
# Connect to the sequencing position: | |
position_connection = pos.connect() | |
# Check if a flowcell is available for sequencing | |
flow_cell_info = position_connection.device.get_flow_cell_info() | |
if not flow_cell_info.has_flow_cell: | |
print("No flow cell present in position %s" % pos) | |
sys.exit(1) | |
# List run ids from the requested postitions: | |
for pos in filtered_positions: | |
# Connect to the sequencing position: | |
position_connection = pos.connect() | |
result = position_connection.protocol.list_protocol_runs() | |
print(result) | |
if __name__ == "__main__": | |
main() |
import argparse | |
import logging | |
import sys | |
import os | |
from minknow_api.manager import Manager | |
from minknow_api.tools import protocols | |
#From acquisition.proto DataAction Enum: | |
STOP_DEFAULT = 0 | |
STOP_KEEP_ALL_DATA = 1 | |
STOP_FINISH_PROCESSING = 2 | |
def parse_args(): | |
"""Build and execute a command line argument for stopping a protocol | |
Returns: | |
Parsed arguments to be used when stopping a protocol. | |
""" | |
parser = argparse.ArgumentParser( | |
description=""" | |
Stop a sequencing protocol in a running MinKNOW instance. | |
""" | |
) | |
parser.add_argument( | |
"--host", | |
default="localhost", | |
help="IP address of the machine running MinKNOW (defaults to localhost)", | |
) | |
parser.add_argument( | |
"--port", | |
help="Port to connect to on host (defaults to standard MinKNOW port based on tls setting)", | |
) | |
parser.add_argument( | |
"--no-tls", help="Disable tls connection", default=False, action="store_true" | |
) | |
parser.add_argument("--verbose", action="store_true", help="Enable debug logging") | |
parser.add_argument( | |
"--position", | |
help="position on the machine (or MinION serial number) to run the protocol at", | |
) | |
parser.add_argument( | |
"--stop_action", | |
help="Specify stop action (STOP_DEFAULT = FINISH_PROCESSING = finish processing, STOP_KEEP_ALL_DATA = stop basecalling an send unprocessed files to skipped, STOP_FINISH_PROCESSING = keep basecalling until all of the reads have been basecalled" | |
) | |
args = parser.parse_args() | |
return args | |
def is_position_selected(position, args): | |
# First check for name match: | |
if args.position == position.name: | |
return True | |
return False | |
def main(): | |
os.environ['MINKNOW_TRUSTED_CA'] = "/opt/ont/minknow/conf/rpc-certs/ca.crt" | |
# Parse arguments to be passed to running protocols: | |
args = parse_args() | |
# Specify --verbose on the command line to get extra details about | |
if args.verbose: | |
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) | |
# Construct a manager using the host + port provided: | |
manager = Manager(host=args.host, port=args.port, use_tls=not args.no_tls) | |
# Find which positions we are going to stop protocol on: | |
positions = manager.flow_cell_positions() | |
filtered_positions = list( | |
filter(lambda pos: is_position_selected(pos, args), positions) | |
) | |
# At least one position needs to be selected: | |
if not filtered_positions: | |
print( | |
"No positions selected - specify `--position` or `--flow-cell-id`" | |
) | |
sys.exit(1) | |
protocol_identifiers = {} | |
for pos in filtered_positions: | |
# Connect to the sequencing position: | |
position_connection = pos.connect() | |
# Check if a flowcell is available for sequencing | |
flow_cell_info = position_connection.device.get_flow_cell_info() | |
if not flow_cell_info.has_flow_cell: | |
print("No flow cell present in position %s" % pos) | |
sys.exit(1) | |
stop_action=STOP_DEFAULT | |
# Determine stop_action (aleady set to STOP_DEFAULT by default, so just determine if it needs to be overridden | |
if args.stop_action == "STOP_KEEP_ALL_DATA": | |
stop_action=STOP_KEEP_ALL_DATA | |
else: | |
if args.stop_action == "STOP_FINISH_PROCESSING": | |
stop_action=STOP_FINISH_PROCESSING | |
# Stop protocol on the requested postitions: | |
print("Stopping protocol on %s positions" % len(filtered_positions)) | |
for pos in filtered_positions: | |
# Connect to the sequencing position: | |
position_connection = pos.connect() | |
# Find the protocol identifier for the required protocol: | |
print("Stopping run on position %s" % (pos.name)) | |
print("Stopping protocol on position %s" % args.position) | |
result = position_connection.protocol.stop_protocol( | |
data_action_on_stop=stop_action | |
) | |
if __name__ == "__main__": | |
main() |
Hi! Thanks a lot! This is great!!! I am currently out of the office but will notify my colleague to test. I will report back. Best, J.
@juhench: In case these also help, I've had to create little Python scripts to allow some real-time run monitoring, so I added them here. I'm new to this gist stuff, and github in general, so I could only figure out how to add them as code files at the top. When I try to embed code in a comment, it always gets arbitrarily segmented in a way that makes no sense to me. I also finally got around to working with that AGX Xavier. I went ahead and flashed the eMMC with Jetpack 4.5.1 (I think that was the number) and it was pretty confusing. I had to actually manually write the boot information to direct booting to the eMMC because after flashing the OS, the AGX only booted into root console - not sure if that is supposed to happen, and SDKManager certainly said nothing about that. Anyway, the scripts added above are for getting a list of run_ids for a MinION, getting comprehensive run info for a run_id, and retrieving active sequencing run live acquisition information. The stream data appears to come back in a json-like format, but not json per se, but looks pretty easy to extract relevant info or turn the whole output into a serializable object structure.
The above is a followup to an earlier script to use the Oxford Nanopore minknow_api on an NVIDIA Jetson Xavier device to stop a running sequencing protocol programmatically as part of the creation of a library for automating nanopore sequencing control and monitoring on dedicated NVIDIA Jetson devices from a specialized analysis program on a remote PC.
After learning a bit about gRPC messaging, protobuf compilation and my unfortunately horrendous Python skills, I traced the messaging for stopping a protocol though the auto-compiled code in /usr/local/lib/python3.6/dist-packages/minknow_api/protocol_service.py and /usr/local/lib/python3.6/dist-packages/minknow_api/protocol_pb2.py. All that is really needed to stop a running sequencing protocol is to know the minion device position it is running on. The easiest way I find to create a simple stop_protocol script is to not bother editing the /usr/local/lib/python3.6/dist-packages/minknow_api/tools/protocols.py file to add a "stop_protocol" method after the "start_protocol" method (thus no need to risk corrupting any core api files either), and instead just get a connection object for a minion position and call stop_protocol directly from a small script. It also turns out to be handy to get a connection object and pass it a run_id to get real-time information about a running sequencing run. Above is a simple Python script to stop a protocol without altering any of the installed minknow_api files that bypasses the indirect call to ...tools/protocols.py used with the ONT start_protocol python example. I kept the options to specify host (defaults to localhost), port, secure/insecure and verbose logging. I've had some trouble getting gRPC calls to work from C# on a PC trying to communicate with then python minknow_api on a Jetson. I can get all the protos to compile correctly in C# and compile code with all of the correct syntax, and debug objects in memory with the correct structures apparently created, but I always get TransientFailure on an open gRPC Channel whenever I try to do something as simple as listing flow cell positions, so I am giving up on that temporarily. It seems that it would be safer to have a series of python scripts on the Jetson that can be called through SSH anyway. That way, if the minknow_api is changed, PC-side code will not need to be changed. Only scripts on a any Jetson with updated API code would need modifying.