2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2008 Colin Guthrie
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
36 #include <sys/socket.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #include <sys/ioctl.h>
41 #ifdef HAVE_LINUX_SOCKIOS_H
42 #include <linux/sockios.h>
45 #include <pulse/xmalloc.h>
46 #include <pulse/timeval.h>
48 #include <pulsecore/core-error.h>
49 #include <pulsecore/iochannel.h>
50 #include <pulsecore/sink.h>
51 #include <pulsecore/module.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/modargs.h>
54 #include <pulsecore/log.h>
55 #include <pulsecore/socket-client.h>
56 #include <pulsecore/authkey.h>
57 #include <pulsecore/thread-mq.h>
58 #include <pulsecore/thread.h>
59 #include <pulsecore/time-smoother.h>
60 #include <pulsecore/rtclock.h>
61 #include <pulsecore/socket-util.h>
63 #include "module-raop-sink-symdef.h"
67 #include "raop_client.h"
69 PA_MODULE_AUTHOR("Colin Guthrie");
70 PA_MODULE_DESCRIPTION("RAOP Sink");
71 PA_MODULE_VERSION(PACKAGE_VERSION
);
72 PA_MODULE_LOAD_ONCE(FALSE
);
74 "sink_name=<name for the sink> "
75 "description=<description for the sink> "
77 "format=<sample format> "
78 "channels=<number of channels> "
79 "rate=<sample rate>");
81 #define DEFAULT_SINK_NAME "raop"
88 pa_thread_mq thread_mq
;
90 pa_rtpoll_item
*rtpoll_item
;
93 pa_memchunk raw_memchunk
;
94 pa_memchunk encoded_memchunk
;
97 size_t write_length
, write_index
;
100 size_t read_length
, read_index
;
104 /*esd_format_t format;*/
107 pa_smoother
*smoother
;
111 int64_t encoding_overhead
;
112 int32_t next_encoding_overhead
;
113 double encoding_ratio
;
115 pa_raop_client
*raop
;
120 static const char* const valid_modargs
[] = {
131 SINK_MESSAGE_PASS_SOCKET
= PA_SINK_MESSAGE_MAX
,
132 SINK_MESSAGE_RIP_SOCKET
135 /* Forward declaration */
136 static void sink_set_volume_cb(pa_sink
*);
// Connection callback for the RAOP client (handed to
// pa_raop_client_set_callback() in pa__init).
//
// Visible behaviour: asserts that u->fd is still negative, pushes the sink's
// current volume to the device through sink_set_volume_cb(), and posts
// SINK_MESSAGE_PASS_SOCKET to the IO thread's input queue so that the thread
// starts polling the connection.
//
// fd:       descriptor supplied by the RAOP client (tagged PA_GCC_UNUSED).
// userdata: the module's struct userdata.
//
// NOTE(review): no line visible here stores fd into u->fd, although other code
// reads u->fd; confirm against the complete file.
138 static void on_connection(PA_GCC_UNUSED
int fd
, void*userdata
) {
139 struct userdata
*u
= userdata
;
// A negative u->fd is required on entry.
142 pa_assert(u
->fd
< 0);
145 /* Set the initial volume */
146 sink_set_volume_cb(u
->sink
);
148 pa_log_debug("Connection authenticated, handing fd to IO thread...");
// Posted asynchronously to the sink's message object; handled in
// sink_process_msg().
150 pa_asyncmsgq_post(u
->thread_mq
.inq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_PASS_SOCKET
, NULL
, 0, NULL
, NULL
);
// Closed-connection callback for the RAOP client (handed to
// pa_raop_client_set_closed_callback() in pa__init).
//
// Logs the event and posts SINK_MESSAGE_RIP_SOCKET to the IO thread's input
// queue; the actual teardown happens in sink_process_msg().
//
// userdata: the module's struct userdata.
153 static void on_close(void*userdata
) {
154 struct userdata
*u
= userdata
;
157 pa_log_debug("Connection closed, informing IO thread...");
159 pa_asyncmsgq_post(u
->thread_mq
.inq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_RIP_SOCKET
, NULL
, 0, NULL
, NULL
);
// Message handler installed as u->sink->parent.process_msg in pa__init.
//
// Cases visible in this excerpt:
//   PA_SINK_MESSAGE_SET_STATE   - pauses/resumes the smoother and issues RAOP
//                                 flush/connect calls on state changes.
//   PA_SINK_MESSAGE_GET_LATENCY - writes a latency estimate through data.
//   SINK_MESSAGE_PASS_SOCKET    - creates the rtpoll item for the connection.
//   SINK_MESSAGE_RIP_SOCKET     - drops the rtpoll item after a close.
// The last visible statement forwards to pa_sink_process_msg() with the same
// arguments.
//
// NOTE(review): several lines of this function (switch header, break/return
// statements, some if/else headers, closing braces) are not visible here, so
// the exact control flow between the statements below must be confirmed
// against the complete file.
162 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
163 struct userdata
*u
= PA_SINK(o
)->userdata
;
167 case PA_SINK_MESSAGE_SET_STATE
:
// data carries the requested pa_sink_state_t packed into a pointer.
169 switch ((pa_sink_state_t
) PA_PTR_TO_UINT(data
)) {
171 case PA_SINK_SUSPENDED
:
// Suspending is only expected from an opened state.
172 pa_assert(PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
));
// Stop the smoother's clock at the current rtclock time.
174 pa_smoother_pause(u
->smoother
, pa_rtclock_usec());
176 /* Issue a FLUSH if we are connected */
178 pa_raop_flush(u
->raop
);
183 case PA_SINK_RUNNING
:
// Only act when coming out of suspend.
185 if (u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
) {
186 pa_smoother_resume(u
->smoother
, pa_rtclock_usec());
188 /* The connection can be closed when idle, so check to
189 see if we need to reestablish it */
// NOTE(review): the condition choosing between connect and flush is not
// visible in this excerpt.
191 pa_raop_connect(u
->raop
);
193 pa_raop_flush(u
->raop
);
198 case PA_SINK_UNLINKED
:
200 case PA_SINK_INVALID_STATE
:
206 case PA_SINK_MESSAGE_GET_LATENCY
: {
// r: smoother's estimate for the current rtclock time.
209 r
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
// w: byte count (offset minus encoding overhead, plus the still-pending
// encoded bytes scaled by encoding_ratio) converted to microseconds using the
// sink's sample spec.
210 w
= pa_bytes_to_usec((u
->offset
- u
->encoding_overhead
+ (u
->encoded_memchunk
.length
/ u
->encoding_ratio
)), &u
->sink
->sample_spec
);
// Reported latency is w - r, clamped at zero.
212 *((pa_usec_t
*) data
) = w
> r
? w
- r
: 0;
216 case SINK_MESSAGE_PASS_SOCKET
: {
217 struct pollfd
*pollfd
;
// A poll item must not already exist.
219 pa_assert(!u
->rtpoll_item
);
// One pollfd entry, registered on the module's rtpoll.
221 u
->rtpoll_item
= pa_rtpoll_item_new(u
->rtpoll
, PA_RTPOLL_NEVER
, 1);
222 pollfd
= pa_rtpoll_item_get_pollfd(u
->rtpoll_item
, NULL
);
// NOTE(review): the assignment of pollfd->fd is not visible in this excerpt.
// Wait for writability; clear stale result flags.
224 pollfd
->events
= POLLOUT
;
225 /*pollfd->events = */pollfd
->revents
= 0;
227 if (u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
) {
228 /* Our stream has been suspended so we just flush it.... */
229 pa_raop_flush(u
->raop
);
234 case SINK_MESSAGE_RIP_SOCKET
: {
// A close notification is only expected while a descriptor is held.
235 pa_assert(u
->fd
>= 0);
240 if (u
->sink
->thread_info
.state
== PA_SINK_SUSPENDED
) {
242 pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
// Drop the poll item so the IO thread stops polling the connection.
245 pa_rtpoll_item_free(u
->rtpoll_item
);
246 u
->rtpoll_item
= NULL
;
248 /* Question: is this valid here, or should we do some sort of:
249 return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
// NOTE(review): the terminator of the block comment opened two lines above is
// not visible in this excerpt; restore it before compiling.
251 pa_module_unload_request(u
->module
, TRUE
);
// Everything not fully handled above is delegated to the generic sink handler.
257 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
// Volume callback installed as u->sink->set_volume in pa__init; also called
// directly from on_connection() and sink_set_mute_cb().
//
// Visible behaviour: takes the maximum channel volume of s->virtual_volume as
// the single device volume v, stores virtual_volume divided by that uniform
// volume into s->soft_volume (so per-channel differences are applied in
// software), logs the three volumes and sends v to the RAOP client.
//
// s: the sink whose volume changed; s->userdata is the module's userdata.
//
// NOTE(review): the declarations of v and hw and the mute check announced by
// the first comment below are not visible in this excerpt.
260 static void sink_set_volume_cb(pa_sink
*s
) {
261 struct userdata
*u
= s
->userdata
;
// Scratch buffer reused for each pa_cvolume_snprint() call below.
264 char t
[PA_CVOLUME_SNPRINT_MAX
];
268 /* If we're muted we don't need to do anything */
272 /* Calculate the max volume of all channels.
273 We'll use this as our (single) volume on the APEX device and emulate
274 any variation in channel volumes in software */
275 v
= pa_cvolume_max(&s
->virtual_volume
);
277 /* Create a pa_cvolume version of our single value */
278 pa_cvolume_set(&hw
, s
->sample_spec
.channels
, v
);
280 /* Perform any software manipulation of the volume needed */
281 pa_sw_cvolume_divide(&s
->soft_volume
, &s
->virtual_volume
, &hw
);
283 pa_log_debug("Requested volume: %s", pa_cvolume_snprint(t
, sizeof(t
), &s
->virtual_volume
));
284 pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint(t
, sizeof(t
), &hw
));
285 pa_log_debug("Calculated software volume: %s", pa_cvolume_snprint(t
, sizeof(t
), &s
->soft_volume
));
287 /* Any necessary software volume manipulation is done so set
288 our hw volume (or v as a single value) on the device */
289 pa_raop_client_set_volume(u
->raop
, v
);
// Mute callback installed as u->sink->set_mute in pa__init.
//
// Two actions are visible: setting the device volume to PA_VOLUME_MUTED, and
// re-applying the regular volume through sink_set_volume_cb().
//
// s: the sink whose mute state changed; s->userdata is the module's userdata.
//
// NOTE(review): the condition that selects between the two calls is not
// visible in this excerpt; presumably it tests the sink's mute flag - confirm
// against the complete file.
292 static void sink_set_mute_cb(pa_sink
*s
) {
293 struct userdata
*u
= s
->userdata
;
298 pa_raop_client_set_volume(u
->raop
, PA_VOLUME_MUTED
);
300 sink_set_volume_cb(s
);
// IO thread entry point (started with pa_thread_new() in pa__init).
//
// Visible outline:
//   1. Install the thread message queue and rtpoll, anchor the smoother's time
//      offset at the current rtclock time.
//   2. Handle pending rewind requests while the sink is opened.
//   3. When the connection's pollfd reports events: make sure some encoded
//      data is pending (real rendered audio when the sink is opened, a cached
//      encoded-silence chunk otherwise), write it to u->fd, and once the socket
//      buffers are full feed a playback-position estimate to the smoother.
//   4. Sleep in pa_rtpoll_run(); afterwards drop the poll item if the pollfd
//      reports anything other than POLLOUT.
//   5. On the failure path ask the core to unload the module and wait for
//      PA_MESSAGE_SHUTDOWN; finally release the cached silence block.
//
// userdata: the module's struct userdata.
//
// NOTE(review): the loop header, several local declarations (silence, p, l,
// rl, n, usec, ret, write_type), goto labels and closing braces are not
// visible in this excerpt, so nesting must be confirmed against the complete
// file.
304 static void thread_func(void *userdata
) {
305 struct userdata
*u
= userdata
;
// Overhead and ratio measured once for the cached encoded-silence chunk.
308 uint32_t silence_overhead
= 0;
309 double silence_ratio
= 0;
313 pa_log_debug("Thread starting up");
315 pa_thread_mq_install(&u
->thread_mq
);
316 pa_rtpoll_install(u
->rtpoll
);
318 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
320 /* Create a chunk of memory that is our encoded silence sample. */
321 pa_memchunk_reset(&silence
);
// Rewinds are acknowledged with a rewind of 0 bytes.
326 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
327 if (u
->sink
->thread_info
.rewind_requested
)
328 pa_sink_process_rewind(u
->sink
, 0);
// Only do IO while a connection poll item exists.
330 if (u
->rtpoll_item
) {
331 struct pollfd
*pollfd
;
332 pollfd
= pa_rtpoll_item_get_pollfd(u
->rtpoll_item
, NULL
);
334 /* Render some data and write it to the fifo */
335 if (/*PA_SINK_IS_OPENED(u->sink->thread_info.state) && */pollfd
->revents
) {
// Lazily build the encoded-silence chunk the first time it is needed.
340 if (!silence
.memblock
) {
341 pa_memchunk silence_tmp
;
343 pa_memchunk_reset(&silence_tmp
);
// 4096 bytes of raw input for the encoder.
344 silence_tmp
.memblock
= pa_memblock_new(u
->core
->mempool
, 4096);
345 silence_tmp
.length
= 4096;
// NOTE(review): the statement that fills the block between acquire and
// release is not visible in this excerpt.
346 p
= pa_memblock_acquire(silence_tmp
.memblock
);
348 pa_memblock_release(silence_tmp
.memblock
);
// Encode the raw block into the cached silence chunk; the assert requires the
// raw input to be fully consumed.
349 pa_raop_client_encode_sample(u
->raop
, &silence_tmp
, &silence
);
350 pa_assert(0 == silence_tmp
.length
);
// NOTE(review): silence_tmp.length was just asserted to be 0, so the next two
// statements compute 0 - 4096 (wrapping in the uint32_t) and 0 / 4096 == 0.
// It looks like silence.length (the encoded size) was intended, with a
// floating-point division for the ratio - confirm and fix.
351 silence_overhead
= silence_tmp
.length
- 4096;
352 silence_ratio
= silence_tmp
.length
/ 4096;
353 pa_memblock_unref(silence_tmp
.memblock
);
// Refill the encoded chunk once the previous one has been written out.
359 if (u
->encoded_memchunk
.length
<= 0) {
360 if (u
->encoded_memchunk
.memblock
)
361 pa_memblock_unref(u
->encoded_memchunk
.memblock
);
362 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
)) {
365 /* We render real data */
366 if (u
->raw_memchunk
.length
<= 0) {
367 if (u
->raw_memchunk
.memblock
)
368 pa_memblock_unref(u
->raw_memchunk
.memblock
);
369 pa_memchunk_reset(&u
->raw_memchunk
);
371 /* Grab unencoded data */
372 pa_sink_render(u
->sink
, u
->block_size
, &u
->raw_memchunk
);
374 pa_assert(u
->raw_memchunk
.length
> 0);
// Remember the raw length before encoding so the consumed amount can be
// derived afterwards as rl - raw_memchunk.length.
377 rl
= u
->raw_memchunk
.length
;
378 u
->encoding_overhead
+= u
->next_encoding_overhead
;
379 pa_raop_client_encode_sample(u
->raop
, &u
->raw_memchunk
, &u
->encoded_memchunk
);
// Overhead of this chunk = encoded bytes minus raw bytes consumed.
380 u
->next_encoding_overhead
= (u
->encoded_memchunk
.length
- (rl
- u
->raw_memchunk
.length
));
// NOTE(review): if both operands are integer types this division truncates
// before being stored in the double, and it divides by zero when the encoder
// consumed nothing - confirm operand types and consider a cast.
381 u
->encoding_ratio
= u
->encoded_memchunk
.length
/ (rl
- u
->raw_memchunk
.length
);
383 /* We render some silence into our memchunk */
// The struct copy shares the silence memblock, hence the extra reference.
384 memcpy(&u
->encoded_memchunk
, &silence
, sizeof(pa_memchunk
));
385 pa_memblock_ref(silence
.memblock
);
387 /* Calculate/store some values to be used with the smoother */
388 u
->next_encoding_overhead
= silence_overhead
;
389 u
->encoding_ratio
= silence_ratio
;
392 pa_assert(u
->encoded_memchunk
.length
> 0);
// Write the pending encoded bytes, starting at the chunk's current index.
394 p
= pa_memblock_acquire(u
->encoded_memchunk
.memblock
);
395 l
= pa_write(u
->fd
, (uint8_t*) p
+ u
->encoded_memchunk
.index
, u
->encoded_memchunk
.length
, &write_type
);
396 pa_memblock_release(u
->encoded_memchunk
.memblock
);
404 else if (errno
== EAGAIN
) {
406 /* OK, we filled all socket buffers up
// NOTE(review): the terminator of the block comment opened on the line above
// is not visible in this excerpt; restore it before compiling.
411 pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno
));
// Advance past the l bytes that were written.
418 u
->encoded_memchunk
.index
+= l
;
419 u
->encoded_memchunk
.length
-= l
;
// NOTE(review): this branch is taken when encoded data is still pending
// (length > 0), yet its first comment says the data was completely written,
// and encoding_overhead is also incremented before encoding above. Confirm
// which accounting is intended.
423 if (u
->encoded_memchunk
.length
> 0) {
424 /* we've completely written the encoded data, so update our overhead */
425 u
->encoding_overhead
+= u
->next_encoding_overhead
;
427 /* OK, we wrote less than we asked for,
428 * hence we can assume that the socket
429 * buffers are full now */
437 /* At this spot we know that the socket buffers are
438 * fully filled up. This is the best time to estimate
439 * the playback position of the server */
// Raw-byte position: bytes written so far minus accumulated encoding overhead.
441 n
= u
->offset
- u
->encoding_overhead
;
// Subtract what is still queued in the socket's send buffer (SIOCOUTQ), scaled
// back by encoding_ratio.
446 if (ioctl(u
->fd
, SIOCOUTQ
, &l
) >= 0 && l
> 0)
447 n
-= (l
/ u
->encoding_ratio
);
451 usec
= pa_bytes_to_usec(n
, &u
->sink
->sample_spec
);
// NOTE(review): the statements controlled by this comparison with u->latency
// are not visible in this excerpt.
453 if (usec
> u
->latency
)
// Feed the (time, position) sample to the smoother.
458 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), usec
);
461 /* Hmm, nothing to do. Let's sleep */
462 pollfd
->events
= POLLOUT
; /*PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/
// Block until the rtpoll reports activity; a negative result is an error.
465 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
471 if (u
->rtpoll_item
) {
472 struct pollfd
* pollfd
;
474 pollfd
= pa_rtpoll_item_get_pollfd(u
->rtpoll_item
, NULL
);
// Any event other than POLLOUT is treated as the connection going away.
476 if (pollfd
->revents
& ~POLLOUT
) {
477 if (u
->sink
->thread_info
.state
!= PA_SINK_SUSPENDED
) {
478 pa_log("FIFO shutdown.");
482 /* We expect this to happen on occasion if we are not sending data.
483 It's perfectly natural and normal */
485 pa_rtpoll_item_free(u
->rtpoll_item
);
486 u
->rtpoll_item
= NULL
;
492 /* If this was no regular exit from the loop we have to continue
493 * processing messages until we received PA_MESSAGE_SHUTDOWN */
494 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
495 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
// Release the cached encoded-silence block, if one was ever created.
498 if (silence
.memblock
)
499 pa_memblock_unref(silence
.memblock
);
500 pa_log_debug("Thread shutting down");
// Module entry point.
//
// Visible steps: parse the module arguments and sample spec, reject
// unsupported sample formats, allocate and initialise struct userdata
// (smoother, memchunks, encoding bookkeeping, rtpoll, thread message queue),
// require a "server" argument, create and configure the sink, create the RAOP
// client and register its callbacks, start the IO thread and finally publish
// the sink with pa_sink_put().
//
// m: the module being loaded; m->argument holds the argument string.
//
// NOTE(review): the failure branches (goto/return statements), the return
// value and several assignments (for example u->core, u->module,
// m->userdata, u->fd) are not visible in this excerpt.
503 int pa__init(pa_module
*m
) {
504 struct userdata
*u
= NULL
;
506 pa_modargs
*ma
= NULL
;
507 const char *server
, *desc
;
508 pa_sink_new_data data
;
512 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
513 pa_log("failed to parse module arguments");
// Start from the core's default sample spec and let module arguments
// override it.
517 ss
= m
->core
->default_sample_spec
;
518 if (pa_modargs_get_sample_spec(ma
, &ss
) < 0) {
519 pa_log("invalid sample format specification");
// NOTE(review): the U8 test is commented out, so only S16NE passes this part
// of the check even though the log message below still mentions U8. The
// remainder of the condition is not visible in this excerpt.
523 if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss
.format
!= PA_SAMPLE_S16NE
) ||
525 pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
// Zero-initialised userdata.
529 u
= pa_xnew0(struct userdata
, 1);
534 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
, 10);
535 pa_memchunk_reset(&u
->raw_memchunk
);
536 pa_memchunk_reset(&u
->encoded_memchunk
);
// Encoding bookkeeping starts with no overhead and a 1:1 ratio.
538 u
->encoding_overhead
= 0;
539 u
->next_encoding_overhead
= 0;
540 u
->encoding_ratio
= 1.0;
542 u
->rtpoll
= pa_rtpoll_new();
543 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
// No connection yet; the poll item is created on SINK_MESSAGE_PASS_SOCKET.
544 u
->rtpoll_item
= NULL
;
// NOTE(review): the next two lines look like the tail of a commented-out
// format assignment whose opening line is not visible in this excerpt.
547 (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
548 (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
// Render granularity: the byte size of 1/20 second of audio in format ss.
550 u
->block_size
= pa_usec_to_bytes(PA_USEC_PER_SEC
/20, &ss
);
552 u
->read_data
= u
->write_data
= NULL
;
553 u
->read_index
= u
->write_index
= u
->read_length
= u
->write_length
= 0;
555 /*u->state = STATE_AUTH;*/
// The server argument is mandatory.
558 if (!(server
= pa_modargs_get_value(ma
, "server", NULL
))) {
559 pa_log("No server argument given.");
563 pa_sink_new_data_init(&data
);
564 data
.driver
= __FILE__
;
566 pa_sink_new_data_set_name(&data
, pa_modargs_get_value(ma
, "sink_name", DEFAULT_SINK_NAME
));
567 pa_sink_new_data_set_sample_spec(&data
, &ss
);
568 pa_proplist_sets(data
.proplist
, PA_PROP_DEVICE_STRING
, server
);
// Use the user-supplied description when given.
// NOTE(review): the else that presumably guards the generated default below
// is not visible in this excerpt.
569 if ((desc
= pa_modargs_get_value(ma
, "description", NULL
)))
570 pa_proplist_sets(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, desc
);
572 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "RAOP sink '%s'", server
);
574 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_LATENCY
|PA_SINK_NETWORK
);
575 pa_sink_new_data_done(&data
);
578 pa_log("Failed to create sink.");
// Hook up the message handler and the volume/mute callbacks defined in this
// file.
582 u
->sink
->parent
.process_msg
= sink_process_msg
;
583 u
->sink
->userdata
= u
;
584 u
->sink
->set_volume
= sink_set_volume_cb
;
585 u
->sink
->set_mute
= sink_set_mute_cb
;
// Overwrites the flags passed to pa_sink_new(), adding
// PA_SINK_HW_VOLUME_CTRL.
586 u
->sink
->flags
= PA_SINK_LATENCY
|PA_SINK_NETWORK
|PA_SINK_HW_VOLUME_CTRL
;
588 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
589 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
591 if (!(u
->raop
= pa_raop_client_new(u
->core
, server
))) {
592 pa_log("Failed to connect to server.");
// Connection and close events are forwarded to the IO thread by these
// callbacks.
596 pa_raop_client_set_callback(u
->raop
, on_connection
, u
);
597 pa_raop_client_set_closed_callback(u
->raop
, on_close
, u
);
599 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
600 pa_log("Failed to create thread.");
604 pa_sink_put(u
->sink
);
// Reports how many users the module's sink has, as returned by
// pa_sink_linked_by(). Asserts that m->userdata is set.
//
// m: the loaded module.
//
// NOTE(review): the declaration of u is not visible in this excerpt.
619 int pa__get_n_used(pa_module
*m
) {
623 pa_assert_se(u
= m
->userdata
);
625 return pa_sink_linked_by(u
->sink
);
// Module teardown.
//
// Visible order: unlink the sink, send PA_MESSAGE_SHUTDOWN to the IO thread
// and free the thread, shut down the thread message queue, drop the sink
// reference, free the rtpoll item and the rtpoll, release the raw and encoded
// memblocks, free the RAOP client, the read/write buffers and the smoother.
//
// m: the module being unloaded.
//
// NOTE(review): the declaration of u, the body of the userdata check and any
// null guards around the individual release calls (other than the two
// memblock checks) are not visible in this excerpt; confirm that each call
// is guarded where its argument may be NULL after a failed pa__init.
628 void pa__done(pa_module
*m
) {
// Nothing to release when no userdata was attached.
632 if (!(u
= m
->userdata
))
636 pa_sink_unlink(u
->sink
);
// pa_asyncmsgq_send() is the synchronous variant: the IO thread is told to
// shut down before its handle is freed.
639 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
640 pa_thread_free(u
->thread
);
643 pa_thread_mq_done(&u
->thread_mq
);
646 pa_sink_unref(u
->sink
);
649 pa_rtpoll_item_free(u
->rtpoll_item
);
652 pa_rtpoll_free(u
->rtpoll
);
654 if (u
->raw_memchunk
.memblock
)
655 pa_memblock_unref(u
->raw_memchunk
.memblock
);
657 if (u
->encoded_memchunk
.memblock
)
658 pa_memblock_unref(u
->encoded_memchunk
.memblock
);
661 pa_raop_client_free(u
->raop
);
663 pa_xfree(u
->read_data
);
664 pa_xfree(u
->write_data
);
667 pa_smoother_free(u
->smoother
);