]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
Merge commit 'origin/master-tx'
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
69 "server=<address> "
70 "sink=<remote sink name> "
71 "cookie=<filename> "
72 "format=<sample format> "
73 "channels=<number of channels> "
74 "rate=<sample rate> "
75 "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
81 "server=<address> "
82 "source=<remote source name> "
83 "cookie=<filename> "
84 "format=<sample format> "
85 "channels=<number of channels> "
86 "rate=<sample rate> "
87 "channel_map=<channel map>");
88 #endif
89
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION);
92 PA_MODULE_LOAD_ONCE(FALSE);
93
/* NULL-terminated list of the argument keys this module accepts. The
 * device-specific keys depend on whether the sink or the source flavour
 * is being built. */
static const char *const valid_modargs[] = {
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif
    "channel_map",
    NULL,
};
112
113 #define DEFAULT_TIMEOUT 5
114
115 #define LATENCY_INTERVAL 10
116
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118
119 #ifdef TUNNEL_SINK
120
121 enum {
122 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
123 SINK_MESSAGE_REMOTE_SUSPEND,
124 SINK_MESSAGE_UPDATE_LATENCY,
125 SINK_MESSAGE_POST
126 };
127
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
130
131 #else
132
133 enum {
134 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
135 SOURCE_MESSAGE_REMOTE_SUSPEND,
136 SOURCE_MESSAGE_UPDATE_LATENCY
137 };
138
139 #define DEFAULT_FRAGSIZE_MSEC 25
140
141 #endif
142
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154
155 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
156 #ifdef TUNNEL_SINK
157 [PA_COMMAND_REQUEST] = command_request,
158 [PA_COMMAND_STARTED] = command_started,
159 #endif
160 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
161 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
162 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
164 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
168 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
170 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
171 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
174 };
175
176 struct userdata {
177 pa_core *core;
178 pa_module *module;
179
180 pa_thread_mq thread_mq;
181 pa_rtpoll *rtpoll;
182 pa_thread *thread;
183
184 pa_socket_client *client;
185 pa_pstream *pstream;
186 pa_pdispatch *pdispatch;
187
188 char *server_name;
189 #ifdef TUNNEL_SINK
190 char *sink_name;
191 pa_sink *sink;
192 size_t requested_bytes;
193 #else
194 char *source_name;
195 pa_source *source;
196 #endif
197
198 pa_auth_cookie *auth_cookie;
199
200 uint32_t version;
201 uint32_t ctag;
202 uint32_t device_index;
203 uint32_t channel;
204
205 int64_t counter, counter_delta;
206
207 pa_bool_t remote_corked:1;
208 pa_bool_t remote_suspended:1;
209
210 pa_usec_t transport_usec; /* maintained in the main thread */
211 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
212
213 uint32_t ignore_latency_before;
214
215 pa_time_event *time_event;
216
217 pa_smoother *smoother;
218
219 char *device_description;
220 char *server_fqdn;
221 char *user_name;
222
223 uint32_t maxlength;
224 #ifdef TUNNEL_SINK
225 uint32_t tlength;
226 uint32_t minreq;
227 uint32_t prebuf;
228 #else
229 uint32_t fragsize;
230 #endif
231 };
232
233 static void request_latency(struct userdata *u);
234
235 /* Called from main context */
236 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
237 pa_log_debug("Got stream or client event.");
238 }
239
240 /* Called from main context */
241 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
242 struct userdata *u = userdata;
243
244 pa_assert(pd);
245 pa_assert(t);
246 pa_assert(u);
247 pa_assert(u->pdispatch == pd);
248
249 pa_log_warn("Stream killed");
250 pa_module_unload_request(u->module, TRUE);
251 }
252
253 /* Called from main context */
254 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
255 struct userdata *u = userdata;
256
257 pa_assert(pd);
258 pa_assert(t);
259 pa_assert(u);
260 pa_assert(u->pdispatch == pd);
261
262 pa_log_info("Server signalled buffer overrun/underrun.");
263 request_latency(u);
264 }
265
266 /* Called from main context */
267 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
268 struct userdata *u = userdata;
269 uint32_t channel;
270 pa_bool_t suspended;
271
272 pa_assert(pd);
273 pa_assert(t);
274 pa_assert(u);
275 pa_assert(u->pdispatch == pd);
276
277 if (pa_tagstruct_getu32(t, &channel) < 0 ||
278 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
279 !pa_tagstruct_eof(t)) {
280
281 pa_log("Invalid packet.");
282 pa_module_unload_request(u->module, TRUE);
283 return;
284 }
285
286 pa_log_debug("Server reports device suspend.");
287
288 #ifdef TUNNEL_SINK
289 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
290 #else
291 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
292 #endif
293
294 request_latency(u);
295 }
296
297 /* Called from main context */
298 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
299 struct userdata *u = userdata;
300 uint32_t channel, di;
301 const char *dn;
302 pa_bool_t suspended;
303
304 pa_assert(pd);
305 pa_assert(t);
306 pa_assert(u);
307 pa_assert(u->pdispatch == pd);
308
309 if (pa_tagstruct_getu32(t, &channel) < 0 ||
310 pa_tagstruct_getu32(t, &di) < 0 ||
311 pa_tagstruct_gets(t, &dn) < 0 ||
312 pa_tagstruct_get_boolean(t, &suspended) < 0) {
313
314 pa_log_error("Invalid packet.");
315 pa_module_unload_request(u->module, TRUE);
316 return;
317 }
318
319 pa_log_debug("Server reports a stream move.");
320
321 #ifdef TUNNEL_SINK
322 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
323 #else
324 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
325 #endif
326
327 request_latency(u);
328 }
329
330 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
331 struct userdata *u = userdata;
332 uint32_t channel, maxlength, tlength, fragsize, prebuf, minreq;
333 pa_usec_t usec;
334
335 pa_assert(pd);
336 pa_assert(t);
337 pa_assert(u);
338 pa_assert(u->pdispatch == pd);
339
340 if (pa_tagstruct_getu32(t, &channel) < 0 ||
341 pa_tagstruct_getu32(t, &maxlength) < 0) {
342
343 pa_log_error("Invalid packet.");
344 pa_module_unload_request(u->module, TRUE);
345 return;
346 }
347
348 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
349 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
350 pa_tagstruct_get_usec(t, &usec) < 0) {
351
352 pa_log_error("Invalid packet.");
353 pa_module_unload_request(u->module, TRUE);
354 return;
355 }
356 } else {
357 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
358 pa_tagstruct_getu32(t, &prebuf) < 0 ||
359 pa_tagstruct_getu32(t, &minreq) < 0 ||
360 pa_tagstruct_get_usec(t, &usec) < 0) {
361
362 pa_log_error("Invalid packet.");
363 pa_module_unload_request(u->module, TRUE);
364 return;
365 }
366 }
367
368 #ifdef TUNNEL_SINK
369 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
370 #endif
371
372 request_latency(u);
373 }
374
375 #ifdef TUNNEL_SINK
376
377 /* Called from main context */
378 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
379 struct userdata *u = userdata;
380
381 pa_assert(pd);
382 pa_assert(t);
383 pa_assert(u);
384 pa_assert(u->pdispatch == pd);
385
386 pa_log_debug("Server reports playback started.");
387 request_latency(u);
388 }
389
390 #endif
391
392 /* Called from IO thread context */
393 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
394 pa_usec_t x;
395
396 pa_assert(u);
397
398 x = pa_rtclock_usec();
399
400 /* Correct by the time the requested issued needs to travel to the
401 * other side. This is a valid thread-safe access, because the
402 * main thread is waiting for us */
403
404 if (past)
405 x -= u->thread_transport_usec;
406 else
407 x += u->thread_transport_usec;
408
409 if (u->remote_suspended || u->remote_corked)
410 pa_smoother_pause(u->smoother, x);
411 else
412 pa_smoother_resume(u->smoother, x, TRUE);
413 }
414
415 /* Called from IO thread context */
416 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
417 pa_assert(u);
418
419 if (u->remote_corked == cork)
420 return;
421
422 u->remote_corked = cork;
423 check_smoother_status(u, FALSE);
424 }
425
426 /* Called from main context */
427 static void stream_cork(struct userdata *u, pa_bool_t cork) {
428 pa_tagstruct *t;
429 pa_assert(u);
430
431 if (!u->pstream)
432 return;
433
434 t = pa_tagstruct_new(NULL, 0);
435 #ifdef TUNNEL_SINK
436 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
437 #else
438 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
439 #endif
440 pa_tagstruct_putu32(t, u->ctag++);
441 pa_tagstruct_putu32(t, u->channel);
442 pa_tagstruct_put_boolean(t, !!cork);
443 pa_pstream_send_tagstruct(u->pstream, t);
444
445 request_latency(u);
446 }
447
448 /* Called from IO thread context */
449 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
450 pa_assert(u);
451
452 if (u->remote_suspended == suspend)
453 return;
454
455 u->remote_suspended = suspend;
456 check_smoother_status(u, TRUE);
457 }
458
459 #ifdef TUNNEL_SINK
460
461 /* Called from IO thread context */
462 static void send_data(struct userdata *u) {
463 pa_assert(u);
464
465 while (u->requested_bytes > 0) {
466 pa_memchunk memchunk;
467
468 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
469 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
470 pa_memblock_unref(memchunk.memblock);
471
472 u->requested_bytes -= memchunk.length;
473
474 u->counter += (int64_t) memchunk.length;
475 }
476 }
477
478 /* This function is called from IO context -- except when it is not. */
479 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
480 struct userdata *u = PA_SINK(o)->userdata;
481
482 switch (code) {
483
484 case PA_SINK_MESSAGE_SET_STATE: {
485 int r;
486
487 /* First, change the state, because otherwide pa_sink_render() would fail */
488 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
489
490 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
491
492 if (PA_SINK_IS_OPENED(u->sink->state))
493 send_data(u);
494 }
495
496 return r;
497 }
498
499 case PA_SINK_MESSAGE_GET_LATENCY: {
500 pa_usec_t yl, yr, *usec = data;
501
502 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
503 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
504
505 *usec = yl > yr ? yl - yr : 0;
506 return 0;
507 }
508
509 case SINK_MESSAGE_REQUEST:
510
511 pa_assert(offset > 0);
512 u->requested_bytes += (size_t) offset;
513
514 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
515 send_data(u);
516
517 return 0;
518
519
520 case SINK_MESSAGE_REMOTE_SUSPEND:
521
522 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
523 return 0;
524
525
526 case SINK_MESSAGE_UPDATE_LATENCY: {
527 pa_usec_t y;
528
529 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
530
531 if (y > (pa_usec_t) offset)
532 y -= (pa_usec_t) offset;
533 else
534 y = 0;
535
536 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
537
538 /* We can access this freely here, since the main thread is waiting for us */
539 u->thread_transport_usec = u->transport_usec;
540
541 return 0;
542 }
543
544 case SINK_MESSAGE_POST:
545
546 /* OK, This might be a bit confusing. This message is
547 * delivered to us from the main context -- NOT from the
548 * IO thread context where the rest of the messages are
549 * dispatched. Yeah, ugly, but I am a lazy bastard. */
550
551 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
552
553 u->counter_delta += (int64_t) chunk->length;
554
555 return 0;
556 }
557
558 return pa_sink_process_msg(o, code, data, offset, chunk);
559 }
560
561 /* Called from main context */
562 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
563 struct userdata *u;
564 pa_sink_assert_ref(s);
565 u = s->userdata;
566
567 switch ((pa_sink_state_t) state) {
568
569 case PA_SINK_SUSPENDED:
570 pa_assert(PA_SINK_IS_OPENED(s->state));
571 stream_cork(u, TRUE);
572 break;
573
574 case PA_SINK_IDLE:
575 case PA_SINK_RUNNING:
576 if (s->state == PA_SINK_SUSPENDED)
577 stream_cork(u, FALSE);
578 break;
579
580 case PA_SINK_UNLINKED:
581 case PA_SINK_INIT:
582 case PA_SINK_INVALID_STATE:
583 ;
584 }
585
586 return 0;
587 }
588
589 #else
590
591 /* This function is called from IO context -- except when it is not. */
592 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
593 struct userdata *u = PA_SOURCE(o)->userdata;
594
595 switch (code) {
596
597 case PA_SOURCE_MESSAGE_SET_STATE: {
598 int r;
599
600 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
601 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
602
603 return r;
604 }
605
606 case PA_SOURCE_MESSAGE_GET_LATENCY: {
607 pa_usec_t yr, yl, *usec = data;
608
609 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
610 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
611
612 *usec = yr > yl ? yr - yl : 0;
613 return 0;
614 }
615
616 case SOURCE_MESSAGE_POST:
617
618 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
619 pa_source_post(u->source, chunk);
620
621 u->counter += (int64_t) chunk->length;
622
623 return 0;
624
625 case SOURCE_MESSAGE_REMOTE_SUSPEND:
626
627 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
628 return 0;
629
630 case SOURCE_MESSAGE_UPDATE_LATENCY: {
631 pa_usec_t y;
632
633 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
634 y += (pa_usec_t) offset;
635
636 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
637
638 /* We can access this freely here, since the main thread is waiting for us */
639 u->thread_transport_usec = u->transport_usec;
640
641 return 0;
642 }
643 }
644
645 return pa_source_process_msg(o, code, data, offset, chunk);
646 }
647
648 /* Called from main context */
649 static int source_set_state(pa_source *s, pa_source_state_t state) {
650 struct userdata *u;
651 pa_source_assert_ref(s);
652 u = s->userdata;
653
654 switch ((pa_source_state_t) state) {
655
656 case PA_SOURCE_SUSPENDED:
657 pa_assert(PA_SOURCE_IS_OPENED(s->state));
658 stream_cork(u, TRUE);
659 break;
660
661 case PA_SOURCE_IDLE:
662 case PA_SOURCE_RUNNING:
663 if (s->state == PA_SOURCE_SUSPENDED)
664 stream_cork(u, FALSE);
665 break;
666
667 case PA_SOURCE_UNLINKED:
668 case PA_SOURCE_INIT:
669 case PA_SINK_INVALID_STATE:
670 ;
671 }
672
673 return 0;
674 }
675
676 #endif
677
678 static void thread_func(void *userdata) {
679 struct userdata *u = userdata;
680
681 pa_assert(u);
682
683 pa_log_debug("Thread starting up");
684
685 pa_thread_mq_install(&u->thread_mq);
686 pa_rtpoll_install(u->rtpoll);
687
688 for (;;) {
689 int ret;
690
691 #ifdef TUNNEL_SINK
692 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
693 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
694 pa_sink_process_rewind(u->sink, 0);
695 #endif
696
697 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
698 goto fail;
699
700 if (ret == 0)
701 goto finish;
702 }
703
704 fail:
705 /* If this was no regular exit from the loop we have to continue
706 * processing messages until we received PA_MESSAGE_SHUTDOWN */
707 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
708 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
709
710 finish:
711 pa_log_debug("Thread shutting down");
712 }
713
714 #ifdef TUNNEL_SINK
715 /* Called from main context */
716 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717 struct userdata *u = userdata;
718 uint32_t bytes, channel;
719
720 pa_assert(pd);
721 pa_assert(command == PA_COMMAND_REQUEST);
722 pa_assert(t);
723 pa_assert(u);
724 pa_assert(u->pdispatch == pd);
725
726 if (pa_tagstruct_getu32(t, &channel) < 0 ||
727 pa_tagstruct_getu32(t, &bytes) < 0) {
728 pa_log("Invalid protocol reply");
729 goto fail;
730 }
731
732 if (channel != u->channel) {
733 pa_log("Recieved data for invalid channel");
734 goto fail;
735 }
736
737 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
738 return;
739
740 fail:
741 pa_module_unload_request(u->module, TRUE);
742 }
743
744 #endif
745
746 /* Called from main context */
747 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
748 struct userdata *u = userdata;
749 pa_usec_t sink_usec, source_usec;
750 pa_bool_t playing;
751 int64_t write_index, read_index;
752 struct timeval local, remote, now;
753 pa_sample_spec *ss;
754 int64_t delay;
755
756 pa_assert(pd);
757 pa_assert(u);
758
759 if (command != PA_COMMAND_REPLY) {
760 if (command == PA_COMMAND_ERROR)
761 pa_log("Failed to get latency.");
762 else
763 pa_log("Protocol error.");
764 goto fail;
765 }
766
767 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
768 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
769 pa_tagstruct_get_boolean(t, &playing) < 0 ||
770 pa_tagstruct_get_timeval(t, &local) < 0 ||
771 pa_tagstruct_get_timeval(t, &remote) < 0 ||
772 pa_tagstruct_gets64(t, &write_index) < 0 ||
773 pa_tagstruct_gets64(t, &read_index) < 0) {
774 pa_log("Invalid reply.");
775 goto fail;
776 }
777
778 #ifdef TUNNEL_SINK
779 if (u->version >= 13) {
780 uint64_t underrun_for = 0, playing_for = 0;
781
782 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
783 pa_tagstruct_getu64(t, &playing_for) < 0) {
784 pa_log("Invalid reply.");
785 goto fail;
786 }
787 }
788 #endif
789
790 if (!pa_tagstruct_eof(t)) {
791 pa_log("Invalid reply.");
792 goto fail;
793 }
794
795 if (tag < u->ignore_latency_before) {
796 return;
797 }
798
799 pa_gettimeofday(&now);
800
801 /* Calculate transport usec */
802 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
803 /* local and remote seem to have synchronized clocks */
804 #ifdef TUNNEL_SINK
805 u->transport_usec = pa_timeval_diff(&remote, &local);
806 #else
807 u->transport_usec = pa_timeval_diff(&now, &remote);
808 #endif
809 } else
810 u->transport_usec = pa_timeval_diff(&now, &local)/2;
811
812 /* First, take the device's delay */
813 #ifdef TUNNEL_SINK
814 delay = (int64_t) sink_usec;
815 ss = &u->sink->sample_spec;
816 #else
817 delay = (int64_t) source_usec;
818 ss = &u->source->sample_spec;
819 #endif
820
821 /* Add the length of our server-side buffer */
822 if (write_index >= read_index)
823 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
824 else
825 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
826
827 /* Our measurements are already out of date, hence correct by the *
828 * transport latency */
829 #ifdef TUNNEL_SINK
830 delay -= (int64_t) u->transport_usec;
831 #else
832 delay += (int64_t) u->transport_usec;
833 #endif
834
835 /* Now correct by what we have have read/written since we requested the update */
836 #ifdef TUNNEL_SINK
837 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
838 #else
839 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
840 #endif
841
842 #ifdef TUNNEL_SINK
843 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
844 #else
845 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
846 #endif
847
848 return;
849
850 fail:
851
852 pa_module_unload_request(u->module, TRUE);
853 }
854
855 /* Called from main context */
856 static void request_latency(struct userdata *u) {
857 pa_tagstruct *t;
858 struct timeval now;
859 uint32_t tag;
860 pa_assert(u);
861
862 t = pa_tagstruct_new(NULL, 0);
863 #ifdef TUNNEL_SINK
864 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
865 #else
866 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
867 #endif
868 pa_tagstruct_putu32(t, tag = u->ctag++);
869 pa_tagstruct_putu32(t, u->channel);
870
871 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
872
873 pa_pstream_send_tagstruct(u->pstream, t);
874 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
875
876 u->ignore_latency_before = tag;
877 u->counter_delta = 0;
878 }
879
880 /* Called from main context */
881 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
882 struct userdata *u = userdata;
883 struct timeval ntv;
884
885 pa_assert(m);
886 pa_assert(e);
887 pa_assert(u);
888
889 request_latency(u);
890
891 pa_gettimeofday(&ntv);
892 ntv.tv_sec += LATENCY_INTERVAL;
893 m->time_restart(e, &ntv);
894 }
895
896 /* Called from main context */
897 static void update_description(struct userdata *u) {
898 char *d;
899 char un[128], hn[128];
900 pa_tagstruct *t;
901
902 pa_assert(u);
903
904 if (!u->server_fqdn || !u->user_name || !u->device_description)
905 return;
906
907 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
908
909 #ifdef TUNNEL_SINK
910 pa_sink_set_description(u->sink, d);
911 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
912 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
913 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
914 #else
915 pa_source_set_description(u->source, d);
916 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
917 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
918 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
919 #endif
920
921 pa_xfree(d);
922
923 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
924 pa_get_user_name(un, sizeof(un)),
925 pa_get_host_name(hn, sizeof(hn)));
926
927 t = pa_tagstruct_new(NULL, 0);
928 #ifdef TUNNEL_SINK
929 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
930 #else
931 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
932 #endif
933 pa_tagstruct_putu32(t, u->ctag++);
934 pa_tagstruct_putu32(t, u->channel);
935 pa_tagstruct_puts(t, d);
936 pa_pstream_send_tagstruct(u->pstream, t);
937
938 pa_xfree(d);
939 }
940
941 /* Called from main context */
942 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
943 struct userdata *u = userdata;
944 pa_sample_spec ss;
945 pa_channel_map cm;
946 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
947 uint32_t cookie;
948
949 pa_assert(pd);
950 pa_assert(u);
951
952 if (command != PA_COMMAND_REPLY) {
953 if (command == PA_COMMAND_ERROR)
954 pa_log("Failed to get info.");
955 else
956 pa_log("Protocol error.");
957 goto fail;
958 }
959
960 if (pa_tagstruct_gets(t, &server_name) < 0 ||
961 pa_tagstruct_gets(t, &server_version) < 0 ||
962 pa_tagstruct_gets(t, &user_name) < 0 ||
963 pa_tagstruct_gets(t, &host_name) < 0 ||
964 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
965 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
966 pa_tagstruct_gets(t, &default_source_name) < 0 ||
967 pa_tagstruct_getu32(t, &cookie) < 0 ||
968 (u->version >= 15 &&
969 pa_tagstruct_get_channel_map(t, &cm) < 0)) {
970
971 pa_log("Parse failure");
972 goto fail;
973 }
974
975 if (!pa_tagstruct_eof(t)) {
976 pa_log("Packet too long");
977 goto fail;
978 }
979
980 pa_xfree(u->server_fqdn);
981 u->server_fqdn = pa_xstrdup(host_name);
982
983 pa_xfree(u->user_name);
984 u->user_name = pa_xstrdup(user_name);
985
986 update_description(u);
987
988 return;
989
990 fail:
991 pa_module_unload_request(u->module, TRUE);
992 }
993
994 #ifdef TUNNEL_SINK
995
996 /* Called from main context */
997 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
998 struct userdata *u = userdata;
999 uint32_t idx, owner_module, monitor_source, flags;
1000 const char *name, *description, *monitor_source_name, *driver;
1001 pa_sample_spec ss;
1002 pa_channel_map cm;
1003 pa_cvolume volume;
1004 pa_bool_t mute;
1005 pa_usec_t latency;
1006 pa_proplist *pl;
1007
1008 pa_assert(pd);
1009 pa_assert(u);
1010
1011 pl = pa_proplist_new();
1012
1013 if (command != PA_COMMAND_REPLY) {
1014 if (command == PA_COMMAND_ERROR)
1015 pa_log("Failed to get info.");
1016 else
1017 pa_log("Protocol error.");
1018 goto fail;
1019 }
1020
1021 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1022 pa_tagstruct_gets(t, &name) < 0 ||
1023 pa_tagstruct_gets(t, &description) < 0 ||
1024 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1025 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1026 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1027 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1028 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1029 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1030 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1031 pa_tagstruct_get_usec(t, &latency) < 0 ||
1032 pa_tagstruct_gets(t, &driver) < 0 ||
1033 pa_tagstruct_getu32(t, &flags) < 0) {
1034
1035 pa_log("Parse failure");
1036 goto fail;
1037 }
1038
1039 if (u->version >= 13) {
1040 pa_usec_t configured_latency;
1041
1042 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1043 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1044
1045 pa_log("Parse failure");
1046 goto fail;
1047 }
1048 }
1049
1050 if (u->version >= 15) {
1051 pa_volume_t base_volume;
1052 uint32_t state, n_volume_steps, card;
1053
1054 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1055 pa_tagstruct_getu32(t, &state) < 0 ||
1056 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1057 pa_tagstruct_getu32(t, &card) < 0) {
1058
1059 pa_log("Parse failure");
1060 goto fail;
1061 }
1062 }
1063
1064 if (!pa_tagstruct_eof(t)) {
1065 pa_log("Packet too long");
1066 goto fail;
1067 }
1068
1069 pa_proplist_free(pl);
1070
1071 if (!u->sink_name || strcmp(name, u->sink_name))
1072 return;
1073
1074 pa_xfree(u->device_description);
1075 u->device_description = pa_xstrdup(description);
1076
1077 update_description(u);
1078
1079 return;
1080
1081 fail:
1082 pa_module_unload_request(u->module, TRUE);
1083 pa_proplist_free(pl);
1084 }
1085
1086 /* Called from main context */
1087 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1088 struct userdata *u = userdata;
1089 uint32_t idx, owner_module, client, sink;
1090 pa_usec_t buffer_usec, sink_usec;
1091 const char *name, *driver, *resample_method;
1092 pa_bool_t mute;
1093 pa_sample_spec sample_spec;
1094 pa_channel_map channel_map;
1095 pa_cvolume volume;
1096 pa_proplist *pl;
1097
1098 pa_assert(pd);
1099 pa_assert(u);
1100
1101 pl = pa_proplist_new();
1102
1103 if (command != PA_COMMAND_REPLY) {
1104 if (command == PA_COMMAND_ERROR)
1105 pa_log("Failed to get info.");
1106 else
1107 pa_log("Protocol error.");
1108 goto fail;
1109 }
1110
1111 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1112 pa_tagstruct_gets(t, &name) < 0 ||
1113 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1114 pa_tagstruct_getu32(t, &client) < 0 ||
1115 pa_tagstruct_getu32(t, &sink) < 0 ||
1116 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1117 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1118 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1119 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1120 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1121 pa_tagstruct_gets(t, &resample_method) < 0 ||
1122 pa_tagstruct_gets(t, &driver) < 0) {
1123
1124 pa_log("Parse failure");
1125 goto fail;
1126 }
1127
1128 if (u->version >= 11) {
1129 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1130
1131 pa_log("Parse failure");
1132 goto fail;
1133 }
1134 }
1135
1136 if (u->version >= 13) {
1137 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1138
1139 pa_log("Parse failure");
1140 goto fail;
1141 }
1142 }
1143
1144 if (!pa_tagstruct_eof(t)) {
1145 pa_log("Packet too long");
1146 goto fail;
1147 }
1148
1149 pa_proplist_free(pl);
1150
1151 if (idx != u->device_index)
1152 return;
1153
1154 pa_assert(u->sink);
1155
1156 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1157 pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1158 return;
1159
1160 pa_sink_volume_changed(u->sink, &volume);
1161
1162 if (u->version >= 11)
1163 pa_sink_mute_changed(u->sink, mute);
1164
1165 return;
1166
1167 fail:
1168 pa_module_unload_request(u->module, TRUE);
1169 pa_proplist_free(pl);
1170 }
1171
1172 #else
1173
1174 /* Called from main context */
1175 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1176 struct userdata *u = userdata;
1177 uint32_t idx, owner_module, monitor_of_sink, flags;
1178 const char *name, *description, *monitor_of_sink_name, *driver;
1179 pa_sample_spec ss;
1180 pa_channel_map cm;
1181 pa_cvolume volume;
1182 pa_bool_t mute;
1183 pa_usec_t latency, configured_latency;
1184 pa_proplist *pl;
1185
1186 pa_assert(pd);
1187 pa_assert(u);
1188
1189 pl = pa_proplist_new();
1190
1191 if (command != PA_COMMAND_REPLY) {
1192 if (command == PA_COMMAND_ERROR)
1193 pa_log("Failed to get info.");
1194 else
1195 pa_log("Protocol error.");
1196 goto fail;
1197 }
1198
1199 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1200 pa_tagstruct_gets(t, &name) < 0 ||
1201 pa_tagstruct_gets(t, &description) < 0 ||
1202 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1203 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1204 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1205 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1206 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1207 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1208 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1209 pa_tagstruct_get_usec(t, &latency) < 0 ||
1210 pa_tagstruct_gets(t, &driver) < 0 ||
1211 pa_tagstruct_getu32(t, &flags) < 0) {
1212
1213 pa_log("Parse failure");
1214 goto fail;
1215 }
1216
1217 if (u->version >= 13) {
1218 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1219 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1220
1221 pa_log("Parse failure");
1222 goto fail;
1223 }
1224 }
1225
1226 if (u->version >= 15) {
1227 pa_volume_t base_volume;
1228 uint32_t state, n_volume_steps, card;
1229
1230 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1231 pa_tagstruct_getu32(t, &state) < 0 ||
1232 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1233 pa_tagstruct_getu32(t, &card) < 0) {
1234
1235 pa_log("Parse failure");
1236 goto fail;
1237 }
1238 }
1239
1240 if (!pa_tagstruct_eof(t)) {
1241 pa_log("Packet too long");
1242 goto fail;
1243 }
1244
1245 pa_proplist_free(pl);
1246
1247 if (!u->source_name || strcmp(name, u->source_name))
1248 return;
1249
1250 pa_xfree(u->device_description);
1251 u->device_description = pa_xstrdup(description);
1252
1253 update_description(u);
1254
1255 return;
1256
1257 fail:
1258 pa_module_unload_request(u->module, TRUE);
1259 pa_proplist_free(pl);
1260 }
1261
1262 #endif
1263
1264 /* Called from main context */
1265 static void request_info(struct userdata *u) {
1266 pa_tagstruct *t;
1267 uint32_t tag;
1268 pa_assert(u);
1269
1270 t = pa_tagstruct_new(NULL, 0);
1271 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1272 pa_tagstruct_putu32(t, tag = u->ctag++);
1273 pa_pstream_send_tagstruct(u->pstream, t);
1274 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1275
1276 #ifdef TUNNEL_SINK
1277 t = pa_tagstruct_new(NULL, 0);
1278 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1279 pa_tagstruct_putu32(t, tag = u->ctag++);
1280 pa_tagstruct_putu32(t, u->device_index);
1281 pa_pstream_send_tagstruct(u->pstream, t);
1282 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1283
1284 if (u->sink_name) {
1285 t = pa_tagstruct_new(NULL, 0);
1286 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1287 pa_tagstruct_putu32(t, tag = u->ctag++);
1288 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1289 pa_tagstruct_puts(t, u->sink_name);
1290 pa_pstream_send_tagstruct(u->pstream, t);
1291 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1292 }
1293 #else
1294 if (u->source_name) {
1295 t = pa_tagstruct_new(NULL, 0);
1296 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1297 pa_tagstruct_putu32(t, tag = u->ctag++);
1298 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1299 pa_tagstruct_puts(t, u->source_name);
1300 pa_pstream_send_tagstruct(u->pstream, t);
1301 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1302 }
1303 #endif
1304 }
1305
1306 /* Called from main context */
1307 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1308 struct userdata *u = userdata;
1309 pa_subscription_event_type_t e;
1310 uint32_t idx;
1311
1312 pa_assert(pd);
1313 pa_assert(t);
1314 pa_assert(u);
1315 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1316
1317 if (pa_tagstruct_getu32(t, &e) < 0 ||
1318 pa_tagstruct_getu32(t, &idx) < 0) {
1319 pa_log("Invalid protocol reply");
1320 pa_module_unload_request(u->module, TRUE);
1321 return;
1322 }
1323
1324 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1325 #ifdef TUNNEL_SINK
1326 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1327 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1328 #else
1329 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1330 #endif
1331 )
1332 return;
1333
1334 request_info(u);
1335 }
1336
1337 /* Called from main context */
1338 static void start_subscribe(struct userdata *u) {
1339 pa_tagstruct *t;
1340 uint32_t tag;
1341 pa_assert(u);
1342
1343 t = pa_tagstruct_new(NULL, 0);
1344 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1345 pa_tagstruct_putu32(t, tag = u->ctag++);
1346 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1347 #ifdef TUNNEL_SINK
1348 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1349 #else
1350 PA_SUBSCRIPTION_MASK_SOURCE
1351 #endif
1352 );
1353
1354 pa_pstream_send_tagstruct(u->pstream, t);
1355 }
1356
1357 /* Called from main context */
1358 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1359 struct userdata *u = userdata;
1360 struct timeval ntv;
1361 #ifdef TUNNEL_SINK
1362 uint32_t bytes;
1363 #endif
1364
1365 pa_assert(pd);
1366 pa_assert(u);
1367 pa_assert(u->pdispatch == pd);
1368
1369 if (command != PA_COMMAND_REPLY) {
1370 if (command == PA_COMMAND_ERROR)
1371 pa_log("Failed to create stream.");
1372 else
1373 pa_log("Protocol error.");
1374 goto fail;
1375 }
1376
1377 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1378 pa_tagstruct_getu32(t, &u->device_index) < 0
1379 #ifdef TUNNEL_SINK
1380 || pa_tagstruct_getu32(t, &bytes) < 0
1381 #endif
1382 )
1383 goto parse_error;
1384
1385 if (u->version >= 9) {
1386 #ifdef TUNNEL_SINK
1387 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1388 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1389 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1390 pa_tagstruct_getu32(t, &u->minreq) < 0)
1391 goto parse_error;
1392 #else
1393 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1394 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1395 goto parse_error;
1396 #endif
1397 }
1398
1399 if (u->version >= 12) {
1400 pa_sample_spec ss;
1401 pa_channel_map cm;
1402 uint32_t device_index;
1403 const char *dn;
1404 pa_bool_t suspended;
1405
1406 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1407 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1408 pa_tagstruct_getu32(t, &device_index) < 0 ||
1409 pa_tagstruct_gets(t, &dn) < 0 ||
1410 pa_tagstruct_get_boolean(t, &suspended) < 0)
1411 goto parse_error;
1412
1413 #ifdef TUNNEL_SINK
1414 pa_xfree(u->sink_name);
1415 u->sink_name = pa_xstrdup(dn);
1416 #else
1417 pa_xfree(u->source_name);
1418 u->source_name = pa_xstrdup(dn);
1419 #endif
1420 }
1421
1422 if (u->version >= 13) {
1423 pa_usec_t usec;
1424
1425 if (pa_tagstruct_get_usec(t, &usec) < 0)
1426 goto parse_error;
1427
1428 /* #ifdef TUNNEL_SINK */
1429 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1430 /* #else */
1431 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1432 /* #endif */
1433 }
1434
1435 if (!pa_tagstruct_eof(t))
1436 goto parse_error;
1437
1438 start_subscribe(u);
1439 request_info(u);
1440
1441 pa_assert(!u->time_event);
1442 pa_gettimeofday(&ntv);
1443 ntv.tv_sec += LATENCY_INTERVAL;
1444 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1445
1446 request_latency(u);
1447
1448 pa_log_debug("Stream created.");
1449
1450 #ifdef TUNNEL_SINK
1451 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1452 #endif
1453
1454 return;
1455
1456 parse_error:
1457 pa_log("Invalid reply. (Create stream)");
1458
1459 fail:
1460 pa_module_unload_request(u->module, TRUE);
1461
1462 }
1463
1464 /* Called from main context */
1465 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1466 struct userdata *u = userdata;
1467 pa_tagstruct *reply;
1468 char name[256], un[128], hn[128];
1469 #ifdef TUNNEL_SINK
1470 pa_cvolume volume;
1471 #endif
1472
1473 pa_assert(pd);
1474 pa_assert(u);
1475 pa_assert(u->pdispatch == pd);
1476
1477 if (command != PA_COMMAND_REPLY ||
1478 pa_tagstruct_getu32(t, &u->version) < 0 ||
1479 !pa_tagstruct_eof(t)) {
1480
1481 if (command == PA_COMMAND_ERROR)
1482 pa_log("Failed to authenticate");
1483 else
1484 pa_log("Protocol error.");
1485
1486 goto fail;
1487 }
1488
1489 /* Minimum supported protocol version */
1490 if (u->version < 8) {
1491 pa_log("Incompatible protocol version");
1492 goto fail;
1493 }
1494
1495 /* Starting with protocol version 13 the MSB of the version tag
1496 reflects if shm is enabled for this connection or not. We don't
1497 support SHM here at all, so we just ignore this. */
1498
1499 if (u->version >= 13)
1500 u->version &= 0x7FFFFFFFU;
1501
1502 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1503
1504 #ifdef TUNNEL_SINK
1505 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1506 pa_sink_update_proplist(u->sink, 0, NULL);
1507
1508 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1509 u->sink_name,
1510 pa_get_user_name(un, sizeof(un)),
1511 pa_get_host_name(hn, sizeof(hn)));
1512 #else
1513 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1514 pa_source_update_proplist(u->source, 0, NULL);
1515
1516 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1517 u->source_name,
1518 pa_get_user_name(un, sizeof(un)),
1519 pa_get_host_name(hn, sizeof(hn)));
1520 #endif
1521
1522 reply = pa_tagstruct_new(NULL, 0);
1523 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1524 pa_tagstruct_putu32(reply, tag = u->ctag++);
1525
1526 if (u->version >= 13) {
1527 pa_proplist *pl;
1528 pl = pa_proplist_new();
1529 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1530 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1531 pa_init_proplist(pl);
1532 pa_tagstruct_put_proplist(reply, pl);
1533 pa_proplist_free(pl);
1534 } else
1535 pa_tagstruct_puts(reply, "PulseAudio");
1536
1537 pa_pstream_send_tagstruct(u->pstream, reply);
1538 /* We ignore the server's reply here */
1539
1540 reply = pa_tagstruct_new(NULL, 0);
1541
1542 if (u->version < 13)
1543 /* Only for older PA versions we need to fill in the maxlength */
1544 u->maxlength = 4*1024*1024;
1545
1546 #ifdef TUNNEL_SINK
1547 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1548 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1549 u->prebuf = u->tlength;
1550 #else
1551 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1552 #endif
1553
1554 #ifdef TUNNEL_SINK
1555 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1556 pa_tagstruct_putu32(reply, tag = u->ctag++);
1557
1558 if (u->version < 13)
1559 pa_tagstruct_puts(reply, name);
1560
1561 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1562 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1563 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1564 pa_tagstruct_puts(reply, u->sink_name);
1565 pa_tagstruct_putu32(reply, u->maxlength);
1566 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1567 pa_tagstruct_putu32(reply, u->tlength);
1568 pa_tagstruct_putu32(reply, u->prebuf);
1569 pa_tagstruct_putu32(reply, u->minreq);
1570 pa_tagstruct_putu32(reply, 0);
1571 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1572 pa_tagstruct_put_cvolume(reply, &volume);
1573 #else
1574 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1575 pa_tagstruct_putu32(reply, tag = u->ctag++);
1576
1577 if (u->version < 13)
1578 pa_tagstruct_puts(reply, name);
1579
1580 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1581 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1582 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1583 pa_tagstruct_puts(reply, u->source_name);
1584 pa_tagstruct_putu32(reply, u->maxlength);
1585 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1586 pa_tagstruct_putu32(reply, u->fragsize);
1587 #endif
1588
1589 if (u->version >= 12) {
1590 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1591 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1592 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1593 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1594 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1595 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1596 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1597 }
1598
1599 if (u->version >= 13) {
1600 pa_proplist *pl;
1601
1602 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1603 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1604
1605 pl = pa_proplist_new();
1606 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1607 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1608 pa_tagstruct_put_proplist(reply, pl);
1609 pa_proplist_free(pl);
1610
1611 #ifndef TUNNEL_SINK
1612 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1613 #endif
1614 }
1615
1616 if (u->version >= 14) {
1617 #ifdef TUNNEL_SINK
1618 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1619 #endif
1620 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1621 }
1622
1623 if (u->version >= 15) {
1624 #ifdef TUNNEL_SINK
1625 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1626 #endif
1627 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1628 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1629 }
1630
1631 pa_pstream_send_tagstruct(u->pstream, reply);
1632 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1633
1634 pa_log_debug("Connection authenticated, creating stream ...");
1635
1636 return;
1637
1638 fail:
1639 pa_module_unload_request(u->module, TRUE);
1640 }
1641
1642 /* Called from main context */
1643 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1644 struct userdata *u = userdata;
1645
1646 pa_assert(p);
1647 pa_assert(u);
1648
1649 pa_log_warn("Stream died.");
1650 pa_module_unload_request(u->module, TRUE);
1651 }
1652
1653 /* Called from main context */
1654 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1655 struct userdata *u = userdata;
1656
1657 pa_assert(p);
1658 pa_assert(packet);
1659 pa_assert(u);
1660
1661 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1662 pa_log("Invalid packet");
1663 pa_module_unload_request(u->module, TRUE);
1664 return;
1665 }
1666 }
1667
1668 #ifndef TUNNEL_SINK
1669 /* Called from main context */
1670 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1671 struct userdata *u = userdata;
1672
1673 pa_assert(p);
1674 pa_assert(chunk);
1675 pa_assert(u);
1676
1677 if (channel != u->channel) {
1678 pa_log("Recieved memory block on bad channel.");
1679 pa_module_unload_request(u->module, TRUE);
1680 return;
1681 }
1682
1683 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1684
1685 u->counter_delta += (int64_t) chunk->length;
1686 }
1687 #endif
1688
1689 /* Called from main context */
1690 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1691 struct userdata *u = userdata;
1692 pa_tagstruct *t;
1693 uint32_t tag;
1694
1695 pa_assert(sc);
1696 pa_assert(u);
1697 pa_assert(u->client == sc);
1698
1699 pa_socket_client_unref(u->client);
1700 u->client = NULL;
1701
1702 if (!io) {
1703 pa_log("Connection failed: %s", pa_cstrerror(errno));
1704 pa_module_unload_request(u->module, TRUE);
1705 return;
1706 }
1707
1708 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1709 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1710
1711 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1712 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1713 #ifndef TUNNEL_SINK
1714 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1715 #endif
1716
1717 t = pa_tagstruct_new(NULL, 0);
1718 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1719 pa_tagstruct_putu32(t, tag = u->ctag++);
1720 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1721
1722 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1723
1724 #ifdef HAVE_CREDS
1725 {
1726 pa_creds ucred;
1727
1728 if (pa_iochannel_creds_supported(io))
1729 pa_iochannel_creds_enable(io);
1730
1731 ucred.uid = getuid();
1732 ucred.gid = getgid();
1733
1734 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1735 }
1736 #else
1737 pa_pstream_send_tagstruct(u->pstream, t);
1738 #endif
1739
1740 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1741
1742 pa_log_debug("Connection established, authenticating ...");
1743 }
1744
1745 #ifdef TUNNEL_SINK
1746
1747 /* Called from main context */
1748 static void sink_set_volume(pa_sink *sink) {
1749 struct userdata *u;
1750 pa_tagstruct *t;
1751 uint32_t tag;
1752
1753 pa_assert(sink);
1754 u = sink->userdata;
1755 pa_assert(u);
1756
1757 t = pa_tagstruct_new(NULL, 0);
1758 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1759 pa_tagstruct_putu32(t, tag = u->ctag++);
1760 pa_tagstruct_putu32(t, u->device_index);
1761 pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1762 pa_pstream_send_tagstruct(u->pstream, t);
1763 }
1764
1765 /* Called from main context */
1766 static void sink_set_mute(pa_sink *sink) {
1767 struct userdata *u;
1768 pa_tagstruct *t;
1769 uint32_t tag;
1770
1771 pa_assert(sink);
1772 u = sink->userdata;
1773 pa_assert(u);
1774
1775 if (u->version < 11)
1776 return;
1777
1778 t = pa_tagstruct_new(NULL, 0);
1779 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1780 pa_tagstruct_putu32(t, tag = u->ctag++);
1781 pa_tagstruct_putu32(t, u->device_index);
1782 pa_tagstruct_put_boolean(t, !!sink->muted);
1783 pa_pstream_send_tagstruct(u->pstream, t);
1784 }
1785
1786 #endif
1787
1788 int pa__init(pa_module*m) {
1789 pa_modargs *ma = NULL;
1790 struct userdata *u = NULL;
1791 pa_sample_spec ss;
1792 pa_channel_map map;
1793 char *dn = NULL;
1794 #ifdef TUNNEL_SINK
1795 pa_sink_new_data data;
1796 #else
1797 pa_source_new_data data;
1798 #endif
1799
1800 pa_assert(m);
1801
1802 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1803 pa_log("Failed to parse module arguments");
1804 goto fail;
1805 }
1806
1807 m->userdata = u = pa_xnew0(struct userdata, 1);
1808 u->core = m->core;
1809 u->module = m;
1810 u->client = NULL;
1811 u->pdispatch = NULL;
1812 u->pstream = NULL;
1813 u->server_name = NULL;
1814 #ifdef TUNNEL_SINK
1815 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1816 u->sink = NULL;
1817 u->requested_bytes = 0;
1818 #else
1819 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1820 u->source = NULL;
1821 #endif
1822 u->smoother = pa_smoother_new(
1823 PA_USEC_PER_SEC,
1824 PA_USEC_PER_SEC*2,
1825 TRUE,
1826 TRUE,
1827 10,
1828 pa_rtclock_usec(),
1829 FALSE);
1830 u->ctag = 1;
1831 u->device_index = u->channel = PA_INVALID_INDEX;
1832 u->time_event = NULL;
1833 u->ignore_latency_before = 0;
1834 u->transport_usec = u->thread_transport_usec = 0;
1835 u->remote_suspended = u->remote_corked = FALSE;
1836 u->counter = u->counter_delta = 0;
1837
1838 u->rtpoll = pa_rtpoll_new();
1839 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1840
1841 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1842 goto fail;
1843
1844 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1845 pa_log("No server specified.");
1846 goto fail;
1847 }
1848
1849 ss = m->core->default_sample_spec;
1850 map = m->core->default_channel_map;
1851 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1852 pa_log("Invalid sample format specification");
1853 goto fail;
1854 }
1855
1856 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1857 pa_log("Failed to connect to server '%s'", u->server_name);
1858 goto fail;
1859 }
1860
1861 pa_socket_client_set_callback(u->client, on_connection, u);
1862
1863 #ifdef TUNNEL_SINK
1864
1865 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1866 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1867
1868 pa_sink_new_data_init(&data);
1869 data.driver = __FILE__;
1870 data.module = m;
1871 data.namereg_fail = TRUE;
1872 pa_sink_new_data_set_name(&data, dn);
1873 pa_sink_new_data_set_sample_spec(&data, &ss);
1874 pa_sink_new_data_set_channel_map(&data, &map);
1875 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1876 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1877 if (u->sink_name)
1878 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1879
1880 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1881 pa_log("Invalid properties");
1882 pa_sink_new_data_done(&data);
1883 goto fail;
1884 }
1885
1886 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1887 pa_sink_new_data_done(&data);
1888
1889 if (!u->sink) {
1890 pa_log("Failed to create sink.");
1891 goto fail;
1892 }
1893
1894 u->sink->parent.process_msg = sink_process_msg;
1895 u->sink->userdata = u;
1896 u->sink->set_state = sink_set_state;
1897 u->sink->set_volume = sink_set_volume;
1898 u->sink->set_mute = sink_set_mute;
1899
1900 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1901
1902 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1903
1904 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1905 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1906
1907 #else
1908
1909 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1910 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1911
1912 pa_source_new_data_init(&data);
1913 data.driver = __FILE__;
1914 data.module = m;
1915 data.namereg_fail = TRUE;
1916 pa_source_new_data_set_name(&data, dn);
1917 pa_source_new_data_set_sample_spec(&data, &ss);
1918 pa_source_new_data_set_channel_map(&data, &map);
1919 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1920 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1921 if (u->source_name)
1922 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1923
1924 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1925 pa_log("Invalid properties");
1926 pa_source_new_data_done(&data);
1927 goto fail;
1928 }
1929
1930 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1931 pa_source_new_data_done(&data);
1932
1933 if (!u->source) {
1934 pa_log("Failed to create source.");
1935 goto fail;
1936 }
1937
1938 u->source->parent.process_msg = source_process_msg;
1939 u->source->set_state = source_set_state;
1940 u->source->userdata = u;
1941
1942 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1943
1944 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1945 pa_source_set_rtpoll(u->source, u->rtpoll);
1946 #endif
1947
1948 pa_xfree(dn);
1949
1950 u->time_event = NULL;
1951
1952 u->maxlength = (uint32_t) -1;
1953 #ifdef TUNNEL_SINK
1954 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
1955 #else
1956 u->fragsize = (uint32_t) -1;
1957 #endif
1958
1959 if (!(u->thread = pa_thread_new(thread_func, u))) {
1960 pa_log("Failed to create thread.");
1961 goto fail;
1962 }
1963
1964 #ifdef TUNNEL_SINK
1965 pa_sink_put(u->sink);
1966 #else
1967 pa_source_put(u->source);
1968 #endif
1969
1970 pa_modargs_free(ma);
1971
1972 return 0;
1973
1974 fail:
1975 pa__done(m);
1976
1977 if (ma)
1978 pa_modargs_free(ma);
1979
1980 pa_xfree(dn);
1981
1982 return -1;
1983 }
1984
1985 void pa__done(pa_module*m) {
1986 struct userdata* u;
1987
1988 pa_assert(m);
1989
1990 if (!(u = m->userdata))
1991 return;
1992
1993 #ifdef TUNNEL_SINK
1994 if (u->sink)
1995 pa_sink_unlink(u->sink);
1996 #else
1997 if (u->source)
1998 pa_source_unlink(u->source);
1999 #endif
2000
2001 if (u->thread) {
2002 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2003 pa_thread_free(u->thread);
2004 }
2005
2006 pa_thread_mq_done(&u->thread_mq);
2007
2008 #ifdef TUNNEL_SINK
2009 if (u->sink)
2010 pa_sink_unref(u->sink);
2011 #else
2012 if (u->source)
2013 pa_source_unref(u->source);
2014 #endif
2015
2016 if (u->rtpoll)
2017 pa_rtpoll_free(u->rtpoll);
2018
2019 if (u->pstream) {
2020 pa_pstream_unlink(u->pstream);
2021 pa_pstream_unref(u->pstream);
2022 }
2023
2024 if (u->pdispatch)
2025 pa_pdispatch_unref(u->pdispatch);
2026
2027 if (u->client)
2028 pa_socket_client_unref(u->client);
2029
2030 if (u->auth_cookie)
2031 pa_auth_cookie_unref(u->auth_cookie);
2032
2033 if (u->smoother)
2034 pa_smoother_free(u->smoother);
2035
2036 if (u->time_event)
2037 u->core->mainloop->time_free(u->time_event);
2038
2039 #ifdef TUNNEL_SINK
2040 pa_xfree(u->sink_name);
2041 #else
2042 pa_xfree(u->source_name);
2043 #endif
2044 pa_xfree(u->server_name);
2045
2046 pa_xfree(u->device_description);
2047 pa_xfree(u->server_fqdn);
2048 pa_xfree(u->user_name);
2049
2050 pa_xfree(u);
2051 }