archive.go (13840B)
package archive

import (
	"bufio"
	"compress/bzip2"
	"errors"
	"fmt"
	"io"
	"log"
	"net/url"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
	"unicode"

	mrt "go-bgp/mrt"
)

// constants for archentryfile.ftype
const (
	unknownfile = iota
	mrtfile
	xmlfile
	ftypesize
)

var (
	ftypestrings = [ftypesize]string{
		"UNKNOWN",
		"MRT",
		"XML",
	}
	errbadft     = errors.New("error: unknown file type")
	errparsedate = errors.New("error: parsing date")
	errbadscan   = errors.New("bufio scanner failed")
)

type reply struct {
	data []byte
	err  error
}

// To answer a query asynchronously over possibly many files we fire multiple
// goroutines that all write their results to a chan reply. The waitgroup tells
// us when every sender is done, so we know when to close the channel and end
// the http transaction.
type archive interface {
	Query(queryParams)
	visit(string, os.FileInfo, error) error
}

type queryParams struct {
	from  time.Time
	to    time.Time
	rtype int
	rch   chan reply
	wg    *sync.WaitGroup
}

type xmlstring struct {
	timestr string
	msg     string
	time    time.Time
}

func (x *xmlstring) String() string {
	return x.msg
}

// archentryfile entries are sorted by time.Time (see timeentryslice below).
type archentryfile struct {
	path  string
	sdate time.Time
	sz    int64
	ftype int
}

type timeentryslice []archentryfile

func (p timeentryslice) Len() int {
	return len(p)
}

func (p timeentryslice) Less(i, j int) bool {
	return p[i].sdate.Before(p[j].sdate)
}

