GNOME Bugzilla – Bug 745768
aggregator: Add internal queue
Last modified: 2015-08-16 13:40:13 UTC
If one has a GstAggregator-based element (such as audiomixer) where the input buffers are smaller than the output buffers, and one of the input pads is not receiving data. Something like:

gst-launch-1.0 audiotestsrc is-live=1 ! queue ! audio/x-raw, rate=48000, format=S16LE ! \
    audiomixer name=am output-buffer-duration=100000000 latency=200000000 ! pulsesink \
  udpsrc ! audioparse ! queue ! audio/x-raw, rate=48000, format=S16LE ! am

When the timeout is reached, the mixer gets one buffer from the queue, mixes it in, and then tries to get another. But if the queue's srcpad task was not fast enough, there may be no buffer on the GstAggregator's sinkpad by the time the aggregate function loops again. So the output buffer is pushed without it; then, on the next iteration, the mixer thinks the buffer is late, drops it, and we get silence!

That said, please don't merge this patch: it changes the semantics of the ->clip() vfunc in GstAggregator, I don't understand how it's meant to work, and it breaks GstVideoAggregator! What exactly is the ->clip() method meant to do? Previously it would only be called while there was no buffer inside the pad, but with a queue this is much more problematic. Should it instead be called the first time a buffer is pulled from a pad by gst_aggregator_pad_get/steal_buffer()?
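To make the race concrete, here is a rough sketch of the aggregate path on timeout. This is not the actual gstaggregator.c code; apart from gst_aggregator_pad_steal_buffer(), the helper names are illustrative:

  static GstFlowReturn
  aggregate_on_timeout (GstAggregator * agg)
  {
    GList *l;

    /* (object lock over the pad list elided for brevity) */
    for (l = GST_ELEMENT (agg)->sinkpads; l != NULL; l = l->next) {
      GstAggregatorPad *pad = l->data;
      /* Returns NULL if the upstream streaming thread has not pushed the
       * next buffer yet, even if that buffer sits in a queue element just
       * upstream of us: this is the race. */
      GstBuffer *buf = gst_aggregator_pad_steal_buffer (pad);

      if (buf == NULL)
        continue;                   /* pad silently skipped this cycle */

      mix_into_output (agg, buf);   /* illustrative helper */
      gst_buffer_unref (buf);
    }
    /* The output is pushed short of one pad's data; that pad's next
     * buffer then looks "late", gets dropped, and we hear silence. */
    return GST_FLOW_OK;
  }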
Created attachment 298758 [details] [review]
audiomixer: Only ignore pads with no buffers on timeout

When the timeout is reached, only ignore pads with no buffers; iterate over the other pads until all buffers have been read. This is important in the cases where the input buffers are smaller than the output buffer.
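The per-pad loop then looks roughly like this (a sketch only; mix_into_output() and output_buffer_filled() are illustrative helpers, not audiomixer API):

  static void
  mix_pad_on_timeout (GstAggregator * agg, GstAggregatorPad * pad)
  {
    GstBuffer *buf;

    /* Keep reading until this pad has contributed everything it has
     * queued for the current output buffer; a pad with no buffer at
     * all is simply skipped rather than aborting the whole mix. */
    while ((buf = gst_aggregator_pad_steal_buffer (pad)) != NULL) {
      mix_into_output (agg, buf);
      gst_buffer_unref (buf);
      if (output_buffer_filled (agg, pad))
        break;
    }
  }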
Created attachment 298759 [details] [review]
audiomixer: Only advance by the buffer size when a buffer is late
Created attachment 298760 [details] [review]
aggregator: Be more aggressive with invalid replies to our latency query
Created attachment 298761 [details] [review]
aggregator: Assume a live pipeline if latency is set

Otherwise one can't have a sparse stream with nothing at the start as input.
Created attachment 298762 [details] [review]
aggregator: Queue "latency" buffers at each sink pad.

In the case where you have a source giving the GstAggregator smaller buffers than it uses, when it reaches a timeout, it will consume the first buffer, then try to read another buffer for the pad. If the previous element is not fast enough, it may not get the next buffer even though it is queued just upstream. To prevent that race, the easiest solution is to move the queue inside the GstAggregatorPad itself. It also means that there is no need for strange code caused by increasing the min latency without increasing the max latency proportionally.
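The shape of the change is roughly the following; a minimal sketch, assuming a GQueue-based implementation, where PAD_LOCK/PAD_WAIT_EVENT/PAD_BROADCAST_EVENT, queue_is_full() and the priv->data field are illustrative names rather than the exact ones in the patch:

  static GstFlowReturn
  pad_chain_sketch (GstPad * pad, GstObject * parent, GstBuffer * buffer)
  {
    GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);

    PAD_LOCK (aggpad);
    /* Block upstream while the queue already holds the configured
     * latency's worth of data; this is what provides backpressure. */
    while (queue_is_full (aggpad))
      PAD_WAIT_EVENT (aggpad);

    g_queue_push_head (&aggpad->priv->data, buffer);
    PAD_BROADCAST_EVENT (aggpad);   /* wake up the aggregate thread */
    PAD_UNLOCK (aggpad);

    return GST_FLOW_OK;
  }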
Created attachment 298763 [details] [review]
audiomixer: Only ignore pads with no buffers on timeout

Oops, the previous version was ported a bit too quickly.
Created attachment 298764 [details] [review]
videoaggregator: Remove broken _clip vfunc

Just removing the _clip() method in GstVideoAggregator makes the tests pass. If I understand the code correctly, it currently never does anything: it is only called while pad->priv->buffer == NULL, and it does nothing in that case.
Review of attachment 298762 [details] [review]:

Argh, this is more complicated than expected: we also need to push the serialized events through the queue. Working on an updated patch...
Created attachment 298770 [details] [review]
aggregator: Queue "latency" buffers at each sink pad.

This version adds queueing of the synchronized events if there is something else in the queue. I left the code that runs them immediately when possible, but maybe it would be more consistent to push them all to the srcpad task?
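In other words, the per-pad queue holds both buffers and serialized events, and popping a buffer from the src task acts on any events queued before it. A sketch of the idea, where handle_event() and the priv->data field are hypothetical:

  static GstBuffer *
  pad_pop_buffer_sketch (GstAggregatorPad * pad)
  {
    gpointer item;

    /* Oldest items sit at the tail; drain events until we hit a buffer. */
    while ((item = g_queue_pop_tail (&pad->priv->data)) != NULL) {
      if (GST_IS_EVENT (item))
        handle_event (pad, GST_EVENT (item));   /* hypothetical helper */
      else
        return GST_BUFFER (item);
    }
    return NULL;
  }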
Review of attachment 298761 [details] [review]:

::: gst-libs/gst/base/gstaggregator.c
@@ +1578,3 @@

+  if (latency)
+    self->priv->latency_live = TRUE;

I don't think this change is correct. Why would you assume a live pipeline just because the aggregator subclass told the base class about its latency? Both videoaggregator and audiomixer always set a latency currently.
Comment on attachment 298770 [details] [review]
aggregator: Queue "latency" buffers at each sink pad.

I didn't review the change yet, but this is something that definitely should be done. I also observed this a few times in the past. Thanks for taking care of this :)
Comment on attachment 298760 [details] [review]
aggregator: Be more aggressive with invalid replies to our latency query

Maybe also be more aggressive in the other places where we clamp the latency if min>max?
(In reply to Sebastian Dröge (slomo) from comment #10)
> Review of attachment 298761 [details] [review] [review]:
>
> ::: gst-libs/gst/base/gstaggregator.c
> @@ +1578,3 @@
>
> +  if (latency)
> +    self->priv->latency_live = TRUE;
>
> I don't think this change is correct. Why would you assume a live pipeline
> just because the aggregator subclass told the base class about its latency?
> Both videoaggregator and audiomixer always set a latency currently.

That is only triggered if the latency property is set, not if a subclass sets a latency. That said, I'm more and more convinced that if the latency property is set to non-0, it should ignore the liveness of the source and always try to sync on the clock. Otherwise, if you have a mixer where one of the pads receives nothing from the start, it waits forever.
Created attachment 298793 [details] [review]
aggregator: Assume a live pipeline if latency is set

Updated the patch to instead just always force the pipeline to be live if a latency is set, and non-live otherwise.
Created attachment 298794 [details] [review]
audiomixer: Release pad object lock before dropping buffer

Otherwise, the locking order is violated and deadlocks happen.
(In reply to Olivier Crête from comment #13)
> (In reply to Sebastian Dröge (slomo) from comment #10)
> > Review of attachment 298761 [details] [review] [review] [review]:
> >
> > ::: gst-libs/gst/base/gstaggregator.c
> > @@ +1578,3 @@
> >
> > +  if (latency)
> > +    self->priv->latency_live = TRUE;
> >
> > I don't think this change is correct. Why would you assume a live pipeline
> > just because the aggregator subclass told the base class about its latency?
> > Both videoaggregator and audiomixer always set a latency currently.
>
> That is only triggered if the latency property is set, not if a subclass
> sets a latency. That said, I'm more and more convinced that if the latency
> property is set to non-0, it should ignore the liveness of the source and
> always try to sync on the clock. Otherwise, if you have a mixer where one of
> the pads receives nothing from the start, it waits forever.

Why does it wait forever then? The latency property only has an effect if one of the upstreams is live, so in your case it shouldn't make any difference whether it is set or not. But always syncing to the clock if the property is set seems a bit weird; what about just adding a sync property to aggregator like in identity instead? That would also allow you to force syncing to the clock with latency=0.
It waits forever because the pipeline only does the latency query once a buffer has hit every sink, but the audiomixer won't push out a buffer unless it goes into "live" mode. We could also have a "sync" property, or set "latency" to -1 to disable "live" mode.
(In reply to Olivier Crête from comment #17)
> It waits forever because the pipeline only does the latency query once a
> buffer has hit every sink, but the audiomixer won't push out a buffer
> unless it goes into "live" mode.

How is that related to setting the latency property though? And shouldn't it instead wake up the srcpad task whenever a pad is added, or linked, or receives a reconfigure event, and then update the latency?
(In reply to Sebastian Dröge (slomo) from comment #18)
> How is that related to setting the latency property though?

If you have a latency set, it's probably because you want to use it in live mode; otherwise you wouldn't set it. Also, live mode with a zero latency doesn't make much sense, as you'd be almost guaranteed to start dropping packets.

> And shouldn't it instead wake up the srcpad task whenever a pad is added, or
> linked, or receives a reconfigure event, and then update the latency?

None of those things happen with the example gst-launch pipeline. If you just start with one source that produces buffers and another that may produce buffers sometime in the future (or may not), then playback never starts, because the aggregator subclass will not push anything out before getting a latency query (to figure out if the source is live), but it won't get a latency query before pushing out a buffer to complete the pre-roll.
(In reply to Olivier Crête from comment #19)
> > And shouldn't it instead wake up the srcpad task whenever a pad is added, or
> > linked, or receives a reconfigure event, and then update the latency?
>
> None of those things happen with the example gst-launch pipeline. If you
> just start with one source that produces buffers and another that may
> produce buffers sometime in the future (or may not), then playback never
> starts, because the aggregator subclass will not push anything out before
> getting a latency query (to figure out if the source is live), but it won't
> get a latency query before pushing out a buffer to complete the pre-roll.

Yes, but that doesn't prevent aggregator from doing LATENCY queries by itself when it makes sense :)
(In reply to Sebastian Dröge (slomo) from comment #20)
> Yes, but that doesn't prevent aggregator from doing LATENCY queries by
> itself when it makes sense :)

When do you suggest the aggregator should do it in this case?
Whenever a RECONFIGURE event is received on a sinkpad (i.e. when a new pad is added and linked)
All the linking is done in the NULL state in this case.
... and when the srcpad thread starts for the first time :) I guess it could just check the reconfigure flag on the sinkpads to see if something is to be done for a sinkpad.
Created attachment 298908 [details] [review]
aggregator: Query latency on first incoming buffer.

Alternate patch implementing your proposal: keep on querying upstream until we get a reply. Every other case after the first buffer requires the latency to be reconfigured at the sink too, so it would be better served by posting a latency message.
Created attachment 298909 [details] [review]
aggregator: Query latency on first incoming buffer.

And keep on querying upstream until we get a reply. Also, the _get_latency_unlocked() method required being called with a private lock, so the _unlocked() variant was removed from the API. And it now returns GST_CLOCK_TIME_NONE when the element is not live, as we think that 0 upstream latency is possible.
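The query itself is plain core API; roughly along these lines, issued towards upstream from a sink pad (error handling elided; exactly when it is retried is what this patch changes):

  GstQuery *query = gst_query_new_latency ();
  gboolean live = FALSE;
  GstClockTime min = 0, max = GST_CLOCK_TIME_NONE;

  /* Ask the upstream peer of this sink pad about latency and liveness. */
  if (gst_pad_peer_query (GST_PAD (aggpad), query)) {
    gst_query_parse_latency (query, &live, &min, &max);
  } /* else: no reply yet, so retry on the next incoming buffer */
  gst_query_unref (query);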
Created attachment 299126 [details] [review]
audiomixer: On timeout, resync pads with not enough data

This is triggered if there is a gap with no data in the incoming stream, but the stream continues afterwards, in a case like:

udpsrc ! tsdemux ! parser ! decoder ! audiomixer ! ...

In this case, if the network stream is dropped for a bit, the mixer would start dropping everything afterwards; with this patch, it doesn't. The various other checks for timestamp continuity aren't triggered, for various reasons.
First 4 patches are in:

commit ff9be3ba34b62e064432244b779d524adf7fe84c
Author: Olivier Crête <olivier.crete@ocrete.ca>
Date:   Sat Mar 7 22:08:40 2015 -0500

    audiomixer: Release pad object lock before dropping buffer

    Otherwise, the locking order is violated and deadlocks happen.

    https://bugzilla.gnome.org/show_bug.cgi?id=745768

commit 2d553d1b25863a5c0d2bec99957aa3cd00301a13
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Fri Mar 6 20:22:13 2015 -0500

    audiomixer: Only ignore pads with no buffers on timeout

    When the timeout is reached, only ignore pads with no buffers,
    iterate over the other pads until all buffers have been read.

    This is important in the cases where the input buffers are smaller
    than the output buffer.

    https://bugzilla.gnome.org/show_bug.cgi?id=745768

commit 0656c2fc67a3ada9c9961132e12fa71554a1e22b
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Fri Mar 6 21:12:13 2015 -0500

    aggregator: Be more aggressive with invalid replies to our latency query

    https://bugzilla.gnome.org/show_bug.cgi?id=745768

commit 9c49624610ab596200c0a449bd1177188cd823b1
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Fri Mar 6 20:25:03 2015 -0500

    audiomixer: Only advance by the buffer size when a buffer is late

    https://bugzilla.gnome.org/show_bug.cgi?id=745768
Created attachment 299553 [details] [review]
audioaggregator: On timeout, resync pads with not enough data
Rebased version at: http://cgit.collabora.com/git/user/tester/gst-plugins-bad.git/log/?h=aggregator-queue
Created attachment 299554 [details] [review]
aggregator: Queue "latency" buffers at each sink pad.

Just by re-reading it I found some bugs, so it shouldn't be merged without half-decent test coverage!
Should we be using GstDataQueue for this? I'm not a big fan of the patch using GST_BUFFER_DURATION() for this instead of the timestamps of the buffers.
Yes, GstDataQueue would probably be a good idea and make the code smaller.
Comment on attachment 298909 [details] [review]
aggregator: Query latency on first incoming buffer.

Another one pushed:

commit 5a6024850c2b6bb4e235ce2c71a127b2ea9871a5
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Fri Mar 6 21:12:52 2015 -0500

    aggregator: Query latency on first incoming buffer.

    And keep on querying upstream until we get a reply.

    Also, the _get_latency_unlocked() method required being called with
    a private lock, so the _unlocked() variant was removed from the API.

    And it now returns GST_CLOCK_TIME_NONE when the element is not live
    as we think that 0 upstream latency is possible.

    https://bugzilla.gnome.org/show_bug.cgi?id=745768
Then I read the GstDataQueue code and I don't like how it just uses a sum of the durations of the buffers instead of "last running time - first running time". We should probably make GstDataQueue understand buffers and events. Basically, move some of the logic that is currently inside multiqueue into GstDataQueue instead of having to duplicate it.
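To illustrate the objection, here is roughly how a GstDataQueue-backed pad queue would be wired up. The GstDataQueue calls are real API; the limit handling is illustrative. The "time" level handed to the check-full callback is just the running sum of whatever was stored in item->duration at push time:

  #include <gst/base/gstdataqueue.h>

  /* "time" is the accumulated duration of the queued items, NOT
   * "last running time - first running time". */
  static gboolean
  queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
      guint64 time, gpointer checkdata)
  {
    guint64 *max_time = checkdata;

    return time > *max_time;
  }

  static void
  free_item (GstDataQueueItem * item)
  {
    gst_mini_object_unref (item->object);
    g_slice_free (GstDataQueueItem, item);
  }

  static void
  push_buffer (GstDataQueue * q, GstBuffer * buffer)
  {
    GstDataQueueItem *item = g_slice_new0 (GstDataQueueItem);

    item->object = GST_MINI_OBJECT (buffer);
    item->size = gst_buffer_get_size (buffer);
    /* This is where the sum-of-durations behaviour comes from. */
    item->duration = GST_BUFFER_DURATION (buffer);
    item->visible = TRUE;
    item->destroy = (GDestroyNotify) free_item;
    gst_data_queue_push (q, item);
  }

  /* created with: gst_data_queue_new (queue_check_full, NULL, NULL, &max_time); */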
Created attachment 306737 [details] [review]
aggregator: Queue "latency" buffers at each sink pad.

In the case where you have a source giving the GstAggregator smaller buffers than it uses, when it reaches a timeout, it will consume the first buffer, then try to read another buffer for the pad. If the previous element is not fast enough, it may not get the next buffer even though it is queued just upstream. To prevent that race, the easiest solution is to move the queue inside the GstAggregatorPad itself. It also means that there is no need for strange code caused by increasing the min latency without increasing the max latency proportionally. This also means queuing the synchronized events and possibly acting on them on the src task.
Created attachment 306738 [details] [review]
audioaggregator: On timeout, resync pads with not enough data
Created attachment 306739 [details] [review]
aggregator: Switch to GstDataQueue

Also do the clipping with the segment taken before the queue.
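For audio, that clipping presumably boils down to something like the following, using the segment captured when the buffer entered the queue rather than the pad's current segment. gst_audio_buffer_clip() is real gst-plugins-base API; the function shape and parameters here are illustrative, not the patch's actual clip vfunc signature:

  #include <gst/audio/audio.h>

  static GstBuffer *
  clip_sketch (GstBuffer * buf, const GstSegment * queue_segment,
      const GstAudioInfo * info)
  {
    /* Clip against the segment taken before the queue (not the pad's
     * current one); returns NULL if the buffer lies entirely outside
     * of it. Takes ownership of buf. */
    return gst_audio_buffer_clip (buf, queue_segment,
        GST_AUDIO_INFO_RATE (info), GST_AUDIO_INFO_BPF (info));
  }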
Created attachment 306740 [details] [review]
audiointerleave: Return not-negotiated if no negotiation has happened
Created attachment 306741 [details] [review]
tests: Add audiointerleave test to show that queuing works

This test fails without the queuing patch because incoming buffers are not delivered before they are needed.
Created attachment 306742 [details] [review]
tests: Make source live to re-enable aggregator timeout tests

The live mode is only enabled if one of the sources is live.
Created attachment 306743 [details] [review]
tests: Add test for seeking live pipelines
I forgot to squash "aggregator: Queue "latency" buffers at each sink pad" and "aggregator: Switch to GstDataQueue" before posting; I'll do it before merging.
Here is a new patchset that doesn't depend on the dataqueue changes but just re-implements its own queue. The unit tests now use GstHarness, so they need the GstHarness fixes!
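For context, the tests follow the usual GstHarness pattern, roughly like this. This is only a sketch: audiointerleave uses request pads, so the harness is created with explicit pad names, and the caps are simplified (a real test also has to deal with channel positions and the live timeout):

  #include <gst/check/gstharness.h>

  static void
  audiointerleave_test_sketch (void)
  {
    GstHarness *h =
        gst_harness_new_with_padnames ("audiointerleave", "sink_0", "src");
    GstBuffer *out;

    gst_harness_set_src_caps_str (h,
        "audio/x-raw, format=S16LE, rate=48000, channels=1, "
        "layout=interleaved");

    /* Push one input buffer and verify it comes out without having to
     * wait for the aggregator timeout to expire. */
    gst_harness_push (h, gst_buffer_new_and_alloc (960 * 2));
    out = gst_harness_pull (h);
    gst_buffer_unref (out);

    gst_harness_teardown (h);
  }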
Created attachment 307938 [details] [review]
aggregator: Queue "latency" buffers at each sink pad.

In the case where you have a source giving the GstAggregator smaller buffers than it uses, when it reaches a timeout, it will consume the first buffer, then try to read another buffer for the pad. If the previous element is not fast enough, it may not get the next buffer even though it is queued just upstream. To prevent that race, the easiest solution is to move the queue inside the GstAggregatorPad itself. It also means that there is no need for strange code caused by increasing the min latency without increasing the max latency proportionally. This also means queuing the synchronized events and possibly acting on them on the src task.
Created attachment 307939 [details] [review]
audioaggregator: On timeout, resync pads with not enough data
Created attachment 307940 [details] [review]
audiointerleave: Avoid caps processing if not yet negotiated
Created attachment 307941 [details] [review]
tests: Add audiointerleave test to show that queuing works

This test fails without the queuing patch because incoming buffers are not delivered before they are needed.
Created attachment 307942 [details] [review]
tests: Make source live to re-enable aggregator timeout tests

The live mode is only enabled if one of the sources is live.
Created attachment 307943 [details] [review]
tests: Add test for seeking live pipelines
Full branch is there: https://git.collabora.com/cgit/user/tester/gst-plugins-bad.git/log/?h=aggregator-queue-3
Finally merged, thanks to all of those who helped!

commit 035e51b010a93ab85ffb812770a9d9d7d66afce9
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Thu Jul 2 19:34:43 2015 -0400

    tests: Add test for seeking live pipelines

    https://bugzilla.gnome.org/show_bug.cgi?id=745768

commit 1cb49c429bf776ae0fee75cdba310422de201e7c
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Thu Jul 2 19:19:33 2015 -0400

    tests: Make source live to re-enable aggregator timeout tests

    The live mode is only enabled if one of the sources is live.

    https://bugzilla.gnome.org/show_bug.cgi?id=745768

commit ce6206388cbbc47450d6c052b204d6d46142d890
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Thu Jul 2 18:37:28 2015 -0400

    tests: Add audiointerleave test to show that queuing works

    This test fails without the queuing patch because incoming buffers
    are not delivered before they are needed.

    https://bugzilla.gnome.org/show_bug.cgi?id=745768

commit 10ef6403b68e33a35ff9ca177b672802c0508193
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Thu Jul 2 18:33:43 2015 -0400

    audiointerleave: Avoid caps processing if not yet negotiated

    https://bugzilla.gnome.org/show_bug.cgi?id=745768

commit 3f2bc1e4b2f893d4a93a108bd3738444620821f4
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Mon Mar 16 17:06:46 2015 -0400

    audioaggregator: On timeout, resync pads with not enough data

    https://bugzilla.gnome.org/show_bug.cgi?id=745768

commit 6efc106a67ce6ba2a8313150ef919c0be9234da6
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Fri Mar 6 19:50:08 2015 -0500

    aggregator: Queue "latency" buffers at each sink pad.

    In the case where you have a source giving the GstAggregator smaller
    buffers than it uses, when it reaches a timeout, it will consume the
    first buffer, then try to read another buffer for the pad. If the
    previous element is not fast enough, it may not get the next buffer
    even though it is queued just upstream.

    To prevent that race, the easiest solution is to move the queue
    inside the GstAggregatorPad itself. It also means that there is no
    need for strange code caused by increasing the min latency without
    increasing the max latency proportionally.

    This also means queuing the synchronized events and possibly acting
    on them on the src task.

    https://bugzilla.gnome.org/show_bug.cgi?id=745768

commit e4a1db2287d7d84d2985a1c8139f6aee5699d13d
Author: Olivier Crête <olivier.crete@collabora.com>
Date:   Fri Mar 6 21:32:04 2015 -0500

    videoaggregator: Remove broken _clip vfunc

    It never does anything.

    https://bugzilla.gnome.org/show_bug.cgi?id=745768