上一篇文章中,讲解了Object-auditor的启动,其中审计的具体执行是由AuditorWorker实现的:在run_audit中实例化了AuditorWorker类,并调用其audit_all_objects方法,下面看此方法的具体代码实现:
def audit_all_objects(self, mode='once', device_dirs=None):
    """
    Audit every object location once and log progress and totals.

    Each location produced by the diskfile manager is audited through
    ``failsafe_object_audit``.  Whenever ``self.log_time`` seconds have
    passed since the last report, the per-interval counters are logged,
    dumped to the recon cache and reset.  A final summary is logged when
    the generator is exhausted.

    :param mode: label used in the log messages only (the article states
                 that run_forever passes 'forever')
    :param device_dirs: optional iterable of device directory names that
                        is forwarded to the location generator and used
                        in the log description; falsy means no filter is
                        passed on
    """
    description = ''
    if device_dirs:
        device_dir_str = ','.join(sorted(device_dirs))
        description = _(' - %s') % device_dir_str
    self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
                     (mode, self.auditor_type, description))
    begin = reported = time.time()
    self.total_bytes_processed = 0
    self.total_files_processed = 0
    total_quarantines = 0
    total_errors = 0
    time_auditing = 0
    # Locations to audit; per the article each item is an
    # AuditLocation(hsh_path, device, partition) -- TODO confirm against
    # the diskfile manager.
    all_locs = self.diskfile_mgr.object_audit_location_generator(
        device_dirs=device_dirs)
    for location in all_locs:
        loop_time = time.time()
        # Audit the locations one by one.
        self.failsafe_object_audit(location)
        self.logger.timing_since('timing', loop_time)
        self.files_running_time = ratelimit_sleep(
            self.files_running_time, self.max_files_per_second)
        self.total_files_processed += 1
        now = time.time()
        if now - reported >= self.log_time:
            self.logger.info(_(
                'Object audit (%(type)s). '
                'Since %(start_time)s: Locally: %(passes)d passed, '
                '%(quars)d quarantined, %(errors)d errors '
                'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
                'Total time: %(total).2f, Auditing time: %(audit).2f, '
                'Rate: %(audit_rate).2f') % {
                    'type': '%s%s' % (self.auditor_type, description),
                    'start_time': time.ctime(reported),
                    'passes': self.passes,
                    'quars': self.quarantines,
                    'errors': self.errors,
                    'frate': self.passes / (now - reported),
                    'brate': self.bytes_processed / (now - reported),
                    'total': (now - begin),
                    'audit': time_auditing,
                    'audit_rate': time_auditing / (now - begin)})
            cache_entry = self.create_recon_nested_dict(
                'object_auditor_stats_%s' % (self.auditor_type),
                device_dirs,
                {'errors': self.errors,
                 'passes': self.passes,
                 'quarantined': self.quarantines,
                 'bytes_processed': self.bytes_processed,
                 'start_time': reported,
                 'audit_time': time_auditing})
            dump_recon_cache(cache_entry, self.rcache, self.logger)
            reported = now
            # Fold the interval counters into the running totals before
            # resetting them for the next reporting interval.
            total_quarantines += self.quarantines
            total_errors += self.errors
            self.passes = 0
            self.quarantines = 0
            self.errors = 0
            self.bytes_processed = 0
        time_auditing += (now - loop_time)
    # Avoid divide by zero during very short runs
    elapsed = (time.time() - begin) or 0.000001
    self.logger.info(_(
        'Object audit (%(type)s) "%(mode)s" mode '
        'completed: %(elapsed).02fs. Total quarantined: %(quars)d, '
        'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
        'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
        'Rate: %(audit_rate).2f') % {
            'type': '%s%s' % (self.auditor_type, description),
            'mode': mode,
            'elapsed': elapsed,
            'quars': total_quarantines + self.quarantines,
            'errors': total_errors + self.errors,
            'frate': self.total_files_processed / elapsed,
            'brate': self.total_bytes_processed / elapsed,
            'audit': time_auditing,
            'audit_rate': time_auditing / elapsed})
    # Clear recon cache entry if device_dirs is set
    if device_dirs:
        cache_entry = self.create_recon_nested_dict(
            'object_auditor_stats_%s' % (self.auditor_type),
            device_dirs, {})
        dump_recon_cache(cache_entry, self.rcache, self.logger)
    if self.stats_sizes:
        self.logger.info(
            _('Object audit stats: %s') % json.dumps(self.stats_buckets))

# Article note: the location generator finds every object to be audited
# under <device>/objects.  Auditing means scanning all objects and
# quarantining any file found to be damaged.  According to the article, one
# AuditLocation(hsh_path, device, partition) is created for each .data file
# and passed to failsafe_object_audit, which performs the next step; its
# implementation follows.
def failsafe_object_audit(self, location):
    """
    Entrypoint to object_audit, with a failsafe generic exception handler.

    Any ``Exception`` or ``Timeout`` raised by ``object_audit`` is counted
    in ``self.errors`` and the 'errors' logger counter, then logged with
    its traceback instead of being propagated.

    :param location: the audit location handed on to ``object_audit``
    """
    try:
        # Audit the object.
        self.object_audit(location)
    except (Exception, Timeout):
        self.logger.increment('errors')
        self.errors += 1
        self.logger.exception(_('ERROR Trying to audit %s'), location)

# Article note: this method mainly calls object_audit(), which performs the
# actual audit; its implementation follows.
def object_audit(self, location):
    """
    Audits the given object location.

    The object's metadata is read and the whole file is streamed through
    a reader whose quarantine hook raises ``DiskFileQuarantined``.  A
    missing object is skipped silently; a quarantined object increments
    ``self.quarantines`` and is logged.

    :param location: an audit location
                     (from diskfile.object_audit_location_generator)
    """
    def raise_dfq(msg):
        raise DiskFileQuarantined(msg)

    try:
        df = self.diskfile_mgr.get_diskfile_from_audit_location(location)
        # df.open() is DiskFile's open method, per the article.
        with df.open():
            metadata = df.get_metadata()
            obj_size = int(metadata['Content-Length'])
            if self.stats_sizes:
                self.record_stats(obj_size)
            # In zero-byte-only mode a non-empty object is counted as
            # passed without reading its content.
            if self.zero_byte_only_at_fps and obj_size:
                self.passes += 1
                return
            # The hook makes the reader raise DiskFileQuarantined when it
            # quarantines the file.
            reader = df.reader(_quarantine_hook=raise_dfq)
        # closing() guarantees reader.close() runs once iteration ends.
        with closing(reader):
            for chunk in reader:
                chunk_len = len(chunk)
                # Throttle the audit by the number of bytes read.
                self.bytes_running_time = ratelimit_sleep(
                    self.bytes_running_time,
                    self.max_bytes_per_second,
                    incr_by=chunk_len)
                self.bytes_processed += chunk_len
                self.total_bytes_processed += chunk_len
    except DiskFileNotExist:
        return
    except DiskFileQuarantined as err:
        # The file was quarantined: bump the quarantine counter.
        self.quarantines += 1
        self.logger.error(_('ERROR Object %(obj)s failed audit and was'
                            ' quarantined: %(err)s'),
                          {'obj': location, 'err': err})
    self.passes += 1

# Article note: to analyse this method in detail, it first builds a DiskFile
# from the argument via
# df = self.diskfile_mgr.get_diskfile_from_audit_location(location),
# where location is an AuditLocation instance.
def get_diskfile_from_audit_location(self, audit_location):
    """
    Build a DiskFile for the object an audit location points at.

    :param audit_location: object providing ``device``, ``path`` and
                           ``partition`` attributes
    :returns: the result of ``DiskFile.from_hash_dir``
    """
    dev_path = self.get_dev_path(audit_location.device, mount_check=False)
    return DiskFile.from_hash_dir(
        self, audit_location.path, dev_path, audit_location.partition)


