GNOME Bugzilla – Bug 785684
rtspconnection: Add API for sending multiple messages at once, and for having the message body consist of multiple chunks of memory
Last modified: 2018-11-04 09:04:50 UTC
An API to write messages to the connection without blocking. This is needed to support buffer lists in gst-rtsp-server in the TCP case. The MSG_DONTWAIT flag is passed to sendmsg() so that non-blocking writes of messages are possible.
This is needed to fix https://bugzilla.gnome.org/show_bug.cgi?id=771525
Created attachment 356721 [details] [review] api to write messages to rtsp connection without blocking
Review of attachment 356721 [details] [review]:

::: gst-libs/gst/rtsp/gstrtspconnection.c
@@ +1478,3 @@
+ num_written =
+     g_socket_send_message (socket, NULL, &vecs[i], send_len, NULL, 0,
+     MSG_DONTWAIT, NULL, &err);

MSG_DONTWAIT doesn't exist on Windows. You can't do it like that. I think you want: https://developer.gnome.org/gio/stable/GDatagramBased.html#g-datagram-based-send-messages
Thank you for the review. Will update it soon.
Created attachment 358816 [details] [review] Api to send messages to rtspconnection without blocking MSG_DONTWAIT doesn't exist on Windows, and g_datagram_based_send_messages() cannot be used because the TCP socket is not a datagram socket. I changed the API so that we set the socket to non-blocking before sending and set it back to blocking after sending.
Created attachment 358934 [details] [review] Api to send messages to rtspconnection without blocking Attached the wrong patch file before
Hi Can you please review the code. Thanks & Regards, Anila
Created attachment 360351 [details] [review] Api to send messages to rtspconnection without blocking Is this better than setting the socket to blocking before sending?
Created attachment 360948 [details] [review] Api to write messages to rtspconnection without block New patch that fixes the use of eagain_err and only unblocks for the send call. This patch has the same base as the previous patch.
As I already told Anila, I don't understand this whole nonblocking business here. To me it seems like there are two separate concerns: a) something about non-blocking, not sure what that is ultimately supposed to fix, b) write bufferlists/vectors more efficiently. The while loop in this patch looks to me like it does the exact same thing as a blocking send_messages call would do.
Hi Tim, If we use a blocking send, we block in the send system call until the IP stack timeout is reached (around 120 seconds). That is the main reason we were trying to use a non-blocking send (for which we used the MSG_DONTWAIT flag in the beginning): the thread returns from the send system call as soon as we receive EWOULDBLOCK or EAGAIN. Thanks & regards, Anila
Is this based on observation? Are we talking about a send call inside gio? As far as I'm aware, gio always does all networking stuff in async mode and fakes blocking I/O so that it's cancellable at all times.
It is true that gio is involved on master, where g_output_stream_write is called for example. But here in gst_rtsp_connection_write_vectors the operations are done on the socket, and ultimately the system call sendmsg is called. The man page explains that the send() call can block if the message does not fit into the buffer.
Again: is this based on you actually observing it blocking? Or is this based on how you think it will behave?

The man page says here:

> When the message does not fit into the send buffer of
> the socket, send() normally blocks, unless the socket has
> been placed in nonblocking I/O mode. In nonblocking mode it
> would fail with the error EAGAIN or EWOULDBLOCK in this case.

My understanding is that gio always puts sockets into non-blocking mode, and that g_socket_set_blocking() only affects whether gio functions will behave blockingly themselves (whilst always cancellable) or will return EWOULDBLOCK in case they would block.
Created attachment 361405 [details] [review] Test to show difference between block and non-block

Hi, Sorry for the late response. I have now had time to test a bit. I was not sure at first whether the blocking issue was only based on assumption or not. However, I found in the gio code, like you said, that it sets the socket to non-blocking with g_unix_set_fd_nonblocking in g_socket_constructed. So I was under the assumption that it was non-blocking during the time I was testing.

In the test case I am writing to a socket until it gets full. Then it hangs in g_socket_condition_timed_wait (in block_on_timeout, called from g_socket_send_message). However, when I then explicitly set the socket to non-blocking before calling send_message, the call does not hang anymore but instead returns EWOULDBLOCK with the message "Error sending message: Resource temporarily unavailable". So this shows that the sockets created from GSocketConnection in the test cases are not put into non-blocking mode?

PS. Sorry for the long buffer in the test case.
I don't really understand the gsocket code just before it calls block_on_timeout though (https://git.gnome.org/browse/glib/tree/gio/gsocket.c#n4349). timeout is != 0 if it is blocking (socket->priv->blocking ? -1 : 0). So it checks if it is in blocking mode but at the same time it checks if errno is EWOULDBLOCK or EAGAIN. I thought those errors only appeared in non-blocking mode. But perhaps not, so it gets EWOULDBLOCK from sendmsg() and because socket->priv->blocking is TRUE it calls block_on_timeout, and that's where the test hangs unless the socket is explicitly set to non-blocking before the call.
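To make the behaviour discussed above concrete, here is a minimal sketch using plain POSIX calls only (no gio). It assumes a Unix-like system; the function names fill_until_error() and demo_nonblocking_send() are made up for this illustration. It puts one end of a socket pair into non-blocking mode and sends until the kernel refuses more data, which is the point at which gio, on a socket it considers blocking, would wait for writability instead of returning the error.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Keep sending on a non-blocking stream socket until send() fails.
 * Returns the errno of the first failure, or 0 if the safety cap was
 * reached without any failure. */
static int
fill_until_error (int fd, size_t *total_sent)
{
  char chunk[4096];
  const size_t cap = 64 * 1024 * 1024;  /* safety cap, arbitrary */

  memset (chunk, 'x', sizeof chunk);
  *total_sent = 0;

  while (*total_sent < cap) {
    ssize_t n = send (fd, chunk, sizeof chunk, 0);

    if (n < 0) {
      if (errno == EINTR)
        continue;
      return errno;             /* expected: EAGAIN or EWOULDBLOCK */
    }
    *total_sent += (size_t) n;
  }
  return 0;
}

/* Create a socket pair, make the sending end non-blocking and fill its
 * send buffer while nobody reads from the other end.
 * Returns the errno reported by send(), or -1 on setup failure. */
static int
demo_nonblocking_send (size_t *total_sent)
{
  int sv[2];
  int flags, err;

  if (socketpair (AF_UNIX, SOCK_STREAM, 0, sv) != 0)
    return -1;

  flags = fcntl (sv[0], F_GETFL, 0);
  if (flags < 0 || fcntl (sv[0], F_SETFL, flags | O_NONBLOCK) != 0) {
    close (sv[0]);
    close (sv[1]);
    return -1;
  }

  err = fill_until_error (sv[0], total_sent);

  close (sv[0]);
  close (sv[1]);
  return err;
}
```

With the O_NONBLOCK flag removed, the same loop would be expected to stall inside send() once the buffer is full, which matches the hang reported for the blocking case.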
(In reply to Tim-Philipp Müller from comment #10)
> As I already told Anila, I don't understand this whole nonblocking business
> here.
>
> To me it seems like there are two separate concerns: a) something about
> non-blocking, not sure what that is ultimately supposed to fix, b) write
> bufferlists/vectors more efficiently.
>
> The while loop in this patch looks to me like it does the exact same thing
> as a blocking send_messages call would do.

You are right, the actual socket is in non-blocking mode, and g_socket_set_blocking() decides whether gio will behave blocking or not. So there is no need to call g_socket_set_blocking().

Regarding the while loop, g_socket_send_message() behaves in a similar way. If sendmsg() returns an error it will try again. However, if sendmsg() manages to write a part of the data then g_socket_send_message() will not try to write the rest. It will simply return the number of bytes written. Thus the handling of partial writes in the while loop.

Unfortunately I discovered that while handling partial writes the new function changes the GOutputVectors that it gets as in-parameter. This may lead to weird behavior if the caller e.g. needs to de-allocate the data chunks.

One possible solution for this is to modify the function to reset vecs[i].buffer and vecs[i].size after it has handled a partial write. But I think that is kind of ugly, because it means that although the caller still has the ownership of the data, it is not allowed to use it while the call to the function is ongoing. Also the code in the while loop becomes slightly more complicated.

Another is to say that the function takes ownership of the GOutputVector array, and thus can do whatever it wants with it. The GOutputVector array that is, not the actual data it points to. I think this is a clearer approach.

Which approach do you prefer, if any?
> Regarding the while loop, g_socket_send_message() behaves in a similar way.
> If sendmsg() returns an error it will try again. However, if sendmsg()
> manages to write a part of the data then g_socket_send_message() will not
> try to write the rest. It will simply return the number of bytes written.
> Thus the handling of partial writes in the while loop.

This is when the socket is in non-blocking mode, correct?

But when you set it to blocking mode, surely it will retry and loop until it has sent out all messages? (Or there's an error)

> Unfortunately I discovered that while handling partial writes the new
> function changes the GOutputVectors that it gets as in-parameter. This may
> lead to weird behavior if the caller e.g. needs to de-allocate the data
> chunks.

Why is that? Not sure I understand the problem here. Could you explain?

> One possible solution for this is to modify the function to reset
> vecs[i]->buffer and vecs[i]->size after it has handled a partial write. But
> I think that is kind of ugly, because it means that although the caller
> still has the ownership of the data, it is not allowed to use it while the
> call to the function is ongoing. Also the code in the while loop becomes
> slightly more complicated.
>
> Another is to say that the function takes ownership of the GOutputVector
> array, and thus can do whatever it wants with it. The GOutputVector array
> that is, not the actual data it points to. I think this is a clearer
> approach.
>
> Which approach do you prefer, if any?

Not sure, I don't think it should take ownership of a GOutputVector? I would expect that a GOutputVector array is often/usually allocated by the caller on the stack anyway? Or in a caller-owned scratch area that is reused for subsequent calls.
(In reply to Tim-Philipp Müller from comment #18)
> > Regarding the while loop, g_socket_send_message() behaves in a similar way.
> > If sendmsg() returns an error it will try again. However, if sendmsg()
> > manages to write a part of the data then g_socket_send_message() will not
> > try to write the rest. It will simply return the number of bytes written.
> > Thus the handling of partial writes in the while loop.
>
> This is when the socket is in non-blocking mode, correct?
>
> But when you set it to blocking mode, surely it will retry and loop until it
> has sent out all messages? (Or there's an error)

Nope, unless I misunderstand the code, as soon as sendmsg() returns a value >= 0, g_socket_send_message() will return that value. Which I found a bit surprising. I had expected it to retry until all data was written, or a fatal error occurred.

> > Unfortunately I discovered that while handling partial writes the new
> > function changes the GOutputVectors that it gets as in-parameter. This may
> > lead to weird behavior if the caller e.g. needs to de-allocate the data
> > chunks.
>
> Why is that? Not sure I understand the problem here. Could you explain?

The code here is from the function. vecs is the GOutputVector array.

  ...
  /* recalculate to deal with partial writes */
  while (num_written > 0) {
    if (num_written < vecs[i].size) {
      vecs[i].buffer = (gchar *) vecs[i].buffer + num_written;
      vecs[i].size -= num_written;
      num_written = 0;
    } else {
  ...

If sendmsg() failed to write all data of a GOutputVector we will end up in the if-statement. There the buffer pointer and the size of that GOutputVector are modified, meaning that vecs[i].buffer no longer points to the same address that it pointed to when the call was made. In my unit test the GOutputVector I pass to gst_rtsp_connection_write_vectors() points to malloc()ed data. After returning from the call the test deallocates the data, which gives funny errors when the pointers no longer point to the beginning of the malloc()ed data.
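One way to handle partial writes without touching the caller's array is to track progress in a separate cursor. The following is only a sketch of that idea, not code from any of the attached patches: OutVec is a hypothetical stand-in for GOutputVector, and VecCursor, vec_cursor_advance() and vec_cursor_peek() are names invented here.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for GOutputVector: a borrowed chunk of memory. */
typedef struct
{
  const void *buffer;
  size_t size;
} OutVec;

/* Write progress, kept separately so the caller's vectors stay untouched. */
typedef struct
{
  size_t index;                 /* first vector that is not fully written */
  size_t offset;                /* bytes already written from that vector */
} VecCursor;

/* Account for num_written bytes reported by a (possibly partial) write. */
static void
vec_cursor_advance (const OutVec * vecs, size_t n_vecs, VecCursor * cur,
    size_t num_written)
{
  while (num_written > 0 && cur->index < n_vecs) {
    size_t remaining = vecs[cur->index].size - cur->offset;

    if (num_written < remaining) {
      /* partial write inside the current vector */
      cur->offset += num_written;
      num_written = 0;
    } else {
      /* current vector is complete, move on to the next one */
      num_written -= remaining;
      cur->index++;
      cur->offset = 0;
    }
  }
}

/* Return the next unwritten span, or 0 with *data == NULL when done. */
static size_t
vec_cursor_peek (const OutVec * vecs, size_t n_vecs, const VecCursor * cur,
    const void **data)
{
  if (cur->index >= n_vecs) {
    *data = NULL;
    return 0;
  }
  *data = (const char *) vecs[cur->index].buffer + cur->offset;
  return vecs[cur->index].size - cur->offset;
}
```

Because only the cursor changes, the caller can still free or reuse the original buffers after the call, at the cost of building a temporary vector (or issuing one write per span) from the cursor position on each retry.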
Sorry, I was thinking of send_messages() earlier (plural). My contention was that the entire while() loop, so most of the function, can be replaced with _send_messages() on a blocking socket instead. I am not sure if the code you quote makes sense. I think a single message will always be fully written or not. It doesn't make sense to send a partial message/packet (and there's an error code for when it's too big). And makes even less sense to re-send parts of a message/packet, and there's no mechanism to say that this is the continuation from the previous one. So I don't think we need to worry about this.
(In reply to Tim-Philipp Müller from comment #20)
> Sorry, I was thinking of send_messages() earlier (plural). My contention was
> that the entire while() loop, so most of the function, can be replaced with
> _send_messages() on a blocking socket instead.
>
> I am not sure if the code you quote makes sense. I think a single message
> will always be fully written or not. It doesn't make sense to send a partial
> message/packet (and there's an error code for when it's too big). And makes
> even less sense to re-send parts of a message/packet, and there's no
> mechanism to say that this is the continuation from the previous one.
>
> So I don't think we need to worry about this.

Well, g_socket_send_messages() uses sendmmsg(), which behaves that way for datagram sockets. So for UDP it behaves that way. For stream sockets it does not behave that way. In fact up until recently you could even get data from the different messages mixed. This "recent" patch in the kernel remedies that particular flaw: https://patchwork.kernel.org/patch/9616969. But even after that patch partial writes may occur.
Going via sendmmsg() does not look too promising. It will also break with connections that go over TLS, or over any other transport that sits between the socket and the GOutputStream. What I'm looking into now is to add some kind of "writev()" support to GOutputStream inside GLib, and make use of that throughout the stack. writev() works for other stream-like fds like files, so it probably also works well for TCP streams, unlike sendmmsg(), which is more for datagram-oriented sockets.
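For readers unfamiliar with writev(), here is a small POSIX-only sketch of the gather-write idea on a stream-like fd. It uses a pipe rather than a TCP socket to stay self-contained, and the function name writev_roundtrip() and the chunk contents are made up for the illustration; it says nothing about how a GOutputStream API would look.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Write three separate chunks with a single writev() call into a pipe
 * and read them back as one contiguous byte stream.
 * Returns the number of bytes read back, or -1 on error. */
static ssize_t
writev_roundtrip (char *out, size_t out_size)
{
  char part1[] = "header ";
  char part2[] = "payload-a ";
  char part3[] = "payload-b";
  struct iovec iov[3];
  int fds[2];
  ssize_t written, got;

  iov[0].iov_base = part1;
  iov[0].iov_len = strlen (part1);
  iov[1].iov_base = part2;
  iov[1].iov_len = strlen (part2);
  iov[2].iov_base = part3;
  iov[2].iov_len = strlen (part3);

  if (pipe (fds) != 0)
    return -1;

  /* one system call, no copying of the chunks into a single buffer */
  written = writev (fds[1], iov, 3);
  close (fds[1]);
  if (written < 0) {
    close (fds[0]);
    return -1;
  }

  got = read (fds[0], out, out_size);
  close (fds[0]);
  return got;
}
```

On a stream socket writev() may still return a short count, so a real caller would need to resume from the first unwritten byte, as discussed earlier in this bug.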
See https://gitlab.gnome.org/GNOME/glib/issues/1431 for the GLib part, and I'll add patches here in a bit that at least carry over all data as chunks from gst-rtsp-server until the g_output_stream_write() calls inside GstRTSPConnection. That can then later be replaced by something else, and would already remove the need of copying all memories of a buffer into a single memory.
Comment on attachment 360948 [details] [review] Api to write messages to rtspconnection without block See https://bugzilla.gnome.org/show_bug.cgi?id=771525#c19 also. And the comment above here, this will break for TLS or other transports that don't go directly through a socket. I have a plan
Created attachment 373371 [details] [review] rtsp: Add support for storing GstBuffers directly as body payload of messages This makes it unnecessary for callers to first merge together all memories. They are now written out one-by-one instead. At a later time this can make use of a g_output_stream_writev() and could also be extended with new API for sending multiple messages at once.
This now implements part of this and should already improve performance quite a bit for TCP-interleaved mode. As a next step it will be required to add the writev() API to GLib, make use of that in GstRTSPConnection and add support for sending multiple messages at once (so that the buffer list is not split inside rtsp-client anymore).
Created attachment 373375 [details] [review] rtsp: Add support for storing GstBuffers directly as body payload of messages This makes it unnecessary for callers to first merge together all memories. They are now written out one-by-one instead. At a later time this can make use of a g_output_stream_writev() and could also be extended with new API for sending multiple messages at once.
Created attachment 373681 [details] [review] rtsp: Add support for storing GstBuffers directly as body payload of messages This makes it unnecessary for callers to first merge together all memories. They are now written out one-by-one instead. At a later time this can make use of a g_output_stream_writev() and could also be extended with new API for sending multiple messages at once.
Created attachment 373682 [details] [review] rtsp-connection: Add support for new g_output_stream_writev() API Depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333
Comment on attachment 373681 [details] [review] rtsp: Add support for storing GstBuffers directly as body payload of messages

This patch is a bit unwieldy, maybe you could split out the "add new API to GstRTSPMessage" bit from the "make use of it in RtspConnection" bit?

>Subject: [PATCH] rtsp: Add support for storing GstBuffers directly as body
> payload of messages
>
>This makes it unnecessary for callers to first merge together all
>memories. They are now written out one-by-one instead.
>
>At a later time this can make use of a g_output_stream_writev() and
>could also be extended with new API for sending multiple messages at
>once.

Might be worth explaining the background / scenario / reason you want to do this, i.e. in what cases will this be helpful later (tcp interleaved?) or is it a general optimisation?

Looking first at the RtspMessage part:

- new API looks fine

- The ABI compat union in the header is not really needed here if it's just a pointer, is it? Not that it matters much, but it does make the code a bit uglier, would be nicer not to do that :)

>--- a/gst-libs/gst/rtsp/gstrtspmessage.c
>+++ b/gst-libs/gst/rtsp/gstrtspmessage.c
>@@ -985,6 +990,13 @@ gst_rtsp_message_get_body (const GstRTSPMessage * msg, guint8 ** data,
>   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
>   g_return_val_if_fail (size != NULL, GST_RTSP_EINVAL);
>
>+  if (msg->ABI.abi.body_buffer) {
>+    guint8 *data = g_malloc (gst_buffer_get_size (msg->ABI.abi.body_buffer));
>+    gst_buffer_extract (msg->ABI.abi.body_buffer, 0, data, gst_buffer_get_size (msg->ABI.abi.body_buffer));
>+    gst_buffer_replace (&((GstRTSPMessage *)msg)->ABI.abi.body_buffer, NULL);
>+    ((GstRTSPMessage *)msg)->body = data;
>+  }

Can we use gst_buffer_extract_dup() here?

Should we add a performance logging/warning or is it expected?

>@@ -1009,6 +1021,13 @@ gst_rtsp_message_steal_body (GstRTSPMessage * msg, guint8 ** data, guint * size)
>   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
>   g_return_val_if_fail (size != NULL, GST_RTSP_EINVAL);
>
>+  if (msg->ABI.abi.body_buffer) {
>+    guint8 *data = g_malloc (gst_buffer_get_size (msg->ABI.abi.body_buffer));
>+    gst_buffer_extract (msg->ABI.abi.body_buffer, 0, data, gst_buffer_get_size (msg->ABI.abi.body_buffer));
>+    gst_buffer_replace (&msg->ABI.abi.body_buffer, NULL);
>+    msg->body = data;
>+  }

Can we use gst_buffer_extract_dup() here?

>+/**
>+ * gst_rtsp_message_set_body_buffer:
>+ * @msg: a #GstRTSPMessage
>+ * @buffer: a #GstBuffer
>+ *
>+ * Set the body of @msg to @buffer.
>+ *
>+ * Returns: #GST_RTSP_OK.
>+ */

Please add a "Since: 1.16" marker here (also to the other functions).

>+GstRTSPResult
>+gst_rtsp_message_get_body_buffer (const GstRTSPMessage * msg, GstBuffer ** buffer)
>+{
>+  g_return_val_if_fail (msg != NULL, GST_RTSP_EINVAL);
>+  g_return_val_if_fail (buffer != NULL, GST_RTSP_EINVAL);
>+
>+  *buffer = msg->ABI.abi.body_buffer;
>+
>+  return GST_RTSP_OK;
>+}

Is it an error to call this if only a normal data body was set but not a buffer? Or is it expected that in this case NULL will be returned to indicate that a buffer was not set and perhaps the caller should try the old method? Or should we create a fallback buffer? Should document what's expected/allowed in the docs.

>@@ -1045,6 +1154,7 @@ gst_rtsp_message_dump (GstRTSPMessage * msg)
> {
>   guint8 *data;
>   guint size;
>+  GstBuffer *body_buffer;

I would suggest initialising this to NULL since we don't check return values below and that will make coverity and such unhappy.

RtspConnection in separate comment.
Comment on attachment 373682 [details] [review] rtsp-connection: Add support for new g_output_stream_writev() API

Don't know if this can really easily be reviewed... So instead you get some nitpicks.

This

>+ /* All other data is borrowed */
>+ g_free (serialized_messages[i].data);

is duplicated a million times everywhere, maybe that should be moved into a gst_rtsp_serialized_message_clear() inline function?

>+/**
>+ * gst_rtsp_watch_send_messages:
>+ * @watch: a #GstRTSPWatch
>+ * @messages: (array length=n_messages): the messages to send
>+ * @n_messages: the number of messages to send
>+ * @id: (out) (allow-none): location for a message ID or %NULL
>+ *
>+ * Send a @messages using the connection of the @watch. If it cannot be sent
>+ * immediately, it will be queued for transmission in @watch. The contents of
>+ * @messages will then be serialized and transmitted when the connection of the
>+ * @watch becomes writable. In case the @messages is queued, the ID returned in
>+ * @id will be non-zero and used as the ID argument in the message_sent
>+ * callback once the last message is sent.
>+ *
>+ * Returns: #GST_RTSP_OK on success.
>+ */

This sounds like it was copy'n'pasted, it talks about messages as if it's a single message, should be updated to describe the expected behaviour now that it's multiple ones, e.g. will the callback be called for each message with the id or only for the last one?
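The suggested clear helper could look roughly like the sketch below. This is an illustration only: the struct layout is hypothetical (the review only shows that a `data` member is owned and that everything else is borrowed), it uses plain free() instead of g_free() to stay self-contained, and the name serialized_message_clear() is shortened from the one proposed in the review.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical layout: only `data` is owned by the struct. */
typedef struct
{
  unsigned char *data;          /* owned, serialized header bytes */
  size_t data_size;
  const unsigned char *body;    /* borrowed, must not be freed here */
  size_t body_size;
} SerializedMessage;

/* Release owned memory and reset all fields so that the struct can be
 * reused or cleared a second time without harm. */
static inline void
serialized_message_clear (SerializedMessage * msg)
{
  /* All other data is borrowed */
  free (msg->data);
  memset (msg, 0, sizeof *msg);
}
```

Having one helper keeps the ownership rule ("only data is owned") in a single place instead of repeating the comment and the free call at every exit path.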
Created attachment 373904 [details] [review] rtsp-message: Add support for storing GstBuffers directly as body payload of messages This makes it unnecessary for callers to first merge together all memories, and it allows API like GstRTSPConnection to write them out without first copying all memories together or using writev()-style API to write multiple memories out in one go.
Created attachment 373905 [details] [review] rtsp-connection: Make use of new GstRTSPMessage API for directly storing a body buffer and add API for writing multiple messages By doing so we can send a whole GstBufferList and each memory in the contained buffers without copying into a single memory area and with a single writev() call. This improves performance considerably for high-packet-rate streams. This depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333 to be efficient, otherwise each chunk of memory is a separate write() call.
Created attachment 373906 [details] [review] rtsp-connection: Make use of new GstRTSPMessage API for directly storing a body buffer and add API for writing multiple messages By doing so we can send a whole GstBufferList and each memory in the contained buffers without copying into a single memory area and with a single writev() call. This improves performance considerably for high-packet-rate streams. This depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333 to be efficient, otherwise each chunk of memory is a separate write() call.
This should cover all the review comments now, and I've also fixed some bugs I noticed.
Created attachment 373938 [details] [review] rtsp-connection: Make use of new GstRTSPMessage API for directly storing a body buffer and add API for writing multiple messages By doing so we can send a whole GstBufferList and each memory in the contained buffers without copying into a single memory area and with a single writev() call. This improves performance considerably for high-packet-rate streams. This depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333 to be efficient, otherwise each chunk of memory is a separate write() call.
Created attachment 373940 [details] [review] rtsp-connection: Make use of new GstRTSPMessage API for directly storing a body buffer and add API for writing multiple messages By doing so we can send a whole GstBufferList and each memory in the contained buffers without copying into a single memory area and with a single writev() call. This improves performance considerably for high-packet-rate streams. This depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333 to be efficient, otherwise each chunk of memory is a separate write() call.
Created attachment 374029 [details] [review] rtspconnection: Fixes for corrupt RTP packets in dispatch_write() A few small adjustments to the patches, solving an issue with corrupt RTP packets when incomplete data is written from dispatch_write()
Created attachment 374030 [details] [review] Keep a reference to the watch from dispatch_write() This patch makes sure dispatch_write() will not use the watch after it has been freed.
Review of attachment 374029 [details] [review]:

Thanks! Please also describe in the commit message what exactly you're fixing :)

::: gst-libs/gst/rtsp/gstrtspconnection.c
@@ +3881,3 @@
   * before */
+ for (int m = 0; m < k; m++) {
+   gst_memory_unmap (map_infos[m].memory, &map_infos[m]);

This is C99, please declare the variable elsewhere :)

@@ +3905,2 @@
  if (bytes_written >= 0) {
+   if (bytes_written >= (msg->data_size - msg->data_offset)) {

The parentheses are not needed here

@@ +3909,3 @@
  /* all data of this message is sent, check body and otherwise
   * skip the whole message for next time */
+ bytes_written -= (msg->data_size - msg->data_offset);

and here

@@ +3920,3 @@
  }
+ if ((bytes_written + msg->body_offset) >= body_size) {

and here
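The bookkeeping under review can be summarised by the following sketch. It is not the code from the patch: the PendingMessage struct and pending_message_consume() are hypothetical, and only the field names data_size, data_offset and body_offset are borrowed from the quoted hunks. The idea is that the bytes reported by a write are first charged against the unsent part of the message data, then against the body, and whatever is left over belongs to the next queued message.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-message write state. */
typedef struct
{
  size_t data_size;             /* size of the serialized message data */
  size_t data_offset;           /* message data bytes already sent */
  size_t body_size;             /* size of the body */
  size_t body_offset;           /* body bytes already sent */
} PendingMessage;

/* Charge bytes_written against msg. Sets *done to 1 once data and body
 * are fully sent. Returns the bytes left over for following messages. */
static size_t
pending_message_consume (PendingMessage * msg, size_t bytes_written,
    int *done)
{
  size_t data_left = msg->data_size - msg->data_offset;
  size_t body_left;

  *done = 0;

  if (bytes_written < data_left) {
    /* incomplete message data, remember where to continue */
    msg->data_offset += bytes_written;
    return 0;
  }
  bytes_written -= data_left;
  msg->data_offset = msg->data_size;

  body_left = msg->body_size - msg->body_offset;
  if (bytes_written < body_left) {
    /* incomplete body, remember where to continue */
    msg->body_offset += bytes_written;
    return 0;
  }
  bytes_written -= body_left;
  msg->body_offset = msg->body_size;

  *done = 1;
  return bytes_written;
}
```

Getting either offset wrong after a short write would resend or skip bytes in the middle of an interleaved frame, which is one plausible way to end up with the corrupt RTP packets this patch addresses.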
Review of attachment 374030 [details] [review]:

::: gst-libs/gst/rtsp/gstrtspconnection.c
@@ +4446,3 @@
  g_source_set_callback (watch->writesrc,
+     (GSourceFunc) gst_rtsp_source_dispatch_write, g_source_ref (watch),
+     g_source_unref);

I think this creates a circular reference: The watch has a reference to the writesrc, the writesrc now has a reference to the watch. Better to use a GWeakRef here, or otherwise break the reference cycle.
Created attachment 374035 [details] [review] rtspconnection: Fixes for corrupt RTP packets in dispatch_write() fixed review comments
Comment on attachment 374035 [details] [review] rtspconnection: Fixes for corrupt RTP packets in dispatch_write() Looks good, thanks! What should we do about the reference problem?
(In reply to Sebastian Dröge (slomo) from comment #43)
> Comment on attachment 374035 [details] [review] [review]
> rtspconnection: Fixes for corrupt RTP packets in dispatch_write()
>
> Looks good, thanks!
>
> What should we do about the reference problem?

I am going to fix it, but have not had a chance to work on it yet.
-- GitLab Migration Automatic Message -- This bug has been migrated to freedesktop.org's GitLab instance and has been closed from further activity. You can subscribe and participate further through the new bug through this link to our GitLab instance: https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/370.
The patches are now in https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/2