Author: Paolo Lulli <paolo@lulli.net>
Refactor To/From + events write
server/db/dates_test.go | 8 ++-- server/db/metric.go | 8 ++-- server/db/parquet.go | 8 ++-- server/db/queries.go | 64 ++++++++++++++++++++++++++++++++++++++- server/model/models.go | 4 +- server/rest/rest-event.go | 37 +++++++++++++++------- server/rest/rest-metric.go | 58 +++++++++++------------------------ server/service-rest.go | 6 +--
diff --git a/server/db/dates_test.go b/server/db/dates_test.go index 66063081155983bb621634a60165ab335b7a96ed..5df24b22afecaf7d84a94a49b409fef7e7e848f7 100644 --- a/server/db/dates_test.go +++ b/server/db/dates_test.go @@ -55,16 +55,16 @@ } func TestParquet(t *testing.T) { - event := model.EventSaveRequest{ + event := model.EventModel{ IdClient: "abc", Etime: int64(time.Now().UnixMilli()), Name: "SomeTest", } tmpFile, _ := os.CreateTemp("/tmp/", "tempEvent-XXXX.parquet") - EventToParquet([]model.EventSaveRequest{event}, tmpFile.Name()) + EventToParquet([]model.EventModel{event}, tmpFile.Name()) - metric := model.MetricSaveRequest{ + metric := model.MetricModel{ IdClient: "mabc", Mtime: int64(time.Now().UnixMilli()), Name: "someName", @@ -73,7 +73,7 @@ } tmpMetricFile, _ := os.CreateTemp("/tmp/", "tempMetric-XXXX.parquet") - MetricToParquet([]model.MetricSaveRequest{metric}, tmpMetricFile.Name()) + MetricToParquet([]model.MetricModel{metric}, tmpMetricFile.Name()) } func TestDates(t *testing.T) { diff --git a/server/db/metric.go b/server/db/metric.go index ccd1f566c527a4ac994977e1377a477e63539266..8a42c2ca4f4f84330f4eb947419cf36885de5f88 100644 --- a/server/db/metric.go +++ b/server/db/metric.go @@ -25,14 +25,14 @@ Name string `json:"name"` Value string `json:"value"` } -func PrintMetrics(metrics []model.MetricSaveRequest) { +func PrintMetrics(metrics []model.MetricModel) { for _, metric := range metrics { fmt.Printf("%s\n", metric.Name) } } -func LoadMetrics(session *gocql.Session, idClient string, daysRange dates.DaysRange) []model.MetricSaveRequest { - var metrics []model.MetricSaveRequest +func LoadMetrics(session *gocql.Session, idClient string, daysRange dates.DaysRange) []model.MetricModel { + var metrics []model.MetricModel m := map[string]interface{}{} metricInfos := GetClientMetrics(session, idClient) @@ -44,7 +44,7 @@ q := fmt.Sprintf("SELECT id_client, mtime, name, value FROM metric where id_client='%s' and name ='%s' and mtime > '%s' and mtime < '%s'", idClient, metricName, daysRange.From, daysRange.To) fmt.Println(q) iter := session.Query(q).Iter() for iter.MapScan(m) { - metrics = append(metrics, model.MetricSaveRequest{ + metrics = append(metrics, model.MetricModel{ IdClient: m["id_client"].(string), Mtime: m["mtime"].(time.Time).UnixMilli(), Name: m["name"].(string), diff --git a/server/db/parquet.go b/server/db/parquet.go index 7f0a042fe358f952d46ff1062ad1a19242332881..67515530329f54ebe02839206b04f7dacb3e4808 100644 --- a/server/db/parquet.go +++ b/server/db/parquet.go @@ -18,7 +18,7 @@ "github.com/xitongsys/parquet-go/writer" ) -func EventToParquet(events []model.EventSaveRequest, parquetFile string) { +func EventToParquet(events []model.EventModel, parquetFile string) { var err error w, err := os.Create(parquetFile) if err != nil { @@ -26,7 +26,7 @@ log.Println("Can't create local file", err) return } - pw, err := writer.NewParquetWriterFromWriter(w, new(model.EventSaveRequest), 4) + pw, err := writer.NewParquetWriterFromWriter(w, new(model.EventModel), 4) if err != nil { log.Println("Can't create parquet writer", err) return @@ -46,7 +46,7 @@ log.Println("Write Finished") w.Close() } -func MetricToParquet(metrics []model.MetricSaveRequest, parquetFile string) { +func MetricToParquet(metrics []model.MetricModel, parquetFile string) { var err error w, err := os.Create(parquetFile) @@ -55,7 +55,7 @@ log.Println("Can't create local file", err) return } - pw, err := writer.NewParquetWriterFromWriter(w, new(model.MetricSaveRequest), 4) + pw, err := writer.NewParquetWriterFromWriter(w, new(model.MetricModel), 4) if err != nil { log.Println("Can't create parquet writer", err) return diff --git a/server/db/queries.go b/server/db/queries.go index 367dfa630af33640b48e8a670af6fcb79659953c..2932aedc0d779bbbef916a60a2e4a3ddaf21510f 100644 --- a/server/db/queries.go +++ b/server/db/queries.go @@ -17,15 +17,15 @@ "time" "yats-server/model" ) -func MetricsFrom(session *gocql.Session, idClient string, metricName string, fromTime string, limit int32) []model.MetricSaveRequest { - var metrics []model.MetricSaveRequest +func MetricsFrom(session *gocql.Session, idClient string, metricName string, fromTime string, limit int32) []model.MetricModel { + var metrics []model.MetricModel m := map[string]interface{}{} q := fmt.Sprintf("SELECT id_client, mtime, name, value FROM metric where id_client='%s' and name ='%s' and mtime > '%s' order by mtime limit %d", idClient, metricName, fromTime, limit) fmt.Println(q) iter := session.Query(q).Iter() for iter.MapScan(m) { - metrics = append(metrics, model.MetricSaveRequest{ + metrics = append(metrics, model.MetricModel{ IdClient: m["id_client"].(string), Mtime: m["mtime"].(time.Time).UnixMilli(), Name: m["name"].(string), @@ -36,3 +36,61 @@ m = map[string]interface{}{} } return metrics } + +func MetricsBetween(session *gocql.Session, idClient string, metricName string, fromTime string, toTime string, limit int32) []model.MetricModel { + var metrics []model.MetricModel + m := map[string]interface{}{} + + q := fmt.Sprintf("SELECT id_client, mtime, name, value FROM metric where id_client='%s' and name ='%s' and mtime > '%s' and mtime <= '%s' order by mtime limit %d", idClient, metricName, fromTime, toTime, limit) + fmt.Println(q) + iter := session.Query(q).Iter() + for iter.MapScan(m) { + metrics = append(metrics, model.MetricModel{ + IdClient: m["id_client"].(string), + Mtime: m["mtime"].(time.Time).UnixMilli(), + Name: m["name"].(string), + Value: m["value"].(string), + }) + + m = map[string]interface{}{} + } + return metrics +} + +func EventsFrom(session *gocql.Session, idClient string, fromTime string, limit int32) []model.EventModel { + var events []model.EventModel + m := map[string]interface{}{} + + q := fmt.Sprintf("SELECT id_client, etime, name FROM event where id_client='%s' and etime > '%s' order by etime limit %d", idClient, fromTime, limit) + fmt.Println(q) + iter := session.Query(q).Iter() + for iter.MapScan(m) { + events = append(events, model.EventModel{ + IdClient: m["id_client"].(string), + Etime: m["etime"].(time.Time).UnixMilli(), + Name: m["name"].(string), + }) + + m = map[string]interface{}{} + } + return events +} + +func EventsBetween(session *gocql.Session, idClient string, fromTime string, totime string, limit int32) []model.EventModel { + var events []model.EventModel + m := map[string]interface{}{} + + q := fmt.Sprintf("SELECT id_client, etime, name FROM event where id_client='%s' and etime > '%s' and etime <= '%s' order by etime limit %d", idClient, fromTime, totime, limit) + fmt.Println(q) + iter := session.Query(q).Iter() + for iter.MapScan(m) { + events = append(events, model.EventModel{ + IdClient: m["id_client"].(string), + Etime: m["etime"].(time.Time).UnixMilli(), + Name: m["name"].(string), + }) + + m = map[string]interface{}{} + } + return events +} diff --git a/server/model/models.go b/server/model/models.go index 0c0b18c3897b451e339ed296843b3fda65710671..cd444369a594c4d3339530bee0c0ecda33fdcad6 100644 --- a/server/model/models.go +++ b/server/model/models.go @@ -10,14 +10,14 @@ */ package model -type MetricSaveRequest struct { +type MetricModel struct { IdClient string `json:"id_client" parquet:"name=id_client, type=BYTE_ARRAY, convertedtype=UTF8"` Mtime int64 `json:"mtime" parquet:"name=mtime, type=INT64, convertedtype=TIMESTAMP_MILLIS"` Name string `json:"name" parquet:"name=name, type=BYTE_ARRAY, convertedtype=UTF8"` Value string `json:"value" parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8"` } -type EventSaveRequest struct { +type EventModel struct { IdClient string `json:"id_client" parquet:"name=id_client, type=BYTE_ARRAY, convertedtype=UTF8"` Etime int64 `json:"etime" parquet:"name=etime, type=INT64, convertedtype=TIMESTAMP_MILLIS"` Name string `json:"name" parquet:"name=name, type=BYTE_ARRAY, convertedtype=UTF8"` diff --git a/server/rest/rest-event.go b/server/rest/rest-event.go index 4832b00e9aa96c54669433b6633942ccd7b04b2f..b569f63f0412c14fe2afc3b800ca704b0c000e35 100644 --- a/server/rest/rest-event.go +++ b/server/rest/rest-event.go @@ -19,8 +19,8 @@ "yats-server/db" "yats-server/model" ) -func WriteEventNow(c *gin.Context) { - var event model.EventSaveRequest +func WriteEventcAt(c *gin.Context) { + var event model.EventModel if err := c.BindJSON(&event); err != nil { c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "-1"}) @@ -29,13 +29,17 @@ } clientCN := GetClientCN(c) fmt.Printf("%s / %s ", clientCN, event.Name) - db.SaveEvent(clientCN, event.Name) - - c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "0"}) + if event.Etime == 0 { + unixTimeUTC := time.Unix(event.Etime, 0) + db.SaveEventAt(clientCN, unixTimeUTC, event.Name) + } else { + db.SaveEvent(clientCN, event.Name) + } + c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "OK"}) } -func WriteEventcAt(c *gin.Context) { - var event model.EventSaveRequest +func GetEvents(c *gin.Context) { + var event model.EventSearchRequest if err := c.BindJSON(&event); err != nil { c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "-1"}) @@ -43,10 +47,19 @@ return } clientCN := GetClientCN(c) - fmt.Printf("%s / %s ", clientCN, event.Name) + fmt.Printf("%s ", clientCN) - unixTimeUTC := time.Unix(event.Etime, 0) - db.SaveEventAt(clientCN, unixTimeUTC, event.Name) - - c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "OK"}) + var eventsPack []model.EventModel + if event.To == 0 { + unixTimeUTC := time.Unix(event.From, 0) + timeAsBytes, _ := unixTimeUTC.UTC().MarshalText() + eventsPack = db.EventsFrom(db.Session, clientCN, string(timeAsBytes), 100) + } else if event.From != 0 { + fromUnixTimeUTC := time.Unix(event.From, 0) + toUnixTimeUTC := time.Unix(event.To, 0) + fromInBytes, _ := fromUnixTimeUTC.UTC().MarshalText() + toInBytes, _ := toUnixTimeUTC.UTC().MarshalText() + eventsPack = db.EventsBetween(db.Session, clientCN, string(fromInBytes), string(toInBytes), 100) + } + c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "OK", "content": eventsPack}) } diff --git a/server/rest/rest-metric.go b/server/rest/rest-metric.go index 23a7422190a344c660b6a9cf2808d05252799f2f..a178f98b0b7c7554f8cfcb96adb1d9790b735056 100644 --- a/server/rest/rest-metric.go +++ b/server/rest/rest-metric.go @@ -19,8 +19,8 @@ "yats-server/db" "yats-server/model" ) -func WriteMetricNow(c *gin.Context) { - var metric model.MetricSaveRequest +func WriteMetricAt(c *gin.Context) { + var metric model.MetricModel if err := c.BindJSON(&metric); err != nil { c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "-1"}) @@ -29,29 +29,18 @@ } clientCN := GetClientCN(c) fmt.Printf("%s / %s / %s", clientCN, metric.Name, metric.Value) - db.SaveMetric(clientCN, metric.Name, metric.Value) - c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "0"}) -} - -func WriteMetricAt(c *gin.Context) { - var metric model.MetricSaveRequest - - if err := c.BindJSON(&metric); err != nil { - c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "-1"}) - return + if metric.Mtime == 0 { + unixTimeUTC := time.Unix(metric.Mtime, 0) + db.SaveMetricAt(clientCN, unixTimeUTC, metric.Name, metric.Value) + } else { + db.SaveMetric(clientCN, metric.Name, metric.Value) } - clientCN := GetClientCN(c) - fmt.Printf("%s / %s / %s", clientCN, metric.Name, metric.Value) - - unixTimeUTC := time.Unix(metric.Mtime, 0) - db.SaveMetricAt(clientCN, unixTimeUTC, metric.Name, metric.Value) - c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "OK"}) } -func GetMetricsFrom(c *gin.Context) { +func GetMetrics(c *gin.Context) { var metric model.MetricSearchRequest if err := c.BindJSON(&metric); err != nil { @@ -62,27 +51,18 @@ clientCN := GetClientCN(c) fmt.Printf("%s / %s ", clientCN, metric.Name) - unixTimeUTC := time.Unix(metric.From, 0) - timeAsBytes, _ := unixTimeUTC.UTC().MarshalText() - metricsPack := db.MetricsFrom(db.Session, clientCN, metric.Name, string(timeAsBytes), 100) + var metricsPack []model.MetricModel - c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "OK", "content": metricsPack}) -} - -func GetMetricsBetween(c *gin.Context) { - var metric model.MetricSaveRequest - - if err := c.BindJSON(&metric); err != nil { - c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "-1"}) - return + if metric.To == 0 { + unixTimeUTC := time.Unix(metric.From, 0) + timeAsBytes, _ := unixTimeUTC.UTC().MarshalText() + metricsPack = db.MetricsFrom(db.Session, clientCN, metric.Name, string(timeAsBytes), 100) + } else if metric.From != 0 { + fromUnixTimeUTC := time.Unix(metric.From, 0) + toUnixTimeUTC := time.Unix(metric.To, 0) + fromInBytes, _ := fromUnixTimeUTC.UTC().MarshalText() + toInBytes, _ := toUnixTimeUTC.UTC().MarshalText() + metricsPack = db.MetricsBetween(db.Session, clientCN, metric.Name, string(fromInBytes), string(toInBytes), 100) } - - clientCN := GetClientCN(c) - fmt.Printf("%s / %s / %s", clientCN, metric.Name, metric.Value) - - unixTimeUTC := time.Unix(metric.Mtime, 0) - timeAsBytes, _ := unixTimeUTC.UTC().MarshalText() - metricsPack := db.MetricsFrom(db.Session, clientCN, metric.Name, string(timeAsBytes), 100) - c.IndentedJSON(http.StatusAccepted, gin.H{"ret": "OK", "content": metricsPack}) } diff --git a/server/service-rest.go b/server/service-rest.go index f29f1cb81b0f6802f7a36dc5a41eaff7ca19f458..7830d593602b833e7250e865e1d3ae1daf2152ee 100644 --- a/server/service-rest.go +++ b/server/service-rest.go @@ -28,13 +28,11 @@ router.SetTrustedProxies([]string{"127.0.0.1"}) gin.SetMode(gin.ReleaseMode) - //router.POST("/rt/metric", rest.WriteMetricNow) router.POST("/metric", rest.WriteMetricAt) - - router.POST("/metric/get", rest.GetMetricsFrom) + router.POST("/metric/get", rest.GetMetrics) - //router.POST("/rt/event", rest.WriteEventNow) router.POST("/event", rest.WriteEventcAt) + router.POST("/event/get", rest.GetEvents) router.Run(address) //router.RunTLS(address, c.CertFile, c.KeyFile)