]>
code.delx.au - pulseaudio/blob - src/pulsecore/asyncmsgq.c
4 This file is part of PulseAudio.
6 Copyright 2006 Lennart Poettering
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/semaphore.h>
35 #include <pulsecore/macro.h>
36 #include <pulsecore/core-util.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
40 #include "asyncmsgq.h"
42 PA_STATIC_FLIST_DECLARE(asyncmsgq
, 0, pa_xfree
);
44 struct asyncmsgq_item
{
50 pa_semaphore
*semaphore
;
56 pa_mutex
*mutex
; /* only for the writer side */
58 struct asyncmsgq_item
*current
;
61 pa_asyncmsgq
*pa_asyncmsgq_new(unsigned size
) {
64 a
= pa_xnew(pa_asyncmsgq
, 1);
66 pa_assert_se(a
->asyncq
= pa_asyncq_new(size
));
67 pa_assert_se(a
->mutex
= pa_mutex_new(0));
73 void pa_asyncmsgq_free(pa_asyncmsgq
*a
) {
74 struct asyncmsgq_item
*i
;
77 while ((i
= pa_asyncq_pop(a
->asyncq
, 0))) {
79 pa_assert(!i
->semaphore
);
82 pa_msgobject_unref(i
->object
);
84 if (i
->memchunk
.memblock
)
85 pa_memblock_unref(i
->memchunk
.memblock
);
88 i
->free_cb(i
->userdata
);
90 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq
), i
) < 0)
94 pa_asyncq_free(a
->asyncq
, NULL
);
95 pa_mutex_free(a
->mutex
);
99 void pa_asyncmsgq_post(pa_asyncmsgq
*a
, pa_msgobject
*object
, int code
, const void *userdata
, const pa_memchunk
*chunk
, pa_free_cb_t free_cb
) {
100 struct asyncmsgq_item
*i
;
103 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq
))))
104 i
= pa_xnew(struct asyncmsgq_item
, 1);
107 i
->object
= object
? pa_msgobject_ref(object
) : NULL
;
108 i
->userdata
= (void*) userdata
;
109 i
->free_cb
= free_cb
;
111 pa_assert(chunk
->memblock
);
112 i
->memchunk
= *chunk
;
113 pa_memblock_ref(i
->memchunk
.memblock
);
115 pa_memchunk_reset(&i
->memchunk
);
118 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
119 pa_mutex_lock(a
->mutex
);
120 pa_assert_se(pa_asyncq_push(a
->asyncq
, i
, 1) == 0);
121 pa_mutex_unlock(a
->mutex
);
124 int pa_asyncmsgq_send(pa_asyncmsgq
*a
, pa_msgobject
*object
, int code
, const void *userdata
, const pa_memchunk
*chunk
) {
125 struct asyncmsgq_item i
;
130 i
.userdata
= (void*) userdata
;
134 pa_assert(chunk
->memblock
);
137 pa_memchunk_reset(&i
.memchunk
);
138 pa_assert_se(i
.semaphore
= pa_semaphore_new(0));
140 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
141 pa_mutex_lock(a
->mutex
);
142 pa_assert_se(pa_asyncq_push(a
->asyncq
, &i
, 1) == 0);
143 pa_mutex_unlock(a
->mutex
);
145 pa_semaphore_wait(i
.semaphore
);
146 pa_semaphore_free(i
.semaphore
);
151 int pa_asyncmsgq_get(pa_asyncmsgq
*a
, pa_msgobject
**object
, int *code
, void **userdata
, pa_memchunk
*chunk
, int wait
) {
154 pa_assert(!a
->current
);
156 if (!(a
->current
= pa_asyncq_pop(a
->asyncq
, wait
))) {
157 /* pa_log("failure"); */
161 /* pa_log("success"); */
163 *code
= a
->current
->code
;
165 *userdata
= a
->current
->userdata
;
167 if ((*object
= a
->current
->object
))
168 pa_msgobject_assert_ref(*object
);
171 *chunk
= a
->current
->memchunk
;
173 pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%u", (void*) a
, (void*) a
->current
->object
, a
->current
->object
? a
->current
->object
->parent
.type_name
: NULL
, a
->current
->code
, (void*) a
->current
->userdata
, a
->current
->memchunk
.length
);
178 void pa_asyncmsgq_done(pa_asyncmsgq
*a
, int ret
) {
180 pa_assert(a
->current
);
182 if (a
->current
->semaphore
) {
183 a
->current
->ret
= ret
;
184 pa_semaphore_post(a
->current
->semaphore
);
187 if (a
->current
->free_cb
)
188 a
->current
->free_cb(a
->current
->userdata
);
190 if (a
->current
->object
)
191 pa_msgobject_unref(a
->current
->object
);
193 if (a
->current
->memchunk
.memblock
)
194 pa_memblock_unref(a
->current
->memchunk
.memblock
);
196 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq
), a
->current
) < 0)
197 pa_xfree(a
->current
);
203 int pa_asyncmsgq_wait_for(pa_asyncmsgq
*a
, int code
) {
213 if (pa_asyncmsgq_get(a
, &o
, &c
, &data
, &chunk
, 1) < 0)
216 ret
= pa_asyncmsgq_dispatch(o
, c
, data
, &chunk
);
217 pa_asyncmsgq_done(a
, ret
);
224 int pa_asyncmsgq_get_fd(pa_asyncmsgq
*a
) {
227 return pa_asyncq_get_fd(a
->asyncq
);
230 int pa_asyncmsgq_before_poll(pa_asyncmsgq
*a
) {
233 return pa_asyncq_before_poll(a
->asyncq
);
236 void pa_asyncmsgq_after_poll(pa_asyncmsgq
*a
) {
239 pa_asyncq_after_poll(a
->asyncq
);
242 int pa_asyncmsgq_dispatch(pa_msgobject
*object
, int code
, void *userdata
, pa_memchunk
*memchunk
) {
245 return object
->process_msg(object
, code
, userdata
, memchunk
);