]> code.delx.au - pulseaudio/commitdiff
port asyncq to make use of new fdsem object
authorLennart Poettering <lennart@poettering.net>
Thu, 26 Jul 2007 13:15:05 +0000 (13:15 +0000)
committerLennart Poettering <lennart@poettering.net>
Thu, 26 Jul 2007 13:15:05 +0000 (13:15 +0000)
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1538 fefdeb5f-60dc-0310-8127-8f9354f1896f

src/pulsecore/asyncq.c

index 025c695e533a116f24298d9a18da746b3d68d73d..2238124874ce7b49d52edc584c768d42f9d8c478 100644 (file)
 #include <pulse/xmalloc.h>
 
 #include "asyncq.h"
+#include "fdsem.h"
 
 #define ASYNCQ_SIZE 128
 
 /* For debugging purposes we can define _Y to put and extra thread
  * yield between each operation. */
 
+/* #define PROFILE */
+
 #ifdef PROFILE
 #define _Y pa_thread_yield()
 #else
@@ -52,10 +55,7 @@ struct pa_asyncq {
     unsigned size;
     unsigned read_idx;
     unsigned write_idx;
-    pa_atomic_t read_waiting; /* a bool */
-    pa_atomic_t write_waiting; /* a bool */
-    int read_fds[2], write_fds[2];
-    pa_atomic_t in_read_fifo, in_write_fifo;
+    pa_fdsem *read_fdsem, *write_fdsem;
 };
 
 #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
@@ -79,26 +79,18 @@ pa_asyncq *pa_asyncq_new(unsigned size) {
     l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
 
     l->size = size;
-    pa_atomic_store(&l->read_waiting, 0);
-    pa_atomic_store(&l->write_waiting, 0);
-    pa_atomic_store(&l->in_read_fifo, 0);
-    pa_atomic_store(&l->in_write_fifo, 0);
 
-    if (pipe(l->read_fds) < 0) {
+    if (!(l->read_fdsem = pa_fdsem_new())) {
         pa_xfree(l);
         return NULL;
     }
 
-    if (pipe(l->write_fds) < 0) {
-        pa_close(l->read_fds[0]);
-        pa_close(l->read_fds[1]);
+    if (!(l->write_fdsem = pa_fdsem_new())) {
+        pa_fdsem_free(l->read_fdsem);
         pa_xfree(l);
         return NULL;
     }
 
-    pa_make_nonblock_fd(l->read_fds[1]);
-    pa_make_nonblock_fd(l->write_fds[1]);
-
     return l;
 }
 
@@ -112,11 +104,8 @@ void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
             free_cb(p);
     }
 
-    pa_close(l->read_fds[0]);
-    pa_close(l->read_fds[1]);
-    pa_close(l->write_fds[0]);
-    pa_close(l->write_fds[1]);
-
+    pa_fdsem_free(l->read_fdsem);
+    pa_fdsem_free(l->write_fdsem);
     pa_xfree(l);
 }
 
@@ -134,80 +123,20 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
 
     if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
 
-        /* Let's empty the FIFO from old notifications, before we return */
-            
-        while (pa_atomic_load(&l->in_write_fifo) > 0) {
-            ssize_t r;
-            int x[20];
-
-            if ((r = read(l->write_fds[0], x, sizeof(x))) < 0) {
-
-                if (errno == EINTR)
-                    continue;
-                
-                return -1;
-            }
-
-            pa_assert(r > 0);
-                
-            if (pa_atomic_sub(&l->in_write_fifo, r) <= r)
-                break;
-
-        }
-
-        /* Now let's make sure that we didn't lose any events */
-        if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
-
-            if (!wait)
-                return -1;
-
-            /* Let's wait for changes. */
-
-            _Y;
-
-            pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 0, 1));
-
-            for (;;) {
-                char x[20];
-                ssize_t r;
-                
-                _Y;
-                
-                if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p))
-                    break;
-                
-                _Y;
-
-                if ((r = read(l->write_fds[0], x, sizeof(x))) < 0) {
-
-                    if (errno == EINTR)
-                        continue;
-                    
-                    pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 1, 0));
-                    return -1;
-                }
-
-                pa_assert(r > 0);
-                pa_atomic_sub(&l->in_write_fifo, r);
-            }
-            
-            _Y;
-            
-            pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 1, 0));
-        }
+        if (!wait)
+            return -1;
+
+/*         pa_log("sleeping on push"); */
+        
+        do {
+            pa_fdsem_wait(l->read_fdsem);
+        } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p));
     }
 
     _Y;
     l->write_idx++;
 
