ref: 5646a8896664ecc31a858a514925b94c50848a76
server/maintenance.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
/** * Yats - yats * * This file is licensed under the Affero General Public License version 3 or * later. See the COPYING file. * * @author Paolo Lulli <kevwe.com> * @copyright Paolo Lulli 2024 */ package main import ( "fmt" "os" "time" "yats-server/config" "yats-server/dates" "yats-server/db" "yats-server/model" ) func calculateParquetDirName(cfg config.Configuration, t time.Time, id_client string, item string) string { baseParquetDir := cfg.ArchiveDirectory previousMonth := dates.CalculatePreviousMonth(t) parquetDirName := fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, previousMonth) return parquetDirName } func createParquetDir(cfg config.Configuration, t time.Time, id_client string, item string) (string, error) { fmt.Println("Called createParquetDir") parquetDir := calculateParquetDirName(cfg, t, id_client, item) /* if cfg.ArchiveFrequency == "daily" { parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMMDD(t)) } if cfg.ArchiveFrequency == "weekly" { parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYWW(t)) } if cfg.ArchiveFrequency == "monthly" { parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMM(t)) } */ err := os.MkdirAll(parquetDir, 0755) if nil != err { return "", err } return parquetDir, nil } func MaintenanceThread(cfg config.Configuration) { var sleeptime, _ = time.ParseDuration("24h") for { time.Sleep(sleeptime) now := time.Now() daysRange := dates.CalculatePreviousMonthRange(now) clients := db.GetClientsList(db.Session) writeAllMetricsToParquet(cfg, now, clients, daysRange) fmt.Printf("Maintenance Thread\n") } } func writeAllMetricsToParquet(cfg config.Configuration, t time.Time, clients []model.ClientInfo, daysRange dates.DaysRange) { for i := range clients { fmt.Printf("Processing client: %s with ID: %s", clients[i].Name, clients[i].ID) parquetDir, err := createParquetDir(cfg, t, "metric", clients[i].ID) if nil != err { fmt.Println(err) return } metricFileName := fmt.Sprintf("%s/metric-%.4d%.2d%.2d-%d.parquet", parquetDir, t.Year(), t.Month(), t.Day(), t.Unix()) fmt.Printf("Writing to file: %s\n", metricFileName) metrics := db.LoadMetrics(db.Session, clients[i].ID, daysRange) fmt.Printf("Loaded: [%d] results", len(metrics)) if len(metrics) > 0 { db.MetricToParquet(metrics, metricFileName) db.DeleteMetrics(db.Session, clients[i].ID, daysRange) } else { fmt.Printf("No results to move into cold storage") } } } |