3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/rtclock.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/time-smoother.h>
56 #include "module-rtp-recv-symdef.h"
/* Module metadata: author, human-readable description, version and the
 * load-once flag (FALSE here). The description string is user-visible, so
 * its spelling matters ("Receive", not "Recieve"). */
62 PA_MODULE_AUTHOR("Lennart Poettering");
63 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
64 PA_MODULE_VERSION(PACKAGE_VERSION
);
65 PA_MODULE_LOAD_ONCE(FALSE
);
/* String fragments describing the "sink" and "sap_address" module arguments
 * (both are read in pa__init()). Presumably part of the module usage text;
 * the enclosing statement is not visible in this listing -- TODO confirm. */
67 "sink=<name of the sink> "
68 "sap_address=<multicast address to listen on> "
/* Address used for the SAP socket when the "sap_address" module argument is
 * not given (see pa__init()). */
72 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
/* Not referenced in the visible part of this listing; presumably the maximum
 * length in bytes of a session's memblockq -- TODO confirm. */
73 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
/* session_new() refuses to create a session once u->n_sessions reaches this. */
74 #define MAX_SESSIONS 16
/* In seconds: check_death_event_cb() compares a session's timestamp plus this
 * value against the current time, and the check timer is re-armed this far
 * into the future. */
75 #define DEATH_TIMEOUT 20
/* Minimum time between two sample rate adjustments in rtpoll_work_cb(). */
76 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
/* Initial value of a session's intended_latency (see session_new()). */
77 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
/* Table handed to pa_modargs_new() in pa__init() when parsing the module
 * argument string. Its entries are not visible in this listing. */
79 static const char* const valid_modargs
[] = {
/* Members of the per-session state (accessed as s->... in this file). The
 * struct header and some members used elsewhere in the file (e.g. ssrc,
 * offset) are not visible in this listing. */

/* Back pointer to the module-wide state. */
86 struct userdata
*userdata
;
/* Linkage for the userdata->sessions list. */
87 PA_LLIST_FIELDS(struct session
);
/* Sink input created for this session in session_new(). */
89 pa_sink_input
*sink_input
;
/* Queue that rtpoll_work_cb() pushes received audio into and that
 * sink_input_pop_cb() reads from. */
90 pa_memblockq
*memblockq
;
/* FALSE until rtpoll_work_cb() has accepted the first packet. */
92 pa_bool_t first_packet
;
/* Copy of the SDP description the session was created from. */
96 struct pa_sdp_info sdp_info
;
/* RTP receive context, initialised in session_new(). */
98 pa_rtp_context rtp_context
;
/* Created in sink_input_attach(), freed and reset to NULL in
 * sink_input_detach(). */
100 pa_rtpoll_item
*rtpoll_item
;
/* tv_sec of the last activity: stored by rtpoll_work_cb() and sap_event_cb(),
 * loaded by check_death_event_cb(). */
102 pa_atomic_t timestamp
;
/* Fed with (time, write index) pairs in rtpoll_work_cb(). */
104 pa_smoother
*smoother
;
/* Target latency; starts as LATENCY_USEC and is raised to at least twice
 * sink_latency in session_new(). */
105 pa_usec_t intended_latency
;
/* Value returned by pa_sink_input_set_requested_latency() in session_new(). */
106 pa_usec_t sink_latency
;
/* Time of the last sample rate adjustment. */
108 pa_usec_t last_rate_update
;

/* The members below are accessed through the module-wide state (u->...) in
 * this file; the struct header between the two groups is not visible. */

/* SAP receive context for the announcement socket. */
114 pa_sap_context sap_context
;
/* Main loop I/O event that invokes sap_event_cb(). */
115 pa_io_event
* sap_event
;
/* Main loop timer that invokes check_death_event_cb(). */
117 pa_time_event
*check_death_event
;
/* List of all sessions. */
121 PA_LLIST_HEAD(struct session
, sessions
);
/* Sessions keyed by their sdp_info.origin string. */
122 pa_hashmap
*by_origin
;
/* Forward declaration; the definition appears further down in this file. */
126 static void session_free(struct session
*s
);
128 /* Called from I/O thread context */
/* Message handler installed as the sink input's process_msg in session_new().
 * For PA_SINK_INPUT_MESSAGE_GET_LATENCY it writes the playback time of the
 * data currently held in s->memblockq to *data (as pa_usec_t). The visible
 * code ends by delegating to the generic pa_sink_input_process_msg() and
 * returning its result. The switch statement around the case label is not
 * visible in this listing. */
129 static int sink_input_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
130 struct session
*s
= PA_SINK_INPUT(o
)->userdata
;
133 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
:
134 *((pa_usec_t
*) data
) = pa_bytes_to_usec(pa_memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
136 /* Fall through, the default handler will add in the extra
137 * latency added by the resampler */
141 return pa_sink_input_process_msg(o
, code
, data
, offset
, chunk
);
144 /* Called from I/O thread context */
/* pop callback of the sink input (installed in session_new()). Peeks the next
 * chunk of s->memblockq into *chunk and then drops chunk->length bytes from
 * the queue. The statement executed when pa_memblockq_peek() fails and the
 * return statements are not visible in this listing. */
145 static int sink_input_pop_cb(pa_sink_input
*i
, size_t length
, pa_memchunk
*chunk
) {
147 pa_sink_input_assert_ref(i
);
148 pa_assert_se(s
= i
->userdata
);
150 if (pa_memblockq_peek(s
->memblockq
, chunk
) < 0)
/* Consume what was just handed out via *chunk. */
153 pa_memblockq_drop(s
->memblockq
, chunk
->length
);
158 /* Called from I/O thread context */
/* process_rewind callback of the sink input: rewinds the session's memblockq
 * by nbytes. */
159 static void sink_input_process_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
162 pa_sink_input_assert_ref(i
);
163 pa_assert_se(s
= i
->userdata
);
165 pa_memblockq_rewind(s
->memblockq
, nbytes
);
168 /* Called from thread context */
/* update_max_rewind callback of the sink input: sets the session memblockq's
 * maximum rewind to nbytes. */
169 static void sink_input_update_max_rewind_cb(pa_sink_input
*i
, size_t nbytes
) {
172 pa_sink_input_assert_ref(i
);
173 pa_assert_se(s
= i
->userdata
);
175 pa_memblockq_set_maxrewind(s
->memblockq
, nbytes
);
178 /* Called from main context */
/* kill callback of the sink input (installed in session_new()). Only the
 * reference and userdata assertions are visible in this listing; the action
 * taken on the session is not. */
179 static void sink_input_kill(pa_sink_input
* i
) {
181 pa_sink_input_assert_ref(i
);
182 pa_assert_se(s
= i
->userdata
);
187 /* Called from I/O thread context */
/* Work callback registered on the session's rtpoll item by
 * sink_input_attach(). The visible code receives an RTP packet from the
 * session socket, discards it when payload type or SSRC do not match, pushes
 * it into s->memblockq at the position implied by its RTP timestamp, records
 * the activity time and, at most once per RATE_UPDATE_INTERVAL, nudges the
 * sink input's sample rate towards s->intended_latency. Local declarations,
 * several branch bodies and all return statements are not visible in this
 * listing. */
188 static int rtpoll_work_cb(pa_rtpoll_item
*i
) {
195 pa_assert_se(s
= pa_rtpoll_item_get_userdata(i
));
197 p
= pa_rtpoll_item_get_pollfd(i
, NULL
);
/* Any of these revents (including POLLOUT) is reported as an error. */
199 if (p
->revents
& (POLLERR
|POLLNVAL
|POLLHUP
|POLLOUT
)) {
200 pa_log("poll() signalled bad revents.");
/* No input pending; the statement taken in that case is not visible here. */
204 if ((p
->revents
& POLLIN
) == 0)
/* Read one packet into chunk, allocating from the core's mempool. */
209 if (pa_rtp_recv(&s
->rtp_context
, &chunk
, s
->userdata
->module
->core
->mempool
) < 0)
/* Payload type differs from the one announced via SDP: release the data. */
212 if (s
->sdp_info
.payload
!= s
->rtp_context
.payload
) {
213 pa_memblock_unref(chunk
.memblock
);
/* First packet of the session: latch its SSRC and timestamp. */
217 if (!s
->first_packet
) {
218 s
->first_packet
= TRUE
;
220 s
->ssrc
= s
->rtp_context
.ssrc
;
221 s
->offset
= s
->rtp_context
.timestamp
;
/* The latched SSRC equals this core's cookie. */
223 if (s
->ssrc
== s
->userdata
->module
->core
->cookie
)
224 pa_log_warn("Detected RTP packet loop!");
/* Packet carries a different SSRC than the latched one: release the data. */
226 if (s
->ssrc
!= s
->rtp_context
.ssrc
) {
227 pa_memblock_unref(chunk
.memblock
);
232 /* Check whether there was a timestamp overflow */
/* k is the plain difference to the expected timestamp, j the same difference
 * with 2^32 added; their magnitudes are compared below. The assignment to
 * delta is not visible in this listing. */
233 k
= (int64_t) s
->rtp_context
.timestamp
- (int64_t) s
->offset
;
234 j
= (int64_t) 0x100000000LL
- (int64_t) s
->offset
+ (int64_t) s
->rtp_context
.timestamp
;
236 if ((k
< 0 ? -k
: k
) < (j
< 0 ? -j
: j
))
/* Move the write index by delta frames relative to its current position. */
241 pa_memblockq_seek(s
->memblockq
, delta
* (int64_t) s
->rtp_context
.frame_size
, PA_SEEK_RELATIVE
);
243 pa_rtclock_get(&now
);
/* Feed the smoother with (current time, write index expressed as time). */
245 pa_smoother_put(s
->smoother
, pa_timeval_load(&now
), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s
->memblockq
), &s
->sink_input
->sample_spec
));
/* If the queue rejects the chunk, still advance the write index past it. */
247 if (pa_memblockq_push(s
->memblockq
, &chunk
) < 0) {
248 pa_log_warn("Queue overrun");
249 pa_memblockq_seek(s
->memblockq
, (int64_t) chunk
.length
, PA_SEEK_RELATIVE
);
252 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
254 pa_memblock_unref(chunk
.memblock
);
256 /* The next timestamp we expect */
257 s
->offset
= s
->rtp_context
.timestamp
+ (uint32_t) (chunk
.length
/ s
->rtp_context
.frame_size
);
/* Record activity for check_death_event_cb().
 * NOTE(review): tv_sec is narrowed to int by the cast -- confirm this is
 * acceptable for the value range of the clock in use. */
259 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
/* Sample rate adjustment, at most once per RATE_UPDATE_INTERVAL. */
261 if (s
->last_rate_update
+ RATE_UPDATE_INTERVAL
< pa_timeval_load(&now
)) {
262 pa_usec_t wi
, ri
, render_delay
, sink_delay
= 0, latency
, fix
;
263 unsigned fix_samples
;
265 pa_log("Updating sample rate");
/* wi: smoother's estimate for the current time; ri: read index as time. */
267 wi
= pa_smoother_get(s
->smoother
, pa_timeval_load(&now
));
268 ri
= pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s
->memblockq
), &s
->sink_input
->sample_spec
);
/* Ask the sink for its latency; the failure branch is not visible here. */
270 if (PA_MSGOBJECT(s
->sink_input
->sink
)->process_msg(PA_MSGOBJECT(s
->sink_input
->sink
), PA_SINK_MESSAGE_GET_LATENCY
, &sink_delay
, 0, NULL
) < 0)
273 render_delay
= pa_bytes_to_usec(pa_memblockq_get_length(s
->sink_input
->thread_info
.render_memblockq
), &s
->sink_input
->sink
->sample_spec
);
/* Subtract render and sink delay from ri, but only when ri is larger. The
 * lines computing latency are not visible in this listing. */
275 if (ri
> render_delay
+sink_delay
)
276 ri
-= render_delay
+sink_delay
;
285 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency
/PA_USEC_PER_MSEC
, (double) s
->intended_latency
/PA_USEC_PER_MSEC
);
287 /* Calculate deviation */
/* fix is the absolute difference; the else line between the two assignments
 * is not visible in this listing. */
288 if (latency
< s
->intended_latency
)
289 fix
= s
->intended_latency
- latency
;
291 fix
= latency
- s
->intended_latency
;
293 /* How many samples is this per second? */
294 fix_samples
= (unsigned) (fix
* (pa_usec_t
) s
->sink_input
->thread_info
.sample_spec
.rate
/ (pa_usec_t
) RATE_UPDATE_INTERVAL
);
296 /* Check if deviation is in bounds */
/* Corrections above 20% of the current rate are only logged. */
297 if (fix_samples
> s
->sink_input
->sample_spec
.rate
*.20)
298 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples
);
/* Latency below target lowers the rate, otherwise the rate is raised; the
 * else lines separating the branches are not visible in this listing. */
301 if (latency
< s
->intended_latency
)
302 s
->sink_input
->sample_spec
.rate
-= fix_samples
;
304 s
->sink_input
->sample_spec
.rate
+= fix_samples
;
306 pa_resampler_set_input_rate(s
->sink_input
->thread_info
.resampler
, s
->sink_input
->sample_spec
.rate
);
308 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s
->sink_input
->sample_spec
.rate
);
310 s
->last_rate_update
= pa_timeval_load(&now
);
/* Data became readable while the sink input was in underrun: ask for a
 * rewind. */
313 if (pa_memblockq_is_readable(s
->memblockq
) &&
314 s
->sink_input
->thread_info
.underrun_for
> 0) {
315 pa_log_debug("Requesting rewind due to end of underrun");
316 pa_sink_input_request_rewind(s
->sink_input
, 0, FALSE
, TRUE
, FALSE
);
322 /* Called from I/O thread context */
/* attach callback of the sink input: creates an rtpoll item with one pollfd
 * on the sink's rtpoll, points that pollfd at the session's RTP socket and
 * registers rtpoll_work_cb() with the session as its userdata. Asserts that
 * no rtpoll item exists yet. Further pollfd setup between the fd assignment
 * and the callback registration is not visible in this listing. */
323 static void sink_input_attach(pa_sink_input
*i
) {
327 pa_sink_input_assert_ref(i
);
328 pa_assert_se(s
= i
->userdata
);
330 pa_assert(!s
->rtpoll_item
);
331 s
->rtpoll_item
= pa_rtpoll_item_new(i
->sink
->rtpoll
, PA_RTPOLL_LATE
, 1);
333 p
= pa_rtpoll_item_get_pollfd(s
->rtpoll_item
, NULL
);
334 p
->fd
= s
->rtp_context
.fd
;
338 pa_rtpoll_item_set_work_callback(s
->rtpoll_item
, rtpoll_work_cb
);
339 pa_rtpoll_item_set_userdata(s
->rtpoll_item
, s
);
342 /* Called from I/O thread context */
/* detach callback of the sink input: frees the rtpoll item created in
 * sink_input_attach() and resets the pointer to NULL so that a later attach
 * passes its assertion. Asserts that an rtpoll item exists. */
343 static void sink_input_detach(pa_sink_input
*i
) {
345 pa_sink_input_assert_ref(i
);
346 pa_assert_se(s
= i
->userdata
);
348 pa_assert(s
->rtpoll_item
);
349 pa_rtpoll_item_free(s
->rtpoll_item
);
350 s
->rtpoll_item
= NULL
;
/* Creates a datagram socket for address family af, enables SO_REUSEADDR,
 * joins the multicast group named by sa (IP_ADD_MEMBERSHIP for a
 * sockaddr_in, IPV6_JOIN_GROUP for a sockaddr_in6) and binds the socket to
 * sa. Failures of socket(), SO_REUSEADDR and bind() are logged with pa_log();
 * a failed group join is logged with pa_log_info(). The assignments to af
 * and one, the IPv4/IPv6 branch conditions, the error paths and the return
 * statements are not visible in this listing. Callers in this file treat a
 * negative return value as failure. */
353 static int mcast_socket(const struct sockaddr
* sa
, socklen_t salen
) {
354 int af
, fd
= -1, r
, one
;
357 pa_assert(salen
> 0);
360 if ((fd
= socket(af
, SOCK_DGRAM
, 0)) < 0) {
361 pa_log("Failed to create socket: %s", pa_cstrerror(errno
));
366 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)) < 0) {
367 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno
));
/* IPv4 group membership request; only the group address is filled in, the
 * rest stays zeroed. */
373 memset(&mr4
, 0, sizeof(mr4
));
374 mr4
.imr_multiaddr
= ((const struct sockaddr_in
*) sa
)->sin_addr
;
375 r
= setsockopt(fd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mr4
, sizeof(mr4
));
/* IPv6 group membership request, built the same way. */
378 struct ipv6_mreq mr6
;
379 memset(&mr6
, 0, sizeof(mr6
));
380 mr6
.ipv6mr_multiaddr
= ((const struct sockaddr_in6
*) sa
)->sin6_addr
;
381 r
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
, &mr6
, sizeof(mr6
));
386 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno
));
390 if (bind(fd
, sa
, salen
) < 0) {
391 pa_log("bind() failed: %s", pa_cstrerror(errno
));
/* Creates a session for the stream described by sdp_info. The visible code
 * checks the MAX_SESSIONS limit, looks up the sink named u->sink_name,
 * allocates and initialises the session (including a struct copy of
 * *sdp_info), opens the multicast socket for sdp_info->sa, creates a
 * variable-rate sink input and installs its callbacks, sizes the memblockq
 * from the negotiated latency, initialises the RTP receive context and
 * registers the session in by_origin and the sessions list before putting
 * the sink input. sap_event_cb() treats a false return value as failure and
 * destroys its sdp_info only in that case. Error paths, the assignment of
 * s->userdata, several pa_memblockq_new() arguments and the return
 * statements are not visible in this listing. */
404 static struct session
*session_new(struct userdata
*u
, const pa_sdp_info
*sdp_info
) {
405 struct session
*s
= NULL
;
409 pa_sink_input_new_data data
;
415 if (u
->n_sessions
>= MAX_SESSIONS
) {
416 pa_log("Session limit reached.");
420 if (!(sink
= pa_namereg_get(u
->module
->core
, u
->sink_name
, PA_NAMEREG_SINK
))) {
421 pa_log("Sink does not exist.");
425 pa_rtclock_get(&now
);
427 s
= pa_xnew0(struct session
, 1);
429 s
->first_packet
= FALSE
;
/* Struct copy: the session shares whatever the members of *sdp_info point
 * to; session_free() later calls pa_sdp_info_destroy() on this copy. */
430 s
->sdp_info
= *sdp_info
;
431 s
->rtpoll_item
= NULL
;
432 s
->intended_latency
= LATENCY_USEC
;
433 s
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
*5, PA_USEC_PER_SEC
*2, TRUE
, 10);
434 pa_smoother_set_time_offset(s
->smoother
, pa_timeval_load(&now
));
435 s
->last_rate_update
= pa_timeval_load(&now
);
436 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
438 if ((fd
= mcast_socket((const struct sockaddr
*) &sdp_info
->sa
, sdp_info
->salen
)) < 0)
441 pa_sink_input_new_data_init(&data
);
443 data
.driver
= __FILE__
;
444 pa_proplist_sets(data
.proplist
, PA_PROP_MEDIA_ROLE
, "stream");
/* The session name is optional: the three arguments below expand to
 * " (<name>)" when it is set and to nothing otherwise. The format string is
 * not visible in this listing. */
445 pa_proplist_setf(data
.proplist
, PA_PROP_MEDIA_NAME
,
447 sdp_info
->session_name
? " (" : "",
448 sdp_info
->session_name
? sdp_info
->session_name
: "",
449 sdp_info
->session_name
? ")" : "");
451 if (sdp_info
->session_name
)
452 pa_proplist_sets(data
.proplist
, "rtp.session", sdp_info
->session_name
);
453 pa_proplist_sets(data
.proplist
, "rtp.origin", sdp_info
->origin
);
454 pa_proplist_setf(data
.proplist
, "rtp.payload", "%u", (unsigned) sdp_info
->payload
);
455 data
.module
= u
->module
;
456 pa_sink_input_new_data_set_sample_spec(&data
, &sdp_info
->sample_spec
);
458 pa_sink_input_new(&s
->sink_input
, u
->module
->core
, &data
, PA_SINK_INPUT_VARIABLE_RATE
);
459 pa_sink_input_new_data_done(&data
);
461 if (!s
->sink_input
) {
462 pa_log("Failed to create sink input.");
466 s
->sink_input
->userdata
= s
;
468 s
->sink_input
->parent
.process_msg
= sink_input_process_msg
;
469 s
->sink_input
->pop
= sink_input_pop_cb
;
470 s
->sink_input
->process_rewind
= sink_input_process_rewind_cb
;
471 s
->sink_input
->update_max_rewind
= sink_input_update_max_rewind_cb
;
472 s
->sink_input
->kill
= sink_input_kill
;
473 s
->sink_input
->attach
= sink_input_attach
;
474 s
->sink_input
->detach
= sink_input_detach
;
476 pa_sink_input_get_silence(s
->sink_input
, &silence
);
/* Request half of the intended latency from the sink, then make sure the
 * intended latency is at least twice what was actually granted. */
478 s
->sink_latency
= pa_sink_input_set_requested_latency(s
->sink_input
, s
->intended_latency
/2);
480 if (s
->intended_latency
< s
->sink_latency
*2)
481 s
->intended_latency
= s
->sink_latency
*2;
483 s
->memblockq
= pa_memblockq_new(
487 pa_frame_size(&s
->sink_input
->sample_spec
),
488 pa_usec_to_bytes(s
->intended_latency
- s
->sink_latency
, &s
->sink_input
->sample_spec
),
493 pa_memblock_unref(silence
.memblock
);
495 pa_rtp_context_init_recv(&s
->rtp_context
, fd
, pa_frame_size(&s
->sdp_info
.sample_spec
));
497 pa_hashmap_put(s
->userdata
->by_origin
, s
->sdp_info
.origin
, s
);
499 PA_LLIST_PREPEND(struct session
, s
->userdata
->sessions
, s
);
501 pa_sink_input_put(s
->sink_input
);
/* NOTE(review): session_name is NULL-checked above, so it may be NULL here
 * while being passed to %s -- confirm the logging function tolerates that. */
503 pa_log_info("New session '%s'", s
->sdp_info
.session_name
);
/* Tears down a session: unlinks and unrefs its sink input, removes it from
 * the sessions list and the by_origin map, decrements n_sessions, and frees
 * the memblockq, the SDP info copy, the RTP context and the smoother.
 * Whether the session structure itself is released is not visible in this
 * listing. */
516 static void session_free(struct session
*s
) {
/* NOTE(review): session_name is treated as optional in session_new(), so it
 * may be NULL while being passed to %s -- confirm the logging function
 * tolerates that. */
519 pa_log_info("Freeing session '%s'", s
->sdp_info
.session_name
);
521 pa_sink_input_unlink(s
->sink_input
);
522 pa_sink_input_unref(s
->sink_input
);
524 PA_LLIST_REMOVE(struct session
, s
->userdata
->sessions
, s
);
525 pa_assert(s
->userdata
->n_sessions
>= 1);
526 s
->userdata
->n_sessions
--;
/* Must happen before pa_sdp_info_destroy() below, since the origin string
 * used as key lives in s->sdp_info. */
527 pa_hashmap_remove(s
->userdata
->by_origin
, s
->sdp_info
.origin
);
529 pa_memblockq_free(s
->memblockq
);
530 pa_sdp_info_destroy(&s
->sdp_info
);
531 pa_rtp_context_destroy(&s
->rtp_context
);
533 pa_smoother_free(s
->smoother
);
/* Main loop I/O callback for the SAP socket (registered in pa__init()).
 * Receives one SAP packet, parses its SDP payload and looks the announced
 * origin up in u->by_origin. The visible code shows two lookups: one whose
 * hit branch is followed by destroying the parsed info, and one where an
 * unknown origin leads to session_new() (destroying the info only if that
 * fails) while a known origin gets its timestamp refreshed. The conditions
 * selecting between these paths (presumably the goodbye flag -- TODO
 * confirm) and the actions on receive/parse failure are not visible in this
 * listing. */
538 static void sap_event_cb(pa_mainloop_api
*m
, pa_io_event
*e
, int fd
, pa_io_event_flags_t flags
, void *userdata
) {
539 struct userdata
*u
= userdata
;
540 pa_bool_t goodbye
= FALSE
;
547 pa_assert(fd
== u
->sap_context
.fd
);
548 pa_assert(flags
== PA_IO_EVENT_INPUT
);
550 if (pa_sap_recv(&u
->sap_context
, &goodbye
) < 0)
553 if (!pa_sdp_parse(u
->sap_context
.sdp_data
, &info
, goodbye
))
558 if ((s
= pa_hashmap_get(u
->by_origin
, info
.origin
)))
561 pa_sdp_info_destroy(&info
);
564 if (!(s
= pa_hashmap_get(u
->by_origin
, info
.origin
))) {
/* On success the new session keeps the contents of info (see the struct
 * copy in session_new()), so info is destroyed only on failure. */
565 if (!session_new(u
, &info
))
566 pa_sdp_info_destroy(&info
);
/* Known session: refresh its activity timestamp and discard the parsed
 * info. */
570 pa_rtclock_get(&now
);
571 pa_atomic_store(&s
->timestamp
, (int) now
.tv_sec
);
573 pa_sdp_info_destroy(&info
);
/* Main loop timer callback (registered in pa__init()). Walks u->sessions and
 * compares each session's timestamp plus DEATH_TIMEOUT against the current
 * time, then re-arms the timer DEATH_TIMEOUT seconds from now. The action
 * taken for an expired session and the assignment of the successor pointer n
 * are not visible in this listing. */
578 static void check_death_event_cb(pa_mainloop_api
*m
, pa_time_event
*t
, const struct timeval
*ptv
, void *userdata
) {
579 struct session
*s
, *n
;
580 struct userdata
*u
= userdata
;
589 pa_rtclock_get(&now
);
591 pa_log_debug("Checking for dead streams ...");
/* The loop advances via n rather than s->next, so the current session can
 * go away during the iteration step. */
593 for (s
= u
->sessions
; s
; s
= n
) {
597 k
= pa_atomic_load(&s
->timestamp
);
599 if (k
+ DEATH_TIMEOUT
< now
.tv_sec
)
/* Restart the timer relative to the wall clock time. */
604 pa_gettimeofday(&tv
);
605 pa_timeval_add(&tv
, DEATH_TIMEOUT
*PA_USEC_PER_SEC
);
606 m
->time_restart(t
, &tv
);
/* Module entry point. The visible code parses the module arguments, resolves
 * the "sap_address" argument (default DEFAULT_SAP_ADDRESS) first as an IPv4
 * and then as an IPv6 address, opens the multicast SAP socket, allocates the
 * module state, stores a copy of the "sink" argument, registers
 * sap_event_cb() for input on the socket, initialises the SAP context, the
 * session list and the by_origin map, and starts the death-check timer.
 * Error paths, the assignment of salen, other state initialisation and the
 * return statements are not visible in this listing. */
609 int pa__init(pa_module
*m
) {
611 pa_modargs
*ma
= NULL
;
612 struct sockaddr_in sa4
;
614 struct sockaddr_in6 sa6
;
618 const char *sap_address
;
624 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
625 pa_log("failed to parse module arguments");
629 sap_address
= pa_modargs_get_value(ma
, "sap_address", DEFAULT_SAP_ADDRESS
);
/* Try to interpret the address as IPv4 first ... */
631 if (inet_pton(AF_INET
, sap_address
, &sa4
.sin_addr
) > 0) {
632 sa4
.sin_family
= AF_INET
;
633 sa4
.sin_port
= htons(SAP_PORT
);
634 sa
= (struct sockaddr
*) &sa4
;
/* ... then as IPv6. */
637 } else if (inet_pton(AF_INET6
, sap_address
, &sa6
.sin6_addr
) > 0) {
638 sa6
.sin6_family
= AF_INET6
;
639 sa6
.sin6_port
= htons(SAP_PORT
);
640 sa
= (struct sockaddr
*) &sa6
;
644 pa_log("Invalid SAP address '%s'", sap_address
);
648 if ((fd
= mcast_socket(sa
, salen
)) < 0)
/* pa_xnew() is used here, whereas session_new() uses pa_xnew0(); any
 * zero-initialisation of the remaining members is not visible in this
 * listing. */
651 u
= pa_xnew(struct userdata
, 1);
/* No default is given, so the stored sink name may be NULL when the "sink"
 * argument is absent (pa_xstrdup() result of a NULL default). */
654 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
656 u
->sap_event
= m
->core
->mainloop
->io_new(m
->core
->mainloop
, fd
, PA_IO_EVENT_INPUT
, sap_event_cb
, u
);
657 pa_sap_context_init_recv(&u
->sap_context
, fd
);
659 PA_LLIST_HEAD_INIT(struct session
, u
->sessions
);
661 u
->by_origin
= pa_hashmap_new(pa_idxset_string_hash_func
, pa_idxset_string_compare_func
);
/* First death check fires DEATH_TIMEOUT seconds from now. */
663 pa_gettimeofday(&tv
);
664 pa_timeval_add(&tv
, DEATH_TIMEOUT
* PA_USEC_PER_SEC
);
665 u
->check_death_event
= m
->core
->mainloop
->time_new(m
->core
->mainloop
, &tv
, check_death_event_cb
, u
);
/* Module teardown. The visible code fetches the module state from
 * m->userdata, frees the SAP I/O event and (if set) the death-check timer,
 * destroys the SAP context, drains the by_origin map, frees the map and
 * releases the sink name. The early-exit statement for a missing state, the
 * body of the drain loop and the release of the state structure itself are
 * not visible in this listing. */
681 void pa__done(pa_module
*m
) {
687 if (!(u
= m
->userdata
))
691 m
->core
->mainloop
->io_free(u
->sap_event
);
693 if (u
->check_death_event
)
694 m
->core
->mainloop
->time_free(u
->check_death_event
);
696 pa_sap_context_destroy(&u
->sap_context
);
/* Repeats while the map still holds a session; the loop body (not visible
 * here) must therefore remove the session from the map to terminate. */
699 while ((s
= pa_hashmap_first(u
->by_origin
)))
702 pa_hashmap_free(u
->by_origin
, NULL
, NULL
);
705 pa_xfree(u
->sink_name
);