首页 > 数据库技术 > 详细

PG虚拟文件描述符(VFD)机制——封装的文件接口:postgresql-8.4.1/src/backend/storage/file/fd.c

时间:2020-08-12 20:03:14      阅读:73      评论:0      收藏:0      [点我收藏+]

  本篇博客讲解fd.c文件中对C函数库文件操作API的相关封装。(相关C函数库文件操作API参见博主linux分类下的文章)InitFileAccess函数用于在postgresql启动时初始化VFD LRU池,并向系统注册proc-exit勾子以确保退出时清理临时文件。

 

InitFileAccess函数在后端启动初始化阶段调用(normal or standalone backend),在数据库运行过程中只能调用一次。主要用于VFD LRU池中的头元素的空间,并设置LRU池的大小为1。最后注册proc-exit勾子以帮助确保退出时临时文件丢弃(register proc-exit hook to ensure temp files are dropped at exit)。on_proc_exit向proc_exit()函数调用的函数列表中添加回调函数。

 1 /*
 2  * InitFileAccess --- initialize this module during backend startup
 3  *
 4  * This is called during either normal or standalone backend start.
 5  * It is *not* called in the postmaster.
 6  */
 7 void
 8 InitFileAccess(void)
 9 {
10     Assert(SizeVfdCache == 0);    /* call me only once */
11 
12     /* initialize cache header entry */
13     VfdCache = (Vfd *) malloc(sizeof(Vfd));
14     if (VfdCache == NULL)
15         ereport(FATAL,
16                 (errcode(ERRCODE_OUT_OF_MEMORY),
17                  errmsg("out of memory")));
18 
19     MemSet((char *) &(VfdCache[0]), 0, sizeof(Vfd));
20     VfdCache->fd = VFD_CLOSED;
21 
22     SizeVfdCache = 1;
23 
24     /* register proc-exit hook to ensure temp files are dropped at exit */
25     on_proc_exit(AtProcExit_Files, 0);
26 }
函数on_proc_exit处于backend/storage/ipc/ipc.c文件中

技术分享图片回调函数指针类型

 技术分享图片

需要注册的回调函数,用于在关闭后端时清理临时文件(包括interXact文件) 

 1 /*
 2  * AtProcExit_Files
 3  *
 4  * on_proc_exit hook to clean up temp files during backend shutdown.
 5  * Here, we want to clean up *all* temp files including interXact ones.
 6  */
 7 static void
 8 AtProcExit_Files(int code, Datum arg)
 9 {
10     CleanupTempFiles(true);
11 }

 

BasicOpenFile --- 除了可以根据需要释放其他FD,该函数与open(2)相同
导出该文件供真正需要普通内核FD的地方使用,但需要证明不会耗尽FD。成功返回FD之后,调用者有责任确保它不会在ereport()上泄漏! 大多数用户不应该*直接*调用该例程,而应使用VFD抽象级别,该级别提供了防止描述符泄漏以及对需要短时间打开的文件进行管理保护。 理想情况下,这应该是后端中open()的* only *直接调用。 实际上,postmaster直接调用open(),并且在后端启动的早期就完成了一些直接的open()调用。 这样就可以了,因为无论如何该模块都不会关闭任何打开的文件。

 也就是该模块不使用Lru池功能,直接用C函数库API open打开文件。但是如果系统fd不足,可能需要释放lru池中的FD,并重新调用open。

 1 int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
 2 {
 3     int            fd;
 4 
 5 tryAgain:
 6     fd = open(fileName, fileFlags, fileMode);
 7 
 8     if (fd >= 0)
 9         return fd;                /* success! */
10 
11     if (errno == EMFILE || errno == ENFILE)
12     {
13         int            save_errno = errno;
14 
15         ereport(LOG,
16                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
17                  errmsg("out of file descriptors: %m; release and retry")));
18         errno = 0;
19         if (ReleaseLruFile())
20             goto tryAgain;
21         errno = save_errno;
22     }
23 
24     return -1;                    /* failure */
25 }

 

