Version 01
动态代理通过jdk动态代理方式实现,网络通信通过Socket实现,服务注册和获取通过HashMap实现,对象的序列化通过jdk 序列化方式实现。
此版本的代码在:Version 01代码
RPC框架会逐步进行完善,目前是简单的第一版本。
Module:rpc-framework-common
用于搭载请求信息和失败信息。
dto包
-
RpcRequest
此模块主要编写rpcClient和rpcServer之间网络传输时候的媒介:RpcRequest
其作为接口名、方法名、参数、参数类型的载体。
因为要通过网络进行传输,所以需要实现序列化接口。
这里使用lombok注解了@Builder和@Data
-
RpcResponse
用来做响应的RpcResponse,主要搭载RpcServer向RpcClient返回的信息,包括状态码,状态信息以及返回的数据。
提供两个状态的设置数据的方法,success传参为返回的数据,因为状态码和状态信息均为成功。
fail需要传入枚举类RpcResponseCode,因为失败的原因是有多种的,需要由上游决定。
enumeration包
枚举类型
-
RpcResponseCode
枚举类型。为响应状态码,与RpcResponse协同工作
-
RpcErrorMessageEnum
枚举类型。为异常信息,与RpcException协同工作
exception包
-
RpcException
自定义异常,使用到了RpcErrorMessageEnum
Module:rpc-framework-simple
此模块主要编写RPC客户端和RPC服务端
registry包
-
ServiceRegistry
定义接口,其功能为服务的获取以及服务的注册。
注册服务需要传入指定的服务;
获取服务需要传入服务名。
-
DefaultServiceRegistry
实现了上述接口
在注册服务中,服务名为接口名,比如
com.wyb.neu.HelloServiceImpl
,并且保存需要注册的服务是通过HashMap来保存的,HashMap的key为接口名,value为实现的类。添加服务的过程会将当前服务实现的所有接口都添加进去。
@Override public <T> void register(T service) { //获取服务名 String serviceName = service.getClass().getCanonicalName(); //是否已经注册过 if (registeredService.contains(serviceName)) { return; } //先添加到注册过的set集合中 registeredService.add(serviceName); //获取接口集合 Class[] interfaces = service.getClass().getInterfaces(); //如果接口集合是0 if (interfaces.length == 0) { throw new RpcException(RpcErrorMessageEnum.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE); } //将所有接口都添加进去 for (Class i : interfaces) { serviceMap.put(i.getCanonicalName(), service); } logger.info("Add service: {} and interfaces:{}", serviceName, service.getClass().getInterfaces()); }``` 那么是如何获取服务的呢? 传入服务名,判断HashMap里面有没有就可以了。 ```java @Override public Object getService(String serviceName) { Object service = serviceMap.get(serviceName); if(null == service) throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND); return service; }
remoting包
-
RpcClient
使用jdk seriable以及Socket完成RpcRequest的发送,并且通过Socket以及序列化再接收返回信息,进行判断。
主要代码:
public Object sendRpcRequest(RpcRequest rpcRequest, String host, int port)
{
//创建socket对象,根据指定的主机和端口
try(Socket socket = new Socket(host,port)){
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(rpcRequest);
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
// return objectInputStream.readObject();
// Server端的信息都封装到了RpcServer中,后续操作是获取RpcResponse,然后判断是否正确执行
RpcResponse rpcResponse = (RpcResponse)objectInputStream.readObject();
if(rpcResponse == null)
{
logger.error("Invoke Service Fail , Service Name is : {}",rpcRequest.getInterfaceName());
}
if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCode.SUCCESS.getCode())) {
logger.error("调用服务失败,serviceName:{},RpcResponse:{}", rpcRequest.getInterfaceName(), rpcResponse);
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, "interfaceName:" + rpcRequest.getInterfaceName());
}
return rpcResponse.getData();
}catch (IOException | ClassNotFoundException e)
{
logger.error("occur exception ", e);
}
return null;
}
-
RpcClientProxy
使用jdk动态代理,jdkProxy
主要代码:
public <T>T getProxy(Class<T> clazz){
// 参数1:被代理类的类加载器 参数2:被代理类的接口
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
logger.info("Call \"invoke method\" and invoked method: {}", method.getName());
RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
.parameters(args)
.interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.build();
RpcClient rpcClient = new RpcClient();
return rpcClient.sendRpcRequest(rpcRequest,host,port);
}
-
RpcServer
主要使用到ExecutorService线程池、Socket
主要功能为注册服务,并且不断监听客户端,如果收到客户端的请求,将请求执行的任务加入到线程池中执行。
主要的实现逻辑:
public void start(int port) {
try (ServerSocket server = new ServerSocket(port);) {
logger.info("server starts...");
Socket socket;
while ((socket = server.accept()) != null) {
logger.info("client connected");
threadPool.execute(new RpcRequestHandlerRunnable(socket, rpcRequestHandler, serviceRegistry));
}
threadPool.shutdown();
} catch (IOException e) {
logger.error("occur IOException:", e);
}
}
-
RpcRequestHandlerRunnable
实现了Runnable接口,主要编写线程执行的主体run方法,其里面完成服务的获取,并调用RpcRequestHandler完成方法的执行,并将结果写回
@Override public void run() { try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) { RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();//获取Client传入的RpcRequest String interfaceName = rpcRequest.getInterfaceName();//获取要执行的接口名 Object service = serviceRegistry.getService(interfaceName);//根据接口名去注册中心查找Service Object result = rpcRequestHandler.handle(rpcRequest, service);//交到RpcRequestHandler中执行 objectOutputStream.writeObject(RpcResponse.success(result)); objectOutputStream.flush(); } catch (IOException | ClassNotFoundException e) { logger.error("occur exception:", e); } }
-
RpcRequestHandler
利用反射,完成方法的调用,所执行的方法再rpcRequest中保存着,service的话,在线程里面已经完成了获取
public Object handle(RpcRequest rpcRequest, Object service) { Object result = null; try { result = invokeTargetMethod(rpcRequest, service); logger.info("service:{} successful invoke method:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName()); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { logger.error("occur exception", e); } return result; } private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes()); if (null == method) { return RpcResponse.fail(RpcResponseCode.NOT_FOUND_METHOD); } return method.invoke(service, rpcRequest.getParameters()); }
Module:hello-service-api
HelloService
接口,服务器提供的方法接口,服务器提供的服务
public interface HelloService {
String hello(Hello hello);
}
Hello
实现了序列化,因为这个类也需要进行网络传输,是HelloService的参数
public class Hello implements Serializable {
private String message;
private String description;
//get & set
public Hello(String message, String description) {
this.message = message;
this.description = description;
}
}
example-server
HelloServiceImpl
实现了HelloService中的Hello方法,服务器中的实现类
public class HelloServiceImpl implements HelloService{
private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
public String sayHello(Hello hello) {
logger.info("HelloServiceImpl Get: {}",hello.getMessage());
String results = "Hello description is "+hello.getDescription();
logger.info("HelloServiceImpl Return: {}",results);
return results;
}
}
RpcFrameworkSimpleServerMain
启动服务器,注册服务,传入需要注册的服务以及端口号
public class RpcFrameworkSimpleMain {
public static void main(String[] args) {
HelloServiceImpl helloService = new HelloServiceImpl();
DefaultServiceRegistry defaultServiceRegistry = new DefaultServiceRegistry();
// 手动注册
defaultServiceRegistry.register(helloService);
RpcServer rpcServer = new RpcServer(defaultServiceRegistry);
rpcServer.start(9999);
}
}
example-client
RpcFrameworkSimpleClientMain
启动客户端,获取服务
public class RpcFrameworkSimpleClientMain {
public static void main(String[] args) {
RpcClientProxy rpcClientProxy = new RpcClientProxy("127.0.0.1",9999);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.sayHello(new Hello("Hello","Hello World"));
System.out.println(hello);
}
}
运行实例
1:启动Server
2:启动Client
此时的Client:
此时的Server: