4 This file is part of PulseAudio.
6 Copyright 2006-2008 Lennart Poettering
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulsecore/llist.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
43 #define ASYNCQ_SIZE 256
45 /* For debugging purposes we can define _Y to put an extra thread
46 * yield between each operation. */
51 #define _Y pa_thread_yield()
53 #define _Y do { } while(0)
58 PA_LLIST_FIELDS(struct localq
);
65 pa_fdsem
*read_fdsem
, *write_fdsem
;
67 PA_LLIST_HEAD(struct localq
, localq
);
68 struct localq
*last_localq
;
69 pa_bool_t waiting_for_post
;
72 PA_STATIC_FLIST_DECLARE(localq
, 0, pa_xfree
);
74 #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
76 static int reduce(pa_asyncq
*l
, int value
) {
77 return value
& (unsigned) (l
->size
- 1);
80 pa_asyncq
*pa_asyncq_new(unsigned size
) {
86 pa_assert(pa_is_power_of_two(size
));
88 l
= pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq
)) + (sizeof(pa_atomic_ptr_t
) * size
));
92 PA_LLIST_HEAD_INIT(struct localq
, l
->localq
);
93 l
->last_localq
= NULL
;
94 l
->waiting_for_post
= FALSE
;
96 if (!(l
->read_fdsem
= pa_fdsem_new())) {
101 if (!(l
->write_fdsem
= pa_fdsem_new())) {
102 pa_fdsem_free(l
->read_fdsem
);
110 void pa_asyncq_free(pa_asyncq
*l
, pa_free_cb_t free_cb
) {
117 while ((p
= pa_asyncq_pop(l
, 0)))
121 while ((q
= l
->localq
)) {
125 PA_LLIST_REMOVE(struct localq
, l
->localq
, q
);
127 if (pa_flist_push(PA_STATIC_FLIST_GET(localq
), q
) < 0)
131 pa_fdsem_free(l
->read_fdsem
);
132 pa_fdsem_free(l
->write_fdsem
);
136 static int push(pa_asyncq
*l
, void *p
, pa_bool_t wait
) {
138 pa_atomic_ptr_t
*cells
;
143 cells
= PA_ASYNCQ_CELLS(l
);
146 idx
= reduce(l
, l
->write_idx
);
148 if (!pa_atomic_ptr_cmpxchg(&cells
[idx
], NULL
, p
)) {
153 /* pa_log("sleeping on push"); */
156 pa_fdsem_wait(l
->read_fdsem
);
157 } while (!pa_atomic_ptr_cmpxchg(&cells
[idx
], NULL
, p
));
163 pa_fdsem_post(l
->write_fdsem
);
168 static pa_bool_t
flush_postq(pa_asyncq
*l
) {
173 while ((q
= l
->last_localq
)) {
175 if (push(l
, q
->data
, FALSE
) < 0)
178 l
->last_localq
= q
->prev
;
180 PA_LLIST_REMOVE(struct localq
, l
->localq
, q
);
182 if (pa_flist_push(PA_STATIC_FLIST_GET(localq
), q
) < 0)
189 int pa_asyncq_push(pa_asyncq
*l
, void *p
, pa_bool_t wait
) {
195 return push(l
, p
, wait
);
198 void pa_asyncq_post(pa_asyncq
*l
, void *p
) {
204 if (pa_asyncq_push(l
, p
, FALSE
) >= 0)
207 /* OK, we couldn't push anything in the queue. So let's queue it
208 * locally and push it later */
210 pa_log("q overrun, queuing locally");
212 if (!(q
= pa_flist_pop(PA_STATIC_FLIST_GET(localq
))))
213 q
= pa_xnew(struct localq
, 1);
216 PA_LLIST_PREPEND(struct localq
, l
->localq
, q
);
224 void* pa_asyncq_pop(pa_asyncq
*l
, pa_bool_t wait
) {
227 pa_atomic_ptr_t
*cells
;
231 cells
= PA_ASYNCQ_CELLS(l
);
234 idx
= reduce(l
, l
->read_idx
);
236 if (!(ret
= pa_atomic_ptr_load(&cells
[idx
]))) {
241 /* pa_log("sleeping on pop"); */
244 pa_fdsem_wait(l
->write_fdsem
);
245 } while (!(ret
= pa_atomic_ptr_load(&cells
[idx
])));
250 /* Guaranteed to succeed if we only have a single reader */
251 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells
[idx
], ret
, NULL
));
256 pa_fdsem_post(l
->read_fdsem
);
261 int pa_asyncq_read_fd(pa_asyncq
*q
) {
264 return pa_fdsem_get(q
->write_fdsem
);
267 int pa_asyncq_read_before_poll(pa_asyncq
*l
) {
269 pa_atomic_ptr_t
*cells
;
273 cells
= PA_ASYNCQ_CELLS(l
);
276 idx
= reduce(l
, l
->read_idx
);
279 if (pa_atomic_ptr_load(&cells
[idx
]))
282 if (pa_fdsem_before_poll(l
->write_fdsem
) >= 0)
289 void pa_asyncq_read_after_poll(pa_asyncq
*l
) {
292 pa_fdsem_after_poll(l
->write_fdsem
);
295 int pa_asyncq_write_fd(pa_asyncq
*q
) {
298 return pa_fdsem_get(q
->read_fdsem
);
301 void pa_asyncq_write_before_poll(pa_asyncq
*l
) {
309 if (pa_fdsem_before_poll(l
->read_fdsem
) >= 0) {
310 l
->waiting_for_post
= TRUE
;
316 void pa_asyncq_write_after_poll(pa_asyncq
*l
) {
319 if (l
->waiting_for_post
) {
320 pa_fdsem_after_poll(l
->read_fdsem
);
321 l
->waiting_for_post
= FALSE
;