query temp data across multiple buckets
This commit is contained in:
@@ -15,6 +15,7 @@ import (
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -520,19 +521,7 @@ func (sr *scrutinyRepository) GetSmartTemperatureHistory(ctx context.Context, du
|
||||
deviceTempHistory := map[string][]measurements.SmartTemperature{}
|
||||
|
||||
//TODO: change the query range to a variable.
|
||||
queryStr := fmt.Sprintf(`
|
||||
import "influxdata/influxdb/schema"
|
||||
from(bucket: "%s")
|
||||
|> range(start: %s, stop: now())
|
||||
|> filter(fn: (r) => r["_measurement"] == "temp" )
|
||||
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|
||||
|> schema.fieldsAsCols()
|
||||
|> group(columns: ["device_wwn"])
|
||||
|> yield(name: "last")
|
||||
`,
|
||||
sr.lookupBucketName(durationKey),
|
||||
sr.lookupDuration(durationKey),
|
||||
)
|
||||
queryStr := sr.aggregateTempQuery(durationKey)
|
||||
|
||||
result, err := sr.influxQueryApi.Query(ctx, queryStr)
|
||||
if err == nil {
|
||||
@@ -671,21 +660,96 @@ func (sr *scrutinyRepository) lookupBucketName(durationKey string) string {
|
||||
return sr.appConfig.GetString("web.influxdb.bucket")
|
||||
}
|
||||
|
||||
func (sr *scrutinyRepository) lookupDuration(durationKey string) string {
|
||||
func (sr *scrutinyRepository) lookupDuration(durationKey string) []string {
|
||||
|
||||
switch durationKey {
|
||||
case "week":
|
||||
//data stored in the last week
|
||||
return "-1w"
|
||||
return []string{"-1w", "now()"}
|
||||
case "month":
|
||||
// data stored in the last month (after the first week)
|
||||
return "-1mo"
|
||||
return []string{"-1mo", "-1w"}
|
||||
case "year":
|
||||
// data stored in the last year (after the first month)
|
||||
return "-1y"
|
||||
return []string{"-1y", "-1mo"}
|
||||
case "forever":
|
||||
//data stored before the last year
|
||||
return "-10y"
|
||||
return []string{"-10y", "-1y"}
|
||||
}
|
||||
return "-1w"
|
||||
return []string{"-1w", "now()"}
|
||||
}
|
||||
|
||||
func (sr *scrutinyRepository) lookupNestedDurationKeys(durationKey string) []string {
|
||||
switch durationKey {
|
||||
case "week":
|
||||
//all data is stored in a single bucket
|
||||
return []string{"week"}
|
||||
case "month":
|
||||
//data is stored in the week bucket and the month bucket
|
||||
return []string{"week", "month"}
|
||||
case "year":
|
||||
// data stored in the last year (after the first month)
|
||||
return []string{"week", "month", "year"}
|
||||
case "forever":
|
||||
//data stored before the last year
|
||||
return []string{"week", "month", "year"}
|
||||
}
|
||||
return []string{"week"}
|
||||
}
|
||||
|
||||
func (sr *scrutinyRepository) aggregateTempQuery(durationKey string) string {
|
||||
|
||||
//TODO: change the query range to a variable.
|
||||
//queryStr := fmt.Sprintf(`
|
||||
//import "influxdata/influxdb/schema"
|
||||
//from(bucket: "%s")
|
||||
//|> range(start: %s, stop: now())
|
||||
//|> filter(fn: (r) => r["_measurement"] == "temp" )
|
||||
//|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|
||||
//|> schema.fieldsAsCols()
|
||||
//|> group(columns: ["device_wwn"])
|
||||
//|> yield(name: "last")
|
||||
// `,
|
||||
// sr.lookupBucketName(durationKey),
|
||||
// sr.lookupDuration(durationKey),
|
||||
//)
|
||||
|
||||
partialQueryStr := []string{`import "influxdata/influxdb/schema"`}
|
||||
|
||||
nestedDurationKeys := sr.lookupNestedDurationKeys(durationKey)
|
||||
|
||||
subQueryNames := []string{}
|
||||
for _, nestedDurationKey := range nestedDurationKeys {
|
||||
bucketName := sr.lookupBucketName(nestedDurationKey)
|
||||
durationRange := sr.lookupDuration(nestedDurationKey)
|
||||
|
||||
subQueryNames = append(subQueryNames, fmt.Sprintf(`%sData`, nestedDurationKey))
|
||||
partialQueryStr = append(partialQueryStr, []string{
|
||||
fmt.Sprintf(`%sData = from(bucket: "%s")`, nestedDurationKey, bucketName),
|
||||
fmt.Sprintf(`|> range(start: %s, stop: %s)`, durationRange[0], durationRange[1]),
|
||||
`|> filter(fn: (r) => r["_measurement"] == "temp" )`,
|
||||
`|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)`,
|
||||
`|> group(columns: ["device_wwn"])`,
|
||||
`|> toInt()`,
|
||||
"",
|
||||
}...)
|
||||
}
|
||||
|
||||
if len(subQueryNames) == 1 {
|
||||
//there's only one bucket being queried, no need to union, just aggregate the dataset and return
|
||||
partialQueryStr = append(partialQueryStr, []string{
|
||||
subQueryNames[0],
|
||||
"|> schema.fieldsAsCols()",
|
||||
"|> yield()",
|
||||
}...)
|
||||
} else {
|
||||
partialQueryStr = append(partialQueryStr, []string{
|
||||
fmt.Sprintf("union(tables: [%s])", strings.Join(subQueryNames, ", ")),
|
||||
`|> group(columns: ["device_wwn"])`,
|
||||
`|> sort(columns: ["_time"], desc: false)`,
|
||||
"|> schema.fieldsAsCols()",
|
||||
}...)
|
||||
}
|
||||
|
||||
return strings.Join(partialQueryStr, "\n")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user