]> code.delx.au - pulseaudio/blob - src/pulsecore/rtpoll.c
alsa: automatically decrease watermark after a time of stability
[pulseaudio] / src / pulsecore / rtpoll.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <sys/types.h>
28 #include <stdio.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <errno.h>
32
33 #ifdef HAVE_POLL_H
34 #include <poll.h>
35 #else
36 #include <pulsecore/poll.h>
37 #endif
38
39 #include <pulse/xmalloc.h>
40 #include <pulse/timeval.h>
41
42 #include <pulsecore/core-error.h>
43 #include <pulsecore/core-rtclock.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/llist.h>
46 #include <pulsecore/flist.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/winsock.h>
49 #include <pulsecore/ratelimit.h>
50
51 #include "rtpoll.h"
52
53 /* #define DEBUG_TIMING */
54
55 struct pa_rtpoll {
56 struct pollfd *pollfd, *pollfd2;
57 unsigned n_pollfd_alloc, n_pollfd_used;
58
59 struct timeval next_elapse;
60 pa_bool_t timer_enabled:1;
61
62 pa_bool_t scan_for_dead:1;
63 pa_bool_t running:1;
64 pa_bool_t rebuild_needed:1;
65 pa_bool_t quit:1;
66 pa_bool_t timer_elapsed:1;
67
68 #ifdef DEBUG_TIMING
69 pa_usec_t timestamp;
70 pa_usec_t slept, awake;
71 #endif
72
73 PA_LLIST_HEAD(pa_rtpoll_item, items);
74 };
75
76 struct pa_rtpoll_item {
77 pa_rtpoll *rtpoll;
78 pa_bool_t dead;
79
80 pa_rtpoll_priority_t priority;
81
82 struct pollfd *pollfd;
83 unsigned n_pollfd;
84
85 int (*work_cb)(pa_rtpoll_item *i);
86 int (*before_cb)(pa_rtpoll_item *i);
87 void (*after_cb)(pa_rtpoll_item *i);
88 void *userdata;
89
90 PA_LLIST_FIELDS(pa_rtpoll_item);
91 };
92
93 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
94
95 pa_rtpoll *pa_rtpoll_new(void) {
96 pa_rtpoll *p;
97
98 p = pa_xnew0(pa_rtpoll, 1);
99
100 p->n_pollfd_alloc = 32;
101 p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
102 p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
103
104 #ifdef DEBUG_TIMING
105 p->timestamp = pa_rtclock_now();
106 #endif
107
108 return p;
109 }
110
111 static void rtpoll_rebuild(pa_rtpoll *p) {
112
113 struct pollfd *e, *t;
114 pa_rtpoll_item *i;
115 int ra = 0;
116
117 pa_assert(p);
118
119 p->rebuild_needed = FALSE;
120
121 if (p->n_pollfd_used > p->n_pollfd_alloc) {
122 /* Hmm, we have to allocate some more space */
123 p->n_pollfd_alloc = p->n_pollfd_used * 2;
124 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
125 ra = 1;
126 }
127
128 e = p->pollfd2;
129
130 for (i = p->items; i; i = i->next) {
131
132 if (i->n_pollfd > 0) {
133 size_t l = i->n_pollfd * sizeof(struct pollfd);
134
135 if (i->pollfd)
136 memcpy(e, i->pollfd, l);
137 else
138 memset(e, 0, l);
139
140 i->pollfd = e;
141 } else
142 i->pollfd = NULL;
143
144 e += i->n_pollfd;
145 }
146
147 pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
148 t = p->pollfd;
149 p->pollfd = p->pollfd2;
150 p->pollfd2 = t;
151
152 if (ra)
153 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
154 }
155
156 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
157 pa_rtpoll *p;
158
159 pa_assert(i);
160
161 p = i->rtpoll;
162
163 PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
164
165 p->n_pollfd_used -= i->n_pollfd;
166
167 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
168 pa_xfree(i);
169
170 p->rebuild_needed = TRUE;
171 }
172
173 void pa_rtpoll_free(pa_rtpoll *p) {
174 pa_assert(p);
175
176 while (p->items)
177 rtpoll_item_destroy(p->items);
178
179 pa_xfree(p->pollfd);
180 pa_xfree(p->pollfd2);
181
182 pa_xfree(p);
183 }
184
185 static void reset_revents(pa_rtpoll_item *i) {
186 struct pollfd *f;
187 unsigned n;
188
189 pa_assert(i);
190
191 if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
192 return;
193
194 for (; n > 0; n--)
195 f[n-1].revents = 0;
196 }
197
198 static void reset_all_revents(pa_rtpoll *p) {
199 pa_rtpoll_item *i;
200
201 pa_assert(p);
202
203 for (i = p->items; i; i = i->next) {
204
205 if (i->dead)
206 continue;
207
208 reset_revents(i);
209 }
210 }
211
212 int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait_op) {
213 pa_rtpoll_item *i;
214 int r = 0;
215 struct timeval timeout;
216
217 pa_assert(p);
218 pa_assert(!p->running);
219
220 p->running = TRUE;
221 p->timer_elapsed = FALSE;
222
223 /* First, let's do some work */
224 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
225 int k;
226
227 if (i->dead)
228 continue;
229
230 if (!i->work_cb)
231 continue;
232
233 if (p->quit)
234 goto finish;
235
236 if ((k = i->work_cb(i)) != 0) {
237 if (k < 0)
238 r = k;
239
240 goto finish;
241 }
242 }
243
244 /* Now let's prepare for entering the sleep */
245 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
246 int k = 0;
247
248 if (i->dead)
249 continue;
250
251 if (!i->before_cb)
252 continue;
253
254 if (p->quit || (k = i->before_cb(i)) != 0) {
255
256 /* Hmm, this one doesn't let us enter the poll, so rewind everything */
257
258 for (i = i->prev; i; i = i->prev) {
259
260 if (i->dead)
261 continue;
262
263 if (!i->after_cb)
264 continue;
265
266 i->after_cb(i);
267 }
268
269 if (k < 0)
270 r = k;
271
272 goto finish;
273 }
274 }
275
276 if (p->rebuild_needed)
277 rtpoll_rebuild(p);
278
279 pa_zero(timeout);
280
281 /* Calculate timeout */
282 if (wait_op && !p->quit && p->timer_enabled) {
283 struct timeval now;
284 pa_rtclock_get(&now);
285
286 if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
287 pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
288 }
289
290 #ifdef DEBUG_TIMING
291 {
292 pa_usec_t now = pa_rtclock_now();
293 p->awake = now - p->timestamp;
294 p->timestamp = now;
295 }
296 #endif
297
298 /* OK, now let's sleep */
299 #ifdef HAVE_PPOLL
300 {
301 struct timespec ts;
302 ts.tv_sec = timeout.tv_sec;
303 ts.tv_nsec = timeout.tv_usec * 1000;
304 r = ppoll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? &ts : NULL, NULL);
305 }
306 #else
307 r = poll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
308 #endif
309
310 p->timer_elapsed = r == 0;
311
312 #ifdef DEBUG_TIMING
313 {
314 pa_usec_t now = pa_rtclock_now();
315 p->slept = now - p->timestamp;
316 p->timestamp = now;
317
318 pa_log("Process time %llu ms; sleep time %llu ms",
319 (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
320 (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
321 }
322 #endif
323
324 if (r < 0) {
325 if (errno == EAGAIN || errno == EINTR)
326 r = 0;
327 else
328 pa_log_error("poll(): %s", pa_cstrerror(errno));
329
330 reset_all_revents(p);
331 }
332
333 /* Let's tell everyone that we left the sleep */
334 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
335
336 if (i->dead)
337 continue;
338
339 if (!i->after_cb)
340 continue;
341
342 i->after_cb(i);
343 }
344
345 finish:
346
347 p->running = FALSE;
348
349 if (p->scan_for_dead) {
350 pa_rtpoll_item *n;
351
352 p->scan_for_dead = FALSE;
353
354 for (i = p->items; i; i = n) {
355 n = i->next;
356
357 if (i->dead)
358 rtpoll_item_destroy(i);
359 }
360 }
361
362 return r < 0 ? r : !p->quit;
363 }
364
365 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
366 pa_assert(p);
367
368 pa_timeval_store(&p->next_elapse, usec);
369 p->timer_enabled = TRUE;
370 }
371
372 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
373 pa_assert(p);
374
375 /* Scheduling a timeout for more than an hour is very very suspicious */
376 pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
377
378 pa_rtclock_get(&p->next_elapse);
379 pa_timeval_add(&p->next_elapse, usec);
380 p->timer_enabled = TRUE;
381 }
382
383 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
384 pa_assert(p);
385
386 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
387 p->timer_enabled = FALSE;
388 }
389
390 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
391 pa_rtpoll_item *i, *j, *l = NULL;
392
393 pa_assert(p);
394
395 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
396 i = pa_xnew(pa_rtpoll_item, 1);
397
398 i->rtpoll = p;
399 i->dead = FALSE;
400 i->n_pollfd = n_fds;
401 i->pollfd = NULL;
402 i->priority = prio;
403
404 i->userdata = NULL;
405 i->before_cb = NULL;
406 i->after_cb = NULL;
407 i->work_cb = NULL;
408
409 for (j = p->items; j; j = j->next) {
410 if (prio <= j->priority)
411 break;
412
413 l = j;
414 }
415
416 PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
417
418 if (n_fds > 0) {
419 p->rebuild_needed = 1;
420 p->n_pollfd_used += n_fds;
421 }
422
423 return i;
424 }
425
426 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
427 pa_assert(i);
428
429 if (i->rtpoll->running) {
430 i->dead = TRUE;
431 i->rtpoll->scan_for_dead = TRUE;
432 return;
433 }
434
435 rtpoll_item_destroy(i);
436 }
437
438 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
439 pa_assert(i);
440
441 if (i->n_pollfd > 0)
442 if (i->rtpoll->rebuild_needed)
443 rtpoll_rebuild(i->rtpoll);
444
445 if (n_fds)
446 *n_fds = i->n_pollfd;
447
448 return i->pollfd;
449 }
450
451 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
452 pa_assert(i);
453 pa_assert(i->priority < PA_RTPOLL_NEVER);
454
455 i->before_cb = before_cb;
456 }
457
458 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
459 pa_assert(i);
460 pa_assert(i->priority < PA_RTPOLL_NEVER);
461
462 i->after_cb = after_cb;
463 }
464
465 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
466 pa_assert(i);
467 pa_assert(i->priority < PA_RTPOLL_NEVER);
468
469 i->work_cb = work_cb;
470 }
471
472 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
473 pa_assert(i);
474
475 i->userdata = userdata;
476 }
477
478 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
479 pa_assert(i);
480
481 return i->userdata;
482 }
483
484 static int fdsem_before(pa_rtpoll_item *i) {
485
486 if (pa_fdsem_before_poll(i->userdata) < 0)
487 return 1; /* 1 means immediate restart of the loop */
488
489 return 0;
490 }
491
492 static void fdsem_after(pa_rtpoll_item *i) {
493 pa_assert(i);
494
495 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
496 pa_fdsem_after_poll(i->userdata);
497 }
498
499 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
500 pa_rtpoll_item *i;
501 struct pollfd *pollfd;
502
503 pa_assert(p);
504 pa_assert(f);
505
506 i = pa_rtpoll_item_new(p, prio, 1);
507
508 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
509
510 pollfd->fd = pa_fdsem_get(f);
511 pollfd->events = POLLIN;
512
513 i->before_cb = fdsem_before;
514 i->after_cb = fdsem_after;
515 i->userdata = f;
516
517 return i;
518 }
519
520 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
521 pa_assert(i);
522
523 if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
524 return 1; /* 1 means immediate restart of the loop */
525
526 return 0;
527 }
528
529 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
530 pa_assert(i);
531
532 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
533 pa_asyncmsgq_read_after_poll(i->userdata);
534 }
535
536 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
537 pa_msgobject *object;
538 int code;
539 void *data;
540 pa_memchunk chunk;
541 int64_t offset;
542
543 pa_assert(i);
544
545 if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
546 int ret;
547
548 if (!object && code == PA_MESSAGE_SHUTDOWN) {
549 pa_asyncmsgq_done(i->userdata, 0);
550 pa_rtpoll_quit(i->rtpoll);
551 return 1;
552 }
553
554 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
555 pa_asyncmsgq_done(i->userdata, ret);
556 return 1;
557 }
558
559 return 0;
560 }
561
562 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
563 pa_rtpoll_item *i;
564 struct pollfd *pollfd;
565
566 pa_assert(p);
567 pa_assert(q);
568
569 i = pa_rtpoll_item_new(p, prio, 1);
570
571 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
572 pollfd->fd = pa_asyncmsgq_read_fd(q);
573 pollfd->events = POLLIN;
574
575 i->before_cb = asyncmsgq_read_before;
576 i->after_cb = asyncmsgq_read_after;
577 i->work_cb = asyncmsgq_read_work;
578 i->userdata = q;
579
580 return i;
581 }
582
583 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
584 pa_assert(i);
585
586 pa_asyncmsgq_write_before_poll(i->userdata);
587 return 0;
588 }
589
590 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
591 pa_assert(i);
592
593 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
594 pa_asyncmsgq_write_after_poll(i->userdata);
595 }
596
597 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
598 pa_rtpoll_item *i;
599 struct pollfd *pollfd;
600
601 pa_assert(p);
602 pa_assert(q);
603
604 i = pa_rtpoll_item_new(p, prio, 1);
605
606 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
607 pollfd->fd = pa_asyncmsgq_write_fd(q);
608 pollfd->events = POLLIN;
609
610 i->before_cb = asyncmsgq_write_before;
611 i->after_cb = asyncmsgq_write_after;
612 i->work_cb = NULL;
613 i->userdata = q;
614
615 return i;
616 }
617
618 void pa_rtpoll_quit(pa_rtpoll *p) {
619 pa_assert(p);
620
621 p->quit = TRUE;
622 }
623
624 pa_bool_t pa_rtpoll_timer_elapsed(pa_rtpoll *p) {
625 pa_assert(p);
626
627 return p->timer_elapsed;
628 }