3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
59 #include "module-rtp-recv-symdef.h"
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION
);
68 PA_MODULE_LOAD_ONCE(FALSE
);
70 "sink=<name of the sink> "
71 "sap_address=<multicast address to listen on> "
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
82 static const char* const valid_modargs
[] = {
89 struct userdata
*userdata
;
90 PA_LLIST_FIELDS(struct session
);
92 pa_sink_input
*sink_input
;
93 pa_memblockq
*memblockq
;
95 pa_bool_t first_packet
;
99 struct pa_sdp_info sdp_info
;
101 pa_rtp_context rtp_context
;
103 pa_rtpoll_item
*rtpoll_item
;
105 pa_atomic_t timestamp
;
107 pa_smoother
*smoother
;
108 pa_usec_t intended_latency
;
109 pa_usec_t sink_latency
;
111 pa_usec_t last_rate_update
;
112 pa_usec_t last_latency
;
119 pa_sap_context sap_context
;
120 pa_io_event
* sap_event
;
122 pa_time_event
*check_death_event
;
126 PA_LLIST_HEAD(struct session
, sessions
);
127 pa_hashmap
*by_origin
;
131 static void session_free(struct session
*s
);
133 /* Called from I/O thread context */
134 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
135 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
139 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
141 /* Fall through, the default handler will add in the extra
142 * latency added by the resampler */
146 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
149 /* Called from I/O thread context */
150 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
152 pa_sink_input_assert_ref(i
);
153 pa_assert_se(s
= i
->userdata
);
155 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
158 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
163 /* Called from I/O thread context */
164 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
167 pa_sink_input_assert_ref(i
);
168 pa_assert_se(s
= i
->userdata
);
170 pa_memblockq_rewind(s
->memblockq
, nbytes
);
173 /* Called from I/O thread context */
174 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
177 pa_sink_input_assert_ref(i
);
178 pa_assert_se(s
= i
->userdata
);
180 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
183 /* Called from main context */
184 static void sink_input_kill(pa_sink_input
* i
) {
186 pa_sink_input_assert_ref(i
);
187 pa_assert_se(s
= i
->userdata
);
192 /* Called from IO context */
193 static void sink_input_suspend_within_thread(pa_sink_input
* i
, pa_bool_t b
) {
195 pa_sink_input_assert_ref(i
);
196 pa_assert_se(s
= i
->userdata
);
199 pa_smoother_pause(s
->smoother
, pa_rtclock_now());
200 pa_memblockq_flush_read(s
->memblockq
);
202 s
->first_packet
= FALSE
;
205 /* Called from I/O thread context */
206 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
209 struct timeval now
= { 0, 0 };
213 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
215 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
217 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
218 pa_log("poll() signalled bad revents.");
222 if ((p
->revents
& POLLIN
) == 0)
227 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
, &now
) < 0)
230 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
||
231 !PA_SINK_IS_OPENED(s
->sink_input
->sink
->thread_info
.state
)) {
232 pa_memblock_unref(chunk
.memblock
);
236 if (!s
->first_packet
) {
237 s
->first_packet
= TRUE
;
239 s
->ssrc
= s
->rtp_context
.ssrc
;
240 s
->offset
= s
->rtp_context
.timestamp
;
242 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
243 pa_log_warn("Detected RTP packet loop!");
245 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
246 pa_memblock_unref(chunk
.memblock
);
251 /* Check whether there was a timestamp overflow */
252 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
253 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
255 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
260 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
, TRUE
);
262 if (now
.tv_sec
== 0) {
264 pa_log_warn("Using artificial time instead of timestamp");
266 pa_rtclock_get(&now
);
268 pa_rtclock_from_wallclock(&now
);
270 pa_smoother_put(s
->smoother
, pa_timeval_load(&now
), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
));
272 /* Tell the smoother that we are rolling now, in case it is still paused */
273 pa_smoother_resume(s
->smoother
, pa_timeval_load(&now
), TRUE
);
275 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
276 pa_log_warn("Queue overrun");
277 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
, TRUE
);
280 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
282 pa_memblock_unref(chunk
.memblock
);
284 /* The next timestamp we expect */
285 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
287 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
289 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
290 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
;
291 uint32_t base_rate
= s
->sink_input
->sink
->sample_spec
.rate
;
292 uint32_t current_rate
= s
->sink_input
->sample_spec
.rate
;
294 double estimated_rate
;
296 pa_log_debug("Updating sample rate");
298 wi
= pa_smoother_get(s
->smoother
, pa_timeval_load(&now
));
299 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
301 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi
, (unsigned long) ri
);
303 sink_delay
= pa_sink_get_latency_within_thread(s
->sink_input
->sink
);
304 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
306 if (ri
> render_delay
+sink_delay
)
307 ri
-= render_delay
+sink_delay
;
316 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
318 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
319 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
320 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
322 * R̂ = ─────────────── Rⁿ . (1)
325 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
326 * is correct). But there is also the requirement to keep the buffer at a predefined target
327 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
328 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
329 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements
330 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1
331 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
333 * Solving for Rⁿ⁺ⁱ gives
335 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2)
337 * Together Equations (1) and (2) specify the algorithm used below, where a = 7 is used.
339 estimated_rate
= (double) current_rate
* (double) RATE_UPDATE_INTERVAL
/ (double) (RATE_UPDATE_INTERVAL
+ s
->last_latency
- latency
);
340 pa_log_debug("Estimated target rate: %.0f Hz", estimated_rate
);
341 new_rate
= (uint32_t) ((double) (RATE_UPDATE_INTERVAL
+ latency
/4 - s
->intended_latency
/4) / (double) RATE_UPDATE_INTERVAL
* estimated_rate
);
342 s
->last_latency
= latency
;
344 if (new_rate
< (uint32_t) (base_rate
*0.8) || new_rate
> (uint32_t) (base_rate
*1.25)) {
345 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate
, new_rate
);
346 new_rate
= base_rate
;
348 if (base_rate
< new_rate
+ 20 && new_rate
< base_rate
+ 20)
349 new_rate
= base_rate
;
350 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
351 if (new_rate
< (uint32_t) (current_rate
*0.998) || new_rate
> (uint32_t) (current_rate
*1.002)) {
352 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate
, current_rate
);
353 new_rate
= PA_CLAMP(new_rate
, (uint32_t) (current_rate
*0.998), (uint32_t) (current_rate
*1.002));
356 s
->sink_input
->sample_spec
.rate
= new_rate
;
358 pa_assert(pa_sample_spec_valid(&s
->sink_input
->sample_spec
));
360 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
362 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
364 s
->last_rate_update
= pa_timeval_load(&now
);
367 if (pa_memblockq_is_readable(s
->memblockq
) &&
368 s
->sink_input
->thread_info
.underrun_for
> 0) {
369 pa_log_debug("Requesting rewind due to end of underrun");
370 pa_sink_input_request_rewind(s
->sink_input
, 0, FALSE
, TRUE
, FALSE
);
376 /* Called from I/O thread context */
377 static void sink_input_attach(pa_sink_input
*i
) {
381 pa_sink_input_assert_ref(i
);
382 pa_assert_se(s
= i
->userdata
);
384 pa_assert(!s
->rtpoll_item
);
385 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->thread_info
.rtpoll
, PA_RTPOLL_LATE
, 1);
387 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
388 p
->fd
= s
->rtp_context
.fd
;
392 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
393 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
396 /* Called from I/O thread context */
397 static void sink_input_detach(pa_sink_input
*i
) {
399 pa_sink_input_assert_ref(i
);
400 pa_assert_se(s
= i
->userdata
);
402 pa_assert(s
->rtpoll_item
);
403 pa_rtpoll_item_free(s
->rtpoll_item
);
404 s
->rtpoll_item
= NULL
;
407 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
408 int af
, fd
= -1, r
, one
;
411 pa_assert(salen
> 0);
414 if ((fd
= pa_socket_cloexec(af
, SOCK_DGRAM
, 0)) < 0) {
415 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
419 pa_make_udp_socket_low_delay(fd
);
422 if (setsockopt(fd
, SOL_SOCKET
, SO_TIMESTAMP
, &one
, sizeof(one
)) < 0) {
423 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno
));
428 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
429 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
435 memset(&mr4
, 0, sizeof(mr4
));
436 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
437 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
440 struct ipv6_mreq mr6
;
441 memset(&mr6
, 0, sizeof(mr6
));
442 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
443 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
448 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
452 if (bind(fd
, sa
, salen
) < 0) {
453 pa_log("bind() failed: %s", pa_cstrerror(errno
));
466 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
467 struct session
*s
= NULL
;
471 pa_sink_input_new_data data
;
477 if (u
->n_sessions
>= MAX_SESSIONS
) {
478 pa_log("Session limit reached.");
482 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
483 pa_log("Sink does not exist.");
487 pa_rtclock_get(&now
);
489 s
= pa_xnew0(struct session
, 1);
491 s
->first_packet
= FALSE
;
492 s
->sdp_info
= *sdp_info
;
493 s
->rtpoll_item
= NULL
;
494 s
->intended_latency
= LATENCY_USEC
;
495 s
->smoother
= pa_smoother_new(
501 pa_timeval_load(&now
),
503 s
->last_rate_update
= pa_timeval_load(&now
);
504 s
->last_latency
= LATENCY_USEC
;
505 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
507 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
510 pa_sink_input_new_data_init(&data
);
512 data
.driver
= __FILE__
;
513 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
514 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
516 sdp_info
->session_name
? " (" : "",
517 sdp_info
->session_name
? sdp_info
->session_name
: "",
518 sdp_info
->session_name
? ")" : "");
520 if (sdp_info
->session_name
)
521 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
522 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
523 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
524 data
.module
= u
->module
;
525 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
526 data
.flags
= PA_SINK_INPUT_VARIABLE_RATE
;
528 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
);
529 pa_sink_input_new_data_done(&data
);
531 if (!s
->sink_input
) {
532 pa_log("Failed to create sink input.");
536 s
->sink_input
->userdata
= s
;
538 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
539 s
->sink_input
->pop
= sink_input_pop_cb
;
540 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
541 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
542 s
->sink_input
->kill
= sink_input_kill
;
543 s
->sink_input
->attach
= sink_input_attach
;
544 s
->sink_input
->detach
= sink_input_detach
;
545 s
->sink_input
->suspend_within_thread
= sink_input_suspend_within_thread
;
547 pa_sink_input_get_silence(s
->sink_input
, &silence
);
549 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
551 if (s
->intended_latency
< s
->sink_latency
*2)
552 s
->intended_latency
= s
->sink_latency
*2;
554 s
->memblockq
= pa_memblockq_new(
558 pa_frame_size(&s
->sink_input
->sample_spec
),
559 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
564 pa_memblock_unref(silence
.memblock
);
566 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
568 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
570 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
572 pa_sink_input_put(s
->sink_input
);
574 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
587 static void session_free(struct session
*s
) {
590 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
592 pa_sink_input_unlink(s
->sink_input
);
593 pa_sink_input_unref(s
->sink_input
);
595 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
596 pa_assert(s
->userdata
->n_sessions
>= 1);
597 s
->userdata
->n_sessions
--;
598 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
600 pa_memblockq_free(s
->memblockq
);
601 pa_sdp_info_destroy(&s
->sdp_info
);
602 pa_rtp_context_destroy(&s
->rtp_context
);
604 pa_smoother_free(s
->smoother
);
609 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
610 struct userdata
*u
= userdata
;
611 pa_bool_t goodbye
= FALSE
;
618 pa_assert(fd
== u
->sap_context
.fd
);
619 pa_assert(flags
== PA_IO_EVENT_INPUT
);
621 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
624 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
629 if ((s
= pa_hashmap_get(u
->by_origin
, info
.origin
)))
632 pa_sdp_info_destroy(&info
);
635 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
636 if (!session_new(u
, &info
))
637 pa_sdp_info_destroy(&info
);
641 pa_rtclock_get(&now
);
642 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
644 pa_sdp_info_destroy(&info
);
649 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*tv
, void *userdata
) {
650 struct session
*s
, *n
;
651 struct userdata
*u
= userdata
;
658 pa_rtclock_get(&now
);
660 pa_log_debug("Checking for dead streams ...");
662 for (s
= u
->sessions
; s
; s
= n
) {
666 k
= pa_atomic_load(&s
->timestamp
);
668 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
)
673 pa_core_rttime_restart(u
->module
->core
, t
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
676 int pa__init(pa_module
*m
) {
678 pa_modargs
*ma
= NULL
;
679 struct sockaddr_in sa4
;
681 struct sockaddr_in6 sa6
;
685 const char *sap_address
;
690 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
691 pa_log("failed to parse module arguments");
695 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
697 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
698 sa4
.sin_family
= AF_INET
;
699 sa4
.sin_port
= htons(SAP_PORT
);
700 sa
= (struct sockaddr
*) &sa4
;
703 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
704 sa6
.sin6_family
= AF_INET6
;
705 sa6
.sin6_port
= htons(SAP_PORT
);
706 sa
= (struct sockaddr
*) &sa6
;
710 pa_log("Invalid SAP address '%s'", sap_address
);
714 if ((fd
= mcast_socket(sa
, salen
)) < 0)
717 m
->userdata
= u
= pa_xnew(struct userdata
, 1);
720 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
722 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
723 pa_sap_context_init_recv(&u
->sap_context
, fd
);
725 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
727 u
->by_origin
= pa_hashmap_new(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
);
729 u
->check_death_event
= pa_core_rttime_new(m
->core
, pa_rtclock_now() + DEATH_TIMEOUT
* PA_USEC_PER_SEC
, check_death_event_cb
, u
);
745 void pa__done(pa_module
*m
) {
751 if (!(u
= m
->userdata
))
755 m
->core
->mainloop
->io_free(u
->sap_event
);
757 if (u
->check_death_event
)
758 m
->core
->mainloop
->time_free(u
->check_death_event
);
760 pa_sap_context_destroy(&u
->sap_context
);
763 while ((s
= pa_hashmap_first(u
->by_origin
)))
766 pa_hashmap_free(u
->by_origin
, NULL
, NULL
);
769 pa_xfree(u
->sink_name
);