| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
6 # All rights reserved.
7
8 # This file may be distributed and/or modified under the terms of
9 # the GNU General Public License version 2 as published by
10 # the Free Software Foundation.
11 # This file is distributed without any warranty; without even the implied
12 # warranty of merchantability or fitness for a particular purpose.
13 # See "LICENSE.GPL" in the source distribution for more information.
14
15 # Licensees having purchased or holding a valid Flumotion Advanced
16 # Streaming Server license may use this file in accordance with the
17 # Flumotion Advanced Streaming Server Commercial License Agreement.
18 # See "LICENSE.Flumotion" in the source distribution for more information.
19
20 # Headers in this file shall remain intact.
21
22 """
23 model abstraction for administration clients supporting different views
24 """
25
26 from twisted.internet import error, defer, reactor
27 from zope.interface import implements
28
29 from flumotion.common import common, errors, interfaces, log
30 from flumotion.common import keycards, planet, medium, package
31 from flumotion.common import messages, signals, connection
32 from flumotion.configure import configure
33 from flumotion.twisted import pb as fpb
34
35 # these two imports are for their side effects of jelly type
36 # registration
37 from flumotion.common import planet, worker
38
39 from flumotion.common.messages import N_
40 T_ = messages.gettexter('flumotion')
41
43 perspectiveInterface = interfaces.IAdminMedium
44
46 """
47 @type medium: AdminModel
48 """
49 fpb.ReconnectingFPBClientFactory.__init__(self)
50 self.medium = medium
51 self.maxDelay = maxDelay
52
53 self.extraTenacious = extraTenacious
54 self.hasBeenConnected = 0
55
57 self.hasBeenConnected = 1
58
59 fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)
60
62 """
63 @param reason: L{twisted.spread.pb.failure.Failure}
64 """
65 if reason.check(error.DNSLookupError):
66 self.debug('DNS lookup error')
67 if not self.extraTenacious:
68 self.medium.connectionFailed(reason)
69 return
70 elif (reason.check(error.ConnectionRefusedError)
71 or reason.check(error.ConnectError)):
72 # If we're logging in for the first time, we want to make this a
73 # real error; we present a dialog, etc.
74 # However, if we fail later on (e.g. manager shut down, and
75 # hasn't yet been restarted), we want to keep trying to reconnect,
76 # so we just log a message.
77 self.debug("Error connecting to %s: %s", connector.getDestination(),
78 log.getFailureMessage(reason))
79 if self.hasBeenConnected:
80 self.log("we've been connected before though, so going "
81 "to retry")
82 # fall through
83 elif self.extraTenacious:
84 self.log("trying again due to +100 tenacity")
85 # fall through
86 else:
87 self.log("telling medium about connection failure")
88 self.medium.connectionFailed(reason)
89 # return
90 return
91
92 fpb.ReconnectingFPBClientFactory.clientConnectionFailed(self,
93 connector, reason)
94
95 # vmethod implementation
99
def error(failure):
    """
    Errback paired with C{success} on the deferred C{d}: tell the
    medium what went wrong.

    Nothing is returned, so the error is swallowed here.

    @param failure: the reason the deferred failed
    @type  failure: L{twisted.python.failure.Failure}
    """
    if self.extraTenacious:
        # tenacious factories do not report anything to the medium
        self.debug('connection problem: %s',
                   log.getFailureMessage(failure))
        self.debug('we are tenacious, so trying again later')
        self.disconnect()
        return

    if failure.check(errors.ConnectionFailedError):
        self.debug("emitting connection-failed")
        self.medium.emit('connection-failed', "I failed my master")
        self.debug("emitted connection-failed")
        return

    # FIXME: unauthorized login emit !
    # until then a failed authentication is reported exactly like a
    # refused connection
    if failure.check(errors.ConnectionRefusedError,
                     errors.NotAuthenticatedError):
        self.debug("emitting connection-refused")
        self.medium.emit('connection-refused')
        self.debug("emitted connection-refused")
        return

    # anything else is handed to the medium as-is
    self.medium.emit('connection-error', failure)
    self.warning('connection error: %s',
                 log.getFailureMessage(failure))
    # swallow error
124
125 d.addCallbacks(success, error)
126 return d
127
128 # FIXME: stop using signals, we can provide a richer interface with actual
129 # objects and real interfaces for the views a model communicates with
131 """
132 I live in the admin client.
133 I am a data model for any admin view implementing a UI to
134 communicate with one manager.
135 I send signals when things happen.
136
137 Manager calls on us through L{flumotion.manager.admin.AdminAvatar}
138 """
139 __signals__ = ('connected', 'disconnected', 'connection-refused',
140 'connection-failed', 'connection-error', 'reloading',
141 'message', 'update')
142
143 logCategory = 'adminmodel'
144
145 implements(interfaces.IAdminMedium)
146
147 # Public instance variables (read-only)
148 planet = None
149
151 # All of these instance variables are private. Cuidado cabrones!
152 self.connectionInfo = None
153 self.keepTrying = None
154 self._writeConnection = True
155
156 self.managerId = '<uninitialized>'
157
158 self.connected = False
159 self.clientFactory = None
160
161 self._deferredConnect = None
162
163 self._components = {} # dict of components
164 self.planet = None
165 self._workerHeavenState = None
166
169 'Connect to a host.'
170 assert self.clientFactory is None
171
172 self.connectionInfo = connectionInfo
173 self._writeConnection = writeConnection
174
175 # give the admin an id unique to the manager -- if a program is
176 # adminning multiple managers, this id should tell them apart
177 # (and identify duplicates)
178 self.managerId = str(connectionInfo)
179 self.logName = self.managerId
180
181 self.info('Connecting to manager %s with %s',
182 self.managerId, connectionInfo.use_ssl and 'SSL' or 'TCP')
183
184 self.clientFactory = AdminClientFactory(self,
185 extraTenacious=keepTrying,
186 maxDelay=20)
187 self.clientFactory.startLogin(connectionInfo.authenticator)
188
189 if connectionInfo.use_ssl:
190 common.assertSSLAvailable()
191 from twisted.internet import ssl
192 reactor.connectSSL(connectionInfo.host, connectionInfo.port,
193 self.clientFactory, ssl.ClientContextFactory())
194 else:
195 reactor.connectTCP(connectionInfo.host, connectionInfo.port,
196 self.clientFactory)
197
def connected(model, d):
    """
    Handler for the 'connected' signal: fire the deferred with the model.

    @param model: the object that emitted the signal
    @param d:     the deferred to fire
    """
    # model is really "self". yay gobject?
    d.callback(model)
201
def disconnected(model, d):
    """
    Handler for the 'disconnected' signal.

    Fails the deferred with a ConnectionFailedError, unless keepTrying
    was requested, in which case nothing happens.
    """
    # can happen after setRemoteReference but before
    # getPlanetState or getWorkerHeavenState returns
    if keepTrying:
        return
    d.errback(errors.ConnectionFailedError('Lost connection'))
207
def connection_refused(model, d):
    """
    Handler for the 'connection-refused' signal.

    Fails the deferred with a ConnectionRefusedError, unless keepTrying
    was requested.
    """
    if keepTrying:
        return
    d.errback(errors.ConnectionRefusedError())
211
def connection_failed(model, reason, d):
    """
    Handler for the 'connection-failed' signal.

    Fails the deferred with a ConnectionFailedError wrapping the reason
    carried by the signal, unless keepTrying was requested.
    """
    if keepTrying:
        return
    d.errback(errors.ConnectionFailedError(reason))
215
def connection_error(model, failure, d):
    """
    Handler for the 'connection-error' signal.

    Passes the failure carried by the signal straight on to the
    deferred, unless keepTrying was requested.
    """
    if keepTrying:
        return
    d.errback(failure)
219
220 d = defer.Deferred()
221 ids = []
222 ids.append(self.connect('connected', connected, d))
223 ids.append(self.connect('disconnected', disconnected, d))
224 ids.append(self.connect('connection-refused', connection_refused, d))
225 ids.append(self.connect('connection-failed', connection_failed, d))
226 ids.append(self.connect('connection-error', connection_error, d))
227
def success(model):
    """
    Callback run once the connection deferred has fired successfully:
    remove the temporary signal handlers and forget the pending deferred.

    @returns: the model, unchanged, for the next callback
    """
    # an explicit loop instead of map(): the calls are made only for
    # their side effect, and a lazy map() would never make them at all
    for handlerId in ids:
        self.disconnect(handlerId)
    self._deferredConnect = None
    return model
