一般的系统中都会有大量的写日志操作,业务日志,mysql日志,ngxin日志等等。假设有一个系统目前的访问量很大,那么就会有大量的写日志操作。
Linux下读写文件是有buffer io的,写数据的时候先写到一个内存中的page buffer,此时这个page buffer就是dirty(脏的)。
可以看出,随着大量脏数据的产生,来不及刷盘是有可能阻塞write系统调用写日志的,会造成磁盘iops剧烈抖动,iowait飙升。我们可以采用异步写入方式,一般异步执行都是需要一个队列的东西来保存内容,而且这个队列必须是线程安全.
环形缓冲区,本质一个数组,也可以看成一个队列,不过它是首尾相连成环的,可以存储数据,用在不同线程间传递数据的缓冲区,一边生产数据塞进去,另一半消费数据拿出来。懒得画图,详细介绍可以参考这篇
Ring Buffer是什么
使用数组的最大优点是对CPU缓存友好,数组内元素的内存地址的连续性存储的,因为只要一个元素被加载到CPU缓存行,其相邻的其他元素也会被加载进同一个缓存行,减少了CPU访问内存的次数。
多说无益,实现一下,主要思路就是一个圆环,两个读写游标,分别记录读到哪了,写到哪了,当游标跑一圈到起始位置时,重置游标为0,当写游标等于读游标且数组不为空时满了。其实,就存在2种情况
package ringbuffer
import (
"errors"
"sync"
)
type RingBuffer struct {
// 存数据
buf []byte
// buf大小
size int
// 读游标
r int
// 写游标
w int
// 是否满了
isFull bool
// 线程安全
mu sync.Mutex
}
func NewRingBuffer(size int) *RingBuffer {
return &RingBuffer{
buf: make([]byte, size),
size: size,
}
}
// 读取指定大小的数据出来
func (r *RingBuffer) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
r.mu.Lock()
defer r.mu.Unlock()
// 读游标==写游标==0 无数据
if r.w == r.r && !r.isFull {
return 0, errors.New("It is empty")
}
// 第一圈, 写快读慢
if r.w > r.r {
// 可读出来的数据长度
n = r.w - r.r
if n > len(p) {
// 超过搬运缓冲区大小则截取
n = len(p)
}
copy(p, r.buf[r.r:r.r+n])
// 读完数据了,维护读游标,取模技巧(始终会在小于等于size的范围内)
r.r = (r.r + n) % r.size
return
}
// 第二圈,此时 读在前,写在后
// 可读出来的数据长度计算
n = r.size - r.r + r.w
if n > len(p) {
n = len(p)
}
if r.r+n <= r.size {
// 第一圈可以填充完毕
copy(p, r.buf[r.r:r.r+n])
} else {
// 分两步骤填充,第一圈填充一部分,第二圈填充一部分
c1 := r.size - r.r
copy(p, r.buf[r.r:r.size])
c2 := n - c1
copy(p[c1:], r.buf[0:c2])
}
// 维护读游标,取模技巧(始终会在小于等于size的范围内)
r.r = (r.r + n) % r.size
// 读出一部分数据除去,肯定是不满的
r.isFull = false
return n, err
}
// 写数据
func (r *RingBuffer) Write(p []byte) (n int, err error) {
// 需要写入的数据为空
if len(p) == 0 {
return 0, nil
}
r.mu.Lock()
defer r.mu.Unlock()
// 满了
if r.isFull {
return 0, errors.New("It is full")
}
// 计算可写容量
var avail int
if r.w >= r.r {
// 可用 = 总共-写占用+读释放
avail = r.size - r.w + r.r
} else {
// 写游标第二圈了,所以在读后面,那么剩余额容量= (读游标-写游标)
avail = r.r - r.w
}
// 要写的数据太多了,存不下
if len(p) > avail {
err = errors.New("Too many data to write")
// 截取
p = p[:avail]
}
n = len(p)
// 第一圈,写快 读慢
if r.w >= r.r {
// 第一圈剩余多少
c1 := r.size - r.w
// 第一圈剩下的够存下 需要写入的字符
if c1 >= n {
copy(r.buf[r.w:], p)
r.w += n
} else {
// 第一圈剩下的不够够存下需要写入的字符,分两步写
// 第一圈先写一部分
copy(r.buf[r.w:], p[:c1])
c2 := n - c1
// 第二圈继续写剩余的部分
copy(r.buf[0:], p[c1:])
// 此时写游标在第二圈,读游标后面
r.w = c2
}
} else {
// 写游标跑第二圈了
copy(r.buf[r.w:], p)
r.w += n
}
// 写游标写了一圈到了起始点回归0
if r.w == r.size {
r.w = 0
}
// 写游标跑一圈追上读游标了,此时满了
if r.w == r.r {
r.isFull = true
}
return n, err
}
func (r *RingBuffer) Reset() {
r.mu.Lock()
defer r.mu.Unlock()
r.r = 0
r.w = 0
r.isFull = false
}
func main() {
buffer := ringbuffer.NewRingBuffer(1024)
_, _ = buffer.Write([]byte("hello"))
container := make([]byte, 5)
_, _ = buffer.Read(container)
fmt.Println(string(container))
}