]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
hashmap: Add the ability to free keys
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <math.h>
34
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION);
67 PA_MODULE_LOAD_ONCE(false);
68 PA_MODULE_USAGE(
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
71 );
72
73 #define SAP_PORT 9875
74 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
75 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
76 #define MAX_SESSIONS 16
77 #define DEATH_TIMEOUT 20
78 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
79 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
80
81 static const char* const valid_modargs[] = {
82 "sink",
83 "sap_address",
84 NULL
85 };
86
87 struct session {
88 struct userdata *userdata;
89 PA_LLIST_FIELDS(struct session);
90
91 pa_sink_input *sink_input;
92 pa_memblockq *memblockq;
93
94 bool first_packet;
95 uint32_t ssrc;
96 uint32_t offset;
97
98 struct pa_sdp_info sdp_info;
99
100 pa_rtp_context rtp_context;
101
102 pa_rtpoll_item *rtpoll_item;
103
104 pa_atomic_t timestamp;
105
106 pa_usec_t intended_latency;
107 pa_usec_t sink_latency;
108
109 pa_usec_t last_rate_update;
110 pa_usec_t last_latency;
111 double estimated_rate;
112 double avg_estimated_rate;
113 };
114
115 struct userdata {
116 pa_module *module;
117 pa_core *core;
118
119 pa_sap_context sap_context;
120 pa_io_event* sap_event;
121
122 pa_time_event *check_death_event;
123
124 char *sink_name;
125
126 PA_LLIST_HEAD(struct session, sessions);
127 pa_hashmap *by_origin;
128 int n_sessions;
129 };
130
131 static void session_free(struct session *s);
132
133 /* Called from I/O thread context */
134 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135 struct session *s = PA_SINK_INPUT(o)->userdata;
136
137 switch (code) {
138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140
141 /* Fall through, the default handler will add in the extra
142 * latency added by the resampler */
143 break;
144 }
145
146 return pa_sink_input_process_msg(o, code, data, offset, chunk);
147 }
148
149 /* Called from I/O thread context */
150 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151 struct session *s;
152 pa_sink_input_assert_ref(i);
153 pa_assert_se(s = i->userdata);
154
155 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156 return -1;
157
158 pa_memblockq_drop(s->memblockq, chunk->length);
159
160 return 0;
161 }
162
163 /* Called from I/O thread context */
164 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165 struct session *s;
166
167 pa_sink_input_assert_ref(i);
168 pa_assert_se(s = i->userdata);
169
170 pa_memblockq_rewind(s->memblockq, nbytes);
171 }
172
173 /* Called from I/O thread context */
174 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175 struct session *s;
176
177 pa_sink_input_assert_ref(i);
178 pa_assert_se(s = i->userdata);
179
180 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181 }
182
183 /* Called from main context */
184 static void sink_input_kill(pa_sink_input* i) {
185 struct session *s;
186 pa_sink_input_assert_ref(i);
187 pa_assert_se(s = i->userdata);
188
189 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
190 session_free(s);
191 }
192
193 /* Called from IO context */
194 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
195 struct session *s;
196 pa_sink_input_assert_ref(i);
197 pa_assert_se(s = i->userdata);
198
199 if (b)
200 pa_memblockq_flush_read(s->memblockq);
201 else
202 s->first_packet = false;
203 }
204
205 /* Called from I/O thread context */
206 static int rtpoll_work_cb(pa_rtpoll_item *i) {
207 pa_memchunk chunk;
208 int64_t k, j, delta;
209 struct timeval now = { 0, 0 };
210 struct session *s;
211 struct pollfd *p;
212
213 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
214
215 p = pa_rtpoll_item_get_pollfd(i, NULL);
216
217 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
218 pa_log("poll() signalled bad revents.");
219 return -1;
220 }
221
222 if ((p->revents & POLLIN) == 0)
223 return 0;
224
225 p->revents = 0;
226
227 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
228 return 0;
229
230 if (s->sdp_info.payload != s->rtp_context.payload ||
231 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
232 pa_memblock_unref(chunk.memblock);
233 return 0;
234 }
235
236 if (!s->first_packet) {
237 s->first_packet = true;
238
239 s->ssrc = s->rtp_context.ssrc;
240 s->offset = s->rtp_context.timestamp;
241
242 if (s->ssrc == s->userdata->module->core->cookie)
243 pa_log_warn("Detected RTP packet loop!");
244 } else {
245 if (s->ssrc != s->rtp_context.ssrc) {
246 pa_memblock_unref(chunk.memblock);
247 return 0;
248 }
249 }
250
251 /* Check whether there was a timestamp overflow */
252 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
253 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
254
255 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
256 delta = k;
257 else
258 delta = j;
259
260 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);
261
262 if (now.tv_sec == 0) {
263 PA_ONCE_BEGIN {
264 pa_log_warn("Using artificial time instead of timestamp");
265 } PA_ONCE_END;
266 pa_rtclock_get(&now);
267 } else
268 pa_rtclock_from_wallclock(&now);
269
270 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
271 pa_log_warn("Queue overrun");
272 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
273 }
274
275 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
276
277 pa_memblock_unref(chunk.memblock);
278
279 /* The next timestamp we expect */
280 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
281
282 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
283
284 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
285 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
286 uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
287 uint32_t current_rate = s->sink_input->sample_spec.rate;
288 uint32_t new_rate;
289 double estimated_rate, alpha = 0.02;
290
291 pa_log_debug("Updating sample rate");
292
293 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
294 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
295
296 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
297
298 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
299 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
300
301 if (ri > render_delay+sink_delay)
302 ri -= render_delay+sink_delay;
303 else
304 ri = 0;
305
306 if (wi < ri)
307 latency = 0;
308 else
309 latency = wi - ri;
310
311 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
312
313 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
314 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
315 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
316 * T
317 * R̂ = ─────────────── Rⁿ . (1)
318 * T - (Lⁿ - Lⁿ⁻ⁱ)
319 *
320 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
321 * is correct). But there is also the requirement to keep the buffer at a predefined target
322 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
323 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
324 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
325 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
326 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
327 * ʲ⁼ⁱ R̂ a a
328 * Solving for Rⁿ⁺ⁱ gives
329 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
330 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
331 * T
332 * In the code below a = 7 is used.
333 *
334 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
335 * of the estimated rate R̂ is used. This average R̅ is defined as
336 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
337 * Because it is difficult to find a fixed value for the coefficient α such that the
338 * averaging is without significant lag but oscillations are filtered out, a heuristic is
339 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
340 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
341 */
342 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
343 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
344 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
345 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
346 }
347 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
348 s->estimated_rate = estimated_rate;
349 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
350 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
351 s->last_latency = latency;
352
353 if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
354 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
355 new_rate = base_rate;
356 } else {
357 if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
358 new_rate = base_rate;
359 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
360 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
361 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
362 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
363 }
364 }
365 s->sink_input->sample_spec.rate = new_rate;
366
367 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
368
369 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
370
371 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
372
373 s->last_rate_update = pa_timeval_load(&now);
374 }
375
376 if (pa_memblockq_is_readable(s->memblockq) &&
377 s->sink_input->thread_info.underrun_for > 0) {
378 pa_log_debug("Requesting rewind due to end of underrun");
379 pa_sink_input_request_rewind(s->sink_input,
380 (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
381 false, true, false);
382 }
383
384 return 1;
385 }
386
387 /* Called from I/O thread context */
388 static void sink_input_attach(pa_sink_input *i) {
389 struct session *s;
390 struct pollfd *p;
391
392 pa_sink_input_assert_ref(i);
393 pa_assert_se(s = i->userdata);
394
395 pa_assert(!s->rtpoll_item);
396 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
397
398 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
399 p->fd = s->rtp_context.fd;
400 p->events = POLLIN;
401 p->revents = 0;
402
403 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
404 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
405 }
406
407 /* Called from I/O thread context */
408 static void sink_input_detach(pa_sink_input *i) {
409 struct session *s;
410 pa_sink_input_assert_ref(i);
411 pa_assert_se(s = i->userdata);
412
413 pa_assert(s->rtpoll_item);
414 pa_rtpoll_item_free(s->rtpoll_item);
415 s->rtpoll_item = NULL;
416 }
417
418 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
419 int af, fd = -1, r, one;
420
421 pa_assert(sa);
422 pa_assert(salen > 0);
423
424 af = sa->sa_family;
425 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
426 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
427 goto fail;
428 }
429
430 pa_make_udp_socket_low_delay(fd);
431
432 #ifdef SO_TIMESTAMP
433 one = 1;
434 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
435 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
436 goto fail;
437 }
438 #else
439 pa_log("SO_TIMESTAMP unsupported on this platform");
440 goto fail;
441 #endif
442
443 one = 1;
444 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
445 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
446 goto fail;
447 }
448
449 if (af == AF_INET) {
450 struct ip_mreq mr4;
451 memset(&mr4, 0, sizeof(mr4));
452 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
453 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
454 #ifdef HAVE_IPV6
455 } else if (af == AF_INET6) {
456 struct ipv6_mreq mr6;
457 memset(&mr6, 0, sizeof(mr6));
458 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
459 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
460 #endif
461 } else
462 pa_assert_not_reached();
463
464 if (r < 0) {
465 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
466 goto fail;
467 }
468
469 if (bind(fd, sa, salen) < 0) {
470 pa_log("bind() failed: %s", pa_cstrerror(errno));
471 goto fail;
472 }
473
474 return fd;
475
476 fail:
477 if (fd >= 0)
478 close(fd);
479
480 return -1;
481 }
482
483 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
484 struct session *s = NULL;
485 pa_sink *sink;
486 int fd = -1;
487 pa_memchunk silence;
488 pa_sink_input_new_data data;
489 struct timeval now;
490
491 pa_assert(u);
492 pa_assert(sdp_info);
493
494 if (u->n_sessions >= MAX_SESSIONS) {
495 pa_log("Session limit reached.");
496 goto fail;
497 }
498
499 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
500 pa_log("Sink does not exist.");
501 goto fail;
502 }
503
504 pa_rtclock_get(&now);
505
506 s = pa_xnew0(struct session, 1);
507 s->userdata = u;
508 s->first_packet = false;
509 s->sdp_info = *sdp_info;
510 s->rtpoll_item = NULL;
511 s->intended_latency = LATENCY_USEC;
512 s->last_rate_update = pa_timeval_load(&now);
513 s->last_latency = LATENCY_USEC;
514 s->estimated_rate = (double) sink->sample_spec.rate;
515 s->avg_estimated_rate = (double) sink->sample_spec.rate;
516 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
517
518 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
519 goto fail;
520
521 pa_sink_input_new_data_init(&data);
522 pa_sink_input_new_data_set_sink(&data, sink, false);
523 data.driver = __FILE__;
524 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
525 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
526 "RTP Stream%s%s%s",
527 sdp_info->session_name ? " (" : "",
528 sdp_info->session_name ? sdp_info->session_name : "",
529 sdp_info->session_name ? ")" : "");
530
531 if (sdp_info->session_name)
532 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
533 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
534 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
535 data.module = u->module;
536 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
537 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
538
539 pa_sink_input_new(&s->sink_input, u->module->core, &data);
540 pa_sink_input_new_data_done(&data);
541
542 if (!s->sink_input) {
543 pa_log("Failed to create sink input.");
544 goto fail;
545 }
546
547 s->sink_input->userdata = s;
548
549 s->sink_input->parent.process_msg = sink_input_process_msg;
550 s->sink_input->pop = sink_input_pop_cb;
551 s->sink_input->process_rewind = sink_input_process_rewind_cb;
552 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
553 s->sink_input->kill = sink_input_kill;
554 s->sink_input->attach = sink_input_attach;
555 s->sink_input->detach = sink_input_detach;
556 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
557
558 pa_sink_input_get_silence(s->sink_input, &silence);
559
560 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
561
562 if (s->intended_latency < s->sink_latency*2)
563 s->intended_latency = s->sink_latency*2;
564
565 s->memblockq = pa_memblockq_new(
566 "module-rtp-recv memblockq",
567 0,
568 MEMBLOCKQ_MAXLENGTH,
569 MEMBLOCKQ_MAXLENGTH,
570 &s->sink_input->sample_spec,
571 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
572 0,
573 0,
574 &silence);
575
576 pa_memblock_unref(silence.memblock);
577
578 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
579
580 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
581 u->n_sessions++;
582 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
583
584 pa_sink_input_put(s->sink_input);
585
586 pa_log_info("New session '%s'", s->sdp_info.session_name);
587
588 return s;
589
590 fail:
591 pa_xfree(s);
592
593 if (fd >= 0)
594 pa_close(fd);
595
596 return NULL;
597 }
598
599 static void session_free(struct session *s) {
600 pa_assert(s);
601
602 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
603
604 pa_sink_input_unlink(s->sink_input);
605 pa_sink_input_unref(s->sink_input);
606
607 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
608 pa_assert(s->userdata->n_sessions >= 1);
609 s->userdata->n_sessions--;
610
611 pa_memblockq_free(s->memblockq);
612 pa_sdp_info_destroy(&s->sdp_info);
613 pa_rtp_context_destroy(&s->rtp_context);
614
615 pa_xfree(s);
616 }
617
618 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
619 struct userdata *u = userdata;
620 bool goodbye = false;
621 pa_sdp_info info;
622 struct session *s;
623
624 pa_assert(m);
625 pa_assert(e);
626 pa_assert(u);
627 pa_assert(fd == u->sap_context.fd);
628 pa_assert(flags == PA_IO_EVENT_INPUT);
629
630 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
631 return;
632
633 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
634 return;
635
636 if (goodbye) {
637
638 if ((s = pa_hashmap_remove(u->by_origin, info.origin)))
639 session_free(s);
640
641 pa_sdp_info_destroy(&info);
642 } else {
643
644 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
645 if (!session_new(u, &info))
646 pa_sdp_info_destroy(&info);
647
648 } else {
649 struct timeval now;
650 pa_rtclock_get(&now);
651 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
652
653 pa_sdp_info_destroy(&info);
654 }
655 }
656 }
657
658 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
659 struct session *s, *n;
660 struct userdata *u = userdata;
661 struct timeval now;
662
663 pa_assert(m);
664 pa_assert(t);
665 pa_assert(u);
666
667 pa_rtclock_get(&now);
668
669 pa_log_debug("Checking for dead streams ...");
670
671 for (s = u->sessions; s; s = n) {
672 int k;
673 n = s->next;
674
675 k = pa_atomic_load(&s->timestamp);
676
677 if (k + DEATH_TIMEOUT < now.tv_sec) {
678 pa_hashmap_remove(u->by_origin, s->sdp_info.origin);
679 session_free(s);
680 }
681 }
682
683 /* Restart timer */
684 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
685 }
686
687 int pa__init(pa_module*m) {
688 struct userdata *u;
689 pa_modargs *ma = NULL;
690 struct sockaddr_in sa4;
691 #ifdef HAVE_IPV6
692 struct sockaddr_in6 sa6;
693 #endif
694 struct sockaddr *sa;
695 socklen_t salen;
696 const char *sap_address;
697 int fd = -1;
698
699 pa_assert(m);
700
701 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
702 pa_log("failed to parse module arguments");
703 goto fail;
704 }
705
706 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
707
708 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
709 sa4.sin_family = AF_INET;
710 sa4.sin_port = htons(SAP_PORT);
711 sa = (struct sockaddr*) &sa4;
712 salen = sizeof(sa4);
713 #ifdef HAVE_IPV6
714 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
715 sa6.sin6_family = AF_INET6;
716 sa6.sin6_port = htons(SAP_PORT);
717 sa = (struct sockaddr*) &sa6;
718 salen = sizeof(sa6);
719 #endif
720 } else {
721 pa_log("Invalid SAP address '%s'", sap_address);
722 goto fail;
723 }
724
725 if ((fd = mcast_socket(sa, salen)) < 0)
726 goto fail;
727
728 m->userdata = u = pa_xnew(struct userdata, 1);
729 u->module = m;
730 u->core = m->core;
731 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
732
733 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
734 pa_sap_context_init_recv(&u->sap_context, fd);
735
736 PA_LLIST_HEAD_INIT(struct session, u->sessions);
737 u->n_sessions = 0;
738 u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);
739
740 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
741
742 pa_modargs_free(ma);
743
744 return 0;
745
746 fail:
747 if (ma)
748 pa_modargs_free(ma);
749
750 if (fd >= 0)
751 pa_close(fd);
752
753 return -1;
754 }
755
756 void pa__done(pa_module*m) {
757 struct userdata *u;
758
759 pa_assert(m);
760
761 if (!(u = m->userdata))
762 return;
763
764 if (u->sap_event)
765 m->core->mainloop->io_free(u->sap_event);
766
767 if (u->check_death_event)
768 m->core->mainloop->time_free(u->check_death_event);
769
770 pa_sap_context_destroy(&u->sap_context);
771
772 if (u->by_origin)
773 pa_hashmap_free(u->by_origin);
774
775 pa_xfree(u->sink_name);
776 pa_xfree(u);
777 }