code.delx.au - pulseaudio/blob - src/modules/module-raop-sink.c
Merge commit 'origin/master-tx'
[pulseaudio] / src / modules / module-raop-sink.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2008 Colin Guthrie
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdlib.h>
28 #include <sys/stat.h>
29 #include <stdio.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <fcntl.h>
33 #include <unistd.h>
34 #include <limits.h>
35 #include <poll.h>
36 #include <sys/socket.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #include <sys/ioctl.h>
40
41 #ifdef HAVE_LINUX_SOCKIOS_H
42 #include <linux/sockios.h>
43 #endif
44
45 #include <pulse/xmalloc.h>
46 #include <pulse/timeval.h>
47
48 #include <pulsecore/core-error.h>
49 #include <pulsecore/iochannel.h>
50 #include <pulsecore/sink.h>
51 #include <pulsecore/module.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/modargs.h>
54 #include <pulsecore/log.h>
55 #include <pulsecore/socket-client.h>
56 #include <pulsecore/authkey.h>
57 #include <pulsecore/thread-mq.h>
58 #include <pulsecore/thread.h>
59 #include <pulsecore/time-smoother.h>
60 #include <pulsecore/rtclock.h>
61 #include <pulsecore/socket-util.h>
62
63 #include "module-raop-sink-symdef.h"
64 #include "rtp.h"
65 #include "sdp.h"
66 #include "sap.h"
67 #include "raop_client.h"
68
/* Module metadata declarations. */
PA_MODULE_AUTHOR("Colin Guthrie");
PA_MODULE_DESCRIPTION("RAOP Sink");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(FALSE);
/* Usage string; the argument names listed here are the same ones accepted
 * through valid_modargs[]. */
PA_MODULE_USAGE(
        "sink_name=<name for the sink> "
        "description=<description for the sink> "
        "server=<address> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate>");

/* Sink name used by pa__init() when no "sink_name" argument is given. */
#define DEFAULT_SINK_NAME "raop"
82
/* Per-module-instance state, allocated in pa__init() and released in
 * pa__done(). */
struct userdata {
    pa_core *core;       /* core the module was loaded into */
    pa_module *module;   /* owning module */
    pa_sink *sink;       /* the sink created by pa__init() */

    /* IO thread plumbing */
    pa_thread_mq thread_mq;
    pa_rtpoll *rtpoll;
    pa_rtpoll_item *rtpoll_item; /* poll item wrapping fd; NULL while no socket is being polled */
    pa_thread *thread;

    pa_memchunk raw_memchunk;     /* data rendered from the sink that has not been encoded yet */
    pa_memchunk encoded_memchunk; /* encoder output that still has to be written to fd */

    /* The four read/write fields below are only initialised in pa__init()
     * and freed in pa__done() within this file. */
    void *write_data;
    size_t write_length, write_index;

    void *read_data;
    size_t read_length, read_index;

    pa_usec_t latency; /* subtracted from the playback position estimate; set to 0 in pa__init() */

    /*esd_format_t format;*/
    int32_t rate; /* copy of the sample spec's rate */

    pa_smoother *smoother; /* fed with playback position estimates by the IO thread */
    int fd;                /* stream socket handed over by on_connection(); -1 when there is none */

    int64_t offset;                 /* total number of bytes written to fd */
    int64_t encoding_overhead;      /* accumulated per-chunk overhead, subtracted from offset */
    int32_t next_encoding_overhead; /* encoded length minus raw bytes consumed for the current chunk */
    double encoding_ratio;          /* encoded length relative to raw bytes consumed for the current chunk */

    pa_raop_client *raop; /* RAOP client handle created in pa__init() */

    size_t block_size; /* number of bytes requested from pa_sink_render() per call */
};
119
/* NULL-terminated list of argument names passed to pa_modargs_new() in
 * pa__init(). */
static const char* const valid_modargs[] = {
    "server",
    "rate",
    "format",
    "channels",
    "sink_name",
    "description",
    NULL
};
129
/* Module-private sink messages, numbered from PA_SINK_MESSAGE_MAX upwards
 * and handled in sink_process_msg(). */
