]> code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
core: Add extended stream API to support compressed formats
[pulseaudio] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <unistd.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/xmalloc.h>
37
38 #include <pulsecore/core-error.h>
39 #include <pulsecore/module.h>
40 #include <pulsecore/llist.h>
41 #include <pulsecore/sink.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/memblockq.h>
44 #include <pulsecore/log.h>
45 #include <pulsecore/core-rtclock.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/atomic.h>
53 #include <pulsecore/socket-util.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION);
67 PA_MODULE_LOAD_ONCE(FALSE);
68 PA_MODULE_USAGE(
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
71 );
72
73 #define SAP_PORT 9875
74 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
75 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
76 #define MAX_SESSIONS 16
77 #define DEATH_TIMEOUT 20
78 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
79 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
80
81 static const char* const valid_modargs[] = {
82 "sink",
83 "sap_address",
84 NULL
85 };
86
87 struct session {
88 struct userdata *userdata;
89 PA_LLIST_FIELDS(struct session);
90
91 pa_sink_input *sink_input;
92 pa_memblockq *memblockq;
93
94 pa_bool_t first_packet;
95 uint32_t ssrc;
96 uint32_t offset;
97
98 struct pa_sdp_info sdp_info;
99
100 pa_rtp_context rtp_context;
101
102 pa_rtpoll_item *rtpoll_item;
103
104 pa_atomic_t timestamp;
105
106 pa_usec_t intended_latency;
107 pa_usec_t sink_latency;
108
109 pa_usec_t last_rate_update;
110 pa_usec_t last_latency;
111 double estimated_rate;
112 double avg_estimated_rate;
113 };
114
115 struct userdata {
116 pa_module *module;
117 pa_core *core;
118
119 pa_sap_context sap_context;
120 pa_io_event* sap_event;
121
122 pa_time_event *check_death_event;
123
124 char *sink_name;
125
126 PA_LLIST_HEAD(struct session, sessions);
127 pa_hashmap *by_origin;
128 int n_sessions;
129 };
130
131 static void session_free(struct session *s);
132
133 /* Called from I/O thread context */
134 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135 struct session *s = PA_SINK_INPUT(o)->userdata;
136
137 switch (code) {
138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140
141 /* Fall through, the default handler will add in the extra
142 * latency added by the resampler */
143 break;
144 }
145
146 return pa_sink_input_process_msg(o, code, data, offset, chunk);
147 }
148
149 /* Called from I/O thread context */
150 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151 struct session *s;
152 pa_sink_input_assert_ref(i);
153 pa_assert_se(s = i->userdata);
154
155 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156 return -1;
157
158 pa_memblockq_drop(s->memblockq, chunk->length);
159
160 return 0;
161 }
162
163 /* Called from I/O thread context */
164 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165 struct session *s;
166
167 pa_sink_input_assert_ref(i);
168 pa_assert_se(s = i->userdata);
169
170 pa_memblockq_rewind(s->memblockq, nbytes);
171 }
172
173 /* Called from I/O thread context */
174 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175 struct session *s;
176
177 pa_sink_input_assert_ref(i);
178 pa_assert_se(s = i->userdata);
179
180 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181 }
182
183 /* Called from main context */
184 static void sink_input_kill(pa_sink_input* i) {
185 struct session *s;
186 pa_sink_input_assert_ref(i);
187 pa_assert_se(s = i->userdata);
188
189 session_free(s);
190 }
191
192 /* Called from IO context */
193 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
194 struct session *s;
195 pa_sink_input_assert_ref(i);
196 pa_assert_se(s = i->userdata);
197
198 if (b)
199 pa_memblockq_flush_read(s->memblockq);
200 else
201 s->first_packet = FALSE;
202 }
203
204 /* Called from I/O thread context */
205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206 pa_memchunk chunk;
207 int64_t k, j, delta;
208 struct timeval now = { 0, 0 };
209 struct session *s;
210 struct pollfd *p;
211
212 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
213
214 p = pa_rtpoll_item_get_pollfd(i, NULL);
215
216 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
217 pa_log("poll() signalled bad revents.");
218 return -1;
219 }
220
221 if ((p->revents & POLLIN) == 0)
222 return 0;
223
224 p->revents = 0;
225
226 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
227 return 0;
228
229 if (s->sdp_info.payload != s->rtp_context.payload ||
230 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231 pa_memblock_unref(chunk.memblock);
232 return 0;
233 }
234
235 if (!s->first_packet) {
236 s->first_packet = TRUE;
237
238 s->ssrc = s->rtp_context.ssrc;
239 s->offset = s->rtp_context.timestamp;
240
241 if (s->ssrc == s->userdata->module->core->cookie)
242 pa_log_warn("Detected RTP packet loop!");
243 } else {
244 if (s->ssrc != s->rtp_context.ssrc) {
245 pa_memblock_unref(chunk.memblock);
246 return 0;
247 }
248 }
249
250 /* Check whether there was a timestamp overflow */
251 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
252 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
253
254 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
255 delta = k;
256 else
257 delta = j;
258
259 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
260
261 if (now.tv_sec == 0) {
262 PA_ONCE_BEGIN {
263 pa_log_warn("Using artificial time instead of timestamp");
264 } PA_ONCE_END;
265 pa_rtclock_get(&now);
266 } else
267 pa_rtclock_from_wallclock(&now);
268
269 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
270 pa_log_warn("Queue overrun");
271 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
272 }
273
274 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
275
276 pa_memblock_unref(chunk.memblock);
277
278 /* The next timestamp we expect */
279 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
280
281 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
282
283 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
284 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
285 uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
286 uint32_t current_rate = s->sink_input->sample_spec.rate;
287 uint32_t new_rate;
288 double estimated_rate, alpha = 0.02;
289
290 pa_log_debug("Updating sample rate");
291
292 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
293 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
294
295 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
296
297 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
298 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
299
300 if (ri > render_delay+sink_delay)
301 ri -= render_delay+sink_delay;
302 else
303 ri = 0;
304
305 if (wi < ri)
306 latency = 0;
307 else
308 latency = wi - ri;
309
310 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
311
312 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
313 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
314 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
315 * T
316 * R̂ = ─────────────── Rⁿ . (1)
317 * T - (Lⁿ - Lⁿ⁻ⁱ)
318 *
319 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
320 * is correct). But there is also the requirement to keep the buffer at a predefined target
321 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
322 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
323 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
324 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
325 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
326 * ʲ⁼ⁱ R̂ a a
327 * Solving for Rⁿ⁺ⁱ gives
328 * T - ²∕ₐ₊₁(L̂ - Lⁿ)
329 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
330 * T
331 * In the code below a = 7 is used.
332 *
333 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
334 * of the estimated rate R̂ is used. This average R̅ is defined as
335 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
336 * Because it is difficult to find a fixed value for the coefficient α such that the
337 * averaging is without significant lag but oscillations are filtered out, a heuristic is
338 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
339 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
340 */
341 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
342 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
343 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
344 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
345 }
346 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
347 s->estimated_rate = estimated_rate;
348 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
349 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
350 s->last_latency = latency;
351
352 if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
353 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
354 new_rate = base_rate;
355 } else {
356 if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
357 new_rate = base_rate;
358 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
359 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
360 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
361 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
362 }
363 }
364 s->sink_input->sample_spec.rate = new_rate;
365
366 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
367
368 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
369
370 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
371
372 s->last_rate_update = pa_timeval_load(&now);
373 }
374
375 if (pa_memblockq_is_readable(s->memblockq) &&
376 s->sink_input->thread_info.underrun_for > 0) {
377 pa_log_debug("Requesting rewind due to end of underrun");
378 pa_sink_input_request_rewind(s->sink_input,
379 (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
380 FALSE, TRUE, FALSE);
381 }
382
383 return 1;
384 }
385
386 /* Called from I/O thread context */
387 static void sink_input_attach(pa_sink_input *i) {
388 struct session *s;
389 struct pollfd *p;
390
391 pa_sink_input_assert_ref(i);
392 pa_assert_se(s = i->userdata);
393
394 pa_assert(!s->rtpoll_item);
395 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
396
397 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
398 p->fd = s->rtp_context.fd;
399 p->events = POLLIN;
400 p->revents = 0;
401
402 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
403 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
404 }
405
406 /* Called from I/O thread context */
407 static void sink_input_detach(pa_sink_input *i) {
408 struct session *s;
409 pa_sink_input_assert_ref(i);
410 pa_assert_se(s = i->userdata);
411
412 pa_assert(s->rtpoll_item);
413 pa_rtpoll_item_free(s->rtpoll_item);
414 s->rtpoll_item = NULL;
415 }
416
417 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
418 int af, fd = -1, r, one;
419
420 pa_assert(sa);
421 pa_assert(salen > 0);
422
423 af = sa->sa_family;
424 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
425 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
426 goto fail;
427 }
428
429 pa_make_udp_socket_low_delay(fd);
430
431 one = 1;
432 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
433 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
434 goto fail;
435 }
436
437 one = 1;
438 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
439 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
440 goto fail;
441 }
442
443 if (af == AF_INET) {
444 struct ip_mreq mr4;
445 memset(&mr4, 0, sizeof(mr4));
446 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
447 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
448 #ifdef HAVE_IPV6
449 } else {
450 struct ipv6_mreq mr6;
451 memset(&mr6, 0, sizeof(mr6));
452 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
453 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
454 #endif
455 }
456
457 if (r < 0) {
458 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
459 goto fail;
460 }
461
462 if (bind(fd, sa, salen) < 0) {
463 pa_log("bind() failed: %s", pa_cstrerror(errno));
464 goto fail;
465 }
466
467 return fd;
468
469 fail:
470 if (fd >= 0)
471 close(fd);
472
473 return -1;
474 }
475
476 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
477 struct session *s = NULL;
478 pa_sink *sink;
479 int fd = -1;
480 pa_memchunk silence;
481 pa_sink_input_new_data data;
482 struct timeval now;
483
484 pa_assert(u);
485 pa_assert(sdp_info);
486
487 if (u->n_sessions >= MAX_SESSIONS) {
488 pa_log("Session limit reached.");
489 goto fail;
490 }
491
492 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
493 pa_log("Sink does not exist.");
494 goto fail;
495 }
496
497 pa_rtclock_get(&now);
498
499 s = pa_xnew0(struct session, 1);
500 s->userdata = u;
501 s->first_packet = FALSE;
502 s->sdp_info = *sdp_info;
503 s->rtpoll_item = NULL;
504 s->intended_latency = LATENCY_USEC;
505 s->last_rate_update = pa_timeval_load(&now);
506 s->last_latency = LATENCY_USEC;
507 s->estimated_rate = (double) sink->sample_spec.rate;
508 s->avg_estimated_rate = (double) sink->sample_spec.rate;
509 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
510
511 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
512 goto fail;
513
514 pa_sink_input_new_data_init(&data);
515 pa_sink_input_new_data_set_sink(&data, sink, FALSE);
516 data.driver = __FILE__;
517 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
518 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
519 "RTP Stream%s%s%s",
520 sdp_info->session_name ? " (" : "",
521 sdp_info->session_name ? sdp_info->session_name : "",
522 sdp_info->session_name ? ")" : "");
523
524 if (sdp_info->session_name)
525 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
526 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
527 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
528 data.module = u->module;
529 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
530 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
531
532 pa_sink_input_new(&s->sink_input, u->module->core, &data);
533 pa_sink_input_new_data_done(&data);
534
535 if (!s->sink_input) {
536 pa_log("Failed to create sink input.");
537 goto fail;
538 }
539
540 s->sink_input->userdata = s;
541
542 s->sink_input->parent.process_msg = sink_input_process_msg;
543 s->sink_input->pop = sink_input_pop_cb;
544 s->sink_input->process_rewind = sink_input_process_rewind_cb;
545 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
546 s->sink_input->kill = sink_input_kill;
547 s->sink_input->attach = sink_input_attach;
548 s->sink_input->detach = sink_input_detach;
549 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
550
551 pa_sink_input_get_silence(s->sink_input, &silence);
552
553 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
554
555 if (s->intended_latency < s->sink_latency*2)
556 s->intended_latency = s->sink_latency*2;
557
558 s->memblockq = pa_memblockq_new(
559 0,
560 MEMBLOCKQ_MAXLENGTH,
561 MEMBLOCKQ_MAXLENGTH,
562 pa_frame_size(&s->sink_input->sample_spec),
563 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
564 0,
565 0,
566 &silence);
567
568 pa_memblock_unref(silence.memblock);
569
570 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
571
572 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
573 u->n_sessions++;
574 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
575
576 pa_sink_input_put(s->sink_input);
577
578 pa_log_info("New session '%s'", s->sdp_info.session_name);
579
580 return s;
581
582 fail:
583 pa_xfree(s);
584
585 if (fd >= 0)
586 pa_close(fd);
587
588 return NULL;
589 }
590
591 static void session_free(struct session *s) {
592 pa_assert(s);
593
594 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
595
596 pa_sink_input_unlink(s->sink_input);
597 pa_sink_input_unref(s->sink_input);
598
599 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
600 pa_assert(s->userdata->n_sessions >= 1);
601 s->userdata->n_sessions--;
602 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
603
604 pa_memblockq_free(s->memblockq);
605 pa_sdp_info_destroy(&s->sdp_info);
606 pa_rtp_context_destroy(&s->rtp_context);
607
608 pa_xfree(s);
609 }
610
611 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
612 struct userdata *u = userdata;
613 pa_bool_t goodbye = FALSE;
614 pa_sdp_info info;
615 struct session *s;
616
617 pa_assert(m);
618 pa_assert(e);
619 pa_assert(u);
620 pa_assert(fd == u->sap_context.fd);
621 pa_assert(flags == PA_IO_EVENT_INPUT);
622
623 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
624 return;
625
626 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
627 return;
628
629 if (goodbye) {
630
631 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
632 session_free(s);
633
634 pa_sdp_info_destroy(&info);
635 } else {
636
637 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
638 if (!session_new(u, &info))
639 pa_sdp_info_destroy(&info);
640
641 } else {
642 struct timeval now;
643 pa_rtclock_get(&now);
644 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
645
646 pa_sdp_info_destroy(&info);
647 }
648 }
649 }
650
651 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
652 struct session *s, *n;
653 struct userdata *u = userdata;
654 struct timeval now;
655
656 pa_assert(m);
657 pa_assert(t);
658 pa_assert(u);
659
660 pa_rtclock_get(&now);
661
662 pa_log_debug("Checking for dead streams ...");
663
664 for (s = u->sessions; s; s = n) {
665 int k;
666 n = s->next;
667
668 k = pa_atomic_load(&s->timestamp);
669
670 if (k + DEATH_TIMEOUT < now.tv_sec)
671 session_free(s);
672 }
673
674 /* Restart timer */
675 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
676 }
677
678 int pa__init(pa_module*m) {
679 struct userdata *u;
680 pa_modargs *ma = NULL;
681 struct sockaddr_in sa4;
682 #ifdef HAVE_IPV6
683 struct sockaddr_in6 sa6;
684 #endif
685 struct sockaddr *sa;
686 socklen_t salen;
687 const char *sap_address;
688 int fd = -1;
689
690 pa_assert(m);
691
692 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
693 pa_log("failed to parse module arguments");
694 goto fail;
695 }
696
697 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
698
699 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
700 sa4.sin_family = AF_INET;
701 sa4.sin_port = htons(SAP_PORT);
702 sa = (struct sockaddr*) &sa4;
703 salen = sizeof(sa4);
704 #ifdef HAVE_IPV6
705 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
706 sa6.sin6_family = AF_INET6;
707 sa6.sin6_port = htons(SAP_PORT);
708 sa = (struct sockaddr*) &sa6;
709 salen = sizeof(sa6);
710 #endif
711 } else {
712 pa_log("Invalid SAP address '%s'", sap_address);
713 goto fail;
714 }
715
716 if ((fd = mcast_socket(sa, salen)) < 0)
717 goto fail;
718
719 m->userdata = u = pa_xnew(struct userdata, 1);
720 u->module = m;
721 u->core = m->core;
722 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
723
724 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
725 pa_sap_context_init_recv(&u->sap_context, fd);
726
727 PA_LLIST_HEAD_INIT(struct session, u->sessions);
728 u->n_sessions = 0;
729 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
730
731 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
732
733 pa_modargs_free(ma);
734
735 return 0;
736
737 fail:
738 if (ma)
739 pa_modargs_free(ma);
740
741 if (fd >= 0)
742 pa_close(fd);
743
744 return -1;
745 }
746
747 void pa__done(pa_module*m) {
748 struct userdata *u;
749 struct session *s;
750
751 pa_assert(m);
752
753 if (!(u = m->userdata))
754 return;
755
756 if (u->sap_event)
757 m->core->mainloop->io_free(u->sap_event);
758
759 if (u->check_death_event)
760 m->core->mainloop->time_free(u->check_death_event);
761
762 pa_sap_context_destroy(&u->sap_context);
763
764 if (u->by_origin) {
765 while ((s = pa_hashmap_first(u->by_origin)))
766 session_free(s);
767
768 pa_hashmap_free(u->by_origin, NULL, NULL);
769 }
770
771 pa_xfree(u->sink_name);
772 pa_xfree(u);
773 }