]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
Merge dead branch 'glitch-free'
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/authkey-prop.h>
52 #include <pulsecore/time-smoother.h>
53 #include <pulsecore/thread.h>
54 #include <pulsecore/thread-mq.h>
55 #include <pulsecore/rtclock.h>
56 #include <pulsecore/core-error.h>
57 #include <pulsecore/proplist-util.h>
58
59 #ifdef TUNNEL_SINK
60 #include "module-tunnel-sink-symdef.h"
61 #else
62 #include "module-tunnel-source-symdef.h"
63 #endif
64
65 #ifdef TUNNEL_SINK
66 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 PA_MODULE_USAGE(
68 "server=<address> "
69 "sink=<remote sink name> "
70 "cookie=<filename> "
71 "format=<sample format> "
72 "channels=<number of channels> "
73 "rate=<sample rate> "
74 "sink_name=<name for the local sink> "
75 "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79 "server=<address> "
80 "source=<remote source name> "
81 "cookie=<filename> "
82 "format=<sample format> "
83 "channels=<number of channels> "
84 "rate=<sample rate> "
85 "source_name=<name for the local source> "
86 "channel_map=<channel map>");
87 #endif
88
89 PA_MODULE_AUTHOR("Lennart Poettering");
90 PA_MODULE_VERSION(PACKAGE_VERSION);
91 PA_MODULE_LOAD_ONCE(FALSE);
92
/* NULL-terminated list of the module argument names this module
 * accepts. The device related keys differ between the sink and the
 * source build. */
static const char* const valid_modargs[] = {
    /* Connection and sample format */
    "server",
    "cookie",
    "format",
    "channels",
    "rate",

    /* Local device name and remote device to attach to */
#ifdef TUNNEL_SINK
    "sink_name",
    "sink",
#else
    "source_name",
    "source",
#endif

    "channel_map",
    NULL,
};
109
/* Timeout handed to pa_pdispatch_register_reply() for every request
 * this module sends (presumably seconds -- confirm against pdispatch) */
#define DEFAULT_TIMEOUT 5

/* Interval between two periodic latency queries; added to tv_sec in
 * timeout_callback(), hence in seconds */
#define LATENCY_INTERVAL 10

