]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
Merge branch 'master' of git://0pointer.de/pulseaudio
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
58
59 #include "module-rtp-recv-symdef.h"
60
61 #include "rtp.h"
62 #include "sdp.h"
63 #include "sap.h"
64
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION);
68 PA_MODULE_LOAD_ONCE(FALSE);
69 PA_MODULE_USAGE(
70 "sink=<name of the sink> "
71 "sap_address=<multicast address to listen on> "
72 );
73
74 #define SAP_PORT 9875
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
81
82 static const char* const valid_modargs[] = {
83 "sink",
84 "sap_address",
85 NULL
86 };
87
88 struct session {
89 struct userdata *userdata;
90 PA_LLIST_FIELDS(struct session);
91
92 pa_sink_input *sink_input;
93 pa_memblockq *memblockq;
94
95 pa_bool_t first_packet;
96 uint32_t ssrc;
97 uint32_t offset;
98
99 struct pa_sdp_info sdp_info;
100
101 pa_rtp_context rtp_context;
102
103 pa_rtpoll_item *rtpoll_item;
104
105 pa_atomic_t timestamp;
106
107 pa_smoother *smoother;
108 pa_usec_t intended_latency;
109 pa_usec_t sink_latency;
110
111 pa_usec_t last_rate_update;
112 };
113
114 struct userdata {
115 pa_module *module;
116 pa_core *core;
117
118 pa_sap_context sap_context;
119 pa_io_event* sap_event;
120
121 pa_time_event *check_death_event;
122
123 char *sink_name;
124
125 PA_LLIST_HEAD(struct session, sessions);
126 pa_hashmap *by_origin;
127 int n_sessions;
128 };
129
130 static void session_free(struct session *s);
131
132 /* Called from I/O thread context */
133 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
134 struct session *s = PA_SINK_INPUT(o)->userdata;
135
136 switch (code) {
137 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
138 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
139
140 /* Fall through, the default handler will add in the extra
141 * latency added by the resampler */
142 break;
143 }
144
145 return pa_sink_input_process_msg(o, code, data, offset, chunk);
146 }
147
148 /* Called from I/O thread context */
149 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
150 struct session *s;
151 pa_sink_input_assert_ref(i);
152 pa_assert_se(s = i->userdata);
153
154 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
155 return -1;
156
157 pa_memblockq_drop(s->memblockq, chunk->length);
158
159 return 0;
160 }
161
162 /* Called from I/O thread context */
163 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
164 struct session *s;
165
166 pa_sink_input_assert_ref(i);
167 pa_assert_se(s = i->userdata);
168
169 pa_memblockq_rewind(s->memblockq, nbytes);
170 }
171
172 /* Called from I/O thread context */
173 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
174 struct session *s;
175
176 pa_sink_input_assert_ref(i);
177 pa_assert_se(s = i->userdata);
178
179 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
180 }
181
182 /* Called from main context */
183 static void sink_input_kill(pa_sink_input* i) {
184 struct session *s;
185 pa_sink_input_assert_ref(i);
186 pa_assert_se(s = i->userdata);
187
188 session_free(s);
189 }
190
191 /* Called from IO context */
192 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
193 struct session *s;
194 pa_sink_input_assert_ref(i);
195 pa_assert_se(s = i->userdata);
196
197 if (b) {
198 pa_smoother_pause(s->smoother, pa_rtclock_now());
199 pa_memblockq_flush_read(s->memblockq);
200 } else
201 s->first_packet = FALSE;
202 }
203
204 /* Called from I/O thread context */
205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206 pa_memchunk chunk;
207 int64_t k, j, delta;
208 struct timeval now = { 0, 0 };
209 struct session *s;
210 struct pollfd *p;
211
212 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
213
214 p = pa_rtpoll_item_get_pollfd(i, NULL);
215
216 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
217 pa_log("poll() signalled bad revents.");
218 return -1;
219 }
220
221 if ((p->revents & POLLIN) == 0)
222 return 0;
223
224 p->revents = 0;
225
226 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
227 return 0;
228
229 if (s->sdp_info.payload != s->rtp_context.payload ||
230 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231 pa_memblock_unref(chunk.memblock);
232 return 0;
233 }
234
235 if (!s->first_packet) {
236 s->first_packet = TRUE;
237
238 s->ssrc = s->rtp_context.ssrc;
239 s->offset = s->rtp_context.timestamp;
240
241 if (s->ssrc == s->userdata->module->core->cookie)
242 pa_log_warn("Detected RTP packet loop!");
243 } else {
244 if (s->ssrc != s->rtp_context.ssrc) {
245 pa_memblock_unref(chunk.memblock);
246 return 0;
247 }
248 }
249
250 /* Check whether there was a timestamp overflow */
251 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
252 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
253
254 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
255 delta = k;
256 else
257 delta = j;
258
259 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
260
261 if (now.tv_sec == 0) {
262 PA_ONCE_BEGIN {
263 pa_log_warn("Using artificial time instead of timestamp");
264 } PA_ONCE_END;
265 pa_rtclock_get(&now);
266 } else
267 pa_rtclock_from_wallclock(&now);
268
269 pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
270
271 /* Tell the smoother that we are rolling now, in case it is still paused */
272 pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
273
274 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
275 pa_log_warn("Queue overrun");
276 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
277 }
278
279 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
280
281 pa_memblock_unref(chunk.memblock);
282
283 /* The next timestamp we expect */
284 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
285
286 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
287
288 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
289 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
290 unsigned fix_samples;
291
292 pa_log_debug("Updating sample rate");
293
294 wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
295 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
296
297 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
298
299 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
300 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
301
302 if (ri > render_delay+sink_delay)
303 ri -= render_delay+sink_delay;
304 else
305 ri = 0;
306
307 if (wi < ri)
308 latency = 0;
309 else
310 latency = wi - ri;
311
312 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
313
314 /* Calculate deviation */
315 if (latency < s->intended_latency)
316 fix = s->intended_latency - latency;
317 else
318 fix = latency - s->intended_latency;
319
320 /* How many samples is this per second? */
321 fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
322
323 /* Check if deviation is in bounds */
324 if (fix_samples > s->sink_input->sample_spec.rate*.50)
325 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples);
326 else {
327 /* Fix up rate */
328 if (latency < s->intended_latency)
329 s->sink_input->sample_spec.rate -= fix_samples;
330 else
331 s->sink_input->sample_spec.rate += fix_samples;
332
333 if (s->sink_input->sample_spec.rate > PA_RATE_MAX)
334 s->sink_input->sample_spec.rate = PA_RATE_MAX;
335 }
336
337 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
338
339 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
340
341 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
342
343 s->last_rate_update = pa_timeval_load(&now);
344 }
345
346 if (pa_memblockq_is_readable(s->memblockq) &&
347 s->sink_input->thread_info.underrun_for > 0) {
348 pa_log_debug("Requesting rewind due to end of underrun");
349 pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
350 }
351
352 return 1;
353 }
354
355 /* Called from I/O thread context */
356 static void sink_input_attach(pa_sink_input *i) {
357 struct session *s;
358 struct pollfd *p;
359
360 pa_sink_input_assert_ref(i);
361 pa_assert_se(s = i->userdata);
362
363 pa_assert(!s->rtpoll_item);
364 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
365
366 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
367 p->fd = s->rtp_context.fd;
368 p->events = POLLIN;
369 p->revents = 0;
370
371 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
372 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
373 }
374
375 /* Called from I/O thread context */
376 static void sink_input_detach(pa_sink_input *i) {
377 struct session *s;
378 pa_sink_input_assert_ref(i);
379 pa_assert_se(s = i->userdata);
380
381 pa_assert(s->rtpoll_item);
382 pa_rtpoll_item_free(s->rtpoll_item);
383 s->rtpoll_item = NULL;
384 }
385
386 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
387 int af, fd = -1, r, one;
388
389 pa_assert(sa);
390 pa_assert(salen > 0);
391
392 af = sa->sa_family;
393 if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
394 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
395 goto fail;
396 }
397
398 pa_make_udp_socket_low_delay(fd);
399
400 one = 1;
401 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
402 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
403 goto fail;
404 }
405
406 one = 1;
407 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
408 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
409 goto fail;
410 }
411
412 if (af == AF_INET) {
413 struct ip_mreq mr4;
414 memset(&mr4, 0, sizeof(mr4));
415 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
416 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
417 #ifdef HAVE_IPV6
418 } else {
419 struct ipv6_mreq mr6;
420 memset(&mr6, 0, sizeof(mr6));
421 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
422 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
423 #endif
424 }
425
426 if (r < 0) {
427 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
428 goto fail;
429 }
430
431 if (bind(fd, sa, salen) < 0) {
432 pa_log("bind() failed: %s", pa_cstrerror(errno));
433 goto fail;
434 }
435
436 return fd;
437
438 fail:
439 if (fd >= 0)
440 close(fd);
441
442 return -1;
443 }
444
445 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
446 struct session *s = NULL;
447 pa_sink *sink;
448 int fd = -1;
449 pa_memchunk silence;
450 pa_sink_input_new_data data;
451 struct timeval now;
452
453 pa_assert(u);
454 pa_assert(sdp_info);
455
456 if (u->n_sessions >= MAX_SESSIONS) {
457 pa_log("Session limit reached.");
458 goto fail;
459 }
460
461 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
462 pa_log("Sink does not exist.");
463 goto fail;
464 }
465
466 pa_rtclock_get(&now);
467
468 s = pa_xnew0(struct session, 1);
469 s->userdata = u;
470 s->first_packet = FALSE;
471 s->sdp_info = *sdp_info;
472 s->rtpoll_item = NULL;
473 s->intended_latency = LATENCY_USEC;
474 s->smoother = pa_smoother_new(
475 PA_USEC_PER_SEC*5,
476 PA_USEC_PER_SEC*2,
477 TRUE,
478 TRUE,
479 10,
480 pa_timeval_load(&now),
481 TRUE);
482 s->last_rate_update = pa_timeval_load(&now);
483 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
484
485 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
486 goto fail;
487
488 pa_sink_input_new_data_init(&data);
489 data.sink = sink;
490 data.driver = __FILE__;
491 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
492 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
493 "RTP Stream%s%s%s",
494 sdp_info->session_name ? " (" : "",
495 sdp_info->session_name ? sdp_info->session_name : "",
496 sdp_info->session_name ? ")" : "");
497
498 if (sdp_info->session_name)
499 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
500 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
501 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
502 data.module = u->module;
503 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
504 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
505
506 pa_sink_input_new(&s->sink_input, u->module->core, &data);
507 pa_sink_input_new_data_done(&data);
508
509 if (!s->sink_input) {
510 pa_log("Failed to create sink input.");
511 goto fail;
512 }
513
514 s->sink_input->userdata = s;
515
516 s->sink_input->parent.process_msg = sink_input_process_msg;
517 s->sink_input->pop = sink_input_pop_cb;
518 s->sink_input->process_rewind = sink_input_process_rewind_cb;
519 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
520 s->sink_input->kill = sink_input_kill;
521 s->sink_input->attach = sink_input_attach;
522 s->sink_input->detach = sink_input_detach;
523 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
524
525 pa_sink_input_get_silence(s->sink_input, &silence);
526
527 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
528
529 if (s->intended_latency < s->sink_latency*2)
530 s->intended_latency = s->sink_latency*2;
531
532 s->memblockq = pa_memblockq_new(
533 0,
534 MEMBLOCKQ_MAXLENGTH,
535 MEMBLOCKQ_MAXLENGTH,
536 pa_frame_size(&s->sink_input->sample_spec),
537 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
538 0,
539 0,
540 &silence);
541
542 pa_memblock_unref(silence.memblock);
543
544 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
545
546 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
547 u->n_sessions++;
548 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
549
550 pa_sink_input_put(s->sink_input);
551
552 pa_log_info("New session '%s'", s->sdp_info.session_name);
553
554 return s;
555
556 fail:
557 pa_xfree(s);
558
559 if (fd >= 0)
560 pa_close(fd);
561
562 return NULL;
563 }
564
565 static void session_free(struct session *s) {
566 pa_assert(s);
567
568 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
569
570 pa_sink_input_unlink(s->sink_input);
571 pa_sink_input_unref(s->sink_input);
572
573 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
574 pa_assert(s->userdata->n_sessions >= 1);
575 s->userdata->n_sessions--;
576 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
577
578 pa_memblockq_free(s->memblockq);
579 pa_sdp_info_destroy(&s->sdp_info);
580 pa_rtp_context_destroy(&s->rtp_context);
581
582 pa_smoother_free(s->smoother);
583
584 pa_xfree(s);
585 }
586
587 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
588 struct userdata *u = userdata;
589 pa_bool_t goodbye = FALSE;
590 pa_sdp_info info;
591 struct session *s;
592
593 pa_assert(m);
594 pa_assert(e);
595 pa_assert(u);
596 pa_assert(fd == u->sap_context.fd);
597 pa_assert(flags == PA_IO_EVENT_INPUT);
598
599 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
600 return;
601
602 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
603 return;
604
605 if (goodbye) {
606
607 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
608 session_free(s);
609
610 pa_sdp_info_destroy(&info);
611 } else {
612
613 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
614 if (!session_new(u, &info))
615 pa_sdp_info_destroy(&info);
616
617 } else {
618 struct timeval now;
619 pa_rtclock_get(&now);
620 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
621
622 pa_sdp_info_destroy(&info);
623 }
624 }
625 }
626
627 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
628 struct session *s, *n;
629 struct userdata *u = userdata;
630 struct timeval now;
631
632 pa_assert(m);
633 pa_assert(t);
634 pa_assert(u);
635
636 pa_rtclock_get(&now);
637
638 pa_log_debug("Checking for dead streams ...");
639
640 for (s = u->sessions; s; s = n) {
641 int k;
642 n = s->next;
643
644 k = pa_atomic_load(&s->timestamp);
645
646 if (k + DEATH_TIMEOUT < now.tv_sec)
647 session_free(s);
648 }
649
650 /* Restart timer */
651 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
652 }
653
654 int pa__init(pa_module*m) {
655 struct userdata *u;
656 pa_modargs *ma = NULL;
657 struct sockaddr_in sa4;
658 #ifdef HAVE_IPV6
659 struct sockaddr_in6 sa6;
660 #endif
661 struct sockaddr *sa;
662 socklen_t salen;
663 const char *sap_address;
664 int fd = -1;
665
666 pa_assert(m);
667
668 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
669 pa_log("failed to parse module arguments");
670 goto fail;
671 }
672
673 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
674
675 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
676 sa4.sin_family = AF_INET;
677 sa4.sin_port = htons(SAP_PORT);
678 sa = (struct sockaddr*) &sa4;
679 salen = sizeof(sa4);
680 #ifdef HAVE_IPV6
681 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
682 sa6.sin6_family = AF_INET6;
683 sa6.sin6_port = htons(SAP_PORT);
684 sa = (struct sockaddr*) &sa6;
685 salen = sizeof(sa6);
686 #endif
687 } else {
688 pa_log("Invalid SAP address '%s'", sap_address);
689 goto fail;
690 }
691
692 if ((fd = mcast_socket(sa, salen)) < 0)
693 goto fail;
694
695 m->userdata = u = pa_xnew(struct userdata, 1);
696 u->module = m;
697 u->core = m->core;
698 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
699
700 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
701 pa_sap_context_init_recv(&u->sap_context, fd);
702
703 PA_LLIST_HEAD_INIT(struct session, u->sessions);
704 u->n_sessions = 0;
705 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
706
707 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
708
709 pa_modargs_free(ma);
710
711 return 0;
712
713 fail:
714 if (ma)
715 pa_modargs_free(ma);
716
717 if (fd >= 0)
718 pa_close(fd);
719
720 return -1;
721 }
722
723 void pa__done(pa_module*m) {
724 struct userdata *u;
725 struct session *s;
726
727 pa_assert(m);
728
729 if (!(u = m->userdata))
730 return;
731
732 if (u->sap_event)
733 m->core->mainloop->io_free(u->sap_event);
734
735 if (u->check_death_event)
736 m->core->mainloop->time_free(u->check_death_event);
737
738 pa_sap_context_destroy(&u->sap_context);
739
740 if (u->by_origin) {
741 while ((s = pa_hashmap_first(u->by_origin)))
742 session_free(s);
743
744 pa_hashmap_free(u->by_origin, NULL, NULL);
745 }
746
747 pa_xfree(u->sink_name);
748 pa_xfree(u);
749 }