如何使用Apache BookKeeper构建分布式数据库-01

初识HerdDB

HerdDB是一个分布式数据库,数据在服务器群集之间分布,而无需共享存储。

HerdDB的主要语言是SQL,建议客户端同时使用JDBC驱动程序API和低级API。

HerdDB可嵌入到任何Java虚拟机中,每个节点无需网络即可访问本地数据。

HerdDB复制功能建立在Apache ZooKeeper和Apache BookKeeper项目上。

HerdDB与NoSQL数据库非常相似,实际上在低级上,它基本上是具有SQL抽象层的键值数据库,使每个用户都可以利用现有的专有技术并将现有的应用程序移植到HerdDB。

HerdDB设计用于快速“写入”和主键读取/更新数据访问模式。

HerdDB支持事务和“提交读取”隔离级别

HerdDB使用Apache Calcite作为SQL解析器和SQL Planner

基本概念

数据与任何SQL数据库一样,都以表进行组织,并且为了利用HerdDB复制功能,表在表空间内进行分组。

表空间是表的逻辑集合,是建立复制的基石。

有些数据库功能仅在相同表空间的表之间可用:

事务可能只跨越相同表空间的表 子查询只能跨越相同表空间的表 复制是在表空间级别配置的,因此对于每个表空间,只有一个服务器被设计为“领导者”(管理者),然后您可以配置一组“副本”。 系统自动在副本之间复制数据并透明地处理服务器故障。

系统总览

让我们从一个高层次的观点出发,我们要构建什么以及我们需要什么特性。

  • 我们需要一个数据库,一个持久保存数据的存储,并且可以从远程客户端访问它
  • 我们将数据存储在机器集群中
  • 我们的机器不共享磁盘或使用共享的安装,仅网络连接(LAN或WAN)
  • 机器可能会发生故障,磁盘随时可能丢失,但是我们希望该服务对客户端可用,直到它们的一部分启动并运行
  • 我们希望能够在不中断服务的情况下添加和删除计算机
  • 我们希望完全控制一致性

这个列表听起来很普通,有几种设计具有这种功能的系统的方法。

为了使其更加具体,让我们创建一个具体方案:

  • 我们有一个带有一个表的SQL数据库
  • 数据通过N台计算机复制
  • 没有共享磁盘,只有服务器之间以及服务器和客户端之间的网络连接
  • 我们采用复制状态机的架构模式

预写日志

为了支持ACID(原子性,一致性,隔离性,耐久性)语义,数据库使用预写日志记录。

假设数据库将表的副本存储在本地易失性存储器(RAM)中。 当客户端请求写操作(如UPDATE操作)时,数据库会将操作“记录”到持久性存储中,并将记录的新值写入日志(WAL)。

当存储确认写入(fsync)时,更改将应用于表的内存副本,然后存储将结果确认给客户端。 一旦我们更新了内存中的副本,其他客户端便能够读取新值。预写日志流和表内容:

avatar

在上面的示例中,表开始为空,然后我们执行第一个写操作INSERT(record1),该操作在LSN(日志序列号)1处发生。我们的表现在包含record1。 然后我们在LSN2上记录另一个修改,即INSERT(record2),现在该表包含record1和record2,然后在LSN3上记录DELETE(record1),该表仅包含record2。

当服务器重新启动时,它执行恢复操作,读取所有日志并重建表的内容,因此最终导致表中仅包含record2。

如果在日志上有一个值,我们确定它不会丢失,并且任何在重新启动事件之前读取该值的客户端都可以再次读取相同的值。

如果我们在写入日志之前应用了内存中的更改,则不会发生这种情况。

请注意,只有更改表内容的操作才写入预写日志:我们不记录读取。

检查点

你始终可以从日志中重建表的内容,但不能存储无限的日志,因为在某个时间点该日志被截断了。 为了释放空间,此操作通常称为检查点。

当数据库执行检查点时,它将在持久存储中刷新给定LSN上表的内容。一个检查点发生在LSN3:

avatar

现在我们已经持久地将表保留在LSN3上,我们可以节省资源并将部分日志从LSN1删除到LSN3。 因此,当服务器执行恢复时,它只需要重播LSN4,这反过来又允许更快的启动顺序。

在检查点期间,表的内容存储在哪里? 您可以有几种策略,例如,可以将内容存储在某些本地磁盘上(记住要使用fsync)。 但是,如果相对于对WAL的写入次数而言,表的内容确实很小(因此您在同一组记录上有很多更改),则可以考虑将表的内容转储到WAL本身。

复制状态机

复制状态机是一个实体,在这种情况下为表,即在给定时间(日志序列号)处于给定状态(表的内容)的状态,并且在一组状态下状态更改的顺序相同 相互连接的机器,因此最终每台机器都保持相同的状态。

每当您在计算机上更改一条记录时,都必须在其他所有副本上应用相同的更改。

我们需要对计算机状态进行更改的总顺序,而我们的预写日志非常适合此目的。每个节点都有一个表的副本,并且WAL是共享的:

avatar

在我们的体系结构中,只有一个节点能够更改系统状态,即更改表的内容:我们称其为领导者。

每个节点在内存中都有整个表的副本。发生写操作时,会将其写到WAL,然后使其对客户端可见以进行读取。

其他非领导者节点(跟随者)跟踪日志,它连续地从日志中读取更改,其更改顺序与领导者发布的顺序完全相同。

