本文共 1765 字,大约阅读时间需要 5 分钟。
前阶段看Hadoop源码看到editlog部分,和flume file-channel的logfile部分注意到一个相同之处:Groupcommit.其实最早接触这个概念是在MySQL写redo log(注:binlog group commit在mariadb/procona是支持的,或者mysqlsync_binlog=0;细节移步:);
先来看看Hadoop是怎么处理的:
Editlog是可以被多个线程并发写入的,每个线程维护了自己最新的一个事务ID:
privatestaticfinalThreadLocal<TransactionId> myTransactionId = newThreadLocal<TransactionId>() {
protectedsynchronizedTransactionId initialValue() {
returnnewTransactionId(Long.MAX_VALUE);
}
};
在提交的时候,首先获得提交时最新的事务ID:
synchronized(this){
TransactionId id = myTransactionId.get();
id.txid= txid;
}
然后开始同步(代码被删减):
//拿到自己的事务ID
longmytxid = myTransactionId.get().txid;
booleansync = false;
try{
EditLogOutputStream logStream = null;
synchronized(this){
try {
//如果自己的事务未被同步,但是同步正在被其他线程处理,那么就阻塞
while (mytxid > synctxid && isSyncRunning) {
try {
wait(1000);
} catch (InterruptedException ie) {}
}
//当被唤醒或者超时发现自己的事务已经被group commit了,那么就返回
if (mytxid <= synctxid) {
return;
}
//否则开始进行sync
isSyncRunning = true;
sync = true;
//Hadoop的editlog使用了double buffer来达到刷新和写不阻塞;这里置换buffer
editLogStream.setReadyToFlush();
logStream = editLogStream;
try{
if (logStream != null) {
logStream.flush();
}
} catch(IOException ex) {}
} finally{
synchronized(this){
if (sync) {
isSyncRunning = false;
}
//刷新完成,唤醒阻塞线程
this.notifyAll();
}
}
而在Flume File-channel里的group commit也是类似的方式,不过更为简洁:
一样是分两个阶段,每个阶段都是同步方法,并且Flume的transactionid和position是分开的,每次只需同步文件末尾位置:
Commit();
Sync();
//在提交的时候更新最后提交位置
synchronizedvoidcommit(ByteBuffer buffer) throws IOException {
write(buffer);
lastCommitPosition= position();
}
//若已经被同步了则什么也不做,返回
synchronizedvoidsync() throwsIOException {
if(lastSyncPosition< lastCommitPosition){
getFileChannel().force(false);
lastSyncPosition = position();
syncCount++;
}
}