funcNew(namestring,dataPathstring,maxBytesPerFileint64,minMsgSizeint32,maxMsgSizeint32,syncEveryint64,syncTimeouttime.Duration,logfAppLogFunc)Interface{d:=diskQueue{name:name,dataPath:dataPath,maxBytesPerFile:maxBytesPerFile,minMsgSize:minMsgSize,maxMsgSize:maxMsgSize,readChan:make(chan[]byte),writeChan:make(chan[]byte),writeResponseChan:make(chanerror),emptyChan:make(chanint),emptyResponseChan:make(chanerror),exitChan:make(chanint),exitSyncChan:make(chanint),syncEvery:syncEvery,syncTimeout:syncTimeout,logf:logf,}// no need to lock here, nothing else could possibly be touching this instance
err:=d.retrieveMetaData()iferr!=nil&&!os.IsNotExist(err){d.logf(ERROR,"DISKQUEUE(%s) failed to retrieveMetaData - %s",d.name,err)}god.ioLoop()return&d}
可以看出,队列中使用的均是无缓冲(unbuffered)的 chan,消息的收发只能以阻塞的方式逐条处理。
d.retrieveMetaData() 是从文件中恢复元数据。
d.ioLoop() 是队列的事件处理循环,运行在独立的 goroutine 中,后文会详细说明。
消息的读写
文件格式
文件名的格式为 "name" + .diskqueue.%06d.dat,其中 name 以 topic 或者 topic + channel 命名。
数据采用二进制方式存储,每条消息按"消息大小 + body"的形式写入。
func(d*diskQueue)ioLoop(){vardataRead[]bytevarerrerrorvarcountint64varrchan[]byte// 定时器的设置
syncTicker:=time.NewTicker(d.syncTimeout)for{// 若到达刷盘频次,标记等待刷盘
ifcount==d.syncEvery{d.needSync=true}ifd.needSync{err=d.sync()iferr!=nil{d.logf(ERROR,"DISKQUEUE(%s) failed to sync - %s",d.name,err)}count=0}// 有可读数据,并且当前读chan的数据已经被读走,则读取下一条数据
if(d.readFileNum<d.writeFileNum)||(d.readPos<d.writePos){ifd.nextReadPos==d.readPos{dataRead,err=d.readOne()iferr!=nil{d.logf(ERROR,"DISKQUEUE(%s) reading at %d of %s - %s",d.name,d.readPos,d.fileName(d.readFileNum),err)d.handleReadError()continue}}r=d.readChan}else{// 如果无可读数据,那么设置 r 为nil, 防止将dataRead数据重复传入readChan中
r=nil}select{// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
caser<-dataRead:count++// moveForward sets needSync flag if a file is removed
// 如果读chan 被写入成功,则会修改读的偏移
d.moveForward()case<-d.emptyChan:// 清空所有文件,并返回empty的结果
d.emptyResponseChan<-d.deleteAllFiles()count=0casedataWrite:=<-d.writeChan:// 写msg
count++d.writeResponseChan<-d.writeOne(dataWrite)case<-syncTicker.C:// 到刷盘时间,则修改needSync = true
ifcount==0{// avoid sync when there's no activity
continue}d.needSync=truecase<-d.exitChan:gotoexit}}exit:d.logf(INFO,"DISKQUEUE(%s): closing ... ioLoop",d.name)syncTicker.Stop()d.exitSyncChan<-1}
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
现象:nsq 在定义 struct 时,经常会出现类似上面这样的注释。
原因:可以在 Go 源码 sync/atomic/doc.go 的注释中找到说明:
1
2
3
4
5
// On ARM, x86-32, and 32-bit MIPS,
// it is the caller's responsibility to arrange for 64-bit
// alignment of 64-bit words accessed atomically. The first word in a
// variable or in an allocated struct, array, or slice can be relied upon to be
// 64-bit aligned.