追随者会将所有更改应用于自己的本地副本,这样,他们将看到表的相同历史记录。

同样重要的是,只有在WAL确认相同的更改之后,追随者才能应用每个更改,否则,追随者将在领导者的将来。

Apache BookKeeper是我们需要的预写日志:它是持久且分布式的。它不需要共享磁盘或远程存储,不需要保证所有项目的顺序,也不需要防护。在下一篇文章中,我将向您展示Apache Bookkeeper如何保证满足我们的需求。

资料

https://streamnative.io/blog/tech/2020-02-04-how-to-build-database/

HikariCP为啥这么快

数据库连接池原理

在系统初始化的时候,在内存中开辟一片空间,将一定数量的数据库连接作为对象存储在对象池里,并对外提供数据库连接的获取和归还方法。用户访问数据库,并不是建立一个新的连接,而是从数据库连接池中取出一个已有的空闲连接对象;使用完毕归还后的连接也不会马上关闭,而是由数据库连接池统一管理回收,为下一次借用做好准备。如果由于高并发请求导致数据库连接池的连接被借用完毕,其它线程就会等待,直到有连接被归还。整个过程中,连接并不会关闭,而是源源不断地循环使用,有借有还。

常见数据库连接池

  • C3P0:实现jdbc3和jdbc2扩展规范说明的Connection 和Statement 池的DataSources 对象

  • DBCP: Apache下独立的数据库连接池组件,由于Apache的缘故,它可能是使用最多的开源数据库连接池

  • BoneCP: 在c3p0和DBCP存在的时代,BoneCP的出现就是为了追求极致,并且提供了完善的基准测试

  • Druid: 阿里出品,是阿里巴巴唯一使用的数据库连接池,阿里云DRDS和阿里TDDL都采用了Druid,可支持”双十一”等最严苛的使用场景,并且提供了强大的监控功能,在国内有不少用户。

  • HikariCP: HiKariCP是数据库连接池的一个后起之秀,号称性能最好,可以完美地PK掉其他连接池,Springboot 2.0选择HikariCP作为默认数据库连接池

有一个争论是HikariCP与Druid相比哪个更好,对此Druid作者温少是直接上场对过线的,感兴趣的可以参考:

https://github.com/brettwooldridge/HikariCP/issues/232

avatar

HikariCP为什么这么快

