]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
1ddfd259596dc698ecce8f983553520dc955844f
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #ifdef HAVE_X11
35 #include <xcb/xcb.h>
36 #endif
37
38 #include <pulse/rtclock.h>
39 #include <pulse/timeval.h>
40 #include <pulse/util.h>
41 #include <pulse/version.h>
42 #include <pulse/xmalloc.h>
43
44 #include <pulsecore/module.h>
45 #include <pulsecore/core-util.h>
46 #include <pulsecore/modargs.h>
47 #include <pulsecore/log.h>
48 #include <pulsecore/core-subscribe.h>
49 #include <pulsecore/pdispatch.h>
50 #include <pulsecore/pstream.h>
51 #include <pulsecore/pstream-util.h>
52 #include <pulsecore/socket-client.h>
53 #include <pulsecore/time-smoother.h>
54 #include <pulsecore/thread.h>
55 #include <pulsecore/thread-mq.h>
56 #include <pulsecore/core-rtclock.h>
57 #include <pulsecore/core-error.h>
58 #include <pulsecore/proplist-util.h>
59 #include <pulsecore/auth-cookie.h>
60 #include <pulsecore/mcalign.h>
61 #include <pulsecore/strlist.h>
62
63 #ifdef HAVE_X11
64 #include <pulsecore/x11prop.h>
65 #endif
66
67 #ifdef TUNNEL_SINK
68 #include "module-tunnel-sink-symdef.h"
69 #else
70 #include "module-tunnel-source-symdef.h"
71 #endif
72
/* Names of the environment variables this module refers to. The code that
 * consults them is outside this part of the file. */
#define ENV_DEFAULT_SINK   "PULSE_SINK"
#define ENV_DEFAULT_SOURCE "PULSE_SOURCE"
#define ENV_DEFAULT_SERVER "PULSE_SERVER"
#define ENV_COOKIE_FILE    "PULSE_COOKIE"
77
78 #ifdef TUNNEL_SINK
79 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
80 PA_MODULE_USAGE(
81 "sink_name=<name for the local sink> "
82 "sink_properties=<properties for the local sink> "
83 "auto=<determine server/sink/cookie automatically> "
84 "server=<address> "
85 "sink=<remote sink name> "
86 "cookie=<filename> "
87 "format=<sample format> "
88 "channels=<number of channels> "
89 "rate=<sample rate> "
90 "channel_map=<channel map>");
91 #else
92 PA_MODULE_DESCRIPTION("Tunnel module for sources");
93 PA_MODULE_USAGE(
94 "source_name=<name for the local source> "
95 "source_properties=<properties for the local source> "
96 "auto=<determine server/source/cookie automatically> "
97 "server=<address> "
98 "source=<remote source name> "
99 "cookie=<filename> "
100 "format=<sample format> "
101 "channels=<number of channels> "
102 "rate=<sample rate> "
103 "channel_map=<channel map>");
104 #endif
105
106 PA_MODULE_AUTHOR("Lennart Poettering");
107 PA_MODULE_VERSION(PACKAGE_VERSION);
108 PA_MODULE_LOAD_ONCE(false);
109
/* NULL-terminated list of the module argument names that are accepted. The
 * device-specific names depend on whether the sink or the source variant is
 * being built. */
