]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
fix shutdown when --disallow-module-loading=1 is passed
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "server=<address> "
68 "sink=<remote sink name> "
69 "cookie=<filename> "
70 "format=<sample format> "
71 "channels=<number of channels> "
72 "rate=<sample rate> "
73 "sink_name=<name for the local sink> "
74 "channel_map=<channel map>");
75 #else
76 PA_MODULE_DESCRIPTION("Tunnel module for sources");
77 PA_MODULE_USAGE(
78 "server=<address> "
79 "source=<remote source name> "
80 "cookie=<filename> "
81 "format=<sample format> "
82 "channels=<number of channels> "
83 "rate=<sample rate> "
84 "source_name=<name for the local source> "
85 "channel_map=<channel map>");
86 #endif
87
88 PA_MODULE_AUTHOR("Lennart Poettering");
89 PA_MODULE_VERSION(PACKAGE_VERSION);
90 PA_MODULE_LOAD_ONCE(FALSE);
91
92 static const char* const valid_modargs[] = {
93 "server",
94 "cookie",
95 "format",
96 "channels",
97 "rate",
98 #ifdef TUNNEL_SINK
99 "sink_name",
100 "sink",
101 #else
102 "source_name",
103 "source",
104 #endif
105 "channel_map",
106 NULL,
107 };
108
109 #define DEFAULT_TIMEOUT 5
110
111 #define LATENCY_INTERVAL 10
112
113 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
114
115 #ifdef TUNNEL_SINK
116
117 enum {
118 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
119 SINK_MESSAGE_REMOTE_SUSPEND,
120 SINK_MESSAGE_UPDATE_LATENCY,
121 SINK_MESSAGE_POST
122 };
123
124 #define DEFAULT_TLENGTH_MSEC 150
125 #define DEFAULT_MINREQ_MSEC 25
126
127 #else
128
129 enum {
130 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
131 SOURCE_MESSAGE_REMOTE_SUSPEND,
132 SOURCE_MESSAGE_UPDATE_LATENCY
133 };
134
135 #define DEFAULT_FRAGSIZE_MSEC 25
136
137 #endif
138
139 #ifdef TUNNEL_SINK
140 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
141 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
142 #endif
143 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
144 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148
149 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
150 #ifdef TUNNEL_SINK
151 [PA_COMMAND_REQUEST] = command_request,
152 [PA_COMMAND_STARTED] = command_started,
153 #endif
154 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
155 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
156 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
157 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
158 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
159 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
160 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
161 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
162 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
163 };
164
165 struct userdata {
166 pa_core *core;
167 pa_module *module;
168
169 pa_thread_mq thread_mq;
170 pa_rtpoll *rtpoll;
171 pa_thread *thread;
172
173 pa_socket_client *client;
174 pa_pstream *pstream;
175 pa_pdispatch *pdispatch;
176
177 char *server_name;
178 #ifdef TUNNEL_SINK
179 char *sink_name;
180 pa_sink *sink;
181 int32_t requested_bytes;
182 #else
183 char *source_name;
184 pa_source *source;
185 #endif
186
187 pa_auth_cookie *auth_cookie;
188
189 uint32_t version;
190 uint32_t ctag;
191 uint32_t device_index;
192 uint32_t channel;
193
194 int64_t counter, counter_delta;
195
196 pa_bool_t remote_corked:1;
197 pa_bool_t remote_suspended:1;
198
199 pa_usec_t transport_usec;
200 pa_bool_t transport_usec_valid;
201
202 uint32_t ignore_latency_before;
203
204 pa_time_event *time_event;
205
206 pa_smoother *smoother;
207
208 char *device_description;
209 char *server_fqdn;
210 char *user_name;
211
212 uint32_t maxlength;
213 #ifdef TUNNEL_SINK
214 uint32_t tlength;
215 uint32_t minreq;
216 uint32_t prebuf;
217 #else
218 uint32_t fragsize;
219 #endif
220 };
221
222 static void request_latency(struct userdata *u);
223
224 /* Called from main context */
225 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
226 struct userdata *u = userdata;
227
228 pa_assert(pd);
229 pa_assert(t);
230 pa_assert(u);
231 pa_assert(u->pdispatch == pd);
232
233 pa_log_warn("Stream killed");
234 pa_module_unload_request(u->module, TRUE);
235 }
236
237 /* Called from main context */
238 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
239 struct userdata *u = userdata;
240
241 pa_assert(pd);
242 pa_assert(t);
243 pa_assert(u);
244 pa_assert(u->pdispatch == pd);
245
246 pa_log_info("Server signalled buffer overrun/underrun.");
247 request_latency(u);
248 }
249
250 /* Called from main context */
251 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
252 struct userdata *u = userdata;
253 uint32_t channel;
254 pa_bool_t suspended;
255
256 pa_assert(pd);
257 pa_assert(t);
258 pa_assert(u);
259 pa_assert(u->pdispatch == pd);
260
261 if (pa_tagstruct_getu32(t, &channel) < 0 ||
262 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
263 !pa_tagstruct_eof(t)) {
264 pa_log("Invalid packet");
265 pa_module_unload_request(u->module, TRUE);
266 return;
267 }
268
269 #ifdef TUNNEL_SINK
270 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
271 #else
272 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
273 #endif
274
275 request_latency(u);
276 }
277
278 /* Called from main context */
279 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
280 struct userdata *u = userdata;
281
282 pa_assert(pd);
283 pa_assert(t);
284 pa_assert(u);
285 pa_assert(u->pdispatch == pd);
286
287 pa_log_debug("Server reports a stream move.");
288 request_latency(u);
289 }
290
291 #ifdef TUNNEL_SINK
292
293 /* Called from main context */
294 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
295 struct userdata *u = userdata;
296
297 pa_assert(pd);
298 pa_assert(t);
299 pa_assert(u);
300 pa_assert(u->pdispatch == pd);
301
302 pa_log_debug("Server reports playback started.");
303 request_latency(u);
304 }
305
306 #endif
307
308 /* Called from IO thread context */
309 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
310 pa_usec_t x;
311 pa_assert(u);
312
313 if (u->remote_corked == cork)
314 return;
315
316 u->remote_corked = cork;
317 x = pa_rtclock_usec();
318
319 /* Correct by the time this needs to travel to the other side.
320 * This is a valid thread-safe access, because the main thread is
321 * waiting for us */
322 if (u->transport_usec_valid)
323 x += u->transport_usec;
324
325 if (u->remote_suspended || u->remote_corked)
326 pa_smoother_pause(u->smoother, x);
327 else
328 pa_smoother_resume(u->smoother, x);
329 }
330
331 /* Called from main context */
332 static void stream_cork(struct userdata *u, pa_bool_t cork) {
333 pa_tagstruct *t;
334 pa_assert(u);
335
336 if (!u->pstream)
337 return;
338
339 t = pa_tagstruct_new(NULL, 0);
340 #ifdef TUNNEL_SINK
341 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
342 #else
343 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
344 #endif
345 pa_tagstruct_putu32(t, u->ctag++);
346 pa_tagstruct_putu32(t, u->channel);
347 pa_tagstruct_put_boolean(t, !!cork);
348 pa_pstream_send_tagstruct(u->pstream, t);
349
350 request_latency(u);
351 }
352
353 /* Called from IO thread context */
354 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
355 pa_usec_t x;
356 pa_assert(u);
357
358 if (u->remote_suspended == suspend)
359 return;
360
361 u->remote_suspended = suspend;
362
363 x = pa_rtclock_usec();
364
365 /* Correct by the time this needed to travel from the other side.
366 * This is a valid thread-safe access, because the main thread is
367 * waiting for us */
368 if (u->transport_usec_valid)
369 x -= u->transport_usec;
370
371 if (u->remote_suspended || u->remote_corked)
372 pa_smoother_pause(u->smoother, x);
373 else
374 pa_smoother_resume(u->smoother, x);
375 }
376
377 #ifdef TUNNEL_SINK
378
379 /* Called from IO thread context */
380 static void send_data(struct userdata *u) {
381 pa_assert(u);
382
383 while (u->requested_bytes > 0) {
384 pa_memchunk memchunk;
385
386 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
387 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
388 pa_memblock_unref(memchunk.memblock);
389
390 u->requested_bytes -= memchunk.length;
391
392 u->counter += memchunk.length;
393 }
394 }
395
396 /* This function is called from IO context -- except when it is not. */
397 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
398 struct userdata *u = PA_SINK(o)->userdata;
399
400 switch (code) {
401
402 case PA_SINK_MESSAGE_SET_STATE: {
403 int r;
404
405 /* First, change the state, because otherwide pa_sink_render() would fail */
406 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
407
408 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
409
410 if (PA_SINK_IS_OPENED(u->sink->state))
411 send_data(u);
412 }
413
414 return r;
415 }
416
417 case PA_SINK_MESSAGE_GET_LATENCY: {
418 pa_usec_t yl, yr, *usec = data;
419
420 yl = pa_bytes_to_usec(u->counter, &u->sink->sample_spec);
421 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
422
423 *usec = yl > yr ? yl - yr : 0;
424 return 0;
425 }
426
427 case SINK_MESSAGE_REQUEST:
428
429 pa_assert(offset > 0);
430 u->requested_bytes += (size_t) offset;
431
432 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
433 send_data(u);
434
435 return 0;
436
437
438 case SINK_MESSAGE_REMOTE_SUSPEND:
439
440 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
441 return 0;
442
443
444 case SINK_MESSAGE_UPDATE_LATENCY: {
445 pa_usec_t y;
446
447 y = pa_bytes_to_usec(u->counter, &u->sink->sample_spec);
448
449 if (y > (pa_usec_t) offset || offset < 0)
450 y -= offset;
451 else
452 y = 0;
453
454 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
455
456 return 0;
457 }
458
459 case SINK_MESSAGE_POST:
460
461 /* OK, This might be a bit confusing. This message is
462 * delivered to us from the main context -- NOT from the
463 * IO thread context where the rest of the messages are
464 * dispatched. Yeah, ugly, but I am a lazy bastard. */
465
466 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
467
468 u->counter_delta += chunk->length;
469
470 return 0;
471 }
472
473 return pa_sink_process_msg(o, code, data, offset, chunk);
474 }
475
476 /* Called from main context */
477 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
478 struct userdata *u;
479 pa_sink_assert_ref(s);
480 u = s->userdata;
481
482 switch ((pa_sink_state_t) state) {
483
484 case PA_SINK_SUSPENDED:
485 pa_assert(PA_SINK_IS_OPENED(s->state));
486 stream_cork(u, TRUE);
487 break;
488
489 case PA_SINK_IDLE:
490 case PA_SINK_RUNNING:
491 if (s->state == PA_SINK_SUSPENDED)
492 stream_cork(u, FALSE);
493 break;
494
495 case PA_SINK_UNLINKED:
496 case PA_SINK_INIT:
497 ;
498 }
499
500 return 0;
501 }
502
503 #else
504
505 /* This function is called from IO context -- except when it is not. */
506 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
507 struct userdata *u = PA_SOURCE(o)->userdata;
508
509 switch (code) {
510
511 case PA_SINK_MESSAGE_SET_STATE: {
512 int r;
513
514 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
515 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
516
517 return r;
518 }
519
520 case PA_SOURCE_MESSAGE_GET_LATENCY: {
521 pa_usec_t yr, yl, *usec = data;
522
523 yl = pa_bytes_to_usec(u->counter, &PA_SINK(o)->sample_spec);
524 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
525
526 *usec = yr > yl ? yr - yl : 0;
527 return 0;
528 }
529
530 case SOURCE_MESSAGE_POST:
531
532 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
533 pa_source_post(u->source, chunk);
534
535 u->counter += chunk->length;
536
537 return 0;
538
539 case SOURCE_MESSAGE_REMOTE_SUSPEND:
540
541 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
542 return 0;
543
544 case SOURCE_MESSAGE_UPDATE_LATENCY: {
545 pa_usec_t y;
546
547 y = pa_bytes_to_usec(u->counter, &u->source->sample_spec);
548
549 if (offset >= 0 || y > (pa_usec_t) -offset)
550 y += offset;
551 else
552 y = 0;
553
554 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
555
556 return 0;
557 }
558 }
559
560 return pa_source_process_msg(o, code, data, offset, chunk);
561 }
562
563 /* Called from main context */
564 static int source_set_state(pa_source *s, pa_source_state_t state) {
565 struct userdata *u;
566 pa_source_assert_ref(s);
567 u = s->userdata;
568
569 switch ((pa_source_state_t) state) {
570
571 case PA_SOURCE_SUSPENDED:
572 pa_assert(PA_SOURCE_IS_OPENED(s->state));
573 stream_cork(u, TRUE);
574 break;
575
576 case PA_SOURCE_IDLE:
577 case PA_SOURCE_RUNNING:
578 if (s->state == PA_SOURCE_SUSPENDED)
579 stream_cork(u, FALSE);
580 break;
581
582 case PA_SOURCE_UNLINKED:
583 case PA_SOURCE_INIT:
584 ;
585 }
586
587 return 0;
588 }
589
590 #endif
591
592 static void thread_func(void *userdata) {
593 struct userdata *u = userdata;
594
595 pa_assert(u);
596
597 pa_log_debug("Thread starting up");
598
599 pa_thread_mq_install(&u->thread_mq);
600 pa_rtpoll_install(u->rtpoll);
601
602 for (;;) {
603 int ret;
604
605 #ifdef TUNNEL_SINK
606 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
607 if (u->sink->thread_info.rewind_requested)
608 pa_sink_process_rewind(u->sink, 0);
609 #endif
610
611 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
612 goto fail;
613
614 if (ret == 0)
615 goto finish;
616 }
617
618 fail:
619 /* If this was no regular exit from the loop we have to continue
620 * processing messages until we received PA_MESSAGE_SHUTDOWN */
621 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
622 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
623
624 finish:
625 pa_log_debug("Thread shutting down");
626 }
627
628 #ifdef TUNNEL_SINK
629 /* Called from main context */
630 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
631 struct userdata *u = userdata;
632 uint32_t bytes, channel;
633
634 pa_assert(pd);
635 pa_assert(command == PA_COMMAND_REQUEST);
636 pa_assert(t);
637 pa_assert(u);
638 pa_assert(u->pdispatch == pd);
639
640 if (pa_tagstruct_getu32(t, &channel) < 0 ||
641 pa_tagstruct_getu32(t, &bytes) < 0) {
642 pa_log("Invalid protocol reply");
643 goto fail;
644 }
645
646 if (channel != u->channel) {
647 pa_log("Recieved data for invalid channel");
648 goto fail;
649 }
650
651 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
652 return;
653
654 fail:
655 pa_module_unload_request(u->module, TRUE);
656 }
657
658 #endif
659
660 /* Called from main context */
661 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
662 struct userdata *u = userdata;
663 pa_usec_t sink_usec, source_usec, transport_usec;
664 pa_bool_t playing;
665 int64_t write_index, read_index;
666 struct timeval local, remote, now;
667 pa_sample_spec *ss;
668 int64_t delay;
669
670 pa_assert(pd);
671 pa_assert(u);
672
673 if (command != PA_COMMAND_REPLY) {
674 if (command == PA_COMMAND_ERROR)
675 pa_log("Failed to get latency.");
676 else
677 pa_log("Protocol error.");
678 goto fail;
679 }
680
681 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
682 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
683 pa_tagstruct_get_boolean(t, &playing) < 0 ||
684 pa_tagstruct_get_timeval(t, &local) < 0 ||
685 pa_tagstruct_get_timeval(t, &remote) < 0 ||
686 pa_tagstruct_gets64(t, &write_index) < 0 ||
687 pa_tagstruct_gets64(t, &read_index) < 0) {
688 pa_log("Invalid reply.");
689 goto fail;
690 }
691
692 #ifdef TUNNEL_SINK
693 if (u->version >= 13) {
694 uint64_t underrun_for = 0, playing_for = 0;
695
696 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
697 pa_tagstruct_getu64(t, &playing_for) < 0) {
698 pa_log("Invalid reply.");
699 goto fail;
700 }
701 }
702 #endif
703
704 if (!pa_tagstruct_eof(t)) {
705 pa_log("Invalid reply.");
706 goto fail;
707 }
708
709 if (tag < u->ignore_latency_before) {
710 request_latency(u);
711 return;
712 }
713
714 pa_gettimeofday(&now);
715
716 /* Calculate transport usec */
717 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
718 /* local and remote seem to have synchronized clocks */
719 #ifdef TUNNEL_SINK
720 u->transport_usec = pa_timeval_diff(&remote, &local);
721 #else
722 u->transport_usec = pa_timeval_diff(&now, &remote);
723 #endif
724 } else
725 u->transport_usec = pa_timeval_diff(&now, &local)/2;
726 u->transport_usec_valid = TRUE;
727
728 /* First, take the device's delay */
729 #ifdef TUNNEL_SINK
730 delay = (int64_t) sink_usec;
731 ss = &u->sink->sample_spec;
732 #else
733 delay = (int64_t) source_usec;
734 ss = &u->source->sample_spec;
735 #endif
736
737 /* Add the length of our server-side buffer */
738 if (write_index >= read_index)
739 delay += (int64_t) pa_bytes_to_usec(write_index-read_index, ss);
740 else
741 delay -= (int64_t) pa_bytes_to_usec(read_index-write_index, ss);
742
743 /* Our measurements are already out of date, hence correct by the *
744 * transport latency */
745 #ifdef TUNNEL_SINK
746 delay -= (int64_t) transport_usec;
747 #else
748 delay += (int64_t) transport_usec;
749 #endif
750
751 /* Now correct by what we have have read/written since we requested the update */
752 #ifdef TUNNEL_SINK
753 delay += (int64_t) pa_bytes_to_usec(u->counter_delta, ss);
754 #else
755 delay -= (int64_t) pa_bytes_to_usec(u->counter_delta, ss);
756 #endif
757
758 #ifdef TUNNEL_SINK
759 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
760 #else
761 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
762 #endif
763
764 return;
765
766 fail:
767
768 pa_module_unload_request(u->module, TRUE);
769 }
770
771 /* Called from main context */
772 static void request_latency(struct userdata *u) {
773 pa_tagstruct *t;
774 struct timeval now;
775 uint32_t tag;
776 pa_assert(u);
777
778 t = pa_tagstruct_new(NULL, 0);
779 #ifdef TUNNEL_SINK
780 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
781 #else
782 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
783 #endif
784 pa_tagstruct_putu32(t, tag = u->ctag++);
785 pa_tagstruct_putu32(t, u->channel);
786
787 pa_gettimeofday(&now);
788 pa_tagstruct_put_timeval(t, &now);
789
790 pa_pstream_send_tagstruct(u->pstream, t);
791 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
792
793 u->ignore_latency_before = tag;
794 u->counter_delta = 0;
795 }
796
797 /* Called from main context */
798 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
799 struct userdata *u = userdata;
800 struct timeval ntv;
801
802 pa_assert(m);
803 pa_assert(e);
804 pa_assert(u);
805
806 request_latency(u);
807
808 pa_gettimeofday(&ntv);
809 ntv.tv_sec += LATENCY_INTERVAL;
810 m->time_restart(e, &ntv);
811 }
812
813 /* Called from main context */
814 static void update_description(struct userdata *u) {
815 char *d;
816 char un[128], hn[128];
817 pa_tagstruct *t;
818
819 pa_assert(u);
820
821 if (!u->server_fqdn || !u->user_name || !u->device_description)
822 return;
823
824 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
825
826 #ifdef TUNNEL_SINK
827 pa_sink_set_description(u->sink, d);
828 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
829 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
830 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
831 #else
832 pa_source_set_description(u->source, d);
833 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
834 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
835 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
836 #endif
837
838 pa_xfree(d);
839
840 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
841 pa_get_user_name(un, sizeof(un)),
842 pa_get_host_name(hn, sizeof(hn)));
843
844 t = pa_tagstruct_new(NULL, 0);
845 #ifdef TUNNEL_SINK
846 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
847 #else
848 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
849 #endif
850 pa_tagstruct_putu32(t, u->ctag++);
851 pa_tagstruct_putu32(t, u->channel);
852 pa_tagstruct_puts(t, d);
853 pa_pstream_send_tagstruct(u->pstream, t);
854
855 pa_xfree(d);
856 }
857
858 /* Called from main context */
859 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
860 struct userdata *u = userdata;
861 pa_sample_spec ss;
862 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
863 uint32_t cookie;
864
865 pa_assert(pd);
866 pa_assert(u);
867
868 if (command != PA_COMMAND_REPLY) {
869 if (command == PA_COMMAND_ERROR)
870 pa_log("Failed to get info.");
871 else
872 pa_log("Protocol error.");
873 goto fail;
874 }
875
876 if (pa_tagstruct_gets(t, &server_name) < 0 ||
877 pa_tagstruct_gets(t, &server_version) < 0 ||
878 pa_tagstruct_gets(t, &user_name) < 0 ||
879 pa_tagstruct_gets(t, &host_name) < 0 ||
880 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
881 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
882 pa_tagstruct_gets(t, &default_source_name) < 0 ||
883 pa_tagstruct_getu32(t, &cookie) < 0) {
884
885 pa_log("Parse failure");
886 goto fail;
887 }
888
889 if (!pa_tagstruct_eof(t)) {
890 pa_log("Packet too long");
891 goto fail;
892 }
893
894 pa_xfree(u->server_fqdn);
895 u->server_fqdn = pa_xstrdup(host_name);
896
897 pa_xfree(u->user_name);
898 u->user_name = pa_xstrdup(user_name);
899
900 update_description(u);
901
902 return;
903
904 fail:
905 pa_module_unload_request(u->module, TRUE);
906 }
907
908 #ifdef TUNNEL_SINK
909
910 /* Called from main context */
911 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
912 struct userdata *u = userdata;
913 uint32_t idx, owner_module, monitor_source, flags;
914 const char *name, *description, *monitor_source_name, *driver;
915 pa_sample_spec ss;
916 pa_channel_map cm;
917 pa_cvolume volume;
918 pa_bool_t mute;
919 pa_usec_t latency;
920 pa_proplist *pl;
921
922 pa_assert(pd);
923 pa_assert(u);
924
925 pl = pa_proplist_new();
926
927 if (command != PA_COMMAND_REPLY) {
928 if (command == PA_COMMAND_ERROR)
929 pa_log("Failed to get info.");
930 else
931 pa_log("Protocol error.");
932 goto fail;
933 }
934
935 if (pa_tagstruct_getu32(t, &idx) < 0 ||
936 pa_tagstruct_gets(t, &name) < 0 ||
937 pa_tagstruct_gets(t, &description) < 0 ||
938 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
939 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
940 pa_tagstruct_getu32(t, &owner_module) < 0 ||
941 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
942 pa_tagstruct_get_boolean(t, &mute) < 0 ||
943 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
944 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
945 pa_tagstruct_get_usec(t, &latency) < 0 ||
946 pa_tagstruct_gets(t, &driver) < 0 ||
947 pa_tagstruct_getu32(t, &flags) < 0) {
948
949 pa_log("Parse failure");
950 goto fail;
951 }
952
953 if (u->version >= 13) {
954 pa_usec_t configured_latency;
955
956 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
957 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
958
959 pa_log("Parse failure");
960 goto fail;
961 }
962 }
963
964 if (!pa_tagstruct_eof(t)) {
965 pa_log("Packet too long");
966 goto fail;
967 }
968
969 pa_proplist_free(pl);
970
971 if (!u->sink_name || strcmp(name, u->sink_name))
972 return;
973
974 pa_xfree(u->device_description);
975 u->device_description = pa_xstrdup(description);
976
977 update_description(u);
978
979 return;
980
981 fail:
982 pa_module_unload_request(u->module, TRUE);
983 pa_proplist_free(pl);
984 }
985
986 /* Called from main context */
987 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
988 struct userdata *u = userdata;
989 uint32_t idx, owner_module, client, sink;
990 pa_usec_t buffer_usec, sink_usec;
991 const char *name, *driver, *resample_method;
992 pa_bool_t mute;
993 pa_sample_spec sample_spec;
994 pa_channel_map channel_map;
995 pa_cvolume volume;
996 pa_proplist *pl;
997
998 pa_assert(pd);
999 pa_assert(u);
1000
1001 pl = pa_proplist_new();
1002
1003 if (command != PA_COMMAND_REPLY) {
1004 if (command == PA_COMMAND_ERROR)
1005 pa_log("Failed to get info.");
1006 else
1007 pa_log("Protocol error.");
1008 goto fail;
1009 }
1010
1011 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1012 pa_tagstruct_gets(t, &name) < 0 ||
1013 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1014 pa_tagstruct_getu32(t, &client) < 0 ||
1015 pa_tagstruct_getu32(t, &sink) < 0 ||
1016 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1017 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1018 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1019 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1020 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1021 pa_tagstruct_gets(t, &resample_method) < 0 ||
1022 pa_tagstruct_gets(t, &driver) < 0) {
1023
1024 pa_log("Parse failure");
1025 goto fail;
1026 }
1027
1028 if (u->version >= 11) {
1029 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1030
1031 pa_log("Parse failure");
1032 goto fail;
1033 }
1034 }
1035
1036 if (u->version >= 13) {
1037 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1038
1039 pa_log("Parse failure");
1040 goto fail;
1041 }
1042 }
1043
1044 if (!pa_tagstruct_eof(t)) {
1045 pa_log("Packet too long");
1046 goto fail;
1047 }
1048
1049 pa_proplist_free(pl);
1050
1051 if (idx != u->device_index)
1052 return;
1053
1054 pa_assert(u->sink);
1055
1056 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1057 pa_cvolume_equal(&volume, &u->sink->volume))
1058 return;
1059
1060 memcpy(&u->sink->volume, &volume, sizeof(pa_cvolume));
1061
1062 if (u->version >= 11)
1063 u->sink->muted = !!mute;
1064
1065 pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);
1066 return;
1067
1068 fail:
1069 pa_module_unload_request(u->module, TRUE);
1070 pa_proplist_free(pl);
1071 }
1072
1073 #else
1074
1075 /* Called from main context */
1076 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1077 struct userdata *u = userdata;
1078 uint32_t idx, owner_module, monitor_of_sink, flags;
1079 const char *name, *description, *monitor_of_sink_name, *driver;
1080 pa_sample_spec ss;
1081 pa_channel_map cm;
1082 pa_cvolume volume;
1083 pa_bool_t mute;
1084 pa_usec_t latency, configured_latency;
1085 pa_proplist *pl;
1086
1087 pa_assert(pd);
1088 pa_assert(u);
1089
1090 pl = pa_proplist_new();
1091
1092 if (command != PA_COMMAND_REPLY) {
1093 if (command == PA_COMMAND_ERROR)
1094 pa_log("Failed to get info.");
1095 else
1096 pa_log("Protocol error.");
1097 goto fail;
1098 }
1099
1100 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1101 pa_tagstruct_gets(t, &name) < 0 ||
1102 pa_tagstruct_gets(t, &description) < 0 ||
1103 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1104 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1105 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1106 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1107 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1108 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1109 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1110 pa_tagstruct_get_usec(t, &latency) < 0 ||
1111 pa_tagstruct_gets(t, &driver) < 0 ||
1112 pa_tagstruct_getu32(t, &flags) < 0) {
1113
1114 pa_log("Parse failure");
1115 goto fail;
1116 }
1117
1118 if (u->version >= 13) {
1119 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1120 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1121
1122 pa_log("Parse failure");
1123 goto fail;
1124 }
1125 }
1126
1127 if (!pa_tagstruct_eof(t)) {
1128 pa_log("Packet too long");
1129 goto fail;
1130 }
1131
1132 pa_proplist_free(pl);
1133
1134 if (!u->source_name || strcmp(name, u->source_name))
1135 return;
1136
1137 pa_xfree(u->device_description);
1138 u->device_description = pa_xstrdup(description);
1139
1140 update_description(u);
1141
1142 return;
1143
1144 fail:
1145 pa_module_unload_request(u->module, TRUE);
1146 pa_proplist_free(pl);
1147 }
1148
1149 #endif
1150
1151 /* Called from main context */
1152 static void request_info(struct userdata *u) {
1153 pa_tagstruct *t;
1154 uint32_t tag;
1155 pa_assert(u);
1156
1157 t = pa_tagstruct_new(NULL, 0);
1158 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1159 pa_tagstruct_putu32(t, tag = u->ctag++);
1160 pa_pstream_send_tagstruct(u->pstream, t);
1161 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1162
1163 #ifdef TUNNEL_SINK
1164 t = pa_tagstruct_new(NULL, 0);
1165 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1166 pa_tagstruct_putu32(t, tag = u->ctag++);
1167 pa_tagstruct_putu32(t, u->device_index);
1168 pa_pstream_send_tagstruct(u->pstream, t);
1169 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1170
1171 if (u->sink_name) {
1172 t = pa_tagstruct_new(NULL, 0);
1173 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1174 pa_tagstruct_putu32(t, tag = u->ctag++);
1175 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1176 pa_tagstruct_puts(t, u->sink_name);
1177 pa_pstream_send_tagstruct(u->pstream, t);
1178 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1179 }
1180 #else
1181 if (u->source_name) {
1182 t = pa_tagstruct_new(NULL, 0);
1183 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1184 pa_tagstruct_putu32(t, tag = u->ctag++);
1185 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1186 pa_tagstruct_puts(t, u->source_name);
1187 pa_pstream_send_tagstruct(u->pstream, t);
1188 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1189 }
1190 #endif
1191 }
1192
1193 /* Called from main context */
1194 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1195 struct userdata *u = userdata;
1196 pa_subscription_event_type_t e;
1197 uint32_t idx;
1198
1199 pa_assert(pd);
1200 pa_assert(t);
1201 pa_assert(u);
1202 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1203
1204 if (pa_tagstruct_getu32(t, &e) < 0 ||
1205 pa_tagstruct_getu32(t, &idx) < 0) {
1206 pa_log("Invalid protocol reply");
1207 pa_module_unload_request(u->module, TRUE);
1208 return;
1209 }
1210
1211 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1212 #ifdef TUNNEL_SINK
1213 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1214 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1215 #else
1216 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1217 #endif
1218 )
1219 return;
1220
1221 request_info(u);
1222 }
1223
1224 /* Called from main context */
1225 static void start_subscribe(struct userdata *u) {
1226 pa_tagstruct *t;
1227 uint32_t tag;
1228 pa_assert(u);
1229
1230 t = pa_tagstruct_new(NULL, 0);
1231 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1232 pa_tagstruct_putu32(t, tag = u->ctag++);
1233 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1234 #ifdef TUNNEL_SINK
1235 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1236 #else
1237 PA_SUBSCRIPTION_MASK_SOURCE
1238 #endif
1239 );
1240
1241 pa_pstream_send_tagstruct(u->pstream, t);
1242 }
1243
1244 /* Called from main context */
1245 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1246 struct userdata *u = userdata;
1247 struct timeval ntv;
1248 #ifdef TUNNEL_SINK
1249 uint32_t bytes;
1250 #endif
1251
1252 pa_assert(pd);
1253 pa_assert(u);
1254 pa_assert(u->pdispatch == pd);
1255
1256 if (command != PA_COMMAND_REPLY) {
1257 if (command == PA_COMMAND_ERROR)
1258 pa_log("Failed to create stream.");
1259 else
1260 pa_log("Protocol error.");
1261 goto fail;
1262 }
1263
1264 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1265 pa_tagstruct_getu32(t, &u->device_index) < 0
1266 #ifdef TUNNEL_SINK
1267 || pa_tagstruct_getu32(t, &bytes) < 0
1268 #endif
1269 )
1270 goto parse_error;
1271
1272 if (u->version >= 9) {
1273 #ifdef TUNNEL_SINK
1274 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1275 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1276 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1277 pa_tagstruct_getu32(t, &u->minreq) < 0)
1278 goto parse_error;
1279 #else
1280 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1281 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1282 goto parse_error;
1283 #endif
1284 }
1285
1286 if (u->version >= 12) {
1287 pa_sample_spec ss;
1288 pa_channel_map cm;
1289 uint32_t device_index;
1290 const char *dn;
1291 pa_bool_t suspended;
1292
1293 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1294 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1295 pa_tagstruct_getu32(t, &device_index) < 0 ||
1296 pa_tagstruct_gets(t, &dn) < 0 ||
1297 pa_tagstruct_get_boolean(t, &suspended) < 0)
1298 goto parse_error;
1299
1300 #ifdef TUNNEL_SINK
1301 pa_xfree(u->sink_name);
1302 u->sink_name = pa_xstrdup(dn);
1303 #else
1304 pa_xfree(u->source_name);
1305 u->source_name = pa_xstrdup(dn);
1306 #endif
1307 }
1308
1309 if (u->version >= 13) {
1310 pa_usec_t usec;
1311
1312 if (pa_tagstruct_get_usec(t, &usec) < 0)
1313 goto parse_error;
1314
1315 #ifdef TUNNEL_SINK
1316 pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0);
1317 #else
1318 pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0);
1319 #endif
1320 }
1321
1322 if (!pa_tagstruct_eof(t))
1323 goto parse_error;
1324
1325 start_subscribe(u);
1326 request_info(u);
1327
1328 pa_assert(!u->time_event);
1329 pa_gettimeofday(&ntv);
1330 ntv.tv_sec += LATENCY_INTERVAL;
1331 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1332
1333 request_latency(u);
1334
1335 pa_log_debug("Stream created.");
1336
1337 #ifdef TUNNEL_SINK
1338 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1339 #endif
1340
1341 return;
1342
1343 parse_error:
1344 pa_log("Invalid reply. (Create stream)");
1345
1346 fail:
1347 pa_module_unload_request(u->module, TRUE);
1348
1349 }
1350
1351 /* Called from main context */
1352 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1353 struct userdata *u = userdata;
1354 pa_tagstruct *reply;
1355 char name[256], un[128], hn[128];
1356 #ifdef TUNNEL_SINK
1357 pa_cvolume volume;
1358 #endif
1359
1360 pa_assert(pd);
1361 pa_assert(u);
1362 pa_assert(u->pdispatch == pd);
1363
1364 if (command != PA_COMMAND_REPLY ||
1365 pa_tagstruct_getu32(t, &u->version) < 0 ||
1366 !pa_tagstruct_eof(t)) {
1367
1368 if (command == PA_COMMAND_ERROR)
1369 pa_log("Failed to authenticate");
1370 else
1371 pa_log("Protocol error.");
1372
1373 goto fail;
1374 }
1375
1376 /* Minimum supported protocol version */
1377 if (u->version < 8) {
1378 pa_log("Incompatible protocol version");
1379 goto fail;
1380 }
1381
1382 /* Starting with protocol version 13 the MSB of the version tag
1383 reflects if shm is enabled for this connection or not. We don't
1384 support SHM here at all, so we just ignore this. */
1385
1386 if (u->version >= 13)
1387 u->version &= 0x7FFFFFFFU;
1388
1389 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1390
1391 #ifdef TUNNEL_SINK
1392 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1393 u->sink_name,
1394 pa_get_user_name(un, sizeof(un)),
1395 pa_get_host_name(hn, sizeof(hn)));
1396 #else
1397 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1398 u->source_name,
1399 pa_get_user_name(un, sizeof(un)),
1400 pa_get_host_name(hn, sizeof(hn)));
1401 #endif
1402
1403 reply = pa_tagstruct_new(NULL, 0);
1404 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1405 pa_tagstruct_putu32(reply, tag = u->ctag++);
1406
1407 if (u->version >= 13) {
1408 pa_proplist *pl;
1409 pl = pa_proplist_new();
1410 pa_init_proplist(pl);
1411 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1412 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1413 pa_tagstruct_put_proplist(reply, pl);
1414 pa_proplist_free(pl);
1415 } else
1416 pa_tagstruct_puts(reply, "PulseAudio");
1417
1418 pa_pstream_send_tagstruct(u->pstream, reply);
1419 /* We ignore the server's reply here */
1420
1421 reply = pa_tagstruct_new(NULL, 0);
1422
1423 if (u->version < 13)
1424 /* Only for older PA versions we need to fill in the maxlength */
1425 u->maxlength = 4*1024*1024;
1426
1427 #ifdef TUNNEL_SINK
1428 u->tlength = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1429 u->minreq = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1430 u->prebuf = u->tlength;
1431 #else
1432 u->fragsize = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1433 #endif
1434
1435 #ifdef TUNNEL_SINK
1436 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1437 pa_tagstruct_putu32(reply, tag = u->ctag++);
1438
1439 if (u->version < 13)
1440 pa_tagstruct_puts(reply, name);
1441
1442 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1443 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1444 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1445 pa_tagstruct_puts(reply, u->sink_name);
1446 pa_tagstruct_putu32(reply, u->maxlength);
1447 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1448 pa_tagstruct_putu32(reply, u->tlength);
1449 pa_tagstruct_putu32(reply, u->prebuf);
1450 pa_tagstruct_putu32(reply, u->minreq);
1451 pa_tagstruct_putu32(reply, 0);
1452 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1453 pa_tagstruct_put_cvolume(reply, &volume);
1454 #else
1455 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1456 pa_tagstruct_putu32(reply, tag = u->ctag++);
1457
1458 if (u->version < 13)
1459 pa_tagstruct_puts(reply, name);
1460
1461 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1462 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1463 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1464 pa_tagstruct_puts(reply, u->source_name);
1465 pa_tagstruct_putu32(reply, u->maxlength);
1466 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1467 pa_tagstruct_putu32(reply, u->fragsize);
1468 #endif
1469
1470 if (u->version >= 12) {
1471 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1472 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1473 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1474 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1475 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1476 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1477 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1478 }
1479
1480 if (u->version >= 13) {
1481 pa_proplist *pl;
1482
1483 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1484 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1485
1486 pl = pa_proplist_new();
1487 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1488 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1489 pa_tagstruct_put_proplist(reply, pl);
1490 pa_proplist_free(pl);
1491
1492 #ifndef TUNNEL_SINK
1493 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1494 #endif
1495 }
1496
1497 pa_pstream_send_tagstruct(u->pstream, reply);
1498 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1499
1500 pa_log_debug("Connection authenticated, creating stream ...");
1501
1502 return;
1503
1504 fail:
1505 pa_module_unload_request(u->module, TRUE);
1506 }
1507
1508 /* Called from main context */
1509 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1510 struct userdata *u = userdata;
1511
1512 pa_assert(p);
1513 pa_assert(u);
1514
1515 pa_log_warn("Stream died.");
1516 pa_module_unload_request(u->module, TRUE);
1517 }
1518
1519 /* Called from main context */
1520 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1521 struct userdata *u = userdata;
1522
1523 pa_assert(p);
1524 pa_assert(packet);
1525 pa_assert(u);
1526
1527 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1528 pa_log("Invalid packet");
1529 pa_module_unload_request(u->module, TRUE);
1530 return;
1531 }
1532 }
1533
1534 #ifndef TUNNEL_SINK
1535 /* Called from main context */
1536 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1537 struct userdata *u = userdata;
1538
1539 pa_assert(p);
1540 pa_assert(chunk);
1541 pa_assert(u);
1542
1543 if (channel != u->channel) {
1544 pa_log("Recieved memory block on bad channel.");
1545 pa_module_unload_request(u->module, TRUE);
1546 return;
1547 }
1548
1549 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1550
1551 u->counter_delta += chunk->length;
1552 }
1553
1554 #endif
1555
1556 /* Called from main context */
1557 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1558 struct userdata *u = userdata;
1559 pa_tagstruct *t;
1560 uint32_t tag;
1561
1562 pa_assert(sc);
1563 pa_assert(u);
1564 pa_assert(u->client == sc);
1565
1566 pa_socket_client_unref(u->client);
1567 u->client = NULL;
1568
1569 if (!io) {
1570 pa_log("Connection failed: %s", pa_cstrerror(errno));
1571 pa_module_unload_request(u->module, TRUE);
1572 return;
1573 }
1574
1575 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1576 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1577
1578 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1579 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1580 #ifndef TUNNEL_SINK
1581 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1582 #endif
1583
1584 t = pa_tagstruct_new(NULL, 0);
1585 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1586 pa_tagstruct_putu32(t, tag = u->ctag++);
1587 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1588
1589 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1590
1591 #ifdef HAVE_CREDS
1592 {
1593 pa_creds ucred;
1594
1595 if (pa_iochannel_creds_supported(io))
1596 pa_iochannel_creds_enable(io);
1597
1598 ucred.uid = getuid();
1599 ucred.gid = getgid();
1600
1601 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1602 }
1603 #else
1604 pa_pstream_send_tagstruct(u->pstream, t);
1605 #endif
1606
1607 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1608
1609 pa_log_debug("Connection established, authenticating ...");
1610 }
1611
1612 #ifdef TUNNEL_SINK
1613
1614 /* Called from main context */
1615 static int sink_set_volume(pa_sink *sink) {
1616 struct userdata *u;
1617 pa_tagstruct *t;
1618 uint32_t tag;
1619
1620 pa_assert(sink);
1621 u = sink->userdata;
1622 pa_assert(u);
1623
1624 t = pa_tagstruct_new(NULL, 0);
1625 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1626 pa_tagstruct_putu32(t, tag = u->ctag++);
1627 pa_tagstruct_putu32(t, u->device_index);
1628 pa_tagstruct_put_cvolume(t, &sink->volume);
1629 pa_pstream_send_tagstruct(u->pstream, t);
1630
1631 return 0;
1632 }
1633
1634 /* Called from main context */
1635 static int sink_set_mute(pa_sink *sink) {
1636 struct userdata *u;
1637 pa_tagstruct *t;
1638 uint32_t tag;
1639
1640 pa_assert(sink);
1641 u = sink->userdata;
1642 pa_assert(u);
1643
1644 if (u->version < 11)
1645 return -1;
1646
1647 t = pa_tagstruct_new(NULL, 0);
1648 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1649 pa_tagstruct_putu32(t, tag = u->ctag++);
1650 pa_tagstruct_putu32(t, u->device_index);
1651 pa_tagstruct_put_boolean(t, !!sink->muted);
1652 pa_pstream_send_tagstruct(u->pstream, t);
1653
1654 return 0;
1655 }
1656
1657 #endif
1658
1659 int pa__init(pa_module*m) {
1660 pa_modargs *ma = NULL;
1661 struct userdata *u = NULL;
1662 pa_sample_spec ss;
1663 pa_channel_map map;
1664 char *dn = NULL;
1665 #ifdef TUNNEL_SINK
1666 pa_sink_new_data data;
1667 #else
1668 pa_source_new_data data;
1669 #endif
1670
1671 pa_assert(m);
1672
1673 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1674 pa_log("Failed to parse module arguments");
1675 goto fail;
1676 }
1677
1678 m->userdata = u = pa_xnew0(struct userdata, 1);
1679 u->core = m->core;
1680 u->module = m;
1681 u->client = NULL;
1682 u->pdispatch = NULL;
1683 u->pstream = NULL;
1684 u->server_name = NULL;
1685 #ifdef TUNNEL_SINK
1686 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1687 u->sink = NULL;
1688 u->requested_bytes = 0;
1689 #else
1690 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1691 u->source = NULL;
1692 #endif
1693 u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
1694 u->ctag = 1;
1695 u->device_index = u->channel = PA_INVALID_INDEX;
1696 u->time_event = NULL;
1697 u->ignore_latency_before = 0;
1698 u->transport_usec = 0;
1699 u->transport_usec_valid = FALSE;
1700 u->remote_suspended = u->remote_corked = FALSE;
1701 u->counter = u->counter_delta = 0;
1702
1703 u->rtpoll = pa_rtpoll_new();
1704 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1705
1706 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1707 goto fail;
1708
1709 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1710 pa_log("No server specified.");
1711 goto fail;
1712 }
1713
1714 ss = m->core->default_sample_spec;
1715 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1716 pa_log("Invalid sample format specification");
1717 goto fail;
1718 }
1719
1720 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1721 pa_log("Failed to connect to server '%s'", u->server_name);
1722 goto fail;
1723 }
1724
1725 pa_socket_client_set_callback(u->client, on_connection, u);
1726
1727 #ifdef TUNNEL_SINK
1728
1729 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1730 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1731
1732 pa_sink_new_data_init(&data);
1733 data.driver = __FILE__;
1734 data.module = m;
1735 data.namereg_fail = TRUE;
1736 pa_sink_new_data_set_name(&data, dn);
1737 pa_sink_new_data_set_sample_spec(&data, &ss);
1738 pa_sink_new_data_set_channel_map(&data, &map);
1739 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1740 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1741 if (u->sink_name)
1742 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1743
1744 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL);
1745 pa_sink_new_data_done(&data);
1746
1747 if (!u->sink) {
1748 pa_log("Failed to create sink.");
1749 goto fail;
1750 }
1751
1752 u->sink->parent.process_msg = sink_process_msg;
1753 u->sink->userdata = u;
1754 u->sink->set_state = sink_set_state;
1755 u->sink->set_volume = sink_set_volume;
1756 u->sink->set_mute = sink_set_mute;
1757
1758 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1759
1760 pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0);
1761
1762 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1763 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1764
1765 #else
1766
1767 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1768 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1769
1770 pa_source_new_data_init(&data);
1771 data.driver = __FILE__;
1772 data.module = m;
1773 data.namereg_fail = TRUE;
1774 pa_source_new_data_set_name(&data, dn);
1775 pa_source_new_data_set_sample_spec(&data, &ss);
1776 pa_source_new_data_set_channel_map(&data, &map);
1777 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1778 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1779 if (u->source_name)
1780 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1781
1782 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1783 pa_source_new_data_done(&data);
1784
1785 if (!u->source) {
1786 pa_log("Failed to create source.");
1787 goto fail;
1788 }
1789
1790 u->source->parent.process_msg = source_process_msg;
1791 u->source->set_state = source_set_state;
1792 u->source->userdata = u;
1793
1794 pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0);
1795
1796 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1797 pa_source_set_rtpoll(u->source, u->rtpoll);
1798 #endif
1799
1800 pa_xfree(dn);
1801
1802 u->time_event = NULL;
1803
1804 u->maxlength = 0;
1805 #ifdef TUNNEL_SINK
1806 u->tlength = u->minreq = u->prebuf = 0;
1807 #else
1808 u->fragsize = 0;
1809 #endif
1810
1811 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1812
1813 if (!(u->thread = pa_thread_new(thread_func, u))) {
1814 pa_log("Failed to create thread.");
1815 goto fail;
1816 }
1817
1818 #ifdef TUNNEL_SINK
1819 pa_sink_put(u->sink);
1820 #else
1821 pa_source_put(u->source);
1822 #endif
1823
1824 pa_modargs_free(ma);
1825
1826 return 0;
1827
1828 fail:
1829 pa__done(m);
1830
1831 if (ma)
1832 pa_modargs_free(ma);
1833
1834 pa_xfree(dn);
1835
1836 return -1;
1837 }
1838
1839 void pa__done(pa_module*m) {
1840 struct userdata* u;
1841
1842 pa_assert(m);
1843
1844 if (!(u = m->userdata))
1845 return;
1846
1847 #ifdef TUNNEL_SINK
1848 if (u->sink)
1849 pa_sink_unlink(u->sink);
1850 #else
1851 if (u->source)
1852 pa_source_unlink(u->source);
1853 #endif
1854
1855 if (u->thread) {
1856 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1857 pa_thread_free(u->thread);
1858 }
1859
1860 pa_thread_mq_done(&u->thread_mq);
1861
1862 #ifdef TUNNEL_SINK
1863 if (u->sink)
1864 pa_sink_unref(u->sink);
1865 #else
1866 if (u->source)
1867 pa_source_unref(u->source);
1868 #endif
1869
1870 if (u->rtpoll)
1871 pa_rtpoll_free(u->rtpoll);
1872
1873 if (u->pstream) {
1874 pa_pstream_unlink(u->pstream);
1875 pa_pstream_unref(u->pstream);
1876 }
1877
1878 if (u->pdispatch)
1879 pa_pdispatch_unref(u->pdispatch);
1880
1881 if (u->client)
1882 pa_socket_client_unref(u->client);
1883
1884 if (u->auth_cookie)
1885 pa_auth_cookie_unref(u->auth_cookie);
1886
1887 if (u->smoother)
1888 pa_smoother_free(u->smoother);
1889
1890 if (u->time_event)
1891 u->core->mainloop->time_free(u->time_event);
1892
1893 #ifdef TUNNEL_SINK
1894 pa_xfree(u->sink_name);
1895 #else
1896 pa_xfree(u->source_name);
1897 #endif
1898 pa_xfree(u->server_name);
1899
1900 pa_xfree(u->device_description);
1901 pa_xfree(u->server_fqdn);
1902 pa_xfree(u->user_name);
1903
1904 pa_xfree(u);
1905 }