/* Lower bound for the assumed network latency, in microseconds */
#define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
115
116 #ifdef TUNNEL_SINK
117
118 enum {
119 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
120 SINK_MESSAGE_REMOTE_SUSPEND,
121 SINK_MESSAGE_UPDATE_LATENCY,
122 SINK_MESSAGE_POST
123 };
124
125 #define DEFAULT_TLENGTH_MSEC 150
126 #define DEFAULT_MINREQ_MSEC 25
127
128 #else
129
130 enum {
131 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
132 SOURCE_MESSAGE_REMOTE_SUSPEND,
133 SOURCE_MESSAGE_UPDATE_LATENCY
134 };
135
136 #define DEFAULT_FRAGSIZE_MSEC 25
137
138 #endif
139
140 #ifdef TUNNEL_SINK
141 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
142 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
143 #endif
144 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149
150 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
151 #ifdef TUNNEL_SINK
152 [PA_COMMAND_REQUEST] = command_request,
153 [PA_COMMAND_STARTED] = command_started,
154 #endif
155 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
156 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
157 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
158 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
159 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
160 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
161 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
162 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
163 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
164 };
165
166 struct userdata {
167 pa_core *core;
168 pa_module *module;
169
170 pa_thread_mq thread_mq;
171 pa_rtpoll *rtpoll;
172 pa_thread *thread;
173
174 pa_socket_client *client;
175 pa_pstream *pstream;
176 pa_pdispatch *pdispatch;
177
178 char *server_name;
179 #ifdef TUNNEL_SINK
180 char *sink_name;
181 pa_sink *sink;
182 int32_t requested_bytes;
183 #else
184 char *source_name;
185 pa_source *source;
186 #endif
187
188 uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
189
190 uint32_t version;
191 uint32_t ctag;
192 uint32_t device_index;
193 uint32_t channel;
194
195 int64_t counter, counter_delta;
196
197 pa_bool_t remote_corked:1;
198 pa_bool_t remote_suspended:1;
199
200 pa_usec_t transport_usec;
201 pa_bool_t transport_usec_valid;
202
203 uint32_t ignore_latency_before;
204
205 pa_time_event *time_event;
206
207 pa_bool_t auth_cookie_in_property;
208
209 pa_smoother *smoother;
210
211 char *device_description;
212 char *server_fqdn;
213 char *user_name;
214
215 uint32_t maxlength;
216 #ifdef TUNNEL_SINK
217 uint32_t tlength;
218 uint32_t minreq;
219 uint32_t prebuf;
220 #else
221 uint32_t fragsize;
222 #endif
223 };
224
225 static void request_latency(struct userdata *u);
226
227 /* Called from main context */
228 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
229 struct userdata *u = userdata;
230
231 pa_assert(pd);
232 pa_assert(t);
233 pa_assert(u);
234 pa_assert(u->pdispatch == pd);
235
236 pa_log_warn("Stream killed");
237 pa_module_unload_request(u->module);
238 }
239
240 /* Called from main context */
241 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
242 struct userdata *u = userdata;
243
244 pa_assert(pd);
245 pa_assert(t);
246 pa_assert(u);
247 pa_assert(u->pdispatch == pd);
248
249 pa_log_info("Server signalled buffer overrun/underrun.");
250 request_latency(u);
251 }
252
253 /* Called from main context */
254 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
255 struct userdata *u = userdata;
256 uint32_t channel;
257 pa_bool_t suspended;
258
259 pa_assert(pd);
260 pa_assert(t);
261 pa_assert(u);
262 pa_assert(u->pdispatch == pd);
263
264 if (pa_tagstruct_getu32(t, &channel) < 0 ||
265 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
266 !pa_tagstruct_eof(t)) {
267 pa_log("Invalid packet");
268 pa_module_unload_request(u->module);
269 return;
270 }
271
272 #ifdef TUNNEL_SINK
273 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
274 #else
275 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
276 #endif
277
278 request_latency(u);
279 }
280
281 /* Called from main context */
282 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
283 struct userdata *u = userdata;
284
285 pa_assert(pd);
286 pa_assert(t);
287 pa_assert(u);
288 pa_assert(u->pdispatch == pd);
289
290 pa_log_debug("Server reports a stream move.");
291 request_latency(u);
292 }
293
294 #ifdef TUNNEL_SINK
295
296 /* Called from main context */
297 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
298 struct userdata *u = userdata;
299
300 pa_assert(pd);
301 pa_assert(t);
302 pa_assert(u);
303 pa_assert(u->pdispatch == pd);
304
305 pa_log_debug("Server reports playback started.");
306 request_latency(u);
307 }
308
309 #endif
310
311 /* Called from IO thread context */
312 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
313 pa_usec_t x;
314 pa_assert(u);
315
316 if (u->remote_corked == cork)
317 return;
318
319 u->remote_corked = cork;
320 x = pa_rtclock_usec();
321
322 /* Correct by the time this needs to travel to the other side.
323 * This is a valid thread-safe access, because the main thread is
324 * waiting for us */
325 if (u->transport_usec_valid)
326 x += u->transport_usec;
327
328 if (u->remote_suspended || u->remote_corked)
329 pa_smoother_pause(u->smoother, x);
330 else
331 pa_smoother_resume(u->smoother, x);
332 }
333
334 /* Called from main context */
335 static void stream_cork(struct userdata *u, pa_bool_t cork) {
336 pa_tagstruct *t;
337 pa_assert(u);
338
339 if (!u->pstream)
340 return;
341
342 t = pa_tagstruct_new(NULL, 0);
343 #ifdef TUNNEL_SINK
344 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
345 #else
346 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
347 #endif
348 pa_tagstruct_putu32(t, u->ctag++);
349 pa_tagstruct_putu32(t, u->channel);
350 pa_tagstruct_put_boolean(t, !!cork);
351 pa_pstream_send_tagstruct(u->pstream, t);
352
353 request_latency(u);
354 }
355
356 /* Called from IO thread context */
357 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
358 pa_usec_t x;
359 pa_assert(u);
360
361 if (u->remote_suspended == suspend)
362 return;
363
364 u->remote_suspended = suspend;
365
366 x = pa_rtclock_usec();
367
368 /* Correct by the time this needed to travel from the other side.
369 * This is a valid thread-safe access, because the main thread is
370 * waiting for us */
371 if (u->transport_usec_valid)
372 x -= u->transport_usec;
373
374 if (u->remote_suspended || u->remote_corked)
375 pa_smoother_pause(u->smoother, x);
376 else
377 pa_smoother_resume(u->smoother, x);
378 }
379
380 #ifdef TUNNEL_SINK
381
382 /* Called from IO thread context */
383 static void send_data(struct userdata *u) {
384 pa_assert(u);
385
386 while (u->requested_bytes > 0) {
387 pa_memchunk memchunk;
388
389 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
390 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
391 pa_memblock_unref(memchunk.memblock);
392
393 u->requested_bytes -= memchunk.length;
394
395 u->counter += memchunk.length;
396 }
397 }
398
399 /* This function is called from IO context -- except when it is not. */
400 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
401 struct userdata *u = PA_SINK(o)->userdata;
402
403 switch (code) {
404
405 case PA_SINK_MESSAGE_SET_STATE: {
406 int r;
407
408 /* First, change the state, because otherwide pa_sink_render() would fail */
409 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
410
411 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
412
413 if (PA_SINK_IS_OPENED(u->sink->state))
414 send_data(u);
415 }
416
417 return r;
418 }
419
420 case PA_SINK_MESSAGE_GET_LATENCY: {
421 pa_usec_t yl, yr, *usec = data;
422
423 yl = pa_bytes_to_usec(u->counter, &u->sink->sample_spec);
424 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
425
426 *usec = yl > yr ? yl - yr : 0;
427 return 0;
428 }
429
430 case SINK_MESSAGE_REQUEST:
431
432 pa_assert(offset > 0);
433 u->requested_bytes += (size_t) offset;
434
435 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
436 send_data(u);
437
438 return 0;
439
440
441 case SINK_MESSAGE_REMOTE_SUSPEND:
442
443 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
444 return 0;
445
446
447 case SINK_MESSAGE_UPDATE_LATENCY: {
448 pa_usec_t y;
449
450 y = pa_bytes_to_usec(u->counter, &u->sink->sample_spec);
451
452 if (y > (pa_usec_t) offset || offset < 0)
453 y -= offset;
454 else
455 y = 0;
456
457 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
458
459 return 0;
460 }
461
462 case SINK_MESSAGE_POST:
463
464 /* OK, This might be a bit confusing. This message is
465 * delivered to us from the main context -- NOT from the
466 * IO thread context where the rest of the messages are
467 * dispatched. Yeah, ugly, but I am a lazy bastard. */
468
469 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
470
471 u->counter_delta += chunk->length;
472
473 return 0;
474 }
475
476 return pa_sink_process_msg(o, code, data, offset, chunk);
477 }
478
479 /* Called from main context */
480 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
481 struct userdata *u;
482 pa_sink_assert_ref(s);
483 u = s->userdata;
484
485 switch ((pa_sink_state_t) state) {
486
487 case PA_SINK_SUSPENDED:
488 pa_assert(PA_SINK_IS_OPENED(s->state));
489 stream_cork(u, TRUE);
490 break;
491
492 case PA_SINK_IDLE:
493 case PA_SINK_RUNNING:
494 if (s->state == PA_SINK_SUSPENDED)
495 stream_cork(u, FALSE);
496 break;
497
498 case PA_SINK_UNLINKED:
499 case PA_SINK_INIT:
500 ;
501 }
502
503 return 0;
504 }
505
506 #else
507
508 /* This function is called from IO context -- except when it is not. */
509 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
510 struct userdata *u = PA_SOURCE(o)->userdata;
511
512 switch (code) {
513
514 case PA_SINK_MESSAGE_SET_STATE: {
515 int r;
516
517 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0)
518 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
519
520 return r;
521 }
522
523 case PA_SOURCE_MESSAGE_GET_LATENCY: {
524 pa_usec_t yr, yl, *usec = data;
525
526 yl = pa_bytes_to_usec(u->counter, &PA_SINK(o)->sample_spec);
527 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
528
529 *usec = yr > yl ? yr - yl : 0;
530 return 0;
531 }
532
533 case SOURCE_MESSAGE_POST:
534
535 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
536 pa_source_post(u->source, chunk);
537
538 u->counter += chunk->length;
539
540 return 0;
541
542 case SOURCE_MESSAGE_REMOTE_SUSPEND:
543
544 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
545 return 0;
546
547 case SOURCE_MESSAGE_UPDATE_LATENCY: {
548 pa_usec_t y;
549
550 y = pa_bytes_to_usec(u->counter, &u->source->sample_spec);
551
552 if (offset >= 0 || y > (pa_usec_t) -offset)
553 y += offset;
554 else
555 y = 0;
556
557 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
558
559 return 0;
560 }
561 }
562
563 return pa_source_process_msg(o, code, data, offset, chunk);
564 }
565
566 /* Called from main context */
567 static int source_set_state(pa_source *s, pa_source_state_t state) {
568 struct userdata *u;
569 pa_source_assert_ref(s);
570 u = s->userdata;
571
572 switch ((pa_source_state_t) state) {
573
574 case PA_SOURCE_SUSPENDED:
575 pa_assert(PA_SOURCE_IS_OPENED(s->state));
576 stream_cork(u, TRUE);
577 break;
578
579 case PA_SOURCE_IDLE:
580 case PA_SOURCE_RUNNING:
581 if (s->state == PA_SOURCE_SUSPENDED)
582 stream_cork(u, FALSE);
583 break;
584
585 case PA_SOURCE_UNLINKED:
586 case PA_SOURCE_INIT:
587 ;
588 }
589
590 return 0;
591 }
592
593 #endif
594
595 static void thread_func(void *userdata) {
596 struct userdata *u = userdata;
597
598 pa_assert(u);
599
600 pa_log_debug("Thread starting up");
601
602 pa_thread_mq_install(&u->thread_mq);
603 pa_rtpoll_install(u->rtpoll);
604
605 for (;;) {
606 int ret;
607
608 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
609 goto fail;
610
611 if (ret == 0)
612 goto finish;
613 }
614
615 fail:
616 /* If this was no regular exit from the loop we have to continue
617 * processing messages until we received PA_MESSAGE_SHUTDOWN */
618 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
619 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
620
621 finish:
622 pa_log_debug("Thread shutting down");
623 }
624
625 #ifdef TUNNEL_SINK
626 /* Called from main context */
627 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
628 struct userdata *u = userdata;
629 uint32_t bytes, channel;
630
631 pa_assert(pd);
632 pa_assert(command == PA_COMMAND_REQUEST);
633 pa_assert(t);
634 pa_assert(u);
635 pa_assert(u->pdispatch == pd);
636
637 if (pa_tagstruct_getu32(t, &channel) < 0 ||
638 pa_tagstruct_getu32(t, &bytes) < 0) {
639 pa_log("Invalid protocol reply");
640 goto fail;
641 }
642
643 if (channel != u->channel) {
644 pa_log("Recieved data for invalid channel");
645 goto fail;
646 }
647
648 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
649 return;
650
651 fail:
652 pa_module_unload_request(u->module);
653 }
654
655 #endif
656
657 /* Called from main context */
658 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
659 struct userdata *u = userdata;
660 pa_usec_t sink_usec, source_usec, transport_usec;
661 pa_bool_t playing;
662 int64_t write_index, read_index;
663 struct timeval local, remote, now;
664 pa_sample_spec *ss;
665 int64_t delay;
666
667 pa_assert(pd);
668 pa_assert(u);
669
670 if (command != PA_COMMAND_REPLY) {
671 if (command == PA_COMMAND_ERROR)
672 pa_log("Failed to get latency.");
673 else
674 pa_log("Protocol error.");
675 goto fail;
676 }
677
678 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
679 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
680 pa_tagstruct_get_boolean(t, &playing) < 0 ||
681 pa_tagstruct_get_timeval(t, &local) < 0 ||
682 pa_tagstruct_get_timeval(t, &remote) < 0 ||
683 pa_tagstruct_gets64(t, &write_index) < 0 ||
684 pa_tagstruct_gets64(t, &read_index) < 0) {
685 pa_log("Invalid reply.");
686 goto fail;
687 }
688
689 #ifdef TUNNEL_SINK
690 if (u->version >= 13) {
691 uint64_t underrun_for = 0, playing_for = 0;
692
693 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
694 pa_tagstruct_getu64(t, &playing_for) < 0) {
695 pa_log("Invalid reply.");
696 goto fail;
697 }
698 }
699 #endif
700
701 if (!pa_tagstruct_eof(t)) {
702 pa_log("Invalid reply.");
703 goto fail;
704 }
705
706 if (tag < u->ignore_latency_before) {
707 request_latency(u);
708 return;
709 }
710
711 pa_gettimeofday(&now);
712
713 /* Calculate transport usec */
714 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
715 /* local and remote seem to have synchronized clocks */
716 #ifdef TUNNEL_SINK
717 u->transport_usec = pa_timeval_diff(&remote, &local);
718 #else
719 u->transport_usec = pa_timeval_diff(&now, &remote);
720 #endif
721 } else
722 u->transport_usec = pa_timeval_diff(&now, &local)/2;
723 u->transport_usec_valid = TRUE;
724
725 /* First, take the device's delay */
726 #ifdef TUNNEL_SINK
727 delay = (int64_t) sink_usec;
728 ss = &u->sink->sample_spec;
729 #else
730 delay = (int64_t) source_usec;
731 ss = &u->source->sample_spec;
732 #endif
733
734 /* Add the length of our server-side buffer */
735 if (write_index >= read_index)
736 delay += (int64_t) pa_bytes_to_usec(write_index-read_index, ss);
737 else
738 delay -= (int64_t) pa_bytes_to_usec(read_index-write_index, ss);
739
740 /* Our measurements are already out of date, hence correct by the *
741 * transport latency */
742 #ifdef TUNNEL_SINK
743 delay -= (int64_t) transport_usec;
744 #else
745 delay += (int64_t) transport_usec;
746 #endif
747
748 /* Now correct by what we have have read/written since we requested the update */
749 #ifdef TUNNEL_SINK
750 delay += (int64_t) pa_bytes_to_usec(u->counter_delta, ss);
751 #else
752 delay -= (int64_t) pa_bytes_to_usec(u->counter_delta, ss);
753 #endif
754
755 #ifdef TUNNEL_SINK
756 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
757 #else
758 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
759 #endif
760
761 return;
762
763 fail:
764
765 pa_module_unload_request(u->module);
766 }
767
768 /* Called from main context */
769 static void request_latency(struct userdata *u) {
770 pa_tagstruct *t;
771 struct timeval now;
772 uint32_t tag;
773 pa_assert(u);
774
775 t = pa_tagstruct_new(NULL, 0);
776 #ifdef TUNNEL_SINK
777 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
778 #else
779 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
780 #endif
781 pa_tagstruct_putu32(t, tag = u->ctag++);
782 pa_tagstruct_putu32(t, u->channel);
783
784 pa_gettimeofday(&now);
785 pa_tagstruct_put_timeval(t, &now);
786
787 pa_pstream_send_tagstruct(u->pstream, t);
788 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
789
790 u->ignore_latency_before = tag;
791 u->counter_delta = 0;
792 }
793
794 /* Called from main context */
795 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
796 struct userdata *u = userdata;
797 struct timeval ntv;
798
799 pa_assert(m);
800 pa_assert(e);
801 pa_assert(u);
802
803 request_latency(u);
804
805 pa_gettimeofday(&ntv);
806 ntv.tv_sec += LATENCY_INTERVAL;
807 m->time_restart(e, &ntv);
808 }
809
810 /* Called from main context */
811 static void update_description(struct userdata *u) {
812 char *d;
813 char un[128], hn[128];
814 pa_tagstruct *t;
815
816 pa_assert(u);
817
818 if (!u->server_fqdn || !u->user_name || !u->device_description)
819 return;
820
821 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
822
823 #ifdef TUNNEL_SINK
824 pa_sink_set_description(u->sink, d);
825 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
826 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
827 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
828 #else
829 pa_source_set_description(u->source, d);
830 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
831 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
832 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
833 #endif
834
835 pa_xfree(d);
836
837 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
838 pa_get_user_name(un, sizeof(un)),
839 pa_get_host_name(hn, sizeof(hn)));
840
841 t = pa_tagstruct_new(NULL, 0);
842 #ifdef TUNNEL_SINK
843 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
844 #else
845 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
846 #endif
847 pa_tagstruct_putu32(t, u->ctag++);
848 pa_tagstruct_putu32(t, u->channel);
849 pa_tagstruct_puts(t, d);
850 pa_pstream_send_tagstruct(u->pstream, t);
851
852 pa_xfree(d);
853 }
854
855 /* Called from main context */
856 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
857 struct userdata *u = userdata;
858 pa_sample_spec ss;
859 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
860 uint32_t cookie;
861
862 pa_assert(pd);
863 pa_assert(u);
864
865 if (command != PA_COMMAND_REPLY) {
866 if (command == PA_COMMAND_ERROR)
867 pa_log("Failed to get info.");
868 else
869 pa_log("Protocol error.");
870 goto fail;
871 }
872
873 if (pa_tagstruct_gets(t, &server_name) < 0 ||
874 pa_tagstruct_gets(t, &server_version) < 0 ||
875 pa_tagstruct_gets(t, &user_name) < 0 ||
876 pa_tagstruct_gets(t, &host_name) < 0 ||
877 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
878 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
879 pa_tagstruct_gets(t, &default_source_name) < 0 ||
880 pa_tagstruct_getu32(t, &cookie) < 0) {
881
882 pa_log("Parse failure");
883 goto fail;
884 }
885
886 if (!pa_tagstruct_eof(t)) {
887 pa_log("Packet too long");
888 goto fail;
889 }
890
891 pa_xfree(u->server_fqdn);
892 u->server_fqdn = pa_xstrdup(host_name);
893
894 pa_xfree(u->user_name);
895 u->user_name = pa_xstrdup(user_name);
896
897 update_description(u);
898
899 return;
900
901 fail:
902 pa_module_unload_request(u->module);
903 }
904
905 #ifdef TUNNEL_SINK
906
907 /* Called from main context */
908 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
909 struct userdata *u = userdata;
910 uint32_t idx, owner_module, monitor_source, flags;
911 const char *name, *description, *monitor_source_name, *driver;
912 pa_sample_spec ss;
913 pa_channel_map cm;
914 pa_cvolume volume;
915 pa_bool_t mute;
916 pa_usec_t latency;
917 pa_proplist *pl;
918
919 pa_assert(pd);
920 pa_assert(u);
921
922 pl = pa_proplist_new();
923
924 if (command != PA_COMMAND_REPLY) {
925 if (command == PA_COMMAND_ERROR)
926 pa_log("Failed to get info.");
927 else
928 pa_log("Protocol error.");
929 goto fail;
930 }
931
932 if (pa_tagstruct_getu32(t, &idx) < 0 ||
933 pa_tagstruct_gets(t, &name) < 0 ||
934 pa_tagstruct_gets(t, &description) < 0 ||
935 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
936 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
937 pa_tagstruct_getu32(t, &owner_module) < 0 ||
938 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
939 pa_tagstruct_get_boolean(t, &mute) < 0 ||
940 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
941 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
942 pa_tagstruct_get_usec(t, &latency) < 0 ||
943 pa_tagstruct_gets(t, &driver) < 0 ||
944 pa_tagstruct_getu32(t, &flags) < 0) {
945
946 pa_log("Parse failure");
947 goto fail;
948 }
949
950 if (u->version >= 13) {
951 pa_usec_t configured_latency;
952
953 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
954 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
955
956 pa_log("Parse failure");
957 goto fail;
958 }
959 }
960
961 if (!pa_tagstruct_eof(t)) {
962 pa_log("Packet too long");
963 goto fail;
964 }
965
966 pa_proplist_free(pl);
967
968 if (!u->sink_name || strcmp(name, u->sink_name))
969 return;
970
971 pa_xfree(u->device_description);
972 u->device_description = pa_xstrdup(description);
973
974 update_description(u);
975
976 return;
977
978 fail:
979 pa_module_unload_request(u->module);
980 pa_proplist_free(pl);
981 }
982
983 /* Called from main context */
984 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
985 struct userdata *u = userdata;
986 uint32_t idx, owner_module, client, sink;
987 pa_usec_t buffer_usec, sink_usec;
988 const char *name, *driver, *resample_method;
989 pa_bool_t mute;
990 pa_sample_spec sample_spec;
991 pa_channel_map channel_map;
992 pa_cvolume volume;
993 pa_proplist *pl;
994
995 pa_assert(pd);
996 pa_assert(u);
997
998 pl = pa_proplist_new();
999
1000 if (command != PA_COMMAND_REPLY) {
1001 if (command == PA_COMMAND_ERROR)
1002 pa_log("Failed to get info.");
1003 else
1004 pa_log("Protocol error.");
1005 goto fail;
1006 }
1007
1008 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1009 pa_tagstruct_gets(t, &name) < 0 ||
1010 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1011 pa_tagstruct_getu32(t, &client) < 0 ||
1012 pa_tagstruct_getu32(t, &sink) < 0 ||
1013 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1014 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1015 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1016 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1017 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1018 pa_tagstruct_gets(t, &resample_method) < 0 ||
1019 pa_tagstruct_gets(t, &driver) < 0) {
1020
1021 pa_log("Parse failure");
1022 goto fail;
1023 }
1024
1025 if (u->version >= 11) {
1026 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1027
1028 pa_log("Parse failure");
1029 goto fail;
1030 }
1031 }
1032
1033 if (u->version >= 13) {
1034 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1035
1036 pa_log("Parse failure");
1037 goto fail;
1038 }
1039 }
1040
1041 if (!pa_tagstruct_eof(t)) {
1042 pa_log("Packet too long");
1043 goto fail;
1044 }
1045
1046 pa_proplist_free(pl);
1047
1048 if (idx != u->device_index)
1049 return;
1050
1051 pa_assert(u->sink);
1052
1053 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1054 pa_cvolume_equal(&volume, &u->sink->volume))
1055 return;
1056
1057 memcpy(&u->sink->volume, &volume, sizeof(pa_cvolume));
1058
1059 if (u->version >= 11)
1060 u->sink->muted = !!mute;
1061
1062 pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);
1063 return;
1064
1065 fail:
1066 pa_module_unload_request(u->module);
1067 pa_proplist_free(pl);
1068 }
1069
1070 #else
1071
1072 /* Called from main context */
1073 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1074 struct userdata *u = userdata;
1075 uint32_t idx, owner_module, monitor_of_sink, flags;
1076 const char *name, *description, *monitor_of_sink_name, *driver;
1077 pa_sample_spec ss;
1078 pa_channel_map cm;
1079 pa_cvolume volume;
1080 pa_bool_t mute;
1081 pa_usec_t latency, configured_latency;
1082 pa_proplist *pl;
1083
1084 pa_assert(pd);
1085 pa_assert(u);
1086
1087 pl = pa_proplist_new();
1088
1089 if (command != PA_COMMAND_REPLY) {
1090 if (command == PA_COMMAND_ERROR)
1091 pa_log("Failed to get info.");
1092 else
1093 pa_log("Protocol error.");
1094 goto fail;
1095 }
1096
1097 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1098 pa_tagstruct_gets(t, &name) < 0 ||
1099 pa_tagstruct_gets(t, &description) < 0 ||
1100 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1101 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1102 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1103 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1104 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1105 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1106 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1107 pa_tagstruct_get_usec(t, &latency) < 0 ||
1108 pa_tagstruct_gets(t, &driver) < 0 ||
1109 pa_tagstruct_getu32(t, &flags) < 0) {
1110
1111 pa_log("Parse failure");
1112 goto fail;
1113 }
1114
1115 if (u->version >= 13) {
1116 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1117 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1118
1119 pa_log("Parse failure");
1120 goto fail;
1121 }
1122 }
1123
1124 if (!pa_tagstruct_eof(t)) {
1125 pa_log("Packet too long");
1126 goto fail;
1127 }
1128
1129 pa_proplist_free(pl);
1130
1131 if (!u->source_name || strcmp(name, u->source_name))
1132 return;
1133
1134 pa_xfree(u->device_description);
1135 u->device_description = pa_xstrdup(description);
1136
1137 update_description(u);
1138
1139 return;
1140
1141 fail:
1142 pa_module_unload_request(u->module);
1143 pa_proplist_free(pl);
1144 }
1145
1146 #endif
1147
1148 /* Called from main context */
1149 static void request_info(struct userdata *u) {
1150 pa_tagstruct *t;
1151 uint32_t tag;
1152 pa_assert(u);
1153
1154 t = pa_tagstruct_new(NULL, 0);
1155 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1156 pa_tagstruct_putu32(t, tag = u->ctag++);
1157 pa_pstream_send_tagstruct(u->pstream, t);
1158 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1159
1160 #ifdef TUNNEL_SINK
1161 t = pa_tagstruct_new(NULL, 0);
1162 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1163 pa_tagstruct_putu32(t, tag = u->ctag++);
1164 pa_tagstruct_putu32(t, u->device_index);
1165 pa_pstream_send_tagstruct(u->pstream, t);
1166 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1167
1168 if (u->sink_name) {
1169 t = pa_tagstruct_new(NULL, 0);
1170 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1171 pa_tagstruct_putu32(t, tag = u->ctag++);
1172 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1173 pa_tagstruct_puts(t, u->sink_name);
1174 pa_pstream_send_tagstruct(u->pstream, t);
1175 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1176 }
1177 #else
1178 if (u->source_name) {
1179 t = pa_tagstruct_new(NULL, 0);
1180 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1181 pa_tagstruct_putu32(t, tag = u->ctag++);
1182 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1183 pa_tagstruct_puts(t, u->source_name);
1184 pa_pstream_send_tagstruct(u->pstream, t);
1185 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1186 }
1187 #endif
1188 }
1189
1190 /* Called from main context */
1191 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1192 struct userdata *u = userdata;
1193 pa_subscription_event_type_t e;
1194 uint32_t idx;
1195
1196 pa_assert(pd);
1197 pa_assert(t);
1198 pa_assert(u);
1199 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1200
1201 if (pa_tagstruct_getu32(t, &e) < 0 ||
1202 pa_tagstruct_getu32(t, &idx) < 0) {
1203 pa_log("Invalid protocol reply");
1204 pa_module_unload_request(u->module);
1205 return;
1206 }
1207
1208 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1209 #ifdef TUNNEL_SINK
1210 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1211 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1212 #else
1213 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1214 #endif
1215 )
1216 return;
1217
1218 request_info(u);
1219 }
1220
1221 /* Called from main context */
1222 static void start_subscribe(struct userdata *u) {
1223 pa_tagstruct *t;
1224 uint32_t tag;
1225 pa_assert(u);
1226
1227 t = pa_tagstruct_new(NULL, 0);
1228 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1229 pa_tagstruct_putu32(t, tag = u->ctag++);
1230 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1231 #ifdef TUNNEL_SINK
1232 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1233 #else
1234 PA_SUBSCRIPTION_MASK_SOURCE
1235 #endif
1236 );
1237
1238 pa_pstream_send_tagstruct(u->pstream, t);
1239 }
1240
1241 /* Called from main context */
1242 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1243 struct userdata *u = userdata;
1244 struct timeval ntv;
1245 #ifdef TUNNEL_SINK
1246 uint32_t bytes;
1247 #endif
1248
1249 pa_assert(pd);
1250 pa_assert(u);
1251 pa_assert(u->pdispatch == pd);
1252
1253 if (command != PA_COMMAND_REPLY) {
1254 if (command == PA_COMMAND_ERROR)
1255 pa_log("Failed to create stream.");
1256 else
1257 pa_log("Protocol error.");
1258 goto fail;
1259 }
1260
1261 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1262 pa_tagstruct_getu32(t, &u->device_index) < 0
1263 #ifdef TUNNEL_SINK
1264 || pa_tagstruct_getu32(t, &bytes) < 0
1265 #endif
1266 )
1267 goto parse_error;
1268
1269 if (u->version >= 9) {
1270 #ifdef TUNNEL_SINK
1271 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1272 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1273 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1274 pa_tagstruct_getu32(t, &u->minreq) < 0)
1275 goto parse_error;
1276 #else
1277 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1278 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1279 goto parse_error;
1280 #endif
1281 }
1282
1283 if (u->version >= 12) {
1284 pa_sample_spec ss;
1285 pa_channel_map cm;
1286 uint32_t device_index;
1287 const char *dn;
1288 pa_bool_t suspended;
1289
1290 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1291 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1292 pa_tagstruct_getu32(t, &device_index) < 0 ||
1293 pa_tagstruct_gets(t, &dn) < 0 ||
1294 pa_tagstruct_get_boolean(t, &suspended) < 0)
1295 goto parse_error;
1296
1297 #ifdef TUNNEL_SINK
1298 pa_xfree(u->sink_name);
1299 u->sink_name = pa_xstrdup(dn);
1300 #else
1301 pa_xfree(u->source_name);
1302 u->source_name = pa_xstrdup(dn);
1303 #endif
1304 }
1305
1306 if (u->version >= 13) {
1307 pa_usec_t usec;
1308
1309 if (pa_tagstruct_get_usec(t, &usec) < 0)
1310 goto parse_error;
1311
1312 #ifdef TUNNEL_SINK
1313 pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0);
1314 #else
1315 pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0);
1316 #endif
1317 }
1318
1319 if (!pa_tagstruct_eof(t))
1320 goto parse_error;
1321
1322 start_subscribe(u);
1323 request_info(u);
1324
1325 pa_assert(!u->time_event);
1326 pa_gettimeofday(&ntv);
1327 ntv.tv_sec += LATENCY_INTERVAL;
1328 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1329
1330 request_latency(u);
1331
1332 pa_log_debug("Stream created.");
1333
1334 #ifdef TUNNEL_SINK
1335 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1336 #endif
1337
1338 return;
1339
1340 parse_error:
1341 pa_log("Invalid reply. (Create stream)");
1342
1343 fail:
1344 pa_module_unload_request(u->module);
1345
1346 }
1347
1348 /* Called from main context */
1349 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1350 struct userdata *u = userdata;
1351 pa_tagstruct *reply;
1352 char name[256], un[128], hn[128];
1353 #ifdef TUNNEL_SINK
1354 pa_cvolume volume;
1355 #endif
1356
1357 pa_assert(pd);
1358 pa_assert(u);
1359 pa_assert(u->pdispatch == pd);
1360
1361 if (command != PA_COMMAND_REPLY ||
1362 pa_tagstruct_getu32(t, &u->version) < 0 ||
1363 !pa_tagstruct_eof(t)) {
1364
1365 if (command == PA_COMMAND_ERROR)
1366 pa_log("Failed to authenticate");
1367 else
1368 pa_log("Protocol error.");
1369
1370 goto fail;
1371 }
1372
1373 /* Minimum supported protocol version */
1374 if (u->version < 8) {
1375 pa_log("Incompatible protocol version");
1376 goto fail;
1377 }
1378
1379 /* Starting with protocol version 13 the MSB of the version tag
1380 reflects if shm is enabled for this connection or not. We don't
1381 support SHM here at all, so we just ignore this. */
1382
1383 if (u->version >= 13)
1384 u->version &= 0x7FFFFFFFU;
1385
1386 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1387
1388 #ifdef TUNNEL_SINK
1389 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1390 u->sink_name,
1391 pa_get_user_name(un, sizeof(un)),
1392 pa_get_host_name(hn, sizeof(hn)));
1393 #else
1394 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1395 u->source_name,
1396 pa_get_user_name(un, sizeof(un)),
1397 pa_get_host_name(hn, sizeof(hn)));
1398 #endif
1399
1400 reply = pa_tagstruct_new(NULL, 0);
1401 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1402 pa_tagstruct_putu32(reply, tag = u->ctag++);
1403
1404 if (u->version >= 13) {
1405 pa_proplist *pl;
1406 pl = pa_proplist_new();
1407 pa_init_proplist(pl);
1408 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1409 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1410 pa_tagstruct_put_proplist(reply, pl);
1411 pa_proplist_free(pl);
1412 } else
1413 pa_tagstruct_puts(reply, "PulseAudio");
1414
1415 pa_pstream_send_tagstruct(u->pstream, reply);
1416 /* We ignore the server's reply here */
1417
1418 reply = pa_tagstruct_new(NULL, 0);
1419
1420 if (u->version < 13)
1421 /* Only for older PA versions we need to fill in the maxlength */
1422 u->maxlength = 4*1024*1024;
1423
1424 #ifdef TUNNEL_SINK
1425 u->tlength = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1426 u->minreq = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1427 u->prebuf = u->tlength;
1428 #else
1429 u->fragsize = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1430 #endif
1431
1432 #ifdef TUNNEL_SINK
1433 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1434 pa_tagstruct_putu32(reply, tag = u->ctag++);
1435
1436 if (u->version < 13)
1437 pa_tagstruct_puts(reply, name);
1438
1439 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1440 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1441 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1442 pa_tagstruct_puts(reply, u->sink_name);
1443 pa_tagstruct_putu32(reply, u->maxlength);
1444 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1445 pa_tagstruct_putu32(reply, u->tlength);
1446 pa_tagstruct_putu32(reply, u->prebuf);
1447 pa_tagstruct_putu32(reply, u->minreq);
1448 pa_tagstruct_putu32(reply, 0);
1449 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1450 pa_tagstruct_put_cvolume(reply, &volume);
1451 #else
1452 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1453 pa_tagstruct_putu32(reply, tag = u->ctag++);
1454
1455 if (u->version < 13)
1456 pa_tagstruct_puts(reply, name);
1457
1458 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1459 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1460 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1461 pa_tagstruct_puts(reply, u->source_name);
1462 pa_tagstruct_putu32(reply, u->maxlength);
1463 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1464 pa_tagstruct_putu32(reply, u->fragsize);
1465 #endif
1466
1467 if (u->version >= 12) {
1468 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1469 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1470 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1471 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1472 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1473 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1474 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1475 }
1476
1477 if (u->version >= 13) {
1478 pa_proplist *pl;
1479
1480 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1481 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1482
1483 pl = pa_proplist_new();
1484 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1485 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1486 pa_tagstruct_put_proplist(reply, pl);
1487 pa_proplist_free(pl);
1488
1489 #ifndef TUNNEL_SINK
1490 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1491 #endif
1492 }
1493
1494 pa_pstream_send_tagstruct(u->pstream, reply);
1495 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1496
1497 pa_log_debug("Connection authenticated, creating stream ...");
1498
1499 return;
1500
1501 fail:
1502 pa_module_unload_request(u->module);
1503 }
1504
1505 /* Called from main context */
1506 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1507 struct userdata *u = userdata;
1508
1509 pa_assert(p);
1510 pa_assert(u);
1511
1512 pa_log_warn("Stream died.");
1513 pa_module_unload_request(u->module);
1514 }
1515
1516 /* Called from main context */
1517 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1518 struct userdata *u = userdata;
1519
1520 pa_assert(p);
1521 pa_assert(packet);
1522 pa_assert(u);
1523
1524 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1525 pa_log("Invalid packet");
1526 pa_module_unload_request(u->module);
1527 return;
1528 }
1529 }
1530
1531 #ifndef TUNNEL_SINK
1532 /* Called from main context */
1533 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1534 struct userdata *u = userdata;
1535
1536 pa_assert(p);
1537 pa_assert(chunk);
1538 pa_assert(u);
1539
1540 if (channel != u->channel) {
1541 pa_log("Recieved memory block on bad channel.");
1542 pa_module_unload_request(u->module);
1543 return;
1544 }
1545
1546 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1547
1548 u->counter_delta += chunk->length;
1549 }
1550
1551 #endif
1552
1553 /* Called from main context */
1554 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1555 struct userdata *u = userdata;
1556 pa_tagstruct *t;
1557 uint32_t tag;
1558
1559 pa_assert(sc);
1560 pa_assert(u);
1561 pa_assert(u->client == sc);
1562
1563 pa_socket_client_unref(u->client);
1564 u->client = NULL;
1565
1566 if (!io) {
1567 pa_log("Connection failed: %s", pa_cstrerror(errno));
1568 pa_module_unload_request(u->module);
1569 return;
1570 }
1571
1572 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1573 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1574
1575 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1576 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1577 #ifndef TUNNEL_SINK
1578 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1579 #endif
1580
1581 t = pa_tagstruct_new(NULL, 0);
1582 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1583 pa_tagstruct_putu32(t, tag = u->ctag++);
1584 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1585 pa_tagstruct_put_arbitrary(t, u->auth_cookie, sizeof(u->auth_cookie));
1586
1587 #ifdef HAVE_CREDS
1588 {
1589 pa_creds ucred;
1590
1591 if (pa_iochannel_creds_supported(io))
1592 pa_iochannel_creds_enable(io);
1593
1594 ucred.uid = getuid();
1595 ucred.gid = getgid();
1596
1597 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1598 }
1599 #else
1600 pa_pstream_send_tagstruct(u->pstream, t);
1601 #endif
1602
1603 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1604
1605 pa_log_debug("Connection established, authenticating ...");
1606 }
1607
1608 #ifdef TUNNEL_SINK
1609
1610 /* Called from main context */
1611 static int sink_set_volume(pa_sink *sink) {
1612 struct userdata *u;
1613 pa_tagstruct *t;
1614 uint32_t tag;
1615
1616 pa_assert(sink);
1617 u = sink->userdata;
1618 pa_assert(u);
1619
1620 t = pa_tagstruct_new(NULL, 0);
1621 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1622 pa_tagstruct_putu32(t, tag = u->ctag++);
1623 pa_tagstruct_putu32(t, u->device_index);
1624 pa_tagstruct_put_cvolume(t, &sink->volume);
1625 pa_pstream_send_tagstruct(u->pstream, t);
1626
1627 return 0;
1628 }
1629
1630 /* Called from main context */
1631 static int sink_set_mute(pa_sink *sink) {
1632 struct userdata *u;
1633 pa_tagstruct *t;
1634 uint32_t tag;
1635
1636 pa_assert(sink);
1637 u = sink->userdata;
1638 pa_assert(u);
1639
1640 if (u->version < 11)
1641 return -1;
1642
1643 t = pa_tagstruct_new(NULL, 0);
1644 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1645 pa_tagstruct_putu32(t, tag = u->ctag++);
1646 pa_tagstruct_putu32(t, u->device_index);
1647 pa_tagstruct_put_boolean(t, !!sink->muted);
1648 pa_pstream_send_tagstruct(u->pstream, t);
1649
1650 return 0;
1651 }
1652
1653 #endif
1654
1655 /* Called from main context */
1656 static int load_key(struct userdata *u, const char*fn) {
1657 pa_assert(u);
1658
1659 u->auth_cookie_in_property = FALSE;
1660
1661 if (!fn && pa_authkey_prop_get(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0) {
1662 pa_log_debug("Using already loaded auth cookie.");
1663 pa_authkey_prop_ref(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1664 u->auth_cookie_in_property = 1;
1665 return 0;
1666 }
1667
1668 if (!fn)
1669 fn = PA_NATIVE_COOKIE_FILE;
1670
1671 if (pa_authkey_load_auto(fn, u->auth_cookie, sizeof(u->auth_cookie)) < 0)
1672 return -1;
1673
1674 pa_log_debug("Loading cookie from disk.");
1675
1676 if (pa_authkey_prop_put(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0)
1677 u->auth_cookie_in_property = TRUE;
1678
1679 return 0;
1680 }
1681
1682 int pa__init(pa_module*m) {
1683 pa_modargs *ma = NULL;
1684 struct userdata *u = NULL;
1685 pa_sample_spec ss;
1686 pa_channel_map map;
1687 char *dn = NULL;
1688 #ifdef TUNNEL_SINK
1689 pa_sink_new_data data;
1690 #else
1691 pa_source_new_data data;
1692 #endif
1693
1694 pa_assert(m);
1695
1696 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1697 pa_log("Failed to parse module arguments");
1698 goto fail;
1699 }
1700
1701 m->userdata = u = pa_xnew0(struct userdata, 1);
1702 u->core = m->core;
1703 u->module = m;
1704 u->client = NULL;
1705 u->pdispatch = NULL;
1706 u->pstream = NULL;
1707 u->server_name = NULL;
1708 #ifdef TUNNEL_SINK
1709 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1710 u->sink = NULL;
1711 u->requested_bytes = 0;
1712 #else
1713 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1714 u->source = NULL;
1715 #endif
1716 u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
1717 u->ctag = 1;
1718 u->device_index = u->channel = PA_INVALID_INDEX;
1719 u->auth_cookie_in_property = FALSE;
1720 u->time_event = NULL;
1721 u->ignore_latency_before = 0;
1722 u->transport_usec = 0;
1723 u->transport_usec_valid = FALSE;
1724 u->remote_suspended = u->remote_corked = FALSE;
1725 u->counter = u->counter_delta = 0;
1726
1727 u->rtpoll = pa_rtpoll_new();
1728 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1729
1730 if (load_key(u, pa_modargs_get_value(ma, "cookie", NULL)) < 0)
1731 goto fail;
1732
1733 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1734 pa_log("No server specified.");
1735 goto fail;
1736 }
1737
1738 ss = m->core->default_sample_spec;
1739 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1740 pa_log("Invalid sample format specification");
1741 goto fail;
1742 }
1743
1744 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1745 pa_log("Failed to connect to server '%s'", u->server_name);
1746 goto fail;
1747 }
1748
1749 pa_socket_client_set_callback(u->client, on_connection, u);
1750
1751 #ifdef TUNNEL_SINK
1752
1753 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1754 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1755
1756 pa_sink_new_data_init(&data);
1757 data.driver = __FILE__;
1758 data.module = m;
1759 data.namereg_fail = TRUE;
1760 pa_sink_new_data_set_name(&data, dn);
1761 pa_sink_new_data_set_sample_spec(&data, &ss);
1762 pa_sink_new_data_set_channel_map(&data, &map);
1763 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1764 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1765 if (u->sink_name)
1766 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1767
1768 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL);
1769 pa_sink_new_data_done(&data);
1770
1771 if (!u->sink) {
1772 pa_log("Failed to create sink.");
1773 goto fail;
1774 }
1775
1776 u->sink->parent.process_msg = sink_process_msg;
1777 u->sink->userdata = u;
1778 u->sink->set_state = sink_set_state;
1779 u->sink->set_volume = sink_set_volume;
1780 u->sink->set_mute = sink_set_mute;
1781
1782 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1783
1784 pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0);
1785
1786 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1787 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1788
1789 #else
1790
1791 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1792 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1793
1794 pa_source_new_data_init(&data);
1795 data.driver = __FILE__;
1796 data.module = m;
1797 data.namereg_fail = TRUE;
1798 pa_source_new_data_set_name(&data, dn);
1799 pa_source_new_data_set_sample_spec(&data, &ss);
1800 pa_source_new_data_set_channel_map(&data, &map);
1801 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1802 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1803 if (u->source_name)
1804 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1805
1806 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1807 pa_source_new_data_done(&data);
1808
1809 if (!u->source) {
1810 pa_log("Failed to create source.");
1811 goto fail;
1812 }
1813
1814 u->source->parent.process_msg = source_process_msg;
1815 u->source->set_state = source_set_state;
1816 u->source->userdata = u;
1817
1818 pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0);
1819
1820 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1821 pa_source_set_rtpoll(u->source, u->rtpoll);
1822 #endif
1823
1824 pa_xfree(dn);
1825
1826 u->time_event = NULL;
1827
1828 u->maxlength = 0;
1829 #ifdef TUNNEL_SINK
1830 u->tlength = u->minreq = u->prebuf = 0;
1831 #else
1832 u->fragsize = 0;
1833 #endif
1834
1835 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1836
1837 if (!(u->thread = pa_thread_new(thread_func, u))) {
1838 pa_log("Failed to create thread.");
1839 goto fail;
1840 }
1841
1842 #ifdef TUNNEL_SINK
1843 pa_sink_put(u->sink);
1844 #else
1845 pa_source_put(u->source);
1846 #endif
1847
1848 pa_modargs_free(ma);
1849
1850 return 0;
1851
1852 fail:
1853 pa__done(m);
1854
1855 if (ma)
1856 pa_modargs_free(ma);
1857
1858 pa_xfree(dn);
1859
1860 return -1;
1861 }
1862
1863 void pa__done(pa_module*m) {
1864 struct userdata* u;
1865
1866 pa_assert(m);
1867
1868 if (!(u = m->userdata))
1869 return;
1870
1871 #ifdef TUNNEL_SINK
1872 if (u->sink)
1873 pa_sink_unlink(u->sink);
1874 #else
1875 if (u->source)
1876 pa_source_unlink(u->source);
1877 #endif
1878
1879 if (u->thread) {
1880 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1881 pa_thread_free(u->thread);
1882 }
1883
1884 pa_thread_mq_done(&u->thread_mq);
1885
1886 #ifdef TUNNEL_SINK
1887 if (u->sink)
1888 pa_sink_unref(u->sink);
1889 #else
1890 if (u->source)
1891 pa_source_unref(u->source);
1892 #endif
1893
1894 if (u->rtpoll)
1895 pa_rtpoll_free(u->rtpoll);
1896
1897 if (u->pstream) {
1898 pa_pstream_unlink(u->pstream);
1899 pa_pstream_unref(u->pstream);
1900 }
1901
1902 if (u->pdispatch)
1903 pa_pdispatch_unref(u->pdispatch);
1904
1905 if (u->client)
1906 pa_socket_client_unref(u->client);
1907
1908 if (u->auth_cookie_in_property)
1909 pa_authkey_prop_unref(m->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1910
1911 if (u->smoother)
1912 pa_smoother_free(u->smoother);
1913
1914 if (u->time_event)
1915 u->core->mainloop->time_free(u->time_event);
1916
1917 #ifdef TUNNEL_SINK
1918 pa_xfree(u->sink_name);
1919 #else
1920 pa_xfree(u->source_name);
1921 #endif
1922 pa_xfree(u->server_name);
1923
1924 pa_xfree(u->device_description);
1925 pa_xfree(u->server_fqdn);
1926 pa_xfree(u->user_name);
1927
1928 pa_xfree(u);
1929 }