FileAccess成功返回0,重新打开失败返回-1且设置errno。如果文件没有打开(不拥有FD,即FD为-1),使用LruInsert函数(函数中BasicOpenFile打开文件,并将fd放入vfd中对应的成员中)。如果打开了且不是最近使用的vfd,需要将该vfd移动到LRU池的头部称为最近使用的。

 1 static int FileAccess(File file)
 2 {
 3     int            returnValue;
 4 
 5     DO_DB(elog(LOG, "FileAccess %d (%s)",
 6                file, VfdCache[file].fileName));
 7 
 8     /*
 9      * Is the file open?  If not, open it and put it at the head of the LRU
10      * ring (possibly closing the least recently used file to get an FD).
11      */
12 
13     if (FileIsNotOpen(file))
14     {
15         returnValue = LruInsert(file);
16         if (returnValue != 0)
17             return returnValue;
18     }
19     else if (VfdCache[0].lruLessRecently != file)
20     {
21         /*
22          * We now know that the file is open and that it is not the last one
23          * accessed, so we need to move it to the head of the Lru ring.
24          */
25 
26         Delete(file);
27         Insert(file);
28     }
29 
30     return 0;
31 }

技术分享图片

 

 PathNameOpenFile函数使用绝对路径打开文件,并使用打开模式和标志参数。当传入相对参数,将会使用进程工作目录的路径作为前缀($PGDATA中存储的路径)。一般流程:分配VFD,使用BasicOpenFile打开文件,将fd与VFD关联,将vfd插入LRU池,配置VFDD的参数(取消O_CREAT | O_TRUNC | O_EXCL模式)。

 1 File PathNameOpenFile(FileName fileName, int fileFlags, int fileMode)
 2 {
 3     char       *fnamecopy;
 4     File        file;
 5     Vfd           *vfdP;
 6 
 7     DO_DB(elog(LOG, "PathNameOpenFile: %s %x %o",
 8                fileName, fileFlags, fileMode));
 9 
10     /*
11      * We need a malloc‘d copy of the file name; fail cleanly if no room.
12      */
13     fnamecopy = strdup(fileName);
14     if (fnamecopy == NULL)
15         ereport(ERROR,
16                 (errcode(ERRCODE_OUT_OF_MEMORY),
17                  errmsg("out of memory")));
18 
19     file = AllocateVfd();
20     vfdP = &VfdCache[file];
21 
22     while (nfile + numAllocatedDescs >= max_safe_fds)
23     {
24         if (!ReleaseLruFile())
25             break;
26     }
27 
28     vfdP->fd = BasicOpenFile(fileName, fileFlags, fileMode);
29 
30     if (vfdP->fd < 0)
31     {
32         FreeVfd(file);
33         free(fnamecopy);
34         return -1;
35     }
36     ++nfile;
37     DO_DB(elog(LOG, "PathNameOpenFile: success %d",
38                vfdP->fd));
39 
40     Insert(file);
41 
42     vfdP->fileName = fnamecopy;
43     /* Saved flags are adjusted to be OK for re-opening file */
44     vfdP->fileFlags = fileFlags & ~(O_CREAT | O_TRUNC | O_EXCL);
45     vfdP->fileMode = fileMode;
46     vfdP->seekPos = 0;
47     vfdP->fdstate = 0x0;
48 
49     return file;
50 }

 

