]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
module-rtp-recv: fail when SO_TIMESTAMP is not defined
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <math.h>
34
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION);
67 PA_MODULE_LOAD_ONCE(FALSE);
68 PA_MODULE_USAGE(
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
71 );
72
/* UDP port the SAP listener socket is bound to (see pa__init()). */
#define SAP_PORT 9875

/* Multicast group used when no "sap_address" module argument is given. */
#define DEFAULT_SAP_ADDRESS "224.0.0.56"

/* Size limit, in bytes, handed to pa_memblockq_new() for each session queue. */
#define MEMBLOCKQ_MAXLENGTH (1024*1024*40)

/* Upper bound on concurrently active sessions (enforced in session_new()). */
#define MAX_SESSIONS 16

/* Seconds without any RTP or SAP activity after which a session is freed;
 * also the period of the death-check timer. */
#define DEATH_TIMEOUT 20

/* Minimum time between two sample rate adjustments of a session. */
#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)

/* Initial target latency of a session. */
#define LATENCY_USEC (500*PA_USEC_PER_MSEC)
80
/* NULL-terminated list of module argument names accepted by pa_modargs_new(). */
static const char *const valid_modargs[] = {
    "sink",
    "sap_address",
    NULL
};
86
87 struct session {
88 struct userdata *userdata;
89 PA_LLIST_FIELDS(struct session);
90
91 pa_sink_input *sink_input;
92 pa_memblockq *memblockq;
93
94 pa_bool_t first_packet;
95 uint32_t ssrc;
96 uint32_t offset;
97
98 struct pa_sdp_info sdp_info;
99
100 pa_rtp_context rtp_context;
101
102 pa_rtpoll_item *rtpoll_item;
103
104 pa_atomic_t timestamp;
105
106 pa_usec_t intended_latency;
107 pa_usec_t sink_latency;
108
109 pa_usec_t last_rate_update;
110 pa_usec_t last_latency;
111 double estimated_rate;
112 double avg_estimated_rate;
113 };
114
115 struct userdata {
116 pa_module *module;
117 pa_core *core;
118
119 pa_sap_context sap_context;
120 pa_io_event* sap_event;
121
122 pa_time_event *check_death_event;
123
124 char *sink_name;
125
126 PA_LLIST_HEAD(struct session, sessions);
127 pa_hashmap *by_origin;
128 int n_sessions;
129 };
130
131 static void session_free(struct session *s);
132
133 /* Called from I/O thread context */
134 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135 struct session *s = PA_SINK_INPUT(o)->userdata;
136
137 switch (code) {
138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140
141 /* Fall through, the default handler will add in the extra
142 * latency added by the resampler */
143 break;
144 }
145
146 return pa_sink_input_process_msg(o, code, data, offset, chunk);
147 }
148
149 /* Called from I/O thread context */
150 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151 struct session *s;
152 pa_sink_input_assert_ref(i);
153 pa_assert_se(s = i->userdata);
154
155 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156 return -1;
157
158 pa_memblockq_drop(s->memblockq, chunk->length);
159
160 return 0;
161 }
162
163 /* Called from I/O thread context */
164 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165 struct session *s;
166
167 pa_sink_input_assert_ref(i);
168 pa_assert_se(s = i->userdata);
169
170 pa_memblockq_rewind(s->memblockq, nbytes);
171 }
172
173 /* Called from I/O thread context */
174 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175 struct session *s;
176
177 pa_sink_input_assert_ref(i);
178 pa_assert_se(s = i->userdata);
179
180 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181 }
182
183 /* Called from main context */
184 static void sink_input_kill(pa_sink_input* i) {
185 struct session *s;
186 pa_sink_input_assert_ref(i);
187 pa_assert_se(s = i->userdata);
188
189 session_free(s);
190 }
191
192 /* Called from IO context */
193 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
194 struct session *s;
195 pa_sink_input_assert_ref(i);
196 pa_assert_se(s = i->userdata);
197
198 if (b)
199 pa_memblockq_flush_read(s->memblockq);
200 else
201 s->first_packet = FALSE;
202 }
203
204 /* Called from I/O thread context */
205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206 pa_memchunk chunk;
207 int64_t k, j, delta;
208 struct timeval now = { 0, 0 };
209 struct session *s;
210 struct pollfd *p;
211
212 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
213
214 p = pa_rtpoll_item_get_pollfd(i, NULL);
215
216 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
217 pa_log("poll() signalled bad revents.");
218 return -1;
219 }
220
221 if ((p->revents & POLLIN) == 0)
222 return 0;
223
224 p->revents = 0;
225
226 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
227 return 0;
228
229 if (s->sdp_info.payload != s->rtp_context.payload ||
230 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231 pa_memblock_unref(chunk.memblock);
232 return 0;
233 }
234
235 if (!s->first_packet) {
236 s->first_packet = TRUE;
237
238 s->ssrc = s->rtp_context.ssrc;
239 s->offset = s->rtp_context.timestamp;
240
241 if (s->ssrc == s->userdata->module->core->cookie)
242 pa_log_warn("Detected RTP packet loop!");
243 } else {
244 if (s->ssrc != s->rtp_context.ssrc) {
245 pa_memblock_unref(chunk.memblock);
246 return 0;
247 }
248 }
249
250 /* Check whether there was a timestamp overflow */
251 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
252 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
253
254 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
255 delta = k;
256 else
257 delta = j;
258
259 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
260
261 if (now.tv_sec == 0) {
262 PA_ONCE_BEGIN {
263 pa_log_warn("Using artificial time instead of timestamp");
264 } PA_ONCE_END;
265 pa_rtclock_get(&now);
266 } else
267 pa_rtclock_from_wallclock(&now);
268
269 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
270 pa_log_warn("Queue overrun");
271 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
272 }
273
274 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
275
276 pa_memblock_unref(chunk.memblock);
277
278 /* The next timestamp we expect */
279 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
280
281 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
282
283 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
284 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
285 uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
286 uint32_t current_rate = s->sink_input->sample_spec.rate;
287 uint32_t new_rate;
288 double estimated_rate, alpha = 0.02;
289
290 pa_log_debug("Updating sample rate");
291
292 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
293 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
294
295 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
296
297 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
298 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
299
300 if (ri > render_delay+sink_delay)
301 ri -= render_delay+sink_delay;
302 else
303 ri = 0;
304
305 if (wi < ri)
306 latency = 0;
307 else
308 latency = wi - ri;
309
310 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
311
312 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
313 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
314 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
315 * T
316 * R̂ = ─────────────── Rⁿ . (1)
317 * T - (Lⁿ - Lⁿ⁻ⁱ)
318 *
319 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
320 * is correct). But there is also the requirement to keep the buffer at a predefined target
321 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
322 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
323 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
324 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
325 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
326 * ʲ⁼ⁱ R̂ a a
327 * Solving for Rⁿ⁺ⁱ gives
328 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
329 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
330 * T
331 * In the code below a = 7 is used.
332 *
333 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
334 * of the estimated rate R̂ is used. This average R̅ is defined as
335 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
336 * Because it is difficult to find a fixed value for the coefficient α such that the
337 * averaging is without significant lag but oscillations are filtered out, a heuristic is
338 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
339 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
340 */
341 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
342 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
343 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
344 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
345 }
346 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
347 s->estimated_rate = estimated_rate;
348 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
349 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
350 s->last_latency = latency;
351
352 if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
353 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
354 new_rate = base_rate;
355 } else {
356 if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
357 new_rate = base_rate;
358 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
359 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
360 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
361 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
362 }
363 }
364 s->sink_input->sample_spec.rate = new_rate;
365
366 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
367
368 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
369
370 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
371
372 s->last_rate_update = pa_timeval_load(&now);
373 }
374
375 if (pa_memblockq_is_readable(s->memblockq) &&
376 s->sink_input->thread_info.underrun_for > 0) {
377 pa_log_debug("Requesting rewind due to end of underrun");
378 pa_sink_input_request_rewind(s->sink_input,
379 (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
380 FALSE, TRUE, FALSE);
381 }
382
383 return 1;
384 }
385
386 /* Called from I/O thread context */
387 static void sink_input_attach(pa_sink_input *i) {
388 struct session *s;
389 struct pollfd *p;
390
391 pa_sink_input_assert_ref(i);
392 pa_assert_se(s = i->userdata);
393
394 pa_assert(!s->rtpoll_item);
395 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
396
397 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
398 p->fd = s->rtp_context.fd;
399 p->events = POLLIN;
400 p->revents = 0;
401
402 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
403 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
404 }
405
406 /* Called from I/O thread context */
407 static void sink_input_detach(pa_sink_input *i) {
408 struct session *s;
409 pa_sink_input_assert_ref(i);
410 pa_assert_se(s = i->userdata);
411
412 pa_assert(s->rtpoll_item);
413 pa_rtpoll_item_free(s->rtpoll_item);
414 s->rtpoll_item = NULL;
415 }
416
417 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
418 int af, fd = -1, r, one;
419
420 pa_assert(sa);
421 pa_assert(salen > 0);
422
423 af = sa->sa_family;
424 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
425 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
426 goto fail;
427 }
428
429 pa_make_udp_socket_low_delay(fd);
430
431 #ifdef SO_TIMESTAMP
432 one = 1;
433 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
434 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
435 goto fail;
436 }
437 #else
438 pa_log("SO_TIMESTAMP unsupported on this platform");
439 goto fail;
440 #endif
441
442 one = 1;
443 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
444 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
445 goto fail;
446 }
447
448 if (af == AF_INET) {
449 struct ip_mreq mr4;
450 memset(&mr4, 0, sizeof(mr4));
451 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
452 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
453 #ifdef HAVE_IPV6
454 } else {
455 struct ipv6_mreq mr6;
456 memset(&mr6, 0, sizeof(mr6));
457 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
458 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
459 #endif
460 }
461
462 if (r < 0) {
463 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
464 goto fail;
465 }
466
467 if (bind(fd, sa, salen) < 0) {
468 pa_log("bind() failed: %s", pa_cstrerror(errno));
469 goto fail;
470 }
471
472 return fd;
473
474 fail:
475 if (fd >= 0)
476 close(fd);
477
478 return -1;
479 }
480
481 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
482 struct session *s = NULL;
483 pa_sink *sink;
484 int fd = -1;
485 pa_memchunk silence;
486 pa_sink_input_new_data data;
487 struct timeval now;
488
489 pa_assert(u);
490 pa_assert(sdp_info);
491
492 if (u->n_sessions >= MAX_SESSIONS) {
493 pa_log("Session limit reached.");
494 goto fail;
495 }
496
497 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
498 pa_log("Sink does not exist.");
499 goto fail;
500 }
501
502 pa_rtclock_get(&now);
503
504 s = pa_xnew0(struct session, 1);
505 s->userdata = u;
506 s->first_packet = FALSE;
507 s->sdp_info = *sdp_info;
508 s->rtpoll_item = NULL;
509 s->intended_latency = LATENCY_USEC;
510 s->last_rate_update = pa_timeval_load(&now);
511 s->last_latency = LATENCY_USEC;
512 s->estimated_rate = (double) sink->sample_spec.rate;
513 s->avg_estimated_rate = (double) sink->sample_spec.rate;
514 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
515
516 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
517 goto fail;
518
519 pa_sink_input_new_data_init(&data);
520 pa_sink_input_new_data_set_sink(&data, sink, FALSE);
521 data.driver = __FILE__;
522 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
523 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
524 "RTP Stream%s%s%s",
525 sdp_info->session_name ? " (" : "",
526 sdp_info->session_name ? sdp_info->session_name : "",
527 sdp_info->session_name ? ")" : "");
528
529 if (sdp_info->session_name)
530 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
531 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
532 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
533 data.module = u->module;
534 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
535 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
536
537 pa_sink_input_new(&s->sink_input, u->module->core, &data);
538 pa_sink_input_new_data_done(&data);
539
540 if (!s->sink_input) {
541 pa_log("Failed to create sink input.");
542 goto fail;
543 }
544
545 s->sink_input->userdata = s;
546
547 s->sink_input->parent.process_msg = sink_input_process_msg;
548 s->sink_input->pop = sink_input_pop_cb;
549 s->sink_input->process_rewind = sink_input_process_rewind_cb;
550 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
551 s->sink_input->kill = sink_input_kill;
552 s->sink_input->attach = sink_input_attach;
553 s->sink_input->detach = sink_input_detach;
554 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
555
556 pa_sink_input_get_silence(s->sink_input, &silence);
557
558 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
559
560 if (s->intended_latency < s->sink_latency*2)
561 s->intended_latency = s->sink_latency*2;
562
563 s->memblockq = pa_memblockq_new(
564 "module-rtp-recv memblockq",
565 0,
566 MEMBLOCKQ_MAXLENGTH,
567 MEMBLOCKQ_MAXLENGTH,
568 &s->sink_input->sample_spec,
569 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
570 0,
571 0,
572 &silence);
573
574 pa_memblock_unref(silence.memblock);
575
576 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
577
578 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
579 u->n_sessions++;
580 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
581
582 pa_sink_input_put(s->sink_input);
583
584 pa_log_info("New session '%s'", s->sdp_info.session_name);
585
586 return s;
587
588 fail:
589 pa_xfree(s);
590
591 if (fd >= 0)
592 pa_close(fd);
593
594 return NULL;
595 }
596
597 static void session_free(struct session *s) {
598 pa_assert(s);
599
600 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
601
602 pa_sink_input_unlink(s->sink_input);
603 pa_sink_input_unref(s->sink_input);
604
605 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
606 pa_assert(s->userdata->n_sessions >= 1);
607 s->userdata->n_sessions--;
608 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
609
610 pa_memblockq_free(s->memblockq);
611 pa_sdp_info_destroy(&s->sdp_info);
612 pa_rtp_context_destroy(&s->rtp_context);
613
614 pa_xfree(s);
615 }
616
617 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
618 struct userdata *u = userdata;
619 pa_bool_t goodbye = FALSE;
620 pa_sdp_info info;
621 struct session *s;
622
623 pa_assert(m);
624 pa_assert(e);
625 pa_assert(u);
626 pa_assert(fd == u->sap_context.fd);
627 pa_assert(flags == PA_IO_EVENT_INPUT);
628
629 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
630 return;
631
632 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
633 return;
634
635 if (goodbye) {
636
637 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
638 session_free(s);
639
640 pa_sdp_info_destroy(&info);
641 } else {
642
643 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
644 if (!session_new(u, &info))
645 pa_sdp_info_destroy(&info);
646
647 } else {
648 struct timeval now;
649 pa_rtclock_get(&now);
650 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
651
652 pa_sdp_info_destroy(&info);
653 }
654 }
655 }
656
657 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
658 struct session *s, *n;
659 struct userdata *u = userdata;
660 struct timeval now;
661
662 pa_assert(m);
663 pa_assert(t);
664 pa_assert(u);
665
666 pa_rtclock_get(&now);
667
668 pa_log_debug("Checking for dead streams ...");
669
670 for (s = u->sessions; s; s = n) {
671 int k;
672 n = s->next;
673
674 k = pa_atomic_load(&s->timestamp);
675
676 if (k + DEATH_TIMEOUT < now.tv_sec)
677 session_free(s);
678 }
679
680 /* Restart timer */
681 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
682 }
683
684 int pa__init(pa_module*m) {
685 struct userdata *u;
686 pa_modargs *ma = NULL;
687 struct sockaddr_in sa4;
688 #ifdef HAVE_IPV6
689 struct sockaddr_in6 sa6;
690 #endif
691 struct sockaddr *sa;
692 socklen_t salen;
693 const char *sap_address;
694 int fd = -1;
695
696 pa_assert(m);
697
698 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
699 pa_log("failed to parse module arguments");
700 goto fail;
701 }
702
703 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
704
705 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
706 sa4.sin_family = AF_INET;
707 sa4.sin_port = htons(SAP_PORT);
708 sa = (struct sockaddr*) &sa4;
709 salen = sizeof(sa4);
710 #ifdef HAVE_IPV6
711 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
712 sa6.sin6_family = AF_INET6;
713 sa6.sin6_port = htons(SAP_PORT);
714 sa = (struct sockaddr*) &sa6;
715 salen = sizeof(sa6);
716 #endif
717 } else {
718 pa_log("Invalid SAP address '%s'", sap_address);
719 goto fail;
720 }
721
722 if ((fd = mcast_socket(sa, salen)) < 0)
723 goto fail;
724
725 m->userdata = u = pa_xnew(struct userdata, 1);
726 u->module = m;
727 u->core = m->core;
728 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
729
730 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
731 pa_sap_context_init_recv(&u->sap_context, fd);
732
733 PA_LLIST_HEAD_INIT(struct session, u->sessions);
734 u->n_sessions = 0;
735 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
736
737 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
738
739 pa_modargs_free(ma);
740
741 return 0;
742
743 fail:
744 if (ma)
745 pa_modargs_free(ma);
746
747 if (fd >= 0)
748 pa_close(fd);
749
750 return -1;
751 }
752
753 void pa__done(pa_module*m) {
754 struct userdata *u;
755 struct session *s;
756
757 pa_assert(m);
758
759 if (!(u = m->userdata))
760 return;
761
762 if (u->sap_event)
763 m->core->mainloop->io_free(u->sap_event);
764
765 if (u->check_death_event)
766 m->core->mainloop->time_free(u->check_death_event);
767
768 pa_sap_context_destroy(&u->sap_context);
769
770 if (u->by_origin) {
771 while ((s = pa_hashmap_first(u->by_origin)))
772 session_free(s);
773
774 pa_hashmap_free(u->by_origin, NULL, NULL);
775 }
776
777 pa_xfree(u->sink_name);
778 pa_xfree(u);
779 }