]> code.delx.au - pulseaudio/blob - src/modules/raop/module-raop-sink.c
devices: Set certain sink/source flags automatically.
[pulseaudio] / src / modules / raop / module-raop-sink.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2008 Colin Guthrie
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <errno.h>
30 #include <string.h>
31 #include <unistd.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <sys/ioctl.h>
36
37 #ifdef HAVE_LINUX_SOCKIOS_H
38 #include <linux/sockios.h>
39 #endif
40
41 #include <pulse/rtclock.h>
42 #include <pulse/timeval.h>
43 #include <pulse/xmalloc.h>
44
45 #include <pulsecore/core-error.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/log.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/thread.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/poll.h>
56
57 #include "module-raop-sink-symdef.h"
58 #include "rtp.h"
59 #include "sdp.h"
60 #include "sap.h"
61 #include "raop_client.h"
62
63 PA_MODULE_AUTHOR("Colin Guthrie");
64 PA_MODULE_DESCRIPTION("RAOP Sink");
65 PA_MODULE_VERSION(PACKAGE_VERSION);
66 PA_MODULE_LOAD_ONCE(FALSE);
67 PA_MODULE_USAGE(
68 "sink_name=<name for the sink> "
69 "sink_properties=<properties for the sink> "
70 "server=<address> "
71 "format=<sample format> "
72 "rate=<sample rate> "
73 "channels=<number of channels>");
74
75 #define DEFAULT_SINK_NAME "raop"
76
77 struct userdata {
78 pa_core *core;
79 pa_module *module;
80 pa_sink *sink;
81
82 pa_thread_mq thread_mq;
83 pa_rtpoll *rtpoll;
84 pa_rtpoll_item *rtpoll_item;
85 pa_thread *thread;
86
87 pa_memchunk raw_memchunk;
88 pa_memchunk encoded_memchunk;
89
90 void *write_data;
91 size_t write_length, write_index;
92
93 void *read_data;
94 size_t read_length, read_index;
95
96 pa_usec_t latency;
97
98 /*esd_format_t format;*/
99 int32_t rate;
100
101 pa_smoother *smoother;
102 int fd;
103
104 int64_t offset;
105 int64_t encoding_overhead;
106 int32_t next_encoding_overhead;
107 double encoding_ratio;
108
109 pa_raop_client *raop;
110
111 size_t block_size;
112 };
113
/* NULL-terminated list of the argument keys handed to pa_modargs_new(). */
static const char* const valid_modargs[] = {
    "sink_name",
    "sink_properties",
    "server",
    "format",
    "rate",
    "channels",
    NULL
};
123
124 enum {
125 SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX,
126 SINK_MESSAGE_RIP_SOCKET
127 };
128
129 /* Forward declaration */
130 static void sink_set_volume_cb(pa_sink *);
131
132 static void on_connection(int fd, void*userdata) {
133 int so_sndbuf = 0;
134 socklen_t sl = sizeof(int);
135 struct userdata *u = userdata;
136 pa_assert(u);
137
138 pa_assert(u->fd < 0);
139 u->fd = fd;
140
141 if (getsockopt(u->fd, SOL_SOCKET, SO_SNDBUF, &so_sndbuf, &sl) < 0)
142 pa_log_warn("getsockopt(SO_SNDBUF) failed: %s", pa_cstrerror(errno));
143 else {
144 pa_log_debug("SO_SNDBUF is %zu.", (size_t) so_sndbuf);
145 pa_sink_set_max_request(u->sink, PA_MAX((size_t) so_sndbuf, u->block_size));
146 }
147
148 /* Set the initial volume */
149 sink_set_volume_cb(u->sink);
150
151 pa_log_debug("Connection authenticated, handing fd to IO thread...");
152
153 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
154 }
155
156 static void on_close(void*userdata) {
157 struct userdata *u = userdata;
158 pa_assert(u);
159
160 pa_log_debug("Connection closed, informing IO thread...");
161
162 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL);
163 }
164
165 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
166 struct userdata *u = PA_SINK(o)->userdata;
167
168 switch (code) {
169
170 case PA_SINK_MESSAGE_SET_STATE:
171
172 switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
173
174 case PA_SINK_SUSPENDED:
175 pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
176
177 pa_smoother_pause(u->smoother, pa_rtclock_now());
178
179 /* Issue a FLUSH if we are connected */
180 if (u->fd >= 0) {
181 pa_raop_flush(u->raop);
182 }
183 break;
184
185 case PA_SINK_IDLE:
186 case PA_SINK_RUNNING:
187
188 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
189 pa_smoother_resume(u->smoother, pa_rtclock_now(), TRUE);
190
191 /* The connection can be closed when idle, so check to
192 see if we need to reestablish it */
193 if (u->fd < 0)
194 pa_raop_connect(u->raop);
195 else
196 pa_raop_flush(u->raop);
197 }
198
199 break;
200
201 case PA_SINK_UNLINKED:
202 case PA_SINK_INIT:
203 case PA_SINK_INVALID_STATE:
204 ;
205 }
206
207 break;
208
209 case PA_SINK_MESSAGE_GET_LATENCY: {
210 pa_usec_t w, r;
211
212 r = pa_smoother_get(u->smoother, pa_rtclock_now());
213 w = pa_bytes_to_usec((u->offset - u->encoding_overhead + (u->encoded_memchunk.length / u->encoding_ratio)), &u->sink->sample_spec);
214
215 *((pa_usec_t*) data) = w > r ? w - r : 0;
216 return 0;
217 }
218
219 case SINK_MESSAGE_PASS_SOCKET: {
220 struct pollfd *pollfd;
221
222 pa_assert(!u->rtpoll_item);
223
224 u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
225 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
226 pollfd->fd = u->fd;
227 pollfd->events = POLLOUT;
228 /*pollfd->events = */pollfd->revents = 0;
229
230 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
231 /* Our stream has been suspended so we just flush it.... */
232 pa_raop_flush(u->raop);
233 }
234 return 0;
235 }
236
237 case SINK_MESSAGE_RIP_SOCKET: {
238 pa_assert(u->fd >= 0);
239
240 pa_close(u->fd);
241 u->fd = -1;
242
243 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
244
245 pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
246
247 if (u->rtpoll_item)
248 pa_rtpoll_item_free(u->rtpoll_item);
249 u->rtpoll_item = NULL;
250 } else {
251 /* Quesiton: is this valid here: or should we do some sort of:
252 return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
253 ?? */
254 pa_module_unload_request(u->module, TRUE);
255 }
256 return 0;
257 }
258 }
259
260 return pa_sink_process_msg(o, code, data, offset, chunk);
261 }
262
263 static void sink_set_volume_cb(pa_sink *s) {
264 struct userdata *u = s->userdata;
265 pa_cvolume hw;
266 pa_volume_t v;
267 char t[PA_CVOLUME_SNPRINT_MAX];
268
269 pa_assert(u);
270
271 /* If we're muted we don't need to do anything */
272 if (s->muted)
273 return;
274
275 /* Calculate the max volume of all channels.
276 We'll use this as our (single) volume on the APEX device and emulate
277 any variation in channel volumes in software */
278 v = pa_cvolume_max(&s->real_volume);
279
280 /* Create a pa_cvolume version of our single value */
281 pa_cvolume_set(&hw, s->sample_spec.channels, v);
282
283 /* Perform any software manipulation of the volume needed */
284 pa_sw_cvolume_divide(&s->soft_volume, &s->real_volume, &hw);
285
286 pa_log_debug("Requested volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->real_volume));
287 pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint(t, sizeof(t), &hw));
288 pa_log_debug("Calculated software volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->soft_volume));
289
290 /* Any necessary software volume manipulateion is done so set
291 our hw volume (or v as a single value) on the device */
292 pa_raop_client_set_volume(u->raop, v);
293 }
294
295 static void sink_set_mute_cb(pa_sink *s) {
296 struct userdata *u = s->userdata;
297
298 pa_assert(u);
299
300 if (s->muted) {
301 pa_raop_client_set_volume(u->raop, PA_VOLUME_MUTED);
302 } else {
303 sink_set_volume_cb(s);
304 }
305 }
306
307 static void thread_func(void *userdata) {
308 struct userdata *u = userdata;
309 int write_type = 0;
310 pa_memchunk silence;
311 uint32_t silence_overhead = 0;
312 double silence_ratio = 0;
313
314 pa_assert(u);
315
316 pa_log_debug("Thread starting up");
317
318 pa_thread_mq_install(&u->thread_mq);
319
320 pa_smoother_set_time_offset(u->smoother, pa_rtclock_now());
321
322 /* Create a chunk of memory that is our encoded silence sample. */
323 pa_memchunk_reset(&silence);
324
325 for (;;) {
326 int ret;
327
328 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
329 if (u->sink->thread_info.rewind_requested)
330 pa_sink_process_rewind(u->sink, 0);
331
332 if (u->rtpoll_item) {
333 struct pollfd *pollfd;
334 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
335
336 /* Render some data and write it to the fifo */
337 if (/*PA_SINK_IS_OPENED(u->sink->thread_info.state) && */pollfd->revents) {
338 pa_usec_t usec;
339 int64_t n;
340 void *p;
341
342 if (!silence.memblock) {
343 pa_memchunk silence_tmp;
344
345 pa_memchunk_reset(&silence_tmp);
346 silence_tmp.memblock = pa_memblock_new(u->core->mempool, 4096);
347 silence_tmp.length = 4096;
348 p = pa_memblock_acquire(silence_tmp.memblock);
349 memset(p, 0, 4096);
350 pa_memblock_release(silence_tmp.memblock);
351 pa_raop_client_encode_sample(u->raop, &silence_tmp, &silence);
352 pa_assert(0 == silence_tmp.length);
353 silence_overhead = silence_tmp.length - 4096;
354 silence_ratio = silence_tmp.length / 4096;
355 pa_memblock_unref(silence_tmp.memblock);
356 }
357
358 for (;;) {
359 ssize_t l;
360
361 if (u->encoded_memchunk.length <= 0) {
362 if (u->encoded_memchunk.memblock)
363 pa_memblock_unref(u->encoded_memchunk.memblock);
364 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
365 size_t rl;
366
367 /* We render real data */
368 if (u->raw_memchunk.length <= 0) {
369 if (u->raw_memchunk.memblock)
370 pa_memblock_unref(u->raw_memchunk.memblock);
371 pa_memchunk_reset(&u->raw_memchunk);
372
373 /* Grab unencoded data */
374 pa_sink_render(u->sink, u->block_size, &u->raw_memchunk);
375 }
376 pa_assert(u->raw_memchunk.length > 0);
377
378 /* Encode it */
379 rl = u->raw_memchunk.length;
380 u->encoding_overhead += u->next_encoding_overhead;
381 pa_raop_client_encode_sample(u->raop, &u->raw_memchunk, &u->encoded_memchunk);
382 u->next_encoding_overhead = (u->encoded_memchunk.length - (rl - u->raw_memchunk.length));
383 u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);
384 } else {
385 /* We render some silence into our memchunk */
386 memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk));
387 pa_memblock_ref(silence.memblock);
388
389 /* Calculate/store some values to be used with the smoother */
390 u->next_encoding_overhead = silence_overhead;
391 u->encoding_ratio = silence_ratio;
392 }
393 }
394 pa_assert(u->encoded_memchunk.length > 0);
395
396 p = pa_memblock_acquire(u->encoded_memchunk.memblock);
397 l = pa_write(u->fd, (uint8_t*) p + u->encoded_memchunk.index, u->encoded_memchunk.length, &write_type);
398 pa_memblock_release(u->encoded_memchunk.memblock);
399
400 pa_assert(l != 0);
401
402 if (l < 0) {
403
404 if (errno == EINTR)
405 continue;
406 else if (errno == EAGAIN) {
407
408 /* OK, we filled all socket buffers up
409 * now. */
410 goto filled_up;
411
412 } else {
413 pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
414 goto fail;
415 }
416
417 } else {
418 u->offset += l;
419
420 u->encoded_memchunk.index += l;
421 u->encoded_memchunk.length -= l;
422
423 pollfd->revents = 0;
424
425 if (u->encoded_memchunk.length > 0) {
426 /* we've completely written the encoded data, so update our overhead */
427 u->encoding_overhead += u->next_encoding_overhead;
428
429 /* OK, we wrote less that we asked for,
430 * hence we can assume that the socket
431 * buffers are full now */
432 goto filled_up;
433 }
434 }
435 }
436
437 filled_up:
438
439 /* At this spot we know that the socket buffers are
440 * fully filled up. This is the best time to estimate
441 * the playback position of the server */
442
443 n = u->offset - u->encoding_overhead;
444
445 #ifdef SIOCOUTQ
446 {
447 int l;
448 if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0)
449 n -= (l / u->encoding_ratio);
450 }
451 #endif
452
453 usec = pa_bytes_to_usec(n, &u->sink->sample_spec);
454
455 if (usec > u->latency)
456 usec -= u->latency;
457 else
458 usec = 0;
459
460 pa_smoother_put(u->smoother, pa_rtclock_now(), usec);
461 }
462
463 /* Hmm, nothing to do. Let's sleep */
464 pollfd->events = POLLOUT; /*PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/
465 }
466
467 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
468 goto fail;
469
470 if (ret == 0)
471 goto finish;
472
473 if (u->rtpoll_item) {
474 struct pollfd* pollfd;
475
476 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
477
478 if (pollfd->revents & ~POLLOUT) {
479 if (u->sink->thread_info.state != PA_SINK_SUSPENDED) {
480 pa_log("FIFO shutdown.");
481 goto fail;
482 }
483
484 /* We expect this to happen on occasion if we are not sending data.
485 It's perfectly natural and normal and natural */
486 if (u->rtpoll_item)
487 pa_rtpoll_item_free(u->rtpoll_item);
488 u->rtpoll_item = NULL;
489 }
490 }
491 }
492
493 fail:
494 /* If this was no regular exit from the loop we have to continue
495 * processing messages until we received PA_MESSAGE_SHUTDOWN */
496 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
497 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
498
499 finish:
500 if (silence.memblock)
501 pa_memblock_unref(silence.memblock);
502 pa_log_debug("Thread shutting down");
503 }
504
505 int pa__init(pa_module*m) {
506 struct userdata *u = NULL;
507 pa_sample_spec ss;
508 pa_modargs *ma = NULL;
509 const char *server;
510 pa_sink_new_data data;
511
512 pa_assert(m);
513
514 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
515 pa_log("failed to parse module arguments");
516 goto fail;
517 }
518
519 ss = m->core->default_sample_spec;
520 if (pa_modargs_get_sample_spec(ma, &ss) < 0) {
521 pa_log("invalid sample format specification");
522 goto fail;
523 }
524
525 if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss.format != PA_SAMPLE_S16NE) ||
526 (ss.channels > 2)) {
527 pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
528 goto fail;
529 }
530
531 u = pa_xnew0(struct userdata, 1);
532 u->core = m->core;
533 u->module = m;
534 m->userdata = u;
535 u->fd = -1;
536 u->smoother = pa_smoother_new(
537 PA_USEC_PER_SEC,
538 PA_USEC_PER_SEC*2,
539 TRUE,
540 TRUE,
541 10,
542 0,
543 FALSE);
544 pa_memchunk_reset(&u->raw_memchunk);
545 pa_memchunk_reset(&u->encoded_memchunk);
546 u->offset = 0;
547 u->encoding_overhead = 0;
548 u->next_encoding_overhead = 0;
549 u->encoding_ratio = 1.0;
550
551 u->rtpoll = pa_rtpoll_new();
552 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
553 u->rtpoll_item = NULL;
554
555 /*u->format =
556 (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
557 (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
558 u->rate = ss.rate;
559 u->block_size = pa_usec_to_bytes(PA_USEC_PER_SEC/20, &ss);
560
561 u->read_data = u->write_data = NULL;
562 u->read_index = u->write_index = u->read_length = u->write_length = 0;
563
564 /*u->state = STATE_AUTH;*/
565 u->latency = 0;
566
567 if (!(server = pa_modargs_get_value(ma, "server", NULL))) {
568 pa_log("No server argument given.");
569 goto fail;
570 }
571
572 pa_sink_new_data_init(&data);
573 data.driver = __FILE__;
574 data.module = m;
575 pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
576 pa_sink_new_data_set_sample_spec(&data, &ss);
577 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
578 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "music");
579 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server);
580
581 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
582 pa_log("Invalid properties");
583 pa_sink_new_data_done(&data);
584 goto fail;
585 }
586
587 u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_NETWORK);
588 pa_sink_new_data_done(&data);
589
590 if (!u->sink) {
591 pa_log("Failed to create sink.");
592 goto fail;
593 }
594
595 u->sink->parent.process_msg = sink_process_msg;
596 u->sink->userdata = u;
597 pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
598 pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
599 u->sink->flags = PA_SINK_LATENCY|PA_SINK_NETWORK;
600
601 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
602 pa_sink_set_rtpoll(u->sink, u->rtpoll);
603
604 if (!(u->raop = pa_raop_client_new(u->core, server))) {
605 pa_log("Failed to connect to server.");
606 goto fail;
607 }
608
609 pa_raop_client_set_callback(u->raop, on_connection, u);
610 pa_raop_client_set_closed_callback(u->raop, on_close, u);
611
612 if (!(u->thread = pa_thread_new("raop-sink", thread_func, u))) {
613 pa_log("Failed to create thread.");
614 goto fail;
615 }
616
617 pa_sink_put(u->sink);
618
619 pa_modargs_free(ma);
620
621 return 0;
622
623 fail:
624 if (ma)
625 pa_modargs_free(ma);
626
627 pa__done(m);
628
629 return -1;
630 }
631
632 int pa__get_n_used(pa_module *m) {
633 struct userdata *u;
634
635 pa_assert(m);
636 pa_assert_se(u = m->userdata);
637
638 return pa_sink_linked_by(u->sink);
639 }
640
641 void pa__done(pa_module*m) {
642 struct userdata *u;
643 pa_assert(m);
644
645 if (!(u = m->userdata))
646 return;
647
648 if (u->sink)
649 pa_sink_unlink(u->sink);
650
651 if (u->thread) {
652 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
653 pa_thread_free(u->thread);
654 }
655
656 pa_thread_mq_done(&u->thread_mq);
657
658 if (u->sink)
659 pa_sink_unref(u->sink);
660
661 if (u->rtpoll_item)
662 pa_rtpoll_item_free(u->rtpoll_item);
663
664 if (u->rtpoll)
665 pa_rtpoll_free(u->rtpoll);
666
667 if (u->raw_memchunk.memblock)
668 pa_memblock_unref(u->raw_memchunk.memblock);
669
670 if (u->encoded_memchunk.memblock)
671 pa_memblock_unref(u->encoded_memchunk.memblock);
672
673 if (u->raop)
674 pa_raop_client_free(u->raop);
675
676 pa_xfree(u->read_data);
677 pa_xfree(u->write_data);
678
679 if (u->smoother)
680 pa_smoother_free(u->smoother);
681
682 if (u->fd >= 0)
683 pa_close(u->fd);
684
685 pa_xfree(u);
686 }