3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/rtclock.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/time-smoother.h>
56 #include "module-rtp-recv-symdef.h"
62 PA_MODULE_AUTHOR("Lennart Poettering");
63 PA_MODULE_DESCRIPTION("Recieve data from a network via RTP/SAP/SDP");
64 PA_MODULE_VERSION(PACKAGE_VERSION
);
65 PA_MODULE_LOAD_ONCE(FALSE
);
/* Fragments of the module usage string describing the "sink" and
 * "sap_address" arguments; both are read in pa__init(). */
67 "sink=<name of the sink> "
68 "sap_address=<multicast address to listen on> "
/* Value used for the "sap_address" argument when none is given. */
72 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
/* 40 * 1024 * 1024. Not referenced in the visible part of this file;
 * presumably the upper bound of the per-session memblockq -- TODO confirm. */
73 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
/* session_new() reports "Session limit reached." once n_sessions gets here. */
74 #define MAX_SESSIONS 16
/* Compared against tv_sec values in check_death_event_cb() and, multiplied
 * by PA_USEC_PER_SEC, used as the period of the death-check timer. */
75 #define DEATH_TIMEOUT 20
/* rtpoll_work_cb() re-tunes the sample rate at most once per this interval. */
76 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
/* Initial value of a session's intended_latency (see session_new()). */
77 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
/* Table handed to pa_modargs_new() in pa__init(); its entries are not
 * visible in this excerpt. */
