]> code.delx.au - pulseaudio/blob - src/pulsecore/asyncq.c
fix some comments
[pulseaudio] / src / pulsecore / asyncq.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2006-2008 Lennart Poettering
7
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
12
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21 USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/llist.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
39
40 #include "asyncq.h"
41 #include "fdsem.h"
42
43 #define ASYNCQ_SIZE 256
44
45 /* For debugging purposes we can define _Y to put an extra thread
46 * yield between each operation. */
47
48 /* #define PROFILE */
49
50 #ifdef PROFILE
51 #define _Y pa_thread_yield()
52 #else
53 #define _Y do { } while(0)
54 #endif
55
56 struct localq {
57 void *data;
58 PA_LLIST_FIELDS(struct localq);
59 };
60
61 struct pa_asyncq {
62 unsigned size;
63 unsigned read_idx;
64 unsigned write_idx;
65 pa_fdsem *read_fdsem, *write_fdsem;
66
67 PA_LLIST_HEAD(struct localq, localq);
68 struct localq *last_localq;
69 pa_bool_t waiting_for_post;
70 };
71
72 PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree);
73
74 #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
75
76 static int reduce(pa_asyncq *l, int value) {
77 return value & (unsigned) (l->size - 1);
78 }
79
80 pa_asyncq *pa_asyncq_new(unsigned size) {
81 pa_asyncq *l;
82
83 if (!size)
84 size = ASYNCQ_SIZE;
85
86 pa_assert(pa_is_power_of_two(size));
87
88 l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
89
90 l->size = size;
91
92 PA_LLIST_HEAD_INIT(struct localq, l->localq);
93 l->last_localq = NULL;
94 l->waiting_for_post = FALSE;
95
96 if (!(l->read_fdsem = pa_fdsem_new())) {
97 pa_xfree(l);
98 return NULL;
99 }
100
101 if (!(l->write_fdsem = pa_fdsem_new())) {
102 pa_fdsem_free(l->read_fdsem);
103 pa_xfree(l);
104 return NULL;
105 }
106
107 return l;
108 }
109
110 void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
111 struct localq *q;
112 pa_assert(l);
113
114 if (free_cb) {
115 void *p;
116
117 while ((p = pa_asyncq_pop(l, 0)))
118 free_cb(p);
119 }
120
121 while ((q = l->localq)) {
122 if (free_cb)
123 free_cb(q->data);
124
125 PA_LLIST_REMOVE(struct localq, l->localq, q);
126
127 if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
128 pa_xfree(q);
129 }
130
131 pa_fdsem_free(l->read_fdsem);
132 pa_fdsem_free(l->write_fdsem);
133 pa_xfree(l);
134 }
135
136 static int push(pa_asyncq*l, void *p, pa_bool_t wait) {
137 int idx;
138 pa_atomic_ptr_t *cells;
139
140 pa_assert(l);
141 pa_assert(p);
142
143 cells = PA_ASYNCQ_CELLS(l);
144
145 _Y;
146 idx = reduce(l, l->write_idx);
147
148 if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
149
150 if (!wait)
151 return -1;
152
153 /* pa_log("sleeping on push"); */
154
155 do {
156 pa_fdsem_wait(l->read_fdsem);
157 } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p));
158 }
159
160 _Y;
161 l->write_idx++;
162
163 pa_fdsem_post(l->write_fdsem);
164
165 return 0;
166 }
167
168 static pa_bool_t flush_postq(pa_asyncq *l) {
169 struct localq *q;
170
171 pa_assert(l);
172
173 while ((q = l->last_localq)) {
174
175 if (push(l, q->data, FALSE) < 0)
176 return FALSE;
177
178 l->last_localq = q->prev;
179
180 PA_LLIST_REMOVE(struct localq, l->localq, q);
181
182 if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
183 pa_xfree(q);
184 }
185
186 return TRUE;
187 }
188
189 int pa_asyncq_push(pa_asyncq*l, void *p, pa_bool_t wait) {
190 pa_assert(l);
191
192 if (!flush_postq(l))
193 return -1;
194
195 return push(l, p, wait);
196 }
197
198 void pa_asyncq_post(pa_asyncq*l, void *p) {
199 struct localq *q;
200
201 pa_assert(l);
202 pa_assert(p);
203
204 if (pa_asyncq_push(l, p, FALSE) >= 0)
205 return;
206
207 /* OK, we couldn't push anything in the queue. So let's queue it
208 * locally and push it later */
209
210 pa_log("q overrun, queuing locally");
211
212 if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq))))
213 q = pa_xnew(struct localq, 1);
214
215 q->data = p;
216 PA_LLIST_PREPEND(struct localq, l->localq, q);
217
218 if (!l->last_localq)
219 l->last_localq = q;
220
221 return;
222 }
223
224 void* pa_asyncq_pop(pa_asyncq*l, pa_bool_t wait) {
225 int idx;
226 void *ret;
227 pa_atomic_ptr_t *cells;
228
229 pa_assert(l);
230
231 cells = PA_ASYNCQ_CELLS(l);
232
233 _Y;
234 idx = reduce(l, l->read_idx);
235
236 if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
237
238 if (!wait)
239 return NULL;
240
241 /* pa_log("sleeping on pop"); */
242
243 do {
244 pa_fdsem_wait(l->write_fdsem);
245 } while (!(ret = pa_atomic_ptr_load(&cells[idx])));
246 }
247
248 pa_assert(ret);
249
250 /* Guaranteed to succeed if we only have a single reader */
251 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
252
253 _Y;
254 l->read_idx++;
255
256 pa_fdsem_post(l->read_fdsem);
257
258 return ret;
259 }
260
261 int pa_asyncq_read_fd(pa_asyncq *q) {
262 pa_assert(q);
263
264 return pa_fdsem_get(q->write_fdsem);
265 }
266
267 int pa_asyncq_read_before_poll(pa_asyncq *l) {
268 int idx;
269 pa_atomic_ptr_t *cells;
270
271 pa_assert(l);
272
273 cells = PA_ASYNCQ_CELLS(l);
274
275 _Y;
276 idx = reduce(l, l->read_idx);
277
278 for (;;) {
279 if (pa_atomic_ptr_load(&cells[idx]))
280 return -1;
281
282 if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
283 return 0;
284 }
285
286 return 0;
287 }
288
289 void pa_asyncq_read_after_poll(pa_asyncq *l) {
290 pa_assert(l);
291
292 pa_fdsem_after_poll(l->write_fdsem);
293 }
294
295 int pa_asyncq_write_fd(pa_asyncq *q) {
296 pa_assert(q);
297
298 return pa_fdsem_get(q->read_fdsem);
299 }
300
301 void pa_asyncq_write_before_poll(pa_asyncq *l) {
302 pa_assert(l);
303
304 for (;;) {
305
306 if (flush_postq(l))
307 break;
308
309 if (pa_fdsem_before_poll(l->read_fdsem) >= 0) {
310 l->waiting_for_post = TRUE;
311 break;
312 }
313 }
314 }
315
316 void pa_asyncq_write_after_poll(pa_asyncq *l) {
317 pa_assert(l);
318
319 if (l->waiting_for_post) {
320 pa_fdsem_after_poll(l->read_fdsem);
321 l->waiting_for_post = FALSE;
322 }
323 }