Ollama Tool (Function Calling) Example on Raspberry Pi 5
"""
Ollama Light Assistant

Tests the ability of LLMs to call tools (i.e., functions) from within the
Ollama environment on a Raspberry Pi with speech-to-text. Tested with a
Raspberry Pi 5 (8 GB) and this USB microphone:
https://www.adafruit.com/product/3367.

Connect an LED to GPIO 17 (physical pin 11 on the board) in an active-high
configuration:

GPIO 17 -> 220 Ohm resistor -> LED -> GND

Install Ollama, start the server, and install the PortAudio dependency:

$ curl -fsSL https://ollama.com/install.sh | sh
$ ollama serve
$ sudo apt install -y libportaudio2

Create a Python virtual environment and install the necessary packages. We
need system site packages enabled so that gpiozero actually works.

$ python -m venv venv-ollama --system-site-packages
$ source venv-ollama/bin/activate
$ python -m pip install ollama vosk sounddevice

Download the LLM and the Vosk STT model:

$ ollama pull allenporter/xlam:1b
$ python -c "from vosk import Model; Model(lang='en-us')"

Get the device number of your attached USB microphone:

$ python -c "import sounddevice; print(sounddevice.query_devices())"

For example, my device number is '0' for the "USB PnP Sound Device." Change
the AUDIO_INPUT_INDEX value to this number in the code below.

Run the script with:

$ python ollama-light-assistant.py

Give a voice command like "turn on the light" to have the LLM call the
led_write() function.

For the xlam:1b model, you must be SUPER specific with your commands. For
example, it will accept "turn on the light" but not "turn the light on." You
can try the llama3.1:8b model for more flexibility, but it's VERY slow on the
Pi 5, taking up to 3 minutes to respond.

Note: LLMs are SUPER slow on the Raspberry Pi right now, and tool calling is
janky. Once we get better/smaller LLMs, more RAM, and/or AI accelerators, this
should (in theory) be much faster and more robust.

References:
Ollama tools: https://ollama.com/blog/tool-support
Tool-supported models: https://ollama.com/search?c=tools

Author: Shawn Hymel
Date: September 17, 2024
License: 0BSD (https://opensource.org/license/0bsd)
"""
from collections import deque
import json
import queue
import sys

from gpiozero import LED
import ollama
import sounddevice as sd
from vosk import Model, KaldiRecognizer

# Settings
LED_PIN = 17
AUDIO_INPUT_INDEX = 0
MODEL = "allenporter/xlam:1b" # You could also try "llama3.1:8b"
OLLAMA_HOST = "http://localhost:11434"
MAX_MESSAGES = 5
PREAMBLE = "You are a helpful assistant that can turn the light on and off."

# Define tools for the model to use (i.e., functions)
TOOLS = [
    {
        'type': 'function',
        'function': {
            'name': "led_write",
            'description': "Turn the light off or on",
            'parameters': {
                'type': 'object',
                'properties': {
                    'value': {
                        'type': 'number',
                        'description': "The value to write to the light pin " \
                                       "to turn it off and on. 0 for off, " \
                                       "1 for on.",
                    },
                },
                'required': ['value'],
            },
        }
    }
]
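
# Note: when the model decides to use a tool, the response message includes a
# 'tool_calls' list (see Ollama's tool support blog post linked above). Each
# entry is shaped roughly like the following, which is what send() unpacks:
# {'function': {'name': 'led_write', 'arguments': {'value': 1}}}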
# -----------------------------------------------------------------------------
# Classes
class FixedSizeQueue:
    """
    Fixed-size message queue (FIFO) with an optional system preamble.
    """
    def __init__(self, max_size, preamble=None):
        self.queue = deque(maxlen=max_size)
        self.preamble = {
            'role': 'system',
            'content': preamble
        }

    def push(self, item):
        self.queue.append(item)

    def get(self):
        if self.preamble['content'] is None:
            return list(self.queue)
        else:
            return [self.preamble] + list(self.queue)
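
# For illustration: with MAX_MESSAGES = 5, the history holds at most the 5
# most recent user/assistant messages; get() prepends the system preamble,
# and older turns silently fall off the front of the deque.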
# -----------------------------------------------------------------------------
# Functions
def led_write(led, value):
    """
    Turn the LED on or off.
    """
    if int(value) > 0:
        led.on()
        print("The LED is now on")
    else:
        led.off()
        print("The LED is now off")

def send(chat, msg_history, client, model, tools, led):
    """
    Send a message to the LLM server and print the response.
    """

    # Add user message to the conversation history
    msg_history.push({
        'role': 'user',
        'content': chat
    })

    # Send message to LLM server
    response = client.chat(
        model=model,
        messages=msg_history.get(),
        tools=tools,
        stream=False
    )

    # Print the full response
    print(f"Response: {response['message']}")

    # Add the model's response to the conversation history
    msg_history.push({
        'role': 'assistant',
        'content': response['message']['content']
    })

    # Check if the model used any of the provided tools
    if response['message'].get('tool_calls') is None:
        print("Tools not used.")
        return

    # Call the function(s) the model used
    print("Tools used. Calling:")
    for tool in response['message']['tool_calls']:
        print(tool)
        if tool['function']['name'] == "led_write":
            led_write(led, tool['function']['arguments']['value'])

def record_callback(indata, frames, time, status, q):
    """
    Callback for recording audio from the microphone.
    """
    if status:
        print(status, file=sys.stderr)
    q.put(bytes(indata))

# -----------------------------------------------------------------------------
# Main
if __name__ == "__main__":

    # Get the sample rate of the chosen input device
    device_info = sd.query_devices(AUDIO_INPUT_INDEX, 'input')
    sample_rate = int(device_info['default_samplerate'])

    # Build the STT model
    stt_model = Model(lang='en-us')
    stt_recognizer = KaldiRecognizer(stt_model, sample_rate)
    stt_recognizer.SetWords(False)

    # Configure chat history and connect to the LLM server
    msg_history = FixedSizeQueue(MAX_MESSAGES, PREAMBLE)
    chat_client = ollama.Client(host=OLLAMA_HOST)

    # Initialize the audio recording queue
    q = queue.Queue()

    # Initialize the LED and turn it off
    led = LED(LED_PIN)
    led.off()

    while True:

        # Listen for user input on the configured microphone
        print("Listening...")
        result_text = ""
        with sd.RawInputStream(
            samplerate=sample_rate,
            device=AUDIO_INPUT_INDEX,
            dtype='int16',
            channels=1,
            callback=lambda in_data, frames, time, status: record_callback(
                in_data,
                frames,
                time,
                status,
                q
            )
        ):

            # Collect audio data until we have a full phrase
            while True:
                data = q.get()
                if stt_recognizer.AcceptWaveform(data):

                    # Perform speech-to-text (STT) on the audio data
                    result = json.loads(stt_recognizer.Result())
                    result_text = result.get("text", "")
                    break

        # Send the user's message to the LLM server
        if not result_text:
            print("No speech detected")
        else:
            print(f"Speech detected: {result_text}")
            send(result_text, msg_history, chat_client, MODEL, TOOLS, led)