重试组件

什么重试机制?

重试是为了提高成功的可能性

反过来理解,任何可能失败且允许重试操作的场景,就适合使用重试机制。但有了重试机制就一定能成功吗?显然不是。如果不成功就一直重试,这种处理方式会使得业务线程一直被重试占用,这样会导致服务的负载线程暴增直至服务宕机,因此需要限制重试次数。失败情况下,我们需要做后续的操作,如果是数据库操作的重试,需要回滚事物;如果是服务调用的重试,需要邮件报警通知运维开发人员,恢复服务。

对于服务接口调用,可能是因为网络波动导致超时失败,这时候所有重试次数是在很短时间内发起的话,就很容易全部超时失败,因此超时机制还需要引入重试动作之间时间间隔以及第一次失败后延迟多长时间再开始重试等机制。

重试机制要素

- 限制重试次数
- 每次重试的时间间隔
- 最终失败结果的报警或事物回滚
- 在特定失败异常事件情况下选择重试

背景介绍

在实际的项目应用场景中,经常会需要遇到远程服务接口的调用,时不时会出现一些接口调用超时,或者函数执行失败需要重试的情况,例如下边的这种场景:

某些不太稳定的接口,需要依赖于第三方的远程调用,例如数据加载,数据上传相关的类型。

基于try/catch的重试方案

这种方式来做重试处理的话,会比较简单粗暴。

 public void test(){
    try{
        //执行远程调用方法
        doRef();
    }catch(Exception e){
        //重新执行远程调用方法
        doRef();
    }
}

当出现了异常的时候,立即执行远程调用,此时可能忽略了几个问题:

  1. 如果重试出现了问题,是否还能继续重试
  2. 第一次远程调用出现了异常,此时可能第三方服务此时负载已达到瓶颈,或许需要间隔一段时间再发送远程调用的成功率会高些。
  3. 多次重试都失败之后如何通知调用方自己。

使用Spring的Retry组件

  1. 引入依赖
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>
  1. 在启动类上加入一个@EnableRetry注解
@SpringBootApplication
@EnableRetry
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}
  1. 在需要被执行的函数头部加入这一@Retryable注解
@Retryable(value = Exception.class,maxAttempts = 3  , backoff = @Backoff(delay = 2000,multiplier = 1.5))
public int retryServiceOne(int code) throws Exception {
	// TODO Auto-generated method stub 
	System.out.println("retryServiceOne被调用,时间:"+LocalTime.now());
	System.out.println("执行当前业务逻辑的线程名:"+Thread.currentThread().getName());
	if (code==0){
	    throw new Exception("业务执行失败情况!");
	}
	System.out.println("retryServiceOne执行成功!");

	return 200;
}

配置元数据情况:

- 重试次数为3
- 第一次重试延迟2s
- 每次重试时间间隔是前一次1.5倍
- Exception类异常情况下重试

自定义重试组件

  1. 定义一个重试注解
package com.likecat.retry;

import java.lang.annotation.*;

/**
 * 重试组件注解
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 11:41
 */
@Documented
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Retry {
    //最大重试次数
    int maxAttempts() default 3;

    //每次重试的间隔时间
    int delay() default 3000;

    //关注异常(仅当抛出了相应异常的条件下才会重试)
    Class<? extends Throwable>[] value() default {};

    //重试策略(默认是快速重试)
    Class<? extends RetryStrategy> strategy() default FastRetryStrategy.class;

    //重试监听器
    Class<? extends RetryListener> listener() default AbstractRetryListener.class;
}
  1. 重试切面,拦截带有 @Retry 注解的方法,然后将需要执行的部分放入到一个RetryTask类型的对象当中,内部的doTask函数会触发真正的方法调用。
package com.likecat.retry;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.lang.reflect.Method;

/**
 * 重试接口的拦截和处理
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 11:50
 */
@Aspect
@Component
public class RetryAop {

    @Resource
    private ApplicationContext applicationContext;

    @Pointcut("@annotation(com.likecat.retry.Retry)")
    public void pointCut() {
    }

    @Around(value = "pointCut()")
    public Object doBiz(ProceedingJoinPoint point) {
        MethodSignature methodSignature = (MethodSignature) point.getSignature();
        Method method = methodSignature.getMethod();
        Retry retry = method.getDeclaredAnnotation(Retry.class);

        //获取重试策略
        RetryStrategy retryStrategy = applicationContext.getBean(retry.strategy());
        RetryTask retryTask = new RetryTaskImpl(point);
        retryStrategy.initArgs(retry, retryTask);
        try {
            Object result = point.proceed();
            return result;
        } catch (Throwable throwable) {
            //重试
            System.out.println(method.getName()+"执行失败,开始重试");
            retryStrategy.retryTask();
        }
        return null;
    }

    /**
     * 重试业务
     */
    private class RetryTaskImpl implements RetryTask {
        private ProceedingJoinPoint proceedingJoinPoint;
        private Object result;
        private volatile Boolean asyncRetryState = null;
        public RetryTaskImpl(ProceedingJoinPoint proceedingJoinPoint) {
            this.proceedingJoinPoint = proceedingJoinPoint;
        }
        public ProceedingJoinPoint getProceedingJoinPoint() {
            return proceedingJoinPoint;
        }
        public void setProceedingJoinPoint(ProceedingJoinPoint proceedingJoinPoint) {
            this.proceedingJoinPoint = proceedingJoinPoint;
        }
        public Object getResult() {
            return result;
        }
        public void setResult(Object result) {
            this.result = result;
        }
        public Boolean getAsyncRetryState() {
            return asyncRetryState;
        }
        public void setAsyncRetryState(Boolean asyncRetryState) {
            this.asyncRetryState = asyncRetryState;
        }

