]> code.delx.au - pulseaudio/blob - src/pulsecore/rtpoll.c
perl -p -i -e 's/pa_rtclock_usec/pa_rtclock_now/g' `find . -name '*.[ch]'`
[pulseaudio] / src / pulsecore / rtpoll.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <sys/types.h>
28 #include <stdio.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <errno.h>
32
33 #ifdef __linux__
34 #include <sys/utsname.h>
35 #endif
36
37 #ifdef HAVE_POLL_H
38 #include <poll.h>
39 #else
40 #include <pulsecore/poll.h>
41 #endif
42
43 #include <pulse/xmalloc.h>
44 #include <pulse/timeval.h>
45
46 #include <pulsecore/core-error.h>
47 #include <pulsecore/rtclock.h>
48 #include <pulsecore/macro.h>
49 #include <pulsecore/llist.h>
50 #include <pulsecore/rtsig.h>
51 #include <pulsecore/flist.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/winsock.h>
54 #include <pulsecore/ratelimit.h>
55
56 #include "rtpoll.h"
57
58 /* #define DEBUG_TIMING */
59
60 struct pa_rtpoll {
61 struct pollfd *pollfd, *pollfd2;
62 unsigned n_pollfd_alloc, n_pollfd_used;
63
64 struct timeval next_elapse;
65 pa_bool_t timer_enabled:1;
66
67 pa_bool_t scan_for_dead:1;
68 pa_bool_t running:1;
69 pa_bool_t installed:1;
70 pa_bool_t rebuild_needed:1;
71 pa_bool_t quit:1;
72
73 #ifdef HAVE_PPOLL
74 pa_bool_t timer_armed:1;
75 #ifdef __linux__
76 pa_bool_t dont_use_ppoll:1;
77 #endif
78 int rtsig;
79 sigset_t sigset_unblocked;
80 timer_t timer;
81 #endif
82
83 #ifdef DEBUG_TIMING
84 pa_usec_t timestamp;
85 pa_usec_t slept, awake;
86 #endif
87
88 PA_LLIST_HEAD(pa_rtpoll_item, items);
89 };
90
91 struct pa_rtpoll_item {
92 pa_rtpoll *rtpoll;
93 pa_bool_t dead;
94
95 pa_rtpoll_priority_t priority;
96
97 struct pollfd *pollfd;
98 unsigned n_pollfd;
99
100 int (*work_cb)(pa_rtpoll_item *i);
101 int (*before_cb)(pa_rtpoll_item *i);
102 void (*after_cb)(pa_rtpoll_item *i);
103 void *userdata;
104
105 PA_LLIST_FIELDS(pa_rtpoll_item);
106 };
107
108 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
109
/* Signal handler that deliberately does nothing. It is installed for the
 * realtime signal in pa_rtpoll_install(). */