FileClosse关闭文件,如果文件打开,则从lru池中删除相应的VFD节点,关闭相应的fd,减少nfile计数。如果文件是临时文件则删除临时文件(vfdP->fdstate & FD_TEMPORARY)。将vfd放入空闲链表。

 1 void FileClose(File file)
 2 {
 3     Vfd           *vfdP;
 4     struct stat filestats;
 5 
 6     Assert(FileIsValid(file));
 7 
 8     DO_DB(elog(LOG, "FileClose: %d (%s)",
 9                file, VfdCache[file].fileName));
10 
11     vfdP = &VfdCache[file];
12 
13     if (!FileIsNotOpen(file))
14     {
15         /* remove the file from the lru ring */
16         Delete(file);
17 
18         /* close the file */
19         if (close(vfdP->fd))
20             elog(ERROR, "could not close file \"%s\": %m", vfdP->fileName);
21 
22         --nfile;
23         vfdP->fd = VFD_CLOSED;
24     }
25 
26     /*
27      * Delete the file if it was temporary
28      */
29     if (vfdP->fdstate & FD_TEMPORARY)
30     {
31         /* reset flag so that die() interrupt won‘t cause problems */
32         vfdP->fdstate &= ~FD_TEMPORARY;
33         if (log_temp_files >= 0)
34         {
35             if (stat(vfdP->fileName, &filestats) == 0)
36             {
37                 if (filestats.st_size >= log_temp_files)
38                     ereport(LOG,
39                             (errmsg("temporary file: path \"%s\", size %lu",
40                                     vfdP->fileName,
41                                     (unsigned long) filestats.st_size)));
42             }
43             else
44                 elog(LOG, "could not stat file \"%s\": %m", vfdP->fileName);
45         }
46         if (unlink(vfdP->fileName))
47             elog(LOG, "could not unlink file \"%s\": %m", vfdP->fileName);
48     }
49 
50     /*
51      * Return the Vfd slot to the free list
52      */
53     FreeVfd(file);
54 }

 FileInvalidate函数检查file对应vfd的fd是否为VFD_CLOSED,并从Lru池中删除该vfd

1 FileInvalidate(File file)
2 {
3     Assert(FileIsValid(file));
4     if (!FileIsNotOpen(file))
5         LruDelete(file);
6 }

#define FileIsValid(file) \
((file) > 0 && (file) < (int) SizeVfdCache && VfdCache[file].fileName != NULL)

#define FileIsNotOpen(file) (VfdCache[file].fd == VFD_CLOSED)

 