        @Override
        public Object getRetryResult() {
            return result;
        }
        @Override
        public Boolean getRetryStatus() {
            return asyncRetryState;
        }
        @Override
        public void setRetrySuccess() {
            this.setAsyncRetryState(true);
        }
        @Override
        public void doTask() throws Throwable {
            this.result = proceedingJoinPoint.proceed();
        }
    }
}

  1. RetryTask接口(重试任务)
package com.likecat.retry;

/**
 * 任务执行
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 11:46
 */
public interface RetryTask {

    //获取重试结果
    Object getRetryResult();

    //获取重试状态
    Boolean getRetryStatus();

    void setRetrySuccess();

    //重试逻辑
    void doTask() throws Throwable;
}
  1. 重试策略接口
package com.likecat.retry;

/**
 * 重试策略
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 11:41
 */
public interface RetryStrategy {

    /**
     * 初始化一些参数配置
     *
     * @param retry
     * @param retryTask
     */
    void initArgs(Retry retry,RetryTask retryTask);

    /**
     * 重试策略
     */
    void retryTask();

}

  1. 默认的重试策略为快速重试策略
package com.likecat.retry;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.util.concurrent.ExecutorService;

/**
 * 默认的重试策略为快速重试策略
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 11:42
 */
public class FastRetryStrategy implements RetryStrategy, ApplicationContextAware {

    private Retry retry;
    private RetryTask retryTask;
    private ApplicationContext applicationContext;
    private ExecutorService retryThreadPool;

    public FastRetryStrategy() {
    }

    public ExecutorService getRetryThreadPool() {
        return retryThreadPool;
    }

    public void setRetryThreadPool(ExecutorService retryThreadPool) {
        this.retryThreadPool = retryThreadPool;
    }

    @Override
    public void initArgs(Retry retry, RetryTask retryTask) {
        this.retry = retry;
        this.retryTask = retryTask;
    }

    @Override
    public void retryTask() {
        if (!FastRetryStrategy.class.equals(retry.strategy())) {
            System.err.println("error retry strategy");
            return;
        }
        //安全类型bean查找
        String[] beanNames = applicationContext.getBeanNamesForType(retry.listener());
        RetryListener retryListener = null;
        if (beanNames != null && beanNames.length > 0) {
            retryListener = applicationContext.getBean(retry.listener());
        }
        Class<? extends Throwable>[] exceptionClasses = retry.value();
        RetryListener finalRetryListener = retryListener;
        //如果没有支持异步功能,那么在进行重试的时候就会一直占用着服务器的业务线程,导致服务器线程负载暴增
        retryThreadPool.submit(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= retry.maxAttempts(); i++) {
                    int finalI = i;
                    try {
                        retryTask.doTask();
                        retryTask.setRetrySuccess();
                        return;
                    } catch (Throwable e) {
                        for (Class<? extends Throwable> clazz : exceptionClasses) {
                            if (e.getClass().equals(clazz) || e.getClass().isInstance(clazz)) {
                                if (finalRetryListener != null) {
                                    finalRetryListener.notifyObserver();
                                }
                                System.err.println("[FastRetryStrategy] retry again,attempt's time is " + finalI + ",tims is " + System.currentTimeMillis());
                                try {
                                    Thread.sleep(retry.delay());
                                } catch (InterruptedException ex) {
                                    ex.printStackTrace();
                                }
                                continue;
                            }
                        }
                    }
                }
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
        ExecutorService executorService = (ExecutorService) applicationContext.getBean("retryThreadPool");
        this.setRetryThreadPool(executorService);
    }

}
  1. 通过回调对应的监听器组件做一些记录:例如日志记录,操作记录写入等等操作。
package com.likecat.retry;

/**
 * 通知观察者
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 11:46
 */
public interface RetryListener {

    /**
     * 通知观察者
     */
    void notifyObserver();
}

  1. 默认抽象类
package com.likecat.retry;

/**
 * 默认抽象类
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 11:47
 */
public abstract class AbstractRetryListener implements RetryListener {
    @Override
    public abstract void notifyObserver();
}
  1. 自定义监听器
package com.likecat.retry;

/**
 * 默认监听器
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 11:47
 */
public class DefaultRetryListener implements RetryListener{

    @Override
    public void notifyObserver() {
        System.out.println("this is a DefaultRetryListener");
    }
}
  1. bean的初始化配置
package com.likecat.retry;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 14:45
 */
@Configuration
public class RetryConfig {
    @Bean
    public FastRetryStrategy fastRetryStrategy(){
        return new FastRetryStrategy();
    }
    @Bean
    public RetryListener defaultRetryListener(){
        return new DefaultRetryListener();
    }
    @Bean
    public ExecutorService retryThreadPool(){
        ExecutorService executorService = new ThreadPoolExecutor(2,4,0L, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
        return executorService;
    }
}
  1. 测试
package com.likecat.retry;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author likecat
 * @version 1.0
 * @date 2022/3/11 14:42
 */
@RestController
public class TestController {

    public static int count = 0;

    @Retry(maxAttempts = 5, delay = 100, value = {ArithmeticException.class}, strategy = FastRetryStrategy.class, listener = DefaultRetryListener.class)
    @GetMapping(value = "/do-test")
    public String doTest(int code) {
        count++;
        System.out.println("code is :" + code + " result is :" + count % 3 + " count is :" + count);
        if (code == 1) {
            System.out.println("--this is a test");
        } else {
            if (count % 5 != 0) {
                System.out.println(4 / 0);
            }
        }
        return "success";
    }
}

一条小咸鱼