Ubuntu Pastebin

Paste from dimitern at Wed, 1 Jul 2015 10:41:12 +0000

Download as text
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
1. In apiserver/params/internal.go add:

// EntityWatchResult holds a EntityWatcher id, changes and an error
// (if any).
type EntityWatchResult struct {
	EntityWatcherId string     `json:"EntityWatcherId"`
	Changes          []string  `json:"Changes"`
	Error            *Error    `json:"Error"`
}

// EntityWatchResults holds the results for any API call which ends up
// returning a list of EntityWatchers.
type EntityWatchResults struct {
	Results []EntityWatchResult
}

2. In apiserver/watcher:

func init() {
    // ...
    common.RegisterFacade(
        "EntityWatcher", 0, newEntityWatcher,
        reflect.TypeOf((*srvEntityWatcher)(nil)),
    )
    // ...
}

type EntityWatcher interface {
    state.StringsWatcher
    
    TransformChanges(st *state.State, in []string) ([]string, error)
}

// srvEntityWatcher defines the API for methods on a state.StringsWatcher.
// Each client has its own current set of watchers, stored in resources.
// srvEntityWatcher notifies about changes for all entities of a given kind,
// sending the changes as a list of strings, which could be transformed
// from state entity ids to their corresponding entity tags.
type srvEntityWatcher struct {
	watcher   EntityWatcher
	id        string
	resources *common.Resources
	st *state.State
}

func newEntityWatcher(st *state.State, resources *common.Resources, auth common.Authorizer, id string) (interface{}, error) {
	if !isAgent(auth) {
		return nil, common.ErrPerm
	}
	watcher, ok := resources.Get(id).(EntityWatcher)
	if !ok {
		return nil, common.ErrUnknownWatcher
	}
	return &srvEntityWatcher{
	    st: st,
		watcher:   watcher,
		id:        id,
		resources: resources,
	}, nil
}

// Next returns when a change has occured to an entity of the
// collection being watched since the most recent call to Next
// or the Watch call that created the srvEntityWatcher.
func (w *srvEntityWatcher) Next() (params.EntityWatchResult, error) {
	if changes, ok := <-w.watcher.Changes(); ok {
        transformed, err := w.watcher.TransformChanges(w.st, changes)
        if err != nil {
            return params.EntityWatchResult{}, errors.Annotate(err, "cannot transform changes")
        }
		return params.EntityWatchResult{
			Changes: transformed,
		}, nil
	}
	err := w.watcher.Err()
	if err == nil {
		err = common.ErrStoppedWatcher
	}
	return params.EntityWatchResult{}, err
}

// Stop stops the watcher.
func (w *srvEntityWatcher) Stop() error {
	return w.resources.Stop(w.id)
}

3. In apiserver/addresser/:

type addressesWatcher struct {
    EntityWatcher
}

// TransformChanges converts IP address values to tags.
func (w *addressesWatcher) TransformChanges(st *state.State, in []string) ([]string, error) {
    if len(in) == 0 {
        return in, nil
    }
    
    ipAddresses, closer := st.getCollection(ipAddressesC)
    defer closer()

    which := bson.D{{"value", bson.D{{"$in", in}}}}
    fields := bson.D{{"value", 1}, {"uuid", 1}}
    var docs []ipAddressDoc
    if err := ipAddresses.Find(which).Select(fields).All(&docs); err != nil {
        return nil, errors.Annotate(err, "cannot fetch addresses")
    }
    transformed := make([]string, len(in))
    for i, doc := range docs {
        transformed[i] = names.NewIPAddressTag(doc.UUID)
    }
    return transformed, nil
}

func (api *addresserAPI) WatchIPAddresses() (params.EntityWatcherResult, error) {
    watch := &addressesWatcher{api.st.WatchIPAddresses()}
    
    if changes, ok := <-watch.Changes(); ok {
        transformedChanges, err := watch.TransformChanges(api.st, changes)
        if err != nil {
            return params.EntityWatchResult{], errors.Trace(err)
        }
        return params.EntityWatchResult{
            EntityWatcherId: api.resources.Register(watch),
            Changes: transformedChanges,
        }, nil
    }
    return params.EntityWatchResult{}, watcher.EnsureErr(watch)
}

4. In api/watcher/watcher.go:

// entityWatcher will send events when something changes.
// The content of the changes is a list of entity tags as strings.
type entityWatcher struct {
	commonWatcher
	caller           base.APICaller
	entityWatcherId string
	out              chan []string
}

func NewEntityWatcher(caller base.APICaller, result params.EntityWatchResult) EntityWatcher {
	w := &entityWatcher{
		caller:          caller,
		entityWatcherId: result.EntityWatcherId,
		out:             make(chan []string),
	}
	// NOTE: result.Changes have been transformed already by the caller.
	go func() {
		defer w.tomb.Done()
		defer close(w.out)
		w.tomb.Kill(w.loop(result.Changes))
	}()
	return w
}

func (w *entityWatcher) loop(initialChanges []string) error {
	changes := initialChanges
	w.newResult = func() interface{} { return new(params.EntityWatchResult) }
	w.call = makeWatcherAPICaller(w.caller, "EntityWatcher", w.entityWatcherId)
	w.commonWatcher.init()
	go w.commonLoop()

	for {
		select {
		// Send the initial event or subsequent change.
		case w.out <- changes:
		case <-w.tomb.Dying():
			return nil
		}
		// Read the next change.
		data, ok := <-w.in
		if !ok {
			// The tomb is already killed with the correct error
			// at this point, so just return.
			return nil
		}
		// Changes have been transformed at the server side already.
		changes = data.(*params.EntityWatchResult).Changes
	}
}

// Changes returns a channel that receives a list of changes
// as tags (converted to strings) of the watched entities
// with changes.
func (w *entityWatcher) Changes() <-chan []string {
	return w.out
}

// NOTE: It's perhaps better to make the strings watcher take facade name and reuse the same code?

5. In api/addresser/:

// WatchIPAddresses returns a EntityWatcher for observing the
// tags of IP addresses with changes in life cycle.
// The initial event will contain the tags of any IP addresses
// which are no longer Alive.
func (api *clientAPI) WatchIPAddresses() (watcher.EntityWatcher, error) {
	var results params.EntityWatchResult
	err := api.st.facade.FacadeCall("WatchIPAddresses", nil, &results)
	if err != nil {
		return nil, err
	}
	if len(results.Results) != 1 {
		return nil, fmt.Errorf("expected 1 result, got %d", len(results.Results))
	}
	result := results.Results[0]
	if result.Error != nil {
		return nil, result.Error
	}
	w := watcher.NewEntityWatcher(api.st.facade.RawAPICaller(), result)
	return w, nil
}
Download as text