FilePrefetch-启动文件给定范围的异步读取(initiate asynchronous read of a given range of the file)。 逻辑查找位置logical seek position不受影响。 当前,此功能的唯一实现是使用posix_fadvise,它是完成此功能的最简单的标准化接口。 我们将来可以使用libaio添加一个实现。 但请注意,此API不适合libaio,因为libaio希望提供一个缓冲区以供读取。

 1 int FilePrefetch(File file, off_t offset, int amount)
 2 {
 3 #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
 4     int            returnCode;
 5 
 6     Assert(FileIsValid(file));
 7 
 8     DO_DB(elog(LOG, "FilePrefetch: %d (%s) " INT64_FORMAT " %d",
 9                file, VfdCache[file].fileName,
10                (int64) offset, amount));
11 
12     returnCode = FileAccess(file);
13     if (returnCode < 0)
14         return returnCode;
15 
16     returnCode = posix_fadvise(VfdCache[file].fd, offset, amount,
17                                POSIX_FADV_WILLNEED);
18 
19     return returnCode;
20 #else
21     Assert(FileIsValid(file));
22     return 0;
23 #endif
24 }
 1 int FileWrite(File file, char *buffer, int amount)
 2 {
 3     int            returnCode;
 4 
 5     Assert(FileIsValid(file));
 6 
 7     DO_DB(elog(LOG, "FileWrite: %d (%s) " INT64_FORMAT " %d %p",
 8                file, VfdCache[file].fileName,
 9                (int64) VfdCache[file].seekPos,
10                amount, buffer));
11 
12     returnCode = FileAccess(file);
13     if (returnCode < 0)
14         return returnCode;
15 
16 retry:
17     errno = 0;
18     returnCode = write(VfdCache[file].fd, buffer, amount);
19 
20     /* if write didn‘t set errno, assume problem is no disk space */
21     if (returnCode != amount && errno == 0)
22         errno = ENOSPC;
23 
24     if (returnCode >= 0)
25         VfdCache[file].seekPos += returnCode;
26     else
27     {
28         /*
29          * See comments in FileRead()
30          */
31 #ifdef WIN32
32         DWORD        error = GetLastError();
33 
34         switch (error)
35         {
36             case ERROR_NO_SYSTEM_RESOURCES:
37                 pg_usleep(1000L);
38                 errno = EINTR;
39                 break;
40             default:
41                 _dosmaperr(error);
42                 break;
43         }
44 #endif
45         /* OK to retry if interrupted */
46         if (errno == EINTR)
47             goto retry;
48 
49         /* Trouble, so assume we don‘t know the file position anymore */
50         VfdCache[file].seekPos = FileUnknownPos;
51     }
52 
53     return returnCode;
54 }
 1 int FileSync(File file)
 2 {
 3     int            returnCode;
 4 
 5     Assert(FileIsValid(file));
 6 
 7     DO_DB(elog(LOG, "FileSync: %d (%s)",
 8                file, VfdCache[file].fileName));
 9 
10     returnCode = FileAccess(file);
11     if (returnCode < 0)
12         return returnCode;
13 
14     return pg_fsync(VfdCache[file].fd);
15 }
 1 off_t FileSeek(File file, off_t offset, int whence)
 2 {
 3     int            returnCode;
 4 
 5     Assert(FileIsValid(file));
 6 
 7     DO_DB(elog(LOG, "FileSeek: %d (%s) " INT64_FORMAT " " INT64_FORMAT " %d",
 8                file, VfdCache[file].fileName,
 9                (int64) VfdCache[file].seekPos,
10                (int64) offset, whence));
11 
12     if (FileIsNotOpen(file))
13     {
14         switch (whence)
15         {
16             case SEEK_SET:
17                 if (offset < 0)
18                     elog(ERROR, "invalid seek offset: " INT64_FORMAT,
19                          (int64) offset);
20                 VfdCache[file].seekPos = offset;
21                 break;
22             case SEEK_CUR:
23                 VfdCache[file].seekPos += offset;
24                 break;
25             case SEEK_END:
26                 returnCode = FileAccess(file);
27                 if (returnCode < 0)
28                     return returnCode;
29                 VfdCache[file].seekPos = lseek(VfdCache[file].fd,
30                                                offset, whence);
31                 break;
32             default:
33                 elog(ERROR, "invalid whence: %d", whence);
34                 break;
35         }
36     }
37     else
38     {
39         switch (whence)
40         {
41             case SEEK_SET:
42                 if (offset < 0)
43                     elog(ERROR, "invalid seek offset: " INT64_FORMAT,
44                          (int64) offset);
45                 if (VfdCache[file].seekPos != offset)
46                     VfdCache[file].seekPos = lseek(VfdCache[file].fd,
47                                                    offset, whence);
48                 break;
49             case SEEK_CUR:
50                 if (offset != 0 || VfdCache[file].seekPos == FileUnknownPos)
51                     VfdCache[file].seekPos = lseek(VfdCache[file].fd,
52                                                    offset, whence);
53                 break;
54             case SEEK_END:
55                 VfdCache[file].seekPos = lseek(VfdCache[file].fd,
56                                                offset, whence);
57                 break;
58             default:
59                 elog(ERROR, "invalid whence: %d", whence);
60                 break;
61         }
62     }
63     return VfdCache[file].seekPos;
64 }
1 off_t FileTell(File file)
2 {
3     Assert(FileIsValid(file));
4     DO_DB(elog(LOG, "FileTell %d (%s)",
5                file, VfdCache[file].fileName));
6     return VfdCache[file].seekPos;
7 }
8 #endif
 1 int FileTruncate(File file, off_t offset)
 2 {
 3     int            returnCode;
 4 
 5     Assert(FileIsValid(file));
 6 
 7     DO_DB(elog(LOG, "FileTruncate %d (%s)",
 8                file, VfdCache[file].fileName));
 9 
10     returnCode = FileAccess(file);
11     if (returnCode < 0)
12         return returnCode;
13 
14     returnCode = ftruncate(VfdCache[file].fd, offset);
15     return returnCode;
16 }

 

 1 int FileRead(File file, char *buffer, int amount)
 2 {
 3     int            returnCode;
 4 
 5     Assert(FileIsValid(file));
 6 
 7     DO_DB(elog(LOG, "FileRead: %d (%s) " INT64_FORMAT " %d %p",
 8                file, VfdCache[file].fileName,
 9                (int64) VfdCache[file].seekPos,
10                amount, buffer));
11 
12     returnCode = FileAccess(file);
13     if (returnCode < 0)
14         return returnCode;
15 
16 retry:
17     returnCode = read(VfdCache[file].fd, buffer, amount);
18 
19     if (returnCode >= 0)
20         VfdCache[file].seekPos += returnCode;
21     else
22     {
23         /*
24          * Windows may run out of kernel buffers and return "Insufficient
25          * system resources" error.  Wait a bit and retry to solve it.
26          *
27          * It is rumored that EINTR is also possible on some Unix filesystems,
28          * in which case immediate retry is indicated.
29          */
30 #ifdef WIN32
31         DWORD        error = GetLastError();
32 
33         switch (error)
34         {
35             case ERROR_NO_SYSTEM_RESOURCES:
36                 pg_usleep(1000L);
37                 errno = EINTR;
38                 break;
39             default:
40                 _dosmaperr(error);
41                 break;
42         }
43 #endif
44         /* OK to retry if interrupted */
45         if (errno == EINTR)
46             goto retry;
47 
48         /* Trouble, so assume we don‘t know the file position anymore */
49         VfdCache[file].seekPos = FileUnknownPos;
50     }
51 
52     return returnCode;
53 }

 

