-/* $Id$ */
-
/***
This file is part of PulseAudio.
#include <unistd.h>
#include <errno.h>
-#include <pulsecore/atomic.h>
+#include <pulse/xmalloc.h>
+
+#include <pulsecore/macro.h>
#include <pulsecore/log.h>
-#include <pulsecore/thread.h>
#include <pulsecore/semaphore.h>
#include <pulsecore/macro.h>
-#include <pulsecore/core-util.h>
+#include <pulsecore/mutex.h>
#include <pulsecore/flist.h>
-#include <pulse/xmalloc.h>
#include "asyncmsgq.h"
PA_REFCNT_INIT(a);
pa_assert_se(a->asyncq = pa_asyncq_new(size));
- pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE));
+ pa_assert_se(a->mutex = pa_mutex_new(false, true));
a->current = NULL;
return a;
struct asyncmsgq_item *i;
pa_assert(a);
- while ((i = pa_asyncq_pop(a->asyncq, 0))) {
+ while ((i = pa_asyncq_pop(a->asyncq, false))) {
pa_assert(!i->semaphore);
/* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
pa_mutex_lock(a->mutex);
- pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
+ pa_asyncq_post(a->asyncq, i);
pa_mutex_unlock(a->mutex);
}
pa_assert_se(i.semaphore);
- /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
+ /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
pa_mutex_lock(a->mutex);
- pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
+ pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0);
pa_mutex_unlock(a->mutex);
pa_semaphore_wait(i.semaphore);
return i.ret;
}
-int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
+int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
pa_assert(!a->current);
- if (!(a->current = pa_asyncq_pop(a->asyncq, wait))) {
+ if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
/* pa_log("failure"); */
return -1;
}
if (chunk)
*chunk = a->current->memchunk;
-/* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", (void*) a, (void*) a->current->object, a->current->object ? a->current->object->parent.type_name : NULL, a->current->code, (void*) a->current->userdata, (unsigned long) a->current->memchunk.length); */
+/* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
+/* (void*) a, */
+/* (void*) a->current->object, */
+/* a->current->object ? a->current->object->parent.type_name : NULL, */
+/* a->current->code, */
+/* (void*) a->current->userdata, */
+/* (unsigned long) a->current->memchunk.length); */
return 0;
}
pa_memchunk chunk;
int ret;
- if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, 1) < 0)
+ if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0)
return -1;
ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
pa_assert(PA_REFCNT_VALUE(a) > 0);
- if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0)
+ if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
return 0;
pa_asyncmsgq_ref(a);
return 1;
}
-int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
+int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ return pa_asyncq_read_fd(a->asyncq);
+}
+
+int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ return pa_asyncq_read_before_poll(a->asyncq);
+}
+
+void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ pa_asyncq_read_after_poll(a->asyncq);
+}
+
+int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
- return pa_asyncq_get_fd(a->asyncq);
+ return pa_asyncq_write_fd(a->asyncq);
}
-int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
+void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
- return pa_asyncq_before_poll(a->asyncq);
+ pa_asyncq_write_before_poll(a->asyncq);
}
-void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
+void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
- pa_asyncq_after_poll(a->asyncq);
+ pa_asyncq_write_after_poll(a->asyncq);
}
int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
if (object)
- return object->process_msg(object, code, userdata, offset, memchunk);
+ return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
return 0;
}
+
+void pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) {
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ for (;;) {
+ pa_msgobject *object;
+ int code;
+ void *data;
+ int64_t offset;
+ pa_memchunk chunk;
+ int ret;
+
+ if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
+ return;
+
+ if (!run) {
+ pa_asyncmsgq_done(a, -1);
+ continue;
+ }
+
+ pa_asyncmsgq_ref(a);
+ ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
+ pa_asyncmsgq_done(a, ret);
+ pa_asyncmsgq_unref(a);
+ }
+}
+
+bool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ return !!a->current;
+}