]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
bring module-tunnel back to life
[pulseaudio] / src / modules / module-tunnel.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as published
11 by the Free Software Foundation; either version 2 of the License,
12 or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public License
20 along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <unistd.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <sys/types.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35
36 #include <pulse/timeval.h>
37 #include <pulse/util.h>
38 #include <pulse/version.h>
39 #include <pulse/xmalloc.h>
40
41 #include <pulsecore/module.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/modargs.h>
44 #include <pulsecore/log.h>
45 #include <pulsecore/core-subscribe.h>
46 #include <pulsecore/sink-input.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/authkey.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/authkey-prop.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/thread.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/rtclock.h>
58 #include <pulsecore/core-error.h>
59 #include <pulsecore/proplist-util.h>
60
61 #ifdef TUNNEL_SINK
62 #include "module-tunnel-sink-symdef.h"
63 #else
64 #include "module-tunnel-source-symdef.h"
65 #endif
66
67 #ifdef TUNNEL_SINK
68 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
69 PA_MODULE_USAGE(
70 "server=<address> "
71 "sink=<remote sink name> "
72 "cookie=<filename> "
73 "format=<sample format> "
74 "channels=<number of channels> "
75 "rate=<sample rate> "
76 "sink_name=<name for the local sink> "
77 "channel_map=<channel map>");
78 #else
79 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 PA_MODULE_USAGE(
81 "server=<address> "
82 "source=<remote source name> "
83 "cookie=<filename> "
84 "format=<sample format> "
85 "channels=<number of channels> "
86 "rate=<sample rate> "
87 "source_name=<name for the local source> "
88 "channel_map=<channel map>");
89 #endif
90
91 PA_MODULE_AUTHOR("Lennart Poettering");
92 PA_MODULE_VERSION(PACKAGE_VERSION);
93 PA_MODULE_LOAD_ONCE(FALSE);
94
/* NULL-terminated list of the module argument keys this module accepts.
 * The device related keys depend on whether the file is built as the sink
 * (TUNNEL_SINK) or as the source variant. */
