3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
58 #include "module-rtp-recv-symdef.h"
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION
);
67 PA_MODULE_LOAD_ONCE(false);
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
71 "latency_msec=<latency in ms> "
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define DEFAULT_LATENCY_MSEC 500
77 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
78 #define MAX_SESSIONS 16
79 #define DEATH_TIMEOUT 20
80 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
82 static const char* const valid_modargs
[] = {
90 struct userdata
*userdata
;
91 PA_LLIST_FIELDS(struct session
);
93 pa_sink_input
*sink_input
;
94 pa_memblockq
*memblockq
;
100 struct pa_sdp_info sdp_info
;
102 pa_rtp_context rtp_context
;
104 pa_rtpoll_item
*rtpoll_item
;
106 pa_atomic_t timestamp
;
108 pa_usec_t intended_latency
;
109 pa_usec_t sink_latency
;
111 pa_usec_t last_rate_update
;
112 pa_usec_t last_latency
;
113 double estimated_rate
;
114 double avg_estimated_rate
;
121 pa_sap_context sap_context
;
122 pa_io_event
* sap_event
;
124 pa_time_event
*check_death_event
;
128 PA_LLIST_HEAD(struct session
, sessions
);
129 pa_hashmap
*by_origin
;
135 static void session_free(struct session
*s
);
137 /* Called from I/O thread context */
/* Message handler of a session sink input; session_new() installs it as
 * parent.process_msg.
 * NOTE(review): this listing omits some original lines of the function (the
 * numbering jumps from 139 to 142 and from 146 to 150), so only the visible
 * statements are described here. */
138 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
/* The owning session is kept in the userdata field of the sink input. */
139 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
/* Latency query: store the playing time of the bytes currently held in the
 * session memblockq, converted with the sample spec of the sink input, in
 * the pa_usec_t that data points to. */
142 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
143 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
145 /* Fall through, the default handler will add in the extra
146 * latency added by the resampler */
/* Pass the message on to the generic sink input handler and return its result. */
150 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
153 /* Called from I/O thread context */
/* Pop callback of the sink input (installed as pop in session_new()): hands
 * the next chunk of the session memblockq to the caller through chunk.
 * NOTE(review): the declaration of s, the statement executed when the peek
 * fails and the return statements are not part of this listing. */
154 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
156 pa_sink_input_assert_ref(i
);
/* Fetch the session from the sink input; pa_assert_se also checks it is non-NULL. */
157 pa_assert_se(s
= i
->userdata
);
/* Look at the chunk at the read position of the queue; a negative result
 * means nothing could be handed out. */
159 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
/* Advance the read position past the chunk that was just handed out. */
162 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
167 /* Called from I/O thread context */
/* Rewind callback of the sink input (installed as process_rewind in
 * session_new()): rewinds the session memblockq by nbytes.
 * NOTE(review): the declaration of s is not part of this listing. */
168 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
171 pa_sink_input_assert_ref(i
);
/* Fetch the session from the sink input; must be non-NULL. */
172 pa_assert_se(s
= i
->userdata
);
/* Apply the rewind of nbytes to the queue of this session. */
174 pa_memblockq_rewind(s
->memblockq
, nbytes
);
177 /* Called from I/O thread context */
/* Max-rewind callback of the sink input (installed as update_max_rewind in
 * session_new()): forwards the new limit nbytes to the session memblockq.
 * NOTE(review): the declaration of s is not part of this listing. */
178 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
181 pa_sink_input_assert_ref(i
);
/* Fetch the session from the sink input; must be non-NULL. */
182 pa_assert_se(s
= i
->userdata
);
/* Set nbytes as the maximum rewind of the queue. */
184 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
187 /* Called from main context */
/* Kill callback of the sink input (installed as kill in session_new()).
 * The session is torn down by removing it from the by_origin hashmap;
 * pa__init() creates that map with session_free() as its value free
 * callback.
 * NOTE(review): the declaration of s is not part of this listing. */
188 static void sink_input_kill(pa_sink_input
* i
) {
190 pa_sink_input_assert_ref(i
);
/* Fetch the session from the sink input; must be non-NULL. */
191 pa_assert_se(s
= i
->userdata
);
/* The map is keyed by the SDP origin string of the session. */
193 pa_hashmap_remove_and_free(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
196 /* Called from IO context */
/* Suspend callback of the sink input (installed as suspend_within_thread in
 * session_new()).
 * NOTE(review): the declaration of s and whatever test is made on b are not
 * part of this listing; presumably the two statements below only run when b
 * is true - confirm against the full source. */
197 static void sink_input_suspend_within_thread(pa_sink_input
* i
, bool b
) {
199 pa_sink_input_assert_ref(i
);
/* Fetch the session from the sink input; must be non-NULL. */
200 pa_assert_se(s
= i
->userdata
);
/* Flush the readable part of the session queue. */
203 pa_memblockq_flush_read(s
->memblockq
);
/* Make rtpoll_work_cb() treat the next packet as the first one again, so
 * that it latches SSRC and timestamp offset anew. */
205 s
->first_packet
= false;
208 /* Called from I/O thread context */
/* Work callback of the rtpoll item of a session (registered in
 * sink_input_attach()). The visible statements receive one RTP packet, place
 * it in the session memblockq at the position implied by its RTP timestamp,
 * record the arrival time for the death check and, at most once per
 * RATE_UPDATE_INTERVAL, adjust the input sample rate of the sink input so
 * that the buffer fill level moves towards intended_latency.
 * NOTE(review): many original lines are missing from this listing (local
 * declarations, the statements taken on the error paths, the assignment of
 * delta, the computation of latency, the return statements); the comments
 * below describe only what is visible. */
209 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
/* Starts out as zero; a tv_sec that is still zero after pa_rtp_recv() is
 * treated further down as no timestamp being available. */
212 struct timeval now
= { 0, 0 };
/* The session was attached to the item as its userdata. */
216 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
218 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
/* Error conditions, and POLLOUT as well, are reported as bad revents. */
220 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
221 pa_log("poll() signalled bad revents.");
/* Nothing to read on the socket. */
225 if ((p
->revents
& POLLIN
) == 0)
/* Receive one packet into chunk, using the mempool of the core; now is
 * passed along as an output argument. */
230 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
, &now
) < 0)
/* Release the data if the payload type differs from the one announced via
 * SDP or if the sink is not in an opened state. */
233 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
||
234 !PA_SINK_IS_OPENED(s
->sink_input
->sink
->thread_info
.state
)) {
235 pa_memblock_unref(chunk
.memblock
);
/* First packet of the stream: remember its SSRC and use its timestamp as
 * the initial expected timestamp. */
239 if (!s
->first_packet
) {
240 s
->first_packet
= true;
242 s
->ssrc
= s
->rtp_context
.ssrc
;
243 s
->offset
= s
->rtp_context
.timestamp
;
/* An SSRC equal to the cookie of the local core is reported as a loop. */
245 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
246 pa_log_warn("Detected RTP packet loop!");
/* Packets that carry a different SSRC than the latched one are released. */
248 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
249 pa_memblock_unref(chunk
.memblock
);
254 /* Check whether there was a timestamp overflow */
/* k is the plain distance between the received and the expected timestamp,
 * j the same distance with 2^32 added, i.e. assuming the 32 bit timestamp
 * wrapped around. The magnitudes of both are compared below.
 * NOTE(review): which of the two ends up in delta is not visible here. */
255 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
256 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
258 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
/* Move the write index by delta frames, converted to bytes with the RTP
 * frame size. */
263 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
, true);
/* No timestamp was filled in: warn and read the rtclock instead.
 * NOTE(review): the pa_rtclock_from_wallclock() call presumably belongs to
 * the else branch, which is not visible in this listing. */
265 if (now
.tv_sec
== 0) {
267 pa_log_warn("Using artificial time instead of timestamp");
269 pa_rtclock_get(&now
);
271 pa_rtclock_from_wallclock(&now
);
/* Queue the data; if the queue refuses it, warn and advance the write
 * index by the length of the chunk instead. */
273 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
274 pa_log_warn("Queue overrun");
275 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
, true);
278 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
/* Drop the reference this function holds on the received memblock. */
280 pa_memblock_unref(chunk
.memblock
);
282 /* The next timestamp we expect */
283 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
/* Publish the arrival time in seconds; check_death_event_cb() reads this
 * value with pa_atomic_load(). */
285 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
/* Rate adaptation, run when more than RATE_UPDATE_INTERVAL has passed since
 * last_rate_update. */
287 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
288 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
;
/* base_rate is the rate of the sink, current_rate the rate presently set
 * on the sink input. */
289 uint32_t base_rate
= s
->sink_input
->sink
->sample_spec
.rate
;
290 uint32_t current_rate
= s
->sink_input
->sample_spec
.rate
;
292 double estimated_rate
, alpha
= 0.02;
294 pa_log_debug("Updating sample rate");
/* Write and read index of the queue, expressed as time. */
296 wi
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
297 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
299 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi
, (unsigned long) ri
);
/* Data already read from the queue but still waiting in the render queue
 * or in the sink is subtracted from the read index, unless that would make
 * it negative. */
301 sink_delay
= pa_sink_get_latency_within_thread(s
->sink_input
->sink
);
302 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
304 if (ri
> render_delay
+sink_delay
)
305 ri
-= render_delay
+sink_delay
;
/* NOTE(review): the statements that compute latency from wi and ri are not
 * part of this listing. */
314 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
/* Summary of the statements that follow the derivation below: estimated_rate
 * is computed as in equation (1); avg_estimated_rate is updated as a weighted
 * average with weight alpha, where alpha is clamped to [0.02, 0.8]; new_rate
 * follows equation (2) with the factor 1/4; a new_rate outside of 0.8 to 1.25
 * times base_rate is replaced by base_rate with a warning; a new_rate less
 * than 20 Hz away from base_rate is set to base_rate. */
316 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
317 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻¹ in that
318 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
320 *     R̂ = T / (T - (Lⁿ - Lⁿ⁻¹)) · Rⁿ .                                        (1)
323 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
324 * is correct). But there is also the requirement to keep the buffer at a predefined target
325 * latency L̂. So instead of setting Rⁿ⁺¹ to R̂ immediately, the strategy will be to reduce R
326 * from Rⁿ⁺¹ to R̂ in a steps of T seconds (a is the number of steps), where Rⁿ⁺¹ is chosen
327 * such that in the total time aT the latency is reduced from Lⁿ to L̂. This strategy
 * translates to the requirements
