code.delx.au - pulseaudio/blob - src/pulsecore/asyncmsgq.c
A couple of comment typo fixes.
[pulseaudio] / src / pulsecore / asyncmsgq.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2006 Lennart Poettering
7
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
12
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21 USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/semaphore.h>
35 #include <pulsecore/macro.h>
36 #include <pulsecore/core-util.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
39
40 #include "asyncmsgq.h"
41
42 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
43 PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
44
45 struct asyncmsgq_item {
46 int code;
47 pa_msgobject *object;
48 void *userdata;
49 pa_free_cb_t free_cb;
50 int64_t offset;
51 pa_memchunk memchunk;
52 pa_semaphore *semaphore;
53 int ret;
54 };
55
56 struct pa_asyncmsgq {
57 PA_REFCNT_DECLARE;
58 pa_asyncq *asyncq;
59 pa_mutex *mutex; /* only for the writer side */
60
61 struct asyncmsgq_item *current;
62 };
63
64 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
65 pa_asyncmsgq *a;
66
67 a = pa_xnew(pa_asyncmsgq, 1);
68
69 PA_REFCNT_INIT(a);
70 pa_assert_se(a->asyncq = pa_asyncq_new(size));
71 pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE));
72 a->current = NULL;
73
74 return a;
75 }
76
77 static void asyncmsgq_free(pa_asyncmsgq *a) {
78 struct asyncmsgq_item *i;
79 pa_assert(a);
80
81 while ((i = pa_asyncq_pop(a->asyncq, 0))) {
82
83 pa_assert(!i->semaphore);
84
85 if (i->object)
86 pa_msgobject_unref(i->object);
87
88 if (i->memchunk.memblock)
89 pa_memblock_unref(i->memchunk.memblock);
90
91 if (i->free_cb)
92 i->free_cb(i->userdata);
93
94 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
95 pa_xfree(i);
96 }
97
98 pa_asyncq_free(a->asyncq, NULL);
99 pa_mutex_free(a->mutex);
100 pa_xfree(a);
101 }
102
103 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
104 pa_assert(PA_REFCNT_VALUE(q) > 0);
105
106 PA_REFCNT_INC(q);
107 return q;
108 }
109
110 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
111 pa_assert(PA_REFCNT_VALUE(q) > 0);
112
113 if (PA_REFCNT_DEC(q) <= 0)
114 asyncmsgq_free(q);
115 }
116
117 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
118 struct asyncmsgq_item *i;
119 pa_assert(PA_REFCNT_VALUE(a) > 0);
120
121 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
122 i = pa_xnew(struct asyncmsgq_item, 1);
123
124 i->code = code;
125 i->object = object ? pa_msgobject_ref(object) : NULL;
126 i->userdata = (void*) userdata;
127 i->free_cb = free_cb;
128 i->offset = offset;
129 if (chunk) {
130 pa_assert(chunk->memblock);
131 i->memchunk = *chunk;
132 pa_memblock_ref(i->memchunk.memblock);
133 } else
134 pa_memchunk_reset(&i->memchunk);
135 i->semaphore = NULL;
136
137 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
138 pa_mutex_lock(a->mutex);
139 pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
140 pa_mutex_unlock(a->mutex);
141 }
142
143 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
144 struct asyncmsgq_item i;
145 pa_assert(PA_REFCNT_VALUE(a) > 0);
146
147 i.code = code;
148 i.object = object;
149 i.userdata = (void*) userdata;
150 i.free_cb = NULL;
151 i.ret = -1;
152 i.offset = offset;
153 if (chunk) {
154 pa_assert(chunk->memblock);
155 i.memchunk = *chunk;
156 } else
157 pa_memchunk_reset(&i.memchunk);
158
159 if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
160 i.semaphore = pa_semaphore_new(0);
161
162 pa_assert_se(i.semaphore);
163
164 /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
165 pa_mutex_lock(a->mutex);
166 pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
167 pa_mutex_unlock(a->mutex);
168
169 pa_semaphore_wait(i.semaphore);
170
171 if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
172 pa_semaphore_free(i.semaphore);
173
174 return i.ret;
175 }
176
177 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
178 pa_assert(PA_REFCNT_VALUE(a) > 0);
179 pa_assert(!a->current);
180
181 if (!(a->current = pa_asyncq_pop(a->asyncq, wait))) {
182 /* pa_log("failure"); */
183 return -1;
184 }
185
186 /* pa_log("success"); */
187
188 if (code)
189 *code = a->current->code;
190 if (userdata)
191 *userdata = a->current->userdata;
192 if (offset)
193 *offset = a->current->offset;
194 if (object) {
195 if ((*object = a->current->object))
196 pa_msgobject_assert_ref(*object);
197 }
198 if (chunk)
199 *chunk = a->current->memchunk;
200
201 /* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", (void*) a, (void*) a->current->object, a->current->object ? a->current->object->parent.type_name : NULL, a->current->code, (void*) a->current->userdata, (unsigned long) a->current->memchunk.length); */
202
203 return 0;
204 }
205
206 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
207 pa_assert(PA_REFCNT_VALUE(a) > 0);
208 pa_assert(a);
209 pa_assert(a->current);
210
211 if (a->current->semaphore) {
212 a->current->ret = ret;
213 pa_semaphore_post(a->current->semaphore);
214 } else {
215
216 if (a->current->free_cb)
217 a->current->free_cb(a->current->userdata);
218
219 if (a->current->object)
220 pa_msgobject_unref(a->current->object);
221
222 if (a->current->memchunk.memblock)
223 pa_memblock_unref(a->current->memchunk.memblock);
224
225 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
226 pa_xfree(a->current);
227 }
228
229 a->current = NULL;
230 }
231
232 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
233 int c;
234 pa_assert(PA_REFCNT_VALUE(a) > 0);
235
236 pa_asyncmsgq_ref(a);
237
238 do {
239 pa_msgobject *o;
240 void *data;
241 int64_t offset;
242 pa_memchunk chunk;
243 int ret;
244
245 if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, 1) < 0)
246 return -1;
247
248 ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
249 pa_asyncmsgq_done(a, ret);
250
251 } while (c != code);
252
253 pa_asyncmsgq_unref(a);
254
255 return 0;
256 }
257
258 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
259 pa_msgobject *object;
260 int code;
261 void *data;
262 pa_memchunk chunk;
263 int64_t offset;
264 int ret;
265
266 pa_assert(PA_REFCNT_VALUE(a) > 0);
267
268 if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0)
269 return 0;
270
271 pa_asyncmsgq_ref(a);
272 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
273 pa_asyncmsgq_done(a, ret);
274 pa_asyncmsgq_unref(a);
275
276 return 1;
277 }
278
279 int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
280 pa_assert(PA_REFCNT_VALUE(a) > 0);
281
282 return pa_asyncq_get_fd(a->asyncq);
283 }
284
285 int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
286 pa_assert(PA_REFCNT_VALUE(a) > 0);
287
288 return pa_asyncq_before_poll(a->asyncq);
289 }
290
291 void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
292 pa_assert(PA_REFCNT_VALUE(a) > 0);
293
294 pa_asyncq_after_poll(a->asyncq);
295 }
296
297 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
298
299 if (object)
300 return object->process_msg(object, code, userdata, offset, memchunk);
301
302 return 0;
303 }