]> code.delx.au - pulseaudio/blob - src/pulsecore/rtpoll.c
Use <pulsecore/socket.h> instead of <sys/socket.h>
[pulseaudio] / src / pulsecore / rtpoll.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <sys/types.h>
28 #include <stdio.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <errno.h>
32
33 #include <pulse/xmalloc.h>
34 #include <pulse/timeval.h>
35
36 #include <pulsecore/poll.h>
37 #include <pulsecore/core-error.h>
38 #include <pulsecore/core-rtclock.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/llist.h>
41 #include <pulsecore/flist.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/ratelimit.h>
44 #include <pulse/rtclock.h>
45
46 #include "rtpoll.h"
47
48 /* #define DEBUG_TIMING */
49
50 struct pa_rtpoll {
51 struct pollfd *pollfd, *pollfd2;
52 unsigned n_pollfd_alloc, n_pollfd_used;
53
54 struct timeval next_elapse;
55 pa_bool_t timer_enabled:1;
56
57 pa_bool_t scan_for_dead:1;
58 pa_bool_t running:1;
59 pa_bool_t rebuild_needed:1;
60 pa_bool_t quit:1;
61 pa_bool_t timer_elapsed:1;
62
63 #ifdef DEBUG_TIMING
64 pa_usec_t timestamp;
65 pa_usec_t slept, awake;
66 #endif
67
68 PA_LLIST_HEAD(pa_rtpoll_item, items);
69 };
70
71 struct pa_rtpoll_item {
72 pa_rtpoll *rtpoll;
73 pa_bool_t dead;
74
75 pa_rtpoll_priority_t priority;
76
77 struct pollfd *pollfd;
78 unsigned n_pollfd;
79
80 int (*work_cb)(pa_rtpoll_item *i);
81 int (*before_cb)(pa_rtpoll_item *i);
82 void (*after_cb)(pa_rtpoll_item *i);
83 void *userdata;
84
85 PA_LLIST_FIELDS(pa_rtpoll_item);
86 };
87
88 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
89
90 pa_rtpoll *pa_rtpoll_new(void) {
91 pa_rtpoll *p;
92
93 p = pa_xnew0(pa_rtpoll, 1);
94
95 p->n_pollfd_alloc = 32;
96 p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
97 p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
98
99 #ifdef DEBUG_TIMING
100 p->timestamp = pa_rtclock_now();
101 #endif
102
103 return p;
104 }
105
106 static void rtpoll_rebuild(pa_rtpoll *p) {
107
108 struct pollfd *e, *t;
109 pa_rtpoll_item *i;
110 int ra = 0;
111
112 pa_assert(p);
113
114 p->rebuild_needed = FALSE;
115
116 if (p->n_pollfd_used > p->n_pollfd_alloc) {
117 /* Hmm, we have to allocate some more space */
118 p->n_pollfd_alloc = p->n_pollfd_used * 2;
119 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
120 ra = 1;
121 }
122
123 e = p->pollfd2;
124
125 for (i = p->items; i; i = i->next) {
126
127 if (i->n_pollfd > 0) {
128 size_t l = i->n_pollfd * sizeof(struct pollfd);
129
130 if (i->pollfd)
131 memcpy(e, i->pollfd, l);
132 else
133 memset(e, 0, l);
134
135 i->pollfd = e;
136 } else
137 i->pollfd = NULL;
138
139 e += i->n_pollfd;
140 }
141
142 pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
143 t = p->pollfd;
144 p->pollfd = p->pollfd2;
145 p->pollfd2 = t;
146
147 if (ra)
148 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
149 }
150
151 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
152 pa_rtpoll *p;
153
154 pa_assert(i);
155
156 p = i->rtpoll;
157
158 PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
159
160 p->n_pollfd_used -= i->n_pollfd;
161
162 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
163 pa_xfree(i);
164
165 p->rebuild_needed = TRUE;
166 }
167
168 void pa_rtpoll_free(pa_rtpoll *p) {
169 pa_assert(p);
170
171 while (p->items)
172 rtpoll_item_destroy(p->items);
173
174 pa_xfree(p->pollfd);
175 pa_xfree(p->pollfd2);
176
177 pa_xfree(p);
178 }
179
180 static void reset_revents(pa_rtpoll_item *i) {
181 struct pollfd *f;
182 unsigned n;
183
184 pa_assert(i);
185
186 if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
187 return;
188
189 for (; n > 0; n--)
190 f[n-1].revents = 0;
191 }
192
193 static void reset_all_revents(pa_rtpoll *p) {
194 pa_rtpoll_item *i;
195
196 pa_assert(p);
197
198 for (i = p->items; i; i = i->next) {
199
200 if (i->dead)
201 continue;
202
203 reset_revents(i);
204 }
205 }
206
207 int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait_op) {
208 pa_rtpoll_item *i;
209 int r = 0;
210 struct timeval timeout;
211
212 pa_assert(p);
213 pa_assert(!p->running);
214
215 #ifdef DEBUG_TIMING
216 pa_log("rtpoll_run");
217 #endif
218
219 p->running = TRUE;
220 p->timer_elapsed = FALSE;
221
222 /* First, let's do some work */
223 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
224 int k;
225
226 if (i->dead)
227 continue;
228
229 if (!i->work_cb)
230 continue;
231
232 if (p->quit) {
233 #ifdef DEBUG_TIMING
234 pa_log("rtpoll finish");
235 #endif
236 goto finish;
237 }
238
239 if ((k = i->work_cb(i)) != 0) {
240 if (k < 0)
241 r = k;
242 #ifdef DEBUG_TIMING
243 pa_log("rtpoll finish");
244 #endif
245 goto finish;
246 }
247 }
248
249 /* Now let's prepare for entering the sleep */
250 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
251 int k = 0;
252
253 if (i->dead)
254 continue;
255
256 if (!i->before_cb)
257 continue;
258
259 if (p->quit || (k = i->before_cb(i)) != 0) {
260
261 /* Hmm, this one doesn't let us enter the poll, so rewind everything */
262
263 for (i = i->prev; i; i = i->prev) {
264
265 if (i->dead)
266 continue;
267
268 if (!i->after_cb)
269 continue;
270
271 i->after_cb(i);
272 }
273
274 if (k < 0)
275 r = k;
276 #ifdef DEBUG_TIMING
277 pa_log("rtpoll finish");
278 #endif
279 goto finish;
280 }
281 }
282
283 if (p->rebuild_needed)
284 rtpoll_rebuild(p);
285
286 pa_zero(timeout);
287
288 /* Calculate timeout */
289 if (wait_op && !p->quit && p->timer_enabled) {
290 struct timeval now;
291 pa_rtclock_get(&now);
292
293 if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
294 pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
295 }
296
297 #ifdef DEBUG_TIMING
298 {
299 pa_usec_t now = pa_rtclock_now();
300 p->awake = now - p->timestamp;
301 p->timestamp = now;
302 if (!wait_op || p->quit || p->timer_enabled)
303 pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)));
304 else
305 pa_log("poll timeout is ZERO");
306 }
307 #endif
308
309 /* OK, now let's sleep */
310 #ifdef HAVE_PPOLL
311 {
312 struct timespec ts;
313 ts.tv_sec = timeout.tv_sec;
314 ts.tv_nsec = timeout.tv_usec * 1000;
315 r = ppoll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? &ts : NULL, NULL);
316 }
317 #else
318 r = pa_poll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
319 #endif
320
321 p->timer_elapsed = r == 0;
322
323 #ifdef DEBUG_TIMING
324 {
325 pa_usec_t now = pa_rtclock_now();
326 p->slept = now - p->timestamp;
327 p->timestamp = now;
328
329 pa_log("Process time %llu ms; sleep time %llu ms",
330 (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
331 (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
332 }
333 #endif
334
335 if (r < 0) {
336 if (errno == EAGAIN || errno == EINTR)
337 r = 0;
338 else
339 pa_log_error("poll(): %s", pa_cstrerror(errno));
340
341 reset_all_revents(p);
342 }
343
344 /* Let's tell everyone that we left the sleep */
345 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
346
347 if (i->dead)
348 continue;
349
350 if (!i->after_cb)
351 continue;
352
353 i->after_cb(i);
354 }
355
356 finish:
357
358 p->running = FALSE;
359
360 if (p->scan_for_dead) {
361 pa_rtpoll_item *n;
362
363 p->scan_for_dead = FALSE;
364
365 for (i = p->items; i; i = n) {
366 n = i->next;
367
368 if (i->dead)
369 rtpoll_item_destroy(i);
370 }
371 }
372
373 return r < 0 ? r : !p->quit;
374 }
375
376 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
377 pa_assert(p);
378
379 pa_timeval_store(&p->next_elapse, usec);
380 p->timer_enabled = TRUE;
381 }
382
383 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
384 pa_assert(p);
385
386 /* Scheduling a timeout for more than an hour is very very suspicious */
387 pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
388
389 pa_rtclock_get(&p->next_elapse);
390 pa_timeval_add(&p->next_elapse, usec);
391 p->timer_enabled = TRUE;
392 }
393
394 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
395 pa_assert(p);
396
397 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
398 p->timer_enabled = FALSE;
399 }
400
401 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
402 pa_rtpoll_item *i, *j, *l = NULL;
403
404 pa_assert(p);
405
406 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
407 i = pa_xnew(pa_rtpoll_item, 1);
408
409 i->rtpoll = p;
410 i->dead = FALSE;
411 i->n_pollfd = n_fds;
412 i->pollfd = NULL;
413 i->priority = prio;
414
415 i->userdata = NULL;
416 i->before_cb = NULL;
417 i->after_cb = NULL;
418 i->work_cb = NULL;
419
420 for (j = p->items; j; j = j->next) {
421 if (prio <= j->priority)
422 break;
423
424 l = j;
425 }
426
427 PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
428
429 if (n_fds > 0) {
430 p->rebuild_needed = 1;
431 p->n_pollfd_used += n_fds;
432 }
433
434 return i;
435 }
436
437 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
438 pa_assert(i);
439
440 if (i->rtpoll->running) {
441 i->dead = TRUE;
442 i->rtpoll->scan_for_dead = TRUE;
443 return;
444 }
445
446 rtpoll_item_destroy(i);
447 }
448
449 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
450 pa_assert(i);
451
452 if (i->n_pollfd > 0)
453 if (i->rtpoll->rebuild_needed)
454 rtpoll_rebuild(i->rtpoll);
455
456 if (n_fds)
457 *n_fds = i->n_pollfd;
458
459 return i->pollfd;
460 }
461
462 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
463 pa_assert(i);
464 pa_assert(i->priority < PA_RTPOLL_NEVER);
465
466 i->before_cb = before_cb;
467 }
468
469 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
470 pa_assert(i);
471 pa_assert(i->priority < PA_RTPOLL_NEVER);
472
473 i->after_cb = after_cb;
474 }
475
476 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
477 pa_assert(i);
478 pa_assert(i->priority < PA_RTPOLL_NEVER);
479
480 i->work_cb = work_cb;
481 }
482
483 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
484 pa_assert(i);
485
486 i->userdata = userdata;
487 }
488
489 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
490 pa_assert(i);
491
492 return i->userdata;
493 }
494
495 static int fdsem_before(pa_rtpoll_item *i) {
496
497 if (pa_fdsem_before_poll(i->userdata) < 0)
498 return 1; /* 1 means immediate restart of the loop */
499
500 return 0;
501 }
502
503 static void fdsem_after(pa_rtpoll_item *i) {
504 pa_assert(i);
505
506 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
507 pa_fdsem_after_poll(i->userdata);
508 }
509
510 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
511 pa_rtpoll_item *i;
512 struct pollfd *pollfd;
513
514 pa_assert(p);
515 pa_assert(f);
516
517 i = pa_rtpoll_item_new(p, prio, 1);
518
519 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
520
521 pollfd->fd = pa_fdsem_get(f);
522 pollfd->events = POLLIN;
523
524 i->before_cb = fdsem_before;
525 i->after_cb = fdsem_after;
526 i->userdata = f;
527
528 return i;
529 }
530
531 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
532 pa_assert(i);
533
534 if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
535 return 1; /* 1 means immediate restart of the loop */
536
537 return 0;
538 }
539
540 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
541 pa_assert(i);
542
543 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
544 pa_asyncmsgq_read_after_poll(i->userdata);
545 }
546
547 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
548 pa_msgobject *object;
549 int code;
550 void *data;
551 pa_memchunk chunk;
552 int64_t offset;
553
554 pa_assert(i);
555
556 if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
557 int ret;
558
559 if (!object && code == PA_MESSAGE_SHUTDOWN) {
560 pa_asyncmsgq_done(i->userdata, 0);
561 pa_rtpoll_quit(i->rtpoll);
562 return 1;
563 }
564
565 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
566 pa_asyncmsgq_done(i->userdata, ret);
567 return 1;
568 }
569
570 return 0;
571 }
572
573 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
574 pa_rtpoll_item *i;
575 struct pollfd *pollfd;
576
577 pa_assert(p);
578 pa_assert(q);
579
580 i = pa_rtpoll_item_new(p, prio, 1);
581
582 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
583 pollfd->fd = pa_asyncmsgq_read_fd(q);
584 pollfd->events = POLLIN;
585
586 i->before_cb = asyncmsgq_read_before;
587 i->after_cb = asyncmsgq_read_after;
588 i->work_cb = asyncmsgq_read_work;
589 i->userdata = q;
590
591 return i;
592 }
593
594 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
595 pa_assert(i);
596
597 pa_asyncmsgq_write_before_poll(i->userdata);
598 return 0;
599 }
600
601 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
602 pa_assert(i);
603
604 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
605 pa_asyncmsgq_write_after_poll(i->userdata);
606 }
607
608 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
609 pa_rtpoll_item *i;
610 struct pollfd *pollfd;
611
612 pa_assert(p);
613 pa_assert(q);
614
615 i = pa_rtpoll_item_new(p, prio, 1);
616
617 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
618 pollfd->fd = pa_asyncmsgq_write_fd(q);
619 pollfd->events = POLLIN;
620
621 i->before_cb = asyncmsgq_write_before;
622 i->after_cb = asyncmsgq_write_after;
623 i->work_cb = NULL;
624 i->userdata = q;
625
626 return i;
627 }
628
629 void pa_rtpoll_quit(pa_rtpoll *p) {
630 pa_assert(p);
631
632 p->quit = TRUE;
633 }
634
635 pa_bool_t pa_rtpoll_timer_elapsed(pa_rtpoll *p) {
636 pa_assert(p);
637
638 return p->timer_elapsed;
639 }