]> code.delx.au - pulseaudio/blobdiff - src/modules/rtp/rtp.c
port module-rtp-send.c to lock-free core
[pulseaudio] / src / modules / rtp / rtp.c
index 31dec653f38cecdce15be322b6f1c43a17bda540..e2496c7d2700a4ca3d2dbb0fe22e64fc418cf20a 100644 (file)
@@ -25,7 +25,6 @@
 #include <config.h>
 #endif
 
-#include <assert.h>
 #include <fcntl.h>
 #include <stdlib.h>
 #include <string.h>
 
 #include <pulsecore/core-error.h>
 #include <pulsecore/log.h>
+#include <pulsecore/macro.h>
+#include <pulsecore/core-util.h>
 
 #include "rtp.h"
 
 pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssrc, uint8_t payload, size_t frame_size) {
-    assert(c);
-    assert(fd >= 0);
+    pa_assert(c);
+    pa_assert(fd >= 0);
 
     c->fd = fd;
     c->sequence = (uint16_t) (rand()*rand());
@@ -63,11 +64,11 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
     struct iovec iov[MAX_IOVECS];
     pa_memblock* mb[MAX_IOVECS];
     int iov_idx = 1;
-    size_t n = 0, skip = 0;
+    size_t n = 0;
 
-    assert(c);
-    assert(size > 0);
-    assert(q);
+    pa_assert(c);
+    pa_assert(size > 0);
+    pa_assert(q);
 
     if (pa_memblockq_get_length(q) < size)
         return 0;
@@ -76,24 +77,26 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
         int r;
         pa_memchunk chunk;
 
+        pa_memchunk_reset(&chunk);
+        
         if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
 
             size_t k = n + chunk.length > size ? size - n : chunk.length;
 
-            if (chunk.memblock) {
-                iov[iov_idx].iov_base = (void*)((uint8_t*) pa_memblock_acquire(chunk.memblock) + chunk.index);
-                iov[iov_idx].iov_len = k;
-                mb[iov_idx] = chunk.memblock;
-                iov_idx ++;
-
-                n += k;
-            }
-
-            skip += k;
+            pa_assert(chunk.memblock);
+            
+            iov[iov_idx].iov_base = ((uint8_t*) pa_memblock_acquire(chunk.memblock) + chunk.index);
+            iov[iov_idx].iov_len = k;
+            mb[iov_idx] = chunk.memblock;
+            iov_idx ++;
+            
+            n += k;
             pa_memblockq_drop(q, k);
         }
 
-        if (r < 0 || !chunk.memblock || n >= size || iov_idx >= MAX_IOVECS) {
+        pa_assert(n % c->frame_size == 0);
+        
+        if (r < 0 || n >= size || iov_idx >= MAX_IOVECS) {
             uint32_t header[3];
             struct msghdr m;
             int k, i;
@@ -125,10 +128,10 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
             } else
                 k = 0;
 
-            c->timestamp += skip/c->frame_size;
+            c->timestamp += n/c->frame_size;
 
             if (k < 0) {
-                if (errno != EAGAIN) /* If the queue is full, just ignore it */
+                if (errno != EAGAIN && errno != EINTR) /* If the queue is full, just ignore it */
                     pa_log("sendmsg() failed: %s", pa_cstrerror(errno));
                 return -1;
             }
@@ -137,7 +140,6 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
                 break;
 
             n = 0;
-            skip = 0;
             iov_idx = 1;
         }
     }
@@ -146,7 +148,7 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
 }
 
 pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size) {
-    assert(c);
+    pa_assert(c);
 
     c->fd = fd;
     c->frame_size = frame_size;
@@ -161,13 +163,13 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
     int cc;
     ssize_t r;
 
-    assert(c);
-    assert(chunk);
+    pa_assert(c);
+    pa_assert(chunk);
 
     chunk->memblock = NULL;
 
     if (ioctl(c->fd, FIONREAD, &size) < 0) {
-        pa_log("FIONREAD failed: %s", pa_cstrerror(errno));
+        pa_log_warn("FIONREAD failed: %s", pa_cstrerror(errno));
         goto fail;
     }
 
@@ -188,12 +190,14 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
     m.msg_flags = 0;
 
     if ((r = recvmsg(c->fd, &m, 0)) != size) {
-        pa_log("recvmsg() failed: %s", r < 0 ? pa_cstrerror(errno) : "size mismatch");
+        if (r < 0 && errno != EAGAIN && errno != EINTR)
+            pa_log_warn("recvmsg() failed: %s", r < 0 ? pa_cstrerror(errno) : "size mismatch");
+        
         goto fail;
     }
 
     if (size < 12) {
-        pa_log("RTP packet too short.");
+        pa_log_warn("RTP packet too short.");
         goto fail;
     }
 
@@ -206,17 +210,17 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
     c->ssrc = ntohl(c->ssrc);
 
     if ((header >> 30) != 2) {
-        pa_log("Unsupported RTP version.");
+        pa_log_warn("Unsupported RTP version.");
         goto fail;
     }
 
     if ((header >> 29) & 1) {
-        pa_log("RTP padding not supported.");
+        pa_log_warn("RTP padding not supported.");
         goto fail;
     }
 
     if ((header >> 28) & 1) {
-        pa_log("RTP header extensions not supported.");
+        pa_log_warn("RTP header extensions not supported.");
         goto fail;
     }
 
@@ -225,7 +229,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
     c->sequence = header & 0xFFFF;
 
     if (12 + cc*4 > size) {
-        pa_log("RTP packet too short. (CSRC)");
+        pa_log_warn("RTP packet too short. (CSRC)");
         goto fail;
     }
 
@@ -233,7 +237,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
     chunk->length = size - chunk->index;
 
     if (chunk->length % c->frame_size != 0) {
-        pa_log("Vad RTP packet size.");
+        pa_log_warn("Bad RTP packet size.");
         goto fail;
     }
 
@@ -249,7 +253,7 @@ fail:
 }
 
 uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss) {
-    assert(ss);
+    pa_assert(ss);
 
     if (ss->format == PA_SAMPLE_ULAW && ss->rate == 8000 && ss->channels == 1)
         return 0;
@@ -264,7 +268,7 @@ uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss) {
 }
 
 pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec *ss) {
-    assert(ss);
+    pa_assert(ss);
 
     switch (payload) {
         case 0:
@@ -299,17 +303,17 @@ pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec
 }
 
 pa_sample_spec *pa_rtp_sample_spec_fixup(pa_sample_spec * ss) {
-    assert(ss);
+    pa_assert(ss);
 
     if (!pa_rtp_sample_spec_valid(ss))
         ss->format = PA_SAMPLE_S16BE;
 
-    assert(pa_rtp_sample_spec_valid(ss));
+    pa_assert(pa_rtp_sample_spec_valid(ss));
     return ss;
 }
 
 int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) {
-    assert(ss);
+    pa_assert(ss);
 
     if (!pa_sample_spec_valid(ss))
         return 0;
@@ -322,9 +326,9 @@ int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) {
 }
 
 void pa_rtp_context_destroy(pa_rtp_context *c) {
-    assert(c);
+    pa_assert(c);
 
-    close(c->fd);
+    pa_close(c->fd);
 }
 
 const char* pa_rtp_format_to_string(pa_sample_format_t f) {
@@ -343,7 +347,7 @@ const char* pa_rtp_format_to_string(pa_sample_format_t f) {
 }
 
 pa_sample_format_t pa_rtp_string_to_format(const char *s) {
-    assert(s);
+    pa_assert(s);
 
     if (!(strcmp(s, "L16")))
         return PA_SAMPLE_S16BE;