328 *     Σ (j = 1 … a)  T · (R̂ - Rⁿ⁺ʲ) / R̂  =  L̂ - Lⁿ
329 *     with  Rⁿ⁺ʲ = ((a-j+1) / a) · Rⁿ⁺¹ + ((j-1) / a) · R̂ .
331 * Solving for Rⁿ⁺¹ gives
333 *     Rⁿ⁺¹ = (T - 2/(a+1) · (L̂ - Lⁿ)) / T · R̂ .                               (2)
335 * In the code below a = 7 is used, so that 2/(a+1) = 1/4; this is the factor applied to
 * latency and intended_latency in the new_rate expression.
337 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
338 * of the estimated rate R̂ is used. This average R̅ is defined as
339 *     R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻¹ .
340 * Because it is difficult to find a fixed value for the coefficient α such that the
341 * averaging is without significant lag but oscillations are filtered out, a heuristic is
342 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
343 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
 * NOTE(review): the line that terminates this comment (original line 344) is missing from
 * this listing.
345 estimated_rate
= (double) current_rate
* (double) RATE_UPDATE_INTERVAL
/ (double) (RATE_UPDATE_INTERVAL
+ s
->last_latency
- latency
);
346 if (fabs(s
->estimated_rate
- s
->avg_estimated_rate
) > 1) {
347 double ratio
= (estimated_rate
+ s
->estimated_rate
- 2*s
->avg_estimated_rate
) / (s
->estimated_rate
- s
->avg_estimated_rate
);
348 alpha
= PA_CLAMP(2 * (ratio
+ fabs(ratio
)) / (4 + ratio
*ratio
), 0.02, 0.8);
350 s
->avg_estimated_rate
= alpha
* estimated_rate
+ (1-alpha
) * s
->avg_estimated_rate
;
351 s
->estimated_rate
= estimated_rate
;
352 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate
, s
->avg_estimated_rate
, alpha
);
353 new_rate
= (uint32_t) ((double) (RATE_UPDATE_INTERVAL
+ latency
/4 - s
->intended_latency
/4) / (double) RATE_UPDATE_INTERVAL
* s
->avg_estimated_rate
);
354 s
->last_latency
= latency
;
356 if (new_rate
< (uint32_t) (base_rate
*0.8) || new_rate
> (uint32_t) (base_rate
*1.25)) {
357 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate
, new_rate
);
358 new_rate
= base_rate
;
360 if (base_rate
< new_rate
+ 20 && new_rate
< base_rate
+ 20)
361 new_rate
= base_rate
;
362 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
/* A new_rate outside of 0.998 to 1.002 times current_rate is clamped to
 * that window, so a single update changes the rate by at most 2 per mille. */