func (p timeentryslice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type fsarchive struct {
	rootpathstr    string
	entryfiles     *timeentryslice
	tempentryfiles timeentryslice // once a scan finishes we point entryfiles at this slice
	curyr          int            // in case we need to walk dirs to find dates, these keep state
	curmon         int
	curday         int
	reqchan        chan string
	scanning       bool
	Scanwg         *sync.WaitGroup // exported so that callers can wait for a scan to finish
	scanch         chan struct{}
	timedelta      time.Duration
	descriminator  string // a string that must appear in a path for a file to be added while walking the fs hierarchy
	Conf           *fsarconf
	// present the archive as a restful resource
	PutNotAllowed
	PostNotAllowed
	DeleteNotAllowed
}

type fsarconf struct {
	ar *fsarchive
	PutNotAllowed
	PostNotAllowed
	DeleteNotAllowed
}

// In order not to block in the following GETs we fire a new goroutine to send
// the replies on the channel: the channel is created here and must be returned
// to the responsewriter, so any send before that would block with no receiver
// ready yet.
func (fsc *fsarconf) Get(values url.Values) (int, chan reply) {
	retc := make(chan reply)
	go func() {
		defer close(retc) // must close the chan to let the listener finish
		if fsc.ar == nil {
			log.Print("nil archive in fsarconf. ignoring request")
			return
		}
		if _, ok := values["range"]; ok {
			if len(*fsc.ar.entryfiles) > 0 {
				f := *fsc.ar.entryfiles
				dates := fmt.Sprintf("[From:%s - To:%s . Delta:%s]\n", f[0].sdate, f[len(f)-1].sdate, fsc.ar.timedelta)
				retc <- reply{data: []byte(dates), err: nil}
				return
			}
			retc <- reply{data: nil, err: errempty}
			return
		}
		if _, ok := values["files"]; ok {
			for _, f := range *fsc.ar.entryfiles {
				retc <- reply{data: []byte(fmt.Sprintf("[Filename:%s .Type:%s]\n", filepath.Base(f.path), ftypestrings[f.ftype])), err: nil}
			}
			return
		}
	}()
	return 200, retc
}

func (fsa *fsarchive) Get(values url.Values) (int, chan reply) {
	var (
		grwg  sync.WaitGroup
		ftype int
	)
	retc := make(chan reply)
	timeAstrs, ok1 := values["start"]
	timeBstrs, ok2 := values["end"]
	tpstr, ok3 := values["type"]
	// sendErr hands an error reply to a goroutine registered on the waitgroup:
	// a direct send on the unbuffered channel would block, since nothing can
	// receive from retc before we return it.
	sendErr := func(e error) {
		grwg.Add(1)
		go func() {
			defer grwg.Done()
			retc <- reply{data: nil, err: e}
		}()
	}
	if len(timeAstrs) != len(timeBstrs) || !ok1 || !ok2 {
		sendErr(errbadreq)
		goto done
	}
	if !ok3 { // no type specified: default to unknownfile (0)
		ftype = unknownfile
	} else {
		for tpi, tps := range ftypestrings {
			// XXX: only the first type argument is respected
			if tps == tpstr[0] {
				ftype = tpi
				break
			}
			if tpi == ftypesize-1 { // we didn't find it; let the user know
				log.Printf("unknown return type %s in request", tpstr[0])
				sendErr(errbadtype)
				goto done
			}
		}
	}
	// parse pairs of start=YYYYMMDDHHMMSS&end=YYYYMMDDHHMMSS requests
	for i := 0; i < len(timeAstrs); i++ {
		log.Printf("timeAstr:%s timeBstr:%s", timeAstrs[i], timeBstrs[i])
		timeA, errtimeA := time.Parse("20060102150405", timeAstrs[i])
		timeB, errtimeB := time.Parse("20060102150405", timeBstrs[i])
		if errtimeA != nil || errtimeB != nil || timeB.Before(timeA) {
			sendErr(errbaddate)
		} else {
			qp := queryParams{from: timeA, to: timeB, rtype: ftype, rch: retc, wg: &grwg}
			fsa.Query(qp) // fires a new goroutine registered on grwg
		}
	}
done:
	// the last goroutine waits for every sender we fired above and closes the
	// chan so that a range over it in the responsewriter will finish
	go func(wg *sync.WaitGroup) {
		wg.Wait()
		close(retc)
		log.Print("closing the chan")
	}(&grwg)
	return 200, retc
}

func (fsar *fsarchive) Query(qp queryParams) {
	ta := qp.from
	tb := qp.to
	retc := qp.rch
	rt := qp.rtype
	wg := qp.wg
	log.Printf("querying from %s to %s return as:%s", ta, tb, ftypestrings[rt])
	// register with the waitgroup before firing the goroutine so that a
	// concurrent wg.Wait cannot return early and close the reply channel
	wg.Add(1)
	go func(rc chan<- reply) {
		defer wg.Done()
		ef := *fsar.entryfiles
		var (
			scanner *bufio.Scanner
			date    time.Time
			err     error
		)
		// case: no files in the archive
		if len(ef) == 0 {
			rc <- reply{nil, errempty}
			return
		}
		// case: request outside the served range
		if tb.Before(ef[0].sdate) || ta.After(ef[len(ef)-1].sdate.Add(fsar.timedelta)) {
			rc <- reply{nil, errdate}
			return
		}
		// find the start and end indices of the files that may contain the range
		i := sort.Search(len(ef), func(i int) bool {
			return ef[i].sdate.After(ta.Add(-fsar.timedelta - time.Second))
		})
		j := sort.Search(len(ef), func(i int) bool {
			return ef[i].sdate.After(tb)
		})
		for k := i; k < j; k++ {
			file, ferr := os.Open(ef[k].path) // not deferring Close: files are closed at the end of every iteration
			if ferr != nil {
				log.Println("failed opening file: ", ef[k].path, " with ", ferr)
				continue
			}
			ft := ef[k].ftype
			scanner, err = getScanner(file, ft)
			if err != nil {
				log.Println("failed acquiring bufio scanner on: ", ef[k].path, " ", err)
				file.Close()
				continue
			}

			startt := time.Now()
			for scanner.Scan() {
				b := scanner.Bytes()
				if len(b) == 0 {
					log.Printf("0 bytes scanned from: %s", ef[k].path)
					continue
				}
				date, err = getDate(b, ft)
				if err != nil {
					log.Printf("Query: getting date on file:%s error: %s", ef[k].path, err)
					continue
				}
				if date.After(ta) && date.Before(tb) {
					log.Printf("successfully sending data for date:%s", date)
					// copy the token: the scanner reuses its buffer on the next Scan call
					buf := make([]byte, len(b))
					copy(buf, b)
					rc <- reply{data: buf, err: nil}
				} else if date.After(tb) {
					break // XXX: correct only for ordered files (increasing dates)
				}
			}
			if err := scanner.Err(); err != nil && err != io.EOF {
				log.Printf("file scanner error:%s", err)
			}
			log.Printf("finished parsing file %s size %d in %s", ef[k].path, ef[k].sz, time.Since(startt))
			file.Close()
		}
	}(retc)
}

func getScanner(f *os.File, ft int) (scanner *bufio.Scanner, err error) {
	fext := filepath.Ext(f.Name())
	if fext == ".bz2" {
		log.Print("bzip2 file. opening decompression stream")
		scanner = bufio.NewScanner(bzip2.NewReader(f))
	} else {
		log.Printf("no .bz2 extension on file: %s. opening normally", f.Name())
		scanner = bufio.NewScanner(f)
	}
	if ft == mrtfile {
		log.Printf("mrt file: %s. setting splitfunc to SplitMrt", f.Name())
		scanner.Split(mrt.SplitMrt)
	}
	return
}

func getDate(data []byte, ftype int) (t time.Time, err error) {
	switch ftype {
	case mrtfile:
		hdr, herr := mrt.NewMrtHdr(data[:mrt.MrtHdr_size])
		if herr != nil {
			log.Printf("getDate: error in creating MRT header:%s", herr)
			err = herr
			return
		}
		t = time.Unix(int64(hdr.Mrt_timestamp), 0)
	case xmlfile:
		str := string(data)
		dateindi := strings.Index(str, "<DATETIME>")
		if dateindi == -1 {
			log.Println("getDate: could not locate DATETIME string in xml msg: ", str)
			err = errbaddate
			return
		}
		dateindi += len("<DATETIME>") // move to the start of the date data
		dateindj := strings.Index(str[dateindi:], "</DATETIME>")
		if dateindj == -1 {
			log.Println("getDate: could not locate closing </DATETIME> string in xml msg: ", str)
			err = errbaddate
			return
		}
		dateindj += dateindi // back to an index relative to the start of str
		t, err = time.Parse(time.RFC3339, str[dateindi:dateindj])
		if err != nil {
			log.Printf("getDate: could not parse datetime: %s", err)
		}
	default:
		err = errbadft
	}
	return
}

func getFirstDate(path string, ftype int) (date time.Time, err error) {
	f, err := os.Open(path)
	if err != nil {
		return
	}
	defer f.Close()
	sc, err := getScanner(f, ftype)
	if err != nil {
		return
	}
	sc.Scan()
	b := sc.Bytes()
	if len(b) == 0 {
		log.Printf("0 bytes scanned from: %s", path)
		err = errbadscan
	} else {
		date, err = getDate(b, ftype)
	}
	return
}

func (fsa *fsarchive) visit(path string, f os.FileInfo, err error) error {
	var ft int
	if err != nil {
		log.Printf("visit: error while walking %s: %s. ignoring", path, err)
		return nil
	}
	fname := f.Name()
	log.Print("examining: ", fname)
	if !strings.Contains(path, fsa.descriminator) {
		log.Printf("visit: discriminator %s not found in path %s. ignoring", fsa.descriminator, path)
		return nil
	}
	if f.Mode().IsRegular() {
		numind := strings.IndexFunc(fname, unicode.IsDigit)
		extind := strings.LastIndex(fname, ".bz2")
		xmlind := strings.LastIndex(fname, ".xml")
		if (extind == -1 && xmlind == -1) || numind == -1 {
			log.Printf("file: %s not in foo.YYYYMMDD.HHMM.[xml].bz2 format. extind:%d numberind:%d xmlind:%d", fname, extind, numind, xmlind)
			return nil
		}
		if extind-numind != 13 && xmlind-numind != 13 {
			log.Printf("file: %s not in foo.YYYYMMDD.HHMM.[xml].bz2 format. extind:%d numberind:%d xmlind:%d", fname, extind, numind, xmlind)
			return nil
		}
		if xmlind != -1 {
			ft = xmlfile
		} else {
			ft = mrtfile
		}
		sdate, gerr := getFirstDate(path, ft)
		if gerr != nil {
			log.Print("visit: getFirstDate failed on file: ", fname, " with error: ", gerr)
			// not returning the error: that would make filepath.Walk stop
			return nil
		}
		fsa.tempentryfiles = append(fsa.tempentryfiles, archentryfile{path: path, sdate: sdate, sz: f.Size(), ftype: ft})
	}
	return nil
}

func (fsa *fsarchive) SetDelta(a time.Duration) {
	fsa.timedelta = a
}

func NewFsArchive(path, descr string) *fsarchive {
	return &fsarchive{
		rootpathstr:    path,
		entryfiles:     &timeentryslice{},
		tempentryfiles: timeentryslice{},
		curyr:          0,
		curmon:         0,
		curday:         0,
		reqchan:        make(chan string),
		scanning:       false,
		Scanwg:         &sync.WaitGroup{},
		scanch:         make(chan struct{}, 1), // buffered so a finishing scan never blocks if the serve loop has already stopped
		timedelta:      15 * time.Minute,
		descriminator:  descr,
		Conf:           &fsarconf{},
	}
}

// isYearMonthDir reports whether a dir name is in YYYY.MM form,
// returning (true, year, month) if it is, or false otherwise.
func isYearMonthDir(fname string) (res bool, yr int, mon int) {
	var err error
	ind := strings.Index(fname, ".")
	// dot not found, or name is of the form "foo."
	if ind == -1 || ind == len(fname)-1 {
		return
	}
	// not YYYY or MM
	if len(fname[:ind]) != 4 || len(fname[ind+1:]) != 2 {
		return
	}
	yr, err = strconv.Atoi(fname[:ind])
	if err != nil {
		return
	}
	mon, err = strconv.Atoi(fname[ind+1:])
	if err != nil {
		return
	}
	if mon < 1 || mon > 12 {
		return
	}
	// the values were found to be valid
	res = true
	return
}

func (fsa *fsarchive) printEntries() {
	log.Printf("dumping entries")
	for _, ef := range *fsa.entryfiles {
		fmt.Printf("[path:%s\tdate:%s\tsize:%d\ttype:%s]\n", ef.path, ef.sdate, ef.sz, ftypestrings[ef.ftype])
	}
}

// scan walks the archive root, rebuilds tempentryfiles and signals the serve
// goroutine when it is done. Scanwg.Add(1) and the scanning flag are handled
// by the serve goroutine before this is fired, to avoid racing with it.
func (fsa *fsarchive) scan(ar archive) {
	// clear the temp slice
	fsa.tempentryfiles = []archentryfile{}
	if err := filepath.Walk(fsa.rootpathstr, ar.visit); err != nil {
		log.Printf("scan: walk of %s returned error: %s", fsa.rootpathstr, err)
	}
	sort.Sort(fsa.tempentryfiles)
	// allow the serve goroutine to unblock in case of STOP
	fsa.Scanwg.Done()
	// signal the serve goroutine that the scan is done
	fsa.scanch <- struct{}{}
}

func (fsa *fsarchive) Serve(wg *sync.WaitGroup, ar archive) (reqchan chan<- string) {
	if fsa.reqchan == nil { // the channel was torn down by a STOP and Serve is called again
		fsa.reqchan = make(chan string)
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case req := <-fsa.reqchan:
				switch req {
				case "SCAN":
					if fsa.scanning {
						log.Print("fsarchive: already scanning. ignoring command")
					} else { // fire an async goroutine to scan the files; it signals scanch when done
						fsa.scanning = true
						fsa.Scanwg.Add(1) // registered here so a Scanwg.Wait cannot miss it
						go fsa.scan(ar)
					}
				case "DUMPENTRIES":
					if fsa.scanning {
						log.Print("fsar: warning. scanning in progress")
					}
					fsa.printEntries()
				case "STOP":
					log.Print("fsar: stopping")
					fsa.Scanwg.Wait()
					fsa.reqchan = nil // no more requests from this channel
					return
				default:
					log.Print("fsarchive: unknown request: ", req)
				}
			case <-fsa.scanch:
				// update the reference to our file slice
				fsa.entryfiles = &fsa.tempentryfiles
				fsa.scanning = false
				// let the config know about us
				log.Printf("setting archive conf (was %v) with a slice of len %d", fsa.Conf.ar, len(*fsa.entryfiles))
				fsa.Conf.ar = fsa
				log.Print("fsarchive: scan finished")
			}
		}
	}()
	return fsa.reqchan
}