Writing an RPC framework from 0.5 to 1 - 3: remote service call (thrift)


This part is similar to the previous one, except that the remote-call framework is swapped out. If you are interested, you can implement other transports as well; this is just one attempt.

thrift is an open-source RPC framework originally developed at Facebook (now an Apache project). It runs over TCP and uses a binary protocol by default.

You should be familiar with the basic usage of thrift first; see: thrift-Java example

Project structure

-acuprpc
    + acuprpc-core // server/client core processing logic
    + acuprpc-protocol-thrift // remote call based on thrift
    + acuprpc-spring-boot-starter // server-side service scan, client-side dynamic proxy, service registration/discovery

thrift communication

Interface definition

Define the data structures that the service provider (server) and the service caller (client) use to communicate. The client has to tell the server which class and which method to call, along with the parameters (strings in JSON format, deserialized on the server side).

resources/service.thrift

namespace java com.acupt.acuprpc.protocol.thrift.proto
service ThriftService{
    InvokeResponse invokeMethod(1: InvokeRequest invokeRequest)
}

struct InvokeRequest{
1: required string appName;
2: required string serviceName;
3: required string methodName;
4: required list<string> orderedParameter;
5: required map<string,string> namedParameter;
}

struct InvokeResponse{
1: required i32 code;
2: optional string message;
3: optional string result;
}

thrift-service

This class receives the requests sent by thrift-client, extracts their parameters, converts them into the generic request structure, and passes that to the core layer's RpcServer, which executes the corresponding method. The return value, serialized as JSON, is then sent back to thrift-client.

public class ThriftService implements com.acupt.acuprpc.protocol.thrift.proto.ThriftService.Iface {

    private RpcServer rpcServer;

    public ThriftService(RpcServer rpcServer) {
        this.rpcServer = rpcServer;
    }

    @Override
    public InvokeResponse invokeMethod(InvokeRequest invokeRequest) {
        RpcRequest rpcRequest = new RpcRequest(
                invokeRequest.getAppName(),
                invokeRequest.getServiceName(),
                invokeRequest.getMethodName(),
                invokeRequest.getOrderedParameter(),
                invokeRequest.getNamedParameter());
        RpcResponse rpcResponse = rpcServer.execute(rpcRequest);
        InvokeResponse response = new InvokeResponse();
        response.setCode(rpcResponse.getCode());
        response.setMessage(rpcResponse.getMessage());
        response.setResult(rpcResponse.getResultString());
        return response;
    }
}
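
The RpcServer that receives this request lives in the core layer and is not shown here. As a rough idea of what executing the corresponding method could involve, here is a minimal sketch of name-based dispatch via reflection. It is not acuprpc's actual implementation: ReflectiveDispatcher, Calculator and decodeSimple are hypothetical names, and the toy decoder only stands in for a real JSON library.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical sketch: looks up a registered service by name and calls a method on it.
class ReflectiveDispatcher {
    // Service instances keyed by the name the client sends.
    private final Map<String, Object> services = new ConcurrentHashMap<>();
    // Turns one serialized argument into an instance of the target parameter type.
    private final BiFunction<String, Class<?>, Object> decoder;

    ReflectiveDispatcher(BiFunction<String, Class<?>, Object> decoder) {
        this.decoder = decoder;
    }

    void register(String serviceName, Object service) {
        services.put(serviceName, service);
    }

    Object invoke(String serviceName, String methodName, List<String> orderedParameter) {
        Object service = services.get(serviceName);
        if (service == null) {
            throw new IllegalArgumentException("unknown service: " + serviceName);
        }
        // Pick the first public method with a matching name and parameter count.
        for (Method method : service.getClass().getMethods()) {
            if (!method.getName().equals(methodName)
                    || method.getParameterCount() != orderedParameter.size()) {
                continue;
            }
            Class<?>[] types = method.getParameterTypes();
            Object[] args = new Object[types.length];
            for (int i = 0; i < types.length; i++) {
                args[i] = decoder.apply(orderedParameter.get(i), types[i]);
            }
            try {
                return method.invoke(service, args);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("call failed: " + serviceName + "." + methodName, e);
            }
        }
        throw new IllegalArgumentException(
                "no method " + methodName + " with " + orderedParameter.size() + " parameters");
    }

    // Toy decoder for the demo only: JSON integers and JSON strings without escapes.
    static Object decodeSimple(String json, Class<?> type) {
        if (type == String.class) {
            return json.substring(1, json.length() - 1); // strip the surrounding quotes
        }
        if (type == int.class || type == Integer.class) {
            return Integer.parseInt(json);
        }
        throw new IllegalArgumentException("unsupported parameter type: " + type);
    }
}

// Hypothetical service used to exercise the dispatcher.
class Calculator {
    public int add(int a, int b) {
        return a + b;
    }
}
```

Registering a Calculator under the name "calc" and calling invoke("calc", "add", Arrays.asList("1", "2")) goes through the same three pieces of information the InvokeRequest carries: service name, method name, and ordered parameters. Matching only on name and parameter count keeps the sketch short; overloaded methods with the same arity would need a stricter lookup.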

thrift-server

The concrete service provider implementation only needs to implement two methods, one to start the service and one to shut it down; everything else is left to the parent class in the core layer.

Because the thrift server's serve() method blocks the calling thread, the service has to be started on a separate thread.

public class ThriftServer extends RpcServer {
    private static final int nThreads = 100;
    private TServer server;
    public ThriftServer(RpcInstance rpcInstance) {
        super(rpcInstance);
    }

    @Override
    protected void startRpc() {
        new Thread(() -> {
            TProcessor tprocessor = new com.acupt.acuprpc.protocol.thrift.proto.ThriftService.
                    Processor<com.acupt.acuprpc.protocol.thrift.proto.ThriftService.Iface>(new ThriftService(this));
            TServerTransport serverTransport = null;
            try {
                serverTransport = new TServerSocket(getRpcInstance().getRpcConf().getPort());
            } catch (TTransportException e) {
                throw new RpcException(e);
            }
            TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport);
            tArgs.processor(tprocessor);
            tArgs.executorService(Executors.newFixedThreadPool(nThreads));
            server = new TThreadPoolServer(tArgs);
            server.serve();//blocking
        }).start();
    }

    @Override
    protected void shutdownRpc() {
        if (server != null) {
            // stop() sets the stop flag and also interrupts the blocked accept
            server.stop();
        }
    }
}
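
The pattern used in startRpc() and shutdownRpc() can be tried out without thrift. The following is a minimal sketch using only the JDK: a blocking accept loop runs on its own thread so that start() returns immediately, and stop() unblocks the loop by closing the socket. BackgroundServer and its methods are hypothetical names for illustration, not part of acuprpc or thrift.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical sketch: a blocking accept loop moved onto a background thread.
class BackgroundServer {
    private final ServerSocket socket;
    private final Thread thread;

    BackgroundServer() {
        try {
            // Port 0 lets the OS pick a free port. Binding happens on the caller's
            // thread, so a bind failure is reported to the caller directly.
            socket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        thread = new Thread(this::serve, "rpc-server");
        thread.setDaemon(true);
    }

    // Blocks in accept() until stop() closes the socket.
    private void serve() {
        while (!socket.isClosed()) {
            try (Socket client = socket.accept()) {
                // A real server would hand the connection to a worker here.
            } catch (IOException e) {
                // accept() fails once the socket is closed; the loop condition ends the loop.
            }
        }
    }

    // Returns immediately; the blocking loop runs on the background thread.
    void start() {
        thread.start();
    }

    // Closing the socket makes the blocked accept() call return with an exception.
    void stop() {
        try {
            socket.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isRunning() {
        return thread.isAlive();
    }

    int port() {
        return socket.getLocalPort();
    }

    // Waits up to the given time for the serve loop to finish.
    boolean awaitTermination(long millis) {
        try {
            thread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }
}
```

One design difference from the listing above: the sketch binds the socket before the thread starts. In ThriftServer the TServerSocket is created inside the new thread, so an RpcException thrown there only surfaces on that thread, not in the caller of startRpc().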

thrift-client

The service caller wraps the request information coming from the dynamic proxy class in the structure generated by thrift, invokes the thrift request method, and hands the result returned by the remote service back to the proxy class.

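The thrift client is not thread-safe, as the methods it exposes show: sending and receiving are two separate steps, so calls from concurrent threads on the same client could interleave.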

public void send_invokeMethod(InvokeRequest invokeRequest){
   //...
}

public InvokeResponse recv_invokeMethod(){
   //...
}

public InvokeResponse invokeMethod(InvokeRequest invokeRequest) throws org.apache.thrift.TException
{
    send_invokeMethod(invokeRequest);
    return recv_invokeMethod();
}

For simplicity, the method is just marked synchronized for now; an object pool can replace this later.

public class ThriftClient extends RpcClient implements RpcCode {

    private AtomicReference<ThriftService.Client> clientRef;

    public ThriftClient(NodeInfo nodeInfo) {
        super(nodeInfo);
        clientRef = new AtomicReference<>(getClient(nodeInfo));
    }

    // TODO: the client is not thread-safe; manage clients with a connection pool
    @Override
    @SneakyThrows
    protected synchronized String remoteInvoke(RpcRequest rpcRequest) {
        InvokeRequest request = new InvokeRequest();
        request.setAppName(rpcRequest.getAppName());
        request.setServiceName(rpcRequest.getServiceName());
        request.setMethodName(rpcRequest.getMethodName());
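        request.setOrderedParameter(rpcRequest.getOrderedParameter());
        // namedParameter is declared required in service.thrift, so it has to be set as well
        // (assumes RpcRequest exposes getNamedParameter(), matching its constructor)
        request.setNamedParameter(rpcRequest.getNamedParameter());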
        InvokeResponse response = clientRef.get().invokeMethod(request);
        if (response.getCode() != SUCCESS) {
            throw new HttpStatusException(response.getCode(), response.getMessage());
        }
        return response.getResult();
    }

    @Override
    protected NodeInfo reconnectRpc(NodeInfo nodeInfo) {
       //...
    }

    @Override
    public void shutdownRpc() {
        close(clientRef.get());
    }

    private ThriftService.Client getClient(NodeInfo nodeInfo) {
       //...
    }

    private void close(ThriftService.Client client) {
       //...
    }
}
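
The object pool mentioned in the TODO is left for later. As a minimal sketch of the idea, assuming a fixed number of pre-created objects, a blocking queue is enough: each caller borrows one object, uses it exclusively, and returns it afterwards. SimplePool is a hypothetical name, and the sketch is generic rather than tied to ThriftService.Client.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch: a fixed-size pool of objects that are not thread-safe.
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Borrows one object, runs the action with exclusive access, then returns the object.
    <R> R execute(Function<T, R> action) {
        T item;
        try {
            item = idle.take(); // blocks while every object is in use
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a pooled object", e);
        }
        try {
            return action.apply(item);
        } finally {
            idle.add(item); // always hand the object back, even if the action failed
        }
    }

    int idleCount() {
        return idle.size();
    }
}
```

With such a pool, remoteInvoke would no longer need to be synchronized: up to size calls could run in parallel, each on its own client. A pool for real connections would also have to discard and replace a client whose transport has failed instead of returning it, which this sketch does not do.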