Created
August 14, 2017 00:17
-
-
Save nilebox/fbf90b871113506f1b865c54d417d1c0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package queue; | |
import com.google.common.collect.ImmutableMap; | |
import org.joda.time.Duration; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.jdbc.core.JdbcTemplate; | |
import org.springframework.jdbc.core.RowCallbackHandler; | |
import org.springframework.jdbc.core.RowMapper; | |
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; | |
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcDaoSupport; | |
import javax.annotation.Nonnull; | |
import javax.annotation.Nullable; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.LinkedHashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Optional; | |
public class PostgresQueueDao extends NamedParameterJdbcDaoSupport implements QueueDao { | |
private static final Logger log = LoggerFactory.getLogger(PostgresQueueDao.class); | |
public PostgresQueueDao(JdbcTemplate jdbcTemplate) { | |
setJdbcTemplate(jdbcTemplate); | |
} | |
private static final String INSERT_TASK_SQL; | |
private static final String BATCH_INSERT_TASKS_SQL; | |
static { | |
String insertTaskSQL = "INSERT INTO tasks(queue_name, task, process_time, actor)" + | |
" VALUES (:queue_name, :task, now() + :process_time_delay_sec * INTERVAL '1 SECOND', :actor)"; | |
INSERT_TASK_SQL = insertTaskSQL + " RETURNING id"; | |
BATCH_INSERT_TASKS_SQL = insertTaskSQL; | |
} | |
@Override | |
public TaskId enqueue(@Nonnull Queue queue, Optional<String> task, Optional<String> actor, Duration firstExecutionDelay) { | |
return enqueue(queue.getQueueName(), task, actor, firstExecutionDelay); | |
} | |
@Override | |
public int batchEnqueue(@Nonnull Queue queue, @Nonnull List<String> tasks, Optional<String> actor, @Nonnull Duration firstExecutionDelay) { | |
log.info("batchEnqueue(): queue={}, tasks count={}, actor={}, firstExecutionDelay={}", queue.getQueueName(), tasks.size(), actor, firstExecutionDelay); | |
final MapSqlParameterSource[] params = tasks.stream() | |
.map(task -> convertTaskToSqlParams(queue.getQueueName(), task, actor, firstExecutionDelay)) | |
.toArray(size -> new MapSqlParameterSource[size]); | |
int result = getNamedParameterJdbcTemplate().batchUpdate(BATCH_INSERT_TASKS_SQL, params).length; | |
log.info("batchEnqueue(): queue={}, enqueued count={} of {}", queue.getQueueName(), result, tasks.size()); | |
return result; | |
} | |
private MapSqlParameterSource convertTaskToSqlParams(@Nonnull String queueName, @Nullable String task, Optional<String> actor, Duration firstExecutionDelay) { | |
return new MapSqlParameterSource("queue_name", queueName) | |
.addValue("task", task) | |
.addValue("process_time_delay_sec", firstExecutionDelay.toStandardSeconds().getSeconds()) | |
.addValue("actor", actor.orElse(null)); | |
} | |
public TaskId enqueue(@Nonnull String queueName, String task, Duration firstExecutionDelay) { | |
return enqueue(queueName, Optional.of(task), Optional.empty(), firstExecutionDelay); | |
} | |
public TaskId enqueue(@Nonnull String queueName, Optional<String> task, Optional<String> actor, Duration firstExecutionDelay) { | |
log.info("enqueue(): queue=" + queueName + ", task=***, actor=" + actor | |
+ ", firstExecutionDelay=" + firstExecutionDelay); | |
TaskId taskId = new TaskId(getNamedParameterJdbcTemplate().queryForObject(INSERT_TASK_SQL, | |
convertTaskToSqlParams(queueName, task.orElse(null), actor, firstExecutionDelay), Long.class)); | |
log.info("enqueue(): queue=" + queueName + ", id=" + taskId.asLong() + ", task=***, actor=" + actor | |
+ ", firstExecutionDelay=" + firstExecutionDelay); | |
return taskId; | |
} | |
private static final String NEXT_TASK_SQL = "SELECT out_id as id, out_task as task, out_attempt as attempt, " + | |
"out_actor as actor, out_create_time as create_time " + | |
"FROM nextTask(:queue_name, :is_delay_linear, :linear_delay_in_seconds) WHERE out_id IS NOT NULL;"; | |
public Optional<TaskRecord> nextTask(@Nonnull Queue queue, @Nonnull Optional<Duration> linearDelay) { | |
//log.info("nextTask(): queue=" + queue.getQueueName() + ", linearDelay=" + linearDelay); | |
return getNamedParameterJdbcTemplate().query(NEXT_TASK_SQL, | |
new MapSqlParameterSource("queue_name", queue.getQueueName()) | |
.addValue("is_delay_linear", linearDelay.isPresent()) | |
.addValue("linear_delay_in_seconds", linearDelay.orElse(Duration.ZERO).toStandardSeconds().getSeconds()), | |
new RowMapper<TaskRecord>() { | |
@Override | |
public TaskRecord mapRow(ResultSet rs, int rowNum) throws SQLException { | |
return new TaskRecord(rs.getLong("id"), rs.getString("task"), rs.getLong("attempt"), | |
rs.getString("actor"), rs.getTimestamp("create_time")); | |
} | |
}).stream().findFirst(); | |
} | |
private static final String DELETE_ALL_PENDING_TASKS_SQL = "DELETE FROM tasks WHERE process_time > now() AND attempt > 1"; | |
public void deleteAllPendingTasks() { | |
log.info("deleteAllPendingTasks"); | |
getJdbcTemplate().update(DELETE_ALL_PENDING_TASKS_SQL); | |
} | |
private static final String DELETE_TASK_SQL = "DELETE FROM tasks WHERE queue_name = :queue_name AND id = :id"; | |
public void deleteTask(@Nonnull Queue queue, @Nonnull TaskId taskId) { | |
log.info("deleteTask(): queue=" + queue.getQueueName() + ", taskId=" + taskId); | |
getNamedParameterJdbcTemplate().update(DELETE_TASK_SQL, | |
ImmutableMap.of("queue_name", queue.getQueueName(), "id", taskId.asLong())); | |
} | |
private static final String DELETE_TASKS_BY_ACTOR_SQL = "DELETE FROM tasks WHERE queue_name = :queue_name AND actor = :actor"; | |
public void deleteTasksByActor(@Nonnull Queue queue, @Nonnull String actor) { | |
log.info("deleteTasksByActor(): queue=" + queue.getQueueName() + ", actor=" + actor); | |
getNamedParameterJdbcTemplate().update(DELETE_TASKS_BY_ACTOR_SQL, | |
ImmutableMap.of("queue_name", queue.getQueueName(), "actor", actor)); | |
} | |
private static final String UPDATE_TASK_SQL = "UPDATE tasks\n" + | |
"SET\n" + | |
" process_time = now() + :next_execution_delay_in_seconds * INTERVAL '1 SECOND',\n" + | |
" task = :task,\n" + | |
" attempt = 0\n" + | |
"WHERE id = :id"; | |
public void reenqueue(@Nonnull TaskRecord task, @Nonnull Duration nextExecutionDelay) { | |
log.info("reenqueue(): task=***, nextExecutionDelay=" + nextExecutionDelay); | |
getNamedParameterJdbcTemplate().update(UPDATE_TASK_SQL, ImmutableMap.of("id", task.getId().asLong(), | |
"task", task.getTask(), "next_execution_delay_in_seconds", nextExecutionDelay.toStandardSeconds().getSeconds())); | |
} | |
private static final String IS_TASK_EXISTS_SQL = "SELECT count(*) FROM tasks" + | |
" WHERE queue_name = :queue_name AND actor = :actor"; | |
public boolean isTaskExist(@Nonnull Queue queue, @Nonnull String actor) { | |
log.info("hasTaskForGivenActor(): queue=" + queue.getQueueName() + ", actor=" + actor); | |
Integer taskCount = getNamedParameterJdbcTemplate().queryForObject(IS_TASK_EXISTS_SQL, ImmutableMap.of("queue_name", queue.getQueueName(), | |
"actor", actor), Integer.class); | |
return taskCount != null && taskCount > 0; | |
} | |
@Override | |
public void resetAllPendingTasks() { | |
getJdbcTemplate().execute("UPDATE tasks SET process_time=now(), attempt=0 WHERE attempt > 0"); | |
} | |
public Map<String, Long> getPendingTasksCount() { | |
final Map<String, Long> pendingTasks = new LinkedHashMap<>(); | |
getJdbcTemplate().query("SELECT queue_name, COUNT(1) FROM tasks WHERE (attempt > 1) OR (attempt = 1 AND process_time < now()) " + | |
"GROUP BY queue_name ORDER BY queue_name", new RowCallbackHandler() { | |
@Override | |
public void processRow(ResultSet rs) throws SQLException { | |
pendingTasks.put(rs.getString(1), rs.getLong(2)); | |
} | |
}); | |
return pendingTasks; | |
} | |
public Map<String, Long> getAllTasksCount() { | |
final Map<String, Long> tasks = new LinkedHashMap<>(); | |
getJdbcTemplate().query("SELECT queue_name, COUNT(1) FROM tasks GROUP BY queue_name ORDER BY queue_name", new RowCallbackHandler() { | |
@Override | |
public void processRow(ResultSet rs) throws SQLException { | |
tasks.put(rs.getString(1), rs.getLong(2)); | |
} | |
}); | |
return tasks; | |
} | |
public Map<String, List<QueueSummary>> getPendingTasks() { | |
final Map<String, List<QueueSummary>> result = new HashMap<>(); | |
getJdbcTemplate().query("SELECT t.id, t.queue_name, substring(t.task for 2000) as task, t.process_time, t.attempt, t.create_time, t.actor " + | |
"FROM tasks t JOIN (SELECT id, row_number() OVER (PARTITION BY queue_name ORDER BY id desc) rn \n" + | |
"FROM tasks WHERE (attempt > 1) OR (attempt = 1 AND process_time < now())) task_ids " + | |
"on task_ids.id=t.id WHERE task_ids.rn<=10", new RowCallbackHandler() { | |
@Override | |
public void processRow(ResultSet rs) throws SQLException { | |
String name = rs.getString("queue_name"); | |
if (!result.containsKey(name)) { | |
result.put(name, new ArrayList<>(10)); | |
} | |
result.get(name).add(new QueueSummary(rs.getString("id"), name, rs.getString("task"), | |
QueueUtils.formatMoscowDateTime(rs.getTimestamp("process_time")), rs.getString("attempt"), | |
QueueUtils.formatMoscowDateTime(rs.getTimestamp("create_time")), rs.getString("actor"))); | |
} | |
}); | |
return result; | |
} | |
/** | |
* Delete tasks from the queue. | |
* | |
* @param queueName Name of the queue. | |
*/ | |
@Override | |
public void clearQueue(String queueName) { | |
String sql = "delete from tasks where queue_name = ?"; | |
getJdbcTemplate().update(sql, queueName); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment