| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
6 # All rights reserved.
7
8 # This file may be distributed and/or modified under the terms of
9 # the GNU General Public License version 2 as published by
10 # the Free Software Foundation.
11 # This file is distributed without any warranty; without even the implied
12 # warranty of merchantability or fitness for a particular purpose.
13 # See "LICENSE.GPL" in the source distribution for more information.
14
15 # Licensees having purchased or holding a valid Flumotion Advanced
16 # Streaming Server license may use this file in accordance with the
17 # Flumotion Advanced Streaming Server Commercial License Agreement.
18 # See "LICENSE.Flumotion" in the source distribution for more information.
19
20 # Headers in this file shall remain intact.
21
22 from flumotion.component import feedcomponent
23 from flumotion.common import errors, messages
24 from flumotion.common.planet import moods
25 from twisted.internet import defer
26 import threading
27 from flumotion.common.messages import N_
28 T_ = messages.gettexter('flumotion')
29 import gst
30
31 try:
32 # icalendar and dateutil modules needed for scheduling recordings
33 from icalendar import Calendar
34 from dateutil import rrule
35 HAVE_ICAL = True
36 except:
37 HAVE_ICAL = False
38
45
47 logCategory = 'comb-switch'
48 componentMediumClass = SwitchMedium
49
51 self.uiState.addKey("active-eater")
52 self.icalScheduler = None
53 # _idealEater is used to determine what the ideal eater at the current
54 # time is.
55 self._idealEater = "master"
56 # these deferreds will fire when relevant eaters are ready
57 # usually these will be None, but when a scheduled switch
58 # was requested and the eater wasn't ready, it'll fire when ready
59 # so the switch can be made
60 self._eaterReadyDefers = { "master": None, "backup": None }
61 self._started = False
62
64 self.debug("checking whether switch element exists")
65 from flumotion.worker.checks import check
66 d = check.checkPlugin('switch', 'gst-plugins-bad')
67 def cb(result):
68 for m in result.messages:
69 self.addMessage(m)
70 # if we have been passed an ical file to use for scheduling
71 # then start the ical monitor
72 props = self.config['properties']
73 icalfn = props.get('ical-schedule')
74 if icalfn:
75 if HAVE_ICAL:
76 try:
77 from flumotion.component.base import scheduler
78 self.icalScheduler = scheduler.ICalScheduler(open(
79 icalfn, 'r'))
80 self.icalScheduler.subscribe(self.eventStarted,
81 self.eventStopped)
82 if self.icalScheduler.getCurrentEvents():
83 self._idealEater = "backup"
84 except ValueError:
85 m = messages.Warning(T_(N_(
86 "Error parsing ical file %s, so not scheduling any"
87 " events." % icalfn)), id="error-parsing-ical")
88 self.addMessage(m)
89 else:
90 warnStr = "An ical file has been specified for " \
91 "scheduling but the necessary modules " \
92 "dateutil and/or icalendar are not installed"
93 self.warning(warnStr)
94 m = messages.Warning(T_(N_(warnStr)),
95 id="error-parsing-ical")
96 self.addMessage(m)
97 return result
98 d.addCallback(cb)
99 return d
100
104
106 # eaterSubstring is "master" or "backup"
107 for eaterAlias in self.eaters:
108 if eaterSubstring in eaterAlias:
109 return self.eaters[eaterAlias].isActive()
110 return True
111
112 # if an event starts, semantics are to switch to backup
113 # if an event stops, semantics are to switch to master
115 self.debug("event started %r", event)
116 if self.pipeline:
117 self.switch_to_for_event("backup", True)
118
120 self.debug("event stopped %r", event)
121 if self.pipeline:
122 self.switch_to_for_event("master", False)
123
125 feedcomponent.MultiInputParseLaunchComponent.do_pipeline_playing(self)
126 # needed to stop the flapping between master and backup on startup
127 # in the watchdogs if the starting state is backup
128 self._started = True
129
131 # need to just set _started to True if False and mood is happy
132 feedcomponent.MultiInputParseLaunchComponent.eaterSetActive(self, feedId)
133 if not self._started and moods.get(self.getMood()) == moods.happy:
134 self._started = True
135
137 """
138 @param eaterSubstring: either "master" or "backup"
139 @param startOrStop: True if start of event, False if stop
140 """
141 if eaterSubstring != "master" and eaterSubstring != "backup":
142 self.warning("switch_to_for_event should be called with 'master'"
143 " or 'backup'")
144 return None
145 self._idealEater = eaterSubstring
146 d = defer.maybeDeferred(self.switch_to, eaterSubstring)
147 def switch_to_cb(res):
148 if not res :
149 startOrStopStr = "stopped"
150 if startOrStop:
151 startOrStopStr = "started"
152 warnStr = "Event %s but could not switch to %s" \
153 ", will switch when %s is back" % (startOrStopStr,
154 eaterSubstring, eaterSubstring)
155 self.warning(warnStr)
156 m = messages.Warning(T_(N_(warnStr)),
157 id="error-scheduling-event")
158 self.addMessage(m)
159 self._eaterReadyDefers[eaterSubstring] = defer.Deferred()
160 self._eaterReadyDefers[eaterSubstring].addCallback(
161 lambda x: self.switch_to(eaterSubstring))
162 otherEater = "backup"
163 if eaterSubstring == "backup":
164 otherEater = "master"
165 self._eaterReadyDefers[otherEater] = None
166 d.addCallback(switch_to_cb)
167 return d
168
170 logCategory = "comb-single-switch"
171
173 Switch.init(self)
174 self.switchElement = None
175 # eater name -> name of sink pad on switch element
176 self.switchPads = {}
177
179 pipeline = "switch name=switch ! " \
180 "identity silent=true single-segment=true name=iden "
181
182 for eaterAlias in self.eaters:
183 pipeline += '@ eater:%s @ ! switch. ' % (eaterAlias,)
184
185 pipeline += 'iden.'
186
187 return pipeline
188
190 self.switchElement = sw = pipeline.get_by_name("switch")
191 # figure out the pads connected for the eaters
192 padPeers = {} # padName -> peer element name
193 for sinkPadNumber in range(0, len(self.eaters)):
194 self.debug("sink pad %d %r", sinkPadNumber, sw.get_pad("sink%d" % sinkPadNumber))
195 self.debug("peer pad %r", sw.get_pad("sink%d" % (
196 sinkPadNumber)).get_peer())
197 padPeers["sink%d" % sinkPadNumber] = sw.get_pad("sink%d" % (
198 sinkPadNumber)).get_peer().get_parent().get_name()
199
200 for eaterAlias in self.eaters:
201 feedId = self.eaters[eaterAlias].feedId
202 for sinkPad in padPeers:
203 if feedId and feedId in padPeers[sinkPad]:
204 self.switchPads[eaterAlias] = sinkPad
205 if not eaterAlias in self.switchPads:
206 self.warning("could not find sink pad for eater %s",
207 eaterAlias)
208 # make sure switch has the correct sink pad as active
209 self.debug("Setting switch's active-pad to %s",
210 self.switchPads[self._idealEater])
211 self.switchElement.set_property("active-pad",
212 self.switchPads[self._idealEater])
213 self.uiState.set("active-eater", self._idealEater)
214
216 if not self.switchElement:
217 self.warning("switch_to called with eater %s but before pipeline "
218 "configured")
219 return False
220 if not eater in [ "backup", "master" ]:
221 self.warning ("%s is not master or backup", eater)
222 return False
223 if self.is_active(eater):
224 self.switchElement.set_property("active-pad",
225 self.switchPads[eater])
226 self.uiState.set("active-eater", eater)
227 return True
228 else:
229 self.warning("Could not switch to %s because the %s eater "
230 "is not active." % (eater, eater))
231 return False
232
234 Switch.eaterSetActive(self, feedId)
235 eaterName = self.get_eater_name_for_feed_id(feedId)
236 d = self._eaterReadyDefers[eaterName]
237 if d:
238 d.callback(True)
239 self._eaterReadyDefers[eaterName] = None
240
242 logCategory = "comb-av-switch"
243
245 Switch.init(self)
246 self.audioSwitchElement = None
247 self.videoSwitchElement = None
248 # eater name -> name of sink pad on switch element
249 self.switchPads = {}
250 self._startTimes = {}
251 self._startTimeProbeIds = {}
252 self._padProbeLock = threading.Lock()
253 self._switchLock = threading.Lock()
254 self.pads_awaiting_block = []
255 self.padsBlockedDefer = None
256
258 d = Switch.do_check(self)
259 def checkConfig(result):
260 self.debug("checking config")
261 props = self.config['properties']
262 videoParams = {}
263 audioParams = {}
264 videoParams["video-width"] = props.get("video-width", None)
265 videoParams["video-height"] = props.get("video-height", None)
266 videoParams["video-framerate"] = props.get("video-framerate", None)
267 videoParams["video-pixel-aspect-ratio"] = props.get("video-pixel-aspect-ratio", None)
268 audioParams["audio-channels"] = props.get("audio-channels", None)
269 audioParams["audio-samplerate"] = props.get("audio-samplerate", None)
270
271 nonExistantVideoParams = []
272 existsVideoParam = False
273 allVideoParams = True
274 for p in videoParams:
275 if videoParams[p] == None:
276 allVideoParams = False
277 nonExistantVideoParams.append(p)
278 else:
279 existsVideoParam = True
280 self.debug("exists video param: %d all: %d nonexistant: %r",
281 existsVideoParam, allVideoParams, nonExistantVideoParams)
282 if not allVideoParams and existsVideoParam:
283 # message
284 m = messages.Error(T_(N_(
285 "Video parameter(s) were specified but not all. "
286 "Missing parameters are: %r" % nonExistantVideoParams)),
287 id="video-params-not-specified")
288 self.addMessage(m)
289 nonExistantAudioParams = []
290 existsAudioParam = False
291 allAudioParams = True
292 for p in audioParams:
293 if audioParams[p] == None:
294 allAudioParams = False
295 nonExistantAudioParams.append(p)
296 else:
297 existsAudioParam = True
298 if not allAudioParams and existsAudioParam:
299 # message
300 m = messages.Error(T_(N_(
301 "Audio parameter(s) were specified but not all. "
302 "Missing parameters are: %r" % nonExistantAudioParams)),
303 id="audio-params-not-specified")
304 self.addMessage(m)
305 return result
306 d.addCallback(checkConfig)
307 return d
308
310 eaters = self.eater_names
311 videoForceCapsTemplate = ""
312 audioForceCapsTemplate = ""
313 if properties.get("video-width", None):
314 width = properties["video-width"]
315 height = properties["video-height"]
316 par = properties["video-pixel-aspect-ratio"]
317 framerate = properties["video-framerate"]
318 videoForceCapsTemplate = \
319 "ffmpegcolorspace ! videorate ! videoscale !" \
320 " capsfilter caps=video/x-raw-yuv,width=%d,height=%d," \
321 "framerate=%d/%d,pixel-aspect-ratio=%d/%d," \
322 "format=(fourcc)I420 " \
323 "name=capsfilter-%%(eaterName)s ! " % (width,
324 height, framerate[0], framerate[1], par[0], par[1])
325 if self.config.get("audio-channels", None):
326 channels = self.config["audio-channels"]
327 samplerate = self.config["audio-samplerate"]
328 audioForceCapsTemplate = \
329 "audioconvert ! audioconvert ! capsfilter caps=" \
330 "audio/x-raw-int,channels=%d,samplerate=%d," \
331 "width=16,depth=16,signed=true " \
332 "name=capsfilter-%%(eaterName)s ! " % (
333 channels, samplerate)
334 pipeline = "switch name=vswitch ! " \
335 "identity silent=true single-segment=true name=viden " \
336 "switch name=aswitch ! " \
337 "identity silent=true single-segment=true name=aiden "
338 for eater in eaters:
339 if "video" in eater:
340 tmpl = '@ eater:%%(eaterName)s @ ! %s vswitch. ' % videoForceCapsTemplate
341 if "audio" in eater:
342 tmpl = '@ eater:%%(eaterName)s @ ! %s aswitch. ' % audioForceCapsTemplate
343 pipeline += tmpl % dict(eaterName=eater)
344
345 pipeline += 'viden. ! @feeder:video@ aiden. ! @feeder:audio@'
346 return pipeline
347
349 self.videoSwitchElement = vsw = pipeline.get_by_name("vswitch")
350 self.audioSwitchElement = asw = pipeline.get_by_name("aswitch")
351
352 # figure out how many pads should be connected for the eaters
353 # 1 + number of eaters with eaterName *-backup
354 numVideoPads = 1 + len(self.config["eater"]["video-backup"])
355 numAudioPads = 1 + len(self.config["eater"]["audio-backup"])
356 padPeers = {} # (padName, switchElement) -> peer element name
357 for sinkPadNumber in range(0, numVideoPads):
358 padPeers[("sink%d" % sinkPadNumber, vsw)] = \
359 vsw.get_pad("sink%d" % (
360 sinkPadNumber)).get_peer().get_parent().get_name()
361 for sinkPadNumber in range(0, numAudioPads):
362 padPeers[("sink%d" % sinkPadNumber, asw)] = \
363 asw.get_pad("sink%d" % (
364 sinkPadNumber)).get_peer().get_parent().get_name()
365
366 for feedId in self.eater_names:
367 eaterName = self.get_eater_name_for_feed_id(feedId)
368 self.debug("feedId %s is mapped to eater name %s", feedId,
369 eaterName)
370 if eaterName:
371 for sinkPadName, switchElement in padPeers:
372 if feedId in padPeers[(sinkPadName, switchElement)]:
373 self.switchPads[eaterName] = sinkPadName
374 if not self.switchPads.has_key(eaterName):
375 self.warning("could not find sink pad for eater %s",
376 eaterName )
377 # make sure switch has the correct sink pad as active
378 self.debug("Setting video switch's active-pad to %s",
379 self.switchPads["video-%s" % self._idealEater])
380 vsw.set_property("active-pad",
381 self.switchPads["video-%s" % self._idealEater])
382 self.debug("Setting audio switch's active-pad to %s",
383 self.switchPads["audio-%s" % self._idealEater])
384 asw.set_property("active-pad",
385 self.switchPads["audio-%s" % self._idealEater])
386 self.uiState.set("active-eater", self._idealEater)
387 self.debug("active-eater set to %s", self._idealEater)
388
389 # So switching audio and video is not that easy
390 # We have to make sure the current segment on both
391 # the audio and video switch element have the same
392 # stop value, and the next segment on both to have
393 # the same start value to maintain sync.
394 # In order to do this:
395 # 1) we need to block all src pads of elements connected
396 # to the switches' sink pads
397 # 2) we need to set the property "stop-value" on both the
398 # switches to the highest value of "last-timestamp" on the two
399 # switches.
400 # 3) the pads should be switched (ie active-pad set) on the two switched
401 # 4) the switch elements should be told to queue buffers coming on their
402 # active sinkpads by setting the queue-buffers property to TRUE
403 # 5) pad buffer probes should be added to the now active sink pads of the
404 # switch elements, so that the start value of the enxt new segment can
405 # be determined
406 # 6) the src pads we blocked in 1) should be unblocked
407 # 7) when both pad probes have fired once, use the lowest timestamp
408 # received as the start value for the switch elements
409 # 8) set the queue-buffers property on the switch elements to FALSE
411 if not (self.videoSwitchElement and self.audioSwitchElement):
412 self.warning("switch_to called with eater %s but before pipeline "
413 "configured")
414 return False
415 if eater not in [ "master", "backup" ]:
416 self.warning("%s is not master or backup", eater)
417 return False
418 if self._switchLock.locked():
419 self.warning("Told to switch to %s while a current switch is going on.", eater)
420 return False
421 # Lock is acquired here and released once buffers are told to queue again
422 self._switchLock.acquire()
423 if self.is_active(eater) and self._startTimes == {} and \
424 self.uiState.get("active-eater") != eater:
425 self._startTimes = {"abc":None}
426 self.padsBlockedDefer = defer.Deferred()
427 self.debug("eaterSwitchingTo switching to %s", eater)
428 self.eaterSwitchingTo = eater
429 self._block_switch_sink_pads(True)
430 return self.padsBlockedDefer
431 else:
432 self._switchLock.release()
433 if self.uiState.get("active-eater") == eater:
434 self.warning("Could not switch to %s because it is already active",
435 eater)
436 elif self._startTimes == {}:
437 self.warning("Could not switch to %s because at least "
438 "one of the %s eaters is not active." % (eater, eater))
439 m = messages.Warning(T_(N_(
440 "Could not switch to %s because at least "
441 "one of the %s eaters is not active." % (eater, eater))),
442 id="cannot-switch",
443 priority=40)
444 self.state.append('messages', m)
445 else:
446 self.warning("Could not switch because startTimes is %r",
447 self._startTimes)
448 m = messages.Warning(T_(N_(
449 "Could not switch to %s because "
450 "startTimes is %r." % (eater, self._startTimes))),
451 id="cannot-switch",
452 priority=40)
453 self.state.append('messages', m)
454 return False
455
457 vswTs = self.videoSwitchElement.get_property("last-timestamp")
458 aswTs = self.audioSwitchElement.get_property("last-timestamp")
459 tsToSet = vswTs
460 if aswTs > vswTs:
461 tsToSet = aswTs
462 self.log("Setting stop-value on video switch to %u",
463 tsToSet)
464 self.log("Setting stop-value on audio switch to %u",
465 tsToSet)
466 self.videoSwitchElement.set_property("stop-value",
467 tsToSet)
468 self.audioSwitchElement.set_property("stop-value",
469 tsToSet)
470 message = None
471 if (aswTs > vswTs) and (aswTs - vswTs > gst.SECOND * 10):
472 message = "When switching to %s the other source's video" \
473 " and audio timestamps differ by %u" % (self.eaterSwitchingTo,
474 aswTs - vswTs)
475 elif (vswTs > aswTs) and (vswTs - aswTs > gst.SECOND * 10):
476 message = "When switching to %s the other source's video" \
477 " and audio timestamps differ by %u" % (self.eaterSwitchingTo,
478 vswTs - aswTs)
479 if message:
480 m = messages.Warning(T_(N_(
481 message)),
482 id="large-timestamp-difference",
483 priority=40)
484 self.state.append('messages', m)
485
487 self.log("here with pad %r and blocked %d", pad, blocked)
488 if blocked:
489 if not pad in self.pads_awaiting_block:
490 return
491 self.pads_awaiting_block.remove(pad)
492 self.log("Pads awaiting block are: %r", self.pads_awaiting_block)
493
495 if block:
496 self.pads_awaiting_block = []
497 for eaterName in self.switchPads:
498 if "audio" in eaterName:
499 pad = self.audioSwitchElement.get_pad(
500 self.switchPads[eaterName]).get_peer()
501 else:
502 pad = self.videoSwitchElement.get_pad(
503 self.switchPads[eaterName]).get_peer()
504 if pad:
505 self.pads_awaiting_block.append(pad)
506
507 for eaterName in self.switchPads:
508 if "audio" in eaterName:
509 pad = self.audioSwitchElement.get_pad(
510 self.switchPads[eaterName]).get_peer()
511 else:
512 pad = self.videoSwitchElement.get_pad(
513 self.switchPads[eaterName]).get_peer()
514 if pad:
515 self.debug("Pad: %r blocked being set to: %d", pad, block)
516 ret = pad.set_blocked_async(block, self._block_cb)
517 self.debug("Return of pad block is: %d", ret)
518 self.debug("Pad %r is blocked: %d", pad, pad.is_blocked())
519 if block:
520 self.on_pads_blocked()
521
523 eater = self.eaterSwitchingTo
524 if not eater:
525 self.warning("Eaterswitchingto is None, crying time")
526 self.log("Block callback")
527 self._set_last_timestamp()
528 self.videoSwitchElement.set_property("active-pad",
529 self.switchPads["video-%s" % eater])
530 self.audioSwitchElement.set_property("active-pad",
531 self.switchPads["audio-%s" % eater])
532 self.videoSwitchElement.set_property("queue-buffers",
533 True)
534 self.audioSwitchElement.set_property("queue-buffers",
535 True)
536 self.uiState.set("active-eater", eater)
537 self._add_pad_probes_for_start_time(eater)
538 self._block_switch_sink_pads(False)
539 if self.padsBlockedDefer:
540 self.padsBlockedDefer.callback(True)
541 else:
542 self.warning("Our pad block defer is None, inconsistency time to cry")
543 self.padsBlockedDefer = None
544
546 self.debug("adding buffer probes here for %s", activeEater)
547 for eaterName in ["video-%s" % activeEater, "audio-%s" % activeEater]:
548 if "audio" in eaterName:
549 pad = self.audioSwitchElement.get_pad(
550 self.switchPads[eaterName])
551 else:
552 pad = self.videoSwitchElement.get_pad(
553 self.switchPads[eaterName])
554 self._padProbeLock.acquire()
555 self._startTimeProbeIds[eaterName] = pad.add_buffer_probe(
556 self._start_time_buffer_probe, eaterName)
557 self._padProbeLock.release()
558
560 self.debug("start time buffer probe for %s buf ts: %u",
561 eaterName, buffer.timestamp)
562 self._padProbeLock.acquire()
563 if eaterName in self._startTimeProbeIds:
564 self._startTimes[eaterName] = buffer.timestamp
565 pad.remove_buffer_probe(self._startTimeProbeIds[eaterName])
566 del self._startTimeProbeIds[eaterName]
567 self.debug("pad probe for %s", eaterName)
568 self._check_start_times_received()
569 self._padProbeLock.release()
570 return True
571
573 self.debug("here")
574 activeEater = self.uiState.get("active-eater")
575 haveAllStartTimes = True
576 lowestTs = 0
577 for eaterName in ["video-%s" % activeEater, "audio-%s" % activeEater]:
578 haveAllStartTimes = haveAllStartTimes and \
579 (eaterName in self._startTimes)
580 if eaterName in self._startTimes and \
581 (lowestTs == 0 or self._startTimes[eaterName] < lowestTs):
582 lowestTs = self._startTimes[eaterName]
583 self.debug("lowest ts received from buffer probes: %u",
584 lowestTs)
585
586 if haveAllStartTimes:
587 self.debug("have all start times")
588 self.videoSwitchElement.set_property("start-value", lowestTs)
589 self.audioSwitchElement.set_property("start-value", lowestTs)
590 self._startTimes = {}
591 # we can also turn off the queue-buffers property
592 self.audioSwitchElement.set_property("queue-buffers", False)
593 self.videoSwitchElement.set_property("queue-buffers", False)
594 self.log("eaterSwitchingTo becoming None from %s",
595 self.eaterSwitchingTo)
596 self.eaterSwitchingTo = None
597 self._switchLock.release()
598
600 Switch.eaterSetActive(self, feedId)
601 eaterName = self.get_eater_name_for_feed_id(feedId)
602 d = None
603 if "master" in eaterName and self.is_active("master"):
604 d = self._eaterReadyDefers["master"]
605 self._eaterReadyDefers["master"] = None
606 elif "backup" in eaterName and self.is_active("backup"):
607 d = self._eaterReadyDefers["backup"]
608 self._eaterReadyDefers["backup"] = None
609 if d:
610 d.callback(True)
611
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Thu Aug 7 15:45:45 2008 | http://epydoc.sourceforge.net |