]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
0916717899c6c9e611bcfb32d9ea7706ff550df5
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
69 "server=<address> "
70 "sink=<remote sink name> "
71 "cookie=<filename> "
72 "format=<sample format> "
73 "channels=<number of channels> "
74 "rate=<sample rate> "
75 "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
81 "server=<address> "
82 "source=<remote source name> "
83 "cookie=<filename> "
84 "format=<sample format> "
85 "channels=<number of channels> "
86 "rate=<sample rate> "
87 "channel_map=<channel map>");
88 #endif
89
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION);
92 PA_MODULE_LOAD_ONCE(FALSE);
93
94 static const char* const valid_modargs[] = {
95 "server",
96 "cookie",
97 "format",
98 "channels",
99 "rate",
100 #ifdef TUNNEL_SINK
101 "sink_name",
102 "sink_properties",
103 "sink",
104 #else
105 "source_name",
106 "source_properties",
107 "source",
108 #endif
109 "channel_map",
110 NULL,
111 };
112
113 #define DEFAULT_TIMEOUT 5
114
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
116
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118
119 #ifdef TUNNEL_SINK
120
121 enum {
122 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
123 SINK_MESSAGE_REMOTE_SUSPEND,
124 SINK_MESSAGE_UPDATE_LATENCY,
125 SINK_MESSAGE_POST
126 };
127
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
130
131 #else
132
133 enum {
134 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
135 SOURCE_MESSAGE_REMOTE_SUSPEND,
136 SOURCE_MESSAGE_UPDATE_LATENCY
137 };
138
139 #define DEFAULT_FRAGSIZE_MSEC 25
140
141 #endif
142
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154
155 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
156 #ifdef TUNNEL_SINK
157 [PA_COMMAND_REQUEST] = command_request,
158 [PA_COMMAND_STARTED] = command_started,
159 #endif
160 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
161 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
162 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
164 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
168 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
170 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
171 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
174 };
175
176 struct userdata {
177 pa_core *core;
178 pa_module *module;
179
180 pa_thread_mq thread_mq;
181 pa_rtpoll *rtpoll;
182 pa_thread *thread;
183
184 pa_socket_client *client;
185 pa_pstream *pstream;
186 pa_pdispatch *pdispatch;
187
188 char *server_name;
189 #ifdef TUNNEL_SINK
190 char *sink_name;
191 pa_sink *sink;
192 size_t requested_bytes;
193 #else
194 char *source_name;
195 pa_source *source;
196 pa_mcalign *mcalign;
197 #endif
198
199 pa_auth_cookie *auth_cookie;
200
201 uint32_t version;
202 uint32_t ctag;
203 uint32_t device_index;
204 uint32_t channel;
205
206 int64_t counter, counter_delta;
207
208 pa_bool_t remote_corked:1;
209 pa_bool_t remote_suspended:1;
210
211 pa_usec_t transport_usec; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
213
214 uint32_t ignore_latency_before;
215
216 pa_time_event *time_event;
217
218 pa_smoother *smoother;
219
220 char *device_description;
221 char *server_fqdn;
222 char *user_name;
223
224 uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226 uint32_t tlength;
227 uint32_t minreq;
228 uint32_t prebuf;
229 #else
230 uint32_t fragsize;
231 #endif
232 };
233
234 static void request_latency(struct userdata *u);
235
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238 pa_log_debug("Got stream or client event.");
239 }
240
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
243 struct userdata *u = userdata;
244
245 pa_assert(pd);
246 pa_assert(t);
247 pa_assert(u);
248 pa_assert(u->pdispatch == pd);
249
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u->module, TRUE);
252 }
253
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
256 struct userdata *u = userdata;
257
258 pa_assert(pd);
259 pa_assert(t);
260 pa_assert(u);
261 pa_assert(u->pdispatch == pd);
262
263 pa_log_info("Server signalled buffer overrun/underrun.");
264 request_latency(u);
265 }
266
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
269 struct userdata *u = userdata;
270 uint32_t channel;
271 pa_bool_t suspended;
272
273 pa_assert(pd);
274 pa_assert(t);
275 pa_assert(u);
276 pa_assert(u->pdispatch == pd);
277
278 if (pa_tagstruct_getu32(t, &channel) < 0 ||
279 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280 !pa_tagstruct_eof(t)) {
281
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u->module, TRUE);
284 return;
285 }
286
287 pa_log_debug("Server reports device suspend.");
288
289 #ifdef TUNNEL_SINK
290 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
294
295 request_latency(u);
296 }
297
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
300 struct userdata *u = userdata;
301 uint32_t channel, di;
302 const char *dn;
303 pa_bool_t suspended;
304
305 pa_assert(pd);
306 pa_assert(t);
307 pa_assert(u);
308 pa_assert(u->pdispatch == pd);
309
310 if (pa_tagstruct_getu32(t, &channel) < 0 ||
311 pa_tagstruct_getu32(t, &di) < 0 ||
312 pa_tagstruct_gets(t, &dn) < 0 ||
313 pa_tagstruct_get_boolean(t, &suspended) < 0) {
314
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u->module, TRUE);
317 return;
318 }
319
320 pa_log_debug("Server reports a stream move.");
321
322 #ifdef TUNNEL_SINK
323 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
327
328 request_latency(u);
329 }
330
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332 struct userdata *u = userdata;
333 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
334 pa_usec_t usec;
335
336 pa_assert(pd);
337 pa_assert(t);
338 pa_assert(u);
339 pa_assert(u->pdispatch == pd);
340
341 if (pa_tagstruct_getu32(t, &channel) < 0 ||
342 pa_tagstruct_getu32(t, &maxlength) < 0) {
343
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u->module, TRUE);
346 return;
347 }
348
349 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351 pa_tagstruct_get_usec(t, &usec) < 0) {
352
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u->module, TRUE);
355 return;
356 }
357 } else {
358 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359 pa_tagstruct_getu32(t, &prebuf) < 0 ||
360 pa_tagstruct_getu32(t, &minreq) < 0 ||
361 pa_tagstruct_get_usec(t, &usec) < 0) {
362
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u->module, TRUE);
365 return;
366 }
367 }
368
369 #ifdef TUNNEL_SINK
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
372
373 request_latency(u);
374 }
375
376 #ifdef TUNNEL_SINK
377
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380 struct userdata *u = userdata;
381
382 pa_assert(pd);
383 pa_assert(t);
384 pa_assert(u);
385 pa_assert(u->pdispatch == pd);
386
387 pa_log_debug("Server reports playback started.");
388 request_latency(u);
389 }
390
391 #endif
392
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
395 pa_usec_t x;
396
397 pa_assert(u);
398
399 x = pa_rtclock_now();
400
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
404
405 if (past)
406 x -= u->thread_transport_usec;
407 else
408 x += u->thread_transport_usec;
409
410 if (u->remote_suspended || u->remote_corked)
411 pa_smoother_pause(u->smoother, x);
412 else
413 pa_smoother_resume(u->smoother, x, TRUE);
414 }
415
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
418 pa_assert(u);
419
420 if (u->remote_corked == cork)
421 return;
422
423 u->remote_corked = cork;
424 check_smoother_status(u, FALSE);
425 }
426
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, pa_bool_t cork) {
429 pa_tagstruct *t;
430 pa_assert(u);
431
432 if (!u->pstream)
433 return;
434
435 t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441 pa_tagstruct_putu32(t, u->ctag++);
442 pa_tagstruct_putu32(t, u->channel);
443 pa_tagstruct_put_boolean(t, !!cork);
444 pa_pstream_send_tagstruct(u->pstream, t);
445
446 request_latency(u);
447 }
448
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
451 pa_assert(u);
452
453 if (u->remote_suspended == suspend)
454 return;
455
456 u->remote_suspended = suspend;
457 check_smoother_status(u, TRUE);
458 }
459
460 #ifdef TUNNEL_SINK
461
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464 pa_assert(u);
465
466 while (u->requested_bytes > 0) {
467 pa_memchunk memchunk;
468
469 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471 pa_memblock_unref(memchunk.memblock);
472
473 u->requested_bytes -= memchunk.length;
474
475 u->counter += (int64_t) memchunk.length;
476 }
477 }
478
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481 struct userdata *u = PA_SINK(o)->userdata;
482
483 switch (code) {
484
485 case PA_SINK_MESSAGE_SET_STATE: {
486 int r;
487
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
490
491 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
492
493 if (PA_SINK_IS_OPENED(u->sink->state))
494 send_data(u);
495 }
496
497 return r;
498 }
499
500 case PA_SINK_MESSAGE_GET_LATENCY: {
501 pa_usec_t yl, yr, *usec = data;
502
503 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
505
506 *usec = yl > yr ? yl - yr : 0;
507 return 0;
508 }
509
510 case SINK_MESSAGE_REQUEST:
511
512 pa_assert(offset > 0);
513 u->requested_bytes += (size_t) offset;
514
515 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516 send_data(u);
517
518 return 0;
519
520
521 case SINK_MESSAGE_REMOTE_SUSPEND:
522
523 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
524 return 0;
525
526
527 case SINK_MESSAGE_UPDATE_LATENCY: {
528 pa_usec_t y;
529
530 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
531
532 if (y > (pa_usec_t) offset)
533 y -= (pa_usec_t) offset;
534 else
535 y = 0;
536
537 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
538
539 /* We can access this freely here, since the main thread is waiting for us */
540 u->thread_transport_usec = u->transport_usec;
541
542 return 0;
543 }
544
545 case SINK_MESSAGE_POST:
546
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
551
552 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
553
554 u->counter_delta += (int64_t) chunk->length;
555
556 return 0;
557 }
558
559 return pa_sink_process_msg(o, code, data, offset, chunk);
560 }
561
562 /* Called from main context */
563 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
564 struct userdata *u;
565 pa_sink_assert_ref(s);
566 u = s->userdata;
567
568 switch ((pa_sink_state_t) state) {
569
570 case PA_SINK_SUSPENDED:
571 pa_assert(PA_SINK_IS_OPENED(s->state));
572 stream_cork(u, TRUE);
573 break;
574
575 case PA_SINK_IDLE:
576 case PA_SINK_RUNNING:
577 if (s->state == PA_SINK_SUSPENDED)
578 stream_cork(u, FALSE);
579 break;
580
581 case PA_SINK_UNLINKED:
582 case PA_SINK_INIT:
583 case PA_SINK_INVALID_STATE:
584 ;
585 }
586
587 return 0;
588 }
589
590 #else
591
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
594 struct userdata *u = PA_SOURCE(o)->userdata;
595
596 switch (code) {
597
598 case PA_SOURCE_MESSAGE_SET_STATE: {
599 int r;
600
601 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
602 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
603
604 return r;
605 }
606
607 case PA_SOURCE_MESSAGE_GET_LATENCY: {
608 pa_usec_t yr, yl, *usec = data;
609
610 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
611 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
612
613 *usec = yr > yl ? yr - yl : 0;
614 return 0;
615 }
616
617 case SOURCE_MESSAGE_POST: {
618 pa_memchunk c;
619
620 pa_mcalign_push(u->mcalign, chunk);
621
622 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
623
624 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
625 pa_source_post(u->source, &c);
626
627 pa_memblock_unref(c.memblock);
628
629 u->counter += (int64_t) c.length;
630 }
631
632 return 0;
633 }
634
635 case SOURCE_MESSAGE_REMOTE_SUSPEND:
636
637 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
638 return 0;
639
640 case SOURCE_MESSAGE_UPDATE_LATENCY: {
641 pa_usec_t y;
642
643 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
644 y += (pa_usec_t) offset;
645
646 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
647
648 /* We can access this freely here, since the main thread is waiting for us */
649 u->thread_transport_usec = u->transport_usec;
650
651 return 0;
652 }
653 }
654
655 return pa_source_process_msg(o, code, data, offset, chunk);
656 }
657
658 /* Called from main context */
659 static int source_set_state(pa_source *s, pa_source_state_t state) {
660 struct userdata *u;
661 pa_source_assert_ref(s);
662 u = s->userdata;
663
664 switch ((pa_source_state_t) state) {
665
666 case PA_SOURCE_SUSPENDED:
667 pa_assert(PA_SOURCE_IS_OPENED(s->state));
668 stream_cork(u, TRUE);
669 break;
670
671 case PA_SOURCE_IDLE:
672 case PA_SOURCE_RUNNING:
673 if (s->state == PA_SOURCE_SUSPENDED)
674 stream_cork(u, FALSE);
675 break;
676
677 case PA_SOURCE_UNLINKED:
678 case PA_SOURCE_INIT:
679 case PA_SINK_INVALID_STATE:
680 ;
681 }
682
683 return 0;
684 }
685
686 #endif
687
688 static void thread_func(void *userdata) {
689 struct userdata *u = userdata;
690
691 pa_assert(u);
692
693 pa_log_debug("Thread starting up");
694
695 pa_thread_mq_install(&u->thread_mq);
696
697 for (;;) {
698 int ret;
699
700 #ifdef TUNNEL_SINK
701 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
702 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
703 pa_sink_process_rewind(u->sink, 0);
704 #endif
705
706 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
707 goto fail;
708
709 if (ret == 0)
710 goto finish;
711 }
712
713 fail:
714 /* If this was no regular exit from the loop we have to continue
715 * processing messages until we received PA_MESSAGE_SHUTDOWN */
716 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
717 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
718
719 finish:
720 pa_log_debug("Thread shutting down");
721 }
722
723 #ifdef TUNNEL_SINK
724 /* Called from main context */
725 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
726 struct userdata *u = userdata;
727 uint32_t bytes, channel;
728
729 pa_assert(pd);
730 pa_assert(command == PA_COMMAND_REQUEST);
731 pa_assert(t);
732 pa_assert(u);
733 pa_assert(u->pdispatch == pd);
734
735 if (pa_tagstruct_getu32(t, &channel) < 0 ||
736 pa_tagstruct_getu32(t, &bytes) < 0) {
737 pa_log("Invalid protocol reply");
738 goto fail;
739 }
740
741 if (channel != u->channel) {
742 pa_log("Received data for invalid channel");
743 goto fail;
744 }
745
746 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
747 return;
748
749 fail:
750 pa_module_unload_request(u->module, TRUE);
751 }
752
753 #endif
754
755 /* Called from main context */
756 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 struct userdata *u = userdata;
758 pa_usec_t sink_usec, source_usec;
759 pa_bool_t playing;
760 int64_t write_index, read_index;
761 struct timeval local, remote, now;
762 pa_sample_spec *ss;
763 int64_t delay;
764
765 pa_assert(pd);
766 pa_assert(u);
767
768 if (command != PA_COMMAND_REPLY) {
769 if (command == PA_COMMAND_ERROR)
770 pa_log("Failed to get latency.");
771 else
772 pa_log("Protocol error.");
773 goto fail;
774 }
775
776 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
777 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
778 pa_tagstruct_get_boolean(t, &playing) < 0 ||
779 pa_tagstruct_get_timeval(t, &local) < 0 ||
780 pa_tagstruct_get_timeval(t, &remote) < 0 ||
781 pa_tagstruct_gets64(t, &write_index) < 0 ||
782 pa_tagstruct_gets64(t, &read_index) < 0) {
783 pa_log("Invalid reply.");
784 goto fail;
785 }
786
787 #ifdef TUNNEL_SINK
788 if (u->version >= 13) {
789 uint64_t underrun_for = 0, playing_for = 0;
790
791 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
792 pa_tagstruct_getu64(t, &playing_for) < 0) {
793 pa_log("Invalid reply.");
794 goto fail;
795 }
796 }
797 #endif
798
799 if (!pa_tagstruct_eof(t)) {
800 pa_log("Invalid reply.");
801 goto fail;
802 }
803
804 if (tag < u->ignore_latency_before) {
805 return;
806 }
807
808 pa_gettimeofday(&now);
809
810 /* Calculate transport usec */
811 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
812 /* local and remote seem to have synchronized clocks */
813 #ifdef TUNNEL_SINK
814 u->transport_usec = pa_timeval_diff(&remote, &local);
815 #else
816 u->transport_usec = pa_timeval_diff(&now, &remote);
817 #endif
818 } else
819 u->transport_usec = pa_timeval_diff(&now, &local)/2;
820
821 /* First, take the device's delay */
822 #ifdef TUNNEL_SINK
823 delay = (int64_t) sink_usec;
824 ss = &u->sink->sample_spec;
825 #else
826 delay = (int64_t) source_usec;
827 ss = &u->source->sample_spec;
828 #endif
829
830 /* Add the length of our server-side buffer */
831 if (write_index >= read_index)
832 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
833 else
834 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
835
836 /* Our measurements are already out of date, hence correct by the *
837 * transport latency */
838 #ifdef TUNNEL_SINK
839 delay -= (int64_t) u->transport_usec;
840 #else
841 delay += (int64_t) u->transport_usec;
842 #endif
843
844 /* Now correct by what we have have read/written since we requested the update */
845 #ifdef TUNNEL_SINK
846 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
847 #else
848 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
849 #endif
850
851 #ifdef TUNNEL_SINK
852 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
853 #else
854 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
855 #endif
856
857 return;
858
859 fail:
860
861 pa_module_unload_request(u->module, TRUE);
862 }
863
864 /* Called from main context */
865 static void request_latency(struct userdata *u) {
866 pa_tagstruct *t;
867 struct timeval now;
868 uint32_t tag;
869 pa_assert(u);
870
871 t = pa_tagstruct_new(NULL, 0);
872 #ifdef TUNNEL_SINK
873 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
874 #else
875 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
876 #endif
877 pa_tagstruct_putu32(t, tag = u->ctag++);
878 pa_tagstruct_putu32(t, u->channel);
879
880 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
881
882 pa_pstream_send_tagstruct(u->pstream, t);
883 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
884
885 u->ignore_latency_before = tag;
886 u->counter_delta = 0;
887 }
888
889 /* Called from main context */
890 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
891 struct userdata *u = userdata;
892
893 pa_assert(m);
894 pa_assert(e);
895 pa_assert(u);
896
897 request_latency(u);
898
899 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
900 }
901
902 /* Called from main context */
903 static void update_description(struct userdata *u) {
904 char *d;
905 char un[128], hn[128];
906 pa_tagstruct *t;
907
908 pa_assert(u);
909
910 if (!u->server_fqdn || !u->user_name || !u->device_description)
911 return;
912
913 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
914
915 #ifdef TUNNEL_SINK
916 pa_sink_set_description(u->sink, d);
917 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
918 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
919 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
920 #else
921 pa_source_set_description(u->source, d);
922 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
923 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
924 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
925 #endif
926
927 pa_xfree(d);
928
929 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
930 pa_get_user_name(un, sizeof(un)),
931 pa_get_host_name(hn, sizeof(hn)));
932
933 t = pa_tagstruct_new(NULL, 0);
934 #ifdef TUNNEL_SINK
935 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
936 #else
937 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
938 #endif
939 pa_tagstruct_putu32(t, u->ctag++);
940 pa_tagstruct_putu32(t, u->channel);
941 pa_tagstruct_puts(t, d);
942 pa_pstream_send_tagstruct(u->pstream, t);
943
944 pa_xfree(d);
945 }
946
947 /* Called from main context */
948 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
949 struct userdata *u = userdata;
950 pa_sample_spec ss;
951 pa_channel_map cm;
952 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
953 uint32_t cookie;
954
955 pa_assert(pd);
956 pa_assert(u);
957
958 if (command != PA_COMMAND_REPLY) {
959 if (command == PA_COMMAND_ERROR)
960 pa_log("Failed to get info.");
961 else
962 pa_log("Protocol error.");
963 goto fail;
964 }
965
966 if (pa_tagstruct_gets(t, &server_name) < 0 ||
967 pa_tagstruct_gets(t, &server_version) < 0 ||
968 pa_tagstruct_gets(t, &user_name) < 0 ||
969 pa_tagstruct_gets(t, &host_name) < 0 ||
970 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
971 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
972 pa_tagstruct_gets(t, &default_source_name) < 0 ||
973 pa_tagstruct_getu32(t, &cookie) < 0 ||
974 (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
975
976 pa_log("Parse failure");
977 goto fail;
978 }
979
980 if (!pa_tagstruct_eof(t)) {
981 pa_log("Packet too long");
982 goto fail;
983 }
984
985 pa_xfree(u->server_fqdn);
986 u->server_fqdn = pa_xstrdup(host_name);
987
988 pa_xfree(u->user_name);
989 u->user_name = pa_xstrdup(user_name);
990
991 update_description(u);
992
993 return;
994
995 fail:
996 pa_module_unload_request(u->module, TRUE);
997 }
998
999 static int read_ports(struct userdata *u, pa_tagstruct *t)
1000 {
1001 if (u->version >= 16) {
1002 uint32_t n_ports;
1003 const char *s;
1004
1005 if (pa_tagstruct_getu32(t, &n_ports)) {
1006 pa_log("Parse failure");
1007 return -PA_ERR_PROTOCOL;
1008 }
1009
1010 for (uint32_t j = 0; j < n_ports; j++) {
1011 uint32_t priority;
1012
1013 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1014 pa_tagstruct_gets(t, &s) < 0 || /* description */
1015 pa_tagstruct_getu32(t, &priority) < 0) {
1016
1017 pa_log("Parse failure");
1018 return -PA_ERR_PROTOCOL;
1019 }
1020 if (u->version >= 24 && pa_tagstruct_getu32(t, &priority) < 0) { /* available */
1021 pa_log("Parse failure");
1022 return -PA_ERR_PROTOCOL;
1023 }
1024 }
1025
1026 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1027 pa_log("Parse failure");
1028 return -PA_ERR_PROTOCOL;
1029 }
1030 }
1031 return 0;
1032 }
1033
1034
1035 static int read_formats(struct userdata *u, pa_tagstruct *t) {
1036 uint8_t n_formats;
1037 pa_format_info *format;
1038
1039 if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1040 pa_log("Parse failure");
1041 return -PA_ERR_PROTOCOL;
1042 }
1043
1044 for (uint8_t j = 0; j < n_formats; j++) {
1045 format = pa_format_info_new();
1046 if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1047 pa_format_info_free(format);
1048 pa_log("Parse failure");
1049 return -PA_ERR_PROTOCOL;
1050 }
1051 pa_format_info_free(format);
1052 }
1053 return 0;
1054 }
1055
1056 #ifdef TUNNEL_SINK
1057
1058 /* Called from main context */
1059 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1060 struct userdata *u = userdata;
1061 uint32_t idx, owner_module, monitor_source, flags;
1062 const char *name, *description, *monitor_source_name, *driver;
1063 pa_sample_spec ss;
1064 pa_channel_map cm;
1065 pa_cvolume volume;
1066 pa_bool_t mute;
1067 pa_usec_t latency;
1068 pa_proplist *pl;
1069
1070 pa_assert(pd);
1071 pa_assert(u);
1072
1073 pl = pa_proplist_new();
1074
1075 if (command != PA_COMMAND_REPLY) {
1076 if (command == PA_COMMAND_ERROR)
1077 pa_log("Failed to get info.");
1078 else
1079 pa_log("Protocol error.");
1080 goto fail;
1081 }
1082
1083 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1084 pa_tagstruct_gets(t, &name) < 0 ||
1085 pa_tagstruct_gets(t, &description) < 0 ||
1086 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1087 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1088 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1089 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1090 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1091 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1092 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1093 pa_tagstruct_get_usec(t, &latency) < 0 ||
1094 pa_tagstruct_gets(t, &driver) < 0 ||
1095 pa_tagstruct_getu32(t, &flags) < 0) {
1096
1097 pa_log("Parse failure");
1098 goto fail;
1099 }
1100
1101 if (u->version >= 13) {
1102 pa_usec_t configured_latency;
1103
1104 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1105 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1106
1107 pa_log("Parse failure");
1108 goto fail;
1109 }
1110 }
1111
1112 if (u->version >= 15) {
1113 pa_volume_t base_volume;
1114 uint32_t state, n_volume_steps, card;
1115
1116 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1117 pa_tagstruct_getu32(t, &state) < 0 ||
1118 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1119 pa_tagstruct_getu32(t, &card) < 0) {
1120
1121 pa_log("Parse failure");
1122 goto fail;
1123 }
1124 }
1125
1126 if (read_ports(u, t) < 0)
1127 goto fail;
1128
1129 if (u->version >= 21 && read_formats(u, t) < 0)
1130 goto fail;
1131
1132 if (!pa_tagstruct_eof(t)) {
1133 pa_log("Packet too long");
1134 goto fail;
1135 }
1136
1137 pa_proplist_free(pl);
1138
1139 if (!u->sink_name || strcmp(name, u->sink_name))
1140 return;
1141
1142 pa_xfree(u->device_description);
1143 u->device_description = pa_xstrdup(description);
1144
1145 update_description(u);
1146
1147 return;
1148
1149 fail:
1150 pa_module_unload_request(u->module, TRUE);
1151 pa_proplist_free(pl);
1152 }
1153
1154 /* Called from main context */
1155 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1156 struct userdata *u = userdata;
1157 uint32_t idx, owner_module, client, sink;
1158 pa_usec_t buffer_usec, sink_usec;
1159 const char *name, *driver, *resample_method;
1160 pa_bool_t mute = FALSE;
1161 pa_sample_spec sample_spec;
1162 pa_channel_map channel_map;
1163 pa_cvolume volume;
1164 pa_proplist *pl;
1165 pa_bool_t b;
1166
1167 pa_assert(pd);
1168 pa_assert(u);
1169
1170 pl = pa_proplist_new();
1171
1172 if (command != PA_COMMAND_REPLY) {
1173 if (command == PA_COMMAND_ERROR)
1174 pa_log("Failed to get info.");
1175 else
1176 pa_log("Protocol error.");
1177 goto fail;
1178 }
1179
1180 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1181 pa_tagstruct_gets(t, &name) < 0 ||
1182 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1183 pa_tagstruct_getu32(t, &client) < 0 ||
1184 pa_tagstruct_getu32(t, &sink) < 0 ||
1185 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1186 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1187 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1188 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1189 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1190 pa_tagstruct_gets(t, &resample_method) < 0 ||
1191 pa_tagstruct_gets(t, &driver) < 0) {
1192
1193 pa_log("Parse failure");
1194 goto fail;
1195 }
1196
1197 if (u->version >= 11) {
1198 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1199
1200 pa_log("Parse failure");
1201 goto fail;
1202 }
1203 }
1204
1205 if (u->version >= 13) {
1206 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1207
1208 pa_log("Parse failure");
1209 goto fail;
1210 }
1211 }
1212
1213 if (u->version >= 19) {
1214 if (pa_tagstruct_get_boolean(t, &b) < 0) {
1215
1216 pa_log("Parse failure");
1217 goto fail;
1218 }
1219 }
1220
1221 if (u->version >= 20) {
1222 if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1223 pa_tagstruct_get_boolean(t, &b) < 0) {
1224
1225 pa_log("Parse failure");
1226 goto fail;
1227 }
1228 }
1229
1230 if (u->version >= 21) {
1231 pa_format_info *format = pa_format_info_new();
1232
1233 if (pa_tagstruct_get_format_info(t, format) < 0) {
1234 pa_format_info_free(format);
1235 pa_log("Parse failure");
1236 goto fail;
1237 }
1238 pa_format_info_free(format);
1239 }
1240
1241 if (!pa_tagstruct_eof(t)) {
1242 pa_log("Packet too long");
1243 goto fail;
1244 }
1245
1246 pa_proplist_free(pl);
1247
1248 if (idx != u->device_index)
1249 return;
1250
1251 pa_assert(u->sink);
1252
1253 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1254 pa_cvolume_equal(&volume, &u->sink->real_volume))
1255 return;
1256
1257 pa_sink_volume_changed(u->sink, &volume);
1258
1259 if (u->version >= 11)
1260 pa_sink_mute_changed(u->sink, mute);
1261
1262 return;
1263
1264 fail:
1265 pa_module_unload_request(u->module, TRUE);
1266 pa_proplist_free(pl);
1267 }
1268
1269 #else
1270
1271 /* Called from main context */
1272 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1273 struct userdata *u = userdata;
1274 uint32_t idx, owner_module, monitor_of_sink, flags;
1275 const char *name, *description, *monitor_of_sink_name, *driver;
1276 pa_sample_spec ss;
1277 pa_channel_map cm;
1278 pa_cvolume volume;
1279 pa_bool_t mute;
1280 pa_usec_t latency, configured_latency;
1281 pa_proplist *pl;
1282
1283 pa_assert(pd);
1284 pa_assert(u);
1285
1286 pl = pa_proplist_new();
1287
1288 if (command != PA_COMMAND_REPLY) {
1289 if (command == PA_COMMAND_ERROR)
1290 pa_log("Failed to get info.");
1291 else
1292 pa_log("Protocol error.");
1293 goto fail;
1294 }
1295
1296 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1297 pa_tagstruct_gets(t, &name) < 0 ||
1298 pa_tagstruct_gets(t, &description) < 0 ||
1299 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1300 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1301 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1302 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1303 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1304 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1305 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1306 pa_tagstruct_get_usec(t, &latency) < 0 ||
1307 pa_tagstruct_gets(t, &driver) < 0 ||
1308 pa_tagstruct_getu32(t, &flags) < 0) {
1309
1310 pa_log("Parse failure");
1311 goto fail;
1312 }
1313
1314 if (u->version >= 13) {
1315 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1316 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1317
1318 pa_log("Parse failure");
1319 goto fail;
1320 }
1321 }
1322
1323 if (u->version >= 15) {
1324 pa_volume_t base_volume;
1325 uint32_t state, n_volume_steps, card;
1326
1327 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1328 pa_tagstruct_getu32(t, &state) < 0 ||
1329 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1330 pa_tagstruct_getu32(t, &card) < 0) {
1331
1332 pa_log("Parse failure");
1333 goto fail;
1334 }
1335 }
1336
1337 if (read_ports(u, t) < 0)
1338 goto fail;
1339
1340 if (u->version >= 22 && read_formats(u, t) < 0)
1341 goto fail;
1342
1343 if (!pa_tagstruct_eof(t)) {
1344 pa_log("Packet too long");
1345 goto fail;
1346 }
1347
1348 pa_proplist_free(pl);
1349
1350 if (!u->source_name || strcmp(name, u->source_name))
1351 return;
1352
1353 pa_xfree(u->device_description);
1354 u->device_description = pa_xstrdup(description);
1355
1356 update_description(u);
1357
1358 return;
1359
1360 fail:
1361 pa_module_unload_request(u->module, TRUE);
1362 pa_proplist_free(pl);
1363 }
1364
1365 #endif
1366
1367 /* Called from main context */
1368 static void request_info(struct userdata *u) {
1369 pa_tagstruct *t;
1370 uint32_t tag;
1371 pa_assert(u);
1372
1373 t = pa_tagstruct_new(NULL, 0);
1374 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1375 pa_tagstruct_putu32(t, tag = u->ctag++);
1376 pa_pstream_send_tagstruct(u->pstream, t);
1377 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1378
1379 #ifdef TUNNEL_SINK
1380 t = pa_tagstruct_new(NULL, 0);
1381 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1382 pa_tagstruct_putu32(t, tag = u->ctag++);
1383 pa_tagstruct_putu32(t, u->device_index);
1384 pa_pstream_send_tagstruct(u->pstream, t);
1385 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1386
1387 if (u->sink_name) {
1388 t = pa_tagstruct_new(NULL, 0);
1389 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1390 pa_tagstruct_putu32(t, tag = u->ctag++);
1391 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1392 pa_tagstruct_puts(t, u->sink_name);
1393 pa_pstream_send_tagstruct(u->pstream, t);
1394 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1395 }
1396 #else
1397 if (u->source_name) {
1398 t = pa_tagstruct_new(NULL, 0);
1399 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1400 pa_tagstruct_putu32(t, tag = u->ctag++);
1401 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1402 pa_tagstruct_puts(t, u->source_name);
1403 pa_pstream_send_tagstruct(u->pstream, t);
1404 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1405 }
1406 #endif
1407 }
1408
1409 /* Called from main context */
1410 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1411 struct userdata *u = userdata;
1412 pa_subscription_event_type_t e;
1413 uint32_t idx;
1414
1415 pa_assert(pd);
1416 pa_assert(t);
1417 pa_assert(u);
1418 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1419
1420 if (pa_tagstruct_getu32(t, &e) < 0 ||
1421 pa_tagstruct_getu32(t, &idx) < 0) {
1422 pa_log("Invalid protocol reply");
1423 pa_module_unload_request(u->module, TRUE);
1424 return;
1425 }
1426
1427 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1428 #ifdef TUNNEL_SINK
1429 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1430 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1431 #else
1432 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1433 #endif
1434 )
1435 return;
1436
1437 request_info(u);
1438 }
1439
1440 /* Called from main context */
1441 static void start_subscribe(struct userdata *u) {
1442 pa_tagstruct *t;
1443 pa_assert(u);
1444
1445 t = pa_tagstruct_new(NULL, 0);
1446 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1447 pa_tagstruct_putu32(t, u->ctag++);
1448 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1449 #ifdef TUNNEL_SINK
1450 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1451 #else
1452 PA_SUBSCRIPTION_MASK_SOURCE
1453 #endif
1454 );
1455
1456 pa_pstream_send_tagstruct(u->pstream, t);
1457 }
1458
1459 /* Called from main context */
1460 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1461 struct userdata *u = userdata;
1462 #ifdef TUNNEL_SINK
1463 uint32_t bytes;
1464 #endif
1465
1466 pa_assert(pd);
1467 pa_assert(u);
1468 pa_assert(u->pdispatch == pd);
1469
1470 if (command != PA_COMMAND_REPLY) {
1471 if (command == PA_COMMAND_ERROR)
1472 pa_log("Failed to create stream.");
1473 else
1474 pa_log("Protocol error.");
1475 goto fail;
1476 }
1477
1478 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1479 pa_tagstruct_getu32(t, &u->device_index) < 0
1480 #ifdef TUNNEL_SINK
1481 || pa_tagstruct_getu32(t, &bytes) < 0
1482 #endif
1483 )
1484 goto parse_error;
1485
1486 if (u->version >= 9) {
1487 #ifdef TUNNEL_SINK
1488 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1489 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1490 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1491 pa_tagstruct_getu32(t, &u->minreq) < 0)
1492 goto parse_error;
1493 #else
1494 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1495 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1496 goto parse_error;
1497 #endif
1498 }
1499
1500 if (u->version >= 12) {
1501 pa_sample_spec ss;
1502 pa_channel_map cm;
1503 uint32_t device_index;
1504 const char *dn;
1505 pa_bool_t suspended;
1506
1507 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1508 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1509 pa_tagstruct_getu32(t, &device_index) < 0 ||
1510 pa_tagstruct_gets(t, &dn) < 0 ||
1511 pa_tagstruct_get_boolean(t, &suspended) < 0)
1512 goto parse_error;
1513
1514 #ifdef TUNNEL_SINK
1515 pa_xfree(u->sink_name);
1516 u->sink_name = pa_xstrdup(dn);
1517 #else
1518 pa_xfree(u->source_name);
1519 u->source_name = pa_xstrdup(dn);
1520 #endif
1521 }
1522
1523 if (u->version >= 13) {
1524 pa_usec_t usec;
1525
1526 if (pa_tagstruct_get_usec(t, &usec) < 0)
1527 goto parse_error;
1528
1529 /* #ifdef TUNNEL_SINK */
1530 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1531 /* #else */
1532 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1533 /* #endif */
1534 }
1535
1536 if (u->version >= 21) {
1537 pa_format_info *format = pa_format_info_new();
1538
1539 if (pa_tagstruct_get_format_info(t, format) < 0) {
1540 pa_format_info_free(format);
1541 goto parse_error;
1542 }
1543
1544 pa_format_info_free(format);
1545 }
1546
1547 if (!pa_tagstruct_eof(t))
1548 goto parse_error;
1549
1550 start_subscribe(u);
1551 request_info(u);
1552
1553 pa_assert(!u->time_event);
1554 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1555
1556 request_latency(u);
1557
1558 pa_log_debug("Stream created.");
1559
1560 #ifdef TUNNEL_SINK
1561 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1562 #endif
1563
1564 return;
1565
1566 parse_error:
1567 pa_log("Invalid reply. (Create stream)");
1568
1569 fail:
1570 pa_module_unload_request(u->module, TRUE);
1571
1572 }
1573
1574 /* Called from main context */
1575 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1576 struct userdata *u = userdata;
1577 pa_tagstruct *reply;
1578 char name[256], un[128], hn[128];
1579 pa_cvolume volume;
1580
1581 pa_assert(pd);
1582 pa_assert(u);
1583 pa_assert(u->pdispatch == pd);
1584
1585 if (command != PA_COMMAND_REPLY ||
1586 pa_tagstruct_getu32(t, &u->version) < 0 ||
1587 !pa_tagstruct_eof(t)) {
1588
1589 if (command == PA_COMMAND_ERROR)
1590 pa_log("Failed to authenticate");
1591 else
1592 pa_log("Protocol error.");
1593
1594 goto fail;
1595 }
1596
1597 /* Minimum supported protocol version */
1598 if (u->version < 8) {
1599 pa_log("Incompatible protocol version");
1600 goto fail;
1601 }
1602
1603 /* Starting with protocol version 13 the MSB of the version tag
1604 reflects if shm is enabled for this connection or not. We don't
1605 support SHM here at all, so we just ignore this. */
1606
1607 if (u->version >= 13)
1608 u->version &= 0x7FFFFFFFU;
1609
1610 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1611
1612 #ifdef TUNNEL_SINK
1613 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1614 pa_sink_update_proplist(u->sink, 0, NULL);
1615
1616 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1617 u->sink_name,
1618 pa_get_user_name(un, sizeof(un)),
1619 pa_get_host_name(hn, sizeof(hn)));
1620 #else
1621 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1622 pa_source_update_proplist(u->source, 0, NULL);
1623
1624 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1625 u->source_name,
1626 pa_get_user_name(un, sizeof(un)),
1627 pa_get_host_name(hn, sizeof(hn)));
1628 #endif
1629
1630 reply = pa_tagstruct_new(NULL, 0);
1631 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1632 pa_tagstruct_putu32(reply, u->ctag++);
1633
1634 if (u->version >= 13) {
1635 pa_proplist *pl;
1636 pl = pa_proplist_new();
1637 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1638 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1639 pa_init_proplist(pl);
1640 pa_tagstruct_put_proplist(reply, pl);
1641 pa_proplist_free(pl);
1642 } else
1643 pa_tagstruct_puts(reply, "PulseAudio");
1644
1645 pa_pstream_send_tagstruct(u->pstream, reply);
1646 /* We ignore the server's reply here */
1647
1648 reply = pa_tagstruct_new(NULL, 0);
1649
1650 if (u->version < 13)
1651 /* Only for older PA versions we need to fill in the maxlength */
1652 u->maxlength = 4*1024*1024;
1653
1654 #ifdef TUNNEL_SINK
1655 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1656 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1657 u->prebuf = u->tlength;
1658 #else
1659 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1660 #endif
1661
1662 #ifdef TUNNEL_SINK
1663 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1664 pa_tagstruct_putu32(reply, tag = u->ctag++);
1665
1666 if (u->version < 13)
1667 pa_tagstruct_puts(reply, name);
1668
1669 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1670 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1671 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1672 pa_tagstruct_puts(reply, u->sink_name);
1673 pa_tagstruct_putu32(reply, u->maxlength);
1674 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1675 pa_tagstruct_putu32(reply, u->tlength);
1676 pa_tagstruct_putu32(reply, u->prebuf);
1677 pa_tagstruct_putu32(reply, u->minreq);
1678 pa_tagstruct_putu32(reply, 0);
1679 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1680 pa_tagstruct_put_cvolume(reply, &volume);
1681 #else
1682 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1683 pa_tagstruct_putu32(reply, tag = u->ctag++);
1684
1685 if (u->version < 13)
1686 pa_tagstruct_puts(reply, name);
1687
1688 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1689 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1690 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1691 pa_tagstruct_puts(reply, u->source_name);
1692 pa_tagstruct_putu32(reply, u->maxlength);
1693 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1694 pa_tagstruct_putu32(reply, u->fragsize);
1695 #endif
1696
1697 if (u->version >= 12) {
1698 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1699 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1700 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1701 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1702 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1703 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1704 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1705 }
1706
1707 if (u->version >= 13) {
1708 pa_proplist *pl;
1709
1710 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1711 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1712
1713 pl = pa_proplist_new();
1714 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1715 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1716 pa_tagstruct_put_proplist(reply, pl);
1717 pa_proplist_free(pl);
1718
1719 #ifndef TUNNEL_SINK
1720 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1721 #endif
1722 }
1723
1724 if (u->version >= 14) {
1725 #ifdef TUNNEL_SINK
1726 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1727 #endif
1728 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1729 }
1730
1731 if (u->version >= 15) {
1732 #ifdef TUNNEL_SINK
1733 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1734 #endif
1735 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1736 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1737 }
1738
1739 #ifdef TUNNEL_SINK
1740 if (u->version >= 17)
1741 pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1742
1743 if (u->version >= 18)
1744 pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1745 #endif
1746
1747 #ifdef TUNNEL_SINK
1748 if (u->version >= 21) {
1749 /* We're not using the extended API, so n_formats = 0 and that's that */
1750 pa_tagstruct_putu8(reply, 0);
1751 }
1752 #else
1753 if (u->version >= 22) {
1754 /* We're not using the extended API, so n_formats = 0 and that's that */
1755 pa_tagstruct_putu8(reply, 0);
1756 pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1757 pa_tagstruct_put_cvolume(reply, &volume);
1758 pa_tagstruct_put_boolean(reply, FALSE); /* muted */
1759 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1760 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1761 pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1762 pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1763 }
1764 #endif
1765
1766 pa_pstream_send_tagstruct(u->pstream, reply);
1767 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1768
1769 pa_log_debug("Connection authenticated, creating stream ...");
1770
1771 return;
1772
1773 fail:
1774 pa_module_unload_request(u->module, TRUE);
1775 }
1776
1777 /* Called from main context */
1778 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1779 struct userdata *u = userdata;
1780
1781 pa_assert(p);
1782 pa_assert(u);
1783
1784 pa_log_warn("Stream died.");
1785 pa_module_unload_request(u->module, TRUE);
1786 }
1787
1788 /* Called from main context */
1789 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1790 struct userdata *u = userdata;
1791
1792 pa_assert(p);
1793 pa_assert(packet);
1794 pa_assert(u);
1795
1796 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1797 pa_log("Invalid packet");
1798 pa_module_unload_request(u->module, TRUE);
1799 return;
1800 }
1801 }
1802
1803 #ifndef TUNNEL_SINK
1804 /* Called from main context */
1805 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1806 struct userdata *u = userdata;
1807
1808 pa_assert(p);
1809 pa_assert(chunk);
1810 pa_assert(u);
1811
1812 if (channel != u->channel) {
1813 pa_log("Received memory block on bad channel.");
1814 pa_module_unload_request(u->module, TRUE);
1815 return;
1816 }
1817
1818 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1819
1820 u->counter_delta += (int64_t) chunk->length;
1821 }
1822 #endif
1823
1824 /* Called from main context */
1825 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1826 struct userdata *u = userdata;
1827 pa_tagstruct *t;
1828 uint32_t tag;
1829
1830 pa_assert(sc);
1831 pa_assert(u);
1832 pa_assert(u->client == sc);
1833
1834 pa_socket_client_unref(u->client);
1835 u->client = NULL;
1836
1837 if (!io) {
1838 pa_log("Connection failed: %s", pa_cstrerror(errno));
1839 pa_module_unload_request(u->module, TRUE);
1840 return;
1841 }
1842
1843 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1844 u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1845
1846 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1847 pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
1848 #ifndef TUNNEL_SINK
1849 pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
1850 #endif
1851
1852 t = pa_tagstruct_new(NULL, 0);
1853 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1854 pa_tagstruct_putu32(t, tag = u->ctag++);
1855 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1856
1857 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1858
1859 #ifdef HAVE_CREDS
1860 {
1861 pa_creds ucred;
1862
1863 if (pa_iochannel_creds_supported(io))
1864 pa_iochannel_creds_enable(io);
1865
1866 ucred.uid = getuid();
1867 ucred.gid = getgid();
1868
1869 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1870 }
1871 #else
1872 pa_pstream_send_tagstruct(u->pstream, t);
1873 #endif
1874
1875 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1876
1877 pa_log_debug("Connection established, authenticating ...");
1878 }
1879
1880 #ifdef TUNNEL_SINK
1881
1882 /* Called from main context */
1883 static void sink_set_volume(pa_sink *sink) {
1884 struct userdata *u;
1885 pa_tagstruct *t;
1886
1887 pa_assert(sink);
1888 u = sink->userdata;
1889 pa_assert(u);
1890
1891 t = pa_tagstruct_new(NULL, 0);
1892 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1893 pa_tagstruct_putu32(t, u->ctag++);
1894 pa_tagstruct_putu32(t, u->device_index);
1895 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1896 pa_pstream_send_tagstruct(u->pstream, t);
1897 }
1898
1899 /* Called from main context */
1900 static void sink_set_mute(pa_sink *sink) {
1901 struct userdata *u;
1902 pa_tagstruct *t;
1903
1904 pa_assert(sink);
1905 u = sink->userdata;
1906 pa_assert(u);
1907
1908 if (u->version < 11)
1909 return;
1910
1911 t = pa_tagstruct_new(NULL, 0);
1912 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1913 pa_tagstruct_putu32(t, u->ctag++);
1914 pa_tagstruct_putu32(t, u->device_index);
1915 pa_tagstruct_put_boolean(t, !!sink->muted);
1916 pa_pstream_send_tagstruct(u->pstream, t);
1917 }
1918
1919 #endif
1920
1921 int pa__init(pa_module*m) {
1922 pa_modargs *ma = NULL;
1923 struct userdata *u = NULL;
1924 pa_sample_spec ss;
1925 pa_channel_map map;
1926 char *dn = NULL;
1927 #ifdef TUNNEL_SINK
1928 pa_sink_new_data data;
1929 #else
1930 pa_source_new_data data;
1931 #endif
1932
1933 pa_assert(m);
1934
1935 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1936 pa_log("Failed to parse module arguments");
1937 goto fail;
1938 }
1939
1940 m->userdata = u = pa_xnew0(struct userdata, 1);
1941 u->core = m->core;
1942 u->module = m;
1943 u->client = NULL;
1944 u->pdispatch = NULL;
1945 u->pstream = NULL;
1946 u->server_name = NULL;
1947 #ifdef TUNNEL_SINK
1948 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1949 u->sink = NULL;
1950 u->requested_bytes = 0;
1951 #else
1952 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1953 u->source = NULL;
1954 #endif
1955 u->smoother = pa_smoother_new(
1956 PA_USEC_PER_SEC,
1957 PA_USEC_PER_SEC*2,
1958 TRUE,
1959 TRUE,
1960 10,
1961 pa_rtclock_now(),
1962 FALSE);
1963 u->ctag = 1;
1964 u->device_index = u->channel = PA_INVALID_INDEX;
1965 u->time_event = NULL;
1966 u->ignore_latency_before = 0;
1967 u->transport_usec = u->thread_transport_usec = 0;
1968 u->remote_suspended = u->remote_corked = FALSE;
1969 u->counter = u->counter_delta = 0;
1970
1971 u->rtpoll = pa_rtpoll_new();
1972 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1973
1974 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1975 goto fail;
1976
1977 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1978 pa_log("No server specified.");
1979 goto fail;
1980 }
1981
1982 ss = m->core->default_sample_spec;
1983 map = m->core->default_channel_map;
1984 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1985 pa_log("Invalid sample format specification");
1986 goto fail;
1987 }
1988
1989 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1990 pa_log("Failed to connect to server '%s'", u->server_name);
1991 goto fail;
1992 }
1993
1994 pa_socket_client_set_callback(u->client, on_connection, u);
1995
1996 #ifdef TUNNEL_SINK
1997
1998 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1999 dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
2000
2001 pa_sink_new_data_init(&data);
2002 data.driver = __FILE__;
2003 data.module = m;
2004 data.namereg_fail = TRUE;
2005 pa_sink_new_data_set_name(&data, dn);
2006 pa_sink_new_data_set_sample_spec(&data, &ss);
2007 pa_sink_new_data_set_channel_map(&data, &map);
2008 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
2009 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2010 if (u->sink_name)
2011 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
2012
2013 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2014 pa_log("Invalid properties");
2015 pa_sink_new_data_done(&data);
2016 goto fail;
2017 }
2018
2019 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2020 pa_sink_new_data_done(&data);
2021
2022 if (!u->sink) {
2023 pa_log("Failed to create sink.");
2024 goto fail;
2025 }
2026
2027 u->sink->parent.process_msg = sink_process_msg;
2028 u->sink->userdata = u;
2029 u->sink->set_state = sink_set_state;
2030 pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2031 pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2032
2033 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
2034
2035 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2036
2037 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2038 pa_sink_set_rtpoll(u->sink, u->rtpoll);
2039
2040 #else
2041
2042 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2043 dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2044
2045 pa_source_new_data_init(&data);
2046 data.driver = __FILE__;
2047 data.module = m;
2048 data.namereg_fail = TRUE;
2049 pa_source_new_data_set_name(&data, dn);
2050 pa_source_new_data_set_sample_spec(&data, &ss);
2051 pa_source_new_data_set_channel_map(&data, &map);
2052 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2053 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2054 if (u->source_name)
2055 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2056
2057 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2058 pa_log("Invalid properties");
2059 pa_source_new_data_done(&data);
2060 goto fail;
2061 }
2062
2063 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2064 pa_source_new_data_done(&data);
2065
2066 if (!u->source) {
2067 pa_log("Failed to create source.");
2068 goto fail;
2069 }
2070
2071 u->source->parent.process_msg = source_process_msg;
2072 u->source->set_state = source_set_state;
2073 u->source->userdata = u;
2074
2075 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2076
2077 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2078 pa_source_set_rtpoll(u->source, u->rtpoll);
2079
2080 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2081 #endif
2082
2083 pa_xfree(dn);
2084
2085 u->time_event = NULL;
2086
2087 u->maxlength = (uint32_t) -1;
2088 #ifdef TUNNEL_SINK
2089 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2090 #else
2091 u->fragsize = (uint32_t) -1;
2092 #endif
2093
2094 if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2095 pa_log("Failed to create thread.");
2096 goto fail;
2097 }
2098
2099 #ifdef TUNNEL_SINK
2100 pa_sink_put(u->sink);
2101 #else
2102 pa_source_put(u->source);
2103 #endif
2104
2105 pa_modargs_free(ma);
2106
2107 return 0;
2108
2109 fail:
2110 pa__done(m);
2111
2112 if (ma)
2113 pa_modargs_free(ma);
2114
2115 pa_xfree(dn);
2116
2117 return -1;
2118 }
2119
2120 void pa__done(pa_module*m) {
2121 struct userdata* u;
2122
2123 pa_assert(m);
2124
2125 if (!(u = m->userdata))
2126 return;
2127
2128 #ifdef TUNNEL_SINK
2129 if (u->sink)
2130 pa_sink_unlink(u->sink);
2131 #else
2132 if (u->source)
2133 pa_source_unlink(u->source);
2134 #endif
2135
2136 if (u->thread) {
2137 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2138 pa_thread_free(u->thread);
2139 }
2140
2141 pa_thread_mq_done(&u->thread_mq);
2142
2143 #ifdef TUNNEL_SINK
2144 if (u->sink)
2145 pa_sink_unref(u->sink);
2146 #else
2147 if (u->source)
2148 pa_source_unref(u->source);
2149 #endif
2150
2151 if (u->rtpoll)
2152 pa_rtpoll_free(u->rtpoll);
2153
2154 if (u->pstream) {
2155 pa_pstream_unlink(u->pstream);
2156 pa_pstream_unref(u->pstream);
2157 }
2158
2159 if (u->pdispatch)
2160 pa_pdispatch_unref(u->pdispatch);
2161
2162 if (u->client)
2163 pa_socket_client_unref(u->client);
2164
2165 if (u->auth_cookie)
2166 pa_auth_cookie_unref(u->auth_cookie);
2167
2168 if (u->smoother)
2169 pa_smoother_free(u->smoother);
2170
2171 if (u->time_event)
2172 u->core->mainloop->time_free(u->time_event);
2173
2174 #ifndef TUNNEL_SINK
2175 if (u->mcalign)
2176 pa_mcalign_free(u->mcalign);
2177 #endif
2178
2179 #ifdef TUNNEL_SINK
2180 pa_xfree(u->sink_name);
2181 #else
2182 pa_xfree(u->source_name);
2183 #endif
2184 pa_xfree(u->server_name);
2185
2186 pa_xfree(u->device_description);
2187 pa_xfree(u->server_fqdn);
2188 pa_xfree(u->user_name);
2189
2190 pa_xfree(u);
2191 }