Write a rpc framework from 0.5 to 1-5: service monitoring and management (actuator)

Write a rpc framework from 0.5 to 1-5: service monitoring and management (actuator)

Microservices generally need to monitor the status of an instance and do some intervention on it. This function can be achieved through spring's endpoint.

As long as the spring-boot-starter-actuator is introduced into the springboot project, some management service interfaces can be obtained, such as stopping the service and obtaining service information. He is not using a controller, but an Endpoint, but the main functions are similar.

With the help of the filter mechanism implemented in the previous section, this function can be implemented without changing the core code of the framework. As a practice, write two functions: obtain server-side statistical data, service status control

Create a new module acuprpc-spring-boot-starter-actuator.

In order to manage the endpoint of this framework uniformly, a parent class is defined. The ids of all subclasses are prefixed with "rpc" by default

public abstract class AbstractRpcEndpoint<T> extends AbstractEndpoint<T> {
    private static final String PREFIX = "rpc";

    public AbstractRpcEndpoint(String id) {
        super(PREFIX + id);
    }
    public AbstractRpcEndpoint(String id, boolean sensitive) {
        super(PREFIX + id, sensitive);
    }
    public AbstractRpcEndpoint(String id, boolean sensitive, boolean enabled) {
        super(PREFIX + id, sensitive, enabled);
    }
}

Statistics

MonitorFilter

Use filter to intercept requests and count the number of processed requests.

@Getter
public class MonitorFilter implements RpcFilter {

    private Map<String, RequestCount> requestCountMap = new ConcurrentHashMap<>();

    @Override
    public void doFilter(RpcRequest request, RpcResponse response, RpcFilterChain filterChain) {
        RequestCount count = requestCountMap.computeIfAbsent(request.getKey(), RequestCount::new);
        count.received.increment();
        count.invoking.increment();
        try {
            filterChain.doFilter(request, response);
            count.success.increment();
        } catch (Exception e) {
            count.failed.increment();
            throw e;
        } finally {
            count.invoking.decrement();
        }
    }

    @Getter
    public static class RequestCount {
        private String key;
        private LongAdder received = new LongAdder();//Received
        private LongAdder invoking = new LongAdder();//In execution
        private LongAdder success = new LongAdder();//Processed successfully
        private LongAdder failed = new LongAdder();//Processing failed

        public RequestCount(String key) {
            this.key = key;
        }
    }
}

RpcStatEndpoint

Provide http interface, and you can get the return value of invoke() through/rpcstat.

public class RpcStatEndpoint extends AbstractRpcEndpoint<Map<String, Object>> {
    private MonitorFilter filter;

    public RpcStatEndpoint(MonitorFilter filter) {
        super("stat");
        this.filter = filter;
    }

    @Override
    public Map<String, Object> invoke() {
        Map<String, Object> result = new HashMap<>();
        Collection<MonitorFilter.RequestCount> counts = filter.getRequestCountMap().values();
        result.put("counts", counts);
        result.put("serving", counts.stream().anyMatch(t -> t.getInvoking().sum()> 0L));
        return result;
    }
}

Service management

RejectFilter

Use filter to intercept requests and maintain a offline state in the filter. If offline, all requests are rejected (for this return value, the client can rediscover other nodes).

@Data
public class RejectFilter implements RpcFilter {
    private boolean reject = false;
   //The processing logic for rejecting requests can also be customized
    private BiConsumer<RpcRequest, RpcResponse> rejectFunction = (rpcRequest, response) -> response.reject();

    @Override
    public void doFilter(RpcRequest request, RpcResponse response, RpcFilterChain filterChain) {
        if (reject) {
            rejectFunction.accept(request, response);
            return;
        }
        filterChain.doFilter(request, response);
    }
}

EndpointMvcAdapter

Endpoint is very convenient to use, but it is not so flexible relative to the controller. For example, if I want the interface to support parameters, I need some other operations. I wrap the Endpoint with EndpointMvcAdapter once. In order to reuse, I wrote a general EndpointMvcAdapter, through reflection to call the method specified by the parameters.

@Slf4j
public class ReflectEndpointMvcAdapter extends EndpointMvcAdapter implements RpcCode {
    private Map<String, Method> methodMap = new HashMap<>();
    private Set<String> ipWhiteList = new HashSet<>();

    public ReflectEndpointMvcAdapter(Endpoint<?> delegate, String ipWhiteList) {
        super(delegate);
        Method[] methods = delegate.getClass().getMethods();
       //...
    }

    @RequestMapping(value = "/{name:.*}", method = RequestMethod.GET, produces = {
            ActuatorMediaTypes.APPLICATION_ACTUATOR_V1_JSON_VALUE,
            MediaType.APPLICATION_JSON_VALUE
    })
    @ResponseBody
    @HypermediaDisabled
    public Object invoke(HttpServletRequest request, HttpServletResponse response, @PathVariable String name) {
        if (!checkIp(request)) {
           //...
        }
        Method method = methodMap.get(name);
       //...
        try {
            return method.invoke(getDelegate());
        } catch (Exception e) {
           //...
        }
    }

    private boolean checkIp(HttpServletRequest request) {
      //...
    }

    private String getIp(HttpServletRequest request) {
       //...
    }
}

RpcEndpoint

Because I want to use ReflectEndpointMvcAdapter, the invoke method does not expect to use it for the time being (called when/rpc is used), and it returns null.

public class RpcEndpoint extends AbstractRpcEndpoint<Object> implements RpcCode {
    private RejectFilter filter;

    public RpcEndpoint(RejectFilter filter) {
        super("");
        this.filter = filter;
    }

    @Override
    public Object invoke() {
        return null;
    }

    public void online() {
        filter.setReject(false);
    }

    public void offline() {
        filter.setReject(true);
    }

    public int status() {
        if (filter.isReject()) {
            throw new HttpStatusException(NOT_AVAILABLE);
        }
        return 0;
    }

}

Wrapping when defining the bean

@Bean
    public ReflectEndpointMvcAdapter rpcEndpoint(RejectFilter rejectFilter) {
        return new ReflectEndpointMvcAdapter(process(new RpcEndpoint(rejectFilter)), ipWhiteList);
    }

    private <T extends AbstractRpcEndpoint<?>> T process(T endpoint) {
        endpoint.setSensitive(sensitive);
        return endpoint;
    }

Now, as long as you introduce acuprpc-spring-boot-starter-actuator, you can get these http interfaces. With the help of these interface services, you can resend gracefully.