/* $Id$ */

/***
  This file is part of PulseAudio.

  Copyright 2006 Lennart Poettering

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <unistd.h>
#include <errno.h>

#include <pulsecore/atomic.h>
#include <pulsecore/log.h>
#include <pulsecore/thread.h>
#include <pulsecore/semaphore.h>
#include <pulsecore/macro.h>
#include <pulsecore/core-util.h>
#include <pulsecore/flist.h>
#include <pulse/xmalloc.h>

#include "asyncmsgq.h"

PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);

struct asyncmsgq_item {
    int code;
    pa_msgobject *object;
    void *userdata;
    pa_free_cb_t free_cb;
    int64_t offset;
    pa_memchunk memchunk;
    pa_semaphore *semaphore;
    int ret;
};

struct pa_asyncmsgq {
    PA_REFCNT_DECLARE;
    pa_asyncq *asyncq;
    pa_mutex *mutex; /* only for the writer side */

    struct asyncmsgq_item *current;
};
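
/*
 * Design note (added commentary, not upstream documentation): every queued
 * message is carried by one asyncmsgq_item. The underlying pa_asyncq is a
 * lock-free queue for a single reader and a single writer, so pa_asyncmsgq
 * serializes concurrent writers with the mutex above while the one
 * dispatcher thread reads without locking. "current" tracks the item
 * handed out by pa_asyncmsgq_get() until pa_asyncmsgq_done() is called.
 */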

pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
    pa_asyncmsgq *a;

    a = pa_xnew(pa_asyncmsgq, 1);

    PA_REFCNT_INIT(a);
    pa_assert_se(a->asyncq = pa_asyncq_new(size));
    pa_assert_se(a->mutex = pa_mutex_new(0));
    a->current = NULL;

    return a;
}
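
/*
 * Usage sketch (illustrative only, not part of this file; error handling
 * omitted): create a queue and hand one reference to each side. A size of
 * 0 is assumed to select pa_asyncq's default capacity, and
 * start_reader_thread() is a hypothetical helper:
 *
 *     pa_asyncmsgq *q = pa_asyncmsgq_new(0);
 *     start_reader_thread(pa_asyncmsgq_ref(q));
 *     ...
 *     pa_asyncmsgq_unref(q);
 */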

static void asyncmsgq_free(pa_asyncmsgq *a) {
    struct asyncmsgq_item *i;
    pa_assert(a);

    while ((i = pa_asyncq_pop(a->asyncq, 0))) {

        pa_assert(!i->semaphore);

        if (i->object)
            pa_msgobject_unref(i->object);

        if (i->memchunk.memblock)
            pa_memblock_unref(i->memchunk.memblock);

        if (i->free_cb)
            i->free_cb(i->userdata);

        if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
            pa_xfree(i);
    }

    pa_asyncq_free(a->asyncq, NULL);
    pa_mutex_free(a->mutex);
    pa_xfree(a);
}

pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
    pa_assert(PA_REFCNT_VALUE(q) > 0);

    PA_REFCNT_INC(q);
    return q;
}

void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
    pa_assert(PA_REFCNT_VALUE(q) > 0);

    if (PA_REFCNT_DEC(q) <= 0)
        asyncmsgq_free(q);
}

void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
    struct asyncmsgq_item *i;
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
        i = pa_xnew(struct asyncmsgq_item, 1);

    i->code = code;
    i->object = object ? pa_msgobject_ref(object) : NULL;
    i->userdata = (void*) userdata;
    i->free_cb = free_cb;
    i->offset = offset;
    if (chunk) {
        pa_assert(chunk->memblock);
        i->memchunk = *chunk;
        pa_memblock_ref(i->memchunk.memblock);
    } else
        pa_memchunk_reset(&i->memchunk);
    i->semaphore = NULL;

    /* This mutex makes the queue multiple-writer safe. It is only taken on the writing side */
    pa_mutex_lock(a->mutex);
    pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
    pa_mutex_unlock(a->mutex);
}
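
/*
 * Usage sketch (illustrative; MY_MESSAGE_CODE and the payload struct are
 * hypothetical): post a message without waiting for it to be processed.
 * Ownership of the payload passes to the queue, which invokes free_cb
 * after dispatch (or on queue destruction):
 *
 *     struct payload *p = pa_xnew(struct payload, 1);
 *     pa_asyncmsgq_post(q, PA_MSGOBJECT(sink), MY_MESSAGE_CODE, p, 0, NULL, pa_xfree);
 */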

int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
    struct asyncmsgq_item i;
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    /* The item lives on our stack; that is safe because we block on the
     * semaphore below until the reader is done with it */
    i.code = code;
    i.object = object;
    i.userdata = (void*) userdata;
    i.free_cb = NULL;
    i.ret = -1;
    i.offset = offset;
    if (chunk) {
        pa_assert(chunk->memblock);
        i.memchunk = *chunk;
    } else
        pa_memchunk_reset(&i.memchunk);
    pa_assert_se(i.semaphore = pa_semaphore_new(0));

    /* This mutex makes the queue multiple-writer safe. It is only taken on the writing side */
    pa_mutex_lock(a->mutex);
    pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
    pa_mutex_unlock(a->mutex);

    pa_semaphore_wait(i.semaphore);
    pa_semaphore_free(i.semaphore);

    return i.ret;
}
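
/*
 * Usage sketch (illustrative; the message code and argument are
 * hypothetical): send a message synchronously and collect the dispatch
 * return value:
 *
 *     int r = pa_asyncmsgq_send(q, PA_MSGOBJECT(sink), MY_MESSAGE_SET_STATE, &state, 0, NULL);
 *     if (r < 0)
 *         pa_log("message failed");
 */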

int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);
    pa_assert(!a->current);

    if (!(a->current = pa_asyncq_pop(a->asyncq, wait))) {
        /* pa_log("failure"); */
        return -1;
    }

    /* pa_log("success"); */

    if (code)
        *code = a->current->code;
    if (userdata)
        *userdata = a->current->userdata;
    if (offset)
        *offset = a->current->offset;
    if (object) {
        if ((*object = a->current->object))
            pa_msgobject_assert_ref(*object);
    }
    if (chunk)
        *chunk = a->current->memchunk;

    /* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", (void*) a, (void*) a->current->object, a->current->object ? a->current->object->parent.type_name : NULL, a->current->code, (void*) a->current->userdata, (unsigned long) a->current->memchunk.length); */

    return 0;
}

void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
    pa_assert(a);
    pa_assert(PA_REFCNT_VALUE(a) > 0);
    pa_assert(a->current);

    if (a->current->semaphore) {
        a->current->ret = ret;
        pa_semaphore_post(a->current->semaphore);
    } else {

        if (a->current->free_cb)
            a->current->free_cb(a->current->userdata);

        if (a->current->object)
            pa_msgobject_unref(a->current->object);

        if (a->current->memchunk.memblock)
            pa_memblock_unref(a->current->memchunk.memblock);

        if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
            pa_xfree(a->current);
    }

    a->current = NULL;
}
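
/*
 * Usage sketch of the reader side (illustrative; error handling omitted):
 * every successful pa_asyncmsgq_get() must be paired with exactly one
 * pa_asyncmsgq_done(), which wakes up blocked senders and releases the
 * item's references:
 *
 *     pa_msgobject *o;
 *     int code;
 *     void *data;
 *     int64_t offset;
 *     pa_memchunk chunk;
 *
 *     while (pa_asyncmsgq_get(q, &o, &code, &data, &offset, &chunk, 1) == 0) {
 *         int r = pa_asyncmsgq_dispatch(o, code, data, offset, &chunk);
 *         pa_asyncmsgq_done(q, r);
 *     }
 */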

int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
    int c;
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    pa_asyncmsgq_ref(a);

    do {
        pa_msgobject *o;
        void *data;
        int64_t offset;
        pa_memchunk chunk;
        int ret;

        if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, 1) < 0) {
            /* Drop the reference we took above before bailing out */
            pa_asyncmsgq_unref(a);
            return -1;
        }

        ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
        pa_asyncmsgq_done(a, ret);

    } while (c != code);

    pa_asyncmsgq_unref(a);

    return 0;
}

int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
    pa_msgobject *object;
    int code;
    void *data;
    pa_memchunk chunk;
    int64_t offset;
    int ret;

    pa_assert(PA_REFCNT_VALUE(a) > 0);

    if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0)
        return 0;

    pa_asyncmsgq_ref(a);
    ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
    pa_asyncmsgq_done(a, ret);
    pa_asyncmsgq_unref(a);

    return 1;
}
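
/*
 * Usage sketch (illustrative): drain everything that is currently queued
 * without blocking, e.g. after the queue's fd became readable:
 *
 *     while (pa_asyncmsgq_process_one(q) > 0)
 *         ;
 */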

int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    return pa_asyncq_get_fd(a->asyncq);
}

int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    return pa_asyncq_before_poll(a->asyncq);
}

void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    pa_asyncq_after_poll(a->asyncq);
}
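
/*
 * Usage sketch (illustrative; assumes pa_asyncq_before_poll() follows the
 * convention of returning a negative value while messages are still
 * pending, so the caller should process them instead of sleeping):
 *
 *     struct pollfd pfd;
 *     pfd.fd = pa_asyncmsgq_get_fd(q);
 *     pfd.events = POLLIN;
 *
 *     while (pa_asyncmsgq_before_poll(q) < 0)
 *         pa_asyncmsgq_process_one(q);
 *
 *     poll(&pfd, 1, -1);
 *     pa_asyncmsgq_after_poll(q);
 */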

int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {

    if (object)
        return object->process_msg(object, code, userdata, offset, memchunk);

    return 0;
}