interactive shell sampling

This commit is contained in:
sqshq 2019-04-07 11:09:24 -04:00
parent 800c9b2e3e
commit da26ba8165
5 changed files with 163 additions and 33 deletions

View File

@ -26,7 +26,7 @@ runcharts:
items: items:
- label: ACTIVE - label: ACTIVE
init: mongo --quiet --host=localhost blog init: mongo --quiet --host=localhost blog
sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'INACTIVE'}).itcount()" sample: db.getCollection('posts').find({status:'ACTIVE'}).itcount()
transform: $sample | grep cpu transform: $sample | grep cpu
- label: INACTIVE - label: INACTIVE
sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'INACTIVE'}).itcount()" sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'INACTIVE'}).itcount()"
@ -37,15 +37,19 @@ barcharts:
scale: 0 scale: 0
items: items:
- label: NEW - label: NEW
sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'ACTIVE'}).itcount()" init: mongo --quiet --host=localhost blog
sample: db.getCollection('posts').find({status:'ACTIVE'}).itcount()
- label: TRIGGERED - label: TRIGGERED
sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'INACTIVE'}).itcount()" init: mongo --quiet --host=localhost blog
sample: db.getCollection('posts').find({status:'INACTIVE'}).itcount()
- label: IN_PROCESS - label: IN_PROCESS
sample: echo 0 sample: echo 0
- label: FAILED - label: FAILED
sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'ACTIVE'}).itcount()" init: mongo --quiet --host=localhost blog
sample: db.getCollection('posts').find({status:'ACTIVE'}).itcount()
- label: FINISHED - label: FINISHED
sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'INACTIVE'}).itcount()" init: mongo --quiet --host=localhost blog
sample: db.getCollection('posts').find({status:'INACTIVE'}).itcount()
gauges: gauges:
- title: YEAR PROGRESS - title: YEAR PROGRESS
position: [[53, 8], [27, 2]] position: [[53, 8], [27, 2]]

View File

