]> code.delx.au - pulseaudio/blob - src/pulsecore/asyncq.c
Merge HUGE set of changes temporarily into a branch, to allow me to move them from...
[pulseaudio] / src / pulsecore / asyncq.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2006 Lennart Poettering
7
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
12
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21 USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulse/xmalloc.h>
37
38 #include "asyncq.h"
39
/* Default number of cells, used when pa_asyncq_new() is called with a
 * size of 0. Written as a shift because the queue size has to be a
 * power of two. */
#define ASYNCQ_SIZE (1 << 7)
41
/* Debugging aid: when PROFILE is defined, _Y expands to a thread yield,
 * so that an extra yield is inserted between the individual queue
 * operations. In all other builds _Y is an empty statement. */

#ifndef PROFILE
#define _Y do { } while (0)
#else
#define _Y pa_thread_yield()
#endif
50
51 struct pa_asyncq {
52 unsigned size;
53 unsigned read_idx;
54 unsigned write_idx;
55 pa_atomic_int_t read_waiting;
56 pa_atomic_int_t write_waiting;
57 int read_fds[2], write_fds[2];
58 };
59
/* Yields a pointer to the first cell of queue x. The cell array starts
 * PA_ALIGN(sizeof(struct pa_asyncq)) bytes behind the start of the
 * queue header. */
#define PA_ASYNCQ_CELLS(x) \
    ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
61
/* Returns non-zero if size is a power of two, and zero otherwise.
 *
 * Zero is rejected explicitly: the bit trick on its own would accept
 * it, since 0 & (0 - 1) is 0, although zero is not a power of two. */
static int is_power_of_two(unsigned size) {
    return size != 0 && !(size & (size - 1));
}
65
66 static int reduce(pa_asyncq *l, int value) {
67 return value & (unsigned) (l->size - 1);
68 }
69
70 pa_asyncq *pa_asyncq_new(unsigned size) {
71 pa_asyncq *l;
72
73 if (!size)
74 size = ASYNCQ_SIZE;
75
76 pa_assert(is_power_of_two(size));
77
78 l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
79
80 l->size = size;
81 pa_atomic_store(&l->read_waiting, 0);
82 pa_atomic_store(&l->write_waiting, 0);
83
84 if (pipe(l->read_fds) < 0) {
85 pa_xfree(l);
86 return NULL;
87 }
88
89 if (pipe(l->write_fds) < 0) {
90 pa_close(l->read_fds[0]);
91 pa_close(l->read_fds[1]);
92 pa_xfree(l);
93 return NULL;
94 }
95
96 pa_make_nonblock_fd(l->read_fds[1]);
97 pa_make_nonblock_fd(l->write_fds[1]);
98
99 return l;
100 }
101
102 void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
103 pa_assert(l);
104
105 if (free_cb) {
106 void *p;
107
108 while ((p = pa_asyncq_pop(l, 0)))
109 free_cb(p);
110 }
111
112 pa_close(l->read_fds[0]);
113 pa_close(l->read_fds[1]);
114 pa_close(l->write_fds[0]);
115 pa_close(l->write_fds[1]);
116
117 pa_xfree(l);
118 }
119
120 int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
121 int idx;
122 pa_atomic_ptr_t *cells;
123
124 pa_assert(l);
125 pa_assert(p);
126
127 cells = PA_ASYNCQ_CELLS(l);
128
129 _Y;
130 idx = reduce(l, l->write_idx);
131
132 if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
133
134 /* First try failed. Let's wait for changes. */
135
136 if (!wait)
137 return -1;
138
139 _Y;
140
141 pa_atomic_inc(&l->write_waiting);
142
143 for (;;) {
144 char x[20];
145
146 _Y;
147
148 if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p))
149 break;
150
151 _Y;
152
153 if (read(l->write_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
154 pa_atomic_dec(&l->write_waiting);
155 return -1;
156 }
157 }
158
159 _Y;
160
161 pa_atomic_dec(&l->write_waiting);
162 }
163
164 _Y;
165 l->write_idx++;
166
167 if (pa_atomic_load(&l->read_waiting)) {
168 char x = 'x';
169 _Y;
170 write(l->read_fds[1], &x, sizeof(x));
171 }
172
173 return 0;
174 }
175
176 void* pa_asyncq_pop(pa_asyncq*l, int wait) {
177 int idx;
178 void *ret;
179 pa_atomic_ptr_t *cells;
180
181 pa_assert(l);
182
183 cells = PA_ASYNCQ_CELLS(l);
184
185 _Y;
186 idx = reduce(l, l->read_idx);
187
188 if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
189
190 /* First try failed. Let's wait for changes. */
191
192 if (!wait)
193 return NULL;
194
195 _Y;
196
197 pa_atomic_inc(&l->read_waiting);
198
199 for (;;) {
200 char x[20];
201
202 _Y;
203
204 if ((ret = pa_atomic_ptr_load(&cells[idx])))
205 break;
206
207 _Y;
208
209 if (read(l->read_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
210 pa_atomic_dec(&l->read_waiting);
211 return NULL;
212 }
213 }
214
215 _Y;
216
217 pa_atomic_dec(&l->read_waiting);
218 }
219
220 /* Guaranteed if we only have a single reader */
221 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
222
223 _Y;
224 l->read_idx++;
225
226 if (pa_atomic_load(&l->write_waiting)) {
227 char x = 'x';
228 _Y;
229 write(l->write_fds[1], &x, sizeof(x));
230 }
231
232 return ret;
233 }
234
235 int pa_asyncq_get_fd(pa_asyncq *q) {
236 pa_assert(q);
237
238 return q->read_fds[0];
239 }
240
241 int pa_asyncq_before_poll(pa_asyncq *l) {
242 int idx;
243 pa_atomic_ptr_t *cells;
244
245 pa_assert(l);
246
247 cells = PA_ASYNCQ_CELLS(l);
248
249 _Y;
250 idx = reduce(l, l->read_idx);
251
252 if (pa_atomic_ptr_load(&cells[idx]))
253 return -1;
254
255 pa_atomic_inc(&l->read_waiting);
256
257 if (pa_atomic_ptr_load(&cells[idx])) {
258 pa_atomic_dec(&l->read_waiting);
259 return -1;
260 }
261
262 return 0;
263 }
264
265 int pa_asyncq_after_poll(pa_asyncq *l) {
266 pa_assert(l);
267
268 pa_assert(pa_atomic_load(&l->read_waiting) > 0);
269
270 pa_atomic_dec(&l->read_waiting);
271 }