这个项目在SimpleDB
中实现一个基于锁的简单事务系统。需要在代码的适当位置添加锁定和解锁调用,还需要跟踪每个事务所持有的锁并在需要时将锁授予事务。
事务是原子执行的一组数据库操作(例如,插入,删除和读取);也就是说,要么所有动作都已完成,要么全部都失败回滚。
原子性:严格的两阶段锁定和缓冲区管理可确保原子性。
一致性:由于原子性,数据库是事务一致的。
隔离性:严格的两阶段锁定提供隔离。
持久性:FORCE缓冲区管理策略可确保持久性
实施NO STEAL / FORCE缓冲区管理策略。需要做:
这一部分需要在SimpleDB
添加调用,允许调用者代表特定事务请求或释放对特定对象的(共享或独占)锁,建议锁定页面粒度。同时还需要创建数据结构,以跟踪每个事务持有哪些锁,并检查是否应在请求事务时将锁授予该事务。
共享锁和互斥锁有以下特性:
如果某个事务请求了不应授予的锁,则您的代码应阻塞,等待该锁可用。
这里先说几个重要的辅助类, Permission.java
, LockState.java
和 LockManager.java
。
Permission.java
/**
* Class representing requested permissions to a relation/file.
* Private constructor with two static objects READ_ONLY and READ_WRITE that
* represent the two levels of permission.
*/
public class Permissions {
int permLevel;
private Permissions(int permLevel) {
this.permLevel = permLevel;
}
public String toString() {
if (permLevel == 0)
return "READ_ONLY";
if (permLevel == 1)
return "READ_WRITE";
return "UNKNOWN";
}
public static final Permissions READ_ONLY = new Permissions(0);
public static final Permissions READ_WRITE = new Permissions(1);
}
这个类是用来表示某个object
的锁的类型,READ_ONLY
和READ_WRITE
分别表示共享锁和互斥锁。
LockState
private TransactionId tid;
private Permissions perm;
主要通过TransactionId
和Permissions
来记录一个锁的状态,当然锁都是针对页面的,每个页面有不同的transaction
和permission
,后面可以看到LockManager
用了一个map来记录它们的对应关系。
LockManager
LockManager
是锁管理类,实现了锁相关的很多方法,应该是项目4的核心内容。
//Key相当于资源,LockState存放事务id与锁类型,故每个LockState代表某事务在Key上加了锁
//故整个map为所有资源的锁信息
private Map<PageId, List<LockState>> lockStateMap;
//Key为事务,PageId为正在等待的资源,相当于保存了等待的信息,PS:BufferPool中实际用的是sleep体现等待
private Map<TransactionId, PageId> waitingInfo;
public LockManager() {
//使用支持并发的容器避免ConcurrentModificationException
lockStateMap = new ConcurrentHashMap<>();
waitingInfo = new ConcurrentHashMap<>();
}
lock()
LockManager
的lock()
方法对某个页面上锁,将它加入lockStateMap
中,并从等待HashMapwaitingInfo
中移除, waitingInfo
中保留的是等待信息,即某个事务在等待某个页面,当这个页面被当前事务锁定,对应的映射就应该从waitingInfo
中移除,因为事务已经使用上某个页面,而不是一直在等待,因此这个事务就应该从当前的等待队列中移除.
private synchronized boolean lock(PageId pid, TransactionId tid, Permissions perm) {
LockState nls = new LockState(tid, perm);
ArrayList<LockState> list = (ArrayList<LockState>) lockStateMap.get(pid);
if (list == null) {
list = new ArrayList<>();
}
list.add(nls);
lockStateMap.put(pid, list);
waitingInfo.remove(tid);
return true;
}
unlock()
unlock
同样是改变lockStateMap
中与当前PageId
对应的lockState
的list的状态,也就是把对应的lockState
移除,这个时候并不需要把tid
和pid
放入到waitingInfo
中,因为它只是把锁去掉,waitingInfo
中存储的是事务请求某个资源等待的信息。
public synchronized boolean unlock(TransactionId tid, PageId pid) {
ArrayList<LockState> list = (ArrayList<LockState>) lockStateMap.get(pid);
if (list == null || list.size() == 0) return false;
LockState ls = getLockState(tid, pid);
if (ls == null) return false;
list.remove(ls);
lockStateMap.put(pid, list);
return true;
}
wait()
wait
方法就是把对应的transactionId
和pageId
放到waitingInfo
这个map里面,是个单独的方法,在需要调用的时候才把某个transaction下的page设为wait状态,而不是解锁了就要加入,这个方法在事务请求某个页面不得,需要等待的时候调用。
private synchronized boolean wait(TransactionId tid, PageId pid) {
waitingInfo.put(tid, pid);
return false;
}
grantSLock()
grantSLock()
用于授予共享锁,逻辑如下:
如果LockState
的list不为空:
LockState
的list长度为1:
LockState
的list长度不为1,而是多个:
否则pid
对应的LockState
的list为空,那么说明这个页面没有加任何的锁,可以直接加锁,加锁就只是改变lockStateMap
的状态了,不涉及是否有锁的判断.
public synchronized boolean grantSLock(TransactionId tid, PageId pid) {
ArrayList<LockState> list = (ArrayList<LockState>) lockStateMap.get(pid);
if (list != null && list.size() != 0) {
if (list.size() == 1) {//pid上只有一个锁
LockState ls = list.iterator().next();
if (ls.getTid().equals(tid)) {//判断是否为自己的锁
//如果是读锁,直接返回(在||的之前返回),否则加锁再返回
return ls.getPerm() == Permissions.READ_ONLY || lock(pid, tid, Permissions.READ_ONLY);
} else {
//如果是别人的读锁,加锁再返回,是写锁则需要等待
return ls.getPerm() == Permissions.READ_ONLY ? lock(pid, tid, Permissions.READ_ONLY) : wait(tid, pid);
}
} else {
//多个锁有四种情况
// 1.两个锁,且都属于tid(一读一写) 2.两个锁,且都属于非tid的事务(一读一写)
// 3.多个读锁,且其中有一个为tid的读锁 4.多个读锁,且没有tid的读锁
for (LockState ls : list) {
if (ls.getPerm() == Permissions.READ_WRITE) {
//如果其中有一个写锁,那么根据是否为自己的来判断属于情况1还是2
return ls.getTid().equals(tid) || wait(tid, pid);
} else if (ls.getTid().equals(tid)) {//如果是读锁且是tid的
return true;//情况3在此返回,也可能是情况1(如果先遍历到读锁)
}
}
//情况4
return lock(pid, tid, Permissions.READ_ONLY);
}
} else {
return lock(pid, tid, Permissions.READ_ONLY);
}
}
grantXLock()
获取互斥锁,逻辑是相对简单的,可以直接看解释:
LockState
的数组不为空:
LockState
的长度为1,说明只有一个锁:
public synchronized boolean grantXLock(TransactionId tid, PageId pid) {
ArrayList<LockState> list = (ArrayList<LockState>) lockStateMap.get(pid);
if (list != null && list.size() != 0) {
if (list.size() == 1) {//如果pid上只有一个锁
LockState ls = list.iterator().next();
//如果是自己的写锁,直接返回(在||之前返回),否则加锁再返回(在lock处返回)
//如果这个锁是别人的,必须等待,也就是在wait处(冒号之后)返回
return ls.getTid().equals(tid) ? ls.getPerm() == Permissions.READ_WRITE || lock(pid, tid, Permissions.READ_WRITE) : wait(tid, pid);
} else {
//多个锁有三种情况,只有第一种情况返回true,其余返回wait
// 1.两个锁,且都属于tid(一读一写) 2.两个锁,且都属于非tid的事务(一读一写) 3.多个读锁
if (list.size() == 2) {
for (LockState ls : list) {
if (ls.getTid().equals(tid) && ls.getPerm() == Permissions.READ_WRITE) {
return true;//两个锁而且有一个自己的写锁
}
}
}
return wait(tid, pid);
}
} else {//pid上没有锁,可以加写锁
return lock(pid, tid, Permissions.READ_WRITE);
}
}
当我们需要获取一个页面的时候,就需要用到getPage()
函数。
getPage()
要做的两步是分别获取对应的transaction和page下的锁,不管是读锁还是写锁,这样才有了权限去获取这个页面,然后先从缓存里面找,如果找到了就直接返回,如果没找到,就从磁盘读取。
public Page getPage(TransactionId tid, PageId pid, Permissions perm)
throws TransactionAbortedException, DbException, InterruptedException {
// some code goes here
boolean result = (perm == Permissions.READ_ONLY) ? lockManager.grantSLock(tid, pid)
: lockManager.grantXLock(tid, pid);
//下面的while循环就是在模拟等待过程,隔一段时间就检查一次是否申请到锁了,还没申请到就检查是否陷入死锁
while (!result) {
if (lockManager.deadlockOccurred(tid, pid)) {
throw new TransactionAbortedException();
}
Thread.sleep(SLEEP_INTERVAL);
//sleep之后再次判断result
result = (perm == Permissions.READ_ONLY) ? lockManager.grantSLock(tid, pid)
: lockManager.grantXLock(tid, pid);
}
HeapPage page = (HeapPage) lruPagesPool.get(pid);
if (page != null) {//直接命中
return page;
}
//未命中,访问磁盘并将其缓存
HeapFile table = (HeapFile) Database.getCatalog().getDbFile(pid.getTableId());
HeapPage newPage = (HeapPage) table.readPage(pid);
Page removedPage = lruPagesPool.put(pid, newPage);
if (removedPage != null) {
try {
flushPage(removedPage);
} catch (IOException e) {
e.printStackTrace();
}
}
return newPage;
}
releasePage()
相对简单,就是把对应的锁解除了就可以。
public synchronized void releasePage(TransactionId tid, PageId pid) {
// some code goes here
// not necessary for proj1
if (!lockManager.unlock(tid, pid)) {
//pid does not locked by any transaction
//or tid dose not lock the page pid
throw new IllegalArgumentException();
}
}
holdsLock()
也是挺简单的,就是返回状态。
/**
* Return true if the specified transaction has a lock on the specified page
*/
public boolean holdsLock(TransactionId tid, PageId p) {
// some code goes here
// not necessary for proj1
return lockManager.getLockState(tid, p) != null;
}
实施严格的两阶段锁定。这意味着事务在访问该对象之前应获得对任何对象的相应的锁,并且在事务提交后才释放任何锁.获取锁之后,需要考虑何时释放锁。显然,在提交或中止与事务相关联的事务后,应释放所有与之相关的锁,以确保执行严格的2PL。
在BufferPool
做好了Page
的管理后,HeapFile
中获取tuple
都需要通过BufferPool.getPage()
这个方法来进行,并设置相应的权限。
在HeapFile
的Iterator
即HeapFileIterator
中有:
page = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_ONLY);
HeapFile.insertTuple()
实现如下:
public ArrayList<Page> insertTuple(TransactionId tid, Tuple t)
throws DbException, IOException, TransactionAbortedException {
// some code goes here
ArrayList<Page> affectedPages = new ArrayList<>();
for (int i = 0; i < numPages(); i++) {
// HeapPageId pid = new HeapPageId(getId(), i);
HeapPageId pid = new HeapPageId(getId(), i);
HeapPage page = null;
try {
page = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (page.getNumEmptySlots() != 0) {
//page的insertTuple已经负责修改tuple信息来表明其存储在该page上
page.insertTuple(t);
page.markDirty(true, tid);
affectedPages.add(page);
break;
}
}
if (affectedPages.size() == 0) {//说明page都已经满了
//创建一个新的空白的Page
// HeapPageId npid = new HeapPageId(getId(), numPages());
HeapPageId npid = new HeapPageId(getId(), numPages());
HeapPage blankPage = new HeapPage(npid, HeapPage.createEmptyPageData());
numPage++;
//将其写入磁盘
writePage(blankPage);
//通过BufferPool来访问该新的page
HeapPage newPage = null;
try {
newPage = (HeapPage) Database.getBufferPool().getPage(tid, npid, Permissions.READ_WRITE);
} catch (InterruptedException e) {
e.printStackTrace();
}
newPage.insertTuple(t);
newPage.markDirty(true, tid);
affectedPages.add(newPage);
}
return affectedPages;
// not necessary for proj1
}
HeapFile.deleteTuple()
的实现如下:
public Page deleteTuple(TransactionId tid, Tuple t) throws DbException,
TransactionAbortedException {
// some code goes here
PageId pid = t.getRecordId().getPageId();
HeapPage affectedPage = null;
for (int i = 0; i < numPages(); i++) {
if (i == pid.pageNumber()) {
try {
affectedPage = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
} catch (InterruptedException e) {
e.printStackTrace();
}
affectedPage.deleteTuple(t);
affectedPage.markDirty(true, tid);
}
}
if (affectedPage == null) {
throw new DbException("tuple " + t + " is not in this table");
}
return affectedPage;
}
如果脏页被未提交的事务锁定,则不应从缓冲池中清除脏页(这是NO STEAL)
事务的修改仅在提交后才写入磁盘。这意味着我们中止事务时,可以丢弃脏页并从磁盘重新读取它们。因此,在事务未提交的时候,我们不会驱逐脏页。这称为“NO STEAL”。
这一部分在PageLRUCache
类中已经实现了页面清除,可以看一下那部分。
在SimpleDB
中,在每个查询的开头会创建一个TransactionId
对象。该对象传递给查询中涉及的每个运算符。查询完成后,将调用BufferPool
的transactionComplete
方法。 调用此方法将提交或中止事务(提交或中止由参数标志commit指定)。在执行过程中的任何时候,当方式发生内部错误或死锁, 操作符可能引发TransactionAbortedException
异常。在测试用例中将创建适当的TransactionId
对象,以适当的方式将其传递给操作符,并在查询完成后调用transactionComplete
。这里需要实现transactionComplete
方法。
这一部分主要关注的是事务完成时的持久化和回滚操作,分了两个状态,先把lock释放了,然后根据commit
来决定是flushpage
还是revert
,如果commit
是true
,那意味着事务完成,可以提交,这时候需要把所有的页面写入磁盘,否则,就是事务出现了中止或错误,需要把所有事务回滚,这里的方法就是把所有的页面从磁盘中重新读取进来,从而保证被中止的事务不会造成页面修改,这就是事务的"原子性"的体现,一个事务所做的操作,要么全部完成,要么全部失败回滚.提交时,应将与事务关联的脏页刷新到磁盘。中止时,应通过将页面恢复到其磁盘状态来还原事务所做的任何更改。
最后,无论事务是提交还是中止,都需要释放该事务持有的所有锁。
/**
* Commit or abort a given transaction; release all locks associated to
* the transaction.
*
* @param tid the ID of the transaction requesting the unlock
* @param commit a flag indicating whether we should commit or abort
*/
public synchronized void transactionComplete(TransactionId tid, boolean commit)
throws IOException {
// some code goes here
lockManager.releaseTransactionLocks(tid);
if (commit) {
flushPages(tid);
} else {
revertTransactionAction(tid);
}
}
/**
* 在事务回滚时,撤销该事务对page造成的改变
*
* @param tid
*/
public synchronized void revertTransactionAction(TransactionId tid) {
Iterator<Page> it = lruPagesPool.iterator();
while (it.hasNext()) {
Page p = it.next();
if (p.isDirty() != null && p.isDirty().equals(tid)) {
lruPagesPool.reCachePage(p.getId());
}
}
}
PageLruCache.reCachePage()
/**
* 将pid对应的page从磁盘中再次读入,即将其恢复为磁盘中该page的状态
*
* @param pid
*/
public synchronized void reCachePage(PageId pid) {
if (!isCached(pid)) {
throw new IllegalArgumentException();
}
//访问磁盘获得该page
HeapFile table = (HeapFile) Database.getCatalog().getDbFile(pid.getTableId());
HeapPage originalPage = (HeapPage) table.readPage(pid);
Node node = new Node(pid, originalPage);
cachedEntries.put(pid, node);
Node toRemoved = head;
//这里不需要超出链表尾的判断(toRemoved为null),因为到这里肯定存在该page
while (!(toRemoved = toRemoved.next).key.equals(pid)) ;
node.front = toRemoved.front;
node.next = toRemoved.next;
toRemoved.front.next = node;
if (toRemoved.next != null) {
toRemoved.next.front = node;
} else {
//reCache的是尾节点,需要修改tail指针
tail = node;
}
}
有很多可能的方法来检测死锁。可以实施一个简单的超时策略,如果在给定的时间段后未完成事务,则该事务将中止。或者,也可以基于等待图法来实现死锁检测。在此方案中,无论何时尝试授予新的锁(实际上不是无论合适,一般就是获取一个锁不成功,等待一段时间后继续尝试获取),都将在等待图法中检查循环,如果存在循环,则中止某些操作。
在检测到存在死锁之后,必须决定如何改善这种情况。如果在事务t
等待锁时检测到死锁。可以选择终止所有造成t
等待的交易。这可能会导致大量工作被撤消,但是可以保证t
会往下执行。或者,也可以决定中止事务t,以使其他事务有机会继续进行,这样的话后面就需要把事务t
重新执行.我们这里选择了后者,在检测到了死锁之后,就抛出一个TransactionAbortedException
异常来确保代码在发生死锁时正确中止事务。执行事务的代码(例如TransactionTest.java
)会捕获该异常,该代码应在事务完成后调用transactionComplete()
进行清理。因此就不用我们来处理这个异常了.
从HeapFile.getPage()
的实现来看,我们用的是用的是基于等待图法的死锁检测,这个过程是每隔一段时间,如果获取不到一个Page
的锁,就检测是否造成了环路等待.
下面是死锁检测的两个重要函数,也是相当重要,需要花点时间才能理解,虽然解释已经非常友好了
* 通过检测资源的依赖图根据是否存在环来判断是否已经陷入死锁
* 具体实现:本事务tid需要检测“正在等待的资源的拥有者是否已经直接或间接的在等待本事务tid已经拥有的资源”
* <p>
* 如图,括号内P1,P2,P3为资源,T1,T2,T3为事务
* 虚线以及其上的字母R加上箭头组成了拥有关系,如果是字母W则代表正在等待写锁
* 例如下图左上方T1到P1的一连串符号表示的是T1此时拥有P1的读锁
* 图的边缘可以是虚线的转折点,例如为了表示T2正在等待P1
* <p>
* // T1---R-->P1<-------
* // W
* // ----------------------
* // W
* // ---T2---R-->P2<-------
* // W
* // ----------------------
* // W
* // ---T3---R-->P3
* <p>
* 上图的含义是,Ti拥有了对Pi的读锁(1<=i<=3)
* 因为T1在P1上有了读锁,所以T2正在等待P1的写锁
* 同理,T3正在等待P2的写锁
* <p>
* 现在假设的情景是,此时T1要申请对P3的写锁,进入等待,这将会造成死锁
/**
* 判断tid是否直接或间接地在等待pids中的某个资源
*
* @param tid
* @param pids
* @param toRemove 需要排除toRemove来判断,具体原因见方法内部注释;
* 事实上,toRemove就是leadToDeadLock()的参数tid,也就是要排除它自己对判断过程的影响
* @return
*/
private synchronized boolean isWaitingResources(TransactionId tid, List<PageId> pids, TransactionId toRemove) {
PageId waitingPage = waitingInfo.get(tid);
if (waitingPage == null) {
return false;
}
for (PageId pid : pids) {
if (pid.equals(waitingPage)) {
return true;
}
}
//到达这里说明tid并不直接在等待pids中的任意一个,但有可能间接在等待
//如果waitingPage的拥有者们(去掉toRemove)中的某一个正在等待pids中的某一个,说明是tid间接在等待
List<LockState> holders = lockStateMap.get(waitingPage);
if (holders == null || holders.size() == 0) return false;//该资源没有拥有者
for (LockState ls : holders) {
TransactionId holder = ls.getTid();
if (!holder.equals(toRemove)) {//去掉toRemove,在toRemove刚好拥有waitingResource的读锁时就需要
boolean isWaiting = isWaitingResources(holder, pids, toRemove);
if (isWaiting) return true;
}
}
//如果在for循环中没有return,说明每一个holder都不直接或间接等待pids
//故tid也非间接等待pids
return false;
}
tid
就是占领了被请求资源的transaction
,pids
就是请求transaction
拥有的所有page
,也就是要判断你这个tid
是不是在等待pids
中的某一个,进入这个函数的缘由是:toRemove
这个transaction
正在等待获取tid
这个transaction
所拥有的某个资源,此时这个函数是要判断tid
对应的transaction
是否也在等待toRemove
这个transaction
所拥有的某个资源,如果有,说明死锁发生了。
这里用递归函数实现了类似于递归,回溯这样一个查找过程,就跟一条绳子一样,一个transaction
一个transaction
地往回拉,具体地,在上面的图中,有:
tid
就是T3
,它的waitingPage
就是P2
,因为它就是在等待P2
的写锁,而T1
要请求P3
,此时我们要判断的是T3
是否直接或间接地在等待T1
的哪个page,也就是pids
对应的那些页面中的哪一个,当然这里的pids
只有P1
,于是进入递归函数,显然T3
正在等待的页面P2
不和pids
中的任一个相等,也就是不存在直接等待关系,但是P2
的拥有者T2
有可能在等呀,因此我们就把P2
的所有拥有者列出来,这里就是只有T2
,再次进入递归函数,此时我们就要判断T2
是否直接或间接在等待pids
中的某一个,一看真的是,T2
不就在等待P1
吗,此时构成了间接等待关系,因此返回true,否则的话还要把T2
的waitingPage
的拥有者都拉出来,看下是不是直接或间接在等待pids
中的某个page,这就是递归函数一层层地深入的过程。返回true
的意思就是,T3
间接在等待T1
的资源P1
,而T1
想要请求T3
的资源P3
,这就造成了环路等待,死锁发生。关于!holder.equals(toRemove)
这个语句,其实是保险起见加的,因为上面的语句:
for (PageId pid : pids) {
if (pid.equals(waitingPage)) {
return true;
}
}
其实已经证明了toRemove
这个transaction
的pids
中没有waitingPage
了,因此waitingPage
的LockState
数组中应该是不会有toRemove
的。
* 导致死锁的本质原因就是将等待的资源(P3)的拥有者(T3)间接的在等待T1拥有的资源(P1)
* 下面方法的注释以这个例子为基础,具体解释这个方法是如何判断出“T1在P3上的等待已经造成了死锁”的
*
* @param tid
* @param pid
* @return true表示进入了死锁,false表示没有
*/
public synchronized boolean deadlockOccurred(TransactionId tid, PageId pid) {//T1为tid,P3为pid
List<LockState> holders = lockStateMap.get(pid);
if (holders == null || holders.size() == 0) {
return false;
}
List<PageId> pids = getAllLocksByTid(tid);//找出T1拥有的所有资源,即只含有P1的list
for (LockState ls : holders) {
// 这里就是找到占领了当前pid的transaction
TransactionId holder = ls.getTid();
//去掉T1,因为虽然上图没画出这种情况,但T1可能同时也在其他Page上有读锁,这会影响判断结果
// 虽然这一步还是没太看明白,但至少没啥矛盾的,因为本来就是tid要去请求pid嘛,照理说不至于在holder里面已经出现了,但也有可能,所以还是排除掉吧
if (!holder.equals(tid)) {
//判断T3(holder)是否直接或间接在等待P1(pids)
//由图可以看出T3在直接等待P2,而P2的拥有者T2在直接等待P1,即T3在间接等待P1
boolean isWaiting = isWaitingResources(holder, pids, tid);
if (isWaiting) {
return true;
}
}
}
return false;
}
CS186 Project 4: SimpleDB Transactions
原文:https://www.cnblogs.com/zhengxch/p/14904629.html