Forked from consentfactory/netmiko_threading_queuing.py
Last active
February 14, 2024 11:40
-
-
Save Tes3awy/5538da7e54f4d9dff4925e6832e941ce to your computer and use it in GitHub Desktop.
Script to perform multithreading with Netmiko.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# Jimmy Taylor and Osama Abbas | |
# https://www.consentfactory.com/python-threading-queuing-netmiko/ | |
# This method will spin up threads and process IP addresses in a queue | |
import os | |
import signal | |
# threading library | |
import threading | |
# Additional modules imported for getting password, pretty print | |
from getpass import getpass | |
from pprint import pprint | |
# Queuing | |
from queue import Queue | |
# Importing Netmiko modules | |
from netmiko import ConnectHandler | |
from netmiko.exceptions import NetmikoAuthenticationException, NetmikoTimeoutException | |
# These capture errors relating to hitting ctrl+C (I forget the source) | |
signal.signal(signal.SIGPIPE, signal.SIG_DFL) # IOError: Broken pipe | |
signal.signal(signal.SIGINT, signal.SIG_DFL) # KeyboardInterrupt: Ctrl-C | |
# Get the password | |
username = input("Username: ") | |
password = getpass() | |
# Switch IP addresses from text file that has one IP per line | |
ip_addrs_file = open("ips.txt", mode="r") | |
ip_addrs = ip_addrs_file.read().splitlines() | |
# Set up thread count for number of threads to spin up | |
num_threads = 8 | |
# This sets up the queue | |
enclosure_queue = Queue() | |
# Set up thread lock so that only one thread prints at a time | |
print_lock = threading.Lock() | |
# CLI command being sent. This could be anywhere (and even be a passed paramenter) | |
# but I put at the top for code readability | |
command = "show inventory" # or show running-config | |
# Function used in threads to connect to devices, passing in the thread # and queue | |
def device_connector(i: int, q): | |
# This while loop runs indefinitely and grabs IP addresses from the queue and processes them | |
# Loop will stop and restart if "ip = q.get()" is empty | |
while True: | |
# These print statements are largely for the user indicating where the process is at | |
# and aren't required | |
print(f"{i:,}: Waiting for IP...") | |
ip = q.get() | |
print(f"{i:,}: Acquired IP Address: {ip}") | |
# k,v passed to ConnectHandler | |
device = { | |
"device_type": "cisco_ios", | |
"host": ip, | |
"username": username, | |
"password": password, | |
} | |
# Connect to the device, and print out auth or timeout errors | |
try: | |
conn = ConnectHandler(**device) | |
except NetmikoTimeoutException: | |
with print_lock: | |
print(f"\n{i:,}: ERROR: Connection to {ip} timed-out.", end="\n\n") | |
q.task_done() | |
continue | |
except NetmikoAuthenticationException: | |
with print_lock: | |
print( | |
f"\n{i:,}: ERROR: Authenticaftion failed for {ip}. Stopping script.", | |
end="\n\n", | |
) | |
q.task_done() | |
os.kill(os.getpid(), signal.SIGUSR1) | |
# Capture the output, and use TextFSM (in this case) to parse data | |
output = conn.send_command(command, use_textfsm=True) | |
with print_lock: | |
print(f"{i:,}: Printing output...") | |
pprint(output) | |
# Disconnect from device | |
conn.disconnect() | |
# Set the queue task as complete, thereby removing it from the queue indefinitely | |
q.task_done() | |
# Mail function that compiles the thread launcher and manages the queue | |
def main(): | |
# Setting up threads based on number set above | |
for i in range(num_threads): | |
# Create the thread using 'deviceconnector' as the function, passing in | |
# the thread number and queue object as parameters | |
thread = threading.Thread( | |
target=device_connector, | |
args=( | |
i, | |
enclosure_queue, | |
), | |
) | |
# Set the thread as a background daemon/job | |
thread.setDaemon(True) | |
# Start the thread | |
thread.start() | |
# For each ip address in "ip_addrs", add that IP address to the queue | |
for ip_addr in ip_addrs: | |
enclosure_queue.put(ip_addr) | |
# Wait for all tasks in the queue to be marked as completed (task_done) | |
enclosure_queue.join() | |
print("*** Script complete ***") | |
if __name__ == "__main__": | |
# Calling the main function | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment