3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
59 #include "module-rtp-recv-symdef.h"
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION
);
68 PA_MODULE_LOAD_ONCE(FALSE
);
70 "sink=<name of the sink> "
71 "sap_address=<multicast address to listen on> "
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
82 static const char* const valid_modargs
[] = {
89 struct userdata
*userdata
;
90 PA_LLIST_FIELDS(struct session
);
92 pa_sink_input
*sink_input
;
93 pa_memblockq
*memblockq
;
95 pa_bool_t first_packet
;
99 struct pa_sdp_info sdp_info
;
101 pa_rtp_context rtp_context
;
103 pa_rtpoll_item
*rtpoll_item
;
105 pa_atomic_t timestamp
;
107 pa_smoother
*smoother
;
108 pa_usec_t intended_latency
;
109 pa_usec_t sink_latency
;
111 pa_usec_t last_rate_update
;
117 pa_sap_context sap_context
;
118 pa_io_event
* sap_event
;
120 pa_time_event
*check_death_event
;
124 PA_LLIST_HEAD(struct session
, sessions
);
125 pa_hashmap
*by_origin
;
129 static void session_free(struct session
*s
);
131 /* Called from I/O thread context */
132 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
133 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
136 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
137 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
139 /* Fall through, the default handler will add in the extra
140 * latency added by the resampler */
144 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
147 /* Called from I/O thread context */
148 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
150 pa_sink_input_assert_ref(i
);
151 pa_assert_se(s
= i
->userdata
);
153 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
156 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
161 /* Called from I/O thread context */
162 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
165 pa_sink_input_assert_ref(i
);
166 pa_assert_se(s
= i
->userdata
);
168 pa_memblockq_rewind(s
->memblockq
, nbytes
);
171 /* Called from I/O thread context */
172 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
175 pa_sink_input_assert_ref(i
);
176 pa_assert_se(s
= i
->userdata
);
178 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
181 /* Called from main context */
182 static void sink_input_kill(pa_sink_input
* i
) {
184 pa_sink_input_assert_ref(i
);
185 pa_assert_se(s
= i
->userdata
);
190 /* Called from IO context */
191 static void sink_input_suspend_within_thread(pa_sink_input
* i
, pa_bool_t b
) {
193 pa_sink_input_assert_ref(i
);
194 pa_assert_se(s
= i
->userdata
);
197 pa_smoother_pause(s
->smoother
, pa_rtclock_now());
198 pa_memblockq_flush_read(s
->memblockq
);
200 s
->first_packet
= FALSE
;
203 /* Called from I/O thread context */
204 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
207 struct timeval now
= { 0, 0 };
211 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
213 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
215 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
216 pa_log("poll() signalled bad revents.");
220 if ((p
->revents
& POLLIN
) == 0)
225 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
, &now
) < 0)
228 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
||
229 !PA_SINK_IS_OPENED(s
->sink_input
->sink
->thread_info
.state
)) {
230 pa_memblock_unref(chunk
.memblock
);
234 if (!s
->first_packet
) {
235 s
->first_packet
= TRUE
;
237 s
->ssrc
= s
->rtp_context
.ssrc
;
238 s
->offset
= s
->rtp_context
.timestamp
;
240 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
241 pa_log_warn("Detected RTP packet loop!");
243 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
244 pa_memblock_unref(chunk
.memblock
);
249 /* Check whether there was a timestamp overflow */
250 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
251 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
253 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
258 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
, TRUE
);
260 if (now
.tv_sec
== 0) {
262 pa_log_warn("Using artificial time instead of timestamp");
264 pa_rtclock_get(&now
);
266 pa_rtclock_from_wallclock(&now
);
268 pa_smoother_put(s
->smoother
, pa_timeval_load(&now
), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
));
270 /* Tell the smoother that we are rolling now, in case it is still paused */
271 pa_smoother_resume(s
->smoother
, pa_timeval_load(&now
), TRUE
);
273 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
274 pa_log_warn("Queue overrun");
275 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
, TRUE
);
278 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
280 pa_memblock_unref(chunk
.memblock
);
282 /* The next timestamp we expect */
283 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
285 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
287 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
288 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
, fix
;
289 unsigned fix_samples
;
291 pa_log_debug("Updating sample rate");
293 wi
= pa_smoother_get(s
->smoother
, pa_timeval_load(&now
));
294 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
296 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi
, (unsigned long) ri
);
298 sink_delay
= pa_sink_get_latency_within_thread(s
->sink_input
->sink
);
299 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
301 if (ri
> render_delay
+sink_delay
)
302 ri
-= render_delay
+sink_delay
;
311 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
313 /* Calculate deviation */
314 if (latency
< s
->intended_latency
)
315 fix
= s
->intended_latency
- latency
;
317 fix
= latency
- s
->intended_latency
;
319 /* How many samples is this per second? */
320 fix_samples
= (unsigned) (fix
* (pa_usec_t
) s
->sink_input
->thread_info
.sample_spec
.rate
/ (pa_usec_t
) RATE_UPDATE_INTERVAL
);
322 /* Check if deviation is in bounds */
323 if (fix_samples
> s
->sink_input
->sample_spec
.rate
*.50)
324 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples
);
327 if (latency
< s
->intended_latency
)
328 s
->sink_input
->sample_spec
.rate
-= fix_samples
;
330 s
->sink_input
->sample_spec
.rate
+= fix_samples
;
332 if (s
->sink_input
->sample_spec
.rate
> PA_RATE_MAX
)
333 s
->sink_input
->sample_spec
.rate
= PA_RATE_MAX
;
336 pa_assert(pa_sample_spec_valid(&s
->sink_input
->sample_spec
));
338 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
340 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
342 s
->last_rate_update
= pa_timeval_load(&now
);
345 if (pa_memblockq_is_readable(s
->memblockq
) &&
346 s
->sink_input
->thread_info
.underrun_for
> 0) {
347 pa_log_debug("Requesting rewind due to end of underrun");
348 pa_sink_input_request_rewind(s
->sink_input
, 0, FALSE
, TRUE
, FALSE
);
354 /* Called from I/O thread context */
355 static void sink_input_attach(pa_sink_input
*i
) {
359 pa_sink_input_assert_ref(i
);
360 pa_assert_se(s
= i
->userdata
);
362 pa_assert(!s
->rtpoll_item
);
363 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->rtpoll
, PA_RTPOLL_LATE
, 1);
365 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
366 p
->fd
= s
->rtp_context
.fd
;
370 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
371 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
374 /* Called from I/O thread context */
375 static void sink_input_detach(pa_sink_input
*i
) {
377 pa_sink_input_assert_ref(i
);
378 pa_assert_se(s
= i
->userdata
);
380 pa_assert(s
->rtpoll_item
);
381 pa_rtpoll_item_free(s
->rtpoll_item
);
382 s
->rtpoll_item
= NULL
;
385 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
386 int af
, fd
= -1, r
, one
;
389 pa_assert(salen
> 0);
392 if ((fd
= socket(af
, SOCK_DGRAM
, 0)) < 0) {
393 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
397 pa_make_udp_socket_low_delay(fd
);
400 if (setsockopt(fd
, SOL_SOCKET
, SO_TIMESTAMP
, &one
, sizeof(one
)) < 0) {
401 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno
));
406 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
407 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
413 memset(&mr4
, 0, sizeof(mr4
));
414 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
415 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
418 struct ipv6_mreq mr6
;
419 memset(&mr6
, 0, sizeof(mr6
));
420 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
421 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
426 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
430 if (bind(fd
, sa
, salen
) < 0) {
431 pa_log("bind() failed: %s", pa_cstrerror(errno
));
444 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
445 struct session
*s
= NULL
;
449 pa_sink_input_new_data data
;
455 if (u
->n_sessions
>= MAX_SESSIONS
) {
456 pa_log("Session limit reached.");
460 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
461 pa_log("Sink does not exist.");
465 pa_rtclock_get(&now
);
467 s
= pa_xnew0(struct session
, 1);
469 s
->first_packet
= FALSE
;
470 s
->sdp_info
= *sdp_info
;
471 s
->rtpoll_item
= NULL
;
472 s
->intended_latency
= LATENCY_USEC
;
473 s
->smoother
= pa_smoother_new(
479 pa_timeval_load(&now
),
481 s
->last_rate_update
= pa_timeval_load(&now
);
482 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
484 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
487 pa_sink_input_new_data_init(&data
);
489 data
.driver
= __FILE__
;
490 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
491 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
493 sdp_info
->session_name
? " (" : "",
494 sdp_info
->session_name
? sdp_info
->session_name
: "",
495 sdp_info
->session_name
? ")" : "");
497 if (sdp_info
->session_name
)
498 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
499 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
500 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
501 data
.module
= u
->module
;
502 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
504 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
, PA_SINK_INPUT_VARIABLE_RATE
);
505 pa_sink_input_new_data_done(&data
);
507 if (!s
->sink_input
) {
508 pa_log("Failed to create sink input.");
512 s
->sink_input
->userdata
= s
;
514 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
515 s
->sink_input
->pop
= sink_input_pop_cb
;
516 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
517 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
518 s
->sink_input
->kill
= sink_input_kill
;
519 s
->sink_input
->attach
= sink_input_attach
;
520 s
->sink_input
->detach
= sink_input_detach
;
521 s
->sink_input
->suspend_within_thread
= sink_input_suspend_within_thread
;
523 pa_sink_input_get_silence(s
->sink_input
, &silence
);
525 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
527 if (s
->intended_latency
< s
->sink_latency
*2)
528 s
->intended_latency
= s
->sink_latency
*2;
530 s
->memblockq
= pa_memblockq_new(
534 pa_frame_size(&s
->sink_input
->sample_spec
),
535 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
540 pa_memblock_unref(silence
.memblock
);
542 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
544 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
546 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
548 pa_sink_input_put(s
->sink_input
);
550 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
563 static void session_free(struct session
*s
) {
566 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
568 pa_sink_input_unlink(s
->sink_input
);
569 pa_sink_input_unref(s
->sink_input
);
571 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
572 pa_assert(s
->userdata
->n_sessions
>= 1);
573 s
->userdata
->n_sessions
--;
574 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
576 pa_memblockq_free(s
->memblockq
);
577 pa_sdp_info_destroy(&s
->sdp_info
);
578 pa_rtp_context_destroy(&s
->rtp_context
);
580 pa_smoother_free(s
->smoother
);
585 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
586 struct userdata
*u
= userdata
;
587 pa_bool_t goodbye
= FALSE
;
594 pa_assert(fd
== u
->sap_context
.fd
);
595 pa_assert(flags
== PA_IO_EVENT_INPUT
);
597 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
600 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
605 if ((s
= pa_hashmap_get(u
->by_origin
, info
.origin
)))
608 pa_sdp_info_destroy(&info
);
611 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
612 if (!session_new(u
, &info
))
613 pa_sdp_info_destroy(&info
);
617 pa_rtclock_get(&now
);
618 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
620 pa_sdp_info_destroy(&info
);
625 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*ptv
, void *userdata
) {
626 struct session
*s
, *n
;
627 struct userdata
*u
= userdata
;
636 pa_rtclock_get(&now
);
638 pa_log_debug("Checking for dead streams ...");
640 for (s
= u
->sessions
; s
; s
= n
) {
644 k
= pa_atomic_load(&s
->timestamp
);
646 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
)
651 pa_gettimeofday(&tv
);
652 pa_timeval_add(&tv
, DEATH_TIMEOUT
*PA_USEC_PER_SEC
);
653 m
->time_restart(t
, &tv
);
656 int pa__init(pa_module
*m
) {
658 pa_modargs
*ma
= NULL
;
659 struct sockaddr_in sa4
;
661 struct sockaddr_in6 sa6
;
665 const char *sap_address
;
671 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
672 pa_log("failed to parse module arguments");
676 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
678 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
679 sa4
.sin_family
= AF_INET
;
680 sa4
.sin_port
= htons(SAP_PORT
);
681 sa
= (struct sockaddr
*) &sa4
;
684 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
685 sa6
.sin6_family
= AF_INET6
;
686 sa6
.sin6_port
= htons(SAP_PORT
);
687 sa
= (struct sockaddr
*) &sa6
;
691 pa_log("Invalid SAP address '%s'", sap_address
);
695 if ((fd
= mcast_socket(sa
, salen
)) < 0)
698 m
->userdata
= u
= pa_xnew(struct userdata
, 1);
700 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
702 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
703 pa_sap_context_init_recv(&u
->sap_context
, fd
);
705 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
707 u
->by_origin
= pa_hashmap_new(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
);
709 pa_gettimeofday(&tv
);
710 pa_timeval_add(&tv
, DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
711 u
->check_death_event
= m
->core
->mainloop
->time_new(m
->core
->mainloop
, &tv
, check_death_event_cb
, u
);
727 void pa__done(pa_module
*m
) {
733 if (!(u
= m
->userdata
))
737 m
->core
->mainloop
->io_free(u
->sap_event
);
739 if (u
->check_death_event
)
740 m
->core
->mainloop
->time_free(u
->check_death_event
);
742 pa_sap_context_destroy(&u
->sap_context
);
745 while ((s
= pa_hashmap_first(u
->by_origin
)))
748 pa_hashmap_free(u
->by_origin
, NULL
, NULL
);
751 pa_xfree(u
->sink_name
);