]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
core: get rid of rt sig/timer handling since modern Linux' ppoll() is finally fixed...
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
/* Module metadata. The description and usage strings differ depending on
 * whether this file is compiled as the sink tunnel (TUNNEL_SINK defined) or
 * as the source tunnel; author, version and load-once flag are shared. */
#ifdef TUNNEL_SINK
PA_MODULE_DESCRIPTION("Tunnel module for sinks");
PA_MODULE_USAGE(
        "sink_name=<name for the local sink> "
        "sink_properties=<properties for the local sink> "
        "server=<address> "
        "sink=<remote sink name> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "channel_map=<channel map>");
#else
PA_MODULE_DESCRIPTION("Tunnel module for sources");
PA_MODULE_USAGE(
        "source_name=<name for the local source> "
        "source_properties=<properties for the local source> "
        "server=<address> "
        "source=<remote source name> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "channel_map=<channel map>");
#endif

PA_MODULE_AUTHOR("Lennart Poettering");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(FALSE);
93
/* NULL-terminated list of module argument names. The entries mirror the
 * keys advertised in PA_MODULE_USAGE; the device specific ones depend on the
 * build variant. NOTE(review): the consumer of this table is outside this
 * part of the file -- presumably the modargs parser. */
static const char* const valid_modargs[] = {
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif
    "channel_map",
    NULL,
};
112
/* Timeout handed to pa_pdispatch_register_reply() when waiting for a reply */
#define DEFAULT_TIMEOUT 5

/* Seconds between two periodic latency queries (added to tv_sec in
 * timeout_callback()) */
#define LATENCY_INTERVAL 10

/* NOTE(review): not referenced in this part of the file */
#define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)

#ifdef TUNNEL_SINK

/* Private messages of the tunnel sink, numbered after the generic sink
 * messages. They are handled in sink_process_msg(). */
enum {
    SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
    SINK_MESSAGE_REMOTE_SUSPEND,
    SINK_MESSAGE_UPDATE_LATENCY,
    SINK_MESSAGE_POST
};

/* NOTE(review): defaults for the playback buffer metrics; their use is
 * outside this part of the file */
#define DEFAULT_TLENGTH_MSEC 150
#define DEFAULT_MINREQ_MSEC 25

#else

/* Private messages of the tunnel source, numbered after the generic source
 * messages. They are handled in source_process_msg(). */
enum {
    SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
    SOURCE_MESSAGE_REMOTE_SUSPEND,
    SOURCE_MESSAGE_UPDATE_LATENCY
};

/* NOTE(review): default for the record fragment size; its use is outside
 * this part of the file */
#define DEFAULT_FRAGSIZE_MSEC 25

#endif
142
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154
/* Dispatch table for commands received from the remote server, indexed by
 * command code. Slots without an initializer stay NULL. REQUEST and STARTED
 * only get a handler in the sink build; the playback and record variants of
 * the remaining commands share one handler each. */
static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
#ifdef TUNNEL_SINK
    [PA_COMMAND_REQUEST] = command_request,
    [PA_COMMAND_STARTED] = command_started,
#endif
    [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
    [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
    [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
    [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
    [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
    [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
    [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
    [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
    [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
};
175
/* Per-instance state of one tunnel module. */
struct userdata {
    pa_core *core;
    pa_module *module;          /* unloaded via pa_module_unload_request() on errors */

    /* IO thread plumbing: thread_func() installs thread_mq and runs rtpoll */
    pa_thread_mq thread_mq;
    pa_rtpoll *rtpoll;
    pa_thread *thread;

    /* Connection to the remote server: pstream carries tagstructs and
     * memblocks, pdispatch is used to register reply callbacks */
    pa_socket_client *client;
    pa_pstream *pstream;
    pa_pdispatch *pdispatch;

    char *server_name;
#ifdef TUNNEL_SINK
    char *sink_name;            /* compared with the name reported in sink_info_cb() */
    pa_sink *sink;
    size_t requested_bytes;     /* grown by SINK_MESSAGE_REQUEST, consumed by send_data() */
#else
    char *source_name;
    pa_source *source;
#endif

    pa_auth_cookie *auth_cookie;

    uint32_t version;           /* compared against 11/13/15 to decide which reply fields to parse */
    uint32_t ctag;              /* tag for the next outgoing command, post-incremented on use */
    uint32_t device_index;
    uint32_t channel;           /* stream channel put into outgoing commands */

    /* counter: bytes rendered (sink) or posted (source) so far.
     * counter_delta: reset to 0 by request_latency(); in the sink build it is
     * grown by SINK_MESSAGE_POST. */
    int64_t counter, counter_delta;

    pa_bool_t remote_corked:1;
    pa_bool_t remote_suspended:1;

    pa_usec_t transport_usec; /* maintained in the main thread */
    pa_usec_t thread_transport_usec; /* maintained in the IO thread */

    uint32_t ignore_latency_before; /* latency replies with a lower tag are dropped */

    pa_time_event *time_event;

    pa_smoother *smoother;

    /* Strings combined by update_description() */
    char *device_description;
    char *server_fqdn;
    char *user_name;

    /* NOTE(review): buffer metrics; apart from a debug log of tlength they
     * are not used in this part of the file */
    uint32_t maxlength;
#ifdef TUNNEL_SINK
    uint32_t tlength;
    uint32_t minreq;
    uint32_t prebuf;
#else
    uint32_t fragsize;
#endif
};
232
233 static void request_latency(struct userdata *u);
234
235 /* Called from main context */
236 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
237 pa_log_debug("Got stream or client event.");
238 }
239
240 /* Called from main context */
241 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
242 struct userdata *u = userdata;
243
244 pa_assert(pd);
245 pa_assert(t);
246 pa_assert(u);
247 pa_assert(u->pdispatch == pd);
248
249 pa_log_warn("Stream killed");
250 pa_module_unload_request(u->module, TRUE);
251 }
252
253 /* Called from main context */
254 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
255 struct userdata *u = userdata;
256
257 pa_assert(pd);
258 pa_assert(t);
259 pa_assert(u);
260 pa_assert(u->pdispatch == pd);
261
262 pa_log_info("Server signalled buffer overrun/underrun.");
263 request_latency(u);
264 }
265
266 /* Called from main context */
267 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
268 struct userdata *u = userdata;
269 uint32_t channel;
270 pa_bool_t suspended;
271
272 pa_assert(pd);
273 pa_assert(t);
274 pa_assert(u);
275 pa_assert(u->pdispatch == pd);
276
277 if (pa_tagstruct_getu32(t, &channel) < 0 ||
278 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
279 !pa_tagstruct_eof(t)) {
280
281 pa_log("Invalid packet.");
282 pa_module_unload_request(u->module, TRUE);
283 return;
284 }
285
286 pa_log_debug("Server reports device suspend.");
287
288 #ifdef TUNNEL_SINK
289 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
290 #else
291 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
292 #endif
293
294 request_latency(u);
295 }
296
297 /* Called from main context */
298 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
299 struct userdata *u = userdata;
300 uint32_t channel, di;
301 const char *dn;
302 pa_bool_t suspended;
303
304 pa_assert(pd);
305 pa_assert(t);
306 pa_assert(u);
307 pa_assert(u->pdispatch == pd);
308
309 if (pa_tagstruct_getu32(t, &channel) < 0 ||
310 pa_tagstruct_getu32(t, &di) < 0 ||
311 pa_tagstruct_gets(t, &dn) < 0 ||
312 pa_tagstruct_get_boolean(t, &suspended) < 0) {
313
314 pa_log_error("Invalid packet.");
315 pa_module_unload_request(u->module, TRUE);
316 return;
317 }
318
319 pa_log_debug("Server reports a stream move.");
320
321 #ifdef TUNNEL_SINK
322 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
323 #else
324 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
325 #endif
326
327 request_latency(u);
328 }
329
330 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
331 struct userdata *u = userdata;
332 uint32_t channel, maxlength, tlength, fragsize, prebuf, minreq;
333 pa_usec_t usec;
334
335 pa_assert(pd);
336 pa_assert(t);
337 pa_assert(u);
338 pa_assert(u->pdispatch == pd);
339
340 if (pa_tagstruct_getu32(t, &channel) < 0 ||
341 pa_tagstruct_getu32(t, &maxlength) < 0) {
342
343 pa_log_error("Invalid packet.");
344 pa_module_unload_request(u->module, TRUE);
345 return;
346 }
347
348 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
349 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
350 pa_tagstruct_get_usec(t, &usec) < 0) {
351
352 pa_log_error("Invalid packet.");
353 pa_module_unload_request(u->module, TRUE);
354 return;
355 }
356 } else {
357 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
358 pa_tagstruct_getu32(t, &prebuf) < 0 ||
359 pa_tagstruct_getu32(t, &minreq) < 0 ||
360 pa_tagstruct_get_usec(t, &usec) < 0) {
361
362 pa_log_error("Invalid packet.");
363 pa_module_unload_request(u->module, TRUE);
364 return;
365 }
366 }
367
368 #ifdef TUNNEL_SINK
369 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
370 #endif
371
372 request_latency(u);
373 }
374
375 #ifdef TUNNEL_SINK
376
377 /* Called from main context */
378 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
379 struct userdata *u = userdata;
380
381 pa_assert(pd);
382 pa_assert(t);
383 pa_assert(u);
384 pa_assert(u->pdispatch == pd);
385
386 pa_log_debug("Server reports playback started.");
387 request_latency(u);
388 }
389
390 #endif
391
392 /* Called from IO thread context */
393 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
394 pa_usec_t x;
395
396 pa_assert(u);
397
398 x = pa_rtclock_usec();
399
400 /* Correct by the time the requested issued needs to travel to the
401 * other side. This is a valid thread-safe access, because the
402 * main thread is waiting for us */
403
404 if (past)
405 x -= u->thread_transport_usec;
406 else
407 x += u->thread_transport_usec;
408
409 if (u->remote_suspended || u->remote_corked)
410 pa_smoother_pause(u->smoother, x);
411 else
412 pa_smoother_resume(u->smoother, x, TRUE);
413 }
414
415 /* Called from IO thread context */
416 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
417 pa_assert(u);
418
419 if (u->remote_corked == cork)
420 return;
421
422 u->remote_corked = cork;
423 check_smoother_status(u, FALSE);
424 }
425
426 /* Called from main context */
427 static void stream_cork(struct userdata *u, pa_bool_t cork) {
428 pa_tagstruct *t;
429 pa_assert(u);
430
431 if (!u->pstream)
432 return;
433
434 t = pa_tagstruct_new(NULL, 0);
435 #ifdef TUNNEL_SINK
436 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
437 #else
438 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
439 #endif
440 pa_tagstruct_putu32(t, u->ctag++);
441 pa_tagstruct_putu32(t, u->channel);
442 pa_tagstruct_put_boolean(t, !!cork);
443 pa_pstream_send_tagstruct(u->pstream, t);
444
445 request_latency(u);
446 }
447
448 /* Called from IO thread context */
449 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
450 pa_assert(u);
451
452 if (u->remote_suspended == suspend)
453 return;
454
455 u->remote_suspended = suspend;
456 check_smoother_status(u, TRUE);
457 }
458
459 #ifdef TUNNEL_SINK
460
461 /* Called from IO thread context */
462 static void send_data(struct userdata *u) {
463 pa_assert(u);
464
465 while (u->requested_bytes > 0) {
466 pa_memchunk memchunk;
467
468 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
469 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
470 pa_memblock_unref(memchunk.memblock);
471
472 u->requested_bytes -= memchunk.length;
473
474 u->counter += (int64_t) memchunk.length;
475 }
476 }
477
478 /* This function is called from IO context -- except when it is not. */
479 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
480 struct userdata *u = PA_SINK(o)->userdata;
481
482 switch (code) {
483
484 case PA_SINK_MESSAGE_SET_STATE: {
485 int r;
486
487 /* First, change the state, because otherwide pa_sink_render() would fail */
488 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
489
490 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
491
492 if (PA_SINK_IS_OPENED(u->sink->state))
493 send_data(u);
494 }
495
496 return r;
497 }
498
499 case PA_SINK_MESSAGE_GET_LATENCY: {
500 pa_usec_t yl, yr, *usec = data;
501
502 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
503 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
504
505 *usec = yl > yr ? yl - yr : 0;
506 return 0;
507 }
508
509 case SINK_MESSAGE_REQUEST:
510
511 pa_assert(offset > 0);
512 u->requested_bytes += (size_t) offset;
513
514 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
515 send_data(u);
516
517 return 0;
518
519
520 case SINK_MESSAGE_REMOTE_SUSPEND:
521
522 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
523 return 0;
524
525
526 case SINK_MESSAGE_UPDATE_LATENCY: {
527 pa_usec_t y;
528
529 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
530
531 if (y > (pa_usec_t) offset)
532 y -= (pa_usec_t) offset;
533 else
534 y = 0;
535
536 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
537
538 /* We can access this freely here, since the main thread is waiting for us */
539 u->thread_transport_usec = u->transport_usec;
540
541 return 0;
542 }
543
544 case SINK_MESSAGE_POST:
545
546 /* OK, This might be a bit confusing. This message is
547 * delivered to us from the main context -- NOT from the
548 * IO thread context where the rest of the messages are
549 * dispatched. Yeah, ugly, but I am a lazy bastard. */
550
551 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
552
553 u->counter_delta += (int64_t) chunk->length;
554
555 return 0;
556 }
557
558 return pa_sink_process_msg(o, code, data, offset, chunk);
559 }
560
561 /* Called from main context */
562 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
563 struct userdata *u;
564 pa_sink_assert_ref(s);
565 u = s->userdata;
566
567 switch ((pa_sink_state_t) state) {
568
569 case PA_SINK_SUSPENDED:
570 pa_assert(PA_SINK_IS_OPENED(s->state));
571 stream_cork(u, TRUE);
572 break;
573
574 case PA_SINK_IDLE:
575 case PA_SINK_RUNNING:
576 if (s->state == PA_SINK_SUSPENDED)
577 stream_cork(u, FALSE);
578 break;
579
580 case PA_SINK_UNLINKED:
581 case PA_SINK_INIT:
582 case PA_SINK_INVALID_STATE:
583 ;
584 }
585
586 return 0;
587 }
588
589 #else
590
591 /* This function is called from IO context -- except when it is not. */
592 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
593 struct userdata *u = PA_SOURCE(o)->userdata;
594
595 switch (code) {
596
597 case PA_SOURCE_MESSAGE_SET_STATE: {
598 int r;
599
600 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
601 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
602
603 return r;
604 }
605
606 case PA_SOURCE_MESSAGE_GET_LATENCY: {
607 pa_usec_t yr, yl, *usec = data;
608
609 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
610 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
611
612 *usec = yr > yl ? yr - yl : 0;
613 return 0;
614 }
615
616 case SOURCE_MESSAGE_POST:
617
618 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
619 pa_source_post(u->source, chunk);
620
621 u->counter += (int64_t) chunk->length;
622
623 return 0;
624
625 case SOURCE_MESSAGE_REMOTE_SUSPEND:
626
627 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
628 return 0;
629
630 case SOURCE_MESSAGE_UPDATE_LATENCY: {
631 pa_usec_t y;
632
633 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
634 y += (pa_usec_t) offset;
635
636 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
637
638 /* We can access this freely here, since the main thread is waiting for us */
639 u->thread_transport_usec = u->transport_usec;
640
641 return 0;
642 }
643 }
644
645 return pa_source_process_msg(o, code, data, offset, chunk);
646 }
647
648 /* Called from main context */
649 static int source_set_state(pa_source *s, pa_source_state_t state) {
650 struct userdata *u;
651 pa_source_assert_ref(s);
652 u = s->userdata;
653
654 switch ((pa_source_state_t) state) {
655
656 case PA_SOURCE_SUSPENDED:
657 pa_assert(PA_SOURCE_IS_OPENED(s->state));
658 stream_cork(u, TRUE);
659 break;
660
661 case PA_SOURCE_IDLE:
662 case PA_SOURCE_RUNNING:
663 if (s->state == PA_SOURCE_SUSPENDED)
664 stream_cork(u, FALSE);
665 break;
666
667 case PA_SOURCE_UNLINKED:
668 case PA_SOURCE_INIT:
669 case PA_SINK_INVALID_STATE:
670 ;
671 }
672
673 return 0;
674 }
675
676 #endif
677
678 static void thread_func(void *userdata) {
679 struct userdata *u = userdata;
680
681 pa_assert(u);
682
683 pa_log_debug("Thread starting up");
684
685 pa_thread_mq_install(&u->thread_mq);
686
687 for (;;) {
688 int ret;
689
690 #ifdef TUNNEL_SINK
691 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
692 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
693 pa_sink_process_rewind(u->sink, 0);
694 #endif
695
696 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
697 goto fail;
698
699 if (ret == 0)
700 goto finish;
701 }
702
703 fail:
704 /* If this was no regular exit from the loop we have to continue
705 * processing messages until we received PA_MESSAGE_SHUTDOWN */
706 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
707 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
708
709 finish:
710 pa_log_debug("Thread shutting down");
711 }
712
713 #ifdef TUNNEL_SINK
714 /* Called from main context */
715 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
716 struct userdata *u = userdata;
717 uint32_t bytes, channel;
718
719 pa_assert(pd);
720 pa_assert(command == PA_COMMAND_REQUEST);
721 pa_assert(t);
722 pa_assert(u);
723 pa_assert(u->pdispatch == pd);
724
725 if (pa_tagstruct_getu32(t, &channel) < 0 ||
726 pa_tagstruct_getu32(t, &bytes) < 0) {
727 pa_log("Invalid protocol reply");
728 goto fail;
729 }
730
731 if (channel != u->channel) {
732 pa_log("Received data for invalid channel");
733 goto fail;
734 }
735
736 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
737 return;
738
739 fail:
740 pa_module_unload_request(u->module, TRUE);
741 }
742
743 #endif
744
745 /* Called from main context */
746 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
747 struct userdata *u = userdata;
748 pa_usec_t sink_usec, source_usec;
749 pa_bool_t playing;
750 int64_t write_index, read_index;
751 struct timeval local, remote, now;
752 pa_sample_spec *ss;
753 int64_t delay;
754
755 pa_assert(pd);
756 pa_assert(u);
757
758 if (command != PA_COMMAND_REPLY) {
759 if (command == PA_COMMAND_ERROR)
760 pa_log("Failed to get latency.");
761 else
762 pa_log("Protocol error.");
763 goto fail;
764 }
765
766 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
767 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
768 pa_tagstruct_get_boolean(t, &playing) < 0 ||
769 pa_tagstruct_get_timeval(t, &local) < 0 ||
770 pa_tagstruct_get_timeval(t, &remote) < 0 ||
771 pa_tagstruct_gets64(t, &write_index) < 0 ||
772 pa_tagstruct_gets64(t, &read_index) < 0) {
773 pa_log("Invalid reply.");
774 goto fail;
775 }
776
777 #ifdef TUNNEL_SINK
778 if (u->version >= 13) {
779 uint64_t underrun_for = 0, playing_for = 0;
780
781 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
782 pa_tagstruct_getu64(t, &playing_for) < 0) {
783 pa_log("Invalid reply.");
784 goto fail;
785 }
786 }
787 #endif
788
789 if (!pa_tagstruct_eof(t)) {
790 pa_log("Invalid reply.");
791 goto fail;
792 }
793
794 if (tag < u->ignore_latency_before) {
795 return;
796 }
797
798 pa_gettimeofday(&now);
799
800 /* Calculate transport usec */
801 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
802 /* local and remote seem to have synchronized clocks */
803 #ifdef TUNNEL_SINK
804 u->transport_usec = pa_timeval_diff(&remote, &local);
805 #else
806 u->transport_usec = pa_timeval_diff(&now, &remote);
807 #endif
808 } else
809 u->transport_usec = pa_timeval_diff(&now, &local)/2;
810
811 /* First, take the device's delay */
812 #ifdef TUNNEL_SINK
813 delay = (int64_t) sink_usec;
814 ss = &u->sink->sample_spec;
815 #else
816 delay = (int64_t) source_usec;
817 ss = &u->source->sample_spec;
818 #endif
819
820 /* Add the length of our server-side buffer */
821 if (write_index >= read_index)
822 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
823 else
824 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
825
826 /* Our measurements are already out of date, hence correct by the *
827 * transport latency */
828 #ifdef TUNNEL_SINK
829 delay -= (int64_t) u->transport_usec;
830 #else
831 delay += (int64_t) u->transport_usec;
832 #endif
833
834 /* Now correct by what we have have read/written since we requested the update */
835 #ifdef TUNNEL_SINK
836 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
837 #else
838 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
839 #endif
840
841 #ifdef TUNNEL_SINK
842 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
843 #else
844 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
845 #endif
846
847 return;
848
849 fail:
850
851 pa_module_unload_request(u->module, TRUE);
852 }
853
854 /* Called from main context */
855 static void request_latency(struct userdata *u) {
856 pa_tagstruct *t;
857 struct timeval now;
858 uint32_t tag;
859 pa_assert(u);
860
861 t = pa_tagstruct_new(NULL, 0);
862 #ifdef TUNNEL_SINK
863 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
864 #else
865 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
866 #endif
867 pa_tagstruct_putu32(t, tag = u->ctag++);
868 pa_tagstruct_putu32(t, u->channel);
869
870 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
871
872 pa_pstream_send_tagstruct(u->pstream, t);
873 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
874
875 u->ignore_latency_before = tag;
876 u->counter_delta = 0;
877 }
878
879 /* Called from main context */
880 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
881 struct userdata *u = userdata;
882 struct timeval ntv;
883
884 pa_assert(m);
885 pa_assert(e);
886 pa_assert(u);
887
888 request_latency(u);
889
890 pa_gettimeofday(&ntv);
891 ntv.tv_sec += LATENCY_INTERVAL;
892 m->time_restart(e, &ntv);
893 }
894
895 /* Called from main context */
896 static void update_description(struct userdata *u) {
897 char *d;
898 char un[128], hn[128];
899 pa_tagstruct *t;
900
901 pa_assert(u);
902
903 if (!u->server_fqdn || !u->user_name || !u->device_description)
904 return;
905
906 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
907
908 #ifdef TUNNEL_SINK
909 pa_sink_set_description(u->sink, d);
910 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
911 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
912 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
913 #else
914 pa_source_set_description(u->source, d);
915 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
916 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
917 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
918 #endif
919
920 pa_xfree(d);
921
922 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
923 pa_get_user_name(un, sizeof(un)),
924 pa_get_host_name(hn, sizeof(hn)));
925
926 t = pa_tagstruct_new(NULL, 0);
927 #ifdef TUNNEL_SINK
928 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
929 #else
930 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
931 #endif
932 pa_tagstruct_putu32(t, u->ctag++);
933 pa_tagstruct_putu32(t, u->channel);
934 pa_tagstruct_puts(t, d);
935 pa_pstream_send_tagstruct(u->pstream, t);
936
937 pa_xfree(d);
938 }
939
940 /* Called from main context */
941 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
942 struct userdata *u = userdata;
943 pa_sample_spec ss;
944 pa_channel_map cm;
945 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
946 uint32_t cookie;
947
948 pa_assert(pd);
949 pa_assert(u);
950
951 if (command != PA_COMMAND_REPLY) {
952 if (command == PA_COMMAND_ERROR)
953 pa_log("Failed to get info.");
954 else
955 pa_log("Protocol error.");
956 goto fail;
957 }
958
959 if (pa_tagstruct_gets(t, &server_name) < 0 ||
960 pa_tagstruct_gets(t, &server_version) < 0 ||
961 pa_tagstruct_gets(t, &user_name) < 0 ||
962 pa_tagstruct_gets(t, &host_name) < 0 ||
963 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
964 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
965 pa_tagstruct_gets(t, &default_source_name) < 0 ||
966 pa_tagstruct_getu32(t, &cookie) < 0 ||
967 (u->version >= 15 &&
968 pa_tagstruct_get_channel_map(t, &cm) < 0)) {
969
970 pa_log("Parse failure");
971 goto fail;
972 }
973
974 if (!pa_tagstruct_eof(t)) {
975 pa_log("Packet too long");
976 goto fail;
977 }
978
979 pa_xfree(u->server_fqdn);
980 u->server_fqdn = pa_xstrdup(host_name);
981
982 pa_xfree(u->user_name);
983 u->user_name = pa_xstrdup(user_name);
984
985 update_description(u);
986
987 return;
988
989 fail:
990 pa_module_unload_request(u->module, TRUE);
991 }
992
993 #ifdef TUNNEL_SINK
994
995 /* Called from main context */
996 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
997 struct userdata *u = userdata;
998 uint32_t idx, owner_module, monitor_source, flags;
999 const char *name, *description, *monitor_source_name, *driver;
1000 pa_sample_spec ss;
1001 pa_channel_map cm;
1002 pa_cvolume volume;
1003 pa_bool_t mute;
1004 pa_usec_t latency;
1005 pa_proplist *pl;
1006
1007 pa_assert(pd);
1008 pa_assert(u);
1009
1010 pl = pa_proplist_new();
1011
1012 if (command != PA_COMMAND_REPLY) {
1013 if (command == PA_COMMAND_ERROR)
1014 pa_log("Failed to get info.");
1015 else
1016 pa_log("Protocol error.");
1017 goto fail;
1018 }
1019
1020 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1021 pa_tagstruct_gets(t, &name) < 0 ||
1022 pa_tagstruct_gets(t, &description) < 0 ||
1023 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1024 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1025 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1026 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1027 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1028 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1029 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1030 pa_tagstruct_get_usec(t, &latency) < 0 ||
1031 pa_tagstruct_gets(t, &driver) < 0 ||
1032 pa_tagstruct_getu32(t, &flags) < 0) {
1033
1034 pa_log("Parse failure");
1035 goto fail;
1036 }
1037
1038 if (u->version >= 13) {
1039 pa_usec_t configured_latency;
1040
1041 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1042 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1043
1044 pa_log("Parse failure");
1045 goto fail;
1046 }
1047 }
1048
1049 if (u->version >= 15) {
1050 pa_volume_t base_volume;
1051 uint32_t state, n_volume_steps, card;
1052
1053 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1054 pa_tagstruct_getu32(t, &state) < 0 ||
1055 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1056 pa_tagstruct_getu32(t, &card) < 0) {
1057
1058 pa_log("Parse failure");
1059 goto fail;
1060 }
1061 }
1062
1063 if (!pa_tagstruct_eof(t)) {
1064 pa_log("Packet too long");
1065 goto fail;
1066 }
1067
1068 pa_proplist_free(pl);
1069
1070 if (!u->sink_name || strcmp(name, u->sink_name))
1071 return;
1072
1073 pa_xfree(u->device_description);
1074 u->device_description = pa_xstrdup(description);
1075
1076 update_description(u);
1077
1078 return;
1079
1080 fail:
1081 pa_module_unload_request(u->module, TRUE);
1082 pa_proplist_free(pl);
1083 }
1084
1085 /* Called from main context */
1086 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1087 struct userdata *u = userdata;
1088 uint32_t idx, owner_module, client, sink;
1089 pa_usec_t buffer_usec, sink_usec;
1090 const char *name, *driver, *resample_method;
1091 pa_bool_t mute;
1092 pa_sample_spec sample_spec;
1093 pa_channel_map channel_map;
1094 pa_cvolume volume;
1095 pa_proplist *pl;
1096
1097 pa_assert(pd);
1098 pa_assert(u);
1099
1100 pl = pa_proplist_new();
1101
1102 if (command != PA_COMMAND_REPLY) {
1103 if (command == PA_COMMAND_ERROR)
1104 pa_log("Failed to get info.");
1105 else
1106 pa_log("Protocol error.");
1107 goto fail;
1108 }
1109
1110 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1111 pa_tagstruct_gets(t, &name) < 0 ||
1112 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1113 pa_tagstruct_getu32(t, &client) < 0 ||
1114 pa_tagstruct_getu32(t, &sink) < 0 ||
1115 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1116 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1117 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1118 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1119 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1120 pa_tagstruct_gets(t, &resample_method) < 0 ||
1121 pa_tagstruct_gets(t, &driver) < 0) {
1122
1123 pa_log("Parse failure");
1124 goto fail;
1125 }
1126
1127 if (u->version >= 11) {
1128 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1129
1130 pa_log("Parse failure");
1131 goto fail;
1132 }
1133 }
1134
1135 if (u->version >= 13) {
1136 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1137
1138 pa_log("Parse failure");
1139 goto fail;
1140 }
1141 }
1142
1143 if (!pa_tagstruct_eof(t)) {
1144 pa_log("Packet too long");
1145 goto fail;
1146 }
1147
1148 pa_proplist_free(pl);
1149
1150 if (idx != u->device_index)
1151 return;
1152
1153 pa_assert(u->sink);
1154
1155 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1156 pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1157 return;
1158
1159 pa_sink_volume_changed(u->sink, &volume, FALSE);
1160
1161 if (u->version >= 11)
1162 pa_sink_mute_changed(u->sink, mute, FALSE);
1163
1164 return;
1165
1166 fail:
1167 pa_module_unload_request(u->module, TRUE);
1168 pa_proplist_free(pl);
1169 }
1170
1171 #else
1172
1173 /* Called from main context */
1174 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1175 struct userdata *u = userdata;
1176 uint32_t idx, owner_module, monitor_of_sink, flags;
1177 const char *name, *description, *monitor_of_sink_name, *driver;
1178 pa_sample_spec ss;
1179 pa_channel_map cm;
1180 pa_cvolume volume;
1181 pa_bool_t mute;
1182 pa_usec_t latency, configured_latency;
1183 pa_proplist *pl;
1184
1185 pa_assert(pd);
1186 pa_assert(u);
1187
1188 pl = pa_proplist_new();
1189
1190 if (command != PA_COMMAND_REPLY) {
1191 if (command == PA_COMMAND_ERROR)
1192 pa_log("Failed to get info.");
1193 else
1194 pa_log("Protocol error.");
1195 goto fail;
1196 }
1197
1198 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1199 pa_tagstruct_gets(t, &name) < 0 ||
1200 pa_tagstruct_gets(t, &description) < 0 ||
1201 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1202 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1203 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1204 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1205 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1206 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1207 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1208 pa_tagstruct_get_usec(t, &latency) < 0 ||
1209 pa_tagstruct_gets(t, &driver) < 0 ||
1210 pa_tagstruct_getu32(t, &flags) < 0) {
1211
1212 pa_log("Parse failure");
1213 goto fail;
1214 }
1215
1216 if (u->version >= 13) {
1217 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1218 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1219
1220 pa_log("Parse failure");
1221 goto fail;
1222 }
1223 }
1224
1225 if (u->version >= 15) {
1226 pa_volume_t base_volume;
1227 uint32_t state, n_volume_steps, card;
1228
1229 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1230 pa_tagstruct_getu32(t, &state) < 0 ||
1231 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1232 pa_tagstruct_getu32(t, &card) < 0) {
1233
1234 pa_log("Parse failure");
1235 goto fail;
1236 }
1237 }
1238
1239 if (!pa_tagstruct_eof(t)) {
1240 pa_log("Packet too long");
1241 goto fail;
1242 }
1243
1244 pa_proplist_free(pl);
1245
1246 if (!u->source_name || strcmp(name, u->source_name))
1247 return;
1248
1249 pa_xfree(u->device_description);
1250 u->device_description = pa_xstrdup(description);
1251
1252 update_description(u);
1253
1254 return;
1255
1256 fail:
1257 pa_module_unload_request(u->module, TRUE);
1258 pa_proplist_free(pl);
1259 }
1260
1261 #endif
1262
1263 /* Called from main context */
1264 static void request_info(struct userdata *u) {
1265 pa_tagstruct *t;
1266 uint32_t tag;
1267 pa_assert(u);
1268
1269 t = pa_tagstruct_new(NULL, 0);
1270 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1271 pa_tagstruct_putu32(t, tag = u->ctag++);
1272 pa_pstream_send_tagstruct(u->pstream, t);
1273 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1274
1275 #ifdef TUNNEL_SINK
1276 t = pa_tagstruct_new(NULL, 0);
1277 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1278 pa_tagstruct_putu32(t, tag = u->ctag++);
1279 pa_tagstruct_putu32(t, u->device_index);
1280 pa_pstream_send_tagstruct(u->pstream, t);
1281 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1282
1283 if (u->sink_name) {
1284 t = pa_tagstruct_new(NULL, 0);
1285 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1286 pa_tagstruct_putu32(t, tag = u->ctag++);
1287 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1288 pa_tagstruct_puts(t, u->sink_name);
1289 pa_pstream_send_tagstruct(u->pstream, t);
1290 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1291 }
1292 #else
1293 if (u->source_name) {
1294 t = pa_tagstruct_new(NULL, 0);
1295 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1296 pa_tagstruct_putu32(t, tag = u->ctag++);
1297 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1298 pa_tagstruct_puts(t, u->source_name);
1299 pa_pstream_send_tagstruct(u->pstream, t);
1300 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1301 }
1302 #endif
1303 }
1304
1305 /* Called from main context */
1306 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1307 struct userdata *u = userdata;
1308 pa_subscription_event_type_t e;
1309 uint32_t idx;
1310
1311 pa_assert(pd);
1312 pa_assert(t);
1313 pa_assert(u);
1314 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1315
1316 if (pa_tagstruct_getu32(t, &e) < 0 ||
1317 pa_tagstruct_getu32(t, &idx) < 0) {
1318 pa_log("Invalid protocol reply");
1319 pa_module_unload_request(u->module, TRUE);
1320 return;
1321 }
1322
1323 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1324 #ifdef TUNNEL_SINK
1325 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1326 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1327 #else
1328 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1329 #endif
1330 )
1331 return;
1332
1333 request_info(u);
1334 }
1335
1336 /* Called from main context */
1337 static void start_subscribe(struct userdata *u) {
1338 pa_tagstruct *t;
1339 uint32_t tag;
1340 pa_assert(u);
1341
1342 t = pa_tagstruct_new(NULL, 0);
1343 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1344 pa_tagstruct_putu32(t, tag = u->ctag++);
1345 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1346 #ifdef TUNNEL_SINK
1347 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1348 #else
1349 PA_SUBSCRIPTION_MASK_SOURCE
1350 #endif
1351 );
1352
1353 pa_pstream_send_tagstruct(u->pstream, t);
1354 }
1355
1356 /* Called from main context */
1357 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1358 struct userdata *u = userdata;
1359 struct timeval ntv;
1360 #ifdef TUNNEL_SINK
1361 uint32_t bytes;
1362 #endif
1363
1364 pa_assert(pd);
1365 pa_assert(u);
1366 pa_assert(u->pdispatch == pd);
1367
1368 if (command != PA_COMMAND_REPLY) {
1369 if (command == PA_COMMAND_ERROR)
1370 pa_log("Failed to create stream.");
1371 else
1372 pa_log("Protocol error.");
1373 goto fail;
1374 }
1375
1376 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1377 pa_tagstruct_getu32(t, &u->device_index) < 0
1378 #ifdef TUNNEL_SINK
1379 || pa_tagstruct_getu32(t, &bytes) < 0
1380 #endif
1381 )
1382 goto parse_error;
1383
1384 if (u->version >= 9) {
1385 #ifdef TUNNEL_SINK
1386 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1387 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1388 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1389 pa_tagstruct_getu32(t, &u->minreq) < 0)
1390 goto parse_error;
1391 #else
1392 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1393 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1394 goto parse_error;
1395 #endif
1396 }
1397
1398 if (u->version >= 12) {
1399 pa_sample_spec ss;
1400 pa_channel_map cm;
1401 uint32_t device_index;
1402 const char *dn;
1403 pa_bool_t suspended;
1404
1405 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1406 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1407 pa_tagstruct_getu32(t, &device_index) < 0 ||
1408 pa_tagstruct_gets(t, &dn) < 0 ||
1409 pa_tagstruct_get_boolean(t, &suspended) < 0)
1410 goto parse_error;
1411
1412 #ifdef TUNNEL_SINK
1413 pa_xfree(u->sink_name);
1414 u->sink_name = pa_xstrdup(dn);
1415 #else
1416 pa_xfree(u->source_name);
1417 u->source_name = pa_xstrdup(dn);
1418 #endif
1419 }
1420
1421 if (u->version >= 13) {
1422 pa_usec_t usec;
1423
1424 if (pa_tagstruct_get_usec(t, &usec) < 0)
1425 goto parse_error;
1426
1427 /* #ifdef TUNNEL_SINK */
1428 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1429 /* #else */
1430 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1431 /* #endif */
1432 }
1433
1434 if (!pa_tagstruct_eof(t))
1435 goto parse_error;
1436
1437 start_subscribe(u);
1438 request_info(u);
1439
1440 pa_assert(!u->time_event);
1441 pa_gettimeofday(&ntv);
1442 ntv.tv_sec += LATENCY_INTERVAL;
1443 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1444
1445 request_latency(u);
1446
1447 pa_log_debug("Stream created.");
1448
1449 #ifdef TUNNEL_SINK
1450 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1451 #endif
1452
1453 return;
1454
1455 parse_error:
1456 pa_log("Invalid reply. (Create stream)");
1457
1458 fail:
1459 pa_module_unload_request(u->module, TRUE);
1460
1461 }
1462
1463 /* Called from main context */
1464 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1465 struct userdata *u = userdata;
1466 pa_tagstruct *reply;
1467 char name[256], un[128], hn[128];
1468 #ifdef TUNNEL_SINK
1469 pa_cvolume volume;
1470 #endif
1471
1472 pa_assert(pd);
1473 pa_assert(u);
1474 pa_assert(u->pdispatch == pd);
1475
1476 if (command != PA_COMMAND_REPLY ||
1477 pa_tagstruct_getu32(t, &u->version) < 0 ||
1478 !pa_tagstruct_eof(t)) {
1479
1480 if (command == PA_COMMAND_ERROR)
1481 pa_log("Failed to authenticate");
1482 else
1483 pa_log("Protocol error.");
1484
1485 goto fail;
1486 }
1487
1488 /* Minimum supported protocol version */
1489 if (u->version < 8) {
1490 pa_log("Incompatible protocol version");
1491 goto fail;
1492 }
1493
1494 /* Starting with protocol version 13 the MSB of the version tag
1495 reflects if shm is enabled for this connection or not. We don't
1496 support SHM here at all, so we just ignore this. */
1497
1498 if (u->version >= 13)
1499 u->version &= 0x7FFFFFFFU;
1500
1501 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1502
1503 #ifdef TUNNEL_SINK
1504 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1505 pa_sink_update_proplist(u->sink, 0, NULL);
1506
1507 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1508 u->sink_name,
1509 pa_get_user_name(un, sizeof(un)),
1510 pa_get_host_name(hn, sizeof(hn)));
1511 #else
1512 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1513 pa_source_update_proplist(u->source, 0, NULL);
1514
1515 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1516 u->source_name,
1517 pa_get_user_name(un, sizeof(un)),
1518 pa_get_host_name(hn, sizeof(hn)));
1519 #endif
1520
1521 reply = pa_tagstruct_new(NULL, 0);
1522 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1523 pa_tagstruct_putu32(reply, tag = u->ctag++);
1524
1525 if (u->version >= 13) {
1526 pa_proplist *pl;
1527 pl = pa_proplist_new();
1528 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1529 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1530 pa_init_proplist(pl);
1531 pa_tagstruct_put_proplist(reply, pl);
1532 pa_proplist_free(pl);
1533 } else
1534 pa_tagstruct_puts(reply, "PulseAudio");
1535
1536 pa_pstream_send_tagstruct(u->pstream, reply);
1537 /* We ignore the server's reply here */
1538
1539 reply = pa_tagstruct_new(NULL, 0);
1540
1541 if (u->version < 13)
1542 /* Only for older PA versions we need to fill in the maxlength */
1543 u->maxlength = 4*1024*1024;
1544
1545 #ifdef TUNNEL_SINK
1546 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1547 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1548 u->prebuf = u->tlength;
1549 #else
1550 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1551 #endif
1552
1553 #ifdef TUNNEL_SINK
1554 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1555 pa_tagstruct_putu32(reply, tag = u->ctag++);
1556
1557 if (u->version < 13)
1558 pa_tagstruct_puts(reply, name);
1559
1560 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1561 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1562 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1563 pa_tagstruct_puts(reply, u->sink_name);
1564 pa_tagstruct_putu32(reply, u->maxlength);
1565 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1566 pa_tagstruct_putu32(reply, u->tlength);
1567 pa_tagstruct_putu32(reply, u->prebuf);
1568 pa_tagstruct_putu32(reply, u->minreq);
1569 pa_tagstruct_putu32(reply, 0);
1570 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1571 pa_tagstruct_put_cvolume(reply, &volume);
1572 #else
1573 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1574 pa_tagstruct_putu32(reply, tag = u->ctag++);
1575
1576 if (u->version < 13)
1577 pa_tagstruct_puts(reply, name);
1578
1579 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1580 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1581 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1582 pa_tagstruct_puts(reply, u->source_name);
1583 pa_tagstruct_putu32(reply, u->maxlength);
1584 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1585 pa_tagstruct_putu32(reply, u->fragsize);
1586 #endif
1587
1588 if (u->version >= 12) {
1589 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1590 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1591 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1592 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1593 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1594 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1595 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1596 }
1597
1598 if (u->version >= 13) {
1599 pa_proplist *pl;
1600
1601 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1602 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1603
1604 pl = pa_proplist_new();
1605 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1606 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1607 pa_tagstruct_put_proplist(reply, pl);
1608 pa_proplist_free(pl);
1609
1610 #ifndef TUNNEL_SINK
1611 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1612 #endif
1613 }
1614
1615 if (u->version >= 14) {
1616 #ifdef TUNNEL_SINK
1617 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1618 #endif
1619 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1620 }
1621
1622 if (u->version >= 15) {
1623 #ifdef TUNNEL_SINK
1624 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1625 #endif
1626 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1627 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1628 }
1629
1630 pa_pstream_send_tagstruct(u->pstream, reply);
1631 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1632
1633 pa_log_debug("Connection authenticated, creating stream ...");
1634
1635 return;
1636
1637 fail:
1638 pa_module_unload_request(u->module, TRUE);
1639 }
1640
1641 /* Called from main context */
1642 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1643 struct userdata *u = userdata;
1644
1645 pa_assert(p);
1646 pa_assert(u);
1647
1648 pa_log_warn("Stream died.");
1649 pa_module_unload_request(u->module, TRUE);
1650 }
1651
1652 /* Called from main context */
1653 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1654 struct userdata *u = userdata;
1655
1656 pa_assert(p);
1657 pa_assert(packet);
1658 pa_assert(u);
1659
1660 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1661 pa_log("Invalid packet");
1662 pa_module_unload_request(u->module, TRUE);
1663 return;
1664 }
1665 }
1666
1667 #ifndef TUNNEL_SINK
1668 /* Called from main context */
1669 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1670 struct userdata *u = userdata;
1671
1672 pa_assert(p);
1673 pa_assert(chunk);
1674 pa_assert(u);
1675
1676 if (channel != u->channel) {
1677 pa_log("Received memory block on bad channel.");
1678 pa_module_unload_request(u->module, TRUE);
1679 return;
1680 }
1681
1682 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1683
1684 u->counter_delta += (int64_t) chunk->length;
1685 }
1686 #endif
1687
1688 /* Called from main context */
1689 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1690 struct userdata *u = userdata;
1691 pa_tagstruct *t;
1692 uint32_t tag;
1693
1694 pa_assert(sc);
1695 pa_assert(u);
1696 pa_assert(u->client == sc);
1697
1698 pa_socket_client_unref(u->client);
1699 u->client = NULL;
1700
1701 if (!io) {
1702 pa_log("Connection failed: %s", pa_cstrerror(errno));
1703 pa_module_unload_request(u->module, TRUE);
1704 return;
1705 }
1706
1707 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1708 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1709
1710 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1711 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1712 #ifndef TUNNEL_SINK
1713 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1714 #endif
1715
1716 t = pa_tagstruct_new(NULL, 0);
1717 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1718 pa_tagstruct_putu32(t, tag = u->ctag++);
1719 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1720
1721 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1722
1723 #ifdef HAVE_CREDS
1724 {
1725 pa_creds ucred;
1726
1727 if (pa_iochannel_creds_supported(io))
1728 pa_iochannel_creds_enable(io);
1729
1730 ucred.uid = getuid();
1731 ucred.gid = getgid();
1732
1733 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1734 }
1735 #else
1736 pa_pstream_send_tagstruct(u->pstream, t);
1737 #endif
1738
1739 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1740
1741 pa_log_debug("Connection established, authenticating ...");
1742 }
1743
1744 #ifdef TUNNEL_SINK
1745
1746 /* Called from main context */
1747 static void sink_set_volume(pa_sink *sink) {
1748 struct userdata *u;
1749 pa_tagstruct *t;
1750 uint32_t tag;
1751
1752 pa_assert(sink);
1753 u = sink->userdata;
1754 pa_assert(u);
1755
1756 t = pa_tagstruct_new(NULL, 0);
1757 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1758 pa_tagstruct_putu32(t, tag = u->ctag++);
1759 pa_tagstruct_putu32(t, u->device_index);
1760 pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1761 pa_pstream_send_tagstruct(u->pstream, t);
1762 }
1763
1764 /* Called from main context */
1765 static void sink_set_mute(pa_sink *sink) {
1766 struct userdata *u;
1767 pa_tagstruct *t;
1768 uint32_t tag;
1769
1770 pa_assert(sink);
1771 u = sink->userdata;
1772 pa_assert(u);
1773
1774 if (u->version < 11)
1775 return;
1776
1777 t = pa_tagstruct_new(NULL, 0);
1778 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1779 pa_tagstruct_putu32(t, tag = u->ctag++);
1780 pa_tagstruct_putu32(t, u->device_index);
1781 pa_tagstruct_put_boolean(t, !!sink->muted);
1782 pa_pstream_send_tagstruct(u->pstream, t);
1783 }
1784
1785 #endif
1786
1787 int pa__init(pa_module*m) {
1788 pa_modargs *ma = NULL;
1789 struct userdata *u = NULL;
1790 pa_sample_spec ss;
1791 pa_channel_map map;
1792 char *dn = NULL;
1793 #ifdef TUNNEL_SINK
1794 pa_sink_new_data data;
1795 #else
1796 pa_source_new_data data;
1797 #endif
1798
1799 pa_assert(m);
1800
1801 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1802 pa_log("Failed to parse module arguments");
1803 goto fail;
1804 }
1805
1806 m->userdata = u = pa_xnew0(struct userdata, 1);
1807 u->core = m->core;
1808 u->module = m;
1809 u->client = NULL;
1810 u->pdispatch = NULL;
1811 u->pstream = NULL;
1812 u->server_name = NULL;
1813 #ifdef TUNNEL_SINK
1814 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1815 u->sink = NULL;
1816 u->requested_bytes = 0;
1817 #else
1818 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1819 u->source = NULL;
1820 #endif
1821 u->smoother = pa_smoother_new(
1822 PA_USEC_PER_SEC,
1823 PA_USEC_PER_SEC*2,
1824 TRUE,
1825 TRUE,
1826 10,
1827 pa_rtclock_usec(),
1828 FALSE);
1829 u->ctag = 1;
1830 u->device_index = u->channel = PA_INVALID_INDEX;
1831 u->time_event = NULL;
1832 u->ignore_latency_before = 0;
1833 u->transport_usec = u->thread_transport_usec = 0;
1834 u->remote_suspended = u->remote_corked = FALSE;
1835 u->counter = u->counter_delta = 0;
1836
1837 u->rtpoll = pa_rtpoll_new();
1838 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1839
1840 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1841 goto fail;
1842
1843 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1844 pa_log("No server specified.");
1845 goto fail;
1846 }
1847
1848 ss = m->core->default_sample_spec;
1849 map = m->core->default_channel_map;
1850 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1851 pa_log("Invalid sample format specification");
1852 goto fail;
1853 }
1854
1855 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1856 pa_log("Failed to connect to server '%s'", u->server_name);
1857 goto fail;
1858 }
1859
1860 pa_socket_client_set_callback(u->client, on_connection, u);
1861
1862 #ifdef TUNNEL_SINK
1863
1864 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1865 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1866
1867 pa_sink_new_data_init(&data);
1868 data.driver = __FILE__;
1869 data.module = m;
1870 data.namereg_fail = TRUE;
1871 pa_sink_new_data_set_name(&data, dn);
1872 pa_sink_new_data_set_sample_spec(&data, &ss);
1873 pa_sink_new_data_set_channel_map(&data, &map);
1874 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1875 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1876 if (u->sink_name)
1877 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1878
1879 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1880 pa_log("Invalid properties");
1881 pa_sink_new_data_done(&data);
1882 goto fail;
1883 }
1884
1885 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1886 pa_sink_new_data_done(&data);
1887
1888 if (!u->sink) {
1889 pa_log("Failed to create sink.");
1890 goto fail;
1891 }
1892
1893 u->sink->parent.process_msg = sink_process_msg;
1894 u->sink->userdata = u;
1895 u->sink->set_state = sink_set_state;
1896 u->sink->set_volume = sink_set_volume;
1897 u->sink->set_mute = sink_set_mute;
1898
1899 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1900
1901 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1902
1903 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1904 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1905
1906 #else
1907
1908 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1909 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1910
1911 pa_source_new_data_init(&data);
1912 data.driver = __FILE__;
1913 data.module = m;
1914 data.namereg_fail = TRUE;
1915 pa_source_new_data_set_name(&data, dn);
1916 pa_source_new_data_set_sample_spec(&data, &ss);
1917 pa_source_new_data_set_channel_map(&data, &map);
1918 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1919 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1920 if (u->source_name)
1921 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1922
1923 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1924 pa_log("Invalid properties");
1925 pa_source_new_data_done(&data);
1926 goto fail;
1927 }
1928
1929 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1930 pa_source_new_data_done(&data);
1931
1932 if (!u->source) {
1933 pa_log("Failed to create source.");
1934 goto fail;
1935 }
1936
1937 u->source->parent.process_msg = source_process_msg;
1938 u->source->set_state = source_set_state;
1939 u->source->userdata = u;
1940
1941 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1942
1943 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1944 pa_source_set_rtpoll(u->source, u->rtpoll);
1945 #endif
1946
1947 pa_xfree(dn);
1948
1949 u->time_event = NULL;
1950
1951 u->maxlength = (uint32_t) -1;
1952 #ifdef TUNNEL_SINK
1953 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
1954 #else
1955 u->fragsize = (uint32_t) -1;
1956 #endif
1957
1958 if (!(u->thread = pa_thread_new(thread_func, u))) {
1959 pa_log("Failed to create thread.");
1960 goto fail;
1961 }
1962
1963 #ifdef TUNNEL_SINK
1964 pa_sink_put(u->sink);
1965 #else
1966 pa_source_put(u->source);
1967 #endif
1968
1969 pa_modargs_free(ma);
1970
1971 return 0;
1972
1973 fail:
1974 pa__done(m);
1975
1976 if (ma)
1977 pa_modargs_free(ma);
1978
1979 pa_xfree(dn);
1980
1981 return -1;
1982 }
1983
1984 void pa__done(pa_module*m) {
1985 struct userdata* u;
1986
1987 pa_assert(m);
1988
1989 if (!(u = m->userdata))
1990 return;
1991
1992 #ifdef TUNNEL_SINK
1993 if (u->sink)
1994 pa_sink_unlink(u->sink);
1995 #else
1996 if (u->source)
1997 pa_source_unlink(u->source);
1998 #endif
1999
2000 if (u->thread) {
2001 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2002 pa_thread_free(u->thread);
2003 }
2004
2005 pa_thread_mq_done(&u->thread_mq);
2006
2007 #ifdef TUNNEL_SINK
2008 if (u->sink)
2009 pa_sink_unref(u->sink);
2010 #else
2011 if (u->source)
2012 pa_source_unref(u->source);
2013 #endif
2014
2015 if (u->rtpoll)
2016 pa_rtpoll_free(u->rtpoll);
2017
2018 if (u->pstream) {
2019 pa_pstream_unlink(u->pstream);
2020 pa_pstream_unref(u->pstream);
2021 }
2022
2023 if (u->pdispatch)
2024 pa_pdispatch_unref(u->pdispatch);
2025
2026 if (u->client)
2027 pa_socket_client_unref(u->client);
2028
2029 if (u->auth_cookie)
2030 pa_auth_cookie_unref(u->auth_cookie);
2031
2032 if (u->smoother)
2033 pa_smoother_free(u->smoother);
2034
2035 if (u->time_event)
2036 u->core->mainloop->time_free(u->time_event);
2037
2038 #ifdef TUNNEL_SINK
2039 pa_xfree(u->sink_name);
2040 #else
2041 pa_xfree(u->source_name);
2042 #endif
2043 pa_xfree(u->server_name);
2044
2045 pa_xfree(u->device_description);
2046 pa_xfree(u->server_fqdn);
2047 pa_xfree(u->user_name);
2048
2049 pa_xfree(u);
2050 }