Spring Boot项目如何快速从零开始打造一个属于自己的RPC框架

一、前言

在平时spring Boot项目开发过程中,我们进行远程服务调用大都采用@RestController + @RequestMapping相关注解发布接口,使用OpenFeign组件进行微服务之间调用。这套技术架构已经足够完善了,当然没有什么问题,但是作为一个开发者,老是用一套框架天天写代码,不免有点无聊,那么今天我们就从零开始,不使用@RestController + @RequestMapping相关注解发布接口,不使用OpenFeign组件进行远程调用,靠自己,快速从零开始实现一个属于自己的rpc框架,炫一下吧!

二、开发过程

1. 新建Spring Boot项目,导入相关依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-boot-starter</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>***.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
        </dependency>
        <dependency>
            <groupId>***.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>***.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2021.0.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
            <version>3.1.5</version>
        </dependency>
    </dependencies>

2. 自定义注解

既然我们不使用Spring提供的原生相关注解了,那我们为了简化开发,还是自定义相关注解吧。

2.1 定义标识微服务发布RPC接口注解

此注解的作用于接口上,被该注解标识的接口可以被自定义框架把接口中的所有方法自动发布成URL,简化我们代码开发,不用每个方法单独发布,示例代码:

import java.lang.annotation.*;

/**
 * 通过该注解标识接口发布的应用信息
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CustomRpcApp {
    /**
     * 发布接口的应用名称 
     *
     * @return
     */
    String appName() default "";

    /**
     * 应用的contentPath,通过server.servlet.context-path 属性配置
     *
     * @return
     */
    String contentPath() default "";
}
2.2 定义可以限制接口发布的注解

【步骤2.1】中,我们定义了一个接口服务发布URL的注解,但是接口中,有些方法我们不想发布出去,或者有些方法发布出去的url我们想自己定义另外一个名称,不直接使用方法名称,那么为了自定义的RPC逻辑更加灵活,我们增加一个注解,示例代码如下:

import java.lang.annotation.*;

/**
 * RPC方法发布配置注解
 */
@Target({ElementType.METHOD}) // 作用于方法上
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CustomRpcMethodConfig {

    /**
     * 发布方法取别名
     *
     * @return
     */
    String alias() default "";

    /**
     * 标识该方法是否禁止发布
     *
     * @return
     */
    boolean isForbidden() default false;

}
2.3定义RPC服务提供方注解

接口服务被发布以后,还是需要具体实现类提供服务,我们也需要在实现类上去标识一下这是一个RPC服务的提供方,示例代码如下:

/**
 * 声明服务提供方注解
 */
@Target({ElementType.TYPE}) // 作用于类上, 发布整个类的接口
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CustomRpcProvider {
}
2.4 定义RPC消费者注解

我们提供好RPC服务,是需要提供给第三方调用的,为了让第三方调用时,标识自己需要进行RPC调用,我们也可以定义一个注解,示例代码:

/**
 * 声明一个消费者注解
 */
@Target(ElementType.FIELD) // 作用于字段上
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CustomRp***onsumer {
}

3. 开发服务发布功能

定义好接口以后,我们就可以开始进行服务发布的逻辑开发了,我们大概思路是,通过解析自定义注解标识的服务接口,进行URL发布。

3.1 定义全局的服务发布信息保存类

我们可以定义一个RPC服务发布信息的保存类,在服务启动的时候解析一次进行保存,这样方便我们在后续使用的时候直接获取,不用进行二次解析,示例代码:

import lombok.Data;

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RpcProviderHolder {

    /**
     * 保存RpcProvider提供者的信息
     */
    public static final ConcurrentMap<String, RpcProviderInfo> RPC_PROVIDER_MAP = new ConcurrentHashMap<>();

    @Data
    public static class RpcProviderInfo {

        /**
         * rpc服务提供者应用名称
         */
        private String appName;

        /**
         * rpc发布服务前缀,默认是 /beanName
         */
        private String urlPrefix;

        /**
         * rpc发布服务url核心部分,默认是接口方法名称
         */
        private List<RpcMethod> urlCoreMethod;

        /**
         * rpc服务注册到spring容器中的beanName
         */
        private String rpcBeanName;

        /**
         * rpc服务注册到spring容器中的bean
         */
        private Object rpcBean;
    }

    @Data
    public static class RpcMethod {
        /**
         * 方法对象
         */
        private Method method;

        /**
         * 方法别名
         */
        private String alias;
    }
}
3.2 定义发布url统一的请求处理器

