]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
9dc2febea04e383041c803103f0b2957d9da4298
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <math.h>
34
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
/* Module metadata. */
PA_MODULE_AUTHOR("Lennart Poettering");
PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(false);
/* Usage string; the three keys listed here are the same ones accepted via
 * valid_modargs[] and read in pa__init(). */
PA_MODULE_USAGE(
        "sink=<name of the sink> "
        "sap_address=<multicast address to listen on> "
        "latency_msec=<latency in ms> "
);
73
/* Port the SAP listening socket is bound to in pa__init() */
#define SAP_PORT 9875
/* Address used when the "sap_address" module argument is not given */
#define DEFAULT_SAP_ADDRESS "224.0.0.56"
/* Latency in milliseconds used when "latency_msec" is not given */
#define DEFAULT_LATENCY_MSEC 500
/* Value handed (twice) to pa_memblockq_new() when a session's queue is created */
#define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
/* session_new() refuses to create a session once this many exist */
#define MAX_SESSIONS 16
/* Seconds without activity after which check_death_event_cb() frees a
 * session; also the period (in seconds) at which that check is re-armed */
#define DEATH_TIMEOUT 20
/* Minimum time between two sample rate adaptations in rtpoll_work_cb() */
#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
81
/* NULL-terminated list of module argument keys, passed to pa_modargs_new()
 * in pa__init(). */
static const char* const valid_modargs[] = {
    "sink",
    "sap_address",
    "latency_msec",
    NULL
};
88
/* State of one received RTP stream. Created by session_new(), released by
 * session_free(). */
struct session {
    /* Module state this session belongs to */
    struct userdata *userdata;
    /* Linkage for the userdata->sessions list */
    PA_LLIST_FIELDS(struct session);

    /* Sink input the received audio is played through */
    pa_sink_input *sink_input;
    /* Queue between rtpoll_work_cb() (writer) and sink_input_pop_cb() (reader) */
    pa_memblockq *memblockq;

    /* Set once the first RTP packet has been accepted; cleared again when
     * the sink input is resumed (see sink_input_suspend_within_thread()) */
    bool first_packet;
    /* SSRC taken from the first accepted packet; packets carrying a
     * different SSRC are dropped */
    uint32_t ssrc;
    /* RTP timestamp expected for the next packet */
    uint32_t offset;

    /* Copy of the SDP description the session was created from */
    struct pa_sdp_info sdp_info;

    pa_rtp_context rtp_context;

    /* Poll item for the RTP socket; exists only between sink_input_attach()
     * and sink_input_detach() */
    pa_rtpoll_item *rtpoll_item;

    /* tv_sec of the last activity (RTP packet or SAP announcement); stored
     * from rtpoll_work_cb(), session_new() and sap_event_cb(), read by
     * check_death_event_cb() */
    pa_atomic_t timestamp;

    /* Target latency; raised in session_new() to at least 2*sink_latency */
    pa_usec_t intended_latency;
    /* Value returned by pa_sink_input_set_requested_latency() */
    pa_usec_t sink_latency;

    /* State of the sample rate adaptation in rtpoll_work_cb() */
    pa_usec_t last_rate_update;   /* time of the last adaptation */
    pa_usec_t last_latency;       /* latency measured at the last adaptation */
    double estimated_rate;        /* most recent rate estimate */
    double avg_estimated_rate;    /* exponentially weighted average of the estimates */
};
116
/* Per-module-instance state, allocated in pa__init() and released in
 * pa__done(). */
struct userdata {
    pa_module *module;
    pa_core *core;

    /* SAP receive context and the main loop I/O event watching its socket */
    pa_sap_context sap_context;
    pa_io_event* sap_event;

    /* Timer that runs check_death_event_cb() */
    pa_time_event *check_death_event;

    /* Copy of the "sink" module argument (NULL if it was not given) */
    char *sink_name;

