2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2008 Colin Guthrie
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
36 #include <sys/socket.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #include <sys/ioctl.h>
41 #ifdef HAVE_LINUX_SOCKIOS_H
42 #include <linux/sockios.h>
45 #include <pulse/xmalloc.h>
46 #include <pulse/timeval.h>
48 #include <pulsecore/core-error.h>
49 #include <pulsecore/iochannel.h>
50 #include <pulsecore/sink.h>
51 #include <pulsecore/module.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/modargs.h>
54 #include <pulsecore/log.h>
55 #include <pulsecore/socket-client.h>
56 #include <pulsecore/authkey.h>
57 #include <pulsecore/thread-mq.h>
58 #include <pulsecore/thread.h>
59 #include <pulsecore/time-smoother.h>
60 #include <pulsecore/rtclock.h>
61 #include <pulsecore/socket-util.h>
63 #include "module-raop-sink-symdef.h"
67 #include "raop_client.h"
69 PA_MODULE_AUTHOR("Colin Guthrie");
70 PA_MODULE_DESCRIPTION("RAOP Sink");
71 PA_MODULE_VERSION(PACKAGE_VERSION
);
72 PA_MODULE_LOAD_ONCE(FALSE
);
74 "sink_name=<name for the sink> "
75 "sink_properties=<properties for the sink> "
77 "format=<sample format> "
79 "channels=<number of channels>");
81 #define DEFAULT_SINK_NAME "raop"
88 pa_thread_mq thread_mq
;
90 pa_rtpoll_item
*rtpoll_item
;
93 pa_memchunk raw_memchunk
;
94 pa_memchunk encoded_memchunk
;
97 size_t write_length
, write_index
;
100 size_t read_length
, read_index
;
104 /*esd_format_t format;*/
107 pa_smoother
*smoother
;
111 int64_t encoding_overhead
;
112 int32_t next_encoding_overhead
;
113 double encoding_ratio
;
115 pa_raop_client
*raop
;
120 static const char* const valid_modargs
[] = {
127 "description", /* supported for compatibility reasons, made redundant by sink_properties= */
132 SINK_MESSAGE_PASS_SOCKET
= PA_SINK_MESSAGE_MAX
,
133 SINK_MESSAGE_RIP_SOCKET
136 /* Forward declaration */
137 static void sink_set_volume_cb(pa_sink
*);
139 static void on_connection(int fd
, void*userdata
) {
141 socklen_t sl
= sizeof(int);
142 struct userdata
*u
= userdata
;
145 pa_assert(u
->fd
< 0);
148 if (getsockopt(u
->fd
, SOL_SOCKET
, SO_SNDBUF
, &so_sndbuf
, &sl
) < 0)
149 pa_log_warn("getsockopt(SO_SNDBUF) failed: %s", pa_cstrerror(errno
));
151 pa_log_debug("SO_SNDBUF is %zu.", (size_t) so_sndbuf
);
152 pa_sink_set_max_request(u
->sink
, PA_MAX((size_t) so_sndbuf
, u
->block_size
));
155 /* Set the initial volume */
156 sink_set_volume_cb(u
->sink
);
158 pa_log_debug("Connection authenticated, handing fd to IO thread...");
160 pa_asyncmsgq_post(u
->thread_mq
.inq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_PASS_SOCKET
, NULL
, 0, NULL
, NULL
);
163 static void on_close(void*userdata
) {
164 struct userdata
*u
= userdata
;
167 pa_log_debug("Connection closed, informing IO thread...");
169 pa_asyncmsgq_post(u
->thread_mq
.inq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_RIP_SOCKET
, NULL
, 0, NULL
, NULL
);
172 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
173 struct userdata
*u
= PA_SINK(o
)->userdata
;
177 case PA_SINK_MESSAGE_SET_STATE
:
179 switch ((pa_sink_state_t
) PA_PTR_TO_UINT(data
)) {
181 case PA_SINK_SUSPENDED
:
182 pa_assert(PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
));
184 pa_smoother_pause(u
->smoother
, pa_rtclock_usec());
186 /* Issue a FLUSH if we are connected */
188 pa_raop_flush(u
->raop
);
193 case PA_SINK_RUNNING
:
195 if (u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
) {
196 pa_smoother_resume(u
->smoother
, pa_rtclock_usec(), TRUE
);
198 /* The connection can be closed when idle, so check to
199 see if we need to reestablish it */
201 pa_raop_connect(u
->raop
);
203 pa_raop_flush(u
->raop
);
208 case PA_SINK_UNLINKED
:
210 case PA_SINK_INVALID_STATE
:
216 case PA_SINK_MESSAGE_GET_LATENCY
: {
219 r
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
220 w
= pa_bytes_to_usec((u
->offset
- u
->encoding_overhead
+ (u
->encoded_memchunk
.length
/ u
->encoding_ratio
)), &u
->sink
->sample_spec
);
222 *((pa_usec_t
*) data
) = w
> r
? w
- r
: 0;
226 case SINK_MESSAGE_PASS_SOCKET
: {
227 struct pollfd
*pollfd
;
229 pa_assert(!u
->rtpoll_item
);
231 u
->rtpoll_item
= pa_rtpoll_item_new(u
->rtpoll
, PA_RTPOLL_NEVER
, 1);
232 pollfd
= pa_rtpoll_item_get_pollfd(u
->rtpoll_item
, NULL
);
234 pollfd
->events
= POLLOUT
;
235 /*pollfd->events = */pollfd
->revents
= 0;
237 if (u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
) {
238 /* Our stream has been suspended so we just flush it.... */
239 pa_raop_flush(u
->raop
);
244 case SINK_MESSAGE_RIP_SOCKET
: {
245 pa_assert(u
->fd
>= 0);
250 if (u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
) {
252 pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
255 pa_rtpoll_item_free(u
->rtpoll_item
);
256 u
->rtpoll_item
= NULL
;
258 /* Quesiton: is this valid here: or should we do some sort of:
259 return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
261 pa_module_unload_request(u
->module
, TRUE
);
267 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
270 static void sink_set_volume_cb(pa_sink
*s
) {
271 struct userdata
*u
= s
->userdata
;
274 char t
[PA_CVOLUME_SNPRINT_MAX
];
278 /* If we're muted we don't need to do anything */
282 /* Calculate the max volume of all channels.
283 We'll use this as our (single) volume on the APEX device and emulate
284 any variation in channel volumes in software */
285 v
= pa_cvolume_max(&s
->virtual_volume
);
287 /* Create a pa_cvolume version of our single value */
288 pa_cvolume_set(&hw
, s
->sample_spec
.channels
, v
);
290 /* Perform any software manipulation of the volume needed */
291 pa_sw_cvolume_divide(&s
->soft_volume
, &s
->virtual_volume
, &hw
);
293 pa_log_debug("Requested volume: %s", pa_cvolume_snprint(t
, sizeof(t
), &s
->virtual_volume
));
294 pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint(t
, sizeof(t
), &hw
));
295 pa_log_debug("Calculated software volume: %s", pa_cvolume_snprint(t
, sizeof(t
), &s
->soft_volume
));
297 /* Any necessary software volume manipulateion is done so set
298 our hw volume (or v as a single value) on the device */
299 pa_raop_client_set_volume(u
->raop
, v
);
302 static void sink_set_mute_cb(pa_sink
*s
) {
303 struct userdata
*u
= s
->userdata
;
308 pa_raop_client_set_volume(u
->raop
, PA_VOLUME_MUTED
);
310 sink_set_volume_cb(s
);
314 static void thread_func(void *userdata
) {
315 struct userdata
*u
= userdata
;
318 uint32_t silence_overhead
= 0;
319 double silence_ratio
= 0;
323 pa_log_debug("Thread starting up");
325 pa_thread_mq_install(&u
->thread_mq
);
326 pa_rtpoll_install(u
->rtpoll
);
328 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
330 /* Create a chunk of memory that is our encoded silence sample. */
331 pa_memchunk_reset(&silence
);
336 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
337 if (u
->sink
->thread_info
.rewind_requested
)
338 pa_sink_process_rewind(u
->sink
, 0);
340 if (u
->rtpoll_item
) {
341 struct pollfd
*pollfd
;
342 pollfd
= pa_rtpoll_item_get_pollfd(u
->rtpoll_item
, NULL
);
344 /* Render some data and write it to the fifo */
345 if (/*PA_SINK_IS_OPENED(u->sink->thread_info.state) && */pollfd
->revents
) {
350 if (!silence
.memblock
) {
351 pa_memchunk silence_tmp
;
353 pa_memchunk_reset(&silence_tmp
);
354 silence_tmp
.memblock
= pa_memblock_new(u
->core
->mempool
, 4096);
355 silence_tmp
.length
= 4096;
356 p
= pa_memblock_acquire(silence_tmp
.memblock
);
358 pa_memblock_release(silence_tmp
.memblock
);
359 pa_raop_client_encode_sample(u
->raop
, &silence_tmp
, &silence
);
360 pa_assert(0 == silence_tmp
.length
);
361 silence_overhead
= silence_tmp
.length
- 4096;
362 silence_ratio
= silence_tmp
.length
/ 4096;
363 pa_memblock_unref(silence_tmp
.memblock
);
369 if (u
->encoded_memchunk
.length
<= 0) {
370 if (u
->encoded_memchunk
.memblock
)
371 pa_memblock_unref(u
->encoded_memchunk
.memblock
);
372 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
)) {
375 /* We render real data */
376 if (u
->raw_memchunk
.length
<= 0) {
377 if (u
->raw_memchunk
.memblock
)
378 pa_memblock_unref(u
->raw_memchunk
.memblock
);
379 pa_memchunk_reset(&u
->raw_memchunk
);
381 /* Grab unencoded data */
382 pa_sink_render(u
->sink
, u
->block_size
, &u
->raw_memchunk
);
384 pa_assert(u
->raw_memchunk
.length
> 0);
387 rl
= u
->raw_memchunk
.length
;
388 u
->encoding_overhead
+= u
->next_encoding_overhead
;
389 pa_raop_client_encode_sample(u
->raop
, &u
->raw_memchunk
, &u
->encoded_memchunk
);
390 u
->next_encoding_overhead
= (u
->encoded_memchunk
.length
- (rl
- u
->raw_memchunk
.length
));
391 u
->encoding_ratio
= u
->encoded_memchunk
.length
/ (rl
- u
->raw_memchunk
.length
);
393 /* We render some silence into our memchunk */
394 memcpy(&u
->encoded_memchunk
, &silence
, sizeof(pa_memchunk
));
395 pa_memblock_ref(silence
.memblock
);
397 /* Calculate/store some values to be used with the smoother */
398 u
->next_encoding_overhead
= silence_overhead
;
399 u
->encoding_ratio
= silence_ratio
;
402 pa_assert(u
->encoded_memchunk
.length
> 0);
404 p
= pa_memblock_acquire(u
->encoded_memchunk
.memblock
);
405 l
= pa_write(u
->fd
, (uint8_t*) p
+ u
->encoded_memchunk
.index
, u
->encoded_memchunk
.length
, &write_type
);
406 pa_memblock_release(u
->encoded_memchunk
.memblock
);
414 else if (errno
== EAGAIN
) {
416 /* OK, we filled all socket buffers up
421 pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno
));
428 u
->encoded_memchunk
.index
+= l
;
429 u
->encoded_memchunk
.length
-= l
;
433 if (u
->encoded_memchunk
.length
> 0) {
434 /* we've completely written the encoded data, so update our overhead */
435 u
->encoding_overhead
+= u
->next_encoding_overhead
;
437 /* OK, we wrote less that we asked for,
438 * hence we can assume that the socket
439 * buffers are full now */
447 /* At this spot we know that the socket buffers are
448 * fully filled up. This is the best time to estimate
449 * the playback position of the server */
451 n
= u
->offset
- u
->encoding_overhead
;
456 if (ioctl(u
->fd
, SIOCOUTQ
, &l
) >= 0 && l
> 0)
457 n
-= (l
/ u
->encoding_ratio
);
461 usec
= pa_bytes_to_usec(n
, &u
->sink
->sample_spec
);
463 if (usec
> u
->latency
)
468 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), usec
);
471 /* Hmm, nothing to do. Let's sleep */
472 pollfd
->events
= POLLOUT
; /*PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/
475 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
481 if (u
->rtpoll_item
) {
482 struct pollfd
* pollfd
;
484 pollfd
= pa_rtpoll_item_get_pollfd(u
->rtpoll_item
, NULL
);
486 if (pollfd
->revents
& ~POLLOUT
) {
487 if (u
->sink
->thread_info
.state
!= PA_SINK_SUSPENDED
) {
488 pa_log("FIFO shutdown.");
492 /* We expect this to happen on occasion if we are not sending data.
493 It's perfectly natural and normal and natural */
495 pa_rtpoll_item_free(u
->rtpoll_item
);
496 u
->rtpoll_item
= NULL
;
502 /* If this was no regular exit from the loop we have to continue
503 * processing messages until we received PA_MESSAGE_SHUTDOWN */
504 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
505 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
508 if (silence
.memblock
)
509 pa_memblock_unref(silence
.memblock
);
510 pa_log_debug("Thread shutting down");
513 int pa__init(pa_module
*m
) {
514 struct userdata
*u
= NULL
;
516 pa_modargs
*ma
= NULL
;
517 const char *server
, *desc
;
518 pa_sink_new_data data
;
522 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
523 pa_log("failed to parse module arguments");
527 ss
= m
->core
->default_sample_spec
;
528 if (pa_modargs_get_sample_spec(ma
, &ss
) < 0) {
529 pa_log("invalid sample format specification");
533 if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss
.format
!= PA_SAMPLE_S16NE
) ||
535 pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
539 u
= pa_xnew0(struct userdata
, 1);
544 u
->smoother
= pa_smoother_new(
552 pa_memchunk_reset(&u
->raw_memchunk
);
553 pa_memchunk_reset(&u
->encoded_memchunk
);
555 u
->encoding_overhead
= 0;
556 u
->next_encoding_overhead
= 0;
557 u
->encoding_ratio
= 1.0;
559 u
->rtpoll
= pa_rtpoll_new();
560 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
561 u
->rtpoll_item
= NULL
;
564 (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
565 (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
567 u
->block_size
= pa_usec_to_bytes(PA_USEC_PER_SEC
/20, &ss
);
569 u
->read_data
= u
->write_data
= NULL
;
570 u
->read_index
= u
->write_index
= u
->read_length
= u
->write_length
= 0;
572 /*u->state = STATE_AUTH;*/
575 if (!(server
= pa_modargs_get_value(ma
, "server", NULL
))) {
576 pa_log("No server argument given.");
580 pa_sink_new_data_init(&data
);
581 data
.driver
= __FILE__
;
583 pa_sink_new_data_set_name(&data
, pa_modargs_get_value(ma
, "sink_name", DEFAULT_SINK_NAME
));
584 pa_sink_new_data_set_sample_spec(&data
, &ss
);
585 pa_proplist_sets(data
.proplist
, PA_PROP_DEVICE_STRING
, server
);
586 pa_proplist_sets(data
.proplist
, PA_PROP_DEVICE_INTENDED_ROLES
, "music");
587 if ((desc
= pa_modargs_get_value(ma
, "description", NULL
)))
588 pa_proplist_sets(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, desc
);
590 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "RAOP sink '%s'", server
);
592 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
593 pa_log("Invalid properties");
594 pa_sink_new_data_done(&data
);
598 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_LATENCY
|PA_SINK_NETWORK
);
599 pa_sink_new_data_done(&data
);
602 pa_log("Failed to create sink.");
606 u
->sink
->parent
.process_msg
= sink_process_msg
;
607 u
->sink
->userdata
= u
;
608 u
->sink
->set_volume
= sink_set_volume_cb
;
609 u
->sink
->set_mute
= sink_set_mute_cb
;
610 u
->sink
->flags
= PA_SINK_LATENCY
|PA_SINK_NETWORK
|PA_SINK_HW_VOLUME_CTRL
;
612 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
613 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
615 if (!(u
->raop
= pa_raop_client_new(u
->core
, server
))) {
616 pa_log("Failed to connect to server.");
620 pa_raop_client_set_callback(u
->raop
, on_connection
, u
);
621 pa_raop_client_set_closed_callback(u
->raop
, on_close
, u
);
623 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
624 pa_log("Failed to create thread.");
628 pa_sink_put(u
->sink
);
643 int pa__get_n_used(pa_module
*m
) {
647 pa_assert_se(u
= m
->userdata
);
649 return pa_sink_linked_by(u
->sink
);
652 void pa__done(pa_module
*m
) {
656 if (!(u
= m
->userdata
))
660 pa_sink_unlink(u
->sink
);
663 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
664 pa_thread_free(u
->thread
);
667 pa_thread_mq_done(&u
->thread_mq
);
670 pa_sink_unref(u
->sink
);
673 pa_rtpoll_item_free(u
->rtpoll_item
);
676 pa_rtpoll_free(u
->rtpoll
);
678 if (u
->raw_memchunk
.memblock
)
679 pa_memblock_unref(u
->raw_memchunk
.memblock
);
681 if (u
->encoded_memchunk
.memblock
)
682 pa_memblock_unref(u
->encoded_memchunk
.memblock
);
685 pa_raop_client_free(u
->raop
);
687 pa_xfree(u
->read_data
);
688 pa_xfree(u
->write_data
);
691 pa_smoother_free(u
->smoother
);