/***
  This file is part of PulseAudio.

  Copyright 2006 Lennart Poettering

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <unistd.h>
#include <errno.h>

#include <pulse/xmalloc.h>

#include <pulsecore/atomic.h>
#include <pulsecore/macro.h>
#include <pulsecore/log.h>
#include <pulsecore/thread.h>
#include <pulsecore/semaphore.h>
#include <pulsecore/core-util.h>
#include <pulsecore/flist.h>

#include "asyncmsgq.h"

PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
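/* The two static free lists above recycle asyncmsgq_item structures and
 * semaphores between calls, so that posting or sending a message does not
 * normally have to hit the allocator. */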

struct asyncmsgq_item {
    int code;
    pa_msgobject *object;
    void *userdata;
    pa_free_cb_t free_cb;
    int64_t offset;
    pa_memchunk memchunk;
    pa_semaphore *semaphore;
    int ret;
};

struct pa_asyncmsgq {
    PA_REFCNT_DECLARE;
    pa_asyncq *asyncq;
    pa_mutex *mutex; /* only for the writer side */

    struct asyncmsgq_item *current;
};

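/* A pa_asyncmsgq is a message queue built on top of the lock-free pa_asyncq,
 * with a single reader (typically an I/O thread calling pa_asyncmsgq_get()
 * followed by pa_asyncmsgq_done()) and potentially multiple writers, which
 * are serialized by the mutex above. pa_asyncmsgq_post() is fire-and-forget,
 * while pa_asyncmsgq_send() blocks on a semaphore until the reader has
 * processed the message. */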
pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
    pa_asyncmsgq *a;

    a = pa_xnew(pa_asyncmsgq, 1);

    PA_REFCNT_INIT(a);
    pa_assert_se(a->asyncq = pa_asyncq_new(size));
    pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE));
    a->current = NULL;

    return a;
}

static void asyncmsgq_free(pa_asyncmsgq *a) {
    struct asyncmsgq_item *i;
    pa_assert(a);

    while ((i = pa_asyncq_pop(a->asyncq, FALSE))) {

        pa_assert(!i->semaphore);

        if (i->object)
            pa_msgobject_unref(i->object);

        if (i->memchunk.memblock)
            pa_memblock_unref(i->memchunk.memblock);

        if (i->free_cb)
            i->free_cb(i->userdata);

        if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
            pa_xfree(i);
    }

    pa_asyncq_free(a->asyncq, NULL);
    pa_mutex_free(a->mutex);
    pa_xfree(a);
}

pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
    pa_assert(PA_REFCNT_VALUE(q) > 0);

    PA_REFCNT_INC(q);
    return q;
}

void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
    pa_assert(PA_REFCNT_VALUE(q) > 0);

    if (PA_REFCNT_DEC(q) <= 0)
        asyncmsgq_free(q);
}

void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
    struct asyncmsgq_item *i;
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
        i = pa_xnew(struct asyncmsgq_item, 1);

    i->code = code;
    i->object = object ? pa_msgobject_ref(object) : NULL;
    i->userdata = (void*) userdata;
    i->free_cb = free_cb;
    i->offset = offset;
    if (chunk) {
        pa_assert(chunk->memblock);
        i->memchunk = *chunk;
        pa_memblock_ref(i->memchunk.memblock);
    } else
        pa_memchunk_reset(&i->memchunk);
    i->semaphore = NULL;

    /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
    pa_mutex_lock(a->mutex);
    pa_asyncq_post(a->asyncq, i);
    pa_mutex_unlock(a->mutex);
}
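
/* Example of a fire-and-forget post (illustrative sketch only; q, s,
 * MESSAGE_SET_STATE and state are hypothetical and not part of this file):
 *
 *     pa_asyncmsgq_post(q, PA_MSGOBJECT(s), MESSAGE_SET_STATE,
 *                       state, 0, NULL, pa_xfree);
 *
 * The queue takes its own reference on the object and on the memblock of the
 * chunk (if any). The free callback is invoked on the reader side once the
 * message has been processed, so with pa_xfree as callback the payload is
 * freed by the queue and the caller must not free it itself. */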

int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
    struct asyncmsgq_item i;
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    i.code = code;
    i.object = object;
    i.userdata = (void*) userdata;
    i.free_cb = NULL;
    i.ret = -1;
    i.offset = offset;
    if (chunk) {
        pa_assert(chunk->memblock);
        i.memchunk = *chunk;
    } else
        pa_memchunk_reset(&i.memchunk);

    if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
        i.semaphore = pa_semaphore_new(0);

    pa_assert_se(i.semaphore);

    /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
    pa_mutex_lock(a->mutex);
    pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
    pa_mutex_unlock(a->mutex);

    pa_semaphore_wait(i.semaphore);

    if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
        pa_semaphore_free(i.semaphore);

    return i.ret;
}
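
/* Note that pa_asyncmsgq_send() pushes a stack-allocated item: this is safe
 * because the call blocks on the per-item semaphore until the reader has
 * called pa_asyncmsgq_done(), which posts the semaphore and stores the
 * dispatch return value in i.ret. Unlike pa_asyncmsgq_post(), no references
 * are taken on the object or the memchunk, since the caller keeps them alive
 * for the duration of the call. */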

int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait_op) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);
    pa_assert(!a->current);

    if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
        /* pa_log("failure"); */
        return -1;
    }

    /* pa_log("success"); */

    if (code)
        *code = a->current->code;
    if (userdata)
        *userdata = a->current->userdata;
    if (offset)
        *offset = a->current->offset;
    if (object) {
        if ((*object = a->current->object))
            pa_msgobject_assert_ref(*object);
    }
    if (chunk)
        *chunk = a->current->memchunk;

    /* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
    /*              (void*) a, */
    /*              (void*) a->current->object, */
    /*              a->current->object ? a->current->object->parent.type_name : NULL, */
    /*              a->current->code, */
    /*              (void*) a->current->userdata, */
    /*              (unsigned long) a->current->memchunk.length); */

    return 0;
}
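
/* Typical reader-side loop, given a pa_asyncmsgq *q (illustrative sketch; it
 * mirrors what pa_asyncmsgq_wait_for() and pa_asyncmsgq_flush() below do):
 *
 *     pa_msgobject *object;
 *     int code, ret;
 *     void *data;
 *     int64_t offset;
 *     pa_memchunk chunk;
 *
 *     while (pa_asyncmsgq_get(q, &object, &code, &data, &offset, &chunk, TRUE) == 0) {
 *         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
 *         pa_asyncmsgq_done(q, ret);
 *     }
 *
 * Every successful pa_asyncmsgq_get() must be paired with a
 * pa_asyncmsgq_done(): it either wakes up a sender blocked in
 * pa_asyncmsgq_send() or, for posted items, drops the references and runs
 * the free callback. */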

void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
    pa_assert(a);
    pa_assert(PA_REFCNT_VALUE(a) > 0);
    pa_assert(a->current);

    if (a->current->semaphore) {
        a->current->ret = ret;
        pa_semaphore_post(a->current->semaphore);
    } else {

        if (a->current->free_cb)
            a->current->free_cb(a->current->userdata);

        if (a->current->object)
            pa_msgobject_unref(a->current->object);

        if (a->current->memchunk.memblock)
            pa_memblock_unref(a->current->memchunk.memblock);

        if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
            pa_xfree(a->current);
    }

    a->current = NULL;
}

int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
    int c;
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    pa_asyncmsgq_ref(a);

    do {
        pa_msgobject *o;
        void *data;
        int64_t offset;
        pa_memchunk chunk;
        int ret;

        if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, TRUE) < 0) {
            /* Drop the reference we took above before bailing out */
            pa_asyncmsgq_unref(a);
            return -1;
        }

        ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
        pa_asyncmsgq_done(a, ret);

    } while (c != code);

    pa_asyncmsgq_unref(a);

    return 0;
}

int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
    pa_msgobject *object;
    int code;
    void *data;
    pa_memchunk chunk;
    int64_t offset;
    int ret;

    pa_assert(PA_REFCNT_VALUE(a) > 0);

    if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0)
        return 0;

    pa_asyncmsgq_ref(a);
    ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
    pa_asyncmsgq_done(a, ret);
    pa_asyncmsgq_unref(a);

    return 1;
}

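/* The *_fd(), *_before_poll() and *_after_poll() helpers below simply
 * forward to the underlying pa_asyncq. They expose file descriptors and
 * before/after-poll hooks so that the reader and writer sides of the queue
 * can be integrated into a poll()-based event loop instead of blocking in
 * pa_asyncmsgq_get() or pa_asyncmsgq_send(). */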
int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    return pa_asyncq_read_fd(a->asyncq);
}

int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    return pa_asyncq_read_before_poll(a->asyncq);
}

void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    pa_asyncq_read_after_poll(a->asyncq);
}

int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    return pa_asyncq_write_fd(a->asyncq);
}

void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    pa_asyncq_write_before_poll(a->asyncq);
}

void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    pa_asyncq_write_after_poll(a->asyncq);
}

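/* Deliver a single message to its object by calling the object's
 * process_msg() handler. Messages without an object are treated as no-ops
 * and report success. */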
int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {

    if (object)
        return object->process_msg(object, code, userdata, offset, memchunk);

    return 0;
}

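/* Drain all messages currently queued, without blocking. With run == TRUE
 * each message is dispatched normally; with run == FALSE the messages are
 * completed with a return value of -1, i.e. cancelled rather than
 * processed. */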
void pa_asyncmsgq_flush(pa_asyncmsgq *a, pa_bool_t run) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    for (;;) {
        pa_msgobject *object;
        int code;
        void *data;
        int64_t offset;
        pa_memchunk chunk;
        int ret;

        if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0)
            return;

        if (!run) {
            pa_asyncmsgq_done(a, -1);
            continue;
        }

        pa_asyncmsgq_ref(a);
        ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
        pa_asyncmsgq_done(a, ret);
        pa_asyncmsgq_unref(a);
    }
}

pa_bool_t pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    return !!a->current;
}