@dginev
Last active May 21, 2019 07:31
UPDATE with LIMIT (in PostgreSQL) with diesel.rs?
use diesel::result::Error;
use diesel::{delete, insert_into, update};
use diesel::pg::PgConnection;
use diesel::prelude::*;
use schema::tasks;
use schema::tasks::dsl::{serviceid, status};
//...
let q = update(tasks::table
.filter(serviceid.eq(service.id))
.filter(status.eq(TaskStatus::TODO.raw()))
// .limit(queue_size)
).set(status.eq(mark as i32));
print_sql!(q);
q.get_results(connection)
// Generated SQL is as expected:
// UPDATE "tasks" SET "status" = $1 WHERE "tasks"."serviceid" = $2 AND "tasks"."status" = $3 -- binds: [18906, 1, 0]
// Adding back the .limit raises a rather cryptic error message, screenshot below

dginev commented Oct 30, 2017

[Screenshot of the error message]

dginev commented Oct 30, 2017

The goal of the query is to mark a fixed number of DB rows as "used", for a simple job queue implementation.
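
To make the intended semantics concrete, here is a minimal in-memory sketch in plain Rust, with no database or Diesel involved. The `Task` struct, the `TODO` constant, the `claim_tasks` function, and the mark value `42` are all hypothetical names and values chosen for illustration; they are not the actual schema. It only models "mark at most N matching rows and return them"; it says nothing about locking or concurrency, which is what `FOR UPDATE` handles in the real query.

```rust
// In-memory model of "claim up to N TODO tasks for one service".
// All names here are illustrative assumptions, not the gist's schema.
#[derive(Debug, Clone, PartialEq)]
struct Task {
    taskid: i64,
    serviceid: i32,
    status: i32,
}

const TODO: i32 = 0;

/// Marks at most `limit` tasks of `serviceid` that are still TODO with `mark`
/// and returns copies of the rows that were changed.
fn claim_tasks(tasks: &mut [Task], serviceid: i32, mark: i32, limit: usize) -> Vec<Task> {
    tasks
        .iter_mut()
        .filter(|t| t.serviceid == serviceid && t.status == TODO)
        .take(limit) // the in-memory counterpart of LIMIT
        .map(|t| {
            t.status = mark;
            t.clone()
        })
        .collect()
}

fn main() {
    let mut tasks: Vec<Task> = (1..=5)
        .map(|taskid| Task { taskid, serviceid: 1, status: TODO })
        .collect();
    let claimed = claim_tasks(&mut tasks, 1, 42, 2);
    // prints "claimed 2 of 5 tasks"
    println!("claimed {} of {} tasks", claimed.len(), tasks.len());
}
```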

dginev commented Oct 30, 2017

The original query, which I am trying to adapt, looked like:

UPDATE tasks t SET status=$1 FROM (
  SELECT * FROM tasks WHERE serviceid=$2 and status=$3
  LIMIT $4
  FOR UPDATE
) subt
WHERE t.taskid=subt.taskid
RETURNING t.taskid,t.entry,t.serviceid,t.corpusid,t.status;

I'm only striving to get things functionally identical in diesel, not reproduce the query verbatim.

dginev commented Oct 30, 2017

After my fresh morning coffee, it looks like I should really be experimenting with diesel's .for_update() here.

dginev commented Nov 13, 2017

[SOLVED] What eventually worked for me was using two separate select and update queries. I am uncertain if this is the most idiomatic Diesel, but this snippet is passing basic tests:

  let mut marked_tasks: Vec<Task> = Vec::new();
  connection.transaction::<(), Error, _>(|| {
    // SELECT ... FOR UPDATE: lock up to `queue_size` TODO rows for this service
    // until the transaction ends.
    let tasks_for_update = tasks::table
      .for_update()
      .filter(serviceid.eq(service.id))
      .filter(status.eq(TaskStatus::TODO.raw()))
      .limit(queue_size as i64)
      .load(connection)?;
    marked_tasks = tasks_for_update
      .into_iter()
      // Set the new status on each locked row.
      .map(|task| Task {
        status: i32::from(mark),
        ..task
      })
      // Issue one UPDATE per row; rows whose save fails are left out of the result.
      .map(|task| task.save_changes(connection))
      .filter_map(|saved| saved.ok())
      .collect();
    Ok(())
  })?;
  Ok(marked_tasks)
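
One caveat about the snippet above: `.filter_map(|saved| saved.ok())` discards every row whose `save_changes` call returned an error, and the closure then returns `Ok(())` regardless, so the caller never sees the failure. The plain-Rust sketch below contrasts that with collecting into a `Result`, which surfaces the first error instead. The `save` function and its `String` error type are hypothetical stand-ins for a fallible per-row save, not Diesel APIs.

```rust
// Stand-in for a fallible per-row save; `save` and its error type are hypothetical.
fn save(taskid: i64) -> Result<i64, String> {
    if taskid % 2 == 0 {
        Err(format!("could not save task {}", taskid))
    } else {
        Ok(taskid)
    }
}

/// Same shape as the snippet above: failed saves are silently dropped.
fn save_lenient(ids: &[i64]) -> Vec<i64> {
    ids.iter()
        .map(|&id| save(id))
        .filter_map(|saved| saved.ok())
        .collect()
}

/// Alternative: collecting into a Result stops at the first failure
/// and returns it as the overall error.
fn save_strict(ids: &[i64]) -> Result<Vec<i64>, String> {
    ids.iter().map(|&id| save(id)).collect()
}

fn main() {
    let ids = [1, 2, 3];
    println!("{:?}", save_lenient(&ids)); // prints [1, 3]
    println!("{:?}", save_strict(&ids)); // prints Err("could not save task 2")
}
```

Inside a Diesel transaction closure, propagating such an `Err` with `?` makes the transaction roll back, which may or may not be the behavior you want for a job queue.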
