Author: Paolo Lulli <paolo@lulli.net>
Parquet write ok, but recreates file
server/db/dates_test.go | 8 ++++++-- server/db/metric.go | 24 ++++++++++++++++++++++-- server/db/parquet.go | 9 +++++---- server/yats.go | 11 +++++------
diff --git a/server/db/dates_test.go b/server/db/dates_test.go index ecfd83fdd7f8334eeeb5e95afbbe141b660bf7ec..8643cb78bccd1a5452a4b4862bbda6ba07beb459 100644 --- a/server/db/dates_test.go +++ b/server/db/dates_test.go @@ -2,6 +2,7 @@ package db import ( "fmt" + "os" "testing" "time" "yats-server/model" @@ -48,7 +49,8 @@ Etime: int64(time.Now().UnixMilli()), Name: "SomeTest", } - EventToParquet(event) + tmpFile, _ := os.CreateTemp("/tmp/", "tempEvent-XXXX.parquet") + EventToParquet(event, tmpFile.Name()) metric := model.MetricRequest{ ID_client: "mabc", @@ -57,6 +59,8 @@ Name: "someName", Value: "SomeValue", } - MetricToParquet(metric) + tmpMetricFile, _ := os.CreateTemp("/tmp/", "tempMetric-XXXX.parquet") + + MetricToParquet(metric, tmpMetricFile.Name()) } diff --git a/server/db/metric.go b/server/db/metric.go index 5da48d3492db8c0277eea55b580d0b020a0c7df1..599fde0504104645b67fb65a6e158264d2f07b69 100644 --- a/server/db/metric.go +++ b/server/db/metric.go @@ -3,6 +3,7 @@ import ( "fmt" "time" + "yats-server/model" "github.com/gocql/gocql" ) @@ -14,13 +15,31 @@ Name string `json:"name"` Value string `json:"value"` } -func PrintMetrics(metrics []MetricModel) { +func PrintMetrics(metrics []model.MetricRequest) { for _, metric := range metrics { fmt.Printf("%s\n", metric.Name) } } -func LoadMetrics(session *gocql.Session) []MetricModel { +func LoadMetrics(session *gocql.Session) []model.MetricRequest { + var metrics []model.MetricRequest + m := map[string]interface{}{} + + iter := session.Query("SELECT id_client, mtime, name, value FROM metric").Iter() + for iter.MapScan(m) { + metrics = append(metrics, model.MetricRequest{ + ID_client: m["id_client"].(string), + Mtime: m["mtime"].(time.Time).UnixMilli(), + Name: m["name"].(string), + Value: m["value"].(string), + }) + + m = map[string]interface{}{} + } + return metrics +} + +func OLDLoadMetrics(session *gocql.Session) []MetricModel { var metrics []MetricModel m := map[string]interface{}{} @@ -32,6 +51,7 @@ Mtime: m["mtime"].(time.Time), Name: m["name"].(string), Value: m["value"].(string), }) + m = map[string]interface{}{} } return metrics diff --git a/server/db/parquet.go b/server/db/parquet.go index 0bb149922f06fd71874a06275df3fc7d6960bd6b..271f75c428584d949d2bebfda98a27f25b1838f5 100644 --- a/server/db/parquet.go +++ b/server/db/parquet.go @@ -8,10 +8,10 @@ "github.com/xitongsys/parquet-go/writer" ) -func EventToParquet(event model.EventRequest) { +func EventToParquet(event model.EventRequest, parquetFile string) { var err error - w, err := os.Create("/tmp/flat.parquet") + w, err := os.Create(parquetFile) if err != nil { log.Println("Can't create local file", err) return @@ -35,10 +35,11 @@ log.Println("Write Finished") w.Close() } -func MetricToParquet(metric model.MetricRequest) { +func MetricToParquet(metric model.MetricRequest, parquetFile string) { var err error - w, err := os.Create("/tmp/metric.parquet") + + w, err := os.Create(parquetFile) if err != nil { log.Println("Can't create local file", err) return diff --git a/server/yats.go b/server/yats.go index 5a3f6eeabadadef41a515c647265aa4deacfdd00..42e95cf033963cdde9f379f34bb3eb3e50496d08 100644 --- a/server/yats.go +++ b/server/yats.go @@ -18,6 +18,11 @@ func maintenanceThread() { var sleeptime, _ = time.ParseDuration("60s") for { time.Sleep(sleeptime) + + metrics := db.LoadMetrics(db.Session) + for i := range metrics { + db.MetricToParquet(metrics[i], "/tmp/allmetrics.parquet") + } fmt.Printf("Maintenance Thread\n") } } @@ -26,12 +31,6 @@ func startServer() { configuration := config.GetConfig(os.Getenv("HOME") + "/.yats.json") db.Session = db.InitializeDb(configuration) - //idClient := "ClientZero" - /* - var metrics = db.LoadMetrics(session) - - db.PrintMetrics(metrics) - */ RestService(configuration) }