15 "github.com/mediocregopher/radix.v2/pool"
16 "github.com/mgutz/str"
17 "github.com/sirupsen/logrus"
// Marker substrings used when parsing access-log lines and page URLs.
//
// HANDLE_DIG is searched for in the raw log line by cutLogFetchData; the text
// that follows it (up to " HTTP/") is treated as the query string. Note the
// leading space, which is part of the marker.
20 const HANDLE_DIG = " /dig?"
// HANDLE_MOVIE is the path fragment formatUrl looks for to classify a URL as a "movie" page.
21 const HANDLE_MOVIE = "/movie/"
// HANDLE_LIST is the path fragment formatUrl looks for to classify a URL as a "list" page.
22 const HANDLE_LIST = "/list/"
// HANDLE_HTML marks the end of the numeric resource id inside a movie/list URL.
23 const HANDLE_HTML = ".html"
// cmdParams carries the command-line settings through the program. main builds
// it from the logFilePath and routineNum flags and reads them back as
// params.logFilePath and params.routineNum.
// (The field declarations are on lines not shown in this excerpt.)
25 type cmdParams struct {
// NOTE(review): the enclosing struct header is not visible in this excerpt;
// these fields are presumably those of urlNode, which formatUrl fills
// positionally as {type, id, url, time} and dataStorage reads via block.unode.
41 unType string // page kind: detail page, list page, or home page
42 unRid int // resource ID
43 unUrl string // URL of the page itself
44 unTime string // time at which the page was visited
// storageBlock is one unit of work handed to dataStorage. The counters build it
// as {"pv"|"uv", "ZINCRBY", unode}; dataStorage reads counterType as the key
// prefix, storageModel as the Redis command name, and unode for the page data.
// (The field declarations are on lines not shown in this excerpt.)
46 type storageBlock struct {
// log is the package-level logrus logger used by every function in this file.
52 var log = logrus.New()
// Emit everything from debug level upwards.
// NOTE(review): the enclosing function is not visible here — presumably init(); confirm.
56 log.SetLevel(logrus.DebugLevel)
// Program entry logic (the enclosing func header is on a line not shown here).
// It reads the flags, opens the runtime log file, creates the channels and the
// Redis pool, then starts one goroutine per pipeline stage:
// reader -> logConsumer (x routineNum) -> pvCounter / uvCounter -> dataStorage.

// Command-line flags: the access log to analyse, the number of logConsumer
// goroutines, and the file this program writes its own runtime log to.
61 logFilePath := flag.String("logFilePath", "/Users/pangee/Public/nginx/logs/dig.log", "log file path")
62 routineNum := flag.Int("routineNum", 5, "consumer numble by goroutine")
63 l := flag.String("l", "/tmp/log", "this programe runtime log target file path")
// NOTE(review): flag.Parse() is presumably called on an elided line before the
// flag pointers are dereferenced here — confirm.
66 params := cmdParams{*logFilePath, *routineNum}
// Open (creating if necessary) the runtime log file, write-only, mode 0644.
// NOTE(review): neither O_APPEND nor O_TRUNC is passed, so when the file already
// exists writes start at offset 0 and overwrite old content in place — confirm
// this is intended.
69 logFd, err := os.OpenFile(*l, os.O_CREATE|os.O_WRONLY, 0644)
74 log.Infof("Exec start.")
75 log.Infof("Params: logFilePath=%s, routineNum=%d", params.logFilePath, params.routineNum)
77 // Initialize the channels used to pass data between the stages
// All four channels are buffered; logChannel gets three times the capacity of the others.
78 var logChannel = make(chan string, 3*params.routineNum)
79 var pvChannel = make(chan urlData, params.routineNum)
80 var uvChannel = make(chan urlData, params.routineNum)
81 var storageChannel = make(chan storageBlock, params.routineNum)
// Redis connection pool against localhost:6379, sized at 2*routineNum.
84 redisPool, err := pool.New("tcp", "localhost:6379", 2*params.routineNum)
// Abort the program if the pool could not be created (the condition is on an elided line).
86 log.Fatalln("Redis pool created failed.")
// NOTE(review): the statements surrounding this 3-second sleep are not visible
// in this excerpt, so its purpose cannot be stated from here.
92 time.Sleep(3 * time.Second)
// Stage 1: a single goroutine reads the access log and feeds logChannel.
98 go readFileLinebyLine(params, logChannel)
// Stage 2: routineNum goroutines parse log lines and fan out to the PV and UV channels.
101 for i := 0; i < params.routineNum; i++ {
102 go logConsumer(logChannel, pvChannel, uvChannel)
// Stage 3: one PV counter and one UV counter, both emitting storageBlocks.
106 go pvCounter(pvChannel, storageChannel)
107 go uvCounter(uvChannel, storageChannel, redisPool)
// Stage 4: a single goroutine writes the storageBlocks to Redis.
111 go dataStorage(storageChannel, redisPool)
// Block for 1000 seconds while the goroutines work.
// NOTE(review): no visible line waits on the goroutines, so this sleep appears to
// be the only thing keeping the process alive — confirm against the full file.
113 time.Sleep(1000 * time.Second)
// dataStorage drains storageChannel and applies every block to Redis.
// For each block it derives six keys (day/hour/minute buckets, once overall and
// once per page type) and runs block.storageModel against each key with the
// arguments (key, 1, rowId), where rowId is the page's resource id. A reply that
// is <= 0 or carries an error is logged, not returned.
// The loop ends when storageChannel is closed.
117 func dataStorage(storageChannel chan storageBlock, redisPool *pool.Pool) {
118 for block := range storageChannel {
// Every key starts with the counter type, e.g. "pv_" or "uv_".
119 prefix := block.counterType + "_"
123 // Hierarchy: top level - major category - subcategory - final page
124 // Storage model: Redis SortedSet
// Keys for this block (the declaration of the setKeys slice is on an elided line).
// The first three are global time buckets, the last three are per page type.
126 prefix + "day_" + getTime(block.unode.unTime, "day"),
127 prefix + "hour_" + getTime(block.unode.unTime, "hour"),
128 prefix + "min_" + getTime(block.unode.unTime, "min"),
129 prefix + block.unode.unType + "_day_" + getTime(block.unode.unTime, "day"),
130 prefix + block.unode.unType + "_hour_" + getTime(block.unode.unTime, "hour"),
131 prefix + block.unode.unType + "_min_" + getTime(block.unode.unTime, "min"),
// The sorted-set member is the page's resource id.
134 rowId := block.unode.unRid
136 for _, key := range setKeys {
// With storageModel "ZINCRBY" this is ZINCRBY key 1 rowId, i.e. add one to the member's score.
137 ret, err := redisPool.Cmd(block.storageModel, key, 1, rowId).Int()
138 if ret <= 0 || err != nil {
139 log.Errorln("DataStorage redis storage error.", block.storageModel, key, rowId)
// pvCounter turns every urlData received on pvChannel into a "pv" storageBlock
// (command "ZINCRBY", payload data.unode) and forwards it on storageChannel.
// Every record is counted; the loop ends when pvChannel is closed.
145 func pvCounter(pvChannel chan urlData, storageChannel chan storageBlock) {
146 for data := range pvChannel {
147 sItem := storageBlock{"pv", "ZINCRBY", data.unode}
// Blocks when storageChannel's buffer is full.
148 storageChannel <- sItem
// uvCounter processes every urlData received on uvChannel: it adds the visitor's
// uid to a per-day Redis HyperLogLog and forwards a "uv" storageBlock (command
// "ZINCRBY", payload data.unode) on storageChannel.
// NOTE(review): how the PFADD result (ret) decides whether the block is
// forwarded is on lines not visible in this excerpt.
151 func uvCounter(uvChannel chan urlData, storageChannel chan storageBlock, redisPool *pool.Pool) {
152 for data := range uvChannel {
// One HyperLogLog per day bucket, e.g. "uv_hpll_<unix-seconds>".
154 hyperLogLogKey := "uv_hpll_" + getTime(data.data.time, "day")
// NOTE(review): PFADD's syntax is `PFADD key [element ...]`; it has no EX option.
// As written, "EX" and 86400 are added to the HyperLogLog as two extra elements
// and no expiry is set on the key. A TTL would need a separate EXPIRE command.
155 ret, err := redisPool.Cmd("PFADD", hyperLogLogKey, data.uid, "EX", 86400).Int()
// Logged as a warning when the Redis call fails (the condition is on an elided line).
157 log.Warningln("UvCounter check redis hyperloglog failed, ", err)
163 sItem := storageBlock{"uv", "ZINCRBY", data.unode}
164 storageChannel <- sItem
// logConsumer reads raw log lines from logChannel, parses each one with
// cutLogFetchData, derives a synthetic user id, and assembles a urlData record
// containing the parsed data, that uid, and the urlNode from formatUrl.
// NOTE(review): the sends to pvChannel/uvChannel and the return statement are on
// lines not visible in this excerpt.
168 func logConsumer(logChannel chan string, pvChannel, uvChannel chan urlData) error {
169 for logStr := range logChannel {
// Extract the reported fields from the raw log line.
171 data := cutLogFetchData(logStr)
174 // Note: for this course the uid is simulated as md5(refer+ua)
// hasher is created on an elided line (per the note above, presumably md5.New()).
176 hasher.Write([]byte(data.refer + data.ua))
// The uid is the lowercase hex encoding of the digest.
177 uid := hex.EncodeToString(hasher.Sum(nil))
// Bundle the parsed data, the uid, and the classified URL into one record.
183 uData := urlData{data, uid, formatUrl(data.url, data.time)}
// cutLogFetchData extracts the query string of a "/dig?" request from one
// access-log line and parses it into key/value pairs.
// It takes the text between HANDLE_DIG and the following " HTTP/" and runs it
// through net/url by appending it to a dummy base URL.
// NOTE(review): the not-found handling for pos1, the error handling for
// url.Parse, and the construction of the returned digData are on lines not
// visible in this excerpt.
190 func cutLogFetchData(logStr string) digData {
191 logStr = strings.TrimSpace(logStr)
// Locate the start of the dig request in the line.
192 pos1 := str.IndexOf(logStr, HANDLE_DIG, 0)
// Skip past the marker so pos1 points at the first character of the query string.
196 pos1 += len(HANDLE_DIG)
// The query string ends where the protocol token begins.
// NOTE(review): pos2 is used on the very next line with no not-found check; a
// line without " HTTP/" would yield a bogus length if IndexOf returns a negative
// value — confirm against mgutz/str.
197 pos2 := str.IndexOf(logStr, " HTTP/", pos1)
198 d := str.Substr(logStr, pos1, pos2-pos1)
// Prefix a dummy scheme and host so the fragment parses as a complete URL.
200 urlInfo, err := url.Parse("http://localhost/?" + d)
// Decoded query parameters as url.Values.
204 data := urlInfo.Query()
// readFileLinebyLine opens params.logFilePath and reads it one '\n'-terminated
// line at a time through a bufio.Reader, logging progress as it goes.
// NOTE(review): the loop header, the send on logChannel, the branch conditions
// around the sleep/warning below, and the return statements are on lines not
// visible in this excerpt.
212 func readFileLinebyLine(params cmdParams, logChannel chan string) error {
213 fd, err := os.Open(params.logFilePath)
// Logged when the file cannot be opened (the condition is on an elided line).
215 log.Warningf("ReadFileLinebyLine can't open file:%s", params.logFilePath)
221 bufferRead := bufio.NewReader(fd)
// ReadString returns the line including its trailing '\n'.
223 line, err := bufferRead.ReadString('\n')
// Progress message once every 1000*routineNum lines.
227 if count%(1000*params.routineNum) == 0 {
228 log.Infof("ReadFileLinebyLine line: %d", count)
// Pause for 3 seconds and log the wait.
// NOTE(review): presumably this is the io.EOF branch, waiting for the log to grow — confirm.
232 time.Sleep(3 * time.Second)
233 log.Infof("ReadFileLinebyLine wait, raedline:%d", count)
// The other read-error outcome is only logged as a warning.
235 log.Warningf("ReadFileLinebyLine read log error")
// formatUrl classifies a page URL and returns the matching urlNode.
//   - contains HANDLE_MOVIE: type "movie", id parsed from the text between the
//     marker and HANDLE_HTML
//   - contains HANDLE_LIST:  type "list", id parsed the same way
//   - otherwise:             type "home" with the fixed id 1
// url and t are stored unchanged in the node.
// NOTE(review): the if/else headers that select between the three returns are on
// lines not visible in this excerpt.
242 func formatUrl(url, t string) urlNode {
243 // Always start with the highest-volume page type: detail pages > list pages >= home page
244 pos1 := str.IndexOf(url, HANDLE_MOVIE, 0)
// Move pos1 to the first character after "/movie/".
246 pos1 += len(HANDLE_MOVIE)
// NOTE(review): ".html" is searched from offset 0, not from pos1, so an earlier
// ".html" in the URL would give a wrong length.
247 pos2 := str.IndexOf(url, HANDLE_HTML, 0)
248 idStr := str.Substr(url, pos1, pos2-pos1)
// The Atoi error is discarded, so a non-numeric idStr yields id 0.
249 id, _ := strconv.Atoi(idStr)
250 return urlNode{"movie", id, url, t}
// Not a movie page: try the list marker next.
252 pos1 = str.IndexOf(url, HANDLE_LIST, 0)
254 pos1 += len(HANDLE_LIST)
255 pos2 := str.IndexOf(url, HANDLE_HTML, 0)
256 idStr := str.Substr(url, pos1, pos2-pos1)
// As above, a parse failure silently becomes id 0.
257 id, _ := strconv.Atoi(idStr)
258 return urlNode{"list", id, url, t}
// Anything else is counted as the home page with resource id 1.
260 return urlNode{"home", 1, url, t}
261 } // if there are many more kinds of page URL, keep extending the chain here
// getTime returns a Unix timestamp (decimal seconds, as a string) truncated to
// the precision selected by timeType. Callers in this file pass "day", "hour"
// or "min".
// NOTE(review): the selection of the layout by timeType is on lines not visible
// in this excerpt. On the visible lines logTime is never read — the value is
// derived from time.Now(), not from the log entry's time. Confirm whether that
// is intended.
265 func getTime(logTime, timeType string) string {
// Layout with hour precision.
272 item = "2006-01-02 15"
// Layout with minute precision.
275 item = "2006-01-02 15:04"
// Format the current time with the chosen layout and parse it straight back,
// which drops every field finer than the layout. The parse error is discarded.
// The layouts carry no zone, so time.Parse interprets the wall-clock text as UTC.
278 t, _ := time.Parse(item, time.Now().Format(item))
279 return strconv.FormatInt(t.Unix(), 10)