这里URL发布逻辑,之前在另一篇博客Spring Boot项目中不使用@RequestMapping相关注解,如何动态发布自定义URL路径,已经有描述,这里我们不在赘述,直接上代码:

import ***.alibaba.fastjson.JSON;
import ***.alibaba.fastjson.JSONArray;
import ***.alibaba.fastjson.JSONObject;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.util.Uri***ponentsBuilder;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class Custom***monHandlerUrl {

    public static final Method HANDLE_CUSTOM_URL_METHOD;

    static {
        // 提前准备方法对象
        Method tempMethod = null;
        try {
            tempMethod = Custom***monHandlerUrl.class.getMethod("handlerCustomUrl", HttpServletRequest.class, HttpServletResponse.class);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }
        HANDLE_CUSTOM_URL_METHOD = tempMethod;
    }

    @ResponseBody
    /**
     *  拦截自定义请求的url,可以做成统一的处理器
     */
    public Object handlerCustomUrl(HttpServletRequest request, HttpServletResponse response) throws Exception {
        // 解析请求url
        List<String> pathSegments = Uri***ponentsBuilder.fromUriString(request.getRequestURI()).build().getPathSegments();
        String rpcService = null;
        String methodName = null;
        // url默认格式是 接口名称/方法名称
        if (pathSegments.size() == 2) {
            rpcService = pathSegments.get(0);
            methodName = pathSegments.get(1);
        } else if (pathSegments.size() == 3) { // 可能配置了contentpath,这里偷懒简单判断一下
            rpcService = pathSegments.get(1);
            methodName = pathSegments.get(2);
        }
        // 获取请求体
        String requestBodyJsonString = StreamUtils.copyToString(request.getInputStream(), StandardCharsets.UTF_8);
        // 解析参数
        Object[] params = resolveParams(requestBodyJsonString, rpcService, methodName);
        // 执行方法
        return execute(rpcService, methodName, params);
    }

    /**
     * 执行方法
     *
     * @param rpcService
     * @param methodName
     * @param params
     * @return
     * @throws InvocationTargetException
     * @throws IllegalA***essException
     */
    private Object execute(String rpcService, String methodName, Object[] params) throws InvocationTargetException, IllegalA***essException {
        // 获取RpcProvider的相关信息
        RpcProviderHolder.RpcProviderInfo rpcProviderInfo = RpcProviderHolder.RPC_PROVIDER_MAP.get(rpcService);
        Object rpcBean = rpcProviderInfo.getRpcBean();
        List<RpcProviderHolder.RpcMethod> urlCoreMethod = rpcProviderInfo.getUrlCoreMethod();
        for (RpcProviderHolder.RpcMethod rm : urlCoreMethod) {
            if (rm.getAlias().equals(methodName)) {
                return rm.getMethod().invoke(rpcBean, params); // 找到该方法,然后执行
            }
        }
        return null;
    }

    /**
     * 解析参数
     *
     * @param requestBodyJsonString
     * @param rpcService
     * @param methodName
     * @return
     */
    private Object[] resolveParams(String requestBodyJsonString, String rpcService, String methodName) {
        // 如果没有请求体,参数直接返回null
        if (!StringUtils.hasLength(requestBodyJsonString)) {
            return null;
        }
        List<Object> paramList = new ArrayList<>();
        // 判断当前需要调用的RPCProvider是否存在
        RpcProviderHolder.RpcProviderInfo rpcProviderInfo = RpcProviderHolder.RPC_PROVIDER_MAP.get(rpcService);
        if (rpcProviderInfo == null) {
            throw new RuntimeException("no service : " + rpcService);
        }
        // 解析参数,默认是JSON数组
        JSONArray objects = JSON.parseArray(requestBodyJsonString);
        List<RpcProviderHolder.RpcMethod> urlCoreMethod = rpcProviderInfo.getUrlCoreMethod();
        if (!CollectionUtils.isEmpty(urlCoreMethod)) {
            for (RpcProviderHolder.RpcMethod rm : urlCoreMethod) { // 寻找当前请求对应的需要执行的方法信息
                if (rm.getAlias().equals(methodName)) {
                    Class<?>[] parameterTypes = rm.getMethod().getParameterTypes();
                    if (objects.size() != parameterTypes.length) { // 判断方法参数和方法对象中的参数个数是否匹配
                        throw new RuntimeException(rpcService + " method : " + methodName + " match error!");
                    }
                    for (int i = 0; i < objects.size(); i++) { // 通过参数类型去解析参数,并保存到list中进行返回,后续执行真正的调用
                        Object obj = objects.get(i);
                        if (obj instanceof JSONObject) {
                            Object parse = ((JSONObject) obj).toJavaObject(parameterTypes[i]);
                            paramList.add(parse);
                        } else {
                            paramList.add(obj);
                        }
                    }
                    return paramList.toArray();
                }
            }
            throw new RuntimeException(rpcService + "no method : " + methodName);
        } else {
            throw new RuntimeException(rpcService + "no method : " + methodName);
        }
    }
}
3.3 定义事件监听,实现RPC服务扫描以及发布