363 if (new_rate
< (uint32_t) (current_rate
*0.998) || new_rate
> (uint32_t) (current_rate
*1.002)) {
364 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate
, current_rate
);
365 new_rate
= PA_CLAMP(new_rate
, (uint32_t) (current_rate
*0.998), (uint32_t) (current_rate
*1.002));
/* Store the new rate in the sample spec of the sink input, check that the
 * spec is still valid and hand the rate to the resampler as its input rate. */
368 s
->sink_input
->sample_spec
.rate
= new_rate
;
370 pa_assert(pa_sample_spec_valid(&s
->sink_input
->sample_spec
));
372 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
374 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
/* Restart the interval measured by the check at the top of this section. */
376 s
->last_rate_update
= pa_timeval_load(&now
);
/* The queue has readable data again while the sink input is in underrun:
 * request a rewind covering the underrun length, or 0 when underrun_for
 * holds the value (uint64_t) -1.
 * NOTE(review): the remaining arguments of the call are not part of this
 * listing. */
379 if (pa_memblockq_is_readable(s
->memblockq
) &&
380 s
->sink_input
->thread_info
.underrun_for
> 0) {
381 pa_log_debug("Requesting rewind due to end of underrun");
382 pa_sink_input_request_rewind(s
->sink_input
,
383 (size_t) (s
->sink_input
->thread_info
.underrun_for
== (uint64_t) -1 ? 0 : s
->sink_input
->thread_info
.underrun_for
),
390 /* Called from I/O thread context */
/* Attach callback of the sink input (installed as attach in session_new()):
 * creates an rtpoll item on the rtpoll of the sink so that the RTP socket of
 * the session is polled there, with rtpoll_work_cb() as work callback.
 * NOTE(review): the declarations of s and p and the lines that set the other
 * pollfd fields are not part of this listing. */
391 static void sink_input_attach(pa_sink_input
*i
) {
395 pa_sink_input_assert_ref(i
);
/* Fetch the session from the sink input; must be non-NULL. */
396 pa_assert_se(s
= i
->userdata
);
/* The session must not already own an rtpoll item. */
398 pa_assert(!s
->rtpoll_item
);
/* One pollfd slot, registered with priority PA_RTPOLL_LATE. */
399 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->thread_info
.rtpoll
, PA_RTPOLL_LATE
, 1);
/* Point that slot at the RTP socket of the session. */
401 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
402 p
->fd
= s
->rtp_context
.fd
;
/* rtpoll_work_cb() obtains the session again through the item userdata. */
406 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
407 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
410 /* Called from I/O thread context */
/* Detach callback of the sink input (installed as detach in session_new()):
 * frees the rtpoll item that sink_input_attach() created.
 * NOTE(review): the declaration of s is not part of this listing. */
411 static void sink_input_detach(pa_sink_input
*i
) {
413 pa_sink_input_assert_ref(i
);
/* Fetch the session from the sink input; must be non-NULL. */
414 pa_assert_se(s
= i
->userdata
);
/* An item must exist at this point. */
416 pa_assert(s
->rtpoll_item
);
417 pa_rtpoll_item_free(s
->rtpoll_item
);
/* Reset the pointer so that the assertion in sink_input_attach() holds on
 * the next attach. */
418 s
->rtpoll_item
= NULL
;
/* Creates a datagram socket for the address sa (length salen), configures
 * it, joins the multicast group if sa is a multicast address and binds the
 * socket to sa. Both callers, session_new() and pa__init(), treat a negative
 * return value as failure.
 * NOTE(review): several original lines are missing from this listing (the
 * assignments of af and one, the test of af against AF_INET, preprocessor
 * conditionals, the error paths and the return statements); only the visible
 * statements are described. */
421 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
422 int af
, fd
= -1, r
, one
;
425 pa_assert(salen
> 0);
/* Datagram socket of family af, created through pa_socket_cloexec(). */
428 if ((fd
= pa_socket_cloexec(af
, SOCK_DGRAM
, 0)) < 0) {
429 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
433 pa_make_udp_socket_low_delay(fd
);
/* Enable SO_TIMESTAMP on the socket.
 * NOTE(review): the second message presumably sits in a preprocessor branch
 * for platforms without SO_TIMESTAMP - confirm against the full source. */
437 if (setsockopt(fd
, SOL_SOCKET
, SO_TIMESTAMP
, &one
, sizeof(one
)) < 0) {
438 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno
));
442 pa_log("SO_TIMESTAMP unsupported on this platform");
/* Enable SO_REUSEADDR on the socket. */
447 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
448 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
454 /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
/* NOTE(review): the test below only requires the top three bits to be set,
 * so it also matches 240.0.0.0-255.255.255.255, which is wider than the
 * range stated above; comparing (addr & 0xf0000000) with 0xe0000000 would
 * match exactly that range. */
455 static const uint32_t ipv4_mcast_mask
= 0xe0000000;
457 if ((ntohl(((const struct sockaddr_in
*) sa
)->sin_addr
.s_addr
) & ipv4_mcast_mask
) == ipv4_mcast_mask
) {
/* Join the IPv4 group; every field except the group address stays zero. */
459 memset(&mr4
, 0, sizeof(mr4
));
460 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
461 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
464 } else if (af
== AF_INET6
) {
465 /* IPv6 multicast addresses have 255 as the most significant byte */
466 if (((const struct sockaddr_in6
*) sa
)->sin6_addr
.s6_addr
[0] == 0xff) {
/* Join the IPv6 group; every field except the group address stays zero. */
467 struct ipv6_mreq mr6
;
468 memset(&mr6
, 0, sizeof(mr6
));
469 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
470 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
/* Any other address family is a programming error. */
474 pa_assert_not_reached();
/* A failed join is logged at info level only. */
477 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
481 if (bind(fd
, sa
, salen
) < 0) {
482 pa_log("bind() failed: %s", pa_cstrerror(errno
));
/* Creates a session for the stream described by sdp_info: opens the RTP
 * socket, creates a sink input on the configured sink, sets up the
 * memblockq and registers the session in the by_origin map and in the
 * session list of u. The caller sap_event_cb() treats a NULL return value
 * as failure and then destroys its sdp info itself.
 * The contents of *sdp_info are copied into the session by plain struct
 * assignment, and session_free() later calls pa_sdp_info_destroy() on that
 * copy.
 * NOTE(review): many original lines are missing from this listing (local
 * declarations, the assignment of the userdata field of the session, the
 * error paths, some arguments of multi-line calls, the return statements);
 * only the visible statements are described. */
495 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
496 struct session
*s
= NULL
;
500 pa_sink_input_new_data data
;
/* Refuse to go beyond MAX_SESSIONS concurrent sessions. */
506 if (u
->n_sessions
>= MAX_SESSIONS
) {
507 pa_log("Session limit reached.");
/* Look up the sink by the name stored in u. */
511 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
512 pa_log("Sink does not exist.");
516 pa_rtclock_get(&now
);
/* Zero-initialised session object. */
518 s
= pa_xnew0(struct session
, 1);
520 s
->first_packet
= false;
521 s
->sdp_info
= *sdp_info
;
522 s
->rtpoll_item
= NULL
;
/* The rate controller starts from the configured latency and from the
 * sample rate of the sink. */
523 s
->intended_latency
= u
->latency
;
524 s
->last_rate_update
= pa_timeval_load(&now
);
525 s
->last_latency
= u
->latency
;
526 s
->estimated_rate
= (double) sink
->sample_spec
.rate
;
527 s
->avg_estimated_rate
= (double) sink
->sample_spec
.rate
;
/* Initial liveness timestamp, compared against DEATH_TIMEOUT in
 * check_death_event_cb(). */
528 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
/* Socket for the RTP stream address taken from the SDP info. */
530 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
/* Describe the sink input that is to be created. */
533 pa_sink_input_new_data_init(&data
);
534 pa_sink_input_new_data_set_sink(&data
, sink
, false);
535 data
.driver
= __FILE__
;
536 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
/* Media name; the session name is placed in parentheses when there is one.
 * NOTE(review): the format string argument is not part of this listing. */
537 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
539 sdp_info
->session_name
? " (" : "",
540 sdp_info
->session_name
? sdp_info
->session_name
: "",
541 sdp_info
->session_name
? ")" : "");
543 if (sdp_info
->session_name
)
544 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
545 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
546 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
547 data
.module
= u
->module
;
548 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
/* Variable rate is requested because rtpoll_work_cb() changes the input
 * sample rate at run time. */
549 data
.flags
= PA_SINK_INPUT_VARIABLE_RATE
;
551 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
);
552 pa_sink_input_new_data_done(&data
);
554 if (!s
->sink_input
) {
555 pa_log("Failed to create sink input.");
/* Link the sink input back to the session and install the callbacks
 * defined in this file. */
559 s
->sink_input
->userdata
= s
;
561 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
562 s
->sink_input
->pop
= sink_input_pop_cb
;
563 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
564 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
565 s
->sink_input
->kill
= sink_input_kill
;
566 s
->sink_input
->attach
= sink_input_attach
;
567 s
->sink_input
->detach
= sink_input_detach
;
568 s
->sink_input
->suspend_within_thread
= sink_input_suspend_within_thread
;
570 pa_sink_input_get_silence(s
->sink_input
, &silence
);
/* Request half of the intended latency from the sink and keep the value
 * that the call returns. */
572 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
/* Raise the intended latency to at least twice that sink latency. */
574 if (s
->intended_latency
< s
->sink_latency
*2)
575 s
->intended_latency
= s
->sink_latency
*2;
/* NOTE(review): only some arguments of pa_memblockq_new() are part of this
 * listing; the byte count below is the intended latency minus the sink
 * latency, converted with the sample spec of the sink input. */
577 s
->memblockq
= pa_memblockq_new(
578 "module-rtp-recv memblockq",
582 &s
->sink_input
->sample_spec
,
583 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
/* Drop the reference on the silence block obtained above. */
588 pa_memblock_unref(silence
.memblock
);
/* Receiving RTP context on fd, using the frame size of the SDP sample spec. */
590 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
/* Register the session under its origin string and in the session list. */
592 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
594 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
596 pa_sink_input_put(s
->sink_input
);
/* NOTE(review): session_name is checked for NULL before the proplist calls
 * above but is passed to the %s conversion here without such a check. */
598 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
/* Releases a session: unlinks and unreferences its sink input, removes it
 * from the session list of its userdata, decrements the session counter and
 * destroys the memblockq, the SDP info and the RTP context. pa__init()
 * installs this function as the value free callback of the by_origin map.
 * NOTE(review): some original lines (for example the release of the session
 * structure itself) are not part of this listing. */
611 static void session_free(struct session
*s
) {
/* NOTE(review): session_new() treats session_name as possibly NULL, yet it
 * is passed to the %s conversion here without a check. */
614 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
616 pa_sink_input_unlink(s
->sink_input
);
617 pa_sink_input_unref(s
->sink_input
);
619 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
/* The counter must cover at least this session. */
620 pa_assert(s
->userdata
->n_sessions
>= 1);
621 s
->userdata
->n_sessions
--;
623 pa_memblockq_free(s
->memblockq
);
624 pa_sdp_info_destroy(&s
->sdp_info
);
625 pa_rtp_context_destroy(&s
->rtp_context
);
/* I/O event callback for the SAP socket (registered in pa__init()).
 * Receives one SAP packet, parses the SDP data it carries and then removes
 * the matching session, creates a new one, or refreshes the liveness
 * timestamp of the existing one.
 * NOTE(review): the declarations of info, s and now, the test of goodbye
 * and the return statements are not part of this listing; the removal
 * below presumably only happens for goodbye packets - confirm against the
 * full source. */
630 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
631 struct userdata
*u
= userdata
;
632 bool goodbye
= false;
/* The event must be an input event on the SAP socket of u. */
639 pa_assert(fd
== u
->sap_context
.fd
);
640 pa_assert(flags
== PA_IO_EVENT_INPUT
);
/* Receive the packet; goodbye is filled in by the call. */
642 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
/* Parse the received SDP data into info. */
645 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
/* Remove (and thereby free) the session registered under this origin. */
649 pa_hashmap_remove_and_free(u
->by_origin
, info
.origin
);
650 pa_sdp_info_destroy(&info
);
/* Unknown origin: create a session; info is destroyed here only when the
 * creation failed. */
653 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
654 if (!session_new(u
, &info
))
655 pa_sdp_info_destroy(&info
);
/* Known origin: refresh the timestamp read by check_death_event_cb() and
 * discard the freshly parsed info. */
659 pa_rtclock_get(&now
);
660 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
662 pa_sdp_info_destroy(&info
);
/* Timer callback (created in pa__init()) that removes sessions whose
 * timestamp is more than DEATH_TIMEOUT seconds old and then re-arms the
 * timer for another DEATH_TIMEOUT seconds.
 * NOTE(review): the declarations of now and k and the assignment of n
 * inside the loop are not part of this listing. */
667 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*tv
, void *userdata
) {
668 struct session
*s
, *n
;
669 struct userdata
*u
= userdata
;
676 pa_rtclock_get(&now
);
678 pa_log_debug("Checking for dead streams ...");
/* The loop continues with n rather than with a field of s, because s may
 * be freed inside the loop body. */
680 for (s
= u
->sessions
; s
; s
= n
) {
/* Seconds value last stored by rtpoll_work_cb(), sap_event_cb() or
 * session_new(). */
684 k
= pa_atomic_load(&s
->timestamp
);
/* Timed out: removing the entry frees the session through the value free
 * callback of the map. */
686 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
)
687 pa_hashmap_remove_and_free(u
->by_origin
, s
->sdp_info
.origin
);
/* Schedule the next check. */
691 pa_core_rttime_restart(u
->module
->core
, t
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
/* Module entry point: parses the module arguments, opens the SAP socket on
 * the configured address and sets up the userdata with the SAP I/O event,
 * the by_origin map and the periodic death check.
 * NOTE(review): many original lines are missing from this listing (further
 * local declarations, the assignments of salen, the error paths, the
 * initialisation of some userdata fields, the return statements); only the
 * visible statements are described. */
694 int pa__init(pa_module
*m
) {
696 pa_modargs
*ma
= NULL
;
697 struct sockaddr_in sa4
;
699 struct sockaddr_in6 sa6
;
703 const char *sap_address
;
704 uint32_t latency_msec
;
709 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
710 pa_log("failed to parse module arguments");
/* SAP address argument, falling back to DEFAULT_SAP_ADDRESS. */
714 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
/* Try to parse the address as IPv4 first, then as IPv6; in both cases the
 * port is SAP_PORT. */
716 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
717 sa4
.sin_family
= AF_INET
;
718 sa4
.sin_port
= htons(SAP_PORT
);
719 sa
= (struct sockaddr
*) &sa4
;
722 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
723 sa6
.sin6_family
= AF_INET6
;
724 sa6
.sin6_port
= htons(SAP_PORT
);
725 sa
= (struct sockaddr
*) &sa6
;
729 pa_log("Invalid SAP address '%s'", sap_address
);
/* Latency argument in milliseconds; values outside of 1 to 300000 are
 * rejected. */
733 latency_msec
= DEFAULT_LATENCY_MSEC
;
734 if (pa_modargs_get_value_u32(ma
, "latency_msec", &latency_msec
) < 0 || latency_msec
< 1 || latency_msec
> 300000) {
735 pa_log("Invalid latency specification");
/* Socket on which SAP announcements are received. */
739 if ((fd
= mcast_socket(sa
, salen
)) < 0)
/* pa_xnew() is called here, not the zeroing variant that session_new() uses. */
742 m
->userdata
= u
= pa_xnew(struct userdata
, 1);
/* The sink name is duplicated; the latency is stored in microseconds. */
745 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
746 u
->latency
= (pa_usec_t
) latency_msec
* PA_USEC_PER_MSEC
;
/* Have the main loop call sap_event_cb() when the SAP socket has input. */
748 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
749 pa_sap_context_init_recv(&u
->sap_context
, fd
);
751 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
/* Map from origin string to session; session_free() is its value free
 * callback, so removing an entry with pa_hashmap_remove_and_free() frees
 * the session. */
753 u
->by_origin
= pa_hashmap_new_full(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
, NULL
, (pa_free_cb_t
) session_free
);
/* First death check after DEATH_TIMEOUT seconds. */
755 u
->check_death_event
= pa_core_rttime_new(m
->core
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
, check_death_event_cb
, u
);
/* Module teardown: releases what pa__init() stored in the userdata of m.
 * NOTE(review): the declaration of u, the statement taken when there is no
 * userdata, any guards around the unconditional-looking calls and the
 * release of u itself are not part of this listing. */
771 void pa__done(pa_module
*m
) {
/* Nothing was set up if the module has no userdata. */
776 if (!(u
= m
->userdata
))
/* Stop listening for SAP packets. */
780 m
->core
->mainloop
->io_free(u
->sap_event
);
/* Cancel the periodic death check if it was created. */
782 if (u
->check_death_event
)
783 m
->core
->mainloop
->time_free(u
->check_death_event
);
785 pa_sap_context_destroy(&u
->sap_context
);
/* The map was created with session_free() as its value free callback. */
788 pa_hashmap_free(u
->by_origin
);
790 pa_xfree(u
->sink_name
);