    /* All live sessions, as a list and indexed by their SDP origin string */
    PA_LLIST_HEAD(struct session, sessions);
    pa_hashmap *by_origin;
    /* Number of entries in sessions; bounded by MAX_SESSIONS */
    int n_sessions;

    /* "latency_msec" module argument converted to microseconds */
    pa_usec_t latency;
};
134
/* Forward declaration: sink_input_kill() calls session_free() before its definition. */
static void session_free(struct session *s);
136
137 /* Called from I/O thread context */
138 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
139 struct session *s = PA_SINK_INPUT(o)->userdata;
140
141 switch (code) {
142 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
143 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
144
145 /* Fall through, the default handler will add in the extra
146 * latency added by the resampler */
147 break;
148 }
149
150 return pa_sink_input_process_msg(o, code, data, offset, chunk);
151 }
152
153 /* Called from I/O thread context */
154 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
155 struct session *s;
156 pa_sink_input_assert_ref(i);
157 pa_assert_se(s = i->userdata);
158
159 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
160 return -1;
161
162 pa_memblockq_drop(s->memblockq, chunk->length);
163
164 return 0;
165 }
166
167 /* Called from I/O thread context */
168 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
169 struct session *s;
170
171 pa_sink_input_assert_ref(i);
172 pa_assert_se(s = i->userdata);
173
174 pa_memblockq_rewind(s->memblockq, nbytes);
175 }
176
177 /* Called from I/O thread context */
178 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
179 struct session *s;
180
181 pa_sink_input_assert_ref(i);
182 pa_assert_se(s = i->userdata);
183
184 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
185 }
186
187 /* Called from main context */
188 static void sink_input_kill(pa_sink_input* i) {
189 struct session *s;
190 pa_sink_input_assert_ref(i);
191 pa_assert_se(s = i->userdata);
192
193 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
194 session_free(s);
195 }
196
197 /* Called from IO context */
198 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
199 struct session *s;
200 pa_sink_input_assert_ref(i);
201 pa_assert_se(s = i->userdata);
202
203 if (b)
204 pa_memblockq_flush_read(s->memblockq);
205 else
206 s->first_packet = false;
207 }
208
/* Called from I/O thread context.
 *
 * Work callback of the rtpoll item watching the session's RTP socket.
 * Receives one RTP packet, places it into the session's memblockq at the
 * position given by its RTP timestamp, and periodically (every
 * RATE_UPDATE_INTERVAL) adapts the sink input's sample rate so that the
 * queue stays near the intended latency.
 *
 * Returns -1 if poll() reported an error condition on the socket, 0 if no
 * packet was queued (nothing to read, receive failure, or packet dropped),
 * and 1 after a packet has been queued. */
