这个项目在SimpleDB中实现一个基于锁的简单事务系统。需要在代码的适当位置添加锁定和解锁调用,还需要跟踪每个事务所持有的锁并在需要时将锁授予事务。
事务是原子执行的一组数据库操作(例如,插入,删除和读取);也就是说,要么所有动作都已完成,要么全部都失败回滚。
原子性:严格的两阶段锁定和缓冲区管理可确保原子性。
一致性:由于原子性,数据库是事务一致的。
隔离性:严格的两阶段锁定提供隔离。
持久性:FORCE缓冲区管理策略可确保持久性
实施NO STEAL / FORCE缓冲区管理策略。需要做:
这一部分需要在SimpleDB添加调用,允许调用者代表特定事务请求或释放对特定对象的(共享或独占)锁,建议锁定页面粒度。同时还需要创建数据结构,以跟踪每个事务持有哪些锁,并检查是否应在请求事务时将锁授予该事务。
共享锁和互斥锁有以下特性:
如果某个事务请求了不应授予的锁,则您的代码应阻塞,等待该锁可用。
这里先说几个重要的辅助类, Permission.java, LockState.java 和 LockManager.java。
Permission.java
/**
* Class representing requested permissions to a relation/file.
* Private constructor with two static objects READ_ONLY and READ_WRITE that
* represent the two levels of permission.
*/
public class Permissions {
int permLevel;
private Permissions(int permLevel) {
this.permLevel = permLevel;
}
public String toString() {
if (permLevel == 0)
return "READ_ONLY";
if (permLevel == 1)
return "READ_WRITE";
return "UNKNOWN";
}
public static final Permissions READ_ONLY = new Permissions(0);
public static final Permissions READ_WRITE = new Permissions(1);
}
这个类是用来表示某个object的锁的类型,READ_ONLY和READ_WRITE分别表示共享锁和互斥锁。
LockState
private TransactionId tid;
private Permissions perm;
主要通过TransactionId和Permissions来记录一个锁的状态,当然锁都是针对页面的,每个页面有不同的transaction和permission,后面可以看到LockManager用了一个map来记录它们的对应关系。
LockManager
LockManager是锁管理类,实现了锁相关的很多方法,应该是项目4的核心内容。
//Key相当于资源,LockState存放事务id与锁类型,故每个LockState代表某事务在Key上加了锁
//故整个map为所有资源的锁信息
private Map<PageId, List<LockState>> lockStateMap;
//Key为事务,PageId为正在等待的资源,相当于保存了等待的信息,PS:BufferPool中实际用的是sleep体现等待
private Map<TransactionId, PageId> waitingInfo;
public LockManager() {
//使用支持并发的容器避免ConcurrentModificationException
lockStateMap = new ConcurrentHashMap<>();
waitingInfo = new ConcurrentHashMap<>();
}
lock()
LockManager的lock()方法对某个页面上锁,将它加入lockStateMap中,并从等待HashMapwaitingInfo中移除, waitingInfo中保留的是等待信息,即某个事务在等待某个页面,当这个页面被当前事务锁定,对应的映射就应该从waitingInfo中移除,因为事务已经使用上某个页面,而不是一直在等待,因此这个事务就应该从当前的等待队列中移除.
private synchronized boolean lock(PageId pid, TransactionId tid, Permissions perm) {
LockState nls = new LockState(tid, perm);
ArrayList<LockState> list = (ArrayList<LockState>) lockStateMap.get(pid);
if (list == null) {
list = new ArrayList<>();
}
list.add(nls);
lockStateMap.put(pid, list);
waitingInfo.remove(tid);
return true;
}
unlock()
unlock同样是改变lockStateMap中与当前PageId对应的lockState的list的状态,也就是把对应的lockState移除,这个时候并不需要把tid和pid放入到waitingInfo中,因为它只是把锁去掉,waitingInfo中存储的是事务请求某个资源等待的信息。
public synchronized boolean unlock(TransactionId tid, PageId pid) {
ArrayList<LockState> list = (ArrayList<LockState>) lockStateMap.get(pid);
if (list == null || list.size() == 0) return false;
LockState ls = getLockState(tid, pid);
if (ls == null) return false;
list.remove(ls);
lockStateMap.put(pid, list);
return true;
}
wait()
wait方法就是把对应的transactionId和pageId放到waitingInfo这个map里面,是个单独的方法,在需要调用的时候才把某个transaction下的page设为wait状态,而不是解锁了就要加入,这个方法在事务请求某个页面不得,需要等待的时候调用。
private synchronized boolean wait(TransactionId tid, PageId pid) {
waitingInfo.put(tid, pid);
return false;
}
grantSLock()
grantSLock()用于授予共享锁,逻辑如下:
如果LockState的list不为空:
LockState的list长度为1:
LockState的list长度不为1,而是多个:
否则pid对应的LockState的list为空,那么说明这个页面没有加任何的锁,可以直接加锁,加锁就只是改变lockStateMap的状态了,不涉及是否有锁的判断.
public synchronized boolean grantSLock(TransactionId tid, PageId pid) {
ArrayList<LockState> list = (ArrayList<LockState>) lockStateMap.get(pid);
if (list != null && list.size() != 0) {
if (list.size() == 1) {//pid上只有一个锁
LockState ls = list.iterator().next();
if (ls.getTid().equals(tid)) {//判断是否为自己的锁
//如果是读锁,直接返回(在||的之前返回),否则加锁再返回
return ls.getPerm() == Permissions.READ_ONLY || lock(pid, tid, Permissions.READ_ONLY);
} else {
//如果是别人的读锁,加锁再返回,是写锁则需要等待
return ls.getPerm() == Permissions.READ_ONLY ? lock(pid, tid, Permissions.READ_ONLY) : wait(tid, pid);
}
} else {
//多个锁有四种情况
// 1.两个锁,且都属于tid(一读一写) 2.两个锁,且都属于非tid的事务(一读一写)
// 3.多个读锁,且其中有一个为tid的读锁 4.多个读锁,且没有tid的读锁
for (LockState ls : list) {
if (ls.getPerm() == Permissions.READ_WRITE) {
//如果其中有一个写锁,那么根据是否为自己的来判断属于情况1还是2
return ls.getTid().equals(tid) || wait(tid, pid);
} else if (ls.getTid().equals(tid)) {//如果是读锁且是tid的
return true;//情况3在此返回,也可能是情况1(如果先遍历到读锁)
}
}
//情况4
return lock(pid, tid, Permissions.READ_ONLY);
}
} else {
return lock(pid, tid, Permissions.READ_ONLY);
}
}
grantXLock()
获取互斥锁,逻辑是相对简单的,可以直接看解释:
LockState的数组不为空:
LockState的长度为1,说明只有一个锁:
public synchronized boolean grantXLock(TransactionId tid, PageId pid) {
ArrayList<LockState> list = (ArrayList<LockState>) lockStateMap.get(pid);
if (list != null && list.size() != 0) {
if (list.size() == 1) {//如果pid上只有一个锁
LockState ls = list.iterator().next();
//如果是自己的写锁,直接返回(在||之前返回),否则加锁再返回(在lock处返回)
//如果这个锁是别人的,必须等待,也就是在wait处(冒号之后)返回
return ls.getTid().equals(tid) ? ls.getPerm() == Permissions.READ_WRITE || lock(pid, tid, Permissions.READ_WRITE) : wait(tid, pid);
} else {
//多个锁有三种情况,只有第一种情况返回true,其余返回wait
// 1.两个锁,且都属于tid(一读一写) 2.两个锁,且都属于非tid的事务(一读一写) 3.多个读锁
if (list.size() == 2) {
for (LockState ls : list) {
if (ls.getTid().equals(tid) && ls.getPerm() == Permissions.READ_WRITE) {
return true;//两个锁而且有一个自己的写锁
}
}
}
return wait(tid, pid);
}
} else {//pid上没有锁,可以加写锁
return lock(pid, tid, Permissions.READ_WRITE);
}
}
当我们需要获取一个页面的时候,就需要用到getPage()函数。
getPage()要做的两步是分别获取对应的transaction和page下的锁,不管是读锁还是写锁,这样才有了权限去获取这个页面,然后先从缓存里面找,如果找到了就直接返回,如果没找到,就从磁盘读取。
public Page getPage(TransactionId tid, PageId pid, Permissions perm)
throws TransactionAbortedException, DbException, InterruptedException {
// some code goes here
boolean result = (perm == Permissions.READ_ONLY) ? lockManager.grantSLock(tid, pid)
: lockManager.grantXLock(tid, pid);
//下面的while循环就是在模拟等待过程,隔一段时间就检查一次是否申请到锁了,还没申请到就检查是否陷入死锁
while (!result) {
if (lockManager.deadlockOccurred(tid, pid)) {
throw new TransactionAbortedException();
}
Thread.sleep(SLEEP_INTERVAL);
//sleep之后再次判断result
result = (perm == Permissions.READ_ONLY) ? lockManager.grantSLock(tid, pid)
: lockManager.grantXLock(tid, pid);
}
HeapPage page = (HeapPage) lruPagesPool.get(pid);
if (page != null) {//直接命中
return page;
}
//未命中,访问磁盘并将其缓存
HeapFile table = (HeapFile) Database.getCatalog().getDbFile(pid.getTableId());
HeapPage newPage = (HeapPage) table.readPage(pid);
Page removedPage = lruPagesPool.put(pid, newPage);
if (removedPage != null) {
try {
flushPage(removedPage);
} catch (IOException e) {
e.printStackTrace();
}
}
return newPage;
}
releasePage()相对简单,就是把对应的锁解除了就可以。
public synchronized void releasePage(TransactionId tid, PageId pid) {
// some code goes here
// not necessary for proj1
if (!lockManager.unlock(tid, pid)) {
//pid does not locked by any transaction
//or tid dose not lock the page pid
throw new IllegalArgumentException();
}
}
holdsLock()也是挺简单的,就是返回状态。
/**
* Return true if the specified transaction has a lock on the specified page
*/
public boolean holdsLock(TransactionId tid, PageId p) {
// some code goes here
// not necessary for proj1
return lockManager.getLockState(tid, p) != null;
}
实施严格的两阶段锁定。这意味着事务在访问该对象之前应获得对任何对象的相应的锁,并且在事务提交后才释放任何锁.获取锁之后,需要考虑何时释放锁。显然,在提交或中止与事务相关联的事务后,应释放所有与之相关的锁,以确保执行严格的2PL。
在BufferPool做好了Page的管理后,HeapFile中获取tuple都需要通过BufferPool.getPage()这个方法来进行,并设置相应的权限。
在HeapFile的Iterator即HeapFileIterator中有:
page = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_ONLY);
HeapFile.insertTuple()实现如下:
public ArrayList<Page> insertTuple(TransactionId tid, Tuple t)
throws DbException, IOException, TransactionAbortedException {
// some code goes here
ArrayList<Page> affectedPages = new ArrayList<>();
for (int i = 0; i < numPages(); i++) {
// HeapPageId pid = new HeapPageId(getId(), i);
HeapPageId pid = new HeapPageId(getId(), i);
HeapPage page = null;
try {
page = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (page.getNumEmptySlots() != 0) {
//page的insertTuple已经负责修改tuple信息来表明其存储在该page上
page.insertTuple(t);
page.markDirty(true, tid);
affectedPages.add(page);
break;
}
}
if (affectedPages.size() == 0) {//说明page都已经满了
//创建一个新的空白的Page
// HeapPageId npid = new HeapPageId(getId(), numPages());
HeapPageId npid = new HeapPageId(getId(), numPages());
HeapPage blankPage = new HeapPage(npid, HeapPage.createEmptyPageData());
numPage++;
//将其写入磁盘
writePage(blankPage);
//通过BufferPool来访问该新的page
HeapPage newPage = null;
try {
newPage = (HeapPage) Database.getBufferPool().getPage(tid, npid, Permissions.READ_WRITE);
} catch (InterruptedException e) {
e.printStackTrace();
}
newPage.insertTuple(t);
newPage.markDirty(true, tid);
affectedPages.add(newPage);
}
return affectedPages;
// not necessary for proj1
}
HeapFile.deleteTuple()的实现如下:
public Page deleteTuple(TransactionId tid, Tuple t) throws DbException,
TransactionAbortedException {
// some code goes here
PageId pid = t.getRecordId().getPageId();
HeapPage affectedPage = null;
for (int i = 0; i < numPages(); i++) {
if (i == pid.pageNumber()) {
try {
affectedPage = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
} catch (InterruptedException e) {
e.printStackTrace();
}
affectedPage.deleteTuple(t);
affectedPage.markDirty(true, tid);
}
}
if (affectedPage == null) {
throw new DbException("tuple " + t + " is not in this table");
}
return affectedPage;
}
如果脏页被未提交的事务锁定,则不应从缓冲池中清除脏页(这是NO STEAL)
事务的修改仅在提交后才写入磁盘。这意味着我们中止事务时,可以丢弃脏页并从磁盘重新读取它们。因此,在事务未提交的时候,我们不会驱逐脏页。这称为“NO STEAL”。
这一部分在PageLRUCache类中已经实现了页面清除,可以看一下那部分。
在SimpleDB中,在每个查询的开头会创建一个TransactionId对象。该对象传递给查询中涉及的每个运算符。查询完成后,将调用BufferPool的transactionComplete方法。 调用此方法将提交或中止事务(提交或中止由参数标志commit指定)。在执行过程中的任何时候,当方式发生内部错误或死锁, 操作符可能引发TransactionAbortedException异常。在测试用例中将创建适当的TransactionId对象,以适当的方式将其传递给操作符,并在查询完成后调用transactionComplete。这里需要实现transactionComplete方法。
这一部分主要关注的是事务完成时的持久化和回滚操作,分了两个状态,先把lock释放了,然后根据commit来决定是flushpage还是revert,如果commit是true,那意味着事务完成,可以提交,这时候需要把所有的页面写入磁盘,否则,就是事务出现了中止或错误,需要把所有事务回滚,这里的方法就是把所有的页面从磁盘中重新读取进来,从而保证被中止的事务不会造成页面修改,这就是事务的"原子性"的体现,一个事务所做的操作,要么全部完成,要么全部失败回滚.提交时,应将与事务关联的脏页刷新到磁盘。中止时,应通过将页面恢复到其磁盘状态来还原事务所做的任何更改。
最后,无论事务是提交还是中止,都需要释放该事务持有的所有锁。
/**
* Commit or abort a given transaction; release all locks associated to
* the transaction.
*
* @param tid the ID of the transaction requesting the unlock
* @param commit a flag indicating whether we should commit or abort
*/
public synchronized void transactionComplete(TransactionId tid, boolean commit)
throws IOException {
// some code goes here
lockManager.releaseTransactionLocks(tid);
if (commit) {
flushPages(tid);
} else {
revertTransactionAction(tid);
}
}
/**
* 在事务回滚时,撤销该事务对page造成的改变
*
* @param tid
*/
public synchronized void revertTransactionAction(TransactionId tid) {
Iterator<Page> it = lruPagesPool.iterator();
while (it.hasNext()) {
Page p = it.next();
if (p.isDirty() != null && p.isDirty().equals(tid)) {
lruPagesPool.reCachePage(p.getId());
}
}
}
PageLruCache.reCachePage()
/**
* 将pid对应的page从磁盘中再次读入,即将其恢复为磁盘中该page的状态
*
* @param pid
*/
public synchronized void reCachePage(PageId pid) {
if (!isCached(pid)) {
throw new IllegalArgumentException();
}
//访问磁盘获得该page
HeapFile table = (HeapFile) Database.getCatalog().getDbFile(pid.getTableId());
HeapPage originalPage = (HeapPage) table.readPage(pid);
Node node = new Node(pid, originalPage);
cachedEntries.put(pid, node);
Node toRemoved = head;
//这里不需要超出链表尾的判断(toRemoved为null),因为到这里肯定存在该page
while (!(toRemoved = toRemoved.next).key.equals(pid)) ;
node.front = toRemoved.front;
node.next = toRemoved.next;
toRemoved.front.next = node;
if (toRemoved.next != null) {
toRemoved.next.front = node;
} else {
//reCache的是尾节点,需要修改tail指针
tail = node;
}
}
有很多可能的方法来检测死锁。可以实施一个简单的超时策略,如果在给定的时间段后未完成事务,则该事务将中止。或者,也可以基于等待图法来实现死锁检测。在此方案中,无论何时尝试授予新的锁(实际上不是无论合适,一般就是获取一个锁不成功,等待一段时间后继续尝试获取),都将在等待图法中检查循环,如果存在循环,则中止某些操作。
在检测到存在死锁之后,必须决定如何改善这种情况。如果在事务t等待锁时检测到死锁。可以选择终止所有造成t等待的交易。这可能会导致大量工作被撤消,但是可以保证t会往下执行。或者,也可以决定中止事务t,以使其他事务有机会继续进行,这样的话后面就需要把事务t重新执行.我们这里选择了后者,在检测到了死锁之后,就抛出一个TransactionAbortedException异常来确保代码在发生死锁时正确中止事务。执行事务的代码(例如TransactionTest.java)会捕获该异常,该代码应在事务完成后调用transactionComplete()进行清理。因此就不用我们来处理这个异常了.
从HeapFile.getPage()的实现来看,我们用的是用的是基于等待图法的死锁检测,这个过程是每隔一段时间,如果获取不到一个Page的锁,就检测是否造成了环路等待.
下面是死锁检测的两个重要函数,也是相当重要,需要花点时间才能理解,虽然解释已经非常友好了
* 通过检测资源的依赖图根据是否存在环来判断是否已经陷入死锁
* 具体实现:本事务tid需要检测“正在等待的资源的拥有者是否已经直接或间接的在等待本事务tid已经拥有的资源”
* <p>
* 如图,括号内P1,P2,P3为资源,T1,T2,T3为事务
* 虚线以及其上的字母R加上箭头组成了拥有关系,如果是字母W则代表正在等待写锁
* 例如下图左上方T1到P1的一连串符号表示的是T1此时拥有P1的读锁
* 图的边缘可以是虚线的转折点,例如为了表示T2正在等待P1
* <p>
* // T1---R-->P1<-------
* // W
* // ----------------------
* // W
* // ---T2---R-->P2<-------
* // W
* // ----------------------
* // W
* // ---T3---R-->P3
* <p>
* 上图的含义是,Ti拥有了对Pi的读锁(1<=i<=3)
* 因为T1在P1上有了读锁,所以T2正在等待P1的写锁
* 同理,T3正在等待P2的写锁
* <p>
* 现在假设的情景是,此时T1要申请对P3的写锁,进入等待,这将会造成死锁
/**
* 判断tid是否直接或间接地在等待pids中的某个资源
*
* @param tid
* @param pids
* @param toRemove 需要排除toRemove来判断,具体原因见方法内部注释;
* 事实上,toRemove就是leadToDeadLock()的参数tid,也就是要排除它自己对判断过程的影响
* @return
*/
private synchronized boolean isWaitingResources(TransactionId tid, List<PageId> pids, TransactionId toRemove) {
PageId waitingPage = waitingInfo.get(tid);
if (waitingPage == null) {
return false;
}
for (PageId pid : pids) {
if (pid.equals(waitingPage)) {
return true;
}
}
//到达这里说明tid并不直接在等待pids中的任意一个,但有可能间接在等待
//如果waitingPage的拥有者们(去掉toRemove)中的某一个正在等待pids中的某一个,说明是tid间接在等待
List<LockState> holders = lockStateMap.get(waitingPage);
if (holders == null || holders.size() == 0) return false;//该资源没有拥有者
for (LockState ls : holders) {
TransactionId holder = ls.getTid();
if (!holder.equals(toRemove)) {//去掉toRemove,在toRemove刚好拥有waitingResource的读锁时就需要
boolean isWaiting = isWaitingResources(holder, pids, toRemove);
if (isWaiting) return true;
}
}
//如果在for循环中没有return,说明每一个holder都不直接或间接等待pids
//故tid也非间接等待pids
return false;
}
tid就是占领了被请求资源的transaction,pids就是请求transaction拥有的所有page,也就是要判断你这个tid是不是在等待pids中的某一个,进入这个函数的缘由是:toRemove这个transaction正在等待获取tid这个transaction所拥有的某个资源,此时这个函数是要判断tid对应的transaction是否也在等待toRemove这个transaction所拥有的某个资源,如果有,说明死锁发生了。
这里用递归函数实现了类似于递归,回溯这样一个查找过程,就跟一条绳子一样,一个transaction一个transaction地往回拉,具体地,在上面的图中,有:
tid就是T3,它的waitingPage就是P2,因为它就是在等待P2的写锁,而T1要请求P3,此时我们要判断的是T3是否直接或间接地在等待T1的哪个page,也就是pids对应的那些页面中的哪一个,当然这里的pids只有P1,于是进入递归函数,显然T3正在等待的页面P2不和pids中的任一个相等,也就是不存在直接等待关系,但是P2的拥有者T2有可能在等呀,因此我们就把P2的所有拥有者列出来,这里就是只有T2,再次进入递归函数,此时我们就要判断T2是否直接或间接在等待pids中的某一个,一看真的是,T2不就在等待P1吗,此时构成了间接等待关系,因此返回true,否则的话还要把T2的waitingPage的拥有者都拉出来,看下是不是直接或间接在等待pids中的某个page,这就是递归函数一层层地深入的过程。返回true的意思就是,T3间接在等待T1的资源P1,而T1想要请求T3的资源P3,这就造成了环路等待,死锁发生。关于!holder.equals(toRemove)这个语句,其实是保险起见加的,因为上面的语句:
for (PageId pid : pids) {
if (pid.equals(waitingPage)) {
return true;
}
}
其实已经证明了toRemove这个transaction的pids中没有waitingPage了,因此waitingPage的LockState数组中应该是不会有toRemove的。
* 导致死锁的本质原因就是将等待的资源(P3)的拥有者(T3)间接的在等待T1拥有的资源(P1)
* 下面方法的注释以这个例子为基础,具体解释这个方法是如何判断出“T1在P3上的等待已经造成了死锁”的
*
* @param tid
* @param pid
* @return true表示进入了死锁,false表示没有
*/
public synchronized boolean deadlockOccurred(TransactionId tid, PageId pid) {//T1为tid,P3为pid
List<LockState> holders = lockStateMap.get(pid);
if (holders == null || holders.size() == 0) {
return false;
}
List<PageId> pids = getAllLocksByTid(tid);//找出T1拥有的所有资源,即只含有P1的list
for (LockState ls : holders) {
// 这里就是找到占领了当前pid的transaction
TransactionId holder = ls.getTid();
//去掉T1,因为虽然上图没画出这种情况,但T1可能同时也在其他Page上有读锁,这会影响判断结果
// 虽然这一步还是没太看明白,但至少没啥矛盾的,因为本来就是tid要去请求pid嘛,照理说不至于在holder里面已经出现了,但也有可能,所以还是排除掉吧
if (!holder.equals(tid)) {
//判断T3(holder)是否直接或间接在等待P1(pids)
//由图可以看出T3在直接等待P2,而P2的拥有者T2在直接等待P1,即T3在间接等待P1
boolean isWaiting = isWaitingResources(holder, pids, tid);
if (isWaiting) {
return true;
}
}
}
return false;
}
CS186 Project 4: SimpleDB Transactions
原文:https://www.cnblogs.com/zhengxch/p/14904629.html