commit 2f9aae3d512c54281c266c9dd0f9f5d0dd166e38
parent c2fee0e4b857a5e96e8b9eac5b719fe5fff9f77a
Author: dsp <dsp@2f30.org>
Date: Thu, 12 Feb 2015 13:16:41 -0700
retiring mrtarchive and xmlarchive in favor a type being a parameter in the query and in archentryfile type
Diffstat:
2 files changed, 60 insertions(+), 157 deletions(-)
diff --git a/archive/archive.go b/archive/archive.go
@@ -24,6 +24,14 @@ import (
const (
mrtfile = iota
xmlfile
+ ftypesize
+)
+
+var (
+ ftypestrings = [ftypesize]string {
+ "MRT",
+ "XML",
+ }
)
type reply struct {
@@ -91,14 +99,6 @@ type fsarchive struct {
DeleteNotAllowed
}
-type mrtarchive struct {
- *fsarchive
-}
-
-type xmlarchive struct {
- *fsarchive
-}
-
type fsarconf struct {
arfiles *timeentryslice
PutNotAllowed
@@ -140,7 +140,7 @@ func (fsc *fsarconf) Get(values url.Values) (int, chan reply) {
return 200, retc
}
-func (fsa *fsarchive) GetImpl(values url.Values, ar archive) (int, chan reply) {
+func (fsa *fsarchive) Get(values url.Values) (int, chan reply) {
var grwg sync.WaitGroup
retc := make(chan reply)
timeAstrs, ok1 := values["start"]
@@ -158,7 +158,7 @@ func (fsa *fsarchive) GetImpl(values url.Values, ar archive) (int, chan reply) {
retc <- reply{data: nil, err: errbaddate}
} else {
//buf.WriteString(fmt.Sprintf("quering from t0:%s - t1:%s\n", timeA, timeB))
- ar.Query(timeA, timeB, retc, &grwg) //this will fire a new goroutine
+ fsa.Query(timeA, timeB, retc, &grwg) //this will fire a new goroutine
}
}
// the last goroutine that will wait for all we invoked and close the chan
@@ -171,31 +171,26 @@ done:
return 200, retc
}
-func (fsa *mrtarchive) Get(values url.Values) (int, chan reply) {
- return fsa.fsarchive.GetImpl(values, fsa)
-}
-
-func (fsa *xmlarchive) Get(values url.Values) (int, chan reply) {
- return fsa.fsarchive.GetImpl(values, fsa)
-}
-
-func (ma *mrtarchive) Query(ta, tb time.Time, retc chan reply, wg *sync.WaitGroup) {
+func (fsar *fsarchive) Query(ta, tb time.Time, retc chan reply, wg *sync.WaitGroup) {
log.Printf("querying mrt from %s to %s\n", ta, tb)
go func(rc chan<- reply) {
wg.Add(1)
- ef := *ma.entryfiles
+ ef := *fsar.entryfiles
var scanner *bufio.Scanner
defer wg.Done()
+ //case no files in archive
if len(ef) == 0 {
rc <- reply{nil, errempty}
return
}
- if tb.Before(ef[0].sdate) || ta.After(ef[len(ef)-1].sdate.Add(ma.timedelta)) {
+ //case request not on the served range
+ if tb.Before(ef[0].sdate) || ta.After(ef[len(ef)-1].sdate.Add(fsar.timedelta)) {
rc <- reply{nil, errdate}
return
}
+ //get the start end indices
i := sort.Search(len(ef), func(i int) bool {
- return ef[i].sdate.After(ta.Add(-ma.timedelta - time.Second))
+ return ef[i].sdate.After(ta.Add(-fsar.timedelta - time.Second))
})
j := sort.Search(len(ef), func(i int) bool {
return ef[i].sdate.After(tb)
@@ -207,29 +202,34 @@ func (ma *mrtarchive) Query(ta, tb time.Time, retc chan reply, wg *sync.WaitGrou
log.Println("failed opening file: ", ef[k].path, " ", ferr)
continue
}
+
if fext == ".bz2" {
log.Printf("bunzip2 file. opening decompression stream\n")
bzreader := bzip2.NewReader(file)
scanner = bufio.NewScanner(bzreader)
- scanner.Split(mrt.SplitMrt)
} else {
log.Printf("no extension on file: %s. opening normally\n", ef[k].path)
scanner = bufio.NewScanner(file)
+ }
+
+ if ef[k].ftype == mrtfile {
scanner.Split(mrt.SplitMrt)
}
- //buf.WriteString(fmt.Sprintf(" [ file: %s ] ", ef[k].path))
+
startt := time.Now()
for scanner.Scan() {
- data := scanner.Bytes()
- hdr, errh := mrt.NewMrtHdr(data[:mrt.MrtHdr_size])
- if errh != nil {
- log.Printf("error in creating MRT header:%s", errh)
- rc <- reply{data: nil, err: errh}
- continue
- }
- date := time.Unix(int64(hdr.Mrt_timestamp), 0)
- log.Printf("scanned mrt with date:%s", date)
- /*
+ if ef[k].ftype == mrtfile {
+ data := scanner.Bytes()
+ hdr, errh := mrt.NewMrtHdr(data[:mrt.MrtHdr_size])
+ if errh != nil {
+ log.Printf("error in creating MRT header:%s", errh)
+ rc <- reply{data: nil, err: errh}
+ continue
+ }
+ date := time.Unix(int64(hdr.Mrt_timestamp), 0)
+ log.Printf("scanned mrt with date:%s", date)
+ } else if ef[k].ftype == xmlfile {
+ str := scanner.Text()
dateindi := strings.Index(str, "<DATETIME>")
if dateindi == -1 {
log.Println("could not locate DATETIME string in xml msg: ", str)
@@ -253,7 +253,8 @@ func (ma *mrtarchive) Query(ta, tb time.Time, retc chan reply, wg *sync.WaitGrou
rc <- reply{data: []byte(fmt.Sprintf("%s\n", str)), err: nil}
} else if xmldate.After(tb) { //only later measurements in this file. leaving
break
- }*/
+ }
+ }
}
if err := scanner.Err(); err != nil && err != io.EOF {
log.Printf("file scanner error:%s\n", err)
@@ -265,9 +266,13 @@ func (ma *mrtarchive) Query(ta, tb time.Time, retc chan reply, wg *sync.WaitGrou
}(retc)
}
-func (fsa *mrtarchive) visit(path string, f os.FileInfo, err error) error {
+func (fsa *fsarchive) visit(path string,f os.FileInfo, err error) error {
+ var (
+ ft int
+ datestr string
+ )
fname := f.Name()
- log.Print("examining mrt: ", fname)
+ log.Print("examining: ", fname)
if strings.LastIndex(path, fsa.descriminator) == -1 {
log.Printf("visit: descriminator:%s not found in path:%s . ignoring\n", fsa.descriminator, path)
return nil
@@ -275,104 +280,33 @@ func (fsa *mrtarchive) visit(path string, f os.FileInfo, err error) error {
if f.Mode().IsRegular() {
numind := strings.IndexFunc(fname, unicode.IsDigit)
extind := strings.LastIndex(fname, ".bz2")
- if numind == -1 || extind == -1 || extind-numind != 13 {
- log.Print("file: ", fname, " not in foo.YYYYMMDD.HHMM.bz2... format. extind:%d numberind:%d", extind, numind)
+ xmlind := strings.LastIndex(fname, ".xml")
+ if (extind == -1 && xmlind == -1) || numind == -1 {
+ log.Printf("file: %s not in foo.YYYYMMDD.HHMM.[xml].bz2... format. extind:%d numberind:%d xmlind:%d",fname, extind, numind, xmlind)
return nil
}
- datestr := fname[numind:extind]
+ if extind-numind != 13 && xmlind-numind != 13 {
+ log.Printf("file: %s not in foo.YYYYMMDD.HHMM.[xml].bz2... format. extind:%d numberind:%d xmlind:%d",fname, extind, numind, xmlind)
+ return nil
+ }
+ if xmlind != -1 {
+ datestr = fname[numind:xmlind]
+ ft = xmlfile
+ } else {
+ datestr = fname[numind:extind]
+ ft = mrtfile
+ }
log.Println("datestr in filename is ", datestr)
time, errtime := time.Parse("20060102.1504", datestr)
if errtime != nil {
log.Print("time.Parse() failed on file: ", fname, " that should be in fooHHMM format with error: ", errtime)
return nil
}
- fsa.tempentryfiles = append(fsa.tempentryfiles, archentryfile{path: path, sdate: time, sz: f.Size(), ftype: mrtfile})
+ fsa.tempentryfiles = append(fsa.tempentryfiles, archentryfile{path: path, sdate: time, sz: f.Size(), ftype: ft})
}
return nil
}
-func (fsa *xmlarchive) Query(ta, tb time.Time, retc chan reply, wg *sync.WaitGroup) {
- log.Printf("querying from %s to %s\n", ta, tb)
- go func(rc chan<- reply) {
- wg.Add(1)
- defer wg.Done()
- ef := *fsa.entryfiles
- var scanner *bufio.Scanner
- if len(ef) == 0 {
- rc <- reply{nil, errempty}
- return
- }
- if tb.Before(ef[0].sdate) || ta.After(ef[len(ef)-1].sdate.Add(fsa.timedelta)) {
- rc <- reply{nil, errdate}
- return
- }
- i := sort.Search(len(ef), func(i int) bool {
- return ef[i].sdate.After(ta.Add(-fsa.timedelta - time.Second))
- })
- j := sort.Search(len(ef), func(i int) bool {
- return ef[i].sdate.After(tb)
- })
- for k := i; k < j; k++ {
- fext := filepath.Ext(ef[k].path)
- file, ferr := os.Open(ef[k].path)
- if ferr != nil {
- log.Println("failed opening file: ", ef[k].path, " ", ferr)
- continue
- }
- if fext == "" || fext == ".xml" {
- log.Printf("no extension on file: %s. opening normally\n", ef[k].path)
- scanner = bufio.NewScanner(file)
- } else if fext == ".bz2" {
- log.Printf("bunzip2 file. opening decompression stream\n")
- bzreader := bzip2.NewReader(file)
- scanner = bufio.NewScanner(bzreader)
- } else {
- log.Printf("unhandled file extension: %s\n", ef[j].path)
- continue
- }
- //buf.WriteString(fmt.Sprintf(" [ file: %s ] ", ef[k].path))
- startt := time.Now()
- for scanner.Scan() {
- str := scanner.Text()
- dateindi := strings.Index(str, "<DATETIME>")
- if dateindi == -1 {
- log.Println("could not locate DATETIME string in xml msg: ", str)
- continue
- }
- dateindi = dateindi + 10 // go to start of date data
- dateindj := strings.Index(str[dateindi:], "</DATETIME>")
- if dateindj == -1 {
- log.Println("could not locate closing </DATETIME> string in xml msg: ", str)
- continue
- }
- dateindj = dateindj + dateindi // to return it to the relative start of line pos
- xmldate, derr := time.Parse(time.RFC3339, str[dateindi:dateindj])
- if derr != nil {
- log.Println("could not parse datetime: %s\n", derr)
- continue
- }
- //log.Printf("parse xml message date: %s\n", xmldate)
- if xmldate.After(ta) && xmldate.Before(tb) {
- //buf.WriteString(fmt.Sprintf("%s\n",str))
- rc <- reply{data: []byte(fmt.Sprintf("%s\n", str)), err: nil}
- } else if xmldate.After(tb) { //only later measurements in this file. leaving
- break
- }
- }
- if err := scanner.Err(); err != nil && err != io.EOF {
- log.Printf("file scanner error:%s\n", err)
- }
- log.Printf("finished parsing file %s size %d in %s\n", ef[k].path, ef[k].sz, time.Since(startt))
- file.Close()
- }
- return
- }(retc)
-}
-
-func NewMRTArchive(path, descr string) *mrtarchive {
- return &mrtarchive{NewFsArchive(path, descr)}
-}
-
func NewFsArchive(path, descr string) *fsarchive {
return &fsarchive{
rootpathstr: path,
@@ -391,10 +325,6 @@ func NewFsArchive(path, descr string) *fsarchive {
}
}
-func NewXmlArchive(path, descr string) *xmlarchive {
- return &xmlarchive{NewFsArchive(path, descr)}
-}
-
//trying to see if a dir name is in YYYY.MM form
//returns true, year, month if it is, or false, 0, 0 if not.
func isYearMonthDir(fname string) (res bool, yr int, mon int) {
@@ -433,37 +363,10 @@ func isYearMonthDir(fname string) (res bool, yr int, mon int) {
return
}
-func (fsa *xmlarchive) visit(path string, f os.FileInfo, err error) error {
- fname := f.Name()
- log.Print("examining ", fname)
- if strings.LastIndex(path, fsa.descriminator) == -1 {
- log.Printf("visit: descriminator:%s not found in path:%s . ignoring\n", fsa.descriminator, path)
- return nil
- }
-
- if f.Mode().IsRegular() {
- numind := strings.IndexFunc(fname, unicode.IsDigit)
- xmlind := strings.LastIndex(fname, ".xml")
- if numind == -1 || xmlind == -1 || xmlind-numind != 13 {
- log.Print("file: ", fname, " not in foo.YYYYMMDD.HHMM.xml... format")
- return nil
- }
- datestr := fname[numind:xmlind]
- log.Println("datestr in filename is ", datestr)
- time, errtime := time.Parse("20060102.1504", datestr)
- if errtime != nil {
- log.Print("time.Parse() failed on file: ", fname, " that should be in fooHHMM format with error: ", errtime)
- return nil
- }
- fsa.tempentryfiles = append(fsa.tempentryfiles, archentryfile{path: path, sdate: time, sz: f.Size(), ftype: xmlfile})
- }
- return nil
-}
-
func (fsa *fsarchive) printEntries() {
log.Printf("dumping entries")
for _, ef := range *fsa.entryfiles {
- fmt.Printf("%s %s\n", ef.path, ef.sdate)
+ fmt.Printf("[path:%s\tdate:%s\tsize:%d\ttype:%s\n", ef.path, ef.sdate, ef.sz, ftypestrings[ef.ftype])
}
}
diff --git a/cmd/archive_server.go b/cmd/archive_server.go
@@ -12,13 +12,13 @@ func main() {
log.Fatal("usage: ", os.Args[0], " directory ")
}
basedirstr := os.Args[1]
- ribmrtar := ar.NewMRTArchive(basedirstr, "RIBS")
+ ribmrtar := ar.NewFsArchive(basedirstr, "RIBS")
wg1 := &sync.WaitGroup{}
mrtreqc := ribmrtar.Serve(wg1, ribmrtar)
mrtreqc <- "SCAN"
ribmrtar.Scanwg.Wait()
api := new(ar.API)
- api.AddResource(ribmrtar, "/archive/mrt/ribs")
+ api.AddResource(ribmrtar, "/archive/ribs")
api.Start(3000)
close(mrtreqc)
wg1.Wait()