static int rtpoll_work_cb(pa_rtpoll_item *i) {
    pa_memchunk chunk;
    int64_t k, j, delta;
    struct timeval now = { 0, 0 };
    struct session *s;
    struct pollfd *p;

    pa_assert_se(s = pa_rtpoll_item_get_userdata(i));

    p = pa_rtpoll_item_get_pollfd(i, NULL);

    if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
        pa_log("poll() signalled bad revents.");
        return -1;
    }

    if ((p->revents & POLLIN) == 0)
        return 0;

    p->revents = 0;

    /* now is passed in zeroed; the tv_sec == 0 check further down treats a
     * still-zero value as "no receive timestamp was filled in" */
    if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
        return 0;

    /* Drop the packet if its payload type differs from the one announced via
     * SDP, or while the sink is not opened */
    if (s->sdp_info.payload != s->rtp_context.payload ||
        !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
        pa_memblock_unref(chunk.memblock);
        return 0;
    }

    if (!s->first_packet) {
        /* First accepted packet: lock onto its SSRC and use its timestamp
         * as the reference for the write position */
        s->first_packet = true;

        s->ssrc = s->rtp_context.ssrc;
        s->offset = s->rtp_context.timestamp;

        if (s->ssrc == s->userdata->module->core->cookie)
            pa_log_warn("Detected RTP packet loop!");
    } else {
        /* Packets from any other SSRC are ignored */
        if (s->ssrc != s->rtp_context.ssrc) {
            pa_memblock_unref(chunk.memblock);
            return 0;
        }
    }

    /* Check whether there was a timestamp overflow: k is the plain distance
     * between the packet's timestamp and the expected one, j the distance
     * assuming the 32 bit timestamp wrapped around once since then. The one
     * with the smaller magnitude is used. */
    k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
    j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;

    if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
        delta = k;
    else
        delta = j;

    /* Move the write position by the timestamp distance (frames -> bytes) */
    pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);

    if (now.tv_sec == 0) {
        PA_ONCE_BEGIN {
            pa_log_warn("Using artificial time instead of timestamp");
        } PA_ONCE_END;
        pa_rtclock_get(&now);
    } else
        pa_rtclock_from_wallclock(&now);

    if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
        pa_log_warn("Queue overrun");
        /* The data could not be stored; still advance the write position by
         * the chunk length */
        pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
    }

    /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */

    pa_memblock_unref(chunk.memblock);

    /* The next timestamp we expect */
    s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);

    /* Record activity for check_death_event_cb() */
    pa_atomic_store(&s->timestamp, (int) now.tv_sec);

    if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
        pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
        uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
        uint32_t current_rate = s->sink_input->sample_spec.rate;
        uint32_t new_rate;
        double estimated_rate, alpha = 0.02;

        pa_log_debug("Updating sample rate");

        wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
        ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);

        pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);

        sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
        render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);

        /* Move the read position back by the time still pending in the
         * render queue and the sink, clamping at 0 */
        if (ri > render_delay+sink_delay)
            ri -= render_delay+sink_delay;
        else
            ri = 0;

        /* Current latency: distance between write and (corrected) read
         * position, clamped at 0 */
        if (wi < ri)
            latency = 0;
        else
            latency = wi - ri;

        pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);

        /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
         * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
         * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
         *
         *     R̂ = T / (T - (Lⁿ - Lⁿ⁻ⁱ)) · Rⁿ .                                        (1)
         *
         * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
         * is correct). But there is also the requirement to keep the buffer at a predefined target
         * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
         * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
         * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
         *
         *     Σ (over j = i … a)  T · (R̂ - Rⁿ⁺ʲ) / R̂ = L̂ - Lⁿ
         *
         *     with  Rⁿ⁺ʲ = (a-j+1)/a · Rⁿ⁺ⁱ + (j-1)/a · R̂ .
         *
         * Solving for Rⁿ⁺ⁱ gives
         *
         *     Rⁿ⁺ⁱ = (T - ²∕ₐ₊₁ (L̂ - Lⁿ)) / T · R̂ .                                  (2)
         *
         * In the code below a = 7 is used (so ²∕ₐ₊₁ = 1/4, which is where the
         * latency/4 and intended_latency/4 terms come from).
         *
         * Equation (1) is not directly used in (2), but instead an exponentially weighted average
         * of the estimated rate R̂ is used. This average R̅ is defined as
         *
         *     R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
         *
         * Because it is difficult to find a fixed value for the coefficient α such that the
         * averaging is without significant lag but oscillations are filtered out, a heuristic is
         * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
         * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
         * (In the code α is clamped to the range 0.02 … 0.8.)
         */
        /* NOTE(review): the denominator is summed in pa_usec_t before the
         * cast to double. If pa_usec_t is unsigned (its definition is not
         * visible here), latency > RATE_UPDATE_INTERVAL + last_latency
         * would wrap around, and equality would divide by zero - confirm. */
        estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
        if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
            double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
            alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
        }
        s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
        s->estimated_rate = estimated_rate;
        pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
        /* Equation (2) with the averaged rate.
         * NOTE(review): same caveat as above - if pa_usec_t is unsigned,
         * intended_latency/4 > RATE_UPDATE_INTERVAL + latency/4 would wrap
         * before the conversion to double - confirm. */
        new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
        s->last_latency = latency;

        if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
            pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
            new_rate = base_rate;
        } else {
            /* Within 20 Hz of the sink's rate: snap to it */
            if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
                new_rate = base_rate;
            /* Do the adjustment in small steps; 2‰ can be considered inaudible */
            if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
                pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
                new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
            }
        }
        s->sink_input->sample_spec.rate = new_rate;

        pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));

        pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);

        pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);

        s->last_rate_update = pa_timeval_load(&now);
    }

    /* Data is available again while the sink input is in an underrun:
     * request a rewind covering the underrun length (0 when underrun_for
     * holds the special value (uint64_t) -1) */
    if (pa_memblockq_is_readable(s->memblockq) &&
        s->sink_input->thread_info.underrun_for > 0) {
        pa_log_debug("Requesting rewind due to end of underrun");
        pa_sink_input_request_rewind(s->sink_input,
                                     (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
                                     false, true, false);
    }

    return 1;
}
390
391 /* Called from I/O thread context */
392 static void sink_input_attach(pa_sink_input *i) {
393 struct session *s;
394 struct pollfd *p;
395
396 pa_sink_input_assert_ref(i);
397 pa_assert_se(s = i->userdata);
398
399 pa_assert(!s->rtpoll_item);
400 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
401
402 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
403 p->fd = s->rtp_context.fd;
404 p->events = POLLIN;
405 p->revents = 0;
406
407 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
408 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
409 }
410
411 /* Called from I/O thread context */
412 static void sink_input_detach(pa_sink_input *i) {
413 struct session *s;
414 pa_sink_input_assert_ref(i);
415 pa_assert_se(s = i->userdata);
416
417 pa_assert(s->rtpoll_item);
418 pa_rtpoll_item_free(s->rtpoll_item);
419 s->rtpoll_item = NULL;
420 }
421
422 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
423 int af, fd = -1, r, one;
424
425 pa_assert(sa);
426 pa_assert(salen > 0);
427
428 af = sa->sa_family;
429 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
430 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
431 goto fail;
432 }
433
434 pa_make_udp_socket_low_delay(fd);
435
436 #ifdef SO_TIMESTAMP
437 one = 1;
438 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
439 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
440 goto fail;
441 }
442 #else
443 pa_log("SO_TIMESTAMP unsupported on this platform");
444 goto fail;
445 #endif
446
447 one = 1;
448 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
449 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
450 goto fail;
451 }
452
453 r = 0;
454 if (af == AF_INET) {
455 /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
456 static const uint32_t ipv4_mcast_mask = 0xe0000000;
457
458 if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
459 struct ip_mreq mr4;
460 memset(&mr4, 0, sizeof(mr4));
461 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
462 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
463 }
464 #ifdef HAVE_IPV6
465 } else if (af == AF_INET6) {
466 /* IPv6 multicast addresses have 255 as the most significant byte */
467 if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
468 struct ipv6_mreq mr6;
469 memset(&mr6, 0, sizeof(mr6));
470 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
471 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
472 }
473 #endif
474 } else
475 pa_assert_not_reached();
476
477 if (r < 0) {
478 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
479 goto fail;
480 }
481
482 if (bind(fd, sa, salen) < 0) {
483 pa_log("bind() failed: %s", pa_cstrerror(errno));
484 goto fail;
485 }
486
487 return fd;
488
489 fail:
490 if (fd >= 0)
491 close(fd);
492
493 return -1;
494 }
495
496 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
497 struct session *s = NULL;
498 pa_sink *sink;
499 int fd = -1;
500 pa_memchunk silence;
501 pa_sink_input_new_data data;
502 struct timeval now;
503
504 pa_assert(u);
505 pa_assert(sdp_info);
506
507 if (u->n_sessions >= MAX_SESSIONS) {
508 pa_log("Session limit reached.");
509 goto fail;
510 }
511
512 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
513 pa_log("Sink does not exist.");
514 goto fail;
515 }
516
517 pa_rtclock_get(&now);
518
519 s = pa_xnew0(struct session, 1);
520 s->userdata = u;
521 s->first_packet = false;
522 s->sdp_info = *sdp_info;
523 s->rtpoll_item = NULL;
524 s->intended_latency = u->latency;
525 s->last_rate_update = pa_timeval_load(&now);
526 s->last_latency = u->latency;
527 s->estimated_rate = (double) sink->sample_spec.rate;
528 s->avg_estimated_rate = (double) sink->sample_spec.rate;
529 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
530
531 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
532 goto fail;
533
534 pa_sink_input_new_data_init(&data);
535 pa_sink_input_new_data_set_sink(&data, sink, false);
536 data.driver = __FILE__;
537 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
538 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
539 "RTP Stream%s%s%s",
540 sdp_info->session_name ? " (" : "",
541 sdp_info->session_name ? sdp_info->session_name : "",
542 sdp_info->session_name ? ")" : "");
543
544 if (sdp_info->session_name)
545 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
546 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
547 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
548 data.module = u->module;
549 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
550 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
551
552 pa_sink_input_new(&s->sink_input, u->module->core, &data);
553 pa_sink_input_new_data_done(&data);
554
555 if (!s->sink_input) {
556 pa_log("Failed to create sink input.");
557 goto fail;
558 }
559
560 s->sink_input->userdata = s;
561
562 s->sink_input->parent.process_msg = sink_input_process_msg;
563 s->sink_input->pop = sink_input_pop_cb;
564 s->sink_input->process_rewind = sink_input_process_rewind_cb;
565 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
566 s->sink_input->kill = sink_input_kill;
567 s->sink_input->attach = sink_input_attach;
568 s->sink_input->detach = sink_input_detach;
569 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
570
571 pa_sink_input_get_silence(s->sink_input, &silence);
572
573 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
574
575 if (s->intended_latency < s->sink_latency*2)
576 s->intended_latency = s->sink_latency*2;
577
578 s->memblockq = pa_memblockq_new(
579 "module-rtp-recv memblockq",
580 0,
581 MEMBLOCKQ_MAXLENGTH,
582 MEMBLOCKQ_MAXLENGTH,
583 &s->sink_input->sample_spec,
584 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
585 0,
586 0,
587 &silence);
588
589 pa_memblock_unref(silence.memblock);
590
591 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
592
593 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
594 u->n_sessions++;
595 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
596
597 pa_sink_input_put(s->sink_input);
598
599 pa_log_info("New session '%s'", s->sdp_info.session_name);
600
601 return s;
602
603 fail:
604 pa_xfree(s);
605
606 if (fd >= 0)
607 pa_close(fd);
608
609 return NULL;
610 }
611
612 static void session_free(struct session *s) {
613 pa_assert(s);
614
615 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
616
617 pa_sink_input_unlink(s->sink_input);
618 pa_sink_input_unref(s->sink_input);
619
620 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
621 pa_assert(s->userdata->n_sessions >= 1);
622 s->userdata->n_sessions--;
623
624 pa_memblockq_free(s->memblockq);
625 pa_sdp_info_destroy(&s->sdp_info);
626 pa_rtp_context_destroy(&s->rtp_context);
627
628 pa_xfree(s);
629 }
630
631 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
632 struct userdata *u = userdata;
633 bool goodbye = false;
634 pa_sdp_info info;
635 struct session *s;
636
637 pa_assert(m);
638 pa_assert(e);
639 pa_assert(u);
640 pa_assert(fd == u->sap_context.fd);
641 pa_assert(flags == PA_IO_EVENT_INPUT);
642
643 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
644 return;
645
646 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
647 return;
648
649 if (goodbye) {
650
651 if ((s = pa_hashmap_remove(u->by_origin, info.origin)))
652 session_free(s);
653
654 pa_sdp_info_destroy(&info);
655 } else {
656
657 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
658 if (!session_new(u, &info))
659 pa_sdp_info_destroy(&info);
660
661 } else {
662 struct timeval now;
663 pa_rtclock_get(&now);
664 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
665
666 pa_sdp_info_destroy(&info);
667 }
668 }
669 }
670
671 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
672 struct session *s, *n;
673 struct userdata *u = userdata;
674 struct timeval now;
675
676 pa_assert(m);
677 pa_assert(t);
678 pa_assert(u);
679
680 pa_rtclock_get(&now);
681
682 pa_log_debug("Checking for dead streams ...");
683
684 for (s = u->sessions; s; s = n) {
685 int k;
686 n = s->next;
687
688 k = pa_atomic_load(&s->timestamp);
689
690 if (k + DEATH_TIMEOUT < now.tv_sec) {
691 pa_hashmap_remove(u->by_origin, s->sdp_info.origin);
692 session_free(s);
693 }
694 }
695
696 /* Restart timer */
697 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
698 }
699
700 int pa__init(pa_module*m) {
701 struct userdata *u;
702 pa_modargs *ma = NULL;
703 struct sockaddr_in sa4;
704 #ifdef HAVE_IPV6
705 struct sockaddr_in6 sa6;
706 #endif
707 struct sockaddr *sa;
708 socklen_t salen;
709 const char *sap_address;
710 uint32_t latency_msec;
711 int fd = -1;
712
713 pa_assert(m);
714
715 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
716 pa_log("failed to parse module arguments");
717 goto fail;
718 }
719
720 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
721
722 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
723 sa4.sin_family = AF_INET;
724 sa4.sin_port = htons(SAP_PORT);
725 sa = (struct sockaddr*) &sa4;
726 salen = sizeof(sa4);
727 #ifdef HAVE_IPV6
728 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
729 sa6.sin6_family = AF_INET6;
730 sa6.sin6_port = htons(SAP_PORT);
731 sa = (struct sockaddr*) &sa6;
732 salen = sizeof(sa6);
733 #endif
734 } else {
735 pa_log("Invalid SAP address '%s'", sap_address);
736 goto fail;
737 }
738
739 latency_msec = DEFAULT_LATENCY_MSEC;
740 if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
741 pa_log("Invalid latency specification");
742 goto fail;
743 }
744
745 if ((fd = mcast_socket(sa, salen)) < 0)
746 goto fail;
747
748 m->userdata = u = pa_xnew(struct userdata, 1);
749 u->module = m;
750 u->core = m->core;
751 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
752 u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
753
754 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
755 pa_sap_context_init_recv(&u->sap_context, fd);
756
757 PA_LLIST_HEAD_INIT(struct session, u->sessions);
758 u->n_sessions = 0;
759 u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);
760
761 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
762
763 pa_modargs_free(ma);
764
765 return 0;
766
767 fail:
768 if (ma)
769 pa_modargs_free(ma);
770
771 if (fd >= 0)
772 pa_close(fd);
773
774 return -1;
775 }
776
777 void pa__done(pa_module*m) {
778 struct userdata *u;
779
780 pa_assert(m);
781
782 if (!(u = m->userdata))
783 return;
784
785 if (u->sap_event)
786 m->core->mainloop->io_free(u->sap_event);
787
788 if (u->check_death_event)
789 m->core->mainloop->time_free(u->check_death_event);
790
791 pa_sap_context_destroy(&u->sap_context);
792
793 if (u->by_origin)
794 pa_hashmap_free(u->by_origin);
795
796 pa_xfree(u->sink_name);
797 pa_xfree(u);
798 }