-    if (pa_atomic_load(&l->read_waiting) > 0) {
-        char x = 'x';
-        _Y;
-        if (write(l->read_fds[1], &x, sizeof(x)) > 0) {
-            pa_atomic_inc(&l->in_read_fifo);
-/*             pa_log("increasing %p by 1", l); */
-        }
-    }
+    pa_fdsem_post(l->write_fdsem);
 
     return 0;
 }
@@ -226,95 +155,33 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {
 
     if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
 
-/*         pa_log("pop failed wait=%i", wait); */
-
-        /* Hmm, nothing, here, so let's drop all queued events. */
-        while (pa_atomic_load(&l->in_read_fifo) > 0) {
-            ssize_t r;
-            int x[20];
-            
-            if ((r = read(l->read_fds[0], x, sizeof(x))) < 0) {
-                
-                if (errno == EINTR)
-                    continue;
-                
-                return NULL;
-            }
-
-            pa_assert(r > 0);
-
-/*             pa_log("decreasing %p by %i", l, r); */
-            
-            if (pa_atomic_sub(&l->in_read_fifo, r) <= r)
-                break;
-        }
-
-        /* Now let's make sure that we didn't lose any events */
-        if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
-
-            if (!wait)
-                return NULL;
-
-            /* Let's wait for changes. */
-            
-            _Y;
-            
-            pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 0, 1));
-            
-            for (;;) {
-                char x[20];
-                ssize_t r;
-                
-                _Y;
-                
-                if ((ret = pa_atomic_ptr_load(&cells[idx])))
-                    break;
-                
-                _Y;
-                
-                if ((r = read(l->read_fds[0], x, sizeof(x))) < 0) {
-
-                    if (errno == EINTR)
-                        continue;
-                    
-                    pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0));
-                    return NULL;
-                }
-
-/*                 pa_log("decreasing %p by %i", l, r); */
-                
-                pa_assert(r > 0);
-                pa_atomic_sub(&l->in_read_fifo, r);
-            }
-
-            _Y;
-
-            pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0));
-        }
+        if (!wait)
+            return NULL;
+
+/*         pa_log("sleeping on pop"); */
+        
+        do {
+            pa_fdsem_wait(l->write_fdsem);
+        } while (!(ret = pa_atomic_ptr_load(&cells[idx])));
     }
 
     pa_assert(ret);
 
-    /* Guaranteed if we only have a single reader */
+    /* Guaranteed to succeed if we only have a single reader */
     pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
 
     _Y;
     l->read_idx++;
 
-    if (pa_atomic_load(&l->write_waiting) > 0) {
-        char x = 'x';
-        _Y;
-        if (write(l->write_fds[1], &x, sizeof(x)) >= 0)
-            pa_atomic_inc(&l->in_write_fifo);
-    }
-
+    pa_fdsem_post(l->read_fdsem);
+    
     return ret;
 }
 
 int pa_asyncq_get_fd(pa_asyncq *q) {
     pa_assert(q);
 
-    return q->read_fds[0];
+    return pa_fdsem_get(q->write_fdsem);
 }
 
 int pa_asyncq_before_poll(pa_asyncq *l) {
@@ -328,14 +195,12 @@ int pa_asyncq_before_poll(pa_asyncq *l) {
     _Y;
     idx = reduce(l, l->read_idx);
 
-    if (pa_atomic_ptr_load(&cells[idx]) || pa_atomic_load(&l->in_read_fifo) > 0)
-        return -1;
-
-    pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 0, 1));
+    for (;;) {
+        if (pa_atomic_ptr_load(&cells[idx]))
+            return -1;
 
-    if (pa_atomic_ptr_load(&cells[idx]) || pa_atomic_load(&l->in_read_fifo) > 0) {
-        pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0));
-        return -1;
+        if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
+            return 0;
     }
 
     return 0;
@@ -344,5 +209,5 @@ int pa_asyncq_before_poll(pa_asyncq *l) {
 void pa_asyncq_after_poll(pa_asyncq *l) {
     pa_assert(l);
 
-    pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0));
+    pa_fdsem_after_poll(l->write_fdsem);
 }