Skip to content

Instantly share code, notes, and snippets.

@louis030195
Created March 4, 2025 21:09
Show Gist options
  • Save louis030195/4bf6bdbad98bffaca8e3f39b54fc4eb3 to your computer and use it in GitHub Desktop.
Save louis030195/4bf6bdbad98bffaca8e3f39b54fc4eb3 to your computer and use it in GitHub Desktop.
use dirs::home_dir;
use regex::Regex;
use sentry;
use serde_json::Value;
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::process::Command;
use tokio::sync::watch;
use anyhow::Result;
use std::fs;
use std::path::Path;
use tokio::io::{AsyncBufReadExt, BufReader};
use tracing::{debug, error, info, warn};
use url::Url;
use which::which;
// Add these imports at the top of the file
use serde_json::json;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use crate::pick_unused_port;
use once_cell::sync::Lazy;
// Add near other imports
use http_cache_reqwest::{CACacheManager, Cache, CacheMode, HttpCache, HttpCacheOptions};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use reqwest_middleware::reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use reqwest_middleware::reqwest::Client;
use reqwest_middleware::ClientBuilder;
use std::collections::HashSet;
use std::str::FromStr;
// Add at top of file with other imports
#[cfg(windows)]
use std::os::windows::process::CommandExt;
#[derive(Clone, Debug, Copy)]
pub enum PipeState {
Port(u16),
Pid(i32),
}
pub struct CronHandle {
shutdown: watch::Sender<bool>,
}
#[derive(Debug, Clone)]
pub enum BuildStatus {
NotStarted,
InProgress,
Success,
Failed(String),
}
impl CronHandle {
pub fn stop(&self) {
let _ = self.shutdown.send(true);
}
}
static CRON_HANDLES: Lazy<tokio::sync::Mutex<HashMap<String, Vec<CronHandle>>>> =
Lazy::new(|| tokio::sync::Mutex::new(HashMap::new()));
// Add this function to generate a secure cron secret
fn generate_cron_secret() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(32)
.map(char::from)
.collect()
}
// Update this function near the top of the file
fn sanitize_pipe_name(name: &str) -> String {
// First check if this is a GitHub URL and extract the repo name
if let Ok(url) = Url::parse(name) {
if url.host_str() == Some("github.com") {
let path_segments: Vec<&str> = url.path_segments().unwrap().collect();
if path_segments.len() >= 2 {
// Use the repository name (second segment) instead of branch name
let repo_name = path_segments[1];
debug!("Using repository name for pipe: {}", repo_name);
return repo_name.to_string();
}
}
}
// Fall back to original sanitization logic for non-GitHub URLs
let re = Regex::new(r"[^a-zA-Z0-9_-]").unwrap();
let sanitized = re.replace_all(name, "-").to_string();
// Remove "-ref-main/" suffix if it exists
sanitized
.strip_suffix("-ref-main/")
.or_else(|| sanitized.strip_suffix("-ref-main"))
.unwrap_or(&sanitized)
.to_string()
}
async fn create_watchdog_script(parent_pid: u32, child_pid: u32) -> Result<PathBuf> {
let script_content = if cfg!(windows) {
format!(
r#"
$parentPid = {parent_pid}
$childPid = {child_pid}
function Get-ChildProcesses($ProcessId) {{
Get-WmiObject Win32_Process | Where-Object {{ $_.ParentProcessId -eq $ProcessId }} | ForEach-Object {{
$_.ProcessId
Get-ChildProcesses $_.ProcessId
}}
}}
while ($true) {{
try {{
$parent = Get-Process -Id $parentPid -ErrorAction Stop
$child = Get-Process -Id $childPid -ErrorAction Stop
Start-Sleep -Seconds 1
}} catch {{
Write-Host "Parent process ($parentPid) not found, terminating child processes"
# Get all child processes recursively
$children = Get-ChildProcesses $childPid
# Add the main process to the list
$allProcesses = @($childPid) + $children
foreach ($processId in $allProcesses) {{
try {{
Stop-Process -Id $processId -Force -ErrorAction SilentlyContinue
Write-Host "Stopped process: $processId"
}} catch {{
Write-Host "Process $processId already terminated"
}}
}}
Stop-Process -Id $PID -Force
exit
}}
}}
"#
)
} else {
format!(
r#"#!/bin/bash
set -x # Enable debug mode
parent_pid={parent_pid}
child_pid={child_pid}
echo "[$(date)] Watchdog started - monitoring parent PID: $parent_pid, child PID: $child_pid"
find_all_children() {{
local parent=$1
local children=$(pgrep -P $parent)
echo $children
for child in $children; do
find_all_children $child
done
}}
cleanup() {{
echo "[$(date)] Starting cleanup..."
# Get all child processes recursively
all_children=$(find_all_children $child_pid)
echo "[$(date)] Found child processes: $all_children"
# Try to kill by process group first
child_pgid=$(ps -o pgid= -p $child_pid 2>/dev/null | tr -d ' ')
if [ ! -z "$child_pgid" ]; then
echo "[$(date)] Killing process group $child_pgid"
pkill -TERM -g $child_pgid 2>/dev/null || true
sleep 1
pkill -KILL -g $child_pgid 2>/dev/null || true
fi
# Kill all children individually if they still exist
if [ ! -z "$all_children" ]; then
echo "[$(date)] Killing all child processes: $all_children"
kill -TERM $all_children 2>/dev/null || true
sleep 1
kill -KILL $all_children 2>/dev/null || true
fi
# Kill the main process if it still exists
echo "[$(date)] Killing main process $child_pid"
kill -TERM $child_pid 2>/dev/null || true
sleep 1
kill -KILL $child_pid 2>/dev/null || true
# Final verification
sleep 1
remaining=$(ps -o pid= -g $child_pgid 2>/dev/null || true)
if [ ! -z "$remaining" ]; then
echo "[$(date)] WARNING: Some processes might still be running: $remaining"
pkill -KILL -g $child_pgid 2>/dev/null || true
fi
exit 0
}}
trap cleanup SIGTERM SIGINT
while true; do
if ! ps -p $parent_pid > /dev/null 2>&1; then
echo "[$(date)] Parent process ($parent_pid) not found, terminating child processes"
cleanup
exit
fi
sleep 1
done
"#
)
};
let temp_dir = std::env::temp_dir();
let script_name = if cfg!(windows) {
format!("watchdog_{parent_pid}_{child_pid}.ps1")
} else {
format!("watchdog_{parent_pid}_{child_pid}.sh")
};
let script_path = temp_dir.join(script_name);
tokio::fs::write(&script_path, script_content).await?;
// Set executable permissions on Unix systems only
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = tokio::fs::metadata(&script_path).await?.permissions();
perms.set_mode(0o755);
tokio::fs::set_permissions(&script_path, perms).await?;
}
Ok(script_path)
}
async fn spawn_watchdog(parent_pid: u32, child_pid: u32) -> Result<tokio::process::Child> {
let script_path = create_watchdog_script(parent_pid, child_pid).await?;
info!(
"Spawning watchdog process for parent_pid={}, child_pid={}",
parent_pid, child_pid
);
info!("Watchdog script path: {:?}", script_path);
// Create a log file for the watchdog
let log_path = std::env::temp_dir().join(format!("watchdog_{}_{}.log", parent_pid, child_pid));
#[cfg(windows)]
let child = {
Command::new("powershell")
.arg("-ExecutionPolicy")
.arg("Bypass")
.arg("-NoProfile")
.arg("-NonInteractive")
.arg("-WindowStyle")
.arg("Hidden")
.arg("-File")
.arg(&script_path)
.creation_flags(0x08000000)
.stdout(std::fs::File::create(&log_path)?)
.stderr(std::fs::File::create(&log_path)?)
.spawn()?
};
#[cfg(not(windows))]
let child = {
Command::new("bash")
.arg(&script_path)
.stdout(std::fs::File::create(&log_path)?)
.stderr(std::fs::File::create(&log_path)?)
.spawn()?
};
if let Some(id) = child.id() {
info!("Watchdog process spawned with PID: {}", id);
// Modify verification for Windows
#[cfg(windows)]
{
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
let output = Command::new("powershell")
.arg("-NoProfile")
.arg("-NonInteractive")
.arg("-Command")
.arg(format!(
"Get-Process -Id {} -ErrorAction SilentlyContinue",
id
))
.output()
.await;
match output {
Ok(output) => {
if output.status.success() {
info!("Watchdog process verified running with PID: {}", id);
} else {
error!("Watchdog process not found after spawn! PID: {}", id);
}
}
Err(e) => error!("Failed to verify watchdog process: {}", e),
}
});
}
#[cfg(not(windows))]
{
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
let output = Command::new("ps")
.args(["-p", &id.to_string()])
.output()
.await;
match output {
Ok(output) => {
if output.status.success() {
info!("Watchdog process verified running with PID: {}", id);
} else {
error!("Watchdog process not found after spawn! PID: {}", id);
}
}
Err(e) => error!("Failed to verify watchdog process: {}", e),
}
});
}
}
// Clean up script after a delay
let script_path_clone = script_path.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
if let Err(e) = tokio::fs::remove_file(&script_path_clone).await {
error!("Failed to remove watchdog script: {}", e);
}
});
Ok(child)
}
pub async fn run_pipe(
pipe: &str,
screenpipe_dir: PathBuf,
) -> Result<(tokio::process::Child, PipeState)> {
let bun_path = find_bun_path().ok_or_else(|| {
let err = anyhow::anyhow!("bun not found");
sentry::capture_error(&err.source().unwrap());
err
})?;
let pipe_dir = screenpipe_dir.join("pipes").join(pipe);
let pipe_json_path = pipe_dir.join("pipe.json");
let package_json_path = pipe_dir.join("package.json");
debug!(
"checking if pipe is a next.js project at: {:?}",
package_json_path
);
// First check if it's a Next.js project by looking at package.json
let is_nextjs = if package_json_path.exists() {
debug!("found package.json, checking for next.js dependency");
let package_json = tokio::fs::read_to_string(&package_json_path).await?;
let package_data: Value = serde_json::from_str(&package_json)?;
let has_next = package_data["dependencies"].get("next").is_some();
debug!("is next.js project: {}", has_next);
has_next
} else {
false
};
// Check if pipe is still enabled
if pipe_json_path.exists() {
debug!("checking if pipe is enabled from: {:?}", pipe_json_path);
let pipe_json = tokio::fs::read_to_string(&pipe_json_path).await?;
let pipe_config: Value = serde_json::from_str(&pipe_json)?;
if !pipe_config
.get("enabled")
.and_then(Value::as_bool)
.unwrap_or(false)
{
debug!("pipe {} is disabled, stopping", pipe);
anyhow::bail!("pipe is disabled");
}
debug!("pipe {} is enabled, continuing", pipe);
}
// Prepare environment variables
debug!("preparing environment variables for pipe: {}", pipe);
let mut env_vars = std::env::vars().collect::<Vec<(String, String)>>();
env_vars.push((
"SCREENPIPE_DIR".to_string(),
screenpipe_dir.to_str().unwrap().to_string(),
));
env_vars.push(("PIPE_ID".to_string(), pipe.to_string()));
env_vars.push((
"PIPE_DIR".to_string(),
pipe_dir.to_str().unwrap().to_string(),
));
if is_nextjs {
debug!(
"setting up next.js specific configuration for pipe: {}",
pipe
);
let mut assigned_port = None;
// Handle Next.js specific setup including crons
if pipe_json_path.exists() {
debug!("reading pipe.json for next.js configuration");
let pipe_json = tokio::fs::read_to_string(&pipe_json_path).await?;
let pipe_config: Value = serde_json::from_str(&pipe_json)?;
// Try to use user-configured port first
if let Some(user_port) = pipe_config.get("port").and_then(|p| p.as_u64()) {
debug!("found user-configured port: {}", user_port);
// Verify port is available
if is_port_available(user_port as u16) {
assigned_port = Some(user_port as u16);
debug!("user-configured port {} is available", user_port);
} else {
debug!(
"user-configured port {} is in use, will assign random port",
user_port
);
}
}
// Fallback to random port if needed
let port = assigned_port.unwrap_or_else(|| pick_unused_port().expect("No ports free"));
info!("[{}] using port {} for next.js pipe", pipe, port);
// Update pipe.json with the actual port being used
let mut updated_config = pipe_config.clone();
updated_config["port"] = json!(port);
let updated_pipe_json = serde_json::to_string_pretty(&updated_config)?;
let mut file = File::create(&pipe_json_path).await?;
file.write_all(updated_pipe_json.as_bytes()).await?;
info!(
"[{}] updated pipe.json with port configuration: {}",
pipe, updated_pipe_json
);
env_vars.push(("PORT".to_string(), port.to_string()));
// Handle cron jobs if they exist
if let Some(crons) = pipe_config.get("crons").and_then(Value::as_array) {
info!(
"[{}] found {} cron jobs in configuration",
pipe,
crons.len()
);
let base_url = format!("http://localhost:{}", port);
debug!("[{}] using base url: {} for cron jobs", pipe, base_url);
let cron_secret = generate_cron_secret();
debug!("[{}] generated cron secret: {}", pipe, cron_secret);
env_vars.push(("CRON_SECRET".to_string(), cron_secret.clone()));
let mut handles = Vec::new();
for cron in crons {
let path = cron["path"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("missing path"))?
.to_string();
let schedule = cron["schedule"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("missing schedule"))?
.to_string();
let (tx, rx) = watch::channel(false);
let handle = CronHandle { shutdown: tx };
handles.push(handle);
let base_url = base_url.clone();
let pipe_clone = pipe.to_string();
let secret_clone = cron_secret.clone();
let screenpipe_dir = screenpipe_dir.clone();
tokio::spawn(async move {
run_cron_schedule(
&pipe_clone,
&base_url,
&path,
&secret_clone,
&schedule,
&screenpipe_dir,
rx,
)
.await;
});
}
// Store handles for later cleanup
CRON_HANDLES.lock().await.insert(pipe.to_string(), handles);
}
// Install dependencies using bun
info!("[{}] installing dependencies for next.js pipe", pipe);
let install_output = Command::new(&bun_path)
.arg("install")
.current_dir(&pipe_dir)
.env("NPM_CONFIG_REGISTRY", "https://registry.npmjs.org")
.env("BUN_CONFIG_REGISTRY", "https://registry.npmjs.org")
.output()
.await?;
if !install_output.status.success() {
let err_msg = String::from_utf8_lossy(&install_output.stderr);
error!("[{}] failed to install dependencies: {}", pipe, err_msg);
let err = anyhow::anyhow!(
"failed to install dependencies for next.js pipe: {}",
err_msg
);
sentry::capture_error(&err.source().unwrap());
anyhow::bail!(err);
}
info!(
"[{}] successfully installed dependencies for next.js pipe",
pipe
);
} else {
let port = pick_unused_port().expect("No ports free");
info!(
"[{}] no pipe.json found, using random port {} for next.js pipe",
pipe, port
);
env_vars.push(("PORT".to_string(), port.to_string()));
}
// Try to build the Next.js project
let build_status = try_build_nextjs(&pipe_dir, &bun_path).await?;
let build_success = matches!(build_status, BuildStatus::Success);
if pipe_json_path.exists() {
let pipe_json = tokio::fs::read_to_string(&pipe_json_path).await?;
let mut pipe_config: Value = serde_json::from_str(&pipe_json)?;
pipe_config["buildStatus"] = match &build_status {
BuildStatus::Success => json!("success"),
BuildStatus::Failed(error) => json!({
"status": "failed",
"error": error
}),
BuildStatus::InProgress => json!("in_progress"),
BuildStatus::NotStarted => json!("not_started"),
};
let updated_pipe_json = serde_json::to_string_pretty(&pipe_config)?;
tokio::fs::write(&pipe_json_path, updated_pipe_json).await?;
}
let port = env_vars
.iter()
.find(|(k, _)| k == "PORT")
.map(|(_, v)| v)
.unwrap()
.parse::<u16>()
.expect("Invalid port number");
// Run the Next.js project
info!(
"starting next.js project in {} mode",
if build_success {
"production"
} else {
"development"
}
);
let mut command = Command::new(&bun_path);
command.arg("run").arg("--bun");
if build_success {
command.arg("start");
} else {
info!("[{}] falling back to dev mode due to build failure", pipe);
command.arg("dev");
}
command
.arg("--port")
.arg(port.to_string())
.current_dir(&pipe_dir)
.envs(env_vars)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
let mut child = command.spawn()?;
debug!("[{}] streaming logs for next.js pipe", pipe);
stream_logs(pipe, &mut child).await?;
let child_pid = child.id().expect("Failed to get child PID") as u32;
let parent_pid = std::process::id();
info!("Spawned bun process with PID: {}", child_pid);
// Spawn watchdog process
match spawn_watchdog(parent_pid, child_pid).await {
Ok(watchdog) => {
debug!(
"Watchdog process spawned successfully with PID: {:?}",
watchdog.id()
);
}
Err(e) => {
warn!("Failed to spawn watchdog process: {}", e);
}
}
return Ok((child, PipeState::Port(port)));
}
// If it's not a Next.js project, run as regular pipe
let main_module = find_pipe_file(&pipe_dir)?;
info!("[{}] executing pipe: {:?}", pipe, main_module);
env_vars.push((
"PIPE_FILE".to_string(),
main_module.to_str().unwrap().to_string(),
));
let mut child = Command::new(&bun_path)
.arg("run")
.arg("--bun")
.arg(&main_module)
.envs(env_vars)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;
// Stream logs
stream_logs(pipe, &mut child).await?;
let child_id = child.id().unwrap();
Ok((child, PipeState::Pid(child_id as i32))) // Return 0 or handle port differently for non-Next.js projects
}
async fn stream_logs(pipe: &str, child: &mut tokio::process::Child) -> Result<()> {
let stdout = child.stdout.take().expect("failed to get stdout");
let stderr = child.stderr.take().expect("failed to get stderr");
let pipe_clone = pipe.to_string();
// Spawn tasks to handle stdout and stderr
let _stdout_handle = tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
info!("[{}] {}", pipe_clone, line);
}
});
let pipe_clone = pipe.to_string();
let _stderr_handle = tokio::spawn(async move {
// Create static HashMaps for efficient lookups
static INFO_PATTERNS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
let mut set = HashSet::new();
// Development related
set.insert("Download");
set.insert("Task dev");
set.insert("$ next dev");
set.insert("ready started server");
set.insert("Local:");
set.insert("Webpack is configured");
set.insert("See instructions");
set.insert("https://nextjs.org");
set.insert("⚠ See instructions");
set.insert("$ next start");
set.insert("[bun install]");
set.insert("Saved lockfile");
set.insert("Resolved, downloaded");
set.insert("Installing");
set.insert("Successfully installed");
set.insert("packages installed");
set.insert("Fetching");
set.insert("Resolving");
// Frontend console patterns
set.insert("[LOG]");
set.insert("console.log");
set.insert("] ");
set.insert("›");
set.insert("<");
set.insert("Warning:");
set.insert("render@");
set.insert("webpack");
set.insert("HMR");
set.insert("[HMR]");
set
});
static ERROR_PATTERNS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
let mut set = HashSet::new();
set.insert("TypeError:");
set.insert("ReferenceError:");
set.insert("SyntaxError:");
set.insert("Error:");
set.insert("Uncaught");
set.insert("Failed to compile");
set.insert("ENOENT");
set.insert("FATAL");
set
});
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let line_lower = line.to_lowercase(); // Convert once for case-insensitive matching
// Quick checks first
if line.trim().is_empty() || line.contains("console.") {
info!("[{}] {}", pipe_clone, line);
continue;
}
// Check for error patterns first (higher priority)
if ERROR_PATTERNS
.iter()
.any(|&pattern| line_lower.contains(&pattern.to_lowercase()))
{
error!("[{}] {}", pipe_clone, line);
sentry::capture_message(
&format!("[{}] {}", pipe_clone, line),
sentry::Level::Error,
);
continue;
}
// Then check for info patterns
if INFO_PATTERNS
.iter()
.any(|&pattern| line_lower.contains(&pattern.to_lowercase()))
{
info!("[{}] {}", pipe_clone, line);
continue;
}
// Default to warning for unknown patterns
warn!("[{}] {}", pipe_clone, line);
}
});
info!("pipe execution completed successfully [{}]", pipe);
Ok(())
}
// Add this helper function for retrying installations
async fn retry_install(bun_path: &Path, dest_dir: &Path, max_retries: u32) -> Result<()> {
let mut attempt = 0;
let mut last_error = None;
while attempt < max_retries {
let mut install_child = Command::new(bun_path)
.arg("i")
.current_dir(dest_dir)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;
// Stream logs for npm install
if let Ok(()) = stream_logs("bun install", &mut install_child).await {
let status = install_child.wait().await?;
if status.success() {
return Ok(());
}
}
attempt += 1;
let delay = std::time::Duration::from_secs(2u64.pow(attempt)); // exponential backoff
error!(
"install attempt {} failed, retrying in {} seconds",
attempt,
delay.as_secs()
);
tokio::time::sleep(delay).await;
last_error = Some(anyhow::anyhow!(
"installation failed after {} attempts",
attempt
));
}
Err(last_error.unwrap_or_else(|| anyhow::anyhow!("installation failed")))
}
pub async fn download_pipe(source: &str, screenpipe_dir: PathBuf) -> anyhow::Result<PathBuf> {
info!("Processing pipe from source: {}", source);
let is_local = Url::parse(source).is_err();
let mut pipe_name =
sanitize_pipe_name(Path::new(source).file_name().unwrap().to_str().unwrap());
// For GitHub URLs, extract the repository name directly
if !is_local {
if let Ok(url) = Url::parse(source) {
if url.host_str() == Some("github.com") {
let path_segments: Vec<&str> = url.path_segments().unwrap().collect();
if path_segments.len() >= 2 {
pipe_name = path_segments[1].to_string();
debug!("Using repository name for pipe: {}", pipe_name);
}
}
}
}
// Add _local suffix for local pipes
if is_local {
pipe_name.push_str("_local");
}
let dest_dir = screenpipe_dir.join("pipes").join(&pipe_name);
debug!("Destination directory: {:?}", dest_dir);
// Save existing pipe.json content before downloading
let pipe_json_path = dest_dir.join("pipe.json");
let existing_config = if pipe_json_path.exists() {
debug!("Existing pipe.json found");
let content = tokio::fs::read_to_string(&pipe_json_path).await?;
Some(serde_json::from_str::<Value>(&content)?)
} else {
debug!("No existing pipe.json found");
None
};
// Create temp directory for download
let temp_dir = dest_dir.with_extension("_temp");
tokio::fs::create_dir_all(&temp_dir).await?;
// Download to temp directory first
let download_result = if let Ok(parsed_url) = Url::parse(source) {
debug!("Source is a URL: {}", parsed_url);
if parsed_url.host_str() == Some("github.com") {
download_github_folder(&parsed_url, &temp_dir).await
} else if cfg!(windows) && parsed_url.scheme().len() == 1 {
// This is likely a Windows path with drive letter being interpreted as URL scheme
debug!("Detected Windows path with drive letter, treating as local path");
let source_path = Path::new(source);
if !source_path.exists() || !source_path.is_dir() {
anyhow::bail!("Invalid local source path");
}
copy_dir_all(source_path, &temp_dir).await
} else {
anyhow::bail!("Unsupported URL format");
}
} else {
debug!("Source is a local path");
let source_path = Path::new(source);
if !source_path.exists() || !source_path.is_dir() {
anyhow::bail!("Invalid local source path");
}
copy_dir_all(source_path, &temp_dir).await
};
// remove temp dir if download failed
if let Err(e) = download_result {
tokio::fs::remove_dir_all(&temp_dir).await?;
error!("Failed to download pipe: {}", e);
}
// If download successful, move temp dir to final location
if dest_dir.exists() {
tokio::fs::remove_dir_all(&dest_dir).await?;
}
tokio::fs::rename(&temp_dir, &dest_dir).await?;
// Restore or merge pipe.json if needed
if let Some(ref existing_config) = existing_config {
let new_config_path = dest_dir.join("pipe.json");
if new_config_path.exists() {
let content = tokio::fs::read_to_string(&new_config_path).await?;
let new_json: Value = serde_json::from_str(&content)?;
// Create merged config
let mut merged_config = new_json.clone(); // Start with new schema
// If both configs have fields array, preserve user values
if let (Some(existing_obj), Some(new_obj)) =
(existing_config.as_object(), merged_config.as_object_mut())
{
// Copy over non-fields properties from existing config
for (key, value) in existing_obj {
if key != "fields" {
new_obj.insert(key.clone(), value.clone());
}
}
// For fields array, preserve user values while keeping new schema
if let (Some(existing_fields), Some(new_fields)) = (
existing_config["fields"].as_array(),
new_obj.get_mut("fields").and_then(|f| f.as_array_mut()),
) {
// For each field in the new schema
for new_field in new_fields {
if let Some(name) = new_field.get("name").and_then(Value::as_str) {
// If this field existed in the old config, preserve its value
if let Some(existing_field) = existing_fields
.iter()
.find(|f| f.get("name").and_then(Value::as_str) == Some(name))
{
if let Some(user_value) = existing_field.get("value") {
if let Some(new_field_obj) = new_field.as_object_mut() {
new_field_obj
.insert("value".to_string(), user_value.clone());
}
}
}
}
}
}
}
let config_str = serde_json::to_string_pretty(&merged_config)?;
tokio::fs::write(&new_config_path, config_str).await?;
} else {
// If no new config exists, keep the existing one
let config_str = serde_json::to_string_pretty(&existing_config)?;
tokio::fs::write(&new_config_path, config_str).await?;
}
}
// After downloading/copying the pipe, check if it's a Next.js project
let package_json_path = dest_dir.join("package.json");
if package_json_path.exists() {
let package_json = tokio::fs::read_to_string(&package_json_path).await?;
let package_data: Value = serde_json::from_str(&package_json)?;
let bun_path = find_bun_path().ok_or_else(|| anyhow::anyhow!("bun not found"))?;
// Make bun install mandatory for all package.json pipes with retries
retry_install(&bun_path, &dest_dir, 3).await?;
if package_data["dependencies"].get("next").is_some() {
info!("Detected Next.js project, setting up for production");
// Update pipe.json to indicate it's a Next.js project
let mut pipe_config = if let Some(existing_json) = &existing_config {
existing_json.clone()
} else if pipe_json_path.exists() {
let pipe_json = tokio::fs::read_to_string(&pipe_json_path).await?;
serde_json::from_str(&pipe_json)?
} else {
json!({})
};
pipe_config["is_nextjs"] = json!(true);
pipe_config["buildStatus"] = json!("not_started");
let updated_pipe_json = serde_json::to_string_pretty(&pipe_config)?;
let mut file = File::create(&pipe_json_path).await?;
file.write_all(updated_pipe_json.as_bytes()).await?;
}
}
info!("pipe copied successfully to: {:?}", dest_dir);
Ok(dest_dir)
}
async fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
let src = src.as_ref();
let dst = dst.as_ref();
debug!("copy_dir_all: src={:?}, dst={:?}", src, dst);
tokio::fs::create_dir_all(&dst).await?;
debug!("Created destination directory: {:?}", dst);
let mut entries = tokio::fs::read_dir(src).await?;
debug!("Reading source directory: {:?}", src);
while let Some(entry) = entries.next_entry().await? {
let ty = entry.file_type().await?;
let src_path = entry.path();
let dst_path = dst.join(entry.file_name());
debug!("Processing entry: {:?}", src_path);
if should_ignore(&entry.file_name()) {
debug!("Skipping ignored file/directory: {:?}", entry.file_name());
continue;
}
if ty.is_dir() {
debug!("Entry is a directory, recursing: {:?}", src_path);
copy_dir_all_boxed(src_path, dst_path).await?;
} else {
debug!("Copying file: {:?} to {:?}", src_path, dst_path);
tokio::fs::copy(&src_path, &dst_path).await?;
}
}
debug!("Finished copying directory: {:?}", src);
Ok(())
}
fn copy_dir_all_boxed(
src: impl AsRef<Path> + Send + 'static,
dst: impl AsRef<Path> + Send + 'static,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
Box::pin(copy_dir_all(src, dst))
}
fn should_ignore(file_name: &std::ffi::OsStr) -> bool {
let ignore_list = [
"node_modules",
".git",
".next",
"dist",
"build",
".DS_Store",
"Thumbs.db",
".env",
".env.local",
".env.development.local",
".env.test.local",
".env.production.local",
];
ignore_list.iter().any(|ignored| file_name == *ignored)
|| file_name.to_str().is_some_and(|s| s.starts_with('.'))
}
fn download_github_folder(
url: &Url,
dest_dir: &Path,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
let url = url.clone();
let dest_dir = dest_dir.to_path_buf();
Box::pin(async move {
// Create a cached client
let client = ClientBuilder::new(Client::new())
.with(Cache(HttpCache {
mode: CacheMode::Default,
manager: CACacheManager {
path: home_dir()
.unwrap()
.join(".screenpipe")
.join(".http-cacache"),
},
options: HttpCacheOptions::default(),
}))
.build();
let api_url = get_raw_github_url(url.as_str())?;
debug!("using github api url: {}", api_url);
let response = client
.get(&api_url)
.header("Accept", "application/vnd.github.v3+json")
.header("User-Agent", "screenpipe")
.send()
.await?;
debug!(
"GitHub API cache hit: {:?}",
response.headers().get("x-cache")
);
let contents: Value = response.text().await?.parse()?;
let tree = contents["tree"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("invalid response from github api"))?;
// Extract repo info from URL
let path_segments: Vec<&str> = url.path_segments().unwrap().collect();
// Handle both URL formats: with and without /tree/branch
let (owner, repo, branch) = if path_segments.contains(&"tree") {
// Format: /owner/repo/tree/branch
let tree_pos = path_segments.iter().position(|&s| s == "tree").unwrap();
(
path_segments[0],
path_segments[1],
path_segments[tree_pos + 1],
)
} else {
// Format: /owner/repo
(path_segments[0], path_segments[1], "main") // Default to main branch
};
// Extract the base path for subfolder downloads
let base_path = if path_segments.contains(&"tree") {
url.path_segments()
.and_then(|segments| {
let segments: Vec<_> = segments.collect();
if segments.len() >= 5 && segments[2] == "tree" {
Some(segments[4..].join("/"))
} else {
None
}
})
.unwrap_or_default()
} else {
// No subfolder for URLs without /tree/branch format
String::new()
};
debug!("base path for download: {}", base_path);
// Process all files in parallel
let mut tasks = Vec::new();
for item in tree {
let path = item["path"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("missing path in github response"))?;
// Skip if file is not in the target directory
if !base_path.is_empty() && !path.starts_with(&base_path) {
continue;
}
let file_name = Path::new(path)
.file_name()
.ok_or_else(|| anyhow::anyhow!("invalid path"))?;
// Skip hidden files and ignored directories
if should_ignore(file_name) {
debug!("skipping ignored file/directory: {}", path);
continue;
}
let item_type = item["type"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("missing type in github response"))?;
if item_type == "blob" {
// Calculate relative path from base_path
let relative_path = if !base_path.is_empty() && path.starts_with(&base_path) {
if let Some(stripped) = path.strip_prefix(&base_path) {
stripped.trim_start_matches('/')
} else {
path
}
} else {
path
};
let file_dest = dest_dir.join(relative_path);
let client = client.clone();
// Use raw.githubusercontent.com URL
let raw_url = format!(
"https://raw.githubusercontent.com/{}/{}/{}/{}",
owner, repo, branch, path
);
// Create task for parallel download
tasks.push(tokio::spawn(async move {
if let Some(parent) = file_dest.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let file_content = client.get(&raw_url).send().await?.bytes().await?;
tokio::fs::write(&file_dest, &file_content).await?;
debug!("downloaded file: {:?}", file_dest);
Ok::<_, anyhow::Error>(())
}));
}
}
// Wait for all downloads to complete
for task in tasks {
task.await??;
}
Ok(())
})
}
fn get_raw_github_url(url: &str) -> anyhow::Result<String> {
debug!("Attempting to get raw GitHub URL for: {}", url);
let parsed_url = Url::parse(url)?;
if parsed_url.host_str() == Some("github.com") {
let path_segments: Vec<&str> = parsed_url.path_segments().unwrap().collect();
// Handle URLs without /tree/branch format (assume main branch)
if path_segments.len() >= 2 && !path_segments.contains(&"tree") {
let owner = path_segments[0];
let repo = path_segments[1];
let branch = "main"; // Default to main branch
let raw_url = format!(
"https://api.github.com/repos/{}/{}/git/trees/{}?recursive=1",
owner, repo, branch
);
debug!(
"Converted to GitHub API URL with default branch: {}",
raw_url
);
return Ok(raw_url);
}
// Original logic for URLs with /tree/branch format
if path_segments.len() >= 4 && path_segments.contains(&"tree") {
// Find the position of "tree" in the path
let tree_pos = path_segments.iter().position(|&s| s == "tree").unwrap();
let owner = path_segments[0];
let repo = path_segments[1];
// The branch is just the segment after "tree"
let branch = path_segments[tree_pos + 1];
let raw_url = format!(
"https://api.github.com/repos/{}/{}/git/trees/{}?recursive=1",
owner, repo, branch
);
debug!("Converted to GitHub API URL: {}", raw_url);
return Ok(raw_url);
}
}
anyhow::bail!("Invalid GitHub URL format")
}
fn find_pipe_file(pipe_dir: &Path) -> anyhow::Result<PathBuf> {
for entry in fs::read_dir(pipe_dir)? {
let entry = entry?;
let file_name = entry.file_name();
let file_name_str = file_name.to_str().unwrap();
if (file_name_str == "pipe.js" || file_name_str == "pipe.ts") && !is_hidden_file(&file_name)
{
return Ok(entry.path());
}
}
anyhow::bail!("No pipe.js/pipe.ts found in the pipe/dist directory")
}
fn is_hidden_file(file_name: &std::ffi::OsStr) -> bool {
file_name
.to_str()
.map(|s| s.starts_with('.') || s == "Thumbs.db")
.unwrap_or(false)
}
#[cfg(not(windows))]
const BUN_EXECUTABLE_NAME: &str = "bun";
#[cfg(windows)]
const BUN_EXECUTABLE_NAME: &str = "bun.exe";
static BUN_PATH: Lazy<Option<PathBuf>> = Lazy::new(find_bun_path_internal);
pub fn find_bun_path() -> Option<PathBuf> {
BUN_PATH.as_ref().map(|p| p.clone())
}
fn find_bun_path_internal() -> Option<PathBuf> {
debug!("starting search for bun executable");
// Check in executable directory (eg tauri etc.)
if let Ok(exe_path) = std::env::current_exe() {
if let Some(exe_folder) = exe_path.parent() {
debug!("executable folder: {:?}", exe_folder);
let bun_in_exe_folder = exe_folder.join(BUN_EXECUTABLE_NAME);
if bun_in_exe_folder.exists() {
debug!("found bun in executable folder: {:?}", bun_in_exe_folder);
return Some(bun_in_exe_folder);
}
debug!("bun not found in executable folder");
// Platform-specific checks
#[cfg(target_os = "macos")]
{
let resources_folder = exe_folder.join("../Resources");
debug!("resources folder: {:?}", resources_folder);
let bun_in_resources = resources_folder.join(BUN_EXECUTABLE_NAME);
if bun_in_resources.exists() {
debug!("found bun in resources folder: {:?}", bun_in_resources);
return Some(bun_in_resources);
}
debug!("bun not found in resources folder");
}
#[cfg(target_os = "linux")]
{
let lib_folder = exe_folder.join("lib");
debug!("lib folder: {:?}", lib_folder);
let bun_in_lib = lib_folder.join(BUN_EXECUTABLE_NAME);
if bun_in_lib.exists() {
debug!("found bun in lib folder: {:?}", bun_in_lib);
return Some(bun_in_lib);
}
debug!("bun not found in lib folder");
}
}
}
// Check if bun is in PATH
if let Ok(path) = which(BUN_EXECUTABLE_NAME) {
debug!("found bun in PATH: {:?}", path);
return Some(path);
}
debug!("bun not found in PATH");
// Check in current working directory
if let Ok(cwd) = std::env::current_dir() {
debug!("current working directory: {:?}", cwd);
let bun_in_cwd = cwd.join(BUN_EXECUTABLE_NAME);
if bun_in_cwd.is_file() && bun_in_cwd.exists() {
debug!("found bun in current working directory: {:?}", bun_in_cwd);
return Some(bun_in_cwd);
}
debug!("bun not found in current working directory");
}
error!("bun not found");
let err = anyhow::anyhow!("Bun executable not found. Pipe functionality may be limited.");
sentry::capture_error(&err.source().unwrap());
None
}
// Add this function to handle cron state persistence
pub async fn get_last_cron_execution(pipe_dir: &Path, path: &str) -> Result<Option<SystemTime>> {
let state_file = pipe_dir.join(".cron_state.json");
if !state_file.exists() {
return Ok(None);
}
let content = tokio::fs::read_to_string(state_file).await?;
let state: Value = serde_json::from_str(&content)?;
if let Some(last_run) = state.get(path).and_then(|v| v.as_u64()) {
Ok(Some(UNIX_EPOCH + std::time::Duration::from_secs(last_run)))
} else {
Ok(None)
}
}
// Add this function to save cron execution time
pub async fn save_cron_execution(pipe_dir: &Path, path: &str) -> Result<()> {
let state_file = pipe_dir.join(".cron_state.json");
let mut state: Value = if state_file.exists() {
let content = tokio::fs::read_to_string(&state_file).await?;
serde_json::from_str(&content)?
} else {
json!({})
};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
if let Some(obj) = state.as_object_mut() {
obj.insert(path.to_string(), json!(now));
}
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(state_file)
.await?;
file.write_all(serde_json::to_string_pretty(&state)?.as_bytes())
.await?;
Ok(())
}
// Update the run_cron_schedule function
async fn run_cron_schedule(
pipe: &str,
base_url: &str,
path: &str,
secret: &str,
schedule: &str,
screenpipe_dir: &Path,
mut shutdown: watch::Receiver<bool>,
) {
let schedule = match cron::Schedule::from_str(schedule) {
Ok(s) => s,
Err(e) => {
let err_msg = format!("invalid cron schedule: {}", e);
error!("{}", err_msg);
sentry::capture_error(&anyhow::anyhow!(err_msg).source().unwrap());
return;
}
};
let client = Client::new();
let mut headers = HeaderMap::new();
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&format!("Bearer {}", secret)).unwrap(),
);
// Get pipe directory for state persistence
let pipe_dir = screenpipe_dir.join("pipes").join(pipe);
loop {
// Get last execution time at the start of each loop
let last_run = match get_last_cron_execution(&pipe_dir, path).await {
Ok(time) => time,
Err(e) => {
error!("failed to get last cron execution: {}", e);
None
}
};
let now = chrono::Local::now();
let next = if let Some(last) = last_run {
// Get next occurrence after the last execution
let last_chrono = chrono::DateTime::<chrono::Local>::from(last);
schedule.after(&last_chrono).next()
} else {
schedule.after(&now).next()
};
let next = match next {
Some(next) => next,
None => {
error!("no next execution time found for cron schedule");
break;
}
};
if next <= now {
info!("next execution time is before or equal to the current time, recalculating...");
continue;
}
let duration = match (next - now).to_std() {
Ok(duration) => duration,
Err(e) => {
error!("invalid duration: {}", e);
continue; // falling back to minute is messing with cron schedule
}
};
info!(
"next cron execution for pipe {} at path {} in {} seconds",
pipe,
path,
duration.as_secs()
);
// Wait for either the next execution time or shutdown signal
tokio::select! {
_ = tokio::time::sleep(duration) => {
info!("executing cron job for pipe {} at path {}", pipe, path);
match client
.get(format!("{}{}", base_url, path))
.headers(headers.clone())
.send()
.await
{
Ok(res) => {
if !res.status().is_success() {
let err_msg = format!("cron job failed with status: {}", res.status());
error!("{}", err_msg);
if let Ok(text) = res.text().await {
error!("error response: {}", text);
sentry::capture_message(
&format!("{}: {}", err_msg, text),
sentry::Level::Error
);
} else {
sentry::capture_message(&err_msg, sentry::Level::Error);
}
}
}
Err(e) => {
let err_msg = format!("failed to execute cron job: {}", e);
error!("{}", err_msg);
sentry::capture_error(&e);
}
}
}
Ok(()) = shutdown.changed() => {
if *shutdown.borrow() {
info!("shutting down cron job for pipe at path: {}", path);
break;
}
}
}
}
}
pub async fn cleanup_pipe_crons(pipe: &str) -> Result<()> {
if let Some(handles) = CRON_HANDLES.lock().await.remove(pipe) {
info!("cleaning up {} cron jobs for pipe {}", handles.len(), pipe);
for handle in handles {
handle.stop();
}
info!("stopped all cron jobs for pipe: {}", pipe);
}
Ok(())
}
async fn try_build_nextjs(pipe_dir: &Path, bun_path: &Path) -> Result<BuildStatus> {
info!(
"checking if i need to build the next.js project in: {:?}",
pipe_dir
);
// Check if build already exists and is valid
let build_dir = pipe_dir.join(".next");
if build_dir.exists() {
let build_manifest = build_dir.join("build-manifest.json");
if build_manifest.exists() {
info!("found existing next.js build, skipping rebuild");
return Ok(BuildStatus::Success);
}
// Invalid/incomplete build directory - remove it
debug!("removing invalid next.js build directory");
tokio::fs::remove_dir_all(&build_dir).await?;
}
// Update build status to InProgress in pipe.json
let pipe_json_path = pipe_dir.join("pipe.json");
if pipe_json_path.exists() {
let pipe_json = tokio::fs::read_to_string(&pipe_json_path).await?;
let mut pipe_config: Value = serde_json::from_str(&pipe_json)?;
pipe_config["buildStatus"] = json!("in_progress");
let updated_pipe_json = serde_json::to_string_pretty(&pipe_config)?;
tokio::fs::write(&pipe_json_path, updated_pipe_json).await?;
}
info!("running next.js build");
let build_output = Command::new(bun_path)
.arg("run")
.arg("--bun")
.arg("build")
.current_dir(pipe_dir)
.output()
.await?;
if build_output.status.success() {
info!("next.js build completed successfully");
Ok(BuildStatus::Success)
} else {
let error_message = String::from_utf8_lossy(&build_output.stderr).to_string();
error!("next.js build failed: {}", error_message);
Ok(BuildStatus::Failed(error_message))
}
}
// Add this helper function to check if a port is available
fn is_port_available(port: u16) -> bool {
use std::net::TcpListener;
TcpListener::bind(("127.0.0.1", port)).is_ok()
}
pub async fn download_pipe_private(
pipe_name: &str,
source: &str,
screenpipe_dir: PathBuf,
) -> anyhow::Result<PathBuf> {
info!("processing private pipe from zip: {}", source);
let dest_dir = screenpipe_dir.join("pipes").join(&pipe_name);
debug!("destination directory: {:?}", dest_dir);
// Create temp directory for download
let temp_dir = dest_dir.with_extension("_temp");
tokio::fs::create_dir_all(&temp_dir).await?;
// Create initial pipe.json in temp directory with download status
let temp_pipe_json = temp_dir.join("pipe.json");
let initial_config = json!({
"buildStatus": {
"status": "in_progress",
"step": "downloading",
}
});
tokio::fs::write(
&temp_pipe_json,
serde_json::to_string_pretty(&initial_config)?,
)
.await?;
// Helper function to update build status
async fn update_build_status(
path: &Path,
status: &str,
step: &str,
error: Option<&str>,
) -> anyhow::Result<()> {
if path.exists() {
let pipe_json = tokio::fs::read_to_string(path).await?;
let mut pipe_config: Value = serde_json::from_str(&pipe_json)?;
pipe_config["buildStatus"] = match error {
Some(err) => json!({
"status": status,
"step": step,
"error": err
}),
None => json!({
"status": status,
"step": step
}),
};
let updated_pipe_json = serde_json::to_string_pretty(&pipe_config)?;
tokio::fs::write(path, updated_pipe_json).await?;
}
Ok(())
}
// Cleanup function
async fn cleanup_temp(temp_dir: &Path, temp_zip: &Path) -> anyhow::Result<()> {
if temp_zip.exists() {
if let Err(e) = tokio::fs::remove_file(temp_zip).await {
warn!("Failed to remove temporary zip file: {}", e);
}
}
if temp_dir.exists() {
if let Err(e) = tokio::fs::remove_dir_all(temp_dir).await {
warn!("Failed to remove temporary directory: {}", e);
}
}
Ok(())
}
// Download zip file
debug!("downloading zip file from: {}", source);
let client = Client::new();
let response = match client.get(source).send().await {
Ok(res) => res,
Err(e) => {
let err_msg = format!("Failed to download zip: {}", e);
error!("{}", err_msg);
cleanup_temp(&temp_dir, &temp_dir.join("temp.zip")).await?;
return Err(anyhow::anyhow!(err_msg));
}
};
let zip_content = match response.bytes().await {
Ok(content) => content,
Err(e) => {
let err_msg = format!("Failed to read zip content: {}", e);
error!("{}", err_msg);
cleanup_temp(&temp_dir, &temp_dir.join("temp.zip")).await?;
return Err(anyhow::anyhow!(err_msg));
}
};
// Create temporary zip file
let temp_zip = temp_dir.join("temp.zip");
if let Err(e) = tokio::fs::write(&temp_zip, &zip_content).await {
let err_msg = format!("Failed to write temporary zip file: {}", e);
error!("{}", err_msg);
cleanup_temp(&temp_dir, &temp_zip).await?;
return Err(anyhow::anyhow!(err_msg));
}
// Update status before extraction
let temp_pipe_json = temp_dir.join("pipe.json");
update_build_status(&temp_pipe_json, "in_progress", "extracting", None).await?;
// Unzip the file
debug!("unzipping file to temp directory");
let temp_zip_path = temp_zip.clone();
let temp_dir_path = temp_dir.clone();
let extraction_result = tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
let zip_file = std::fs::File::open(&temp_zip_path)?;
let mut archive = zip::ZipArchive::new(zip_file)?;
let total_files = archive.len();
for i in 0..total_files {
let mut file = archive.by_index(i)?;
let name = file.name().to_string();
// Check for zip slip vulnerability
if name.contains("..") {
return Err(anyhow::anyhow!(
"Invalid zip file: potential path traversal attack"
));
}
let outpath = temp_dir_path.join(&name);
if name.ends_with('/') {
std::fs::create_dir_all(&outpath)?;
} else {
if let Some(p) = outpath.parent() {
if !p.exists() {
std::fs::create_dir_all(p)?;
}
}
let mut outfile = std::fs::File::create(&outpath)?;
std::io::copy(&mut file, &mut outfile)?;
}
}
Ok(())
})
.await;
// Handle extraction errors
if let Err(e) = extraction_result {
let err_msg = format!("Extraction task failed: {}", e);
error!("{}", err_msg);
update_build_status(&temp_pipe_json, "error", "extracting", Some(&err_msg)).await?;
cleanup_temp(&temp_dir, &temp_zip).await?;
return Err(anyhow::anyhow!(err_msg));
}
if let Err(e) = extraction_result.unwrap() {
let err_msg = format!("Failed to extract zip: {}", e);
error!("{}", err_msg);
update_build_status(&temp_pipe_json, "error", "extracting", Some(&err_msg)).await?;
cleanup_temp(&temp_dir, &temp_zip).await?;
return Err(anyhow::anyhow!(err_msg));
}
// Remove the temporary zip file
if let Err(e) = tokio::fs::remove_file(&temp_zip).await {
warn!("Failed to remove temporary zip file: {}", e);
}
// Move temp dir to final location
if dest_dir.exists() {
if let Err(e) = tokio::fs::remove_dir_all(&dest_dir).await {
let err_msg = format!("Failed to remove existing destination directory: {}", e);
error!("{}", err_msg);
cleanup_temp(&temp_dir, &temp_zip).await?;
return Err(anyhow::anyhow!(err_msg));
}
}
if let Err(e) = tokio::fs::rename(&temp_dir, &dest_dir).await {
let err_msg = format!("Failed to move files to destination: {}", e);
error!("{}", err_msg);
cleanup_temp(&temp_dir, &temp_zip).await?;
return Err(anyhow::anyhow!(err_msg));
}
// Update status for installation
let final_pipe_json = dest_dir.join("pipe.json");
update_build_status(&final_pipe_json, "in_progress", "installing", None).await?;
// Check if it's a Next.js project and handle installation
let package_json_path = dest_dir.join("package.json");
let is_nextjs = if package_json_path.exists() {
let package_json = tokio::fs::read_to_string(&package_json_path).await?;
let package_data: Value = serde_json::from_str(&package_json)?;
package_data["dependencies"].get("next").is_some()
} else {
false
};
// Find bun path
let bun_path = find_bun_path().ok_or_else(|| anyhow::anyhow!("bun not found"))?;
// Run bun install with error handling
info!("installing dependencies");
if let Err(e) = retry_install(&bun_path, &dest_dir, 3).await {
let err_msg = format!("Failed to install dependencies: {}", e);
error!("{}", err_msg);
update_build_status(&final_pipe_json, "error", "installing", Some(&err_msg)).await?;
return Err(anyhow::anyhow!(err_msg));
}
if is_nextjs {
info!("detected next.js project, updating configuration");
if final_pipe_json.exists() {
let pipe_json = tokio::fs::read_to_string(&final_pipe_json).await?;
let mut pipe_config: Value = serde_json::from_str(&pipe_json)?;
pipe_config["is_nextjs"] = json!(true);
pipe_config["buildStatus"] = json!({
"status": "success",
"step": "completed"
});
let updated_pipe_json = serde_json::to_string_pretty(&pipe_config)?;
tokio::fs::write(&final_pipe_json, updated_pipe_json).await?;
}
} else {
// Update final success status for non-Next.js projects
update_build_status(&final_pipe_json, "success", "completed", None).await?;
}
info!("pipe downloaded and set up successfully at: {:?}", dest_dir);
Ok(dest_dir)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment