]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
Merge commit 'origin/master-tx'
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/rtclock.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/socket-util.h>
56 #include <pulsecore/once.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION);
67 PA_MODULE_LOAD_ONCE(FALSE);
68 PA_MODULE_USAGE(
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
71 );
72
/* UDP port the SAP listening socket is bound to (see pa__init()). */
#define SAP_PORT                9875

/* Address used when the "sap_address" module argument is not given. */
#define DEFAULT_SAP_ADDRESS     "224.0.0.56"

/* Size limit, in bytes, handed to pa_memblockq_new() for each session queue. */
#define MEMBLOCKQ_MAXLENGTH     (1024*1024*40)

/* Upper bound on concurrently active sessions; session_new() refuses more. */
#define MAX_SESSIONS            16

/* Seconds without any sign of life after which a session is freed; also
 * the period of the death check timer. */
#define DEATH_TIMEOUT           20

/* Minimum time between two sample rate adjustments in rtpoll_work_cb(). */
#define RATE_UPDATE_INTERVAL    (5*PA_USEC_PER_SEC)

/* Initial value of a session's intended latency. */
#define LATENCY_USEC            (500*PA_USEC_PER_MSEC)
80
/* NULL-terminated list of the argument keys handed to pa_modargs_new()
 * in pa__init(). */
