// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms
// keeps track of the position where we have read // (but not yet sent over readChan) nextReadPos int64// 下一次读的位置 nextReadFileNum int64// 下一次读的文件number
// no need to lock here, nothing else could possibly be touching this instance err := d.retrieveMetaData() if err != nil && !os.IsNotExist(err) { d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err) }
go d.ioLoop() return &d }
可以看出, 队列中均使用不带cache 的chan,消息只能阻塞处理。
d.retrieveMetaData() 是从文件中恢复元数据。
d.ioLoop() 是队列的事件处理逻辑,后文详细解答
消息的读写
文件格式
文件名 "name" + .diskqueue.%06d.dat 其中, name 是 topic, 或者topic + channel 命名. 数据采用二进制方式存储, 消息大小+ body 的形式存储。
for { // 若到达刷盘频次,标记等待刷盘 if count == d.syncEvery { d.needSync = true }
if d.needSync { err = d.sync() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) } count = 0 }
// 有可读数据,并且当前读chan的数据已经被读走,则读取下一条数据 if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { if d.nextReadPos == d.readPos { dataRead, err = d.readOne() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err) d.handleReadError() continue } } r = d.readChan } else { // 如果无可读数据,那么设置 r 为nil, 防止将dataRead数据重复传入readChan中 r = nil }
select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read case r <- dataRead: count++ // moveForward sets needSync flag if a file is removed // 如果读chan 被写入成功,则会修改读的偏移 d.moveForward() case <-d.emptyChan: // 清空所有文件,并返回empty的结果 d.emptyResponseChan <- d.deleteAllFiles() count = 0 case dataWrite := <-d.writeChan: // 写msg count++ d.writeResponseChan <- d.writeOne(dataWrite) case <-syncTicker.C: // 到刷盘时间,则修改needSync = true if count == 0 { // avoid sync when there's no activity continue } d.needSync = true case <-d.exitChan: goto exit } }
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
现象 nsq 在定义struct 的时候,很多会出现类似的注释
原因 原因在golang 源码 sync/atomic/doc.go 中
1 2 3 4 5
// On ARM, x86-32, and 32-bit MIPS, // it is the caller's responsibility to arrange for 64-bit // alignment of 64-bit words accessed atomically. The first word in a // variable or in an allocated struct, array, or slice can be relied upon to be // 64-bit aligned.