我们定义一个事件监听,在项目启动完成后,扫描我们需要发布的接口,进行URL注册,示例代码:

import ***.learn.rpc.anno.CustomRpcApp;
import ***.learn.rpc.anno.CustomRpcMethodConfig;
import ***.learn.rpc.anno.CustomRpcProvider;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.servlet.mvc.method.RequestMappingInfo;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class RpcProviderScanAndReleaseListener implements ApplicationListener<WebServerInitializedEvent> {

    /**
     * 标识事件监听器是否已经注册,避免重复注册
     */
    private volatile AtomicBoolean flag = new AtomicBoolean(false);

    private final RequestMappingHandlerMapping requestMappingHandlerMapping;

    private final Custom***monHandlerUrl custom***monHandlerUrl;

    public RpcProviderScanAndReleaseListener(RequestMappingHandlerMapping requestMappingHandlerMapping, Custom***monHandlerUrl custom***monHandlerUrl) {
        this.requestMappingHandlerMapping = requestMappingHandlerMapping;
        this.custom***monHandlerUrl = custom***monHandlerUrl;
    }

    @Override
    public void onApplicationEvent(WebServerInitializedEvent event) {
        if (flag.***pareAndSet(false, true)) {
            // 扫描所有rpcProvider注册的bean
            scanRpcProviderBeans(event);
            // 注册所有扫描到的自定义rpcProvider
            registerUrls();
        }

    }

    /**
     * 扫描所有rpcProvider bean信息
     *
     * @param event
     */
    private void scanRpcProviderBeans(WebServerInitializedEvent event) {
        // 找到所有标识@CustomRpcProvider注解的bean
        Map<String, Object> beans = event.getApplicationContext().getBeansWithAnnotation(CustomRpcProvider.class);
        if (!CollectionUtils.isEmpty(beans)) {
            // 遍历所有标识了@CustomRpcProvider注解的bean
            for (Map.Entry<String, Object> entry : beans.entrySet()) {
                String beanName = entry.getKey();
                Object bean = entry.getValue();
                Class beanType = bean.getClass();
                Class<?>[] interfaces = beanType.getInterfaces();
                if (interfaces != null && interfaces.length != 0) {
                    for (Class clazz : interfaces) {
                        CustomRpcApp customRpcApp = (CustomRpcApp) clazz.getAnnotation(CustomRpcApp.class);
                        if (customRpcApp != null) { // 判断当前类上是否有标识指定发布接口的应用名称
                            // 如果符合我们的自定义发布规范
                            RpcProviderHolder.RpcProviderInfo rpcProviderInfo = new RpcProviderHolder.RpcProviderInfo();
                            rpcProviderInfo.setAppName(StringUtils.hasLength(customRpcApp.appName()) ?
                                    customRpcApp.appName()
                                    : event.getApplicationContext().getEnvironment().getProperty("spring.application.name"));
                            rpcProviderInfo.setRpcBeanName(beanName);
                            rpcProviderInfo.setUrlPrefix("/" + clazz.getSimpleName().toLowerCase()); // url前缀取接口名称
                            rpcProviderInfo.setRpcBean(bean);

                            Method[] methods = clazz.getMethods(); //获取所有方法
                            if (methods != null && methods.length != 0) {
                                List<RpcProviderHolder.RpcMethod> methodList = new ArrayList<>();
                                for (Method m : methods) {
                                    RpcProviderHolder.RpcMethod rm = null;
                                    CustomRpcMethodConfig annotation = m.getAnnotation(CustomRpcMethodConfig.class);
                                    if (annotation != null) { // 判断方法是否有@CustomRpcMethodConfig注解
                                        if (annotation.isForbidden()) { // 方法如果禁用,则不保存发布信息
                                            continue;
                                        }
                                        if (StringUtils.hasLength(annotation.alias())) {
                                            rm = new RpcProviderHolder.RpcMethod();
                                            rm.setAlias(annotation.alias());
                                        }
                                    } else {
                                        rm = new RpcProviderHolder.RpcMethod();
                                        rm.setAlias(m.getName());
                                    }
                                    rm.setMethod(m);
                                    methodList.add(rm);
                                }
                                rpcProviderInfo.setUrlCoreMethod(methodList);
                            }

                            RpcProviderHolder.RPC_PROVIDER_MAP.put(clazz.getSimpleName().toLowerCase(), rpcProviderInfo);
                        }
                    }
                }
            }
        }
    }

    /**
     * 注册所有自定义rpcProvider的url信息
     */
    private void registerUrls() {
        if (!CollectionUtils.isEmpty(RpcProviderHolder.RPC_PROVIDER_MAP)) {
            Collection<RpcProviderHolder.RpcProviderInfo> values = RpcProviderHolder.RPC_PROVIDER_MAP.values();
            for (RpcProviderHolder.RpcProviderInfo rpcProviderInfo : values) {
                String urlPrefix = rpcProviderInfo.getUrlPrefix();
                List<RpcProviderHolder.RpcMethod> urlCores = rpcProviderInfo.getUrlCoreMethod();
                if (!CollectionUtils.isEmpty(urlCores)) {
                    for (RpcProviderHolder.RpcMethod rm : urlCores) {
                        // 构建请求映射对象
                        RequestMappingInfo requestMappingInfo = RequestMappingInfo
                                .paths(urlPrefix + "/" + rm.getAlias()) // 请求URL
                                .methods(RequestMethod.POST) // 请求方法,可以指定多个
                                .build();
                        // 发布url,指定一下url的处理器
                        requestMappingHandlerMapping.registerMapping(requestMappingInfo, custom***monHandlerUrl, Custom***monHandlerUrl.HANDLE_CUSTOM_URL_METHOD);
                    }
                }
            }
        }
    }
}
3.4 装配RPCProvider相关配置

