3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/rtclock.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/socket-util.h>
56 #include <pulsecore/once.h>
58 #include "module-rtp-recv-symdef.h"
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION
);
67 PA_MODULE_LOAD_ONCE(FALSE
);
69 "sink=<name of the sink> "
70 "sap_address=<multicast address to listen on> "
74 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
75 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
76 #define MAX_SESSIONS 16
77 #define DEATH_TIMEOUT 20
78 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
79 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
81 static const char* const valid_modargs
[] = {
88 struct userdata
*userdata
;
89 PA_LLIST_FIELDS(struct session
);
91 pa_sink_input
*sink_input
;
92 pa_memblockq
*memblockq
;
94 pa_bool_t first_packet
;
98 struct pa_sdp_info sdp_info
;
100 pa_rtp_context rtp_context
;
102 pa_rtpoll_item
*rtpoll_item
;
104 pa_atomic_t timestamp
;
106 pa_smoother
*smoother
;
107 pa_usec_t intended_latency
;
108 pa_usec_t sink_latency
;
110 pa_usec_t last_rate_update
;
116 pa_sap_context sap_context
;
117 pa_io_event
* sap_event
;
119 pa_time_event
*check_death_event
;
123 PA_LLIST_HEAD(struct session
, sessions
);
124 pa_hashmap
*by_origin
;
128 static void session_free(struct session
*s
);
130 /* Called from I/O thread context */
131 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
132 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
135 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
136 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
138 /* Fall through, the default handler will add in the extra
139 * latency added by the resampler */
143 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
146 /* Called from I/O thread context */
147 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
149 pa_sink_input_assert_ref(i
);
150 pa_assert_se(s
= i
->userdata
);
152 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
155 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
160 /* Called from I/O thread context */
161 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
164 pa_sink_input_assert_ref(i
);
165 pa_assert_se(s
= i
->userdata
);
167 pa_memblockq_rewind(s
->memblockq
, nbytes
);
170 /* Called from I/O thread context */
171 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
174 pa_sink_input_assert_ref(i
);
175 pa_assert_se(s
= i
->userdata
);
177 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
180 /* Called from main context */
181 static void sink_input_kill(pa_sink_input
* i
) {
183 pa_sink_input_assert_ref(i
);
184 pa_assert_se(s
= i
->userdata
);
189 /* Called from IO context */
190 static void sink_input_suspend_within_thread(pa_sink_input
* i
, pa_bool_t b
) {
192 pa_sink_input_assert_ref(i
);
193 pa_assert_se(s
= i
->userdata
);
196 pa_smoother_pause(s
->smoother
, pa_rtclock_usec());
197 pa_memblockq_flush_read(s
->memblockq
);
199 s
->first_packet
= FALSE
;
202 /* Called from I/O thread context */
203 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
206 struct timeval now
= { 0, 0 };
210 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
212 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
214 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
215 pa_log("poll() signalled bad revents.");
219 if ((p
->revents
& POLLIN
) == 0)
224 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
, &now
) < 0)
227 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
||
228 !PA_SINK_IS_OPENED(s
->sink_input
->sink
->thread_info
.state
)) {
229 pa_memblock_unref(chunk
.memblock
);
233 if (!s
->first_packet
) {
234 s
->first_packet
= TRUE
;
236 s
->ssrc
= s
->rtp_context
.ssrc
;
237 s
->offset
= s
->rtp_context
.timestamp
;
239 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
240 pa_log_warn("Detected RTP packet loop!");
242 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
243 pa_memblock_unref(chunk
.memblock
);
248 /* Check whether there was a timestamp overflow */
249 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
250 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
252 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
257 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
, TRUE
);
259 if (now
.tv_sec
== 0) {
261 pa_log_warn("Using artificial time instead of timestamp");
263 pa_rtclock_get(&now
);
265 pa_rtclock_from_wallclock(&now
);
267 pa_smoother_put(s
->smoother
, pa_timeval_load(&now
), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
));
269 /* Tell the smoother that we are rolling now, in case it is still paused */
270 pa_smoother_resume(s
->smoother
, pa_timeval_load(&now
), TRUE
);
272 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
273 pa_log_warn("Queue overrun");
274 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
, TRUE
);
277 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
279 pa_memblock_unref(chunk
.memblock
);
281 /* The next timestamp we expect */
282 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
284 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
286 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
287 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
, fix
;
288 unsigned fix_samples
;
290 pa_log_debug("Updating sample rate");
292 wi
= pa_smoother_get(s
->smoother
, pa_timeval_load(&now
));
293 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
295 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi
, (unsigned long) ri
);
297 sink_delay
= pa_sink_get_latency_within_thread(s
->sink_input
->sink
);
298 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
300 if (ri
> render_delay
+sink_delay
)
301 ri
-= render_delay
+sink_delay
;
310 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
312 /* Calculate deviation */
313 if (latency
< s
->intended_latency
)
314 fix
= s
->intended_latency
- latency
;
316 fix
= latency
- s
->intended_latency
;
318 /* How many samples is this per second? */
319 fix_samples
= (unsigned) (fix
* (pa_usec_t
) s
->sink_input
->thread_info
.sample_spec
.rate
/ (pa_usec_t
) RATE_UPDATE_INTERVAL
);
321 /* Check if deviation is in bounds */
322 if (fix_samples
> s
->sink_input
->sample_spec
.rate
*.50)
323 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples
);
326 if (latency
< s
->intended_latency
)
327 s
->sink_input
->sample_spec
.rate
-= fix_samples
;
329 s
->sink_input
->sample_spec
.rate
+= fix_samples
;
331 if (s
->sink_input
->sample_spec
.rate
> PA_RATE_MAX
)
332 s
->sink_input
->sample_spec
.rate
= PA_RATE_MAX
;
335 pa_assert(pa_sample_spec_valid(&s
->sink_input
->sample_spec
));
337 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
339 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
341 s
->last_rate_update
= pa_timeval_load(&now
);
344 if (pa_memblockq_is_readable(s
->memblockq
) &&
345 s
->sink_input
->thread_info
.underrun_for
> 0) {
346 pa_log_debug("Requesting rewind due to end of underrun");
347 pa_sink_input_request_rewind(s
->sink_input
, 0, FALSE
, TRUE
, FALSE
);
353 /* Called from I/O thread context */
354 static void sink_input_attach(pa_sink_input
*i
) {
358 pa_sink_input_assert_ref(i
);
359 pa_assert_se(s
= i
->userdata
);
361 pa_assert(!s
->rtpoll_item
);
362 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->rtpoll
, PA_RTPOLL_LATE
, 1);
364 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
365 p
->fd
= s
->rtp_context
.fd
;
369 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
370 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
373 /* Called from I/O thread context */
374 static void sink_input_detach(pa_sink_input
*i
) {
376 pa_sink_input_assert_ref(i
);
377 pa_assert_se(s
= i
->userdata
);
379 pa_assert(s
->rtpoll_item
);
380 pa_rtpoll_item_free(s
->rtpoll_item
);
381 s
->rtpoll_item
= NULL
;
384 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
385 int af
, fd
= -1, r
, one
;
388 pa_assert(salen
> 0);
391 if ((fd
= socket(af
, SOCK_DGRAM
, 0)) < 0) {
392 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
396 pa_make_udp_socket_low_delay(fd
);
399 if (setsockopt(fd
, SOL_SOCKET
, SO_TIMESTAMP
, &one
, sizeof(one
)) < 0) {
400 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno
));
405 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
406 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
412 memset(&mr4
, 0, sizeof(mr4
));
413 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
414 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
417 struct ipv6_mreq mr6
;
418 memset(&mr6
, 0, sizeof(mr6
));
419 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
420 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
425 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
429 if (bind(fd
, sa
, salen
) < 0) {
430 pa_log("bind() failed: %s", pa_cstrerror(errno
));
443 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
444 struct session
*s
= NULL
;
448 pa_sink_input_new_data data
;
454 if (u
->n_sessions
>= MAX_SESSIONS
) {
455 pa_log("Session limit reached.");
459 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
460 pa_log("Sink does not exist.");
464 pa_rtclock_get(&now
);
466 s
= pa_xnew0(struct session
, 1);
468 s
->first_packet
= FALSE
;
469 s
->sdp_info
= *sdp_info
;
470 s
->rtpoll_item
= NULL
;
471 s
->intended_latency
= LATENCY_USEC
;
472 s
->smoother
= pa_smoother_new(
478 pa_timeval_load(&now
),
480 s
->last_rate_update
= pa_timeval_load(&now
);
481 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
483 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
486 pa_sink_input_new_data_init(&data
);
488 data
.driver
= __FILE__
;
489 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
490 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
492 sdp_info
->session_name
? " (" : "",
493 sdp_info
->session_name
? sdp_info
->session_name
: "",
494 sdp_info
->session_name
? ")" : "");
496 if (sdp_info
->session_name
)
497 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
498 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
499 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
500 data
.module
= u
->module
;
501 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
503 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
, PA_SINK_INPUT_VARIABLE_RATE
);
504 pa_sink_input_new_data_done(&data
);
506 if (!s
->sink_input
) {
507 pa_log("Failed to create sink input.");
511 s
->sink_input
->userdata
= s
;
513 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
514 s
->sink_input
->pop
= sink_input_pop_cb
;
515 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
516 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
517 s
->sink_input
->kill
= sink_input_kill
;
518 s
->sink_input
->attach
= sink_input_attach
;
519 s
->sink_input
->detach
= sink_input_detach
;
520 s
->sink_input
->suspend_within_thread
= sink_input_suspend_within_thread
;
522 pa_sink_input_get_silence(s
->sink_input
, &silence
);
524 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
526 if (s
->intended_latency
< s
->sink_latency
*2)
527 s
->intended_latency
= s
->sink_latency
*2;
529 s
->memblockq
= pa_memblockq_new(
533 pa_frame_size(&s
->sink_input
->sample_spec
),
534 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
539 pa_memblock_unref(silence
.memblock
);
541 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
543 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
545 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
547 pa_sink_input_put(s
->sink_input
);
549 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
562 static void session_free(struct session
*s
) {
565 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
567 pa_sink_input_unlink(s
->sink_input
);
568 pa_sink_input_unref(s
->sink_input
);
570 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
571 pa_assert(s
->userdata
->n_sessions
>= 1);
572 s
->userdata
->n_sessions
--;
573 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
575 pa_memblockq_free(s
->memblockq
);
576 pa_sdp_info_destroy(&s
->sdp_info
);
577 pa_rtp_context_destroy(&s
->rtp_context
);
579 pa_smoother_free(s
->smoother
);
584 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
585 struct userdata
*u
= userdata
;
586 pa_bool_t goodbye
= FALSE
;
593 pa_assert(fd
== u
->sap_context
.fd
);
594 pa_assert(flags
== PA_IO_EVENT_INPUT
);
596 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
599 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
604 if ((s
= pa_hashmap_get(u
->by_origin
, info
.origin
)))
607 pa_sdp_info_destroy(&info
);
610 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
611 if (!session_new(u
, &info
))
612 pa_sdp_info_destroy(&info
);
616 pa_rtclock_get(&now
);
617 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
619 pa_sdp_info_destroy(&info
);
624 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*ptv
, void *userdata
) {
625 struct session
*s
, *n
;
626 struct userdata
*u
= userdata
;
635 pa_rtclock_get(&now
);
637 pa_log_debug("Checking for dead streams ...");
639 for (s
= u
->sessions
; s
; s
= n
) {
643 k
= pa_atomic_load(&s
->timestamp
);
645 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
)
650 pa_gettimeofday(&tv
);
651 pa_timeval_add(&tv
, DEATH_TIMEOUT
*PA_USEC_PER_SEC
);
652 m
->time_restart(t
, &tv
);
655 int pa__init(pa_module
*m
) {
657 pa_modargs
*ma
= NULL
;
658 struct sockaddr_in sa4
;
660 struct sockaddr_in6 sa6
;
664 const char *sap_address
;
670 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
671 pa_log("failed to parse module arguments");
675 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
677 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
678 sa4
.sin_family
= AF_INET
;
679 sa4
.sin_port
= htons(SAP_PORT
);
680 sa
= (struct sockaddr
*) &sa4
;
683 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
684 sa6
.sin6_family
= AF_INET6
;
685 sa6
.sin6_port
= htons(SAP_PORT
);
686 sa
= (struct sockaddr
*) &sa6
;
690 pa_log("Invalid SAP address '%s'", sap_address
);
694 if ((fd
= mcast_socket(sa
, salen
)) < 0)
697 m
->userdata
= u
= pa_xnew(struct userdata
, 1);
699 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
701 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
702 pa_sap_context_init_recv(&u
->sap_context
, fd
);
704 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
706 u
->by_origin
= pa_hashmap_new(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
);
708 pa_gettimeofday(&tv
);
709 pa_timeval_add(&tv
, DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
710 u
->check_death_event
= m
->core
->mainloop
->time_new(m
->core
->mainloop
, &tv
, check_death_event_cb
, u
);
726 void pa__done(pa_module
*m
) {
732 if (!(u
= m
->userdata
))
736 m
->core
->mainloop
->io_free(u
->sap_event
);
738 if (u
->check_death_event
)
739 m
->core
->mainloop
->time_free(u
->check_death_event
);
741 pa_sap_context_destroy(&u
->sap_context
);
744 while ((s
= pa_hashmap_first(u
->by_origin
)))
747 pa_hashmap_free(u
->by_origin
, NULL
, NULL
);
750 pa_xfree(u
->sink_name
);