GNOME Bugzilla – Bug 769713
Unable to write multiple simultaneous filesinks from appsrc.
Last modified: 2016-08-16 15:57:00 UTC
I'm having trouble saving from appsrc to filesink when I have multiple GStreamer processes running simultaneously. Only one of the GStreamer processes will write correctly while the others all write nearly empty files. There appears to be write contention during the filesink operation.

Note: I am using gstreamer1.0 (v1.8.2) with python3 (v3.5.2) on Mac OS X 10.11.6.

Here's what my code is actually doing: in the background, I am reading frames from a single video stream, converting each frame to a BGR numpy array of size 1920x800x3, and storing each frame in a circular buffer. I have built a "gstreamer_writer" function that reads frames from this circular buffer, converts the frames into a byte stream, and feeds this stream into appsrc. This works by instantiating a new process (multiprocessing.Process) and pointing it at the "gstreamer_writer" function.

This works completely fine for a single process/function call. appsrc is correctly fed the byte stream and I save these BGR frames into an mp4 with h264 encoding using the following pipeline:

appsrc format=3 name=app emit-signals=true do-timestamp=true is-live=true blocksize=4608000 max-bytes=0 caps=video/x-raw,format=BGR,width=1920,height=800 ! videoconvert ! video/x-raw,format=I420,width=1920,height=800 ! vtenc_h264 ! mp4mux ! filesink location=test1.mp4

However, if I instantiate two or more processes and point them at the function, only one of the filesinks will work properly. For example, if one is writing to "test1.mp4" and the other is writing to "test2.mp4", then one of the videos will be written correctly and the other will fail and write a nearly empty mp4 (~500kb). It's not always the same mp4: 50% of the time test1.mp4 gets written correctly and 50% of the time test2.mp4 gets written correctly. It looks like there is some kind of race condition or write contention that is preventing both mp4s from being written to file properly.
One thing to note is that each process is accessing the same frames from the same ring buffer. I thought this may have been causing problems with GStreamer. However, if I display the streams with autovideosink instead of writing them to file, I can display as many streams/processes as I'd like. This means the data is being properly passed through the pipeline and is only failing during the write stage. I tested this using the pipeline:

appsrc format=3 name=app emit-signals=true do-timestamp=true is-live=true blocksize=4608000 max-bytes=0 caps=video/x-raw,format=BGR,width=1920,height=800 ! videoconvert ! video/x-raw,format=I420,width=1920,height=800 ! vtenc_h264 ! avdec_h264 ! autovideosink

If anyone has any suggestions for how I can fix this problem I would appreciate it. I'm hoping it's a simple change, but you never know with GStreamer! Thanks!
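To make the shape of the setup easier to reproduce without the full application, here is a minimal stdlib-only sketch of the fan-out described above: several writer processes consuming the same frames. This is my own illustration, not the reporter's code; the ring buffer is replaced by one multiprocessing.Queue per consumer, the names writer and fan_out are hypothetical, and counting bytes stands in for the real appsrc push-buffer so the sketch runs without GStreamer installed.

```python
import multiprocessing as mp

def writer(frames_q, results_q, filename):
    """Stand-in for the real gstreamer_writer: instead of wrapping each
    frame in a Gst.Buffer and pushing it into appsrc, just count bytes."""
    written = 0
    while True:
        frame = frames_q.get()
        if frame is None:        # sentinel plays the role of end-of-stream
            break
        written += len(frame)
    results_q.put((filename, written))

def fan_out(frames, n_writers):
    """Give every writer its own queue so each process sees every frame,
    mirroring the 'each process reads the same ring buffer' setup."""
    results_q = mp.Queue()
    queues = [mp.Queue() for _ in range(n_writers)]
    procs = [mp.Process(target=writer,
                        args=(q, results_q, "test%d.mp4" % (i + 1)))
             for i, q in enumerate(queues)]
    for p in procs:
        p.start()
    for frame in frames:
        for q in queues:         # every consumer gets every frame
            q.put(frame)
    for q in queues:
        q.put(None)              # tell every writer to finish
    for p in procs:
        p.join()
    return sorted(results_q.get() for _ in procs)
```

In this pure-Python version both "files" come out complete, which matches the observation that autovideosink works for any number of processes; the failure only appears once real GStreamer pipelines run in fork()ed children.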
Thanks for the bug report. Please bear with us while we try to wrap our heads around it. I'm not sure I properly understand the issue yet. If you run two instances of gst-launch-1.0 like this:

gst-launch-1.0 videotestsrc is-live=true ! queue ! progressreport ! rndbuffersize ! queue ! filesink location=/tmp/1.out
gst-launch-1.0 videotestsrc is-live=true ! queue ! progressreport ! rndbuffersize ! queue ! filesink location=/tmp/2.out

Does this work fine? Does the multiprocessing.Process() interface effectively call fork()? I don't think you can really do that safely with GStreamer/GLib applications after gst_init().
If I run the two gst-launch instances on the command line they both work completely fine. This was the first thing I tested as well. The multiprocessing.Process interface is effectively calling fork(). Right now each multiprocessing.Process calls a "gstreamer_writer" function, and within each process we run gst_init(). Maybe that isn't the right way to be doing this?

Another person suggested adding queues before the h264 encoder and the filesink. They said: "Queue has two functions: + make a buffer before a consuming element (which an encoder is - it needs many frames before it can start encoding) + separate further processing into a new thread - so that filesink would process data in a new thread." Unfortunately this didn't fix our problem.

Here is the code from the function if that helps at all. The variables we pass are:
ring - a ring buffer containing the np.array BGR frames.
reader - a reader that reads from the ring buffer.
stream_params - a class containing stream info like the height/width of a frame.
exit_event - a multiprocessing.Event() flag that is used to send the EOS signal to the gstreamer process and exit the subprocess.
appname - the name of the appsrc element.
filename - the path/filename of the mp4 file we are writing with gstreamer.

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib
from gi.repository import Gst as gst
import logging

import pod.framewriter.gstreamer_writer_params as gstreamer_writer_params
import pod.cameras.ringbuffer_camera_streamer as ringbuffer_camera_streamer

logger = logging.getLogger(__name__)


def gstreamer_writer(ring, reader, stream_params, exit_event, appname, filename):
    """
    Reads frames from a ring buffer, converts them to an h264-encoded mp4
    and writes the video to file. This function is designed to be called
    within a multiprocessing.Process() subprocess.

    Args:
        ring: a ring buffer containing frames in BGR format.
        reader: a reader that reads frames from the ring buffer.
        stream_params: a stream parameter object that contains information
            about the video stream.
        exit_event: a multiprocessing.Event() that will be set when the
            process is flagged to end.
        appname: the name for the gstreamer appsrc element.
        filename: the full path + filename of the movie file that will be
            saved to file.
    """
    # Initialize debugging.
    gst.debug_set_active(True)
    gst.debug_set_default_threshold(1)
    gst.init(None)

    # Initialize streamer to read frames from the ring buffer.
    streamer = ringbuffer_camera_streamer.RingbufferCameraMosaicStreamer(
        ring, reader, stream_params)

    # Generate the gstreamer command to write the BGR bytestream to mp4.
    gstreamer_params = gstreamer_writer_params.GstreamerWriteParameters(
        stream_params, appname, filename)
    command = ' '.join(gstreamer_params.get_pipeline_command())
    print(command)

    # Initialize the gstreamer pipeline from the generated command.
    pipeline = gst.parse_launch(command)
    appsrc = pipeline.get_by_name(appname)

    # Create the main loop.
    mainloop = GLib.MainLoop()

    def need_data(src, need_bytes):
        """
        Called whenever appsrc needs data; pushes frames from the ring
        buffer into the pipeline.
        """
        # NOTE: eventually streamer will pass a mosaic frame and not a
        # frame dict, so this code will have to change!
        ts, frame_dict = streamer.get_timestamp_and_frames()
        for feed_name in frame_dict:
            frame = frame_dict[feed_name]
            # If the exit flag is triggered, emit the end-of-stream message.
            if exit_event.is_set():
                logging.info(
                    "EXIT_EVENT triggered for reader with ID: " + str(id(reader)))
                src.emit("end-of-stream")
            # Otherwise convert incoming BGR frames to bytes and push them
            # into the buffer.
            elif len(frame) > 0:
                buf = gst.Buffer.new_wrapped(frame.tobytes())
                src.emit("push-buffer", buf)

    # Add a listener for the need-data signal, which is emitted whenever
    # appsrc needs data.
    appsrc.connect("need-data", need_data)

    def on_message(bus, msg):
        """
        Handle messages that are posted on the bus.

        When "exit_event" is triggered and the end-of-stream message is
        read, the mainloop will exit and the pipeline will be emptied to
        allow a clean tear-down.

        Args:
            bus: the bus carrying gstreamer messages.
            msg: a gstreamer message.
        """
        msg_type = msg.type
        print(msg_type.get_name(msg_type))
        if (msg.type == gst.MessageType.EOS or
                msg.type == gst.MessageType.ERROR):
            mainloop.quit()
            pipeline.set_state(gst.State.PAUSED)
            pipeline.set_state(gst.State.READY)
            pipeline.set_state(gst.State.NULL)

    # Initialize a bus for message forwarding.
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", on_message)

    # Start the pipeline and the mainloop.
    pipeline.set_state(gst.State.PLAYING)
    mainloop.run()
Please could you attach a gzipped/compressed GST_DEBUG=*:6 debug log as well as a stand-alone example application with which to reproduce the issue? (If the above code reproduces it, please add it as an attachment, thanks.)
(In reply to tyler from comment #2)
> The multiprocess.Process interface is effectively calling fork.

Is the fork() happening before you call *any* GLib or GStreamer function? Also, in that context the library constructors of GLib might be problematic (after they're called you also can't safely fork() anymore). fork() without replacing the whole process image (aka execv() and friends) is not really supported by GLib, and in general it is not supported for multi-threaded applications once more threads than the main one exist.
Tim - right now the code requires a lot of moving parts to work correctly, so I'll try to package it into something small that you can run as a standalone application.

In regards to Sebastian: some of the forks definitely happen after a GLib or GStreamer function is called. We have a circular/ring buffer that is being queued with video from a video stream. Whenever we need that video to be captured and saved to file we:
1) Begin a new multiprocessing.Process() that points to the gstreamer_writer function as a target.
2) Within that process/function call we initialize the gstreamer pipeline, pull the frames through appsrc, and write them to file with filesink.

If this first process is still writing video when an ADDITIONAL signal is sent to save video, then a new multiprocessing.Process will spawn and a new gstreamer instance will run in that process. However, this fork obviously happens while another gstreamer instance is already running.

We really need the program to work this way, though. We have a single video feed that, when signaled, we need to save into a downsampled and compressed video. Is this type of implementation possible with gstreamer? It looks like the fork() issue is the problem we've been having.
(In reply to tyler from comment #5)
> We really need the program to work this way though. We have a single video
> feed that, when signaled, we need to saved into a downsampled and compressed
> video. Is this type of implementation to be possible with gstreamer?
>
> It looks like the fork() issue is the problem we've been having.

Anything using fork() like this is not going to be possible with GStreamer, anything GLib-based, or generally anything that uses threads (or if you want to support Windows later, which has no fork() at all). There's not much we can do about that, unfortunately.

The bigger picture you describe is definitely possible with GStreamer though, but you have to model your application in a different way. For example by using threads, or by using really separate processes that communicate with each other via IPC (and e.g. exchange the actual media data with shared memory).
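One way to get "really separate processes" from Python, sketched here as an assumption rather than anything the developers prescribe in this thread, is multiprocessing's "spawn" start method. Unlike the default fork() on Unix, "spawn" launches a fresh interpreter (fork followed by exec), so gst_init() and the GLib library constructors only ever run inside each child, never in a fork()ed copy of an already-threaded parent. The function gstreamer_writer_stub below is a hypothetical stand-in for the real writer:

```python
import multiprocessing as mp
import os

def gstreamer_writer_stub(filename, results_q):
    """In the real application this is where gi would be imported and
    gst.init() called -- safely, because under 'spawn' this function runs
    in a brand-new process image, not a fork()ed copy of the parent."""
    results_q.put((filename, os.getpid()))

def save_clip(filename):
    """Start one writer process per save request, using the 'spawn' start
    method so the child is created via fork+exec rather than bare fork()."""
    ctx = mp.get_context("spawn")
    results_q = ctx.Queue()
    p = ctx.Process(target=gstreamer_writer_stub,
                    args=(filename, results_q))
    p.start()
    fname, child_pid = results_q.get()
    p.join()
    return fname, child_pid
```

Note that "spawn" requires the target function to be importable from the main module, and each child pays the cost of a fresh interpreter start-up; media data would still have to reach the child via IPC or shared memory, as suggested above.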