对于我们开发的URL统一处理器和RPC服务扫描注册事件监听器,需要注册到Spring容器中,才会生效。

示例代码如下:

import ***.learn.rpc.core.provider.Custom***monHandlerUrl;
import ***.learn.rpc.core.provider.RpcProviderScanAndReleaseListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;

@Configuration
public class CustomRpcProviderConfig {

    @Bean
    public Custom***monHandlerUrl custom***monHandlerUrl() {
        return new Custom***monHandlerUrl();
    }

    @Bean
    public RpcProviderScanAndReleaseListener rpcProviderReleaseListener(RequestMappingHandlerMapping requestMappingHandlerMapping, Custom***monHandlerUrl custom***monHandlerUrl) {
        return new RpcProviderScanAndReleaseListener(requestMappingHandlerMapping, custom***monHandlerUrl);
    }

}
3.5 自定义Swagger扫描逻辑进行接口暴露查看

通过【步骤3.3】,我们扫描到了需要发布服务的接口,并且成功注册了URL信息,但是此时我们看不到自己发布的接口,不能判断发布有没有成功,而且看不到请求的URL路径,此时我们需要集成swagger自定义扫描逻辑,发布一下我们的接口信息。

  • 发布接口

    import ***.learn.rpc.core.provider.RpcProviderHolder;
    import lombok.Setter;
    import lombok.SneakyThrows;
    import org.springframework.http.MediaType;
    import org.springframework.util.CollectionUtils;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.method.HandlerMethod;
    import org.springframework.web.servlet.mvc.method.RequestMappingInfo;
    import springfox.documentation.RequestHandler;
    import springfox.documentation.spi.service.RequestHandlerProvider;
    import springfox.documentation.spring.web.WebMvcRequestHandler;
    import springfox.documentation.spring.web.readers.operation.HandlerMethodResolver;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    
    public class FoxRpcUrlInfoProvider implements RequestHandlerProvider {
    
        @Setter
        private HandlerMethodResolver methodResolver;
    
        @SneakyThrows
        @Override
        public List<RequestHandler> requestHandlers() {
    
            List<RequestHandler> requestHandlers = new ArrayList<>();
    
            if (!CollectionUtils.isEmpty(RpcProviderHolder.RPC_PROVIDER_MAP)) {
                /**
                 * 拿到扫描到的RPCProvider信息,进行发布
                 */
                Collection<RpcProviderHolder.RpcProviderInfo> values = RpcProviderHolder.RPC_PROVIDER_MAP.values();
                for (RpcProviderHolder.RpcProviderInfo rpcProviderInfo : values) {
                    String urlPrefix = rpcProviderInfo.getUrlPrefix();
                    List<RpcProviderHolder.RpcMethod> urlCores = rpcProviderInfo.getUrlCoreMethod();
                    Object rpcBean = rpcProviderInfo.getRpcBean();
                    if (!CollectionUtils.isEmpty(urlCores)) {
                        for (RpcProviderHolder.RpcMethod urlCore : urlCores) {
                            RequestMappingInfo mappingInfo = RequestMappingInfo.paths(urlPrefix + "/" + urlCore.getAlias())
                                    .methods(RequestMethod.POST)
                                    .produces(MediaType.APPLICATION_JSON_VALUE)
                                    .consumes(MediaType.APPLICATION_JSON_VALUE)
                                    .build();
                            HandlerMethod handlerMethod = new HandlerMethod(rpcBean, urlCore.getMethod());
                            requestHandlers.add(new WebMvcRequestHandler("/", methodResolver, mappingInfo, handlerMethod));
                        }
                    }
                }
            }
            // 忽略请求真实映射
    
            return requestHandlers;
        }
    }
    
  • 装配Swagger相关配置

    @Configuration
    @EnableOpenApi
    public class SwaggerConfig {
    
        @Bean
        @ConditionalOnClass(HandlerMethodResolver.class)
        public FoxRpcUrlInfoProvider foxRpcUrlInfoProvider(HandlerMethodResolver methodResolver) {
            FoxRpcUrlInfoProvider foxRpcUrlInfoProvider = new FoxRpcUrlInfoProvider();
            foxRpcUrlInfoProvider.setMethodResolver(methodResolver);
            return foxRpcUrlInfoProvider;
        }
    }
    

