]> code.delx.au - pulseaudio/blobdiff - src/pulsecore/asyncq.c
remap: Change remapping function argument type from void to int16_t / float as approp...
[pulseaudio] / src / pulsecore / asyncq.c
index 75b15c0e27705debf7be15e57ec694ed79cc1098..559b57744251737cb553bd3011ae30067b980e4d 100644 (file)
@@ -1,9 +1,7 @@
-/* $Id$ */
-
 /***
   This file is part of PulseAudio.
 
-  Copyright 2006 Lennart Poettering
+  Copyright 2006-2008 Lennart Poettering
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as
 #include <unistd.h>
 #include <errno.h>
 
+#include <pulse/xmalloc.h>
+
 #include <pulsecore/atomic.h>
 #include <pulsecore/log.h>
 #include <pulsecore/thread.h>
 #include <pulsecore/macro.h>
 #include <pulsecore/core-util.h>
-#include <pulse/xmalloc.h>
+#include <pulsecore/llist.h>
+#include <pulsecore/flist.h>
+#include <pulsecore/fdsem.h>
 
 #include "asyncq.h"
-#include "fdsem.h"
 
-#define ASYNCQ_SIZE 128
+#define ASYNCQ_SIZE 256
 
-/* For debugging purposes we can define _Y to put and extra thread
+/* For debugging purposes we can define _Y to put an extra thread
  * yield between each operation. */
 
 /* #define PROFILE */
 #define _Y do { } while(0)
 #endif
 
+struct localq {
+    void *data;
+    PA_LLIST_FIELDS(struct localq);
+};
+
 struct pa_asyncq {
     unsigned size;
     unsigned read_idx;
     unsigned write_idx;
     pa_fdsem *read_fdsem, *write_fdsem;
+
+    PA_LLIST_HEAD(struct localq, localq);
+    struct localq *last_localq;
+    bool waiting_for_post;
 };
 
-#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
+PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree);
 
-static int is_power_of_two(unsigned size) {
-    return !(size & (size - 1));
-}
+#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
 
-static int reduce(pa_asyncq *l, int value) {
+static unsigned reduce(pa_asyncq *l, unsigned value) {
     return value & (unsigned) (l->size - 1);
 }
 
@@ -74,12 +82,16 @@ pa_asyncq *pa_asyncq_new(unsigned size) {
     if (!size)
         size = ASYNCQ_SIZE;
 
-    pa_assert(is_power_of_two(size));
+    pa_assert(pa_is_power_of_two(size));
 
     l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
 
     l->size = size;
 
+    PA_LLIST_HEAD_INIT(struct localq, l->localq);
+    l->last_localq = NULL;
+    l->waiting_for_post = false;
+
     if (!(l->read_fdsem = pa_fdsem_new())) {
         pa_xfree(l);
         return NULL;
@@ -95,6 +107,7 @@ pa_asyncq *pa_asyncq_new(unsigned size) {
 }
 
 void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
+    struct localq *q;
     pa_assert(l);
 
     if (free_cb) {
@@ -104,13 +117,23 @@ void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
             free_cb(p);
     }
 
+    while ((q = l->localq)) {
+        if (free_cb)
+            free_cb(q->data);
+
+        PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+            pa_xfree(q);
+    }
+
     pa_fdsem_free(l->read_fdsem);
     pa_fdsem_free(l->write_fdsem);
     pa_xfree(l);
 }
 
-int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
-    int idx;
+static int push(pa_asyncq*l, void *p, bool wait_op) {
+    unsigned idx;
     pa_atomic_ptr_t *cells;
 
     pa_assert(l);
@@ -123,7 +146,7 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
 
     if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
 
-        if (!wait)
+        if (!wait_op)
             return -1;
 
 /*         pa_log("sleeping on push"); */
@@ -141,8 +164,66 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
     return 0;
 }
 
-void* pa_asyncq_pop(pa_asyncq*l, int wait) {
-    int idx;
+static bool flush_postq(pa_asyncq *l, bool wait_op) {
+    struct localq *q;
+
+    pa_assert(l);
+
+    while ((q = l->last_localq)) {
+
+        if (push(l, q->data, wait_op) < 0)
+            return false;
+
+        l->last_localq = q->prev;
+
+        PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+            pa_xfree(q);
+    }
+
+    return true;
+}
+
+int pa_asyncq_push(pa_asyncq*l, void *p, bool wait_op) {
+    pa_assert(l);
+
+    if (!flush_postq(l, wait_op))
+        return -1;
+
+    return push(l, p, wait_op);
+}
+
+void pa_asyncq_post(pa_asyncq*l, void *p) {
+    struct localq *q;
+
+    pa_assert(l);
+    pa_assert(p);
+
+    if (flush_postq(l, false))
+        if (pa_asyncq_push(l, p, false) >= 0)
+            return;
+
+    /* OK, we couldn't push anything in the queue. So let's queue it
+     * locally and push it later */
+
+    if (pa_log_ratelimit(PA_LOG_WARN))
+        pa_log_warn("q overrun, queuing locally");
+
+    if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq))))
+        q = pa_xnew(struct localq, 1);
+
+    q->data = p;
+    PA_LLIST_PREPEND(struct localq, l->localq, q);
+
+    if (!l->last_localq)
+        l->last_localq = q;
+
+    return;
+}
+
+void* pa_asyncq_pop(pa_asyncq*l, bool wait_op) {
+    unsigned idx;
     void *ret;
     pa_atomic_ptr_t *cells;
 
@@ -155,7 +236,7 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {
 
     if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
 
-        if (!wait)
+        if (!wait_op)
             return NULL;
 
 /*         pa_log("sleeping on pop"); */
@@ -178,14 +259,14 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {
     return ret;
 }
 
-int pa_asyncq_get_fd(pa_asyncq *q) {
+int pa_asyncq_read_fd(pa_asyncq *q) {
     pa_assert(q);
 
     return pa_fdsem_get(q->write_fdsem);
 }
 
-int pa_asyncq_before_poll(pa_asyncq *l) {
-    int idx;
+int pa_asyncq_read_before_poll(pa_asyncq *l) {
+    unsigned idx;
     pa_atomic_ptr_t *cells;
 
     pa_assert(l);
@@ -202,12 +283,40 @@ int pa_asyncq_before_poll(pa_asyncq *l) {
         if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
             return 0;
     }
-
-    return 0;
 }
 
-void pa_asyncq_after_poll(pa_asyncq *l) {
+void pa_asyncq_read_after_poll(pa_asyncq *l) {
     pa_assert(l);
 
     pa_fdsem_after_poll(l->write_fdsem);
 }
+
+int pa_asyncq_write_fd(pa_asyncq *q) {
+    pa_assert(q);
+
+    return pa_fdsem_get(q->read_fdsem);
+}
+
+void pa_asyncq_write_before_poll(pa_asyncq *l) {
+    pa_assert(l);
+
+    for (;;) {
+
+        if (flush_postq(l, false))
+            break;
+
+        if (pa_fdsem_before_poll(l->read_fdsem) >= 0) {
+            l->waiting_for_post = true;
+            break;
+        }
+    }
+}
+
+void pa_asyncq_write_after_poll(pa_asyncq *l) {
+    pa_assert(l);
+
+    if (l->waiting_for_post) {
+        pa_fdsem_after_poll(l->read_fdsem);
+        l->waiting_for_post = false;
+    }
+}