清理临时文件CleanupTempFiles

 1 /*
 2  * Close temporary files and delete their underlying files.
 3  *
 4  * isProcExit: if true, this is being called as the backend process is
 5  * exiting. If that‘s the case, we should remove all temporary files; if
 6  * that‘s not the case, we are being called for transaction commit/abort
 7  * and should only remove transaction-local temp files.  In either case,
 8  * also clean up "allocated" stdio files and dirs.
 9  */
10 static void
11 CleanupTempFiles(bool isProcExit)
12 {
13     Index        i;
14 
15     /*
16      * Careful here: at proc_exit we need extra cleanup, not just
17      * xact_temporary files.
18      */
19     if (isProcExit || have_xact_temporary_files)
20     {
21         Assert(FileIsNotOpen(0));        /* Make sure ring not corrupted */
22         for (i = 1; i < SizeVfdCache; i++)
23         {
24             unsigned short fdstate = VfdCache[i].fdstate;
25 
26             if ((fdstate & FD_TEMPORARY) && VfdCache[i].fileName != NULL)
27             {
28                 /*
29                  * If we‘re in the process of exiting a backend process, close
30                  * all temporary files. Otherwise, only close temporary files
31                  * local to the current transaction.
32                  */
33                 if (isProcExit || (fdstate & FD_XACT_TEMPORARY))
34                     FileClose(i);
35             }
36         }
37 
38         have_xact_temporary_files = false;
39     }
40 
41     while (numAllocatedDescs > 0)
42         FreeDesc(&allocatedDescs[0]);
43 }

 

技术分享图片

PG虚拟文件描述符(VFD)机制——封装的文件接口:postgresql-8.4.1/src/backend/storage/file/fd.c

原文:https://www.cnblogs.com/feishujun/p/PostgreSQLSourceAnalysis_vfd02.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!