static const char* const valid_modargs[] = {
    "sink",         /* name of the sink the streams are played on */
    "sap_address",  /* address to listen on for SAP announcements */
    NULL
};
86
87 struct session {
88 struct userdata *userdata;
89 PA_LLIST_FIELDS(struct session);
90
91 pa_sink_input *sink_input;
92 pa_memblockq *memblockq;
93
94 pa_bool_t first_packet;
95 uint32_t ssrc;
96 uint32_t offset;
97
98 struct pa_sdp_info sdp_info;
99
100 pa_rtp_context rtp_context;
101
102 pa_rtpoll_item *rtpoll_item;
103
104 pa_atomic_t timestamp;
105
106 pa_smoother *smoother;
107 pa_usec_t intended_latency;
108 pa_usec_t sink_latency;
109
110 pa_usec_t last_rate_update;
111 };
112
113 struct userdata {
114 pa_module *module;
115
116 pa_sap_context sap_context;
117 pa_io_event* sap_event;
118
119 pa_time_event *check_death_event;
120
121 char *sink_name;
122
123 PA_LLIST_HEAD(struct session, sessions);
124 pa_hashmap *by_origin;
125 int n_sessions;
126 };
127
128 static void session_free(struct session *s);
129
130 /* Called from I/O thread context */
131 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
132 struct session *s = PA_SINK_INPUT(o)->userdata;
133
134 switch (code) {
135 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
136 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
137
138 /* Fall through, the default handler will add in the extra
139 * latency added by the resampler */
140 break;
141 }
142
143 return pa_sink_input_process_msg(o, code, data, offset, chunk);
144 }
145
146 /* Called from I/O thread context */
147 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
148 struct session *s;
149 pa_sink_input_assert_ref(i);
150 pa_assert_se(s = i->userdata);
151
152 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
153 return -1;
154
155 pa_memblockq_drop(s->memblockq, chunk->length);
156
157 return 0;
158 }
159
160 /* Called from I/O thread context */
161 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
162 struct session *s;
163
164 pa_sink_input_assert_ref(i);
165 pa_assert_se(s = i->userdata);
166
167 pa_memblockq_rewind(s->memblockq, nbytes);
168 }
169
170 /* Called from I/O thread context */
171 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
172 struct session *s;
173
174 pa_sink_input_assert_ref(i);
175 pa_assert_se(s = i->userdata);
176
177 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
178 }
179
180 /* Called from main context */
181 static void sink_input_kill(pa_sink_input* i) {
182 struct session *s;
183 pa_sink_input_assert_ref(i);
184 pa_assert_se(s = i->userdata);
185
186 session_free(s);
187 }
188
189 /* Called from IO context */
190 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
191 struct session *s;
192 pa_sink_input_assert_ref(i);
193 pa_assert_se(s = i->userdata);
194
195 if (b) {
196 pa_smoother_pause(s->smoother, pa_rtclock_usec());
197 pa_memblockq_flush_read(s->memblockq);
198 } else
199 s->first_packet = FALSE;
200 }
201
202 /* Called from I/O thread context */
203 static int rtpoll_work_cb(pa_rtpoll_item *i) {
204 pa_memchunk chunk;
205 int64_t k, j, delta;
206 struct timeval now = { 0, 0 };
207 struct session *s;
208 struct pollfd *p;
209
210 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
211
212 p = pa_rtpoll_item_get_pollfd(i, NULL);
213
214 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
215 pa_log("poll() signalled bad revents.");
216 return -1;
217 }
218
219 if ((p->revents & POLLIN) == 0)
220 return 0;
221
222 p->revents = 0;
223
224 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
225 return 0;
226
227 if (s->sdp_info.payload != s->rtp_context.payload ||
228 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
229 pa_memblock_unref(chunk.memblock);
230 return 0;
231 }
232
233 if (!s->first_packet) {
234 s->first_packet = TRUE;
235
236 s->ssrc = s->rtp_context.ssrc;
237 s->offset = s->rtp_context.timestamp;
238
239 if (s->ssrc == s->userdata->module->core->cookie)
240 pa_log_warn("Detected RTP packet loop!");
241 } else {
242 if (s->ssrc != s->rtp_context.ssrc) {
243 pa_memblock_unref(chunk.memblock);
244 return 0;
245 }
246 }
247
248 /* Check whether there was a timestamp overflow */
249 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
250 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
251
252 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
253 delta = k;
254 else
255 delta = j;
256
257 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
258
259 if (now.tv_sec == 0) {
260 PA_ONCE_BEGIN {
261 pa_log_warn("Using artificial time instead of timestamp");
262 } PA_ONCE_END;
263 pa_rtclock_get(&now);
264 } else
265 pa_rtclock_from_wallclock(&now);
266
267 pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
268
269 /* Tell the smoother that we are rolling now, in case it is still paused */
270 pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
271
272 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
273 pa_log_warn("Queue overrun");
274 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
275 }
276
277 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
278
279 pa_memblock_unref(chunk.memblock);
280
281 /* The next timestamp we expect */
282 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
283
284 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
285
286 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
287 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
288 unsigned fix_samples;
289
290 pa_log_debug("Updating sample rate");
291
292 wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
293 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
294
295 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
296
297 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
298 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
299
300 if (ri > render_delay+sink_delay)
301 ri -= render_delay+sink_delay;
302 else
303 ri = 0;
304
305 if (wi < ri)
306 latency = 0;
307 else
308 latency = wi - ri;
309
310 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
311
312 /* Calculate deviation */
313 if (latency < s->intended_latency)
314 fix = s->intended_latency - latency;
315 else
316 fix = latency - s->intended_latency;
317
318 /* How many samples is this per second? */
319 fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
320
321 /* Check if deviation is in bounds */
322 if (fix_samples > s->sink_input->sample_spec.rate*.50)
323 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples);
324 else {
325 /* Fix up rate */
326 if (latency < s->intended_latency)
327 s->sink_input->sample_spec.rate -= fix_samples;
328 else
329 s->sink_input->sample_spec.rate += fix_samples;
330
331 if (s->sink_input->sample_spec.rate > PA_RATE_MAX)
332 s->sink_input->sample_spec.rate = PA_RATE_MAX;
333 }
334
335 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
336
337 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
338
339 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
340
341 s->last_rate_update = pa_timeval_load(&now);
342 }
343
344 if (pa_memblockq_is_readable(s->memblockq) &&
345 s->sink_input->thread_info.underrun_for > 0) {
346 pa_log_debug("Requesting rewind due to end of underrun");
347 pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
348 }
349
350 return 1;
351 }
352
353 /* Called from I/O thread context */
354 static void sink_input_attach(pa_sink_input *i) {
355 struct session *s;
356 struct pollfd *p;
357
358 pa_sink_input_assert_ref(i);
359 pa_assert_se(s = i->userdata);
360
361 pa_assert(!s->rtpoll_item);
362 s->rtpoll_item = pa_rtpoll_item_new(i->sink->rtpoll, PA_RTPOLL_LATE, 1);
363
364 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
365 p->fd = s->rtp_context.fd;
366 p->events = POLLIN;
367 p->revents = 0;
368
369 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
370 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
371 }
372
373 /* Called from I/O thread context */
374 static void sink_input_detach(pa_sink_input *i) {
375 struct session *s;
376 pa_sink_input_assert_ref(i);
377 pa_assert_se(s = i->userdata);
378
379 pa_assert(s->rtpoll_item);
380 pa_rtpoll_item_free(s->rtpoll_item);
381 s->rtpoll_item = NULL;
382 }
383
384 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
385 int af, fd = -1, r, one;
386
387 pa_assert(sa);
388 pa_assert(salen > 0);
389
390 af = sa->sa_family;
391 if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
392 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
393 goto fail;
394 }
395
396 pa_make_udp_socket_low_delay(fd);
397
398 one = 1;
399 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
400 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
401 goto fail;
402 }
403
404 one = 1;
405 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
406 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
407 goto fail;
408 }
409
410 if (af == AF_INET) {
411 struct ip_mreq mr4;
412 memset(&mr4, 0, sizeof(mr4));
413 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
414 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
415 #ifdef HAVE_IPV6
416 } else {
417 struct ipv6_mreq mr6;
418 memset(&mr6, 0, sizeof(mr6));
419 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
420 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
421 #endif
422 }
423
424 if (r < 0) {
425 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
426 goto fail;
427 }
428
429 if (bind(fd, sa, salen) < 0) {
430 pa_log("bind() failed: %s", pa_cstrerror(errno));
431 goto fail;
432 }
433
434 return fd;
435
436 fail:
437 if (fd >= 0)
438 close(fd);
439
440 return -1;
441 }
442
443 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
444 struct session *s = NULL;
445 pa_sink *sink;
446 int fd = -1;
447 pa_memchunk silence;
448 pa_sink_input_new_data data;
449 struct timeval now;
450
451 pa_assert(u);
452 pa_assert(sdp_info);
453
454 if (u->n_sessions >= MAX_SESSIONS) {
455 pa_log("Session limit reached.");
456 goto fail;
457 }
458
459 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
460 pa_log("Sink does not exist.");
461 goto fail;
462 }
463
464 pa_rtclock_get(&now);
465
466 s = pa_xnew0(struct session, 1);
467 s->userdata = u;
468 s->first_packet = FALSE;
469 s->sdp_info = *sdp_info;
470 s->rtpoll_item = NULL;
471 s->intended_latency = LATENCY_USEC;
472 s->smoother = pa_smoother_new(
473 PA_USEC_PER_SEC*5,
474 PA_USEC_PER_SEC*2,
475 TRUE,
476 TRUE,
477 10,
478 pa_timeval_load(&now),
479 TRUE);
480 s->last_rate_update = pa_timeval_load(&now);
481 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
482
483 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
484 goto fail;
485
486 pa_sink_input_new_data_init(&data);
487 data.sink = sink;
488 data.driver = __FILE__;
489 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
490 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
491 "RTP Stream%s%s%s",
492 sdp_info->session_name ? " (" : "",
493 sdp_info->session_name ? sdp_info->session_name : "",
494 sdp_info->session_name ? ")" : "");
495
496 if (sdp_info->session_name)
497 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
498 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
499 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
500 data.module = u->module;
501 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
502
503 pa_sink_input_new(&s->sink_input, u->module->core, &data, PA_SINK_INPUT_VARIABLE_RATE);
504 pa_sink_input_new_data_done(&data);
505
506 if (!s->sink_input) {
507 pa_log("Failed to create sink input.");
508 goto fail;
509 }
510
511 s->sink_input->userdata = s;
512
513 s->sink_input->parent.process_msg = sink_input_process_msg;
514 s->sink_input->pop = sink_input_pop_cb;
515 s->sink_input->process_rewind = sink_input_process_rewind_cb;
516 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
517 s->sink_input->kill = sink_input_kill;
518 s->sink_input->attach = sink_input_attach;
519 s->sink_input->detach = sink_input_detach;
520 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
521
522 pa_sink_input_get_silence(s->sink_input, &silence);
523
524 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
525
526 if (s->intended_latency < s->sink_latency*2)
527 s->intended_latency = s->sink_latency*2;
528
529 s->memblockq = pa_memblockq_new(
530 0,
531 MEMBLOCKQ_MAXLENGTH,
532 MEMBLOCKQ_MAXLENGTH,
533 pa_frame_size(&s->sink_input->sample_spec),
534 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
535 0,
536 0,
537 &silence);
538
539 pa_memblock_unref(silence.memblock);
540
541 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
542
543 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
544 u->n_sessions++;
545 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
546
547 pa_sink_input_put(s->sink_input);
548
549 pa_log_info("New session '%s'", s->sdp_info.session_name);
550
551 return s;
552
553 fail:
554 pa_xfree(s);
555
556 if (fd >= 0)
557 pa_close(fd);
558
559 return NULL;
560 }
561
562 static void session_free(struct session *s) {
563 pa_assert(s);
564
565 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
566
567 pa_sink_input_unlink(s->sink_input);
568 pa_sink_input_unref(s->sink_input);
569
570 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
571 pa_assert(s->userdata->n_sessions >= 1);
572 s->userdata->n_sessions--;
573 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
574
575 pa_memblockq_free(s->memblockq);
576 pa_sdp_info_destroy(&s->sdp_info);
577 pa_rtp_context_destroy(&s->rtp_context);
578
579 pa_smoother_free(s->smoother);
580
581 pa_xfree(s);
582 }
583
584 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
585 struct userdata *u = userdata;
586 pa_bool_t goodbye = FALSE;
587 pa_sdp_info info;
588 struct session *s;
589
590 pa_assert(m);
591 pa_assert(e);
592 pa_assert(u);
593 pa_assert(fd == u->sap_context.fd);
594 pa_assert(flags == PA_IO_EVENT_INPUT);
595
596 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
597 return;
598
599 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
600 return;
601
602 if (goodbye) {
603
604 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
605 session_free(s);
606
607 pa_sdp_info_destroy(&info);
608 } else {
609
610 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
611 if (!session_new(u, &info))
612 pa_sdp_info_destroy(&info);
613
614 } else {
615 struct timeval now;
616 pa_rtclock_get(&now);
617 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
618
619 pa_sdp_info_destroy(&info);
620 }
621 }
622 }
623
624 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *ptv, void *userdata) {
625 struct session *s, *n;
626 struct userdata *u = userdata;
627 struct timeval now;
628 struct timeval tv;
629
630 pa_assert(m);
631 pa_assert(t);
632 pa_assert(ptv);
633 pa_assert(u);
634
635 pa_rtclock_get(&now);
636
637 pa_log_debug("Checking for dead streams ...");
638
639 for (s = u->sessions; s; s = n) {
640 int k;
641 n = s->next;
642
643 k = pa_atomic_load(&s->timestamp);
644
645 if (k + DEATH_TIMEOUT < now.tv_sec)
646 session_free(s);
647 }
648
649 /* Restart timer */
650 pa_gettimeofday(&tv);
651 pa_timeval_add(&tv, DEATH_TIMEOUT*PA_USEC_PER_SEC);
652 m->time_restart(t, &tv);
653 }
654
655 int pa__init(pa_module*m) {
656 struct userdata *u;
657 pa_modargs *ma = NULL;
658 struct sockaddr_in sa4;
659 #ifdef HAVE_IPV6
660 struct sockaddr_in6 sa6;
661 #endif
662 struct sockaddr *sa;
663 socklen_t salen;
664 const char *sap_address;
665 int fd = -1;
666 struct timeval tv;
667
668 pa_assert(m);
669
670 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
671 pa_log("failed to parse module arguments");
672 goto fail;
673 }
674
675 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
676
677 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
678 sa4.sin_family = AF_INET;
679 sa4.sin_port = htons(SAP_PORT);
680 sa = (struct sockaddr*) &sa4;
681 salen = sizeof(sa4);
682 #ifdef HAVE_IPV6
683 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
684 sa6.sin6_family = AF_INET6;
685 sa6.sin6_port = htons(SAP_PORT);
686 sa = (struct sockaddr*) &sa6;
687 salen = sizeof(sa6);
688 #endif
689 } else {
690 pa_log("Invalid SAP address '%s'", sap_address);
691 goto fail;
692 }
693
694 if ((fd = mcast_socket(sa, salen)) < 0)
695 goto fail;
696
697 m->userdata = u = pa_xnew(struct userdata, 1);
698 u->module = m;
699 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
700
701 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
702 pa_sap_context_init_recv(&u->sap_context, fd);
703
704 PA_LLIST_HEAD_INIT(struct session, u->sessions);
705 u->n_sessions = 0;
706 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
707
708 pa_gettimeofday(&tv);
709 pa_timeval_add(&tv, DEATH_TIMEOUT * PA_USEC_PER_SEC);
710 u->check_death_event = m->core->mainloop->time_new(m->core->mainloop, &tv, check_death_event_cb, u);
711
712 pa_modargs_free(ma);
713
714 return 0;
715
716 fail:
717 if (ma)
718 pa_modargs_free(ma);
719
720 if (fd >= 0)
721 pa_close(fd);
722
723 return -1;
724 }
725
726 void pa__done(pa_module*m) {
727 struct userdata *u;
728 struct session *s;
729
730 pa_assert(m);
731
732 if (!(u = m->userdata))
733 return;
734
735 if (u->sap_event)
736 m->core->mainloop->io_free(u->sap_event);
737
738 if (u->check_death_event)
739 m->core->mainloop->time_free(u->check_death_event);
740
741 pa_sap_context_destroy(&u->sap_context);
742
743 if (u->by_origin) {
744 while ((s = pa_hashmap_first(u->by_origin)))
745 session_free(s);
746
747 pa_hashmap_free(u->by_origin, NULL, NULL);
748 }
749
750 pa_xfree(u->sink_name);
751 pa_xfree(u);
752 }