在HikariCP官网(https://github.com/brettwooldridge/HikariCP/wiki/Down-the-Rabbit-Hole)详细介绍了HikariCP所做的优化:

  • 优化并精简字节码、优化代码和拦截器
  • 使用FastList替代ArrayList
  • 有更好的并发集合类实现ConcurrentBag
  • 其它针对BoneCP缺陷的优化,比如对耗时超过一个CPU时间片的方法调用的研究

接下来将探究FastList和ConcurrentBag的实现

FastList

HikariCP重现设计了一个List接口实现类,用以替换ArrayList。FastList是List接口的精简实现,只实现了接口中必要的几个方法。

jdk中的ArrayList:

1
2
public class ArrayList<E> extends AbstractList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable

HikariCP中的FastList:

1
public final class FastList<T> implements List<T>, RandomAccess, Serializable

可以看到FastList并没有继承AbstractList

ArrayList的get方法:

1
2
3
4
5
6
7
8
9
10
public E get(int index) {
rangeCheck(index);

return elementData(index);
}

private void rangeCheck(int index) {
if (index >= size)
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

FastList的get方法:

1
2
3
public T get(int index)  {
return elementData[index];
}

可以看出FastList的get方法取消了rangeCheck,在一定程度上追求了极致。

ArrayList的remove(Object)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean remove(Object o) {
if (o == null) {
for (int index = 0; index < size; index++)//从头到尾遍历
if (elementData[index] == null) {
fastRemove(index);//从头到尾移除
return true;
}
} else {
for (int index = 0; index < size; index++)
if (o.equals(elementData[index])) {
fastRemove(index);
return true;
}
}
return false;
}

与ArrayList相反,FastList选择从数组的尾部开始遍历(JDBC编程中的常见模式是在使用后立即关闭Statement,或者以打开的相反顺序关闭Statement,可以理解为同一个Connection创建了多个Statement时,后打开的Statement会先关闭),因而更加高效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean remove(Object element) {
for (int index = size - 1; index >= 0; index--) {//从尾部遍历
if (element == elementData[index]) {
final int numMoved = size - index - 1;
if (numMoved > 0) {
System.arraycopy(elementData, index + 1, elementData, index, numMoved);
}
elementData[--size] = null;
return true;
}
}

return false;
}

ConcurrentBag

参考资料

HikariCP数据库连接池实战

h2database初始化流程

载入数据库驱动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Class.forName("org.h2.Driver");


//org.h2.Driver
private static final Driver INSTANCE = new Driver();

public static synchronized Driver load() {
try {
if (!registered) {
registered = true;
DriverManager.registerDriver(INSTANCE);
}
} catch (SQLException e) {
DbException.traceThrowable(e);
}
return INSTANCE;
}

获取 session(数据库会话/连接)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Connection conn = DriverManager.getConnection("jdbc:h2:~/test");

//org.h2.Driver#connect
public Connection connect(String url, Properties info) throws SQLException {
try {
if (info == null) {
info = new Properties();
}
if (!acceptsURL(url)) {
return null;
}
if (url.equals(DEFAULT_URL)) {
return DEFAULT_CONNECTION.get();
}
Connection c = DbUpgrade.connectOrUpgrade(url, info);
if (c != null) {
return c;
}
return new JdbcConnection(url, info);
} catch (Exception e) {
throw DbException.toSQLException(e);
}
}

创建session(数据库连接):初始化SessionFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
打开一个新的(远程或嵌入式)会话。
//org.h2.engine.SessionRemote#connectEmbeddedOrServer
public SessionInterface connectEmbeddedOrServer(boolean openNew) {
ConnectionInfo ci = connectionInfo;
if (ci.isRemote()) {
connectServer(ci);
return this;
}
// create the session using reflection,
// so that the JDBC layer can be compiled without it
boolean autoServerMode = ci.getProperty("AUTO_SERVER", false);
ConnectionInfo backup = null;
try {
if (autoServerMode) {
backup = ci.clone();
connectionInfo = ci.clone();
}
if (openNew) {
ci.setProperty("OPEN_NEW", "true");
}
if (sessionFactory == null) {
sessionFactory = (SessionFactory) Class.forName(
"org.h2.engine.Engine").getMethod("getInstance").invoke(null);
}
return sessionFactory.createSession(ci);
} catch (Exception re) {
DbException e = DbException.convert(re);
if (e.getErrorCode() == ErrorCode.DATABASE_ALREADY_OPEN_1) {
if (autoServerMode) {
String serverKey = ((JdbcException) e.getSQLException()).getSQL();
if (serverKey != null) {
backup.setServerKey(serverKey);
// OPEN_NEW must be removed now, otherwise
// opening a session with AUTO_SERVER fails
// if another connection is already open
backup.removeProperty("OPEN_NEW", null);
connectServer(backup);
return this;
}
}
}
throw e;
}
}

首先要设置 sessionFactory,利用Class.forName() 装载 org.h2.engine.Engine类并且对其实例化,生成了一个单例的 org.h2.engine.Engine对象赋值给sessionFactory

创建session(数据库会话/连接)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//org.h2.engine.Engine#openSession(org.h2.engine.ConnectionInfo)
private synchronized Session openSession(ConnectionInfo ci) {
boolean ifExists = ci.removeProperty("IFEXISTS", false);
boolean forbidCreation = ci.removeProperty("FORBID_CREATION", false);
boolean ignoreUnknownSetting = ci.removeProperty(
"IGNORE_UNKNOWN_SETTINGS", false);
String cipher = ci.removeProperty("CIPHER", null);
String init = ci.removeProperty("INIT", null);
Session session;
long start = System.nanoTime();
for (;;) {
session = openSession(ci, ifExists, forbidCreation, cipher);
if (session != null) {
break;
}
// we found a database that is currently closing
// wait a bit to avoid a busy loop (the method is synchronized)
if (System.nanoTime() - start > 60_000_000_000L) {
// retry at most 1 minute
throw DbException.get(ErrorCode.DATABASE_ALREADY_OPEN_1,
"Waited for database closing longer than 1 minute");
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw DbException.get(ErrorCode.DATABASE_CALLED_AT_SHUTDOWN);
}
}
synchronized (session) {
session.setAllowLiterals(true);
DbSettings defaultSettings = DbSettings.getDefaultSettings();
for (String setting : ci.getKeys()) {
if (defaultSettings.containsKey(setting)) {
// database setting are only used when opening the database
continue;
}
String value = ci.getProperty(setting);
if (!ParserUtil.isSimpleIdentifier(setting, false, false)) {
throw DbException.get(ErrorCode.UNSUPPORTED_SETTING_1, setting);
}
try {
CommandInterface command = session.prepareCommand(
"SET " + setting + ' ' + value,
Integer.MAX_VALUE);
command.executeUpdate(null);
} catch (DbException e) {
if (e.getErrorCode() == ErrorCode.ADMIN_RIGHTS_REQUIRED) {
session.getTrace().error(e, "admin rights required; user: \"" +
ci.getUserName() + "\"");
} else {
session.getTrace().error(e, "");
}
if (!ignoreUnknownSetting) {
session.close();
throw e;
}
}
}
if (init != null) {
try {
CommandInterface command = session.prepareCommand(init,
Integer.MAX_VALUE);
command.executeUpdate(null);
} catch (DbException e) {
if (!ignoreUnknownSetting) {
session.close();
throw e;
}
}
}
session.setAllowLiterals(false);
session.commit(true);
}
return session;
}


private Session openSession(ConnectionInfo ci, boolean ifExists, boolean forbidCreation, String cipher) {
String name = ci.getName();
Database database;
ci.removeProperty("NO_UPGRADE", false);
boolean openNew = ci.getProperty("OPEN_NEW", false);
boolean opened = false;
User user = null;
synchronized (DATABASES) {
if (openNew || ci.isUnnamedInMemory()) {
database = null;
} else {
database = DATABASES.get(name);
}
if (database == null) {
String p = ci.getProperty("MV_STORE");
boolean exists = p == null ? Database.exists(name)
: Database.exists(name, Utils.parseBoolean(p, true, false));
if (!exists) {
if (ifExists) {
throw DbException.get(ErrorCode.DATABASE_NOT_FOUND_WITH_IF_EXISTS_1, name);
}
if (forbidCreation) {
throw DbException.get(ErrorCode.REMOTE_DATABASE_NOT_FOUND_1, name);
}
}
//使用ConnectionInfo创建Database
database = new Database(ci, cipher);
opened = true;
if (database.getAllUsers().isEmpty()) {
// users is the last thing we add, so if no user is around,
// the database is new (or not initialized correctly)
user = new User(database, database.allocateObjectId(),
ci.getUserName(), false);
user.setAdmin(true);
user.setUserPasswordHash(ci.getUserPasswordHash());
database.setMasterUser(user);
}
if (!ci.isUnnamedInMemory()) {
DATABASES.put(name, database);
}
}
}
if (opened) {
// start the thread when already synchronizing on the database
// otherwise a deadlock can occur when the writer thread
// opens a new database (as in recovery testing)
database.opened();
}
if (database.isClosing()) {
return null;
}
if (user == null) {
if (database.validateFilePasswordHash(cipher, ci.getFilePasswordHash())) {
if (ci.getProperty("AUTHREALM")== null) {
user = database.findUser(ci.getUserName());
if (user != null) {
if (!user.validateUserPasswordHash(ci.getUserPasswordHash())) {
user = null;
}
}
} else {
Authenticator authenticator = database.getAuthenticator();
if (authenticator==null) {
throw DbException.get(ErrorCode.AUTHENTICATOR_NOT_AVAILABLE, name);
} else {
try {
AuthenticationInfo authenticationInfo=new AuthenticationInfo(ci);
user = database.getAuthenticator().authenticate(authenticationInfo, database);
} catch (AuthenticationException authenticationError) {
database.getTrace(Trace.DATABASE).error(authenticationError,
"an error occurred during authentication; user: \"" +
ci.getUserName() + "\"");
}
}
}
}
if (opened && (user == null || !user.isAdmin())) {
// reset - because the user is not an admin, and has no
// right to listen to exceptions
database.setEventListener(null);
}
}
if (user == null) {
DbException er = DbException.get(ErrorCode.WRONG_USER_OR_PASSWORD);
database.getTrace(Trace.DATABASE).error(er, "wrong user or password; user: \"" +
ci.getUserName() + "\"");
database.removeSession(null);
throw er;
}
//Prevent to set _PASSWORD
ci.cleanAuthenticationInfo();
checkClustering(ci, database);

//创建Session
Session session = database.createSession(user, ci.getNetworkConnectionInfo());
if (session == null) {
// concurrently closing
return null;
}
if (ci.getProperty("JMX", false)) {
try {
Utils.callStaticMethod(
"org.h2.jmx.DatabaseInfo.registerMBean", ci, database);
} catch (Exception e) {
database.removeSession(session);
throw DbException.get(ErrorCode.FEATURE_NOT_SUPPORTED_1, e, "JMX");
}
jmx = true;
}
return session;
}

这里很关键的一步是使用ConnectionInfo创建出一个Database,利用这个Database对象创建Session。创建Database过程中会创建系统表

创建数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//使用ConnectionInfo创建Database
database = new Database(ci, cipher);

public Database(ConnectionInfo ci, String cipher) {
if (ASSERT) {
META_LOCK_DEBUGGING.set(null);
META_LOCK_DEBUGGING_DB.set(null);
META_LOCK_DEBUGGING_STACK.set(null);
}
String name = ci.getName();
this.dbSettings = ci.getDbSettings();
this.compareMode = CompareMode.getInstance(null, 0);
this.persistent = ci.isPersistent();
this.filePasswordHash = ci.getFilePasswordHash();
this.fileEncryptionKey = ci.getFileEncryptionKey();
this.databaseName = name;
this.databaseShortName = parseDatabaseShortName();
this.maxLengthInplaceLob = Constants.DEFAULT_MAX_LENGTH_INPLACE_LOB;
this.cipher = cipher;
this.accessModeData = StringUtils.toLowerEnglish(
ci.getProperty("ACCESS_MODE_DATA", "rw"));
this.autoServerMode = ci.getProperty("AUTO_SERVER", false);
this.autoServerPort = ci.getProperty("AUTO_SERVER_PORT", 0);
int defaultCacheSize = Utils.scaleForAvailableMemory(
Constants.CACHE_SIZE_DEFAULT);
this.cacheSize =
ci.getProperty("CACHE_SIZE", defaultCacheSize);
this.pageSize = ci.getProperty("PAGE_SIZE",
Constants.DEFAULT_PAGE_SIZE);
if ("r".equals(accessModeData)) {
readOnly = true;
}
String lockMethodName = ci.getProperty("FILE_LOCK", null);
if (dbSettings.mvStore && lockMethodName == null) {
fileLockMethod = autoServerMode ? FileLockMethod.FILE : FileLockMethod.FS;
} else {
fileLockMethod = FileLock.getFileLockMethod(lockMethodName);
}
this.databaseURL = ci.getURL();
String listener = ci.removeProperty("DATABASE_EVENT_LISTENER", null);
if (listener != null) {
listener = StringUtils.trim(listener, true, true, "'");
setEventListenerClass(listener);
}
String modeName = ci.removeProperty("MODE", null);
if (modeName != null) {
mode = Mode.getInstance(modeName);
if (mode == null) {
throw DbException.get(ErrorCode.UNKNOWN_MODE_1, modeName);
}
}
this.logMode =
ci.getProperty("LOG", PageStore.LOG_MODE_SYNC);
this.javaObjectSerializerName =
ci.getProperty("JAVA_OBJECT_SERIALIZER", null);
this.allowBuiltinAliasOverride =
ci.getProperty("BUILTIN_ALIAS_OVERRIDE", false);
boolean closeAtVmShutdown =
dbSettings.dbCloseOnExit;
int traceLevelFile =
ci.getIntProperty(SetTypes.TRACE_LEVEL_FILE,
TraceSystem.DEFAULT_TRACE_LEVEL_FILE);
int traceLevelSystemOut =
ci.getIntProperty(SetTypes.TRACE_LEVEL_SYSTEM_OUT,
TraceSystem.DEFAULT_TRACE_LEVEL_SYSTEM_OUT);
this.cacheType = StringUtils.toUpperEnglish(
ci.removeProperty("CACHE_TYPE", Constants.CACHE_TYPE_DEFAULT));
this.ignoreCatalogs = ci.getProperty("IGNORE_CATALOGS",
dbSettings.ignoreCatalogs);
openDatabase(traceLevelFile, traceLevelSystemOut, closeAtVmShutdown, ci);
}

第一步先设置一些参数,第二步去创建数据库需要的对象。一些重要对象:

  • User: DBA

    1
    systemUser = new User(this, 0, SYSTEM_USER_NAME, true);
  • Schema: PUBLIC

    1
    2
    mainSchema = new Schema(this, Constants.MAIN_SCHEMA_ID, sysIdentifier(Constants.SCHEMA_MAIN), systemUser,
    true);
  • Schema:INFORMATION_SCHEMA

    1
    2
    infoSchema = new Schema(this, Constants.INFORMATION_SCHEMA_ID, sysIdentifier("INFORMATION_SCHEMA"), systemUser,
    true);
  • Session:systemSession lobSession

    1
    2
    systemSession = new Session(this, systemUser, ++nextSessionId);
    lobSession = new Session(this, systemUser, ++nextSessionId);
  • Table: SYS

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    //创建Table:SYS
    CreateTableData data = new CreateTableData();
    ArrayList<Column> cols = data.columns;
    Column columnId = new Column("ID", Value.INT);
    columnId.setNullable(false);
    cols.add(columnId);
    cols.add(new Column("HEAD", Value.INT));
    cols.add(new Column("TYPE", Value.INT));
    cols.add(new Column("SQL", Value.STRING));
    boolean create = true;
    if (pageStore != null) {
    create = pageStore.isNew();
    }
    data.tableName = "SYS";
    data.id = 0;
    data.temporary = false;
    data.persistData = persistent;
    data.persistIndexes = persistent;
    data.create = create;
    data.isHidden = true;
    data.session = systemSession;
    starting = true;
    meta = mainSchema.createTable(data);

h2架构解析

介绍

H2在基于B树的磁盘存储之上实现了一个嵌入式且独立于ANSI-SQL89的SQL引擎。

截至2013年10月,Thomas仍在开发名为MVStore的下一代存储引擎。 这将及时替换基于B树的存储引擎。

自顶向上概述

从上至下工作,各层如下所示:

  • JDBC driver.
  • Connection/session management.
  • SQL Parser.
  • Command execution and planning.
  • Table/Index/Constraints.
  • Undo log, redo log, and transactions layer.
  • B-tree engine and page-based storage allocation.
  • Filesystem abstraction.

JDBC driver

JDBC驱动程序实现位于org.h2.jdbc,org.h2.jdbcx中

Connection/session management

主要关注的类如下:

Package 描述
org.h2.engine.Database 根/全局类
org.h2.engine.SessionInterface 嵌入式会话和远程会话抽象类
org.h2.engine.Session 本地/嵌入式会话
org.h2.engine.SessionRemote 远程会话

Parser

解析器位于org.h2.command.Parser中。 它使用简单的递归下降设计。

请参阅Wikipedia递归下降解析器页面。

Command execution and planning

与其他数据库不同,我们没有中间步骤,无法生成查询的某种IR(中间表示)。 解析器类直接生成命令执行对象。 然后,我们对命令运行一些优化步骤,以可能生成更有效的命令。 感兴趣的主要软件包是:

Package 描述
org.h2.command.ddl 修改表结构等的命令
org.h2.command.dml 修改数据的命令

Table/Index/Constraints

这里要注意的一件事是,索引只是作为特殊类型的表存储。

感兴趣的主要软件包是:

Package 描述
org.h2.table 各种表的实现
org.h2.index 各种索引的实现

Undo log, redo log, and transactions layer

我们有一个事务日志,该日志在所有会话之间共享。 也可以看看

https://en.wikipedia.org/wiki/Transaction_log/

https://h2database.com/html/grammar.html#set_log/

我们还有一个针对每个会话的撤消日志,用于撤消操作(例如,更新失败)并回滚事务。 从理论上讲,可以使用事务日志,但是为了简单起见,H2当前使用它自己的“操作列表”(通常在内存中)。

有了MVStore,就不再需要它了(只是事务日志)。

B-tree engine and page-based storage allocation.

感兴趣的主要软件包是org.h2.store。

这实现了一种存储机制,该机制分配存储页面(通常为2k大小),并在这些页面上实现b树,以允许快速检索和更新。

Filesystem abstraction

感兴趣的主要类是org.h2.store.FileStore。

这实现了随机访问文件的抽象。 这使高层可以将内存数据库,磁盘数据库和zip文件数据库相同。

sofa-rpc源码解析-服务端netty启动过程

  1. 创建server
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //ServerConfig.java

    public synchronized Server buildIfAbsent() {
    if (server != null) {
    return server;
    }
    // 提前检查协议+序列化方式
    // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
    // SerializationType.valueOf(getSerialization()));

    server = ServerFactory.getServer(this);
    return server;
    }

如果不存在则创建,关键在于ServerFactory的getServer方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

/**
* 全部服务端
*/

private final static ConcurrentHashMap<String, Server> SERVER_MAP = new ConcurrentHashMap<String, Server>();

public synchronized static Server getServer(ServerConfig serverConfig) {
try {
Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
if (server == null) {
// 算下网卡和端口
resolveServerConfig(serverConfig);

ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
.getExtensionClass(serverConfig.getProtocol());
if (ext == null) {
throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
"Unsupported protocol of server!");
}
server = ext.getExtInstance();
server.init(serverConfig);
SERVER_MAP.put(serverConfig.getPort() + "", server);
}
return server;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}

getServer默认返回的是一个BoltServer

2.启动server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
DefaultProviderBootstrap.java
for (ServerConfig serverConfig : serverConfigs) {

try {
Server server = serverConfig.buildIfAbsent();
// 注册序列化接口
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
}


//BoltServer.java
@Override
public void start() {
if (started) {
return;
}
synchronized (this) {
if (started) {
return;
}
// 生成Server对象
remotingServer = initRemotingServer();
try {
if (remotingServer.start(serverConfig.getBoundHost())) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
serverConfig.getPort());
}
} else {
throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
}
started = true;

if (EventBus.isEnable(ServerStartedEvent.class)) {
EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
}

} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
}
}
}