static const char* const valid_modargs[] = {
    /* Common to both variants */
    "auto",
    "server",
    "cookie",
    "format",
    "channels",
    "rate",

    /* Variant specific */
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif

    "channel_map",
    NULL,
};
129
130 #define DEFAULT_TIMEOUT 5
131
132 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
133
134 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
135
136 #ifdef TUNNEL_SINK
137
138 enum {
139 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
140 SINK_MESSAGE_REMOTE_SUSPEND,
141 SINK_MESSAGE_UPDATE_LATENCY,
142 SINK_MESSAGE_POST
143 };
144
145 #define DEFAULT_TLENGTH_MSEC 150
146 #define DEFAULT_MINREQ_MSEC 25
147
148 #else
149
150 enum {
151 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
152 SOURCE_MESSAGE_REMOTE_SUSPEND,
153 SOURCE_MESSAGE_UPDATE_LATENCY
154 };
155
156 #define DEFAULT_FRAGSIZE_MSEC 25
157
158 #endif
159
160 #ifdef TUNNEL_SINK
161 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
162 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
163 #endif
164 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
165 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
166 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
167 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
168 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
169 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
170 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
171
172 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
173 #ifdef TUNNEL_SINK
174 [PA_COMMAND_REQUEST] = command_request,
175 [PA_COMMAND_STARTED] = command_started,
176 #endif
177 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
178 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
179 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
180 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
181 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
182 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
183 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
184 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
185 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
186 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
187 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
188 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
189 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
190 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
191 };
192
193 struct userdata {
194 pa_core *core;
195 pa_module *module;
196
197 pa_thread_mq thread_mq;
198 pa_rtpoll *rtpoll;
199 pa_thread *thread;
200
201 pa_socket_client *client;
202 pa_pstream *pstream;
203 pa_pdispatch *pdispatch;
204
205 char *server_name;
206 #ifdef TUNNEL_SINK
207 char *sink_name;
208 pa_sink *sink;
209 size_t requested_bytes;
210 #else
211 char *source_name;
212 pa_source *source;
213 pa_mcalign *mcalign;
214 #endif
215
216 pa_auth_cookie *auth_cookie;
217
218 uint32_t version;
219 uint32_t ctag;
220 uint32_t device_index;
221 uint32_t channel;
222
223 int64_t counter, counter_delta;
224
225 bool remote_corked:1;
226 bool remote_suspended:1;
227
228 pa_usec_t transport_usec; /* maintained in the main thread */
229 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
230
231 uint32_t ignore_latency_before;
232
233 pa_time_event *time_event;
234
235 pa_smoother *smoother;
236
237 char *device_description;
238 char *server_fqdn;
239 char *user_name;
240
241 uint32_t maxlength;
242 #ifdef TUNNEL_SINK
243 uint32_t tlength;
244 uint32_t minreq;
245 uint32_t prebuf;
246 #else
247 uint32_t fragsize;
248 #endif
249 };
250
251 static void request_latency(struct userdata *u);
252
253 /* Called from main context */
254 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
255 pa_log_debug("Got stream or client event.");
256 }
257
258 /* Called from main context */
259 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
260 struct userdata *u = userdata;
261
262 pa_assert(pd);
263 pa_assert(t);
264 pa_assert(u);
265 pa_assert(u->pdispatch == pd);
266
267 pa_log_warn("Stream killed");
268 pa_module_unload_request(u->module, true);
269 }
270
271 /* Called from main context */
272 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
273 struct userdata *u = userdata;
274
275 pa_assert(pd);
276 pa_assert(t);
277 pa_assert(u);
278 pa_assert(u->pdispatch == pd);
279
280 pa_log_info("Server signalled buffer overrun/underrun.");
281 request_latency(u);
282 }
283
284 /* Called from main context */
285 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
286 struct userdata *u = userdata;
287 uint32_t channel;
288 bool suspended;
289
290 pa_assert(pd);
291 pa_assert(t);
292 pa_assert(u);
293 pa_assert(u->pdispatch == pd);
294
295 if (pa_tagstruct_getu32(t, &channel) < 0 ||
296 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
297 !pa_tagstruct_eof(t)) {
298
299 pa_log("Invalid packet.");
300 pa_module_unload_request(u->module, true);
301 return;
302 }
303
304 pa_log_debug("Server reports device suspend.");
305
306 #ifdef TUNNEL_SINK
307 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
308 #else
309 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
310 #endif
311
312 request_latency(u);
313 }
314
315 /* Called from main context */
316 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
317 struct userdata *u = userdata;
318 uint32_t channel, di;
319 const char *dn;
320 bool suspended;
321
322 pa_assert(pd);
323 pa_assert(t);
324 pa_assert(u);
325 pa_assert(u->pdispatch == pd);
326
327 if (pa_tagstruct_getu32(t, &channel) < 0 ||
328 pa_tagstruct_getu32(t, &di) < 0 ||
329 pa_tagstruct_gets(t, &dn) < 0 ||
330 pa_tagstruct_get_boolean(t, &suspended) < 0) {
331
332 pa_log_error("Invalid packet.");
333 pa_module_unload_request(u->module, true);
334 return;
335 }
336
337 pa_log_debug("Server reports a stream move.");
338
339 #ifdef TUNNEL_SINK
340 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
341 #else
342 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
343 #endif
344
345 request_latency(u);
346 }
347
348 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
349 struct userdata *u = userdata;
350 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
351 pa_usec_t usec;
352
353 pa_assert(pd);
354 pa_assert(t);
355 pa_assert(u);
356 pa_assert(u->pdispatch == pd);
357
358 if (pa_tagstruct_getu32(t, &channel) < 0 ||
359 pa_tagstruct_getu32(t, &maxlength) < 0) {
360
361 pa_log_error("Invalid packet.");
362 pa_module_unload_request(u->module, true);
363 return;
364 }
365
366 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
367 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
368 pa_tagstruct_get_usec(t, &usec) < 0) {
369
370 pa_log_error("Invalid packet.");
371 pa_module_unload_request(u->module, true);
372 return;
373 }
374 } else {
375 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
376 pa_tagstruct_getu32(t, &prebuf) < 0 ||
377 pa_tagstruct_getu32(t, &minreq) < 0 ||
378 pa_tagstruct_get_usec(t, &usec) < 0) {
379
380 pa_log_error("Invalid packet.");
381 pa_module_unload_request(u->module, true);
382 return;
383 }
384 }
385
386 #ifdef TUNNEL_SINK
387 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
388 #endif
389
390 request_latency(u);
391 }
392
393 #ifdef TUNNEL_SINK
394
395 /* Called from main context */
396 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
397 struct userdata *u = userdata;
398
399 pa_assert(pd);
400 pa_assert(t);
401 pa_assert(u);
402 pa_assert(u->pdispatch == pd);
403
404 pa_log_debug("Server reports playback started.");
405 request_latency(u);
406 }
407
408 #endif
409
410 /* Called from IO thread context */
411 static void check_smoother_status(struct userdata *u, bool past) {
412 pa_usec_t x;
413
414 pa_assert(u);
415
416 x = pa_rtclock_now();
417
418 /* Correct by the time the requested issued needs to travel to the
419 * other side. This is a valid thread-safe access, because the
420 * main thread is waiting for us */
421
422 if (past)
423 x -= u->thread_transport_usec;
424 else
425 x += u->thread_transport_usec;
426
427 if (u->remote_suspended || u->remote_corked)
428 pa_smoother_pause(u->smoother, x);
429 else
430 pa_smoother_resume(u->smoother, x, true);
431 }
432
433 /* Called from IO thread context */
434 static void stream_cork_within_thread(struct userdata *u, bool cork) {
435 pa_assert(u);
436
437 if (u->remote_corked == cork)
438 return;
439
440 u->remote_corked = cork;
441 check_smoother_status(u, false);
442 }
443
444 /* Called from main context */
445 static void stream_cork(struct userdata *u, bool cork) {
446 pa_tagstruct *t;
447 pa_assert(u);
448
449 if (!u->pstream)
450 return;
451
452 t = pa_tagstruct_new(NULL, 0);
453 #ifdef TUNNEL_SINK
454 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
455 #else
456 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
457 #endif
458 pa_tagstruct_putu32(t, u->ctag++);
459 pa_tagstruct_putu32(t, u->channel);
460 pa_tagstruct_put_boolean(t, !!cork);
461 pa_pstream_send_tagstruct(u->pstream, t);
462
463 request_latency(u);
464 }
465
466 /* Called from IO thread context */
467 static void stream_suspend_within_thread(struct userdata *u, bool suspend) {
468 pa_assert(u);
469
470 if (u->remote_suspended == suspend)
471 return;
472
473 u->remote_suspended = suspend;
474 check_smoother_status(u, true);
475 }
476
477 #ifdef TUNNEL_SINK
478
479 /* Called from IO thread context */
480 static void send_data(struct userdata *u) {
481 pa_assert(u);
482
483 while (u->requested_bytes > 0) {
484 pa_memchunk memchunk;
485
486 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
487 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
488 pa_memblock_unref(memchunk.memblock);
489
490 u->requested_bytes -= memchunk.length;
491
492 u->counter += (int64_t) memchunk.length;
493 }
494 }
495
496 /* This function is called from IO context -- except when it is not. */
497 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
498 struct userdata *u = PA_SINK(o)->userdata;
499
500 switch (code) {
501
502 case PA_SINK_MESSAGE_SET_STATE: {
503 int r;
504
505 /* First, change the state, because otherwise pa_sink_render() would fail */
506 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
507
508 stream_cork_within_thread(u, u->sink->thread_info.state == PA_SINK_SUSPENDED);
509
510 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
511 send_data(u);
512 }
513
514 return r;
515 }
516
517 case PA_SINK_MESSAGE_GET_LATENCY: {
518 pa_usec_t yl, yr, *usec = data;
519
520 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
521 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
522
523 *usec = yl > yr ? yl - yr : 0;
524 return 0;
525 }
526
527 case SINK_MESSAGE_REQUEST:
528
529 pa_assert(offset > 0);
530 u->requested_bytes += (size_t) offset;
531
532 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
533 send_data(u);
534
535 return 0;
536
537 case SINK_MESSAGE_REMOTE_SUSPEND:
538
539 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
540 return 0;
541
542 case SINK_MESSAGE_UPDATE_LATENCY: {
543 pa_usec_t y;
544
545 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
546
547 if (y > (pa_usec_t) offset)
548 y -= (pa_usec_t) offset;
549 else
550 y = 0;
551
552 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
553
554 /* We can access this freely here, since the main thread is waiting for us */
555 u->thread_transport_usec = u->transport_usec;
556
557 return 0;
558 }
559
560 case SINK_MESSAGE_POST:
561
562 /* OK, This might be a bit confusing. This message is
563 * delivered to us from the main context -- NOT from the
564 * IO thread context where the rest of the messages are
565 * dispatched. Yeah, ugly, but I am a lazy bastard. */
566
567 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
568
569 u->counter_delta += (int64_t) chunk->length;
570
571 return 0;
572 }
573
574 return pa_sink_process_msg(o, code, data, offset, chunk);
575 }
576
577 /* Called from main context */
578 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
579 struct userdata *u;
580 pa_sink_assert_ref(s);
581 u = s->userdata;
582
583 switch ((pa_sink_state_t) state) {
584
585 case PA_SINK_SUSPENDED:
586 pa_assert(PA_SINK_IS_OPENED(s->state));
587 stream_cork(u, true);
588 break;
589
590 case PA_SINK_IDLE:
591 case PA_SINK_RUNNING:
592 if (s->state == PA_SINK_SUSPENDED)
593 stream_cork(u, false);
594 break;
595
596 case PA_SINK_UNLINKED:
597 case PA_SINK_INIT:
598 case PA_SINK_INVALID_STATE:
599 ;
600 }
601
602 return 0;
603 }
604
605 #else
606
607 /* This function is called from IO context -- except when it is not. */
608 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
609 struct userdata *u = PA_SOURCE(o)->userdata;
610
611 switch (code) {
612
613 case PA_SOURCE_MESSAGE_SET_STATE: {
614 int r;
615
616 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
617 stream_cork_within_thread(u, u->source->thread_info.state == PA_SOURCE_SUSPENDED);
618
619 return r;
620 }
621
622 case PA_SOURCE_MESSAGE_GET_LATENCY: {
623 pa_usec_t yr, yl, *usec = data;
624
625 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
626 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
627
628 *usec = yr > yl ? yr - yl : 0;
629 return 0;
630 }
631
632 case SOURCE_MESSAGE_POST: {
633 pa_memchunk c;
634
635 pa_mcalign_push(u->mcalign, chunk);
636
637 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
638
639 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
640 pa_source_post(u->source, &c);
641
642 pa_memblock_unref(c.memblock);
643
644 u->counter += (int64_t) c.length;
645 }
646
647 return 0;
648 }
649
650 case SOURCE_MESSAGE_REMOTE_SUSPEND:
651
652 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
653 return 0;
654
655 case SOURCE_MESSAGE_UPDATE_LATENCY: {
656 pa_usec_t y;
657
658 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
659 y += (pa_usec_t) offset;
660
661 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
662
663 /* We can access this freely here, since the main thread is waiting for us */
664 u->thread_transport_usec = u->transport_usec;
665
666 return 0;
667 }
668 }
669
670 return pa_source_process_msg(o, code, data, offset, chunk);
671 }
672
673 /* Called from main context */
674 static int source_set_state(pa_source *s, pa_source_state_t state) {
675 struct userdata *u;
676 pa_source_assert_ref(s);
677 u = s->userdata;
678
679 switch ((pa_source_state_t) state) {
680
681 case PA_SOURCE_SUSPENDED:
682 pa_assert(PA_SOURCE_IS_OPENED(s->state));
683 stream_cork(u, true);
684 break;
685
686 case PA_SOURCE_IDLE:
687 case PA_SOURCE_RUNNING:
688 if (s->state == PA_SOURCE_SUSPENDED)
689 stream_cork(u, false);
690 break;
691
692 case PA_SOURCE_UNLINKED:
693 case PA_SOURCE_INIT:
694 case PA_SINK_INVALID_STATE:
695 ;
696 }
697
698 return 0;
699 }
700
701 #endif
702
703 static void thread_func(void *userdata) {
704 struct userdata *u = userdata;
705
706 pa_assert(u);
707
708 pa_log_debug("Thread starting up");
709
710 pa_thread_mq_install(&u->thread_mq);
711
712 for (;;) {
713 int ret;
714
715 #ifdef TUNNEL_SINK
716 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
717 pa_sink_process_rewind(u->sink, 0);
718 #endif
719
720 if ((ret = pa_rtpoll_run(u->rtpoll, true)) < 0)
721 goto fail;
722
723 if (ret == 0)
724 goto finish;
725 }
726
727 fail:
728 /* If this was no regular exit from the loop we have to continue
729 * processing messages until we received PA_MESSAGE_SHUTDOWN */
730 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
731 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
732
733 finish:
734 pa_log_debug("Thread shutting down");
735 }
736
737 #ifdef TUNNEL_SINK
738 /* Called from main context */
739 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
740 struct userdata *u = userdata;
741 uint32_t bytes, channel;
742
743 pa_assert(pd);
744 pa_assert(command == PA_COMMAND_REQUEST);
745 pa_assert(t);
746 pa_assert(u);
747 pa_assert(u->pdispatch == pd);
748
749 if (pa_tagstruct_getu32(t, &channel) < 0 ||
750 pa_tagstruct_getu32(t, &bytes) < 0) {
751 pa_log("Invalid protocol reply");
752 goto fail;
753 }
754
755 if (channel != u->channel) {
756 pa_log("Received data for invalid channel");
757 goto fail;
758 }
759
760 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
761 return;
762
763 fail:
764 pa_module_unload_request(u->module, true);
765 }
766
767 #endif
768
769 /* Called from main context */
770 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
771 struct userdata *u = userdata;
772 pa_usec_t sink_usec, source_usec;
773 bool playing;
774 int64_t write_index, read_index;
775 struct timeval local, remote, now;
776 pa_sample_spec *ss;
777 int64_t delay;
778
779 pa_assert(pd);
780 pa_assert(u);
781
782 if (command != PA_COMMAND_REPLY) {
783 if (command == PA_COMMAND_ERROR)
784 pa_log("Failed to get latency.");
785 else
786 pa_log("Protocol error.");
787 goto fail;
788 }
789
790 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
791 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
792 pa_tagstruct_get_boolean(t, &playing) < 0 ||
793 pa_tagstruct_get_timeval(t, &local) < 0 ||
794 pa_tagstruct_get_timeval(t, &remote) < 0 ||
795 pa_tagstruct_gets64(t, &write_index) < 0 ||
796 pa_tagstruct_gets64(t, &read_index) < 0) {
797 pa_log("Invalid reply.");
798 goto fail;
799 }
800
801 #ifdef TUNNEL_SINK
802 if (u->version >= 13) {
803 uint64_t underrun_for = 0, playing_for = 0;
804
805 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
806 pa_tagstruct_getu64(t, &playing_for) < 0) {
807 pa_log("Invalid reply.");
808 goto fail;
809 }
810 }
811 #endif
812
813 if (!pa_tagstruct_eof(t)) {
814 pa_log("Invalid reply.");
815 goto fail;
816 }
817
818 if (tag < u->ignore_latency_before) {
819 return;
820 }
821
822 pa_gettimeofday(&now);
823
824 /* Calculate transport usec */
825 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
826 /* local and remote seem to have synchronized clocks */
827 #ifdef TUNNEL_SINK
828 u->transport_usec = pa_timeval_diff(&remote, &local);
829 #else
830 u->transport_usec = pa_timeval_diff(&now, &remote);
831 #endif
832 } else
833 u->transport_usec = pa_timeval_diff(&now, &local)/2;
834
835 /* First, take the device's delay */
836 #ifdef TUNNEL_SINK
837 delay = (int64_t) sink_usec;
838 ss = &u->sink->sample_spec;
839 #else
840 delay = (int64_t) source_usec;
841 ss = &u->source->sample_spec;
842 #endif
843
844 /* Add the length of our server-side buffer */
845 if (write_index >= read_index)
846 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
847 else
848 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
849
850 /* Our measurements are already out of date, hence correct by the *
851 * transport latency */
852 #ifdef TUNNEL_SINK
853 delay -= (int64_t) u->transport_usec;
854 #else
855 delay += (int64_t) u->transport_usec;
856 #endif
857
858 /* Now correct by what we have have read/written since we requested the update */
859 #ifdef TUNNEL_SINK
860 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
861 #else
862 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
863 #endif
864
865 #ifdef TUNNEL_SINK
866 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
867 #else
868 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
869 #endif
870
871 return;
872
873 fail:
874
875 pa_module_unload_request(u->module, true);
876 }
877
878 /* Called from main context */
879 static void request_latency(struct userdata *u) {
880 pa_tagstruct *t;
881 struct timeval now;
882 uint32_t tag;
883 pa_assert(u);
884
885 t = pa_tagstruct_new(NULL, 0);
886 #ifdef TUNNEL_SINK
887 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
888 #else
889 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
890 #endif
891 pa_tagstruct_putu32(t, tag = u->ctag++);
892 pa_tagstruct_putu32(t, u->channel);
893
894 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
895
896 pa_pstream_send_tagstruct(u->pstream, t);
897 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
898
899 u->ignore_latency_before = tag;
900 u->counter_delta = 0;
901 }
902
903 /* Called from main context */
904 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
905 struct userdata *u = userdata;
906
907 pa_assert(m);
908 pa_assert(e);
909 pa_assert(u);
910
911 request_latency(u);
912
913 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
914 }
915
916 /* Called from main context */
917 static void update_description(struct userdata *u) {
918 char *d;
919 char un[128], hn[128];
920 pa_tagstruct *t;
921
922 pa_assert(u);
923
924 if (!u->server_fqdn || !u->user_name || !u->device_description)
925 return;
926
927 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
928
929 #ifdef TUNNEL_SINK
930 pa_sink_set_description(u->sink, d);
931 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
932 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
933 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
934 #else
935 pa_source_set_description(u->source, d);
936 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
937 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
938 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
939 #endif
940
941 pa_xfree(d);
942
943 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
944 pa_get_user_name(un, sizeof(un)),
945 pa_get_host_name(hn, sizeof(hn)));
946
947 t = pa_tagstruct_new(NULL, 0);
948 #ifdef TUNNEL_SINK
949 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
950 #else
951 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
952 #endif
953 pa_tagstruct_putu32(t, u->ctag++);
954 pa_tagstruct_putu32(t, u->channel);
955 pa_tagstruct_puts(t, d);
956 pa_pstream_send_tagstruct(u->pstream, t);
957
958 pa_xfree(d);
959 }
960
961 /* Called from main context */
962 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
963 struct userdata *u = userdata;
964 pa_sample_spec ss;
965 pa_channel_map cm;
966 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
967 uint32_t cookie;
968
969 pa_assert(pd);
970 pa_assert(u);
971
972 if (command != PA_COMMAND_REPLY) {
973 if (command == PA_COMMAND_ERROR)
974 pa_log("Failed to get info.");
975 else
976 pa_log("Protocol error.");
977 goto fail;
978 }
979
980 if (pa_tagstruct_gets(t, &server_name) < 0 ||
981 pa_tagstruct_gets(t, &server_version) < 0 ||
982 pa_tagstruct_gets(t, &user_name) < 0 ||
983 pa_tagstruct_gets(t, &host_name) < 0 ||
984 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
985 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
986 pa_tagstruct_gets(t, &default_source_name) < 0 ||
987 pa_tagstruct_getu32(t, &cookie) < 0 ||
988 (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
989
990 pa_log("Parse failure");
991 goto fail;
992 }
993
994 if (!pa_tagstruct_eof(t)) {
995 pa_log("Packet too long");
996 goto fail;
997 }
998
999 pa_xfree(u->server_fqdn);
1000 u->server_fqdn = pa_xstrdup(host_name);
1001
1002 pa_xfree(u->user_name);
1003 u->user_name = pa_xstrdup(user_name);
1004
1005 update_description(u);
1006
1007 return;
1008
1009 fail:
1010 pa_module_unload_request(u->module, true);
1011 }
1012
1013 static int read_ports(struct userdata *u, pa_tagstruct *t) {
1014 if (u->version >= 16) {
1015 uint32_t n_ports;
1016 const char *s;
1017
1018 if (pa_tagstruct_getu32(t, &n_ports)) {
1019 pa_log("Parse failure");
1020 return -PA_ERR_PROTOCOL;
1021 }
1022
1023 for (uint32_t j = 0; j < n_ports; j++) {
1024 uint32_t priority;
1025
1026 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1027 pa_tagstruct_gets(t, &s) < 0 || /* description */
1028 pa_tagstruct_getu32(t, &priority) < 0) {
1029
1030 pa_log("Parse failure");
1031 return -PA_ERR_PROTOCOL;
1032 }
1033 if (u->version >= 24 && pa_tagstruct_getu32(t, &priority) < 0) { /* available */
1034 pa_log("Parse failure");
1035 return -PA_ERR_PROTOCOL;
1036 }
1037 }
1038
1039 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1040 pa_log("Parse failure");
1041 return -PA_ERR_PROTOCOL;
1042 }
1043 }
1044 return 0;
1045 }
1046
1047 static int read_formats(struct userdata *u, pa_tagstruct *t) {
1048 uint8_t n_formats;
1049 pa_format_info *format;
1050
1051 if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1052 pa_log("Parse failure");
1053 return -PA_ERR_PROTOCOL;
1054 }
1055
1056 for (uint8_t j = 0; j < n_formats; j++) {
1057 format = pa_format_info_new();
1058 if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1059 pa_format_info_free(format);
1060 pa_log("Parse failure");
1061 return -PA_ERR_PROTOCOL;
1062 }
1063 pa_format_info_free(format);
1064 }
1065 return 0;
1066 }
1067
1068 #ifdef TUNNEL_SINK
1069
1070 /* Called from main context */
1071 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1072 struct userdata *u = userdata;
1073 uint32_t idx, owner_module, monitor_source, flags;
1074 const char *name, *description, *monitor_source_name, *driver;
1075 pa_sample_spec ss;
1076 pa_channel_map cm;
1077 pa_cvolume volume;
1078 bool mute;
1079 pa_usec_t latency;
1080
1081 pa_assert(pd);
1082 pa_assert(u);
1083
1084 if (command != PA_COMMAND_REPLY) {
1085 if (command == PA_COMMAND_ERROR)
1086 pa_log("Failed to get info.");
1087 else
1088 pa_log("Protocol error.");
1089 goto fail;
1090 }
1091
1092 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1093 pa_tagstruct_gets(t, &name) < 0 ||
1094 pa_tagstruct_gets(t, &description) < 0 ||
1095 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1096 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1097 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1098 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1099 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1100 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1101 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1102 pa_tagstruct_get_usec(t, &latency) < 0 ||
1103 pa_tagstruct_gets(t, &driver) < 0 ||
1104 pa_tagstruct_getu32(t, &flags) < 0) {
1105
1106 pa_log("Parse failure");
1107 goto fail;
1108 }
1109
1110 if (u->version >= 13) {
1111 pa_usec_t configured_latency;
1112
1113 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1114 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1115
1116 pa_log("Parse failure");
1117 goto fail;
1118 }
1119 }
1120
1121 if (u->version >= 15) {
1122 pa_volume_t base_volume;
1123 uint32_t state, n_volume_steps, card;
1124
1125 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1126 pa_tagstruct_getu32(t, &state) < 0 ||
1127 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1128 pa_tagstruct_getu32(t, &card) < 0) {
1129
1130 pa_log("Parse failure");
1131 goto fail;
1132 }
1133 }
1134
1135 if (read_ports(u, t) < 0)
1136 goto fail;
1137
1138 if (u->version >= 21 && read_formats(u, t) < 0)
1139 goto fail;
1140
1141 if (!pa_tagstruct_eof(t)) {
1142 pa_log("Packet too long");
1143 goto fail;
1144 }
1145
1146 if (!u->sink_name || !pa_streq(name, u->sink_name))
1147 return;
1148
1149 pa_xfree(u->device_description);
1150 u->device_description = pa_xstrdup(description);
1151
1152 update_description(u);
1153
1154 return;
1155
1156 fail:
1157 pa_module_unload_request(u->module, true);
1158 }
1159
1160 /* Called from main context */
1161 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1162 struct userdata *u = userdata;
1163 uint32_t idx, owner_module, client, sink;
1164 pa_usec_t buffer_usec, sink_usec;
1165 const char *name, *driver, *resample_method;
1166 bool mute = false;
1167 pa_sample_spec sample_spec;
1168 pa_channel_map channel_map;
1169 pa_cvolume volume;
1170 bool b;
1171
1172 pa_assert(pd);
1173 pa_assert(u);
1174
1175 if (command != PA_COMMAND_REPLY) {
1176 if (command == PA_COMMAND_ERROR)
1177 pa_log("Failed to get info.");
1178 else
1179 pa_log("Protocol error.");
1180 goto fail;
1181 }
1182
1183 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1184 pa_tagstruct_gets(t, &name) < 0 ||
1185 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1186 pa_tagstruct_getu32(t, &client) < 0 ||
1187 pa_tagstruct_getu32(t, &sink) < 0 ||
1188 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1189 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1190 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1191 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1192 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1193 pa_tagstruct_gets(t, &resample_method) < 0 ||
1194 pa_tagstruct_gets(t, &driver) < 0) {
1195
1196 pa_log("Parse failure");
1197 goto fail;
1198 }
1199
1200 if (u->version >= 11) {
1201 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1202
1203 pa_log("Parse failure");
1204 goto fail;
1205 }
1206 }
1207
1208 if (u->version >= 13) {
1209 if (pa_tagstruct_get_proplist(t, NULL) < 0) {
1210
1211 pa_log("Parse failure");
1212 goto fail;
1213 }
1214 }
1215
1216 if (u->version >= 19) {
1217 if (pa_tagstruct_get_boolean(t, &b) < 0) {
1218
1219 pa_log("Parse failure");
1220 goto fail;
1221 }
1222 }
1223
1224 if (u->version >= 20) {
1225 if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1226 pa_tagstruct_get_boolean(t, &b) < 0) {
1227
1228 pa_log("Parse failure");
1229 goto fail;
1230 }
1231 }
1232
1233 if (u->version >= 21) {
1234 pa_format_info *format = pa_format_info_new();
1235
1236 if (pa_tagstruct_get_format_info(t, format) < 0) {
1237 pa_format_info_free(format);
1238 pa_log("Parse failure");
1239 goto fail;
1240 }
1241 pa_format_info_free(format);
1242 }
1243
1244 if (!pa_tagstruct_eof(t)) {
1245 pa_log("Packet too long");
1246 goto fail;
1247 }
1248
1249 if (idx != u->device_index)
1250 return;
1251
1252 pa_assert(u->sink);
1253
1254 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1255 pa_cvolume_equal(&volume, &u->sink->real_volume))
1256 return;
1257
1258 pa_sink_volume_changed(u->sink, &volume);
1259
1260 if (u->version >= 11)
1261 pa_sink_mute_changed(u->sink, mute);
1262
1263 return;
1264
1265 fail:
1266 pa_module_unload_request(u->module, true);
1267 }
1268
1269 #else
1270
1271 /* Called from main context */
1272 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1273 struct userdata *u = userdata;
1274 uint32_t idx, owner_module, monitor_of_sink, flags;
1275 const char *name, *description, *monitor_of_sink_name, *driver;
1276 pa_sample_spec ss;
1277 pa_channel_map cm;
1278 pa_cvolume volume;
1279 bool mute;
1280 pa_usec_t latency, configured_latency;
1281
1282 pa_assert(pd);
1283 pa_assert(u);
1284
1285 if (command != PA_COMMAND_REPLY) {
1286 if (command == PA_COMMAND_ERROR)
1287 pa_log("Failed to get info.");
1288 else
1289 pa_log("Protocol error.");
1290 goto fail;
1291 }
1292
1293 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1294 pa_tagstruct_gets(t, &name) < 0 ||
1295 pa_tagstruct_gets(t, &description) < 0 ||
1296 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1297 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1298 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1299 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1300 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1301 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1302 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1303 pa_tagstruct_get_usec(t, &latency) < 0 ||
1304 pa_tagstruct_gets(t, &driver) < 0 ||
1305 pa_tagstruct_getu32(t, &flags) < 0) {
1306
1307 pa_log("Parse failure");
1308 goto fail;
1309 }
1310
1311 if (u->version >= 13) {
1312 if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1313 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1314
1315 pa_log("Parse failure");
1316 goto fail;
1317 }
1318 }
1319
1320 if (u->version >= 15) {
1321 pa_volume_t base_volume;
1322 uint32_t state, n_volume_steps, card;
1323
1324 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1325 pa_tagstruct_getu32(t, &state) < 0 ||
1326 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1327 pa_tagstruct_getu32(t, &card) < 0) {
1328
1329 pa_log("Parse failure");
1330 goto fail;
1331 }
1332 }
1333
1334 if (read_ports(u, t) < 0)
1335 goto fail;
1336
1337 if (u->version >= 22 && read_formats(u, t) < 0)
1338 goto fail;
1339
1340 if (!pa_tagstruct_eof(t)) {
1341 pa_log("Packet too long");
1342 goto fail;
1343 }
1344
1345 if (!u->source_name || !pa_streq(name, u->source_name))
1346 return;
1347
1348 pa_xfree(u->device_description);
1349 u->device_description = pa_xstrdup(description);
1350
1351 update_description(u);
1352
1353 return;
1354
1355 fail:
1356 pa_module_unload_request(u->module, true);
1357 }
1358
1359 #endif
1360
1361 /* Called from main context */
1362 static void request_info(struct userdata *u) {
1363 pa_tagstruct *t;
1364 uint32_t tag;
1365 pa_assert(u);
1366
1367 t = pa_tagstruct_new(NULL, 0);
1368 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1369 pa_tagstruct_putu32(t, tag = u->ctag++);
1370 pa_pstream_send_tagstruct(u->pstream, t);
1371 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1372
1373 #ifdef TUNNEL_SINK
1374 t = pa_tagstruct_new(NULL, 0);
1375 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1376 pa_tagstruct_putu32(t, tag = u->ctag++);
1377 pa_tagstruct_putu32(t, u->device_index);
1378 pa_pstream_send_tagstruct(u->pstream, t);
1379 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1380
1381 if (u->sink_name) {
1382 t = pa_tagstruct_new(NULL, 0);
1383 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1384 pa_tagstruct_putu32(t, tag = u->ctag++);
1385 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1386 pa_tagstruct_puts(t, u->sink_name);
1387 pa_pstream_send_tagstruct(u->pstream, t);
1388 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1389 }
1390 #else
1391 if (u->source_name) {
1392 t = pa_tagstruct_new(NULL, 0);
1393 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1394 pa_tagstruct_putu32(t, tag = u->ctag++);
1395 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1396 pa_tagstruct_puts(t, u->source_name);
1397 pa_pstream_send_tagstruct(u->pstream, t);
1398 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1399 }
1400 #endif
1401 }
1402
1403 /* Called from main context */
1404 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1405 struct userdata *u = userdata;
1406 pa_subscription_event_type_t e;
1407 uint32_t idx;
1408
1409 pa_assert(pd);
1410 pa_assert(t);
1411 pa_assert(u);
1412 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1413
1414 if (pa_tagstruct_getu32(t, &e) < 0 ||
1415 pa_tagstruct_getu32(t, &idx) < 0) {
1416 pa_log("Invalid protocol reply");
1417 pa_module_unload_request(u->module, true);
1418 return;
1419 }
1420
1421 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1422 #ifdef TUNNEL_SINK
1423 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1424 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1425 #else
1426 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1427 #endif
1428 )
1429 return;
1430
1431 request_info(u);
1432 }
1433
1434 /* Called from main context */
1435 static void start_subscribe(struct userdata *u) {
1436 pa_tagstruct *t;
1437 pa_assert(u);
1438
1439 t = pa_tagstruct_new(NULL, 0);
1440 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1441 pa_tagstruct_putu32(t, u->ctag++);
1442 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1443 #ifdef TUNNEL_SINK
1444 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1445 #else
1446 PA_SUBSCRIPTION_MASK_SOURCE
1447 #endif
1448 );
1449
1450 pa_pstream_send_tagstruct(u->pstream, t);
1451 }
1452
1453 /* Called from main context */
1454 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1455 struct userdata *u = userdata;
1456 #ifdef TUNNEL_SINK
1457 uint32_t bytes;
1458 #endif
1459
1460 pa_assert(pd);
1461 pa_assert(u);
1462 pa_assert(u->pdispatch == pd);
1463
1464 if (command != PA_COMMAND_REPLY) {
1465 if (command == PA_COMMAND_ERROR)
1466 pa_log("Failed to create stream.");
1467 else
1468 pa_log("Protocol error.");
1469 goto fail;
1470 }
1471
1472 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1473 pa_tagstruct_getu32(t, &u->device_index) < 0
1474 #ifdef TUNNEL_SINK
1475 || pa_tagstruct_getu32(t, &bytes) < 0
1476 #endif
1477 )
1478 goto parse_error;
1479
1480 if (u->version >= 9) {
1481 #ifdef TUNNEL_SINK
1482 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1483 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1484 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1485 pa_tagstruct_getu32(t, &u->minreq) < 0)
1486 goto parse_error;
1487 #else
1488 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1489 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1490 goto parse_error;
1491 #endif
1492 }
1493
1494 if (u->version >= 12) {
1495 pa_sample_spec ss;
1496 pa_channel_map cm;
1497 uint32_t device_index;
1498 const char *dn;
1499 bool suspended;
1500
1501 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1502 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1503 pa_tagstruct_getu32(t, &device_index) < 0 ||
1504 pa_tagstruct_gets(t, &dn) < 0 ||
1505 pa_tagstruct_get_boolean(t, &suspended) < 0)
1506 goto parse_error;
1507
1508 #ifdef TUNNEL_SINK
1509 pa_xfree(u->sink_name);
1510 u->sink_name = pa_xstrdup(dn);
1511 #else
1512 pa_xfree(u->source_name);
1513 u->source_name = pa_xstrdup(dn);
1514 #endif
1515 }
1516
1517 if (u->version >= 13) {
1518 pa_usec_t usec;
1519
1520 if (pa_tagstruct_get_usec(t, &usec) < 0)
1521 goto parse_error;
1522
1523 /* #ifdef TUNNEL_SINK */
1524 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1525 /* #else */
1526 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1527 /* #endif */
1528 }
1529
1530 if (u->version >= 21) {
1531 pa_format_info *format = pa_format_info_new();
1532
1533 if (pa_tagstruct_get_format_info(t, format) < 0) {
1534 pa_format_info_free(format);
1535 goto parse_error;
1536 }
1537
1538 pa_format_info_free(format);
1539 }
1540
1541 if (!pa_tagstruct_eof(t))
1542 goto parse_error;
1543
1544 start_subscribe(u);
1545 request_info(u);
1546
1547 pa_assert(!u->time_event);
1548 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1549
1550 request_latency(u);
1551
1552 pa_log_debug("Stream created.");
1553
1554 #ifdef TUNNEL_SINK
1555 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1556 #endif
1557
1558 return;
1559
1560 parse_error:
1561 pa_log("Invalid reply. (Create stream)");
1562
1563 fail:
1564 pa_module_unload_request(u->module, true);
1565
1566 }
1567
1568 /* Called from main context */
1569 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1570 struct userdata *u = userdata;
1571 pa_tagstruct *reply;
1572 char name[256], un[128], hn[128];
1573 pa_cvolume volume;
1574
1575 pa_assert(pd);
1576 pa_assert(u);
1577 pa_assert(u->pdispatch == pd);
1578
1579 if (command != PA_COMMAND_REPLY ||
1580 pa_tagstruct_getu32(t, &u->version) < 0 ||
1581 !pa_tagstruct_eof(t)) {
1582
1583 if (command == PA_COMMAND_ERROR)
1584 pa_log("Failed to authenticate");
1585 else
1586 pa_log("Protocol error.");
1587
1588 goto fail;
1589 }
1590
1591 /* Minimum supported protocol version */
1592 if (u->version < 8) {
1593 pa_log("Incompatible protocol version");
1594 goto fail;
1595 }
1596
1597 /* Starting with protocol version 13 the MSB of the version tag
1598 reflects if shm is enabled for this connection or not. We don't
1599 support SHM here at all, so we just ignore this. */
1600
1601 if (u->version >= 13)
1602 u->version &= 0x7FFFFFFFU;
1603
1604 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1605
1606 #ifdef TUNNEL_SINK
1607 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1608 pa_sink_update_proplist(u->sink, 0, NULL);
1609
1610 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1611 u->sink_name,
1612 pa_get_user_name(un, sizeof(un)),
1613 pa_get_host_name(hn, sizeof(hn)));
1614 #else
1615 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1616 pa_source_update_proplist(u->source, 0, NULL);
1617
1618 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1619 u->source_name,
1620 pa_get_user_name(un, sizeof(un)),
1621 pa_get_host_name(hn, sizeof(hn)));
1622 #endif
1623
1624 reply = pa_tagstruct_new(NULL, 0);
1625 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1626 pa_tagstruct_putu32(reply, u->ctag++);
1627
1628 if (u->version >= 13) {
1629 pa_proplist *pl;
1630 pl = pa_proplist_new();
1631 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1632 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1633 pa_init_proplist(pl);
1634 pa_tagstruct_put_proplist(reply, pl);
1635 pa_proplist_free(pl);
1636 } else
1637 pa_tagstruct_puts(reply, "PulseAudio");
1638
1639 pa_pstream_send_tagstruct(u->pstream, reply);
1640 /* We ignore the server's reply here */
1641
1642 reply = pa_tagstruct_new(NULL, 0);
1643
1644 if (u->version < 13)
1645 /* Only for older PA versions we need to fill in the maxlength */
1646 u->maxlength = 4*1024*1024;
1647
1648 #ifdef TUNNEL_SINK
1649 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1650 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1651 u->prebuf = u->tlength;
1652 #else
1653 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1654 #endif
1655
1656 #ifdef TUNNEL_SINK
1657 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1658 pa_tagstruct_putu32(reply, tag = u->ctag++);
1659
1660 if (u->version < 13)
1661 pa_tagstruct_puts(reply, name);
1662
1663 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1664 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1665 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1666 pa_tagstruct_puts(reply, u->sink_name);
1667 pa_tagstruct_putu32(reply, u->maxlength);
1668 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1669 pa_tagstruct_putu32(reply, u->tlength);
1670 pa_tagstruct_putu32(reply, u->prebuf);
1671 pa_tagstruct_putu32(reply, u->minreq);
1672 pa_tagstruct_putu32(reply, 0);
1673 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1674 pa_tagstruct_put_cvolume(reply, &volume);
1675 #else
1676 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1677 pa_tagstruct_putu32(reply, tag = u->ctag++);
1678
1679 if (u->version < 13)
1680 pa_tagstruct_puts(reply, name);
1681
1682 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1683 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1684 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1685 pa_tagstruct_puts(reply, u->source_name);
1686 pa_tagstruct_putu32(reply, u->maxlength);
1687 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1688 pa_tagstruct_putu32(reply, u->fragsize);
1689 #endif
1690
1691 if (u->version >= 12) {
1692 pa_tagstruct_put_boolean(reply, false); /* no_remap */
1693 pa_tagstruct_put_boolean(reply, false); /* no_remix */
1694 pa_tagstruct_put_boolean(reply, false); /* fix_format */
1695 pa_tagstruct_put_boolean(reply, false); /* fix_rate */
1696 pa_tagstruct_put_boolean(reply, false); /* fix_channels */
1697 pa_tagstruct_put_boolean(reply, true); /* no_move */
1698 pa_tagstruct_put_boolean(reply, false); /* variable_rate */
1699 }
1700
1701 if (u->version >= 13) {
1702 pa_proplist *pl;
1703
1704 pa_tagstruct_put_boolean(reply, false); /* start muted/peak detect*/
1705 pa_tagstruct_put_boolean(reply, true); /* adjust_latency */
1706
1707 pl = pa_proplist_new();
1708 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1709 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1710 pa_tagstruct_put_proplist(reply, pl);
1711 pa_proplist_free(pl);
1712
1713 #ifndef TUNNEL_SINK
1714 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1715 #endif
1716 }
1717
1718 if (u->version >= 14) {
1719 #ifdef TUNNEL_SINK
1720 pa_tagstruct_put_boolean(reply, false); /* volume_set */
1721 #endif
1722 pa_tagstruct_put_boolean(reply, true); /* early rquests */
1723 }
1724
1725 if (u->version >= 15) {
1726 #ifdef TUNNEL_SINK
1727 pa_tagstruct_put_boolean(reply, false); /* muted_set */
1728 #endif
1729 pa_tagstruct_put_boolean(reply, false); /* don't inhibit auto suspend */
1730 pa_tagstruct_put_boolean(reply, false); /* fail on suspend */
1731 }
1732
1733 #ifdef TUNNEL_SINK
1734 if (u->version >= 17)
1735 pa_tagstruct_put_boolean(reply, false); /* relative volume */
1736
1737 if (u->version >= 18)
1738 pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1739 #endif
1740
1741 #ifdef TUNNEL_SINK
1742 if (u->version >= 21) {
1743 /* We're not using the extended API, so n_formats = 0 and that's that */
1744 pa_tagstruct_putu8(reply, 0);
1745 }
1746 #else
1747 if (u->version >= 22) {
1748 /* We're not using the extended API, so n_formats = 0 and that's that */
1749 pa_tagstruct_putu8(reply, 0);
1750 pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1751 pa_tagstruct_put_cvolume(reply, &volume);
1752 pa_tagstruct_put_boolean(reply, false); /* muted */
1753 pa_tagstruct_put_boolean(reply, false); /* volume_set */
1754 pa_tagstruct_put_boolean(reply, false); /* muted_set */
1755 pa_tagstruct_put_boolean(reply, false); /* relative volume */
1756 pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1757 }
1758 #endif
1759
1760 pa_pstream_send_tagstruct(u->pstream, reply);
1761 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1762
1763 pa_log_debug("Connection authenticated, creating stream ...");
1764
1765 return;
1766
1767 fail:
1768 pa_module_unload_request(u->module, true);
1769 }
1770
1771 /* Called from main context */
1772 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1773 struct userdata *u = userdata;
1774
1775 pa_assert(p);
1776 pa_assert(u);
1777
1778 pa_log_warn("Stream died.");
1779 pa_module_unload_request(u->module, true);
1780 }
1781
1782 /* Called from main context */
1783 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1784 struct userdata *u = userdata;
1785
1786 pa_assert(p);
1787 pa_assert(packet);
1788 pa_assert(u);
1789
1790 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1791 pa_log("Invalid packet");
1792 pa_module_unload_request(u->module, true);
1793 return;
1794 }
1795 }
1796
1797 #ifndef TUNNEL_SINK
1798 /* Called from main context */
1799 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1800 struct userdata *u = userdata;
1801
1802 pa_assert(p);
1803 pa_assert(chunk);
1804 pa_assert(u);
1805
1806 if (channel != u->channel) {
1807 pa_log("Received memory block on bad channel.");
1808 pa_module_unload_request(u->module, true);
1809 return;
1810 }
1811
1812 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1813
1814 u->counter_delta += (int64_t) chunk->length;
1815 }
1816 #endif
1817
1818 /* Called from main context */
1819 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1820 struct userdata *u = userdata;
1821 pa_tagstruct *t;
1822 uint32_t tag;
1823
1824 pa_assert(sc);
1825 pa_assert(u);
1826 pa_assert(u->client == sc);
1827
1828 pa_socket_client_unref(u->client);
1829 u->client = NULL;
1830
1831 if (!io) {
1832 pa_log("Connection failed: %s", pa_cstrerror(errno));
1833 pa_module_unload_request(u->module, true);
1834 return;
1835 }
1836
1837 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1838 u->pdispatch = pa_pdispatch_new(u->core->mainloop, true, command_table, PA_COMMAND_MAX);
1839
1840 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1841 pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
1842 #ifndef TUNNEL_SINK
1843 pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
1844 #endif
1845
1846 t = pa_tagstruct_new(NULL, 0);
1847 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1848 pa_tagstruct_putu32(t, tag = u->ctag++);
1849 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1850
1851 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1852
1853 #ifdef HAVE_CREDS
1854 {
1855 pa_creds ucred;
1856
1857 if (pa_iochannel_creds_supported(io))
1858 pa_iochannel_creds_enable(io);
1859
1860 ucred.uid = getuid();
1861 ucred.gid = getgid();
1862
1863 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1864 }
1865 #else
1866 pa_pstream_send_tagstruct(u->pstream, t);
1867 #endif
1868
1869 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1870
1871 pa_log_debug("Connection established, authenticating ...");
1872 }
1873
1874 #ifdef TUNNEL_SINK
1875
1876 /* Called from main context */
1877 static void sink_set_volume(pa_sink *sink) {
1878 struct userdata *u;
1879 pa_tagstruct *t;
1880
1881 pa_assert(sink);
1882 u = sink->userdata;
1883 pa_assert(u);
1884
1885 t = pa_tagstruct_new(NULL, 0);
1886 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1887 pa_tagstruct_putu32(t, u->ctag++);
1888 pa_tagstruct_putu32(t, u->device_index);
1889 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1890 pa_pstream_send_tagstruct(u->pstream, t);
1891 }
1892
1893 /* Called from main context */
1894 static void sink_set_mute(pa_sink *sink) {
1895 struct userdata *u;
1896 pa_tagstruct *t;
1897
1898 pa_assert(sink);
1899 u = sink->userdata;
1900 pa_assert(u);
1901
1902 if (u->version < 11)
1903 return;
1904
1905 t = pa_tagstruct_new(NULL, 0);
1906 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1907 pa_tagstruct_putu32(t, u->ctag++);
1908 pa_tagstruct_putu32(t, u->device_index);
1909 pa_tagstruct_put_boolean(t, !!sink->muted);
1910 pa_pstream_send_tagstruct(u->pstream, t);
1911 }
1912
1913 #endif
1914
1915 int pa__init(pa_module*m) {
1916 pa_modargs *ma = NULL;
1917 struct userdata *u = NULL;
1918 char *server = NULL;
1919 pa_strlist *server_list = NULL;
1920 pa_sample_spec ss;
1921 pa_channel_map map;
1922 char *dn = NULL;
1923 #ifdef TUNNEL_SINK
1924 pa_sink_new_data data;
1925 #else
1926 pa_source_new_data data;
1927 #endif
1928 bool automatic;
1929 #ifdef HAVE_X11
1930 xcb_connection_t *xcb = NULL;
1931 #endif
1932 const char *cookie_path;
1933
1934 pa_assert(m);
1935
1936 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1937 pa_log("Failed to parse module arguments");
1938 goto fail;
1939 }
1940
1941 m->userdata = u = pa_xnew0(struct userdata, 1);
1942 u->core = m->core;
1943 u->module = m;
1944 u->client = NULL;
1945 u->pdispatch = NULL;
1946 u->pstream = NULL;
1947 u->server_name = NULL;
1948 #ifdef TUNNEL_SINK
1949 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1950 u->sink = NULL;
1951 u->requested_bytes = 0;
1952 #else
1953 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1954 u->source = NULL;
1955 #endif
1956 u->smoother = pa_smoother_new(
1957 PA_USEC_PER_SEC,
1958 PA_USEC_PER_SEC*2,
1959 true,
1960 true,
1961 10,
1962 pa_rtclock_now(),
1963 false);
1964 u->ctag = 1;
1965 u->device_index = u->channel = PA_INVALID_INDEX;
1966 u->time_event = NULL;
1967 u->ignore_latency_before = 0;
1968 u->transport_usec = u->thread_transport_usec = 0;
1969 u->remote_suspended = u->remote_corked = false;
1970 u->counter = u->counter_delta = 0;
1971
1972 u->rtpoll = pa_rtpoll_new();
1973 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1974
1975 if (pa_modargs_get_value_boolean(ma, "auto", &automatic) < 0) {
1976 pa_log("Failed to parse argument \"auto\".");
1977 goto fail;
1978 }
1979
1980 cookie_path = pa_modargs_get_value(ma, "cookie", NULL);
1981 server = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL));
1982
1983 if (automatic) {
1984 #ifdef HAVE_X11
1985 /* Need an X11 connection to get root properties */
1986 if (getenv("DISPLAY") != NULL) {
1987 if (!(xcb = xcb_connect(getenv("DISPLAY"), NULL)))
1988 pa_log("xcb_connect() failed");
1989 else {
1990 if (xcb_connection_has_error(xcb)) {
1991 pa_log("xcb_connection_has_error() returned true");
1992 xcb_disconnect(xcb);
1993 xcb = NULL;
1994 }
1995 }
1996 }
1997 #endif
1998
1999 /* Figure out the cookie the same way a normal client would */
2000 if (cookie_path)
2001 cookie_path = getenv(ENV_COOKIE_FILE);
2002
2003 #ifdef HAVE_X11
2004 if (!cookie_path && xcb) {
2005 char t[1024];
2006 if (pa_x11_get_prop(xcb, 0, "PULSE_COOKIE", t, sizeof(t))) {
2007 uint8_t cookie[PA_NATIVE_COOKIE_LENGTH];
2008
2009 if (pa_parsehex(t, cookie, sizeof(cookie)) != sizeof(cookie))
2010 pa_log("Failed to parse cookie data");
2011 else {
2012 if (!(u->auth_cookie = pa_auth_cookie_create(u->core, cookie, sizeof(cookie))))
2013 goto fail;
2014 }
2015 }
2016 }
2017 #endif
2018
2019 /* Same thing for the server name */
2020 if (!server)
2021 server = pa_xstrdup(getenv(ENV_DEFAULT_SERVER));
2022
2023 #ifdef HAVE_X11
2024 if (!server && xcb) {
2025 char t[1024];
2026 if (pa_x11_get_prop(xcb, 0, "PULSE_SERVER", t, sizeof(t)))
2027 server = pa_xstrdup(t);
2028 }
2029 #endif
2030
2031 /* Also determine the default sink/source on the other server */
2032 #ifdef TUNNEL_SINK
2033 if (!u->sink_name)
2034 u->sink_name = pa_xstrdup(getenv(ENV_DEFAULT_SINK));
2035
2036 #ifdef HAVE_X11
2037 if (!u->sink_name && xcb) {
2038 char t[1024];
2039 if (pa_x11_get_prop(xcb, 0, "PULSE_SINK", t, sizeof(t)))
2040 u->sink_name = pa_xstrdup(t);
2041 }
2042 #endif
2043 #else
2044 if (!u->source_name)
2045 u->source_name = pa_xstrdup(getenv(ENV_DEFAULT_SOURCE));
2046
2047 #ifdef HAVE_X11
2048 if (!u->source_name && xcb) {
2049 char t[1024];
2050 if (pa_x11_get_prop(xcb, 0, "PULSE_SOURCE", t, sizeof(t)))
2051 u->source_name = pa_xstrdup(t);
2052 }
2053 #endif
2054 #endif
2055 }
2056
2057 if (!cookie_path && !u->auth_cookie)
2058 cookie_path = PA_NATIVE_COOKIE_FILE;
2059
2060 if (cookie_path) {
2061 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, cookie_path, true, PA_NATIVE_COOKIE_LENGTH)))
2062 goto fail;
2063 }
2064
2065 if (server) {
2066 if (!(server_list = pa_strlist_parse(server))) {
2067 pa_log("Invalid server specified.");
2068 goto fail;
2069 }
2070 } else {
2071 char *ufn;
2072
2073 if (!automatic) {
2074 pa_log("No server specified.");
2075 goto fail;
2076 }
2077
2078 pa_log("No server address found. Attempting default local sockets.");
2079
2080 /* The system wide instance via PF_LOCAL */
2081 server_list = pa_strlist_prepend(server_list, PA_SYSTEM_RUNTIME_PATH PA_PATH_SEP PA_NATIVE_DEFAULT_UNIX_SOCKET);
2082
2083 /* The user instance via PF_LOCAL */
2084 if ((ufn = pa_runtime_path(PA_NATIVE_DEFAULT_UNIX_SOCKET))) {
2085 server_list = pa_strlist_prepend(server_list, ufn);
2086 pa_xfree(ufn);
2087 }
2088 }
2089
2090 ss = m->core->default_sample_spec;
2091 map = m->core->default_channel_map;
2092 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
2093 pa_log("Invalid sample format specification");
2094 goto fail;
2095 }
2096
2097 for (;;) {
2098 server_list = pa_strlist_pop(server_list, &u->server_name);
2099
2100 if (!u->server_name) {
2101 pa_log("Failed to connect to server '%s'", server);
2102 goto fail;
2103 }
2104
2105 pa_log_debug("Trying to connect to %s...", u->server_name);
2106
2107 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, true, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
2108 pa_xfree(u->server_name);
2109 u->server_name = NULL;
2110 continue;
2111 }
2112
2113 break;
2114 }
2115
2116 pa_socket_client_set_callback(u->client, on_connection, u);
2117
2118 #ifdef TUNNEL_SINK
2119
2120 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
2121 dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
2122
2123 pa_sink_new_data_init(&data);
2124 data.driver = __FILE__;
2125 data.module = m;
2126 data.namereg_fail = false;
2127 pa_sink_new_data_set_name(&data, dn);
2128 pa_sink_new_data_set_sample_spec(&data, &ss);
2129 pa_sink_new_data_set_channel_map(&data, &map);
2130 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
2131 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2132 if (u->sink_name)
2133 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
2134
2135 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2136 pa_log("Invalid properties");
2137 pa_sink_new_data_done(&data);
2138 goto fail;
2139 }
2140
2141 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2142 pa_sink_new_data_done(&data);
2143
2144 if (!u->sink) {
2145 pa_log("Failed to create sink.");
2146 goto fail;
2147 }
2148
2149 u->sink->parent.process_msg = sink_process_msg;
2150 u->sink->userdata = u;
2151 u->sink->set_state = sink_set_state;
2152 pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2153 pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2154
2155 u->sink->refresh_volume = u->sink->refresh_muted = false;
2156
2157 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2158
2159 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2160 pa_sink_set_rtpoll(u->sink, u->rtpoll);
2161
2162 #else
2163
2164 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2165 dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2166
2167 pa_source_new_data_init(&data);
2168 data.driver = __FILE__;
2169 data.module = m;
2170 data.namereg_fail = false;
2171 pa_source_new_data_set_name(&data, dn);
2172 pa_source_new_data_set_sample_spec(&data, &ss);
2173 pa_source_new_data_set_channel_map(&data, &map);
2174 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2175 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2176 if (u->source_name)
2177 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2178
2179 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2180 pa_log("Invalid properties");
2181 pa_source_new_data_done(&data);
2182 goto fail;
2183 }
2184
2185 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2186 pa_source_new_data_done(&data);
2187
2188 if (!u->source) {
2189 pa_log("Failed to create source.");
2190 goto fail;
2191 }
2192
2193 u->source->parent.process_msg = source_process_msg;
2194 u->source->set_state = source_set_state;
2195 u->source->userdata = u;
2196
2197 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2198
2199 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2200 pa_source_set_rtpoll(u->source, u->rtpoll);
2201
2202 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2203 #endif
2204
2205 pa_xfree(dn);
2206
2207 u->time_event = NULL;
2208
2209 u->maxlength = (uint32_t) -1;
2210 #ifdef TUNNEL_SINK
2211 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2212 #else
2213 u->fragsize = (uint32_t) -1;
2214 #endif
2215
2216 if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2217 pa_log("Failed to create thread.");
2218 goto fail;
2219 }
2220
2221 #ifdef TUNNEL_SINK
2222 pa_sink_put(u->sink);
2223 #else
2224 pa_source_put(u->source);
2225 #endif
2226
2227 if (server)
2228 pa_xfree(server);
2229
2230 if (server_list)
2231 pa_strlist_free(server_list);
2232
2233 #ifdef HAVE_X11
2234 if (xcb)
2235 xcb_disconnect(xcb);
2236 #endif
2237
2238 pa_modargs_free(ma);
2239
2240 return 0;
2241
2242 fail:
2243 pa__done(m);
2244
2245 if (server)
2246 pa_xfree(server);
2247
2248 if (server_list)
2249 pa_strlist_free(server_list);
2250
2251 #ifdef HAVE_X11
2252 if (xcb)
2253 xcb_disconnect(xcb);
2254 #endif
2255
2256 if (ma)
2257 pa_modargs_free(ma);
2258
2259 pa_xfree(dn);
2260
2261 return -1;
2262 }
2263
2264 void pa__done(pa_module*m) {
2265 struct userdata* u;
2266
2267 pa_assert(m);
2268
2269 if (!(u = m->userdata))
2270 return;
2271
2272 #ifdef TUNNEL_SINK
2273 if (u->sink)
2274 pa_sink_unlink(u->sink);
2275 #else
2276 if (u->source)
2277 pa_source_unlink(u->source);
2278 #endif
2279
2280 if (u->thread) {
2281 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2282 pa_thread_free(u->thread);
2283 }
2284
2285 pa_thread_mq_done(&u->thread_mq);
2286
2287 #ifdef TUNNEL_SINK
2288 if (u->sink)
2289 pa_sink_unref(u->sink);
2290 #else
2291 if (u->source)
2292 pa_source_unref(u->source);
2293 #endif
2294
2295 if (u->rtpoll)
2296 pa_rtpoll_free(u->rtpoll);
2297
2298 if (u->pstream) {
2299 pa_pstream_unlink(u->pstream);
2300 pa_pstream_unref(u->pstream);
2301 }
2302
2303 if (u->pdispatch)
2304 pa_pdispatch_unref(u->pdispatch);
2305
2306 if (u->client)
2307 pa_socket_client_unref(u->client);
2308
2309 if (u->auth_cookie)
2310 pa_auth_cookie_unref(u->auth_cookie);
2311
2312 if (u->smoother)
2313 pa_smoother_free(u->smoother);
2314
2315 if (u->time_event)
2316 u->core->mainloop->time_free(u->time_event);
2317
2318 #ifndef TUNNEL_SINK
2319 if (u->mcalign)
2320 pa_mcalign_free(u->mcalign);
2321 #endif
2322
2323 #ifdef TUNNEL_SINK
2324 pa_xfree(u->sink_name);
2325 #else
2326 pa_xfree(u->source_name);
2327 #endif
2328 pa_xfree(u->server_name);
2329
2330 pa_xfree(u->device_description);
2331 pa_xfree(u->server_fqdn);
2332 pa_xfree(u->user_name);
2333
2334 pa_xfree(u);
2335 }