79 static const char* const valid_modargs
[] = {
/* Per-session state. These fields are accessed through `struct session *`
 * pointers throughout the file; the struct header itself is not visible in
 * this excerpt. */
/* Owning module state; dereferenced as s->userdata->module, ->sessions,
 * ->by_origin and ->n_sessions. */
86 struct userdata
*userdata
;
/* Linkage for the userdata->sessions list. */
87 PA_LLIST_FIELDS(struct session
);
/* Sink input created for this session in session_new(). */
89 pa_sink_input
*sink_input
;
/* Queue of received audio: filled in rtpoll_work_cb(), read in
 * sink_input_pop_cb(). */
90 pa_memblockq
*memblockq
;
/* FALSE until rtpoll_work_cb() has seen the first RTP packet. */
92 pa_bool_t first_packet
;
/* Copy of the SDP description passed to session_new(); destroyed in
 * session_free(). */
96 struct pa_sdp_info sdp_info
;
/* RTP receive state, set up by pa_rtp_context_init_recv(). */
98 pa_rtp_context rtp_context
;
/* Poll item for the RTP socket; non-NULL only between sink_input_attach()
 * and sink_input_detach(). */
100 pa_rtpoll_item
*rtpoll_item
;
/* tv_sec of the most recent RTP packet or SAP announcement for this session;
 * loaded by check_death_event_cb(). */
102 pa_atomic_t timestamp
;
/* Fed with (time, write index) samples on every packet and queried when the
 * sample rate is re-tuned. */
104 pa_smoother
*smoother
;
/* Latency target; starts at LATENCY_USEC and is raised to 2*sink_latency in
 * session_new() if it would otherwise be smaller. */
105 pa_usec_t intended_latency
;
/* Return value of pa_sink_input_set_requested_latency(). */
106 pa_usec_t sink_latency
;
/* Time (pa_timeval_load() of the rtclock) of the last rate adjustment. */
108 pa_usec_t last_rate_update
;
/* Module-wide state. These fields are accessed through `struct userdata *`
 * pointers; the struct header and further members (e.g. module, sink_name,
 * n_sessions, which the code below uses) are not visible in this excerpt. */
/* SAP receive state, set up by pa_sap_context_init_recv() in pa__init(). */
114 pa_sap_context sap_context
;
/* Main-loop IO event on the SAP socket; dispatches to sap_event_cb(). */
115 pa_io_event
* sap_event
;
/* Main-loop timer; dispatches to check_death_event_cb(). */
117 pa_time_event
*check_death_event
;
/* Head of the list of sessions. */
121 PA_LLIST_HEAD(struct session
, sessions
);
/* Maps sdp_info.origin strings to their session. */
122 pa_hashmap
*by_origin
;
/* Forward declaration; the definition appears further down in this file. */
126 static void session_free(struct session
*s
);
/* Message handler installed as sink_input->parent.process_msg in
 * session_new().
 *
 * For PA_SINK_INPUT_MESSAGE_GET_LATENCY it stores the current fill level of
 * the session's memblockq, converted to microseconds, in *data. The visible
 * code then passes the message on to pa_sink_input_process_msg() and returns
 * its result. */
128 /* Called from I/O thread context */
129 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
130 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
133 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
/* data is treated as a pa_usec_t* for this message. */
134 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
136 /* Fall through, the default handler will add in the extra
137 * latency added by the resampler */
141 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
/* pop callback (installed as sink_input->pop in session_new()).
 *
 * Peeks the next chunk of the session's memblockq into *chunk and then drops
 * chunk->length bytes from the queue. What happens when the peek fails, and
 * the return value, are not visible in this excerpt. */
144 /* Called from I/O thread context */
145 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
147 pa_sink_input_assert_ref(i
);
148 pa_assert_se(s
= i
->userdata
);
/* A negative result means the peek failed. */
150 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
/* Consume what was just handed out. */
153 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
/* process_rewind callback (installed in session_new()): rewinds the
 * session's memblockq by nbytes. */
158 /* Called from I/O thread context */
159 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
162 pa_sink_input_assert_ref(i
);
163 pa_assert_se(s
= i
->userdata
);
165 pa_memblockq_rewind(s
->memblockq
, nbytes
);
/* update_max_rewind callback (installed in session_new()): forwards the new
 * limit nbytes to the session's memblockq via pa_memblockq_set_maxrewind(). */
168 /* Called from thread context */
169 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
172 pa_sink_input_assert_ref(i
);
173 pa_assert_se(s
= i
->userdata
);
175 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
/* kill callback (installed as sink_input->kill in session_new()).
 * Only the argument check and the lookup of the session are visible in this
 * excerpt; the action taken on the session is not. */
178 /* Called from main context */
179 static void sink_input_kill(pa_sink_input
* i
) {
181 pa_sink_input_assert_ref(i
);
182 pa_assert_se(s
= i
->userdata
);
/* Work callback of the RTP socket's poll item (registered in
 * sink_input_attach()).
 *
 * Visible steps:
 *   1. Check the poll result and receive one RTP packet into `chunk`.
 *   2. Release packets whose payload type or SSRC does not match the session.
 *   3. Seek the memblockq write index by `delta` frames, feed the smoother,
 *      and push the chunk.
 *   4. Remember the next expected RTP timestamp and refresh the session's
 *      liveness timestamp.
 *   5. At most once per RATE_UPDATE_INTERVAL, nudge the sink input's sample
 *      rate so the measured latency moves towards intended_latency.
 *   6. Request a rewind if data became readable while the sink input was in
 *      underrun.
 *
 * Local declarations, early returns and several else-branches are not
 * visible in this excerpt. */
187 /* Called from I/O thread context */
188 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
195 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
197 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
/* Any of these flags on the RTP socket is reported as an error. */
199 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
200 pa_log("poll() signalled bad revents.");
/* No POLLIN: nothing to receive in this iteration. */
204 if ((p
->revents
& POLLIN
) == 0)
/* Receive one packet; the payload lands in a memblock from the core's
 * mempool. */
209 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
) < 0)
/* Payload type must match the one from the session's SDP description. */
212 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
) {
213 pa_memblock_unref(chunk
.memblock
);
/* First packet of the session: latch its SSRC and starting timestamp. */
217 if (!s
->first_packet
) {
218 s
->first_packet
= TRUE
;
220 s
->ssrc
= s
->rtp_context
.ssrc
;
221 s
->offset
= s
->rtp_context
.timestamp
;
/* An SSRC equal to the core's cookie is reported as a packet loop. */
223 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
224 pa_log_warn("Detected RTP packet loop!");
/* Packets from a different SSRC than the latched one are released. */
226 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
227 pa_memblock_unref(chunk
.memblock
);
232 /* Check whether there was a timestamp overflow */
/* k: plain distance between received and expected timestamp.
 * j: the same distance assuming the 32-bit timestamp wrapped (2^32 added). */
233 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
234 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
/* Compares |k| with |j|; the assignment to `delta` that depends on this test
 * is not visible here -- presumably the value of smaller magnitude is used,
 * TODO confirm. */
236 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
/* Move the write index by delta frames (converted to bytes) relative to its
 * current position. */
241 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
, TRUE
);
243 pa_rtclock_get(&now
);
/* Sample for the smoother: (current time, write index expressed in usec). */
245 pa_smoother_put(s
->smoother
, pa_timeval_load(&now
), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
));
/* If the queue refuses the chunk, advance the write index by its length
 * instead. */
