RPC

手写实现一个简单的RPC

Posted by Kaka Blog on November 11, 2019

RPC,即远程过程调用,方法在服务端,远程调用时,跟在本地调用一样。

RPC 是一种技术思想而非一种规范或协议,常见 RPC 技术和框架有:

  • 应用级的服务框架:阿里的 Dubbo/Dubbox、Google gRPC、Spring Boot/Spring Cloud。
  • 远程通信协议:RMI、Socket、SOAP(HTTP XML)、REST(HTTP JSON)。
  • 通信框架:MINA 和 Netty。

实现RPC的基本思路

  1. 服务端实现接口;
  2. 接口注册到注册中心,即把服务发布出去;
  3. 服务端接收请求,进行反序列化操作后再进行反射,调用本地方法,返回数据;
  4. 客户端通过动态代理,在调用方法前先对服务端发起请求,接收到请求后返回。

核心技术

  • 通信协议:采用Socket进行通信
  • 序列化/反序列化:采用JDK自带
  • 动态代理
  • 反射

代码实现

服务端

1、创建项目rpc-server,创建两个模块:rpc-apirpc-manager

  • rpc-api封装请求实体,对外的接口
  • rpc-manager封装服务的实现和服务发布

img

2、rpc-api模块增加测试接口。

public interface IHelloService {
    String sayHello(String content);
}

3、rpc-api模块增加请求实体类RpcRequst

public class RpcRequst implements Serializable {
    private String className;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] arguments;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getArguments() {
        return arguments;
    }

    public void setArguments(Object[] arguments) {
        this.arguments = arguments;
    }
}
  • className表示类名
  • methodName表示方法名
  • parameterTppes表示参数类型
  • arguments表示参数

到这里rpc-api模块完成,打开根目录,运行npm install安装到maven库,便于rpc-client引用。其它两个模块添加以下依赖:

<dependency>
    <groupId>com.fang</groupId>
    <artifactId>rpc-api</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

4、实现API接口。

public class HelloService implements IHelloService {
    @Override
    public String sayHello(String content) {
        System.out.println("接收到参数:" + content);
        return "Hello, " + content;
    }
}

5、新建注册类,基于socket进行通信,负责接收请求。

public class Registry {
    private ExecutorService executorService = Executors.newCachedThreadPool();

    public void publish(Object service, int port) {
        ServerSocket socket = null;
        try {
            socket = new ServerSocket(port);
            while (true) {
                Socket connect = socket.accept();
                executorService.execute(new RevokeHandler(service, connect));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

6、创建处理请求类,反序列化请求对象,通过反射调用方法,将结果写入到输出流。

public class RevokeHandler implements Runnable {
    private Object service;
    private Socket socket;

    public RevokeHandler(Object service, Socket socket) {
        this.service = service;
        this.socket = socket;
    }

    @Override
    public void run() {
        ObjectInputStream inputStream = null;
        ObjectOutputStream outputStream = null;
        try {
            // 获取请求
            inputStream = new ObjectInputStream(socket.getInputStream());
            Object obj = inputStream.readObject();
            if (obj instanceof RpcRequst) {
                RpcRequst rpcRequst = (RpcRequst) obj;
                Object result = revoke(rpcRequst);
                outputStream = new ObjectOutputStream(socket.getOutputStream());
                outputStream.writeObject(result);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (outputStream != null) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private Object revoke(RpcRequst rpcRequst) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Class<?> clazz = Class.forName(rpcRequst.getClassName());
        Method method = clazz.getMethod(rpcRequst.getMethodName(), rpcRequst.getParameterTypes());
        return method.invoke(service, rpcRequst.getArguments());
    }
}

7、启动类,监听端口9999。

public class App 
{
    public static void main( String[] args )
    {
        IHelloService helloService = new HelloService();
        Registry registry = new Registry();
        registry.publish(helloService, 9999);
    }
}

到这里rpc-manager模块就完成了,启动项目。

客户端

1、创建项目rpc-client

  • rpc-client封装服务的请求和代理方法

2、创建动态代理类,将请求对象发送到服务端。

public class RevokeHandler implements InvocationHandler {
    private String address;
    private int port;

    public RevokeHandler(String address, int port) {
        this.address = address;
        this.port = port;
    }

    public <T> T createProxyInstance(Class<?> service) {
        return (T)Proxy.newProxyInstance(service.getClassLoader(),
                new Class[]{service}, this);
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequst rpcRequst = new RpcRequst();
        rpcRequst.setClassName(method.getDeclaringClass().getName());
        rpcRequst.setMethodName(method.getName());
        rpcRequst.setParameterTypes(method.getParameterTypes());
        rpcRequst.setArguments(args);
        RpcClent clent = new RpcClent(address, port);
        return clent.send(rpcRequst);
    }
}

3、创建socket通信类。

public class RpcClent {
    private int port;
    private String address;

    public RpcClent(String address, int port) {
        this.address = address;
        this.port = port;
    }

    public Object send(RpcRequst requst) throws IOException, ClassNotFoundException {
        Socket socket = new Socket(address, port);
        ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
        outputStream.writeObject(requst);
        ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
        return inputStream.readObject();
    }
}

4、启动类。

public class App 
{
    public static void main( String[] args )
    {
        RevokeHandler revokeHandler = new RevokeHandler("127.0.0.1", 9999);
        IHelloService helloService = revokeHandler.createProxyInstance(IHelloService.class);
        System.out.println(helloService.sayHello("Fang"));
    }
}

运行效果

rpc-client终端打印Hello, Fang

rpc-manager终端打印接收到参数:Fang

总结

该项目主要是通过实现rpc了解rpc的过程,第一版算是基本完成,当然还有很多需要优化的地方,比如注册中心独立出来,使用更高效的对象序列化和反序列化、通信使用Netty或Zookeeper。

项目源码

参考资料