]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
change default mcast address once again, to make sure our traffic doesn't leave the...
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of polypaudio.
4
5 polypaudio is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published
7 by the Free Software Foundation; either version 2 of the License,
8 or (at your option) any later version.
9
10 polypaudio is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License
16 along with polypaudio; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18 USA.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <assert.h>
26 #include <stdio.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <unistd.h>
33
34 #include <polypcore/module.h>
35 #include <polypcore/llist.h>
36 #include <polypcore/sink.h>
37 #include <polypcore/sink-input.h>
38 #include <polypcore/memblockq.h>
39 #include <polypcore/log.h>
40 #include <polypcore/util.h>
41 #include <polypcore/xmalloc.h>
42 #include <polypcore/modargs.h>
43 #include <polypcore/namereg.h>
44 #include <polypcore/sample-util.h>
45
46 #include "module-rtp-recv-symdef.h"
47
48 #include "rtp.h"
49 #include "sdp.h"
50 #include "sap.h"
51
52 PA_MODULE_AUTHOR("Lennart Poettering")
53 PA_MODULE_DESCRIPTION("Recieve data from a network via RTP/SAP/SDP")
54 PA_MODULE_VERSION(PACKAGE_VERSION)
55 PA_MODULE_USAGE(
56 "sink=<name of the sink> "
57 "sap_address=<multicast address to listen on> "
58 )
59
60 #define SAP_PORT 9875
61 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
62 #define MEMBLOCKQ_MAXLENGTH (1024*170)
63 #define MAX_SESSIONS 16
64 #define DEATH_TIMEOUT 20000000
65
66 static const char* const valid_modargs[] = {
67 "sink",
68 "sap_address",
69 NULL
70 };
71
72 struct session {
73 struct userdata *userdata;
74
75 pa_sink_input *sink_input;
76 pa_memblockq *memblockq;
77
78 pa_time_event *death_event;
79
80 int first_packet;
81 uint32_t ssrc;
82 uint32_t offset;
83
84 struct pa_sdp_info sdp_info;
85
86 pa_rtp_context rtp_context;
87 pa_io_event* rtp_event;
88 };
89
90 struct userdata {
91 pa_module *module;
92 pa_core *core;
93
94 pa_sap_context sap_context;
95 pa_io_event* sap_event;
96
97 pa_hashmap *by_origin;
98
99 char *sink_name;
100 };
101
102 static void session_free(struct session *s, int from_hash);
103
104 static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
105 struct session *s;
106 assert(i);
107 s = i->userdata;
108
109 return pa_memblockq_peek(s->memblockq, chunk);
110 }
111
112 static void sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
113 struct session *s;
114 assert(i);
115 s = i->userdata;
116
117 pa_memblockq_drop(s->memblockq, chunk, length);
118 }
119
120 static void sink_input_kill(pa_sink_input* i) {
121 struct session *s;
122 assert(i);
123 s = i->userdata;
124
125 session_free(s, 1);
126 }
127
128 static pa_usec_t sink_input_get_latency(pa_sink_input *i) {
129 struct session *s;
130 assert(i);
131 s = i->userdata;
132
133 return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
134 }
135
136 static void rtp_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
137 struct session *s = userdata;
138 pa_memchunk chunk;
139 int64_t k, j, delta;
140 struct timeval tv;
141
142 assert(m);
143 assert(e);
144 assert(s);
145 assert(fd == s->rtp_context.fd);
146 assert(flags == PA_IO_EVENT_INPUT);
147
148 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->memblock_stat) < 0)
149 return;
150
151 if (s->sdp_info.payload != s->rtp_context.payload) {
152 pa_memblock_unref(chunk.memblock);
153 return;
154 }
155
156 if (!s->first_packet) {
157 s->first_packet = 1;
158
159 s->ssrc = s->rtp_context.ssrc;
160 s->offset = s->rtp_context.timestamp;
161
162 if (s->ssrc == s->userdata->core->cookie)
163 pa_log_warn(__FILE__": WARNING! Detected RTP packet loop!");
164 } else {
165 if (s->ssrc != s->rtp_context.ssrc) {
166 pa_memblock_unref(chunk.memblock);
167 return;
168 }
169 }
170
171 /* Check wheter there was a timestamp overflow */
172 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
173 j = (int64_t) 0x100000000 - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
174
175 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
176 delta = k;
177 else
178 delta = j;
179
180 pa_memblockq_seek(s->memblockq, delta * s->rtp_context.frame_size, PA_SEEK_RELATIVE);
181
182 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
183 /* queue overflow, let's flush it and try again */
184 pa_memblockq_flush(s->memblockq);
185 pa_memblockq_push(s->memblockq, &chunk);
186 }
187
188 /* The next timestamp we expect */
189 s->offset = s->rtp_context.timestamp + (chunk.length / s->rtp_context.frame_size);
190
191 pa_memblock_unref(chunk.memblock);
192
193 /* Reset death timer */
194 pa_gettimeofday(&tv);
195 pa_timeval_add(&tv, DEATH_TIMEOUT);
196 m->time_restart(s->death_event, &tv);
197 }
198
199 static void death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
200 struct session *s = userdata;
201
202 assert(m);
203 assert(t);
204 assert(tv);
205 assert(s);
206
207 session_free(s, 1);
208 }
209
210 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
211 int af, fd = -1, r, one;
212
213 af = sa->sa_family;
214 if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
215 pa_log(__FILE__": Failed to create socket: %s", strerror(errno));
216 goto fail;
217 }
218
219 one = 1;
220 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
221 pa_log(__FILE__": SO_REUSEADDR failed: %s", strerror(errno));
222 goto fail;
223 }
224
225 if (af == AF_INET) {
226 struct ip_mreq mr4;
227 memset(&mr4, 0, sizeof(mr4));
228 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
229 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
230 } else {
231 struct ipv6_mreq mr6;
232 memset(&mr6, 0, sizeof(mr6));
233 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
234 r = setsockopt(fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mr6, sizeof(mr6));
235 }
236
237 if (r < 0) {
238 pa_log_info(__FILE__": Joining mcast group failed: %s", strerror(errno));
239 goto fail;
240 }
241
242 if (bind(fd, sa, salen) < 0) {
243 pa_log(__FILE__": bind() failed: %s", strerror(errno));
244 goto fail;
245 }
246
247 return fd;
248
249 fail:
250 if (fd >= 0)
251 close(fd);
252
253 return -1;
254 }
255
256 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
257 struct session *s = NULL;
258 struct timeval tv;
259 char *c;
260 pa_sink *sink;
261 int fd = -1;
262 pa_memblock *silence;
263
264 if (!(sink = pa_namereg_get(u->core, u->sink_name, PA_NAMEREG_SINK, 1))) {
265 pa_log(__FILE__": sink does not exist.");
266 goto fail;
267 }
268
269 s = pa_xnew0(struct session, 1);
270 s->userdata = u;
271 s->first_packet = 0;
272 s->sdp_info = *sdp_info;
273
274 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
275 goto fail;
276
277 c = pa_sprintf_malloc("RTP Stream%s%s%s",
278 sdp_info->session_name ? " (" : "",
279 sdp_info->session_name ? sdp_info->session_name : "",
280 sdp_info->session_name ? ")" : "");
281
282 s->sink_input = pa_sink_input_new(sink, __FILE__, c, &sdp_info->sample_spec, NULL, 0, PA_RESAMPLER_INVALID);
283 pa_xfree(c);
284
285 if (!s->sink_input) {
286 pa_log(__FILE__": failed to create sink input.");
287 goto fail;
288 }
289
290 s->sink_input->userdata = s;
291 s->sink_input->owner = u->module;
292
293 s->sink_input->peek = sink_input_peek;
294 s->sink_input->drop = sink_input_drop;
295 s->sink_input->kill = sink_input_kill;
296 s->sink_input->get_latency = sink_input_get_latency;
297
298 silence = pa_silence_memblock_new(&s->sink_input->sample_spec,
299 (pa_bytes_per_second(&s->sink_input->sample_spec)/128/pa_frame_size(&s->sink_input->sample_spec))*
300 pa_frame_size(&s->sink_input->sample_spec),
301 s->userdata->core->memblock_stat);
302
303 s->memblockq = pa_memblockq_new(
304 0,
305 MEMBLOCKQ_MAXLENGTH,
306 MEMBLOCKQ_MAXLENGTH,
307 pa_frame_size(&s->sink_input->sample_spec),
308 pa_bytes_per_second(&s->sink_input->sample_spec)/10+1,
309 0,
310 silence,
311 u->core->memblock_stat);
312
313 pa_memblock_unref(silence);
314
315 s->rtp_event = u->core->mainloop->io_new(u->core->mainloop, fd, PA_IO_EVENT_INPUT, rtp_event_cb, s);
316
317 pa_gettimeofday(&tv);
318 pa_timeval_add(&tv, DEATH_TIMEOUT);
319 s->death_event = u->core->mainloop->time_new(u->core->mainloop, &tv, death_event_cb, s);
320
321 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
322
323 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
324
325 pa_log_info(__FILE__": Found new session '%s'", s->sdp_info.session_name);
326
327 return s;
328
329 fail:
330 if (s) {
331 if (fd >= 0)
332 close(fd);
333
334 pa_xfree(s);
335 }
336
337 return NULL;
338 }
339
340 static void session_free(struct session *s, int from_hash) {
341 assert(s);
342
343 pa_log_info(__FILE__": Freeing session '%s'", s->sdp_info.session_name);
344
345 s->userdata->core->mainloop->time_free(s->death_event);
346 s->userdata->core->mainloop->io_free(s->rtp_event);
347
348 if (from_hash)
349 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
350
351 pa_sink_input_disconnect(s->sink_input);
352 pa_sink_input_unref(s->sink_input);
353
354 pa_memblockq_free(s->memblockq);
355 pa_sdp_info_destroy(&s->sdp_info);
356 pa_rtp_context_destroy(&s->rtp_context);
357
358 pa_xfree(s);
359 }
360
361 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
362 struct userdata *u = userdata;
363 int goodbye;
364 pa_sdp_info info;
365 struct session *s;
366
367 assert(m);
368 assert(e);
369 assert(u);
370 assert(fd == u->sap_context.fd);
371 assert(flags == PA_IO_EVENT_INPUT);
372
373 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
374 return;
375
376 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
377 return;
378
379 if (goodbye) {
380
381 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
382 session_free(s, 1);
383
384 pa_sdp_info_destroy(&info);
385 } else {
386
387 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
388 if (!(s = session_new(u, &info)))
389 pa_sdp_info_destroy(&info);
390
391 } else {
392 struct timeval tv;
393
394 pa_gettimeofday(&tv);
395 pa_timeval_add(&tv, DEATH_TIMEOUT);
396 m->time_restart(s->death_event, &tv);
397
398 pa_sdp_info_destroy(&info);
399 }
400 }
401 }
402
403 int pa__init(pa_core *c, pa_module*m) {
404 struct userdata *u;
405 pa_modargs *ma = NULL;
406 struct sockaddr_in sa4;
407 struct sockaddr_in6 sa6;
408 struct sockaddr *sa;
409 socklen_t salen;
410 const char *sap_address;
411 int fd = -1;
412
413 assert(c);
414 assert(m);
415
416 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
417 pa_log(__FILE__": failed to parse module arguments");
418 goto fail;
419 }
420
421 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
422
423 if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
424 sa6.sin6_family = AF_INET6;
425 sa6.sin6_port = htons(SAP_PORT);
426 sa = (struct sockaddr*) &sa6;
427 salen = sizeof(sa6);
428 } else if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
429 sa4.sin_family = AF_INET;
430 sa4.sin_port = htons(SAP_PORT);
431 sa = (struct sockaddr*) &sa4;
432 salen = sizeof(sa4);
433 } else {
434 pa_log(__FILE__": invalid SAP address '%s'", sap_address);
435 goto fail;
436 }
437
438 if ((fd = mcast_socket(sa, salen)) < 0)
439 goto fail;
440
441 u = pa_xnew(struct userdata, 1);
442 m->userdata = u;
443 u->module = m;
444 u->core = c;
445 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
446
447 u->sap_event = c->mainloop->io_new(c->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
448
449 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
450
451 pa_sap_context_init_recv(&u->sap_context, fd);
452
453 pa_modargs_free(ma);
454
455 return 0;
456
457 fail:
458 if (ma)
459 pa_modargs_free(ma);
460
461 if (fd >= 0)
462 close(fd);
463
464 return -1;
465 }
466
467 static void free_func(void *p, void *userdata) {
468 session_free(p, 0);
469 }
470
471 void pa__done(pa_core *c, pa_module*m) {
472 struct userdata *u;
473 assert(c);
474 assert(m);
475
476 if (!(u = m->userdata))
477 return;
478
479 c->mainloop->io_free(u->sap_event);
480 pa_sap_context_destroy(&u->sap_context);
481
482 pa_hashmap_free(u->by_origin, free_func, NULL);
483
484 pa_xfree(u->sink_name);
485 pa_xfree(u);
486 }