From da26ba816520e71ea4e43a925f19eced803407b4 Mon Sep 17 00:00:00 2001 From: sqshq Date: Sun, 7 Apr 2019 11:09:24 -0400 Subject: [PATCH] interactive shell sampling --- config.yml | 14 +++-- data/item.go | 156 +++++++++++++++++++++++++++++++++++++++++++----- data/sampler.go | 12 ++-- data/trigger.go | 8 +-- main.go | 6 +- 5 files changed, 163 insertions(+), 33 deletions(-) diff --git a/config.yml b/config.yml index b216e4c..e9c1c7c 100644 --- a/config.yml +++ b/config.yml @@ -26,7 +26,7 @@ runcharts: items: - label: ACTIVE init: mongo --quiet --host=localhost blog - sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'INACTIVE'}).itcount()" + sample: db.getCollection('posts').find({status:'ACTIVE'}).itcount() transform: $sample | grep cpu - label: INACTIVE sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'INACTIVE'}).itcount()" @@ -37,15 +37,19 @@ barcharts: scale: 0 items: - label: NEW - sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'ACTIVE'}).itcount()" + init: mongo --quiet --host=localhost blog + sample: db.getCollection('posts').find({status:'ACTIVE'}).itcount() - label: TRIGGERED - sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'INACTIVE'}).itcount()" + init: mongo --quiet --host=localhost blog + sample: db.getCollection('posts').find({status:'INACTIVE'}).itcount() - label: IN_PROCESS sample: echo 0 - label: FAILED - sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'ACTIVE'}).itcount()" + init: mongo --quiet --host=localhost blog + sample: db.getCollection('posts').find({status:'ACTIVE'}).itcount() - label: FINISHED - sample: mongo --quiet --host=localhost blog --eval "db.getCollection('posts').find({status:'INACTIVE'}).itcount()" + init: mongo --quiet --host=localhost blog + sample: db.getCollection('posts').find({status:'INACTIVE'}).itcount() gauges: - title: YEAR PROGRESS position: [[53, 8], [27, 2]] diff --git a/data/item.go b/data/item.go index 2e71f61..1fce214 100644 --- a/data/item.go +++ b/data/item.go @@ -1,46 +1,73 @@ package data import ( + "bufio" + "errors" ui "github.com/gizak/termui/v3" "github.com/sqshq/sampler/config" + "io" "os" "os/exec" "strings" + "time" ) type Item struct { - Label string - SampleScript string - InitScript *string - TransformScript *string - Color *ui.Color + Label string + SampleScript string + InitScript *string + TransformScript *string + Color *ui.Color + RefreshRateMs int + InteractiveShell *InteractiveShell } -func NewItems(cfgs []config.Item) []Item { +type InteractiveShell struct { + StdoutCh chan string + StderrCh chan string + Stdin io.WriteCloser + Cmd *exec.Cmd +} - items := make([]Item, 0) +func NewItems(cfgs []config.Item, refreshRateMs int) []*Item { + + items := make([]*Item, 0) for _, i := range cfgs { - item := Item{ + item := &Item{ Label: *i.Label, SampleScript: *i.SampleScript, InitScript: i.InitScript, TransformScript: i.TransformScript, - Color: i.Color} + Color: i.Color, + RefreshRateMs: refreshRateMs, + } items = append(items, item) } return items } -func (i *Item) nextValue(variables []string) (value string, err error) { +func (i *Item) nextValue(variables []string) (string, error) { + + if i.InitScript != nil && i.InteractiveShell == nil { + err := i.initInteractiveShell(variables) + if err != nil { + return "", err + } + } + + if i.InitScript != nil { + return i.executeInteractiveShellCmd(variables) + } else { + return i.executeCmd(variables) + } +} + +func (i *Item) executeCmd(variables []string) (string, error) { cmd := exec.Command("sh", "-c", i.SampleScript) - cmd.Env = os.Environ() - - for _, variable := range variables { - cmd.Env = append(cmd.Env, variable) - } + enrichEnvVariables(cmd, variables) output, err := cmd.Output() @@ -50,3 +77,102 @@ func (i *Item) nextValue(variables []string) (value string, err error) { return strings.TrimSpace(string(output)), nil } + +func (i *Item) initInteractiveShell(variables []string) error { + + cmd := exec.Command("sh", "-c", *i.InitScript) + enrichEnvVariables(cmd, variables) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + + stderr, err := cmd.StderrPipe() + if err != nil { + return err + } + + stdin, err := cmd.StdinPipe() + if err != nil { + return err + } + + stdoutScanner := bufio.NewScanner(stdout) + stderrScanner := bufio.NewScanner(stderr) + + stdoutCh := make(chan string) + stderrCh := make(chan string) + + go func() { + for stdoutScanner.Scan() { + stdoutCh <- stdoutScanner.Text() + stderrCh <- stderrScanner.Text() + } + }() + + i.InteractiveShell = &InteractiveShell{ + StdoutCh: stdoutCh, + StderrCh: stderrCh, + Stdin: stdin, + Cmd: cmd, + } + + err = cmd.Start() + if err != nil { + return err + } + + return nil +} + +func (i *Item) executeInteractiveShellCmd(variables []string) (string, error) { + + _, err := io.WriteString(i.InteractiveShell.Stdin, i.SampleScript+"\n") + if err != nil { + return "", err + } + + timeout := make(chan bool, 1) + + go func() { + time.Sleep(time.Duration(i.RefreshRateMs)) + timeout <- true + }() + + var resultText strings.Builder + var errorText strings.Builder + + for { + select { + case stdout := <-i.InteractiveShell.StdoutCh: + if len(stdout) > 0 { + resultText.WriteString(stdout) + resultText.WriteString("\n") + } + case stderr := <-i.InteractiveShell.StderrCh: + if len(stderr) > 0 { + errorText.WriteString(stderr) + errorText.WriteString("\n") + } + case <-timeout: + if errorText.Len() > 0 { + return "", errors.New(errorText.String()) + } else { + return i.transformInteractiveShellCmd(resultText.String()) + } + } + } +} + +func (i *Item) transformInteractiveShellCmd(value string) (string, error) { + // TODO use transform script, if any + return strings.TrimSpace(value), nil +} + +func enrichEnvVariables(cmd *exec.Cmd, variables []string) { + cmd.Env = os.Environ() + for _, variable := range variables { + cmd.Env = append(cmd.Env, variable) + } +} diff --git a/data/sampler.go b/data/sampler.go index e9bb560..59323e0 100644 --- a/data/sampler.go +++ b/data/sampler.go @@ -7,12 +7,12 @@ import ( type Sampler struct { consumer *Consumer - items []Item - triggers []Trigger + items []*Item + triggers []*Trigger triggersChannel chan *Sample } -func NewSampler(consumer *Consumer, items []Item, triggers []Trigger, options config.Options, rateMs int) Sampler { +func NewSampler(consumer *Consumer, items []*Item, triggers []*Trigger, options config.Options, rateMs int) Sampler { ticker := time.NewTicker(time.Duration(rateMs * int(time.Millisecond))) @@ -45,15 +45,15 @@ func NewSampler(consumer *Consumer, items []Item, triggers []Trigger, options co return sampler } -func (s *Sampler) sample(item Item, options config.Options) { +func (s *Sampler) sample(item *Item, options config.Options) { val, err := item.nextValue(options.Variables) - if err == nil { + if len(val) > 0 { sample := &Sample{Label: item.Label, Value: val, Color: item.Color} s.consumer.SampleChannel <- sample s.triggersChannel <- sample - } else { + } else if err != nil { s.consumer.AlertChannel <- &Alert{ Title: "SAMPLING FAILURE", Text: getErrorMessage(err), diff --git a/data/trigger.go b/data/trigger.go index f07e9de..25f9543 100644 --- a/data/trigger.go +++ b/data/trigger.go @@ -38,9 +38,9 @@ type Values struct { previous string } -func NewTriggers(cfgs []config.TriggerConfig, consumer *Consumer, options config.Options, player *asset.AudioPlayer) []Trigger { +func NewTriggers(cfgs []config.TriggerConfig, consumer *Consumer, options config.Options, player *asset.AudioPlayer) []*Trigger { - triggers := make([]Trigger, 0) + triggers := make([]*Trigger, 0) for _, cfg := range cfgs { triggers = append(triggers, NewTrigger(cfg, consumer, options, player)) @@ -49,8 +49,8 @@ func NewTriggers(cfgs []config.TriggerConfig, consumer *Consumer, options config return triggers } -func NewTrigger(config config.TriggerConfig, consumer *Consumer, options config.Options, player *asset.AudioPlayer) Trigger { - return Trigger{ +func NewTrigger(config config.TriggerConfig, consumer *Consumer, options config.Options, player *asset.AudioPlayer) *Trigger { + return &Trigger{ title: config.Title, condition: config.Condition, consumer: consumer, diff --git a/main.go b/main.go index a106487..dea56b0 100644 --- a/main.go +++ b/main.go @@ -22,10 +22,10 @@ type Starter struct { opt config.Options } -func (s *Starter) start(drawable ui.Drawable, consumer *data.Consumer, conponentConfig config.ComponentConfig, itemsConfig []config.Item, triggersConfig []config.TriggerConfig) { - cpt := component.NewComponent(drawable, consumer, conponentConfig) +func (s *Starter) start(drawable ui.Drawable, consumer *data.Consumer, componentConfig config.ComponentConfig, itemsConfig []config.Item, triggersConfig []config.TriggerConfig) { + cpt := component.NewComponent(drawable, consumer, componentConfig) triggers := data.NewTriggers(triggersConfig, consumer, s.opt, s.player) - data.NewSampler(consumer, data.NewItems(itemsConfig), triggers, s.opt, *conponentConfig.RefreshRateMs) + data.NewSampler(consumer, data.NewItems(itemsConfig, *componentConfig.RefreshRateMs), triggers, s.opt, *componentConfig.RefreshRateMs) s.lout.AddComponent(cpt) }