@ -1,11 +1,15 @@
package data package data
import ( import (
"bufio"
"errors"
ui "github.com/gizak/termui/v3" ui "github.com/gizak/termui/v3"
"github.com/sqshq/sampler/config" "github.com/sqshq/sampler/config"
"io"
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
"time"
) )
type Item struct { type Item struct {
@ -14,33 +18,56 @@ type Item struct {
InitScript *string InitScript *string
TransformScript *string TransformScript *string
Color *ui.Color Color *ui.Color
RefreshRateMs int
InteractiveShell *InteractiveShell
} }
func NewItems(cfgs []config.Item) []Item { type InteractiveShell struct {
StdoutCh chan string
StderrCh chan string
Stdin io.WriteCloser
Cmd *exec.Cmd
}
items := make([]Item, 0) func NewItems(cfgs []config.Item, refreshRateMs int) []*Item {
items := make([]*Item, 0)
for _, i := range cfgs { for _, i := range cfgs {
item := Item{ item := &Item{
Label: *i.Label, Label: *i.Label,
SampleScript: *i.SampleScript, SampleScript: *i.SampleScript,
InitScript: i.InitScript, InitScript: i.InitScript,
TransformScript: i.TransformScript, TransformScript: i.TransformScript,
Color: i.Color} Color: i.Color,
RefreshRateMs: refreshRateMs,
}
items = append(items, item) items = append(items, item)
} }
return items return items
} }
func (i *Item) nextValue(variables []string) (value string, err error) { func (i *Item) nextValue(variables []string) (string, error) {
if i.InitScript != nil && i.InteractiveShell == nil {
err := i.initInteractiveShell(variables)
if err != nil {
return "", err
}
}
if i.InitScript != nil {
return i.executeInteractiveShellCmd(variables)
} else {
return i.executeCmd(variables)
}
}
func (i *Item) executeCmd(variables []string) (string, error) {
cmd := exec.Command("sh", "-c", i.SampleScript) cmd := exec.Command("sh", "-c", i.SampleScript)
cmd.Env = os.Environ() enrichEnvVariables(cmd, variables)
for _, variable := range variables {
cmd.Env = append(cmd.Env, variable)
}
output, err := cmd.Output() output, err := cmd.Output()
@ -50,3 +77,102 @@ func (i *Item) nextValue(variables []string) (value string, err error) {
return strings.TrimSpace(string(output)), nil return strings.TrimSpace(string(output)), nil
} }
func (i *Item) initInteractiveShell(variables []string) error {
cmd := exec.Command("sh", "-c", *i.InitScript)
enrichEnvVariables(cmd, variables)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return err
}
stdin, err := cmd.StdinPipe()
if err != nil {
return err
}
stdoutScanner := bufio.NewScanner(stdout)
stderrScanner := bufio.NewScanner(stderr)
stdoutCh := make(chan string)
stderrCh := make(chan string)
go func() {
for stdoutScanner.Scan() {
stdoutCh <- stdoutScanner.Text()
stderrCh <- stderrScanner.Text()
}
}()
i.InteractiveShell = &InteractiveShell{
StdoutCh: stdoutCh,
StderrCh: stderrCh,
Stdin: stdin,
Cmd: cmd,
}
err = cmd.Start()
if err != nil {
return err
}
return nil
}
func (i *Item) executeInteractiveShellCmd(variables []string) (string, error) {
_, err := io.WriteString(i.InteractiveShell.Stdin, i.SampleScript+"\n")
if err != nil {
return "", err
}
timeout := make(chan bool, 1)
go func() {
time.Sleep(time.Duration(i.RefreshRateMs))
timeout <- true
}()
var resultText strings.Builder
var errorText strings.Builder
for {
select {
case stdout := <-i.InteractiveShell.StdoutCh:
if len(stdout) > 0 {
resultText.WriteString(stdout)
resultText.WriteString("\n")
}
case stderr := <-i.InteractiveShell.StderrCh:
if len(stderr) > 0 {
errorText.WriteString(stderr)
errorText.WriteString("\n")
}
case <-timeout:
if errorText.Len() > 0 {
return "", errors.New(errorText.String())
} else {
return i.transformInteractiveShellCmd(resultText.String())
}
}
}
}
func (i *Item) transformInteractiveShellCmd(value string) (string, error) {
// TODO use transform script, if any
return strings.TrimSpace(value), nil
}
func enrichEnvVariables(cmd *exec.Cmd, variables []string) {
cmd.Env = os.Environ()
for _, variable := range variables {
cmd.Env = append(cmd.Env, variable)
}
}

View File

@ -7,12 +7,12 @@ import (
type Sampler struct { type Sampler struct {
consumer *Consumer consumer *Consumer
items []Item items []*Item
triggers []Trigger triggers []*Trigger
triggersChannel chan *Sample triggersChannel chan *Sample
} }
func NewSampler(consumer *Consumer, items []Item, triggers []Trigger, options config.Options, rateMs int) Sampler { func NewSampler(consumer *Consumer, items []*Item, triggers []*Trigger, options config.Options, rateMs int) Sampler {
ticker := time.NewTicker(time.Duration(rateMs * int(time.Millisecond))) ticker := time.NewTicker(time.Duration(rateMs * int(time.Millisecond)))
@ -45,15 +45,15 @@ func NewSampler(consumer *Consumer, items []Item, triggers []Trigger, options co
return sampler return sampler
} }
func (s *Sampler) sample(item Item, options config.Options) { func (s *Sampler) sample(item *Item, options config.Options) {
val, err := item.nextValue(options.Variables) val, err := item.nextValue(options.Variables)
if err == nil { if len(val) > 0 {
sample := &Sample{Label: item.Label, Value: val, Color: item.Color} sample := &Sample{Label: item.Label, Value: val, Color: item.Color}
s.consumer.SampleChannel <- sample s.consumer.SampleChannel <- sample
s.triggersChannel <- sample s.triggersChannel <- sample
} else { } else if err != nil {
s.consumer.AlertChannel <- &Alert{ s.consumer.AlertChannel <- &Alert{
Title: "SAMPLING FAILURE", Title: "SAMPLING FAILURE",
Text: getErrorMessage(err), Text: getErrorMessage(err),

View File

@ -38,9 +38,9 @@ type Values struct {
previous string previous string
} }
func NewTriggers(cfgs []config.TriggerConfig, consumer *Consumer, options config.Options, player *asset.AudioPlayer) []Trigger { func NewTriggers(cfgs []config.TriggerConfig, consumer *Consumer, options config.Options, player *asset.AudioPlayer) []*Trigger {
triggers := make([]Trigger, 0) triggers := make([]*Trigger, 0)
for _, cfg := range cfgs { for _, cfg := range cfgs {
triggers = append(triggers, NewTrigger(cfg, consumer, options, player)) triggers = append(triggers, NewTrigger(cfg, consumer, options, player))
@ -49,8 +49,8 @@ func NewTriggers(cfgs []config.TriggerConfig, consumer *Consumer, options config
return triggers return triggers
} }
func NewTrigger(config config.TriggerConfig, consumer *Consumer, options config.Options, player *asset.AudioPlayer) Trigger { func NewTrigger(config config.TriggerConfig, consumer *Consumer, options config.Options, player *asset.AudioPlayer) *Trigger {
return Trigger{ return &Trigger{
title: config.Title, title: config.Title,
condition: config.Condition, condition: config.Condition,
consumer: consumer, consumer: consumer,

View File

@ -22,10 +22,10 @@ type Starter struct {
opt config.Options opt config.Options
} }
func (s *Starter) start(drawable ui.Drawable, consumer *data.Consumer, conponentConfig config.ComponentConfig, itemsConfig []config.Item, triggersConfig []config.TriggerConfig) { func (s *Starter) start(drawable ui.Drawable, consumer *data.Consumer, componentConfig config.ComponentConfig, itemsConfig []config.Item, triggersConfig []config.TriggerConfig) {
cpt := component.NewComponent(drawable, consumer, conponentConfig) cpt := component.NewComponent(drawable, consumer, componentConfig)
triggers := data.NewTriggers(triggersConfig, consumer, s.opt, s.player) triggers := data.NewTriggers(triggersConfig, consumer, s.opt, s.player)
data.NewSampler(consumer, data.NewItems(itemsConfig), triggers, s.opt, *conponentConfig.RefreshRateMs) data.NewSampler(consumer, data.NewItems(itemsConfig, *componentConfig.RefreshRateMs), triggers, s.opt, *componentConfig.RefreshRateMs)
s.lout.AddComponent(cpt) s.lout.AddComponent(cpt)
} }