]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
3cf5d890831323377890af7ed8bd103a815ef3ad
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
58
59 #include "module-rtp-recv-symdef.h"
60
61 #include "rtp.h"
62 #include "sdp.h"
63 #include "sap.h"
64
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION);
68 PA_MODULE_LOAD_ONCE(FALSE);
69 PA_MODULE_USAGE(
70 "sink=<name of the sink> "
71 "sap_address=<multicast address to listen on> "
72 );
73
74 #define SAP_PORT 9875
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
81
82 static const char* const valid_modargs[] = {
83 "sink",
84 "sap_address",
85 NULL
86 };
87
88 struct session {
89 struct userdata *userdata;
90 PA_LLIST_FIELDS(struct session);
91
92 pa_sink_input *sink_input;
93 pa_memblockq *memblockq;
94
95 pa_bool_t first_packet;
96 uint32_t ssrc;
97 uint32_t offset;
98
99 struct pa_sdp_info sdp_info;
100
101 pa_rtp_context rtp_context;
102
103 pa_rtpoll_item *rtpoll_item;
104
105 pa_atomic_t timestamp;
106
107 pa_smoother *smoother;
108 pa_usec_t intended_latency;
109 pa_usec_t sink_latency;
110
111 pa_usec_t last_rate_update;
112 pa_usec_t last_latency;
113 double estimated_rate;
114 double avg_estimated_rate;
115 };
116
117 struct userdata {
118 pa_module *module;
119 pa_core *core;
120
121 pa_sap_context sap_context;
122 pa_io_event* sap_event;
123
124 pa_time_event *check_death_event;
125
126 char *sink_name;
127
128 PA_LLIST_HEAD(struct session, sessions);
129 pa_hashmap *by_origin;
130 int n_sessions;
131 };
132
133 static void session_free(struct session *s);
134
135 /* Called from I/O thread context */
136 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
137 struct session *s = PA_SINK_INPUT(o)->userdata;
138
139 switch (code) {
140 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
141 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
142
143 /* Fall through, the default handler will add in the extra
144 * latency added by the resampler */
145 break;
146 }
147
148 return pa_sink_input_process_msg(o, code, data, offset, chunk);
149 }
150
151 /* Called from I/O thread context */
152 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
153 struct session *s;
154 pa_sink_input_assert_ref(i);
155 pa_assert_se(s = i->userdata);
156
157 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
158 return -1;
159
160 pa_memblockq_drop(s->memblockq, chunk->length);
161
162 return 0;
163 }
164
165 /* Called from I/O thread context */
166 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
167 struct session *s;
168
169 pa_sink_input_assert_ref(i);
170 pa_assert_se(s = i->userdata);
171
172 pa_memblockq_rewind(s->memblockq, nbytes);
173 }
174
175 /* Called from I/O thread context */
176 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
177 struct session *s;
178
179 pa_sink_input_assert_ref(i);
180 pa_assert_se(s = i->userdata);
181
182 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
183 }
184
185 /* Called from main context */
186 static void sink_input_kill(pa_sink_input* i) {
187 struct session *s;
188 pa_sink_input_assert_ref(i);
189 pa_assert_se(s = i->userdata);
190
191 session_free(s);
192 }
193
194 /* Called from IO context */
195 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
196 struct session *s;
197 pa_sink_input_assert_ref(i);
198 pa_assert_se(s = i->userdata);
199
200 if (b) {
201 pa_smoother_pause(s->smoother, pa_rtclock_now());
202 pa_memblockq_flush_read(s->memblockq);
203 } else
204 s->first_packet = FALSE;
205 }
206
207 /* Called from I/O thread context */
208 static int rtpoll_work_cb(pa_rtpoll_item *i) {
209 pa_memchunk chunk;
210 int64_t k, j, delta;
211 struct timeval now = { 0, 0 };
212 struct session *s;
213 struct pollfd *p;
214
215 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
216
217 p = pa_rtpoll_item_get_pollfd(i, NULL);
218
219 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
220 pa_log("poll() signalled bad revents.");
221 return -1;
222 }
223
224 if ((p->revents & POLLIN) == 0)
225 return 0;
226
227 p->revents = 0;
228
229 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
230 return 0;
231
232 if (s->sdp_info.payload != s->rtp_context.payload ||
233 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
234 pa_memblock_unref(chunk.memblock);
235 return 0;
236 }
237
238 if (!s->first_packet) {
239 s->first_packet = TRUE;
240
241 s->ssrc = s->rtp_context.ssrc;
242 s->offset = s->rtp_context.timestamp;
243
244 if (s->ssrc == s->userdata->module->core->cookie)
245 pa_log_warn("Detected RTP packet loop!");
246 } else {
247 if (s->ssrc != s->rtp_context.ssrc) {
248 pa_memblock_unref(chunk.memblock);
249 return 0;
250 }
251 }
252
253 /* Check whether there was a timestamp overflow */
254 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
255 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
256
257 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
258 delta = k;
259 else
260 delta = j;
261
262 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
263
264 if (now.tv_sec == 0) {
265 PA_ONCE_BEGIN {
266 pa_log_warn("Using artificial time instead of timestamp");
267 } PA_ONCE_END;
268 pa_rtclock_get(&now);
269 } else
270 pa_rtclock_from_wallclock(&now);
271
272 pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
273
274 /* Tell the smoother that we are rolling now, in case it is still paused */
275 pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
276
277 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
278 pa_log_warn("Queue overrun");
279 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
280 }
281
282 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
283
284 pa_memblock_unref(chunk.memblock);
285
286 /* The next timestamp we expect */
287 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
288
289 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
290
291 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
292 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
293 uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
294 uint32_t current_rate = s->sink_input->sample_spec.rate;
295 uint32_t new_rate;
296 double estimated_rate, alpha = 0.02;
297
298 pa_log_debug("Updating sample rate");
299
300 wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
301 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
302
303 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
304
305 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
306 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
307
308 if (ri > render_delay+sink_delay)
309 ri -= render_delay+sink_delay;
310 else
311 ri = 0;
312
313 if (wi < ri)
314 latency = 0;
315 else
316 latency = wi - ri;
317
318 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
319
320 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
321 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
322 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
323 * T
324 * R̂ = ─────────────── Rⁿ . (1)
325 * T - (Lⁿ - Lⁿ⁻ⁱ)
326 *
327 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
328 * is correct). But there is also the requirement to keep the buffer at a predefined target
329 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
330 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
331 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
332 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
333 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
334 * ʲ⁼ⁱ R̂ a a
335 * Solving for Rⁿ⁺ⁱ gives
336 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
337 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
338 * T
339 * In the code below a = 7 is used.
340 *
341 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
342 * of the estimated rate R̂ is used. This average R̅ is defined as
343 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
344 * Because it is difficult to find a fixed value for the coefficient α such that the
345 * averaging is without significant lag but oscillations are filtered out, a heuristic is
346 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
347 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
348 */
349 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
350 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
351 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
352 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
353 }
354 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
355 s->estimated_rate = estimated_rate;
356 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
357 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
358 s->last_latency = latency;
359
360 if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
361 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
362 new_rate = base_rate;
363 } else {
364 if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
365 new_rate = base_rate;
366 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
367 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
368 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
369 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
370 }
371 }
372 s->sink_input->sample_spec.rate = new_rate;
373
374 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
375
376 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
377
378 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
379
380 s->last_rate_update = pa_timeval_load(&now);
381 }
382
383 if (pa_memblockq_is_readable(s->memblockq) &&
384 s->sink_input->thread_info.underrun_for > 0) {
385 pa_log_debug("Requesting rewind due to end of underrun");
386 pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
387 }
388
389 return 1;
390 }
391
392 /* Called from I/O thread context */
393 static void sink_input_attach(pa_sink_input *i) {
394 struct session *s;
395 struct pollfd *p;
396
397 pa_sink_input_assert_ref(i);
398 pa_assert_se(s = i->userdata);
399
400 pa_assert(!s->rtpoll_item);
401 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
402
403 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
404 p->fd = s->rtp_context.fd;
405 p->events = POLLIN;
406 p->revents = 0;
407
408 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
409 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
410 }
411
412 /* Called from I/O thread context */
413 static void sink_input_detach(pa_sink_input *i) {
414 struct session *s;
415 pa_sink_input_assert_ref(i);
416 pa_assert_se(s = i->userdata);
417
418 pa_assert(s->rtpoll_item);
419 pa_rtpoll_item_free(s->rtpoll_item);
420 s->rtpoll_item = NULL;
421 }
422
423 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
424 int af, fd = -1, r, one;
425
426 pa_assert(sa);
427 pa_assert(salen > 0);
428
429 af = sa->sa_family;
430 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
431 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
432 goto fail;
433 }
434
435 pa_make_udp_socket_low_delay(fd);
436
437 one = 1;
438 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
439 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
440 goto fail;
441 }
442
443 one = 1;
444 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
445 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
446 goto fail;
447 }
448
449 if (af == AF_INET) {
450 struct ip_mreq mr4;
451 memset(&mr4, 0, sizeof(mr4));
452 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
453 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
454 #ifdef HAVE_IPV6
455 } else {
456 struct ipv6_mreq mr6;
457 memset(&mr6, 0, sizeof(mr6));
458 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
459 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
460 #endif
461 }
462
463 if (r < 0) {
464 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
465 goto fail;
466 }
467
468 if (bind(fd, sa, salen) < 0) {
469 pa_log("bind() failed: %s", pa_cstrerror(errno));
470 goto fail;
471 }
472
473 return fd;
474
475 fail:
476 if (fd >= 0)
477 close(fd);
478
479 return -1;
480 }
481
482 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
483 struct session *s = NULL;
484 pa_sink *sink;
485 int fd = -1;
486 pa_memchunk silence;
487 pa_sink_input_new_data data;
488 struct timeval now;
489
490 pa_assert(u);
491 pa_assert(sdp_info);
492
493 if (u->n_sessions >= MAX_SESSIONS) {
494 pa_log("Session limit reached.");
495 goto fail;
496 }
497
498 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
499 pa_log("Sink does not exist.");
500 goto fail;
501 }
502
503 pa_rtclock_get(&now);
504
505 s = pa_xnew0(struct session, 1);
506 s->userdata = u;
507 s->first_packet = FALSE;
508 s->sdp_info = *sdp_info;
509 s->rtpoll_item = NULL;
510 s->intended_latency = LATENCY_USEC;
511 s->smoother = pa_smoother_new(
512 PA_USEC_PER_SEC*5,
513 PA_USEC_PER_SEC*2,
514 TRUE,
515 TRUE,
516 10,
517 pa_timeval_load(&now),
518 TRUE);
519 s->last_rate_update = pa_timeval_load(&now);
520 s->last_latency = LATENCY_USEC;
521 s->estimated_rate = (double) sink->sample_spec.rate;
522 s->avg_estimated_rate = (double) sink->sample_spec.rate;
523 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
524
525 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
526 goto fail;
527
528 pa_sink_input_new_data_init(&data);
529 data.sink = sink;
530 data.driver = __FILE__;
531 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
532 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
533 "RTP Stream%s%s%s",
534 sdp_info->session_name ? " (" : "",
535 sdp_info->session_name ? sdp_info->session_name : "",
536 sdp_info->session_name ? ")" : "");
537
538 if (sdp_info->session_name)
539 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
540 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
541 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
542 data.module = u->module;
543 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
544 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
545
546 pa_sink_input_new(&s->sink_input, u->module->core, &data);
547 pa_sink_input_new_data_done(&data);
548
549 if (!s->sink_input) {
550 pa_log("Failed to create sink input.");
551 goto fail;
552 }
553
554 s->sink_input->userdata = s;
555
556 s->sink_input->parent.process_msg = sink_input_process_msg;
557 s->sink_input->pop = sink_input_pop_cb;
558 s->sink_input->process_rewind = sink_input_process_rewind_cb;
559 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
560 s->sink_input->kill = sink_input_kill;
561 s->sink_input->attach = sink_input_attach;
562 s->sink_input->detach = sink_input_detach;
563 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
564
565 pa_sink_input_get_silence(s->sink_input, &silence);
566
567 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
568
569 if (s->intended_latency < s->sink_latency*2)
570 s->intended_latency = s->sink_latency*2;
571
572 s->memblockq = pa_memblockq_new(
573 0,
574 MEMBLOCKQ_MAXLENGTH,
575 MEMBLOCKQ_MAXLENGTH,
576 pa_frame_size(&s->sink_input->sample_spec),
577 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
578 0,
579 0,
580 &silence);
581
582 pa_memblock_unref(silence.memblock);
583
584 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
585
586 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
587 u->n_sessions++;
588 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
589
590 pa_sink_input_put(s->sink_input);
591
592 pa_log_info("New session '%s'", s->sdp_info.session_name);
593
594 return s;
595
596 fail:
597 pa_xfree(s);
598
599 if (fd >= 0)
600 pa_close(fd);
601
602 return NULL;
603 }
604
605 static void session_free(struct session *s) {
606 pa_assert(s);
607
608 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
609
610 pa_sink_input_unlink(s->sink_input);
611 pa_sink_input_unref(s->sink_input);
612
613 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
614 pa_assert(s->userdata->n_sessions >= 1);
615 s->userdata->n_sessions--;
616 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
617
618 pa_memblockq_free(s->memblockq);
619 pa_sdp_info_destroy(&s->sdp_info);
620 pa_rtp_context_destroy(&s->rtp_context);
621
622 pa_smoother_free(s->smoother);
623
624 pa_xfree(s);
625 }
626
627 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
628 struct userdata *u = userdata;
629 pa_bool_t goodbye = FALSE;
630 pa_sdp_info info;
631 struct session *s;
632
633 pa_assert(m);
634 pa_assert(e);
635 pa_assert(u);
636 pa_assert(fd == u->sap_context.fd);
637 pa_assert(flags == PA_IO_EVENT_INPUT);
638
639 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
640 return;
641
642 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
643 return;
644
645 if (goodbye) {
646
647 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
648 session_free(s);
649
650 pa_sdp_info_destroy(&info);
651 } else {
652
653 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
654 if (!session_new(u, &info))
655 pa_sdp_info_destroy(&info);
656
657 } else {
658 struct timeval now;
659 pa_rtclock_get(&now);
660 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
661
662 pa_sdp_info_destroy(&info);
663 }
664 }
665 }
666
667 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
668 struct session *s, *n;
669 struct userdata *u = userdata;
670 struct timeval now;
671
672 pa_assert(m);
673 pa_assert(t);
674 pa_assert(u);
675
676 pa_rtclock_get(&now);
677
678 pa_log_debug("Checking for dead streams ...");
679
680 for (s = u->sessions; s; s = n) {
681 int k;
682 n = s->next;
683
684 k = pa_atomic_load(&s->timestamp);
685
686 if (k + DEATH_TIMEOUT < now.tv_sec)
687 session_free(s);
688 }
689
690 /* Restart timer */
691 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
692 }
693
694 int pa__init(pa_module*m) {
695 struct userdata *u;
696 pa_modargs *ma = NULL;
697 struct sockaddr_in sa4;
698 #ifdef HAVE_IPV6
699 struct sockaddr_in6 sa6;
700 #endif
701 struct sockaddr *sa;
702 socklen_t salen;
703 const char *sap_address;
704 int fd = -1;
705
706 pa_assert(m);
707
708 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
709 pa_log("failed to parse module arguments");
710 goto fail;
711 }
712
713 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
714
715 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
716 sa4.sin_family = AF_INET;
717 sa4.sin_port = htons(SAP_PORT);
718 sa = (struct sockaddr*) &sa4;
719 salen = sizeof(sa4);
720 #ifdef HAVE_IPV6
721 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
722 sa6.sin6_family = AF_INET6;
723 sa6.sin6_port = htons(SAP_PORT);
724 sa = (struct sockaddr*) &sa6;
725 salen = sizeof(sa6);
726 #endif
727 } else {
728 pa_log("Invalid SAP address '%s'", sap_address);
729 goto fail;
730 }
731
732 if ((fd = mcast_socket(sa, salen)) < 0)
733 goto fail;
734
735 m->userdata = u = pa_xnew(struct userdata, 1);
736 u->module = m;
737 u->core = m->core;
738 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
739
740 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
741 pa_sap_context_init_recv(&u->sap_context, fd);
742
743 PA_LLIST_HEAD_INIT(struct session, u->sessions);
744 u->n_sessions = 0;
745 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
746
747 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
748
749 pa_modargs_free(ma);
750
751 return 0;
752
753 fail:
754 if (ma)
755 pa_modargs_free(ma);
756
757 if (fd >= 0)
758 pa_close(fd);
759
760 return -1;
761 }
762
763 void pa__done(pa_module*m) {
764 struct userdata *u;
765 struct session *s;
766
767 pa_assert(m);
768
769 if (!(u = m->userdata))
770 return;
771
772 if (u->sap_event)
773 m->core->mainloop->io_free(u->sap_event);
774
775 if (u->check_death_event)
776 m->core->mainloop->time_free(u->check_death_event);
777
778 pa_sap_context_destroy(&u->sap_context);
779
780 if (u->by_origin) {
781 while ((s = pa_hashmap_first(u->by_origin)))
782 session_free(s);
783
784 pa_hashmap_free(u->by_origin, NULL, NULL);
785 }
786
787 pa_xfree(u->sink_name);
788 pa_xfree(u);
789 }