
Spring Batch source code analysis

Popularity: 72   Published: 2023-12-10 00:26:21.0

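I. When a batch job is launched, job.execute(jobExecution) is called. Here job is a FlowJob, and jobExecution is created by calling createJobExecution on the jobRepository.
FlowJob extends org.springframework.batch.core.job.AbstractJob, so the job runs through AbstractJob#execute. This method runs the job, handles all listener and repository calls, and delegates the actual processing to the subclass's doExecute method.

1. Register the job context for the current thread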

JobSynchronizationManager.register(execution);

org.springframework.batch.core.scope.context.JobSynchronizationManager#register does the work by calling register on JobSynchronizationManager's manager field.

By default, the manager field is

new SynchronizationManagerSupport<JobExecution, JobContext>()

SynchronizationManagerSupport keeps track of the executions registered by the current thread, together with a map from each execution to its context (new JobContext(execution)).
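To make the idea of a thread-bound registry concrete, here is a minimal sketch. It is not the SynchronizationManagerSupport source: the class SketchSynchronizationManager and its methods are hypothetical, and the reference counting a real implementation would need when several threads share one execution is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a thread-bound execution registry.
class SketchSynchronizationManager<E, C> {
    // Executions registered by the current thread, most recent first.
    private final ThreadLocal<Deque<E>> executions = ThreadLocal.withInitial(ArrayDeque::new);
    // One context per execution.
    private final Map<E, C> contexts = new ConcurrentHashMap<>();
    private final Function<E, C> contextFactory;

    SketchSynchronizationManager(Function<E, C> contextFactory) {
        this.contextFactory = contextFactory;
    }

    // Binds the execution to the current thread and creates its context on first use.
    C register(E execution) {
        executions.get().push(execution);
        return contexts.computeIfAbsent(execution, contextFactory);
    }

    // Returns the context of the most recently registered execution, or null.
    C getContext() {
        E execution = executions.get().peek();
        return execution == null ? null : contexts.get(execution);
    }

    // Unbinds the most recent execution and drops its context.
    void release() {
        E execution = executions.get().poll();
        if (execution != null) {
            contexts.remove(execution);
        }
    }

    static String demo() {
        SketchSynchronizationManager<String, String> manager =
                new SketchSynchronizationManager<>(execution -> "context of " + execution);
        manager.register("jobExecution#1");
        String during = manager.getContext();
        manager.release();
        String after = manager.getContext();
        return during + " / " + after;
    }
}
```

Calling SketchSynchronizationManager.demo() shows the context being visible between register and release, and gone afterwards.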

2. Validate the jobParameters

jobParametersValidator.validate(execution.getJobParameters());

jobParameters must not be null; if it is null, a JobParametersInvalidException is thrown.

3. Call the listeners' beforeJob method

listener.beforeJob(execution);

4. Call doExecute to run the job

doExecute(execution);
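The four steps above follow the template-method pattern: the base class fixes the order, and the subclass only supplies doExecute. A minimal sketch of that shape, with hypothetical names (SketchAbstractJob, Listener) and a plain list standing in for the execution and repository:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of the execute/doExecute split; not the AbstractJob source.
abstract class SketchAbstractJob {
    interface Listener {
        void beforeJob(List<String> log);
    }

    private final List<Listener> listeners = new ArrayList<>();

    void register(Listener listener) {
        listeners.add(listener);
    }

    // Fixed skeleton: validate parameters, notify listeners, then delegate.
    final List<String> execute(Map<String, String> jobParameters) {
        List<String> log = new ArrayList<>();
        if (jobParameters == null) {
            throw new IllegalArgumentException("jobParameters must not be null");
        }
        log.add("validate");
        for (Listener listener : listeners) {
            listener.beforeJob(log);
        }
        doExecute(log);
        return log;
    }

    // The only part a concrete job has to implement.
    protected abstract void doExecute(List<String> log);

    static List<String> demo() {
        SketchAbstractJob job = new SketchAbstractJob() {
            @Override
            protected void doExecute(List<String> log) {
                log.add("doExecute");
            }
        };
        job.register(log -> log.add("beforeJob"));
        return job.execute(new HashMap<>());
    }
}
```

Because execute is final, a subclass cannot skip validation or the listener calls; it can only change what happens inside doExecute.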

II. org.springframework.batch.core.job.flow.FlowJob#doExecute runs the job

1. Create a JobFlowExecutor. JobFlowExecutor is used by components that need to execute a flow tied to a JobExecution.

JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),
      new SimpleStepHandler(getJobRepository()), execution);

SimpleStepHandler manages repository access and restart handling. Constructing a SimpleStepHandler this way also creates a new ExecutionContext.

new ExecutionContext()
This class wraps a ConcurrentHashMap and offers type-safe read operations.
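As an illustration of what "type-safe reads over a map" can look like, here is a small sketch. SketchContext and its methods are hypothetical and much smaller than the real ExecutionContext; the point is only that a typed getter checks the stored value before returning it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a map-backed context with typed reads.
class SketchContext {
    private final Map<String, Object> map = new ConcurrentHashMap<>();

    void put(String key, Object value) {
        map.put(key, value);
    }

    int getInt(String key) {
        return read(key, Integer.class);
    }

    String getString(String key) {
        return read(key, String.class);
    }

    // Fails fast when the value is missing or has a different type.
    private <T> T read(String key, Class<T> type) {
        Object value = map.get(key);
        if (!type.isInstance(value)) {
            throw new ClassCastException(
                    "Value for key=[" + key + "] is not of type " + type.getSimpleName());
        }
        return type.cast(value);
    }

    static String demo() {
        SketchContext context = new SketchContext();
        context.put("read.count", 3);
        try {
            // Reading an Integer as a String is rejected.
            return context.getString("read.count");
        } catch (ClassCastException e) {
            return "int=" + context.getInt("read.count") + ", string read rejected";
        }
    }
}
```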

2. The actual work of the job is done by flow.start(executor), where flow is the SimpleFlow built from the job defined in the configuration file

executor.updateJobExecutionStatus(flow.start(executor).getStatus());

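The startState field of SimpleFlow holds a StepState: the definition of the job's first step, whose name is the job id, a dot, and the id of the first step.

The start method declares a local state, assigns startState to it, reads its stateName (job id, dot, first step id), and finally calls resume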

if (startState == null) {
   initializeTransitions();
}
State state = startState;
String stateName = state.getName();
return resume(stateName, executor);

III. org.springframework.batch.core.job.flow.support.SimpleFlow#resume

The core of resume is the call to the state's handle method

while (isFlowContinued(state, status, stepExecution)) {
   stateName = state.getName();
   try {
      status = state.handle(executor);
      stepExecution = executor.getStepExecution();
   }
   // catch handling omitted in this excerpt
   state = nextState(stateName, status, stepExecution);
}
FlowExecution result = new FlowExecution(stateName, status);
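The loop above is a small state machine: handle the current state, then pick the next state from the resulting status. A stripped-down sketch of that loop, where SketchFlow, the string statuses, and the "from->status" transition key are all hypothetical simplifications:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical miniature of a flow: states return a status, transitions pick the next state.
class SketchFlow {
    private final Map<String, Supplier<String>> states = new HashMap<>();
    private final Map<String, String> transitions = new HashMap<>();
    final List<String> visited = new ArrayList<>();

    void addState(String name, Supplier<String> handler) {
        states.put(name, handler);
    }

    void addTransition(String from, String status, String to) {
        transitions.put(from + "->" + status, to);
    }

    // Runs states until no transition matches the last status.
    String resume(String stateName) {
        String status = "UNKNOWN";
        String current = stateName;
        while (current != null) {
            visited.add(current);
            status = states.get(current).get();
            current = transitions.get(current + "->" + status);
        }
        return status;
    }

    static String demo() {
        SketchFlow flow = new SketchFlow();
        flow.addState("job.step1", () -> "COMPLETED");
        flow.addState("job.step2", () -> "FAILED");
        flow.addState("job.step3", () -> "COMPLETED");
        flow.addTransition("job.step1", "COMPLETED", "job.step2");
        flow.addTransition("job.step2", "COMPLETED", "job.step3");
        String status = flow.resume("job.step1");
        return flow.visited + " ended with " + status;
    }
}
```

In the demo, job.step2 fails and no transition is defined for that status, so job.step3 is never visited.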

IV. org.springframework.batch.core.job.flow.support.state.StepState#handle contains only two statements

executor.abandonStepExecution();
return new FlowExecutionStatus(executor.executeStep(step));

When a new step starts, the previous execution is updated so that, if it failed, it is abandoned on this restart.

executor.executeStep(step)

org.springframework.batch.core.job.flow.JobFlowExecutor implements the FlowExecutor interface, which gives FlowJob the context and execution strategy for running step by step.

The core of org.springframework.batch.core.job.flow.JobFlowExecutor#executeStep

StepExecution stepExecution = stepHandler.handleStep(step, execution);

org.springframework.batch.core.job.SimpleStepHandler#handleStep

currentStepExecution = execution.createStepExecution(step.getName());
currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
step.execute(currentStepExecution);
currentStepExecution.getExecutionContext().put("batch.executed", true);

org.springframework.batch.core.step.AbstractStep#execute

doExecute(stepExecution);

org.springframework.batch.core.step.tasklet.TaskletStep#doExecute

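This method creates a Semaphore. The semaphore is shared within one step execution, so other step executions can run in parallel without needing that lock.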

stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
   @Override
   public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
         throws Exception {
      StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

      // Before starting a new transaction, check for
      // interruption.
      interruptionPolicy.checkInterrupted(stepExecution);

      RepeatStatus result;
      try {
         result = new TransactionTemplate(transactionManager, transactionAttribute)
               .execute(new ChunkTransactionCallback(chunkContext, semaphore));
      }
      catch (UncheckedTransactionException e) {
         // Allow checked exceptions to be thrown inside callback
         throw (Exception) e.getCause();
      }

      chunkListener.afterChunk(chunkContext);

      // Check for interruption after transaction as well, so that
      // the interrupted exception is correctly propagated up to
      // caller
      interruptionPolicy.checkInterrupted(stepExecution);

      return result;
   }
});
org.springframework.batch.repeat.support.RepeatTemplate#iterate executes the batch callback until the completion policy decides the batch is complete. It waits for the whole batch to finish before returning.
result = executeInternal(callback);
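The "repeat until complete" contract can be sketched in a few lines. This is a single-threaded simplification with hypothetical names (SketchRepeatTemplate, Status, Callback), and a plain iteration cap stands in for the completion policy:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical miniature of a repeat loop; maxIterations stands in for a completion policy.
class SketchRepeatTemplate {
    enum Status { CONTINUABLE, FINISHED }

    interface Callback {
        Status doInIteration(int iteration);
    }

    private final int maxIterations;

    SketchRepeatTemplate(int maxIterations) {
        this.maxIterations = maxIterations;
    }

    // Calls the callback until it reports FINISHED or the cap is reached.
    Status iterate(Callback callback) {
        Status result = Status.CONTINUABLE;
        int iteration = 0;
        boolean running = maxIterations > 0;
        while (running) {
            result = callback.doInIteration(iteration);
            iteration++;
            if (result == Status.FINISHED || iteration >= maxIterations) {
                running = false;
            }
        }
        return result;
    }

    // Counts how often the callback runs while draining three items.
    static int countCalls(int maxIterations) {
        Deque<String> items = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        int[] calls = {0};
        new SketchRepeatTemplate(maxIterations).iterate(iteration -> {
            calls[0]++;
            return items.poll() == null ? Status.FINISHED : Status.CONTINUABLE;
        });
        return calls[0];
    }
}
```

With a cap of 10 the callback runs four times (three items plus the call that finds the input exhausted); with a cap of 2 the cap ends the loop first.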

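org.springframework.batch.repeat.support.RepeatTemplate#executeInternal is the internal method that loops over the interceptors and the batch callback.

RepeatInternalState state = createInternalState(context);
while (running) {
   // ... before interceptors omitted ...
   if (running) {
      result = getNextResult(context, callback, state);
      executeAfterInterceptors(context, result);
      // ... completion checks that set running = false omitted ...
   }
}
result = result.and(waitForResults(state));

Calling createInternalState creates a RepeatInternalStateSupport object; in the TaskExecutorRepeatTemplate case it is the ResultQueueInternalState subclass, whose results field is a ResultHolderResultQueue.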
new RepeatInternalStateSupport();

this.results = new ResultHolderResultQueue(throttleLimit);

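A ResultHolderResultQueue uses the configured throttleLimit to create a queue and a semaphore, which are used to run the step on multiple threads. throttleLimit is the maximum number of results that can be expected at any given time.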

results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator());
waits = new Semaphore(throttleLimit);

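V. org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate#getNextResult

This method uses the TaskExecutor to produce a result on multiple threads. The internal state is a queue of unfinished result holders. When the method exits, the holder carrying the return value should no longer be on the queue. The queue is scoped to the calling method, so access to it does not need to be synchronized.

Every ExecutingRunnable is created with the same context, so the results from all threads go into one shared context.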

ExecutingRunnable runnable = null;
ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
do {
   /*
    * Wrap the callback in a runnable that will add its result to the
    * queue when it is ready.
    */
   runnable = new ExecutingRunnable(callback, context, queue);
   /**
    * Tell the runnable that it can expect a result. This could have
    * been in-lined with the constructor, but it might block, so it's
    * better to do it here, since we have the option (it's a private
    * class).
    */
   runnable.expect();
   taskExecutor.execute(runnable);
   /*
    * Allow termination policy to update its state. This must happen
    * immediately before or after the call to the task executor.
    */
   update(context);
   /*
    * Keep going until we get a result that is finished, or early
    * termination...
    */
} while (queue.isEmpty() && !isComplete(context));
/*
 * N.B. If the queue is empty then take() blocks until a result appears,
 * and there must be at least one because we just submitted one to the
 * task executor.
 */
ResultHolder result = queue.take();
return result.getResult();
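The interplay of the semaphore and the result queue can be sketched with plain java.util.concurrent classes. This is a rough model under stated assumptions, not the TaskExecutorRepeatTemplate logic: SketchThrottledExecution is hypothetical, a permit is taken before each submission (the role expect() plays above) and given back when a result is consumed, so at most throttleLimit tasks are outstanding at once.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical sketch of throttled submission with a shared result queue.
class SketchThrottledExecution {
    // Squares 1..taskCount on worker threads and sums the results.
    // throttleLimit must be at least 1.
    static int sumOfSquares(int throttleLimit, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(throttleLimit);
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();
        Semaphore waits = new Semaphore(throttleLimit);
        int sum = 0;
        try {
            int submitted = 0;
            int taken = 0;
            while (taken < taskCount) {
                // Submit while work remains and a permit is free.
                while (submitted < taskCount && waits.tryAcquire()) {
                    final int value = submitted + 1;
                    submitted++;
                    executor.execute(() -> results.add(value * value));
                }
                // Blocks until some worker has produced a result.
                sum += results.take();
                taken++;
                // Consuming a result frees a slot for the next submission.
                waits.release();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return sum;
    }
}
```

Results may arrive in any order, which is why the sketch only sums them; the call SketchThrottledExecution.sumOfSquares(2, 5) adds 1, 4, 9, 16 and 25.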

In ExecutingRunnable's run method, the callback that TaskletStep#doExecute passed to stepOperations.iterate is invoked:

result = callback.doInIteration(context);

The doInIteration method of that callback runs the logic that produces the result:

result = new TransactionTemplate(transactionManager, transactionAttribute)
      .execute(new ChunkTransactionCallback(chunkContext, semaphore));

The execute method eventually calls

result = action.doInTransaction(status);

ChunkTransactionCallback is an inner class of TaskletStep.

The core of org.springframework.batch.core.step.tasklet.TaskletStep.ChunkTransactionCallback#doInTransaction:

result = tasklet.execute(contribution, chunkContext);

org.springframework.batch.core.step.item.ChunkOrientedTasklet#execute

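In execute, the type of chunkProvider depends on whether a skip-policy is configured for the chunk in the configuration file.

If none is configured, it is org.springframework.batch.core.step.item.SimpleChunkProvider.

If one is configured, it is org.springframework.batch.core.step.item.FaultTolerantChunkProvider, a subclass of SimpleChunkProvider, so the provide call below still runs the method defined in SimpleChunkProvider, while the fault-tolerant subclass supplies its own skip-aware read and postProcess. The process call goes to the separate chunkProcessor.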

Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
if (inputs == null) {
   inputs = chunkProvider.provide(contribution);
   if (buffering) {
      chunkContext.setAttribute(INPUTS_KEY, inputs);
   }
}

chunkProcessor.process(contribution, inputs);
chunkProvider.postProcess(contribution, inputs);

// Allow a message coming back from the processor to say that we
// are not done yet
if (inputs.isBusy()) {
   logger.debug("Inputs still busy");
   return RepeatStatus.CONTINUABLE;
}

chunkContext.removeAttribute(INPUTS_KEY);
chunkContext.setComplete();
return RepeatStatus.continueIf(!inputs.isEnd());

1. org.springframework.batch.core.step.item.SimpleChunkProvider#provide

final Chunk<I> inputs = new Chunk<I>();
repeatOperations.iterate(new RepeatCallback() {
   @Override
   public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
      I item = null;
      try {
         item = read(contribution, inputs);
      }
      catch (SkipOverflowException e) {
         // read() tells us about an excess of skips by throwing an
         // exception
         return RepeatStatus.FINISHED;
      }
      if (item == null) {
         inputs.setEnd();
         return RepeatStatus.FINISHED;
      }
      inputs.add(item);
      contribution.incrementReadCount();
      return RepeatStatus.CONTINUABLE;
   }
});

return inputs;

repeatOperations, like stepOperations in TaskletStep#doExecute above, is an org.springframework.batch.repeat.support.RepeatTemplate; only the doInIteration implementation of the callback passed in differs.
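Put together, provide reads items one at a time until either the chunk is full or the reader returns null, and the outer loop keeps asking for chunks until a chunk is marked as the end. A minimal sketch of that reading pattern, where SketchChunk and SketchChunkProvider are hypothetical and a fixed chunkSize stands in for the inner completion policy:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical chunk holder: the items read so far plus an end-of-input flag.
class SketchChunk<T> {
    final List<T> items = new ArrayList<>();
    boolean end;
}

class SketchChunkProvider {
    // Reads up to chunkSize items; an exhausted reader marks the chunk as the end.
    static <T> SketchChunk<T> provide(Iterator<T> reader, int chunkSize) {
        SketchChunk<T> chunk = new SketchChunk<>();
        while (chunk.items.size() < chunkSize) {
            // null plays the role of "no more input", as with an item reader.
            T item = reader.hasNext() ? reader.next() : null;
            if (item == null) {
                chunk.end = true;
                break;
            }
            chunk.items.add(item);
        }
        return chunk;
    }

    // Outer loop: keep providing chunks until one reports the end of input.
    static List<Integer> chunkSizes(List<String> input, int chunkSize) {
        Iterator<String> reader = input.iterator();
        List<Integer> sizes = new ArrayList<>();
        SketchChunk<String> chunk;
        do {
            chunk = provide(reader, chunkSize);
            sizes.add(chunk.items.size());
        } while (!chunk.end);
        return sizes;
    }
}
```

Note the edge case in this sketch: when the input length is an exact multiple of chunkSize, the end is only discovered by one more read, so the last chunk is empty.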


2. org.springframework.batch.core.step.item.SimpleChunkProcessor#process

// Allow temporary state to be stored in the user data field
initializeUserData(inputs);
// If there is no input we don't have to do anything more
if (isComplete(inputs)) {
   return;
}
// Make the transformation, calling remove() on the inputs iterator if
// any items are filtered. Might throw exception and cause rollback.
Chunk<O> outputs = transform(contribution, inputs);
// Adjust the filter count based on available data
contribution.incrementFilterCount(getFilterCount(inputs, outputs));
// Adjust the outputs if necessary for housekeeping purposes, and then
// write them out...
write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
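The transform, filter count, and write sequence can be illustrated with a short sketch. SketchChunkProcessor is hypothetical; it assumes, as the comments in the excerpt suggest, that a processor returning null means the item was filtered out rather than failed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature of process: transform every item, count filtered ones, write the rest.
class SketchChunkProcessor {
    // A null result from the processor means "filter this item out".
    static <I, O> List<O> transform(List<I> inputs, Function<I, O> processor, int[] filterCount) {
        List<O> outputs = new ArrayList<>();
        for (I item : inputs) {
            O output = processor.apply(item);
            if (output == null) {
                filterCount[0]++;
            } else {
                outputs.add(output);
            }
        }
        return outputs;
    }

    static String demo() {
        int[] filterCount = {0};
        List<String> written = new ArrayList<>();
        // Even numbers are filtered, odd numbers are turned into strings.
        List<String> outputs = SketchChunkProcessor.<Integer, String>transform(
                Arrays.asList(1, 2, 3, 4),
                n -> n % 2 == 0 ? null : "item-" + n,
                filterCount);
        written.addAll(outputs); // stand-in for the item writer
        return written + " filtered=" + filterCount[0];
    }
}
```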

org.springframework.batch.core.step.item.FaultTolerantChunkProcessor#transform calls the processor configured in the configuration file from the doWithRetry method of a RetryCallback object defined inside this method

Chunk<O> outputs = new Chunk<O>();
@SuppressWarnings("unchecked")
final UserData<O> data = (UserData<O>) inputs.getUserData();
final Chunk<O> cache = data.getOutputs();
final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator();
final AtomicInteger count = new AtomicInteger(0);

// final int scanLimit = processorTransactional && data.scanning() ? 1 :
// 0;

for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
   final I item = iterator.next();

   RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {
      @Override
      public O doWithRetry(RetryContext context) throws Exception {
         O output = null;
         try {
            count.incrementAndGet();
            O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
            if (cached != null && !processorTransactional) {
               output = cached;
            }
            else {
               output = doProcess(item);
               if (output == null) {
                  data.incrementFilterCount();
               }
               else if (!processorTransactional && !data.scanning()) {
                  cache.add(output);
               }
            }
         }
         catch (Exception e) {
            if (rollbackClassifier.classify(e)) {
               // Default is to rollback unless the classifier
               // allows us to continue
               throw e;
            }
            else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
               // If we are not re-throwing then we should check if
               // this is skippable
               contribution.incrementProcessSkipCount();
               logger.debug("Skipping after failed process with no rollback", e);
               // If not re-throwing then the listener will not be
               // called in next chunk.
               callProcessSkipListener(item, e);
            }
            else {
               // If it's not skippable that's an error in
               // configuration - it doesn't make sense to not roll
               // back if we are also not allowed to skip
               throw new NonSkippableProcessException(
                     "Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.",
                     e);
            }
         }
         if (output == null) {
            // No need to re-process filtered items
            iterator.remove();
         }
         return output;
      }
   };

   RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
      @Override
      public O recover(RetryContext context) throws Exception {
         Throwable e = context.getLastThrowable();
         if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
            iterator.remove(e);
            contribution.incrementProcessSkipCount();
            logger.debug("Skipping after failed process", e);
            return null;
         }
         else {
            if (rollbackClassifier.classify(e)) {
               // Default is to rollback unless the classifier
               // allows us to continue
               throw new RetryException("Non-skippable exception in recoverer while processing", e);
            }
            iterator.remove(e);
            return null;
         }
      }
   };

   O output = batchRetryTemplate.execute(retryCallback, recoveryCallback,
         new DefaultRetryState(getInputKey(item), rollbackClassifier));
   if (output != null) {
      outputs.add(output);
   }

   /*
    * We only want to process the first item if there is a scan for a
    * failed item.
    */
   if (data.scanning()) {
      while (cacheIterator != null && cacheIterator.hasNext()) {
         outputs.add(cacheIterator.next());
      }
      // Only process the first item if scanning
      break;
   }
}

return outputs;

org.springframework.batch.core.step.item.BatchRetryTemplate#execute(org.springframework.retry.RetryCallback<T,E>, org.springframework.retry.RecoveryCallback<T>, org.springframework.retry.RetryState)

O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(getInputKey(item), rollbackClassifier));
This eventually calls org.springframework.retry.support.RetryTemplate#doExecute; the core code is:

RetryContext context = this.open(retryPolicy, state);
RetrySynchronizationManager.register(context);
try {
    boolean running = this.doOpenInterceptors(retryCallback, context);
    while (this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
        try {
            lastException = null;
            var24 = retryCallback.doWithRetry(context);
            return var24;
        } catch (Throwable var21) {
            // exception handling omitted in this excerpt
        }
    }
    var24 = this.handleRetryExhausted(recoveryCallback, context, state);
} catch (Throwable var22) {
    throw wrapIfNecessary(var22);
} finally {
    this.close(retryPolicy, context, state, lastException == null);
    this.doCloseInterceptors(retryCallback, context, lastException);
    RetrySynchronizationManager.clear();
}
return var24;
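The shape of this loop (try the callback while the policy allows, then fall back to the recovery callback) can be sketched as follows. SketchRetry is hypothetical and deliberately stateless: a fixed attempt count stands in for the retry policy, and the sketch ignores the RetryState that Spring Batch passes in, which affects how exceptions propagate between attempts.

```java
import java.util.function.Function;

// Hypothetical stateless retry loop with a recovery fallback.
class SketchRetry {
    interface RetryCallback<T> {
        T doWithRetry(int attempt) throws Exception;
    }

    // Tries the callback up to maxAttempts times, then hands the last error to recovery.
    static <T> T execute(int maxAttempts, RetryCallback<T> retryCallback,
            Function<Exception, T> recoveryCallback) {
        Exception lastException = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return retryCallback.doWithRetry(attempt);
            } catch (Exception e) {
                lastException = e; // remember the failure and try again
            }
        }
        return recoveryCallback.apply(lastException);
    }

    // The callback fails on attempts 1 and 2 and succeeds on attempt 3.
    static String demo(int maxAttempts) {
        return SketchRetry.<String>execute(maxAttempts,
                attempt -> {
                    if (attempt < 3) {
                        throw new IllegalStateException("boom " + attempt);
                    }
                    return "ok on attempt " + attempt;
                },
                e -> "recovered after: " + e.getMessage());
    }
}
```

With three attempts the callback eventually succeeds; with two, the retries are exhausted and the recovery callback produces the result, which is where the skip logic sits in the transform excerpt above.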

3. org.springframework.batch.core.step.item.FaultTolerantChunkProcessor#write

final UserData<O> data = (UserData<O>) inputs.getUserData();
final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();
RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
   @Override
   public Object doWithRetry(RetryContext context) throws Exception {
      contextHolder.set(context);

      if (!data.scanning()) {
         chunkMonitor.setChunkSize(inputs.size());
         try {
            doWrite(outputs.getItems());
         }
         catch (Exception e) {
            if (rollbackClassifier.classify(e)) {
               throw e;
            }
            /*
             * If the exception is marked as no-rollback, we need to
             * override that, otherwise there's no way to write the
             * rest of the chunk or to honour the skip listener
             * contract.
             */
            throw new ForceRollbackForWriteSkipException(
                  "Force rollback on skippable exception so that skipped item can be located.", e);
         }
         contribution.incrementWriteCount(outputs.size());
      }
      else {
         scan(contribution, inputs, outputs, chunkMonitor, false);
      }
      return null;

   }
};

if (!buffering) {

   RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {
      @Override
      public Object recover(RetryContext context) throws Exception {
         Throwable e = context.getLastThrowable();
         if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
            throw new RetryException("Invalid retry state during write caused by "
                  + "exception that does not classify for rollback: ", e);
         }

         Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
         for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {
            inputIterator.next();
            outputIterator.next();

            checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
            if (!rollbackClassifier.classify(e)) {
               throw new RetryException(
                     "Invalid retry state during recovery caused by exception that does not classify for rollback: ",
                     e);
            }
         }
         return null;

      }
   };

   batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
         BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));

}
else {

   RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
      @Override
      public Object recover(RetryContext context) throws Exception {
         /*
          * If the last exception was not skippable we don't need to
          * do any scanning. We can just bomb out with a retry
          * exhausted.
          */
         if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
            throw new ExhaustedRetryException(
                  "Retry exhausted after last attempt in recovery path, but exception is not skippable.",
                  context.getLastThrowable());
         }

         inputs.setBusy(true);
         data.scanning(true);
         scan(contribution, inputs, outputs, chunkMonitor, true);
         return null;
      }
   };

   try {
      batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
            rollbackClassifier));
   }
   catch (Exception e) {
      RetryContext context = contextHolder.get();
      if (!batchRetryTemplate.canRetry(context)) {
         /*
          * BATCH-1761: we need advance warning of the scan about to
          * start in the next transaction, so we can change the
          * processing behaviour.
          */
         data.scanning(true);
      }
      throw e;
   }

}

callSkipListeners(inputs, outputs);

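Open questions:

1. When a step's chunk is executed, do the reader, processor, and writer run on the same thread?

2. If the reader, processor, and writer can run on different threads, where in the source is that scheduled? I did not find any such code.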