3.6 yaml文件中增加相关配置

server:
  port: 8080
  servlet:
    context-path: /myrpc

spring:
  application:
    name: myrpcdemo
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher

  cloud: 
    nacos: # 需要依赖注册中心进行远程调用
      discovery:
        enabled: true
        namespace: public
        username: nacos
        password: nacos
      server-addr: 127.0.0.1:8848

management:
  server:
    port: 9999

3.8 定义测试发布服务

  • 定义接口

    import ***.learn.rpc.anno.CustomRpcApp;
    
    import java.util.Map;
    
    @CustomRpcApp(appName = "myrpcdemo", contentPath = "/myrpc")
    public interface TestProvider {
    
        String testRpc1();
    
        String testRpc2(String name);
    
        Map<String, Object> testRpc3(Map<String, Object> map);
    
        String testRpc4();
    
    }
    
  • 定义接口实现类

    import ***.learn.rpc.anno.CustomRpcProvider;
    import org.springframework.stereotype.***ponent;
    
    import java.util.Map;
    
    @***ponent
    @CustomRpcProvider
    public class TestProviderImpl implements TestProvider {
    
        @Override
        public String testRpc1() {
            return "I am empty method....";
        }
    
        @Override
        public String testRpc2(String name) {
            return "hello, " + name;
        }
    
        @Override
        public Map<String, Object> testRpc3(Map<String, Object> map) {
            map.put("code", 500);
            return map;
        }
    
        @Override
        public String testRpc4() {
            return null;
        }
    }
    

3.7 Swagger发布测试

启动项目,访问http://127.0.0.1:8080/myrpc/swagger-ui/index.html地址,可以看到我们的接口已经成功发布了。

可以看到,接口中4个方法都成功发布了,此时,如果我们想修改发布URL,或者禁止某些方法发布,就可以使用我们之前定义的@CustomRpcMethodConfig注解,我们修改一下发布接口:

import ***.learn.rpc.anno.CustomRpcApp;
import ***.learn.rpc.anno.CustomRpcMethodConfig;

import java.util.Map;

@CustomRpcApp(appName = "myrpcdemo", contentPath = "/myrpc")
public interface TestProvider {

    String testRpc1();

    String testRpc2(String name);

    @CustomRpcMethodConfig(alias = "testRpc3033")
    Map<String, Object> testRpc3(Map<String, Object> map);

    @CustomRpcMethodConfig(isForbidden = true)
    String testRpc4();
}

重新启动,刷新页面,可以看到,此时注解已经生效了。

3.9 开发服务消费功能

