THIS IS A TEST INSTANCE ONLY! REPOSITORIES CAN BE DELETED AT ANY TIME!

Browse Source

Support N distributed agents

This is the third main feature of this system. The code needs a bunch of
polish, but it actually all works :)

I've tested this briefly with N <= 3.

Currently you have to build your own etcd cluster. It's quite easy, just
run `etcd` and it will be ready. I usually run it in a throw away /tmp/
dir so that I can blow away the stored data easily.
pull/6/head
James Shubin 5 years ago
parent
commit
d8cbeb56f9
  1. 43
      config.go
  2. 155
      configwatch.go
  3. 47
      etcd.go
  4. 3
      event.go
  5. 8
      examples/graph1.yaml
  6. 2
      examples/graph2.yaml
  7. 28
      examples/graph3a.yaml
  8. 28
      examples/graph3b.yaml
  9. 44
      examples/graph3c.yaml
  10. 6
      examples/graph4.yaml
  11. 2
      examples/graph5.yaml
  12. 73
      file.go
  13. 99
      main.go
  14. 11
      misc.go
  15. 15
      misc_test.go
  16. 3
      omv.yaml
  17. 28
      pgraph.go
  18. 6
      service.go
  19. 21
      types.go

43
config.go

@ -66,7 +66,23 @@ func (c *graphConfig) Parse(data []byte) error {
return nil
}
func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAPI) bool {
func ParseConfigFromFile(filename string) *graphConfig {
data, err := ioutil.ReadFile(filename)
if err != nil {
log.Printf("Error: Config: ParseConfigFromFile: File: %v", err)
return nil
}
var config graphConfig
if err := config.Parse(data); err != nil {
log.Printf("Error: Config: ParseConfigFromFile: Parse: %v", err)
return nil
}
return &config
}
func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, kapi etcd.KeysAPI) {
var NoopMap map[string]*Vertex = make(map[string]*Vertex)
var FileMap map[string]*Vertex = make(map[string]*Vertex)
@ -77,17 +93,6 @@ func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAP
lookup["file"] = FileMap
lookup["service"] = ServiceMap
data, err := ioutil.ReadFile(filename)
if err != nil {
log.Fatal(err)
return false
}
var config graphConfig
if err := config.Parse(data); err != nil {
log.Fatal(err)
return false
}
//fmt.Printf("%+v\n", config) // debug
g.SetName(config.Graph) // set graph name
@ -116,7 +121,7 @@ func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAP
continue
}
} else {
obj := NewFileType(t.Name, t.Path, t.Content, t.State)
obj := NewFileType(t.Name, t.Path, t.Dirname, t.Basename, t.Content, t.State)
v := g.GetVertexMatch(obj)
if v == nil { // no match found
v = NewVertex(obj)
@ -145,15 +150,19 @@ func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAP
if ok {
for _, t := range config.Collector {
// XXX: use t.Type and optionally t.Pattern to collect from etcd storage
log.Printf("Collect: %v(%v)", t.Type, t.Pattern)
log.Printf("Collect: %v; Pattern: %v", t.Type, t.Pattern)
for _, x := range EtcdGetProcess(nodes, "file") {
var obj *FileType
if B64ToObj(x, &obj) != true {
log.Printf("File: %v error!", x)
log.Printf("Collect: File: %v not collected!", x)
continue
}
log.Printf("File: %v found!", obj.GetName())
if t.Pattern != "" { // XXX: currently the pattern for files can only override the Dirname variable :P
obj.Dirname = t.Pattern
}
log.Printf("Collect: File: %v collected!", obj.GetName())
// XXX: similar to file add code:
v := g.GetVertexMatch(obj)
@ -182,6 +191,4 @@ func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAP
for _, e := range config.Edges {
g.AddEdge(lookup[e.From.Type][e.From.Name], lookup[e.To.Type][e.To.Name], NewEdge(e.Name))
}
return true
}

155
configwatch.go

@ -0,0 +1,155 @@
// Mgmt
// Copyright (C) 2013-2015+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"gopkg.in/fsnotify.v1"
//"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1"
"log"
"math"
"path"
"strings"
"syscall"
)
// XXX: it would be great if we could reuse code between this and the file type
// XXX: patch this to submit it as part of go-fsnotify if they're interested...
func ConfigWatch(file string) chan bool {
ch := make(chan bool)
go func() {
var safename = path.Clean(file) // no trailing slash
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
patharray := PathSplit(safename) // tokenize the path
var index = len(patharray) // starting index
var current string // current "watcher" location
var delta_depth int // depth delta between watcher and event
var send = false // send event?
for {
current = strings.Join(patharray[0:index], "/")
if current == "" { // the empty string top is the root dir ("/")
current = "/"
}
log.Printf("Watching: %v\n", current) // attempting to watch...
// initialize in the loop so that we can reset on rm-ed handles
err = watcher.Add(current)
if err != nil {
if err == syscall.ENOENT {
index-- // usually not found, move up one dir
} else if err == syscall.ENOSPC {
// XXX: occasionally: no space left on device,
// XXX: probably due to lack of inotify watches
log.Printf("Lack of watches for config(%v) error: %+v\n", file, err.Error) // 0x408da0
log.Fatal(err)
} else {
log.Printf("Unknown config(%v) error:\n", file)
log.Fatal(err)
}
index = int(math.Max(1, float64(index)))
continue
}
select {
case event := <-watcher.Events:
// the deeper you go, the bigger the delta_depth is...
// this is the difference between what we're watching,
// and the event... doesn't mean we can't watch deeper
if current == event.Name {
delta_depth = 0 // i was watching what i was looking for
} else if HasPathPrefix(event.Name, current) {
delta_depth = len(PathSplit(current)) - len(PathSplit(event.Name)) // -1 or less
} else if HasPathPrefix(current, event.Name) {
delta_depth = len(PathSplit(event.Name)) - len(PathSplit(current)) // +1 or more
} else {
// TODO different watchers get each others events!
// https://github.com/go-fsnotify/fsnotify/issues/95
// this happened with two values such as:
// event.Name: /tmp/mgmt/f3 and current: /tmp/mgmt/f2
continue
}
//log.Printf("The delta depth is: %v\n", delta_depth)
// if we have what we wanted, awesome, send an event...
if event.Name == safename {
//log.Println("Event!")
send = true
// file removed, move the watch upwards
if delta_depth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
//log.Println("Removal!")
watcher.Remove(current)
index--
}
// we must be a parent watcher, so descend in
if delta_depth < 0 {
watcher.Remove(current)
index++
}
// if safename starts with event.Name, we're above, and no event should be sent
} else if HasPathPrefix(safename, event.Name) {
//log.Println("Above!")
if delta_depth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
log.Println("Removal!")
watcher.Remove(current)
index--
}
if delta_depth < 0 {
log.Println("Parent!")
if PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir
//send = true
}
watcher.Remove(current)
index++
}
// if event.Name startswith safename, send event, we're already deeper
} else if HasPathPrefix(event.Name, safename) {
//log.Println("Event2!")
//send = true
}
case err := <-watcher.Errors:
log.Println("error:", err)
log.Fatal(err)
}
// do our event sending all together to avoid duplicate msgs
if send {
send = false
ch <- true
}
}
close(ch)
}()
return ch
}

