]> code.delx.au - pulseaudio/blob - src/pulsecore/rtpoll.c
make it easier to debug timing related problems
[pulseaudio] / src / pulsecore / rtpoll.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <sys/types.h>
28 #include <stdio.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <errno.h>
32
33 #ifdef __linux__
34 #include <sys/utsname.h>
35 #endif
36
37 #ifdef HAVE_POLL_H
38 #include <poll.h>
39 #else
40 #include <pulsecore/poll.h>
41 #endif
42
43 #include <pulse/xmalloc.h>
44 #include <pulse/timeval.h>
45
46 #include <pulsecore/core-error.h>
47 #include <pulsecore/rtclock.h>
48 #include <pulsecore/macro.h>
49 #include <pulsecore/llist.h>
50 #include <pulsecore/rtsig.h>
51 #include <pulsecore/flist.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/winsock.h>
54 #include <pulsecore/ratelimit.h>
55
56 #include "rtpoll.h"
57
58 /* #define DEBUG_TIMING */
59
60 struct pa_rtpoll {
61 struct pollfd *pollfd, *pollfd2;
62 unsigned n_pollfd_alloc, n_pollfd_used;
63
64 struct timeval next_elapse;
65 pa_bool_t timer_enabled:1;
66
67 pa_bool_t scan_for_dead:1;
68 pa_bool_t running:1;
69 pa_bool_t installed:1;
70 pa_bool_t rebuild_needed:1;
71 pa_bool_t quit:1;
72
73 #ifdef HAVE_PPOLL
74 pa_bool_t timer_armed:1;
75 #ifdef __linux__
76 pa_bool_t dont_use_ppoll:1;
77 #endif
78 int rtsig;
79 sigset_t sigset_unblocked;
80 timer_t timer;
81 #endif
82
83 #ifdef DEBUG_TIMING
84 pa_usec_t timestamp;
85 pa_usec_t slept, awake;
86 #endif
87
88 PA_LLIST_HEAD(pa_rtpoll_item, items);
89 };
90
91 struct pa_rtpoll_item {
92 pa_rtpoll *rtpoll;
93 pa_bool_t dead;
94
95 pa_rtpoll_priority_t priority;
96
97 struct pollfd *pollfd;
98 unsigned n_pollfd;
99
100 int (*work_cb)(pa_rtpoll_item *i);
101 int (*before_cb)(pa_rtpoll_item *i);
102 void (*after_cb)(pa_rtpoll_item *i);
103 void *userdata;
104
105 PA_LLIST_FIELDS(pa_rtpoll_item);
106 };
107
108 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
109
110 static void signal_handler_noop(int s) { /* write(2, "signal\n", 7); */ }
111
112 pa_rtpoll *pa_rtpoll_new(void) {
113 pa_rtpoll *p;
114
115 p = pa_xnew(pa_rtpoll, 1);
116
117 #ifdef HAVE_PPOLL
118
119 #ifdef __linux__
120 /* ppoll is broken on Linux < 2.6.16 */
121 p->dont_use_ppoll = FALSE;
122
123 {
124 struct utsname u;
125 unsigned major, minor, micro;
126
127 pa_assert_se(uname(&u) == 0);
128
129 if (sscanf(u.release, "%u.%u.%u", &major, &minor, &micro) != 3 ||
130 (major < 2) ||
131 (major == 2 && minor < 6) ||
132 (major == 2 && minor == 6 && micro < 16))
133
134 p->dont_use_ppoll = TRUE;
135 }
136
137 #endif
138
139 p->rtsig = -1;
140 sigemptyset(&p->sigset_unblocked);
141 p->timer = (timer_t) -1;
142 p->timer_armed = FALSE;
143
144 #endif
145
146 p->n_pollfd_alloc = 32;
147 p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
148 p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
149 p->n_pollfd_used = 0;
150
151 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
152 p->timer_enabled = FALSE;
153
154 p->running = FALSE;
155 p->installed = FALSE;
156 p->scan_for_dead = FALSE;
157 p->rebuild_needed = FALSE;
158 p->quit = FALSE;
159
160 PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items);
161
162 #ifdef DEBUG_TIMING
163 p->timestamp = pa_rtclock_usec();
164 p->slept = p->awake = 0;
165 #endif
166
167 return p;
168 }
169
170 void pa_rtpoll_install(pa_rtpoll *p) {
171 pa_assert(p);
172 pa_assert(!p->installed);
173
174 p->installed = TRUE;
175
176 #ifdef HAVE_PPOLL
177 # ifdef __linux__
178 if (p->dont_use_ppoll)
179 return;
180 # endif
181
182 if ((p->rtsig = pa_rtsig_get_for_thread()) < 0) {
183 pa_log_warn("Failed to reserve POSIX realtime signal.");
184 return;
185 }
186
187 pa_log_debug("Acquired POSIX realtime signal %s", pa_sig2str(p->rtsig));
188
189 {
190 sigset_t ss;
191 struct sigaction sa;
192
193 pa_assert_se(sigemptyset(&ss) == 0);
194 pa_assert_se(sigaddset(&ss, p->rtsig) == 0);
195 pa_assert_se(pthread_sigmask(SIG_BLOCK, &ss, &p->sigset_unblocked) == 0);
196 pa_assert_se(sigdelset(&p->sigset_unblocked, p->rtsig) == 0);
197
198 memset(&sa, 0, sizeof(sa));
199 sa.sa_handler = signal_handler_noop;
200 pa_assert_se(sigemptyset(&sa.sa_mask) == 0);
201
202 pa_assert_se(sigaction(p->rtsig, &sa, NULL) == 0);
203
204 /* We never reset the signal handler. Why should we? */
205 }
206
207 #endif
208 }
209
210 static void rtpoll_rebuild(pa_rtpoll *p) {
211
212 struct pollfd *e, *t;
213 pa_rtpoll_item *i;
214 int ra = 0;
215
216 pa_assert(p);
217
218 p->rebuild_needed = FALSE;
219
220 if (p->n_pollfd_used > p->n_pollfd_alloc) {
221 /* Hmm, we have to allocate some more space */
222 p->n_pollfd_alloc = p->n_pollfd_used * 2;
223 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
224 ra = 1;
225 }
226
227 e = p->pollfd2;
228
229 for (i = p->items; i; i = i->next) {
230
231 if (i->n_pollfd > 0) {
232 size_t l = i->n_pollfd * sizeof(struct pollfd);
233
234 if (i->pollfd)
235 memcpy(e, i->pollfd, l);
236 else
237 memset(e, 0, l);
238
239 i->pollfd = e;
240 } else
241 i->pollfd = NULL;
242
243 e += i->n_pollfd;
244 }
245
246 pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
247 t = p->pollfd;
248 p->pollfd = p->pollfd2;
249 p->pollfd2 = t;
250
251 if (ra)
252 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
253
254 }
255
256 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
257 pa_rtpoll *p;
258
259 pa_assert(i);
260
261 p = i->rtpoll;
262
263 PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
264
265 p->n_pollfd_used -= i->n_pollfd;
266
267 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
268 pa_xfree(i);
269
270 p->rebuild_needed = TRUE;
271 }
272
273 void pa_rtpoll_free(pa_rtpoll *p) {
274 pa_assert(p);
275
276 while (p->items)
277 rtpoll_item_destroy(p->items);
278
279 pa_xfree(p->pollfd);
280 pa_xfree(p->pollfd2);
281
282 #ifdef HAVE_PPOLL
283 if (p->timer != (timer_t) -1)
284 timer_delete(p->timer);
285 #endif
286
287 pa_xfree(p);
288 }
289
290 static void reset_revents(pa_rtpoll_item *i) {
291 struct pollfd *f;
292 unsigned n;
293
294 pa_assert(i);
295
296 if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
297 return;
298
299 for (; n > 0; n--)
300 f[n-1].revents = 0;
301 }
302
303 static void reset_all_revents(pa_rtpoll *p) {
304 pa_rtpoll_item *i;
305
306 pa_assert(p);
307
308 for (i = p->items; i; i = i->next) {
309
310 if (i->dead)
311 continue;
312
313 reset_revents(i);
314 }
315 }
316
317 int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait) {
318 pa_rtpoll_item *i;
319 int r = 0;
320 struct timeval timeout;
321
322 pa_assert(p);
323 pa_assert(!p->running);
324 pa_assert(p->installed);
325
326 p->running = TRUE;
327
328 /* First, let's do some work */
329 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
330 int k;
331
332 if (i->dead)
333 continue;
334
335 if (!i->work_cb)
336 continue;
337
338 if (p->quit)
339 goto finish;
340
341 if ((k = i->work_cb(i)) != 0) {
342 if (k < 0)
343 r = k;
344
345 goto finish;
346 }
347 }
348
349 /* Now let's prepare for entering the sleep */
350 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
351 int k = 0;
352
353 if (i->dead)
354 continue;
355
356 if (!i->before_cb)
357 continue;
358
359 if (p->quit || (k = i->before_cb(i)) != 0) {
360
361 /* Hmm, this one doesn't let us enter the poll, so rewind everything */
362
363 for (i = i->prev; i; i = i->prev) {
364
365 if (i->dead)
366 continue;
367
368 if (!i->after_cb)
369 continue;
370
371 i->after_cb(i);
372 }
373
374 if (k < 0)
375 r = k;
376
377 goto finish;
378 }
379 }
380
381 if (p->rebuild_needed)
382 rtpoll_rebuild(p);
383
384 memset(&timeout, 0, sizeof(timeout));
385
386 /* Calculate timeout */
387 if (wait && !p->quit && p->timer_enabled) {
388 struct timeval now;
389 pa_rtclock_get(&now);
390
391 if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
392 pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
393 }
394
395 #ifdef DEBUG_TIMING
396 {
397 pa_usec_t now = pa_rtclock_usec();
398 p->awake = now - p->timestamp;
399 p->timestamp = now;
400 }
401 #endif
402
403 /* OK, now let's sleep */
404 #ifdef HAVE_PPOLL
405
406 #ifdef __linux__
407 if (!p->dont_use_ppoll)
408 #endif
409 {
410 struct timespec ts;
411 ts.tv_sec = timeout.tv_sec;
412 ts.tv_nsec = timeout.tv_usec * 1000;
413 r = ppoll(p->pollfd, p->n_pollfd_used, (!wait || p->quit || p->timer_enabled) ? &ts : NULL, p->rtsig < 0 ? NULL : &p->sigset_unblocked);
414 }
415 #ifdef __linux__
416 else
417 #endif
418
419 #endif
420 r = poll(p->pollfd, p->n_pollfd_used, (!wait || p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
421
422 #ifdef DEBUG_TIMING
423 {
424 pa_usec_t now = pa_rtclock_usec();
425 p->slept = now - p->timestamp;
426 p->timestamp = now;
427
428 pa_log("Process time %llu ms; sleep time %llu ms",
429 (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
430 (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
431 }
432 #endif
433
434 if (r < 0) {
435 if (errno == EAGAIN || errno == EINTR)
436 r = 0;
437 else
438 pa_log_error("poll(): %s", pa_cstrerror(errno));
439
440 reset_all_revents(p);
441 }
442
443 /* Let's tell everyone that we left the sleep */
444 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
445
446 if (i->dead)
447 continue;
448
449 if (!i->after_cb)
450 continue;
451
452 i->after_cb(i);
453 }
454
455 finish:
456
457 p->running = FALSE;
458
459 if (p->scan_for_dead) {
460 pa_rtpoll_item *n;
461
462 p->scan_for_dead = FALSE;
463
464 for (i = p->items; i; i = n) {
465 n = i->next;
466
467 if (i->dead)
468 rtpoll_item_destroy(i);
469 }
470 }
471
472 return r < 0 ? r : !p->quit;
473 }
474
475 static void update_timer(pa_rtpoll *p) {
476 pa_assert(p);
477
478 #ifdef HAVE_PPOLL
479
480 #ifdef __linux__
481 if (!p->dont_use_ppoll) {
482 #endif
483
484 if (p->timer == (timer_t) -1) {
485 struct sigevent se;
486
487 memset(&se, 0, sizeof(se));
488 se.sigev_notify = SIGEV_SIGNAL;
489 se.sigev_signo = p->rtsig;
490
491 if (timer_create(CLOCK_MONOTONIC, &se, &p->timer) < 0)
492 if (timer_create(CLOCK_REALTIME, &se, &p->timer) < 0) {
493 pa_log_warn("Failed to allocate POSIX timer: %s", pa_cstrerror(errno));
494 p->timer = (timer_t) -1;
495 }
496 }
497
498 if (p->timer != (timer_t) -1) {
499 struct itimerspec its;
500 struct timespec ts = { .tv_sec = 0, .tv_nsec = 0 };
501 sigset_t ss;
502
503 if (p->timer_armed) {
504 /* First disarm timer */
505 memset(&its, 0, sizeof(its));
506 pa_assert_se(timer_settime(p->timer, TIMER_ABSTIME, &its, NULL) == 0);
507
508 /* Remove a signal that might be waiting in the signal q */
509 pa_assert_se(sigemptyset(&ss) == 0);
510 pa_assert_se(sigaddset(&ss, p->rtsig) == 0);
511 sigtimedwait(&ss, NULL, &ts);
512 }
513
514 /* And install the new timer */
515 if (p->timer_enabled) {
516 memset(&its, 0, sizeof(its));
517
518 its.it_value.tv_sec = p->next_elapse.tv_sec;
519 its.it_value.tv_nsec = p->next_elapse.tv_usec*1000;
520
521 /* Make sure that 0,0 is not understood as
522 * "disarming" */
523 if (its.it_value.tv_sec == 0 && its.it_value.tv_nsec == 0)
524 its.it_value.tv_nsec = 1;
525 pa_assert_se(timer_settime(p->timer, TIMER_ABSTIME, &its, NULL) == 0);
526 }
527
528 p->timer_armed = p->timer_enabled;
529 }
530
531 #ifdef __linux__
532 }
533 #endif
534
535 #endif
536 }
537
538 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
539 pa_assert(p);
540
541 pa_timeval_store(&p->next_elapse, usec);
542 p->timer_enabled = TRUE;
543
544 update_timer(p);
545 }
546
547 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
548 pa_assert(p);
549
550 /* Scheduling a timeout for more than an hour is very very suspicious */
551 pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
552
553 pa_rtclock_get(&p->next_elapse);
554 pa_timeval_add(&p->next_elapse, usec);
555 p->timer_enabled = TRUE;
556
557 update_timer(p);
558 }
559
560 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
561 pa_assert(p);
562
563 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
564 p->timer_enabled = FALSE;
565
566 update_timer(p);
567 }
568
569 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
570 pa_rtpoll_item *i, *j, *l = NULL;
571
572 pa_assert(p);
573
574 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
575 i = pa_xnew(pa_rtpoll_item, 1);
576
577 i->rtpoll = p;
578 i->dead = FALSE;
579 i->n_pollfd = n_fds;
580 i->pollfd = NULL;
581 i->priority = prio;
582
583 i->userdata = NULL;
584 i->before_cb = NULL;
585 i->after_cb = NULL;
586 i->work_cb = NULL;
587
588 for (j = p->items; j; j = j->next) {
589 if (prio <= j->priority)
590 break;
591
592 l = j;
593 }
594
595 PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
596
597 if (n_fds > 0) {
598 p->rebuild_needed = 1;
599 p->n_pollfd_used += n_fds;
600 }
601
602 return i;
603 }
604
605 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
606 pa_assert(i);
607
608 if (i->rtpoll->running) {
609 i->dead = TRUE;
610 i->rtpoll->scan_for_dead = TRUE;
611 return;
612 }
613
614 rtpoll_item_destroy(i);
615 }
616
617 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
618 pa_assert(i);
619
620 if (i->n_pollfd > 0)
621 if (i->rtpoll->rebuild_needed)
622 rtpoll_rebuild(i->rtpoll);
623
624 if (n_fds)
625 *n_fds = i->n_pollfd;
626
627 return i->pollfd;
628 }
629
630 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
631 pa_assert(i);
632 pa_assert(i->priority < PA_RTPOLL_NEVER);
633
634 i->before_cb = before_cb;
635 }
636
637 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
638 pa_assert(i);
639 pa_assert(i->priority < PA_RTPOLL_NEVER);
640
641 i->after_cb = after_cb;
642 }
643
644 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
645 pa_assert(i);
646 pa_assert(i->priority < PA_RTPOLL_NEVER);
647
648 i->work_cb = work_cb;
649 }
650
651 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
652 pa_assert(i);
653
654 i->userdata = userdata;
655 }
656
657 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
658 pa_assert(i);
659
660 return i->userdata;
661 }
662
663 static int fdsem_before(pa_rtpoll_item *i) {
664
665 if (pa_fdsem_before_poll(i->userdata) < 0)
666 return 1; /* 1 means immediate restart of the loop */
667
668 return 0;
669 }
670
671 static void fdsem_after(pa_rtpoll_item *i) {
672 pa_assert(i);
673
674 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
675 pa_fdsem_after_poll(i->userdata);
676 }
677
678 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
679 pa_rtpoll_item *i;
680 struct pollfd *pollfd;
681
682 pa_assert(p);
683 pa_assert(f);
684
685 i = pa_rtpoll_item_new(p, prio, 1);
686
687 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
688
689 pollfd->fd = pa_fdsem_get(f);
690 pollfd->events = POLLIN;
691
692 i->before_cb = fdsem_before;
693 i->after_cb = fdsem_after;
694 i->userdata = f;
695
696 return i;
697 }
698
699 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
700 pa_assert(i);
701
702 if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
703 return 1; /* 1 means immediate restart of the loop */
704
705 return 0;
706 }
707
708 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
709 pa_assert(i);
710
711 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
712 pa_asyncmsgq_read_after_poll(i->userdata);
713 }
714
715 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
716 pa_msgobject *object;
717 int code;
718 void *data;
719 pa_memchunk chunk;
720 int64_t offset;
721
722 pa_assert(i);
723
724 if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
725 int ret;
726
727 if (!object && code == PA_MESSAGE_SHUTDOWN) {
728 pa_asyncmsgq_done(i->userdata, 0);
729 pa_rtpoll_quit(i->rtpoll);
730 return 1;
731 }
732
733 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
734 pa_asyncmsgq_done(i->userdata, ret);
735 return 1;
736 }
737
738 return 0;
739 }
740
741 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
742 pa_rtpoll_item *i;
743 struct pollfd *pollfd;
744
745 pa_assert(p);
746 pa_assert(q);
747
748 i = pa_rtpoll_item_new(p, prio, 1);
749
750 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
751 pollfd->fd = pa_asyncmsgq_read_fd(q);
752 pollfd->events = POLLIN;
753
754 i->before_cb = asyncmsgq_read_before;
755 i->after_cb = asyncmsgq_read_after;
756 i->work_cb = asyncmsgq_read_work;
757 i->userdata = q;
758
759 return i;
760 }
761
762 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
763 pa_assert(i);
764
765 pa_asyncmsgq_write_before_poll(i->userdata);
766 return 0;
767 }
768
769 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
770 pa_assert(i);
771
772 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
773 pa_asyncmsgq_write_after_poll(i->userdata);
774 }
775
776 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
777 pa_rtpoll_item *i;
778 struct pollfd *pollfd;
779
780 pa_assert(p);
781 pa_assert(q);
782
783 i = pa_rtpoll_item_new(p, prio, 1);
784
785 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
786 pollfd->fd = pa_asyncmsgq_write_fd(q);
787 pollfd->events = POLLIN;
788
789 i->before_cb = asyncmsgq_write_before;
790 i->after_cb = asyncmsgq_write_after;
791 i->work_cb = NULL;
792 i->userdata = q;
793
794 return i;
795 }
796
797 void pa_rtpoll_quit(pa_rtpoll *p) {
798 pa_assert(p);
799
800 p->quit = TRUE;
801 }