commit 42e2b4e39d199a002c2b34ef71282a0cc9f915ac
parent eeee7a46703a4fb58b8f095aacd1f2cb21969cc9
Author: dsp <dsp@2f30.org>
Date: Mon, 16 Feb 2015 22:16:46 -0700
implemented getScanner , getDate and getFirstDate, to abstract part of Query's functionality.
Diffstat:
M | archive/archive.go | | | 137 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------- |
1 file changed, 114 insertions(+), 23 deletions(-)
diff --git a/archive/archive.go b/archive/archive.go
@@ -6,6 +6,7 @@ import (
//"io/ioutil"
//"bytes"
"bufio"
+ "errors"
"compress/bzip2"
mrt "go-bgp/mrt"
"io"
@@ -34,6 +35,9 @@ var (
"MRT",
"XML",
}
+ errbadft = errors.New("error: unknown file type")
+ errparsedate = errors.New("error: parsing date")
+ errbadscan = errors.New("bufio scanner failed")
)
type reply struct {
@@ -213,7 +217,11 @@ func (fsar *fsarchive) Query(qp queryParams) {
go func(rc chan<- reply) {
wg.Add(1)
ef := *fsar.entryfiles
- var scanner *bufio.Scanner
+ var (
+ scanner *bufio.Scanner
+ date time.Time
+ err error
+ )
defer wg.Done()
//case no files in archive
if len(ef) == 0 {
@@ -233,28 +241,39 @@ func (fsar *fsarchive) Query(qp queryParams) {
return ef[i].sdate.After(tb)
})
for k := i; k < j; k++ {
- fext := filepath.Ext(ef[k].path)
- file, ferr := os.Open(ef[k].path)
+ //fext := filepath.Ext(ef[k].path)
+ file, ferr := os.Open(ef[k].path) //not calling defer cause i want to close files in the end of the loop
if ferr != nil {
- log.Println("failed opening file: ", ef[k].path, " ", ferr)
+ log.Println("failed opening file: ", ef[k].path, " with ", ferr)
continue
}
-
- if fext == ".bz2" {
- log.Printf("bunzip2 file. opening decompression stream\n")
- bzreader := bzip2.NewReader(file)
- scanner = bufio.NewScanner(bzreader)
- } else {
- log.Printf("no extension on file: %s. opening normally\n", ef[k].path)
- scanner = bufio.NewScanner(file)
- }
-
- if ef[k].ftype == mrtfile {
- scanner.Split(mrt.SplitMrt)
+ ft := ef[k].ftype
+ scanner , err = getScanner(file, ft)
+ if err != nil {
+ log.Println("failed acquiring bufio scanner on: ", ef[k].path, " ", err)
+ continue
}
startt := time.Now()
for scanner.Scan() {
+ b := scanner.Bytes()
+ if len(b) == 0 {
+ log.Printf("0 number of bytes scanned from :%s", ef[k].path)
+ } else {
+ date, err = getDate(b, ft)
+ if err != nil {
+ log.Printf("Query: getting date on file:%s error: %s", ef[k].path, err)
+ continue
+ }
+ }
+ if date.After(ta) && date.Before(tb) {
+ log.Printf("succesfully sending data for date:%s", date)
+ rc <- reply{data: b, err: nil}
+ } else if date.After(tb) {
+ break //XXX: this is correct only on ordered files (increasing dates)
+ }
+ /*
+
if ef[k].ftype == mrtfile {
data := scanner.Bytes()
hdr, errh := mrt.NewMrtHdr(data[:mrt.MrtHdr_size])
@@ -292,6 +311,7 @@ func (fsar *fsarchive) Query(qp queryParams) {
break
}
}
+ */
}
if err := scanner.Err(); err != nil && err != io.EOF {
log.Printf("file scanner error:%s\n", err)
@@ -303,10 +323,83 @@ func (fsar *fsarchive) Query(qp queryParams) {
}(retc)
}
+func getScanner(f *os.File, ft int) (scanner *bufio.Scanner, err error) {
+ scanner = nil
+ fext := filepath.Ext(f.Name())
+ if fext == ".bz2" {
+ log.Printf("bunzip2 file. opening decompression stream\n")
+ bzreader := bzip2.NewReader(f)
+ scanner = bufio.NewScanner(bzreader)
+ } else {
+ log.Printf("no extension on file: %s. opening normally\n", f.Name())
+ scanner = bufio.NewScanner(f)
+ }
+ if ft == mrtfile {
+ log.Printf("mrt file: %s. setting splitfunc to SplitMrt\n", f.Name())
+ scanner.Split(mrt.SplitMrt)
+ }
+ return
+}
+
+func getDate(data []byte, ftype int) (t time.Time, err error) {
+ switch ftype {
+ case mrtfile :
+ hdr, err := mrt.NewMrtHdr(data[:mrt.MrtHdr_size])
+ if err != nil {
+ log.Printf("getDate: error in creating MRT header:%s", err)
+ } else {
+ t = time.Unix(int64(hdr.Mrt_timestamp), 0)
+ }
+ case xmlfile :
+ str := string(data)
+ dateindi := strings.Index(str, "<DATETIME>")
+ if dateindi == -1 {
+ log.Println("getDate: could not locate DATETIME string in xml msg: ", str)
+ err = errbaddate
+ return
+ }
+ dateindi = dateindi + 10 // go to start of date data
+ dateindj := strings.Index(str[dateindi:], "</DATETIME>")
+ if dateindj == -1 {
+ log.Println("getDate: could not locate closing </DATETIME> string in xml msg: ", str)
+ err = errbaddate
+ return
+ }
+ dateindj = dateindj + dateindi // to return it to the relative start of line pos
+ t, err = time.Parse(time.RFC3339, str[dateindi:dateindj])
+ if err != nil {
+ log.Println("getDate: could not parse datetime: %s\n", err)
+ }
+ default:
+ err = errbadft
+ }
+ return
+}
+
+func getFirstDate(path string, ftype int) (date time.Time, err error) {
+ f, err := os.Open(path)
+ if err != nil {
+ return
+ }
+ defer f.Close()
+ sc, err := getScanner(f, ftype)
+ if err != nil {
+ return
+ }
+ sc.Scan()
+ b := sc.Bytes()
+ if len(b) == 0 {
+ log.Printf("0 number of bytes scanned from :%s", path)
+ err = errbadscan
+ } else {
+ date, err = getDate(b, ftype)
+ }
+ return
+}
+
func (fsa *fsarchive) visit(path string,f os.FileInfo, err error) error {
var (
ft int
- datestr string
)
fname := f.Name()
log.Print("examining: ", fname)
@@ -327,16 +420,14 @@ func (fsa *fsarchive) visit(path string,f os.FileInfo, err error) error {
return nil
}
if xmlind != -1 {
- datestr = fname[numind:xmlind]
ft = xmlfile
} else {
- datestr = fname[numind:extind]
ft = mrtfile
}
- log.Println("datestr in filename is ", datestr)
- time, errtime := time.Parse("20060102.1504", datestr)
- if errtime != nil {
- log.Print("time.Parse() failed on file: ", fname, " that should be in fooHHMM format with error: ", errtime)
+ time, err := getFirstDate(path, ft)
+ if err != nil {
+ log.Print("visit: getFirstDate failed on file: ", fname, " with error: ", err)
+ //return err // this should make the filepath.Walk to end
return nil
}
fsa.tempentryfiles = append(fsa.tempentryfiles, archentryfile{path: path, sdate: time, sz: f.Size(), ftype: ft})