3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
58 #include "module-rtp-recv-symdef.h"
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION
);
67 PA_MODULE_LOAD_ONCE(FALSE
);
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
74 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
75 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
76 #define MAX_SESSIONS 16
77 #define DEATH_TIMEOUT 20
78 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
79 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
81 static const char* const valid_modargs
[] = {
88 struct userdata
*userdata
;
89 PA_LLIST_FIELDS(struct session
);
91 pa_sink_input
*sink_input
;
92 pa_memblockq
*memblockq
;
94 pa_bool_t first_packet
;
98 struct pa_sdp_info sdp_info
;
100 pa_rtp_context rtp_context
;
102 pa_rtpoll_item
*rtpoll_item
;
104 pa_atomic_t timestamp
;
106 pa_usec_t intended_latency
;
107 pa_usec_t sink_latency
;
109 pa_usec_t last_rate_update
;
110 pa_usec_t last_latency
;
111 double estimated_rate
;
112 double avg_estimated_rate
;
119 pa_sap_context sap_context
;
120 pa_io_event
* sap_event
;
122 pa_time_event
*check_death_event
;
126 PA_LLIST_HEAD(struct session
, sessions
);
127 pa_hashmap
*by_origin
;
131 static void session_free(struct session
*s
);
133 /* Called from I/O thread context */
134 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
135 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
139 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
141 /* Fall through, the default handler will add in the extra
142 * latency added by the resampler */
146 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
149 /* Called from I/O thread context */
150 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
152 pa_sink_input_assert_ref(i
);
153 pa_assert_se(s
= i
->userdata
);
155 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
158 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
163 /* Called from I/O thread context */
164 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
167 pa_sink_input_assert_ref(i
);
168 pa_assert_se(s
= i
->userdata
);
170 pa_memblockq_rewind(s
->memblockq
, nbytes
);
173 /* Called from I/O thread context */
174 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
177 pa_sink_input_assert_ref(i
);
178 pa_assert_se(s
= i
->userdata
);
180 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
183 /* Called from main context */
184 static void sink_input_kill(pa_sink_input
* i
) {
186 pa_sink_input_assert_ref(i
);
187 pa_assert_se(s
= i
->userdata
);
189 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
193 /* Called from IO context */
194 static void sink_input_suspend_within_thread(pa_sink_input
* i
, pa_bool_t b
) {
196 pa_sink_input_assert_ref(i
);
197 pa_assert_se(s
= i
->userdata
);
200 pa_memblockq_flush_read(s
->memblockq
);
202 s
->first_packet
= FALSE
;
205 /* Called from I/O thread context */
206 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
209 struct timeval now
= { 0, 0 };
213 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
215 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
217 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
218 pa_log("poll() signalled bad revents.");
222 if ((p
->revents
& POLLIN
) == 0)
227 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
, &now
) < 0)
230 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
||
231 !PA_SINK_IS_OPENED(s
->sink_input
->sink
->thread_info
.state
)) {
232 pa_memblock_unref(chunk
.memblock
);
236 if (!s
->first_packet
) {
237 s
->first_packet
= TRUE
;
239 s
->ssrc
= s
->rtp_context
.ssrc
;
240 s
->offset
= s
->rtp_context
.timestamp
;
242 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
243 pa_log_warn("Detected RTP packet loop!");
245 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
246 pa_memblock_unref(chunk
.memblock
);
251 /* Check whether there was a timestamp overflow */
252 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
253 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
255 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
260 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
, TRUE
);
262 if (now
.tv_sec
== 0) {
264 pa_log_warn("Using artificial time instead of timestamp");
266 pa_rtclock_get(&now
);
268 pa_rtclock_from_wallclock(&now
);
270 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
271 pa_log_warn("Queue overrun");
272 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
, TRUE
);
275 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
277 pa_memblock_unref(chunk
.memblock
);
279 /* The next timestamp we expect */
280 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
282 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
284 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
285 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
;
286 uint32_t base_rate
= s
->sink_input
->sink
->sample_spec
.rate
;
287 uint32_t current_rate
= s
->sink_input
->sample_spec
.rate
;
289 double estimated_rate
, alpha
= 0.02;
291 pa_log_debug("Updating sample rate");
293 wi
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
294 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
296 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi
, (unsigned long) ri
);
298 sink_delay
= pa_sink_get_latency_within_thread(s
->sink_input
->sink
);
299 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
301 if (ri
> render_delay
+sink_delay
)
302 ri
-= render_delay
+sink_delay
;
311 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
313 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
314 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
315 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
317 * R̂ = ─────────────── Rⁿ . (1)
320 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
321 * is correct). But there is also the requirement to keep the buffer at a predefined target
322 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
323 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
324 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
325 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
326 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
328 * Solving for Rⁿ⁺ⁱ gives
330 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
332 * In the code below a = 7 is used.
334 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
335 * of the estimated rate R̂ is used. This average R̅ is defined as
336 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
337 * Because it is difficult to find a fixed value for the coefficient α such that the
338 * averaging is without significant lag but oscillations are filtered out, a heuristic is
339 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
340 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
342 estimated_rate
= (double) current_rate
* (double) RATE_UPDATE_INTERVAL
/ (double) (RATE_UPDATE_INTERVAL
+ s
->last_latency
- latency
);
343 if (fabs(s
->estimated_rate
- s
->avg_estimated_rate
) > 1) {
344 double ratio
= (estimated_rate
+ s
->estimated_rate
- 2*s
->avg_estimated_rate
) / (s
->estimated_rate
- s
->avg_estimated_rate
);
345 alpha
= PA_CLAMP(2 * (ratio
+ fabs(ratio
)) / (4 + ratio
*ratio
), 0.02, 0.8);
347 s
->avg_estimated_rate
= alpha
* estimated_rate
+ (1-alpha
) * s
->avg_estimated_rate
;
348 s
->estimated_rate
= estimated_rate
;
349 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate
, s
->avg_estimated_rate
, alpha
);
350 new_rate
= (uint32_t) ((double) (RATE_UPDATE_INTERVAL
+ latency
/4 - s
->intended_latency
/4) / (double) RATE_UPDATE_INTERVAL
* s
->avg_estimated_rate
);
351 s
->last_latency
= latency
;
353 if (new_rate
< (uint32_t) (base_rate
*0.8) || new_rate
> (uint32_t) (base_rate
*1.25)) {
354 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate
, new_rate
);
355 new_rate
= base_rate
;
357 if (base_rate
< new_rate
+ 20 && new_rate
< base_rate
+ 20)
358 new_rate
= base_rate
;
359 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
360 if (new_rate
< (uint32_t) (current_rate
*0.998) || new_rate
> (uint32_t) (current_rate
*1.002)) {
361 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate
, current_rate
);
362 new_rate
= PA_CLAMP(new_rate
, (uint32_t) (current_rate
*0.998), (uint32_t) (current_rate
*1.002));
365 s
->sink_input
->sample_spec
.rate
= new_rate
;
367 pa_assert(pa_sample_spec_valid(&s
->sink_input
->sample_spec
));
369 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
371 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
373 s
->last_rate_update
= pa_timeval_load(&now
);
376 if (pa_memblockq_is_readable(s
->memblockq
) &&
377 s
->sink_input
->thread_info
.underrun_for
> 0) {
378 pa_log_debug("Requesting rewind due to end of underrun");
379 pa_sink_input_request_rewind(s
->sink_input
,
380 (size_t) (s
->sink_input
->thread_info
.underrun_for
== (uint64_t) -1 ? 0 : s
->sink_input
->thread_info
.underrun_for
),
387 /* Called from I/O thread context */
388 static void sink_input_attach(pa_sink_input
*i
) {
392 pa_sink_input_assert_ref(i
);
393 pa_assert_se(s
= i
->userdata
);
395 pa_assert(!s
->rtpoll_item
);
396 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->thread_info
.rtpoll
, PA_RTPOLL_LATE
, 1);
398 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
399 p
->fd
= s
->rtp_context
.fd
;
403 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
404 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
407 /* Called from I/O thread context */
408 static void sink_input_detach(pa_sink_input
*i
) {
410 pa_sink_input_assert_ref(i
);
411 pa_assert_se(s
= i
->userdata
);
413 pa_assert(s
->rtpoll_item
);
414 pa_rtpoll_item_free(s
->rtpoll_item
);
415 s
->rtpoll_item
= NULL
;
418 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
419 int af
, fd
= -1, r
, one
;
422 pa_assert(salen
> 0);
425 if ((fd
= pa_socket_cloexec(af
, SOCK_DGRAM
, 0)) < 0) {
426 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
430 pa_make_udp_socket_low_delay(fd
);
434 if (setsockopt(fd
, SOL_SOCKET
, SO_TIMESTAMP
, &one
, sizeof(one
)) < 0) {
435 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno
));
439 pa_log("SO_TIMESTAMP unsupported on this platform");
444 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
445 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
451 memset(&mr4
, 0, sizeof(mr4
));
452 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
453 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
455 } else if (af
== AF_INET6
) {
456 struct ipv6_mreq mr6
;
457 memset(&mr6
, 0, sizeof(mr6
));
458 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
459 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
462 pa_assert_not_reached();
465 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
469 if (bind(fd
, sa
, salen
) < 0) {
470 pa_log("bind() failed: %s", pa_cstrerror(errno
));
483 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
484 struct session
*s
= NULL
;
488 pa_sink_input_new_data data
;
494 if (u
->n_sessions
>= MAX_SESSIONS
) {
495 pa_log("Session limit reached.");
499 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
500 pa_log("Sink does not exist.");
504 pa_rtclock_get(&now
);
506 s
= pa_xnew0(struct session
, 1);
508 s
->first_packet
= FALSE
;
509 s
->sdp_info
= *sdp_info
;
510 s
->rtpoll_item
= NULL
;
511 s
->intended_latency
= LATENCY_USEC
;
512 s
->last_rate_update
= pa_timeval_load(&now
);
513 s
->last_latency
= LATENCY_USEC
;
514 s
->estimated_rate
= (double) sink
->sample_spec
.rate
;
515 s
->avg_estimated_rate
= (double) sink
->sample_spec
.rate
;
516 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
518 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
521 pa_sink_input_new_data_init(&data
);
522 pa_sink_input_new_data_set_sink(&data
, sink
, FALSE
);
523 data
.driver
= __FILE__
;
524 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
525 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
527 sdp_info
->session_name
? " (" : "",
528 sdp_info
->session_name
? sdp_info
->session_name
: "",
529 sdp_info
->session_name
? ")" : "");
531 if (sdp_info
->session_name
)
532 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
533 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
534 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
535 data
.module
= u
->module
;
536 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
537 data
.flags
= PA_SINK_INPUT_VARIABLE_RATE
;
539 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
);
540 pa_sink_input_new_data_done(&data
);
542 if (!s
->sink_input
) {
543 pa_log("Failed to create sink input.");
547 s
->sink_input
->userdata
= s
;
549 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
550 s
->sink_input
->pop
= sink_input_pop_cb
;
551 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
552 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
553 s
->sink_input
->kill
= sink_input_kill
;
554 s
->sink_input
->attach
= sink_input_attach
;
555 s
->sink_input
->detach
= sink_input_detach
;
556 s
->sink_input
->suspend_within_thread
= sink_input_suspend_within_thread
;
558 pa_sink_input_get_silence(s
->sink_input
, &silence
);
560 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
562 if (s
->intended_latency
< s
->sink_latency
*2)
563 s
->intended_latency
= s
->sink_latency
*2;
565 s
->memblockq
= pa_memblockq_new(
566 "module-rtp-recv memblockq",
570 &s
->sink_input
->sample_spec
,
571 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
576 pa_memblock_unref(silence
.memblock
);
578 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
580 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
582 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
584 pa_sink_input_put(s
->sink_input
);
586 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
599 static void session_free(struct session
*s
) {
602 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
604 pa_sink_input_unlink(s
->sink_input
);
605 pa_sink_input_unref(s
->sink_input
);
607 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
608 pa_assert(s
->userdata
->n_sessions
>= 1);
609 s
->userdata
->n_sessions
--;
611 pa_memblockq_free(s
->memblockq
);
612 pa_sdp_info_destroy(&s
->sdp_info
);
613 pa_rtp_context_destroy(&s
->rtp_context
);
618 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
619 struct userdata
*u
= userdata
;
620 pa_bool_t goodbye
= FALSE
;
627 pa_assert(fd
== u
->sap_context
.fd
);
628 pa_assert(flags
== PA_IO_EVENT_INPUT
);
630 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
633 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
638 if ((s
= pa_hashmap_remove(u
->by_origin
, info
.origin
)))
641 pa_sdp_info_destroy(&info
);
644 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
645 if (!session_new(u
, &info
))
646 pa_sdp_info_destroy(&info
);
650 pa_rtclock_get(&now
);
651 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
653 pa_sdp_info_destroy(&info
);
658 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*tv
, void *userdata
) {
659 struct session
*s
, *n
;
660 struct userdata
*u
= userdata
;
667 pa_rtclock_get(&now
);
669 pa_log_debug("Checking for dead streams ...");
671 for (s
= u
->sessions
; s
; s
= n
) {
675 k
= pa_atomic_load(&s
->timestamp
);
677 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
) {
678 pa_hashmap_remove(u
->by_origin
, s
->sdp_info
.origin
);
684 pa_core_rttime_restart(u
->module
->core
, t
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
687 int pa__init(pa_module
*m
) {
689 pa_modargs
*ma
= NULL
;
690 struct sockaddr_in sa4
;
692 struct sockaddr_in6 sa6
;
696 const char *sap_address
;
701 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
702 pa_log("failed to parse module arguments");
706 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
708 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
709 sa4
.sin_family
= AF_INET
;
710 sa4
.sin_port
= htons(SAP_PORT
);
711 sa
= (struct sockaddr
*) &sa4
;
714 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
715 sa6
.sin6_family
= AF_INET6
;
716 sa6
.sin6_port
= htons(SAP_PORT
);
717 sa
= (struct sockaddr
*) &sa6
;
721 pa_log("Invalid SAP address '%s'", sap_address
);
725 if ((fd
= mcast_socket(sa
, salen
)) < 0)
728 m
->userdata
= u
= pa_xnew(struct userdata
, 1);
731 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
733 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
734 pa_sap_context_init_recv(&u
->sap_context
, fd
);
736 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
738 u
->by_origin
= pa_hashmap_new(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
);
740 u
->check_death_event
= pa_core_rttime_new(m
->core
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
, check_death_event_cb
, u
);
756 void pa__done(pa_module
*m
) {
761 if (!(u
= m
->userdata
))
765 m
->core
->mainloop
->io_free(u
->sap_event
);
767 if (u
->check_death_event
)
768 m
->core
->mainloop
->time_free(u
->check_death_event
);
770 pa_sap_context_destroy(&u
->sap_context
);
773 pa_hashmap_free(u
->by_origin
, (pa_free_cb_t
) session_free
);
775 pa_xfree(u
->sink_name
);