@classmethod
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition):
    """
    Alternate constructor that builds an instance from a hash directory.

    :param mgr: the manager passed as the first constructor argument
    :param hash_dir_path: passed to the constructor as ``_datadir``
    :param device_path: path of the device
    :param partition: partition of the object
    :returns: a new ``cls`` instance; the third positional constructor
              argument is passed as None
    """
    return cls(mgr, device_path, None, partition, _datadir=hash_dir_path)

# Article note: together these two functions use the attributes of the
# AuditLocation instance to instantiate a DiskFile, which has the concrete
# file-handling methods.  Once df is obtained, the file that
# audit_location.path points at is opened and its metadata fetched, then the
# file is read.  Reading requires a dedicated DiskFileReader instance; that
# class is the key to quarantining files and is easy to overlook.
def reader(self, keep_cache=False, _quarantine_hook=lambda m: None):
    """
    Return a :class:`swift.common.swob.Response` class compatible
    "`app_iter`" object as defined by
    :class:`swift.obj.diskfile.DiskFileReader`.

    For this implementation, the responsibility of closing the open file
    is passed to the :class:`swift.obj.diskfile.DiskFileReader` object.

    :param keep_cache: caller's preference for keeping data read in the
                       OS buffer cache
    :param _quarantine_hook: 1-arg callable called when obj quarantined;
                             the arg is the reason for quarantine.
                             Default is to ignore it.
                             Not needed by the REST layer.
    :returns: a :class:`swift.obj.diskfile.DiskFileReader` object
    """
    dr = DiskFileReader(
        self._fp, self._data_file, int(self._metadata['Content-Length']),
        self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
        self._mgr.keep_cache_size, self._device_path, self._logger,
        quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
    # At this point the reader object is now responsible for closing
    # the file pointer.
    self._fp = None
    return dr
def __iter__(self):
    """
    Returns an iterator over the data file.

    Chunks of ``self._disk_chunk_size`` bytes are read through the thread
    pool and yielded.  When reading starts at offset 0 an MD5 digest of
    the data is accumulated in ``self._iter_etag``.  ``self._drop_cache``
    is called each time more than 1 MiB has been read since the previous
    call, and once more at end of file.  Unless
    ``self._suppress_file_closing`` is set, ``self.close()`` runs when the
    iteration finishes or is abandoned.
    """
    try:
        dropped_cache = 0
        self._bytes_read = 0
        self._started_at_0 = False
        self._read_to_eof = False
        if self._fp.tell() == 0:
            # Only a read starting at the beginning can yield a digest
            # of the whole file.
            self._started_at_0 = True
            self._iter_etag = hashlib.md5()
        while True:
            chunk = self._threadpool.run_in_thread(
                self._fp.read, self._disk_chunk_size)
            if chunk:
                if self._iter_etag:
                    self._iter_etag.update(chunk)
                self._bytes_read += len(chunk)
                if self._bytes_read - dropped_cache > (1024 * 1024):
                    self._drop_cache(self._fp.fileno(), dropped_cache,
                                     self._bytes_read - dropped_cache)
                    dropped_cache = self._bytes_read
                yield chunk
            else:
                self._read_to_eof = True
                self._drop_cache(self._fp.fileno(), dropped_cache,
                                 self._bytes_read - dropped_cache)
                break
    finally:
        if not self._suppress_file_closing:
            self.close()

# Article note: __iter__ iterates over the file content and computes a new
# ETag value while reading.  The finally block closes the file, and if the
# object needs to be quarantined it is quarantined during that close.  The
# close function is shown first.
def close(self):
    """
    Close the open file handle if present.

    For this specific implementation, this method will handle quarantining
    the file if necessary.  ``DiskFileQuarantined`` is re-raised; any
    other ``Exception`` or ``Timeout`` from the quarantine check is logged
    and suppressed.  The file handle is always closed and ``self._fp`` is
    reset to None.
    """
    if self._fp:
        try:
            # The quarantine check only makes sense when the file was
            # read from start to end.
            if self._started_at_0 and self._read_to_eof:
                self._handle_close_quarantine()
        except DiskFileQuarantined:
            raise
        except (Exception, Timeout) as e:
            self._logger.error(_(
                'ERROR DiskFile %(data_file)s'
                ' close failure: %(exc)s : %(stack)s'),
                {'exc': e, 'stack': ''.join(traceback.format_stack()),
                 'data_file': self._data_file})
        finally:
            fp, self._fp = self._fp, None
            fp.close()
当文件从头到尾都读完时,关闭文件时会对不完整的文件进行隔离,下面看self._handle_close_quarantine()方法:
def _handle_close_quarantine(self):
    """
    Check if file needs to be quarantined.

    ``self._quarantine`` is called with a reason when the number of bytes
    read differs from ``self._obj_size``, or otherwise when a digest was
    computed and it differs from ``self._etag``.
    """
    if self._bytes_read != self._obj_size:
        self._quarantine(
            "Bytes read: %s, does not match metadata: %s" % (
                self._bytes_read, self._obj_size))
    elif self._iter_etag and \
            self._etag != self._iter_etag.hexdigest():
        self._quarantine(
            "ETag %s and file's md5 %s do not match" % (
                self._etag, self._iter_etag.hexdigest()))

# Article note: the method first checks whether the file's length equals the
# number of bytes read.  If not, _quarantine is given a message saying the
# bytes read do not match; if the ETag differs, the message says the md5
# values do not match.  The implementation of _quarantine follows.
def _quarantine(self, msg):
    """
    Quarantine the data file and notify the quarantine hook.

    :param msg: the reason for the quarantine; it is logged and passed to
                ``self._quarantine_hook``
    """
    # Move the object to a quarantine area; the rename runs in the
    # thread pool and its return value is kept as the quarantine dir.
    self._quarantined_dir = self._threadpool.run_in_thread(
        quarantine_renamer, self._device_path, self._data_file)
    self._logger.warn("Quarantined object %s: %s" % (
        self._data_file, msg))
    self._logger.increment('quarantines')
    self._quarantine_hook(msg)

# Article note: inside _quarantine, quarantine_renamer performs the actual
# quarantine.
def quarantine_renamer(device_path, corrupted_file_path):
    """
    In the case that a file is corrupted, move it to a quarantined
    area to allow replication to fix it.

    :params device_path: The path to the device the corrupted file is on.
    :params corrupted_file_path: The path to the file you want quarantined.

    :returns: path (str) of directory the file was moved to
    :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
                     exceptions from rename
    """
    from_dir = dirname(corrupted_file_path)
    to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
    invalidate_hash(dirname(from_dir))
    try:
        renamer(from_dir, to_dir)
    except OSError as e:
        if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise
        # The target already exists: retry with a unique suffix.
        to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
        renamer(from_dir, to_dir)
    return to_dir

# Article note: the file is quarantined under
# device_path/quarantined/objects and the quarantine directory is returned.
# The renamer function used here follows.
def renamer(old, new):
    """
    Attempt to fix / hide race conditions like empty object directories
    being removed by backend processes during uploads, by retrying.

    :param old: old path to be renamed
    :param new: new path to be renamed to
    """
    try:
        mkdirs(os.path.dirname(new))
        os.rename(old, new)
    except OSError:
        # Retry exactly once; a second failure propagates to the caller.
        mkdirs(os.path.dirname(new))
        os.rename(old, new)

# Article note: as the code shows, the file is moved into the new
# (quarantine) directory.
由于本人水平有限,文中难免出现理解错误,敬请指正、交流,谢谢!
OpenStack_Swift源码分析——Object-auditor源码分析(2),布布扣,bubuko.com
OpenStack_Swift源码分析——Object-auditor源码分析(2)
原文:http://blog.csdn.net/xianghonglee/article/details/25961639