Skip to content

Instantly share code, notes, and snippets.

async fn main() -> anyhow::Result<()> {
// init the sync channel
let (sender, reciever) = std::sync::mpsc::channel::<EmbeddingEntry>();
// start the write task loop and get a handle to it
let db_writer_task = init_writer_task(reciever,db_uri.as_str());
// list the files in the directory to be embedded
let files_dir = fs::read_dir(cli_args.input_directory)?;
files_dir
.into_iter()
/// Embeds the text file `filename` and sends the result over `sender`.
///
/// The file is read through `read_file_in_chunks` (called with `256`), the
/// resulting chunks are passed to `embed_multiple_sentences`, and the first
/// vector it returns is sent as an `EmbeddingEntry` tagged with `filename`.
///
/// NOTE(review): `256` is presumably the chunk size and `false` presumably
/// disables pooling across chunks — confirm against the helper signatures.
///
/// # Errors
///
/// Returns an error if obtaining the model reference, reading the file, or
/// embedding fails; if no embedding vector is produced (previously this case
/// panicked on an out-of-bounds index); or if `sender.send` fails.
fn process_text_file(sender: Sender<EmbeddingEntry>, filename: &str) -> anyhow::Result<()> {
    let bert_model = models::bert::get_model_reference(EMB_MODEL_ID, EMB_MODEL_REV)?;
    let text_chunks: Vec<&str> = read_file_in_chunks(filename, 256)?;
    let file_vector = embed_multiple_sentences(&text_chunks, false, &bert_model)?;
    // Only the first vector is stored, as before. Taking it by value avoids a
    // clone, and a missing vector becomes an error instead of a panic.
    let embedding = file_vector
        .into_iter()
        .next()
        .ok_or_else(|| anyhow::anyhow!("no embedding produced for file: {}", filename))?;
    sender.send(EmbeddingEntry {
        filename: filename.to_string(),
        embedding,
    })?;
    Ok(())
}
/// Model identifier passed to the model-loading calls in this file
/// (`get_model_reference` and `BertModelWrapper::new`).
/// NOTE(review): presumably a Hugging Face Hub repository id — confirm.
const EMB_MODEL_ID: &str = "sentence-transformers/all-MiniLM-L6-v2";
/// Model revision passed alongside `EMB_MODEL_ID` to the same calls.
/// NOTE(review): looks like a git ref for pull request 21 of the model
/// repository — confirm it is still the intended revision.
const EMB_MODEL_REV: &str = "refs/pr/21";
thread_local! {
static BERT_MODEL: Rc<BertModelWrapper> = {
info!("Loading a model on thread: {:?}", std::thread::current().id());
let model = BertModelWrapper::new(candle::Device::Cpu, EMB_MODEL_ID, EMB_MODEL_REV);
match model {
Ok(model) => Rc::new(model),
Err(e) => {
Arc::new(Schema::new(vec![
Field::new("filename", DataType::Utf8, false),
Field::new(
"vector",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 384),
true,
),
]))
let lance_table = connection.open_table("table_name").execute().await?;
let batches = vec![Ok(RecordBatch::try_new(
schema.clone(),
vec![Arc::new(key_array), Arc::new(vectors_array)],
)?)
.map_err(|e| ArrowError::from_external_error(e.into()))];
let batch_iterator = RecordBatchIterator::new(batches, schema);
let boxed_batches = Box::new(batch_iterator);
lance_table.add(boxed_batches).execute().await?;
async fn init_db_writer_task(
reciever: Receiver<EmbeddingEntry>,
db_uri: &str,
table_name: &str,
buffer_size: usize,
) -> anyhow::Result<JoinHandle<()>> {
let db = storage::VecDB::connect(db_uri, table_name).await?;
let task_handle = tokio::spawn(async move {
let mut embeddings_buffer = Vec::new();
@a-agmon
a-agmon / df_lambda.rs
Last active March 17, 2025 21:37
Rusty AWS Lambda with DataFusion and Iceberg
use std::{sync::Arc, time::Instant};
use datafusion::{
arrow::{array::RecordBatch, json::ArrayWriter},
prelude::SessionContext,
};
use iceberg::{io::FileIO, table::StaticTable, TableIdent};
use iceberg_datafusion::IcebergTableProvider;
use lambda_runtime::{
run, service_fn,
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.