go-bgp

a collection of golang BGP tools to monitor, archive and serve
git clone git://git.2f30.org/go-bgp
Log | Files | Refs | README

archive.go (13840B)


      1 package archive
      2 
      3 import (
      4 	"fmt"
      5 	"log"
      6 	//"io/ioutil"
      7 	//"bytes"
      8 	"bufio"
      9 	"compress/bzip2"
     10 	"errors"
     11 	mrt "go-bgp/mrt"
     12 	"io"
     13 	"net/url"
     14 	"os"
     15 	"path/filepath"
     16 	"sort"
     17 	"strconv"
     18 	"strings"
     19 	"sync"
     20 	"time"
     21 	"unicode"
     22 )
     23 
     24 //constants for archentryfile.ftype
     25 const (
     26 	unknownfile = iota
     27 	mrtfile
     28 	xmlfile
     29 	ftypesize
     30 )
     31 
     32 var (
     33 	ftypestrings = [ftypesize]string{
     34 		"UNKNOWN",
     35 		"MRT",
     36 		"XML",
     37 	}
     38 	errbadft     = errors.New("error:  unknown file type")
     39 	errparsedate = errors.New("error: parsing date")
     40 	errbadscan   = errors.New("bufio scanner failed")
     41 )
     42 
     43 type reply struct {
     44 	data []byte
     45 	err  error
     46 }
     47 
     48 //To perform a query asynchronously on possibly many files we fire multiple goroutines
     49 //that all write their results to chan reply, and we also need the waitgroup
     50 //to know when we should close the channel to end the http transaction
     51 type archive interface {
     52 	Query(queryParams)
     53 	visit(string, os.FileInfo, error) error
     54 }
     55 
     56 type queryParams struct {
     57 	from  time.Time
     58 	to    time.Time
     59 	rtype int
     60 	rch   chan reply
     61 	wg    *sync.WaitGroup
     62 }
     63 
     64 type xmlstring struct {
     65 	timestr string
     66 	msg     string
     67 	time    time.Time
     68 }
     69 
     70 func (x *xmlstring) String() string {
     71 	return x.msg
     72 }
     73 
     74 //implements Sort interface by time.Time
     75 type archentryfile struct {
     76 	path  string
     77 	sdate time.Time
     78 	sz    int64
     79 	ftype int
     80 }
     81 
     82 type timeentryslice []archentryfile
     83 
     84 func (p timeentryslice) Len() int {
     85 	return len(p)
     86 }
     87 
     88 func (p timeentryslice) Less(i, j int) bool {
     89 	return p[i].sdate.Before(p[j].sdate)
     90 }
     91 
     92 func (p timeentryslice) Swap(i, j int) {
     93 	p[i], p[j] = p[j], p[i]
     94 }
     95 
     96 type fsarchive struct {
     97 	rootpathstr    string
     98 	entryfiles     *timeentryslice
     99 	tempentryfiles timeentryslice //once we finish operating on that, we change the entryfiles pointer
    100 	curyr          int            //in case we need to walk dirs to find dates,these keep state
    101 	curmon         int
    102 	curday         int
    103 	reqchan        chan string
    104 	scanning       bool
    105 	Scanwg         *sync.WaitGroup // expose it so callers are able to wait for scan to finish
    106 	scanch         chan struct{}
    107 	timedelta      time.Duration
    108 	descriminator  string //a string to be matched if a file is to be added while visiting the fs hier
    109 	Conf           *fsarconf
    110 	//present tha archive as a restful resource
    111 	PutNotAllowed
    112 	PostNotAllowed
    113 	DeleteNotAllowed
    114 }
    115 
    116 type fsarconf struct {
    117 	ar *fsarchive
    118 	PutNotAllowed
    119 	PostNotAllowed
    120 	DeleteNotAllowed
    121 }
    122 
    123 //in order not to block in the following GETs, we need to
    124 //fire a new goroutine to send the reply on the channel
    125 // the reason is that we create the channel here and we must
    126 //return it to the responsewriter and any sends would block
    127 //without the receiver being ready.
    128 func (fsc *fsarconf) Get(values url.Values) (int, chan reply) {
    129 	retc := make(chan reply)
    130 	go func() {
    131 		defer close(retc) //must close the chan to let the listener finish.
    132 		if fsc.ar == nil {
    133 			log.Printf("nil archive in fsarconf. ignoring request\n")
    134 			return
    135 		}
    136 		if _, ok := values["range"]; ok {
    137 			if len(*fsc.ar.entryfiles) > 0 {
    138 				f := *fsc.ar.entryfiles
    139 				dates := fmt.Sprintf("[From:%s - To:%s . Delta:%s]\n", f[0].sdate, f[len(f)-1].sdate, fsc.ar.timedelta)
    140 				retc <- reply{data: []byte(dates), err: nil}
    141 				return
    142 			}
    143 			retc <- reply{data: nil, err: errempty}
    144 			return
    145 		}
    146 		if _, ok := values["files"]; ok {
    147 			for _, f := range *fsc.ar.entryfiles {
    148 				retc <- reply{data: []byte(fmt.Sprintf("[Filename:%s .Type:%s]\n", filepath.Base(f.path), ftypestrings[f.ftype])), err: nil}
    149 			}
    150 			return
    151 		}
    152 		return
    153 	}()
    154 	return 200, retc
    155 }
    156 
    157 func (fsa *fsarchive) Get(values url.Values) (int, chan reply) {
    158 	var (
    159 		grwg  sync.WaitGroup
    160 		ftype int
    161 	)
    162 	retc := make(chan reply)
    163 	timeAstrs, ok1 := values["start"]
    164 	timeBstrs, ok2 := values["end"]
    165 	tpstr, ok3 := values["type"]
    166 	if len(timeAstrs) != len(timeBstrs) || !ok1 || !ok2 {
    167 		retc <- reply{data: nil, err: errbadreq}
    168 		goto done
    169 	}
    170 	if !ok3 { //no type specified. ensuring it's 0 (unknownfile)
    171 		ftype = unknownfile
    172 	} else {
    173 		for tpi, tps := range ftypestrings {
    174 			//XXX: here i only respect the first type argument
    175 			if tps == tpstr[0] {
    176 				ftype = tpi
    177 				break
    178 			}
    179 			if tpi == ftypesize-1 { //we didn't find it let the user know
    180 				log.Printf("unknown return type %s in request", tpstr[0])
    181 				retc <- reply{data: nil, err: errbadtype}
    182 				goto done
    183 			}
    184 		}
    185 	}
    186 	//This will parse pairs of start=YYYYMMDDHHMMSS&end=YYYYMMDDHHMMSS requests
    187 	for i := 0; i < len(timeAstrs); i++ {
    188 		log.Printf("timeAstr:%s timeBstr:%s", timeAstrs[i], timeBstrs[i])
    189 		timeA, errtime := time.Parse("20060102150405", timeAstrs[i])
    190 		timeB, errtime := time.Parse("20060102150405", timeBstrs[i])
    191 		if errtime != nil || timeB.Before(timeA) {
    192 			retc <- reply{data: nil, err: errbaddate}
    193 		} else {
    194 			//buf.WriteString(fmt.Sprintf("quering from t0:%s - t1:%s\n", timeA, timeB))
    195 			qp := queryParams{from: timeA, to: timeB, rtype: ftype, rch: retc, wg: &grwg}
    196 			fsa.Query(qp) //this will fire a new goroutine
    197 		}
    198 	}
    199 	// the last goroutine that will wait for all we invoked and close the chan
    200 	go func(wg *sync.WaitGroup) {
    201 		wg.Wait()   //wait for all the goroutines to finish sending
    202 		close(retc) //close the chan so that range in responsewriter will finish
    203 		log.Printf("closing the chan\n")
    204 	}(&grwg)
    205 done:
    206 	return 200, retc
    207 }
    208 
    209 //func (fsar *fsarchive) Query(ta, tb time.Time, retc chan reply, wg *sync.WaitGroup) {
    210 func (fsar *fsarchive) Query(qp queryParams) {
    211 	ta := qp.from
    212 	tb := qp.to
    213 	retc := qp.rch
    214 	rt := qp.rtype
    215 	wg := qp.wg
    216 	log.Printf("querying from %s to %s return as:%s \n", ta, tb, ftypestrings[rt])
    217 	go func(rc chan<- reply) {
    218 		wg.Add(1)
    219 		ef := *fsar.entryfiles
    220 		var (
    221 			scanner *bufio.Scanner
    222 			date    time.Time
    223 			err     error
    224 		)
    225 		defer wg.Done()
    226 		//case no files in archive
    227 		if len(ef) == 0 {
    228 			rc <- reply{nil, errempty}
    229 			return
    230 		}
    231 		//case request not on the served range
    232 		if tb.Before(ef[0].sdate) || ta.After(ef[len(ef)-1].sdate.Add(fsar.timedelta)) {
    233 			rc <- reply{nil, errdate}
    234 			return
    235 		}
    236 		//get the start end indices
    237 		i := sort.Search(len(ef), func(i int) bool {
    238 			return ef[i].sdate.After(ta.Add(-fsar.timedelta - time.Second))
    239 		})
    240 		j := sort.Search(len(ef), func(i int) bool {
    241 			return ef[i].sdate.After(tb)
    242 		})
    243 		for k := i; k < j; k++ {
    244 			//fext := filepath.Ext(ef[k].path)
    245 			file, ferr := os.Open(ef[k].path) //not calling defer cause i want to close files in the end of the loop
    246 			if ferr != nil {
    247 				log.Println("failed opening file: ", ef[k].path, " with ", ferr)
    248 				continue
    249 			}
    250 			ft := ef[k].ftype
    251 			scanner, err = getScanner(file, ft)
    252 			if err != nil {
    253 				log.Println("failed acquiring bufio scanner on: ", ef[k].path, " ", err)
    254 				continue
    255 			}
    256 
    257 			startt := time.Now()
    258 			for scanner.Scan() {
    259 				b := scanner.Bytes()
    260 				if len(b) == 0 {
    261 					log.Printf("0 number of bytes scanned from :%s", ef[k].path)
    262 				} else {
    263 					date, err = getDate(b, ft)
    264 					if err != nil {
    265 						log.Printf("Query: getting date on file:%s error: %s", ef[k].path, err)
    266 						continue
    267 					}
    268 				}
    269 				if date.After(ta) && date.Before(tb) {
    270 					log.Printf("succesfully sending data for date:%s", date)
    271 					rc <- reply{data: b, err: nil}
    272 				} else if date.After(tb) {
    273 					break //XXX: this is correct only on ordered files (increasing dates)
    274 				}
    275 			}
    276 			if err := scanner.Err(); err != nil && err != io.EOF {
    277 				log.Printf("file scanner error:%s\n", err)
    278 			}
    279 			log.Printf("finished parsing file %s size %d in %s\n", ef[k].path, ef[k].sz, time.Since(startt))
    280 			file.Close()
    281 		}
    282 		return
    283 	}(retc)
    284 }
    285 
    286 func getScanner(f *os.File, ft int) (scanner *bufio.Scanner, err error) {
    287 	scanner = nil
    288 	fext := filepath.Ext(f.Name())
    289 	if fext == ".bz2" {
    290 		log.Printf("bunzip2 file. opening decompression stream\n")
    291 		bzreader := bzip2.NewReader(f)
    292 		scanner = bufio.NewScanner(bzreader)
    293 	} else {
    294 		log.Printf("no extension on file: %s. opening normally\n", f.Name())
    295 		scanner = bufio.NewScanner(f)
    296 	}
    297 	if ft == mrtfile {
    298 		log.Printf("mrt file: %s. setting splitfunc to SplitMrt\n", f.Name())
    299 		scanner.Split(mrt.SplitMrt)
    300 	}
    301 	return
    302 }
    303 
    304 func getDate(data []byte, ftype int) (t time.Time, err error) {
    305 	switch ftype {
    306 	case mrtfile:
    307 		hdr, err := mrt.NewMrtHdr(data[:mrt.MrtHdr_size])
    308 		if err != nil {
    309 			log.Printf("getDate: error in creating MRT header:%s", err)
    310 		} else {
    311 			t = time.Unix(int64(hdr.Mrt_timestamp), 0)
    312 		}
    313 	case xmlfile:
    314 		str := string(data)
    315 		dateindi := strings.Index(str, "<DATETIME>")
    316 		if dateindi == -1 {
    317 			log.Println("getDate: could not locate DATETIME string in xml msg: ", str)
    318 			err = errbaddate
    319 			return
    320 		}
    321 		dateindi = dateindi + 10 // go to start of date data
    322 		dateindj := strings.Index(str[dateindi:], "</DATETIME>")
    323 		if dateindj == -1 {
    324 			log.Println("getDate: could not locate closing </DATETIME> string in xml msg: ", str)
    325 			err = errbaddate
    326 			return
    327 		}
    328 		dateindj = dateindj + dateindi // to return it to the relative start of line pos
    329 		t, err = time.Parse(time.RFC3339, str[dateindi:dateindj])
    330 		if err != nil {
    331 			log.Println("getDate: could not parse datetime: %s\n", err)
    332 		}
    333 	default:
    334 		err = errbadft
    335 	}
    336 	return
    337 }
    338 
    339 func getFirstDate(path string, ftype int) (date time.Time, err error) {
    340 	f, err := os.Open(path)
    341 	if err != nil {
    342 		return
    343 	}
    344 	defer f.Close()
    345 	sc, err := getScanner(f, ftype)
    346 	if err != nil {
    347 		return
    348 	}
    349 	sc.Scan()
    350 	b := sc.Bytes()
    351 	if len(b) == 0 {
    352 		log.Printf("0 number of bytes scanned from :%s", path)
    353 		err = errbadscan
    354 	} else {
    355 		date, err = getDate(b, ftype)
    356 	}
    357 	return
    358 }
    359 
    360 func (fsa *fsarchive) visit(path string, f os.FileInfo, err error) error {
    361 	var (
    362 		ft int
    363 	)
    364 	fname := f.Name()
    365 	log.Print("examining: ", fname)
    366 	if strings.LastIndex(path, fsa.descriminator) == -1 {
    367 		log.Printf("visit: descriminator:%s not found in path:%s . ignoring\n", fsa.descriminator, path)
    368 		return nil
    369 	}
    370 	if f.Mode().IsRegular() {
    371 		numind := strings.IndexFunc(fname, unicode.IsDigit)
    372 		extind := strings.LastIndex(fname, ".bz2")
    373 		xmlind := strings.LastIndex(fname, ".xml")
    374 		if (extind == -1 && xmlind == -1) || numind == -1 {
    375 			log.Printf("file: %s not in foo.YYYYMMDD.HHMM.[xml].bz2... format. extind:%d numberind:%d xmlind:%d", fname, extind, numind, xmlind)
    376 			return nil
    377 		}
    378 		if extind-numind != 13 && xmlind-numind != 13 {
    379 			log.Printf("file: %s not in foo.YYYYMMDD.HHMM.[xml].bz2... format. extind:%d numberind:%d xmlind:%d", fname, extind, numind, xmlind)
    380 			return nil
    381 		}
    382 		if xmlind != -1 {
    383 			ft = xmlfile
    384 		} else {
    385 			ft = mrtfile
    386 		}
    387 		time, err := getFirstDate(path, ft)
    388 		if err != nil {
    389 			log.Print("visit: getFirstDate failed on file: ", fname, " with error: ", err)
    390 			//return err // this should make the filepath.Walk to end
    391 			return nil
    392 		}
    393 		fsa.tempentryfiles = append(fsa.tempentryfiles, archentryfile{path: path, sdate: time, sz: f.Size(), ftype: ft})
    394 	}
    395 	return nil
    396 }
    397 
    398 func (fsa *fsarchive) SetDelta(a time.Duration) {
    399 	fsa.timedelta = a
    400 }
    401 
    402 func NewFsArchive(path, descr string) *fsarchive {
    403 	return &fsarchive{
    404 		rootpathstr:    path,
    405 		entryfiles:     &timeentryslice{},
    406 		tempentryfiles: timeentryslice{},
    407 		curyr:          0,
    408 		curmon:         0,
    409 		curday:         0,
    410 		reqchan:        make(chan string),
    411 		scanning:       false,
    412 		Scanwg:         &sync.WaitGroup{},
    413 		scanch:         make(chan struct{}),
    414 		timedelta:      15 * time.Minute,
    415 		descriminator:  descr,
    416 		Conf:           &fsarconf{},
    417 	}
    418 }
    419 
    420 //trying to see if a dir name is in YYYY.MM form
    421 //returns true, year, month if it is, or false, 0, 0 if not.
    422 func isYearMonthDir(fname string) (res bool, yr int, mon int) {
    423 	var err error
    424 	res = false
    425 	yr = 0
    426 	mon = 0
    427 	isdot := func(r rune) bool {
    428 		if r == '.' {
    429 			return true
    430 		}
    431 		return false
    432 	}
    433 	ind := strings.IndexFunc(fname, isdot)
    434 	//not found or in the form foo.
    435 	if ind == -1 || ind == len(fname) {
    436 		return
    437 	}
    438 	//not YYYY or MM
    439 	if len(fname[:ind]) != 4 || len(fname[ind+1:]) != 2 {
    440 		return
    441 	}
    442 	yr, err = strconv.Atoi(fname[:ind])
    443 	if err != nil {
    444 		return
    445 	}
    446 	mon, err = strconv.Atoi(fname[ind+1:])
    447 	if err != nil {
    448 		return
    449 	}
    450 	if mon < 1 || mon > 12 {
    451 		return
    452 	}
    453 	//the values were found to be valid
    454 	res = true
    455 	return
    456 }
    457 
    458 func (fsa *fsarchive) printEntries() {
    459 	log.Printf("dumping entries")
    460 	for _, ef := range *fsa.entryfiles {
    461 		fmt.Printf("[path:%s\tdate:%s\tsize:%d\ttype:%s\n", ef.path, ef.sdate, ef.sz, ftypestrings[ef.ftype])
    462 	}
    463 }
    464 
    465 func (fsa *fsarchive) scan(ar archive) {
    466 	//clear the temp slice
    467 	fsa.tempentryfiles = []archentryfile{}
    468 	fsa.Scanwg.Add(1)
    469 	fsa.scanning = true
    470 	filepath.Walk(fsa.rootpathstr, ar.visit)
    471 	sort.Sort(fsa.tempentryfiles)
    472 	//allow the serve goroutine to unblock in case of STOP.
    473 	fsa.Scanwg.Done()
    474 	//signal the serve goroutine on scandone channel
    475 	fsa.scanch <- struct{}{}
    476 }
    477 
    478 func (fsa *fsarchive) Serve(wg *sync.WaitGroup, ar archive) (reqchan chan<- string) {
    479 	if fsa.reqchan == nil { // we have closed the channel and now called again
    480 		fsa.reqchan = make(chan string)
    481 	}
    482 	wg.Add(1)
    483 	go func() {
    484 		defer wg.Done()
    485 		for {
    486 			select {
    487 			case req := <-fsa.reqchan:
    488 				switch req {
    489 				case "SCAN":
    490 					if fsa.scanning {
    491 						log.Print("fsarchive: already scanning. ignoring command")
    492 					} else { //fire an async goroutine to scan the files and wait for SCANDONE
    493 						go fsa.scan(ar)
    494 					}
    495 				case "DUMPENTRIES":
    496 					if fsa.scanning {
    497 						log.Print("fsar: warning. scanning in progress")
    498 					}
    499 					fsa.printEntries()
    500 				case "STOP":
    501 					log.Print("fsar: stopping")
    502 					fsa.Scanwg.Wait()
    503 					fsa.reqchan = nil //no more stuff from this channel
    504 					return
    505 				default:
    506 					log.Print("fsarchive: unknown request: ", req)
    507 				}
    508 			case <-fsa.scanch:
    509 				//update the reference to our file slice
    510 				fsa.entryfiles = &fsa.tempentryfiles
    511 				fsa.scanning = false
    512 				//let the config know
    513 				log.Printf("setting archive conf  from :%v to a slice of len: %v\n", fsa.Conf.ar, len(*fsa.entryfiles))
    514 				fsa.Conf.ar = fsa
    515 				log.Print("fsarchive: scan finished")
    516 			}
    517 		}
    518 	}()
    519 	return fsa.reqchan
    520 }