原创

(开源项目)一个易用、实用、高可用的分布式RPC框架--MyRPC

github

一、背景

dubbo在开源rpc界是无敌的存在,稳定,用户多,但是在某些领域,例如javase,非web场景,或者不使用spring的场景,就急需一个轻量级别的rpc框架,于是MyRPC就这样被背景下诞生了

二、简介

MyRPC是一个可用于生产环境的轻量级,高可用,高性能,高易用分布式远程调用框架,参考dubbo的设计,是一个五脏俱全的简易版dubbo,支持同步调用,异步调用,服务自动注册,定时调度系统等。

1. 架构

 

2.特性:

连通性

注册中心负责服务地址的注册与查找,相当于目录服务,服务提供者和消费者只在启动时与注册中心交互,注册中心不转发请求,压力较小服务提供者向注册中心注册其提供的服务,服务消费者向注册中心获取服务提供者地址列表,并根据负载算法直接调用提供者

注册中心通过长连接感知服务提供者的存在,服务提供者宕机,注册中心将立即推送事件通知消费者

健壮性

注册中心对等集群,任意一台宕掉后,将自动切换到另一台,注册中心全部宕掉后,服务提供者和服务消费者仍能通过本地缓存通讯

服务提供者无状态,任意一台宕掉后,不影响使用,服务提供者全部宕掉后,服务消费者应用将无法使用,并无限次重连等待服务提供者恢复

伸缩性

服务提供者无状态,可动态增加机器部署实例,注册中心将推送新的服务提供者信息给消费者

兼容性

Rpc接口使用 Protostuff  序列化框架,可以支持向后兼容

3.使用技术

  • 使用成熟的NIO框架Netty
  • 使用Protostuff作为高性能的序列化框架
  • 使用高可用的注册中心Zookeeper
  • 使用注解自动注入RPC服务
  • 使用注解实现异步回调,方便易用
  • 封装了Quartz,通过注解方便使用定时器

三、使用

启动服务需要预先启动zookeeper服务,项目中默认带了dubbo的内嵌zookeeper服务,在test目录下cn.gameboys.rpc.test下,该服务只能用于本地功能测试,不能用于测试框架的注册中心容错性,如果测试zookeeper的容错性可以参考博客搭建

1.服务接口

package cn.gameboys.rpc.test.api;
import java.util.List;
import java.util.Map;
public interface Type1Service {
List<Person> getTestPerson(String name, int num);
Map<Integer,Person> mapTest(List<Person> list);
}

2.服务实现

package cn.gameboys.rpc.test.server.type1;
import cn.gameboys.rpc.server.RpcService;
import cn.gameboys.rpc.test.api.Person;
import cn.gameboys.rpc.test.api.Type1Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RpcService(Type1Service.class)
public class PersonServiceImpl implements Type1Service {
@Override
public List<Person> getTestPerson(String name, int num) {
List<Person> persons = new ArrayList<>(num);
for (int i = 0; i < num; ++i) {
persons.add(new Person(i,"type2-"+i, name));
}
return persons;
}
@Override
public Map<Integer, Person> mapTest(List<Person> list) {
Map<Integer, Person> map = new HashMap<Integer, Person>();
for (Person person : list) {
map.put(person.getUserID(), person);
}
return map;
}
}

3.服务启动

package cn.gameboys.rpc.test.server;
import cn.gameboys.rpc.server.RpcServer;
import cn.gameboys.rpc.server.RpcServerConfig;
public class Type1Server1 {
public static void main(String[] args) {
RpcServerConfig cfg = new RpcServerConfig();
cfg.setBasePackage("cn.gameboys.rpc.test.server.type1");
cfg.setRegistryAddress("192.168.1.107:2181,192.168.1.107:3181,192.168.1.107:4181");
// cfg.setRegistryAddress("127.0.0.1:2181");
cfg.setServerID(1);
cfg.setType(1);
RpcServer rpcServer = new RpcServer(cfg);
try {
rpcServer.start();
} catch (Exception ex) {
}
}
}

4.同步请求

