3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
58 #include "module-rtp-recv-symdef.h"
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION
);
67 PA_MODULE_LOAD_ONCE(FALSE
);
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
74 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
75 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
76 #define MAX_SESSIONS 16
77 #define DEATH_TIMEOUT 20
78 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
79 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
81 static const char* const valid_modargs
[] = {
88 struct userdata
*userdata
;
89 PA_LLIST_FIELDS(struct session
);
91 pa_sink_input
*sink_input
;
92 pa_memblockq
*memblockq
;
94 pa_bool_t first_packet
;
98 struct pa_sdp_info sdp_info
;
100 pa_rtp_context rtp_context
;
102 pa_rtpoll_item
*rtpoll_item
;
104 pa_atomic_t timestamp
;
106 pa_usec_t intended_latency
;
107 pa_usec_t sink_latency
;
109 pa_usec_t last_rate_update
;
110 pa_usec_t last_latency
;
111 double estimated_rate
;
112 double avg_estimated_rate
;
119 pa_sap_context sap_context
;
120 pa_io_event
* sap_event
;
122 pa_time_event
*check_death_event
;
126 PA_LLIST_HEAD(struct session
, sessions
);
127 pa_hashmap
*by_origin
;
131 static void session_free(struct session
*s
);
133 /* Called from I/O thread context */
134 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
135 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
139 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
141 /* Fall through, the default handler will add in the extra
142 * latency added by the resampler */
146 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
149 /* Called from I/O thread context */
150 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
152 pa_sink_input_assert_ref(i
);
153 pa_assert_se(s
= i
->userdata
);
155 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
158 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
163 /* Called from I/O thread context */
164 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
167 pa_sink_input_assert_ref(i
);
168 pa_assert_se(s
= i
->userdata
);
170 pa_memblockq_rewind(s
->memblockq
, nbytes
);
173 /* Called from I/O thread context */
174 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
177 pa_sink_input_assert_ref(i
);
178 pa_assert_se(s
= i
->userdata
);
180 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
183 /* Called from main context */
184 static void sink_input_kill(pa_sink_input
* i
) {
186 pa_sink_input_assert_ref(i
);
187 pa_assert_se(s
= i
->userdata
);
192 /* Called from IO context */
193 static void sink_input_suspend_within_thread(pa_sink_input
* i
, pa_bool_t b
) {
195 pa_sink_input_assert_ref(i
);
196 pa_assert_se(s
= i
->userdata
);
199 pa_memblockq_flush_read(s
->memblockq
);
201 s
->first_packet
= FALSE
;
204 /* Called from I/O thread context */
205 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
208 struct timeval now
= { 0, 0 };
212 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
214 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
216 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
217 pa_log("poll() signalled bad revents.");
221 if ((p
->revents
& POLLIN
) == 0)
226 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
, &now
) < 0)
229 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
||
230 !PA_SINK_IS_OPENED(s
->sink_input
->sink
->thread_info
.state
)) {
231 pa_memblock_unref(chunk
.memblock
);
235 if (!s
->first_packet
) {
236 s
->first_packet
= TRUE
;
238 s
->ssrc
= s
->rtp_context
.ssrc
;
239 s
->offset
= s
->rtp_context
.timestamp
;
241 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
242 pa_log_warn("Detected RTP packet loop!");
244 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
245 pa_memblock_unref(chunk
.memblock
);
250 /* Check whether there was a timestamp overflow */
251 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
252 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
254 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
259 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
, TRUE
);
261 if (now
.tv_sec
== 0) {
263 pa_log_warn("Using artificial time instead of timestamp");
265 pa_rtclock_get(&now
);
267 pa_rtclock_from_wallclock(&now
);
269 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
270 pa_log_warn("Queue overrun");
271 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
, TRUE
);
274 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
276 pa_memblock_unref(chunk
.memblock
);
278 /* The next timestamp we expect */
279 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
281 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
283 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
284 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
;
285 uint32_t base_rate
= s
->sink_input
->sink
->sample_spec
.rate
;
286 uint32_t current_rate
= s
->sink_input
->sample_spec
.rate
;
288 double estimated_rate
, alpha
= 0.02;
290 pa_log_debug("Updating sample rate");
292 wi
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
293 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
295 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi
, (unsigned long) ri
);
297 sink_delay
= pa_sink_get_latency_within_thread(s
->sink_input
->sink
);
298 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
300 if (ri
> render_delay
+sink_delay
)
301 ri
-= render_delay
+sink_delay
;
310 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
312 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
313 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
314 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
316 * R̂ = ─────────────── Rⁿ . (1)
319 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
320 * is correct). But there is also the requirement to keep the buffer at a predefined target
321 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
322 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
323 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
324 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
325 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
327 * Solving for Rⁿ⁺ⁱ gives
329 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
331 * In the code below a = 7 is used.
333 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
334 * of the estimated rate R̂ is used. This average R̅ is defined as
335 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
336 * Because it is difficult to find a fixed value for the coefficient α such that the
337 * averaging is without significant lag but oscillations are filtered out, a heuristic is
338 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
339 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
341 estimated_rate
= (double) current_rate
* (double) RATE_UPDATE_INTERVAL
/ (double) (RATE_UPDATE_INTERVAL
+ s
->last_latency
- latency
);
342 if (fabs(s
->estimated_rate
- s
->avg_estimated_rate
) > 1) {
343 double ratio
= (estimated_rate
+ s
->estimated_rate
- 2*s
->avg_estimated_rate
) / (s
->estimated_rate
- s
->avg_estimated_rate
);
344 alpha
= PA_CLAMP(2 * (ratio
+ fabs(ratio
)) / (4 + ratio
*ratio
), 0.02, 0.8);
346 s
->avg_estimated_rate
= alpha
* estimated_rate
+ (1-alpha
) * s
->avg_estimated_rate
;
347 s
->estimated_rate
= estimated_rate
;
348 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate
, s
->avg_estimated_rate
, alpha
);
349 new_rate
= (uint32_t) ((double) (RATE_UPDATE_INTERVAL
+ latency
/4 - s
->intended_latency
/4) / (double) RATE_UPDATE_INTERVAL
* s
->avg_estimated_rate
);
350 s
->last_latency
= latency
;
352 if (new_rate
< (uint32_t) (base_rate
*0.8) || new_rate
> (uint32_t) (base_rate
*1.25)) {
353 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate
, new_rate
);
354 new_rate
= base_rate
;
356 if (base_rate
< new_rate
+ 20 && new_rate
< base_rate
+ 20)
357 new_rate
= base_rate
;
358 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
359 if (new_rate
< (uint32_t) (current_rate
*0.998) || new_rate
> (uint32_t) (current_rate
*1.002)) {
360 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate
, current_rate
);
361 new_rate
= PA_CLAMP(new_rate
, (uint32_t) (current_rate
*0.998), (uint32_t) (current_rate
*1.002));
364 s
->sink_input
->sample_spec
.rate
= new_rate
;
366 pa_assert(pa_sample_spec_valid(&s
->sink_input
->sample_spec
));
368 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
370 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
372 s
->last_rate_update
= pa_timeval_load(&now
);
375 if (pa_memblockq_is_readable(s
->memblockq
) &&
376 s
->sink_input
->thread_info
.underrun_for
> 0) {
377 pa_log_debug("Requesting rewind due to end of underrun");
378 pa_sink_input_request_rewind(s
->sink_input
,
379 (size_t) (s
->sink_input
->thread_info
.underrun_for
== (uint64_t) -1 ? 0 : s
->sink_input
->thread_info
.underrun_for
),
386 /* Called from I/O thread context */
387 static void sink_input_attach(pa_sink_input
*i
) {
391 pa_sink_input_assert_ref(i
);
392 pa_assert_se(s
= i
->userdata
);
394 pa_assert(!s
->rtpoll_item
);
395 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->thread_info
.rtpoll
, PA_RTPOLL_LATE
, 1);
397 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
398 p
->fd
= s
->rtp_context
.fd
;
402 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
403 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
406 /* Called from I/O thread context */
407 static void sink_input_detach(pa_sink_input
*i
) {
409 pa_sink_input_assert_ref(i
);
410 pa_assert_se(s
= i
->userdata
);
412 pa_assert(s
->rtpoll_item
);
413 pa_rtpoll_item_free(s
->rtpoll_item
);
414 s
->rtpoll_item
= NULL
;
417 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
418 int af
, fd
= -1, r
, one
;
421 pa_assert(salen
> 0);
424 if ((fd
= pa_socket_cloexec(af
, SOCK_DGRAM
, 0)) < 0) {
425 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
429 pa_make_udp_socket_low_delay(fd
);
433 if (setsockopt(fd
, SOL_SOCKET
, SO_TIMESTAMP
, &one
, sizeof(one
)) < 0) {
434 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno
));
438 pa_log("SO_TIMESTAMP unsupported on this platform");
443 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
444 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
450 memset(&mr4
, 0, sizeof(mr4
));
451 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
452 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
455 struct ipv6_mreq mr6
;
456 memset(&mr6
, 0, sizeof(mr6
));
457 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
458 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
463 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
467 if (bind(fd
, sa
, salen
) < 0) {
468 pa_log("bind() failed: %s", pa_cstrerror(errno
));
481 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
482 struct session
*s
= NULL
;
486 pa_sink_input_new_data data
;
492 if (u
->n_sessions
>= MAX_SESSIONS
) {
493 pa_log("Session limit reached.");
497 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
498 pa_log("Sink does not exist.");
502 pa_rtclock_get(&now
);
504 s
= pa_xnew0(struct session
, 1);
506 s
->first_packet
= FALSE
;
507 s
->sdp_info
= *sdp_info
;
508 s
->rtpoll_item
= NULL
;
509 s
->intended_latency
= LATENCY_USEC
;
510 s
->last_rate_update
= pa_timeval_load(&now
);
511 s
->last_latency
= LATENCY_USEC
;
512 s
->estimated_rate
= (double) sink
->sample_spec
.rate
;
513 s
->avg_estimated_rate
= (double) sink
->sample_spec
.rate
;
514 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
516 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
519 pa_sink_input_new_data_init(&data
);
520 pa_sink_input_new_data_set_sink(&data
, sink
, FALSE
);
521 data
.driver
= __FILE__
;
522 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
523 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
525 sdp_info
->session_name
? " (" : "",
526 sdp_info
->session_name
? sdp_info
->session_name
: "",
527 sdp_info
->session_name
? ")" : "");
529 if (sdp_info
->session_name
)
530 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
531 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
532 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
533 data
.module
= u
->module
;
534 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
535 data
.flags
= PA_SINK_INPUT_VARIABLE_RATE
;
537 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
);
538 pa_sink_input_new_data_done(&data
);
540 if (!s
->sink_input
) {
541 pa_log("Failed to create sink input.");
545 s
->sink_input
->userdata
= s
;
547 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
548 s
->sink_input
->pop
= sink_input_pop_cb
;
549 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
550 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
551 s
->sink_input
->kill
= sink_input_kill
;
552 s
->sink_input
->attach
= sink_input_attach
;
553 s
->sink_input
->detach
= sink_input_detach
;
554 s
->sink_input
->suspend_within_thread
= sink_input_suspend_within_thread
;
556 pa_sink_input_get_silence(s
->sink_input
, &silence
);
558 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
560 if (s
->intended_latency
< s
->sink_latency
*2)
561 s
->intended_latency
= s
->sink_latency
*2;
563 s
->memblockq
= pa_memblockq_new(
564 "module-rtp-recv memblockq",
568 &s
->sink_input
->sample_spec
,
569 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
574 pa_memblock_unref(silence
.memblock
);
576 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
578 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
580 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
582 pa_sink_input_put(s
->sink_input
);
584 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
597 static void session_free(struct session
*s
) {
600 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
602 pa_sink_input_unlink(s
->sink_input
);
603 pa_sink_input_unref(s
->sink_input
);
605 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
606 pa_assert(s
->userdata
->n_sessions
>= 1);
607 s
->userdata
->n_sessions
--;
608 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
610 pa_memblockq_free(s
->memblockq
);
611 pa_sdp_info_destroy(&s
->sdp_info
);
612 pa_rtp_context_destroy(&s
->rtp_context
);
617 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
618 struct userdata
*u
= userdata
;
619 pa_bool_t goodbye
= FALSE
;
626 pa_assert(fd
== u
->sap_context
.fd
);
627 pa_assert(flags
== PA_IO_EVENT_INPUT
);
629 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
632 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
637 if ((s
= pa_hashmap_get(u
->by_origin
, info
.origin
)))
640 pa_sdp_info_destroy(&info
);
643 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
644 if (!session_new(u
, &info
))
645 pa_sdp_info_destroy(&info
);
649 pa_rtclock_get(&now
);
650 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
652 pa_sdp_info_destroy(&info
);
657 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*tv
, void *userdata
) {
658 struct session
*s
, *n
;
659 struct userdata
*u
= userdata
;
666 pa_rtclock_get(&now
);
668 pa_log_debug("Checking for dead streams ...");
670 for (s
= u
->sessions
; s
; s
= n
) {
674 k
= pa_atomic_load(&s
->timestamp
);
676 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
)
681 pa_core_rttime_restart(u
->module
->core
, t
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
684 int pa__init(pa_module
*m
) {
686 pa_modargs
*ma
= NULL
;
687 struct sockaddr_in sa4
;
689 struct sockaddr_in6 sa6
;
693 const char *sap_address
;
698 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
699 pa_log("failed to parse module arguments");
703 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
705 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
706 sa4
.sin_family
= AF_INET
;
707 sa4
.sin_port
= htons(SAP_PORT
);
708 sa
= (struct sockaddr
*) &sa4
;
711 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
712 sa6
.sin6_family
= AF_INET6
;
713 sa6
.sin6_port
= htons(SAP_PORT
);
714 sa
= (struct sockaddr
*) &sa6
;
718 pa_log("Invalid SAP address '%s'", sap_address
);
722 if ((fd
= mcast_socket(sa
, salen
)) < 0)
725 m
->userdata
= u
= pa_xnew(struct userdata
, 1);
728 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
730 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
731 pa_sap_context_init_recv(&u
->sap_context
, fd
);
733 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
735 u
->by_origin
= pa_hashmap_new(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
);
737 u
->check_death_event
= pa_core_rttime_new(m
->core
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
, check_death_event_cb
, u
);
753 void pa__done(pa_module
*m
) {
759 if (!(u
= m
->userdata
))
763 m
->core
->mainloop
->io_free(u
->sap_event
);
765 if (u
->check_death_event
)
766 m
->core
->mainloop
->time_free(u
->check_death_event
);
768 pa_sap_context_destroy(&u
->sap_context
);
771 while ((s
= pa_hashmap_first(u
->by_origin
)))
774 pa_hashmap_free(u
->by_origin
, NULL
, NULL
);
777 pa_xfree(u
->sink_name
);