3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
58 #include "module-rtp-recv-symdef.h"
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION
);
67 PA_MODULE_LOAD_ONCE(false);
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
71 "latency_msec=<latency in ms> "
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define DEFAULT_LATENCY_MSEC 500
77 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
78 #define MAX_SESSIONS 16
79 #define DEATH_TIMEOUT 20
80 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
82 static const char* const valid_modargs
[] = {
90 struct userdata
*userdata
;
91 PA_LLIST_FIELDS(struct session
);
93 pa_sink_input
*sink_input
;
94 pa_memblockq
*memblockq
;
100 struct pa_sdp_info sdp_info
;
102 pa_rtp_context rtp_context
;
104 pa_rtpoll_item
*rtpoll_item
;
106 pa_atomic_t timestamp
;
108 pa_usec_t intended_latency
;
109 pa_usec_t sink_latency
;
111 pa_usec_t last_rate_update
;
112 pa_usec_t last_latency
;
113 double estimated_rate
;
114 double avg_estimated_rate
;
121 pa_sap_context sap_context
;
122 pa_io_event
* sap_event
;
124 pa_time_event
*check_death_event
;
128 PA_LLIST_HEAD(struct session
, sessions
);
129 pa_hashmap
*by_origin
;
135 static void session_free(struct session
*s
);
137 /* Called from I/O thread context */
138 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
139 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
142 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
143 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
145 /* Fall through, the default handler will add in the extra
146 * latency added by the resampler */
150 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
153 /* Called from I/O thread context */
154 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
156 pa_sink_input_assert_ref(i
);
157 pa_assert_se(s
= i
->userdata
);
159 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
162 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
167 /* Called from I/O thread context */
168 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
171 pa_sink_input_assert_ref(i
);
172 pa_assert_se(s
= i
->userdata
);
174 pa_memblockq_rewind(s
->memblockq
, nbytes
);
177 /* Called from I/O thread context */
178 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
181 pa_sink_input_assert_ref(i
);
182 pa_assert_se(s
= i
->userdata
);
184 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
187 /* Called from main context */
188 static void sink_input_kill(pa_sink_input
* i
) {
190 pa_sink_input_assert_ref(i
);
191 pa_assert_se(s
= i
->userdata
);
193 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
197 /* Called from I/O thread context */
198 static void sink_input_suspend_within_thread(pa_sink_input
* i
, bool b
) {
200 pa_sink_input_assert_ref(i
);
201 pa_assert_se(s
= i
->userdata
);
204 pa_memblockq_flush_read(s
->memblockq
);
206 s
->first_packet
= false;
209 /* Called from I/O thread context */
210 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
213 struct timeval now
= { 0, 0 };
217 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
219 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
221 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
222 pa_log("poll() signalled bad revents.");
226 if ((p
->revents
& POLLIN
) == 0)
231 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
, &now
) < 0)
234 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
||
235 !PA_SINK_IS_OPENED(s
->sink_input
->sink
->thread_info
.state
)) {
236 pa_memblock_unref(chunk
.memblock
);
240 if (!s
->first_packet
) {
241 s
->first_packet
= true;
243 s
->ssrc
= s
->rtp_context
.ssrc
;
244 s
->offset
= s
->rtp_context
.timestamp
;
246 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
247 pa_log_warn("Detected RTP packet loop!");
249 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
250 pa_memblock_unref(chunk
.memblock
);
255 /* Check whether there was a timestamp overflow */
256 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
257 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
259 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
264 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
, true);
266 if (now
.tv_sec
== 0) {
268 pa_log_warn("Using artificial time instead of timestamp");
270 pa_rtclock_get(&now
);
272 pa_rtclock_from_wallclock(&now
);
274 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
275 pa_log_warn("Queue overrun");
276 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
, true);
279 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
281 pa_memblock_unref(chunk
.memblock
);
283 /* The next timestamp we expect */
284 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
286 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
288 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
289 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
;
290 uint32_t base_rate
= s
->sink_input
->sink
->sample_spec
.rate
;
291 uint32_t current_rate
= s
->sink_input
->sample_spec
.rate
;
293 double estimated_rate
, alpha
= 0.02;
295 pa_log_debug("Updating sample rate");
297 wi
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
298 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
300 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi
, (unsigned long) ri
);
302 sink_delay
= pa_sink_get_latency_within_thread(s
->sink_input
->sink
);
303 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
305 if (ri
> render_delay
+sink_delay
)
306 ri
-= render_delay
+sink_delay
;
315 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
317 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
318 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
319 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
321 *     \hat{R} = \frac{T}{T - (L^{n} - L^{n-1})} R^{n} .    (1)
324 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
325 * is correct). But there is also the requirement to keep the buffer at a predefined target
326 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
327 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
328 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
329 *     \sum_{j=1}^{a} T \frac{\hat{R} - R^{n+j}}{\hat{R}} = \hat{L} - L^{n}
330 *     with   R^{n+j} = \frac{a-j+1}{a} R^{n+1} + \frac{j-1}{a} \hat{R} .
332 * Solving for Rⁿ⁺ⁱ gives
334 *     R^{n+1} = \frac{T - \frac{2}{a+1} (\hat{L} - L^{n})}{T} \hat{R} .    (2)
336 * In the code below a = 7 is used.
338 * Equation (1) is not directly used in (2), but instead an exponentially weighted average
339 * of the estimated rate R̂ is used. This average R̅ is defined as
340 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
341 * Because it is difficult to find a fixed value for the coefficient α such that the
342 * averaging is without significant lag but oscillations are filtered out, a heuristic is
343 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
344 * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
345 */
346 estimated_rate
= (double) current_rate
* (double) RATE_UPDATE_INTERVAL
/ (double) (RATE_UPDATE_INTERVAL
+ s
->last_latency
- latency
);
347 if (fabs(s
->estimated_rate
- s
->avg_estimated_rate
) > 1) {
348 double ratio
= (estimated_rate
+ s
->estimated_rate
- 2*s
->avg_estimated_rate
) / (s
->estimated_rate
- s
->avg_estimated_rate
);
349 alpha
= PA_CLAMP(2 * (ratio
+ fabs(ratio
)) / (4 + ratio
*ratio
), 0.02, 0.8);
351 s
->avg_estimated_rate
= alpha
* estimated_rate
+ (1-alpha
) * s
->avg_estimated_rate
;
352 s
->estimated_rate
= estimated_rate
;
353 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate
, s
->avg_estimated_rate
, alpha
);
354 new_rate
= (uint32_t) ((double) (RATE_UPDATE_INTERVAL
+ latency
/4 - s
->intended_latency
/4) / (double) RATE_UPDATE_INTERVAL
* s
->avg_estimated_rate
);
355 s
->last_latency
= latency
;
357 if (new_rate
< (uint32_t) (base_rate
*0.8) || new_rate
> (uint32_t) (base_rate
*1.25)) {
358 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate
, new_rate
);
359 new_rate
= base_rate
;
361 if (base_rate
< new_rate
+ 20 && new_rate
< base_rate
+ 20)
362 new_rate
= base_rate
;
363 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
364 if (new_rate
< (uint32_t) (current_rate
*0.998) || new_rate
> (uint32_t) (current_rate
*1.002)) {
365 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate
, current_rate
);
366 new_rate
= PA_CLAMP(new_rate
, (uint32_t) (current_rate
*0.998), (uint32_t) (current_rate
*1.002));
369 s
->sink_input
->sample_spec
.rate
= new_rate
;
371 pa_assert(pa_sample_spec_valid(&s
->sink_input
->sample_spec
));
373 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
375 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
377 s
->last_rate_update
= pa_timeval_load(&now
);
380 if (pa_memblockq_is_readable(s
->memblockq
) &&
381 s
->sink_input
->thread_info
.underrun_for
> 0) {
382 pa_log_debug("Requesting rewind due to end of underrun");
383 pa_sink_input_request_rewind(s
->sink_input
,
384 (size_t) (s
->sink_input
->thread_info
.underrun_for
== (uint64_t) -1 ? 0 : s
->sink_input
->thread_info
.underrun_for
),
391 /* Called from I/O thread context */
392 static void sink_input_attach(pa_sink_input
*i
) {
396 pa_sink_input_assert_ref(i
);
397 pa_assert_se(s
= i
->userdata
);
399 pa_assert(!s
->rtpoll_item
);
400 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->thread_info
.rtpoll
, PA_RTPOLL_LATE
, 1);
402 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
403 p
->fd
= s
->rtp_context
.fd
;
407 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
408 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
411 /* Called from I/O thread context */
412 static void sink_input_detach(pa_sink_input
*i
) {
414 pa_sink_input_assert_ref(i
);
415 pa_assert_se(s
= i
->userdata
);
417 pa_assert(s
->rtpoll_item
);
418 pa_rtpoll_item_free(s
->rtpoll_item
);
419 s
->rtpoll_item
= NULL
;
422 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
423 int af
, fd
= -1, r
, one
;
426 pa_assert(salen
> 0);
429 if ((fd
= pa_socket_cloexec(af
, SOCK_DGRAM
, 0)) < 0) {
430 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
434 pa_make_udp_socket_low_delay(fd
);
438 if (setsockopt(fd
, SOL_SOCKET
, SO_TIMESTAMP
, &one
, sizeof(one
)) < 0) {
439 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno
));
443 pa_log("SO_TIMESTAMP unsupported on this platform");
448 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
449 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
455 memset(&mr4
, 0, sizeof(mr4
));
456 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
457 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
459 } else if (af
== AF_INET6
) {
460 struct ipv6_mreq mr6
;
461 memset(&mr6
, 0, sizeof(mr6
));
462 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
463 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
466 pa_assert_not_reached();
469 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
473 if (bind(fd
, sa
, salen
) < 0) {
474 pa_log("bind() failed: %s", pa_cstrerror(errno
));
487 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
488 struct session
*s
= NULL
;
492 pa_sink_input_new_data data
;
498 if (u
->n_sessions
>= MAX_SESSIONS
) {
499 pa_log("Session limit reached.");
503 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
504 pa_log("Sink does not exist.");
508 pa_rtclock_get(&now
);
510 s
= pa_xnew0(struct session
, 1);
512 s
->first_packet
= false;
513 s
->sdp_info
= *sdp_info
;
514 s
->rtpoll_item
= NULL
;
515 s
->intended_latency
= u
->latency
;
516 s
->last_rate_update
= pa_timeval_load(&now
);
517 s
->last_latency
= u
->latency
;
518 s
->estimated_rate
= (double) sink
->sample_spec
.rate
;
519 s
->avg_estimated_rate
= (double) sink
->sample_spec
.rate
;
520 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
522 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
525 pa_sink_input_new_data_init(&data
);
526 pa_sink_input_new_data_set_sink(&data
, sink
, false);
527 data
.driver
= __FILE__
;
528 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
529 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
531 sdp_info
->session_name
? " (" : "",
532 sdp_info
->session_name
? sdp_info
->session_name
: "",
533 sdp_info
->session_name
? ")" : "");
535 if (sdp_info
->session_name
)
536 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
537 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
538 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
539 data
.module
= u
->module
;
540 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
541 data
.flags
= PA_SINK_INPUT_VARIABLE_RATE
;
543 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
);
544 pa_sink_input_new_data_done(&data
);
546 if (!s
->sink_input
) {
547 pa_log("Failed to create sink input.");
551 s
->sink_input
->userdata
= s
;
553 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
554 s
->sink_input
->pop
= sink_input_pop_cb
;
555 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
556 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
557 s
->sink_input
->kill
= sink_input_kill
;
558 s
->sink_input
->attach
= sink_input_attach
;
559 s
->sink_input
->detach
= sink_input_detach
;
560 s
->sink_input
->suspend_within_thread
= sink_input_suspend_within_thread
;
562 pa_sink_input_get_silence(s
->sink_input
, &silence
);
564 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
566 if (s
->intended_latency
< s
->sink_latency
*2)
567 s
->intended_latency
= s
->sink_latency
*2;
569 s
->memblockq
= pa_memblockq_new(
570 "module-rtp-recv memblockq",
574 &s
->sink_input
->sample_spec
,
575 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
580 pa_memblock_unref(silence
.memblock
);
582 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
584 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
586 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
588 pa_sink_input_put(s
->sink_input
);
590 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
603 static void session_free(struct session
*s
) {
606 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
608 pa_sink_input_unlink(s
->sink_input
);
609 pa_sink_input_unref(s
->sink_input
);
611 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
612 pa_assert(s
->userdata
->n_sessions
>= 1);
613 s
->userdata
->n_sessions
--;
615 pa_memblockq_free(s
->memblockq
);
616 pa_sdp_info_destroy(&s
->sdp_info
);
617 pa_rtp_context_destroy(&s
->rtp_context
);
622 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
623 struct userdata
*u
= userdata
;
624 bool goodbye
= false;
631 pa_assert(fd
== u
->sap_context
.fd
);
632 pa_assert(flags
== PA_IO_EVENT_INPUT
);
634 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
637 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
642 if ((s
= pa_hashmap_remove(u
->by_origin
, info
.origin
)))
645 pa_sdp_info_destroy(&info
);
648 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
649 if (!session_new(u
, &info
))
650 pa_sdp_info_destroy(&info
);
654 pa_rtclock_get(&now
);
655 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
657 pa_sdp_info_destroy(&info
);
662 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*tv
, void *userdata
) {
663 struct session
*s
, *n
;
664 struct userdata
*u
= userdata
;
671 pa_rtclock_get(&now
);
673 pa_log_debug("Checking for dead streams ...");
675 for (s
= u
->sessions
; s
; s
= n
) {
679 k
= pa_atomic_load(&s
->timestamp
);
681 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
) {
682 pa_hashmap_remove(u
->by_origin
, s
->sdp_info
.origin
);
688 pa_core_rttime_restart(u
->module
->core
, t
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
691 int pa__init(pa_module
*m
) {
693 pa_modargs
*ma
= NULL
;
694 struct sockaddr_in sa4
;
696 struct sockaddr_in6 sa6
;
700 const char *sap_address
;
701 uint32_t latency_msec
;
706 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
707 pa_log("failed to parse module arguments");
711 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
713 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
714 sa4
.sin_family
= AF_INET
;
715 sa4
.sin_port
= htons(SAP_PORT
);
716 sa
= (struct sockaddr
*) &sa4
;
719 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
720 sa6
.sin6_family
= AF_INET6
;
721 sa6
.sin6_port
= htons(SAP_PORT
);
722 sa
= (struct sockaddr
*) &sa6
;
726 pa_log("Invalid SAP address '%s'", sap_address
);
730 latency_msec
= DEFAULT_LATENCY_MSEC
;
731 if (pa_modargs_get_value_u32(ma
, "latency_msec", &latency_msec
) < 0 || latency_msec
< 1 || latency_msec
> 300000) {
732 pa_log("Invalid latency specification");
736 if ((fd
= mcast_socket(sa
, salen
)) < 0)
739 m
->userdata
= u
= pa_xnew(struct userdata
, 1);
742 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
743 u
->latency
= (pa_usec_t
) latency_msec
* PA_USEC_PER_MSEC
;
745 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
746 pa_sap_context_init_recv(&u
->sap_context
, fd
);
748 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
750 u
->by_origin
= pa_hashmap_new_full(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
, NULL
, (pa_free_cb_t
) session_free
);
752 u
->check_death_event
= pa_core_rttime_new(m
->core
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
, check_death_event_cb
, u
);
768 void pa__done(pa_module
*m
) {
773 if (!(u
= m
->userdata
))
777 m
->core
->mainloop
->io_free(u
->sap_event
);
779 if (u
->check_death_event
)
780 m
->core
->mainloop
->time_free(u
->check_death_event
);
782 pa_sap_context_destroy(&u
->sap_context
);
785 pa_hashmap_free(u
->by_origin
);
787 pa_xfree(u
->sink_name
);