static const char* const valid_modargs[] = {
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink",
#else
    "source_name",
    "source",
#endif
    "channel_map",
    NULL,
};
111
112 #define DEFAULT_TIMEOUT 5
113
114 #define LATENCY_INTERVAL 10
115
116 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
117
118 #ifdef TUNNEL_SINK
119
120 enum {
121 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
122 SINK_MESSAGE_REMOTE_SUSPEND,
123 SINK_MESSAGE_UPDATE_LATENCY,
124 SINK_MESSAGE_POST
125 };
126
127 #define DEFAULT_TLENGTH_MSEC 150
128 #define DEFAULT_MINREQ_MSEC 25
129
130 #else
131
132 enum {
133 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
134 SOURCE_MESSAGE_REMOTE_SUSPEND,
135 SOURCE_MESSAGE_UPDATE_LATENCY
136 };
137
138 #define DEFAULT_FRAGSIZE_MSEC 25
139
140 #endif
141
142 #ifdef TUNNEL_SINK
143 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
144 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 #endif
146 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151
152 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
153 #ifdef TUNNEL_SINK
154 [PA_COMMAND_REQUEST] = command_request,
155 [PA_COMMAND_STARTED] = command_started,
156 #endif
157 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
158 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
159 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
160 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
161 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
162 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
163 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
164 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
165 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
166 };
167
168 struct userdata {
169 pa_core *core;
170 pa_module *module;
171
172 pa_thread_mq thread_mq;
173 pa_rtpoll *rtpoll;
174 pa_thread *thread;
175
176 pa_socket_client *client;
177 pa_pstream *pstream;
178 pa_pdispatch *pdispatch;
179
180 char *server_name;
181 #ifdef TUNNEL_SINK
182 char *sink_name;
183 pa_sink *sink;
184 int32_t requested_bytes;
185 #else
186 char *source_name;
187 pa_source *source;
188 #endif
189
190 uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
191
192 uint32_t version;
193 uint32_t ctag;
194 uint32_t device_index;
195 uint32_t channel;
196
197 int64_t counter, counter_delta;
198
199 pa_bool_t remote_corked:1;
200 pa_bool_t remote_suspended:1;
201
202 pa_usec_t transport_usec;
203 pa_bool_t transport_usec_valid;
204
205 uint32_t ignore_latency_before;
206
207 pa_time_event *time_event;
208
209 pa_bool_t auth_cookie_in_property;
210
211 pa_smoother *smoother;
212
213 char *device_description;
214 char *server_fqdn;
215 char *user_name;
216
217 uint32_t maxlength;
218 #ifdef TUNNEL_SINK
219 uint32_t tlength;
220 uint32_t minreq;
221 uint32_t prebuf;
222 #else
223 uint32_t fragsize;
224 #endif
225 };
226
227 static void request_latency(struct userdata *u);
228
229 /* Called from main context */
230 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
231 struct userdata *u = userdata;
232
233 pa_assert(pd);
234 pa_assert(t);
235 pa_assert(u);
236 pa_assert(u->pdispatch == pd);
237
238 pa_log_warn("Stream killed");
239 pa_module_unload_request(u->module);
240 }
241
242 /* Called from main context */
243 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
244 struct userdata *u = userdata;
245
246 pa_assert(pd);
247 pa_assert(t);
248 pa_assert(u);
249 pa_assert(u->pdispatch == pd);
250
251 pa_log_info("Server signalled buffer overrun/underrun.");
252 request_latency(u);
253 }
254
255 /* Called from main context */
256 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
257 struct userdata *u = userdata;
258 uint32_t channel;
259 pa_bool_t suspended;
260
261 pa_assert(pd);
262 pa_assert(t);
263 pa_assert(u);
264 pa_assert(u->pdispatch == pd);
265
266 if (pa_tagstruct_getu32(t, &channel) < 0 ||
267 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
268 !pa_tagstruct_eof(t)) {
269 pa_log("Invalid packet");
270 pa_module_unload_request(u->module);
271 return;
272 }
273
274 #ifdef TUNNEL_SINK
275 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
276 #else
277 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
278 #endif
279
280 request_latency(u);
281 }
282
283 /* Called from main context */
284 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
285 struct userdata *u = userdata;
286
287 pa_assert(pd);
288 pa_assert(t);
289 pa_assert(u);
290 pa_assert(u->pdispatch == pd);
291
292 pa_log_debug("Server reports a stream move.");
293 request_latency(u);
294 }
295
296 #ifdef TUNNEL_SINK
297
298 /* Called from main context */
299 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
300 struct userdata *u = userdata;
301
302 pa_assert(pd);
303 pa_assert(t);
304 pa_assert(u);
305 pa_assert(u->pdispatch == pd);
306
307 pa_log_debug("Server reports playback started.");
308 request_latency(u);
309 }
310
311 #endif
312
313 /* Called from IO thread context */
314 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
315 pa_usec_t x;
316 pa_assert(u);
317
318 if (u->remote_corked == cork)
319 return;
320
321 u->remote_corked = cork;
322 x = pa_rtclock_usec();
323
324 /* Correct by the time this needs to travel to the other side.
325 * This is a valid thread-safe access, because the main thread is
326 * waiting for us */
327 if (u->transport_usec_valid)
328 x += u->transport_usec;
329
330 if (u->remote_suspended || u->remote_corked)
331 pa_smoother_pause(u->smoother, x);
332 else
333 pa_smoother_resume(u->smoother, x);
334 }
335
336 /* Called from main context */
337 static void stream_cork(struct userdata *u, pa_bool_t cork) {
338 pa_tagstruct *t;
339 pa_assert(u);
340
341 if (!u->pstream)
342 return;
343
344 t = pa_tagstruct_new(NULL, 0);
345 #ifdef TUNNEL_SINK
346 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
347 #else
348 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
349 #endif
350 pa_tagstruct_putu32(t, u->ctag++);
351 pa_tagstruct_putu32(t, u->channel);
352 pa_tagstruct_put_boolean(t, !!cork);
353 pa_pstream_send_tagstruct(u->pstream, t);
354
355 request_latency(u);
356 }
357
358 /* Called from IO thread context */
359 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
360 pa_usec_t x;
361 pa_assert(u);
362
363 if (u->remote_suspended == suspend)
364 return;
365
366 u->remote_suspended = suspend;
367
368 x = pa_rtclock_usec();
369
370 /* Correct by the time this needed to travel from the other side.
371 * This is a valid thread-safe access, because the main thread is
372 * waiting for us */
373 if (u->transport_usec_valid)
374 x -= u->transport_usec;
375
376 if (u->remote_suspended || u->remote_corked)
377 pa_smoother_pause(u->smoother, x);
378 else
379 pa_smoother_resume(u->smoother, x);
380 }
381
382 #ifdef TUNNEL_SINK
383
384 /* Called from IO thread context */
385 static void send_data(struct userdata *u) {
386 pa_assert(u);
387
388 while (u->requested_bytes > 0) {
389 pa_memchunk memchunk;
390
391 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
392 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
393 pa_memblock_unref(memchunk.memblock);
394
395 u->requested_bytes -= memchunk.length;
396
397 u->counter += memchunk.length;
398 }
399 }
400
401 /* This function is called from IO context -- except when it is not. */
402 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
403 struct userdata *u = PA_SINK(o)->userdata;
404
405 switch (code) {
406
407 case PA_SINK_MESSAGE_SET_STATE: {
408 int r;
409
410 /* First, change the state, because otherwide pa_sink_render() would fail */
411 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
412
413 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
414
415 if (PA_SINK_IS_OPENED(u->sink->state))
416 send_data(u);
417 }
418
419 return r;
420 }
421
422 case PA_SINK_MESSAGE_GET_LATENCY: {
423 pa_usec_t yl, yr, *usec = data;
424
425 yl = pa_bytes_to_usec(u->counter, &u->sink->sample_spec);
426 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
427
428 *usec = yl > yr ? yl - yr : 0;
429 return 0;
430 }
431
432 case SINK_MESSAGE_REQUEST:
433
434 pa_assert(offset > 0);
435 u->requested_bytes += (size_t) offset;
436
437 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
438 send_data(u);
439
440 return 0;
441
442
443 case SINK_MESSAGE_REMOTE_SUSPEND:
444
445 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
446 return 0;
447
448
449 case SINK_MESSAGE_UPDATE_LATENCY: {
450 pa_usec_t y;
451
452 y = pa_bytes_to_usec(u->counter, &u->sink->sample_spec);
453
454 if (y > (pa_usec_t) offset || offset < 0)
455 y -= offset;
456 else
457 y = 0;
458
459 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
460
461 return 0;
462 }
463
464 case SINK_MESSAGE_POST:
465
466 /* OK, This might be a bit confusing. This message is
467 * delivered to us from the main context -- NOT from the
468 * IO thread context where the rest of the messages are
469 * dispatched. Yeah, ugly, but I am a lazy bastard. */
470
471 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
472
473 u->counter_delta += chunk->length;
474
475 return 0;
476 }
477
478 return pa_sink_process_msg(o, code, data, offset, chunk);
479 }
480
481 /* Called from main context */
482 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
483 struct userdata *u;
484 pa_sink_assert_ref(s);
485 u = s->userdata;
486
487 switch ((pa_sink_state_t) state) {
488
489 case PA_SINK_SUSPENDED:
490 pa_assert(PA_SINK_IS_OPENED(s->state));
491 stream_cork(u, TRUE);
492 break;
493
494 case PA_SINK_IDLE:
495 case PA_SINK_RUNNING:
496 if (s->state == PA_SINK_SUSPENDED)
497 stream_cork(u, FALSE);
498 break;
499
500 case PA_SINK_UNLINKED:
501 case PA_SINK_INIT:
502 ;
503 }
504
505 return 0;
506 }
507
508 #else
509
510 /* This function is called from IO context -- except when it is not. */
511 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
512 struct userdata *u = PA_SOURCE(o)->userdata;
513
514 switch (code) {
515
516 case PA_SINK_MESSAGE_SET_STATE: {
517 int r;
518
519 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0)
520 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
521
522 return r;
523 }
524
525 case PA_SOURCE_MESSAGE_GET_LATENCY: {
526 pa_usec_t yr, yl, *usec = data;
527
528 yl = pa_bytes_to_usec(u->counter, &PA_SINK(o)->sample_spec);
529 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
530
531 *usec = yr > yl ? yr - yl : 0;
532 return 0;
533 }
534
535 case SOURCE_MESSAGE_POST:
536
537 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
538 pa_source_post(u->source, chunk);
539
540 u->counter += chunk->length;
541
542 return 0;
543
544 case SOURCE_MESSAGE_REMOTE_SUSPEND:
545
546 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
547 return 0;
548
549 case SOURCE_MESSAGE_UPDATE_LATENCY: {
550 pa_usec_t y;
551
552 y = pa_bytes_to_usec(u->counter, &u->source->sample_spec);
553
554 if (offset >= 0 || y > (pa_usec_t) -offset)
555 y += offset;
556 else
557 y = 0;
558
559 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
560
561 return 0;
562 }
563 }
564
565 return pa_source_process_msg(o, code, data, offset, chunk);
566 }
567
568 /* Called from main context */
569 static int source_set_state(pa_source *s, pa_source_state_t state) {
570 struct userdata *u;
571 pa_source_assert_ref(s);
572 u = s->userdata;
573
574 switch ((pa_source_state_t) state) {
575
576 case PA_SOURCE_SUSPENDED:
577 pa_assert(PA_SOURCE_IS_OPENED(s->state));
578 stream_cork(u, TRUE);
579 break;
580
581 case PA_SOURCE_IDLE:
582 case PA_SOURCE_RUNNING:
583 if (s->state == PA_SOURCE_SUSPENDED)
584 stream_cork(u, FALSE);
585 break;
586
587 case PA_SOURCE_UNLINKED:
588 case PA_SOURCE_INIT:
589 ;
590 }
591
592 return 0;
593 }
594
595 #endif
596
597 static void thread_func(void *userdata) {
598 struct userdata *u = userdata;
599
600 pa_assert(u);
601
602 pa_log_debug("Thread starting up");
603
604 pa_thread_mq_install(&u->thread_mq);
605 pa_rtpoll_install(u->rtpoll);
606
607 for (;;) {
608 int ret;
609
610 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
611 goto fail;
612
613 if (ret == 0)
614 goto finish;
615 }
616
617 fail:
618 /* If this was no regular exit from the loop we have to continue
619 * processing messages until we received PA_MESSAGE_SHUTDOWN */
620 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
621 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
622
623 finish:
624 pa_log_debug("Thread shutting down");
625 }
626
627 #ifdef TUNNEL_SINK
628 /* Called from main context */
629 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
630 struct userdata *u = userdata;
631 uint32_t bytes, channel;
632
633 pa_assert(pd);
634 pa_assert(command == PA_COMMAND_REQUEST);
635 pa_assert(t);
636 pa_assert(u);
637 pa_assert(u->pdispatch == pd);
638
639 if (pa_tagstruct_getu32(t, &channel) < 0 ||
640 pa_tagstruct_getu32(t, &bytes) < 0) {
641 pa_log("Invalid protocol reply");
642 goto fail;
643 }
644
645 if (channel != u->channel) {
646 pa_log("Recieved data for invalid channel");
647 goto fail;
648 }
649
650 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
651 return;
652
653 fail:
654 pa_module_unload_request(u->module);
655 }
656
657 #endif
658
659 /* Called from main context */
660 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
661 struct userdata *u = userdata;
662 pa_usec_t sink_usec, source_usec, transport_usec;
663 pa_bool_t playing;
664 int64_t write_index, read_index;
665 struct timeval local, remote, now;
666 pa_sample_spec *ss;
667 int64_t delay;
668
669 pa_assert(pd);
670 pa_assert(u);
671
672 if (command != PA_COMMAND_REPLY) {
673 if (command == PA_COMMAND_ERROR)
674 pa_log("Failed to get latency.");
675 else
676 pa_log("Protocol error.");
677 goto fail;
678 }
679
680 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
681 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
682 pa_tagstruct_get_boolean(t, &playing) < 0 ||
683 pa_tagstruct_get_timeval(t, &local) < 0 ||
684 pa_tagstruct_get_timeval(t, &remote) < 0 ||
685 pa_tagstruct_gets64(t, &write_index) < 0 ||
686 pa_tagstruct_gets64(t, &read_index) < 0) {
687 pa_log("Invalid reply.");
688 goto fail;
689 }
690
691 #ifdef TUNNEL_SINK
692 if (u->version >= 13) {
693 uint64_t underrun_for = 0, playing_for = 0;
694
695 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
696 pa_tagstruct_getu64(t, &playing_for) < 0) {
697 pa_log("Invalid reply.");
698 goto fail;
699 }
700 }
701 #endif
702
703 if (!pa_tagstruct_eof(t)) {
704 pa_log("Invalid reply.");
705 goto fail;
706 }
707
708 if (tag < u->ignore_latency_before) {
709 request_latency(u);
710 return;
711 }
712
713 pa_gettimeofday(&now);
714
715 /* Calculate transport usec */
716 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
717 /* local and remote seem to have synchronized clocks */
718 #ifdef TUNNEL_SINK
719 u->transport_usec = pa_timeval_diff(&remote, &local);
720 #else
721 u->transport_usec = pa_timeval_diff(&now, &remote);
722 #endif
723 } else
724 u->transport_usec = pa_timeval_diff(&now, &local)/2;
725 u->transport_usec_valid = TRUE;
726
727 /* First, take the device's delay */
728 #ifdef TUNNEL_SINK
729 delay = (int64_t) sink_usec;
730 ss = &u->sink->sample_spec;
731 #else
732 delay = (int64_t) source_usec;
733 ss = &u->source->sample_spec;
734 #endif
735
736 /* Add the length of our server-side buffer */
737 if (write_index >= read_index)
738 delay += (int64_t) pa_bytes_to_usec(write_index-read_index, ss);
739 else
740 delay -= (int64_t) pa_bytes_to_usec(read_index-write_index, ss);
741
742 /* Our measurements are already out of date, hence correct by the *
743 * transport latency */
744 #ifdef TUNNEL_SINK
745 delay -= (int64_t) transport_usec;
746 #else
747 delay += (int64_t) transport_usec;
748 #endif
749
750 /* Now correct by what we have have read/written since we requested the update */
751 #ifdef TUNNEL_SINK
752 delay += (int64_t) pa_bytes_to_usec(u->counter_delta, ss);
753 #else
754 delay -= (int64_t) pa_bytes_to_usec(u->counter_delta, ss);
755 #endif
756
757 #ifdef TUNNEL_SINK
758 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
759 #else
760 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
761 #endif
762
763 return;
764
765 fail:
766
767 pa_module_unload_request(u->module);
768 }
769
770 /* Called from main context */
771 static void request_latency(struct userdata *u) {
772 pa_tagstruct *t;
773 struct timeval now;
774 uint32_t tag;
775 pa_assert(u);
776
777 t = pa_tagstruct_new(NULL, 0);
778 #ifdef TUNNEL_SINK
779 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
780 #else
781 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
782 #endif
783 pa_tagstruct_putu32(t, tag = u->ctag++);
784 pa_tagstruct_putu32(t, u->channel);
785
786 pa_gettimeofday(&now);
787 pa_tagstruct_put_timeval(t, &now);
788
789 pa_pstream_send_tagstruct(u->pstream, t);
790 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
791
792 u->ignore_latency_before = tag;
793 u->counter_delta = 0;
794 }
795
796 /* Called from main context */
797 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
798 struct userdata *u = userdata;
799 struct timeval ntv;
800
801 pa_assert(m);
802 pa_assert(e);
803 pa_assert(u);
804
805 request_latency(u);
806
807 pa_gettimeofday(&ntv);
808 ntv.tv_sec += LATENCY_INTERVAL;
809 m->time_restart(e, &ntv);
810 }
811
812 /* Called from main context */
813 static void update_description(struct userdata *u) {
814 char *d;
815 char un[128], hn[128];
816 pa_tagstruct *t;
817
818 pa_assert(u);
819
820 if (!u->server_fqdn || !u->user_name || !u->device_description)
821 return;
822
823 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
824
825 #ifdef TUNNEL_SINK
826 pa_sink_set_description(u->sink, d);
827 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
828 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
829 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
830 #else
831 pa_source_set_description(u->source, d);
832 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
833 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
834 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
835 #endif
836
837 pa_xfree(d);
838
839 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
840 pa_get_user_name(un, sizeof(un)),
841 pa_get_host_name(hn, sizeof(hn)));
842
843 t = pa_tagstruct_new(NULL, 0);
844 #ifdef TUNNEL_SINK
845 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
846 #else
847 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
848 #endif
849 pa_tagstruct_putu32(t, u->ctag++);
850 pa_tagstruct_putu32(t, u->channel);
851 pa_tagstruct_puts(t, d);
852 pa_pstream_send_tagstruct(u->pstream, t);
853
854 pa_xfree(d);
855 }
856
857 /* Called from main context */
858 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
859 struct userdata *u = userdata;
860 pa_sample_spec ss;
861 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
862 uint32_t cookie;
863
864 pa_assert(pd);
865 pa_assert(u);
866
867 if (command != PA_COMMAND_REPLY) {
868 if (command == PA_COMMAND_ERROR)
869 pa_log("Failed to get info.");
870 else
871 pa_log("Protocol error.");
872 goto fail;
873 }
874
875 if (pa_tagstruct_gets(t, &server_name) < 0 ||
876 pa_tagstruct_gets(t, &server_version) < 0 ||
877 pa_tagstruct_gets(t, &user_name) < 0 ||
878 pa_tagstruct_gets(t, &host_name) < 0 ||
879 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
880 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
881 pa_tagstruct_gets(t, &default_source_name) < 0 ||
882 pa_tagstruct_getu32(t, &cookie) < 0) {
883
884 pa_log("Parse failure");
885 goto fail;
886 }
887
888 if (!pa_tagstruct_eof(t)) {
889 pa_log("Packet too long");
890 goto fail;
891 }
892
893 pa_xfree(u->server_fqdn);
894 u->server_fqdn = pa_xstrdup(host_name);
895
896 pa_xfree(u->user_name);
897 u->user_name = pa_xstrdup(user_name);
898
899 update_description(u);
900
901 return;
902
903 fail:
904 pa_module_unload_request(u->module);
905 }
906
907 #ifdef TUNNEL_SINK
908
909 /* Called from main context */
910 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
911 struct userdata *u = userdata;
912 uint32_t idx, owner_module, monitor_source, flags;
913 const char *name, *description, *monitor_source_name, *driver;
914 pa_sample_spec ss;
915 pa_channel_map cm;
916 pa_cvolume volume;
917 pa_bool_t mute;
918 pa_usec_t latency;
919 pa_proplist *pl;
920
921 pa_assert(pd);
922 pa_assert(u);
923
924 pl = pa_proplist_new();
925
926 if (command != PA_COMMAND_REPLY) {
927 if (command == PA_COMMAND_ERROR)
928 pa_log("Failed to get info.");
929 else
930 pa_log("Protocol error.");
931 goto fail;
932 }
933
934 if (pa_tagstruct_getu32(t, &idx) < 0 ||
935 pa_tagstruct_gets(t, &name) < 0 ||
936 pa_tagstruct_gets(t, &description) < 0 ||
937 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
938 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
939 pa_tagstruct_getu32(t, &owner_module) < 0 ||
940 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
941 pa_tagstruct_get_boolean(t, &mute) < 0 ||
942 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
943 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
944 pa_tagstruct_get_usec(t, &latency) < 0 ||
945 pa_tagstruct_gets(t, &driver) < 0 ||
946 pa_tagstruct_getu32(t, &flags) < 0) {
947
948 pa_log("Parse failure");
949 goto fail;
950 }
951
952 if (u->version >= 13) {
953 pa_usec_t configured_latency;
954
955 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
956 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
957
958 pa_log("Parse failure");
959 goto fail;
960 }
961 }
962
963 if (!pa_tagstruct_eof(t)) {
964 pa_log("Packet too long");
965 goto fail;
966 }
967
968 pa_proplist_free(pl);
969
970 if (!u->sink_name || strcmp(name, u->sink_name))
971 return;
972
973 pa_xfree(u->device_description);
974 u->device_description = pa_xstrdup(description);
975
976 update_description(u);
977
978 return;
979
980 fail:
981 pa_module_unload_request(u->module);
982 pa_proplist_free(pl);
983 }
984
985 /* Called from main context */
986 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
987 struct userdata *u = userdata;
988 uint32_t idx, owner_module, client, sink;
989 pa_usec_t buffer_usec, sink_usec;
990 const char *name, *driver, *resample_method;
991 pa_bool_t mute;
992 pa_sample_spec sample_spec;
993 pa_channel_map channel_map;
994 pa_cvolume volume;
995 pa_proplist *pl;
996
997 pa_assert(pd);
998 pa_assert(u);
999
1000 pl = pa_proplist_new();
1001
1002 if (command != PA_COMMAND_REPLY) {
1003 if (command == PA_COMMAND_ERROR)
1004 pa_log("Failed to get info.");
1005 else
1006 pa_log("Protocol error.");
1007 goto fail;
1008 }
1009
1010 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1011 pa_tagstruct_gets(t, &name) < 0 ||
1012 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1013 pa_tagstruct_getu32(t, &client) < 0 ||
1014 pa_tagstruct_getu32(t, &sink) < 0 ||
1015 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1016 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1017 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1018 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1019 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1020 pa_tagstruct_gets(t, &resample_method) < 0 ||
1021 pa_tagstruct_gets(t, &driver) < 0) {
1022
1023 pa_log("Parse failure");
1024 goto fail;
1025 }
1026
1027 if (u->version >= 11) {
1028 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1029
1030 pa_log("Parse failure");
1031 goto fail;
1032 }
1033 }
1034
1035 if (u->version >= 13) {
1036 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1037
1038 pa_log("Parse failure");
1039 goto fail;
1040 }
1041 }
1042
1043 if (!pa_tagstruct_eof(t)) {
1044 pa_log("Packet too long");
1045 goto fail;
1046 }
1047
1048 pa_proplist_free(pl);
1049
1050 if (idx != u->device_index)
1051 return;
1052
1053 pa_assert(u->sink);
1054
1055 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1056 pa_cvolume_equal(&volume, &u->sink->volume))
1057 return;
1058
1059 memcpy(&u->sink->volume, &volume, sizeof(pa_cvolume));
1060
1061 if (u->version >= 11)
1062 u->sink->muted = !!mute;
1063
1064 pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);
1065 return;
1066
1067 fail:
1068 pa_module_unload_request(u->module);
1069 pa_proplist_free(pl);
1070 }
1071
1072 #else
1073
1074 /* Called from main context */
1075 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1076 struct userdata *u = userdata;
1077 uint32_t idx, owner_module, monitor_of_sink, flags;
1078 const char *name, *description, *monitor_of_sink_name, *driver;
1079 pa_sample_spec ss;
1080 pa_channel_map cm;
1081 pa_cvolume volume;
1082 pa_bool_t mute;
1083 pa_usec_t latency, configured_latency;
1084 pa_proplist *pl;
1085
1086 pa_assert(pd);
1087 pa_assert(u);
1088
1089 pl = pa_proplist_new();
1090
1091 if (command != PA_COMMAND_REPLY) {
1092 if (command == PA_COMMAND_ERROR)
1093 pa_log("Failed to get info.");
1094 else
1095 pa_log("Protocol error.");
1096 goto fail;
1097 }
1098
1099 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1100 pa_tagstruct_gets(t, &name) < 0 ||
1101 pa_tagstruct_gets(t, &description) < 0 ||
1102 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1103 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1104 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1105 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1106 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1107 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1108 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1109 pa_tagstruct_get_usec(t, &latency) < 0 ||
1110 pa_tagstruct_gets(t, &driver) < 0 ||
1111 pa_tagstruct_getu32(t, &flags) < 0) {
1112
1113 pa_log("Parse failure");
1114 goto fail;
1115 }
1116
1117 if (u->version >= 13) {
1118 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1119 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1120
1121 pa_log("Parse failure");
1122 goto fail;
1123 }
1124 }
1125
1126 if (!pa_tagstruct_eof(t)) {
1127 pa_log("Packet too long");
1128 goto fail;
1129 }
1130
1131 pa_proplist_free(pl);
1132
1133 if (!u->source_name || strcmp(name, u->source_name))
1134 return;
1135
1136 pa_xfree(u->device_description);
1137 u->device_description = pa_xstrdup(description);
1138
1139 update_description(u);
1140
1141 return;
1142
1143 fail:
1144 pa_module_unload_request(u->module);
1145 pa_proplist_free(pl);
1146 }
1147
1148 #endif
1149
1150 /* Called from main context */
1151 static void request_info(struct userdata *u) {
1152 pa_tagstruct *t;
1153 uint32_t tag;
1154 pa_assert(u);
1155
1156 t = pa_tagstruct_new(NULL, 0);
1157 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1158 pa_tagstruct_putu32(t, tag = u->ctag++);
1159 pa_pstream_send_tagstruct(u->pstream, t);
1160 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1161
1162 #ifdef TUNNEL_SINK
1163 t = pa_tagstruct_new(NULL, 0);
1164 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1165 pa_tagstruct_putu32(t, tag = u->ctag++);
1166 pa_tagstruct_putu32(t, u->device_index);
1167 pa_pstream_send_tagstruct(u->pstream, t);
1168 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1169
1170 if (u->sink_name) {
1171 t = pa_tagstruct_new(NULL, 0);
1172 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1173 pa_tagstruct_putu32(t, tag = u->ctag++);
1174 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1175 pa_tagstruct_puts(t, u->sink_name);
1176 pa_pstream_send_tagstruct(u->pstream, t);
1177 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1178 }
1179 #else
1180 if (u->source_name) {
1181 t = pa_tagstruct_new(NULL, 0);
1182 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1183 pa_tagstruct_putu32(t, tag = u->ctag++);
1184 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1185 pa_tagstruct_puts(t, u->source_name);
1186 pa_pstream_send_tagstruct(u->pstream, t);
1187 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1188 }
1189 #endif
1190 }
1191
1192 /* Called from main context */
1193 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1194 struct userdata *u = userdata;
1195 pa_subscription_event_type_t e;
1196 uint32_t idx;
1197
1198 pa_assert(pd);
1199 pa_assert(t);
1200 pa_assert(u);
1201 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1202
1203 if (pa_tagstruct_getu32(t, &e) < 0 ||
1204 pa_tagstruct_getu32(t, &idx) < 0) {
1205 pa_log("Invalid protocol reply");
1206 pa_module_unload_request(u->module);
1207 return;
1208 }
1209
1210 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1211 #ifdef TUNNEL_SINK
1212 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1213 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1214 #else
1215 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1216 #endif
1217 )
1218 return;
1219
1220 request_info(u);
1221 }
1222
1223 /* Called from main context */
1224 static void start_subscribe(struct userdata *u) {
1225 pa_tagstruct *t;
1226 uint32_t tag;
1227 pa_assert(u);
1228
1229 t = pa_tagstruct_new(NULL, 0);
1230 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1231 pa_tagstruct_putu32(t, tag = u->ctag++);
1232 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1233 #ifdef TUNNEL_SINK
1234 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1235 #else
1236 PA_SUBSCRIPTION_MASK_SOURCE
1237 #endif
1238 );
1239
1240 pa_pstream_send_tagstruct(u->pstream, t);
1241 }
1242
1243 /* Called from main context */
1244 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1245 struct userdata *u = userdata;
1246 struct timeval ntv;
1247 #ifdef TUNNEL_SINK
1248 uint32_t bytes;
1249 #endif
1250
1251 pa_assert(pd);
1252 pa_assert(u);
1253 pa_assert(u->pdispatch == pd);
1254
1255 if (command != PA_COMMAND_REPLY) {
1256 if (command == PA_COMMAND_ERROR)
1257 pa_log("Failed to create stream.");
1258 else
1259 pa_log("Protocol error.");
1260 goto fail;
1261 }
1262
1263 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1264 pa_tagstruct_getu32(t, &u->device_index) < 0
1265 #ifdef TUNNEL_SINK
1266 || pa_tagstruct_getu32(t, &bytes) < 0
1267 #endif
1268 )
1269 goto parse_error;
1270
1271 if (u->version >= 9) {
1272 #ifdef TUNNEL_SINK
1273 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1274 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1275 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1276 pa_tagstruct_getu32(t, &u->minreq) < 0)
1277 goto parse_error;
1278 #else
1279 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1280 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1281 goto parse_error;
1282 #endif
1283 }
1284
1285 if (u->version >= 12) {
1286 pa_sample_spec ss;
1287 pa_channel_map cm;
1288 uint32_t device_index;
1289 const char *dn;
1290 pa_bool_t suspended;
1291
1292 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1293 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1294 pa_tagstruct_getu32(t, &device_index) < 0 ||
1295 pa_tagstruct_gets(t, &dn) < 0 ||
1296 pa_tagstruct_get_boolean(t, &suspended) < 0)
1297 goto parse_error;
1298
1299 #ifdef TUNNEL_SINK
1300 pa_xfree(u->sink_name);
1301 u->sink_name = pa_xstrdup(dn);
1302 #else
1303 pa_xfree(u->source_name);
1304 u->source_name = pa_xstrdup(dn);
1305 #endif
1306 }
1307
1308 if (u->version >= 13) {
1309 pa_usec_t usec;
1310
1311 if (pa_tagstruct_get_usec(t, &usec) < 0)
1312 goto parse_error;
1313
1314 #ifdef TUNNEL_SINK
1315 pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0);
1316 #else
1317 pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0);
1318 #endif
1319 }
1320
1321 if (!pa_tagstruct_eof(t))
1322 goto parse_error;
1323
1324 start_subscribe(u);
1325 request_info(u);
1326
1327 pa_assert(!u->time_event);
1328 pa_gettimeofday(&ntv);
1329 ntv.tv_sec += LATENCY_INTERVAL;
1330 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1331
1332 request_latency(u);
1333
1334 pa_log_debug("Stream created.");
1335
1336 #ifdef TUNNEL_SINK
1337 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1338 #endif
1339
1340 return;
1341
1342 parse_error:
1343 pa_log("Invalid reply. (Create stream)");
1344
1345 fail:
1346 pa_module_unload_request(u->module);
1347
1348 }
1349
1350 /* Called from main context */
1351 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1352 struct userdata *u = userdata;
1353 pa_tagstruct *reply;
1354 char name[256], un[128], hn[128];
1355 #ifdef TUNNEL_SINK
1356 pa_cvolume volume;
1357 #endif
1358
1359 pa_assert(pd);
1360 pa_assert(u);
1361 pa_assert(u->pdispatch == pd);
1362
1363 if (command != PA_COMMAND_REPLY ||
1364 pa_tagstruct_getu32(t, &u->version) < 0 ||
1365 !pa_tagstruct_eof(t)) {
1366
1367 if (command == PA_COMMAND_ERROR)
1368 pa_log("Failed to authenticate");
1369 else
1370 pa_log("Protocol error.");
1371
1372 goto fail;
1373 }
1374
1375 /* Minimum supported protocol version */
1376 if (u->version < 8) {
1377 pa_log("Incompatible protocol version");
1378 goto fail;
1379 }
1380
1381 /* Starting with protocol version 13 the MSB of the version tag
1382 reflects if shm is enabled for this connection or not. We don't
1383 support SHM here at all, so we just ignore this. */
1384
1385 if (u->version >= 13)
1386 u->version &= 0x7FFFFFFFU;
1387
1388 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1389
1390 #ifdef TUNNEL_SINK
1391 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1392 u->sink_name,
1393 pa_get_user_name(un, sizeof(un)),
1394 pa_get_host_name(hn, sizeof(hn)));
1395 #else
1396 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1397 u->source_name,
1398 pa_get_user_name(un, sizeof(un)),
1399 pa_get_host_name(hn, sizeof(hn)));
1400 #endif
1401
1402 reply = pa_tagstruct_new(NULL, 0);
1403 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1404 pa_tagstruct_putu32(reply, tag = u->ctag++);
1405
1406 if (u->version >= 13) {
1407 pa_proplist *pl;
1408 pl = pa_proplist_new();
1409 pa_init_proplist(pl);
1410 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1411 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1412 pa_tagstruct_put_proplist(reply, pl);
1413 pa_proplist_free(pl);
1414 } else
1415 pa_tagstruct_puts(reply, "PulseAudio");
1416
1417 pa_pstream_send_tagstruct(u->pstream, reply);
1418 /* We ignore the server's reply here */
1419
1420 reply = pa_tagstruct_new(NULL, 0);
1421
1422 if (u->version < 13)
1423 /* Only for older PA versions we need to fill in the maxlength */
1424 u->maxlength = 4*1024*1024;
1425
1426 #ifdef TUNNEL_SINK
1427 u->tlength = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1428 u->minreq = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1429 u->prebuf = u->tlength;
1430 #else
1431 u->fragsize = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1432 #endif
1433
1434 #ifdef TUNNEL_SINK
1435 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1436 pa_tagstruct_putu32(reply, tag = u->ctag++);
1437
1438 if (u->version < 13)
1439 pa_tagstruct_puts(reply, name);
1440
1441 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1442 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1443 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1444 pa_tagstruct_puts(reply, u->sink_name);
1445 pa_tagstruct_putu32(reply, u->maxlength);
1446 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1447 pa_tagstruct_putu32(reply, u->tlength);
1448 pa_tagstruct_putu32(reply, u->prebuf);
1449 pa_tagstruct_putu32(reply, u->minreq);
1450 pa_tagstruct_putu32(reply, 0);
1451 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1452 pa_tagstruct_put_cvolume(reply, &volume);
1453 #else
1454 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1455 pa_tagstruct_putu32(reply, tag = u->ctag++);
1456
1457 if (u->version < 13)
1458 pa_tagstruct_puts(reply, name);
1459
1460 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1461 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1462 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1463 pa_tagstruct_puts(reply, u->source_name);
1464 pa_tagstruct_putu32(reply, u->maxlength);
1465 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1466 pa_tagstruct_putu32(reply, u->fragsize);
1467 #endif
1468
1469 if (u->version >= 12) {
1470 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1471 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1472 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1473 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1474 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1475 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1476 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1477 }
1478
1479 if (u->version >= 13) {
1480 pa_proplist *pl;
1481
1482 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1483 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1484
1485 pl = pa_proplist_new();
1486 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1487 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1488 pa_tagstruct_put_proplist(reply, pl);
1489 pa_proplist_free(pl);
1490
1491 #ifndef TUNNEL_SINK
1492 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1493 #endif
1494 }
1495
1496 pa_pstream_send_tagstruct(u->pstream, reply);
1497 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1498
1499 pa_log_debug("Connection authenticated, creating stream ...");
1500
1501 return;
1502
1503 fail:
1504 pa_module_unload_request(u->module);
1505 }
1506
1507 /* Called from main context */
1508 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1509 struct userdata *u = userdata;
1510
1511 pa_assert(p);
1512 pa_assert(u);
1513
1514 pa_log_warn("Stream died.");
1515 pa_module_unload_request(u->module);
1516 }
1517
1518 /* Called from main context */
1519 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1520 struct userdata *u = userdata;
1521
1522 pa_assert(p);
1523 pa_assert(packet);
1524 pa_assert(u);
1525
1526 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1527 pa_log("Invalid packet");
1528 pa_module_unload_request(u->module);
1529 return;
1530 }
1531 }
1532
1533 #ifndef TUNNEL_SINK
1534 /* Called from main context */
1535 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1536 struct userdata *u = userdata;
1537
1538 pa_assert(p);
1539 pa_assert(chunk);
1540 pa_assert(u);
1541
1542 if (channel != u->channel) {
1543 pa_log("Recieved memory block on bad channel.");
1544 pa_module_unload_request(u->module);
1545 return;
1546 }
1547
1548 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1549
1550 u->counter_delta += chunk->length;
1551 }
1552
1553 #endif
1554
1555 /* Called from main context */
1556 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1557 struct userdata *u = userdata;
1558 pa_tagstruct *t;
1559 uint32_t tag;
1560
1561 pa_assert(sc);
1562 pa_assert(u);
1563 pa_assert(u->client == sc);
1564
1565 pa_socket_client_unref(u->client);
1566 u->client = NULL;
1567
1568 if (!io) {
1569 pa_log("Connection failed: %s", pa_cstrerror(errno));
1570 pa_module_unload_request(u->module);
1571 return;
1572 }
1573
1574 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1575 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1576
1577 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1578 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1579 #ifndef TUNNEL_SINK
1580 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1581 #endif
1582
1583 t = pa_tagstruct_new(NULL, 0);
1584 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1585 pa_tagstruct_putu32(t, tag = u->ctag++);
1586 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1587 pa_tagstruct_put_arbitrary(t, u->auth_cookie, sizeof(u->auth_cookie));
1588
1589 #ifdef HAVE_CREDS
1590 {
1591 pa_creds ucred;
1592
1593 if (pa_iochannel_creds_supported(io))
1594 pa_iochannel_creds_enable(io);
1595
1596 ucred.uid = getuid();
1597 ucred.gid = getgid();
1598
1599 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1600 }
1601 #else
1602 pa_pstream_send_tagstruct(u->pstream, t);
1603 #endif
1604
1605 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1606
1607 pa_log_debug("Connection established, authenticating ...");
1608 }
1609
1610 #ifdef TUNNEL_SINK
1611
1612 /* Called from main context */
1613 static int sink_set_volume(pa_sink *sink) {
1614 struct userdata *u;
1615 pa_tagstruct *t;
1616 uint32_t tag;
1617
1618 pa_assert(sink);
1619 u = sink->userdata;
1620 pa_assert(u);
1621
1622 t = pa_tagstruct_new(NULL, 0);
1623 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1624 pa_tagstruct_putu32(t, tag = u->ctag++);
1625 pa_tagstruct_putu32(t, u->device_index);
1626 pa_tagstruct_put_cvolume(t, &sink->volume);
1627 pa_pstream_send_tagstruct(u->pstream, t);
1628
1629 return 0;
1630 }
1631
1632 /* Called from main context */
1633 static int sink_set_mute(pa_sink *sink) {
1634 struct userdata *u;
1635 pa_tagstruct *t;
1636 uint32_t tag;
1637
1638 pa_assert(sink);
1639 u = sink->userdata;
1640 pa_assert(u);
1641
1642 if (u->version < 11)
1643 return -1;
1644
1645 t = pa_tagstruct_new(NULL, 0);
1646 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1647 pa_tagstruct_putu32(t, tag = u->ctag++);
1648 pa_tagstruct_putu32(t, u->device_index);
1649 pa_tagstruct_put_boolean(t, !!sink->muted);
1650 pa_pstream_send_tagstruct(u->pstream, t);
1651
1652 return 0;
1653 }
1654
1655 #endif
1656
1657 /* Called from main context */
1658 static int load_key(struct userdata *u, const char*fn) {
1659 pa_assert(u);
1660
1661 u->auth_cookie_in_property = FALSE;
1662
1663 if (!fn && pa_authkey_prop_get(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0) {
1664 pa_log_debug("Using already loaded auth cookie.");
1665 pa_authkey_prop_ref(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1666 u->auth_cookie_in_property = 1;
1667 return 0;
1668 }
1669
1670 if (!fn)
1671 fn = PA_NATIVE_COOKIE_FILE;
1672
1673 if (pa_authkey_load_auto(fn, u->auth_cookie, sizeof(u->auth_cookie)) < 0)
1674 return -1;
1675
1676 pa_log_debug("Loading cookie from disk.");
1677
1678 if (pa_authkey_prop_put(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0)
1679 u->auth_cookie_in_property = TRUE;
1680
1681 return 0;
1682 }
1683
1684 int pa__init(pa_module*m) {
1685 pa_modargs *ma = NULL;
1686 struct userdata *u = NULL;
1687 pa_sample_spec ss;
1688 pa_channel_map map;
1689 char *dn = NULL;
1690 #ifdef TUNNEL_SINK
1691 pa_sink_new_data data;
1692 #else
1693 pa_source_new_data data;
1694 #endif
1695
1696 pa_assert(m);
1697
1698 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1699 pa_log("Failed to parse module arguments");
1700 goto fail;
1701 }
1702
1703 m->userdata = u = pa_xnew0(struct userdata, 1);
1704 u->module = m;
1705 u->core = m->core;
1706 u->client = NULL;
1707 u->pdispatch = NULL;
1708 u->pstream = NULL;
1709 u->server_name = NULL;
1710 #ifdef TUNNEL_SINK
1711 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1712 u->sink = NULL;
1713 u->requested_bytes = 0;
1714 #else
1715 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1716 u->source = NULL;
1717 #endif
1718 u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
1719 u->ctag = 1;
1720 u->device_index = u->channel = PA_INVALID_INDEX;
1721 u->auth_cookie_in_property = FALSE;
1722 u->time_event = NULL;
1723 u->ignore_latency_before = 0;
1724 u->transport_usec = 0;
1725 u->transport_usec_valid = FALSE;
1726 u->remote_suspended = u->remote_corked = FALSE;
1727 u->counter = u->counter_delta = 0;
1728
1729 u->rtpoll = pa_rtpoll_new();
1730 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1731
1732 if (load_key(u, pa_modargs_get_value(ma, "cookie", NULL)) < 0)
1733 goto fail;
1734
1735 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1736 pa_log("No server specified.");
1737 goto fail;
1738 }
1739
1740 ss = m->core->default_sample_spec;
1741 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1742 pa_log("Invalid sample format specification");
1743 goto fail;
1744 }
1745
1746 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1747 pa_log("Failed to connect to server '%s'", u->server_name);
1748 goto fail;
1749 }
1750
1751 pa_socket_client_set_callback(u->client, on_connection, u);
1752
1753 #ifdef TUNNEL_SINK
1754
1755 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1756 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1757
1758 pa_sink_new_data_init(&data);
1759 data.driver = __FILE__;
1760 data.module = m;
1761 data.namereg_fail = TRUE;
1762 pa_sink_new_data_set_name(&data, dn);
1763 pa_sink_new_data_set_sample_spec(&data, &ss);
1764 pa_sink_new_data_set_channel_map(&data, &map);
1765 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1766 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1767 if (u->sink_name)
1768 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1769
1770 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL);
1771 pa_sink_new_data_done(&data);
1772
1773 if (!u->sink) {
1774 pa_log("Failed to create sink.");
1775 goto fail;
1776 }
1777
1778 u->sink->parent.process_msg = sink_process_msg;
1779 u->sink->userdata = u;
1780 u->sink->set_state = sink_set_state;
1781 u->sink->set_volume = sink_set_volume;
1782 u->sink->set_mute = sink_set_mute;
1783
1784 u->sink->refresh_volume = u->sink->refresh_mute = FALSE;
1785
1786 pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0);
1787
1788 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1789 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1790
1791 #else
1792
1793 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1794 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1795
1796 pa_source_new_data_init(&data);
1797 data.driver = __FILE__;
1798 data.module = m;
1799 data.namereg_fail = TRUE;
1800 pa_source_new_data_set_name(&data, dn);
1801 pa_source_new_data_set_sample_spec(&data, &ss);
1802 pa_source_new_data_set_channel_map(&data, &map);
1803 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1804 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1805 if (u->source_name)
1806 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1807
1808 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1809 pa_source_new_data_done(&data);
1810
1811 if (!u->source) {
1812 pa_log("Failed to create source.");
1813 goto fail;
1814 }
1815
1816 u->source->parent.process_msg = source_process_msg;
1817 u->source->userdata = u;
1818 u->source->set_state = source_set_state;
1819
1820 pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0);
1821
1822 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1823 pa_source_set_rtpoll(u->source, u->rtpoll);
1824 #endif
1825
1826 pa_xfree(dn);
1827
1828 u->time_event = NULL;
1829
1830 u->maxlength = 0;
1831 #ifdef TUNNEL_SINK
1832 u->tlength = u->minreq = u->prebuf = 0;
1833 #else
1834 u->fragsize = 0;
1835 #endif
1836
1837 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1838
1839 if (!(u->thread = pa_thread_new(thread_func, u))) {
1840 pa_log("Failed to create thread.");
1841 goto fail;
1842 }
1843
1844 #ifdef TUNNEL_SINK
1845 pa_sink_put(u->sink);
1846 #else
1847 pa_source_put(u->source);
1848 #endif
1849
1850 pa_modargs_free(ma);
1851
1852 return 0;
1853
1854 fail:
1855 pa__done(m);
1856
1857 if (ma)
1858 pa_modargs_free(ma);
1859
1860 pa_xfree(dn);
1861
1862 return -1;
1863 }
1864
1865 void pa__done(pa_module*m) {
1866 struct userdata* u;
1867
1868 pa_assert(m);
1869
1870 if (!(u = m->userdata))
1871 return;
1872
1873 #ifdef TUNNEL_SINK
1874 if (u->sink)
1875 pa_sink_unlink(u->sink);
1876 #else
1877 if (u->source)
1878 pa_source_unlink(u->source);
1879 #endif
1880
1881 if (u->thread) {
1882 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1883 pa_thread_free(u->thread);
1884 }
1885
1886 pa_thread_mq_done(&u->thread_mq);
1887
1888 #ifdef TUNNEL_SINK
1889 if (u->sink)
1890 pa_sink_unref(u->sink);
1891 #else
1892 if (u->source)
1893 pa_source_unref(u->source);
1894 #endif
1895
1896 if (u->rtpoll)
1897 pa_rtpoll_free(u->rtpoll);
1898
1899 if (u->pstream) {
1900 pa_pstream_unlink(u->pstream);
1901 pa_pstream_unref(u->pstream);
1902 }
1903
1904 if (u->pdispatch)
1905 pa_pdispatch_unref(u->pdispatch);
1906
1907 if (u->client)
1908 pa_socket_client_unref(u->client);
1909
1910 if (u->auth_cookie_in_property)
1911 pa_authkey_prop_unref(m->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1912
1913 if (u->smoother)
1914 pa_smoother_free(u->smoother);
1915
1916 if (u->time_event)
1917 u->core->mainloop->time_free(u->time_event);
1918
1919 #ifdef TUNNEL_SINK
1920 pa_xfree(u->sink_name);
1921 #else
1922 pa_xfree(u->source_name);
1923 #endif
1924 pa_xfree(u->server_name);
1925
1926 pa_xfree(u->device_description);
1927 pa_xfree(u->server_fqdn);
1928 pa_xfree(u->user_name);
1929
1930 pa_xfree(u);
1931 }