enum {
    SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX, /* posted by on_connection() */
    SINK_MESSAGE_RIP_SOCKET                         /* posted by on_close() */
};
134
135 /* Forward declaration */
136 static void sink_set_volume_cb(pa_sink *);
137
138 static void on_connection(PA_GCC_UNUSED int fd, void*userdata) {
139 struct userdata *u = userdata;
140 pa_assert(u);
141
142 pa_assert(u->fd < 0);
143 u->fd = fd;
144
145 /* Set the initial volume */
146 sink_set_volume_cb(u->sink);
147
148 pa_log_debug("Connection authenticated, handing fd to IO thread...");
149
150 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
151 }
152
153 static void on_close(void*userdata) {
154 struct userdata *u = userdata;
155 pa_assert(u);
156
157 pa_log_debug("Connection closed, informing IO thread...");
158
159 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL);
160 }
161
162 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
163 struct userdata *u = PA_SINK(o)->userdata;
164
165 switch (code) {
166
167 case PA_SINK_MESSAGE_SET_STATE:
168
169 switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
170
171 case PA_SINK_SUSPENDED:
172 pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
173
174 pa_smoother_pause(u->smoother, pa_rtclock_usec());
175
176 /* Issue a FLUSH if we are connected */
177 if (u->fd >= 0) {
178 pa_raop_flush(u->raop);
179 }
180 break;
181
182 case PA_SINK_IDLE:
183 case PA_SINK_RUNNING:
184
185 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
186 pa_smoother_resume(u->smoother, pa_rtclock_usec());
187
188 /* The connection can be closed when idle, so check to
189 see if we need to reestablish it */
190 if (u->fd < 0)
191 pa_raop_connect(u->raop);
192 else
193 pa_raop_flush(u->raop);
194 }
195
196 break;
197
198 case PA_SINK_UNLINKED:
199 case PA_SINK_INIT:
200 case PA_SINK_INVALID_STATE:
201 ;
202 }
203
204 break;
205
206 case PA_SINK_MESSAGE_GET_LATENCY: {
207 pa_usec_t w, r;
208
209 r = pa_smoother_get(u->smoother, pa_rtclock_usec());
210 w = pa_bytes_to_usec((u->offset - u->encoding_overhead + (u->encoded_memchunk.length / u->encoding_ratio)), &u->sink->sample_spec);
211
212 *((pa_usec_t*) data) = w > r ? w - r : 0;
213 return 0;
214 }
215
216 case SINK_MESSAGE_PASS_SOCKET: {
217 struct pollfd *pollfd;
218
219 pa_assert(!u->rtpoll_item);
220
221 u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
222 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
223 pollfd->fd = u->fd;
224 pollfd->events = POLLOUT;
225 /*pollfd->events = */pollfd->revents = 0;
226
227 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
228 /* Our stream has been suspended so we just flush it.... */
229 pa_raop_flush(u->raop);
230 }
231 return 0;
232 }
233
234 case SINK_MESSAGE_RIP_SOCKET: {
235 pa_assert(u->fd >= 0);
236
237 pa_close(u->fd);
238 u->fd = -1;
239
240 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
241
242 pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
243
244 if (u->rtpoll_item)
245 pa_rtpoll_item_free(u->rtpoll_item);
246 u->rtpoll_item = NULL;
247 } else {
248 /* Quesiton: is this valid here: or should we do some sort of:
249 return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
250 ?? */
251 pa_module_unload_request(u->module, TRUE);
252 }
253 return 0;
254 }
255 }
256
257 return pa_sink_process_msg(o, code, data, offset, chunk);
258 }
259
260 static void sink_set_volume_cb(pa_sink *s) {
261 struct userdata *u = s->userdata;
262 pa_cvolume hw;
263 pa_volume_t v;
264 char t[PA_CVOLUME_SNPRINT_MAX];
265
266 pa_assert(u);
267
268 /* If we're muted we don't need to do anything */
269 if (s->muted)
270 return;
271
272 /* Calculate the max volume of all channels.
273 We'll use this as our (single) volume on the APEX device and emulate
274 any variation in channel volumes in software */
275 v = pa_cvolume_max(&s->virtual_volume);
276
277 /* Create a pa_cvolume version of our single value */
278 pa_cvolume_set(&hw, s->sample_spec.channels, v);
279
280 /* Perform any software manipulation of the volume needed */
281 pa_sw_cvolume_divide(&s->soft_volume, &s->virtual_volume, &hw);
282
283 pa_log_debug("Requested volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->virtual_volume));
284 pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint(t, sizeof(t), &hw));
285 pa_log_debug("Calculated software volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->soft_volume));
286
287 /* Any necessary software volume manipulateion is done so set
288 our hw volume (or v as a single value) on the device */
289 pa_raop_client_set_volume(u->raop, v);
290 }
291
292 static void sink_set_mute_cb(pa_sink *s) {
293 struct userdata *u = s->userdata;
294
295 pa_assert(u);
296
297 if (s->muted) {
298 pa_raop_client_set_volume(u->raop, PA_VOLUME_MUTED);
299 } else {
300 sink_set_volume_cb(s);
301 }
302 }
303
304 static void thread_func(void *userdata) {
305 struct userdata *u = userdata;
306 int write_type = 0;
307 pa_memchunk silence;
308 uint32_t silence_overhead = 0;
309 double silence_ratio = 0;
310
311 pa_assert(u);
312
313 pa_log_debug("Thread starting up");
314
315 pa_thread_mq_install(&u->thread_mq);
316 pa_rtpoll_install(u->rtpoll);
317
318 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
319
320 /* Create a chunk of memory that is our encoded silence sample. */
321 pa_memchunk_reset(&silence);
322
323 for (;;) {
324 int ret;
325
326 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
327 if (u->sink->thread_info.rewind_requested)
328 pa_sink_process_rewind(u->sink, 0);
329
330 if (u->rtpoll_item) {
331 struct pollfd *pollfd;
332 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
333
334 /* Render some data and write it to the fifo */
335 if (/*PA_SINK_IS_OPENED(u->sink->thread_info.state) && */pollfd->revents) {
336 pa_usec_t usec;
337 int64_t n;
338 void *p;
339
340 if (!silence.memblock) {
341 pa_memchunk silence_tmp;
342
343 pa_memchunk_reset(&silence_tmp);
344 silence_tmp.memblock = pa_memblock_new(u->core->mempool, 4096);
345 silence_tmp.length = 4096;
346 p = pa_memblock_acquire(silence_tmp.memblock);
347 memset(p, 0, 4096);
348 pa_memblock_release(silence_tmp.memblock);
349 pa_raop_client_encode_sample(u->raop, &silence_tmp, &silence);
350 pa_assert(0 == silence_tmp.length);
351 silence_overhead = silence_tmp.length - 4096;
352 silence_ratio = silence_tmp.length / 4096;
353 pa_memblock_unref(silence_tmp.memblock);
354 }
355
356 for (;;) {
357 ssize_t l;
358
359 if (u->encoded_memchunk.length <= 0) {
360 if (u->encoded_memchunk.memblock)
361 pa_memblock_unref(u->encoded_memchunk.memblock);
362 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
363 size_t rl;
364
365 /* We render real data */
366 if (u->raw_memchunk.length <= 0) {
367 if (u->raw_memchunk.memblock)
368 pa_memblock_unref(u->raw_memchunk.memblock);
369 pa_memchunk_reset(&u->raw_memchunk);
370
371 /* Grab unencoded data */
372 pa_sink_render(u->sink, u->block_size, &u->raw_memchunk);
373 }
374 pa_assert(u->raw_memchunk.length > 0);
375
376 /* Encode it */
377 rl = u->raw_memchunk.length;
378 u->encoding_overhead += u->next_encoding_overhead;
379 pa_raop_client_encode_sample(u->raop, &u->raw_memchunk, &u->encoded_memchunk);
380 u->next_encoding_overhead = (u->encoded_memchunk.length - (rl - u->raw_memchunk.length));
381 u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);
382 } else {
383 /* We render some silence into our memchunk */
384 memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk));
385 pa_memblock_ref(silence.memblock);
386
387 /* Calculate/store some values to be used with the smoother */
388 u->next_encoding_overhead = silence_overhead;
389 u->encoding_ratio = silence_ratio;
390 }
391 }
392 pa_assert(u->encoded_memchunk.length > 0);
393
394 p = pa_memblock_acquire(u->encoded_memchunk.memblock);
395 l = pa_write(u->fd, (uint8_t*) p + u->encoded_memchunk.index, u->encoded_memchunk.length, &write_type);
396 pa_memblock_release(u->encoded_memchunk.memblock);
397
398 pa_assert(l != 0);
399
400 if (l < 0) {
401
402 if (errno == EINTR)
403 continue;
404 else if (errno == EAGAIN) {
405
406 /* OK, we filled all socket buffers up
407 * now. */
408 goto filled_up;
409
410 } else {
411 pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
412 goto fail;
413 }
414
415 } else {
416 u->offset += l;
417
418 u->encoded_memchunk.index += l;
419 u->encoded_memchunk.length -= l;
420
421 pollfd->revents = 0;
422
423 if (u->encoded_memchunk.length > 0) {
424 /* we've completely written the encoded data, so update our overhead */
425 u->encoding_overhead += u->next_encoding_overhead;
426
427 /* OK, we wrote less that we asked for,
428 * hence we can assume that the socket
429 * buffers are full now */
430 goto filled_up;
431 }
432 }
433 }
434
435 filled_up:
436
437 /* At this spot we know that the socket buffers are
438 * fully filled up. This is the best time to estimate
439 * the playback position of the server */
440
441 n = u->offset - u->encoding_overhead;
442
443 #ifdef SIOCOUTQ
444 {
445 int l;
446 if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0)
447 n -= (l / u->encoding_ratio);
448 }
449 #endif
450
451 usec = pa_bytes_to_usec(n, &u->sink->sample_spec);
452
453 if (usec > u->latency)
454 usec -= u->latency;
455 else
456 usec = 0;
457
458 pa_smoother_put(u->smoother, pa_rtclock_usec(), usec);
459 }
460
461 /* Hmm, nothing to do. Let's sleep */
462 pollfd->events = POLLOUT; /*PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/
463 }
464
465 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
466 goto fail;
467
468 if (ret == 0)
469 goto finish;
470
471 if (u->rtpoll_item) {
472 struct pollfd* pollfd;
473
474 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
475
476 if (pollfd->revents & ~POLLOUT) {
477 if (u->sink->thread_info.state != PA_SINK_SUSPENDED) {
478 pa_log("FIFO shutdown.");
479 goto fail;
480 }
481
482 /* We expect this to happen on occasion if we are not sending data.
483 It's perfectly natural and normal and natural */
484 if (u->rtpoll_item)
485 pa_rtpoll_item_free(u->rtpoll_item);
486 u->rtpoll_item = NULL;
487 }
488 }
489 }
490
491 fail:
492 /* If this was no regular exit from the loop we have to continue
493 * processing messages until we received PA_MESSAGE_SHUTDOWN */
494 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
495 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
496
497 finish:
498 if (silence.memblock)
499 pa_memblock_unref(silence.memblock);
500 pa_log_debug("Thread shutting down");
501 }
502
503 int pa__init(pa_module*m) {
504 struct userdata *u = NULL;
505 pa_sample_spec ss;
506 pa_modargs *ma = NULL;
507 const char *server, *desc;
508 pa_sink_new_data data;
509
510 pa_assert(m);
511
512 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
513 pa_log("failed to parse module arguments");
514 goto fail;
515 }
516
517 ss = m->core->default_sample_spec;
518 if (pa_modargs_get_sample_spec(ma, &ss) < 0) {
519 pa_log("invalid sample format specification");
520 goto fail;
521 }
522
523 if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss.format != PA_SAMPLE_S16NE) ||
524 (ss.channels > 2)) {
525 pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
526 goto fail;
527 }
528
529 u = pa_xnew0(struct userdata, 1);
530 u->core = m->core;
531 u->module = m;
532 m->userdata = u;
533 u->fd = -1;
534 u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
535 pa_memchunk_reset(&u->raw_memchunk);
536 pa_memchunk_reset(&u->encoded_memchunk);
537 u->offset = 0;
538 u->encoding_overhead = 0;
539 u->next_encoding_overhead = 0;
540 u->encoding_ratio = 1.0;
541
542 u->rtpoll = pa_rtpoll_new();
543 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
544 u->rtpoll_item = NULL;
545
546 /*u->format =
547 (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
548 (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
549 u->rate = ss.rate;
550 u->block_size = pa_usec_to_bytes(PA_USEC_PER_SEC/20, &ss);
551
552 u->read_data = u->write_data = NULL;
553 u->read_index = u->write_index = u->read_length = u->write_length = 0;
554
555 /*u->state = STATE_AUTH;*/
556 u->latency = 0;
557
558 if (!(server = pa_modargs_get_value(ma, "server", NULL))) {
559 pa_log("No server argument given.");
560 goto fail;
561 }
562
563 pa_sink_new_data_init(&data);
564 data.driver = __FILE__;
565 data.module = m;
566 pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
567 pa_sink_new_data_set_sample_spec(&data, &ss);
568 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
569 if ((desc = pa_modargs_get_value(ma, "description", NULL)))
570 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, desc);
571 else
572 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server);
573
574 u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_NETWORK);
575 pa_sink_new_data_done(&data);
576
577 if (!u->sink) {
578 pa_log("Failed to create sink.");
579 goto fail;
580 }
581
582 u->sink->parent.process_msg = sink_process_msg;
583 u->sink->userdata = u;
584 u->sink->set_volume = sink_set_volume_cb;
585 u->sink->set_mute = sink_set_mute_cb;
586 u->sink->flags = PA_SINK_LATENCY|PA_SINK_NETWORK|PA_SINK_HW_VOLUME_CTRL;
587
588 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
589 pa_sink_set_rtpoll(u->sink, u->rtpoll);
590
591 if (!(u->raop = pa_raop_client_new(u->core, server))) {
592 pa_log("Failed to connect to server.");
593 goto fail;
594 }
595
596 pa_raop_client_set_callback(u->raop, on_connection, u);
597 pa_raop_client_set_closed_callback(u->raop, on_close, u);
598
599 if (!(u->thread = pa_thread_new(thread_func, u))) {
600 pa_log("Failed to create thread.");
601 goto fail;
602 }
603
604 pa_sink_put(u->sink);
605
606 pa_modargs_free(ma);
607
608 return 0;
609
610 fail:
611 if (ma)
612 pa_modargs_free(ma);
613
614 pa__done(m);
615
616 return -1;
617 }
618
619 int pa__get_n_used(pa_module *m) {
620 struct userdata *u;
621
622 pa_assert(m);
623 pa_assert_se(u = m->userdata);
624
625 return pa_sink_linked_by(u->sink);
626 }
627
628 void pa__done(pa_module*m) {
629 struct userdata *u;
630 pa_assert(m);
631
632 if (!(u = m->userdata))
633 return;
634
635 if (u->sink)
636 pa_sink_unlink(u->sink);
637
638 if (u->thread) {
639 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
640 pa_thread_free(u->thread);
641 }
642
643 pa_thread_mq_done(&u->thread_mq);
644
645 if (u->sink)
646 pa_sink_unref(u->sink);
647
648 if (u->rtpoll_item)
649 pa_rtpoll_item_free(u->rtpoll_item);
650
651 if (u->rtpoll)
652 pa_rtpoll_free(u->rtpoll);
653
654 if (u->raw_memchunk.memblock)
655 pa_memblock_unref(u->raw_memchunk.memblock);
656
657 if (u->encoded_memchunk.memblock)
658 pa_memblock_unref(u->encoded_memchunk.memblock);
659
660 if (u->raop)
661 pa_raop_client_free(u->raop);
662
663 pa_xfree(u->read_data);
664 pa_xfree(u->write_data);
665
666 if (u->smoother)
667 pa_smoother_free(u->smoother);
668
669 if (u->fd >= 0)
670 pa_close(u->fd);
671
672 pa_xfree(u);
673 }