Skip to content

Instantly share code, notes, and snippets.

@wanghongfei
Created June 19, 2015 06:44
Show Gist options
  • Save wanghongfei/99f049d01f39b102c26c to your computer and use it in GitHub Desktop.
Save wanghongfei/99f049d01f39b102c26c to your computer and use it in GitHub Desktop.
tbschedule DEMO
package com.anzhi.schedule.task;
import com.anzhi.schedule.model.PassportModel;
import com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import com.taobao.pamirs.schedule.TaskItemDefine;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Comparator;
import java.util.List;
/**
* Created by wanghongfei on 15-6-2.
*/
public class TestTaskBean implements IScheduleTaskDealSingle<PassportModel> {
private static final int ROW_AMOUNT = 21150926;
private JdbcTemplate jdbcTemplate;
//private final String SQL_SELECT = "SELECT * FROM u_passport_manage WHERE pid NOT IN (SELECT pid FROM u_passport_manage_copy ) AND pid >= ? AND pid <= ? LIMIT ?";
private final String SQL_SELECT = "SELECT * FROM u_passport_manage FORCE INDEX(PRI) WHERE pid NOT IN ( SELECT pid FROM u_passport_manage_copy ) AND pid >= ? AND pid <= ? LIMIT ?";
private final String SQL_SELECT_SINGLE_DATA = "SELECT * FROM u_passport_manage WHERE pid = ?";
private final String SQL_INSERT = "INSERT INTO u_passport_manage_copy (" +
"pid, uid, login_name, name_change_times, password, salt, email, email_valid_status, telephone, telephone_valid_status, encryption, regtime, regip, app_key, service_id, status, create_time, update_time, memo" +
") VALUES (" +
"?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
/**
* 选择任务. 从DB中读取数据, 将取出的数据返回
* @param taskParameter
* @param ownSign
* @param taskItemNum
* @param taskItemList
* @param eachFetchDataNum
* @return
* @throws Exception
*/
public List<PassportModel> selectTasks(String taskParameter, String ownSign,
int taskItemNum, List<TaskItemDefine> taskItemList,
int eachFetchDataNum) throws Exception {
try {
long minPid = 0;
long maxPid = 0;
// 每个线程只会得到一个任务项
if (taskItemList.size() > 1) {
throw new IllegalStateException("分片错误");
}
if (taskItemList.size() == 0) {
return null;
}
System.out.println("每次取:" + eachFetchDataNum);
Long taskId = Long.parseLong(taskItemList.get(0).getTaskItemId());
System.out.println("任务项:" + taskId);
long blockAmount = ROW_AMOUNT / 10;
minPid = (taskId - 1) * blockAmount;
if (10 == taskId.longValue()) {
maxPid = Long.MAX_VALUE;
} else {
maxPid = minPid + blockAmount - 1;
}
// 从DB中读取minPid - maxPid的数据
System.out.println("执行select... ...");
List<PassportModel> dataList = selectData(minPid, maxPid, eachFetchDataNum);
System.out.println("选择了 " + dataList.size() + " 条数据");
return dataList;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 向目标表中插入数据
* @param model
* @param ownSign
* @return
* @throws Exception
*/
public boolean execute(PassportModel model, String ownSign)
throws Exception {
try {
insertData(model);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public Comparator<PassportModel> getComparator() {
return null;
}
private void processData(PassportModel model) {
}
public JdbcTemplate getJdbcTemplate() {
return jdbcTemplate;
}
public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
private void insertData(PassportModel model) {
System.out.println("插入数据, pid = " + model.getPid());
jdbcTemplate.update(SQL_INSERT, model.toParamArray());
}
private List<PassportModel> selectData(long minPid, long maxPid, int limit) {
return jdbcTemplate.query(SQL_SELECT, new Object[]{minPid, maxPid, limit}, new RowMapper<PassportModel>() {
public PassportModel mapRow(ResultSet rs, int i) throws SQLException {
PassportModel model = new PassportModel();
model.setPid(rs.getLong("pid"));
model.setUid(rs.getString("uid"));
model.setLoginName(rs.getString("login_name"));
model.setNameChangeTimes(rs.getInt("name_change_times"));
model.setPassowrd(rs.getString("password"));
model.setSalt(rs.getString("salt"));
model.setEmail(rs.getString("email"));
model.setEmailValidStatus(rs.getInt("email_valid_status"));
model.setTelephone(rs.getString("telephone"));
model.setTelephoneValidStatus(rs.getInt("telephone_valid_status"));
model.setEncryption(rs.getInt("encryption"));
model.setRegTime(rs.getInt("regtime"));
model.setRegip(rs.getString("regip"));
model.setAppKey(rs.getString("app_key"));
model.setServiceId(rs.getString("service_id"));
model.setStatus(rs.getInt("status"));
model.setCreateTime(rs.getInt("create_time"));
model.setUpdateTime(rs.getInt("update_time"));
model.setMemo(rs.getString("memo"));
return model;
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment