侧边栏壁纸
  • 累计撰写 71 篇文章
  • 累计创建 87 个标签
  • 累计收到 5 条评论

目 录CONTENT

文章目录

对比Etcd中的Wal和Tidwall中的Wal中的实现

KunkkaWu
2021-12-23 / 0 评论 / 5 点赞 / 8,361 阅读 / 5,327 字 / 正在检测是否收录...

WAL

简介

Write Ahead Logging,简称WAL,也被翻译成预写式日志,是数据库技术中实现事务日志的一种标准方法,可以实现单机事务的原子性,同时可以提高数据库的写入效率。

原理

【写操作】
WAL怎么解决宕机和恢复的问题:

  • 写WAL前宕机了,重启后,数据处于事务未执行的状态
  • 写WAL时宕机了,重启后,可以检查到WAL数据不正确,回滚当事务前的状态
  • 写WAL后宕机了,重启后,把WAL中记录的操作,应用到数据库文件中,得到事务执行后的状态

WAL的核心思想是: 先写日志再写数据文件,修改数据文件必须发生在修改操作记录在日志文件之后.

【读操作】
WAL中可能包含尚未写入数据库的最新值,如果读最新值就需要从WAL中读。如果WAL中未读到,从数据库中读取的就是最新值。

【CheckPoint检查点】
WAL是异步写入数据库的。CheckPoint用来记录已经被写入数据库的文件操作序号,CheckPoint后面的记录尚未写入数据库。

作用

  • 单机事务的原子性【数据库事务】
  • 数据持久化,不丢失
  • 提高写入效率

原子性和持久化

思考: 假设我们在一个事务中,需要修改数据中的A和B,都需要保存最新的值到磁盘上持久化。如果在保存A完成后,系统宕机了,这时B待写入的值丢失。那么如何发现事务没有完成,如何保证事务的原子性?

解决:在磁盘中增加事务标志。 当事务完成后,修改标识为完成。如果事务没有完成,就把事务回滚。保证数据的原子性。

因此:数据库中针对Crash和Recovery的解决方案是WAL。

案例

Etcd的Wal

结构体定义:

type WAL struct {
	dir string // the living directory of the underlay files

	// dirFile is a fd for the wal directory for syncing on Rename
	dirFile *os.File

	metadata []byte           // metadata recorded at the head of each WAL
	state    raftpb.HardState // hardstate recorded at the head of WAL

	start     walpb.Snapshot // snapshot to start reading  从快照确定的位置之后开始读
	decoder   *decoder       // decoder to decode records
	readClose func() error   // closer for decode reader

	mu      sync.Mutex
	enti    uint64   // index of the last entry saved to the wal
	encoder *encoder // encoder to encode records

	locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
	fp    *filePipeline          // 会预先创建一个锁定的文件
}

WAL对外暴露的方法

创建Wal文件:Create()
  1. 预分配文件,大小为SegmentSizeBytes(64MB)
  2. 保存metadata到wal
  3. 保存空的snapshot
  4. 刷盘

WAL中的数据都是以Record为单位保存的

type Record struct {
	Type             int64  `protobuf:"varint,1,opt,name=type" json:"type"`
	Crc              uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
	Data             []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
	XXX_unrecognized []byte `json:"-"`
}

Create()中保存metadata:

if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
	return nil, err
}
WAL存储:Save()
  1. 判断是否需要同步刷新磁盘
  2. saveEntry
  3. saveState
  4. 根据数据是否小于64MB,是否切割
  5. 根据1的结果,执行是否刷盘

MustSync用来判断当前的Save是否需要同步持久化,由于每台服务器上都必须无条件久化三个量:currentTerm、votedFor和log entries,因此只要log entries不为0,或者候选人id有变化或者是任期号有变化,都需要持久化。

func MustSync(st, prevst pb.HardState, entsnum int) bool {
	// Persistent state on all servers:
	// (Updated on stable storage before responding to RPCs)
	// currentTerm
	// votedFor
	// log entries[]
	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

HardState表示服务器当前状态,定义在raft.pb.go,主要包含Term、Vote、Commit

// the last known state
// Term:服务器最后一次知道的任期号
// Vote:当前获得选票的候选人的id
// Commit:已知的最大的已经被提交的日志条目的索引值(被多数派确认的)
type HardState struct {
	Term             uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
	Vote             uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
	Commit           uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
	XXX_unrecognized []byte `json:"-"`
}

Entry就表示提交的日志条目了,定义在raft.pb.go中

// Term:该条日志对应的Term
// Index:日志的索引
// Type:日志的类型,普通日志和配置变更日志
// Data:日志内容
type Entry struct {
	Term             uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
	Index            uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
	Type             EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
	Data             []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
	XXX_unrecognized []byte    `json:"-"`
}

由前面的Save逻辑可以看出,当WAL文件超过一定大小时(默认为64MB),就需要进行切割,其逻辑在cut方法中实现(在wal.go中)

  • 关闭当前文件写入,刷盘
  • 启用一个新的wal文件,并写入头部信息
// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomically rename temp wal file to a wal file.
func (w *WAL) cut() error {
	// close old wal file; truncate(截断) to avoid wasting space if an early cut
	off, serr := w.tail().Seek(0, io.SeekCurrent)
	if serr != nil {
		return serr
	}

	// Truncate changes the size of the file.  It does not change the I/O offset.
	if err := w.tail().Truncate(off); err != nil {
		return err
	}

	// 同步更新,刷盘
	if err := w.sync(); err != nil {
		return err
	}

    // seq+1 ,index为最后一条日志的索引+1
	fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))

	// create a temp wal file with name sequence + 1, or truncate the existing one
	// 从filePipeline中获取一个预先打开的wal临时LockedFile
	newTail, err := w.fp.Open()
	if err != nil {
		return err
	}

	// update writer and save the previous crc
	// 将新文件添加到LockedFile数组
	w.locks = append(w.locks, newTail)
	// 计算当前文件的crc
	prevCrc := w.encoder.crc.Sum32()
	// 用新创建的文件创建encoder,并传入之前文件的crc,这样可以前后校验
	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
	if err != nil {
		return err
	}
	// 保存crcType类型的recode
	if err = w.saveCrc(prevCrc); err != nil {
		return err
	}

	// metadata必须放在wal文件头
	if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
		return err
	}
	// 保存HardState型recode
	if err = w.saveState(&w.state); err != nil {
		return err
	}
	// atomically move temp wal file to wal file
	if err = w.sync(); err != nil {
		return err
	}

	off, err = w.tail().Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}

	// 重命名
	if err = os.Rename(newTail.Name(), fpath); err != nil {
		return err
	}

	// 同步目录
	if err = fileutil.Fsync(w.dirFile); err != nil {
		return err
	}

	// reopen newTail with its new path so calls to Name() match the wal filename format
	newTail.Close()

	// 重新打开并上锁新的文件(重命名之后的)
	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
		return err
	}
	if _, err = newTail.Seek(off, io.SeekStart); err != nil {
		return err
	}

	// 重新添加到LockedFile数组(替换之前那个临时的)
	w.locks[len(w.locks)-1] = newTail
    // 获取上一个文件的crc
	prevCrc = w.encoder.crc.Sum32()
	// 用新文件重新创建encoder
	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
	if err != nil {
		return err
	}

	plog.Infof("segmented wal file %v is created", fpath)
	return nil
}

tidwall/wal

一款轻量级的wal

  • 支持批量写BatchWrite
  • TruncateFront: 该方法是移除指定的index之前的所有数据,将该index置为firstIndex
  • TruncateBack : 移除指定的index之后的所有日志,将该index变成lastIndex
  • Cache支持lru淘汰机制
  • 默认同步刷盘,写的时候判断segment文件是否已经达到阈值SegmentSize,再进行刷盘。
  • 支持,Sync手动同步刷盘,和异步刷盘

Alt text

OPtions
 // Options for Log
type Options struct {
	// NoSync disables fsync after writes. This is less durable and puts the
	// log at risk of data loss when there's a server crash.
	NoSync bool
	// SegmentSize of each segment. This is just a target value, actual size
	// may differ. Default is 20 MB.
  // 每个segment的目标大小20M,实际值可能会有些偏差
	SegmentSize int
	// LogFormat is the format of the log files. Default is Binary.
	LogFormat LogFormat
	// SegmentCacheSize is the maximum number of segments that will be held in
	// memory for caching. Increasing this value may enhance performance for
	// concurrent read operations. Default is 1
	// 缓存的segment的最大个数 
	SegmentCacheSize int
	// NoCopy allows for the Read() operation to return the raw underlying data
	// slice. This is an optimization to help minimize allocations. When this
	// option is set, do not modify the returned data because it may affect
	// other Read calls. Default false
	// 在读时是否拷贝一份数据返回给client,默认false 
	NoCopy bool
}

// DefaultOptions for Open().
var DefaultOptions = &Options{
	NoSync:           false,    // true不每次写实时刷盘,false表示每次写实时刷盘
	SegmentSize:      20971520, // 20 MB log segment files.
	LogFormat:        Binary,   // Binary format is small and fast.
	SegmentCacheSize: 2,        // Number of cached in-memory segments
	NoCopy:           false,    // Make a new copy of data for every Read call.
}
Sigmeng
 // segment represents a single segment file.
type segment struct {
	path  string // path of segment file
	index uint64 // first index of segment
	ebuf  []byte // cached entries buffer,该segment实际的数据
	epos  []bpos // cached entries positions in buffer,该segment数据的索引
    // 对于其中存储的一条日志条目来说,假设其在epos中的位置为i,则其对应的数据的起始下标和结束下标分别为start=epos[i].pos,end=epos[i].end,
    // 因此最终的数据为 data=ebuf[start:end]  
    
}

type bpos struct {
	pos int // byte position
	end int // one byte past pos
}

日志的写入
  • 首先加锁、检查文件是否冲突和关闭
  • 首先对进来的日志index进行检查,所有的index都是递增的,如果小于当前记录的最后一条日志lastIndex的话,就直接报错
  • 所有的日志都是进行追加的操作,往最后一个segment文件中追加。(遍历b中的条目,根据entry中记录的size拿到entry中的data。最后往segment的ebuf中写入。并同时记录epos信息)
  • ,每条日志写入后都要判断当前的segment文件是否已经达到阈值(SegmentSize)了,如果达到的话,则调用cycle()方法,将当前的数据刷到磁盘,然后新建一个新的segment文件,记录segment.Index,并将当前的segment放入cache中,继续进行写入
  • 日志写入分为两种格式:json和二进制,json的话,会将index和data拼接成一个json串,然后写入到文件中,一个条目一行
  • 所有日志写完后,然后释放锁
// Write an entry to the log.
func (l *Log) Write(index uint64, data []byte) error {
	l.mu.Lock()
    // 异常检查
	defer l.mu.Unlock()
	if l.corrupt {
		return ErrCorrupt
	} else if l.closed {
		return ErrClosed
	}
	l.wbatch.Clear()
    // 写入到内置的wbatch中
	l.wbatch.Write(index, data)
	return l.writeBatch(&l.wbatch)
}


// WriteBatch writes the entries in the batch to the log in the order that they
// were added to the batch. The batch is cleared upon a successful return.
func (l *Log) WriteBatch(b *Batch) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.corrupt {
		return ErrCorrupt
	} else if l.closed {
		return ErrClosed
	}
	if len(b.entries) == 0 {
		return nil
	}
	return l.writeBatch(b)
}

func (l *Log) writeBatch(b *Batch) error {
	// check that all indexes in batch are sane
	for i := 0; i < len(b.entries); i++ {
		if b.entries[i].index != l.lastIndex+uint64(i+1) {
			return ErrOutOfOrder
		}
	}

	// load the tail segment
	s := l.segments[len(l.segments)-1]
    // 写之前先检查是否满了,满了的话,重新开一个新的segment,往新的segment中写入数据
	if len(s.ebuf) > l.opts.SegmentSize {
		// tail segment has reached capacity. Close it and create a new one.
		if err := l.cycle(); err != nil {
			return err
		}
		s = l.segments[len(l.segments)-1]
	}

	mark := len(s.ebuf)
	datas := b.datas
	for i := 0; i < len(b.entries); i++ {
		// 1. 拿到index对应的data,1和4是相互对应的
		data := datas[:b.entries[i].size]
		
        var epos bpos
		// Write(index,data)一样
		s.ebuf, epos = l.appendEntry(s.ebuf, b.entries[i].index, data)
		s.epos = append(s.epos, epos)
		// 每写入一次,判断是否大于一个块的文件内容了,大于的话,新建一个segment
		if len(s.ebuf) >= l.opts.SegmentSize {
			// segment has reached capacity, cycle now
			if _, err := l.sfile.Write(s.ebuf[mark:]); err != nil {
				return err
			}
			l.lastIndex = b.entries[i].index
			if err := l.cycle(); err != nil {
				return err
			}
			s = l.segments[len(l.segments)-1]
			mark = 0
		}

		// 4. 移动datas
		datas = datas[b.entries[i].size:]
	}

    // 集中写入一个batch后,往文件中写一次
	if len(s.ebuf)-mark > 0 {
		if _, err := l.sfile.Write(s.ebuf[mark:]); err != nil {
			return err
		}
		l.lastIndex = b.entries[len(b.entries)-1].index
	}

    // 判断是否需要刷盘
	if !l.opts.NoSync {
		if err := l.sfile.Sync(); err != nil {
			return err
		}
	}

    // 清空wbatch
	b.Clear()
	return nil
}


// Cycle the old segment for a new segment.
func (l *Log) cycle() error {
	if err := l.sfile.Sync(); err != nil {
		return err
	}
	if err := l.sfile.Close(); err != nil {
		return err
	}
	// cache the previous segment
	l.pushCache(len(l.segments) - 1)
	s := &segment{
		index: l.lastIndex + 1,
		path:  filepath.Join(l.path, segmentName(l.lastIndex+1)),
	}
	var err error
	l.sfile, err = os.Create(s.path)
	if err != nil {
		return err
	}
	l.segments = append(l.segments, s)
	return nil
}


日志的读取
  • 首先加锁、检查文件是否冲突和关闭、检查index范围是否合法
  • 接着加载segment
    • 首先判断该index是否在最后一个segment,如果是就返回
    • 其次在cache中寻找,cache中找到后,也就返回
    • 走到这一步说明该index所在的segment只有在磁盘中了,需要从磁盘进行加载
      • 先找该 index命中哪个segment(segment文件名有序,按照二分查找即可)
      • 找到后从segmentFile 中恢复segment(一方面恢复数据ebuf,另外一方面恢复索引epos)
      • 最后将该segment再放进cache中缓存起来
  • 据当前的index以及segment记录的index快读定位到位置,然后再从ebuf中读取数据,如果是json的话再进行处理返回,否则直接返回
  • 最后释放锁
// Read an entry from the log. Returns a byte slice containing the data entry.
func (l *Log) Read(index uint64) (data []byte, err error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	if l.corrupt {
		return nil, ErrCorrupt
	} else if l.closed {
		return nil, ErrClosed
	}
	// 判断index是否合法,必须在firstIndex~lastIndex之间 
	if index == 0 || index < l.firstIndex || index > l.lastIndex {
		return nil, ErrNotFound
	}
	// 根据index加载segment 
	s, err := l.loadSegment(index)
	if err != nil {
		return nil, err
	}
	// 根据index找到其索引epos,然后直接从ebuf中读取数据 
	epos := s.epos[index-s.index]
	edata := s.ebuf[epos.pos:epos.end]
	if l.opts.LogFormat == JSON {
		return readJSON(edata)
	}
	// binary read
	// 先读取长度
	size, n := binary.Uvarint(edata)
	if n <= 0 {
		return nil, ErrCorrupt
	}
	if uint64(len(edata)-n) < size {
		return nil, ErrCorrupt
	}
	// 然后读取数据 
	if l.opts.NoCopy {
		data = edata[n : uint64(n)+size]
	} else {
		data = make([]byte, size)
		copy(data, edata[n:])
	}
	return data, nil
}


//go:noinline
func readJSON(edata []byte) ([]byte, error) {
	var data []byte
	s := gjson.Get(*(*string)(unsafe.Pointer(&edata)), "data").String()
	if len(s) > 0 && s[0] == '$' {
		var err error
		data, err = base64.URLEncoding.DecodeString(s[1:])
		if err != nil {
			return nil, ErrCorrupt
		}
	} else if len(s) > 0 && s[0] == '+' {
		data = make([]byte, len(s[1:]))
		copy(data, s[1:])
	} else {
		return nil, ErrCorrupt
	}
	return data, nil
}


// loadSegment loads the segment entries into memory, pushes it to the front
// of the lru cache, and returns it.
func (l *Log) loadSegment(index uint64) (*segment, error) {

	// check the last segment first.
    // 先判断是否在最后一个中
	lseg := l.segments[len(l.segments)-1]
	if index >= lseg.index {
		return lseg, nil
	}

	// check the most recent cached segment
    // 再从最近的缓存中寻找
	var rseg *segment
	l.scache.Range(func(_, v interface{}) bool {
		s := v.(*segment)
		if index >= s.index && index < s.index+uint64(len(s.epos)) {
			rseg = s
		}
		return false
	})
	if rseg != nil {
		return rseg, nil
	}
    // 前面两个策略都没找到的话,则从文件中找,首先定位命中的segment是哪个,然后再从segment File中读取数据和重新构建索引
	// find in the segment array
	idx := l.findSegment(index)
	s := l.segments[idx]
	if len(s.epos) == 0 {
		// load the entries from cache
		if err := l.loadSegmentEntries(s); err != nil {
			return nil, err
		}
	}
	// push the segment to the front of the cache
    // 放入cache中
	l.pushCache(idx)
	return s, nil
}


// findSegment performs a bsearch on the segments
// 因为segment的文件名是有序的,所以按照二分查找
func (l *Log) findSegment(index uint64) int {
	i, j := 0, len(l.segments)
	for i < j {
		h := i + (j-i)/2
		if index >= l.segments[h].index {
			i = h + 1
		} else {
			j = h
		}
	}
	return i - 1
}

func (l *Log) loadSegmentEntries(s *segment) error {
	data, err := ioutil.ReadFile(s.path)
	if err != nil {
		return err
	}
	ebuf := data
	var epos []bpos
	var pos int
	// 相当于构建epos索引
	for exidx := s.index; len(data) > 0; exidx++ {
		var n int
		if l.opts.LogFormat == JSON {
			n, err = loadNextJSONEntry(data)
		} else {
			n, err = loadNextBinaryEntry(data)
		}
		if err != nil {
			return err
		}
		data = data[n:]
		epos = append(epos, bpos{pos, pos + n})
		pos += n
	}
	s.ebuf = ebuf
	s.epos = epos
	return nil
}

Redis中的Wal

目前,Redis 的持久化主要有两大机制,即 AOF(Append Only File)日志和 RDB 快照。
其中AOF使用的是:写后日志。 就是先执行命令,再写日志。这种方式的一大好处是,可以避免出现记录错误命令的情况,同时也避免了执行命令因为写日志导致阻塞。

AOF的潜在风险

  • 首先,如果刚执行完一个命令,还没有来得及记日志就宕机了,会产生数据丢失。
  • 可能阻塞下一个命令。AOF日志是主线程执行的。

三种回写策略

  • Always,同步写回:每个写命令执行完,立马同步地将日志写回磁盘;
  • Everysec,每秒写回:每个写命令执行完,只是先把日志写到 AOF 文件的内存缓冲区,每隔一秒把缓冲区中的内容写入磁盘;
  • No,操作系统控制的写回:每个写命令执行完,只是先把日志写到 AOF 文件的内存缓冲区,由操作系统决定何时将缓冲区内容写回磁盘。

因此: 想要获得高性能,就选择 No 策略;如果想要得到高可靠性保证,就选择 Always 策略;如果允许数据有一点丢失,又希望性能别受太大影响的话,那么就选择 Everysec 策略

RocksDB中的Wal

在RocksDB中每一次数据的更新都会涉及到两个结构,一个是内存中的memtable(后续会刷新到磁盘成为SST),第二个是WAL(WriteAheadLog)

  • WAL主要的功能是当RocksDB异常退出后,能够恢复出错前的内存中(memtable)数据.
  • 因此RocksDB默认是每次用户写都会刷新数据到WAL
  • 每次当当前WAL对应的内存数据(memtable)刷新到磁盘之后,都会新建一个WAL
  • 每一个wal文件和一个memtable一一对应

对比

tidwall Etcd Redis RocksDB
刷盘 默认异步,支持手动 同步刷盘 三种策略 同步刷盘
偏向 性能 持久化 灵活 持久化
清理 LRU+前删+后删 手动PurgeFile 手动 -

补充:为什么要刷盘,Sync又是什么?

sync命令文件系统管理 sync命令用于强制被改变的内容立刻写入磁盘,更新超块信息。

在Linux/Unix系统中,在文件或数据处理过程中一般先放到内存缓冲区中,等到适当的时候再写入磁盘,以提高系统的运行效率。。sync命令则可用来强制将内存缓冲区中的数据立即写入磁盘中。

用户通常不需执行sync命令,系统会自动执行update或bdflush操作,将缓冲区的数据写 入磁盘。只有在update或bdflush无法执行或用户需要非正常关机时,才需手动执行sync命令。

buffer与cache buffer

为了解决写磁盘的效率,linux系统为了提高读写磁盘的效率,会先将数据放在一块buffer中。在写磁盘时并不是立即将数据写到磁盘中,而是先写入这块buffer中。此时如果重启系统,就可能造成数据丢失。

sync命令用来flush文件系统buffer,这样数据才会真正的写到磁盘中,并且buffer才能够释放出来,flush就是用来清空buffer。

sync命令会强制将数据写入磁盘中,并释放该数据对应的buffer,所以常常会在写磁盘后输入sync命令来将数据真正的写入磁盘。 如果不去手动的输入sync命令来真正的去写磁盘,linux系统也会周期性的去sync数据

思考

为什么要使用Wal?

  1. 保证有效数据在系统异常时不丢失
  2. 提升数据落盘性能

1 和 2 在某些时候,是冲突的。
如果想要保证1,那么每一次的写入都需要刷盘。如果尚未刷盘的时候,断电了,磁盘缓冲区的数据依然会丢失。
如果部分数据,在一定情况下是可以发生丢失的,那么可以优先2.

总结

tidwall的Wal,目前支持异步刷盘,也支持同步手动刷盘。设计精巧。目前支持绝大多数业务场景。

5

评论区