]>
code.delx.au - pulseaudio/blob - src/pulsecore/asyncmsgq.c
4 This file is part of PulseAudio.
6 Copyright 2006 Lennart Poettering
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/semaphore.h>
35 #include <pulsecore/macro.h>
36 #include <pulsecore/core-util.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
40 #include "asyncmsgq.h"
42 PA_STATIC_FLIST_DECLARE(asyncmsgq
, 0, pa_xfree
);
43 PA_STATIC_FLIST_DECLARE(semaphores
, 0, (void(*)(void*)) pa_semaphore_free
);
45 struct asyncmsgq_item
{
52 pa_semaphore
*semaphore
;
59 pa_mutex
*mutex
; /* only for the writer side */
61 struct asyncmsgq_item
*current
;
64 pa_asyncmsgq
*pa_asyncmsgq_new(unsigned size
) {
67 a
= pa_xnew(pa_asyncmsgq
, 1);
70 pa_assert_se(a
->asyncq
= pa_asyncq_new(size
));
71 pa_assert_se(a
->mutex
= pa_mutex_new(FALSE
, TRUE
));
77 static void asyncmsgq_free(pa_asyncmsgq
*a
) {
78 struct asyncmsgq_item
*i
;
81 while ((i
= pa_asyncq_pop(a
->asyncq
, 0))) {
83 pa_assert(!i
->semaphore
);
86 pa_msgobject_unref(i
->object
);
88 if (i
->memchunk
.memblock
)
89 pa_memblock_unref(i
->memchunk
.memblock
);
92 i
->free_cb(i
->userdata
);
94 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq
), i
) < 0)
98 pa_asyncq_free(a
->asyncq
, NULL
);
99 pa_mutex_free(a
->mutex
);
103 pa_asyncmsgq
* pa_asyncmsgq_ref(pa_asyncmsgq
*q
) {
104 pa_assert(PA_REFCNT_VALUE(q
) > 0);
110 void pa_asyncmsgq_unref(pa_asyncmsgq
* q
) {
111 pa_assert(PA_REFCNT_VALUE(q
) > 0);
113 if (PA_REFCNT_DEC(q
) <= 0)
117 void pa_asyncmsgq_post(pa_asyncmsgq
*a
, pa_msgobject
*object
, int code
, const void *userdata
, int64_t offset
, const pa_memchunk
*chunk
, pa_free_cb_t free_cb
) {
118 struct asyncmsgq_item
*i
;
119 pa_assert(PA_REFCNT_VALUE(a
) > 0);
121 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq
))))
122 i
= pa_xnew(struct asyncmsgq_item
, 1);
125 i
->object
= object
? pa_msgobject_ref(object
) : NULL
;
126 i
->userdata
= (void*) userdata
;
127 i
->free_cb
= free_cb
;
130 pa_assert(chunk
->memblock
);
131 i
->memchunk
= *chunk
;
132 pa_memblock_ref(i
->memchunk
.memblock
);
134 pa_memchunk_reset(&i
->memchunk
);
137 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
138 pa_mutex_lock(a
->mutex
);
139 pa_asyncq_post(a
->asyncq
, i
);
140 pa_mutex_unlock(a
->mutex
);
143 int pa_asyncmsgq_send(pa_asyncmsgq
*a
, pa_msgobject
*object
, int code
, const void *userdata
, int64_t offset
, const pa_memchunk
*chunk
) {
144 struct asyncmsgq_item i
;
145 pa_assert(PA_REFCNT_VALUE(a
) > 0);
149 i
.userdata
= (void*) userdata
;
154 pa_assert(chunk
->memblock
);
157 pa_memchunk_reset(&i
.memchunk
);
159 if (!(i
.semaphore
= pa_flist_pop(PA_STATIC_FLIST_GET(semaphores
))))
160 i
.semaphore
= pa_semaphore_new(0);
162 pa_assert_se(i
.semaphore
);
164 /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
165 pa_mutex_lock(a
->mutex
);
166 pa_assert_se(pa_asyncq_push(a
->asyncq
, &i
, TRUE
) == 0);
167 pa_mutex_unlock(a
->mutex
);
169 pa_semaphore_wait(i
.semaphore
);
171 if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores
), i
.semaphore
) < 0)
172 pa_semaphore_free(i
.semaphore
);
177 int pa_asyncmsgq_get(pa_asyncmsgq
*a
, pa_msgobject
**object
, int *code
, void **userdata
, int64_t *offset
, pa_memchunk
*chunk
, pa_bool_t wait
) {
178 pa_assert(PA_REFCNT_VALUE(a
) > 0);
179 pa_assert(!a
->current
);
181 if (!(a
->current
= pa_asyncq_pop(a
->asyncq
, wait
))) {
182 /* pa_log("failure"); */
186 /* pa_log("success"); */
189 *code
= a
->current
->code
;
191 *userdata
= a
->current
->userdata
;
193 *offset
= a
->current
->offset
;
195 if ((*object
= a
->current
->object
))
196 pa_msgobject_assert_ref(*object
);
199 *chunk
= a
->current
->memchunk
;
201 /* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", (void*) a, (void*) a->current->object, a->current->object ? a->current->object->parent.type_name : NULL, a->current->code, (void*) a->current->userdata, (unsigned long) a->current->memchunk.length); */
206 void pa_asyncmsgq_done(pa_asyncmsgq
*a
, int ret
) {
207 pa_assert(PA_REFCNT_VALUE(a
) > 0);
209 pa_assert(a
->current
);
211 if (a
->current
->semaphore
) {
212 a
->current
->ret
= ret
;
213 pa_semaphore_post(a
->current
->semaphore
);
216 if (a
->current
->free_cb
)
217 a
->current
->free_cb(a
->current
->userdata
);
219 if (a
->current
->object
)
220 pa_msgobject_unref(a
->current
->object
);
222 if (a
->current
->memchunk
.memblock
)
223 pa_memblock_unref(a
->current
->memchunk
.memblock
);
225 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq
), a
->current
) < 0)
226 pa_xfree(a
->current
);
232 int pa_asyncmsgq_wait_for(pa_asyncmsgq
*a
, int code
) {
234 pa_assert(PA_REFCNT_VALUE(a
) > 0);
245 if (pa_asyncmsgq_get(a
, &o
, &c
, &data
, &offset
, &chunk
, 1) < 0)
248 ret
= pa_asyncmsgq_dispatch(o
, c
, data
, offset
, &chunk
);
249 pa_asyncmsgq_done(a
, ret
);
253 pa_asyncmsgq_unref(a
);
258 int pa_asyncmsgq_process_one(pa_asyncmsgq
*a
) {
259 pa_msgobject
*object
;
266 pa_assert(PA_REFCNT_VALUE(a
) > 0);
268 if (pa_asyncmsgq_get(a
, &object
, &code
, &data
, &offset
, &chunk
, 0) < 0)
272 ret
= pa_asyncmsgq_dispatch(object
, code
, data
, offset
, &chunk
);
273 pa_asyncmsgq_done(a
, ret
);
274 pa_asyncmsgq_unref(a
);
279 int pa_asyncmsgq_read_fd(pa_asyncmsgq
*a
) {
280 pa_assert(PA_REFCNT_VALUE(a
) > 0);
282 return pa_asyncq_read_fd(a
->asyncq
);
285 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq
*a
) {
286 pa_assert(PA_REFCNT_VALUE(a
) > 0);
288 return pa_asyncq_read_before_poll(a
->asyncq
);
291 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq
*a
) {
292 pa_assert(PA_REFCNT_VALUE(a
) > 0);
294 pa_asyncq_read_after_poll(a
->asyncq
);
297 int pa_asyncmsgq_write_fd(pa_asyncmsgq
*a
) {
298 pa_assert(PA_REFCNT_VALUE(a
) > 0);
300 return pa_asyncq_write_fd(a
->asyncq
);
303 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq
*a
) {
304 pa_assert(PA_REFCNT_VALUE(a
) > 0);
306 pa_asyncq_write_before_poll(a
->asyncq
);
309 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq
*a
) {
310 pa_assert(PA_REFCNT_VALUE(a
) > 0);
312 pa_asyncq_write_after_poll(a
->asyncq
);
315 int pa_asyncmsgq_dispatch(pa_msgobject
*object
, int code
, void *userdata
, int64_t offset
, pa_memchunk
*memchunk
) {
318 return object
->process_msg(object
, code
, userdata
, offset
, memchunk
);