Last active
May 21, 2019 07:31
UPDATE with LIMIT (in PostgreSQL) with diesel.rs?
use diesel::result::Error;
use diesel::{delete, insert_into, update};
use diesel::pg::PgConnection;
use diesel::prelude::*;
use schema::tasks;
use schema::tasks::dsl::{serviceid, status};
//...
let q = update(tasks::table
    .filter(serviceid.eq(service.id))
    .filter(status.eq(TaskStatus::TODO.raw()))
    // .limit(queue_size)
  ).set(status.eq(mark as i32));
print_sql!(q);
q.get_results(connection)
// Generated SQL is as expected:
// UPDATE "tasks" SET "status" = $1 WHERE "tasks"."serviceid" = $2 AND "tasks"."status" = $3 -- binds: [18906, 1, 0]
// Adding back the .limit raises a rather cryptic error message, screenshot below
Author
dginev
commented
Oct 30, 2017
The goal of the query is to mark a fixed number of DB rows as "used", for a simple job queue implementation.
The original query, which I am trying to adapt, looked like:
UPDATE tasks t SET status=$1 FROM (
  SELECT * FROM tasks WHERE serviceid=$2 AND status=$3
  LIMIT $4
  FOR UPDATE
) subt
WHERE t.taskid=subt.taskid
RETURNING t.taskid,t.entry,t.serviceid,t.corpusid,t.status;
I'm only striving to get things functionally identical in diesel, not reproduce the query verbatim.
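To pin down what "functionally identical" means here, the following is a minimal in-memory sketch of the intended behavior: mark at most a fixed number of matching rows and hand back the updated rows, as `RETURNING` does. It is not Diesel code. The `Task` fields are reduced to three, `claim_tasks` is a hypothetical name, and the raw value `0` for the TODO status is an assumption.

```rust
// Hypothetical in-memory stand-in for the `tasks` table; this is not Diesel code.
#[derive(Debug, Clone, PartialEq)]
struct Task {
    taskid: i64,
    serviceid: i32,
    status: i32,
}

// Assumed raw value of the TODO status.
const TODO: i32 = 0;

/// Marks at most `limit` TODO tasks of `service` with `mark` and returns
/// copies of the updated rows, similar to `UPDATE ... RETURNING`.
fn claim_tasks(table: &mut [Task], service: i32, mark: i32, limit: usize) -> Vec<Task> {
    table
        .iter_mut()
        // Same predicate as the WHERE clause of the subselect.
        .filter(|t| t.serviceid == service && t.status == TODO)
        // Plays the role of LIMIT.
        .take(limit)
        .map(|t| {
            t.status = mark;
            t.clone()
        })
        .collect()
}
```

Calling `claim_tasks(&mut table, 1, 7, 2)` would mark the first two TODO rows of service 1 with status 7 and leave every other row untouched. The sketch ignores concurrency, which is what `FOR UPDATE` handles in the real query.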
After my fresh morning coffee, it looks like I should really be experimenting with Diesel's .for_update() here.
[SOLVED] What eventually worked for me was using two separate select and update queries. I am uncertain if this is the most idiomatic Diesel, but this snippet is passing basic tests:
let mut marked_tasks: Vec<Task> = Vec::new();
try!(connection.transaction::<(), Error, _>(|| {
    // Select (and lock) up to `queue_size` TODO rows for this service.
    let tasks_for_update = try!(
        tasks::table
            .for_update()
            .filter(serviceid.eq(service.id))
            .filter(status.eq(TaskStatus::TODO.raw()))
            .limit(queue_size as i64)
            .load(connection)
    );
    // Write the new status back, one row at a time.
    marked_tasks = tasks_for_update
        .into_iter()
        .map(|task| {
            Task {
                status: i32::from(mark),
                ..task
            }
        })
        .map(|task| task.save_changes(connection))
        // Note: rows whose save fails are silently skipped here.
        .filter_map(|saved| saved.ok())
        .collect();
    Ok(())
}));
Ok(marked_tasks)
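One detail of the snippet above: `.filter_map(|saved| saved.ok())` drops any row whose save failed, so the transaction still commits with fewer marked tasks. If aborting on the first failure is preferred, collecting into a `Result` is one option, since Diesel rolls a transaction back when its closure returns `Err`. The sketch below shows the difference using only the standard library; `save`, `save_lenient` and `save_strict` are hypothetical names, with `save` standing in for `task.save_changes(connection)`.

```rust
// Hypothetical stand-in for `task.save_changes(connection)`:
// it fails for negative ids so both outcomes can be shown.
fn save(id: i32) -> Result<i32, String> {
    if id < 0 {
        Err(format!("could not save task {}", id))
    } else {
        Ok(id)
    }
}

/// Mirrors the snippet above: failed saves are silently dropped.
fn save_lenient(ids: &[i32]) -> Vec<i32> {
    ids.iter()
        .map(|&id| save(id))
        .filter_map(|saved| saved.ok())
        .collect()
}

/// Alternative: stops at the first failure and returns it as `Err`,
/// which a surrounding transaction closure could propagate.
fn save_strict(ids: &[i32]) -> Result<Vec<i32>, String> {
    ids.iter().map(|&id| save(id)).collect()
}
```

Which variant fits depends on whether a partially filled queue is acceptable; the lenient form keeps whatever succeeded, the strict form is all or nothing.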