code.delx.au - pulseaudio/blob - src/pulsecore/asyncmsgq.c
A lot more work to get the lock-free stuff in place
[pulseaudio] / src / pulsecore / asyncmsgq.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2006 Lennart Poettering
7
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
12
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21 USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/semaphore.h>
35 #include <pulsecore/macro.h>
36 #include <pulsecore/core-util.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
39
40 #include "asyncmsgq.h"
41
/* Static free list named "asyncmsgq". In this file it is used to recycle
 * struct asyncmsgq_item allocations: pa_asyncmsgq_post() pops an item from
 * it before falling back to pa_xnew(), and pa_asyncmsgq_done() /
 * pa_asyncmsgq_free() push finished items back onto it.
 * NOTE(review): the meaning of the second argument (0) is defined by the
 * macro in flist.h -- presumably a size where 0 selects a default; confirm. */
42 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0);
43
44 struct asyncmsgq_item {
45 int code;
46 pa_msgobject *object;
47 void *userdata;
48 pa_free_cb_t free_cb;
49 pa_memchunk memchunk;
50 pa_semaphore *semaphore;
51 int ret;
52 };
53
54 struct pa_asyncmsgq {
55 pa_asyncq *asyncq;
56 pa_mutex *mutex; /* only for the writer side */
57
58 struct asyncmsgq_item *current;
59 };
60
61 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
62 pa_asyncmsgq *a;
63
64 a = pa_xnew(pa_asyncmsgq, 1);
65
66 pa_assert_se(a->asyncq = pa_asyncq_new(size));
67 pa_assert_se(a->mutex = pa_mutex_new(0));
68 a->current = NULL;
69
70 return a;
71 }
72
73 void pa_asyncmsgq_free(pa_asyncmsgq *a) {
74 struct asyncmsgq_item *i;
75 pa_assert(a);
76
77 while ((i = pa_asyncq_pop(a->asyncq, 0))) {
78
79 pa_assert(!i->semaphore);
80
81 if (i->object)
82 pa_msgobject_unref(i->object);
83
84 if (i->memchunk.memblock)
85 pa_memblock_unref(i->memchunk.memblock);
86
87 if (i->free_cb)
88 i->free_cb(i->userdata);
89
90 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
91 pa_xfree(i);
92 }
93
94 pa_asyncq_free(a->asyncq, NULL);
95 pa_mutex_free(a->mutex);
96 pa_xfree(a);
97 }
98
99 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
100 struct asyncmsgq_item *i;
101 pa_assert(a);
102
103 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
104 i = pa_xnew(struct asyncmsgq_item, 1);
105
106 i->code = code;
107 i->object = object ? pa_msgobject_ref(object) : NULL;
108 i->userdata = (void*) userdata;
109 i->free_cb = free_cb;
110 if (chunk) {
111 pa_assert(chunk->memblock);
112 i->memchunk = *chunk;
113 pa_memblock_ref(i->memchunk.memblock);
114 } else
115 pa_memchunk_reset(&i->memchunk);
116 i->semaphore = NULL;
117
118 /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
119 pa_mutex_lock(a->mutex);
120 pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
121 pa_mutex_unlock(a->mutex);
122 }
123
124 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk) {
125 struct asyncmsgq_item i;
126 pa_assert(a);
127
128 i.code = code;
129 i.object = object;
130 i.userdata = (void*) userdata;
131 i.free_cb = NULL;
132 i.ret = -1;
133 if (chunk) {
134 pa_assert(chunk->memblock);
135 i.memchunk = *chunk;
136 } else
137 pa_memchunk_reset(&i.memchunk);
138 pa_assert_se(i.semaphore = pa_semaphore_new(0));
139
140 /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
141 pa_mutex_lock(a->mutex);
142 pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
143 pa_mutex_unlock(a->mutex);
144
145 pa_semaphore_wait(i.semaphore);
146 pa_semaphore_free(i.semaphore);
147
148 return i.ret;
149 }
150
151 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, pa_memchunk *chunk, int wait) {
152 pa_assert(a);
153 pa_assert(code);
154 pa_assert(!a->current);
155
156 if (!(a->current = pa_asyncq_pop(a->asyncq, wait)))
157 return -1;
158
159 *code = a->current->code;
160 if (userdata)
161 *userdata = a->current->userdata;
162 if (object)
163 *object = a->current->object;
164 if (chunk)
165 *chunk = a->current->memchunk;
166
167 pa_log_debug("q=%p object=%p code=%i data=%p", a, a->current->object, a->current->code, a->current->userdata);
168
169 return 0;
170 }
171
172 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
173 pa_assert(a);
174 pa_assert(a->current);
175
176 if (a->current->semaphore) {
177 a->current->ret = ret;
178 pa_semaphore_post(a->current->semaphore);
179 } else {
180
181 if (a->current->free_cb)
182 a->current->free_cb(a->current->userdata);
183
184 if (a->current->object)
185 pa_msgobject_unref(a->current->object);
186
187 if (a->current->memchunk.memblock)
188 pa_memblock_unref(a->current->memchunk.memblock);
189
190 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
191 pa_xfree(a->current);
192 }
193
194 a->current = NULL;
195 }
196
197 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
198 int c;
199 pa_assert(a);
200
201 do {
202 pa_msgobject *o;
203 void *data;
204 pa_memchunk chunk;
205 int ret;
206
207 if (pa_asyncmsgq_get(a, &o, &c, &data, &chunk, 1) < 0)
208 return -1;
209
210 ret = pa_asyncmsgq_dispatch(o, c, data, &chunk);
211 pa_asyncmsgq_done(a, ret);
212
213 } while (c != code);
214
215 return 0;
216 }
217
218 int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
219 pa_assert(a);
220
221 return pa_asyncq_get_fd(a->asyncq);
222 }
223
224 int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
225 pa_assert(a);
226
227 return pa_asyncq_before_poll(a->asyncq);
228 }
229
230 void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
231 pa_assert(a);
232
233 pa_asyncq_after_poll(a->asyncq);
234 }
235
236 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk) {
237
238 if (object)
239 return object->process_msg(object, code, userdata, memchunk);
240
241 return 0;
242 }