protected RemotingServer initRemotingServer() {
// 绑定到端口
RemotingServer remotingServer = new RpcServer(serverConfig.getPort());
remotingServer.registerUserProcessor(boltServerProcessor);
return remotingServer;
}
  • 启动过程使用了双重检查机制,防止Server重复启动
  • initRemotingServer方法里初始化了netty的一些配置,跟到代码里可以看到我们通常启动一个netty服务端需要的bossGroup(workerGroup在初始化过程里doInit方法里设置好)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //RpcServer.java ->sofa-bolt项目
    private static final NioEventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, new NamedThreadFactory("Rpc-netty-server-worker"));

    public RpcServer(int port) {
    super(port);
    this.globalSwitch = new GlobalSwitch();
    this.connectionEventListener = new ConnectionEventListener();
    this.userProcessors = new ConcurrentHashMap(4);
    this.bossGroup = new NioEventLoopGroup(1, new NamedThreadFactory("Rpc-netty-server-boss"));
    }
  • 启动:

    1
    2
    3
    4
    protected boolean doStart(String ip) throws InterruptedException {
    this.channelFuture = this.bootstrap.bind(new InetSocketAddress(ip, this.port)).sync();
    return this.channelFuture.isSuccess();
    }

标准的netty服务端启动模板代码