开发好RPC服务扫描注册功能以后,剩下的就是RPC服务消费方的开发了。RPC消费方的逻辑也比较简单,就是使用@CustomRp***onsumer注解标识一个类中字段,该字段是RPC发布接口的类型,框架只需要负责给该字段塞入一个代理对象就可以 了,此时通过该类变量进行调用的时候就是远程RPC调用。

3.9.1 定义RPC消费方注解扫描监听

项目启动时,我们需要扫描@CustomRp***onsumer注解,并通过动态代理,生成代理对象,注入到@CustomRp***onsumer注解标识字段中,这样才可以进行正常调用。

import ***.learn.rpc.anno.CustomRp***onsumer;
import ***.learn.rpc.utils.FieldAnnotationUtils;
import ***.learn.rpc.utils.RpcProxyUtils;
import lombok.SneakyThrows;
import org.springframework.boot.web.context.WebServerApplicationContext;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 扫描到@CustomRp***onsumer,并注入动态代理对象
 */
public class ConsumerScanAndFillListener implements ApplicationListener<WebServerInitializedEvent> {

    /**
     * 标识事件监听器是否已经注册,避免重复注册
     */
    private volatile AtomicBoolean flag = new AtomicBoolean(false);

    @SneakyThrows
    @Override
    public void onApplicationEvent(WebServerInitializedEvent event) {
        if (flag.***pareAndSet(false, true)) {
            WebServerApplicationContext applicationContext = event.getApplicationContext();
            String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();
            for (String name : beanDefinitionNames) { // 遍历所有的bean
                Object bean = applicationContext.getBean(name);
                List<FieldAnnotationUtils.FieldAnnotationInfo> fieldAnnotationInfos = FieldAnnotationUtils.parseFieldAnnotationInfo(bean, CustomRp***onsumer.class);
                if (!CollectionUtils.isEmpty(fieldAnnotationInfos)) { // 判断bean的字段上是否存在@CustomRp***onsumer注解
                    for (FieldAnnotationUtils.FieldAnnotationInfo fieldAnnotationInfo : fieldAnnotationInfos) {
                        Field field = fieldAnnotationInfo.getField();
                        boolean a***essFlag = field.isA***essible();
                        if (!a***essFlag) {
                            field.setA***essible(true);
                        }

                        Class<?> type = field.getType();
                        // 生成代理对象,将代理对象注入到当前bean对象中
                        Object proxyObj = RpcProxyUtils.createProxyObj(type);
                        field.set(fieldAnnotationInfo.getObj(), proxyObj);

                        field.setA***essible(a***essFlag);
                    }
                }
            }

        }
    }
}
3.9.2 字段注解扫描工具类

之前,我有博客Spring Boo项目中方法参数对象中字段上存在的自定义注解如何进行拦截解析描述过FieldAnnotationUtils的作用,这里不再赘述:

import lombok.Data;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.util.ReflectionUtils;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * 字段上注解查找工具类
 */
public class FieldAnnotationUtils {

    /**
     * 字段注解信息返回VO
     */
    @Data
    public static class FieldAnnotationInfo {
        /**
         * 类变量
         */
        private Object obj;
        /**
         * 携带该注解的字段对象
         */
        private Field field;
        /**
         * 注解对象
         */
        private Annotation annotation;
    }

    /**
     * 判断类变量中是否字段上存在指定的注解,如果存在,则返回字段和注解信息
     *
     * @param obj            类变量
     * @param annotationType 注解类型
     * @return
     */
    public static List<FieldAnnotationUtils.FieldAnnotationInfo> parseFieldAnnotationInfo(Object obj, Class annotationType) {
        return parseFieldAnnotationInfo(obj, annotationType, null);
    }