232
def failure(f):
    """
    Errback run once the connection deferred has failed: remove the
    temporary signal handlers and forget the pending deferred.

    @returns: the failure, unchanged, so it keeps propagating
    """
    # an explicit loop instead of map(): the calls are made only for
    # their side effect, and a lazy map() would never make them at all
    for handlerId in ids:
        self.disconnect(handlerId)
    self._deferredConnect = None
    return f
237
238 d.addCallbacks(success, failure)
239 self._deferredConnect = d
240 return d
241
243 self.debug('shutting down')
244 if self.clientFactory is not None:
245 # order not semantically important, but this way we avoid a
246 # "reconnecting in X seconds" in the log
247 self.clientFactory.stopTrying()
248 self.clientFactory.disconnect()
249 self.clientFactory = None
250
251 if self._deferredConnect is not None:
252 # this can happen with keepTrying=True
253 self.debug('cancelling connection attempt')
254 self._deferredConnect.errback(errors.ConnectionCancelledError())
255
257 """Close any existing connection to the manager and
258 reconnect."""
259 self.debug('asked to log in again')
260 self.shutdown()
261 return self.connectToManager(self.connectionInfo, keepTrying)
262
263 # FIXME: give these three sensible names
266
268 return '%s:%s (%s)' % (self.connectionInfo.host,
269 self.connectionInfo.port,
270 self.connectionInfo.use_ssl
271 and 'https' or 'http')
272
273 # used in fgc
277
# called by client factory
# Turn the failure into a readable message and report it through the
# 'connection-failed' signal.
if failure.check(error.DNSLookupError):
    message = ("Could not look up host '%s'."
               % self.connectionInfo.host)
elif failure.check(error.ConnectionRefusedError, error.ConnectError):
    # the second type used to be ConnectionRefusedError again, which
    # made the test redundant; the client factory checks for
    # ConnectionRefusedError or ConnectError, so do the same here
    message = ("Could not connect to host '%s' on port %d."
               % (self.connectionInfo.host,
                  self.connectionInfo.port))
else:
    message = ("Unexpected failure.\nDebug information: %s"
               % log.getFailureMessage(failure))
self.debug('emitting connection-failed')
self.emit('connection-failed', message)
self.debug('emitted connection-failed')
294
296 self.debug("setRemoteReference %r", remoteReference)
def gotPlanetState(planet):
    """
    Callback for the getPlanetState remote call: keep the planet state
    and go on to request the worker heaven state.

    @param planet: the result of the getPlanetState remote call
    @returns: the deferred for the getWorkerHeavenState remote call
    """
    self.planet = planet
    # monkey, Monkey, MONKEYPATCH!!!!!
    # (the planet state gets a back-reference to this model)
    planet.admin = self
    self.debug('got planet state')
    return self.callRemote('getWorkerHeavenState')
303
def gotWorkerHeavenState(whs):
    """
    Callback for the getWorkerHeavenState remote call: keep the worker
    heaven state, mark the model as connected and emit 'connected'.

    @param whs: the result of the getWorkerHeavenState remote call
    """
    self._workerHeavenState = whs
    self.debug('got worker state')

    # both pieces of state have arrived by now
    self.debug('Connected to manager and retrieved all state')
    self.connected = True
    self.emit('connected')
311
def writeConnection():
    """
    Cache the current connection information as a small XML file in the
    registry directory.

    Nothing is written unless the authenticator has both a username and
    a password.  The file is named after the md5 sum of its contents.
    Failing to write the file is not an error; it is only logged.
    """
    import os
    import sys
    from xml.sax.saxutils import escape
    try:
        from hashlib import md5 as newMd5
    except ImportError:
        # no hashlib before Python 2.5
        from md5 import new as newMd5

    def text(value):
        # format like '%s' always did, but keep '&', '<' and '>' in a
        # value from producing a file that cannot be parsed again
        return escape('%s' % (value, ))

    i = self.connectionInfo
    if not (i.authenticator.username
            and i.authenticator.password):
        self.log('not caching connection information')
        return

    # NOTE(review): the password ends up in this file in clear text
    s = ''.join(['<connection>',
                 '<host>%s</host>' % text(i.host),
                 '<manager>%s</manager>' % text(self.planet.get('name')),
                 '<port>%d</port>' % i.port,
                 '<use_insecure>%d</use_insecure>'
                 % ((not i.use_ssl) and 1 or 0),
                 '<user>%s</user>' % text(i.authenticator.username),
                 '<passwd>%s</passwd>' % text(i.authenticator.password),
                 '</connection>'])

    digest = newMd5(s).hexdigest()
    f = os.path.join(configure.registrydir, '%s.connection' % digest)
    try:
        h = open(f, 'w')
        try:
            h.write(s)
        finally:
            # close the file even when the write fails
            h.close()
    except Exception:
        # best effort only; sys.exc_info() avoids the version-specific
        # syntax for binding the exception
        e = sys.exc_info()[1]
        self.info('failed to write connection cache file %s: %s',
                  f, log.getExceptionMessage(e))
