Write an rpc framework from 0.5 to 1-1: service registration/discovery (eureka)

Write an rpc framework from 0.5 to 1-1: service registration/discovery (eureka)

1. implement a service registration discovery function

-acuprpc
    + acuprpc-core//server/client core processing logic
    + acuprpc-spring-boot-starter//server-side service scan, client-side dynamic proxy, service registration/discovery

Eureka Server

spring-cloud-starter-eureka-server

Eureka Client

The principle is to use the client class provided by eureka to send a registration request to Eureka Server, and tell the registration center the address and port (rpc service port, not the http port started by springboot) of the service provided by itself, so that other clients (including themselves) You can request Eureka Server to obtain the required service node information.

/**
 * Instance registered in the service center
 */
@Getter
@Slf4j
public class RpcInstance {

    private EurekaClient eurekaClient;

    private ApplicationInfoManager applicationInfoManager;

    private RpcConf rpcConf;

    public RpcInstance(RpcConf rpcConf) {
        RpcEurekaInstanceConfig instanceConfig = new RpcEurekaInstanceConfig();

        instanceConfig.setAppGroupName(rpcConf.getAppGroup());
        instanceConfig.setAppname(rpcConf.getAppName());
        instanceConfig.setNonSecurePort(rpcConf.getPort());
        instanceConfig.setIpAddress(IpUtil.INTRANET_IP);
        instanceConfig.setHostname(IpUtil.HOSTNAME);

        RpcEurekaClientConfig clientConfig = new RpcEurekaClientConfig();
        clientConfig.getServiceUrl().put("default", rpcConf.getDiscoveryServiceUrl());
        clientConfig.setRegisterWithEureka(rpcConf.isRegisterWithDiscovery());

        InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
        this.applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
        this.eurekaClient = new DiscoveryClient(applicationInfoManager, clientConfig);
        this.rpcConf = rpcConf;
        log.info("protocol server -> "+ rpcConf.getRpcServerClass());
        log.info("protocol client -> "+ rpcConf.getRpcClientClass());
    }

    public void start() {
        applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.STARTING);
    }

    public void started() {
        applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.UP);
    }

    public void shutdown() {
        applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
        eurekaClient.shutdown();
    }

   /**
     * Create an rpc server, and generate objects according to the configured call method (implementation class)
     */
    @SneakyThrows
    public RpcServer newRpcServer() {
        return rpcConf.getRpcServerClass().getConstructor(RpcInstance.class).newInstance(this);
    }

   /**
     * Create an rpc client and generate objects according to the configured call method (implementation class)
     */
    @SneakyThrows
    public RpcClient newRpcClient(NodeInfo nodeInfo) {
        return rpcConf.getRpcClientClass().getConstructor(NodeInfo.class).newInstance(nodeInfo);
    }
}

starter

Build your own spring boot starter, so that other projects only need to introduce this dependency to use the services provided by the starter.

-resources
    -META-INF
        spring.factories//Define the path of the @Configuration class. With this statement, projects that depend on the starter can get the beans provided in the starter
        spring-configuration-metadata.json//Configuration information (optional), with it, you can see the prompt information by editing the application configuration file in the IDE

rpc server service management

As an rpc service provider, you need to manage the services with annotations (@Rpc) when the application starts, so that after receiving the rpc request, you can quickly query the specified object and execute the specified method.

The bean that implements the interface BeanPostProcessor can be processed for all beans in spring (the interface method will be called after each bean is initialized).

public class RpcServiceScanner implements BeanPostProcessor {

    private RpcServer rpcServer;

    public RpcServiceScanner(RpcServer rpcServer) {
        this.rpcServer = rpcServer;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
       //AOP proxy class needs to get the original class, otherwise the annotations on the class cannot be read
        Class<?> beanClass = AopUtils.isAopProxy(bean)? AopUtils.getTargetClass(bean): bean.getClass();
        val nrpc = beanClass.getAnnotation(Rpc.class);
        if (nrpc == null) {
            return bean;
        }

        Method[] methods = beanClass.getDeclaredMethods();
        if (methods.length == 0) {
            return bean;
        }
        Map<String, MethodInfo> methodInfoMap = new HashMap<>();
        for (Method method: methods) {
            methodInfoMap.put(method.getName(), new MethodInfo(method));
        }

        Class<?>[] interfaces = beanClass.getInterfaces();
        if (interfaces.length == 0) {
            return bean;
        }
       //The client calls the server through the interface, so it does not know the path of the specific implementation class, only the interface name, so all interfaces are registered again
        for (Class<?> serviceInterface: interfaces) {
            rpcServer.registerService(
                    new RpcServiceInfo(rpcServer.getRpcInstance().getRpcConf().getAppName(),
                            serviceInterface.getCanonicalName()),
                    bean,
                    serviceInterface,
                    methodInfoMap);
        }
        return bean;
    }
}

rpc client remote service agent

As a service caller, you can call remote services through the interface like calling local code. The principle is to create a proxy for the interface and make remote calls in the proxy.

The method of actively creating an agent is used here.

public class RpcServiceConsumer {

    private RpcClientManager rpcClientManager;

    public RpcServiceConsumer(RpcClientManager rpcClientManager) {
        this.rpcClientManager = rpcClientManager;
    }

    @SuppressWarnings("unchecked")
    public <T> T create(String appName, Class<T> serviceInterface) {
        RpcServiceInfo serviceInfo = new RpcServiceInfo(appName, serviceInterface.getCanonicalName());
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                new RpcInvocationHandler(serviceInfo, rpcClientManager));
    }
}
public class RpcInvocationHandler implements InvocationHandler {

    private RpcServiceInfo rpcServiceInfo;

    private RpcServiceManager rpcServiceManager;

    private RpcClient rpcClient;

    public RpcInvocationHandler(RpcServiceInfo rpcServiceInfo, RpcServiceManager rpcServiceManager) {
        this.rpcServiceInfo = rpcServiceInfo;
        this.rpcServiceManager = rpcServiceManager;
        tryInitRpcClient(false);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcMethodInfo methodInfo = new RpcMethodInfo(rpcServiceInfo, method.getName(), method.getGenericReturnType());
        return tryGetRpcClient().invoke(methodInfo, args);
    }

    private RpcClient tryGetRpcClient() {
        if (rpcClient == null) {
            tryInitRpcClient(true);
        }
        return rpcClient;
    }

    private synchronized void tryInitRpcClient(boolean throwError) {
        if (rpcClient != null) {
            return;
        }
        try {
            rpcClient = rpcServiceManager.lookup(rpcServiceInfo);
        } catch (Exception e) {
            if (throwError) {
                throw e;
            }
        }
    }
}

Register an ApplicationListener, receive the signal after the springboot program is ready, and then tell the registration center that it is ready.

public class RpcApplicationListener implements ApplicationListener<ApplicationReadyEvent> {

    private RpcServer rpcServer;

    public RpcApplicationListener(RpcServer rpcServer) {
        this.rpcServer = rpcServer;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        rpcServer.started();
    }
}

1: Service registration/discovery (eureka)

2: Remote service call (grpc)

3: Remote service call (thrift)

4:request filter

5: Service monitoring and management (actuator)

6: Call the abnormal node to automatically retry

7: Gateway support (gateway)