ref: f79a9d09c451653ff27fb34afb5b0c71dc758a32
server/maintenance.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
package main import ( "fmt" "os" "time" "yats-server/config" "yats-server/dates" "yats-server/db" ) func createParquetDir(cfg config.Configuration, now time.Time, id_client string, item string) (string, error) { baseParquetDir := cfg.ARCHIVE_DIRECTORY parquetDir := baseParquetDir if cfg.ARCHIVE_FREQUENCY == "daily" { parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMMDD(now)) } if cfg.ARCHIVE_FREQUENCY == "weekly" { parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYWW(now)) } if cfg.ARCHIVE_FREQUENCY == "monthly" { parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMM(now)) } err := os.MkdirAll(parquetDir, 0755) if nil != err { return "", err } return parquetDir, nil } func MaintenanceThread(cfg config.Configuration) { var sleeptime, _ = time.ParseDuration("60s") for { time.Sleep(sleeptime) now := time.Now() daysRange := dates.DaysRange{"2024-05-01", "2024-05-12"} //dates.CalculateFullDayNDaysAgo(1) clients := db.GetClientsList(db.Session) for i := range clients { fmt.Printf("Processing client: %s with ID: %s", clients[i].Name, clients[i].ID) parquetDir, err := createParquetDir(cfg, now, "metric", clients[i].ID) if nil != err { fmt.Println(err) return } metricFileName := fmt.Sprintf("%s/metric-%.4d%.2d%.2d-%d.parquet", parquetDir, now.Year(), now.Month(), now.Day(), now.Unix()) fmt.Printf("Writing to file: %s\n", metricFileName) metrics := db.LoadMetrics(db.Session, clients[i].ID, daysRange) fmt.Printf("Loaded: [%d] results", len(metrics)) db.MetricToParquet(metrics, metricFileName) } fmt.Printf("Maintenance Thread\n") } } |