]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
Use pa_hashmap_remove_and_free() where appropriate
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <math.h>
34
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION);
67 PA_MODULE_LOAD_ONCE(false);
68 PA_MODULE_USAGE(
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
71 "latency_msec=<latency in ms> "
72 );
73
74 #define SAP_PORT 9875
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define DEFAULT_LATENCY_MSEC 500
77 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
78 #define MAX_SESSIONS 16
79 #define DEATH_TIMEOUT 20
80 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
81
82 static const char* const valid_modargs[] = {
83 "sink",
84 "sap_address",
85 "latency_msec",
86 NULL
87 };
88
89 struct session {
90 struct userdata *userdata;
91 PA_LLIST_FIELDS(struct session);
92
93 pa_sink_input *sink_input;
94 pa_memblockq *memblockq;
95
96 bool first_packet;
97 uint32_t ssrc;
98 uint32_t offset;
99
100 struct pa_sdp_info sdp_info;
101
102 pa_rtp_context rtp_context;
103
104 pa_rtpoll_item *rtpoll_item;
105
106 pa_atomic_t timestamp;
107
108 pa_usec_t intended_latency;
109 pa_usec_t sink_latency;
110
111 pa_usec_t last_rate_update;
112 pa_usec_t last_latency;
113 double estimated_rate;
114 double avg_estimated_rate;
115 };
116
117 struct userdata {
118 pa_module *module;
119 pa_core *core;
120
121 pa_sap_context sap_context;
122 pa_io_event* sap_event;
123
124 pa_time_event *check_death_event;
125
126 char *sink_name;
127
128 PA_LLIST_HEAD(struct session, sessions);
129 pa_hashmap *by_origin;
130 int n_sessions;
131
132 pa_usec_t latency;
133 };
134
135 static void session_free(struct session *s);
136
137 /* Called from I/O thread context */
138 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
139 struct session *s = PA_SINK_INPUT(o)->userdata;
140
141 switch (code) {
142 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
143 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
144
145 /* Fall through, the default handler will add in the extra
146 * latency added by the resampler */
147 break;
148 }
149
150 return pa_sink_input_process_msg(o, code, data, offset, chunk);
151 }
152
153 /* Called from I/O thread context */
154 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
155 struct session *s;
156 pa_sink_input_assert_ref(i);
157 pa_assert_se(s = i->userdata);
158
159 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
160 return -1;
161
162 pa_memblockq_drop(s->memblockq, chunk->length);
163
164 return 0;
165 }
166
167 /* Called from I/O thread context */
168 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
169 struct session *s;
170
171 pa_sink_input_assert_ref(i);
172 pa_assert_se(s = i->userdata);
173
174 pa_memblockq_rewind(s->memblockq, nbytes);
175 }
176
177 /* Called from I/O thread context */
178 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
179 struct session *s;
180
181 pa_sink_input_assert_ref(i);
182 pa_assert_se(s = i->userdata);
183
184 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
185 }
186
187 /* Called from main context */
188 static void sink_input_kill(pa_sink_input* i) {
189 struct session *s;
190 pa_sink_input_assert_ref(i);
191 pa_assert_se(s = i->userdata);
192
193 pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin);
194 }
195
196 /* Called from IO context */
197 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
198 struct session *s;
199 pa_sink_input_assert_ref(i);
200 pa_assert_se(s = i->userdata);
201
202 if (b)
203 pa_memblockq_flush_read(s->memblockq);
204 else
205 s->first_packet = false;
206 }
207
208 /* Called from I/O thread context */
209 static int rtpoll_work_cb(pa_rtpoll_item *i) {
210 pa_memchunk chunk;
211 int64_t k, j, delta;
212 struct timeval now = { 0, 0 };
213 struct session *s;
214 struct pollfd *p;
215
216 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
217
218 p = pa_rtpoll_item_get_pollfd(i, NULL);
219
220 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
221 pa_log("poll() signalled bad revents.");
222 return -1;
223 }
224
225 if ((p->revents & POLLIN) == 0)
226 return 0;
227
228 p->revents = 0;
229
230 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
231 return 0;
232
233 if (s->sdp_info.payload != s->rtp_context.payload ||
234 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
235 pa_memblock_unref(chunk.memblock);
236 return 0;
237 }
238
239 if (!s->first_packet) {
240 s->first_packet = true;
241
242 s->ssrc = s->rtp_context.ssrc;
243 s->offset = s->rtp_context.timestamp;
244
245 if (s->ssrc == s->userdata->module->core->cookie)
246 pa_log_warn("Detected RTP packet loop!");
247 } else {
248 if (s->ssrc != s->rtp_context.ssrc) {
249 pa_memblock_unref(chunk.memblock);
250 return 0;
251 }
252 }
253
254 /* Check whether there was a timestamp overflow */
255 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
256 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
257
258 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
259 delta = k;
260 else
261 delta = j;
262
263 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);
264
265 if (now.tv_sec == 0) {
266 PA_ONCE_BEGIN {
267 pa_log_warn("Using artificial time instead of timestamp");
268 } PA_ONCE_END;
269 pa_rtclock_get(&now);
270 } else
271 pa_rtclock_from_wallclock(&now);
272
273 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
274 pa_log_warn("Queue overrun");
275 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
276 }
277
278 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
279
280 pa_memblock_unref(chunk.memblock);
281
282 /* The next timestamp we expect */
283 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
284
285 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
286
287 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
288 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
289 uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
290 uint32_t current_rate = s->sink_input->sample_spec.rate;
291 uint32_t new_rate;
292 double estimated_rate, alpha = 0.02;
293
294 pa_log_debug("Updating sample rate");
295
296 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
297 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
298
299 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
300
301 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
302 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
303
304 if (ri > render_delay+sink_delay)
305 ri -= render_delay+sink_delay;
306 else
307 ri = 0;
308
309 if (wi < ri)
310 latency = 0;
311 else
312 latency = wi - ri;
313
314 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
315
316 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
317 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
318 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
319 * T
320 * R̂ = ─────────────── Rⁿ . (1)
321 * T - (Lⁿ - Lⁿ⁻ⁱ)
322 *
323 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
324 * is correct). But there is also the requirement to keep the buffer at a predefined target
325 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
326 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
327 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
328 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
329 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
330 * ʲ⁼ⁱ R̂ a a
331 * Solving for Rⁿ⁺ⁱ gives
332 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
333 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
334 * T
335 * In the code below a = 7 is used.
336 *
337 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
338 * of the estimated rate R̂ is used. This average R̅ is defined as
339 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
340 * Because it is difficult to find a fixed value for the coefficient α such that the
341 * averaging is without significant lag but oscillations are filtered out, a heuristic is
342 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
343 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
344 */
345 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
346 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
347 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
348 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
349 }
350 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
351 s->estimated_rate = estimated_rate;
352 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
353 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
354 s->last_latency = latency;
355
356 if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
357 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
358 new_rate = base_rate;
359 } else {
360 if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
361 new_rate = base_rate;
362 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
363 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
364 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
365 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
366 }
367 }
368 s->sink_input->sample_spec.rate = new_rate;
369
370 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
371
372 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
373
374 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
375
376 s->last_rate_update = pa_timeval_load(&now);
377 }
378
379 if (pa_memblockq_is_readable(s->memblockq) &&
380 s->sink_input->thread_info.underrun_for > 0) {
381 pa_log_debug("Requesting rewind due to end of underrun");
382 pa_sink_input_request_rewind(s->sink_input,
383 (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
384 false, true, false);
385 }
386
387 return 1;
388 }
389
390 /* Called from I/O thread context */
391 static void sink_input_attach(pa_sink_input *i) {
392 struct session *s;
393 struct pollfd *p;
394
395 pa_sink_input_assert_ref(i);
396 pa_assert_se(s = i->userdata);
397
398 pa_assert(!s->rtpoll_item);
399 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
400
401 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
402 p->fd = s->rtp_context.fd;
403 p->events = POLLIN;
404 p->revents = 0;
405
406 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
407 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
408 }
409
410 /* Called from I/O thread context */
411 static void sink_input_detach(pa_sink_input *i) {
412 struct session *s;
413 pa_sink_input_assert_ref(i);
414 pa_assert_se(s = i->userdata);
415
416 pa_assert(s->rtpoll_item);
417 pa_rtpoll_item_free(s->rtpoll_item);
418 s->rtpoll_item = NULL;
419 }
420
421 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
422 int af, fd = -1, r, one;
423
424 pa_assert(sa);
425 pa_assert(salen > 0);
426
427 af = sa->sa_family;
428 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
429 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
430 goto fail;
431 }
432
433 pa_make_udp_socket_low_delay(fd);
434
435 #ifdef SO_TIMESTAMP
436 one = 1;
437 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
438 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
439 goto fail;
440 }
441 #else
442 pa_log("SO_TIMESTAMP unsupported on this platform");
443 goto fail;
444 #endif
445
446 one = 1;
447 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
448 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
449 goto fail;
450 }
451
452 r = 0;
453 if (af == AF_INET) {
454 /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
455 static const uint32_t ipv4_mcast_mask = 0xe0000000;
456
457 if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
458 struct ip_mreq mr4;
459 memset(&mr4, 0, sizeof(mr4));
460 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
461 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
462 }
463 #ifdef HAVE_IPV6
464 } else if (af == AF_INET6) {
465 /* IPv6 multicast addresses have 255 as the most significant byte */
466 if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
467 struct ipv6_mreq mr6;
468 memset(&mr6, 0, sizeof(mr6));
469 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
470 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
471 }
472 #endif
473 } else
474 pa_assert_not_reached();
475
476 if (r < 0) {
477 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
478 goto fail;
479 }
480
481 if (bind(fd, sa, salen) < 0) {
482 pa_log("bind() failed: %s", pa_cstrerror(errno));
483 goto fail;
484 }
485
486 return fd;
487
488 fail:
489 if (fd >= 0)
490 close(fd);
491
492 return -1;
493 }
494
495 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
496 struct session *s = NULL;
497 pa_sink *sink;
498 int fd = -1;
499 pa_memchunk silence;
500 pa_sink_input_new_data data;
501 struct timeval now;
502
503 pa_assert(u);
504 pa_assert(sdp_info);
505
506 if (u->n_sessions >= MAX_SESSIONS) {
507 pa_log("Session limit reached.");
508 goto fail;
509 }
510
511 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
512 pa_log("Sink does not exist.");
513 goto fail;
514 }
515
516 pa_rtclock_get(&now);
517
518 s = pa_xnew0(struct session, 1);
519 s->userdata = u;
520 s->first_packet = false;
521 s->sdp_info = *sdp_info;
522 s->rtpoll_item = NULL;
523 s->intended_latency = u->latency;
524 s->last_rate_update = pa_timeval_load(&now);
525 s->last_latency = u->latency;
526 s->estimated_rate = (double) sink->sample_spec.rate;
527 s->avg_estimated_rate = (double) sink->sample_spec.rate;
528 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
529
530 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
531 goto fail;
532
533 pa_sink_input_new_data_init(&data);
534 pa_sink_input_new_data_set_sink(&data, sink, false);
535 data.driver = __FILE__;
536 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
537 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
538 "RTP Stream%s%s%s",
539 sdp_info->session_name ? " (" : "",
540 sdp_info->session_name ? sdp_info->session_name : "",
541 sdp_info->session_name ? ")" : "");
542
543 if (sdp_info->session_name)
544 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
545 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
546 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
547 data.module = u->module;
548 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
549 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
550
551 pa_sink_input_new(&s->sink_input, u->module->core, &data);
552 pa_sink_input_new_data_done(&data);
553
554 if (!s->sink_input) {
555 pa_log("Failed to create sink input.");
556 goto fail;
557 }
558
559 s->sink_input->userdata = s;
560
561 s->sink_input->parent.process_msg = sink_input_process_msg;
562 s->sink_input->pop = sink_input_pop_cb;
563 s->sink_input->process_rewind = sink_input_process_rewind_cb;
564 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
565 s->sink_input->kill = sink_input_kill;
566 s->sink_input->attach = sink_input_attach;
567 s->sink_input->detach = sink_input_detach;
568 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
569
570 pa_sink_input_get_silence(s->sink_input, &silence);
571
572 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
573
574 if (s->intended_latency < s->sink_latency*2)
575 s->intended_latency = s->sink_latency*2;
576
577 s->memblockq = pa_memblockq_new(
578 "module-rtp-recv memblockq",
579 0,
580 MEMBLOCKQ_MAXLENGTH,
581 MEMBLOCKQ_MAXLENGTH,
582 &s->sink_input->sample_spec,
583 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
584 0,
585 0,
586 &silence);
587
588 pa_memblock_unref(silence.memblock);
589
590 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
591
592 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
593 u->n_sessions++;
594 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
595
596 pa_sink_input_put(s->sink_input);
597
598 pa_log_info("New session '%s'", s->sdp_info.session_name);
599
600 return s;
601
602 fail:
603 pa_xfree(s);
604
605 if (fd >= 0)
606 pa_close(fd);
607
608 return NULL;
609 }
610
611 static void session_free(struct session *s) {
612 pa_assert(s);
613
614 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
615
616 pa_sink_input_unlink(s->sink_input);
617 pa_sink_input_unref(s->sink_input);
618
619 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
620 pa_assert(s->userdata->n_sessions >= 1);
621 s->userdata->n_sessions--;
622
623 pa_memblockq_free(s->memblockq);
624 pa_sdp_info_destroy(&s->sdp_info);
625 pa_rtp_context_destroy(&s->rtp_context);
626
627 pa_xfree(s);
628 }
629
630 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
631 struct userdata *u = userdata;
632 bool goodbye = false;
633 pa_sdp_info info;
634 struct session *s;
635
636 pa_assert(m);
637 pa_assert(e);
638 pa_assert(u);
639 pa_assert(fd == u->sap_context.fd);
640 pa_assert(flags == PA_IO_EVENT_INPUT);
641
642 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
643 return;
644
645 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
646 return;
647
648 if (goodbye) {
649 pa_hashmap_remove_and_free(u->by_origin, info.origin);
650 pa_sdp_info_destroy(&info);
651 } else {
652
653 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
654 if (!session_new(u, &info))
655 pa_sdp_info_destroy(&info);
656
657 } else {
658 struct timeval now;
659 pa_rtclock_get(&now);
660 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
661
662 pa_sdp_info_destroy(&info);
663 }
664 }
665 }
666
667 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
668 struct session *s, *n;
669 struct userdata *u = userdata;
670 struct timeval now;
671
672 pa_assert(m);
673 pa_assert(t);
674 pa_assert(u);
675
676 pa_rtclock_get(&now);
677
678 pa_log_debug("Checking for dead streams ...");
679
680 for (s = u->sessions; s; s = n) {
681 int k;
682 n = s->next;
683
684 k = pa_atomic_load(&s->timestamp);
685
686 if (k + DEATH_TIMEOUT < now.tv_sec)
687 pa_hashmap_remove_and_free(u->by_origin, s->sdp_info.origin);
688 }
689
690 /* Restart timer */
691 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
692 }
693
694 int pa__init(pa_module*m) {
695 struct userdata *u;
696 pa_modargs *ma = NULL;
697 struct sockaddr_in sa4;
698 #ifdef HAVE_IPV6
699 struct sockaddr_in6 sa6;
700 #endif
701 struct sockaddr *sa;
702 socklen_t salen;
703 const char *sap_address;
704 uint32_t latency_msec;
705 int fd = -1;
706
707 pa_assert(m);
708
709 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
710 pa_log("failed to parse module arguments");
711 goto fail;
712 }
713
714 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
715
716 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
717 sa4.sin_family = AF_INET;
718 sa4.sin_port = htons(SAP_PORT);
719 sa = (struct sockaddr*) &sa4;
720 salen = sizeof(sa4);
721 #ifdef HAVE_IPV6
722 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
723 sa6.sin6_family = AF_INET6;
724 sa6.sin6_port = htons(SAP_PORT);
725 sa = (struct sockaddr*) &sa6;
726 salen = sizeof(sa6);
727 #endif
728 } else {
729 pa_log("Invalid SAP address '%s'", sap_address);
730 goto fail;
731 }
732
733 latency_msec = DEFAULT_LATENCY_MSEC;
734 if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
735 pa_log("Invalid latency specification");
736 goto fail;
737 }
738
739 if ((fd = mcast_socket(sa, salen)) < 0)
740 goto fail;
741
742 m->userdata = u = pa_xnew(struct userdata, 1);
743 u->module = m;
744 u->core = m->core;
745 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
746 u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
747
748 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
749 pa_sap_context_init_recv(&u->sap_context, fd);
750
751 PA_LLIST_HEAD_INIT(struct session, u->sessions);
752 u->n_sessions = 0;
753 u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);
754
755 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
756
757 pa_modargs_free(ma);
758
759 return 0;
760
761 fail:
762 if (ma)
763 pa_modargs_free(ma);
764
765 if (fd >= 0)
766 pa_close(fd);
767
768 return -1;
769 }
770
771 void pa__done(pa_module*m) {
772 struct userdata *u;
773
774 pa_assert(m);
775
776 if (!(u = m->userdata))
777 return;
778
779 if (u->sap_event)
780 m->core->mainloop->io_free(u->sap_event);
781
782 if (u->check_death_event)
783 m->core->mainloop->time_free(u->check_death_event);
784
785 pa_sap_context_destroy(&u->sap_context);
786
787 if (u->by_origin)
788 pa_hashmap_free(u->by_origin);
789
790 pa_xfree(u->sink_name);
791 pa_xfree(u);
792 }