일단 spring batch 를 구현하게 된 배경을 설명하자면,
회사에서 메신저 프로그램을 활용하여 카카오톡의 단체방 처럼 학기 시작하기 전에 학생들의 수업단톡방을 개설하는
배치 프로그램을 개발해야 했다.
그래서 수업 테이블에 저장되어 있는 수업을 조회하여, 그 수업에 해당하는 학생들을 단톡방에 초대하는 것
까지의 코드를 설명할 예정이다.
ClassConversationCreateJobConfiguration 이 Class 가 하나의 Job 이 되는 것이다.
기초적인 spring batch 에 대한 설명이 아닌 작성자인 본인에 대한 코드 기록용이므로 참고 부탁드린다..😥
1. Spring Batch 의 시작 Job 개발
1-1. Job 개발
/**
 * Declares the batch job that creates class conversations.
 *
 * <p>The job consists of a single step, {@code ClassCourseConversationCreateStep()},
 * and has a {@code JobListener} attached.
 *
 * @return the assembled job
 */
@Bean
@Qualifier("ClassCourseConversationCreateJob")
public Job ClassCourseConversationCreateJob() {
    // The registered job name is kept identical to the bean/qualifier name.
    final String jobName = "ClassCourseConversationCreateJob";
    return jobBuilderFactory
            .get(jobName)
            .start(ClassCourseConversationCreateStep())
            .listener(new JobListener())
            .build();
}
ClassCourseConversationCreateJob 은 ClassCourseConversationCreateStep() 으로 구성되어 있다.
/**
 * Chunk-oriented step that reads course rows, fires a {@code CLASS_CONVERSATION_CREATE}
 * event for each row through {@code eventProcessor}, and logs the processed rows.
 *
 * <p>{@code allowStartIfComplete(true)} is set, so the step may be started again
 * even after it has completed.
 *
 * @return the assembled step
 */
@Bean
@JobScope
public Step ClassCourseConversationCreateStep() {
    return stepBuilderFactory.get("ClassCourseConversationCreateStep")
            .<Map<String,Object>, Map<String,Object>>chunk(chunkSize)
            // NOTE(review): the nulls look like placeholders for values bound from
            // jobParameters on the reader bean - confirm against the reader definition.
            .reader(newCourseReaderFromLocalDB(null, null, null))
            .processor((Function<Map<String,Object>,Map<String,Object>>) (map) -> {
                // Tag the row with the event name the event processor dispatches on.
                map.put("EVENT", "CLASS_CONVERSATION_CREATE");
                JSONObject jsonObj = new JSONObject(map);
                try {
                    Boolean handled = eventProcessor.runEventProcessor(jsonObj.toString());
                    if (!Boolean.TRUE.equals(handled)) {
                        // Surface failed events instead of silently dropping the result.
                        log.warn("CLASS_CONVERSATION_CREATE was not handled successfully: {}", map);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the surrounding batch infrastructure
                    // can still observe that this thread was asked to stop.
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while processing CLASS_CONVERSATION_CREATE: {}", map, e);
                }
                return map;
            })
            .writer(new ItemWriter<Map<String,Object>>() {
                @Override
                public void write(List<? extends Map<String,Object>> items) throws Exception {
                    // Nothing is persisted here; the chunk is only logged.
                    for (Map<String,Object> item : items) {
                        log.info("ClassCourseConversationCreateStep = {}", item);
                    }
                }
            })
            .listener(new StepListener())
            .allowStartIfComplete(true)
            .build();
}
🧸 chunk 지향 프로그래밍 (<inputType, outputType>chunk(chunkSize))
: 한번에 하나씩 item을 입력받아서 chunk 단위로 트랜잭션을 다루는 방식. ( ex) <Map<String,Object>, Map<String,Object>>chunk(chunkSize)) 실패할 경우 chunk 단위만큼만 롤백이 된다.
Reader 와 Processor는 1건의 item만 다뤄지고, Writer 에서는 chunk 단위로 처리된다.

- Reader : 데이터 읽기
- Processor : 데이터 가공
- Writer : Chunk 단위로 처리(일괄저장)
Processor 부분은 CLASS_CONVERSATION_CREATE 라는 EventController가 실행되는 방식으로 구현하였다.
1-2. EventProcessorImpl 개발
EventController 어노테이션 코드이다.
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a method as the handler for a named event.
 *
 * <p>The annotation is retained at runtime so that handlers can be discovered
 * reflectively.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface EventController {

    /** Name of the event this method handles. */
    String value();

    /** Number of tokens associated with one invocation of the handler; defaults to 1. */
    int tokenConsume() default 1;
}
EventProcessorImpl 구현 코드이다.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.local.SynchronizationStrategy;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class EventProcessorImpl implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
applicationContext = ctx;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
private Map<String,Method> eventProcessors = null;
private Map<String,Object> eventObjects = null;
private final Map<Class, Object> objectCache = new HashMap<>();
public Set<Class> findAllClassesUsingClassLoader(String packageName){
InputStream stream = this.getClass().getClassLoader()
.getResourceAsStream(packageName.replaceAll("[.]", "/"));
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
return reader.lines()
.filter(line -> line.endsWith(".class"))
.map(line -> getClass(line,packageName))
.collect(Collectors.toSet());
}
private Class getClass(String className, String packageName) {
try {
return Class.forName(packageName + "."
+ className.substring(0, className.lastIndexOf(".")));
} catch(ClassNotFoundException e) {
}
return null;
}
public Object getEventObject(String eventName) {
Object o = null;
o = newInstance(eventProcessors.get(eventName).getDeclaringClass());
return o;
}
public Object newInstance(Class c) {
if (true == objectCache.containsKey(c))
return objectCache.get(c);
String className = c.getName();
className = className.substring(className.lastIndexOf('.') + 1);
className = Character.toLowerCase(className.charAt(0)) + className.substring(1);
Object o = null;
if (true == applicationContext.containsBean(className)) {
o = applicationContext.getBean(c);
objectCache.put(c, o);
return o;
}
if (true == applicationContext.containsBean(className)) {
objectCache.put(c, o);
o = applicationContext.getBean(c);
return o;
}
try {
o = c.getDeclaredConstructor().newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
objectCache.put(c, o);
return o;
}
@PostConstruct
private void init() {
// mq.controller 패키지 밑에 있는 Controller만 탐색한다.
String packageName = getClass().getPackage().getName();
eventProcessors = findAllClassesUsingClassLoader(packageName)
.stream()
.flatMap(c -> Stream.of(c.getDeclaredMethods()))
.filter(m -> m.isAnnotationPresent(EventController.class))
.collect(Collectors.toMap(
m -> m.getAnnotation(EventController.class).value(),
m -> m
));
if ( eventProcessors.size() == 0 ) {
Map<String, BotEventController> beansWithAnnotation = applicationContext.getBeansOfType(BotEventController.class);
eventProcessors = beansWithAnnotation.entrySet().stream()
.flatMap(m -> Stream.of(m.getValue().getClass().getDeclaredMethods()))
.filter(m -> m.isAnnotationPresent(EventController.class))
.collect(Collectors.toMap(
m -> m.getAnnotation(EventController.class).value(),
m -> m
));
}
eventObjects = eventProcessors.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> getEventObject(e.getKey())
));
log.info(eventProcessors.toString());
}
private Bucket bucket;
@PostConstruct
public void initRateLimiter() {
bucket = Bucket.builder()
.addLimit(Bandwidth.simple(175, Duration.ofMinutes(1)))
.withSynchronizationStrategy(SynchronizationStrategy.SYNCHRONIZED)
.build();
}
// 설정상 Event가 동시에 처리되지는 않지만 만약을 위해 synchronized method로 처리한다.
public synchronized Boolean runEventProcessor(String payload) throws InterruptedException {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> eventJson = null;
try {
eventJson = mapper.readValue(payload, new TypeReference<Map<String, Object>>() {
});
log.info("[Listened EVENT]" + eventJson.toString());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
String event = (String) eventJson.get("EVENT");
Object invokeObject = eventObjects.get(event);
Method invokeMethod = eventProcessors.get(event);
EventController eventController = invokeMethod.getAnnotation(EventController.class);
//빨리 호출할 필요가 없어서 무조건 100msec 쉰다.
Thread.sleep(100);
// Simple Rate Limit. api call limits 200 per minute.
long nanosToWaitForRefill = bucket.estimateAbilityToConsume(eventController.tokenConsume())
.getNanosToWaitForRefill();
if (nanosToWaitForRefill > 0) {
log.info("Too Many Events: Pending For " + TimeUnit.NANOSECONDS.toMillis(nanosToWaitForRefill) + " msecs");
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(nanosToWaitForRefill) + 50); // 내림 이슈로 +1만 하면 되지만 추가 마진 부여
}
if (!bucket.tryConsume(eventController.tokenConsume()))
return false;
Boolean ret = false;
try {
invokeMethod.setAccessible(true);
ret = (Boolean) invokeMethod.invoke(invokeObject, eventJson);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
return ret;
}
}
여기까지가 @EventController 어노테이션 코드와, @EventController 가 붙은 메소드를 찾아 실행시키는 EventProcessorImpl 클래스의 코드였다.
eventProcessor.runEventProcessor 해당 코드는 jsonObj 에 담긴 이벤트 name 을 가진 @EventController를 찾아서 해당 메소드를 실행 시키게 될 것이다.
/**
 * Handles the {@code CLASS_CONVERSATION_CREATE} event.
 *
 * <p>Reads the course keys from the event payload and passes them to
 * {@code clsBotEventService.classConversationCreate}.
 *
 * @param eventJson event payload; must contain {@code SYY}, {@code SMT_CD},
 *                  {@code SUBJT_CD}, {@code UNVFR_STDR_DEPT_CD} and {@code CORSE_DVCLS_NO}
 * @return the result reported by the service
 */
@EventController(value = "CLASS_CONVERSATION_CREATE", tokenConsume=1)
public boolean classConversationCreateController(Map<String, Object> eventJson){
    // All five keys are read before the service is called, in this order.
    final String year = eventJson.get("SYY").toString();
    final String semesterCode = eventJson.get("SMT_CD").toString();
    final String subjectCode = eventJson.get("SUBJT_CD").toString();
    final String departmentCode = eventJson.get("UNVFR_STDR_DEPT_CD").toString();
    final String divisionNo = eventJson.get("CORSE_DVCLS_NO").toString();

    final Boolean created = clsBotEventService.classConversationCreate(
            year, semesterCode, subjectCode, departmentCode, divisionNo);
    return created;
}
이러한 식으로 controller 가 구현되어 있다.
2. Scheduler
import java.text.SimpleDateFormat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import metabot.JobRunner;
/**
 * Runs {@code jobRunner.start(params)} on a cron schedule that can be replaced at runtime.
 *
 * <p>The cron expression is set with {@link #changeCronSet(String)} and is read when
 * {@link #startScheduler(String)} is called.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public abstract class DynamicChangeScheduler {

    /** Scheduler currently in use; {@code null} while nothing is scheduled. */
    private ThreadPoolTaskScheduler scheduler;

    @Autowired
    JobRunner jobRunner;

    // @Value("${application.bot.default-schedule}")
    private String defaultCron;

    /**
     * Schedules the job with the current cron expression, replacing any schedule that
     * was started earlier.
     *
     * @param params parameter string handed to {@code jobRunner.start}
     */
    public void startScheduler(String params) {
        // Build the trigger first: an invalid or missing cron expression then fails
        // before any scheduler threads are created.
        Trigger trigger = getTrigger();
        // Shut the previous scheduler down so repeated starts do not leak its threads.
        stopScheduler();
        ThreadPoolTaskScheduler started = new ThreadPoolTaskScheduler();
        started.initialize();
        started.schedule(getRunnable(params), trigger);
        scheduler = started;
    }

    /** Stops the current scheduler; does nothing when none has been started. */
    public void stopScheduler() {
        if (scheduler != null) {
            scheduler.shutdown();
            scheduler = null;
        }
    }

    /** Builds the task that logs the trigger time and starts the job. */
    private Runnable getRunnable(String params) {
        return () -> {
            long curTime = System.currentTimeMillis();
            log.info("######### = {} parameters = {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(curTime), params);
            jobRunner.start(params);
        };
    }

    /** Decides the execution time from the current cron expression. */
    private Trigger getTrigger() {
        return new CronTrigger(defaultCron);
    }

    /**
     * Stores a new cron expression. It is used the next time
     * {@link #startScheduler(String)} is called.
     *
     * @param defaultCron cron expression
     */
    public void changeCronSet(String defaultCron) {
        this.defaultCron = defaultCron;
    }
}
ThreadPoolTaskScheduler 를 활용하여 DynamicScheduler 를 구현하였다.
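changeCronSet 메소드로 cron 표현식을 새로 설정할 수 있다. 단, 이 값은 startScheduler 가 호출될 때 getTrigger 에서 읽히므로, 이미 실행 중인 스케줄에는 바로 반영되지 않고 다음 startScheduler 호출부터 적용된다.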
getRunnable 메소드에서 jobRunner.start 를 통해 해당 Job을 실행 시킬 수 있도록 하였다.
3. 최종적인 Job Config 관련 코드
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.sql.DataSource;
import org.json.simple.JSONObject;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.springframework.jdbc.core.RowMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import metabot.bot.batch.job.listener.config.JobListener;
import metabot.bot.batch.job.listener.config.StepListener;
import metabot.botbroker.event.controller.EventProcessorImpl;
@Slf4j
@RequiredArgsConstructor
@Configuration
/*
* 방 생성 Job
* 실행주기 : 1학기, 여름계절학기, 2학기, 겨울계절학기
* */
public class ClassConversationCreateJobConfiguration extends BatchJobConfiguration{
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EventProcessorImpl eventProcessor;
private static final int chunkSize = 100;
@Autowired
@Qualifier("tiberoDataSource")
private final DataSource tiberoDataSource;
/**
 * Declares the batch job that creates class conversations.
 *
 * <p>The job consists of a single step, {@link #ClassCourseConversationCreateStep()},
 * and has a {@code JobListener} attached.
 *
 * @return the assembled job
 */
@Bean
@Qualifier("ClassCourseConversationCreateJob")
public Job ClassCourseConversationCreateJob() {
    // The registered job name is kept identical to the bean/qualifier name.
    final String jobName = "ClassCourseConversationCreateJob";
    return jobBuilderFactory
            .get(jobName)
            .start(ClassCourseConversationCreateStep())
            .listener(new JobListener())
            .build();
}
/**
 * Chunk-oriented step that reads course rows, fires a {@code CLASS_CONVERSATION_CREATE}
 * event for each row through {@code eventProcessor}, and logs the processed rows.
 *
 * <p>{@code allowStartIfComplete(true)} is set, so the step may be started again
 * even after it has completed.
 *
 * @return the assembled step
 */
@Bean
@JobScope
public Step ClassCourseConversationCreateStep() {
    return stepBuilderFactory.get("ClassCourseConversationCreateStep")
            .<Map<String,Object>, Map<String,Object>>chunk(chunkSize)
            // The reader's parameters are declared with jobParameters expressions;
            // NOTE(review): the nulls here look like placeholders only - confirm.
            .reader(newCourseReaderFromLocalDB(null, null, null))
            .processor((Function<Map<String,Object>,Map<String,Object>>) (map) -> {
                // Tag the row with the event name the event processor dispatches on.
                map.put("EVENT", "CLASS_CONVERSATION_CREATE");
                JSONObject jsonObj = new JSONObject(map);
                try {
                    Boolean handled = eventProcessor.runEventProcessor(jsonObj.toString());
                    if (!Boolean.TRUE.equals(handled)) {
                        // Surface failed events instead of silently dropping the result.
                        log.warn("CLASS_CONVERSATION_CREATE was not handled successfully: {}", map);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the surrounding batch infrastructure
                    // can still observe that this thread was asked to stop.
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while processing CLASS_CONVERSATION_CREATE: {}", map, e);
                }
                return map;
            })
            .writer(new ItemWriter<Map<String,Object>>() {
                @Override
                public void write(List<? extends Map<String,Object>> items) throws Exception {
                    // Nothing is persisted here; the chunk is only logged.
                    for (Map<String,Object> item : items) {
                        log.info("ClassCourseConversationCreateStep = {}", item);
                    }
                }
            })
            .listener(new StepListener())
            .allowStartIfComplete(true)
            .build();
}
/**
 * Cursor reader that returns one column-name -> value map per course row for the given
 * year and semester, restricted to subject codes starting with {@code U}.
 *
 * <p>The year and semester are passed as bind parameters rather than being formatted
 * into the SQL text, so job-parameter values cannot alter the statement.
 * NOTE(review): if {@code SYY}/{@code SMT_CD} are fixed-width CHAR columns, confirm that
 * comparing against bound values matches the same rows as the former literals.
 *
 * @param syy     year, from job parameter {@code syy}
 * @param smtCd   semester code, from job parameter {@code smtCd}
 * @param subjtCd subject code, from job parameter {@code subjtCd}; not used by the
 *                query, which filters on {@code SUBJT_CD LIKE 'U%'}
 * @return the configured reader
 */
@Bean
@StepScope
// TERM_CD: 10 (1st semester), 11 (summer semester), 20 (2nd semester), 21 (winter semester)
public JdbcCursorItemReader<Map<String,Object>> newCourseReaderFromLocalDB(
        @Value("#{jobParameters['syy']}") String syy
        ,@Value("#{jobParameters['smtCd']}") String smtCd
        ,@Value("#{jobParameters['subjtCd']}") String subjtCd) {
    RowMapper<Map<String, Object>> mapper = new ColumnMapRowMapper();
    // Switch to the DB once the DB connection is available.
    return new JdbcCursorItemReaderBuilder<Map<String,Object>>()
            .fetchSize(chunkSize)
            .dataSource(tiberoDataSource)
            .verifyCursorPosition(false)
            .rowMapper(mapper)
            .sql(
                    " SELECT SUBJT_CD -- 과목CD\n"+
                    " ,(SELECT SCH.FN_SCUR_SUBJT_NM(SUBJT_CD) FROM DUAL) AS SUBJT_NM\n"+
                    " ,SYY\n"+
                    " ,SMT_CD\n"+
                    " ,UNVFR_STDR_DEPT_CD\n"+
                    " ,CORSE_DVCLS_NO\n"+
                    " FROM [테이블명]\n"+
                    " WHERE 1 = 1\n"+
                    " AND SYY = ?\n"+
                    " AND SMT_CD = ?\n"+
                    " AND SUBJT_CD LIKE 'U%'\n"+
                    " ORDER BY SUBJT_CD, UNVFR_STDR_DEPT_CD, CORSE_DVCLS_NO\n")
            .preparedStatementSetter(ps -> {
                // Positions follow the order of the '?' placeholders above.
                ps.setString(1, syy);
                ps.setString(2, smtCd);
            })
            .name("newCourseReaderFromLocalDB")
            .build();
}
/**
 * Exposes a {@link JobRegistryBeanPostProcessor} wired to the given registry.
 *
 * @param jobRegistry registry handed to the post-processor
 * @return the configured post-processor
 */
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
    final JobRegistryBeanPostProcessor registrar = new JobRegistryBeanPostProcessor();
    registrar.setJobRegistry(jobRegistry);
    return registrar;
}
}

'Java > Spring Batch' 카테고리의 다른 글
| Spring Batch + JPA ( feat. 티베로) (0) | 2023.06.27 |
|---|---|
| Spring Batch + Tibero 연동 (1) | 2023.06.22 |