Created December 22, 2015 02:07
Spring Batch flow job example: a job that splits per-server steps into parallel flows, copies in-game ranking data from each replication DB into Redis sorted sets, clears the previous rows for the day, and then writes the ranking back to the database through JPA.

package kr.co.plaync.lineage2.app.batch.rank.job;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

import kr.co.plaync.lineage2.app.batch.powerbook.listener.CustomJobExecutionListener;
import kr.co.plaync.lineage2.app.batch.rank.job.redis.RedisItemProcessor;
import kr.co.plaync.lineage2.app.batch.rank.job.redis.RedisItemReader;
import kr.co.plaync.lineage2.domain.ingame.RankUser;
import kr.co.plaync.lineage2.opensource.batch.BatchInfo;
import kr.co.plaync.lineage2.opensource.jdbc.config.DbconnInfo;
import kr.co.plaync.lineage2.opensource.jdbc.config.DbconnInfo.LiveReplication;
import kr.co.plaync.lineage2.opensource.redis.repository.RedisRepository;
import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.JobSynchronizationManager;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.util.Assert;

import com.ncsoft.ncframework.jdbc.routing.DataSourceContextHolder;

@Slf4j
@Configuration
public class RedisJobConfigurationTest {

    @Deprecated
    private static final String DELIMITER = ":";
    private static final String STEP_NAME_DELIMITER = ":";
    private static final Integer BATCH_SIZE = 420;
    private static final Integer REDIS_KEY_EXPIRE_TIME = 60 * 60 * 6;

    public enum SERVERINFO {
        LINEAGE2_REPLICATION101("CLASSIC", "WORLD_101", "101"),
        LINEAGE2_REPLICATION102("CLASSIC", "WORLD_102", "102"),
        LINEAGE2_REPLICATION104("CLASSIC", "WORLD_104", "104"),
        LINEAGE2_REPLICATION107("CLASSIC", "WORLD_107", "107");
        /* LINEAGE2_REPLICATION7("LIVE","7"),
        LINEAGE2_REPLICATION42("LIVE","42"),
        LINEAGE2_REPLICATION44("LIVE","44"),
        LINEAGE2_REPLICATION45("LIVE","45"),
        LINEAGE2_REPLICATION46("LIVE","46"),
        LINEAGE2_REPLICATION48("LIVE","48"),
        LINEAGE2_REPLICATION49("LIVE","49"),
        LINEAGE2_REPLICATION50("LIVE","50"),
        LINEAGE2_REPLICATION51("LIVE","51");*/

        private String serverType;
        private String redisKey;
        private String serverId;

        SERVERINFO(String serverType, String redisKey, String serverId) {
            this.serverType = serverType;
            this.redisKey = redisKey;
            this.serverId = serverId;
        }

        public String getServerType() {
            return serverType;
        }

        public String getRedisKey() {
            return redisKey;
        }

        public String getServerId() {
            return serverId;
        }
    }

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private CustomJobExecutionListener customJobExecutionListener;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Autowired
    @Qualifier(DbconnInfo.LINEAGE2_GAME_INFO)
    private DataSource dataSource;

    @Autowired
    @Qualifier(LiveReplication.LINEAGE2_REPLICATION101)
    private DataSource dataSource101;

    @Autowired
    @Qualifier(LiveReplication.LINEAGE2_REPLICATION102)
    private DataSource dataSource102;

    @Autowired
    @Qualifier(LiveReplication.LINEAGE2_REPLICATION104)
    private DataSource dataSource104;

    @Autowired
    @Qualifier(LiveReplication.LINEAGE2_REPLICATION107)
    private DataSource dataSource107;

    /*@Autowired
    @Qualifier(OpDevReplication.LINEAGE2_REPLICATION7)
    private DataSource dataSource7;
    @Autowired
    @Qualifier(OpDevReplication.LINEAGE2_REPLICATION42)
    private DataSource dataSource42;
    @Autowired
    @Qualifier(OpDevReplication.LINEAGE2_REPLICATION44)
    private DataSource dataSource44;
    @Autowired
    @Qualifier(OpDevReplication.LINEAGE2_REPLICATION45)
    private DataSource dataSource45;
    @Autowired
    @Qualifier(OpDevReplication.LINEAGE2_REPLICATION46)
    private DataSource dataSource46;
    @Autowired
    @Qualifier(OpDevReplication.LINEAGE2_REPLICATION48)
    private DataSource dataSource48;
    @Autowired
    @Qualifier(OpDevReplication.LINEAGE2_REPLICATION49)
    private DataSource dataSource49;
    @Autowired
    @Qualifier(OpDevReplication.LINEAGE2_REPLICATION50)
    private DataSource dataSource50;
    @Autowired
    @Qualifier(OpDevReplication.LINEAGE2_REPLICATION51)
    private DataSource dataSource51;*/

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RedisRepository redisRepository;

    @Autowired
    @Qualifier(BatchInfo.DEFAULT_ENTITY_MANAGER_FACTORY)
    private EntityManagerFactory emf;
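
    /**
     * Assembles the flow job. Three split flows run in sequence: flow 1 copies each
     * replication server's ranking data into Redis (one step per server, run in parallel),
     * flow 2 clears today's rows from GrowRanking so a re-run does not register data twice,
     * and flow 3 reads the Redis sorted sets back and writes RankUser entities through JPA.
     * Each sub-flow is named after its step so the connection name can be recovered from
     * the step name and bound to that flow's task executor.
     */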
    @Bean(name = "ingameRankJobTest")
    public Job job() {
        JobBuilder jobBuilder = jobBuilders.get("ingameRankJobTest").listener(customJobExecutionListener);

        FlowBuilder<Flow> splitflowBuilder1 = new FlowBuilder<Flow>("splitflowBuilder1").start(beforeStep());
        FlowBuilder<Flow> splitflowBuilder2 = new FlowBuilder<Flow>("splitflowBuilder2").start(beforeStep());
        FlowBuilder<Flow> splitflowBuilder3 = new FlowBuilder<Flow>("splitflowBuilder3").start(beforeStep());

        for (Step step : step1List()) {
            Flow subFlow = new FlowBuilder<Flow>(step.getName()).from(step).build();
            String connName = step.getName().split(STEP_NAME_DELIMITER)[1];
            splitflowBuilder1.split(taskExecutor(connName)).add(subFlow);
        }
        for (Step step : step2List()) {
            Flow subFlow = new FlowBuilder<Flow>(step.getName()).from(step).build();
            String connName = step.getName().split(STEP_NAME_DELIMITER)[1];
            splitflowBuilder2.split(syncTaskExecutor(connName)).add(subFlow);
        }
        for (Step step : step3List()) {
            Flow subFlow = new FlowBuilder<Flow>(step.getName()).from(step).build();
            String connName = step.getName().split(STEP_NAME_DELIMITER)[1];
            splitflowBuilder3.split(taskExecutor(connName)).add(subFlow);
        }

        return jobBuilder.start(splitflowBuilder1.build())
                .next(splitflowBuilder2.build())
                .next(splitflowBuilder3.build())
                .end().build();
    }
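
    /**
     * Task executor that binds the routing DataSource name and re-registers the current
     * JobExecution on the thread that runs a split flow, then clears both when the flow
     * finishes. This synchronous variant runs its flows one after another on the calling
     * thread; the SimpleAsyncTaskExecutor variant below does the same bookkeeping but
     * spawns a new thread per flow, so the per-server steps run in parallel.
     */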
    class DynamicDataSourceSyncTaskExecutor extends SyncTaskExecutor {

        private static final long serialVersionUID = 1L;
        private String dataSourceName;

        public DynamicDataSourceSyncTaskExecutor(String dataSourceName) {
            this.dataSourceName = dataSourceName;
        }

        @Override
        public void execute(final Runnable task) {
            final JobExecution jobExecution = JobSynchronizationManager.getContext().getJobExecution();
            super.execute(new Runnable() {
                @Override
                public void run() {
                    DataSourceContextHolder.setDataSourceName(dataSourceName);
                    JobSynchronizationManager.register(jobExecution);
                    try {
                        task.run();
                    } finally {
                        DataSourceContextHolder.clearDataSourceName();
                        JobSynchronizationManager.release();
                    }
                }
            });
        }
    }

    private TaskExecutor syncTaskExecutor(final String dataSourceName) {
        return new DynamicDataSourceSyncTaskExecutor(dataSourceName);
    }

    class DynamicDataSourceSimpleAsyncTaskExecutor extends SimpleAsyncTaskExecutor {

        private static final long serialVersionUID = 1L;
        private String dataSourceName;

        public DynamicDataSourceSimpleAsyncTaskExecutor(String dataSourceName) {
            this.dataSourceName = dataSourceName;
        }

        @Override
        protected void doExecute(final Runnable task) {
            final JobExecution jobExecution = JobSynchronizationManager.getContext().getJobExecution();
            super.doExecute(new Runnable() {
                @Override
                public void run() {
                    DataSourceContextHolder.setDataSourceName(dataSourceName);
                    JobSynchronizationManager.register(jobExecution);
                    try {
                        task.run();
                    } finally {
                        DataSourceContextHolder.clearDataSourceName();
                        JobSynchronizationManager.release();
                    }
                }
            });
        }
    }

    private TaskExecutor taskExecutor(final String dataSourceName) {
        return new DynamicDataSourceSimpleAsyncTaskExecutor(dataSourceName);
    }

    private String lookupDBConnName(final SERVERINFO serverInfo) {
        String serverId = serverInfo.getServerId();
        String[] dataSourceNames = DbconnInfo.getDataSourceNames(LiveReplication.class);
        for (String dataSourceName : dataSourceNames) {
            if (dataSourceName.contains(serverId)) {
                return dataSourceName;
            }
        }
        return null;
    }

    @Scope("prototype")
    @Bean
    public Step beforeStep() {
        Tasklet tasklet = new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                log.info("[start ingameRankJob before step.]");
                return RepeatStatus.FINISHED;
            }
        };
        return stepBuilders.get("beforeStep")
                .tasklet(tasklet)
                .build();
    }
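
    /**
     * One step per replication server. Each step runs tasklet(serverInfo), which scans that
     * server's user_data table in char_id ranges and pushes the rows into the world's Redis
     * sorted set. The connection name is appended to the step name so job() can bind the
     * matching routing DataSource to the flow's task executor.
     */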
    public List<Step> step1List() {
        List<Step> stepList = new ArrayList<Step>();
        for (SERVERINFO serverInfo : SERVERINFO.values()) {
            String connName = lookupDBConnName(serverInfo);
            Step step = stepBuilders.get("saveIngameRankToRedisStep" + STEP_NAME_DELIMITER + connName)
                    .tasklet(tasklet(serverInfo))
                    .build();
            stepList.add(step);
        }
        return stepList;
    }

    public List<Step> step2List() {
        List<Step> stepList = new ArrayList<Step>();
        for (SERVERINFO serverInfo : SERVERINFO.values()) {
            String connName = DbconnInfo.LINEAGE2_GAME_INFO;
            Step step = stepBuilders.get("deleteIngameRankToDbStep" + STEP_NAME_DELIMITER + connName)
                    .tasklet(tasklet2(serverInfo))
                    .build();
            stepList.add(step);
        }
        return stepList;
    }

    public List<Step> step3List() {
        List<Step> stepList = new ArrayList<Step>();
        for (SERVERINFO serverInfo : SERVERINFO.values()) {
            String worldId = serverInfo.getRedisKey();
            String connName = DbconnInfo.LINEAGE2_GAME_INFO;
            Step step = stepBuilders.get("saveIngameRankToDbStep" + STEP_NAME_DELIMITER + connName)
                    .<TypedTuple<String>, RankUser>chunk(BATCH_SIZE)
                    .reader(reader(worldId))
                    .processor(new RedisItemProcessor())
                    .writer(writer())
                    .build();
            stepList.add(step);
        }
        return stepList;
    }

    /**
     * Reads user experience data from the replication DB in fixed-size char_id ranges.
     * Each fetched range is pushed into the world's Redis sorted set.
     * Once all inserts are done, the world key in Redis is set to expire after 6 hours.
     * @return the tasklet for the given server
     */
    @Scope("prototype")
    @Bean
    protected Tasklet tasklet(final SERVERINFO serverInfo) {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext context) {
                final String redisKey = serverInfo.getRedisKey();
                // Upper bound of user_data rows for this replication (MAX(char_id))
                long serverUserCount = selectServerUserCount();
                long size = serverUserCount / BATCH_SIZE + 1;
                List<RankUser> rankUserList = null;
                for (long i = 0L; i < size; i++) {
                    long min = (i * BATCH_SIZE) + 1;   // 1, BATCH_SIZE + 1, 2 * BATCH_SIZE + 1, ...
                    long max = (i + 1) * BATCH_SIZE;   // BATCH_SIZE, 2 * BATCH_SIZE, 3 * BATCH_SIZE, ...
                    rankUserList = selectRankUserList(min, max);
                    for (RankUser user : rankUserList) {
                        String value = user.generateRankUserRedisValue();
                        redisRepository.addSortedSet(redisKey, value, user.getExp());
                    }
                }
                // Check how many entries actually ended up in Redis
                Long userCount = redisRepository.getSortedSize(serverInfo.getRedisKey());
                redisRepository.setExpireKey(redisKey, REDIS_KEY_EXPIRE_TIME);
                contribution.incrementWriteCount(userCount.intValue());
                log.info("{} insertIngameRankToRedisTask FINISHED", redisKey);
                return RepeatStatus.FINISHED;
            }
        };
    }

    // Uses MAX(char_id) as the upper bound for the char_id range scans above.
    private long selectServerUserCount() {
        StringBuilder countSql = new StringBuilder();
        countSql.append(" SELECT MAX(char_id)");
        countSql.append(" FROM user_data");
        countSql.append(" WHERE builder = 0");
        countSql.append(" AND account_id > 0");
        countSql.append(" AND temp_delete_date is null");
        long serverUserCount = jdbcTemplate.queryForObject(countSql.toString(), Long.class);
        return serverUserCount;
    }

    private List<RankUser> selectRankUserList(long min, long max) {
        StringBuilder sql = new StringBuilder();
        sql.append(" SELECT char_id,world,char_name,pledge_id,lev,class,exp ");
        sql.append(" FROM user_data WITH (READUNCOMMITTED)");
        sql.append(" WHERE builder = 0");
        sql.append(" AND account_id > 0");
        sql.append(" AND temp_delete_date is null");
        sql.append(" AND char_id >= ? and char_id <= ?");
        List<RankUser> rankUserList = jdbcTemplate.query(sql.toString(), new UserDataMapper(), new Object[] {min, max});
        return rankUserList;
    }

    /**
     * Deletes the data already registered for today's date and this world before the new
     * batch data is written, so that re-running the job does not register the data twice.
     * Deleting in TOP (BATCH_SIZE) chunks keeps the delete from holding long locks on the table.
     * @return the delete tasklet for the given server
     */
    @Scope("prototype")
    @Bean
    protected Tasklet tasklet2(final SERVERINFO serverInfo) {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext context) {
                StringBuilder deleteSql = new StringBuilder();
                deleteSql.append(" WHILE (1>0) ");
                deleteSql.append(" BEGIN");
                deleteSql.append(" DELETE TOP (").append(BATCH_SIZE).append(")");
                deleteSql.append(" FROM GrowRanking ");
                deleteSql.append(" WHERE insertDate = CONVERT(CHAR(10), getdate(), 112) ");
                deleteSql.append(" AND serverId = ").append(serverInfo.serverId);
                deleteSql.append(" IF @@ROWCOUNT = 0 BREAK");
                deleteSql.append(" END");
                jdbcTemplate.execute(deleteSql.toString());
                log.info("{} deleteIngameRankToDbTask FINISHED", serverInfo.getRedisKey());
                return RepeatStatus.FINISHED;
            }
        };
    }

    /////////////////////////////////
    // STEP 3 section
    /////////////////////////////////
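    // Chunk-oriented beans for step 3: the reader pages through the world's Redis sorted
    // set, RedisItemProcessor turns each TypedTuple<String> back into a RankUser, and the
    // JPA writer persists the entities in BATCH_SIZE chunks.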

    @Scope("prototype")
    @Bean
    public ItemReader<TypedTuple<String>> reader(String worldId) {
        RedisItemReader reader = new RedisItemReader(worldId);
        reader.setPageSize(BATCH_SIZE);
        return reader;
    }

    @Scope("prototype")
    @Bean
    public ItemWriter<RankUser> writer() {
        JpaItemWriter<RankUser> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(emf);
        return writer;
    }

    public class UserDataMapper implements RowMapper<RankUser> {
        @Override
        public RankUser mapRow(ResultSet rs, int i) throws SQLException {
            RankUser user = new RankUser();
            user.setCharacterId(rs.getInt("char_id"));
            user.setCharacterName(rs.getString("char_name"));
            user.setClassId(rs.getInt("class"));
            user.setCharacterLevel(rs.getInt("lev"));
            user.setServerId(rs.getInt("world"));
            user.setPledgeId(rs.getInt("pledge_id"));
            user.setExp(rs.getLong("exp"));
            return user;
        }
    }
}
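
For reference, a job wired up like this is launched through the standard Spring Batch JobLauncher. Below is a minimal sketch; only the ingameRankJobTest bean id comes from the configuration above, while the runner class, its method, and the runTimestamp parameter are illustrative assumptions.

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

// Hypothetical launcher component; names other than the "ingameRankJobTest" bean id are illustrative.
@Slf4j
@Component
public class IngameRankJobRunner {

    @Autowired
    private JobLauncher jobLauncher;

    // Bean id defined by RedisJobConfigurationTest above.
    @Autowired
    @Qualifier("ingameRankJobTest")
    private Job ingameRankJob;

    public void run() throws Exception {
        // A unique parameter so every invocation creates a fresh JobInstance.
        JobParameters params = new JobParametersBuilder()
                .addLong("runTimestamp", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(ingameRankJob, params);
        log.info("ingameRankJobTest finished with status {}", execution.getStatus());
    }
}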