package cn.gameboys.rpc.test.client;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.gameboys.rpc.client.RpcClient;
import cn.gameboys.rpc.registry.ServiceDiscovery;
import cn.gameboys.rpc.test.api.Person;
import cn.gameboys.rpc.test.api.Type1Service;
import cn.gameboys.rpc.test.api.Type2Service;
public class SyncClientTest {
private static final Logger logger = LoggerFactory.getLogger(SyncClientTest.class);
public static void main(String[] args) throws Exception {
//
// ServiceDiscovery serviceDiscovery = new ServiceDiscovery("127.0.0.1:2181");
ServiceDiscovery serviceDiscovery = new ServiceDiscovery("192.168.1.107:2181,192.168.1.107:3181,192.168.1.107:4181");
final RpcClient rpcClient = new RpcClient(serviceDiscovery, "cn.gameboys.rpc.test.client");
int thread1Num = 1;
int thread2Num = 1;
int requestNum = 1000;
Thread[] threads1 = new Thread[thread1Num];
Thread[] threads2 = new Thread[thread2Num];
for (int i = 0; i < thread1Num; ++i) {
threads1[i] = new Thread(new Runnable() {
@Override
public void run() {
// 执行type==1
for (int i = 0; i < requestNum; i++) {
List<Person> list = null;
try {
Type1Service client = rpcClient.create(1, 0, Type1Service.class);
list = client.getTestPerson("sniper", 20);
client.mapTest(list);
} catch (Exception e) {
System.out.println(Thread.currentThread().getName() + " " + e);
}
System.out.println("@@@@@@@@type1---" + Thread.currentThread().getName() + " " + list);
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
threads1[i].start();
}
for (int i = 0; i < thread2Num; ++i) {
threads2[i] = new Thread(new Runnable() {
@Override
public void run() {
// 执行type==2
for (int i = 0; i < requestNum; i++) {
List<Person> list = null;
try {
Type2Service client = rpcClient.create(2, 0, Type2Service.class);
list = client.getTestPerson("sniper", 20);
client.mapTest(list);
} catch (Exception e) {
System.out.println(Thread.currentThread().getName() + " " + e);
}
System.out.println("@@@@@@@@type2---" + Thread.currentThread().getName() + " " + list);
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
threads2[i].start();
}
while (true) {

}
// rpcClient.stop();
}

}

5.异步请求

package cn.gameboys.rpc.test.client;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.gameboys.rpc.client.RpcClient;
import cn.gameboys.rpc.client.async.AsyncRPCCallback;
import cn.gameboys.rpc.registry.ServiceDiscovery;
import cn.gameboys.rpc.test.api.Person;
import cn.gameboys.rpc.test.api.Type1Service;
import cn.gameboys.rpc.test.api.Type2Service;
public class ASyncClientTest {
private static final Logger logger = LoggerFactory.getLogger(ASyncClientTest.class);
public static void main(String[] args) throws Exception {
//ServiceDiscovery serviceDiscovery = new ServiceDiscovery("127.0.0.1:2181");
ServiceDiscovery serviceDiscovery = new ServiceDiscovery("192.168.1.107:2181,192.168.1.107:3181,192.168.1.107:4181");
final RpcClient rpcClient = new RpcClient(serviceDiscovery, "cn.gameboys.rpc.test.client");
int thread1Num = 1;
int thread2Num = 1;
int requestNum = 1000;
Thread[] threads1 = new Thread[thread1Num];
Thread[] threads2 = new Thread[thread2Num];
for (int i = 0; i < thread1Num; ++i) {
threads1[i] = new Thread(new Runnable() {
@Override
public void run() {
// 执行type==1
for (int i = 0; i < requestNum; i++) {
List<Person> list = null;
try {
Type1Service client = rpcClient.create(1, 0, Type1Service.class);
list = client.getTestPerson("sniper", 20);
Type1Service asyncClient = rpcClient.createAsync(1, 0, Type1Service.class);
asyncClient.mapTest(list);
} catch (Exception e) {
System.out.println(Thread.currentThread().getName() + " " + e);
}
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
threads1[i].start();
}
for (int i = 0; i < thread2Num; ++i) {
threads2[i] = new Thread(new Runnable() {
@Override
public void run() {
// 执行type==2
for (int i = 0; i < requestNum; i++) {
List<Person> list = null;
try {
Type2Service asyncClient = rpcClient.createAsync(2, 0, Type2Service.class);
asyncClient.getTestPerson("sniper", 20);
} catch (Exception e) {
System.out.println(Thread.currentThread().getName() + " " + e);
}
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
threads2[i].start();
}
while (true) {
}
// rpcClient.stop();
}
@AsyncRPCCallback(value = "mapTest")
public void hehe(boolean isError, Object[] parameters, Object result) {
logger.info("@@@@@@@@hehe---" + Thread.currentThread().getName() + " " + result);
}
@AsyncRPCCallback(value = "getTestPerson")
public void haha(boolean isError, Object[] parameters, Object result) {
logger.info("@@@@@@@@haha---" + Thread.currentThread().getName() + " " + result);
}
}


四、杂谈

1. 日志规范

如果不可恢复或需要报警,打印 ERROR 日志。

如果可恢复异常,或瞬时的状态不一致,打印 WARN 日志。

正常运行时的中间状态提示,打印 INFO 日志。

2.魔鬼在细节

防止空指针和下标越界

这是我最不喜欢看到的异常,尤其在核心框架中,我更愿看到信息详细的参数不合法异常。这也是一个编写健壮程序的开发人员,在写每一行代码都应在潜意识中防止的异常。基本上要能确保每一次写完的代码,在不测试的情况下,都不会出现这两个异常才算合格。

保证线程安全性和可见性

对于框架的开发人员,对线程安全性和可见性的深入理解是最基本的要求。需要开发人员,在写每一行代码时都应在潜意识中确保其正确性。因为这种代码,在小并发下做功能测试时,会显得很正常。但在高并发下就会出现莫明其妙的问题,而且场景很难重现,极难排查。

尽早失败和前置断言

尽早失败也应该成为潜意识,在有传入参数和状态变化时,均在入口处全部断言。一个不合法的值和状态,在第一时间就应报错,而不是等到要用时才报错。因为等到要用时,可能前面已经修改其它相关状态,而在程序中很少有人去处理回滚逻辑。这样报错后,其实内部状态可能已经混乱,极易在一个隐蔽分支上引发程序不可恢复。

分离可靠操作和不可靠操作

这里的可靠是狭义的指是否会抛出异常或引起状态不一致,比如,写入一个线程安全的 Map,可以认为是可靠的,而写入数据库等,可以认为是不可靠的。开发人员必须在写每一行代码时,都注意它的可靠性与否,在代码中尽量划分开,并对失败做异常处理,并为容错,自我保护,自动恢复或切换等补偿逻辑提供清晰的切入点,保证后续增加的代码不至于放错位置,而导致原先的容错处理陷入混乱。

异常防御,但不忽略异常

这里讲的异常防御,指的是对非必须途径上的代码进行最大限度的容忍,包括程序上的 BUG,比如:获取程序的版本号,会通过扫描 Manifest jar 包名称抓取版本号,这个逻辑是辅助性的,但代码却不少,初步测试也没啥问题,但应该在整个 getVersion() 中加上一个全函数的 try-catch 打印错误日志,并返回基本版本,因为 getVersion() 可能存在未知特定场景异常,或被其他的开发人员误修改逻辑(但一般人员不会去掉 try-catch),而如果它抛出异常会导致主流程异常,这是我们不希望看到的。但这里要控制个度,不要随意 try-catch,更不要无声无息的吃掉异常。

缩小可变域和尽量 final

如果一个类可以成为不变类(Immutable Class),就优先将它设计成不变类。不变类有天然的并发共享优势,减少同步或复制,而且可以有效帮忙分析线程安全的范围。就算是可变类,对于从构造函数传入的引用,在类中持有时,最好将字段 final,以免被中途误修改引用。不要以为这个字段是私有的,这个类的代码都是我自己写的,不会出现对这个字段的重新赋值。要考虑的一个因素是,这个代码可能被其他人修改,他不知道你的这个弱约定,final 就是一个不变契约。

降低修改时的误解性,不埋雷

前面不停的提到代码被其他人修改,这也开发人员要随时紧记的。这个其他人包括未来的自己,你要总想着这个代码可能会有人去改它。我应该给修改的人一点什么提示,让他知道我现在的设计意图,而不要在程序里面加潜规则,或埋一些容易忽视的雷,比如:你用 null 表示不可用,size 等于 0 表示黑名单,这就是一个雷,下一个修改者,包括你自己,都不会记得有这样的约定,可能后面为了改某个其它 BUG,不小心改到了这里,直接引爆故障。对于这个例子,一个原则就是永远不要区分 null 引用和 empty 值。

提高代码的可测性

这里的可测性主要指 Mock 的容易程度,和测试的隔离性。至于测试的自动性,可重复性,非偶然性,无序性,完备性(全覆盖),轻量性(可快速执行),一般开发人员,加上 JUnit 等工具的辅助基本都能做到,也能理解它的好处,只是工作量问题。这里要特别强调的是测试用例的单一性(只测目标类本身)和隔离性(不传染失败)。现在的测试代码,过于强调完备性,大量重复交叉测试,看起来没啥坏处,但测试代码越多,维护代价越高。经常出现的问题是,修改一行代码或加一个判断条件,引起 100 多个测试用例不通过。时间一紧,谁有这个闲功夫去改这么多形态各异的测试用例?久而久之,这个测试代码就已经不能真实反应代码现在的状况,很多时候会被迫绕过。最好的情况是,修改一行代码,有且只有一行测试代码不通过。如果修改了代码而测试用例还能通过,那也不行,表示测试没有覆盖到。另外,可 Mock 性是隔离的基础,把间接依赖的逻辑屏蔽掉。可 Mock 性的一个最大的杀手就是静态方法,尽量少用。


正文到此结束
本文目录