code.delx.au - pulseaudio/blob - src/pulsecore/rtpoll.c
Add another assert to catch sleep time miscalculations more easily
[pulseaudio] / src / pulsecore / rtpoll.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <sys/types.h>
30 #include <stdio.h>
31 #include <signal.h>
32 #include <string.h>
33 #include <errno.h>
34
35 #ifdef __linux__
36 #include <sys/utsname.h>
37 #endif
38
39 #ifdef HAVE_POLL_H
40 #include <poll.h>
41 #else
42 #include <pulsecore/poll.h>
43 #endif
44
45 #include <pulse/xmalloc.h>
46 #include <pulse/timeval.h>
47
48 #include <pulsecore/core-error.h>
49 #include <pulsecore/rtclock.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/llist.h>
52 #include <pulsecore/rtsig.h>
53 #include <pulsecore/flist.h>
54 #include <pulsecore/core-util.h>
55
56 #include <pulsecore/winsock.h>
57
58 #include "rtpoll.h"
59
60 struct pa_rtpoll {
61 struct pollfd *pollfd, *pollfd2;
62 unsigned n_pollfd_alloc, n_pollfd_used;
63
64 pa_bool_t timer_enabled;
65 struct timeval next_elapse;
66
67 pa_bool_t scan_for_dead;
68 pa_bool_t running, installed, rebuild_needed, quit;
69
70 #ifdef HAVE_PPOLL
71 int rtsig;
72 sigset_t sigset_unblocked;
73 timer_t timer;
74 pa_bool_t timer_armed;
75 #ifdef __linux__
76 pa_bool_t dont_use_ppoll;
77 #endif
78 #endif
79
80 PA_LLIST_HEAD(pa_rtpoll_item, items);
81 };
82
83 struct pa_rtpoll_item {
84 pa_rtpoll *rtpoll;
85 pa_bool_t dead;
86
87 pa_rtpoll_priority_t priority;
88
89 struct pollfd *pollfd;
90 unsigned n_pollfd;
91
92 int (*work_cb)(pa_rtpoll_item *i);
93 int (*before_cb)(pa_rtpoll_item *i);
94 void (*after_cb)(pa_rtpoll_item *i);
95 void *userdata;
96
97 PA_LLIST_FIELDS(pa_rtpoll_item);
98 };
99
100 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
101
/* Handler installed for the realtime signal in pa_rtpoll_install(). It
 * intentionally does nothing. */
