]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
really create glitch-free branch
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/rtclock.h>
53 #include <pulsecore/atomic.h>
54
55 #include "module-rtp-recv-symdef.h"
56
57 #include "rtp.h"
58 #include "sdp.h"
59 #include "sap.h"
60
61 PA_MODULE_AUTHOR("Lennart Poettering");
62 PA_MODULE_DESCRIPTION("Recieve data from a network via RTP/SAP/SDP");
63 PA_MODULE_VERSION(PACKAGE_VERSION);
64 PA_MODULE_LOAD_ONCE(FALSE);
65 PA_MODULE_USAGE(
66 "sink=<name of the sink> "
67 "sap_address=<multicast address to listen on> "
68 );
69
70 #define SAP_PORT 9875
71 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
72 #define MEMBLOCKQ_MAXLENGTH (1024*170)
73 #define MAX_SESSIONS 16
74 #define DEATH_TIMEOUT 20
75
76 static const char* const valid_modargs[] = {
77 "sink",
78 "sap_address",
79 NULL
80 };
81
82 struct session {
83 struct userdata *userdata;
84 PA_LLIST_FIELDS(struct session);
85
86 pa_sink_input *sink_input;
87 pa_memblockq *memblockq;
88
89 pa_bool_t first_packet;
90 uint32_t ssrc;
91 uint32_t offset;
92
93 struct pa_sdp_info sdp_info;
94
95 pa_rtp_context rtp_context;
96
97 pa_rtpoll_item *rtpoll_item;
98
99 pa_atomic_t timestamp;
100 };
101
102 struct userdata {
103 pa_module *module;
104
105 pa_sap_context sap_context;
106 pa_io_event* sap_event;
107
108 pa_time_event *check_death_event;
109
110 char *sink_name;
111
112 PA_LLIST_HEAD(struct session, sessions);
113 pa_hashmap *by_origin;
114 int n_sessions;
115 };
116
117 static void session_free(struct session *s);
118
119 /* Called from I/O thread context */
120 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
121 struct session *s = PA_SINK_INPUT(o)->userdata;
122
123 switch (code) {
124 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
125 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
126
127 /* Fall through, the default handler will add in the extra
128 * latency added by the resampler */
129 break;
130 }
131
132 return pa_sink_input_process_msg(o, code, data, offset, chunk);
133 }
134
135 /* Called from I/O thread context */
136 static int sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
137 struct session *s;
138 pa_sink_input_assert_ref(i);
139 pa_assert_se(s = i->userdata);
140
141 return pa_memblockq_peek(s->memblockq, chunk);
142 }
143
144 /* Called from I/O thread context */
145 static void sink_input_drop(pa_sink_input *i, size_t length) {
146 struct session *s;
147 pa_sink_input_assert_ref(i);
148 pa_assert_se(s = i->userdata);
149
150 pa_memblockq_drop(s->memblockq, length);
151 }
152
153 /* Called from main context */
154 static void sink_input_kill(pa_sink_input* i) {
155 struct session *s;
156 pa_sink_input_assert_ref(i);
157 pa_assert_se(s = i->userdata);
158
159 session_free(s);
160 }
161
162 /* Called from I/O thread context */
163 static int rtpoll_work_cb(pa_rtpoll_item *i) {
164 pa_memchunk chunk;
165 int64_t k, j, delta;
166 struct timeval now;
167 struct session *s;
168 struct pollfd *p;
169
170 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
171
172 p = pa_rtpoll_item_get_pollfd(i, NULL);
173
174 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
175 pa_log("poll() signalled bad revents.");
176 return -1;
177 }
178
179 if ((p->revents & POLLIN) == 0)
180 return 0;
181
182 p->revents = 0;
183
184 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool) < 0)
185 return 0;
186
187 if (s->sdp_info.payload != s->rtp_context.payload) {
188 pa_memblock_unref(chunk.memblock);
189 return 0;
190 }
191
192 if (!s->first_packet) {
193 s->first_packet = TRUE;
194
195 s->ssrc = s->rtp_context.ssrc;
196 s->offset = s->rtp_context.timestamp;
197
198 if (s->ssrc == s->userdata->module->core->cookie)
199 pa_log_warn("Detected RTP packet loop!");
200 } else {
201 if (s->ssrc != s->rtp_context.ssrc) {
202 pa_memblock_unref(chunk.memblock);
203 return 0;
204 }
205 }
206
207 /* Check wheter there was a timestamp overflow */
208 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
209 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
210
211 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
212 delta = k;
213 else
214 delta = j;
215
216 pa_memblockq_seek(s->memblockq, delta * s->rtp_context.frame_size, PA_SEEK_RELATIVE);
217
218 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
219 /* queue overflow, let's flush it and try again */
220 pa_memblockq_flush(s->memblockq);
221 pa_memblockq_push(s->memblockq, &chunk);
222 }
223
224 /* The next timestamp we expect */
225 s->offset = s->rtp_context.timestamp + (chunk.length / s->rtp_context.frame_size);
226
227 pa_memblock_unref(chunk.memblock);
228
229 pa_rtclock_get(&now);
230 pa_atomic_store(&s->timestamp, now.tv_sec);
231
232 return 1;
233 }
234
235 /* Called from I/O thread context */
236 static void sink_input_attach(pa_sink_input *i) {
237 struct session *s;
238 struct pollfd *p;
239
240 pa_sink_input_assert_ref(i);
241 pa_assert_se(s = i->userdata);
242
243 pa_assert(!s->rtpoll_item);
244 s->rtpoll_item = pa_rtpoll_item_new(i->sink->rtpoll, PA_RTPOLL_LATE, 1);
245
246 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
247 p->fd = s->rtp_context.fd;
248 p->events = POLLIN;
249 p->revents = 0;
250
251 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
252 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
253 }
254
255 /* Called from I/O thread context */
256 static void sink_input_detach(pa_sink_input *i) {
257 struct session *s;
258 pa_sink_input_assert_ref(i);
259 pa_assert_se(s = i->userdata);
260
261 pa_assert(s->rtpoll_item);
262 pa_rtpoll_item_free(s->rtpoll_item);
263 s->rtpoll_item = NULL;
264 }
265
266 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
267 int af, fd = -1, r, one;
268
269 pa_assert(sa);
270 pa_assert(salen > 0);
271
272 af = sa->sa_family;
273 if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
274 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
275 goto fail;
276 }
277
278 one = 1;
279 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
280 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
281 goto fail;
282 }
283
284 if (af == AF_INET) {
285 struct ip_mreq mr4;
286 memset(&mr4, 0, sizeof(mr4));
287 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
288 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
289 } else {
290 struct ipv6_mreq mr6;
291 memset(&mr6, 0, sizeof(mr6));
292 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
293 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
294 }
295
296 if (r < 0) {
297 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
298 goto fail;
299 }
300
301 if (bind(fd, sa, salen) < 0) {
302 pa_log("bind() failed: %s", pa_cstrerror(errno));
303 goto fail;
304 }
305
306 return fd;
307
308 fail:
309 if (fd >= 0)
310 close(fd);
311
312 return -1;
313 }
314
315 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
316 struct session *s = NULL;
317 char *c;
318 pa_sink *sink;
319 int fd = -1;
320 pa_memblock *silence;
321 pa_sink_input_new_data data;
322 struct timeval now;
323
324 pa_assert(u);
325 pa_assert(sdp_info);
326
327 if (u->n_sessions >= MAX_SESSIONS) {
328 pa_log("Session limit reached.");
329 goto fail;
330 }
331
332 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK, 1))) {
333 pa_log("Sink does not exist.");
334 goto fail;
335 }
336
337 s = pa_xnew0(struct session, 1);
338 s->userdata = u;
339 s->first_packet = FALSE;
340 s->sdp_info = *sdp_info;
341 s->rtpoll_item = NULL;
342
343 pa_rtclock_get(&now);
344 pa_atomic_store(&s->timestamp, now.tv_sec);
345
346 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
347 goto fail;
348
349 c = pa_sprintf_malloc("RTP Stream%s%s%s",
350 sdp_info->session_name ? " (" : "",
351 sdp_info->session_name ? sdp_info->session_name : "",
352 sdp_info->session_name ? ")" : "");
353
354 pa_sink_input_new_data_init(&data);
355 data.sink = sink;
356 data.driver = __FILE__;
357 data.name = c;
358 data.module = u->module;
359 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
360
361 s->sink_input = pa_sink_input_new(u->module->core, &data, 0);
362 pa_xfree(c);
363
364 if (!s->sink_input) {
365 pa_log("Failed to create sink input.");
366 goto fail;
367 }
368
369 s->sink_input->userdata = s;
370
371 s->sink_input->parent.process_msg = sink_input_process_msg;
372 s->sink_input->peek = sink_input_peek;
373 s->sink_input->drop = sink_input_drop;
374 s->sink_input->kill = sink_input_kill;
375 s->sink_input->attach = sink_input_attach;
376 s->sink_input->detach = sink_input_detach;
377
378 silence = pa_silence_memblock_new(
379 s->userdata->module->core->mempool,
380 &s->sink_input->sample_spec,
381 pa_frame_align(pa_bytes_per_second(&s->sink_input->sample_spec)/128, &s->sink_input->sample_spec));
382
383 s->memblockq = pa_memblockq_new(
384 0,
385 MEMBLOCKQ_MAXLENGTH,
386 MEMBLOCKQ_MAXLENGTH,
387 pa_frame_size(&s->sink_input->sample_spec),
388 pa_bytes_per_second(&s->sink_input->sample_spec)/10+1,
389 0,
390 silence);
391
392 pa_memblock_unref(silence);
393
394 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
395
396 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
397 u->n_sessions++;
398 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
399
400 pa_sink_input_put(s->sink_input);
401
402 pa_log_info("New session '%s'", s->sdp_info.session_name);
403
404 return s;
405
406 fail:
407 pa_xfree(s);
408
409 if (fd >= 0)
410 pa_close(fd);
411
412 return NULL;
413 }
414
415 static void session_free(struct session *s) {
416 pa_assert(s);
417
418 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
419
420 pa_sink_input_unlink(s->sink_input);
421 pa_sink_input_unref(s->sink_input);
422
423 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
424 pa_assert(s->userdata->n_sessions >= 1);
425 s->userdata->n_sessions--;
426 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
427
428 pa_memblockq_free(s->memblockq);
429 pa_sdp_info_destroy(&s->sdp_info);
430 pa_rtp_context_destroy(&s->rtp_context);
431
432 pa_xfree(s);
433 }
434
435 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
436 struct userdata *u = userdata;
437 int goodbye;
438 pa_sdp_info info;
439 struct session *s;
440
441 pa_assert(m);
442 pa_assert(e);
443 pa_assert(u);
444 pa_assert(fd == u->sap_context.fd);
445 pa_assert(flags == PA_IO_EVENT_INPUT);
446
447 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
448 return;
449
450 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
451 return;
452
453 if (goodbye) {
454
455 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
456 session_free(s);
457
458 pa_sdp_info_destroy(&info);
459 } else {
460
461 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
462 if (!(s = session_new(u, &info)))
463 pa_sdp_info_destroy(&info);
464
465 } else {
466 struct timeval now;
467 pa_rtclock_get(&now);
468 pa_atomic_store(&s->timestamp, now.tv_sec);
469
470 pa_sdp_info_destroy(&info);
471 }
472 }
473 }
474
475 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *ptv, void *userdata) {
476 struct session *s, *n;
477 struct userdata *u = userdata;
478 struct timeval now;
479 struct timeval tv;
480
481 pa_assert(m);
482 pa_assert(t);
483 pa_assert(ptv);
484 pa_assert(u);
485
486 pa_rtclock_get(&now);
487
488 pa_log_debug("Checking for dead streams ...");
489
490 for (s = u->sessions; s; s = n) {
491 int k;
492 n = s->next;
493
494 k = pa_atomic_load(&s->timestamp);
495
496 if (k + DEATH_TIMEOUT < now.tv_sec)
497 session_free(s);
498 }
499
500 /* Restart timer */
501 pa_gettimeofday(&tv);
502 pa_timeval_add(&tv, DEATH_TIMEOUT*PA_USEC_PER_SEC);
503 m->time_restart(t, &tv);
504 }
505
506 int pa__init(pa_module*m) {
507 struct userdata *u;
508 pa_modargs *ma = NULL;
509 struct sockaddr_in sa4;
510 struct sockaddr_in6 sa6;
511 struct sockaddr *sa;
512 socklen_t salen;
513 const char *sap_address;
514 int fd = -1;
515 struct timeval tv;
516
517 pa_assert(m);
518
519 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
520 pa_log("failed to parse module arguments");
521 goto fail;
522 }
523
524 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
525
526 if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
527 sa6.sin6_family = AF_INET6;
528 sa6.sin6_port = htons(SAP_PORT);
529 sa = (struct sockaddr*) &sa6;
530 salen = sizeof(sa6);
531 } else if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
532 sa4.sin_family = AF_INET;
533 sa4.sin_port = htons(SAP_PORT);
534 sa = (struct sockaddr*) &sa4;
535 salen = sizeof(sa4);
536 } else {
537 pa_log("Invalid SAP address '%s'", sap_address);
538 goto fail;
539 }
540
541 if ((fd = mcast_socket(sa, salen)) < 0)
542 goto fail;
543
544 u = pa_xnew(struct userdata, 1);
545 m->userdata = u;
546 u->module = m;
547 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
548
549 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
550 pa_sap_context_init_recv(&u->sap_context, fd);
551
552 PA_LLIST_HEAD_INIT(struct session, u->sessions);
553 u->n_sessions = 0;
554 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
555
556 pa_gettimeofday(&tv);
557 pa_timeval_add(&tv, DEATH_TIMEOUT * PA_USEC_PER_SEC);
558 u->check_death_event = m->core->mainloop->time_new(m->core->mainloop, &tv, check_death_event_cb, u);
559
560 pa_modargs_free(ma);
561
562 return 0;
563
564 fail:
565 if (ma)
566 pa_modargs_free(ma);
567
568 if (fd >= 0)
569 pa_close(fd);
570
571 return -1;
572 }
573
574 void pa__done(pa_module*m) {
575 struct userdata *u;
576 struct session *s;
577
578 pa_assert(m);
579
580 if (!(u = m->userdata))
581 return;
582
583 if (u->sap_event)
584 m->core->mainloop->io_free(u->sap_event);
585
586 if (u->check_death_event)
587 m->core->mainloop->time_free(u->check_death_event);
588
589 pa_sap_context_destroy(&u->sap_context);
590
591 if (u->by_origin) {
592 while ((s = pa_hashmap_get_first(u->by_origin)))
593 session_free(s);
594
595 pa_hashmap_free(u->by_origin, NULL, NULL);
596 }
597
598 pa_xfree(u->sink_name);
599 pa_xfree(u);
600 }