adding duration key to smart attributes api endpoint

This commit is contained in:
Jason Kulatunga
2022-04-29 15:26:15 -07:00
parent f60636a6aa
commit 0a9d364aea
4 changed files with 175 additions and 94 deletions
+1 -1
View File
@@ -21,7 +21,7 @@ type DeviceRepo interface {
GetDeviceDetails(ctx context.Context, wwn string) (models.Device, error) GetDeviceDetails(ctx context.Context, wwn string) (models.Device, error)
SaveSmartAttributes(ctx context.Context, wwn string, collectorSmartData collector.SmartInfo) (measurements.Smart, error) SaveSmartAttributes(ctx context.Context, wwn string, collectorSmartData collector.SmartInfo) (measurements.Smart, error)
GetSmartAttributeHistory(ctx context.Context, wwn string, startAt string, attributes []string) ([]measurements.Smart, error) GetSmartAttributeHistory(ctx context.Context, wwn string, durationKey string, attributes []string) ([]measurements.Smart, error)
SaveSmartTemperature(ctx context.Context, wwn string, deviceProtocol string, collectorSmartData collector.SmartInfo) error SaveSmartTemperature(ctx context.Context, wwn string, deviceProtocol string, collectorSmartData collector.SmartInfo) error
@@ -5,8 +5,6 @@ import (
"fmt" "fmt"
"github.com/analogj/scrutiny/webapp/backend/pkg/config" "github.com/analogj/scrutiny/webapp/backend/pkg/config"
"github.com/analogj/scrutiny/webapp/backend/pkg/models" "github.com/analogj/scrutiny/webapp/backend/pkg/models"
"github.com/analogj/scrutiny/webapp/backend/pkg/models/collector"
"github.com/analogj/scrutiny/webapp/backend/pkg/models/measurements"
influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api" "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/domain" "github.com/influxdata/influxdb-client-go/v2/domain"
@@ -221,96 +219,6 @@ func (sr *scrutinyRepository) EnsureBuckets(ctx context.Context, org *domain.Org
return nil return nil
} }
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// SMART
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (sr *scrutinyRepository) SaveSmartAttributes(ctx context.Context, wwn string, collectorSmartData collector.SmartInfo) (measurements.Smart, error) {
deviceSmartData := measurements.Smart{}
err := deviceSmartData.FromCollectorSmartInfo(wwn, collectorSmartData)
if err != nil {
sr.logger.Errorln("Could not process SMART metrics", err)
return measurements.Smart{}, err
}
tags, fields := deviceSmartData.Flatten()
p := influxdb2.NewPoint("smart",
tags,
fields,
deviceSmartData.Date)
// write point immediately
return deviceSmartData, sr.influxWriteApi.WritePoint(ctx, p)
}
func (sr *scrutinyRepository) GetSmartAttributeHistory(ctx context.Context, wwn string, startAt string, attributes []string) ([]measurements.Smart, error) {
// Get SMartResults from InfluxDB
fmt.Println("GetDeviceDetails from INFLUXDB")
//TODO: change the filter startrange to a real number.
// Get parser flux query result
//appConfig.GetString("web.influxdb.bucket")
queryStr := fmt.Sprintf(`
import "influxdata/influxdb/schema"
from(bucket: "%s")
|> range(start: -2y, stop: now())
|> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["device_wwn"] == "%s" )
|> schema.fieldsAsCols()
|> group(columns: ["device_wwn"])
|> yield(name: "last")
`,
sr.appConfig.GetString("web.influxdb.bucket"),
wwn,
)
smartResults := []measurements.Smart{}
result, err := sr.influxQueryApi.Query(ctx, queryStr)
if err == nil {
fmt.Println("GetDeviceDetails NO EROR")
// Use Next() to iterate over query result lines
for result.Next() {
fmt.Println("GetDeviceDetails NEXT")
// Observe when there is new grouping key producing new table
if result.TableChanged() {
//fmt.Printf("table: %s\n", result.TableMetadata().String())
}
fmt.Printf("DECODINIG TABLE VALUES: %v", result.Record().Values())
smartData, err := measurements.NewSmartFromInfluxDB(result.Record().Values())
if err != nil {
return nil, err
}
smartResults = append(smartResults, *smartData)
}
if result.Err() != nil {
fmt.Printf("Query error: %s\n", result.Err().Error())
}
} else {
return nil, err
}
return smartResults, nil
//if err := device.SquashHistory(); err != nil {
// logger.Errorln("An error occurred while squashing device history", err)
// c.JSON(http.StatusInternalServerError, gin.H{"success": false})
// return
//}
//
//if err := device.ApplyMetadataRules(); err != nil {
// logger.Errorln("An error occurred while applying scrutiny thresholds & rules", err)
// c.JSON(http.StatusInternalServerError, gin.H{"success": false})
// return
//}
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// DeviceSummary // DeviceSummary
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -0,0 +1,163 @@
package database
import (
"context"
"fmt"
"github.com/analogj/scrutiny/webapp/backend/pkg/models/collector"
"github.com/analogj/scrutiny/webapp/backend/pkg/models/measurements"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"strings"
)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// SMART
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (sr *scrutinyRepository) SaveSmartAttributes(ctx context.Context, wwn string, collectorSmartData collector.SmartInfo) (measurements.Smart, error) {
deviceSmartData := measurements.Smart{}
err := deviceSmartData.FromCollectorSmartInfo(wwn, collectorSmartData)
if err != nil {
sr.logger.Errorln("Could not process SMART metrics", err)
return measurements.Smart{}, err
}
tags, fields := deviceSmartData.Flatten()
p := influxdb2.NewPoint("smart",
tags,
fields,
deviceSmartData.Date)
// write point immediately
return deviceSmartData, sr.influxWriteApi.WritePoint(ctx, p)
}
func (sr *scrutinyRepository) GetSmartAttributeHistory(ctx context.Context, wwn string, durationKey string, attributes []string) ([]measurements.Smart, error) {
// Get SMartResults from InfluxDB
fmt.Println("GetDeviceDetails from INFLUXDB")
//TODO: change the filter startrange to a real number.
// Get parser flux query result
//appConfig.GetString("web.influxdb.bucket")
queryStr := sr.aggregateSmartAttributesQuery(wwn, durationKey)
smartResults := []measurements.Smart{}
result, err := sr.influxQueryApi.Query(ctx, queryStr)
if err == nil {
fmt.Println("GetDeviceDetails NO EROR")
// Use Next() to iterate over query result lines
for result.Next() {
fmt.Println("GetDeviceDetails NEXT")
// Observe when there is new grouping key producing new table
if result.TableChanged() {
//fmt.Printf("table: %s\n", result.TableMetadata().String())
}
fmt.Printf("DECODINIG TABLE VALUES: %v", result.Record().Values())
smartData, err := measurements.NewSmartFromInfluxDB(result.Record().Values())
if err != nil {
return nil, err
}
smartResults = append(smartResults, *smartData)
}
if result.Err() != nil {
fmt.Printf("Query error: %s\n", result.Err().Error())
}
} else {
return nil, err
}
return smartResults, nil
//if err := device.SquashHistory(); err != nil {
// logger.Errorln("An error occurred while squashing device history", err)
// c.JSON(http.StatusInternalServerError, gin.H{"success": false})
// return
//}
//
//if err := device.ApplyMetadataRules(); err != nil {
// logger.Errorln("An error occurred while applying scrutiny thresholds & rules", err)
// c.JSON(http.StatusInternalServerError, gin.H{"success": false})
// return
//}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Helper Methods
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (sr *scrutinyRepository) aggregateSmartAttributesQuery(wwn string, durationKey string) string {
/*
import "influxdata/influxdb/schema"
weekData = from(bucket: "metrics")
|> range(start: -1w, stop: now())
|> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["device_wwn"] == "%s" )
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> group(columns: ["device_wwn"])
|> toInt()
monthData = from(bucket: "metrics_weekly")
|> range(start: -1mo, stop: now())
|> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["device_wwn"] == "%s" )
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> group(columns: ["device_wwn"])
|> toInt()
union(tables: [weekData, monthData])
|> group(columns: ["device_wwn"])
|> sort(columns: ["_time"], desc: false)
|> schema.fieldsAsCols()
*/
partialQueryStr := []string{
`import "influxdata/influxdb/schema"`,
}
nestedDurationKeys := sr.lookupNestedDurationKeys(durationKey)
subQueryNames := []string{}
for _, nestedDurationKey := range nestedDurationKeys {
bucketName := sr.lookupBucketName(nestedDurationKey)
durationRange := sr.lookupDuration(nestedDurationKey)
subQueryNames = append(subQueryNames, fmt.Sprintf(`%sData`, nestedDurationKey))
partialQueryStr = append(partialQueryStr, []string{
fmt.Sprintf(`%sData = from(bucket: "%s")`, nestedDurationKey, bucketName),
fmt.Sprintf(`|> range(start: %s, stop: %s)`, durationRange[0], durationRange[1]),
`|> filter(fn: (r) => r["_measurement"] == "smart" )`,
fmt.Sprintf(`|> filter(fn: (r) => r["device_wwn"] == "%s" )`, wwn),
`|> aggregateWindow(every: 1h, fn: last, createEmpty: false)`,
`|> group(columns: ["device_wwn"])`,
`|> toInt()`,
"",
}...)
}
if len(subQueryNames) == 1 {
//there's only one bucket being queried, no need to union, just aggregate the dataset and return
partialQueryStr = append(partialQueryStr, []string{
subQueryNames[0],
"|> schema.fieldsAsCols()",
"|> yield()",
}...)
} else {
partialQueryStr = append(partialQueryStr, []string{
fmt.Sprintf("union(tables: [%s])", strings.Join(subQueryNames, ", ")),
`|> group(columns: ["device_wwn"])`,
`|> sort(columns: ["_time"], desc: false)`,
"|> schema.fieldsAsCols()",
}...)
}
return strings.Join(partialQueryStr, "\n")
}
@@ -19,7 +19,17 @@ func GetDeviceDetails(c *gin.Context) {
return return
} }
smartResults, err := deviceRepo.GetSmartAttributeHistory(c, c.Param("wwn"), "", nil) durationKey, exists := c.GetQuery("duration_key")
if !exists {
durationKey = "forever"
}
smartResults, err := deviceRepo.GetSmartAttributeHistory(c, c.Param("wwn"), durationKey, nil)
if err != nil {
logger.Errorln("An error occurred while retrieving device smart results", err)
c.JSON(http.StatusInternalServerError, gin.H{"success": false})
return
}
var deviceMetadata interface{} var deviceMetadata interface{}
if device.IsAta() { if device.IsAta() {