static void signal_handler_noop(int s) {
    (void) s;
}
111
112 pa_rtpoll *pa_rtpoll_new(void) {
113 pa_rtpoll *p;
114
115 p = pa_xnew(pa_rtpoll, 1);
116
117 #ifdef HAVE_PPOLL
118
119 #ifdef __linux__
120 /* ppoll is broken on Linux < 2.6.16 */
121 p->dont_use_ppoll = FALSE;
122
123 {
124 struct utsname u;
125 unsigned major, minor, micro;
126
127 pa_assert_se(uname(&u) == 0);
128
129 if (sscanf(u.release, "%u.%u.%u", &major, &minor, &micro) != 3 ||
130 (major < 2) ||
131 (major == 2 && minor < 6) ||
132 (major == 2 && minor == 6 && micro < 16))
133
134 p->dont_use_ppoll = TRUE;
135 }
136
137 #endif
138
139 p->rtsig = -1;
140 sigemptyset(&p->sigset_unblocked);
141 p->timer = (timer_t) -1;
142 p->timer_armed = FALSE;
143
144 #endif
145
146 p->n_pollfd_alloc = 32;
147 p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
148 p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
149 p->n_pollfd_used = 0;
150
151 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
152 p->timer_enabled = FALSE;
153
154 p->running = FALSE;
155 p->installed = FALSE;
156 p->scan_for_dead = FALSE;
157 p->rebuild_needed = FALSE;
158 p->quit = FALSE;
159
160 PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items);
161
162 #ifdef DEBUG_TIMING
163 p->timestamp = pa_rtclock_now();
164 p->slept = p->awake = 0;
165 #endif
166
167 return p;
168 }
169
170 void pa_rtpoll_install(pa_rtpoll *p) {
171 pa_assert(p);
172 pa_assert(!p->installed);
173
174 p->installed = TRUE;
175
176 #ifdef HAVE_PPOLL
177 # ifdef __linux__
178 if (p->dont_use_ppoll)
179 return;
180 # endif
181
182 if ((p->rtsig = pa_rtsig_get_for_thread()) < 0) {
183 pa_log_warn("Failed to reserve POSIX realtime signal.");
184 return;
185 }
186
187 pa_log_debug("Acquired POSIX realtime signal %s", pa_sig2str(p->rtsig));
188
189 {
190 sigset_t ss;
191 struct sigaction sa;
192
193 pa_assert_se(sigemptyset(&ss) == 0);
194 pa_assert_se(sigaddset(&ss, p->rtsig) == 0);
195 pa_assert_se(pthread_sigmask(SIG_BLOCK, &ss, &p->sigset_unblocked) == 0);
196 pa_assert_se(sigdelset(&p->sigset_unblocked, p->rtsig) == 0);
197
198 memset(&sa, 0, sizeof(sa));
199 sa.sa_handler = signal_handler_noop;
200 pa_assert_se(sigemptyset(&sa.sa_mask) == 0);
201
202 pa_assert_se(sigaction(p->rtsig, &sa, NULL) == 0);
203
204 /* We never reset the signal handler. Why should we? */
205 }
206
207 #endif
208 }
209
210 static void rtpoll_rebuild(pa_rtpoll *p) {
211
212 struct pollfd *e, *t;
213 pa_rtpoll_item *i;
214 int ra = 0;
215
216 pa_assert(p);
217
218 p->rebuild_needed = FALSE;
219
220 if (p->n_pollfd_used > p->n_pollfd_alloc) {
221 /* Hmm, we have to allocate some more space */
222 p->n_pollfd_alloc = p->n_pollfd_used * 2;
223 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
224 ra = 1;
225 }
226
227 e = p->pollfd2;
228
229 for (i = p->items; i; i = i->next) {
230
231 if (i->n_pollfd > 0) {
232 size_t l = i->n_pollfd * sizeof(struct pollfd);
233
234 if (i->pollfd)
235 memcpy(e, i->pollfd, l);
236 else
237 memset(e, 0, l);
238
239 i->pollfd = e;
240 } else
241 i->pollfd = NULL;
242
243 e += i->n_pollfd;
244 }
245
246 pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
247 t = p->pollfd;
248 p->pollfd = p->pollfd2;
249 p->pollfd2 = t;
250
251 if (ra)
252 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
253
254 }
255
256 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
257 pa_rtpoll *p;
258
259 pa_assert(i);
260
261 p = i->rtpoll;
262
263 PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
264
265 p->n_pollfd_used -= i->n_pollfd;
266
267 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
268 pa_xfree(i);
269
270 p->rebuild_needed = TRUE;
271 }
272
273 void pa_rtpoll_free(pa_rtpoll *p) {
274 pa_assert(p);
275
276 while (p->items)
277 rtpoll_item_destroy(p->items);
278
279 pa_xfree(p->pollfd);
280 pa_xfree(p->pollfd2);
281
282 #ifdef HAVE_PPOLL
283 if (p->timer != (timer_t) -1)
284 timer_delete(p->timer);
285 #endif
286
287 pa_xfree(p);
288 }
289
290 static void reset_revents(pa_rtpoll_item *i) {
291 struct pollfd *f;
292 unsigned n;
293
294 pa_assert(i);
295
296 if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
297 return;
298
299 for (; n > 0; n--)
300 f[n-1].revents = 0;
301 }
302
303 static void reset_all_revents(pa_rtpoll *p) {
304 pa_rtpoll_item *i;
305
306 pa_assert(p);
307
308 for (i = p->items; i; i = i->next) {
309
310 if (i->dead)
311 continue;
312
313 reset_revents(i);
314 }
315 }
316
317 int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait) {
318 pa_rtpoll_item *i;
319 int r = 0;
320 struct timeval timeout;
321
322 pa_assert(p);
323 pa_assert(!p->running);
324 pa_assert(p->installed);
325
326 p->running = TRUE;
327
328 /* First, let's do some work */
329 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
330 int k;
331
332 if (i->dead)
333 continue;
334
335 if (!i->work_cb)
336 continue;
337
338 if (p->quit)
339 goto finish;
340
341 if ((k = i->work_cb(i)) != 0) {
342 if (k < 0)
343 r = k;
344
345 goto finish;
346 }
347 }
348
349 /* Now let's prepare for entering the sleep */
350 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
351 int k = 0;
352
353 if (i->dead)
354 continue;
355
356 if (!i->before_cb)
357 continue;
358
359 if (p->quit || (k = i->before_cb(i)) != 0) {
360
361 /* Hmm, this one doesn't let us enter the poll, so rewind everything */
362
363 for (i = i->prev; i; i = i->prev) {
364
365 if (i->dead)
366 continue;
367
368 if (!i->after_cb)
369 continue;
370
371 i->after_cb(i);
372 }
373
374 if (k < 0)
375 r = k;
376
377 goto finish;
378 }
379 }
380
381 if (p->rebuild_needed)
382 rtpoll_rebuild(p);
383
384 memset(&timeout, 0, sizeof(timeout));
385
386 /* Calculate timeout */
387 if (wait && !p->quit && p->timer_enabled) {
388 struct timeval now;
389 pa_rtclock_get(&now);
390
391 if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
392 pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
393 }
394
395 #ifdef DEBUG_TIMING
396 {
397 pa_usec_t now = pa_rtclock_now();
398 p->awake = now - p->timestamp;
399 p->timestamp = now;
400 }
401 #endif
402
403 /* OK, now let's sleep */
404 #ifdef HAVE_PPOLL
405
406 #ifdef __linux__
407 if (!p->dont_use_ppoll)
408 #endif
409 {
410 struct timespec ts;
411 ts.tv_sec = timeout.tv_sec;
412 ts.tv_nsec = timeout.tv_usec * 1000;
413 r = ppoll(p->pollfd, p->n_pollfd_used, (!wait || p->quit || p->timer_enabled) ? &ts : NULL, p->rtsig < 0 ? NULL : &p->sigset_unblocked);
414 }
415 #ifdef __linux__
416 else
417 #endif
418
419 #endif
420 r = poll(p->pollfd, p->n_pollfd_used, (!wait || p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
421
422 #ifdef DEBUG_TIMING
423 {
424 pa_usec_t now = pa_rtclock_now();
425 p->slept = now - p->timestamp;
426 p->timestamp = now;
427
428 pa_log("Process time %llu ms; sleep time %llu ms",
429 (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
430 (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
431 }
432 #endif
433
434 if (r < 0) {
435 if (errno == EAGAIN || errno == EINTR)
436 r = 0;
437 else
438 pa_log_error("poll(): %s", pa_cstrerror(errno));
439
440 reset_all_revents(p);
441 }
442
443 /* Let's tell everyone that we left the sleep */
444 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
445
446 if (i->dead)
447 continue;
448
449 if (!i->after_cb)
450 continue;
451
452 i->after_cb(i);
453 }
454
455 finish:
456
457 p->running = FALSE;
458
459 if (p->scan_for_dead) {
460 pa_rtpoll_item *n;
461
462 p->scan_for_dead = FALSE;
463
464 for (i = p->items; i; i = n) {
465 n = i->next;
466
467 if (i->dead)
468 rtpoll_item_destroy(i);
469 }
470 }
471
472 return r < 0 ? r : !p->quit;
473 }
474
475 static void update_timer(pa_rtpoll *p) {
476 pa_assert(p);
477
478 #ifdef HAVE_PPOLL
479
480 #ifdef __linux__
481 if (p->dont_use_ppoll)
482 return;
483 #endif
484
485 if (p->timer == (timer_t) -1) {
486 struct sigevent se;
487
488 memset(&se, 0, sizeof(se));
489 se.sigev_notify = SIGEV_SIGNAL;
490 se.sigev_signo = p->rtsig;
491
492 if (timer_create(CLOCK_MONOTONIC, &se, &p->timer) < 0)
493 if (timer_create(CLOCK_REALTIME, &se, &p->timer) < 0) {
494 pa_log_warn("Failed to allocate POSIX timer: %s", pa_cstrerror(errno));
495 p->timer = (timer_t) -1;
496 }
497 }
498
499 if (p->timer != (timer_t) -1) {
500 struct itimerspec its;
501 struct timespec ts = { .tv_sec = 0, .tv_nsec = 0 };
502 sigset_t ss;
503
504 if (p->timer_armed) {
505 /* First disarm timer */
506 memset(&its, 0, sizeof(its));
507 pa_assert_se(timer_settime(p->timer, TIMER_ABSTIME, &its, NULL) == 0);
508
509 /* Remove a signal that might be waiting in the signal q */
510 pa_assert_se(sigemptyset(&ss) == 0);
511 pa_assert_se(sigaddset(&ss, p->rtsig) == 0);
512 sigtimedwait(&ss, NULL, &ts);
513 }
514
515 /* And install the new timer */
516 if (p->timer_enabled) {
517 memset(&its, 0, sizeof(its));
518
519 its.it_value.tv_sec = p->next_elapse.tv_sec;
520 its.it_value.tv_nsec = p->next_elapse.tv_usec*1000;
521
522 /* Make sure that 0,0 is not understood as
523 * "disarming" */
524 if (its.it_value.tv_sec == 0 && its.it_value.tv_nsec == 0)
525 its.it_value.tv_nsec = 1;
526 pa_assert_se(timer_settime(p->timer, TIMER_ABSTIME, &its, NULL) == 0);
527 }
528
529 p->timer_armed = p->timer_enabled;
530 }
531
532 #endif
533 }
534
535 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
536 pa_assert(p);
537
538 pa_timeval_store(&p->next_elapse, usec);
539 p->timer_enabled = TRUE;
540
541 update_timer(p);
542 }
543
544 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
545 pa_assert(p);
546
547 /* Scheduling a timeout for more than an hour is very very suspicious */
548 pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
549
550 pa_rtclock_get(&p->next_elapse);
551 pa_timeval_add(&p->next_elapse, usec);
552 p->timer_enabled = TRUE;
553
554 update_timer(p);
555 }
556
557 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
558 pa_assert(p);
559
560 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
561 p->timer_enabled = FALSE;
562
563 update_timer(p);
564 }
565
566 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
567 pa_rtpoll_item *i, *j, *l = NULL;
568
569 pa_assert(p);
570
571 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
572 i = pa_xnew(pa_rtpoll_item, 1);
573
574 i->rtpoll = p;
575 i->dead = FALSE;
576 i->n_pollfd = n_fds;
577 i->pollfd = NULL;
578 i->priority = prio;
579
580 i->userdata = NULL;
581 i->before_cb = NULL;
582 i->after_cb = NULL;
583 i->work_cb = NULL;
584
585 for (j = p->items; j; j = j->next) {
586 if (prio <= j->priority)
587 break;
588
589 l = j;
590 }
591
592 PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
593
594 if (n_fds > 0) {
595 p->rebuild_needed = 1;
596 p->n_pollfd_used += n_fds;
597 }
598
599 return i;
600 }
601
602 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
603 pa_assert(i);
604
605 if (i->rtpoll->running) {
606 i->dead = TRUE;
607 i->rtpoll->scan_for_dead = TRUE;
608 return;
609 }
610
611 rtpoll_item_destroy(i);
612 }
613
614 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
615 pa_assert(i);
616
617 if (i->n_pollfd > 0)
618 if (i->rtpoll->rebuild_needed)
619 rtpoll_rebuild(i->rtpoll);
620
621 if (n_fds)
622 *n_fds = i->n_pollfd;
623
624 return i->pollfd;
625 }
626
627 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
628 pa_assert(i);
629 pa_assert(i->priority < PA_RTPOLL_NEVER);
630
631 i->before_cb = before_cb;
632 }
633
634 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
635 pa_assert(i);
636 pa_assert(i->priority < PA_RTPOLL_NEVER);
637
638 i->after_cb = after_cb;
639 }
640
641 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
642 pa_assert(i);
643 pa_assert(i->priority < PA_RTPOLL_NEVER);
644
645 i->work_cb = work_cb;
646 }
647
648 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
649 pa_assert(i);
650
651 i->userdata = userdata;
652 }
653
654 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
655 pa_assert(i);
656
657 return i->userdata;
658 }
659
660 static int fdsem_before(pa_rtpoll_item *i) {
661
662 if (pa_fdsem_before_poll(i->userdata) < 0)
663 return 1; /* 1 means immediate restart of the loop */
664
665 return 0;
666 }
667
668 static void fdsem_after(pa_rtpoll_item *i) {
669 pa_assert(i);
670
671 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
672 pa_fdsem_after_poll(i->userdata);
673 }
674
675 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
676 pa_rtpoll_item *i;
677 struct pollfd *pollfd;
678
679 pa_assert(p);
680 pa_assert(f);
681
682 i = pa_rtpoll_item_new(p, prio, 1);
683
684 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
685
686 pollfd->fd = pa_fdsem_get(f);
687 pollfd->events = POLLIN;
688
689 i->before_cb = fdsem_before;
690 i->after_cb = fdsem_after;
691 i->userdata = f;
692
693 return i;
694 }
695
696 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
697 pa_assert(i);
698
699 if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
700 return 1; /* 1 means immediate restart of the loop */
701
702 return 0;
703 }
704
705 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
706 pa_assert(i);
707
708 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
709 pa_asyncmsgq_read_after_poll(i->userdata);
710 }
711
712 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
713 pa_msgobject *object;
714 int code;
715 void *data;
716 pa_memchunk chunk;
717 int64_t offset;
718
719 pa_assert(i);
720
721 if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
722 int ret;
723
724 if (!object && code == PA_MESSAGE_SHUTDOWN) {
725 pa_asyncmsgq_done(i->userdata, 0);
726 pa_rtpoll_quit(i->rtpoll);
727 return 1;
728 }
729
730 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
731 pa_asyncmsgq_done(i->userdata, ret);
732 return 1;
733 }
734
735 return 0;
736 }
737
738 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
739 pa_rtpoll_item *i;
740 struct pollfd *pollfd;
741
742 pa_assert(p);
743 pa_assert(q);
744
745 i = pa_rtpoll_item_new(p, prio, 1);
746
747 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
748 pollfd->fd = pa_asyncmsgq_read_fd(q);
749 pollfd->events = POLLIN;
750
751 i->before_cb = asyncmsgq_read_before;
752 i->after_cb = asyncmsgq_read_after;
753 i->work_cb = asyncmsgq_read_work;
754 i->userdata = q;
755
756 return i;
757 }
758
759 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
760 pa_assert(i);
761
762 pa_asyncmsgq_write_before_poll(i->userdata);
763 return 0;
764 }
765
766 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
767 pa_assert(i);
768
769 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
770 pa_asyncmsgq_write_after_poll(i->userdata);
771 }
772
773 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
774 pa_rtpoll_item *i;
775 struct pollfd *pollfd;
776
777 pa_assert(p);
778 pa_assert(q);
779
780 i = pa_rtpoll_item_new(p, prio, 1);
781
782 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
783 pollfd->fd = pa_asyncmsgq_write_fd(q);
784 pollfd->events = POLLIN;
785
786 i->before_cb = asyncmsgq_write_before;
787 i->after_cb = asyncmsgq_write_after;
788 i->work_cb = NULL;
789 i->userdata = q;
790
791 return i;
792 }
793
794 void pa_rtpoll_quit(pa_rtpoll *p) {
795 pa_assert(p);
796
797 p->quit = TRUE;
798 }