]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
pulse: move pa_rtclock_now in pulsecommon
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
58
59 #include "module-rtp-recv-symdef.h"
60
61 #include "rtp.h"
62 #include "sdp.h"
63 #include "sap.h"
64
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION);
68 PA_MODULE_LOAD_ONCE(FALSE);
69 PA_MODULE_USAGE(
70 "sink=<name of the sink> "
71 "sap_address=<multicast address to listen on> "
72 );
73
74 #define SAP_PORT 9875
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
81
82 static const char* const valid_modargs[] = {
83 "sink",
84 "sap_address",
85 NULL
86 };
87
88 struct session {
89 struct userdata *userdata;
90 PA_LLIST_FIELDS(struct session);
91
92 pa_sink_input *sink_input;
93 pa_memblockq *memblockq;
94
95 pa_bool_t first_packet;
96 uint32_t ssrc;
97 uint32_t offset;
98
99 struct pa_sdp_info sdp_info;
100
101 pa_rtp_context rtp_context;
102
103 pa_rtpoll_item *rtpoll_item;
104
105 pa_atomic_t timestamp;
106
107 pa_smoother *smoother;
108 pa_usec_t intended_latency;
109 pa_usec_t sink_latency;
110
111 pa_usec_t last_rate_update;
112 };
113
114 struct userdata {
115 pa_module *module;
116
117 pa_sap_context sap_context;
118 pa_io_event* sap_event;
119
120 pa_time_event *check_death_event;
121
122 char *sink_name;
123
124 PA_LLIST_HEAD(struct session, sessions);
125 pa_hashmap *by_origin;
126 int n_sessions;
127 };
128
129 static void session_free(struct session *s);
130
131 /* Called from I/O thread context */
132 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
133 struct session *s = PA_SINK_INPUT(o)->userdata;
134
135 switch (code) {
136 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
137 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
138
139 /* Fall through, the default handler will add in the extra
140 * latency added by the resampler */
141 break;
142 }
143
144 return pa_sink_input_process_msg(o, code, data, offset, chunk);
145 }
146
147 /* Called from I/O thread context */
148 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
149 struct session *s;
150 pa_sink_input_assert_ref(i);
151 pa_assert_se(s = i->userdata);
152
153 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
154 return -1;
155
156 pa_memblockq_drop(s->memblockq, chunk->length);
157
158 return 0;
159 }
160
161 /* Called from I/O thread context */
162 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
163 struct session *s;
164
165 pa_sink_input_assert_ref(i);
166 pa_assert_se(s = i->userdata);
167
168 pa_memblockq_rewind(s->memblockq, nbytes);
169 }
170
171 /* Called from I/O thread context */
172 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
173 struct session *s;
174
175 pa_sink_input_assert_ref(i);
176 pa_assert_se(s = i->userdata);
177
178 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
179 }
180
181 /* Called from main context */
182 static void sink_input_kill(pa_sink_input* i) {
183 struct session *s;
184 pa_sink_input_assert_ref(i);
185 pa_assert_se(s = i->userdata);
186
187 session_free(s);
188 }
189
190 /* Called from IO context */
191 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
192 struct session *s;
193 pa_sink_input_assert_ref(i);
194 pa_assert_se(s = i->userdata);
195
196 if (b) {
197 pa_smoother_pause(s->smoother, pa_rtclock_now());
198 pa_memblockq_flush_read(s->memblockq);
199 } else
200 s->first_packet = FALSE;
201 }
202
203 /* Called from I/O thread context */
204 static int rtpoll_work_cb(pa_rtpoll_item *i) {
205 pa_memchunk chunk;
206 int64_t k, j, delta;
207 struct timeval now = { 0, 0 };
208 struct session *s;
209 struct pollfd *p;
210
211 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
212
213 p = pa_rtpoll_item_get_pollfd(i, NULL);
214
215 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
216 pa_log("poll() signalled bad revents.");
217 return -1;
218 }
219
220 if ((p->revents & POLLIN) == 0)
221 return 0;
222
223 p->revents = 0;
224
225 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
226 return 0;
227
228 if (s->sdp_info.payload != s->rtp_context.payload ||
229 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
230 pa_memblock_unref(chunk.memblock);
231 return 0;
232 }
233
234 if (!s->first_packet) {
235 s->first_packet = TRUE;
236
237 s->ssrc = s->rtp_context.ssrc;
238 s->offset = s->rtp_context.timestamp;
239
240 if (s->ssrc == s->userdata->module->core->cookie)
241 pa_log_warn("Detected RTP packet loop!");
242 } else {
243 if (s->ssrc != s->rtp_context.ssrc) {
244 pa_memblock_unref(chunk.memblock);
245 return 0;
246 }
247 }
248
249 /* Check whether there was a timestamp overflow */
250 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
251 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
252
253 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
254 delta = k;
255 else
256 delta = j;
257
258 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
259
260 if (now.tv_sec == 0) {
261 PA_ONCE_BEGIN {
262 pa_log_warn("Using artificial time instead of timestamp");
263 } PA_ONCE_END;
264 pa_rtclock_get(&now);
265 } else
266 pa_rtclock_from_wallclock(&now);
267
268 pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
269
270 /* Tell the smoother that we are rolling now, in case it is still paused */
271 pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
272
273 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
274 pa_log_warn("Queue overrun");
275 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
276 }
277
278 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
279
280 pa_memblock_unref(chunk.memblock);
281
282 /* The next timestamp we expect */
283 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
284
285 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
286
287 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
288 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
289 unsigned fix_samples;
290
291 pa_log_debug("Updating sample rate");
292
293 wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
294 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
295
296 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
297
298 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
299 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
300
301 if (ri > render_delay+sink_delay)
302 ri -= render_delay+sink_delay;
303 else
304 ri = 0;
305
306 if (wi < ri)
307 latency = 0;
308 else
309 latency = wi - ri;
310
311 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
312
313 /* Calculate deviation */
314 if (latency < s->intended_latency)
315 fix = s->intended_latency - latency;
316 else
317 fix = latency - s->intended_latency;
318
319 /* How many samples is this per second? */
320 fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
321
322 /* Check if deviation is in bounds */
323 if (fix_samples > s->sink_input->sample_spec.rate*.50)
324 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples);
325 else {
326 /* Fix up rate */
327 if (latency < s->intended_latency)
328 s->sink_input->sample_spec.rate -= fix_samples;
329 else
330 s->sink_input->sample_spec.rate += fix_samples;
331
332 if (s->sink_input->sample_spec.rate > PA_RATE_MAX)
333 s->sink_input->sample_spec.rate = PA_RATE_MAX;
334 }
335
336 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
337
338 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
339
340 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
341
342 s->last_rate_update = pa_timeval_load(&now);
343 }
344
345 if (pa_memblockq_is_readable(s->memblockq) &&
346 s->sink_input->thread_info.underrun_for > 0) {
347 pa_log_debug("Requesting rewind due to end of underrun");
348 pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
349 }
350
351 return 1;
352 }
353
354 /* Called from I/O thread context */
355 static void sink_input_attach(pa_sink_input *i) {
356 struct session *s;
357 struct pollfd *p;
358
359 pa_sink_input_assert_ref(i);
360 pa_assert_se(s = i->userdata);
361
362 pa_assert(!s->rtpoll_item);
363 s->rtpoll_item = pa_rtpoll_item_new(i->sink->rtpoll, PA_RTPOLL_LATE, 1);
364
365 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
366 p->fd = s->rtp_context.fd;
367 p->events = POLLIN;
368 p->revents = 0;
369
370 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
371 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
372 }
373
374 /* Called from I/O thread context */
375 static void sink_input_detach(pa_sink_input *i) {
376 struct session *s;
377 pa_sink_input_assert_ref(i);
378 pa_assert_se(s = i->userdata);
379
380 pa_assert(s->rtpoll_item);
381 pa_rtpoll_item_free(s->rtpoll_item);
382 s->rtpoll_item = NULL;
383 }
384
385 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
386 int af, fd = -1, r, one;
387
388 pa_assert(sa);
389 pa_assert(salen > 0);
390
391 af = sa->sa_family;
392 if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
393 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
394 goto fail;
395 }
396
397 pa_make_udp_socket_low_delay(fd);
398
399 one = 1;
400 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
401 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
402 goto fail;
403 }
404
405 one = 1;
406 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
407 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
408 goto fail;
409 }
410
411 if (af == AF_INET) {
412 struct ip_mreq mr4;
413 memset(&mr4, 0, sizeof(mr4));
414 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
415 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
416 #ifdef HAVE_IPV6
417 } else {
418 struct ipv6_mreq mr6;
419 memset(&mr6, 0, sizeof(mr6));
420 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
421 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
422 #endif
423 }
424
425 if (r < 0) {
426 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
427 goto fail;
428 }
429
430 if (bind(fd, sa, salen) < 0) {
431 pa_log("bind() failed: %s", pa_cstrerror(errno));
432 goto fail;
433 }
434
435 return fd;
436
437 fail:
438 if (fd >= 0)
439 close(fd);
440
441 return -1;
442 }
443
444 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
445 struct session *s = NULL;
446 pa_sink *sink;
447 int fd = -1;
448 pa_memchunk silence;
449 pa_sink_input_new_data data;
450 struct timeval now;
451
452 pa_assert(u);
453 pa_assert(sdp_info);
454
455 if (u->n_sessions >= MAX_SESSIONS) {
456 pa_log("Session limit reached.");
457 goto fail;
458 }
459
460 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
461 pa_log("Sink does not exist.");
462 goto fail;
463 }
464
465 pa_rtclock_get(&now);
466
467 s = pa_xnew0(struct session, 1);
468 s->userdata = u;
469 s->first_packet = FALSE;
470 s->sdp_info = *sdp_info;
471 s->rtpoll_item = NULL;
472 s->intended_latency = LATENCY_USEC;
473 s->smoother = pa_smoother_new(
474 PA_USEC_PER_SEC*5,
475 PA_USEC_PER_SEC*2,
476 TRUE,
477 TRUE,
478 10,
479 pa_timeval_load(&now),
480 TRUE);
481 s->last_rate_update = pa_timeval_load(&now);
482 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
483
484 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
485 goto fail;
486
487 pa_sink_input_new_data_init(&data);
488 data.sink = sink;
489 data.driver = __FILE__;
490 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
491 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
492 "RTP Stream%s%s%s",
493 sdp_info->session_name ? " (" : "",
494 sdp_info->session_name ? sdp_info->session_name : "",
495 sdp_info->session_name ? ")" : "");
496
497 if (sdp_info->session_name)
498 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
499 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
500 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
501 data.module = u->module;
502 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
503
504 pa_sink_input_new(&s->sink_input, u->module->core, &data, PA_SINK_INPUT_VARIABLE_RATE);
505 pa_sink_input_new_data_done(&data);
506
507 if (!s->sink_input) {
508 pa_log("Failed to create sink input.");
509 goto fail;
510 }
511
512 s->sink_input->userdata = s;
513
514 s->sink_input->parent.process_msg = sink_input_process_msg;
515 s->sink_input->pop = sink_input_pop_cb;
516 s->sink_input->process_rewind = sink_input_process_rewind_cb;
517 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
518 s->sink_input->kill = sink_input_kill;
519 s->sink_input->attach = sink_input_attach;
520 s->sink_input->detach = sink_input_detach;
521 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
522
523 pa_sink_input_get_silence(s->sink_input, &silence);
524
525 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
526
527 if (s->intended_latency < s->sink_latency*2)
528 s->intended_latency = s->sink_latency*2;
529
530 s->memblockq = pa_memblockq_new(
531 0,
532 MEMBLOCKQ_MAXLENGTH,
533 MEMBLOCKQ_MAXLENGTH,
534 pa_frame_size(&s->sink_input->sample_spec),
535 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
536 0,
537 0,
538 &silence);
539
540 pa_memblock_unref(silence.memblock);
541
542 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
543
544 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
545 u->n_sessions++;
546 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
547
548 pa_sink_input_put(s->sink_input);
549
550 pa_log_info("New session '%s'", s->sdp_info.session_name);
551
552 return s;
553
554 fail:
555 pa_xfree(s);
556
557 if (fd >= 0)
558 pa_close(fd);
559
560 return NULL;
561 }
562
563 static void session_free(struct session *s) {
564 pa_assert(s);
565
566 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
567
568 pa_sink_input_unlink(s->sink_input);
569 pa_sink_input_unref(s->sink_input);
570
571 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
572 pa_assert(s->userdata->n_sessions >= 1);
573 s->userdata->n_sessions--;
574 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
575
576 pa_memblockq_free(s->memblockq);
577 pa_sdp_info_destroy(&s->sdp_info);
578 pa_rtp_context_destroy(&s->rtp_context);
579
580 pa_smoother_free(s->smoother);
581
582 pa_xfree(s);
583 }
584
585 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
586 struct userdata *u = userdata;
587 pa_bool_t goodbye = FALSE;
588 pa_sdp_info info;
589 struct session *s;
590
591 pa_assert(m);
592 pa_assert(e);
593 pa_assert(u);
594 pa_assert(fd == u->sap_context.fd);
595 pa_assert(flags == PA_IO_EVENT_INPUT);
596
597 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
598 return;
599
600 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
601 return;
602
603 if (goodbye) {
604
605 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
606 session_free(s);
607
608 pa_sdp_info_destroy(&info);
609 } else {
610
611 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
612 if (!session_new(u, &info))
613 pa_sdp_info_destroy(&info);
614
615 } else {
616 struct timeval now;
617 pa_rtclock_get(&now);
618 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
619
620 pa_sdp_info_destroy(&info);
621 }
622 }
623 }
624
625 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *ptv, void *userdata) {
626 struct session *s, *n;
627 struct userdata *u = userdata;
628 struct timeval now;
629 struct timeval tv;
630
631 pa_assert(m);
632 pa_assert(t);
633 pa_assert(ptv);
634 pa_assert(u);
635
636 pa_rtclock_get(&now);
637
638 pa_log_debug("Checking for dead streams ...");
639
640 for (s = u->sessions; s; s = n) {
641 int k;
642 n = s->next;
643
644 k = pa_atomic_load(&s->timestamp);
645
646 if (k + DEATH_TIMEOUT < now.tv_sec)
647 session_free(s);
648 }
649
650 /* Restart timer */
651 pa_gettimeofday(&tv);
652 pa_timeval_add(&tv, DEATH_TIMEOUT*PA_USEC_PER_SEC);
653 m->time_restart(t, &tv);
654 }
655
656 int pa__init(pa_module*m) {
657 struct userdata *u;
658 pa_modargs *ma = NULL;
659 struct sockaddr_in sa4;
660 #ifdef HAVE_IPV6
661 struct sockaddr_in6 sa6;
662 #endif
663 struct sockaddr *sa;
664 socklen_t salen;
665 const char *sap_address;
666 int fd = -1;
667 struct timeval tv;
668
669 pa_assert(m);
670
671 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
672 pa_log("failed to parse module arguments");
673 goto fail;
674 }
675
676 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
677
678 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
679 sa4.sin_family = AF_INET;
680 sa4.sin_port = htons(SAP_PORT);
681 sa = (struct sockaddr*) &sa4;
682 salen = sizeof(sa4);
683 #ifdef HAVE_IPV6
684 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
685 sa6.sin6_family = AF_INET6;
686 sa6.sin6_port = htons(SAP_PORT);
687 sa = (struct sockaddr*) &sa6;
688 salen = sizeof(sa6);
689 #endif
690 } else {
691 pa_log("Invalid SAP address '%s'", sap_address);
692 goto fail;
693 }
694
695 if ((fd = mcast_socket(sa, salen)) < 0)
696 goto fail;
697
698 m->userdata = u = pa_xnew(struct userdata, 1);
699 u->module = m;
700 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
701
702 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
703 pa_sap_context_init_recv(&u->sap_context, fd);
704
705 PA_LLIST_HEAD_INIT(struct session, u->sessions);
706 u->n_sessions = 0;
707 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
708
709 pa_gettimeofday(&tv);
710 pa_timeval_add(&tv, DEATH_TIMEOUT * PA_USEC_PER_SEC);
711 u->check_death_event = m->core->mainloop->time_new(m->core->mainloop, &tv, check_death_event_cb, u);
712
713 pa_modargs_free(ma);
714
715 return 0;
716
717 fail:
718 if (ma)
719 pa_modargs_free(ma);
720
721 if (fd >= 0)
722 pa_close(fd);
723
724 return -1;
725 }
726
727 void pa__done(pa_module*m) {
728 struct userdata *u;
729 struct session *s;
730
731 pa_assert(m);
732
733 if (!(u = m->userdata))
734 return;
735
736 if (u->sap_event)
737 m->core->mainloop->io_free(u->sap_event);
738
739 if (u->check_death_event)
740 m->core->mainloop->time_free(u->check_death_event);
741
742 pa_sap_context_destroy(&u->sap_context);
743
744 if (u->by_origin) {
745 while ((s = pa_hashmap_first(u->by_origin)))
746 session_free(s);
747
748 pa_hashmap_free(u->by_origin, NULL, NULL);
749 }
750
751 pa_xfree(u->sink_name);
752 pa_xfree(u);
753 }