sofa-rpc源码解析-服务发布过程

SOFA 中间件是蚂蚁金服自主研发的金融级分布式中间件,包含了构建金融级云原生架构所需的各个组件,包括微服务研发框架,RPC 框架,服务注册中心,分布式定时任务,限流/熔断框架,动态配置推送,分布式链路追踪,Metrics监控度量,分布式高可用消息队列,分布式事务框架,分布式数据库代理层等组件,是一套分布式架构的完整的解决方案,也是在金融场景里锤炼出来的最佳实践。

功能特性

  • 透明化、高性能的远程服务调用
  • 支持多种服务路由及负载均衡策略
  • 支持多种注册中心的集成
  • 支持多种协议,包括 Bolt、Rest、Dubbo 等
  • 支持同步、单向、回调、泛化等多种调用方式
  • 支持集群容错、服务预热、自动故障隔离
  • 强大的扩展功能,可以按需扩展各个功能组件
  • 本源码解析系列将围绕这些特性展开.

编写服务端实现

第一步:创建接口

1
2
3
4
5
6
/**
* Quick Start demo interface
*/

public interface HelloService {
String sayHello(String string);
}

第二步:创建接口实现

1
2
3
4
5
6
7
8
9
10
/**
* Quick Start demo implement
*/

public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String string) {
System.out.println("Server receive: " + string);
return "hello " + string + " !";
}
}

