]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
introduce default channel map in addition to the default sample spec
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "server=<address> "
68 "sink=<remote sink name> "
69 "cookie=<filename> "
70 "format=<sample format> "
71 "channels=<number of channels> "
72 "rate=<sample rate> "
73 "sink_name=<name for the local sink> "
74 "channel_map=<channel map>");
75 #else
76 PA_MODULE_DESCRIPTION("Tunnel module for sources");
77 PA_MODULE_USAGE(
78 "server=<address> "
79 "source=<remote source name> "
80 "cookie=<filename> "
81 "format=<sample format> "
82 "channels=<number of channels> "
83 "rate=<sample rate> "
84 "source_name=<name for the local source> "
85 "channel_map=<channel map>");
86 #endif
87
88 PA_MODULE_AUTHOR("Lennart Poettering");
89 PA_MODULE_VERSION(PACKAGE_VERSION);
90 PA_MODULE_LOAD_ONCE(FALSE);
91
/* NULL-terminated list of the argument names this module understands.
 * The sink and source builds differ only in the two device-name keys. */
static const char* const valid_modargs[] = {
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink",
#else
    "source_name",
    "source",
#endif
    "channel_map",
    NULL,
};
108
109 #define DEFAULT_TIMEOUT 5
110
111 #define LATENCY_INTERVAL 10
112
113 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
114
115 #ifdef TUNNEL_SINK
116
117 enum {
118 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
119 SINK_MESSAGE_REMOTE_SUSPEND,
120 SINK_MESSAGE_UPDATE_LATENCY,
121 SINK_MESSAGE_POST
122 };
123
124 #define DEFAULT_TLENGTH_MSEC 150
125 #define DEFAULT_MINREQ_MSEC 25
126
127 #else
128
129 enum {
130 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
131 SOURCE_MESSAGE_REMOTE_SUSPEND,
132 SOURCE_MESSAGE_UPDATE_LATENCY
133 };
134
135 #define DEFAULT_FRAGSIZE_MSEC 25
136
137 #endif
138
139 #ifdef TUNNEL_SINK
140 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
141 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
142 #endif
143 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
144 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148
149 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
150 #ifdef TUNNEL_SINK
151 [PA_COMMAND_REQUEST] = command_request,
152 [PA_COMMAND_STARTED] = command_started,
153 #endif
154 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
155 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
156 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
157 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
158 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
159 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
160 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
161 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
162 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved
163 };
164
165 struct userdata {
166 pa_core *core;
167 pa_module *module;
168
169 pa_thread_mq thread_mq;
170 pa_rtpoll *rtpoll;
171 pa_thread *thread;
172
173 pa_socket_client *client;
174 pa_pstream *pstream;
175 pa_pdispatch *pdispatch;
176
177 char *server_name;
178 #ifdef TUNNEL_SINK
179 char *sink_name;
180 pa_sink *sink;
181 size_t requested_bytes;
182 #else
183 char *source_name;
184 pa_source *source;
185 #endif
186
187 pa_auth_cookie *auth_cookie;
188
189 uint32_t version;
190 uint32_t ctag;
191 uint32_t device_index;
192 uint32_t channel;
193
194 int64_t counter, counter_delta;
195
196 pa_bool_t remote_corked:1;
197 pa_bool_t remote_suspended:1;
198
199 pa_usec_t transport_usec;
200 pa_bool_t transport_usec_valid;
201
202 uint32_t ignore_latency_before;
203
204 pa_time_event *time_event;
205
206 pa_smoother *smoother;
207
208 char *device_description;
209 char *server_fqdn;
210 char *user_name;
211
212 uint32_t maxlength;
213 #ifdef TUNNEL_SINK
214 uint32_t tlength;
215 uint32_t minreq;
216 uint32_t prebuf;
217 #else
218 uint32_t fragsize;
219 #endif
220 };
221
222 static void request_latency(struct userdata *u);
223
224 /* Called from main context */
225 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
226 struct userdata *u = userdata;
227
228 pa_assert(pd);
229 pa_assert(t);
230 pa_assert(u);
231 pa_assert(u->pdispatch == pd);
232
233 pa_log_warn("Stream killed");
234 pa_module_unload_request(u->module, TRUE);
235 }
236
237 /* Called from main context */
238 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
239 struct userdata *u = userdata;
240
241 pa_assert(pd);
242 pa_assert(t);
243 pa_assert(u);
244 pa_assert(u->pdispatch == pd);
245
246 pa_log_info("Server signalled buffer overrun/underrun.");
247 request_latency(u);
248 }
249
250 /* Called from main context */
251 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
252 struct userdata *u = userdata;
253 uint32_t channel;
254 pa_bool_t suspended;
255
256 pa_assert(pd);
257 pa_assert(t);
258 pa_assert(u);
259 pa_assert(u->pdispatch == pd);
260
261 if (pa_tagstruct_getu32(t, &channel) < 0 ||
262 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
263 !pa_tagstruct_eof(t)) {
264 pa_log("Invalid packet");
265 pa_module_unload_request(u->module, TRUE);
266 return;
267 }
268
269 #ifdef TUNNEL_SINK
270 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
271 #else
272 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
273 #endif
274
275 request_latency(u);
276 }
277
278 /* Called from main context */
279 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
280 struct userdata *u = userdata;
281
282 pa_assert(pd);
283 pa_assert(t);
284 pa_assert(u);
285 pa_assert(u->pdispatch == pd);
286
287 pa_log_debug("Server reports a stream move.");
288 request_latency(u);
289 }
290
291 #ifdef TUNNEL_SINK
292
293 /* Called from main context */
294 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
295 struct userdata *u = userdata;
296
297 pa_assert(pd);
298 pa_assert(t);
299 pa_assert(u);
300 pa_assert(u->pdispatch == pd);
301
302 pa_log_debug("Server reports playback started.");
303 request_latency(u);
304 }
305
306 #endif
307
308 /* Called from IO thread context */
309 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
310 pa_usec_t x;
311 pa_assert(u);
312
313 if (u->remote_corked == cork)
314 return;
315
316 u->remote_corked = cork;
317 x = pa_rtclock_usec();
318
319 /* Correct by the time this needs to travel to the other side.
320 * This is a valid thread-safe access, because the main thread is
321 * waiting for us */
322 if (u->transport_usec_valid)
323 x += u->transport_usec;
324
325 if (u->remote_suspended || u->remote_corked)
326 pa_smoother_pause(u->smoother, x);
327 else
328 pa_smoother_resume(u->smoother, x);
329 }
330
331 /* Called from main context */
332 static void stream_cork(struct userdata *u, pa_bool_t cork) {
333 pa_tagstruct *t;
334 pa_assert(u);
335
336 if (!u->pstream)
337 return;
338
339 t = pa_tagstruct_new(NULL, 0);
340 #ifdef TUNNEL_SINK
341 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
342 #else
343 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
344 #endif
345 pa_tagstruct_putu32(t, u->ctag++);
346 pa_tagstruct_putu32(t, u->channel);
347 pa_tagstruct_put_boolean(t, !!cork);
348 pa_pstream_send_tagstruct(u->pstream, t);
349
350 request_latency(u);
351 }
352
353 /* Called from IO thread context */
354 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
355 pa_usec_t x;
356 pa_assert(u);
357
358 if (u->remote_suspended == suspend)
359 return;
360
361 u->remote_suspended = suspend;
362
363 x = pa_rtclock_usec();
364
365 /* Correct by the time this needed to travel from the other side.
366 * This is a valid thread-safe access, because the main thread is
367 * waiting for us */
368 if (u->transport_usec_valid)
369 x -= u->transport_usec;
370
371 if (u->remote_suspended || u->remote_corked)
372 pa_smoother_pause(u->smoother, x);
373 else
374 pa_smoother_resume(u->smoother, x);
375 }
376
377 #ifdef TUNNEL_SINK
378
379 /* Called from IO thread context */
380 static void send_data(struct userdata *u) {
381 pa_assert(u);
382
383 while (u->requested_bytes > 0) {
384 pa_memchunk memchunk;
385
386 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
387 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
388 pa_memblock_unref(memchunk.memblock);
389
390 u->requested_bytes -= memchunk.length;
391
392 u->counter += (int64_t) memchunk.length;
393 }
394 }
395
396 /* This function is called from IO context -- except when it is not. */
397 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
398 struct userdata *u = PA_SINK(o)->userdata;
399
400 switch (code) {
401
402 case PA_SINK_MESSAGE_SET_STATE: {
403 int r;
404
405 /* First, change the state, because otherwide pa_sink_render() would fail */
406 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
407
408 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
409
410 if (PA_SINK_IS_OPENED(u->sink->state))
411 send_data(u);
412 }
413
414 return r;
415 }
416
417 case PA_SINK_MESSAGE_GET_LATENCY: {
418 pa_usec_t yl, yr, *usec = data;
419
420 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
421 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
422
423 *usec = yl > yr ? yl - yr : 0;
424 return 0;
425 }
426
427 case SINK_MESSAGE_REQUEST:
428
429 pa_assert(offset > 0);
430 u->requested_bytes += (size_t) offset;
431
432 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
433 send_data(u);
434
435 return 0;
436
437
438 case SINK_MESSAGE_REMOTE_SUSPEND:
439
440 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
441 return 0;
442
443
444 case SINK_MESSAGE_UPDATE_LATENCY: {
445 pa_usec_t y;
446
447 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
448
449 if (y > (pa_usec_t) offset || offset < 0)
450 y -= (pa_usec_t) offset;
451 else
452 y = 0;
453
454 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
455
456 return 0;
457 }
458
459 case SINK_MESSAGE_POST:
460
461 /* OK, This might be a bit confusing. This message is
462 * delivered to us from the main context -- NOT from the
463 * IO thread context where the rest of the messages are
464 * dispatched. Yeah, ugly, but I am a lazy bastard. */
465
466 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
467
468 u->counter_delta += (int64_t) chunk->length;
469
470 return 0;
471 }
472
473 return pa_sink_process_msg(o, code, data, offset, chunk);
474 }
475
476 /* Called from main context */
477 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
478 struct userdata *u;
479 pa_sink_assert_ref(s);
480 u = s->userdata;
481
482 switch ((pa_sink_state_t) state) {
483
484 case PA_SINK_SUSPENDED:
485 pa_assert(PA_SINK_IS_OPENED(s->state));
486 stream_cork(u, TRUE);
487 break;
488
489 case PA_SINK_IDLE:
490 case PA_SINK_RUNNING:
491 if (s->state == PA_SINK_SUSPENDED)
492 stream_cork(u, FALSE);
493 break;
494
495 case PA_SINK_UNLINKED:
496 case PA_SINK_INIT:
497 case PA_SINK_INVALID_STATE:
498 ;
499 }
500
501 return 0;
502 }
503
504 #else
505
506 /* This function is called from IO context -- except when it is not. */
507 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
508 struct userdata *u = PA_SOURCE(o)->userdata;
509
510 switch (code) {
511
512 case PA_SOURCE_MESSAGE_SET_STATE: {
513 int r;
514
515 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
516 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
517
518 return r;
519 }
520
521 case PA_SOURCE_MESSAGE_GET_LATENCY: {
522 pa_usec_t yr, yl, *usec = data;
523
524 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
525 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
526
527 *usec = yr > yl ? yr - yl : 0;
528 return 0;
529 }
530
531 case SOURCE_MESSAGE_POST:
532
533 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
534 pa_source_post(u->source, chunk);
535
536 u->counter += (int64_t) chunk->length;
537
538 return 0;
539
540 case SOURCE_MESSAGE_REMOTE_SUSPEND:
541
542 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
543 return 0;
544
545 case SOURCE_MESSAGE_UPDATE_LATENCY: {
546 pa_usec_t y;
547
548 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
549
550 if (offset >= 0 || y > (pa_usec_t) -offset)
551 y += (pa_usec_t) offset;
552 else
553 y = 0;
554
555 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
556
557 return 0;
558 }
559 }
560
561 return pa_source_process_msg(o, code, data, offset, chunk);
562 }
563
564 /* Called from main context */
565 static int source_set_state(pa_source *s, pa_source_state_t state) {
566 struct userdata *u;
567 pa_source_assert_ref(s);
568 u = s->userdata;
569
570 switch ((pa_source_state_t) state) {
571
572 case PA_SOURCE_SUSPENDED:
573 pa_assert(PA_SOURCE_IS_OPENED(s->state));
574 stream_cork(u, TRUE);
575 break;
576
577 case PA_SOURCE_IDLE:
578 case PA_SOURCE_RUNNING:
579 if (s->state == PA_SOURCE_SUSPENDED)
580 stream_cork(u, FALSE);
581 break;
582
583 case PA_SOURCE_UNLINKED:
584 case PA_SOURCE_INIT:
585 case PA_SINK_INVALID_STATE:
586 ;
587 }
588
589 return 0;
590 }
591
592 #endif
593
594 static void thread_func(void *userdata) {
595 struct userdata *u = userdata;
596
597 pa_assert(u);
598
599 pa_log_debug("Thread starting up");
600
601 pa_thread_mq_install(&u->thread_mq);
602 pa_rtpoll_install(u->rtpoll);
603
604 for (;;) {
605 int ret;
606
607 #ifdef TUNNEL_SINK
608 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
609 if (u->sink->thread_info.rewind_requested)
610 pa_sink_process_rewind(u->sink, 0);
611 #endif
612
613 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
614 goto fail;
615
616 if (ret == 0)
617 goto finish;
618 }
619
620 fail:
621 /* If this was no regular exit from the loop we have to continue
622 * processing messages until we received PA_MESSAGE_SHUTDOWN */
623 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
624 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
625
626 finish:
627 pa_log_debug("Thread shutting down");
628 }
629
630 #ifdef TUNNEL_SINK
631 /* Called from main context */
632 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
633 struct userdata *u = userdata;
634 uint32_t bytes, channel;
635
636 pa_assert(pd);
637 pa_assert(command == PA_COMMAND_REQUEST);
638 pa_assert(t);
639 pa_assert(u);
640 pa_assert(u->pdispatch == pd);
641
642 if (pa_tagstruct_getu32(t, &channel) < 0 ||
643 pa_tagstruct_getu32(t, &bytes) < 0) {
644 pa_log("Invalid protocol reply");
645 goto fail;
646 }
647
648 if (channel != u->channel) {
649 pa_log("Recieved data for invalid channel");
650 goto fail;
651 }
652
653 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
654 return;
655
656 fail:
657 pa_module_unload_request(u->module, TRUE);
658 }
659
660 #endif
661
662 /* Called from main context */
663 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
664 struct userdata *u = userdata;
665 pa_usec_t sink_usec, source_usec, transport_usec = 0;
666 pa_bool_t playing;
667 int64_t write_index, read_index;
668 struct timeval local, remote, now;
669 pa_sample_spec *ss;
670 int64_t delay;
671
672 pa_assert(pd);
673 pa_assert(u);
674
675 if (command != PA_COMMAND_REPLY) {
676 if (command == PA_COMMAND_ERROR)
677 pa_log("Failed to get latency.");
678 else
679 pa_log("Protocol error.");
680 goto fail;
681 }
682
683 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
684 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
685 pa_tagstruct_get_boolean(t, &playing) < 0 ||
686 pa_tagstruct_get_timeval(t, &local) < 0 ||
687 pa_tagstruct_get_timeval(t, &remote) < 0 ||
688 pa_tagstruct_gets64(t, &write_index) < 0 ||
689 pa_tagstruct_gets64(t, &read_index) < 0) {
690 pa_log("Invalid reply.");
691 goto fail;
692 }
693
694 #ifdef TUNNEL_SINK
695 if (u->version >= 13) {
696 uint64_t underrun_for = 0, playing_for = 0;
697
698 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
699 pa_tagstruct_getu64(t, &playing_for) < 0) {
700 pa_log("Invalid reply.");
701 goto fail;
702 }
703 }
704 #endif
705
706 if (!pa_tagstruct_eof(t)) {
707 pa_log("Invalid reply.");
708 goto fail;
709 }
710
711 if (tag < u->ignore_latency_before) {
712 request_latency(u);
713 return;
714 }
715
716 pa_gettimeofday(&now);
717
718 /* Calculate transport usec */
719 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
720 /* local and remote seem to have synchronized clocks */
721 #ifdef TUNNEL_SINK
722 u->transport_usec = pa_timeval_diff(&remote, &local);
723 #else
724 u->transport_usec = pa_timeval_diff(&now, &remote);
725 #endif
726 } else
727 u->transport_usec = pa_timeval_diff(&now, &local)/2;
728 u->transport_usec_valid = TRUE;
729
730 /* First, take the device's delay */
731 #ifdef TUNNEL_SINK
732 delay = (int64_t) sink_usec;
733 ss = &u->sink->sample_spec;
734 #else
735 delay = (int64_t) source_usec;
736 ss = &u->source->sample_spec;
737 #endif
738
739 /* Add the length of our server-side buffer */
740 if (write_index >= read_index)
741 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
742 else
743 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
744
745 /* Our measurements are already out of date, hence correct by the *
746 * transport latency */
747 #ifdef TUNNEL_SINK
748 delay -= (int64_t) transport_usec;
749 #else
750 delay += (int64_t) transport_usec;
751 #endif
752
753 /* Now correct by what we have have read/written since we requested the update */
754 #ifdef TUNNEL_SINK
755 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
756 #else
757 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
758 #endif
759
760 #ifdef TUNNEL_SINK
761 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
762 #else
763 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
764 #endif
765
766 return;
767
768 fail:
769
770 pa_module_unload_request(u->module, TRUE);
771 }
772
773 /* Called from main context */
774 static void request_latency(struct userdata *u) {
775 pa_tagstruct *t;
776 struct timeval now;
777 uint32_t tag;
778 pa_assert(u);
779
780 t = pa_tagstruct_new(NULL, 0);
781 #ifdef TUNNEL_SINK
782 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
783 #else
784 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
785 #endif
786 pa_tagstruct_putu32(t, tag = u->ctag++);
787 pa_tagstruct_putu32(t, u->channel);
788
789 pa_gettimeofday(&now);
790 pa_tagstruct_put_timeval(t, &now);
791
792 pa_pstream_send_tagstruct(u->pstream, t);
793 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
794
795 u->ignore_latency_before = tag;
796 u->counter_delta = 0;
797 }
798
799 /* Called from main context */
800 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
801 struct userdata *u = userdata;
802 struct timeval ntv;
803
804 pa_assert(m);
805 pa_assert(e);
806 pa_assert(u);
807
808 request_latency(u);
809
810 pa_gettimeofday(&ntv);
811 ntv.tv_sec += LATENCY_INTERVAL;
812 m->time_restart(e, &ntv);
813 }
814
815 /* Called from main context */
816 static void update_description(struct userdata *u) {
817 char *d;
818 char un[128], hn[128];
819 pa_tagstruct *t;
820
821 pa_assert(u);
822
823 if (!u->server_fqdn || !u->user_name || !u->device_description)
824 return;
825
826 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
827
828 #ifdef TUNNEL_SINK
829 pa_sink_set_description(u->sink, d);
830 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
831 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
832 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
833 #else
834 pa_source_set_description(u->source, d);
835 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
836 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
837 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
838 #endif
839
840 pa_xfree(d);
841
842 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
843 pa_get_user_name(un, sizeof(un)),
844 pa_get_host_name(hn, sizeof(hn)));
845
846 t = pa_tagstruct_new(NULL, 0);
847 #ifdef TUNNEL_SINK
848 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
849 #else
850 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
851 #endif
852 pa_tagstruct_putu32(t, u->ctag++);
853 pa_tagstruct_putu32(t, u->channel);
854 pa_tagstruct_puts(t, d);
855 pa_pstream_send_tagstruct(u->pstream, t);
856
857 pa_xfree(d);
858 }
859
860 /* Called from main context */
861 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
862 struct userdata *u = userdata;
863 pa_sample_spec ss;
864 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
865 uint32_t cookie;
866
867 pa_assert(pd);
868 pa_assert(u);
869
870 if (command != PA_COMMAND_REPLY) {
871 if (command == PA_COMMAND_ERROR)
872 pa_log("Failed to get info.");
873 else
874 pa_log("Protocol error.");
875 goto fail;
876 }
877
878 if (pa_tagstruct_gets(t, &server_name) < 0 ||
879 pa_tagstruct_gets(t, &server_version) < 0 ||
880 pa_tagstruct_gets(t, &user_name) < 0 ||
881 pa_tagstruct_gets(t, &host_name) < 0 ||
882 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
883 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
884 pa_tagstruct_gets(t, &default_source_name) < 0 ||
885 pa_tagstruct_getu32(t, &cookie) < 0) {
886
887 pa_log("Parse failure");
888 goto fail;
889 }
890
891 if (!pa_tagstruct_eof(t)) {
892 pa_log("Packet too long");
893 goto fail;
894 }
895
896 pa_xfree(u->server_fqdn);
897 u->server_fqdn = pa_xstrdup(host_name);
898
899 pa_xfree(u->user_name);
900 u->user_name = pa_xstrdup(user_name);
901
902 update_description(u);
903
904 return;
905
906 fail:
907 pa_module_unload_request(u->module, TRUE);
908 }
909
910 #ifdef TUNNEL_SINK
911
912 /* Called from main context */
913 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
914 struct userdata *u = userdata;
915 uint32_t idx, owner_module, monitor_source, flags;
916 const char *name, *description, *monitor_source_name, *driver;
917 pa_sample_spec ss;
918 pa_channel_map cm;
919 pa_cvolume volume;
920 pa_bool_t mute;
921 pa_usec_t latency;
922 pa_proplist *pl;
923
924 pa_assert(pd);
925 pa_assert(u);
926
927 pl = pa_proplist_new();
928
929 if (command != PA_COMMAND_REPLY) {
930 if (command == PA_COMMAND_ERROR)
931 pa_log("Failed to get info.");
932 else
933 pa_log("Protocol error.");
934 goto fail;
935 }
936
937 if (pa_tagstruct_getu32(t, &idx) < 0 ||
938 pa_tagstruct_gets(t, &name) < 0 ||
939 pa_tagstruct_gets(t, &description) < 0 ||
940 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
941 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
942 pa_tagstruct_getu32(t, &owner_module) < 0 ||
943 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
944 pa_tagstruct_get_boolean(t, &mute) < 0 ||
945 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
946 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
947 pa_tagstruct_get_usec(t, &latency) < 0 ||
948 pa_tagstruct_gets(t, &driver) < 0 ||
949 pa_tagstruct_getu32(t, &flags) < 0) {
950
951 pa_log("Parse failure");
952 goto fail;
953 }
954
955 if (u->version >= 13) {
956 pa_usec_t configured_latency;
957
958 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
959 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
960
961 pa_log("Parse failure");
962 goto fail;
963 }
964 }
965
966 if (!pa_tagstruct_eof(t)) {
967 pa_log("Packet too long");
968 goto fail;
969 }
970
971 pa_proplist_free(pl);
972
973 if (!u->sink_name || strcmp(name, u->sink_name))
974 return;
975
976 pa_xfree(u->device_description);
977 u->device_description = pa_xstrdup(description);
978
979 update_description(u);
980
981 return;
982
983 fail:
984 pa_module_unload_request(u->module, TRUE);
985 pa_proplist_free(pl);
986 }
987
988 /* Called from main context */
989 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
990 struct userdata *u = userdata;
991 uint32_t idx, owner_module, client, sink;
992 pa_usec_t buffer_usec, sink_usec;
993 const char *name, *driver, *resample_method;
994 pa_bool_t mute;
995 pa_sample_spec sample_spec;
996 pa_channel_map channel_map;
997 pa_cvolume volume;
998 pa_proplist *pl;
999
1000 pa_assert(pd);
1001 pa_assert(u);
1002
1003 pl = pa_proplist_new();
1004
1005 if (command != PA_COMMAND_REPLY) {
1006 if (command == PA_COMMAND_ERROR)
1007 pa_log("Failed to get info.");
1008 else
1009 pa_log("Protocol error.");
1010 goto fail;
1011 }
1012
1013 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1014 pa_tagstruct_gets(t, &name) < 0 ||
1015 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1016 pa_tagstruct_getu32(t, &client) < 0 ||
1017 pa_tagstruct_getu32(t, &sink) < 0 ||
1018 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1019 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1020 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1021 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1022 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1023 pa_tagstruct_gets(t, &resample_method) < 0 ||
1024 pa_tagstruct_gets(t, &driver) < 0) {
1025
1026 pa_log("Parse failure");
1027 goto fail;
1028 }
1029
1030 if (u->version >= 11) {
1031 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1032
1033 pa_log("Parse failure");
1034 goto fail;
1035 }
1036 }
1037
1038 if (u->version >= 13) {
1039 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1040
1041 pa_log("Parse failure");
1042 goto fail;
1043 }
1044 }
1045
1046 if (!pa_tagstruct_eof(t)) {
1047 pa_log("Packet too long");
1048 goto fail;
1049 }
1050
1051 pa_proplist_free(pl);
1052
1053 if (idx != u->device_index)
1054 return;
1055
1056 pa_assert(u->sink);
1057
1058 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1059 pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1060 return;
1061
1062 memcpy(&u->sink->virtual_volume, &volume, sizeof(pa_cvolume));
1063
1064 if (u->version >= 11)
1065 u->sink->muted = !!mute;
1066
1067 pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);
1068 return;
1069
1070 fail:
1071 pa_module_unload_request(u->module, TRUE);
1072 pa_proplist_free(pl);
1073 }
1074
1075 #else
1076
1077 /* Called from main context */
1078 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1079 struct userdata *u = userdata;
1080 uint32_t idx, owner_module, monitor_of_sink, flags;
1081 const char *name, *description, *monitor_of_sink_name, *driver;
1082 pa_sample_spec ss;
1083 pa_channel_map cm;
1084 pa_cvolume volume;
1085 pa_bool_t mute;
1086 pa_usec_t latency, configured_latency;
1087 pa_proplist *pl;
1088
1089 pa_assert(pd);
1090 pa_assert(u);
1091
1092 pl = pa_proplist_new();
1093
1094 if (command != PA_COMMAND_REPLY) {
1095 if (command == PA_COMMAND_ERROR)
1096 pa_log("Failed to get info.");
1097 else
1098 pa_log("Protocol error.");
1099 goto fail;
1100 }
1101
1102 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1103 pa_tagstruct_gets(t, &name) < 0 ||
1104 pa_tagstruct_gets(t, &description) < 0 ||
1105 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1106 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1107 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1108 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1109 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1110 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1111 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1112 pa_tagstruct_get_usec(t, &latency) < 0 ||
1113 pa_tagstruct_gets(t, &driver) < 0 ||
1114 pa_tagstruct_getu32(t, &flags) < 0) {
1115
1116 pa_log("Parse failure");
1117 goto fail;
1118 }
1119
1120 if (u->version >= 13) {
1121 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1122 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1123
1124 pa_log("Parse failure");
1125 goto fail;
1126 }
1127 }
1128
1129 if (!pa_tagstruct_eof(t)) {
1130 pa_log("Packet too long");
1131 goto fail;
1132 }
1133
1134 pa_proplist_free(pl);
1135
1136 if (!u->source_name || strcmp(name, u->source_name))
1137 return;
1138
1139 pa_xfree(u->device_description);
1140 u->device_description = pa_xstrdup(description);
1141
1142 update_description(u);
1143
1144 return;
1145
1146 fail:
1147 pa_module_unload_request(u->module, TRUE);
1148 pa_proplist_free(pl);
1149 }
1150
1151 #endif
1152
1153 /* Called from main context */
1154 static void request_info(struct userdata *u) {
1155 pa_tagstruct *t;
1156 uint32_t tag;
1157 pa_assert(u);
1158
1159 t = pa_tagstruct_new(NULL, 0);
1160 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1161 pa_tagstruct_putu32(t, tag = u->ctag++);
1162 pa_pstream_send_tagstruct(u->pstream, t);
1163 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1164
1165 #ifdef TUNNEL_SINK
1166 t = pa_tagstruct_new(NULL, 0);
1167 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1168 pa_tagstruct_putu32(t, tag = u->ctag++);
1169 pa_tagstruct_putu32(t, u->device_index);
1170 pa_pstream_send_tagstruct(u->pstream, t);
1171 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1172
1173 if (u->sink_name) {
1174 t = pa_tagstruct_new(NULL, 0);
1175 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1176 pa_tagstruct_putu32(t, tag = u->ctag++);
1177 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1178 pa_tagstruct_puts(t, u->sink_name);
1179 pa_pstream_send_tagstruct(u->pstream, t);
1180 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1181 }
1182 #else
1183 if (u->source_name) {
1184 t = pa_tagstruct_new(NULL, 0);
1185 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1186 pa_tagstruct_putu32(t, tag = u->ctag++);
1187 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1188 pa_tagstruct_puts(t, u->source_name);
1189 pa_pstream_send_tagstruct(u->pstream, t);
1190 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1191 }
1192 #endif
1193 }
1194
1195 /* Called from main context */
1196 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1197 struct userdata *u = userdata;
1198 pa_subscription_event_type_t e;
1199 uint32_t idx;
1200
1201 pa_assert(pd);
1202 pa_assert(t);
1203 pa_assert(u);
1204 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1205
1206 if (pa_tagstruct_getu32(t, &e) < 0 ||
1207 pa_tagstruct_getu32(t, &idx) < 0) {
1208 pa_log("Invalid protocol reply");
1209 pa_module_unload_request(u->module, TRUE);
1210 return;
1211 }
1212
1213 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1214 #ifdef TUNNEL_SINK
1215 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1216 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1217 #else
1218 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1219 #endif
1220 )
1221 return;
1222
1223 request_info(u);
1224 }
1225
1226 /* Called from main context */
1227 static void start_subscribe(struct userdata *u) {
1228 pa_tagstruct *t;
1229 uint32_t tag;
1230 pa_assert(u);
1231
1232 t = pa_tagstruct_new(NULL, 0);
1233 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1234 pa_tagstruct_putu32(t, tag = u->ctag++);
1235 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1236 #ifdef TUNNEL_SINK
1237 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1238 #else
1239 PA_SUBSCRIPTION_MASK_SOURCE
1240 #endif
1241 );
1242
1243 pa_pstream_send_tagstruct(u->pstream, t);
1244 }
1245
1246 /* Called from main context */
1247 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1248 struct userdata *u = userdata;
1249 struct timeval ntv;
1250 #ifdef TUNNEL_SINK
1251 uint32_t bytes;
1252 #endif
1253
1254 pa_assert(pd);
1255 pa_assert(u);
1256 pa_assert(u->pdispatch == pd);
1257
1258 if (command != PA_COMMAND_REPLY) {
1259 if (command == PA_COMMAND_ERROR)
1260 pa_log("Failed to create stream.");
1261 else
1262 pa_log("Protocol error.");
1263 goto fail;
1264 }
1265
1266 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1267 pa_tagstruct_getu32(t, &u->device_index) < 0
1268 #ifdef TUNNEL_SINK
1269 || pa_tagstruct_getu32(t, &bytes) < 0
1270 #endif
1271 )
1272 goto parse_error;
1273
1274 if (u->version >= 9) {
1275 #ifdef TUNNEL_SINK
1276 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1277 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1278 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1279 pa_tagstruct_getu32(t, &u->minreq) < 0)
1280 goto parse_error;
1281 #else
1282 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1283 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1284 goto parse_error;
1285 #endif
1286 }
1287
1288 if (u->version >= 12) {
1289 pa_sample_spec ss;
1290 pa_channel_map cm;
1291 uint32_t device_index;
1292 const char *dn;
1293 pa_bool_t suspended;
1294
1295 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1296 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1297 pa_tagstruct_getu32(t, &device_index) < 0 ||
1298 pa_tagstruct_gets(t, &dn) < 0 ||
1299 pa_tagstruct_get_boolean(t, &suspended) < 0)
1300 goto parse_error;
1301
1302 #ifdef TUNNEL_SINK
1303 pa_xfree(u->sink_name);
1304 u->sink_name = pa_xstrdup(dn);
1305 #else
1306 pa_xfree(u->source_name);
1307 u->source_name = pa_xstrdup(dn);
1308 #endif
1309 }
1310
1311 if (u->version >= 13) {
1312 pa_usec_t usec;
1313
1314 if (pa_tagstruct_get_usec(t, &usec) < 0)
1315 goto parse_error;
1316
1317 #ifdef TUNNEL_SINK
1318 pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0);
1319 #else
1320 pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0);
1321 #endif
1322 }
1323
1324 if (!pa_tagstruct_eof(t))
1325 goto parse_error;
1326
1327 start_subscribe(u);
1328 request_info(u);
1329
1330 pa_assert(!u->time_event);
1331 pa_gettimeofday(&ntv);
1332 ntv.tv_sec += LATENCY_INTERVAL;
1333 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1334
1335 request_latency(u);
1336
1337 pa_log_debug("Stream created.");
1338
1339 #ifdef TUNNEL_SINK
1340 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1341 #endif
1342
1343 return;
1344
1345 parse_error:
1346 pa_log("Invalid reply. (Create stream)");
1347
1348 fail:
1349 pa_module_unload_request(u->module, TRUE);
1350
1351 }
1352
1353 /* Called from main context */
1354 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1355 struct userdata *u = userdata;
1356 pa_tagstruct *reply;
1357 char name[256], un[128], hn[128];
1358 #ifdef TUNNEL_SINK
1359 pa_cvolume volume;
1360 #endif
1361
1362 pa_assert(pd);
1363 pa_assert(u);
1364 pa_assert(u->pdispatch == pd);
1365
1366 if (command != PA_COMMAND_REPLY ||
1367 pa_tagstruct_getu32(t, &u->version) < 0 ||
1368 !pa_tagstruct_eof(t)) {
1369
1370 if (command == PA_COMMAND_ERROR)
1371 pa_log("Failed to authenticate");
1372 else
1373 pa_log("Protocol error.");
1374
1375 goto fail;
1376 }
1377
1378 /* Minimum supported protocol version */
1379 if (u->version < 8) {
1380 pa_log("Incompatible protocol version");
1381 goto fail;
1382 }
1383
1384 /* Starting with protocol version 13 the MSB of the version tag
1385 reflects if shm is enabled for this connection or not. We don't
1386 support SHM here at all, so we just ignore this. */
1387
1388 if (u->version >= 13)
1389 u->version &= 0x7FFFFFFFU;
1390
1391 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1392
1393 #ifdef TUNNEL_SINK
1394 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1395 u->sink_name,
1396 pa_get_user_name(un, sizeof(un)),
1397 pa_get_host_name(hn, sizeof(hn)));
1398 #else
1399 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1400 u->source_name,
1401 pa_get_user_name(un, sizeof(un)),
1402 pa_get_host_name(hn, sizeof(hn)));
1403 #endif
1404
1405 reply = pa_tagstruct_new(NULL, 0);
1406 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1407 pa_tagstruct_putu32(reply, tag = u->ctag++);
1408
1409 if (u->version >= 13) {
1410 pa_proplist *pl;
1411 pl = pa_proplist_new();
1412 pa_init_proplist(pl);
1413 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1414 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1415 pa_tagstruct_put_proplist(reply, pl);
1416 pa_proplist_free(pl);
1417 } else
1418 pa_tagstruct_puts(reply, "PulseAudio");
1419
1420 pa_pstream_send_tagstruct(u->pstream, reply);
1421 /* We ignore the server's reply here */
1422
1423 reply = pa_tagstruct_new(NULL, 0);
1424
1425 if (u->version < 13)
1426 /* Only for older PA versions we need to fill in the maxlength */
1427 u->maxlength = 4*1024*1024;
1428
1429 #ifdef TUNNEL_SINK
1430 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1431 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1432 u->prebuf = u->tlength;
1433 #else
1434 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1435 #endif
1436
1437 #ifdef TUNNEL_SINK
1438 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1439 pa_tagstruct_putu32(reply, tag = u->ctag++);
1440
1441 if (u->version < 13)
1442 pa_tagstruct_puts(reply, name);
1443
1444 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1445 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1446 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1447 pa_tagstruct_puts(reply, u->sink_name);
1448 pa_tagstruct_putu32(reply, u->maxlength);
1449 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1450 pa_tagstruct_putu32(reply, u->tlength);
1451 pa_tagstruct_putu32(reply, u->prebuf);
1452 pa_tagstruct_putu32(reply, u->minreq);
1453 pa_tagstruct_putu32(reply, 0);
1454 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1455 pa_tagstruct_put_cvolume(reply, &volume);
1456 #else
1457 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1458 pa_tagstruct_putu32(reply, tag = u->ctag++);
1459
1460 if (u->version < 13)
1461 pa_tagstruct_puts(reply, name);
1462
1463 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1464 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1465 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1466 pa_tagstruct_puts(reply, u->source_name);
1467 pa_tagstruct_putu32(reply, u->maxlength);
1468 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1469 pa_tagstruct_putu32(reply, u->fragsize);
1470 #endif
1471
1472 if (u->version >= 12) {
1473 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1474 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1475 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1476 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1477 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1478 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1479 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1480 }
1481
1482 if (u->version >= 13) {
1483 pa_proplist *pl;
1484
1485 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1486 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1487
1488 pl = pa_proplist_new();
1489 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1490 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1491 pa_tagstruct_put_proplist(reply, pl);
1492 pa_proplist_free(pl);
1493
1494 #ifndef TUNNEL_SINK
1495 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1496 #endif
1497 }
1498
1499 if (u->version >= 14) {
1500 #ifdef TUNNEL_SINK
1501 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1502 #endif
1503 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1504 }
1505
1506 pa_pstream_send_tagstruct(u->pstream, reply);
1507 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1508
1509 pa_log_debug("Connection authenticated, creating stream ...");
1510
1511 return;
1512
1513 fail:
1514 pa_module_unload_request(u->module, TRUE);
1515 }
1516
1517 /* Called from main context */
1518 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1519 struct userdata *u = userdata;
1520
1521 pa_assert(p);
1522 pa_assert(u);
1523
1524 pa_log_warn("Stream died.");
1525 pa_module_unload_request(u->module, TRUE);
1526 }
1527
1528 /* Called from main context */
1529 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1530 struct userdata *u = userdata;
1531
1532 pa_assert(p);
1533 pa_assert(packet);
1534 pa_assert(u);
1535
1536 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1537 pa_log("Invalid packet");
1538 pa_module_unload_request(u->module, TRUE);
1539 return;
1540 }
1541 }
1542
1543 #ifndef TUNNEL_SINK
1544 /* Called from main context */
1545 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1546 struct userdata *u = userdata;
1547
1548 pa_assert(p);
1549 pa_assert(chunk);
1550 pa_assert(u);
1551
1552 if (channel != u->channel) {
1553 pa_log("Recieved memory block on bad channel.");
1554 pa_module_unload_request(u->module, TRUE);
1555 return;
1556 }
1557
1558 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1559
1560 u->counter_delta += (int64_t) chunk->length;
1561 }
1562
1563 #endif
1564
1565 /* Called from main context */
1566 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1567 struct userdata *u = userdata;
1568 pa_tagstruct *t;
1569 uint32_t tag;
1570
1571 pa_assert(sc);
1572 pa_assert(u);
1573 pa_assert(u->client == sc);
1574
1575 pa_socket_client_unref(u->client);
1576 u->client = NULL;
1577
1578 if (!io) {
1579 pa_log("Connection failed: %s", pa_cstrerror(errno));
1580 pa_module_unload_request(u->module, TRUE);
1581 return;
1582 }
1583
1584 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1585 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1586
1587 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1588 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1589 #ifndef TUNNEL_SINK
1590 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1591 #endif
1592
1593 t = pa_tagstruct_new(NULL, 0);
1594 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1595 pa_tagstruct_putu32(t, tag = u->ctag++);
1596 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1597
1598 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1599
1600 #ifdef HAVE_CREDS
1601 {
1602 pa_creds ucred;
1603
1604 if (pa_iochannel_creds_supported(io))
1605 pa_iochannel_creds_enable(io);
1606
1607 ucred.uid = getuid();
1608 ucred.gid = getgid();
1609
1610 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1611 }
1612 #else
1613 pa_pstream_send_tagstruct(u->pstream, t);
1614 #endif
1615
1616 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1617
1618 pa_log_debug("Connection established, authenticating ...");
1619 }
1620
1621 #ifdef TUNNEL_SINK
1622
1623 /* Called from main context */
1624 static void sink_set_volume(pa_sink *sink) {
1625 struct userdata *u;
1626 pa_tagstruct *t;
1627 uint32_t tag;
1628
1629 pa_assert(sink);
1630 u = sink->userdata;
1631 pa_assert(u);
1632
1633 t = pa_tagstruct_new(NULL, 0);
1634 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1635 pa_tagstruct_putu32(t, tag = u->ctag++);
1636 pa_tagstruct_putu32(t, u->device_index);
1637 pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1638 pa_pstream_send_tagstruct(u->pstream, t);
1639 }
1640
1641 /* Called from main context */
1642 static void sink_set_mute(pa_sink *sink) {
1643 struct userdata *u;
1644 pa_tagstruct *t;
1645 uint32_t tag;
1646
1647 pa_assert(sink);
1648 u = sink->userdata;
1649 pa_assert(u);
1650
1651 if (u->version < 11)
1652 return;
1653
1654 t = pa_tagstruct_new(NULL, 0);
1655 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1656 pa_tagstruct_putu32(t, tag = u->ctag++);
1657 pa_tagstruct_putu32(t, u->device_index);
1658 pa_tagstruct_put_boolean(t, !!sink->muted);
1659 pa_pstream_send_tagstruct(u->pstream, t);
1660 }
1661
1662 #endif
1663
1664 int pa__init(pa_module*m) {
1665 pa_modargs *ma = NULL;
1666 struct userdata *u = NULL;
1667 pa_sample_spec ss;
1668 pa_channel_map map;
1669 char *dn = NULL;
1670 #ifdef TUNNEL_SINK
1671 pa_sink_new_data data;
1672 #else
1673 pa_source_new_data data;
1674 #endif
1675
1676 pa_assert(m);
1677
1678 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1679 pa_log("Failed to parse module arguments");
1680 goto fail;
1681 }
1682
1683 m->userdata = u = pa_xnew0(struct userdata, 1);
1684 u->core = m->core;
1685 u->module = m;
1686 u->client = NULL;
1687 u->pdispatch = NULL;
1688 u->pstream = NULL;
1689 u->server_name = NULL;
1690 #ifdef TUNNEL_SINK
1691 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1692 u->sink = NULL;
1693 u->requested_bytes = 0;
1694 #else
1695 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1696 u->source = NULL;
1697 #endif
1698 u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
1699 u->ctag = 1;
1700 u->device_index = u->channel = PA_INVALID_INDEX;
1701 u->time_event = NULL;
1702 u->ignore_latency_before = 0;
1703 u->transport_usec = 0;
1704 u->transport_usec_valid = FALSE;
1705 u->remote_suspended = u->remote_corked = FALSE;
1706 u->counter = u->counter_delta = 0;
1707
1708 u->rtpoll = pa_rtpoll_new();
1709 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1710
1711 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1712 goto fail;
1713
1714 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1715 pa_log("No server specified.");
1716 goto fail;
1717 }
1718
1719 ss = m->core->default_sample_spec;
1720 map = m->core->default_channel_map;
1721 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1722 pa_log("Invalid sample format specification");
1723 goto fail;
1724 }
1725
1726 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1727 pa_log("Failed to connect to server '%s'", u->server_name);
1728 goto fail;
1729 }
1730
1731 pa_socket_client_set_callback(u->client, on_connection, u);
1732
1733 #ifdef TUNNEL_SINK
1734
1735 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1736 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1737
1738 pa_sink_new_data_init(&data);
1739 data.driver = __FILE__;
1740 data.module = m;
1741 data.namereg_fail = TRUE;
1742 pa_sink_new_data_set_name(&data, dn);
1743 pa_sink_new_data_set_sample_spec(&data, &ss);
1744 pa_sink_new_data_set_channel_map(&data, &map);
1745 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1746 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1747 if (u->sink_name)
1748 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1749
1750 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL);
1751 pa_sink_new_data_done(&data);
1752
1753 if (!u->sink) {
1754 pa_log("Failed to create sink.");
1755 goto fail;
1756 }
1757
1758 u->sink->parent.process_msg = sink_process_msg;
1759 u->sink->userdata = u;
1760 u->sink->set_state = sink_set_state;
1761 u->sink->set_volume = sink_set_volume;
1762 u->sink->set_mute = sink_set_mute;
1763
1764 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1765
1766 pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0);
1767
1768 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1769 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1770
1771 #else
1772
1773 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1774 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1775
1776 pa_source_new_data_init(&data);
1777 data.driver = __FILE__;
1778 data.module = m;
1779 data.namereg_fail = TRUE;
1780 pa_source_new_data_set_name(&data, dn);
1781 pa_source_new_data_set_sample_spec(&data, &ss);
1782 pa_source_new_data_set_channel_map(&data, &map);
1783 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1784 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1785 if (u->source_name)
1786 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1787
1788 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1789 pa_source_new_data_done(&data);
1790
1791 if (!u->source) {
1792 pa_log("Failed to create source.");
1793 goto fail;
1794 }
1795
1796 u->source->parent.process_msg = source_process_msg;
1797 u->source->set_state = source_set_state;
1798 u->source->userdata = u;
1799
1800 pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0);
1801
1802 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1803 pa_source_set_rtpoll(u->source, u->rtpoll);
1804 #endif
1805
1806 pa_xfree(dn);
1807
1808 u->time_event = NULL;
1809
1810 u->maxlength = 0;
1811 #ifdef TUNNEL_SINK
1812 u->tlength = u->minreq = u->prebuf = 0;
1813 #else
1814 u->fragsize = 0;
1815 #endif
1816
1817 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1818
1819 if (!(u->thread = pa_thread_new(thread_func, u))) {
1820 pa_log("Failed to create thread.");
1821 goto fail;
1822 }
1823
1824 #ifdef TUNNEL_SINK
1825 pa_sink_put(u->sink);
1826 #else
1827 pa_source_put(u->source);
1828 #endif
1829
1830 pa_modargs_free(ma);
1831
1832 return 0;
1833
1834 fail:
1835 pa__done(m);
1836
1837 if (ma)
1838 pa_modargs_free(ma);
1839
1840 pa_xfree(dn);
1841
1842 return -1;
1843 }
1844
1845 void pa__done(pa_module*m) {
1846 struct userdata* u;
1847
1848 pa_assert(m);
1849
1850 if (!(u = m->userdata))
1851 return;
1852
1853 #ifdef TUNNEL_SINK
1854 if (u->sink)
1855 pa_sink_unlink(u->sink);
1856 #else
1857 if (u->source)
1858 pa_source_unlink(u->source);
1859 #endif
1860
1861 if (u->thread) {
1862 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1863 pa_thread_free(u->thread);
1864 }
1865
1866 pa_thread_mq_done(&u->thread_mq);
1867
1868 #ifdef TUNNEL_SINK
1869 if (u->sink)
1870 pa_sink_unref(u->sink);
1871 #else
1872 if (u->source)
1873 pa_source_unref(u->source);
1874 #endif
1875
1876 if (u->rtpoll)
1877 pa_rtpoll_free(u->rtpoll);
1878
1879 if (u->pstream) {
1880 pa_pstream_unlink(u->pstream);
1881 pa_pstream_unref(u->pstream);
1882 }
1883
1884 if (u->pdispatch)
1885 pa_pdispatch_unref(u->pdispatch);
1886
1887 if (u->client)
1888 pa_socket_client_unref(u->client);
1889
1890 if (u->auth_cookie)
1891 pa_auth_cookie_unref(u->auth_cookie);
1892
1893 if (u->smoother)
1894 pa_smoother_free(u->smoother);
1895
1896 if (u->time_event)
1897 u->core->mainloop->time_free(u->time_event);
1898
1899 #ifdef TUNNEL_SINK
1900 pa_xfree(u->sink_name);
1901 #else
1902 pa_xfree(u->source_name);
1903 #endif
1904 pa_xfree(u->server_name);
1905
1906 pa_xfree(u->device_description);
1907 pa_xfree(u->server_fqdn);
1908 pa_xfree(u->user_name);
1909
1910 pa_xfree(u);
1911 }