]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
update module-tunnel for recent protocol changes
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "server=<address> "
68 "sink=<remote sink name> "
69 "cookie=<filename> "
70 "format=<sample format> "
71 "channels=<number of channels> "
72 "rate=<sample rate> "
73 "sink_name=<name for the local sink> "
74 "channel_map=<channel map>");
75 #else
76 PA_MODULE_DESCRIPTION("Tunnel module for sources");
77 PA_MODULE_USAGE(
78 "server=<address> "
79 "source=<remote source name> "
80 "cookie=<filename> "
81 "format=<sample format> "
82 "channels=<number of channels> "
83 "rate=<sample rate> "
84 "source_name=<name for the local source> "
85 "channel_map=<channel map>");
86 #endif
87
88 PA_MODULE_AUTHOR("Lennart Poettering");
89 PA_MODULE_VERSION(PACKAGE_VERSION);
90 PA_MODULE_LOAD_ONCE(FALSE);
91
92 static const char* const valid_modargs[] = {
93 "server",
94 "cookie",
95 "format",
96 "channels",
97 "rate",
98 #ifdef TUNNEL_SINK
99 "sink_name",
100 "sink",
101 #else
102 "source_name",
103 "source",
104 #endif
105 "channel_map",
106 NULL,
107 };
108
109 #define DEFAULT_TIMEOUT 5
110
111 #define LATENCY_INTERVAL 10
112
113 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
114
115 #ifdef TUNNEL_SINK
116
117 enum {
118 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
119 SINK_MESSAGE_REMOTE_SUSPEND,
120 SINK_MESSAGE_UPDATE_LATENCY,
121 SINK_MESSAGE_POST
122 };
123
124 #define DEFAULT_TLENGTH_MSEC 150
125 #define DEFAULT_MINREQ_MSEC 25
126
127 #else
128
129 enum {
130 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
131 SOURCE_MESSAGE_REMOTE_SUSPEND,
132 SOURCE_MESSAGE_UPDATE_LATENCY
133 };
134
135 #define DEFAULT_FRAGSIZE_MSEC 25
136
137 #endif
138
139 #ifdef TUNNEL_SINK
140 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
141 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
142 #endif
143 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
144 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148
149 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
150 #ifdef TUNNEL_SINK
151 [PA_COMMAND_REQUEST] = command_request,
152 [PA_COMMAND_STARTED] = command_started,
153 #endif
154 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
155 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
156 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
157 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
158 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
159 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
160 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
161 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
162 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved
163 };
164
165 struct userdata {
166 pa_core *core;
167 pa_module *module;
168
169 pa_thread_mq thread_mq;
170 pa_rtpoll *rtpoll;
171 pa_thread *thread;
172
173 pa_socket_client *client;
174 pa_pstream *pstream;
175 pa_pdispatch *pdispatch;
176
177 char *server_name;
178 #ifdef TUNNEL_SINK
179 char *sink_name;
180 pa_sink *sink;
181 size_t requested_bytes;
182 #else
183 char *source_name;
184 pa_source *source;
185 #endif
186
187 pa_auth_cookie *auth_cookie;
188
189 uint32_t version;
190 uint32_t ctag;
191 uint32_t device_index;
192 uint32_t channel;
193
194 int64_t counter, counter_delta;
195
196 pa_bool_t remote_corked:1;
197 pa_bool_t remote_suspended:1;
198
199 pa_usec_t transport_usec;
200 pa_bool_t transport_usec_valid;
201
202 uint32_t ignore_latency_before;
203
204 pa_time_event *time_event;
205
206 pa_smoother *smoother;
207
208 char *device_description;
209 char *server_fqdn;
210 char *user_name;
211
212 uint32_t maxlength;
213 #ifdef TUNNEL_SINK
214 uint32_t tlength;
215 uint32_t minreq;
216 uint32_t prebuf;
217 #else
218 uint32_t fragsize;
219 #endif
220 };
221
222 static void request_latency(struct userdata *u);
223
224 /* Called from main context */
225 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
226 struct userdata *u = userdata;
227
228 pa_assert(pd);
229 pa_assert(t);
230 pa_assert(u);
231 pa_assert(u->pdispatch == pd);
232
233 pa_log_warn("Stream killed");
234 pa_module_unload_request(u->module, TRUE);
235 }
236
237 /* Called from main context */
238 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
239 struct userdata *u = userdata;
240
241 pa_assert(pd);
242 pa_assert(t);
243 pa_assert(u);
244 pa_assert(u->pdispatch == pd);
245
246 pa_log_info("Server signalled buffer overrun/underrun.");
247 request_latency(u);
248 }
249
250 /* Called from main context */
251 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
252 struct userdata *u = userdata;
253 uint32_t channel;
254 pa_bool_t suspended;
255
256 pa_assert(pd);
257 pa_assert(t);
258 pa_assert(u);
259 pa_assert(u->pdispatch == pd);
260
261 if (pa_tagstruct_getu32(t, &channel) < 0 ||
262 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
263 !pa_tagstruct_eof(t)) {
264 pa_log("Invalid packet");
265 pa_module_unload_request(u->module, TRUE);
266 return;
267 }
268
269 #ifdef TUNNEL_SINK
270 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
271 #else
272 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
273 #endif
274
275 request_latency(u);
276 }
277
278 /* Called from main context */
279 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
280 struct userdata *u = userdata;
281
282 pa_assert(pd);
283 pa_assert(t);
284 pa_assert(u);
285 pa_assert(u->pdispatch == pd);
286
287 pa_log_debug("Server reports a stream move.");
288 request_latency(u);
289 }
290
291 #ifdef TUNNEL_SINK
292
293 /* Called from main context */
294 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
295 struct userdata *u = userdata;
296
297 pa_assert(pd);
298 pa_assert(t);
299 pa_assert(u);
300 pa_assert(u->pdispatch == pd);
301
302 pa_log_debug("Server reports playback started.");
303 request_latency(u);
304 }
305
306 #endif
307
308 /* Called from IO thread context */
309 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
310 pa_usec_t x;
311 pa_assert(u);
312
313 if (u->remote_corked == cork)
314 return;
315
316 u->remote_corked = cork;
317 x = pa_rtclock_usec();
318
319 /* Correct by the time this needs to travel to the other side.
320 * This is a valid thread-safe access, because the main thread is
321 * waiting for us */
322 if (u->transport_usec_valid)
323 x += u->transport_usec;
324
325 if (u->remote_suspended || u->remote_corked)
326 pa_smoother_pause(u->smoother, x);
327 else
328 pa_smoother_resume(u->smoother, x);
329 }
330
331 /* Called from main context */
332 static void stream_cork(struct userdata *u, pa_bool_t cork) {
333 pa_tagstruct *t;
334 pa_assert(u);
335
336 if (!u->pstream)
337 return;
338
339 t = pa_tagstruct_new(NULL, 0);
340 #ifdef TUNNEL_SINK
341 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
342 #else
343 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
344 #endif
345 pa_tagstruct_putu32(t, u->ctag++);
346 pa_tagstruct_putu32(t, u->channel);
347 pa_tagstruct_put_boolean(t, !!cork);
348 pa_pstream_send_tagstruct(u->pstream, t);
349
350 request_latency(u);
351 }
352
353 /* Called from IO thread context */
354 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
355 pa_usec_t x;
356 pa_assert(u);
357
358 if (u->remote_suspended == suspend)
359 return;
360
361 u->remote_suspended = suspend;
362
363 x = pa_rtclock_usec();
364
365 /* Correct by the time this needed to travel from the other side.
366 * This is a valid thread-safe access, because the main thread is
367 * waiting for us */
368 if (u->transport_usec_valid)
369 x -= u->transport_usec;
370
371 if (u->remote_suspended || u->remote_corked)
372 pa_smoother_pause(u->smoother, x);
373 else
374 pa_smoother_resume(u->smoother, x);
375 }
376
377 #ifdef TUNNEL_SINK
378
379 /* Called from IO thread context */
380 static void send_data(struct userdata *u) {
381 pa_assert(u);
382
383 while (u->requested_bytes > 0) {
384 pa_memchunk memchunk;
385
386 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
387 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
388 pa_memblock_unref(memchunk.memblock);
389
390 u->requested_bytes -= memchunk.length;
391
392 u->counter += (int64_t) memchunk.length;
393 }
394 }
395
396 /* This function is called from IO context -- except when it is not. */
397 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
398 struct userdata *u = PA_SINK(o)->userdata;
399
400 switch (code) {
401
402 case PA_SINK_MESSAGE_SET_STATE: {
403 int r;
404
405 /* First, change the state, because otherwide pa_sink_render() would fail */
406 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
407
408 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
409
410 if (PA_SINK_IS_OPENED(u->sink->state))
411 send_data(u);
412 }
413
414 return r;
415 }
416
417 case PA_SINK_MESSAGE_GET_LATENCY: {
418 pa_usec_t yl, yr, *usec = data;
419
420 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
421 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
422
423 *usec = yl > yr ? yl - yr : 0;
424 return 0;
425 }
426
427 case SINK_MESSAGE_REQUEST:
428
429 pa_assert(offset > 0);
430 u->requested_bytes += (size_t) offset;
431
432 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
433 send_data(u);
434
435 return 0;
436
437
438 case SINK_MESSAGE_REMOTE_SUSPEND:
439
440 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
441 return 0;
442
443
444 case SINK_MESSAGE_UPDATE_LATENCY: {
445 pa_usec_t y;
446
447 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
448
449 if (y > (pa_usec_t) offset || offset < 0)
450 y -= (pa_usec_t) offset;
451 else
452 y = 0;
453
454 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
455
456 return 0;
457 }
458
459 case SINK_MESSAGE_POST:
460
461 /* OK, This might be a bit confusing. This message is
462 * delivered to us from the main context -- NOT from the
463 * IO thread context where the rest of the messages are
464 * dispatched. Yeah, ugly, but I am a lazy bastard. */
465
466 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
467
468 u->counter_delta += (int64_t) chunk->length;
469
470 return 0;
471 }
472
473 return pa_sink_process_msg(o, code, data, offset, chunk);
474 }
475
476 /* Called from main context */
477 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
478 struct userdata *u;
479 pa_sink_assert_ref(s);
480 u = s->userdata;
481
482 switch ((pa_sink_state_t) state) {
483
484 case PA_SINK_SUSPENDED:
485 pa_assert(PA_SINK_IS_OPENED(s->state));
486 stream_cork(u, TRUE);
487 break;
488
489 case PA_SINK_IDLE:
490 case PA_SINK_RUNNING:
491 if (s->state == PA_SINK_SUSPENDED)
492 stream_cork(u, FALSE);
493 break;
494
495 case PA_SINK_UNLINKED:
496 case PA_SINK_INIT:
497 ;
498 }
499
500 return 0;
501 }
502
503 #else
504
505 /* This function is called from IO context -- except when it is not. */
506 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
507 struct userdata *u = PA_SOURCE(o)->userdata;
508
509 switch (code) {
510
511 case PA_SINK_MESSAGE_SET_STATE: {
512 int r;
513
514 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
515 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
516
517 return r;
518 }
519
520 case PA_SOURCE_MESSAGE_GET_LATENCY: {
521 pa_usec_t yr, yl, *usec = data;
522
523 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SINK(o)->sample_spec);
524 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
525
526 *usec = yr > yl ? yr - yl : 0;
527 return 0;
528 }
529
530 case SOURCE_MESSAGE_POST:
531
532 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
533 pa_source_post(u->source, chunk);
534
535 u->counter += (int64_t) chunk->length;
536
537 return 0;
538
539 case SOURCE_MESSAGE_REMOTE_SUSPEND:
540
541 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
542 return 0;
543
544 case SOURCE_MESSAGE_UPDATE_LATENCY: {
545 pa_usec_t y;
546
547 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
548
549 if (offset >= 0 || y > (pa_usec_t) -offset)
550 y += (pa_usec_t) offset;
551 else
552 y = 0;
553
554 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
555
556 return 0;
557 }
558 }
559
560 return pa_source_process_msg(o, code, data, offset, chunk);
561 }
562
563 /* Called from main context */
564 static int source_set_state(pa_source *s, pa_source_state_t state) {
565 struct userdata *u;
566 pa_source_assert_ref(s);
567 u = s->userdata;
568
569 switch ((pa_source_state_t) state) {
570
571 case PA_SOURCE_SUSPENDED:
572 pa_assert(PA_SOURCE_IS_OPENED(s->state));
573 stream_cork(u, TRUE);
574 break;
575
576 case PA_SOURCE_IDLE:
577 case PA_SOURCE_RUNNING:
578 if (s->state == PA_SOURCE_SUSPENDED)
579 stream_cork(u, FALSE);
580 break;
581
582 case PA_SOURCE_UNLINKED:
583 case PA_SOURCE_INIT:
584 ;
585 }
586
587 return 0;
588 }
589
590 #endif
591
592 static void thread_func(void *userdata) {
593 struct userdata *u = userdata;
594
595 pa_assert(u);
596
597 pa_log_debug("Thread starting up");
598
599 pa_thread_mq_install(&u->thread_mq);
600 pa_rtpoll_install(u->rtpoll);
601
602 for (;;) {
603 int ret;
604
605 #ifdef TUNNEL_SINK
606 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
607 if (u->sink->thread_info.rewind_requested)
608 pa_sink_process_rewind(u->sink, 0);
609 #endif
610
611 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
612 goto fail;
613
614 if (ret == 0)
615 goto finish;
616 }
617
618 fail:
619 /* If this was no regular exit from the loop we have to continue
620 * processing messages until we received PA_MESSAGE_SHUTDOWN */
621 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
622 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
623
624 finish:
625 pa_log_debug("Thread shutting down");
626 }
627
628 #ifdef TUNNEL_SINK
629 /* Called from main context */
630 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
631 struct userdata *u = userdata;
632 uint32_t bytes, channel;
633
634 pa_assert(pd);
635 pa_assert(command == PA_COMMAND_REQUEST);
636 pa_assert(t);
637 pa_assert(u);
638 pa_assert(u->pdispatch == pd);
639
640 if (pa_tagstruct_getu32(t, &channel) < 0 ||
641 pa_tagstruct_getu32(t, &bytes) < 0) {
642 pa_log("Invalid protocol reply");
643 goto fail;
644 }
645
646 if (channel != u->channel) {
647 pa_log("Recieved data for invalid channel");
648 goto fail;
649 }
650
651 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
652 return;
653
654 fail:
655 pa_module_unload_request(u->module, TRUE);
656 }
657
658 #endif
659
660 /* Called from main context */
661 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
662 struct userdata *u = userdata;
663 pa_usec_t sink_usec, source_usec, transport_usec;
664 pa_bool_t playing;
665 int64_t write_index, read_index;
666 struct timeval local, remote, now;
667 pa_sample_spec *ss;
668 int64_t delay;
669
670 pa_assert(pd);
671 pa_assert(u);
672
673 if (command != PA_COMMAND_REPLY) {
674 if (command == PA_COMMAND_ERROR)
675 pa_log("Failed to get latency.");
676 else
677 pa_log("Protocol error.");
678 goto fail;
679 }
680
681 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
682 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
683 pa_tagstruct_get_boolean(t, &playing) < 0 ||
684 pa_tagstruct_get_timeval(t, &local) < 0 ||
685 pa_tagstruct_get_timeval(t, &remote) < 0 ||
686 pa_tagstruct_gets64(t, &write_index) < 0 ||
687 pa_tagstruct_gets64(t, &read_index) < 0) {
688 pa_log("Invalid reply.");
689 goto fail;
690 }
691
692 #ifdef TUNNEL_SINK
693 if (u->version >= 13) {
694 uint64_t underrun_for = 0, playing_for = 0;
695
696 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
697 pa_tagstruct_getu64(t, &playing_for) < 0) {
698 pa_log("Invalid reply.");
699 goto fail;
700 }
701 }
702 #endif
703
704 if (!pa_tagstruct_eof(t)) {
705 pa_log("Invalid reply.");
706 goto fail;
707 }
708
709 if (tag < u->ignore_latency_before) {
710 request_latency(u);
711 return;
712 }
713
714 pa_gettimeofday(&now);
715
716 /* Calculate transport usec */
717 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
718 /* local and remote seem to have synchronized clocks */
719 #ifdef TUNNEL_SINK
720 u->transport_usec = pa_timeval_diff(&remote, &local);
721 #else
722 u->transport_usec = pa_timeval_diff(&now, &remote);
723 #endif
724 } else
725 u->transport_usec = pa_timeval_diff(&now, &local)/2;
726 u->transport_usec_valid = TRUE;
727
728 /* First, take the device's delay */
729 #ifdef TUNNEL_SINK
730 delay = (int64_t) sink_usec;
731 ss = &u->sink->sample_spec;
732 #else
733 delay = (int64_t) source_usec;
734 ss = &u->source->sample_spec;
735 #endif
736
737 /* Add the length of our server-side buffer */
738 if (write_index >= read_index)
739 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
740 else
741 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
742
743 /* Our measurements are already out of date, hence correct by the *
744 * transport latency */
745 #ifdef TUNNEL_SINK
746 delay -= (int64_t) transport_usec;
747 #else
748 delay += (int64_t) transport_usec;
749 #endif
750
751 /* Now correct by what we have have read/written since we requested the update */
752 #ifdef TUNNEL_SINK
753 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
754 #else
755 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
756 #endif
757
758 #ifdef TUNNEL_SINK
759 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
760 #else
761 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
762 #endif
763
764 return;
765
766 fail:
767
768 pa_module_unload_request(u->module, TRUE);
769 }
770
771 /* Called from main context */
772 static void request_latency(struct userdata *u) {
773 pa_tagstruct *t;
774 struct timeval now;
775 uint32_t tag;
776 pa_assert(u);
777
778 t = pa_tagstruct_new(NULL, 0);
779 #ifdef TUNNEL_SINK
780 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
781 #else
782 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
783 #endif
784 pa_tagstruct_putu32(t, tag = u->ctag++);
785 pa_tagstruct_putu32(t, u->channel);
786
787 pa_gettimeofday(&now);
788 pa_tagstruct_put_timeval(t, &now);
789
790 pa_pstream_send_tagstruct(u->pstream, t);
791 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
792
793 u->ignore_latency_before = tag;
794 u->counter_delta = 0;
795 }
796
797 /* Called from main context */
798 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
799 struct userdata *u = userdata;
800 struct timeval ntv;
801
802 pa_assert(m);
803 pa_assert(e);
804 pa_assert(u);
805
806 request_latency(u);
807
808 pa_gettimeofday(&ntv);
809 ntv.tv_sec += LATENCY_INTERVAL;
810 m->time_restart(e, &ntv);
811 }
812
813 /* Called from main context */
814 static void update_description(struct userdata *u) {
815 char *d;
816 char un[128], hn[128];
817 pa_tagstruct *t;
818
819 pa_assert(u);
820
821 if (!u->server_fqdn || !u->user_name || !u->device_description)
822 return;
823
824 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
825
826 #ifdef TUNNEL_SINK
827 pa_sink_set_description(u->sink, d);
828 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
829 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
830 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
831 #else
832 pa_source_set_description(u->source, d);
833 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
834 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
835 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
836 #endif
837
838 pa_xfree(d);
839
840 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
841 pa_get_user_name(un, sizeof(un)),
842 pa_get_host_name(hn, sizeof(hn)));
843
844 t = pa_tagstruct_new(NULL, 0);
845 #ifdef TUNNEL_SINK
846 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
847 #else
848 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
849 #endif
850 pa_tagstruct_putu32(t, u->ctag++);
851 pa_tagstruct_putu32(t, u->channel);
852 pa_tagstruct_puts(t, d);
853 pa_pstream_send_tagstruct(u->pstream, t);
854
855 pa_xfree(d);
856 }
857
858 /* Called from main context */
859 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
860 struct userdata *u = userdata;
861 pa_sample_spec ss;
862 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
863 uint32_t cookie;
864
865 pa_assert(pd);
866 pa_assert(u);
867
868 if (command != PA_COMMAND_REPLY) {
869 if (command == PA_COMMAND_ERROR)
870 pa_log("Failed to get info.");
871 else
872 pa_log("Protocol error.");
873 goto fail;
874 }
875
876 if (pa_tagstruct_gets(t, &server_name) < 0 ||
877 pa_tagstruct_gets(t, &server_version) < 0 ||
878 pa_tagstruct_gets(t, &user_name) < 0 ||
879 pa_tagstruct_gets(t, &host_name) < 0 ||
880 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
881 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
882 pa_tagstruct_gets(t, &default_source_name) < 0 ||
883 pa_tagstruct_getu32(t, &cookie) < 0) {
884
885 pa_log("Parse failure");
886 goto fail;
887 }
888
889 if (!pa_tagstruct_eof(t)) {
890 pa_log("Packet too long");
891 goto fail;
892 }
893
894 pa_xfree(u->server_fqdn);
895 u->server_fqdn = pa_xstrdup(host_name);
896
897 pa_xfree(u->user_name);
898 u->user_name = pa_xstrdup(user_name);
899
900 update_description(u);
901
902 return;
903
904 fail:
905 pa_module_unload_request(u->module, TRUE);
906 }
907
908 #ifdef TUNNEL_SINK
909
910 /* Called from main context */
911 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
912 struct userdata *u = userdata;
913 uint32_t idx, owner_module, monitor_source, flags;
914 const char *name, *description, *monitor_source_name, *driver;
915 pa_sample_spec ss;
916 pa_channel_map cm;
917 pa_cvolume volume;
918 pa_bool_t mute;
919 pa_usec_t latency;
920 pa_proplist *pl;
921
922 pa_assert(pd);
923 pa_assert(u);
924
925 pl = pa_proplist_new();
926
927 if (command != PA_COMMAND_REPLY) {
928 if (command == PA_COMMAND_ERROR)
929 pa_log("Failed to get info.");
930 else
931 pa_log("Protocol error.");
932 goto fail;
933 }
934
935 if (pa_tagstruct_getu32(t, &idx) < 0 ||
936 pa_tagstruct_gets(t, &name) < 0 ||
937 pa_tagstruct_gets(t, &description) < 0 ||
938 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
939 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
940 pa_tagstruct_getu32(t, &owner_module) < 0 ||
941 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
942 pa_tagstruct_get_boolean(t, &mute) < 0 ||
943 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
944 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
945 pa_tagstruct_get_usec(t, &latency) < 0 ||
946 pa_tagstruct_gets(t, &driver) < 0 ||
947 pa_tagstruct_getu32(t, &flags) < 0) {
948
949 pa_log("Parse failure");
950 goto fail;
951 }
952
953 if (u->version >= 13) {
954 pa_usec_t configured_latency;
955
956 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
957 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
958
959 pa_log("Parse failure");
960 goto fail;
961 }
962 }
963
964 if (!pa_tagstruct_eof(t)) {
965 pa_log("Packet too long");
966 goto fail;
967 }
968
969 pa_proplist_free(pl);
970
971 if (!u->sink_name || strcmp(name, u->sink_name))
972 return;
973
974 pa_xfree(u->device_description);
975 u->device_description = pa_xstrdup(description);
976
977 update_description(u);
978
979 return;
980
981 fail:
982 pa_module_unload_request(u->module, TRUE);
983 pa_proplist_free(pl);
984 }
985
986 /* Called from main context */
987 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
988 struct userdata *u = userdata;
989 uint32_t idx, owner_module, client, sink;
990 pa_usec_t buffer_usec, sink_usec;
991 const char *name, *driver, *resample_method;
992 pa_bool_t mute;
993 pa_sample_spec sample_spec;
994 pa_channel_map channel_map;
995 pa_cvolume volume;
996 pa_proplist *pl;
997
998 pa_assert(pd);
999 pa_assert(u);
1000
1001 pl = pa_proplist_new();
1002
1003 if (command != PA_COMMAND_REPLY) {
1004 if (command == PA_COMMAND_ERROR)
1005 pa_log("Failed to get info.");
1006 else
1007 pa_log("Protocol error.");
1008 goto fail;
1009 }
1010
1011 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1012 pa_tagstruct_gets(t, &name) < 0 ||
1013 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1014 pa_tagstruct_getu32(t, &client) < 0 ||
1015 pa_tagstruct_getu32(t, &sink) < 0 ||
1016 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1017 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1018 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1019 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1020 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1021 pa_tagstruct_gets(t, &resample_method) < 0 ||
1022 pa_tagstruct_gets(t, &driver) < 0) {
1023
1024 pa_log("Parse failure");
1025 goto fail;
1026 }
1027
1028 if (u->version >= 11) {
1029 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1030
1031 pa_log("Parse failure");
1032 goto fail;
1033 }
1034 }
1035
1036 if (u->version >= 13) {
1037 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1038
1039 pa_log("Parse failure");
1040 goto fail;
1041 }
1042 }
1043
1044 if (!pa_tagstruct_eof(t)) {
1045 pa_log("Packet too long");
1046 goto fail;
1047 }
1048
1049 pa_proplist_free(pl);
1050
1051 if (idx != u->device_index)
1052 return;
1053
1054 pa_assert(u->sink);
1055
1056 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1057 pa_cvolume_equal(&volume, &u->sink->volume))
1058 return;
1059
1060 memcpy(&u->sink->volume, &volume, sizeof(pa_cvolume));
1061
1062 if (u->version >= 11)
1063 u->sink->muted = !!mute;
1064
1065 pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);
1066 return;
1067
1068 fail:
1069 pa_module_unload_request(u->module, TRUE);
1070 pa_proplist_free(pl);
1071 }
1072
1073 #else
1074
1075 /* Called from main context */
1076 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1077 struct userdata *u = userdata;
1078 uint32_t idx, owner_module, monitor_of_sink, flags;
1079 const char *name, *description, *monitor_of_sink_name, *driver;
1080 pa_sample_spec ss;
1081 pa_channel_map cm;
1082 pa_cvolume volume;
1083 pa_bool_t mute;
1084 pa_usec_t latency, configured_latency;
1085 pa_proplist *pl;
1086
1087 pa_assert(pd);
1088 pa_assert(u);
1089
1090 pl = pa_proplist_new();
1091
1092 if (command != PA_COMMAND_REPLY) {
1093 if (command == PA_COMMAND_ERROR)
1094 pa_log("Failed to get info.");
1095 else
1096 pa_log("Protocol error.");
1097 goto fail;
1098 }
1099
1100 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1101 pa_tagstruct_gets(t, &name) < 0 ||
1102 pa_tagstruct_gets(t, &description) < 0 ||
1103 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1104 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1105 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1106 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1107 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1108 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1109 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1110 pa_tagstruct_get_usec(t, &latency) < 0 ||
1111 pa_tagstruct_gets(t, &driver) < 0 ||
1112 pa_tagstruct_getu32(t, &flags) < 0) {
1113
1114 pa_log("Parse failure");
1115 goto fail;
1116 }
1117
1118 if (u->version >= 13) {
1119 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1120 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1121
1122 pa_log("Parse failure");
1123 goto fail;
1124 }
1125 }
1126
1127 if (!pa_tagstruct_eof(t)) {
1128 pa_log("Packet too long");
1129 goto fail;
1130 }
1131
1132 pa_proplist_free(pl);
1133
1134 if (!u->source_name || strcmp(name, u->source_name))
1135 return;
1136
1137 pa_xfree(u->device_description);
1138 u->device_description = pa_xstrdup(description);
1139
1140 update_description(u);
1141
1142 return;
1143
1144 fail:
1145 pa_module_unload_request(u->module, TRUE);
1146 pa_proplist_free(pl);
1147 }
1148
1149 #endif
1150
1151 /* Called from main context */
1152 static void request_info(struct userdata *u) {
1153 pa_tagstruct *t;
1154 uint32_t tag;
1155 pa_assert(u);
1156
1157 t = pa_tagstruct_new(NULL, 0);
1158 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1159 pa_tagstruct_putu32(t, tag = u->ctag++);
1160 pa_pstream_send_tagstruct(u->pstream, t);
1161 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1162
1163 #ifdef TUNNEL_SINK
1164 t = pa_tagstruct_new(NULL, 0);
1165 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1166 pa_tagstruct_putu32(t, tag = u->ctag++);
1167 pa_tagstruct_putu32(t, u->device_index);
1168 pa_pstream_send_tagstruct(u->pstream, t);
1169 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1170
1171 if (u->sink_name) {
1172 t = pa_tagstruct_new(NULL, 0);
1173 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1174 pa_tagstruct_putu32(t, tag = u->ctag++);
1175 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1176 pa_tagstruct_puts(t, u->sink_name);
1177 pa_pstream_send_tagstruct(u->pstream, t);
1178 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1179 }
1180 #else
1181 if (u->source_name) {
1182 t = pa_tagstruct_new(NULL, 0);
1183 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1184 pa_tagstruct_putu32(t, tag = u->ctag++);
1185 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1186 pa_tagstruct_puts(t, u->source_name);
1187 pa_pstream_send_tagstruct(u->pstream, t);
1188 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1189 }
1190 #endif
1191 }
1192
1193 /* Called from main context */
1194 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1195 struct userdata *u = userdata;
1196 pa_subscription_event_type_t e;
1197 uint32_t idx;
1198
1199 pa_assert(pd);
1200 pa_assert(t);
1201 pa_assert(u);
1202 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1203
1204 if (pa_tagstruct_getu32(t, &e) < 0 ||
1205 pa_tagstruct_getu32(t, &idx) < 0) {
1206 pa_log("Invalid protocol reply");
1207 pa_module_unload_request(u->module, TRUE);
1208 return;
1209 }
1210
1211 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1212 #ifdef TUNNEL_SINK
1213 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1214 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1215 #else
1216 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1217 #endif
1218 )
1219 return;
1220
1221 request_info(u);
1222 }
1223
1224 /* Called from main context */
1225 static void start_subscribe(struct userdata *u) {
1226 pa_tagstruct *t;
1227 uint32_t tag;
1228 pa_assert(u);
1229
1230 t = pa_tagstruct_new(NULL, 0);
1231 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1232 pa_tagstruct_putu32(t, tag = u->ctag++);
1233 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1234 #ifdef TUNNEL_SINK
1235 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1236 #else
1237 PA_SUBSCRIPTION_MASK_SOURCE
1238 #endif
1239 );
1240
1241 pa_pstream_send_tagstruct(u->pstream, t);
1242 }
1243
1244 /* Called from main context */
1245 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1246 struct userdata *u = userdata;
1247 struct timeval ntv;
1248 #ifdef TUNNEL_SINK
1249 uint32_t bytes;
1250 #endif
1251
1252 pa_assert(pd);
1253 pa_assert(u);
1254 pa_assert(u->pdispatch == pd);
1255
1256 if (command != PA_COMMAND_REPLY) {
1257 if (command == PA_COMMAND_ERROR)
1258 pa_log("Failed to create stream.");
1259 else
1260 pa_log("Protocol error.");
1261 goto fail;
1262 }
1263
1264 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1265 pa_tagstruct_getu32(t, &u->device_index) < 0
1266 #ifdef TUNNEL_SINK
1267 || pa_tagstruct_getu32(t, &bytes) < 0
1268 #endif
1269 )
1270 goto parse_error;
1271
1272 if (u->version >= 9) {
1273 #ifdef TUNNEL_SINK
1274 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1275 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1276 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1277 pa_tagstruct_getu32(t, &u->minreq) < 0)
1278 goto parse_error;
1279 #else
1280 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1281 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1282 goto parse_error;
1283 #endif
1284 }
1285
1286 if (u->version >= 12) {
1287 pa_sample_spec ss;
1288 pa_channel_map cm;
1289 uint32_t device_index;
1290 const char *dn;
1291 pa_bool_t suspended;
1292
1293 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1294 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1295 pa_tagstruct_getu32(t, &device_index) < 0 ||
1296 pa_tagstruct_gets(t, &dn) < 0 ||
1297 pa_tagstruct_get_boolean(t, &suspended) < 0)
1298 goto parse_error;
1299
1300 #ifdef TUNNEL_SINK
1301 pa_xfree(u->sink_name);
1302 u->sink_name = pa_xstrdup(dn);
1303 #else
1304 pa_xfree(u->source_name);
1305 u->source_name = pa_xstrdup(dn);
1306 #endif
1307 }
1308
1309 if (u->version >= 13) {
1310 pa_usec_t usec;
1311
1312 if (pa_tagstruct_get_usec(t, &usec) < 0)
1313 goto parse_error;
1314
1315 #ifdef TUNNEL_SINK
1316 pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0);
1317 #else
1318 pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0);
1319 #endif
1320 }
1321
1322 if (!pa_tagstruct_eof(t))
1323 goto parse_error;
1324
1325 start_subscribe(u);
1326 request_info(u);
1327
1328 pa_assert(!u->time_event);
1329 pa_gettimeofday(&ntv);
1330 ntv.tv_sec += LATENCY_INTERVAL;
1331 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1332
1333 request_latency(u);
1334
1335 pa_log_debug("Stream created.");
1336
1337 #ifdef TUNNEL_SINK
1338 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1339 #endif
1340
1341 return;
1342
1343 parse_error:
1344 pa_log("Invalid reply. (Create stream)");
1345
1346 fail:
1347 pa_module_unload_request(u->module, TRUE);
1348
1349 }
1350
1351 /* Called from main context */
1352 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1353 struct userdata *u = userdata;
1354 pa_tagstruct *reply;
1355 char name[256], un[128], hn[128];
1356 #ifdef TUNNEL_SINK
1357 pa_cvolume volume;
1358 #endif
1359
1360 pa_assert(pd);
1361 pa_assert(u);
1362 pa_assert(u->pdispatch == pd);
1363
1364 if (command != PA_COMMAND_REPLY ||
1365 pa_tagstruct_getu32(t, &u->version) < 0 ||
1366 !pa_tagstruct_eof(t)) {
1367
1368 if (command == PA_COMMAND_ERROR)
1369 pa_log("Failed to authenticate");
1370 else
1371 pa_log("Protocol error.");
1372
1373 goto fail;
1374 }
1375
1376 /* Minimum supported protocol version */
1377 if (u->version < 8) {
1378 pa_log("Incompatible protocol version");
1379 goto fail;
1380 }
1381
1382 /* Starting with protocol version 13 the MSB of the version tag
1383 reflects if shm is enabled for this connection or not. We don't
1384 support SHM here at all, so we just ignore this. */
1385
1386 if (u->version >= 13)
1387 u->version &= 0x7FFFFFFFU;
1388
1389 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1390
1391 #ifdef TUNNEL_SINK
1392 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1393 u->sink_name,
1394 pa_get_user_name(un, sizeof(un)),
1395 pa_get_host_name(hn, sizeof(hn)));
1396 #else
1397 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1398 u->source_name,
1399 pa_get_user_name(un, sizeof(un)),
1400 pa_get_host_name(hn, sizeof(hn)));
1401 #endif
1402
1403 reply = pa_tagstruct_new(NULL, 0);
1404 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1405 pa_tagstruct_putu32(reply, tag = u->ctag++);
1406
1407 if (u->version >= 13) {
1408 pa_proplist *pl;
1409 pl = pa_proplist_new();
1410 pa_init_proplist(pl);
1411 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1412 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1413 pa_tagstruct_put_proplist(reply, pl);
1414 pa_proplist_free(pl);
1415 } else
1416 pa_tagstruct_puts(reply, "PulseAudio");
1417
1418 pa_pstream_send_tagstruct(u->pstream, reply);
1419 /* We ignore the server's reply here */
1420
1421 reply = pa_tagstruct_new(NULL, 0);
1422
1423 if (u->version < 13)
1424 /* Only for older PA versions we need to fill in the maxlength */
1425 u->maxlength = 4*1024*1024;
1426
1427 #ifdef TUNNEL_SINK
1428 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1429 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1430 u->prebuf = u->tlength;
1431 #else
1432 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1433 #endif
1434
1435 #ifdef TUNNEL_SINK
1436 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1437 pa_tagstruct_putu32(reply, tag = u->ctag++);
1438
1439 if (u->version < 13)
1440 pa_tagstruct_puts(reply, name);
1441
1442 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1443 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1444 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1445 pa_tagstruct_puts(reply, u->sink_name);
1446 pa_tagstruct_putu32(reply, u->maxlength);
1447 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1448 pa_tagstruct_putu32(reply, u->tlength);
1449 pa_tagstruct_putu32(reply, u->prebuf);
1450 pa_tagstruct_putu32(reply, u->minreq);
1451 pa_tagstruct_putu32(reply, 0);
1452 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1453 pa_tagstruct_put_cvolume(reply, &volume);
1454 #else
1455 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1456 pa_tagstruct_putu32(reply, tag = u->ctag++);
1457
1458 if (u->version < 13)
1459 pa_tagstruct_puts(reply, name);
1460
1461 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1462 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1463 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1464 pa_tagstruct_puts(reply, u->source_name);
1465 pa_tagstruct_putu32(reply, u->maxlength);
1466 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1467 pa_tagstruct_putu32(reply, u->fragsize);
1468 #endif
1469
1470 if (u->version >= 12) {
1471 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1472 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1473 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1474 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1475 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1476 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1477 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1478 }
1479
1480 if (u->version >= 13) {
1481 pa_proplist *pl;
1482
1483 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1484 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1485
1486 pl = pa_proplist_new();
1487 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1488 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1489 pa_tagstruct_put_proplist(reply, pl);
1490 pa_proplist_free(pl);
1491
1492 #ifndef TUNNEL_SINK
1493 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1494 #endif
1495 }
1496
1497 if (u->version >= 14) {
1498 #ifdef TUNNEL_SINK
1499 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1500 #endif
1501 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1502 }
1503
1504 pa_pstream_send_tagstruct(u->pstream, reply);
1505 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1506
1507 pa_log_debug("Connection authenticated, creating stream ...");
1508
1509 return;
1510
1511 fail:
1512 pa_module_unload_request(u->module, TRUE);
1513 }
1514
1515 /* Called from main context */
1516 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1517 struct userdata *u = userdata;
1518
1519 pa_assert(p);
1520 pa_assert(u);
1521
1522 pa_log_warn("Stream died.");
1523 pa_module_unload_request(u->module, TRUE);
1524 }
1525
1526 /* Called from main context */
1527 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1528 struct userdata *u = userdata;
1529
1530 pa_assert(p);
1531 pa_assert(packet);
1532 pa_assert(u);
1533
1534 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1535 pa_log("Invalid packet");
1536 pa_module_unload_request(u->module, TRUE);
1537 return;
1538 }
1539 }
1540
1541 #ifndef TUNNEL_SINK
1542 /* Called from main context */
1543 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1544 struct userdata *u = userdata;
1545
1546 pa_assert(p);
1547 pa_assert(chunk);
1548 pa_assert(u);
1549
1550 if (channel != u->channel) {
1551 pa_log("Recieved memory block on bad channel.");
1552 pa_module_unload_request(u->module, TRUE);
1553 return;
1554 }
1555
1556 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1557
1558 u->counter_delta += (int64_t) chunk->length;
1559 }
1560
1561 #endif
1562
1563 /* Called from main context */
1564 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1565 struct userdata *u = userdata;
1566 pa_tagstruct *t;
1567 uint32_t tag;
1568
1569 pa_assert(sc);
1570 pa_assert(u);
1571 pa_assert(u->client == sc);
1572
1573 pa_socket_client_unref(u->client);
1574 u->client = NULL;
1575
1576 if (!io) {
1577 pa_log("Connection failed: %s", pa_cstrerror(errno));
1578 pa_module_unload_request(u->module, TRUE);
1579 return;
1580 }
1581
1582 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1583 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1584
1585 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1586 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1587 #ifndef TUNNEL_SINK
1588 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1589 #endif
1590
1591 t = pa_tagstruct_new(NULL, 0);
1592 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1593 pa_tagstruct_putu32(t, tag = u->ctag++);
1594 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1595
1596 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1597
1598 #ifdef HAVE_CREDS
1599 {
1600 pa_creds ucred;
1601
1602 if (pa_iochannel_creds_supported(io))
1603 pa_iochannel_creds_enable(io);
1604
1605 ucred.uid = getuid();
1606 ucred.gid = getgid();
1607
1608 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1609 }
1610 #else
1611 pa_pstream_send_tagstruct(u->pstream, t);
1612 #endif
1613
1614 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1615
1616 pa_log_debug("Connection established, authenticating ...");
1617 }
1618
1619 #ifdef TUNNEL_SINK
1620
1621 /* Called from main context */
1622 static int sink_set_volume(pa_sink *sink) {
1623 struct userdata *u;
1624 pa_tagstruct *t;
1625 uint32_t tag;
1626
1627 pa_assert(sink);
1628 u = sink->userdata;
1629 pa_assert(u);
1630
1631 t = pa_tagstruct_new(NULL, 0);
1632 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1633 pa_tagstruct_putu32(t, tag = u->ctag++);
1634 pa_tagstruct_putu32(t, u->device_index);
1635 pa_tagstruct_put_cvolume(t, &sink->volume);
1636 pa_pstream_send_tagstruct(u->pstream, t);
1637
1638 return 0;
1639 }
1640
1641 /* Called from main context */
1642 static int sink_set_mute(pa_sink *sink) {
1643 struct userdata *u;
1644 pa_tagstruct *t;
1645 uint32_t tag;
1646
1647 pa_assert(sink);
1648 u = sink->userdata;
1649 pa_assert(u);
1650
1651 if (u->version < 11)
1652 return -1;
1653
1654 t = pa_tagstruct_new(NULL, 0);
1655 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1656 pa_tagstruct_putu32(t, tag = u->ctag++);
1657 pa_tagstruct_putu32(t, u->device_index);
1658 pa_tagstruct_put_boolean(t, !!sink->muted);
1659 pa_pstream_send_tagstruct(u->pstream, t);
1660
1661 return 0;
1662 }
1663
1664 #endif
1665
1666 int pa__init(pa_module*m) {
1667 pa_modargs *ma = NULL;
1668 struct userdata *u = NULL;
1669 pa_sample_spec ss;
1670 pa_channel_map map;
1671 char *dn = NULL;
1672 #ifdef TUNNEL_SINK
1673 pa_sink_new_data data;
1674 #else
1675 pa_source_new_data data;
1676 #endif
1677
1678 pa_assert(m);
1679
1680 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1681 pa_log("Failed to parse module arguments");
1682 goto fail;
1683 }
1684
1685 m->userdata = u = pa_xnew0(struct userdata, 1);
1686 u->core = m->core;
1687 u->module = m;
1688 u->client = NULL;
1689 u->pdispatch = NULL;
1690 u->pstream = NULL;
1691 u->server_name = NULL;
1692 #ifdef TUNNEL_SINK
1693 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1694 u->sink = NULL;
1695 u->requested_bytes = 0;
1696 #else
1697 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1698 u->source = NULL;
1699 #endif
1700 u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
1701 u->ctag = 1;
1702 u->device_index = u->channel = PA_INVALID_INDEX;
1703 u->time_event = NULL;
1704 u->ignore_latency_before = 0;
1705 u->transport_usec = 0;
1706 u->transport_usec_valid = FALSE;
1707 u->remote_suspended = u->remote_corked = FALSE;
1708 u->counter = u->counter_delta = 0;
1709
1710 u->rtpoll = pa_rtpoll_new();
1711 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1712
1713 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1714 goto fail;
1715
1716 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1717 pa_log("No server specified.");
1718 goto fail;
1719 }
1720
1721 ss = m->core->default_sample_spec;
1722 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1723 pa_log("Invalid sample format specification");
1724 goto fail;
1725 }
1726
1727 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1728 pa_log("Failed to connect to server '%s'", u->server_name);
1729 goto fail;
1730 }
1731
1732 pa_socket_client_set_callback(u->client, on_connection, u);
1733
1734 #ifdef TUNNEL_SINK
1735
1736 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1737 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1738
1739 pa_sink_new_data_init(&data);
1740 data.driver = __FILE__;
1741 data.module = m;
1742 data.namereg_fail = TRUE;
1743 pa_sink_new_data_set_name(&data, dn);
1744 pa_sink_new_data_set_sample_spec(&data, &ss);
1745 pa_sink_new_data_set_channel_map(&data, &map);
1746 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1747 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1748 if (u->sink_name)
1749 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1750
1751 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL);
1752 pa_sink_new_data_done(&data);
1753
1754 if (!u->sink) {
1755 pa_log("Failed to create sink.");
1756 goto fail;
1757 }
1758
1759 u->sink->parent.process_msg = sink_process_msg;
1760 u->sink->userdata = u;
1761 u->sink->set_state = sink_set_state;
1762 u->sink->set_volume = sink_set_volume;
1763 u->sink->set_mute = sink_set_mute;
1764
1765 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1766
1767 pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0);
1768
1769 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1770 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1771
1772 #else
1773
1774 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1775 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1776
1777 pa_source_new_data_init(&data);
1778 data.driver = __FILE__;
1779 data.module = m;
1780 data.namereg_fail = TRUE;
1781 pa_source_new_data_set_name(&data, dn);
1782 pa_source_new_data_set_sample_spec(&data, &ss);
1783 pa_source_new_data_set_channel_map(&data, &map);
1784 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1785 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1786 if (u->source_name)
1787 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1788
1789 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1790 pa_source_new_data_done(&data);
1791
1792 if (!u->source) {
1793 pa_log("Failed to create source.");
1794 goto fail;
1795 }
1796
1797 u->source->parent.process_msg = source_process_msg;
1798 u->source->set_state = source_set_state;
1799 u->source->userdata = u;
1800
1801 pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0);
1802
1803 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1804 pa_source_set_rtpoll(u->source, u->rtpoll);
1805 #endif
1806
1807 pa_xfree(dn);
1808
1809 u->time_event = NULL;
1810
1811 u->maxlength = 0;
1812 #ifdef TUNNEL_SINK
1813 u->tlength = u->minreq = u->prebuf = 0;
1814 #else
1815 u->fragsize = 0;
1816 #endif
1817
1818 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1819
1820 if (!(u->thread = pa_thread_new(thread_func, u))) {
1821 pa_log("Failed to create thread.");
1822 goto fail;
1823 }
1824
1825 #ifdef TUNNEL_SINK
1826 pa_sink_put(u->sink);
1827 #else
1828 pa_source_put(u->source);
1829 #endif
1830
1831 pa_modargs_free(ma);
1832
1833 return 0;
1834
1835 fail:
1836 pa__done(m);
1837
1838 if (ma)
1839 pa_modargs_free(ma);
1840
1841 pa_xfree(dn);
1842
1843 return -1;
1844 }
1845
1846 void pa__done(pa_module*m) {
1847 struct userdata* u;
1848
1849 pa_assert(m);
1850
1851 if (!(u = m->userdata))
1852 return;
1853
1854 #ifdef TUNNEL_SINK
1855 if (u->sink)
1856 pa_sink_unlink(u->sink);
1857 #else
1858 if (u->source)
1859 pa_source_unlink(u->source);
1860 #endif
1861
1862 if (u->thread) {
1863 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1864 pa_thread_free(u->thread);
1865 }
1866
1867 pa_thread_mq_done(&u->thread_mq);
1868
1869 #ifdef TUNNEL_SINK
1870 if (u->sink)
1871 pa_sink_unref(u->sink);
1872 #else
1873 if (u->source)
1874 pa_source_unref(u->source);
1875 #endif
1876
1877 if (u->rtpoll)
1878 pa_rtpoll_free(u->rtpoll);
1879
1880 if (u->pstream) {
1881 pa_pstream_unlink(u->pstream);
1882 pa_pstream_unref(u->pstream);
1883 }
1884
1885 if (u->pdispatch)
1886 pa_pdispatch_unref(u->pdispatch);
1887
1888 if (u->client)
1889 pa_socket_client_unref(u->client);
1890
1891 if (u->auth_cookie)
1892 pa_auth_cookie_unref(u->auth_cookie);
1893
1894 if (u->smoother)
1895 pa_smoother_free(u->smoother);
1896
1897 if (u->time_event)
1898 u->core->mainloop->time_free(u->time_event);
1899
1900 #ifdef TUNNEL_SINK
1901 pa_xfree(u->sink_name);
1902 #else
1903 pa_xfree(u->source_name);
1904 #endif
1905 pa_xfree(u->server_name);
1906
1907 pa_xfree(u->device_description);
1908 pa_xfree(u->server_fqdn);
1909 pa_xfree(u->user_name);
1910
1911 pa_xfree(u);
1912 }