第三步:编写服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Quick Start Server
*/

public class QuickStartServer {

public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 设置一个协议,默认bolt
.setPort(12200) // 设置一个端口,默认12200
.setDaemon(false); // 非守护线程

ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定实现
.setServer(serverConfig); // 指定服务端

providerConfig.export(); // 发布服务
}
}

编写客户端实现

第一步:拿到服务端接口

一般服务端会通过jar的形式将接口类提供给客户端。而在本例中,由于服务端和客户端在一个工程所以跳过。

第二步:编程客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Quick Start client
*/

public class QuickStartClient {
public static void main(String[] args) {
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setProtocol("bolt") // 指定协议
.setDirectUrl("bolt://127.0.0.1:12200"); // 指定直连地址
// 生成代理类
HelloService helloService = consumerConfig.refer();
while (true) {
System.out.println(helloService.sayHello("world"));
try {
Thread.sleep(2000);
} catch (Exception e) {
}
}
}
}

服务发布

服务发布过程涉及到三个类 RegistryConfig ,ServerConfig ,ProviderConfig 。

1.RegistryConfig

1
2
3
RegistryConfig registryConfig = new RegistryConfig()
.setProtocol("zookeeper")
.setAddress("127.0.0.1:2181")

RegistryConfig 表示注册中心。如上声明了服务注册中心的地址和端口是127.0.0.1:2181,协议是 Zookeeper 。