247 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
248 pa_log_warn("Queue overrun");
249 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
, TRUE
);
252 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
/* Drop this function's reference to the received memblock. */
254 pa_memblock_unref(chunk
.memblock
);
256 /* The next timestamp we expect */
257 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
/* Liveness mark read by check_death_event_cb(). */
259 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
/* Rate adjustment, at most once per RATE_UPDATE_INTERVAL. */
261 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
262 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
, fix
;
263 unsigned fix_samples
;
265 pa_log_debug("Updating sample rate");
/* wi: smoothed write index at `now`; ri: read index of the queue, both in
 * microseconds. */
267 wi
= pa_smoother_get(s
->smoother
, pa_timeval_load(&now
));
268 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
270 sink_delay
= pa_sink_get_latency_within_thread(s
->sink_input
->sink
);
271 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
/* Subtract the render-queue and sink delays from the read index when it is
 * large enough to do so. */
273 if (ri
> render_delay
+sink_delay
)
274 ri
-= render_delay
+sink_delay
;
/* The computation of `latency` is not visible in this excerpt. */
283 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
285 /* Calculate deviation */
/* fix = |latency - intended_latency| (the `else` between the two assignments
 * is not visible here). */
286 if (latency
< s
->intended_latency
)
287 fix
= s
->intended_latency
- latency
;
289 fix
= latency
- s
->intended_latency
;
291 /* How many samples is this per second? */
292 fix_samples
= (unsigned) (fix
* (pa_usec_t
) s
->sink_input
->thread_info
.sample_spec
.rate
/ (pa_usec_t
) RATE_UPDATE_INTERVAL
);
294 /* Check if deviation is in bounds */
/* Corrections above 20% of the current rate are only logged. */
295 if (fix_samples
> s
->sink_input
->sample_spec
.rate
*.20)
296 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples
);
/* Latency below target: lower the rate; otherwise raise it. */
299 if (latency
< s
->intended_latency
)
300 s
->sink_input
->sample_spec
.rate
-= fix_samples
;
302 s
->sink_input
->sample_spec
.rate
+= fix_samples
;
/* Clamp to the maximum supported rate. */
304 if (s
->sink_input
->sample_spec
.rate
> PA_RATE_MAX
)
305 s
->sink_input
->sample_spec
.rate
= PA_RATE_MAX
;
308 pa_assert(pa_sample_spec_valid(&s
->sink_input
->sample_spec
));
/* Apply the new input rate to the sink input's resampler. */
310 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
312 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
314 s
->last_rate_update
= pa_timeval_load(&now
);
/* Data is readable again while the sink input was in underrun. */
317 if (pa_memblockq_is_readable(s
->memblockq
) &&
318 s
->sink_input
->thread_info
.underrun_for
> 0) {
319 pa_log_debug("Requesting rewind due to end of underrun");
320 pa_sink_input_request_rewind(s
->sink_input
, 0, FALSE
, TRUE
, FALSE
);
/* attach callback (installed in session_new()).
 *
 * Creates a poll item with one pollfd on the sink's rtpoll, points that
 * pollfd at the session's RTP socket, and registers rtpoll_work_cb() with the
 * session as its userdata. Asserts that no poll item exists yet. */
326 /* Called from I/O thread context */
327 static void sink_input_attach(pa_sink_input
*i
) {
331 pa_sink_input_assert_ref(i
);
332 pa_assert_se(s
= i
->userdata
);
334 pa_assert(!s
->rtpoll_item
);
335 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->rtpoll
, PA_RTPOLL_LATE
, 1);
337 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
/* Watch the RTP socket; the requested event mask is set on lines not visible
 * in this excerpt. */
338 p
->fd
= s
->rtp_context
.fd
;
342 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
343 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
/* detach callback (installed in session_new()): frees the poll item created
 * by sink_input_attach() and resets the pointer to NULL so that a later
 * attach passes its assertion. */
346 /* Called from I/O thread context */
347 static void sink_input_detach(pa_sink_input
*i
) {
349 pa_sink_input_assert_ref(i
);
350 pa_assert_se(s
= i
->userdata
);
352 pa_assert(s
->rtpoll_item
);
353 pa_rtpoll_item_free(s
->rtpoll_item
);
354 s
->rtpoll_item
= NULL
;
/* Opens a datagram socket for the multicast address `sa` (length `salen`).
 *
 * Visible steps: create a SOCK_DGRAM socket, set SO_REUSEADDR, join the
 * multicast group (IP_ADD_MEMBERSHIP for an IPv4 address, IPV6_JOIN_GROUP for
 * an IPv6 address), then bind() to `sa`. A failure to join the group is
 * logged at info level only.
 *
 * Both callers in this file treat a negative return value as failure. The
 * assignment of `af`, the branch selecting IPv4 vs. IPv6 and the
 * return/cleanup paths are not visible in this excerpt. */
357 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
358 int af
, fd
= -1, r
, one
;
361 pa_assert(salen
> 0);
364 if ((fd
= socket(af
, SOCK_DGRAM
, 0)) < 0) {
365 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
/* NOTE(review): `one` is declared without an initializer above; its
 * assignment is expected on a line not visible here -- confirm. */
370 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
371 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
/* IPv4: join the group named by sa's address; the other members of the
 * request stay zeroed. */
377 memset(&mr4
, 0, sizeof(mr4
));
378 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
379 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
/* IPv6 counterpart of the join above. */
382 struct ipv6_mreq mr6
;
383 memset(&mr6
, 0, sizeof(mr6
));
384 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
385 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
390 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
394 if (bind(fd
, sa
, salen
) < 0) {
395 pa_log("bind() failed: %s", pa_cstrerror(errno
));
/* Creates a session for the stream described by `sdp_info`.
 *
 * Visible steps: enforce MAX_SESSIONS, look up the configured sink, allocate
 * a zeroed session, copy *sdp_info into it, create the smoother, open the
 * multicast RTP socket, create and configure a variable-rate sink input,
 * derive the latency targets, create the memblockq, initialise the RTP
 * receive context, register the session in by_origin and the session list,
 * and finally put the sink input.
 *
 * The caller (sap_event_cb()) treats a NULL result as failure and then
 * destroys `sdp_info` itself. The struct copy at `s->sdp_info = *sdp_info`
 * is a plain assignment, and session_free() later calls
 * pa_sdp_info_destroy() on the copy.
 *
 * Several locals, some arguments of pa_smoother_new()/pa_memblockq_new() and
 * the failure paths are not visible in this excerpt. */
408 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
409 struct session
*s
= NULL
;
413 pa_sink_input_new_data data
;
419 if (u
->n_sessions
>= MAX_SESSIONS
) {
420 pa_log("Session limit reached.");
/* Resolve the sink named by the module's "sink" argument. */
424 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
425 pa_log("Sink does not exist.");
429 pa_rtclock_get(&now
);
431 s
= pa_xnew0(struct session
, 1);
433 s
->first_packet
= FALSE
;
434 s
->sdp_info
= *sdp_info
;
435 s
->rtpoll_item
= NULL
;
436 s
->intended_latency
= LATENCY_USEC
;
437 s
->smoother
= pa_smoother_new(
443 pa_timeval_load(&now
),
/* Both the rate-update clock and the liveness timestamp start at `now`. */
445 s
->last_rate_update
= pa_timeval_load(&now
);
446 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
/* Socket for the RTP stream address announced in the SDP description. */
448 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
451 pa_sink_input_new_data_init(&data
);
453 data
.driver
= __FILE__
;
454 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
/* The session name, when present, is wrapped in " (" and ")"; the format
 * string itself is not visible in this excerpt. */
455 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
457 sdp_info
->session_name
? " (" : "",
458 sdp_info
->session_name
? sdp_info
->session_name
: "",
459 sdp_info
->session_name
? ")" : "");
461 if (sdp_info
->session_name
)
462 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
463 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
464 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
465 data
.module
= u
->module
;
466 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
/* PA_SINK_INPUT_VARIABLE_RATE: rtpoll_work_cb() changes the sample rate at
 * run time. */
468 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
, PA_SINK_INPUT_VARIABLE_RATE
);
469 pa_sink_input_new_data_done(&data
);
471 if (!s
->sink_input
) {
472 pa_log("Failed to create sink input.");
/* Wire the sink input back to this session and install the callbacks. */
476 s
->sink_input
->userdata
= s
;
478 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
479 s
->sink_input
->pop
= sink_input_pop_cb
;
480 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
481 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
482 s
->sink_input
->kill
= sink_input_kill
;
483 s
->sink_input
->attach
= sink_input_attach
;
484 s
->sink_input
->detach
= sink_input_detach
;
486 pa_sink_input_get_silence(s
->sink_input
, &silence
);
/* Request half of the intended latency from the sink and record what
 * pa_sink_input_set_requested_latency() returns. */
488 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
/* Keep intended_latency at least twice the sink latency. */
490 if (s
->intended_latency
< s
->sink_latency
*2)
491 s
->intended_latency
= s
->sink_latency
*2;
/* Only two of the pa_memblockq_new() arguments are visible here: the frame
 * size and (intended_latency - sink_latency) converted to bytes. */
493 s
->memblockq
= pa_memblockq_new(
497 pa_frame_size(&s
->sink_input
->sample_spec
),
498 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
503 pa_memblock_unref(silence
.memblock
);
505 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
/* Register under the origin string of the session's own sdp_info copy. */
507 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
509 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
511 pa_sink_input_put(s
->sink_input
);
513 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
/* Tears down a session: unlinks and unrefs its sink input, removes it from
 * the session list and the by_origin map, decrements n_sessions, and releases
 * the memblockq, the SDP info copy, the RTP context and the smoother.
 * Freeing of `s` itself is not visible in this excerpt. */
526 static void session_free(struct session
*s
) {
529 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
531 pa_sink_input_unlink(s
->sink_input
);
532 pa_sink_input_unref(s
->sink_input
);
534 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
535 pa_assert(s
->userdata
->n_sessions
>= 1);
536 s
->userdata
->n_sessions
--;
/* Must happen before sdp_info is destroyed below: the origin string passed
 * as the key belongs to sdp_info. */
537 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
539 pa_memblockq_free(s
->memblockq
);
540 pa_sdp_info_destroy(&s
->sdp_info
);
541 pa_rtp_context_destroy(&s
->rtp_context
);
543 pa_smoother_free(s
->smoother
);
/* Main-loop IO callback for the SAP socket (registered in pa__init()).
 *
 * Receives one SAP packet, parses its SDP payload into `info`, and looks the
 * session up by info.origin. Visible behaviour: an unknown origin leads to
 * session_new(); when session_new() fails, `info` is destroyed here. For a
 * known origin the session's liveness timestamp is refreshed and `info` is
 * destroyed.
 *
 * The branching on `goodbye` and the action taken on the first lookup are
 * not visible in this excerpt. */
548 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
549 struct userdata
*u
= userdata
;
550 pa_bool_t goodbye
= FALSE
;
557 pa_assert(fd
== u
->sap_context
.fd
);
558 pa_assert(flags
== PA_IO_EVENT_INPUT
);
/* pa_sap_recv() also reports through `goodbye` whether the packet was a
 * goodbye -- inferred from the argument name, TODO confirm. */
560 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
563 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
/* First lookup by origin; presumably the goodbye path -- TODO confirm. */
568 if ((s
= pa_hashmap_get(u
->by_origin
, info
.origin
)))
571 pa_sdp_info_destroy(&info
);
/* Second lookup by origin: unknown origins get a new session. */
574 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
/* session_new() failed, so `info` is released here. */
575 if (!session_new(u
, &info
))
576 pa_sdp_info_destroy(&info
);
/* Known session: mark it alive and discard the freshly parsed info. */
580 pa_rtclock_get(&now
);
581 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
583 pa_sdp_info_destroy(&info
);
/* Main-loop timer callback (registered in pa__init()).
 *
 * Walks the session list and, for each session, compares its stored
 * timestamp plus DEATH_TIMEOUT against the current rtclock second; the action
 * taken when the session has expired is not visible in this excerpt.
 * Afterwards the timer is restarted DEATH_TIMEOUT seconds from the current
 * wall-clock time. */
588 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*ptv
, void *userdata
) {
589 struct session
*s
, *n
;
590 struct userdata
*u
= userdata
;
599 pa_rtclock_get(&now
);
601 pa_log_debug("Checking for dead streams ...");
/* The loop advances via `n`; where `n` is assigned is not visible here. */
603 for (s
= u
->sessions
; s
; s
= n
) {
607 k
= pa_atomic_load(&s
->timestamp
);
/* Expired: no activity for more than DEATH_TIMEOUT seconds. */
609 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
)
/* Re-arm the timer. */
614 pa_gettimeofday(&tv
);
615 pa_timeval_add(&tv
, DEATH_TIMEOUT
*PA_USEC_PER_SEC
);
616 m
->time_restart(t
, &tv
);
/* Module entry point.
 *
 * Visible steps: parse the module arguments, interpret "sap_address"
 * (default DEFAULT_SAP_ADDRESS) first as an IPv4 and then as an IPv6 address,
 * open the multicast SAP socket, allocate the module state, remember the
 * "sink" argument, register the SAP IO event, and initialise the SAP
 * context, the session list, the by_origin map and the death-check timer.
 *
 * The assignment of `salen`, the remaining userdata initialisation, the
 * return value and the failure paths are not visible in this excerpt. */
619 int pa__init(pa_module
*m
) {
621 pa_modargs
*ma
= NULL
;
622 struct sockaddr_in sa4
;
624 struct sockaddr_in6 sa6
;
628 const char *sap_address
;
634 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
635 pa_log("failed to parse module arguments");
639 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
/* Try IPv4 first. Only sin_addr, sin_family and sin_port are set in the
 * visible code. */
641 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
642 sa4
.sin_family
= AF_INET
;
643 sa4
.sin_port
= htons(SAP_PORT
);
644 sa
= (struct sockaddr
*) &sa4
;
/* Otherwise try IPv6. */
647 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
648 sa6
.sin6_family
= AF_INET6
;
649 sa6
.sin6_port
= htons(SAP_PORT
);
650 sa
= (struct sockaddr
*) &sa6
;
654 pa_log("Invalid SAP address '%s'", sap_address
);
658 if ((fd
= mcast_socket(sa
, salen
)) < 0)
/* NOTE(review): pa_xnew() is used here while session_new() uses pa_xnew0();
 * confirm that every userdata member is initialised explicitly. */
661 m
->userdata
= u
= pa_xnew(struct userdata
, 1);
/* The "sink" argument has no default: a NULL value is passed to pa_xstrdup()
 * when it is absent. */
663 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
665 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
666 pa_sap_context_init_recv(&u
->sap_context
, fd
);
668 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
/* Sessions are keyed by their SDP origin string. */
670 u
->by_origin
= pa_hashmap_new(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
);
/* First death check fires DEATH_TIMEOUT seconds from now. */
672 pa_gettimeofday(&tv
);
673 pa_timeval_add(&tv
, DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
674 u
->check_death_event
= m
->core
->mainloop
->time_new(m
->core
->mainloop
, &tv
, check_death_event_cb
, u
);
/* Module teardown.
 *
 * Visible steps: fetch the module state (the action when it is NULL is not
 * visible), free the SAP IO event and the death-check timer, destroy the SAP
 * context, drain by_origin session by session (the loop body is not visible
 * in this excerpt), free the map and release sink_name. */
690 void pa__done(pa_module
*m
) {
696 if (!(u
= m
->userdata
))
/* NOTE(review): the guard for this call, if any, is not visible here, unlike
 * the check on check_death_event below. */
700 m
->core
->mainloop
->io_free(u
->sap_event
);
702 if (u
->check_death_event
)
703 m
->core
->mainloop
->time_free(u
->check_death_event
);
705 pa_sap_context_destroy(&u
->sap_context
);
/* Repeats while the map still holds a session; session_free() removes a
 * session from by_origin. */
708 while ((s
= pa_hashmap_first(u
->by_origin
)))
711 pa_hashmap_free(u
->by_origin
, NULL
, NULL
);
714 pa_xfree(u
->sink_name
);