    /**
     * 判断类变量中是否字段上存在指定的注解,如果存在,则返回字段和注解信息
     *
     * @param obj              类变量
     * @param annotationType   注解类型
     * @param filterFieldClazz 注解适用的字段类型(不适用的字段类型即使字段上面添加改注解也不生效)
     * @return
     */
    public static List<FieldAnnotationUtils.FieldAnnotationInfo> parseFieldAnnotationInfo(Object obj, Class annotationType, Set<Class> filterFieldClazz) {
        if (obj == null) {
            return null;
        }
        List<FieldAnnotationUtils.FieldAnnotationInfo> resultList = new ArrayList<>();
        /**
         * 获取该对象的所有字段,进行遍历判断
         */
        ReflectionUtils.doWithFields(obj.getClass(), field -> {
            // 判断该字段上是否存在注解
            Annotation annotation = AnnotatedElementUtils.findMergedAnnotation(field, annotationType);
            if (annotation != null) { // 如果存在指定注解
                boolean flag = true;
                if (filterFieldClazz != null && !filterFieldClazz.isEmpty()) { // 如果指定了适用的字段的类型
                    for (Class c : filterFieldClazz) {
                        if (c.isAssignableFrom(field.getType())) { // 判断该字段类型是否符合使用类型,使用isAssignableFrom方法是为了父类也进行判断
                            break;
                        }
                        flag = false;
                    }
                }
                if (flag) { // 如果该字段类型符合,则返回字段注解信息
                    FieldAnnotationUtils.FieldAnnotationInfo fieldAnnotationInfo = new FieldAnnotationUtils.FieldAnnotationInfo();
                    fieldAnnotationInfo.setObj(obj);
                    fieldAnnotationInfo.setField(field);
                    fieldAnnotationInfo.setAnnotation(annotation);
                    resultList.add(fieldAnnotationInfo);
                }
            }
        });
        return resultList;
    }
}
3.9.3 动态代理工具类

由于服务消费方一般都是引入服务提供方的接口层,所以我们采用JDK的动态代理:

import ***.learn.rpc.anno.CustomRpcApp;
import ***.learn.rpc.anno.CustomRpcMethodConfig;
import ***.learn.rpc.core.consumer.Rp***onsumerExecutor;
import org.springframework.util.StringUtils;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class RpcProxyUtils {

    public static <T> T createProxyObj(Class clazz) {
        if (!clazz.isInterface()) { // 接口才可以进行代理
            throw new IllegalArgumentException(clazz + " is not a interface!");
        }
        return (T) Proxy.newProxyInstance(RpcProxyUtils.class.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 获取服务发布接口上的@CustomRpcApp注解
                CustomRpcApp annotation = (CustomRpcApp) clazz.getAnnotation(CustomRpcApp.class);
                // 获取到@CustomRpcApp注解相关属性拼接出url
                String appName = annotation.appName();
                String contentPath = annotation.contentPath();
                StringBuilder urlSB = new StringBuilder().append("/");
                if (StringUtils.hasLength(contentPath)) {
                    urlSB.append(contentPath).append("/");
                }
                urlSB.append(clazz.getSimpleName().toLowerCase()).append("/");
                CustomRpcMethodConfig customRpcMethodConfig = method.getAnnotation(CustomRpcMethodConfig.class);
                // // 获取到@CustomRpcMethodConfig注解相关属性拼接出url
                if (customRpcMethodConfig != null && StringUtils.hasLength(customRpcMethodConfig.alias())) {
                    urlSB.append(customRpcMethodConfig.alias());
                } else {
                    urlSB.append(method.getName());
                }
                String url = urlSB.toString();
                // 由于当前接口在服务消费方并没有实现类,不能对实现类增强,可以增加一个统一的切入点执行逻辑
                return Rp***onsumerExecutor.execute(appName + url, args, method.getReturnType());
            }
        });
    }

}
3.9.4 定义统一的RPC外调执行工具类

我们定义一个统一的RPC外调执行工具类,所有被代理以后的接口,执行方法时,都经过该底层工具类:

import ***.fasterxml.jackson.core.JsonProcessingException;
import ***.fasterxml.jackson.databind.ObjectMapper;
import ***.learn.rpc.utils.RpcRestTemplateUtils;
import org.springframework.http.*;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.StandardCharsets;

public class Rp***onsumerExecutor {
    /**
     * 默认是Http协议
     */
    private static final String SCHEME = "http://";

    private static ObjectMapper objectMapper = new ObjectMapper();

    public static Object execute(String url, Object[] params, Class<?> resultType) throws Exception {
        // 获取RestTemplate对象
        RestTemplate restTemplate = RpcRestTemplateUtils.restTemplate();
        // 构建请求体
        HttpEntity<?> httpEntity = createHttpEntity(params);
        // 进行远程rpc请求
        ResponseEntity responseEntity = restTemplate.exchange(SCHEME + url, HttpMethod.POST, httpEntity, resultType);
        // 返回接口
        return responseEntity.getBody();
    }