2.ServerConfig

1
2
3
ServerConfig serverConfig = new ServerConfig()
.setPort(8803)
.setProtocol("bolt");

ServerConfig 表示服务运行容器。如上声明了一个使用8803端口和 bolt 协议的 server 。
3.ProviderConfig

1
2
3
4
5
6
ProviderConfig<HelloWorldService> providerConfig = new ProviderConfig<HelloWorldService>()
.setInterfaceId(HelloWorldService.class.getName())
.setRef(new HelloWorldServiceImpl())
.setServer(serverConfig)
.setRegistry(registryConfig);
providerConfig.export();

ProviderConfig 表示服务发布。如上声明了服务的接口,实现和该服务运行的 server 。 最终通过 export 方法将这个服务发布出去了。

启动底层Netty服务端

1
2
3
4
5
6
7
8
9
/**
* 发布服务
*/

public synchronized void export() {
if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
providerBootstrap.export();
}

发布服务过程首先会初始化一个发布服务的包装类 ProviderBootstrap(DefaultProviderBootstrap),从该实例持有服务提供者配置信息。还提供了延迟加载的特性(单位毫秒)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void export() {
if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD
}
doExport();
}
});
thread.start();
} else {
doExport();
}
}

doExport会做一些必要的参数检查,例如服务发布次数限制、服务下方法的黑白名单。最重要的是构建请求调用器也就是请求处理器,并将其与Server绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 构造请求调用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 初始化注册中心
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
}
}
}
// 将处理器注册到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
try {
Server server = serverConfig.buildIfAbsent();
// 注册序列化接口
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
}

providerProxyInvoker是服务端调用链的入口,rpc请求将会沿着这个入口执行。

HikariCP源码解析一创建数据库连接池(二)

上篇介绍了HikariCP创建连接池的几种方式,跟踪源码发现其真正创建连接池是在HikariDataSource里的getConnection方法(即第一次获取连接则去创建连接池)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 private volatile HikariPool pool;
@Override
public Connection getConnection() throws SQLException{
if (isClosed()) {
throw new SQLException("HikariDataSource " + this + " has been closed.");
}

if (fastPathPool != null) {
return fastPathPool.getConnection();
}

// See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
HikariPool result = pool;
if (result == null) {
synchronized (this) {
result = pool;
if (result == null) {
validate();
LOGGER.info("{} - Starting...", getPoolName());
try {
pool = result = new HikariPool(this);
this.seal();
}
catch (PoolInitializationException pie) {
if (pie.getCause() instanceof SQLException) {
throw (SQLException) pie.getCause();
}
else {
throw pie;
}
}
LOGGER.info("{} - Start completed.", getPoolName());
}
}
}

return result.getConnection();
}

这里使用了双重检查机制,要注意的是pool变量使用了volatile这个关键字,原因是new一个对象并不是一个原子操作,要经过以下步骤:

  1. 给pool分配内存
  2. 调用构造函数初始化成员变量
  3. 将pool对象指向分配的内存(此步骤完成pool即为非空)

没有volatile关键字上面这3个步骤可能由于指令重排序令pool在多线程下未正确初始化即被使用则报错。volatile可禁止指令重排序,并强制本地线程去主存中读取pool变量。

HikariCP源码解析一创建数据库连接池

HikariCP是一个快速、简单、可靠的JDBC连接池。大约130Kb,相比于其它流行的数据库连接池非常的轻, spingboot2.0以及在国外非常有名的playFramework框架默认使用该连接池。

连接池 文件数 代码行数
Vibur 34 1927
HikariCP 21 2218
Tomcat-JDBC 31 6345
BoneCP 49 7293
C3P0 120 1550

HikariCP提供了多种创建数据库连接池的方式

  • 硬编码HikariConfig

    1
    2
    3
    4
    5
    6
    7
    8
    9
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl("jdbc:mysql://localhost:3306/simpsons");
    config.setUsername("bart");
    config.setPassword("51mp50n");
    config.addDataSourceProperty("cachePrepStmts", "true");
    config.addDataSourceProperty("prepStmtCacheSize", "250");
    config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");

    HikariDataSource ds = new HikariDataSource(config);
  • 直接硬编码HikariDataSource

    1
    2
    3
    4
    5
    HikariDataSource ds = new HikariDataSource();
    ds.setJdbcUrl("jdbc:mysql://localhost:3306/simpsons");
    ds.setUsername("bart");
    ds.setPassword("51mp50n");
    ...
  • 加载properties文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    HikariConfig config = new HikariConfig("/some/path/hikari.properties");
    HikariDataSource ds = new HikariDataSource(config);


    example.properties

    dataSourceClassName=org.postgresql.ds.PGSimpleDataSource
    dataSource.user=test
    dataSource.password=test
    dataSource.databaseName=mydb
    dataSource.portNumber=5432
    dataSource.serverName=localhost
  • HikariConfig是一个用来设置数据库连接属性属性的普通java类。还给连接池设置了一些常用默认属性:

1
2
3
4
5
private static final long CONNECTION_TIMEOUT = SECONDS.toMillis(30);
private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5);
private static final long IDLE_TIMEOUT = MINUTES.toMillis(10);
private static final long MAX_LIFETIME = MINUTES.toMillis(30);
private static final int DEFAULT_POOL_SIZE = 10;

