]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
forward process_msg calls to the generic source handler, not the generic sink handler
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/authkey-prop.h>
52 #include <pulsecore/time-smoother.h>
53 #include <pulsecore/thread.h>
54 #include <pulsecore/thread-mq.h>
55 #include <pulsecore/rtclock.h>
56 #include <pulsecore/core-error.h>
57 #include <pulsecore/proplist-util.h>
58
59 #ifdef TUNNEL_SINK
60 #include "module-tunnel-sink-symdef.h"
61 #else
62 #include "module-tunnel-source-symdef.h"
63 #endif
64
65 #ifdef TUNNEL_SINK
66 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 PA_MODULE_USAGE(
68 "server=<address> "
69 "sink=<remote sink name> "
70 "cookie=<filename> "
71 "format=<sample format> "
72 "channels=<number of channels> "
73 "rate=<sample rate> "
74 "sink_name=<name for the local sink> "
75 "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79 "server=<address> "
80 "source=<remote source name> "
81 "cookie=<filename> "
82 "format=<sample format> "
83 "channels=<number of channels> "
84 "rate=<sample rate> "
85 "source_name=<name for the local source> "
86 "channel_map=<channel map>");
87 #endif
88
89 PA_MODULE_AUTHOR("Lennart Poettering");
90 PA_MODULE_VERSION(PACKAGE_VERSION);
91 PA_MODULE_LOAD_ONCE(FALSE);
92
93 static const char* const valid_modargs[] = {
94 "server",
95 "cookie",
96 "format",
97 "channels",
98 "rate",
99 #ifdef TUNNEL_SINK
100 "sink_name",
101 "sink",
102 #else
103 "source_name",
104 "source",
105 #endif
106 "channel_map",
107 NULL,
108 };
109
110 #define DEFAULT_TIMEOUT 5
111
112 #define LATENCY_INTERVAL 10
113
114 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
115
116 #ifdef TUNNEL_SINK
117
118 enum {
119 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
120 SINK_MESSAGE_REMOTE_SUSPEND,
121 SINK_MESSAGE_UPDATE_LATENCY,
122 SINK_MESSAGE_POST
123 };
124
125 #define DEFAULT_TLENGTH_MSEC 150
126 #define DEFAULT_MINREQ_MSEC 25
127
128 #else
129
130 enum {
131 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
132 SOURCE_MESSAGE_REMOTE_SUSPEND,
133 SOURCE_MESSAGE_UPDATE_LATENCY
134 };
135
136 #define DEFAULT_FRAGSIZE_MSEC 25
137
138 #endif
139
140 #ifdef TUNNEL_SINK
141 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
142 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
143 #endif
144 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149
150 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
151 #ifdef TUNNEL_SINK
152 [PA_COMMAND_REQUEST] = command_request,
153 [PA_COMMAND_STARTED] = command_started,
154 #endif
155 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
156 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
157 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
158 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
159 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
160 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
161 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
162 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
163 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
164 };
165
166 struct userdata {
167 pa_core *core;
168 pa_module *module;
169
170 pa_thread_mq thread_mq;
171 pa_rtpoll *rtpoll;
172 pa_thread *thread;
173
174 pa_socket_client *client;
175 pa_pstream *pstream;
176 pa_pdispatch *pdispatch;
177
178 char *server_name;
179 #ifdef TUNNEL_SINK
180 char *sink_name;
181 pa_sink *sink;
182 int32_t requested_bytes;
183 #else
184 char *source_name;
185 pa_source *source;
186 #endif
187
188 uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
189
190 uint32_t version;
191 uint32_t ctag;
192 uint32_t device_index;
193 uint32_t channel;
194
195 int64_t counter, counter_delta;
196
197 pa_bool_t remote_corked:1;
198 pa_bool_t remote_suspended:1;
199
200 pa_usec_t transport_usec;
201 pa_bool_t transport_usec_valid;
202
203 uint32_t ignore_latency_before;
204
205 pa_time_event *time_event;
206
207 pa_bool_t auth_cookie_in_property;
208
209 pa_smoother *smoother;
210
211 char *device_description;
212 char *server_fqdn;
213 char *user_name;
214
215 uint32_t maxlength;
216 #ifdef TUNNEL_SINK
217 uint32_t tlength;
218 uint32_t minreq;
219 uint32_t prebuf;
220 #else
221 uint32_t fragsize;
222 #endif
223 };
224
225 static void request_latency(struct userdata *u);
226
227 /* Called from main context */
228 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
229 struct userdata *u = userdata;
230
231 pa_assert(pd);
232 pa_assert(t);
233 pa_assert(u);
234 pa_assert(u->pdispatch == pd);
235
236 pa_log_warn("Stream killed");
237 pa_module_unload_request(u->module);
238 }
239
240 /* Called from main context */
241 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
242 struct userdata *u = userdata;
243
244 pa_assert(pd);
245 pa_assert(t);
246 pa_assert(u);
247 pa_assert(u->pdispatch == pd);
248
249 pa_log_info("Server signalled buffer overrun/underrun.");
250 request_latency(u);
251 }
252
253 /* Called from main context */
254 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
255 struct userdata *u = userdata;
256 uint32_t channel;
257 pa_bool_t suspended;
258
259 pa_assert(pd);
260 pa_assert(t);
261 pa_assert(u);
262 pa_assert(u->pdispatch == pd);
263
264 if (pa_tagstruct_getu32(t, &channel) < 0 ||
265 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
266 !pa_tagstruct_eof(t)) {
267 pa_log("Invalid packet");
268 pa_module_unload_request(u->module);
269 return;
270 }
271
272 #ifdef TUNNEL_SINK
273 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
274 #else
275 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
276 #endif
277
278 request_latency(u);
279 }
280
281 /* Called from main context */
282 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
283 struct userdata *u = userdata;
284
285 pa_assert(pd);
286 pa_assert(t);
287 pa_assert(u);
288 pa_assert(u->pdispatch == pd);
289
290 pa_log_debug("Server reports a stream move.");
291 request_latency(u);
292 }
293
294 #ifdef TUNNEL_SINK
295
296 /* Called from main context */
297 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
298 struct userdata *u = userdata;
299
300 pa_assert(pd);
301 pa_assert(t);
302 pa_assert(u);
303 pa_assert(u->pdispatch == pd);
304
305 pa_log_debug("Server reports playback started.");
306 request_latency(u);
307 }
308
309 #endif
310
311 /* Called from IO thread context */
312 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
313 pa_usec_t x;
314 pa_assert(u);
315
316 if (u->remote_corked == cork)
317 return;
318
319 u->remote_corked = cork;
320 x = pa_rtclock_usec();
321
322 /* Correct by the time this needs to travel to the other side.
323 * This is a valid thread-safe access, because the main thread is
324 * waiting for us */
325 if (u->transport_usec_valid)
326 x += u->transport_usec;
327
328 if (u->remote_suspended || u->remote_corked)
329 pa_smoother_pause(u->smoother, x);
330 else
331 pa_smoother_resume(u->smoother, x);
332 }
333
334 /* Called from main context */
335 static void stream_cork(struct userdata *u, pa_bool_t cork) {
336 pa_tagstruct *t;
337 pa_assert(u);
338
339 if (!u->pstream)
340 return;
341
342 t = pa_tagstruct_new(NULL, 0);
343 #ifdef TUNNEL_SINK
344 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
345 #else
346 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
347 #endif
348 pa_tagstruct_putu32(t, u->ctag++);
349 pa_tagstruct_putu32(t, u->channel);
350 pa_tagstruct_put_boolean(t, !!cork);
351 pa_pstream_send_tagstruct(u->pstream, t);
352
353 request_latency(u);
354 }
355
356 /* Called from IO thread context */
357 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
358 pa_usec_t x;
359 pa_assert(u);
360
361 if (u->remote_suspended == suspend)
362 return;
363
364 u->remote_suspended = suspend;
365
366 x = pa_rtclock_usec();
367
368 /* Correct by the time this needed to travel from the other side.
369 * This is a valid thread-safe access, because the main thread is
370 * waiting for us */
371 if (u->transport_usec_valid)
372 x -= u->transport_usec;
373
374 if (u->remote_suspended || u->remote_corked)
375 pa_smoother_pause(u->smoother, x);
376 else
377 pa_smoother_resume(u->smoother, x);
378 }
379
380 #ifdef TUNNEL_SINK
381
382 /* Called from IO thread context */
383 static void send_data(struct userdata *u) {
384 pa_assert(u);
385
386 while (u->requested_bytes > 0) {
387 pa_memchunk memchunk;
388
389 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
390 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
391 pa_memblock_unref(memchunk.memblock);
392
393 u->requested_bytes -= memchunk.length;
394
395 u->counter += memchunk.length;
396 }
397 }
398
399 /* This function is called from IO context -- except when it is not. */
400 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
401 struct userdata *u = PA_SINK(o)->userdata;
402
403 switch (code) {
404
405 case PA_SINK_MESSAGE_SET_STATE: {
406 int r;
407
408 /* First, change the state, because otherwide pa_sink_render() would fail */
409 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
410
411 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
412
413 if (PA_SINK_IS_OPENED(u->sink->state))
414 send_data(u);
415 }
416
417 return r;
418 }
419
420 case PA_SINK_MESSAGE_GET_LATENCY: {
421 pa_usec_t yl, yr, *usec = data;
422
423 yl = pa_bytes_to_usec(u->counter, &u->sink->sample_spec);
424 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
425
426 *usec = yl > yr ? yl - yr : 0;
427 return 0;
428 }
429
430 case SINK_MESSAGE_REQUEST:
431
432 pa_assert(offset > 0);
433 u->requested_bytes += (size_t) offset;
434
435 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
436 send_data(u);
437
438 return 0;
439
440
441 case SINK_MESSAGE_REMOTE_SUSPEND:
442
443 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
444 return 0;
445
446
447 case SINK_MESSAGE_UPDATE_LATENCY: {
448 pa_usec_t y;
449
450 y = pa_bytes_to_usec(u->counter, &u->sink->sample_spec);
451
452 if (y > (pa_usec_t) offset || offset < 0)
453 y -= offset;
454 else
455 y = 0;
456
457 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
458
459 return 0;
460 }
461
462 case SINK_MESSAGE_POST:
463
464 /* OK, This might be a bit confusing. This message is
465 * delivered to us from the main context -- NOT from the
466 * IO thread context where the rest of the messages are
467 * dispatched. Yeah, ugly, but I am a lazy bastard. */
468
469 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
470
471 u->counter_delta += chunk->length;
472
473 return 0;
474 }
475
476 return pa_sink_process_msg(o, code, data, offset, chunk);
477 }
478
479 /* Called from main context */
480 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
481 struct userdata *u;
482 pa_sink_assert_ref(s);
483 u = s->userdata;
484
485 switch ((pa_sink_state_t) state) {
486
487 case PA_SINK_SUSPENDED:
488 pa_assert(PA_SINK_IS_OPENED(s->state));
489 stream_cork(u, TRUE);
490 break;
491
492 case PA_SINK_IDLE:
493 case PA_SINK_RUNNING:
494 if (s->state == PA_SINK_SUSPENDED)
495 stream_cork(u, FALSE);
496 break;
497
498 case PA_SINK_UNLINKED:
499 case PA_SINK_INIT:
500 ;
501 }
502
503 return 0;
504 }
505
506 #else
507
508 /* This function is called from IO context -- except when it is not. */
509 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
510 struct userdata *u = PA_SOURCE(o)->userdata;
511
512 switch (code) {
513
514 case PA_SINK_MESSAGE_SET_STATE: {
515 int r;
516
517 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
518 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
519
520 return r;
521 }
522
523 case PA_SOURCE_MESSAGE_GET_LATENCY: {
524 pa_usec_t yr, yl, *usec = data;
525
526 yl = pa_bytes_to_usec(u->counter, &PA_SINK(o)->sample_spec);
527 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
528
529 *usec = yr > yl ? yr - yl : 0;
530 return 0;
531 }
532
533 case SOURCE_MESSAGE_POST:
534
535 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
536 pa_source_post(u->source, chunk);
537
538 u->counter += chunk->length;
539
540 return 0;
541
542 case SOURCE_MESSAGE_REMOTE_SUSPEND:
543
544 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
545 return 0;
546
547 case SOURCE_MESSAGE_UPDATE_LATENCY: {
548 pa_usec_t y;
549
550 y = pa_bytes_to_usec(u->counter, &u->source->sample_spec);
551
552 if (offset >= 0 || y > (pa_usec_t) -offset)
553 y += offset;
554 else
555 y = 0;
556
557 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
558
559 return 0;
560 }
561 }
562
563 return pa_source_process_msg(o, code, data, offset, chunk);
564 }
565
566 /* Called from main context */
567 static int source_set_state(pa_source *s, pa_source_state_t state) {
568 struct userdata *u;
569 pa_source_assert_ref(s);
570 u = s->userdata;
571
572 switch ((pa_source_state_t) state) {
573
574 case PA_SOURCE_SUSPENDED:
575 pa_assert(PA_SOURCE_IS_OPENED(s->state));
576 stream_cork(u, TRUE);
577 break;
578
579 case PA_SOURCE_IDLE:
580 case PA_SOURCE_RUNNING:
581 if (s->state == PA_SOURCE_SUSPENDED)
582 stream_cork(u, FALSE);
583 break;
584
585 case PA_SOURCE_UNLINKED:
586 case PA_SOURCE_INIT:
587 ;
588 }
589
590 return 0;
591 }
592
593 #endif
594
595 static void thread_func(void *userdata) {
596 struct userdata *u = userdata;
597
598 pa_assert(u);
599
600 pa_log_debug("Thread starting up");
601
602 pa_thread_mq_install(&u->thread_mq);
603 pa_rtpoll_install(u->rtpoll);
604
605 for (;;) {
606 int ret;
607
608 #ifdef TUNNEL_SINK
609 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
610 if (u->sink->thread_info.rewind_requested)
611 pa_sink_process_rewind(u->sink, 0);
612 #endif
613
614 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
615 goto fail;
616
617 if (ret == 0)
618 goto finish;
619 }
620
621 fail:
622 /* If this was no regular exit from the loop we have to continue
623 * processing messages until we received PA_MESSAGE_SHUTDOWN */
624 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
625 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
626
627 finish:
628 pa_log_debug("Thread shutting down");
629 }
630
631 #ifdef TUNNEL_SINK
632 /* Called from main context */
633 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
634 struct userdata *u = userdata;
635 uint32_t bytes, channel;
636
637 pa_assert(pd);
638 pa_assert(command == PA_COMMAND_REQUEST);
639 pa_assert(t);
640 pa_assert(u);
641 pa_assert(u->pdispatch == pd);
642
643 if (pa_tagstruct_getu32(t, &channel) < 0 ||
644 pa_tagstruct_getu32(t, &bytes) < 0) {
645 pa_log("Invalid protocol reply");
646 goto fail;
647 }
648
649 if (channel != u->channel) {
650 pa_log("Recieved data for invalid channel");
651 goto fail;
652 }
653
654 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
655 return;
656
657 fail:
658 pa_module_unload_request(u->module);
659 }
660
661 #endif
662
663 /* Called from main context */
664 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
665 struct userdata *u = userdata;
666 pa_usec_t sink_usec, source_usec, transport_usec;
667 pa_bool_t playing;
668 int64_t write_index, read_index;
669 struct timeval local, remote, now;
670 pa_sample_spec *ss;
671 int64_t delay;
672
673 pa_assert(pd);
674 pa_assert(u);
675
676 if (command != PA_COMMAND_REPLY) {
677 if (command == PA_COMMAND_ERROR)
678 pa_log("Failed to get latency.");
679 else
680 pa_log("Protocol error.");
681 goto fail;
682 }
683
684 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
685 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
686 pa_tagstruct_get_boolean(t, &playing) < 0 ||
687 pa_tagstruct_get_timeval(t, &local) < 0 ||
688 pa_tagstruct_get_timeval(t, &remote) < 0 ||
689 pa_tagstruct_gets64(t, &write_index) < 0 ||
690 pa_tagstruct_gets64(t, &read_index) < 0) {
691 pa_log("Invalid reply.");
692 goto fail;
693 }
694
695 #ifdef TUNNEL_SINK
696 if (u->version >= 13) {
697 uint64_t underrun_for = 0, playing_for = 0;
698
699 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
700 pa_tagstruct_getu64(t, &playing_for) < 0) {
701 pa_log("Invalid reply.");
702 goto fail;
703 }
704 }
705 #endif
706
707 if (!pa_tagstruct_eof(t)) {
708 pa_log("Invalid reply.");
709 goto fail;
710 }
711
712 if (tag < u->ignore_latency_before) {
713 request_latency(u);
714 return;
715 }
716
717 pa_gettimeofday(&now);
718
719 /* Calculate transport usec */
720 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
721 /* local and remote seem to have synchronized clocks */
722 #ifdef TUNNEL_SINK
723 u->transport_usec = pa_timeval_diff(&remote, &local);
724 #else
725 u->transport_usec = pa_timeval_diff(&now, &remote);
726 #endif
727 } else
728 u->transport_usec = pa_timeval_diff(&now, &local)/2;
729 u->transport_usec_valid = TRUE;
730
731 /* First, take the device's delay */
732 #ifdef TUNNEL_SINK
733 delay = (int64_t) sink_usec;
734 ss = &u->sink->sample_spec;
735 #else
736 delay = (int64_t) source_usec;
737 ss = &u->source->sample_spec;
738 #endif
739
740 /* Add the length of our server-side buffer */
741 if (write_index >= read_index)
742 delay += (int64_t) pa_bytes_to_usec(write_index-read_index, ss);
743 else
744 delay -= (int64_t) pa_bytes_to_usec(read_index-write_index, ss);
745
746 /* Our measurements are already out of date, hence correct by the *
747 * transport latency */
748 #ifdef TUNNEL_SINK
749 delay -= (int64_t) transport_usec;
750 #else
751 delay += (int64_t) transport_usec;
752 #endif
753
754 /* Now correct by what we have have read/written since we requested the update */
755 #ifdef TUNNEL_SINK
756 delay += (int64_t) pa_bytes_to_usec(u->counter_delta, ss);
757 #else
758 delay -= (int64_t) pa_bytes_to_usec(u->counter_delta, ss);
759 #endif
760
761 #ifdef TUNNEL_SINK
762 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
763 #else
764 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
765 #endif
766
767 return;
768
769 fail:
770
771 pa_module_unload_request(u->module);
772 }
773
774 /* Called from main context */
775 static void request_latency(struct userdata *u) {
776 pa_tagstruct *t;
777 struct timeval now;
778 uint32_t tag;
779 pa_assert(u);
780
781 t = pa_tagstruct_new(NULL, 0);
782 #ifdef TUNNEL_SINK
783 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
784 #else
785 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
786 #endif
787 pa_tagstruct_putu32(t, tag = u->ctag++);
788 pa_tagstruct_putu32(t, u->channel);
789
790 pa_gettimeofday(&now);
791 pa_tagstruct_put_timeval(t, &now);
792
793 pa_pstream_send_tagstruct(u->pstream, t);
794 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
795
796 u->ignore_latency_before = tag;
797 u->counter_delta = 0;
798 }
799
800 /* Called from main context */
801 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
802 struct userdata *u = userdata;
803 struct timeval ntv;
804
805 pa_assert(m);
806 pa_assert(e);
807 pa_assert(u);
808
809 request_latency(u);
810
811 pa_gettimeofday(&ntv);
812 ntv.tv_sec += LATENCY_INTERVAL;
813 m->time_restart(e, &ntv);
814 }
815
816 /* Called from main context */
817 static void update_description(struct userdata *u) {
818 char *d;
819 char un[128], hn[128];
820 pa_tagstruct *t;
821
822 pa_assert(u);
823
824 if (!u->server_fqdn || !u->user_name || !u->device_description)
825 return;
826
827 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
828
829 #ifdef TUNNEL_SINK
830 pa_sink_set_description(u->sink, d);
831 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
832 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
833 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
834 #else
835 pa_source_set_description(u->source, d);
836 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
837 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
838 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
839 #endif
840
841 pa_xfree(d);
842
843 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
844 pa_get_user_name(un, sizeof(un)),
845 pa_get_host_name(hn, sizeof(hn)));
846
847 t = pa_tagstruct_new(NULL, 0);
848 #ifdef TUNNEL_SINK
849 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
850 #else
851 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
852 #endif
853 pa_tagstruct_putu32(t, u->ctag++);
854 pa_tagstruct_putu32(t, u->channel);
855 pa_tagstruct_puts(t, d);
856 pa_pstream_send_tagstruct(u->pstream, t);
857
858 pa_xfree(d);
859 }
860
861 /* Called from main context */
862 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
863 struct userdata *u = userdata;
864 pa_sample_spec ss;
865 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
866 uint32_t cookie;
867
868 pa_assert(pd);
869 pa_assert(u);
870
871 if (command != PA_COMMAND_REPLY) {
872 if (command == PA_COMMAND_ERROR)
873 pa_log("Failed to get info.");
874 else
875 pa_log("Protocol error.");
876 goto fail;
877 }
878
879 if (pa_tagstruct_gets(t, &server_name) < 0 ||
880 pa_tagstruct_gets(t, &server_version) < 0 ||
881 pa_tagstruct_gets(t, &user_name) < 0 ||
882 pa_tagstruct_gets(t, &host_name) < 0 ||
883 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
884 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
885 pa_tagstruct_gets(t, &default_source_name) < 0 ||
886 pa_tagstruct_getu32(t, &cookie) < 0) {
887
888 pa_log("Parse failure");
889 goto fail;
890 }
891
892 if (!pa_tagstruct_eof(t)) {
893 pa_log("Packet too long");
894 goto fail;
895 }
896
897 pa_xfree(u->server_fqdn);
898 u->server_fqdn = pa_xstrdup(host_name);
899
900 pa_xfree(u->user_name);
901 u->user_name = pa_xstrdup(user_name);
902
903 update_description(u);
904
905 return;
906
907 fail:
908 pa_module_unload_request(u->module);
909 }
910
911 #ifdef TUNNEL_SINK
912
913 /* Called from main context */
914 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
915 struct userdata *u = userdata;
916 uint32_t idx, owner_module, monitor_source, flags;
917 const char *name, *description, *monitor_source_name, *driver;
918 pa_sample_spec ss;
919 pa_channel_map cm;
920 pa_cvolume volume;
921 pa_bool_t mute;
922 pa_usec_t latency;
923 pa_proplist *pl;
924
925 pa_assert(pd);
926 pa_assert(u);
927
928 pl = pa_proplist_new();
929
930 if (command != PA_COMMAND_REPLY) {
931 if (command == PA_COMMAND_ERROR)
932 pa_log("Failed to get info.");
933 else
934 pa_log("Protocol error.");
935 goto fail;
936 }
937
938 if (pa_tagstruct_getu32(t, &idx) < 0 ||
939 pa_tagstruct_gets(t, &name) < 0 ||
940 pa_tagstruct_gets(t, &description) < 0 ||
941 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
942 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
943 pa_tagstruct_getu32(t, &owner_module) < 0 ||
944 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
945 pa_tagstruct_get_boolean(t, &mute) < 0 ||
946 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
947 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
948 pa_tagstruct_get_usec(t, &latency) < 0 ||
949 pa_tagstruct_gets(t, &driver) < 0 ||
950 pa_tagstruct_getu32(t, &flags) < 0) {
951
952 pa_log("Parse failure");
953 goto fail;
954 }
955
956 if (u->version >= 13) {
957 pa_usec_t configured_latency;
958
959 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
960 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
961
962 pa_log("Parse failure");
963 goto fail;
964 }
965 }
966
967 if (!pa_tagstruct_eof(t)) {
968 pa_log("Packet too long");
969 goto fail;
970 }
971
972 pa_proplist_free(pl);
973
974 if (!u->sink_name || strcmp(name, u->sink_name))
975 return;
976
977 pa_xfree(u->device_description);
978 u->device_description = pa_xstrdup(description);
979
980 update_description(u);
981
982 return;
983
984 fail:
985 pa_module_unload_request(u->module);
986 pa_proplist_free(pl);
987 }
988
989 /* Called from main context */
990 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
991 struct userdata *u = userdata;
992 uint32_t idx, owner_module, client, sink;
993 pa_usec_t buffer_usec, sink_usec;
994 const char *name, *driver, *resample_method;
995 pa_bool_t mute;
996 pa_sample_spec sample_spec;
997 pa_channel_map channel_map;
998 pa_cvolume volume;
999 pa_proplist *pl;
1000
1001 pa_assert(pd);
1002 pa_assert(u);
1003
1004 pl = pa_proplist_new();
1005
1006 if (command != PA_COMMAND_REPLY) {
1007 if (command == PA_COMMAND_ERROR)
1008 pa_log("Failed to get info.");
1009 else
1010 pa_log("Protocol error.");
1011 goto fail;
1012 }
1013
1014 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1015 pa_tagstruct_gets(t, &name) < 0 ||
1016 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1017 pa_tagstruct_getu32(t, &client) < 0 ||
1018 pa_tagstruct_getu32(t, &sink) < 0 ||
1019 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1020 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1021 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1022 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1023 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1024 pa_tagstruct_gets(t, &resample_method) < 0 ||
1025 pa_tagstruct_gets(t, &driver) < 0) {
1026
1027 pa_log("Parse failure");
1028 goto fail;
1029 }
1030
1031 if (u->version >= 11) {
1032 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1033
1034 pa_log("Parse failure");
1035 goto fail;
1036 }
1037 }
1038
1039 if (u->version >= 13) {
1040 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1041
1042 pa_log("Parse failure");
1043 goto fail;
1044 }
1045 }
1046
1047 if (!pa_tagstruct_eof(t)) {
1048 pa_log("Packet too long");
1049 goto fail;
1050 }
1051
1052 pa_proplist_free(pl);
1053
1054 if (idx != u->device_index)
1055 return;
1056
1057 pa_assert(u->sink);
1058
1059 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1060 pa_cvolume_equal(&volume, &u->sink->volume))
1061 return;
1062
1063 memcpy(&u->sink->volume, &volume, sizeof(pa_cvolume));
1064
1065 if (u->version >= 11)
1066 u->sink->muted = !!mute;
1067
1068 pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);
1069 return;
1070
1071 fail:
1072 pa_module_unload_request(u->module);
1073 pa_proplist_free(pl);
1074 }
1075
1076 #else
1077
1078 /* Called from main context */
1079 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1080 struct userdata *u = userdata;
1081 uint32_t idx, owner_module, monitor_of_sink, flags;
1082 const char *name, *description, *monitor_of_sink_name, *driver;
1083 pa_sample_spec ss;
1084 pa_channel_map cm;
1085 pa_cvolume volume;
1086 pa_bool_t mute;
1087 pa_usec_t latency, configured_latency;
1088 pa_proplist *pl;
1089
1090 pa_assert(pd);
1091 pa_assert(u);
1092
1093 pl = pa_proplist_new();
1094
1095 if (command != PA_COMMAND_REPLY) {
1096 if (command == PA_COMMAND_ERROR)
1097 pa_log("Failed to get info.");
1098 else
1099 pa_log("Protocol error.");
1100 goto fail;
1101 }
1102
1103 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1104 pa_tagstruct_gets(t, &name) < 0 ||
1105 pa_tagstruct_gets(t, &description) < 0 ||
1106 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1107 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1108 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1109 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1110 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1111 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1112 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1113 pa_tagstruct_get_usec(t, &latency) < 0 ||
1114 pa_tagstruct_gets(t, &driver) < 0 ||
1115 pa_tagstruct_getu32(t, &flags) < 0) {
1116
1117 pa_log("Parse failure");
1118 goto fail;
1119 }
1120
1121 if (u->version >= 13) {
1122 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1123 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1124
1125 pa_log("Parse failure");
1126 goto fail;
1127 }
1128 }
1129
1130 if (!pa_tagstruct_eof(t)) {
1131 pa_log("Packet too long");
1132 goto fail;
1133 }
1134
1135 pa_proplist_free(pl);
1136
1137 if (!u->source_name || strcmp(name, u->source_name))
1138 return;
1139
1140 pa_xfree(u->device_description);
1141 u->device_description = pa_xstrdup(description);
1142
1143 update_description(u);
1144
1145 return;
1146
1147 fail:
1148 pa_module_unload_request(u->module);
1149 pa_proplist_free(pl);
1150 }
1151
1152 #endif
1153
1154 /* Called from main context */
1155 static void request_info(struct userdata *u) {
1156 pa_tagstruct *t;
1157 uint32_t tag;
1158 pa_assert(u);
1159
1160 t = pa_tagstruct_new(NULL, 0);
1161 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1162 pa_tagstruct_putu32(t, tag = u->ctag++);
1163 pa_pstream_send_tagstruct(u->pstream, t);
1164 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1165
1166 #ifdef TUNNEL_SINK
1167 t = pa_tagstruct_new(NULL, 0);
1168 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1169 pa_tagstruct_putu32(t, tag = u->ctag++);
1170 pa_tagstruct_putu32(t, u->device_index);
1171 pa_pstream_send_tagstruct(u->pstream, t);
1172 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1173
1174 if (u->sink_name) {
1175 t = pa_tagstruct_new(NULL, 0);
1176 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1177 pa_tagstruct_putu32(t, tag = u->ctag++);
1178 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1179 pa_tagstruct_puts(t, u->sink_name);
1180 pa_pstream_send_tagstruct(u->pstream, t);
1181 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1182 }
1183 #else
1184 if (u->source_name) {
1185 t = pa_tagstruct_new(NULL, 0);
1186 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1187 pa_tagstruct_putu32(t, tag = u->ctag++);
1188 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1189 pa_tagstruct_puts(t, u->source_name);
1190 pa_pstream_send_tagstruct(u->pstream, t);
1191 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1192 }
1193 #endif
1194 }
1195
1196 /* Called from main context */
1197 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1198 struct userdata *u = userdata;
1199 pa_subscription_event_type_t e;
1200 uint32_t idx;
1201
1202 pa_assert(pd);
1203 pa_assert(t);
1204 pa_assert(u);
1205 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1206
1207 if (pa_tagstruct_getu32(t, &e) < 0 ||
1208 pa_tagstruct_getu32(t, &idx) < 0) {
1209 pa_log("Invalid protocol reply");
1210 pa_module_unload_request(u->module);
1211 return;
1212 }
1213
1214 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1215 #ifdef TUNNEL_SINK
1216 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1217 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1218 #else
1219 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1220 #endif
1221 )
1222 return;
1223
1224 request_info(u);
1225 }
1226
1227 /* Called from main context */
1228 static void start_subscribe(struct userdata *u) {
1229 pa_tagstruct *t;
1230 uint32_t tag;
1231 pa_assert(u);
1232
1233 t = pa_tagstruct_new(NULL, 0);
1234 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1235 pa_tagstruct_putu32(t, tag = u->ctag++);
1236 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1237 #ifdef TUNNEL_SINK
1238 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1239 #else
1240 PA_SUBSCRIPTION_MASK_SOURCE
1241 #endif
1242 );
1243
1244 pa_pstream_send_tagstruct(u->pstream, t);
1245 }
1246
1247 /* Called from main context */
1248 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1249 struct userdata *u = userdata;
1250 struct timeval ntv;
1251 #ifdef TUNNEL_SINK
1252 uint32_t bytes;
1253 #endif
1254
1255 pa_assert(pd);
1256 pa_assert(u);
1257 pa_assert(u->pdispatch == pd);
1258
1259 if (command != PA_COMMAND_REPLY) {
1260 if (command == PA_COMMAND_ERROR)
1261 pa_log("Failed to create stream.");
1262 else
1263 pa_log("Protocol error.");
1264 goto fail;
1265 }
1266
1267 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1268 pa_tagstruct_getu32(t, &u->device_index) < 0
1269 #ifdef TUNNEL_SINK
1270 || pa_tagstruct_getu32(t, &bytes) < 0
1271 #endif
1272 )
1273 goto parse_error;
1274
1275 if (u->version >= 9) {
1276 #ifdef TUNNEL_SINK
1277 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1278 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1279 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1280 pa_tagstruct_getu32(t, &u->minreq) < 0)
1281 goto parse_error;
1282 #else
1283 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1284 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1285 goto parse_error;
1286 #endif
1287 }
1288
1289 if (u->version >= 12) {
1290 pa_sample_spec ss;
1291 pa_channel_map cm;
1292 uint32_t device_index;
1293 const char *dn;
1294 pa_bool_t suspended;
1295
1296 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1297 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1298 pa_tagstruct_getu32(t, &device_index) < 0 ||
1299 pa_tagstruct_gets(t, &dn) < 0 ||
1300 pa_tagstruct_get_boolean(t, &suspended) < 0)
1301 goto parse_error;
1302
1303 #ifdef TUNNEL_SINK
1304 pa_xfree(u->sink_name);
1305 u->sink_name = pa_xstrdup(dn);
1306 #else
1307 pa_xfree(u->source_name);
1308 u->source_name = pa_xstrdup(dn);
1309 #endif
1310 }
1311
1312 if (u->version >= 13) {
1313 pa_usec_t usec;
1314
1315 if (pa_tagstruct_get_usec(t, &usec) < 0)
1316 goto parse_error;
1317
1318 #ifdef TUNNEL_SINK
1319 pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0);
1320 #else
1321 pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0);
1322 #endif
1323 }
1324
1325 if (!pa_tagstruct_eof(t))
1326 goto parse_error;
1327
1328 start_subscribe(u);
1329 request_info(u);
1330
1331 pa_assert(!u->time_event);
1332 pa_gettimeofday(&ntv);
1333 ntv.tv_sec += LATENCY_INTERVAL;
1334 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1335
1336 request_latency(u);
1337
1338 pa_log_debug("Stream created.");
1339
1340 #ifdef TUNNEL_SINK
1341 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1342 #endif
1343
1344 return;
1345
1346 parse_error:
1347 pa_log("Invalid reply. (Create stream)");
1348
1349 fail:
1350 pa_module_unload_request(u->module);
1351
1352 }
1353
1354 /* Called from main context */
1355 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1356 struct userdata *u = userdata;
1357 pa_tagstruct *reply;
1358 char name[256], un[128], hn[128];
1359 #ifdef TUNNEL_SINK
1360 pa_cvolume volume;
1361 #endif
1362
1363 pa_assert(pd);
1364 pa_assert(u);
1365 pa_assert(u->pdispatch == pd);
1366
1367 if (command != PA_COMMAND_REPLY ||
1368 pa_tagstruct_getu32(t, &u->version) < 0 ||
1369 !pa_tagstruct_eof(t)) {
1370
1371 if (command == PA_COMMAND_ERROR)
1372 pa_log("Failed to authenticate");
1373 else
1374 pa_log("Protocol error.");
1375
1376 goto fail;
1377 }
1378
1379 /* Minimum supported protocol version */
1380 if (u->version < 8) {
1381 pa_log("Incompatible protocol version");
1382 goto fail;
1383 }
1384
1385 /* Starting with protocol version 13 the MSB of the version tag
1386 reflects if shm is enabled for this connection or not. We don't
1387 support SHM here at all, so we just ignore this. */
1388
1389 if (u->version >= 13)
1390 u->version &= 0x7FFFFFFFU;
1391
1392 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1393
1394 #ifdef TUNNEL_SINK
1395 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1396 u->sink_name,
1397 pa_get_user_name(un, sizeof(un)),
1398 pa_get_host_name(hn, sizeof(hn)));
1399 #else
1400 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1401 u->source_name,
1402 pa_get_user_name(un, sizeof(un)),
1403 pa_get_host_name(hn, sizeof(hn)));
1404 #endif
1405
1406 reply = pa_tagstruct_new(NULL, 0);
1407 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1408 pa_tagstruct_putu32(reply, tag = u->ctag++);
1409
1410 if (u->version >= 13) {
1411 pa_proplist *pl;
1412 pl = pa_proplist_new();
1413 pa_init_proplist(pl);
1414 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1415 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1416 pa_tagstruct_put_proplist(reply, pl);
1417 pa_proplist_free(pl);
1418 } else
1419 pa_tagstruct_puts(reply, "PulseAudio");
1420
1421 pa_pstream_send_tagstruct(u->pstream, reply);
1422 /* We ignore the server's reply here */
1423
1424 reply = pa_tagstruct_new(NULL, 0);
1425
1426 if (u->version < 13)
1427 /* Only for older PA versions we need to fill in the maxlength */
1428 u->maxlength = 4*1024*1024;
1429
1430 #ifdef TUNNEL_SINK
1431 u->tlength = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1432 u->minreq = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1433 u->prebuf = u->tlength;
1434 #else
1435 u->fragsize = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1436 #endif
1437
1438 #ifdef TUNNEL_SINK
1439 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1440 pa_tagstruct_putu32(reply, tag = u->ctag++);
1441
1442 if (u->version < 13)
1443 pa_tagstruct_puts(reply, name);
1444
1445 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1446 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1447 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1448 pa_tagstruct_puts(reply, u->sink_name);
1449 pa_tagstruct_putu32(reply, u->maxlength);
1450 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1451 pa_tagstruct_putu32(reply, u->tlength);
1452 pa_tagstruct_putu32(reply, u->prebuf);
1453 pa_tagstruct_putu32(reply, u->minreq);
1454 pa_tagstruct_putu32(reply, 0);
1455 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1456 pa_tagstruct_put_cvolume(reply, &volume);
1457 #else
1458 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1459 pa_tagstruct_putu32(reply, tag = u->ctag++);
1460
1461 if (u->version < 13)
1462 pa_tagstruct_puts(reply, name);
1463
1464 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1465 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1466 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1467 pa_tagstruct_puts(reply, u->source_name);
1468 pa_tagstruct_putu32(reply, u->maxlength);
1469 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1470 pa_tagstruct_putu32(reply, u->fragsize);
1471 #endif
1472
1473 if (u->version >= 12) {
1474 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1475 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1476 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1477 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1478 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1479 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1480 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1481 }
1482
1483 if (u->version >= 13) {
1484 pa_proplist *pl;
1485
1486 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1487 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1488
1489 pl = pa_proplist_new();
1490 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1491 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1492 pa_tagstruct_put_proplist(reply, pl);
1493 pa_proplist_free(pl);
1494
1495 #ifndef TUNNEL_SINK
1496 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1497 #endif
1498 }
1499
1500 pa_pstream_send_tagstruct(u->pstream, reply);
1501 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1502
1503 pa_log_debug("Connection authenticated, creating stream ...");
1504
1505 return;
1506
1507 fail:
1508 pa_module_unload_request(u->module);
1509 }
1510
1511 /* Called from main context */
1512 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1513 struct userdata *u = userdata;
1514
1515 pa_assert(p);
1516 pa_assert(u);
1517
1518 pa_log_warn("Stream died.");
1519 pa_module_unload_request(u->module);
1520 }
1521
1522 /* Called from main context */
1523 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1524 struct userdata *u = userdata;
1525
1526 pa_assert(p);
1527 pa_assert(packet);
1528 pa_assert(u);
1529
1530 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1531 pa_log("Invalid packet");
1532 pa_module_unload_request(u->module);
1533 return;
1534 }
1535 }
1536
1537 #ifndef TUNNEL_SINK
1538 /* Called from main context */
1539 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1540 struct userdata *u = userdata;
1541
1542 pa_assert(p);
1543 pa_assert(chunk);
1544 pa_assert(u);
1545
1546 if (channel != u->channel) {
1547 pa_log("Recieved memory block on bad channel.");
1548 pa_module_unload_request(u->module);
1549 return;
1550 }
1551
1552 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1553
1554 u->counter_delta += chunk->length;
1555 }
1556
1557 #endif
1558
1559 /* Called from main context */
1560 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1561 struct userdata *u = userdata;
1562 pa_tagstruct *t;
1563 uint32_t tag;
1564
1565 pa_assert(sc);
1566 pa_assert(u);
1567 pa_assert(u->client == sc);
1568
1569 pa_socket_client_unref(u->client);
1570 u->client = NULL;
1571
1572 if (!io) {
1573 pa_log("Connection failed: %s", pa_cstrerror(errno));
1574 pa_module_unload_request(u->module);
1575 return;
1576 }
1577
1578 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1579 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1580
1581 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1582 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1583 #ifndef TUNNEL_SINK
1584 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1585 #endif
1586
1587 t = pa_tagstruct_new(NULL, 0);
1588 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1589 pa_tagstruct_putu32(t, tag = u->ctag++);
1590 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1591 pa_tagstruct_put_arbitrary(t, u->auth_cookie, sizeof(u->auth_cookie));
1592
1593 #ifdef HAVE_CREDS
1594 {
1595 pa_creds ucred;
1596
1597 if (pa_iochannel_creds_supported(io))
1598 pa_iochannel_creds_enable(io);
1599
1600 ucred.uid = getuid();
1601 ucred.gid = getgid();
1602
1603 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1604 }
1605 #else
1606 pa_pstream_send_tagstruct(u->pstream, t);
1607 #endif
1608
1609 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1610
1611 pa_log_debug("Connection established, authenticating ...");
1612 }
1613
1614 #ifdef TUNNEL_SINK
1615
1616 /* Called from main context */
1617 static int sink_set_volume(pa_sink *sink) {
1618 struct userdata *u;
1619 pa_tagstruct *t;
1620 uint32_t tag;
1621
1622 pa_assert(sink);
1623 u = sink->userdata;
1624 pa_assert(u);
1625
1626 t = pa_tagstruct_new(NULL, 0);
1627 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1628 pa_tagstruct_putu32(t, tag = u->ctag++);
1629 pa_tagstruct_putu32(t, u->device_index);
1630 pa_tagstruct_put_cvolume(t, &sink->volume);
1631 pa_pstream_send_tagstruct(u->pstream, t);
1632
1633 return 0;
1634 }
1635
1636 /* Called from main context */
1637 static int sink_set_mute(pa_sink *sink) {
1638 struct userdata *u;
1639 pa_tagstruct *t;
1640 uint32_t tag;
1641
1642 pa_assert(sink);
1643 u = sink->userdata;
1644 pa_assert(u);
1645
1646 if (u->version < 11)
1647 return -1;
1648
1649 t = pa_tagstruct_new(NULL, 0);
1650 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1651 pa_tagstruct_putu32(t, tag = u->ctag++);
1652 pa_tagstruct_putu32(t, u->device_index);
1653 pa_tagstruct_put_boolean(t, !!sink->muted);
1654 pa_pstream_send_tagstruct(u->pstream, t);
1655
1656 return 0;
1657 }
1658
1659 #endif
1660
1661 /* Called from main context */
1662 static int load_key(struct userdata *u, const char*fn) {
1663 pa_assert(u);
1664
1665 u->auth_cookie_in_property = FALSE;
1666
1667 if (!fn && pa_authkey_prop_get(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0) {
1668 pa_log_debug("Using already loaded auth cookie.");
1669 pa_authkey_prop_ref(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1670 u->auth_cookie_in_property = 1;
1671 return 0;
1672 }
1673
1674 if (!fn)
1675 fn = PA_NATIVE_COOKIE_FILE;
1676
1677 if (pa_authkey_load_auto(fn, u->auth_cookie, sizeof(u->auth_cookie)) < 0)
1678 return -1;
1679
1680 pa_log_debug("Loading cookie from disk.");
1681
1682 if (pa_authkey_prop_put(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0)
1683 u->auth_cookie_in_property = TRUE;
1684
1685 return 0;
1686 }
1687
1688 int pa__init(pa_module*m) {
1689 pa_modargs *ma = NULL;
1690 struct userdata *u = NULL;
1691 pa_sample_spec ss;
1692 pa_channel_map map;
1693 char *dn = NULL;
1694 #ifdef TUNNEL_SINK
1695 pa_sink_new_data data;
1696 #else
1697 pa_source_new_data data;
1698 #endif
1699
1700 pa_assert(m);
1701
1702 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1703 pa_log("Failed to parse module arguments");
1704 goto fail;
1705 }
1706
1707 m->userdata = u = pa_xnew0(struct userdata, 1);
1708 u->core = m->core;
1709 u->module = m;
1710 u->client = NULL;
1711 u->pdispatch = NULL;
1712 u->pstream = NULL;
1713 u->server_name = NULL;
1714 #ifdef TUNNEL_SINK
1715 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1716 u->sink = NULL;
1717 u->requested_bytes = 0;
1718 #else
1719 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1720 u->source = NULL;
1721 #endif
1722 u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
1723 u->ctag = 1;
1724 u->device_index = u->channel = PA_INVALID_INDEX;
1725 u->auth_cookie_in_property = FALSE;
1726 u->time_event = NULL;
1727 u->ignore_latency_before = 0;
1728 u->transport_usec = 0;
1729 u->transport_usec_valid = FALSE;
1730 u->remote_suspended = u->remote_corked = FALSE;
1731 u->counter = u->counter_delta = 0;
1732
1733 u->rtpoll = pa_rtpoll_new();
1734 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1735
1736 if (load_key(u, pa_modargs_get_value(ma, "cookie", NULL)) < 0)
1737 goto fail;
1738
1739 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1740 pa_log("No server specified.");
1741 goto fail;
1742 }
1743
1744 ss = m->core->default_sample_spec;
1745 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1746 pa_log("Invalid sample format specification");
1747 goto fail;
1748 }
1749
1750 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1751 pa_log("Failed to connect to server '%s'", u->server_name);
1752 goto fail;
1753 }
1754
1755 pa_socket_client_set_callback(u->client, on_connection, u);
1756
1757 #ifdef TUNNEL_SINK
1758
1759 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1760 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1761
1762 pa_sink_new_data_init(&data);
1763 data.driver = __FILE__;
1764 data.module = m;
1765 data.namereg_fail = TRUE;
1766 pa_sink_new_data_set_name(&data, dn);
1767 pa_sink_new_data_set_sample_spec(&data, &ss);
1768 pa_sink_new_data_set_channel_map(&data, &map);
1769 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1770 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1771 if (u->sink_name)
1772 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1773
1774 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL);
1775 pa_sink_new_data_done(&data);
1776
1777 if (!u->sink) {
1778 pa_log("Failed to create sink.");
1779 goto fail;
1780 }
1781
1782 u->sink->parent.process_msg = sink_process_msg;
1783 u->sink->userdata = u;
1784 u->sink->set_state = sink_set_state;
1785 u->sink->set_volume = sink_set_volume;
1786 u->sink->set_mute = sink_set_mute;
1787
1788 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1789
1790 pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0);
1791
1792 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1793 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1794
1795 #else
1796
1797 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1798 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1799
1800 pa_source_new_data_init(&data);
1801 data.driver = __FILE__;
1802 data.module = m;
1803 data.namereg_fail = TRUE;
1804 pa_source_new_data_set_name(&data, dn);
1805 pa_source_new_data_set_sample_spec(&data, &ss);
1806 pa_source_new_data_set_channel_map(&data, &map);
1807 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1808 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1809 if (u->source_name)
1810 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1811
1812 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1813 pa_source_new_data_done(&data);
1814
1815 if (!u->source) {
1816 pa_log("Failed to create source.");
1817 goto fail;
1818 }
1819
1820 u->source->parent.process_msg = source_process_msg;
1821 u->source->set_state = source_set_state;
1822 u->source->userdata = u;
1823
1824 pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0);
1825
1826 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1827 pa_source_set_rtpoll(u->source, u->rtpoll);
1828 #endif
1829
1830 pa_xfree(dn);
1831
1832 u->time_event = NULL;
1833
1834 u->maxlength = 0;
1835 #ifdef TUNNEL_SINK
1836 u->tlength = u->minreq = u->prebuf = 0;
1837 #else
1838 u->fragsize = 0;
1839 #endif
1840
1841 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1842
1843 if (!(u->thread = pa_thread_new(thread_func, u))) {
1844 pa_log("Failed to create thread.");
1845 goto fail;
1846 }
1847
1848 #ifdef TUNNEL_SINK
1849 pa_sink_put(u->sink);
1850 #else
1851 pa_source_put(u->source);
1852 #endif
1853
1854 pa_modargs_free(ma);
1855
1856 return 0;
1857
1858 fail:
1859 pa__done(m);
1860
1861 if (ma)
1862 pa_modargs_free(ma);
1863
1864 pa_xfree(dn);
1865
1866 return -1;
1867 }
1868
1869 void pa__done(pa_module*m) {
1870 struct userdata* u;
1871
1872 pa_assert(m);
1873
1874 if (!(u = m->userdata))
1875 return;
1876
1877 #ifdef TUNNEL_SINK
1878 if (u->sink)
1879 pa_sink_unlink(u->sink);
1880 #else
1881 if (u->source)
1882 pa_source_unlink(u->source);
1883 #endif
1884
1885 if (u->thread) {
1886 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1887 pa_thread_free(u->thread);
1888 }
1889
1890 pa_thread_mq_done(&u->thread_mq);
1891
1892 #ifdef TUNNEL_SINK
1893 if (u->sink)
1894 pa_sink_unref(u->sink);
1895 #else
1896 if (u->source)
1897 pa_source_unref(u->source);
1898 #endif
1899
1900 if (u->rtpoll)
1901 pa_rtpoll_free(u->rtpoll);
1902
1903 if (u->pstream) {
1904 pa_pstream_unlink(u->pstream);
1905 pa_pstream_unref(u->pstream);
1906 }
1907
1908 if (u->pdispatch)
1909 pa_pdispatch_unref(u->pdispatch);
1910
1911 if (u->client)
1912 pa_socket_client_unref(u->client);
1913
1914 if (u->auth_cookie_in_property)
1915 pa_authkey_prop_unref(m->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1916
1917 if (u->smoother)
1918 pa_smoother_free(u->smoother);
1919
1920 if (u->time_event)
1921 u->core->mainloop->time_free(u->time_event);
1922
1923 #ifdef TUNNEL_SINK
1924 pa_xfree(u->sink_name);
1925 #else
1926 pa_xfree(u->source_name);
1927 #endif
1928 pa_xfree(u->server_name);
1929
1930 pa_xfree(u->device_description);
1931 pa_xfree(u->server_fqdn);
1932 pa_xfree(u->user_name);
1933
1934 pa_xfree(u);
1935 }