| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
6 # All rights reserved.
7
8 # This file may be distributed and/or modified under the terms of
9 # the GNU General Public License version 2 as published by
10 # the Free Software Foundation.
11 # This file is distributed without any warranty; without even the implied
12 # warranty of merchantability or fitness for a particular purpose.
13 # See "LICENSE.GPL" in the source distribution for more information.
14
15 # Licensees having purchased or holding a valid Flumotion Advanced
16 # Streaming Server license may use this file in accordance with the
17 # Flumotion Advanced Streaming Server Commercial License Agreement.
18 # See "LICENSE.Flumotion" in the source distribution for more information.
19
20 # Headers in this file shall remain intact.
21
22 import errno
23 import os
24 import time
25 from datetime import datetime
26
27 import gobject
28 import gst
29 import time
30
31 from twisted.internet import reactor
32
33 from flumotion.component import feedcomponent
34 from flumotion.common import log, gstreamer, pygobject, messages, errors
35 from flumotion.common import common
36
37 # proxy import
38 from flumotion.component.component import moods
39 from flumotion.common.pygobject import gsignal
40
41 from flumotion.common.messages import N_
42 T_ = messages.gettexter('flumotion')
43
44 __all__ = ['Disker']
45
46
47 """
48 Disker has a property 'ical-schedule'. This allows an ical file to be
49 specified in the config and have recordings scheduled based on events.
50 This file will be monitored for changes and events reloaded if this
51 happens.
52
53 The filename of a recording started from an ical file will be produced
54 via passing the ical event summary through strftime, so that an archive
55 can encode the date and time that it was begun.
56
57 The time that will be given to strftime will be given in the timezone of
58 the ical event. In practice this will either be UTC or the local time of
59 the machine running the disker, as the ical scheduler does not
60 understand arbitrary timezones.
61 """
62
63 try:
64 # icalendar and dateutil modules needed for scheduling recordings
65 from icalendar import Calendar
66 from dateutil import rrule
67 HAS_ICAL = True
68 except:
69 HAS_ICAL = False
70
72 # called when admin ui wants to stop recording. call changeFilename to
73 # restart
75 self.comp.stop_recording()
76
77 # called when admin ui wants to change filename (this starts recording if
78 # the disker isn't currently writing to disk)
80 self.comp.change_filename(filenameTemplate)
81
83 if HAS_ICAL:
84 cal = Calendar.from_string(ical)
85 self.addEvents(self.comp.icalScheduler.parseCalendar(cal))
86
87 # called when admin ui wants updated state (current filename info)
89 self.comp.update_ui_state()
90
92 componentMediumClass = DiskerMedium
93 checkOffset = True
94 pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false'
95 file = None
96 directory = None
97 location = None
98 caps = None
99
101 self.uiState.addKey('filename', None)
102 self.uiState.addKey('recording', False)
103 self.uiState.addKey('can-schedule', HAS_ICAL)
104
106 directory = properties['directory']
107
108 self.directory = directory
109
110 self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')])
111
112 rotateType = properties.get('rotate-type', 'none')
113
114 # validate rotate-type and size/time properties first
115 if not rotateType in ['none', 'size', 'time']:
116 m = messages.Error(T_(N_(
117 "The configuration property 'rotate-type' should be set to "
118 "'size', time', or 'none', not '%s'. "
119 "Please fix the configuration."),
120 rotateType), id='rotate-type')
121 self.addMessage(m)
122 raise errors.ComponentSetupHandledError()
123
124 # size and time types need the property specified
125 if rotateType in ['size', 'time']:
126 if rotateType not in properties.keys():
127 m = messages.Error(T_(N_(
128 "The configuration property '%s' should be set. "
129 "Please fix the configuration."),
130 rotateType), id='rotate-type')
131 self.addMessage(m)
132 raise errors.ComponentSetupHandledError()
133
134 # now act on the properties
135 if rotateType == 'size':
136 self.setSizeRotate(properties['size'])
137 elif rotateType == 'time':
138 self.setTimeRotate(properties['time'])
139 # FIXME: should add a way of saying "do first cycle at this time"
140
141 return self.pipe_template
142
144 """
145 @param time: duration of file (in seconds)
146 """
147 reactor.callLater(time, self._rotateTimeCallback, time)
148
150 """
151 @param size: size of file (in bytes)
152 """
153 reactor.callLater(5, self._rotateSizeCallback, size)
154
156 self.change_filename()
157
158 # Add a new one
159 reactor.callLater(time, self._rotateTimeCallback, time)
160
162 if not self.location:
163 self.warning('Cannot rotate file, no file location set')
164 else:
165 if os.stat(self.location).st_size > size:
166 self.change_filename()
167
168 # Add a new one
169 reactor.callLater(5, self._rotateTimeCallback, size)
170
174
176 mime = self.get_mime()
177 if mime == 'multipart/x-mixed-replace':
178 mime += ";boundary=ThisRandomString"
179 return mime
180
182 """
183 @param filenameTemplate: strftime formatted string to decide filename
184 @param timeOrTuple: a valid time to pass to strftime, defaulting
185 to time.localtime(). A 9-tuple may be passed instead.
186 """
187 mime = self.get_mime()
188 if mime == 'application/ogg':
189 ext = 'ogg'
190 elif mime == 'multipart/x-mixed-replace':
191 ext = 'multipart'
192 elif mime == 'audio/mpeg':
193 ext = 'mp3'
194 elif mime == 'video/x-msvideo':
195 ext = 'avi'
196 elif mime == 'video/x-ms-asf':
197 ext = 'asf'
198 elif mime == 'audio/x-flac':
199 ext = 'flac'
200 elif mime == 'audio/x-wav':
201 ext = 'wav'
202 elif mime == 'video/x-matroska':
203 ext = 'mkv'
204 elif mime == 'video/x-dv':
205 ext = 'dv'
206 elif mime == 'video/x-flv':
207 ext = 'flv'
208 elif mime == 'video/mpegts':
209 ext = 'ts'
210 else:
211 ext = 'data'
212
213 self.stop_recording()
214
215 sink = self.get_element('fdsink')
216 if sink.get_state() == gst.STATE_NULL:
217 sink.set_state(gst.STATE_READY)
218
219 filename = ""
220 if not filenameTemplate:
221 filenameTemplate = self._defaultFilenameTemplate
222 filename = "%s.%s" % (common.strftime(filenameTemplate,
223 timeOrTuple or time.localtime()), ext)
224 self.location = os.path.join(self.directory, filename)
225 self.info("Changing filename to %s", self.location)
226 try:
227 self.file = open(self.location, 'a')
228 except IOError, e:
229 self.warning("Failed to open output file %s: %s",
230 self.location, log.getExceptionMessage(e))
231 m = messages.Error(T_(N_("Failed to open output file "
232 "%s. Check your permissions."
233 % (self.location,))))
234 self.addMessage(m)
235 return
236 self._plug_recording_started(self.file, self.location)
237 sink.emit('add', self.file.fileno())
238 self.uiState.set('filename', self.location)
239 self.uiState.set('recording', True)
240
241 if self.symlink_to_current_recording:
242 self.update_symlink(self.location,
243 self.symlink_to_current_recording)
244
246 if not dest.startswith('/'):
247 dest = os.path.join(self.directory, dest)
248 self.debug("updating symbolic link %s to point to %s", src, dest)
249 try:
250 try:
251 os.symlink(src, dest)
252 except OSError, e:
253 if e.errno == errno.EEXIST and os.path.islink(dest):
254 os.unlink(dest)
255 os.symlink(src, dest)
256 else:
257 raise
258 except Exception, e:
259 self.info("Failed to update link %s: %s", dest,
260 log.getExceptionMessage(e))
261 m = messages.Warning(T_(N_("Failed to update symbolic link "
262 "%s. Check your permissions."
263 % (dest,))),
264 debug=log.getExceptionMessage(e))
265 self.addMessage(m)
266
268 sink = self.get_element('fdsink')
269 if sink.get_state() == gst.STATE_NULL:
270 sink.set_state(gst.STATE_READY)
271
272 if self.file:
273 self.file.flush()
274 sink.emit('remove', self.file.fileno())
275 self._plug_recording_stopped(self.file, self.location)
276 self.file = None
277 self.uiState.set('filename', None)
278 self.uiState.set('recording', False)
279 if self.symlink_to_last_recording:
280 self.update_symlink(self.location,
281 self.symlink_to_last_recording)
282
284 caps = pad.get_negotiated_caps()
285 if caps == None:
286 return
287
288 caps_str = gstreamer.caps_repr(caps)
289 self.debug('Got caps: %s' % caps_str)
290
291 new = True
292 if not self.caps == None:
293 self.warning('Already had caps: %s, replacing' % caps_str)
294 new = False
295
296 self.debug('Storing caps: %s' % caps_str)
297 self.caps = caps
298
299 if new and self._recordAtStart:
300 reactor.callLater(0, self.change_filename,
301 self._startFilenameTemplate)
302
303 # callback for when a client is removed so we can figure out
304 # errors
306 # check if status is error
307 if client_status == 4:
308 reactor.callFromThread(self._client_error_cb)
309
311 self.file.close()
312 self.file = None
313 # make element sad
314 self.setMood(moods.sad)
315 id = "error-writing-%s" % self.location
316 m = messages.Error(T_(N_(
317 "Error writing to file %s. Maybe disk is full." % (
318 self.location))),
319 id=id, priority=40)
320 self.addMessage(m)
321
323 self.debug('configure_pipeline for disker')
324 self.symlink_to_last_recording = \
325 properties.get('symlink-to-last-recording', None)
326 self.symlink_to_current_recording = \
327 properties.get('symlink-to-current-recording', None)
328 self._recordAtStart = properties.get('start-recording', True)
329 self._defaultFilenameTemplate = properties.get('filename',
330 '%s.%%Y%%m%%d-%%H%%M%%S' % self.getName())
331 self._startFilenameTemplate = self._defaultFilenameTemplate
332 icalfn = properties.get('ical-schedule')
333 if HAS_ICAL and icalfn:
334 from flumotion.component.base import scheduler
335 try:
336 self.icalScheduler = scheduler.ICalScheduler(open(
337 icalfn, 'r'))
338 self.icalScheduler.subscribe(self.eventStarted,
339 self.eventStopped)
340 currentEvents = self.icalScheduler.getCurrentEvents()
341 if currentEvents:
342 self._startFilenameTemplate = currentEvents[0]
343 self._recordAtStart = True
344 else:
345 self._recordAtStart = False
346 except ValueError:
347 m = messages.Warning(T_(N_(
348 "Error parsing ical file %s, so not scheduling any"
349 " events." % icalfn)), id="error-parsing-ical")
350 self.addMessage(m)
351
352 elif icalfn:
353 warnStr = "An ical file has been specified for " \
354 "scheduling but the necessary modules " \
355 "dateutil and/or icalendar are not installed"
356 self.warning(warnStr)
357 m = messages.Warning(T_(N_(warnStr)), id="error-parsing-ical")
358 self.addMessage(m)
359
360 sink = self.get_element('fdsink')
361 sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb)
362 # connect to client-removed so we can detect errors in file writing
363 sink.connect('client-removed', self._client_removed_cb)
364
365 # set event probe if we should react to video mark events
366 react_to_marks = properties.get('react-to-stream-markers', False)
367 if react_to_marks:
368 pfx = properties.get('stream-marker-filename-prefix', '%03d.')
369 self._marker_prefix = pfx
370 sink.get_pad('sink').add_event_probe(self._markers_event_probe)
371
374
376 self.stop_recording()
377
379 if HAS_ICAL:
380 cal = Calendar.from_string(icsStr)
381 if self.icalScheduler:
382 events = self.icalScheduler.parseCalendar(cal)
383 if events:
384 self.icalScheduler.addEvents(events)
385 else:
386 self.warning("No events found in the ical string")
387 else:
388 self.warning("Cannot parse ICAL; neccesary modules not installed")
389
391 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
392 # make sure plugs are configured with our socket, see #732
393 if socket not in self.plugs:
394 return
395 for plug in self.plugs[socket]:
396 self.debug('invoking recording_started on '
397 'plug %r on socket %s', plug, socket)
398 plug.recording_started(file, location)
399
401 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
402 # make sure plugs are configured with our socket, see #732
403 if socket not in self.plugs:
404 return
405 for plug in self.plugs[socket]:
406 self.debug('invoking recording_stopped on '
407 'plug %r on socket %s', plug, socket)
408 plug.recording_stopped(file, location)
409
411 if event.type == gst.EVENT_CUSTOM_DOWNSTREAM:
412 evt_struct = event.get_structure()
413 if evt_struct.get_name() == 'FluStreamMark':
414 if evt_struct['action'] == 'start':
415 self._on_marker_start(evt_struct['prog_id'])
416 elif evt_struct['action'] == 'stop':
417 self._on_marker_stop()
418 return True
419
421 self.stop_recording()
422
424 tmpl = self._defaultFilenameTemplate
425 if self._marker_prefix:
426 try:
427 tmpl = '%s%s' % (self._marker_prefix % data,
428 self._defaultFilenameTemplate)
429 except TypeError, err:
430 m = messages.Warning(T_(N_('Failed expanding filename prefix: '
431 '%r <-- %r.'),
432 self._marker_prefix, data),
433 id='expand-marker-prefix')
434 self.addMessage(m)
435 self.warning('Failed expanding filename prefix: '
436 '%r <-- %r; %r' %
437 (self._marker_prefix, data, err))
438 self.change_filename(tmpl)
439
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Thu Aug 7 15:45:56 2008 | http://epydoc.sourceforge.net |