static void signal_handler_noop(int s) {
    (void) s;
}
103
104 pa_rtpoll *pa_rtpoll_new(void) {
105 pa_rtpoll *p;
106
107 p = pa_xnew(pa_rtpoll, 1);
108
109 #ifdef HAVE_PPOLL
110
111 #ifdef __linux__
112 /* ppoll is broken on Linux < 2.6.16 */
113 p->dont_use_ppoll = FALSE;
114
115 {
116 struct utsname u;
117 unsigned major, minor, micro;
118
119 pa_assert_se(uname(&u) == 0);
120
121 if (sscanf(u.release, "%u.%u.%u", &major, &minor, &micro) != 3 ||
122 (major < 2) ||
123 (major == 2 && minor < 6) ||
124 (major == 2 && minor == 6 && micro < 16))
125
126 p->dont_use_ppoll = TRUE;
127 }
128
129 #endif
130
131 p->rtsig = -1;
132 sigemptyset(&p->sigset_unblocked);
133 p->timer = (timer_t) -1;
134 p->timer_armed = FALSE;
135
136 #endif
137
138 p->n_pollfd_alloc = 32;
139 p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
140 p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
141 p->n_pollfd_used = 0;
142
143 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
144 p->timer_enabled = FALSE;
145
146 p->running = FALSE;
147 p->installed = FALSE;
148 p->scan_for_dead = FALSE;
149 p->rebuild_needed = FALSE;
150 p->quit = FALSE;
151
152 PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items);
153
154 return p;
155 }
156
157 void pa_rtpoll_install(pa_rtpoll *p) {
158 pa_assert(p);
159 pa_assert(!p->installed);
160
161 p->installed = 1;
162
163 #ifdef HAVE_PPOLL
164 # ifdef __linux__
165 if (p->dont_use_ppoll)
166 return;
167 # endif
168
169 if ((p->rtsig = pa_rtsig_get_for_thread()) < 0) {
170 pa_log_warn("Failed to reserve POSIX realtime signal.");
171 return;
172 }
173
174 pa_log_debug("Acquired POSIX realtime signal %s", pa_sig2str(p->rtsig));
175
176 {
177 sigset_t ss;
178 struct sigaction sa;
179
180 pa_assert_se(sigemptyset(&ss) == 0);
181 pa_assert_se(sigaddset(&ss, p->rtsig) == 0);
182 pa_assert_se(pthread_sigmask(SIG_BLOCK, &ss, &p->sigset_unblocked) == 0);
183 pa_assert_se(sigdelset(&p->sigset_unblocked, p->rtsig) == 0);
184
185 memset(&sa, 0, sizeof(sa));
186 sa.sa_handler = signal_handler_noop;
187 pa_assert_se(sigemptyset(&sa.sa_mask) == 0);
188
189 pa_assert_se(sigaction(p->rtsig, &sa, NULL) == 0);
190
191 /* We never reset the signal handler. Why should we? */
192 }
193
194 #endif
195 }
196
197 static void rtpoll_rebuild(pa_rtpoll *p) {
198
199 struct pollfd *e, *t;
200 pa_rtpoll_item *i;
201 int ra = 0;
202
203 pa_assert(p);
204
205 p->rebuild_needed = FALSE;
206
207 if (p->n_pollfd_used > p->n_pollfd_alloc) {
208 /* Hmm, we have to allocate some more space */
209 p->n_pollfd_alloc = p->n_pollfd_used * 2;
210 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
211 ra = 1;
212 }
213
214 e = p->pollfd2;
215
216 for (i = p->items; i; i = i->next) {
217
218 if (i->n_pollfd > 0) {
219 size_t l = i->n_pollfd * sizeof(struct pollfd);
220
221 if (i->pollfd)
222 memcpy(e, i->pollfd, l);
223 else
224 memset(e, 0, l);
225
226 i->pollfd = e;
227 } else
228 i->pollfd = NULL;
229
230 e += i->n_pollfd;
231 }
232
233 pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
234 t = p->pollfd;
235 p->pollfd = p->pollfd2;
236 p->pollfd2 = t;
237
238 if (ra)
239 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
240
241 }
242
243 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
244 pa_rtpoll *p;
245
246 pa_assert(i);
247
248 p = i->rtpoll;
249
250 PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
251
252 p->n_pollfd_used -= i->n_pollfd;
253
254 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
255 pa_xfree(i);
256
257 p->rebuild_needed = TRUE;
258 }
259
260 void pa_rtpoll_free(pa_rtpoll *p) {
261 pa_assert(p);
262
263 while (p->items)
264 rtpoll_item_destroy(p->items);
265
266 pa_xfree(p->pollfd);
267 pa_xfree(p->pollfd2);
268
269 #ifdef HAVE_PPOLL
270 if (p->timer != (timer_t) -1)
271 timer_delete(p->timer);
272 #endif
273
274 pa_xfree(p);
275 }
276
277 static void reset_revents(pa_rtpoll_item *i) {
278 struct pollfd *f;
279 unsigned n;
280
281 pa_assert(i);
282
283 if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
284 return;
285
286 for (; n > 0; n--)
287 f[n-1].revents = 0;
288 }
289
290 static void reset_all_revents(pa_rtpoll *p) {
291 pa_rtpoll_item *i;
292
293 pa_assert(p);
294
295 for (i = p->items; i; i = i->next) {
296
297 if (i->dead)
298 continue;
299
300 reset_revents(i);
301 }
302 }
303
304 int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait) {
305 pa_rtpoll_item *i;
306 int r = 0;
307 struct timeval timeout;
308
309 pa_assert(p);
310 pa_assert(!p->running);
311 pa_assert(p->installed);
312
313 p->running = TRUE;
314
315 /* First, let's do some work */
316 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
317 int k;
318
319 if (i->dead)
320 continue;
321
322 if (!i->work_cb)
323 continue;
324
325 if (p->quit)
326 goto finish;
327
328 if ((k = i->work_cb(i)) != 0) {
329 if (k < 0)
330 r = k;
331
332 goto finish;
333 }
334 }
335
336 /* Now let's prepare for entering the sleep */
337 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
338 int k = 0;
339
340 if (i->dead)
341 continue;
342
343 if (!i->before_cb)
344 continue;
345
346 if (p->quit || (k = i->before_cb(i)) != 0) {
347
348 /* Hmm, this one doesn't let us enter the poll, so rewind everything */
349
350 for (i = i->prev; i; i = i->prev) {
351
352 if (i->dead)
353 continue;
354
355 if (!i->after_cb)
356 continue;
357
358 i->after_cb(i);
359 }
360
361 if (k < 0)
362 r = k;
363
364 goto finish;
365 }
366 }
367
368 if (p->rebuild_needed)
369 rtpoll_rebuild(p);
370
371 memset(&timeout, 0, sizeof(timeout));
372
373 /* Calculate timeout */
374 if (wait && !p->quit && p->timer_enabled) {
375 struct timeval now;
376 pa_rtclock_get(&now);
377
378 if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
379 pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
380 }
381
382 /* OK, now let's sleep */
383 #ifdef HAVE_PPOLL
384
385 #ifdef __linux__
386 if (!p->dont_use_ppoll)
387 #endif
388 {
389 struct timespec ts;
390 ts.tv_sec = timeout.tv_sec;
391 ts.tv_nsec = timeout.tv_usec * 1000;
392 r = ppoll(p->pollfd, p->n_pollfd_used, (!wait || p->quit || p->timer_enabled) ? &ts : NULL, p->rtsig < 0 ? NULL : &p->sigset_unblocked);
393 }
394 #ifdef __linux__
395 else
396 #endif
397
398 #endif
399 r = poll(p->pollfd, p->n_pollfd_used, (!wait || p->quit || p->timer_enabled) ? (timeout.tv_sec*1000) + (timeout.tv_usec / 1000) : -1);
400
401 if (r < 0) {
402 if (errno == EAGAIN || errno == EINTR)
403 r = 0;
404 else
405 pa_log_error("poll(): %s", pa_cstrerror(errno));
406
407 reset_all_revents(p);
408 }
409
410 /* Let's tell everyone that we left the sleep */
411 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
412
413 if (i->dead)
414 continue;
415
416 if (!i->after_cb)
417 continue;
418
419 i->after_cb(i);
420 }
421
422 finish:
423
424 p->running = FALSE;
425
426 if (p->scan_for_dead) {
427 pa_rtpoll_item *n;
428
429 p->scan_for_dead = FALSE;
430
431 for (i = p->items; i; i = n) {
432 n = i->next;
433
434 if (i->dead)
435 rtpoll_item_destroy(i);
436 }
437 }
438
439 return r < 0 ? r : !p->quit;
440 }
441
442 static void update_timer(pa_rtpoll *p) {
443 pa_assert(p);
444
445 #ifdef HAVE_PPOLL
446
447 #ifdef __linux__
448 if (!p->dont_use_ppoll) {
449 #endif
450
451 if (p->timer == (timer_t) -1) {
452 struct sigevent se;
453
454 memset(&se, 0, sizeof(se));
455 se.sigev_notify = SIGEV_SIGNAL;
456 se.sigev_signo = p->rtsig;
457
458 if (timer_create(CLOCK_MONOTONIC, &se, &p->timer) < 0)
459 if (timer_create(CLOCK_REALTIME, &se, &p->timer) < 0) {
460 pa_log_warn("Failed to allocate POSIX timer: %s", pa_cstrerror(errno));
461 p->timer = (timer_t) -1;
462 }
463 }
464
465 if (p->timer != (timer_t) -1) {
466 struct itimerspec its;
467 struct timespec ts = { .tv_sec = 0, .tv_nsec = 0 };
468 sigset_t ss;
469
470 if (p->timer_armed) {
471 /* First disarm timer */
472 memset(&its, 0, sizeof(its));
473 pa_assert_se(timer_settime(p->timer, TIMER_ABSTIME, &its, NULL) == 0);
474
475 /* Remove a signal that might be waiting in the signal q */
476 pa_assert_se(sigemptyset(&ss) == 0);
477 pa_assert_se(sigaddset(&ss, p->rtsig) == 0);
478 sigtimedwait(&ss, NULL, &ts);
479 }
480
481 /* And install the new timer */
482 if (p->timer_enabled) {
483 memset(&its, 0, sizeof(its));
484
485 its.it_value.tv_sec = p->next_elapse.tv_sec;
486 its.it_value.tv_nsec = p->next_elapse.tv_usec*1000;
487
488 /* Make sure that 0,0 is not understood as
489 * "disarming" */
490 if (its.it_value.tv_sec == 0 && its.it_value.tv_nsec == 0)
491 its.it_value.tv_nsec = 1;
492 pa_assert_se(timer_settime(p->timer, TIMER_ABSTIME, &its, NULL) == 0);
493 }
494
495 p->timer_armed = p->timer_enabled;
496 }
497
498 #ifdef __linux__
499 }
500 #endif
501
502 #endif
503 }
504
505 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
506 pa_assert(p);
507
508 pa_timeval_store(&p->next_elapse, usec);
509 p->timer_enabled = TRUE;
510
511 update_timer(p);
512 }
513
514 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
515 pa_assert(p);
516
517 /* Scheduling a timeout for more than an hour is very very suspicious */
518 pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
519
520 pa_rtclock_get(&p->next_elapse);
521 pa_timeval_add(&p->next_elapse, usec);
522 p->timer_enabled = TRUE;
523
524 update_timer(p);
525 }
526
527 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
528 pa_assert(p);
529
530 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
531 p->timer_enabled = FALSE;
532
533 update_timer(p);
534 }
535
536 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
537 pa_rtpoll_item *i, *j, *l = NULL;
538
539 pa_assert(p);
540
541 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
542 i = pa_xnew(pa_rtpoll_item, 1);
543
544 i->rtpoll = p;
545 i->dead = FALSE;
546 i->n_pollfd = n_fds;
547 i->pollfd = NULL;
548 i->priority = prio;
549
550 i->userdata = NULL;
551 i->before_cb = NULL;
552 i->after_cb = NULL;
553 i->work_cb = NULL;
554
555 for (j = p->items; j; j = j->next) {
556 if (prio <= j->priority)
557 break;
558
559 l = j;
560 }
561
562 PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
563
564 if (n_fds > 0) {
565 p->rebuild_needed = 1;
566 p->n_pollfd_used += n_fds;
567 }
568
569 return i;
570 }
571
572 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
573 pa_assert(i);
574
575 if (i->rtpoll->running) {
576 i->dead = TRUE;
577 i->rtpoll->scan_for_dead = TRUE;
578 return;
579 }
580
581 rtpoll_item_destroy(i);
582 }
583
584 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
585 pa_assert(i);
586
587 if (i->n_pollfd > 0)
588 if (i->rtpoll->rebuild_needed)
589 rtpoll_rebuild(i->rtpoll);
590
591 if (n_fds)
592 *n_fds = i->n_pollfd;
593
594 return i->pollfd;
595 }
596
597 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
598 pa_assert(i);
599 pa_assert(i->priority < PA_RTPOLL_NEVER);
600
601 i->before_cb = before_cb;
602 }
603
604 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
605 pa_assert(i);
606 pa_assert(i->priority < PA_RTPOLL_NEVER);
607
608 i->after_cb = after_cb;
609 }
610
611 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
612 pa_assert(i);
613 pa_assert(i->priority < PA_RTPOLL_NEVER);
614
615 i->work_cb = work_cb;
616 }
617
618 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
619 pa_assert(i);
620
621 i->userdata = userdata;
622 }
623
624 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
625 pa_assert(i);
626
627 return i->userdata;
628 }
629
630 static int fdsem_before(pa_rtpoll_item *i) {
631
632 if (pa_fdsem_before_poll(i->userdata) < 0)
633 return 1; /* 1 means immediate restart of the loop */
634
635 return 0;
636 }
637
638 static void fdsem_after(pa_rtpoll_item *i) {
639 pa_assert(i);
640
641 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
642 pa_fdsem_after_poll(i->userdata);
643 }
644
645 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
646 pa_rtpoll_item *i;
647 struct pollfd *pollfd;
648
649 pa_assert(p);
650 pa_assert(f);
651
652 i = pa_rtpoll_item_new(p, prio, 1);
653
654 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
655
656 pollfd->fd = pa_fdsem_get(f);
657 pollfd->events = POLLIN;
658
659 i->before_cb = fdsem_before;
660 i->after_cb = fdsem_after;
661 i->userdata = f;
662
663 return i;
664 }
665
666 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
667 pa_assert(i);
668
669 if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
670 return 1; /* 1 means immediate restart of the loop */
671
672 return 0;
673 }
674
675 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
676 pa_assert(i);
677
678 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
679 pa_asyncmsgq_read_after_poll(i->userdata);
680 }
681
682 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
683 pa_msgobject *object;
684 int code;
685 void *data;
686 pa_memchunk chunk;
687 int64_t offset;
688
689 pa_assert(i);
690
691 if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
692 int ret;
693
694 if (!object && code == PA_MESSAGE_SHUTDOWN) {
695 pa_asyncmsgq_done(i->userdata, 0);
696 pa_rtpoll_quit(i->rtpoll);
697 return 1;
698 }
699
700 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
701 pa_asyncmsgq_done(i->userdata, ret);
702 return 1;
703 }
704
705 return 0;
706 }
707
708 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
709 pa_rtpoll_item *i;
710 struct pollfd *pollfd;
711
712 pa_assert(p);
713 pa_assert(q);
714
715 i = pa_rtpoll_item_new(p, prio, 1);
716
717 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
718 pollfd->fd = pa_asyncmsgq_read_fd(q);
719 pollfd->events = POLLIN;
720
721 i->before_cb = asyncmsgq_read_before;
722 i->after_cb = asyncmsgq_read_after;
723 i->work_cb = asyncmsgq_read_work;
724 i->userdata = q;
725
726 return i;
727 }
728
729 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
730 pa_assert(i);
731
732 pa_asyncmsgq_write_before_poll(i->userdata);
733 return 0;
734 }
735
736 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
737 pa_assert(i);
738
739 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
740 pa_asyncmsgq_write_after_poll(i->userdata);
741 }
742
743 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
744 pa_rtpoll_item *i;
745 struct pollfd *pollfd;
746
747 pa_assert(p);
748 pa_assert(q);
749
750 i = pa_rtpoll_item_new(p, prio, 1);
751
752 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
753 pollfd->fd = pa_asyncmsgq_write_fd(q);
754 pollfd->events = POLLIN;
755
756 i->before_cb = asyncmsgq_write_before;
757 i->after_cb = asyncmsgq_write_after;
758 i->work_cb = NULL;
759 i->userdata = q;
760
761 return i;
762 }
763
764 void pa_rtpoll_quit(pa_rtpoll *p) {
765 pa_assert(p);
766
767 p->quit = TRUE;
768 }