Author: Paolo Lulli <paolo@lulli.net>
Store to Parquet the last month
server/dates/ranges.go | 36 ++++++++++++++++++++++ server/db/dates_test.go | 16 +++++++++ server/maintenance.go | 69 +++++++++++++++++++++++++++---------------
diff --git a/server/dates/ranges.go b/server/dates/ranges.go index dd03dfee6035a2a378cff009aefa61020b8fff50..3b29d40bc9e88105ca2e83274ac12bc982452c2c 100644 --- a/server/dates/ranges.go +++ b/server/dates/ranges.go @@ -10,6 +10,42 @@ From string To string } +func CalculatePreviousMonth(t time.Time) string { + currentMonth := t.Month() + previousMonth := currentMonth - 1 + previousMonthYear := t.Year() + if currentMonth == time.January { + previousMonth = time.December + previousMonthYear = t.Year() - 1 + } + + lastMonth := fmt.Sprintf("%d%.2d", previousMonthYear, previousMonth) + + fmt.Println("month YYYYMM : ", lastMonth) + + return lastMonth +} + +func CalculatePreviousMonthRange(t time.Time) DaysRange { + currentMonth := t.Month() + previousMonth := currentMonth - 1 + previousMonthYear := t.Year() + if currentMonth == time.January { + previousMonth = time.December + previousMonthYear = t.Year() - 1 + } + + beginningOfLastMonth := time.Date(previousMonthYear, previousMonth, 1, 00, 00, 00, 00, time.UTC) + beginningOfCurrentMonth := time.Date(t.Year(), currentMonth, 1, 00, 00, 00, 00, time.UTC) + + sBeginning := fmt.Sprintf("%d-%.2d-%.2d", beginningOfLastMonth.Year(), beginningOfLastMonth.Month(), beginningOfLastMonth.Day()) + sEnd := fmt.Sprintf("%d-%.2d-%.2d", beginningOfCurrentMonth.Year(), beginningOfCurrentMonth.Month(), beginningOfCurrentMonth.Day()) + + fmt.Printf("Range today YYYY-MM-DD : %s \n", sBeginning) + fmt.Printf("Range tomorrow YYYY-MM-DD : %s \n", sEnd) + return DaysRange{sBeginning, sEnd} +} + func CalculateFullDay(t time.Time) DaysRange { yesterday := t.AddDate(0, 0, -1) tomorrow := t.AddDate(0, 0, +1) diff --git a/server/db/dates_test.go b/server/db/dates_test.go index 5c5e343f5e779dc767aafdc18b9b30691c8daaff..1cc422be7d5400b9fa8fa15e94d48704efd40330 100644 --- a/server/db/dates_test.go +++ b/server/db/dates_test.go @@ -59,3 +59,19 @@ tmpMetricFile, _ := os.CreateTemp("/tmp/", "tempMetric-XXXX.parquet") MetricToParquet([]model.MetricRequest{metric}, tmpMetricFile.Name()) } + +func TestDates(t *testing.T) { + + now := time.Now() + + yyyymmdd := dates.YYYYMMDD(now) + fmt.Println("--->" + yyyymmdd) + + r := dates.CalculatePreviousMonthRange(now) + + l := dates.CalculatePreviousMonth(now) + fmt.Printf("Range from %s to: %s\n", r.From, r.To) + fmt.Printf("Last month: %s\n", l) + + fmt.Println(now.Format("2006-01-31")) +} diff --git a/server/maintenance.go b/server/maintenance.go index a76ce8fabfef011ec06bd041f5d559dcbd13214e..caa308ba8b378835cbf5c4f561d790d5be03c7f7 100644 --- a/server/maintenance.go +++ b/server/maintenance.go @@ -7,23 +7,37 @@ "time" "yats-server/config" "yats-server/dates" "yats-server/db" + "yats-server/model" ) -func createParquetDir(cfg config.Configuration, now time.Time, id_client string, item string) (string, error) { +func calculateParquetDirName(cfg config.Configuration, t time.Time, id_client string, item string) string { + fmt.Println("called calculateParquetDirName()") baseParquetDir := cfg.ARCHIVE_DIRECTORY - parquetDir := baseParquetDir + fmt.Printf("calculateParquetDirName: %s", baseParquetDir) + previousMonth := dates.CalculatePreviousMonth(t) + fmt.Printf("previousMonth: %s", previousMonth) + parquetDirName := fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, previousMonth) + fmt.Printf("parquetDirName: %s", parquetDirName) + return parquetDirName +} - if cfg.ARCHIVE_FREQUENCY == "daily" { - parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMMDD(now)) - } +func createParquetDir(cfg config.Configuration, t time.Time, id_client string, item string) (string, error) { + fmt.Println("Called createParquetDir") + parquetDir := calculateParquetDirName(cfg, t, id_client, item) + + /* + if cfg.ARCHIVE_FREQUENCY == "daily" { + parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMMDD(t)) + } - if cfg.ARCHIVE_FREQUENCY == "weekly" { - parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYWW(now)) - } + if cfg.ARCHIVE_FREQUENCY == "weekly" { + parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYWW(t)) + } - if cfg.ARCHIVE_FREQUENCY == "monthly" { - parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMM(now)) - } + if cfg.ARCHIVE_FREQUENCY == "monthly" { + parquetDir = fmt.Sprintf("%s/%s/%s/%s", baseParquetDir, id_client, item, dates.YYYYMM(t)) + } + */ err := os.MkdirAll(parquetDir, 0755) if nil != err { @@ -37,23 +51,28 @@ var sleeptime, _ = time.ParseDuration("60s") for { time.Sleep(sleeptime) now := time.Now() - daysRange := dates.DaysRange{"2024-05-01", "2024-05-12"} //dates.CalculateFullDayNDaysAgo(1) + daysRange := dates.CalculatePreviousMonthRange(now) clients := db.GetClientsList(db.Session) - for i := range clients { - fmt.Printf("Processing client: %s with ID: %s", clients[i].Name, clients[i].ID) - parquetDir, err := createParquetDir(cfg, now, "metric", clients[i].ID) - if nil != err { - fmt.Println(err) - return - } - metricFileName := fmt.Sprintf("%s/metric-%.4d%.2d%.2d-%d.parquet", parquetDir, now.Year(), now.Month(), now.Day(), now.Unix()) - fmt.Printf("Writing to file: %s\n", metricFileName) + writeAllMetricsToParquet(cfg, now, clients, daysRange) - metrics := db.LoadMetrics(db.Session, clients[i].ID, daysRange) - fmt.Printf("Loaded: [%d] results", len(metrics)) - db.MetricToParquet(metrics, metricFileName) - } fmt.Printf("Maintenance Thread\n") } } + +func writeAllMetricsToParquet(cfg config.Configuration, t time.Time, clients []model.ClientInfo, daysRange dates.DaysRange) { + for i := range clients { + fmt.Printf("Processing client: %s with ID: %s", clients[i].Name, clients[i].ID) + parquetDir, err := createParquetDir(cfg, t, "metric", clients[i].ID) + if nil != err { + fmt.Println(err) + return + } + metricFileName := fmt.Sprintf("%s/metric-%.4d%.2d%.2d-%d.parquet", parquetDir, t.Year(), t.Month(), t.Day(), t.Unix()) + fmt.Printf("Writing to file: %s\n", metricFileName) + + metrics := db.LoadMetrics(db.Session, clients[i].ID, daysRange) + fmt.Printf("Loaded: [%d] results", len(metrics)) + db.MetricToParquet(metrics, metricFileName) + } +}