339
340 # chain up
341 medium.PingingMedium.setRemoteReference(self, remoteReference)
342
343 # fixme: push the disconnect notification upstream
def remoteDisconnected(remoteReference):
    """
    Disconnect notification for the remote reference: mark the model as
    no longer connected and emit 'disconnected'.

    @param remoteReference: the reference that went away (unused)
    """
    self.debug("emitting disconnected")
    # clear the flag before emitting, so handlers see the new state
    self.connected = False
    self.emit('disconnected')
    self.debug("emitted disconnected")
349 self.remote.notifyOnDisconnect(remoteDisconnected)
350
351 d = self.callRemote('getPlanetState')
352 d.addCallback(gotPlanetState)
353 d.addCallback(gotWorkerHeavenState)
354 if self._writeConnection:
355 d.addCallback(lambda _: writeConnection())
356 return d
357
358 ### model functions; called by UI's to send requests to manager or comp
359
360 ## view management functions
362 return self.connected
363
364 ## generic remote call methods
366 """
367 Call the given method on the given component with the given args.
368
369 @param componentState: component to call the method on
370 @type componentState: L{flumotion.common.planet.AdminComponentState}
371 @param methodName: name of method to call; serialized to a
372 remote_methodName on the worker's medium
373
374 @rtype: L{twisted.internet.defer.Deferred}
375 """
376 d = self.callRemote('componentCallRemote',
377 componentState, methodName,
378 *args, **kwargs)
def errback(failure):
    """
    Errback for the componentCallRemote call: log the problem and
    append a warning message to the component state.

    @param failure: the reason the remote call failed
    @returns: the failure, unchanged, so it keeps propagating
    """
    if failure.check(errors.NoMethodError):
        # format the value instead of concatenating it: "\n" + value
        # raises a TypeError whenever value is not already a string
        msg = "Remote method '%s' does not exist.\n%s" % (
            methodName, failure.value)
    else:
        msg = log.getFailureMessage(failure)

    # FIXME: we probably need a nicer way of getting component
    # messages shown from the admin model, but this allows us to
    # make sure every type of admin has these messages
    self.warning(msg)
    m = messages.Warning(T_(N_("Internal error in component.")),
                         debug=msg)
    componentState.observe_append('messages', m)
    return failure
395
396 d.addErrback(errback)
397 # FIXME: dialog for other errors ?
398 return d
399
401 """
402 Call the given method on the given worker with the given args.
403
404 @param workerName: name of the worker to call the method on
405 @param methodName: name of method to call; serialized to a
406 remote_methodName on the worker's medium
407
408 @rtype: L{twisted.internet.defer.Deferred}
409 """
410 return self.callRemote('workerCallRemote', workerName,
411 methodName, *args, **kwargs)
412
413 ## manager remote methods
415 return self.callRemote('loadConfiguration', xml_string)
416
418 return self.callRemote('getConfiguration')
419
421 return self.callRemote('cleanComponents')
422
423 ## worker remote methods
425 return self.workerCallRemote(workerName, 'checkElements', elements)
426
429
431 """
432 Run the given function and args on the given worker. If the
433 worker does not already have the module, or it is out of date,
434 it will be retrieved from the manager.
435
436 @rtype: L{twisted.internet.defer.Deferred} firing an
437 L{flumotion.common.messages.Result}
438 """
439 return self.workerCallRemote(workerName, 'runFunction', moduleName,
440 functionName, *args, **kwargs)
441
444
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Thu Aug 7 15:45:49 2008 | http://epydoc.sourceforge.net |