47
etcd.go

@ -27,10 +27,19 @@ import (
"time"
)
func EtcdGetKAPI() etcd.KeysAPI {
//go:generate stringer -type=etcdMsg -output=etcdmsg_stringer.go
type etcdMsg int
const (
etcdStart etcdMsg = iota
etcdEvent
etcdFoo
etcdBar
)
func EtcdGetKAPI(seed string) etcd.KeysAPI {
cfg := etcd.Config{
Endpoints: []string{"http://127.0.0.1:2379"},
Endpoints: []string{seed},
Transport: etcd.DefaultTransport,
// set timeout per request to fail fast when the target endpoint is unavailable
HeaderTimeoutPerRequest: time.Second,
@ -56,24 +65,21 @@ func EtcdGetKAPI() etcd.KeysAPI {
return etcd.NewKeysAPI(c)
}
func EtcdWatch(kapi etcd.KeysAPI, kick bool) chan string {
func EtcdWatch(kapi etcd.KeysAPI) chan etcdMsg {
// XXX: i think we need this buffered so that when we're hanging on the
// channel, which is inside the EtcdWatch main loop, we still want the
// calls to Get/Set on etcd to succeed, so blocking them here would
// kill the whole thing
ch := make(chan string, 1) // XXX: buffer of at least 1 is required
if kick {
ch <- "hello"
}
go func(ch chan string) {
ch := make(chan etcdMsg, 1) // XXX: buffer of at least 1 is required
go func(ch chan etcdMsg) {
tmin := 500 // initial (min) delay in ms
t := tmin // current time
tmult := 2 // multiplier for exponential delay
tmax := 16000 // max delay
watcher := kapi.Watcher("/exported/", &etcd.WatcherOptions{Recursive: true})
for {
log.Printf("Watching etcd...")
resp, err := watcher.Next(etcd_context.Background())
log.Printf("Etcd: Watching...")
resp, err := watcher.Next(etcd_context.Background()) // blocks here
if err != nil {
if err == etcd_context.Canceled {
// ctx is canceled by another routine
@ -87,8 +93,18 @@ func EtcdWatch(kapi etcd.KeysAPI, kick bool) chan string {
for _, e := range cerr.Errors {
if strings.HasSuffix(e.Error(), "getsockopt: connection refused") {
t = int(math.Min(float64(t*tmult), float64(tmax)))
log.Printf("Waiting %d ms for etcd...", t)
log.Printf("Etcd: Waiting %d ms for connection...", t)
time.Sleep(time.Duration(t) * time.Millisecond) // sleep for t ms
} else if e.Error() == "unexpected EOF" {
log.Printf("Etcd: Disconnected...")
} else if strings.HasPrefix(e.Error(), "unsupported protocol scheme") {
// usually a bad peer endpoint value
log.Fatal("Bad peer endpoint value?")
} else {
log.Fatal("Woops: ", e.Error())
}
}
} else {
@ -114,7 +130,8 @@ func EtcdWatch(kapi etcd.KeysAPI, kick bool) chan string {
// IOW, ignore everything except for the value or some
// field which gets set last... this could be the max count field thing...
ch <- resp.Node.Value // event
log.Printf("Etcd: Value: %v", resp.Node.Value) // event
ch <- etcdEvent // event
}
} // end for loop
@ -127,7 +144,7 @@ func EtcdWatch(kapi etcd.KeysAPI, kick bool) chan string {
func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool {
output, ok := ObjToB64(obj)
if !ok {
log.Printf("Could not encode %v for etcd.", key)
log.Printf("Etcd: Could not encode %v key.", key)
return false
}
@ -146,7 +163,7 @@ func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool
//if e == etcd.ErrClusterUnavailable
}
}
log.Printf("Could not store %v in etcd.", key)
log.Printf("Etcd: Could not store %v key.", key)
return false
}
log.Print("Etcd: ", resp) // w00t... bonus
@ -155,7 +172,6 @@ func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool
// lookup /exported/ node hierarchy
func EtcdGet(kapi etcd.KeysAPI) (etcd.Nodes, bool) {
// key structure is /exported/<hostname>/types/...
resp, err := kapi.Get(etcd_context.Background(), "/exported/", &etcd.GetOptions{Recursive: true})
if err != nil {
@ -165,7 +181,6 @@ func EtcdGet(kapi etcd.KeysAPI) (etcd.Nodes, bool) {
}
func EtcdGetProcess(nodes etcd.Nodes, typ string) []string {
//path := fmt.Sprintf("/exported/%s/types/", h)
top := "/exported/"
log.Printf("Etcd: Get: %+v", nodes) // Get().Nodes.Nodes

3
event.go

@ -24,11 +24,8 @@ const (
eventExit eventName = iota
eventStart
eventPause
eventContinue
eventPoke
eventChanged
//eventPaused
eventStarted
)
type Event struct {

8
examples/graph1.yaml

@ -5,22 +5,22 @@ types:
- name: noop1
file:
- name: file1
path: /tmp/mgmt/f1
path: "/tmp/mgmt/f1"
content: |
i am f1
state: exists
- name: file2
path: /tmp/mgmt/f2
path: "/tmp/mgmt/f2"
content: |
i am f2
state: exists
- name: file3
path: /tmp/mgmt/f3
path: "/tmp/mgmt/f3"
content: |
i am f3
state: exists
- name: file4
path: /tmp/mgmt/f4
path: "/tmp/mgmt/f4"
content: |
i am f4 and i should not be here
state: absent

2
examples/graph2.yaml

@ -5,7 +5,7 @@ types:
- name: noop1
file:
- name: file1
path: /tmp/mgmt/f1
path: "/tmp/mgmt/f1"
content: |
i am f1
state: exists

28
examples/graph3a.yaml

@ -2,43 +2,43 @@
graph: mygraph
types:
noop:
- name: noop1
- name: noop1a
file:
- name: file1
path: /tmp/mgmt/f1
- name: file1a
path: "/tmp/mgmt1/f1a"
content: |
i am f1
state: exists
- name: file2
path: /tmp/mgmt/f2
- name: file2a
path: "/tmp/mgmt1/f2a"
content: |
i am f2
state: exists
- name: '@@file3'
path: /tmp/mgmt/f3
- name: "@@file3a"
path: "/tmp/mgmt1/f3a"
content: |
i am f3, exported from host A
state: exists
- name: '@@file4'
path: /tmp/mgmt/f4
- name: "@@file4a"
path: "/tmp/mgmt1/f4a"
content: |
i am f4, exported from host A
state: exists
collect:
- type: file
pattern: ''
pattern: "/tmp/mgmt1/"
edges:
- name: e1
from:
type: noop
name: noop1
name: noop1a
to:
type: file
name: file1
name: file1a
- name: e2
from:
type: file
name: file1
name: file1a
to:
type: file
name: file2
name: file2a

28
examples/graph3b.yaml

@ -2,43 +2,43 @@
graph: mygraph
types:
noop:
- name: noop1
- name: noop1b
file:
- name: file1
path: /tmp/mgmt/f1
- name: file1b
path: "/tmp/mgmt2/f1b"
content: |
i am f1
state: exists
- name: file2
path: /tmp/mgmt/f2
- name: file2b
path: "/tmp/mgmt2/f2b"
content: |
i am f2
state: exists
- name: '@@file3'
path: /tmp/mgmt/f3
- name: "@@file3b"
path: "/tmp/mgmt2/f3b"
content: |
i am f3, exported from host B
state: exists
- name: '@@file4'
path: /tmp/mgmt/f4
- name: "@@file4b"
path: "/tmp/mgmt2/f4b"
content: |
i am f4, exported from host B
state: exists
collect:
- type: file
pattern: ''
pattern: "/tmp/mgmt2/"
edges:
- name: e1
from:
type: noop
name: noop1
name: noop1b
to:
type: file
name: file1
name: file1b
- name: e2
from:
type: file
name: file1
name: file1b
to:
type: file
name: file2
name: file2b

44
examples/graph3c.yaml

@ -0,0 +1,44 @@
---
graph: mygraph
types:
noop:
- name: noop1c
file:
- name: file1c
path: "/tmp/mgmt3/f1c"
content: |
i am f1
state: exists
- name: file2c
path: "/tmp/mgmt3/f2c"
content: |
i am f2
state: exists
- name: "@@file3c"
path: "/tmp/mgmt3/f3c"
content: |
i am f3, exported from host C
state: exists
- name: "@@file4c"
path: "/tmp/mgmt3/f4c"
content: |
i am f4, exported from host C
state: exists
collect:
- type: file
pattern: "/tmp/mgmt3/"
edges:
- name: e1
from:
type: noop
name: noop1c
to:
type: file
name: file1c
- name: e2
from:
type: file
name: file1c
to:
type: file
name: file2c

6
examples/graph4.yaml

@ -3,12 +3,12 @@ graph: mygraph
types:
file:
- name: file1
path: /tmp/mgmt/f1
path: "/tmp/mgmt/f1"
content: |
i am f1
state: exists
- name: '@@file3'
path: /tmp/mgmt/f3
- name: "@@file3"
path: "/tmp/mgmt/f3"
content: |
i am f3, exported from host A
state: exists

2
examples/graph5.yaml

@ -3,7 +3,7 @@ graph: mygraph
types:
file:
- name: file1
path: /tmp/mgmt/f1
path: "/tmp/mgmt/f1"
content: |
i am f1
state: exists

73
file.go

@ -35,12 +35,14 @@ import (
type FileType struct {
BaseType `yaml:",inline"`
Path string `yaml:"path"` // path variable (should default to name)
Dirname string `yaml:"dirname"`
Basename string `yaml:"basename"`
Content string `yaml:"content"`
State string `yaml:"state"` // state: exists/present?, absent, (undefined?)
sha256sum string
}
func NewFileType(name, path, content, state string) *FileType {
func NewFileType(name, path, dirname, basename, content, state string) *FileType {
// FIXME if path = nil, path = name ...
return &FileType{
BaseType: BaseType{
@ -49,6 +51,8 @@ func NewFileType(name, path, content, state string) *FileType {
vertex: nil,
},
Path: path,
Dirname: dirname,
Basename: basename,
Content: content,
State: state,
sha256sum: "",
@ -59,15 +63,52 @@ func (obj *FileType) GetType() string {
return "File"
}
// validate if the params passed in are valid data
func (obj *FileType) Validate() bool {
if obj.Dirname != "" {
// must end with /
if obj.Dirname[len(obj.Dirname)-1:] != "/" {
return false
}
}
if obj.Basename != "" {
// must not start with /
if obj.Basename[0:1] == "/" {
return false
}
}
return true
}
func (obj *FileType) GetPath() string {
d := Dirname(obj.Path)
b := Basename(obj.Path)
if !obj.Validate() || (obj.Dirname == "" && obj.Basename == "") {
return obj.Path
} else if obj.Dirname == "" {
return d + obj.Basename
} else if obj.Basename == "" {
return obj.Dirname + b
} else { // if obj.dirname != "" && obj.basename != "" {
return obj.Dirname + obj.Basename
}
}
// File watcher for files and directories
// Modify with caution, probably important to write some test cases first!
// obj.Path: file or directory
// obj.GetPath(): file or directory
func (obj *FileType) Watch() {
if obj.IsWatching() {
return
}
obj.SetWatching(true)
defer obj.SetWatching(false)
//var recursive bool = false
//var isdir = (obj.Path[len(obj.Path)-1:] == "/") // dirs have trailing slashes
//var isdir = (obj.GetPath()[len(obj.GetPath())-1:] == "/") // dirs have trailing slashes
//fmt.Printf("IsDirectory: %v\n", isdir)
//vertex := obj.GetVertex() // stored with SetVertex
var safename = path.Clean(obj.Path) // no trailing slash
var safename = path.Clean(obj.GetPath()) // no trailing slash
watcher, err := fsnotify.NewWatcher()
if err != nil {
@ -204,7 +245,7 @@ func (obj *FileType) HashSHA256fromContent() string {
}
func (obj *FileType) StateOK() bool {
if _, err := os.Stat(obj.Path); os.IsNotExist(err) {
if _, err := os.Stat(obj.GetPath()); os.IsNotExist(err) {
// no such file or directory
if obj.State == "absent" {
return true // missing file should be missing, phew :)
@ -216,7 +257,7 @@ func (obj *FileType) StateOK() bool {
// TODO: add file mode check here...
if PathIsDir(obj.Path) {
if PathIsDir(obj.GetPath()) {
return obj.StateOKDir()
} else {
return obj.StateOKFile()
@ -224,7 +265,7 @@ func (obj *FileType) StateOK() bool {
}
func (obj *FileType) StateOKFile() bool {
if PathIsDir(obj.Path) {
if PathIsDir(obj.GetPath()) {
log.Fatal("This should only be called on a File type.")
}
@ -232,7 +273,7 @@ func (obj *FileType) StateOKFile() bool {
hash := sha256.New()
f, err := os.Open(obj.Path)
f, err := os.Open(obj.GetPath())
if err != nil {
//log.Fatal(err)
return false
@ -255,7 +296,7 @@ func (obj *FileType) StateOKFile() bool {
}
func (obj *FileType) StateOKDir() bool {
if !PathIsDir(obj.Path) {
if !PathIsDir(obj.GetPath()) {
log.Fatal("This should only be called on a Dir type.")
}
@ -267,7 +308,7 @@ func (obj *FileType) StateOKDir() bool {
func (obj *FileType) Apply() bool {
fmt.Printf("Apply->File[%v]\n", obj.Name)
if PathIsDir(obj.Path) {
if PathIsDir(obj.GetPath()) {
return obj.ApplyDir()
} else {
return obj.ApplyFile()
@ -276,13 +317,13 @@ func (obj *FileType) Apply() bool {
func (obj *FileType) ApplyFile() bool {
if PathIsDir(obj.Path) {
if PathIsDir(obj.GetPath()) {
log.Fatal("This should only be called on a File type.")
}
if obj.State == "absent" {
log.Printf("About to remove: %v\n", obj.Path)
err := os.Remove(obj.Path)
log.Printf("About to remove: %v\n", obj.GetPath())
err := os.Remove(obj.GetPath())
if err != nil {
return false
}
@ -290,7 +331,7 @@ func (obj *FileType) ApplyFile() bool {
}
//fmt.Println("writing: " + filename)
f, err := os.Create(obj.Path)
f, err := os.Create(obj.GetPath())
if err != nil {
log.Println("error:", err)
return false
@ -307,7 +348,7 @@ func (obj *FileType) ApplyFile() bool {
}
func (obj *FileType) ApplyDir() bool {
if !PathIsDir(obj.Path) {
if !PathIsDir(obj.GetPath()) {
log.Fatal("This should only be called on a Dir type.")
}
@ -329,7 +370,7 @@ func (obj *FileType) compare(typ *FileType) bool {
if obj.Name != typ.Name {
return false
}
if obj.Path != typ.Path {
if obj.GetPath() != typ.Path {
return false
}
if obj.Content != typ.Content {

99
main.go

@ -74,21 +74,61 @@ func run(c *cli.Context) {
}()
}
// initial etcd peer endpoint
seed := c.String("seed")
if seed == "" {
// XXX: start up etcd server, others will join me!
seed = "http://127.0.0.1:2379" // thus we use the local server!
}
// then, connect to `seed` as a client
// FIXME: validate seed, or wait for it to fail in etcd init?
// etcd
hostname := c.String("hostname")
if hostname == "" {
hostname, _ = os.Hostname() // etcd watch key // XXX: this is not the correct key name this is the set key name... WOOPS
}
go func(hostname string) {
go func() {
startchan := make(chan struct{}) // start signal
go func() { startchan <- struct{}{} }()
file := c.String("file")
configchan := ConfigWatch(file)
log.Printf("Starting etcd...\n")
kapi := EtcdGetKAPI()
kapi := EtcdGetKAPI(seed)
etcdchan := EtcdWatch(kapi)
first := true // first loop or not
for x := range EtcdWatch(kapi, true) {
for {
select {
case _ = <-startchan: // kick the loop once at start
// pass
case msg := <-etcdchan:
switch msg {
// some types of messages we ignore...
case etcdFoo, etcdBar:
continue
// while others passthrough and cause a compile!
case etcdStart, etcdEvent:
// pass
default:
log.Fatal("Etcd: Unhandled message: %v", msg)
}
case msg := <-configchan:
if c.Bool("no-watch") || !msg {
continue // not ready to read config
}
//case compile_event: XXX
}
// run graph vertex LOCK...
if !first {
log.Printf("Watcher().Node.Value(%v): %+v", hostname, x)
config := ParseConfigFromFile(file)
if config == nil {
log.Printf("Config parse failure")
continue
}
// run graph vertex LOCK...
if !first { // XXX: we can flatten this check out I think
G.SetState(graphPausing)
log.Printf("State: %v", G.State())
G.Pause() // sync
@ -97,10 +137,8 @@ func run(c *cli.Context) {
}
// build the graph from a config file
// build the graph on events (eg: from etcd) but kick it once...
if !UpdateGraphFromConfig(c.String("file"), hostname, G, kapi) {
log.Fatal("Graph failure")
}
// build the graph on events (eg: from etcd)
UpdateGraphFromConfig(config, hostname, G, kapi)
log.Printf("Graph: %v\n", G) // show graph
err := G.ExecGraphviz(c.String("graphviz-filter"), c.String("graphviz"))
if err != nil {
@ -109,29 +147,20 @@ func run(c *cli.Context) {
log.Printf("Graphviz: Successfully generated graph!")
}
G.SetVertex()
if first {
// G.Start(...) needs to be synchronous or wait,
// because if half of the nodes are started and
// some are not ready yet and the EtcdWatch
// loops, we'll cause G.Pause(...) before we
// even got going, thus causing nil pointer errors
G.SetState(graphStarting)
log.Printf("State: %v", G.State())
G.Start(&wg)
G.SetState(graphStarted)
log.Printf("State: %v", G.State())
// G.Start(...) needs to be synchronous or wait,
// because if half of the nodes are started and
// some are not ready yet and the EtcdWatch
// loops, we'll cause G.Pause(...) before we
// even got going, thus causing nil pointer errors
G.SetState(graphStarting)
log.Printf("State: %v", G.State())
G.Start(&wg) // sync
G.SetState(graphStarted)
log.Printf("State: %v", G.State())
} else {
G.SetState(graphContinuing)
log.Printf("State: %v", G.State())
G.Continue() // sync
G.SetState(graphStarted)
log.Printf("State: %v", G.State())
}
first = false
}
}(hostname)
}()
log.Println("Running...")
@ -175,6 +204,10 @@ func main() {
Value: "",
Usage: "graph definition to run",
},
cli.BoolFlag{
Name: "no-watch",
Usage: "do not update graph on watched graph definition file changes",
},
cli.StringFlag{
Name: "code, c",
Value: "",
@ -196,6 +229,12 @@ func main() {
Value: "",
Usage: "hostname to use",
},
// if empty, it will startup a new server
cli.StringFlag{
Name: "seed, s",
Value: "",
Usage: "default etc peer endpoint",
},
cli.IntFlag{
Name: "exittime",
Value: 0,

11
misc.go

@ -27,10 +27,21 @@ import (
// Similar to the GNU dirname command
func Dirname(p string) string {
if p == "/" {
return ""
}
d, _ := path.Split(path.Clean(p))
return d
}
func Basename(p string) string {
_, b := path.Split(path.Clean(p))
if p[len(p)-1:] == "/" { // don't loose the tail slash
b += "/"
}
return b
}
// Split a path into an array of tokens excluding any trailing empty tokens
func PathSplit(p string) []string {
return strings.Split(path.Clean(p), "/")

15
misc_test.go

@ -32,9 +32,22 @@ func TestMiscT1(t *testing.T) {
t.Errorf("Result is incorrect.")
}
if Dirname("/") != "/" {
if Dirname("/") != "" { // TODO: should this equal "/" or "" ?
t.Errorf("Result is incorrect.")
}
if Basename("/foo/bar/baz") != "baz" {
t.Errorf("Result is incorrect.")
}
if Basename("/foo/bar/baz/") != "baz/" {
t.Errorf("Result is incorrect.")
}
if Basename("/") != "/" { // TODO: should this equal "" or "/" ?
t.Errorf("Result is incorrect.")
}
}
func TestMiscT2(t *testing.T) {

3
omv.yaml

@ -1,7 +1,7 @@
---
:domain: example.com
:network: 192.168.123.0/24
:image: centos-7.1
:image: fedora-23
:cpus: ''
:memory: ''
:disks: 0
@ -34,5 +34,6 @@
:reboot: false
:unsafe: false
:nested: false
:tests: []
:comment: ''
:reallyrm: false

28
pgraph.go

@ -39,7 +39,6 @@ const (
graphStarted
graphPausing
graphPaused
graphContinuing
)
// The graph abstract data type (ADT) is defined as follows:
@ -538,18 +537,20 @@ func HeisenbergCount(ch chan *Vertex) int {
}
// main kick to start the graph
func (g *Graph) Start(wg *sync.WaitGroup) {
func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue
t, _ := g.TopologicalSort()
for _, v := range Reverse(t) {
wg.Add(1)
// must pass in value to avoid races...
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
go func(vv *Vertex) {
defer wg.Done()
vv.Type.Watch()
log.Printf("Finish: %v", vv.GetName())
}(v)
if !v.Type.IsWatching() { // if Watch() is not running...
wg.Add(1)
// must pass in value to avoid races...
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
go func(vv *Vertex) {
defer wg.Done()
vv.Type.Watch()
log.Printf("Finish: %v", vv.GetName())
}(v)
}
// ensure state is started before continuing on to next vertex
v.Type.SendEvent(eventStart, true)
@ -557,13 +558,6 @@ func (g *Graph) Start(wg *sync.WaitGroup) {
}
}
func (g *Graph) Continue() {
t, _ := g.TopologicalSort()
for _, v := range Reverse(t) {
v.Type.SendEvent(eventContinue, true)
}
}
func (g *Graph) Pause() {
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...

6
service.go

@ -51,6 +51,12 @@ func (obj *ServiceType) GetType() string {
// Service watcher
func (obj *ServiceType) Watch() {
if obj.IsWatching() {
return
}
obj.SetWatching(true)
defer obj.SetWatching(false)
// obj.Name: service name
//vertex := obj.GetVertex() // stored with SetVertex
if !util.IsRunningSystemd() {

21
types.go

@ -33,6 +33,8 @@ type Type interface {
SetVertex(*Vertex)
Compare(Type) bool
SendEvent(eventName, bool)
IsWatching() bool
SetWatching(bool)
GetTimestamp() int64
UpdateTimestamp() int64
//Process()
@ -43,6 +45,7 @@ type BaseType struct {
timestamp int64 // last updated timestamp ?
events chan Event
vertex *Vertex
watching bool // is Watch() loop running ?
}
type NoopType struct {
@ -84,6 +87,16 @@ func (obj *BaseType) SetVertex(v *Vertex) {
obj.vertex = v
}
// is the Watch() function running?
func (obj *BaseType) IsWatching() bool {
return obj.watching
}
// store status of if the Watch() function is running
func (obj *BaseType) SetWatching(b bool) {
obj.watching = b
}
// get timestamp of a vertex
func (obj *BaseType) GetTimestamp() int64 {
return obj.timestamp
@ -160,7 +173,7 @@ func (obj *BaseType) ReadEvent(event *Event) bool {
e.ACK()
if e.Name == eventExit {
return false
} else if e.Name == eventContinue {
} else if e.Name == eventStart { // eventContinue
return true
} else {
log.Fatal("Unknown event: ", e)
@ -208,6 +221,12 @@ func (obj *NoopType) GetType() string {
}
func (obj *NoopType) Watch() {
if obj.IsWatching() {
return
}
obj.SetWatching(true)
defer obj.SetWatching(false)
//vertex := obj.vertex // stored with SetVertex
var send = false // send event?
for {

Loading…
Cancel
Save