Author: Paolo Lulli <paolo@lulli.net>
Archive per-client metric date ranges to Parquet files
client/LogNow.sh | 7 ++++ client/postEventNow | 2 client/postMetric.sh | 2 client/postMetricNow.sh | 2 schema/schema.cql | 6 ++++ server/config/config.go | 37 +++++++++++++++++++++---- server/dates/formatting.go | 19 ++++++++++++ server/dates/ranges.go | 28 ++++++++++++++++++ server/db/clients.go | 27 ++++++++++++++++++ server/db/dates_test.go | 19 ++++-------- server/db/metric.go | 45 ++++++++++++------------------ server/db/metric_info.go | 26 +++++++++++++++++ server/db/parquet.go | 8 +++-- server/maintenance.go | 59 ++++++++++++++++++++++++++++++++++++++++ server/model/models.go | 12 ++++++++ server/yats.go | 22 +-------------
diff --git a/client/LogNow.sh b/client/LogNow.sh new file mode 100755 index 0000000000000000000000000000000000000000..dd046920707b38ca20476a02a9298acff82d1941 --- /dev/null +++ b/client/LogNow.sh @@ -0,0 +1,7 @@ +#! /bin/bash +# +curl \ + -X POST\ + --header "Content-Type: application/json"\ + -d '{"id_client":"test-cli","name":"test-log","value":"A log message"}'\ + http://127.0.0.1:18081/rt/metric diff --git a/client/postEventNow b/client/postEventNow index ac6c90d31a2b44e8106b3820de3538bede35b047..f01ee44a8dd9886d4809fb8d0c41e97c620ac69b 100755 --- a/client/postEventNow +++ b/client/postEventNow @@ -4,5 +4,5 @@ event=$1 curl \ -X POST\ --header "Content-Type: application/json"\ - -d "{\"id_client\":\"cli1\",\"name\":\"${event}\"}"\ + -d "{\"id_client\":\"test-cli\",\"name\":\"${event}\"}"\ http://127.0.0.1:18081/rt/event diff --git a/client/postMetric.sh b/client/postMetric.sh index 67ae31835e843ba042fb56f1904d5ccc67ab7c9e..b0edba9185b6ef58b05cf6efb19bdb83d075edd4 100755 --- a/client/postMetric.sh +++ b/client/postMetric.sh @@ -3,5 +3,5 @@ # curl \ -X POST\ --header "Content-Type: application/json"\ - -d '{"mtime":1713216483,"id_client":"cli2","name":"mone","value":"vone"}'\ + -d '{"mtime":1713216483,"id_client":"test-cli","name":"test-measure","value":"vone"}'\ http://127.0.0.1:18081/metric diff --git a/client/postMetricNow.sh b/client/postMetricNow.sh index 1224711169ffcd335f72eb49355c77c39012dae0..c21f37ddbcc2b5df566af2a1c50d70da5f4d4461 100755 --- a/client/postMetricNow.sh +++ b/client/postMetricNow.sh @@ -3,5 +3,5 @@ # curl \ -X POST\ --header "Content-Type: application/json"\ - -d '{"id_client":"cli1","name":"mone","value":"vone"}'\ + -d '{"id_client":"test-cli","name":"mone","value":"vone"}'\ http://127.0.0.1:18081/rt/metric diff --git a/schema/schema.cql b/schema/schema.cql index 723965716188bbb3dec0c5d04064be5e3664944d..0339be5f85ffc2b53ebda480318cfeff5fc0bb88 100644 --- a/schema/schema.cql +++ b/schema/schema.cql @@ -18,6 +18,12 @@ name text, created 
timestamp, PRIMARY KEY(id) ); +CREATE TABLE metric_info ( + id_client text, + name text, + description text, + PRIMARY KEY (id_client,name) +); CREATE TABLE location ( hash text, lname text, diff --git a/server/config/config.go b/server/config/config.go index 4531a70f52ad48276af26b95e80c3e7d12ad7f7e..24ddd48c5dfc6c26ee7eeab00dea8adf9dd5af59 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -1,16 +1,41 @@ package config import ( + "fmt" + "github.com/tkanos/gonfig" ) +type ArchiveFrequency int + +const ( + Daily ArchiveFrequency = iota + Weekly + Monthly +) + +var archFrequency = map[string]ArchiveFrequency{ + "daily": Daily, + "weekly": Weekly, + "monthly": Monthly, +} + +func main() { + interval := archFrequency["daily"] + fmt.Println(interval) //print the enum value +} + type Configuration struct { - DB_USERNAME string - DB_PASSWORD string - DB_PORT string - DB_HOST string - DB_NAME string - REST_ADDRESS string + DB_USERNAME string `json:"databaseUsername"` + DB_PASSWORD string `json:"databasePassword"` + DB_PORT string `json:"databasePort"` + DB_HOST string `json:"databaseHost"` + DB_NAME string `json:"databaseSchema"` + + REST_ADDRESS string `json:"restAddress"` + + ARCHIVE_FREQUENCY string `json:"archiveFrequency"` + ARCHIVE_DIRECTORY string `json:"archiveDirectory"` } func GetConfig(fileName string) Configuration { diff --git a/server/dates/formatting.go b/server/dates/formatting.go new file mode 100644 index 0000000000000000000000000000000000000000..aa9d2a7ab1cbd2b5d1f48335877faf640ba0991c --- /dev/null +++ b/server/dates/formatting.go @@ -0,0 +1,19 @@ +package dates + +import ( + "fmt" + "time" +) + +func YYYYMMDD(t time.Time) string { + return t.Format("20060201") +} + +func YYYYWW(t time.Time) string { + year, week := t.ISOWeek() + return fmt.Sprintf("%d%.2d", year, week) +} + +func YYYYMM(t time.Time) string { + return fmt.Sprintf("%d%.2d", t.Year(), t.Month()) +} diff --git a/server/dates/ranges.go b/server/dates/ranges.go new file 
mode 100644 index 0000000000000000000000000000000000000000..dd03dfee6035a2a378cff009aefa61020b8fff50 --- /dev/null +++ b/server/dates/ranges.go @@ -0,0 +1,28 @@ +package dates + +import ( + "fmt" + "time" +) + +type DaysRange struct { + From string + To string +} + +func CalculateFullDay(t time.Time) DaysRange { + yesterday := t.AddDate(0, 0, -1) + tomorrow := t.AddDate(0, 0, +1) + fmt.Println("today YYYY-MM-DD : ", t.Format("2006-02-01")) + fmt.Println("tomorrow YYYY-MM-DD : ", tomorrow.Format("2006-02-01")) + return DaysRange{yesterday.Format("2006-02-01"), t.Format("2006-02-01")} +} + +func CalculateFullDayNDaysAgo(n int) DaysRange { + t := time.Now().AddDate(0, 0, -n) + yesterday := t.AddDate(0, 0, -1) + tomorrow := t.AddDate(0, 0, +1) + fmt.Println("today YYYY-MM-DD : ", t.Format("2006-02-01")) + fmt.Println("tomorrow YYYY-MM-DD : ", tomorrow.Format("2006-02-01")) + return DaysRange{yesterday.Format("2006-02-01"), t.Format("2006-02-01")} +} diff --git a/server/db/clients.go b/server/db/clients.go new file mode 100644 index 0000000000000000000000000000000000000000..7d0681332039272f11f0b99ca0d675fbc7e8972f --- /dev/null +++ b/server/db/clients.go @@ -0,0 +1,27 @@ +package db + +import ( + "fmt" + "time" + "yats-server/model" + + "github.com/gocql/gocql" +) + +func GetClientsList(session *gocql.Session) []model.ClientInfo { + var clients []model.ClientInfo + m := map[string]interface{}{} + + q := fmt.Sprintf("SELECT id, created, name FROM client") + fmt.Println(q) + iter := session.Query(q).Iter() + for iter.MapScan(m) { + clients = append(clients, model.ClientInfo{ + ID: m["id"].(string), + Created: m["created"].(time.Time).UnixMilli(), + Name: m["name"].(string), + }) + m = map[string]interface{}{} + } + return clients +} diff --git a/server/db/dates_test.go b/server/db/dates_test.go index 8643cb78bccd1a5452a4b4862bbda6ba07beb459..5c5e343f5e779dc767aafdc18b9b30691c8daaff 100644 --- a/server/db/dates_test.go +++ b/server/db/dates_test.go @@ -5,13 +5,9 @@ "fmt" 
"os" "testing" "time" + "yats-server/dates" "yats-server/model" ) - -type DaysRange struct { - From string - To string -} func TestTodayString(t *testing.T) { @@ -25,20 +21,20 @@ daysRange := DaysAgo(10) fmt.Printf("from: %s to: %s\n", daysRange.From, daysRange.To) } -func CalculateDaysRange() DaysRange { +func CalculateDaysRange() dates.DaysRange { currentTime := time.Now() yesterday := currentTime.AddDate(0, 0, -1) fmt.Println("today YYYY-MM-DD : ", currentTime.Format("2006-02-01")) fmt.Println("yesterday YYYY-MM-DD : ", yesterday.Format("2006-02-01")) - return DaysRange{yesterday.Format("2006-02-01"), currentTime.Format("2006-02-01")} + return dates.DaysRange{yesterday.Format("2006-02-01"), currentTime.Format("2006-02-01")} } -func DaysAgo(n int) DaysRange { +func DaysAgo(n int) dates.DaysRange { currentTime := time.Now().AddDate(0, 0, n*(-1)) yesterday := currentTime.AddDate(0, 0, -1) fmt.Println("today YYYY-MM-DD : ", currentTime.Format("2006-02-01")) fmt.Println("yesterday YYYY-MM-DD : ", yesterday.Format("2006-02-01")) - return DaysRange{yesterday.Format("2006-02-01"), currentTime.Format("2006-02-01")} + return dates.DaysRange{yesterday.Format("2006-02-01"), currentTime.Format("2006-02-01")} } func TestParquet(t *testing.T) { @@ -50,7 +46,7 @@ Name: "SomeTest", } tmpFile, _ := os.CreateTemp("/tmp/", "tempEvent-XXXX.parquet") - EventToParquet(event, tmpFile.Name()) + EventToParquet([]model.EventRequest{event}, tmpFile.Name()) metric := model.MetricRequest{ ID_client: "mabc", @@ -61,6 +57,5 @@ } tmpMetricFile, _ := os.CreateTemp("/tmp/", "tempMetric-XXXX.parquet") - MetricToParquet(metric, tmpMetricFile.Name()) - + MetricToParquet([]model.MetricRequest{metric}, tmpMetricFile.Name()) } diff --git a/server/db/metric.go b/server/db/metric.go index 599fde0504104645b67fb65a6e158264d2f07b69..0e9de5c209d7b024d7f2735fc602ae3348c08452 100644 --- a/server/db/metric.go +++ b/server/db/metric.go @@ -3,6 +3,7 @@ import ( "fmt" "time" + "yats-server/dates" 
"yats-server/model" "github.com/gocql/gocql" @@ -21,38 +22,28 @@ fmt.Printf("%s\n", metric.Name) } } -func LoadMetrics(session *gocql.Session) []model.MetricRequest { +func LoadMetrics(session *gocql.Session, idClient string, daysRange dates.DaysRange) []model.MetricRequest { var metrics []model.MetricRequest m := map[string]interface{}{} - iter := session.Query("SELECT id_client, mtime, name, value FROM metric").Iter() - for iter.MapScan(m) { - metrics = append(metrics, model.MetricRequest{ - ID_client: m["id_client"].(string), - Mtime: m["mtime"].(time.Time).UnixMilli(), - Name: m["name"].(string), - Value: m["value"].(string), - }) + metricInfos := GetClientMetrics(session, idClient) - m = map[string]interface{}{} - } - return metrics -} - -func OLDLoadMetrics(session *gocql.Session) []MetricModel { - var metrics []MetricModel - m := map[string]interface{}{} - - iter := session.Query("SELECT id_client, mtime, name, value FROM metric").Iter() - for iter.MapScan(m) { - metrics = append(metrics, MetricModel{ - ID_client: m["id_client"].(string), - Mtime: m["mtime"].(time.Time), - Name: m["name"].(string), - Value: m["value"].(string), - }) + for mx := range metricInfos { + metricName := metricInfos[mx].Name + fmt.Printf("Querying from: %s to: %s\n", daysRange.From, daysRange.To) + q := fmt.Sprintf("SELECT id_client, mtime, name, value FROM metric where id_client='%s' and name ='%s' and mtime > '%s' and mtime < '%s'", idClient, metricName, daysRange.From, daysRange.To) + fmt.Println(q) + iter := session.Query(q).Iter() + for iter.MapScan(m) { + metrics = append(metrics, model.MetricRequest{ + ID_client: m["id_client"].(string), + Mtime: m["mtime"].(time.Time).UnixMilli(), + Name: m["name"].(string), + Value: m["value"].(string), + }) - m = map[string]interface{}{} + m = map[string]interface{}{} + } } return metrics } diff --git a/server/db/metric_info.go b/server/db/metric_info.go new file mode 100644 index 
0000000000000000000000000000000000000000..ffad8eba11e5dcb1999e0985a7eabecaa6eba2a5 --- /dev/null +++ b/server/db/metric_info.go @@ -0,0 +1,26 @@ +package db + +import ( + "fmt" + "yats-server/model" + + "github.com/gocql/gocql" +) + +func GetClientMetrics(session *gocql.Session, id_client string) []model.MetricInfo { + var infos []model.MetricInfo + m := map[string]interface{}{} + + q := fmt.Sprintf("SELECT id_client, name, description FROM metric_info where id_client='%s'", id_client) + fmt.Println(q) + iter := session.Query(q).Iter() + for iter.MapScan(m) { + infos = append(infos, model.MetricInfo{ + ID_client: m["id_client"].(string), + Name: m["name"].(string), + Description: m["description"].(string), + }) + m = map[string]interface{}{} + } + return infos +} diff --git a/server/db/parquet.go b/server/db/parquet.go index d9af11827482b486480f9041de6e1ce27b96ff18..3147329fb01170e3125bc7be27e74775f9196feb 100644 --- a/server/db/parquet.go +++ b/server/db/parquet.go @@ -8,7 +8,7 @@ "github.com/xitongsys/parquet-go/writer" ) -func EventToParquet(event model.EventRequest, parquetFile string) { +func EventToParquet(events []model.EventRequest, parquetFile string) { var err error w, err := os.Create(parquetFile) if err != nil { @@ -22,8 +22,10 @@ log.Println("Can't create parquet writer", err) return } - if err = pw.Write(event); err != nil { - log.Println("Write error", err) + for i := range events { + if err = pw.Write(events[i]); err != nil { + log.Println("Write error", err) + } } if err = pw.WriteStop(); err != nil { diff --git a/server/maintenance.go b/server/maintenance.go new file mode 100644 index 0000000000000000000000000000000000000000..a76ce8fabfef011ec06bd041f5d559dcbd13214e --- /dev/null +++ b/server/maintenance.go @@ -0,0 +1,59 @@ +package main + +import ( + "fmt" + "os" + "time" + "yats-server/config" + "yats-server/dates" + "yats-server/db" +) + +func createParquetDir(cfg config.Configuration, now time.Time, id_client string, item string) (string, 
error) { + baseParquetDir := cfg.ARCHIVE_DIRECTORY + parquetDir := baseParquetDir + + if cfg.ARCHIVE_FREQUENCY == "daily" { + parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMMDD(now)) + } + + if cfg.ARCHIVE_FREQUENCY == "weekly" { + parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYWW(now)) + } + + if cfg.ARCHIVE_FREQUENCY == "monthly" { + parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMM(now)) + } + + err := os.MkdirAll(parquetDir, 0755) + if nil != err { + return "", err + } + return parquetDir, nil +} + +func MaintenanceThread(cfg config.Configuration) { + var sleeptime, _ = time.ParseDuration("60s") + for { + time.Sleep(sleeptime) + now := time.Now() + daysRange := dates.DaysRange{"2024-05-01", "2024-05-12"} //dates.CalculateFullDayNDaysAgo(1) + + clients := db.GetClientsList(db.Session) + for i := range clients { + fmt.Printf("Processing client: %s with ID: %s", clients[i].Name, clients[i].ID) + parquetDir, err := createParquetDir(cfg, now, "metric", clients[i].ID) + if nil != err { + fmt.Println(err) + return + } + metricFileName := fmt.Sprintf("%s/metric-%.4d%.2d%.2d-%d.parquet", parquetDir, now.Year(), now.Month(), now.Day(), now.Unix()) + fmt.Printf("Writing to file: %s\n", metricFileName) + + metrics := db.LoadMetrics(db.Session, clients[i].ID, daysRange) + fmt.Printf("Loaded: [%d] results", len(metrics)) + db.MetricToParquet(metrics, metricFileName) + } + fmt.Printf("Maintenance Thread\n") + } +} diff --git a/server/model/models.go b/server/model/models.go index e515fcfb185d656648ac344790fcab5d867dc1cf..f28bef74baf0fe3abad047187627748fefa65817 100644 --- a/server/model/models.go +++ b/server/model/models.go @@ -12,3 +12,15 @@ ID_client string `json:"id_client" parquet:"name=id_client, type=BYTE_ARRAY, convertedtype=UTF8"` Etime int64 `json:"etime" parquet:"name=etime, type=INT64, convertedtype=TIMESTAMP_MILLIS"` Name string `json:"name" 
parquet:"name=name, type=BYTE_ARRAY, convertedtype=UTF8"` } + +type ClientInfo struct { + ID string + Created int64 + Name string +} + +type MetricInfo struct { + ID_client string + Name string + Description string +} diff --git a/server/yats.go b/server/yats.go index caffdaaadbd13d3f49523864fb61e6a7d3c91c89..ce10329caff6908b29be5eb244a76480b72ca3b4 100644 --- a/server/yats.go +++ b/server/yats.go @@ -1,34 +1,16 @@ package main import ( - "fmt" "os" - "time" "yats-server/config" "yats-server/db" ) func main() { - go maintenanceThread() - - startServer() -} - -func maintenanceThread() { - var sleeptime, _ = time.ParseDuration("60s") - for { - time.Sleep(sleeptime) - - metrics := db.LoadMetrics(db.Session) - db.MetricToParquet(metrics, "/tmp/allmetrics.parquet") - fmt.Printf("Maintenance Thread\n") - } -} - -func startServer() { configuration := config.GetConfig(os.Getenv("HOME") + "/.yats.json") + db.Session = db.InitializeDb(configuration) - db.Session = db.InitializeDb(configuration) + go MaintenanceThread(configuration) RestService(configuration) }