| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
6 # All rights reserved.
7
8 # This file may be distributed and/or modified under the terms of
9 # the GNU General Public License version 2 as published by
10 # the Free Software Foundation.
11 # This file is distributed without any warranty; without even the implied
12 # warranty of merchantability or fitness for a particular purpose.
13 # See "LICENSE.GPL" in the source distribution for more information.
14
15 # Licensees having purchased or holding a valid Flumotion Advanced
16 # Streaming Server license may use this file in accordance with the
17 # Flumotion Advanced Streaming Server Commercial License Agreement.
18 # See "LICENSE.Flumotion" in the source distribution for more information.
19
20 # Headers in this file shall remain intact.
21
22 """
23 Feed components, participating in the stream
24 """
25
26 import os
27
28 import gst
29 import gst.interfaces
30 import gobject
31
32 from twisted.internet import reactor, defer
33 from twisted.spread import pb
34 from zope.interface import implements
35
36 from flumotion.configure import configure
37 from flumotion.component import component as basecomponent
38 from flumotion.component import feed
39 from flumotion.common import common, interfaces, errors, log, pygobject, messages
40 from flumotion.common import gstreamer
41
42 from flumotion.common.planet import moods
43 from flumotion.common.pygobject import gsignal
44
45 from flumotion.common.messages import N_
46 T_ = messages.gettexter('flumotion')
47
49 """
50 I am a component-side medium for a FeedComponent to interface with
51 the manager-side ComponentAvatar.
52 """
53 implements(interfaces.IComponentMedium)
54 logCategory = 'feedcompmed'
55 remoteLogName = 'feedserver'
56
58 """
59 @param component: L{flumotion.component.feedcomponent.FeedComponent}
60 """
61 basecomponent.BaseComponentMedium.__init__(self, component)
62
63 self._feederFeedServer = {} # eaterAlias -> (fullFeedId, host, port) tuple
64 # for remote feeders
65 self._feederPendingConnections = {} # eaterAlias -> cancel thunk
66 self._eaterFeedServer = {} # fullFeedId -> (host, port) tuple
67 # for remote eaters
68 self._eaterPendingConnections = {} # feederName -> cancel thunk
69 self.logName = component.name
70
71 ### Referenceable remote methods which can be called from manager
73 self.comp.attachPadMonitorToFeeder(feederName)
74
76 """
77 Sets the GStreamer debugging levels based on the passed debug string.
78
79 @since: 0.4.2
80 """
81 self.debug('Setting GStreamer debug level to %s' % debug)
82 if not debug:
83 return
84
85 for part in debug.split(','):
86 glob = None
87 value = None
88 pair = part.split(':')
89 if len(pair) == 1:
90 # assume only the value
91 value = int(pair[0])
92 elif len(pair) == 2:
93 glob, value = pair
94 value = int(value)
95 else:
96 self.warning("Cannot parse GStreamer debug setting '%s'." %
97 part)
98 continue
99
100 if glob:
101 try:
102 # value has to be an integer
103 gst.debug_set_threshold_for_name(glob, value)
104 except TypeError:
105 self.warning("Cannot set glob %s to value %s" % (
106 glob, value))
107 else:
108 gst.debug_set_default_threshold(value)
109
111 """
112 Tell the component the host and port for the FeedServer through which
113 it can connect a local eater to a remote feeder to eat the given
114 fullFeedId.
115
116 Called on by the manager-side ComponentAvatar.
117 """
118 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port)
119 return self.connectEater(eaterAlias)
120
122 # The avatarId on the keycards issued by the authenticator will
123 # identify us to the remote component. Attempt to use our
124 # fullFeedId, for debugging porpoises.
125 if hasattr(self.authenticator, 'copy'):
126 tup = common.parseComponentId(self.authenticator.avatarId)
127 flowName, componentName = tup
128 fullFeedId = common.fullFeedId(flowName, componentName,
129 eaterAliasOrFeedName)
130 return self.authenticator.copy(fullFeedId)
131 else:
132 return self.authenticator
133
135 """
136 Connect one of the medium's component's eaters to a remote feed.
137 Called by the component, both on initial connection and for
138 reconnecting.
139
140 @returns: (deferred, cancel) pair, where cancel is a thunk that
141 you can call to cancel any pending connection attempt.
142 """
143 def gotFeed((feedId, fd)):
144 self._feederPendingConnections.pop(eaterAlias, None)
145 self.comp.eatFromFD(eaterAlias, feedId, fd)
146
147 if eaterAlias not in self._feederFeedServer:
148 self.debug("eatFrom() hasn't been called yet for eater %s",
149 eaterAlias)
150 # unclear if this function should have a return value at
151 # all...
152 return defer.succeed(None)
153
154 (fullFeedId, host, port) = self._feederFeedServer[eaterAlias]
155
156 cancel = self._feederPendingConnections.pop(eaterAlias, None)
157 if cancel:
158 self.debug('cancelling previous connection attempt on %s',
159 eaterAlias)
160 cancel()
161
162 client = feed.FeedMedium(logName=self.comp.name)
163
164 d = client.requestFeed(host, port,
165 self._getAuthenticatorForFeed(eaterAlias),
166 fullFeedId)
167 self._feederPendingConnections[eaterAlias] = client.stopConnecting
168 d.addCallback(gotFeed)
169 return d
170
172 """
173 Tell the component to feed the given feed to the receiving component
174 accessible through the FeedServer on the given host and port.
175
176 Called on by the manager-side ComponentAvatar.
177 """
178 self._eaterFeedServer[fullFeedId] = (host, port)
179 self.connectFeeder(feederName, fullFeedId)
180
182 """
183 Tell the component to feed the given feed to the receiving component
184 accessible through the FeedServer on the given host and port.
185
186 Called on by the manager-side ComponentAvatar.
187 """
188 def gotFeed((fullFeedId, fd)):
189 self._eaterPendingConnections.pop(feederName, None)
190 self.comp.feedToFD(feederName, fd, os.close, fullFeedId)
191
192 if fullFeedId not in self._eaterFeedServer:
193 self.debug("feedTo() hasn't been called yet for feeder %s",
194 feederName)
195 # unclear if this function should have a return value at
196 # all...
197 return defer.succeed(None)
198
199 host, port = self._eaterFeedServer[fullFeedId]
200
201 # probably should key on feederName as well
202 cancel = self._eaterPendingConnections.pop(fullFeedId, None)
203 if cancel:
204 self.debug('cancelling previous connection attempt on %s',
205 feederName)
206 cancel()
207
208 client = feed.FeedMedium(logName=self.comp.name)
209
210 d = client.sendFeed(host, port,
211 self._getAuthenticatorForFeed(feederName),
212 fullFeedId)
213 self._eaterPendingConnections[feederName] = client.stopConnecting
214 d.addCallback(gotFeed)
215 return d
216
218 """
219 Tells the component to start providing a master clock on the given
220 UDP port.
221 Can only be called if setup() has been called on the component.
222
223 The IP address returned is the local IP the clock is listening on.
224
225 @returns: (ip, port, base_time)
226 @rtype: tuple of (str, int, long)
227 """
228 self.debug('remote_provideMasterClock(port=%r)' % port)
229 return self.comp.provide_master_clock(port)
230
232 """
233 Return the clock master info created by a previous call to provideMasterClock.
234
235 @returns: (ip, port, base_time)
236 @rtype: tuple of (str, int, long)
237 """
238 return self.comp.get_master_clock()
239
242
244 """
245 Invoke the given methodName on the given effectName in this component.
246 The effect should implement effect_(methodName) to receive the call.
247 """
248 self.debug("calling %s on effect %s" % (methodName, effectName))
249 if not effectName in self.comp.effects:
250 raise errors.UnknownEffectError(effectName)
251 effect = self.comp.effects[effectName]
252 if not hasattr(effect, "effect_%s" % methodName):
253 raise errors.NoMethodError("%s on effect %s" % (methodName,
254 effectName))
255 method = getattr(effect, "effect_%s" % methodName)
256 try:
257 result = method(*args, **kwargs)
258 except TypeError:
259 msg = "effect method %s did not accept %s and %s" % (
260 methodName, args, kwargs)
261 self.debug(msg)
262 raise errors.RemoteRunError(msg)
263 self.debug("effect: result: %r" % result)
264 return result
265
266 from feedcomponent010 import FeedComponent
267
268 FeedComponent.componentMediumClass = FeedComponentMedium
269
271 """A component using gst-launch syntax
272
273 @cvar checkTimestamp: whether to check continuity of timestamps for eaters
274 @cvar checkOffset: whether to check continuity of offsets for
275 eaters
276 """
277
278 DELIMITER = '@'
279
280 # can be set by subclasses
281 checkTimestamp = False
282 checkOffset = False
283
284 # keep these as class variables for the tests
285 FDSRC_TMPL = 'fdsrc name=%(name)s'
286 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
287 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\
288 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\
289 'recover-policy=1'
290 EATER_TMPL = None
291
293 if not gstreamer.get_plugin_version('coreelements'):
294 raise errors.MissingElementError('identity')
295 if not gstreamer.element_factory_has_property('identity',
296 'check-imperfect-timestamp'):
297 self.checkTimestamp = False
298 self.checkOffset = False
299 self.addMessage(
300 messages.Info(T_(N_(
301 "You will get more debugging information "
302 "if you upgrade to GStreamer 0.10.13 or later."))))
303
304 self.EATER_TMPL = self.FDSRC_TMPL + ' %(queue)s ' + self.DEPAY_TMPL
305 if self.checkTimestamp or self.checkOffset:
306 self.EATER_TMPL += " ! identity name=%(name)s-identity silent=TRUE"
307 if self.checkTimestamp:
308 self.EATER_TMPL += " check-imperfect-timestamp=1"
309 if self.checkOffset:
310 self.EATER_TMPL += " check-imperfect-offset=1"
311
312 ### FeedComponent interface implementations
314 try:
315 unparsed = self.get_pipeline_string(self.config['properties'])
316 except errors.MissingElementError, e:
317 m = messages.Error(T_(N_(
318 "The worker does not have the '%s' element installed.\n"
319 "Please install the necessary plug-in and restart "
320 "the component.\n"), e.args[0]))
321 self.state.append('messages', m)
322 raise errors.ComponentSetupHandledError(e)
323
324 self.pipeline_string = self.parse_pipeline(unparsed)
325
326 try:
327 pipeline = gst.parse_launch(self.pipeline_string)
328 except gobject.GError, e:
329 self.warning('Could not parse pipeline: %s' % e.message)
330 m = messages.Error(T_(N_(
331 "GStreamer error: could not parse component pipeline.")),
332 debug=e.message)
333 self.state.append('messages', m)
334 raise errors.PipelineParseError(e.message)
335
336 return pipeline
337
339 FeedComponent.set_pipeline(self, pipeline)
340 if self.checkTimestamp or self.checkOffset:
341 watchElements = dict([(e.elementName + '-identity' , e)
342 for e in self.eaters.values()])
343 self.install_eater_continuity_watch(watchElements)
344 self.configure_pipeline(self.pipeline, self.config['properties'])
345
346 ### ParseLaunchComponent interface for subclasses
348 """
349 Method that must be implemented by subclasses to produce the
350 gstparse string for the component's pipeline. Subclasses should
351 not chain up; this method raises a NotImplemented error.
352
353 Returns: a new pipeline string representation.
354 """
355 raise NotImplementedError('subclasses should implement '
356 'get_pipeline_string')
357
359 """
360 Method that can be implemented by subclasses if they wish to
361 interact with the pipeline after it has been created and set
362 on the component.
363
364 This could include attaching signals and bus handlers.
365 """
366 pass
367
368 ### private methods
370 if len(self.eaters) == 1:
371 eater = 'eater:' + self.eaters.keys()[0]
372 if eater not in pipeline:
373 pipeline = '@' + eater + '@ ! ' + pipeline
374 if len(self.feeders) == 1:
375 feeder = 'feeder:' + self.feeders.keys()[0]
376 if feeder not in pipeline:
377 pipeline = pipeline + ' ! @' + feeder + '@'
378 return pipeline
379
381 """
382 Expand the given pipeline string representation by substituting
383 blocks between '@' with a filled-in template.
384
385 @param pipeline: a pipeline string representation with variables
386 @param templatizers: A dict of prefix => procedure. Template
387 blocks in the pipeline will be replaced
388 with the result of calling the procedure
389 with what is left of the template after
390 taking off the prefix.
391 @returns: a new pipeline string representation.
392 """
393 assert pipeline != ''
394
395 # verify the template has an even number of delimiters
396 if pipeline.count(self.DELIMITER) % 2 != 0:
397 raise TypeError("'%s' contains an odd number of '%s'"
398 % (pipeline, self.DELIMITER))
399
400 out = []
401 for i, block in enumerate(pipeline.split(self.DELIMITER)):
402 # when splitting, the even-indexed members will remain, and
403 # the odd-indexed members are the blocks to be substituted
404 if i % 2 == 0:
405 out.append(block)
406 else:
407 block = block.strip()
408 try:
409 pos = block.index(':')
410 except ValueError:
411 raise TypeError("Template %r has no colon" % (block,))
412 prefix = block[:pos+1]
413 if prefix not in templatizers:
414 raise TypeError("Template %r has invalid prefix %r"
415 % (block, prefix))
416 out.append(templatizers[prefix](block[pos+1:]))
417 return ''.join(out)
418
420 pipeline = " ".join(pipeline.split())
421 self.debug('Creating pipeline, template is %s', pipeline)
422
423 if pipeline == '' and not self.eaters:
424 raise TypeError, "Need a pipeline or a eater"
425
426 if pipeline == '':
427 # code of dubious value
428 assert self.eaters
429 pipeline = 'fakesink signal-handoffs=1 silent=1 name=sink'
430
431 pipeline = self.add_default_eater_feeder(pipeline)
432 pipeline = self.parse_tmpl(pipeline,
433 {'eater:': self.get_eater_template,
434 'feeder:': self.get_feeder_template})
435
436 self.debug('pipeline is %s', pipeline)
437 assert self.DELIMITER not in pipeline
438
439 return pipeline
440
442 queue = self.get_queue_string(eaterAlias)
443 elementName = self.eaters[eaterAlias].elementName
444
445 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
446
448 elementName = self.feeders[feederName].elementName
449 return self.FEEDER_TMPL % {'name': elementName}
450
458
460 """
461 I am a part of a feed component for a specific group
462 of functionality.
463
464 @ivar name: name of the effect
465 @type name: string
466 @ivar component: component owning the effect
467 @type component: L{FeedComponent}
468 """
469 logCategory = "effect"
470
472 """
473 @param name: the name of the effect
474 """
475 self.name = name
476 self.setComponent(None)
477
479 """
480 Set the given component as the effect's owner.
481
482 @param component: the component to set as an owner of this effect
483 @type component: L{FeedComponent}
484 """
485 self.component = component
486 self.setUIState(component and component.uiState or None)
487
489 """
490 Set the given UI state on the effect. This method is ideal for
491 adding keys to the UI state.
492
493 @param state: the UI state for the component to use.
494 @type state: L{flumotion.common.componentui.WorkerComponentUIState}
495 """
496 self.uiState = state
497
499 """
500 Get the component owning this effect.
501
502 @rtype: L{FeedComponent}
503 """
504 return self.component
505
507 """
508 This class provides for multi-input ParseLaunchComponents, such as muxers,
509 with a queue attached to each input.
510 """
511 QUEUE_SIZE_BUFFERS = 16
512
514 """
515 Return a gst-parse description of the muxer, which must be named 'muxer'
516 """
517 raise errors.NotImplementedError("Implement in a subclass")
518
520 name = self.eaters[eaterAlias].elementName
521 return ("! queue name=%s-queue max-size-buffers=%d !"
522 % (name, self.QUEUE_SIZE_BUFFERS))
523
525 eaters = self.config.get('eater', {})
526 sources = self.config.get('source', [])
527 if eaters == {} and sources != []:
528 # for upgrade without manager restart
529 feeds = []
530 for feed in sources:
531 if not ':' in feed:
532 feed = '%s:default' % feed
533 feeds.append(feed)
534 eaters = { 'default': [(x, 'default') for x in feeds] }
535
536 pipeline = self.get_muxer_string(properties) + ' '
537 for e in eaters:
538 for feed, alias in eaters[e]:
539 pipeline += '@ eater:%s @ ! muxer. ' % alias
540
541 pipeline += 'muxer.'
542
543 return pipeline
544
546 # Firstly, ensure that any push in progress is guaranteed to return,
547 # by temporarily enlarging the queue
548 queuename = self.eaters[eaterAlias].elementName + '-queue'
549 queue = self.pipeline.get_by_name(queuename)
550
551 size = queue.get_property("max-size-buffers")
552 queue.set_property("max-size-buffers", size + 1)
553
554 # So, now it's guaranteed to return. However, we want to return the
555 # queue size to its original value. Doing this in a thread-safe manner
556 # is rather tricky...
557 def _block_cb(pad, blocked):
558 # This is called from streaming threads, but we don't do anything
559 # here so it's safe.
560 pass
561 def _underrun_cb(element):
562 # Called from a streaming thread. The queue element does not hold
563 # the queue lock when this is called, so we block our sinkpad,
564 # then re-check the current level.
565 pad = element.get_pad("sink")
566 pad.set_blocked_async(True, _block_cb)
567 level = element.get_property("current-level-buffers")
568 if level < self.QUEUE_SIZE_BUFFERS:
569 element.set_property('max-size-buffers',
570 self.QUEUE_SIZE_BUFFERS)
571 element.disconnect(signalid)
572 pad.set_blocked_async(False, _block_cb)
573
574 signalid = queue.connect("underrun", _underrun_cb)
575
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Thu Aug 7 15:45:54 2008 | http://epydoc.sourceforge.net |