值得说明的是HikariCP还可以使用系统的一个默认属性:hikaricp.configurationFile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public HikariConfig() {
dataSourceProperties = new Properties();
healthCheckProperties = new Properties();

minIdle = -1;
maxPoolSize = -1;
maxLifetime = MAX_LIFETIME;
connectionTimeout = CONNECTION_TIMEOUT;
validationTimeout = VALIDATION_TIMEOUT;
idleTimeout = IDLE_TIMEOUT;
initializationFailTimeout = 1;
isAutoCommit = true;

String systemProp = System.getProperty("hikaricp.configurationFile");
if (systemProp != null) {
loadProperties(systemProp);
}
}

hikaricp.configurationFile,可用于指定属性文件的位置。 如果您打算使用此选项,则使用默认构造函数构造HikariConfig或HikariDataSource实例,HikariCP会加载该值对应的属性文件。

02-guava CharMatcher、Charsets、Strings

CharMatcher

1
2
3
4
5
6
7
8
try{
byte[] bytes = "foobarbaz".getBytes("UTF-8");
}catch (UnsupportedEncodingException e){
//This really can't happen UTF-8 must be supported
}

简写:
byte[] bytes2 = "foobarbaz".getBytes(Charsets.UTF_8);

Strings

Strings类为使用字符串提供了一些方便的实用方法。

  • 字符填充

    1
    2
    Strings.padEnd("foo", 6, 'x');//fooxxx
    Strings.padEnd("fooaaa", 6, 'x');//fooaaa
  • 空处理

nullToEmpty:这个方法接受一个字符串作为参数并返回
如果值不为空或长度大于0,则为原始字符串;否则
它返回空串

emptyToNull:该方法以类似于nullToEmpty的方式执行,
但如果字符串参数为空或者为null,则返回null

isNullOrEmpty:此方法对字符串执行空长检查
,如果该字符串实际上为null或空(长度为0),则返回true

CharMatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void testRemoveWhiteSpace(){
String tabsAndSpaces = "String with spaces and
tabs";
String expected = "String with spaces and tabs";
String scrubbed = CharMatcher.WHITESPACE.
collapseFrom(tabsAndSpaces,' ');
assertThat(scrubbed,is(expected));
}

@Test
public void testTrimRemoveWhiteSpace(){
String tabsAndSpaces = " String with spaces and
tabs";
String expected = "String with spaces and tabs";
String scrubbed = CharMatcher.WHITESPACE.
trimAndCollapseFrom(tabsAndSpaces,' ');
assertThat(scrubbed,is(expected));
}

@Test
public void testRetainForm() {
String letterAndNumbers = "foo989yxbar234";
String expected = "989234";
String retained = CharMatcher.JAVA_DIGIT.retainFrom(letterAndNumbers);
Assert.assertThat(expected, is(retained));
}

来源:Getting Started with Google Guava

netty BlockingOperationException

对netty线程模型了解不深,使用Future的sync()或await()方法不当会产生BlockingOperationException。stackoverflow上有个回答很好的解答了这个问题:stackoverflow

1
BlockingOperationException will be throw by netty if you call sync*or await* on a Future in the same thread that the EventExecutor is using and to which the Future is tied to. This is usually the EventLoop that is used by the Channel itself.

源码解析

Future代表一个异步任务的结果,netty对java.util.concurrent.Future进行了增强:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 扩展了JDK的Future接口
*/

public interface Future<V> extends java.util.concurrent.Future<V> {

//异步操作完成且正常终止
boolean isSuccess();

//异步操作是否可以取消
boolean isCancellable();

//异步操作失败的原因
Throwable cause();

//添加一个监听者,异步操作完成时回调,类比javascript的回调函数
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

//添加多个监听者
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

//移除一个监听者
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

//移除一个监听者
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

//等待任务结束,如果任务产生异常或被中断则抛出异常,否则返回Future自身
Future<V> sync() throws InterruptedException;

//等待任务结束,任务本身不可中断,如果产生异常则抛出异常,否则返回Future自身
Future<V> syncUninterruptibly();

//等待任务结束,如果任务被中断则抛出中断异常,与sync不同的是只抛出中断异常,不抛出任务产生的异常
Future<V> await() throws InterruptedException;

//阻塞直到异步操作完成
Future<V> awaitUninterruptibly();

//同await,加了时间限制
boolean await(long timeout, TimeUnit unit) throws InterruptedException;

//同await,加了时间限制
boolean await(long timeoutMillis) throws InterruptedException;

//同awaitUninterruptibly,加了时间限制
boolean awaitUninterruptibly(long timeout, TimeUnit unit);

//同awaitUninterruptibly,加了时间限制
boolean awaitUninterruptibly(long timeoutMillis);

//非阻塞地返回异步结果,如果尚未完成返回null
V getNow();

//取消任务
@Override
boolean cancel(boolean mayInterruptIfRunning);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
DefaultPromise.java

@Override
public Promise<V> await() throws InterruptedException {
//如果任务已经完成则直接返回
if (isDone()) {
return this;
}

if (Thread.interrupted()) {
throw new InterruptedException(toString());
}

//检查是否产生死锁
checkDeadLock();

synchronized (this) {
while (!isDone()) {
incWaiters();
try {
//等待被唤醒
wait();
} finally {
decWaiters();
}
}
}
return this;
}

protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}

判断执行任务线程和current thread是否时同一个线程,如果是就检测为死锁抛出异常BlockingOperationException