@andrewxhill
Last active March 17, 2025 23:32
import { Agent } from '@mastra/core';
import { MCPConfiguration } from '@mastra/mcp';
import { anthropic } from '@ai-sdk/anthropic';
import { config } from 'dotenv';
import {
  ChainOfThoughtLog,
  TradingPrediction,
  TokenPriceData,
  AgentStrategy,
  storeChainOfThought,
  storePrediction,
  storeTokenPriceData,
  storeAgentStrategy,
  initializeBucket
} from './recall-client.js';

// Load environment variables
config();

// Validate required environment variables (the Twitter credentials and MCP
// port are used with non-null assertions below, so validate them here too)
const requiredEnvVars = [
  'ANTHROPIC_API_KEY',
  'DATA_SKILLS_MCP_PATH',
  'RECALL_MCP_PATH',
  'COINGECKO_API_KEY',
  'RECALL_PRIVATE_KEY',
  'RECALL_NETWORK',
  'TWITTER_LISTS',
  'RECALL_BUCKET',
  'SUBSTACK_LISTS',
  'TWITTER_USERNAME',
  'TWITTER_PASSWORD',
  'TWITTER_EMAIL',
  'TWITTER_MCP_PORT'
] as const;

for (const envVar of requiredEnvVars) {
  if (!process.env[envVar]) {
    throw new Error(`Missing required environment variable: ${envVar}`);
  }
}
// Create MCP configuration
const mcp = new MCPConfiguration({
  servers: {
    recall: {
      command: 'node',
      args: [process.env.RECALL_MCP_PATH!],
      env: {
        RECALL_PRIVATE_KEY: process.env.RECALL_PRIVATE_KEY!,
        RECALL_NETWORK: process.env.RECALL_NETWORK!
      }
    },
    omnifeeds: {
      command: 'node',
      args: [process.env.DATA_SKILLS_MCP_PATH!],
      env: {
        TWITTER_USERNAME: process.env.TWITTER_USERNAME!,
        TWITTER_PASSWORD: process.env.TWITTER_PASSWORD!,
        TWITTER_EMAIL: process.env.TWITTER_EMAIL!,
        PORT: process.env.TWITTER_MCP_PORT!,
        COINGECKO_API_KEY: process.env.COINGECKO_API_KEY!,
        SUBSTACK_LISTS: process.env.SUBSTACK_LISTS!
      }
    }
  }
});
// Create the trading alpha detection agent
const tradingAlphaDetector = new Agent({
  name: 'Trading Alpha Detector',
  instructions: `You are an agent that detects trading alpha by analyzing social media and market data.
Your primary functions are:
- Monitor social media for cryptocurrency mentions from the following Twitter lists: ${process.env.TWITTER_LISTS}
- Monitor Substack articles from the following sources: ${process.env.SUBSTACK_LISTS}
- Track market data and volume changes for tokens mentioned in these sources
- Identify potential trading opportunities based on social signals and market data
- Store analysis and predictions in Recall
- Sleep for a few hours between monitoring sessions to avoid rate limits
- Store and update your strategy when it changes
When analyzing social media:
1. Focus exclusively on mentions from the specified Twitter lists and Substack sources
2. Pay special attention to tokens that receive multiple mentions or engagement
3. Consider the credibility and track record of the accounts mentioning tokens
4. Look for patterns in timing and frequency of mentions
5. After completing your analysis, use the sleep tool to wait a few hours before the next check
When tracking token prices and mentions:
1. For each token mentioned in social media:
- Query current price data from CoinGecko
- Track mentions count from Twitter and Substack
- Calculate heat value based on mentions and price movement
- Store the data in the following format:
{
"timestamp": "2024-03-12T12:30:00Z",
"symbol": "TOKEN",
"price_usd": "10.50",
"volume_24h": "1000000",
"market_cap": "10000000",
"price_change_24h": "5.2",
"mentions": {
"twitter": 5,
"substack": 2
},
"heat_value": 0.75
}
When your strategy changes:
1. Document your new strategy in markdown format
2. Include:
- Description of your strategy
- Trading rules you follow
- Parameters for decision making
- Data sources you monitor
3. Store the strategy in both markdown and JSON formats
Use the provided tools to:
- Query the recall-data-omnifeeds for Twitter list content using the twitter-get-list-tweets tool
- For each list URL, extract the list ID from the URL (the last part after /lists/)
- Pass the listId parameter to the tool
- Only query each list once per session
- Query the recall-data-omnifeeds for Substack content using the substack-get-recent-posts tool
- For each Substack URL, extract the substackId from the URL
- Pass the substackId parameter to the tool
- Look up token data via recall-data-omnifeeds
- Store findings and analysis via recall-mcp
- Use the sleep tool to wait between monitoring sessions:
- Call the sleep tool with a duration in seconds (e.g., 7200 for 2 hours)
- Log the sleep duration and reason
- Format: "Sleep: [duration] seconds - [reason]"
IMPORTANT: You must format your responses exactly as shown below to ensure proper logging:
For each step of your analysis, output in this exact format:
Step: [step name]
Reasoning: [your reasoning process]
Context: {"tokens": [...], "sources": [...], "marketData": {...}}
Conclusion: [your conclusion for this step]
When you identify a trading opportunity, output in this exact format:
Trading Prediction: {"timestamp": "2024-03-12T12:30:00Z", "chain_id": "BASE", "symbol": "GAME", "cost": "10", "action": "buy", "notes": "Strong social signals and increasing volume", "token_estimate": "50"}
When your strategy changes, output in this exact format:
Strategy Update: {"timestamp": "2024-03-12T12:30:00Z", "version": "1.0.1", "description": "Updated strategy description", "rules": ["rule1", "rule2"], "parameters": {"min_mentions": 5, "min_volume_24h": "1000000", "min_market_cap": "10000000", "min_price_change": "5.0", "min_heat_value": 0.75}, "sources": {"twitter_lists": ["list1", "list2"], "substack_urls": ["url1", "url2"]}}
IMPORTANT FORMATTING RULES:
1. All outputs must be on a single line
2. Each output must start with the exact prefix (including the space)
3. All JSON must be valid and include all required fields
4. Timestamps must be in ISO 8601 format
5. Numbers should be strings when representing currency amounts
6. Arrays and objects must be properly formatted
Example valid outputs:
Step: Analyzing mentions
Reasoning: Checking recent token mentions from Twitter lists
Context: {"tokens": ["BTC", "ETH"], "sources": ["twitter-list-1"], "marketData": {"BTC": {"price": "50000"}}}
Conclusion: Found significant mention of BTC
Trading Prediction: {"timestamp": "2024-03-12T12:30:00Z", "chain_id": "BASE", "symbol": "GAME", "cost": "10", "action": "buy", "notes": "Strong social signals and increasing volume", "token_estimate": "50"}
Strategy Update: {"timestamp": "2024-03-12T12:30:00Z", "version": "1.0.1", "description": "Updated strategy with heat value", "rules": ["Monitor social signals", "Track price movements"], "parameters": {"min_mentions": 5, "min_volume_24h": "1000000", "min_market_cap": "10000000", "min_price_change": "5.0", "min_heat_value": 0.75}, "sources": {"twitter_lists": ["https://twitter.com/i/lists/123"], "substack_urls": ["https://crypto.substack.com"]}}
Always maintain this exact format for all your outputs. Each output must be on its own line with no other text between them.`,
  model: anthropic('claude-3-sonnet-20240229')
});
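The agent instructions ask it to "calculate heat value based on mentions and price movement," but no formula is defined anywhere in the gist. A minimal sketch of one possible scoring function is below; the weights (0.6 social / 0.4 price) and saturation thresholds are illustrative assumptions, not part of the original agent:

```typescript
// Hypothetical heat score: combines social mentions and 24h price change into
// a 0..1 value. Weights and normalization caps are assumptions for illustration.
export function computeHeatValue(
  twitterMentions: number,
  substackMentions: number,
  priceChange24hPct: number
): number {
  // Normalize combined mentions, saturating at 20
  const mentionScore = Math.min((twitterMentions + substackMentions) / 20, 1);
  // Normalize absolute 24h price change, saturating at 20%
  const priceScore = Math.min(Math.abs(priceChange24hPct) / 20, 1);
  return 0.6 * mentionScore + 0.4 * priceScore;
}
```

With these weights, 10 combined mentions and a 10% move yield a heat value of 0.5, matching the scale of the 0.75 example in the prompt.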
// Main function to start the agent
async function main() {
  let isRunning = true;
  let consecutiveErrors = 0;
  const MAX_CONSECUTIVE_ERRORS = 3;
  const ERROR_BACKOFF_TIME = 5 * 60 * 1000; // 5 minutes

  // Handle graceful shutdown
  process.on('SIGINT', () => {
    console.log('\nReceived SIGINT. Shutting down gracefully...');
    isRunning = false;
  });
  process.on('SIGTERM', () => {
    console.log('\nReceived SIGTERM. Shutting down gracefully...');
    isRunning = false;
  });

  try {
    console.log('Starting Trading Alpha Detection Agent...');

    // Initialize Recall bucket
    console.log('Initializing Recall bucket...');
    await initializeBucket();
    console.log('Recall bucket initialized successfully');

    // Get toolsets from MCP configuration
    console.log('Getting MCP toolsets...');
    const toolsets = await mcp.getToolsets();
    console.log('MCP tools initialized:', Object.keys(toolsets));

    while (isRunning) {
      try {
        // Send monitoring prompt
        const prompt = `You are a trading alpha detection agent. Monitor the following sources for potential trading opportunities:
Twitter Lists:
${process.env.TWITTER_LISTS!.split(',').join('\n')}
Substack Articles:
${process.env.SUBSTACK_LISTS!.split(',').join('\n')}
For each monitoring cycle:
1. Check for new content from the sources
2. Analyze any token mentions and their context
3. Track price and volume changes for mentioned tokens
4. Generate trading predictions when strong signals are found
Store your findings in the following formats:
Token Price Data:
{
"timestamp": "2024-03-15T10:30:00Z",
"symbol": "TOKEN",
"price_usd": 1.23,
"volume_24h": 1000000,
"market_cap": 10000000,
"price_change_24h": 5.2,
"mentions": {
"twitter": 10,
"substack": 2
}
}
Trading Prediction:
{
"timestamp": "2024-03-15T10:30:00Z",
"chain_id": "BASE",
"symbol": "TOKEN",
"cost": "10",
"action": "buy",
"notes": "Strong social signals with increasing volume",
"token_estimate": "50"
}
Strategy Update:
{
"timestamp": "2024-03-15T10:30:00Z",
"version": "1.0",
"description": "Current trading strategy based on social signals and market data",
"rules": [
"Monitor high-engagement crypto influencers",
"Track token mentions across platforms",
"Analyze price action correlation with social signals"
],
"parameters": {
"min_mentions": 5,
"min_volume_usd": 100000,
"min_market_cap": 1000000,
"min_price_change": 2.0
},
"sources": {
"twitter_lists": ${JSON.stringify(process.env.TWITTER_LISTS!.split(','))},
"substack_urls": ${JSON.stringify(process.env.SUBSTACK_LISTS!.split(','))}
}
}
For each analysis step, output:
Step: [current step being performed]
Reasoning: [explanation of analysis and decision making]
Context: [relevant data points and observations]
Conclusion: [outcome or decision]
When you find trading opportunities, output them as "Trading Prediction: {prediction_json}"
When you gather token data, output it as "Token Price: {price_data_json}"
When you update your strategy, output it as "Strategy Update: {strategy_json}"
Remember to:
1. Store all analysis and predictions in Recall
2. Sleep for a few hours between monitoring sessions to avoid rate limits
3. Store and update your strategy when it changes
4. Format all outputs exactly as shown above for proper logging
Begin monitoring now.`;
        console.log('\nStarting monitoring cycle...');
        console.log('Sending prompt:', prompt);

        let currentStep = '';
        let reasoning = '';
        let context: any = {};
        let conclusion = '';
        let buffer = '';

        const response = await tradingAlphaDetector.stream(prompt, { toolsets });
        // Handle streaming response
        for await (const part of response.fullStream) {
          if (!isRunning) {
            console.log('Shutdown requested. Stopping monitoring cycle...');
            break;
          }
          try {
            switch (part.type) {
              case 'error':
                console.error('Error in agent response:', part.error);
                break;
              case 'text-delta':
                process.stdout.write(part.textDelta);
                // Accumulate text in the buffer and only process complete lines
                buffer += part.textDelta;
                if (buffer.includes('\n')) {
                  const lines = buffer.split('\n');
                  buffer = lines.pop() || ''; // Keep incomplete line in buffer
                  for (const line of lines) {
                    // Extract step, reasoning, and conclusion from the text
                    if (line.includes('Step:')) {
                      currentStep = line.split('Step:')[1].trim();
                      console.log('Found step:', currentStep);
                    }
                    if (line.includes('Reasoning:')) {
                      reasoning = line.split('Reasoning:')[1].trim();
                      console.log('Found reasoning');
                    }
                    if (line.includes('Context:')) {
                      try {
                        const contextStr = line.split('Context:')[1].trim();
                        context = JSON.parse(contextStr);
                        console.log('Parsed context successfully');
                      } catch (e) {
                        console.warn('Failed to parse context:', e);
                        console.warn('Context string was:', line.split('Context:')[1].trim());
                      }
                    }
                    if (line.includes('Conclusion:')) {
                      conclusion = line.split('Conclusion:')[1].trim();
                      console.log('Found conclusion');
                      // Store chain of thought log when we have a complete step
                      if (currentStep && reasoning && conclusion) {
                        try {
                          const log: ChainOfThoughtLog = {
                            timestamp: new Date().toISOString(),
                            step: currentStep,
                            reasoning,
                            context,
                            conclusion
                          };
                          console.log('Storing chain of thought log...');
                          await storeChainOfThought(log);
                          console.log('Chain of thought log stored successfully');
                          // Reset for next step
                          currentStep = '';
                          reasoning = '';
                          context = {};
                          conclusion = '';
                        } catch (e) {
                          console.error('Failed to store chain of thought log:', e);
                        }
                      }
                    }
                    // Check for trading predictions
                    if (line.includes('Trading Prediction:')) {
                      try {
                        const predictionStr = line.split('Trading Prediction:')[1].trim();
                        console.log('Parsing prediction:', predictionStr);
                        const prediction: TradingPrediction = JSON.parse(predictionStr);
                        console.log('Storing prediction...');
                        await storePrediction(prediction);
                        console.log('Prediction stored successfully');
                      } catch (e) {
                        console.error('Failed to parse or store prediction:', e);
                        console.error('Prediction string was:', line.split('Trading Prediction:')[1].trim());
                      }
                    }
                    // Check for token price data
                    if (line.includes('Token Price:')) {
                      try {
                        const priceDataStr = line.split('Token Price:')[1].trim();
                        console.log('Parsing token price data:', priceDataStr);
                        const priceData: TokenPriceData = JSON.parse(priceDataStr);
                        console.log('Storing token price data...');
                        await storeTokenPriceData(priceData);
                        console.log('Token price data stored successfully');
                      } catch (e) {
                        console.error('Failed to parse or store token price data:', e);
                        console.error('Token price data string was:', line.split('Token Price:')[1].trim());
                      }
                    }
                    // Check for strategy updates
                    if (line.includes('Strategy Update:')) {
                      try {
                        const strategyStr = line.split('Strategy Update:')[1].trim();
                        console.log('Parsing strategy update:', strategyStr);
                        const strategy: AgentStrategy = JSON.parse(strategyStr);
                        console.log('Storing strategy update...');
                        await storeAgentStrategy(strategy);
                        console.log('Strategy update stored successfully');
                      } catch (e) {
                        console.error('Failed to parse or store strategy update:', e);
                        console.error('Strategy update string was:', line.split('Strategy Update:')[1].trim());
                      }
                    }
                  }
                }
                break;
              case 'tool-call':
                console.log('\nCalling tool:', part.toolName, 'with args:', JSON.stringify(part.args, null, 2));
                break;
              case 'tool-result':
                console.log('\nTool result:', JSON.stringify(part.result, null, 2));
                break;
            }
          } catch (error) {
            console.error('Error processing agent response part:', error);
            // Continue processing other parts
          }
        }
        console.log('\nMonitoring cycle completed successfully');
        consecutiveErrors = 0; // Reset error counter on success
      } catch (error) {
        console.error('Error in monitoring cycle:', error);
        consecutiveErrors++;
        if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) {
          console.error(`Reached maximum consecutive errors (${MAX_CONSECUTIVE_ERRORS}). Backing off for ${ERROR_BACKOFF_TIME / 1000} seconds...`);
          await new Promise(resolve => setTimeout(resolve, ERROR_BACKOFF_TIME));
          consecutiveErrors = 0; // Start fresh after backing off
        }
      }
    }

    console.log('Agent shutdown complete');
  } catch (error) {
    console.error('Error starting agent:', error);
    process.exit(1);
  }
}

// Run the agent
main().catch(error => {
  console.error('Unhandled error in main:', error);
  process.exit(1);
});
// recall-client.ts
import { createWalletClient, http } from "viem";
import { privateKeyToAccount } from "viem/accounts";
import { testnet } from "@recallnet/chains";
import { RecallClient } from "@recallnet/sdk/client";
import { ListResultBucket } from "@recallnet/sdk/bucket";
import dotenv from 'dotenv';

dotenv.config();

// Ensure required environment variables are present
if (!process.env.RECALL_PRIVATE_KEY || !process.env.RECALL_BUCKET) {
  throw new Error('Missing required RECALL_PRIVATE_KEY or RECALL_BUCKET environment variables');
}

// Configuration
const MAX_RETRIES = 3;
const RETRY_DELAY = 1000; // 1 second
const READS_ENABLED = false; // Set to true when network reads are back up

// Ensure the private key is 0x-prefixed
const privateKey = process.env.RECALL_PRIVATE_KEY.startsWith('0x')
  ? process.env.RECALL_PRIVATE_KEY as `0x${string}`
  : `0x${process.env.RECALL_PRIVATE_KEY}` as `0x${string}`;

// Create wallet client from private key
const walletClient = createWalletClient({
  account: privateKeyToAccount(privateKey),
  chain: testnet,
  transport: http(),
});

// Initialize Recall client with wallet
export const recall = new RecallClient({ walletClient });

// Get managers for different operations
const bucketManager = recall.bucketManager();
// Storage interfaces
export interface ChainOfThoughtLog {
  timestamp: string;
  step: string;
  reasoning: string;
  context: {
    tokens?: string[];
    sources?: string[];
    marketData?: any;
  };
  conclusion: string;
}

export interface TradingPrediction {
  timestamp: string;
  chain_id: string;
  symbol: string;
  cost: string;
  action: 'buy' | 'sell';
  notes: string;
  token_estimate: string;
}

export interface TokenPriceData {
  timestamp: string;
  symbol: string;
  price_usd: string;
  volume_24h: string;
  market_cap: string;
  price_change_24h: string;
  mentions: {
    twitter: number;
    substack: number;
  };
  heat_value?: number;
}

export interface TokenPriceHistory {
  symbol: string;
  data: TokenPriceData[];
}

export interface AgentStrategy {
  timestamp: string;
  version: string;
  description: string;
  rules: string[];
  parameters: {
    min_mentions: number;
    min_volume_24h: string;
    min_market_cap: string;
    min_price_change: string;
    min_heat_value: number;
  };
  sources: {
    twitter_lists: string[];
    substack_urls: string[];
  };
}
// Bucket address, set by initializeBucket()
let bucket: `0x${string}`;

// Helper function for retrying operations
async function retryOperation<T>(operation: () => Promise<T>, retries = MAX_RETRIES): Promise<T> {
  let lastError: Error | undefined;
  for (let i = 0; i < retries; i++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error as Error;
      if (i < retries - 1) {
        console.warn(`Operation failed, attempt ${i + 1}/${retries}. Retrying in ${RETRY_DELAY}ms...`);
        await new Promise(resolve => setTimeout(resolve, RETRY_DELAY));
      }
    }
  }
  throw lastError;
}
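The retry helper above swallows each failure, waits a fixed delay, and rethrows the last error only after all attempts are exhausted. A self-contained sketch of the same pattern (with a configurable delay so it can be exercised quickly; `retrySketch` is a hypothetical name, not part of the gist):

```typescript
// Minimal retry wrapper mirroring retryOperation's behavior:
// retry on failure, delay between attempts, rethrow the last error.
export async function retrySketch<T>(
  operation: () => Promise<T>,
  retries = 3,
  delayMs = 10
): Promise<T> {
  let lastError: Error | undefined;
  for (let i = 0; i < retries; i++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error as Error;
      if (i < retries - 1) {
        // Wait before the next attempt
        await new Promise(resolve => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}
```

An operation that fails twice and then succeeds resolves on the third attempt; one that fails every time rejects with the final error.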
export async function initializeBucket(): Promise<void> {
  try {
    await retryOperation(async () => {
      // Find the named bucket, or create it if it doesn't exist
      const { result: existingBuckets } = await bucketManager.list();
      const existingBucket = existingBuckets.find((b: ListResultBucket) => b.metadata?.['name'] === process.env.RECALL_BUCKET);
      if (existingBucket) {
        bucket = existingBucket.addr;
      } else {
        const { result: { bucket: newBucket } } = await bucketManager.create({
          metadata: { name: process.env.RECALL_BUCKET! }
        });
        bucket = newBucket;
      }
      console.log('Bucket initialized:', bucket);
    });
  } catch (error) {
    console.error('Failed to initialize bucket after retries:', error);
    throw error;
  }
}
// Storage functions
export async function storeChainOfThought(log: ChainOfThoughtLog): Promise<void> {
  try {
    await retryOperation(async () => {
      const timestamp = new Date(log.timestamp).getTime().toString();
      const key = `agents/trading-alpha/cot/${timestamp}.json`;
      await bucketManager.add(bucket, key, Buffer.from(JSON.stringify(log)));
      console.log('Successfully stored chain of thought log');
    });
  } catch (error) {
    console.error('Failed to store chain of thought log after retries:', error);
    throw error;
  }
}

export async function storePrediction(prediction: TradingPrediction): Promise<void> {
  try {
    await retryOperation(async () => {
      const timestamp = new Date(prediction.timestamp).getTime().toString();
      const key = `agents/trading-alpha/outcomes/${timestamp}.json`;
      await bucketManager.add(bucket, key, Buffer.from(JSON.stringify(prediction)));
      console.log('Successfully stored prediction');
    });
  } catch (error) {
    console.error('Failed to store prediction after retries:', error);
    throw error;
  }
}

// storeTokenPriceData is imported by the agent but was missing from this file;
// the key layout below is an assumption that mirrors the other store functions.
export async function storeTokenPriceData(priceData: TokenPriceData): Promise<void> {
  try {
    await retryOperation(async () => {
      const timestamp = new Date(priceData.timestamp).getTime().toString();
      const key = `agents/trading-alpha/prices/${priceData.symbol}/${timestamp}.json`;
      await bucketManager.add(bucket, key, Buffer.from(JSON.stringify(priceData)));
      console.log('Successfully stored token price data');
    });
  } catch (error) {
    console.error('Failed to store token price data after retries:', error);
    throw error;
  }
}

export async function storeAgentMetadata(): Promise<void> {
  const metadata = {
    id: 'trading-alpha',
    created_at: new Date().toISOString(),
    type: 'trading-alpha-detection',
    version: '1.0.0',
    config: {
      twitter_list: process.env.TWITTER_LISTS,
      bucket
    }
  };
  try {
    await retryOperation(async () => {
      const key = 'agents/trading-alpha/agent.json';
      await bucketManager.add(bucket, key, Buffer.from(JSON.stringify(metadata)));
      console.log('Successfully stored agent metadata');
    });
  } catch (error) {
    console.error('Failed to store agent metadata after retries:', error);
    throw error;
  }
}
export async function storeAgentStrategy(strategy: AgentStrategy): Promise<void> {
  try {
    // Store JSON format
    await retryOperation(async () => {
      await bucketManager.add(
        bucket,
        'strategy.json',
        Buffer.from(JSON.stringify(strategy, null, 2))
      );
    });

    // Create markdown version
    const markdownContent = `# Trading Alpha Detection Strategy

Version: ${strategy.version}
Last Updated: ${strategy.timestamp}

## Description
${strategy.description}

## Trading Rules
${strategy.rules.map(rule => `- ${rule}`).join('\n')}

## Parameters
- Minimum Mentions: ${strategy.parameters.min_mentions}
- Minimum 24h Volume: ${strategy.parameters.min_volume_24h}
- Minimum Market Cap: ${strategy.parameters.min_market_cap}
- Minimum Price Change: ${strategy.parameters.min_price_change}
- Minimum Heat Value: ${strategy.parameters.min_heat_value}

## Data Sources

### Twitter Lists
${strategy.sources.twitter_lists.map(list => `- ${list}`).join('\n')}

### Substack URLs
${strategy.sources.substack_urls.map(url => `- ${url}`).join('\n')}
`;

    // Store markdown format
    await retryOperation(async () => {
      await bucketManager.add(
        bucket,
        'strategy.md',
        Buffer.from(markdownContent)
      );
    });

    console.log('Strategy stored successfully in both JSON and markdown formats');
  } catch (error) {
    console.error('Error storing strategy:', error);
    throw error;
  }
}
export async function getAgentStrategy(): Promise<AgentStrategy | null> {
  if (!READS_ENABLED) {
    console.warn('Network reads are currently disabled');
    return null;
  }
  try {
    // bucketManager.get returns { result }, consistent with the usage below
    const { result: data } = await retryOperation(async () => {
      return await bucketManager.get(bucket, 'strategy.json');
    });
    if (!data) {
      return null;
    }
    return JSON.parse(new TextDecoder().decode(data)) as AgentStrategy;
  } catch (error) {
    console.error('Error retrieving strategy:', error);
    return null;
  }
}
export async function getChainOfThoughtLogs(): Promise<ChainOfThoughtLog[]> {
  if (!READS_ENABLED) {
    console.warn('NOTICE: Recall network read operations are currently disabled. Data retrieval is not available.');
    return [];
  }
  try {
    return await retryOperation(async () => {
      const { result } = await bucketManager.query(bucket);
      const logs: ChainOfThoughtLog[] = [];
      if (result.objects && result.objects.length > 0) {
        for (const obj of result.objects) {
          if (!obj.key.includes('agents/trading-alpha/cot/')) continue;
          const { result: data } = await bucketManager.get(bucket, obj.key);
          if (data) {
            const decoded = new TextDecoder().decode(data);
            logs.push(JSON.parse(decoded));
          }
        }
      }
      return logs;
    });
  } catch (error) {
    console.error('Failed to retrieve chain of thought logs after retries:', error);
    throw error;
  }
}

export async function getPredictions(): Promise<TradingPrediction[]> {
  if (!READS_ENABLED) {
    console.warn('NOTICE: Recall network read operations are currently disabled. Data retrieval is not available.');
    return [];
  }
  try {
    return await retryOperation(async () => {
      const { result } = await bucketManager.query(bucket);
      const predictions: TradingPrediction[] = [];
      if (result.objects && result.objects.length > 0) {
        for (const obj of result.objects) {
          if (!obj.key.includes('agents/trading-alpha/outcomes/')) continue;
          const { result: data } = await bucketManager.get(bucket, obj.key);
          if (data) {
            const decoded = new TextDecoder().decode(data);
            predictions.push(JSON.parse(decoded));
          }
        }
      }
      return predictions;
    });
  } catch (error) {
    console.error('Failed to retrieve predictions after retries:', error);
    throw error;
  }
}
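The store and query functions above all rely on one implicit convention: objects live under `agents/trading-alpha/<kind>/<epoch-millis>.json`, where the epoch timestamp is derived from the record's ISO timestamp. A small sketch that makes the key scheme explicit (`storageKey` is a hypothetical helper, not exported by the gist):

```typescript
// Derive a bucket key from a record's ISO timestamp, following the layout
// used by storeChainOfThought ('cot') and storePrediction ('outcomes').
export function storageKey(kind: 'cot' | 'outcomes', isoTimestamp: string): string {
  // Epoch milliseconds give naturally sortable, collision-resistant keys
  const epochMillis = new Date(isoTimestamp).getTime().toString();
  return `agents/trading-alpha/${kind}/${epochMillis}.json`;
}
```

This is also the prefix the `get*` readers filter on with `obj.key.includes(...)` when scanning the bucket.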