    /**
     * 构建请求体,默认是JSON数组
     *
     * @param params
     * @return
     * @throws JsonProcessingException
     */
    private static HttpEntity<?> createHttpEntity(Object[] params) throws JsonProcessingException {
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.setContentType(MediaType.APPLICATION_JSON);
        if (params != null && params.length != 0) {
            StringBuilder builder = new StringBuilder();
            builder.append("[");
            for (int i = 0; i < params.length; i++) {
                builder.append(objectMapper.writeValueAsString(params[i]));
                if (i != params.length - 1) {
                    builder.append(",");
                }
            }
            builder.append("]");
            return new HttpEntity<>(builder.toString().getBytes(StandardCharsets.UTF_8), httpHeaders);
        } else {
            return new HttpEntity<>(httpHeaders);
        }
    }
}
3.9.5 定义RestTemplate工具类

为了适配底层统一rpc调用工具类,我们增加适配RestTemplate工具类:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.client.RestTemplate;

import javax.annotation.PostConstruct;

public class RpcRestTemplateUtils {

    private static RpcRestTemplateUtils rpcRestTemplateUtils;

    @Autowired
    private RestTemplate restTemplate;

    @PostConstruct
    public void init() {
        rpcRestTemplateUtils = this;
    }

    public static RestTemplate restTemplate() {
        return rpcRestTemplateUtils.restTemplate;
    }

}
3.9.6 装配RP***onsumer相关配置
import ***.learn.rpc.core.consumer.ConsumerScanAndFillListener;
import ***.learn.rpc.utils.RpcRestTemplateUtils;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

@Configuration
public class CustomRp***onsumerConfig {

    @Bean
    public ConsumerScanAndFillListener consumerScanAndFillListener() {
        return new ConsumerScanAndFillListener();
    }

    @Bean
    @LoadBalanced // 需要增加@LoadBalanced注解,才可以通过应用名称通过注册中心调用
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate(okHttp3ClientHttpRequestFactory());
        return restTemplate;
    }

    /**
     * 底层通信采用性能更好的OkHttp
     *
     * @return
     */
    @Bean
    public OkHttp3ClientHttpRequestFactory okHttp3ClientHttpRequestFactory() {
        OkHttp3ClientHttpRequestFactory okHttp3ClientHttpRequestFactory = new OkHttp3ClientHttpRequestFactory();
        okHttp3ClientHttpRequestFactory.setConnectTimeout(5000);
        okHttp3ClientHttpRequestFactory.setReadTimeout(5000);
        return okHttp3ClientHttpRequestFactory;
    }

    @Bean
    public RpcRestTemplateUtils rpcRestTemplateUtils() {
        return new RpcRestTemplateUtils();
    }

}
3.10 定义测试消费服务
import ***.learn.rpc.anno.CustomRp***onsumer;
import ***.learn.rpc.test.provider.TestProvider;
import org.springframework.stereotype.***ponent;

import java.util.Map;

@***ponent
public class TestRp***onsumer {

    @CustomRp***onsumer // 标识这是一个远程RPC调用
    private TestProvider testProvider;


    public String testRpc1() {
        return testProvider.testRpc1();
    }

    public String testRpc2(String name) {
        return testProvider.testRpc2(name);
    }

    public Map<String, Object> testRpc3(Map<String, Object> map) {
        return testProvider.testRpc3(map);
    }

}
3.11 RPC功能测试
import ***.learn.rpc.test.consumer.TestRp***onsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

@SpringBootApplication
public class Application {

    private static Application application;

    @Autowired
    private TestRp***onsumer testRp***onsumer;

    @PostConstruct
    public void init() {
        application = this;
    }


    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
        System.out.println("=====================");
        System.out.println("testRpc1 method run: " + application.testRp***onsumer.testRpc1());
        System.out.println("testRpc2 method run: " + application.testRp***onsumer.testRpc2("CSDN"));
        Map<String, Object> map = new HashMap<>();
        map.put("testKey", "我是测试数据...");
        System.out.println("testRpc3 method run: " + application.testRp***onsumer.testRpc3(map));
        System.out.println("=====================");
    }
}

启动项目的,然后查看输出:

可以看到此时,调用是没有问题的,RPC调用通过我们自定义的框架逻辑实现了。

三、写在最后

通过上面的开发,我们完成了自定义的RPC框架开发,不通过@RestController + @RequestMapping相关注解也可以实现接口发布,不使用OpenFeign组件也实现了接口调用,上面为了开发演示简单,RPC服务提供方和RPC服务调用方并没有分开,小伙伴们自己有精力也可以把上面代码打成jar包,分开RPC服务提供方和消费方进行功能测试,这样体验更加明显哦,到此我们就完成了属于自己的简单RPC框架了。

转载请说明出处内容投诉
CSS教程_站长资源网 » Spring Boot项目如何快速从零开始打造一个属于自己的RPC框架

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买