Created
June 19, 2015 06:44
-
-
Save wanghongfei/99f049d01f39b102c26c to your computer and use it in GitHub Desktop.
tbschedule DEMO
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.anzhi.schedule.task; | |
import com.anzhi.schedule.model.PassportModel; | |
import com.taobao.pamirs.schedule.IScheduleTaskDealSingle; | |
import com.taobao.pamirs.schedule.TaskItemDefine; | |
import org.springframework.jdbc.core.JdbcTemplate; | |
import org.springframework.jdbc.core.RowMapper; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.util.Comparator; | |
import java.util.List; | |
/** | |
* Created by wanghongfei on 15-6-2. | |
*/ | |
public class TestTaskBean implements IScheduleTaskDealSingle<PassportModel> { | |
private static final int ROW_AMOUNT = 21150926; | |
private JdbcTemplate jdbcTemplate; | |
//private final String SQL_SELECT = "SELECT * FROM u_passport_manage WHERE pid NOT IN (SELECT pid FROM u_passport_manage_copy ) AND pid >= ? AND pid <= ? LIMIT ?"; | |
private final String SQL_SELECT = "SELECT * FROM u_passport_manage FORCE INDEX(PRI) WHERE pid NOT IN ( SELECT pid FROM u_passport_manage_copy ) AND pid >= ? AND pid <= ? LIMIT ?"; | |
private final String SQL_SELECT_SINGLE_DATA = "SELECT * FROM u_passport_manage WHERE pid = ?"; | |
private final String SQL_INSERT = "INSERT INTO u_passport_manage_copy (" + | |
"pid, uid, login_name, name_change_times, password, salt, email, email_valid_status, telephone, telephone_valid_status, encryption, regtime, regip, app_key, service_id, status, create_time, update_time, memo" + | |
") VALUES (" + | |
"?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; | |
/** | |
* 选择任务. 从DB中读取数据, 将取出的数据返回 | |
* @param taskParameter | |
* @param ownSign | |
* @param taskItemNum | |
* @param taskItemList | |
* @param eachFetchDataNum | |
* @return | |
* @throws Exception | |
*/ | |
public List<PassportModel> selectTasks(String taskParameter, String ownSign, | |
int taskItemNum, List<TaskItemDefine> taskItemList, | |
int eachFetchDataNum) throws Exception { | |
try { | |
long minPid = 0; | |
long maxPid = 0; | |
// 每个线程只会得到一个任务项 | |
if (taskItemList.size() > 1) { | |
throw new IllegalStateException("分片错误"); | |
} | |
if (taskItemList.size() == 0) { | |
return null; | |
} | |
System.out.println("每次取:" + eachFetchDataNum); | |
Long taskId = Long.parseLong(taskItemList.get(0).getTaskItemId()); | |
System.out.println("任务项:" + taskId); | |
long blockAmount = ROW_AMOUNT / 10; | |
minPid = (taskId - 1) * blockAmount; | |
if (10 == taskId.longValue()) { | |
maxPid = Long.MAX_VALUE; | |
} else { | |
maxPid = minPid + blockAmount - 1; | |
} | |
// 从DB中读取minPid - maxPid的数据 | |
System.out.println("执行select... ..."); | |
List<PassportModel> dataList = selectData(minPid, maxPid, eachFetchDataNum); | |
System.out.println("选择了 " + dataList.size() + " 条数据"); | |
return dataList; | |
} catch (Exception e) { | |
e.printStackTrace(); | |
return null; | |
} | |
} | |
/** | |
* 向目标表中插入数据 | |
* @param model | |
* @param ownSign | |
* @return | |
* @throws Exception | |
*/ | |
public boolean execute(PassportModel model, String ownSign) | |
throws Exception { | |
try { | |
insertData(model); | |
return true; | |
} catch (Exception e) { | |
e.printStackTrace(); | |
return false; | |
} | |
} | |
public Comparator<PassportModel> getComparator() { | |
return null; | |
} | |
private void processData(PassportModel model) { | |
} | |
public JdbcTemplate getJdbcTemplate() { | |
return jdbcTemplate; | |
} | |
public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { | |
this.jdbcTemplate = jdbcTemplate; | |
} | |
private void insertData(PassportModel model) { | |
System.out.println("插入数据, pid = " + model.getPid()); | |
jdbcTemplate.update(SQL_INSERT, model.toParamArray()); | |
} | |
private List<PassportModel> selectData(long minPid, long maxPid, int limit) { | |
return jdbcTemplate.query(SQL_SELECT, new Object[]{minPid, maxPid, limit}, new RowMapper<PassportModel>() { | |
public PassportModel mapRow(ResultSet rs, int i) throws SQLException { | |
PassportModel model = new PassportModel(); | |
model.setPid(rs.getLong("pid")); | |
model.setUid(rs.getString("uid")); | |
model.setLoginName(rs.getString("login_name")); | |
model.setNameChangeTimes(rs.getInt("name_change_times")); | |
model.setPassowrd(rs.getString("password")); | |
model.setSalt(rs.getString("salt")); | |
model.setEmail(rs.getString("email")); | |
model.setEmailValidStatus(rs.getInt("email_valid_status")); | |
model.setTelephone(rs.getString("telephone")); | |
model.setTelephoneValidStatus(rs.getInt("telephone_valid_status")); | |
model.setEncryption(rs.getInt("encryption")); | |
model.setRegTime(rs.getInt("regtime")); | |
model.setRegip(rs.getString("regip")); | |
model.setAppKey(rs.getString("app_key")); | |
model.setServiceId(rs.getString("service_id")); | |
model.setStatus(rs.getInt("status")); | |
model.setCreateTime(rs.getInt("create_time")); | |
model.setUpdateTime(rs.getInt("update_time")); | |
model.setMemo(rs.getString("memo")); | |
return model; | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment