]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
Merge commit 'elmarco/bluetooth-fixes'
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "server=<address> "
68 "sink=<remote sink name> "
69 "cookie=<filename> "
70 "format=<sample format> "
71 "channels=<number of channels> "
72 "rate=<sample rate> "
73 "sink_name=<name for the local sink> "
74 "channel_map=<channel map>");
75 #else
76 PA_MODULE_DESCRIPTION("Tunnel module for sources");
77 PA_MODULE_USAGE(
78 "server=<address> "
79 "source=<remote source name> "
80 "cookie=<filename> "
81 "format=<sample format> "
82 "channels=<number of channels> "
83 "rate=<sample rate> "
84 "source_name=<name for the local source> "
85 "channel_map=<channel map>");
86 #endif
87
88 PA_MODULE_AUTHOR("Lennart Poettering");
89 PA_MODULE_VERSION(PACKAGE_VERSION);
90 PA_MODULE_LOAD_ONCE(FALSE);
91
92 static const char* const valid_modargs[] = {
93 "server",
94 "cookie",
95 "format",
96 "channels",
97 "rate",
98 #ifdef TUNNEL_SINK
99 "sink_name",
100 "sink",
101 #else
102 "source_name",
103 "source",
104 #endif
105 "channel_map",
106 NULL,
107 };
108
109 #define DEFAULT_TIMEOUT 5
110
111 #define LATENCY_INTERVAL 10
112
113 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
114
115 #ifdef TUNNEL_SINK
116
117 enum {
118 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
119 SINK_MESSAGE_REMOTE_SUSPEND,
120 SINK_MESSAGE_UPDATE_LATENCY,
121 SINK_MESSAGE_POST
122 };
123
124 #define DEFAULT_TLENGTH_MSEC 150
125 #define DEFAULT_MINREQ_MSEC 25
126
127 #else
128
129 enum {
130 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
131 SOURCE_MESSAGE_REMOTE_SUSPEND,
132 SOURCE_MESSAGE_UPDATE_LATENCY
133 };
134
135 #define DEFAULT_FRAGSIZE_MSEC 25
136
137 #endif
138
139 #ifdef TUNNEL_SINK
140 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
141 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
142 #endif
143 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
144 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149
150 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
151 #ifdef TUNNEL_SINK
152 [PA_COMMAND_REQUEST] = command_request,
153 [PA_COMMAND_STARTED] = command_started,
154 #endif
155 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
156 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
157 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
158 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
159 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
160 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
161 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
162 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
163 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
164 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
165 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
166 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event
167 };
168
169 struct userdata {
170 pa_core *core;
171 pa_module *module;
172
173 pa_thread_mq thread_mq;
174 pa_rtpoll *rtpoll;
175 pa_thread *thread;
176
177 pa_socket_client *client;
178 pa_pstream *pstream;
179 pa_pdispatch *pdispatch;
180
181 char *server_name;
182 #ifdef TUNNEL_SINK
183 char *sink_name;
184 pa_sink *sink;
185 size_t requested_bytes;
186 #else
187 char *source_name;
188 pa_source *source;
189 #endif
190
191 pa_auth_cookie *auth_cookie;
192
193 uint32_t version;
194 uint32_t ctag;
195 uint32_t device_index;
196 uint32_t channel;
197
198 int64_t counter, counter_delta;
199
200 pa_bool_t remote_corked:1;
201 pa_bool_t remote_suspended:1;
202
203 pa_usec_t transport_usec; /* maintained in the main thread */
204 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
205
206 uint32_t ignore_latency_before;
207
208 pa_time_event *time_event;
209
210 pa_smoother *smoother;
211
212 char *device_description;
213 char *server_fqdn;
214 char *user_name;
215
216 uint32_t maxlength;
217 #ifdef TUNNEL_SINK
218 uint32_t tlength;
219 uint32_t minreq;
220 uint32_t prebuf;
221 #else
222 uint32_t fragsize;
223 #endif
224 };
225
226 static void request_latency(struct userdata *u);
227
228 /* Called from main context */
229 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
230 pa_log_debug("Got stream or client event.");
231 }
232
233 /* Called from main context */
234 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
235 struct userdata *u = userdata;
236
237 pa_assert(pd);
238 pa_assert(t);
239 pa_assert(u);
240 pa_assert(u->pdispatch == pd);
241
242 pa_log_warn("Stream killed");
243 pa_module_unload_request(u->module, TRUE);
244 }
245
246 /* Called from main context */
247 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
248 struct userdata *u = userdata;
249
250 pa_assert(pd);
251 pa_assert(t);
252 pa_assert(u);
253 pa_assert(u->pdispatch == pd);
254
255 pa_log_info("Server signalled buffer overrun/underrun.");
256 request_latency(u);
257 }
258
259 /* Called from main context */
260 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
261 struct userdata *u = userdata;
262 uint32_t channel;
263 pa_bool_t suspended;
264
265 pa_assert(pd);
266 pa_assert(t);
267 pa_assert(u);
268 pa_assert(u->pdispatch == pd);
269
270 if (pa_tagstruct_getu32(t, &channel) < 0 ||
271 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
272 !pa_tagstruct_eof(t)) {
273
274 pa_log("Invalid packet.");
275 pa_module_unload_request(u->module, TRUE);
276 return;
277 }
278
279 pa_log_debug("Server reports device suspend.");
280
281 #ifdef TUNNEL_SINK
282 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
283 #else
284 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
285 #endif
286
287 request_latency(u);
288 }
289
290 /* Called from main context */
291 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
292 struct userdata *u = userdata;
293 uint32_t channel, di;
294 const char *dn;
295 pa_bool_t suspended;
296
297 pa_assert(pd);
298 pa_assert(t);
299 pa_assert(u);
300 pa_assert(u->pdispatch == pd);
301
302 if (pa_tagstruct_getu32(t, &channel) < 0 ||
303 pa_tagstruct_getu32(t, &di) < 0 ||
304 pa_tagstruct_gets(t, &dn) < 0 ||
305 pa_tagstruct_get_boolean(t, &suspended) < 0) {
306
307 pa_log_error("Invalid packet.");
308 pa_module_unload_request(u->module, TRUE);
309 return;
310 }
311
312 pa_log_debug("Server reports a stream move.");
313
314 #ifdef TUNNEL_SINK
315 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
316 #else
317 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
318 #endif
319
320 request_latency(u);
321 }
322
323 #ifdef TUNNEL_SINK
324
325 /* Called from main context */
326 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
327 struct userdata *u = userdata;
328
329 pa_assert(pd);
330 pa_assert(t);
331 pa_assert(u);
332 pa_assert(u->pdispatch == pd);
333
334 pa_log_debug("Server reports playback started.");
335 request_latency(u);
336 }
337
338 #endif
339
340 /* Called from IO thread context */
341 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
342 pa_usec_t x;
343
344 pa_assert(u);
345
346 x = pa_rtclock_usec();
347
348 /* Correct by the time the requested issued needs to travel to the
349 * other side. This is a valid thread-safe access, because the
350 * main thread is waiting for us */
351
352 if (past)
353 x -= u->thread_transport_usec;
354 else
355 x += u->thread_transport_usec;
356
357 if (u->remote_suspended || u->remote_corked)
358 pa_smoother_pause(u->smoother, x);
359 else
360 pa_smoother_resume(u->smoother, x);
361 }
362
363 /* Called from IO thread context */
364 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
365 pa_assert(u);
366
367 if (u->remote_corked == cork)
368 return;
369
370 u->remote_corked = cork;
371 check_smoother_status(u, FALSE);
372 }
373
374 /* Called from main context */
375 static void stream_cork(struct userdata *u, pa_bool_t cork) {
376 pa_tagstruct *t;
377 pa_assert(u);
378
379 if (!u->pstream)
380 return;
381
382 t = pa_tagstruct_new(NULL, 0);
383 #ifdef TUNNEL_SINK
384 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
385 #else
386 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
387 #endif
388 pa_tagstruct_putu32(t, u->ctag++);
389 pa_tagstruct_putu32(t, u->channel);
390 pa_tagstruct_put_boolean(t, !!cork);
391 pa_pstream_send_tagstruct(u->pstream, t);
392
393 request_latency(u);
394 }
395
396 /* Called from IO thread context */
397 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
398 pa_assert(u);
399
400 if (u->remote_suspended == suspend)
401 return;
402
403 u->remote_suspended = suspend;
404 check_smoother_status(u, TRUE);
405 }
406
407 #ifdef TUNNEL_SINK
408
409 /* Called from IO thread context */
410 static void send_data(struct userdata *u) {
411 pa_assert(u);
412
413 while (u->requested_bytes > 0) {
414 pa_memchunk memchunk;
415
416 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
417 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
418 pa_memblock_unref(memchunk.memblock);
419
420 u->requested_bytes -= memchunk.length;
421
422 u->counter += (int64_t) memchunk.length;
423 }
424 }
425
426 /* This function is called from IO context -- except when it is not. */
427 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
428 struct userdata *u = PA_SINK(o)->userdata;
429
430 switch (code) {
431
432 case PA_SINK_MESSAGE_SET_STATE: {
433 int r;
434
435 /* First, change the state, because otherwide pa_sink_render() would fail */
436 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
437
438 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
439
440 if (PA_SINK_IS_OPENED(u->sink->state))
441 send_data(u);
442 }
443
444 return r;
445 }
446
447 case PA_SINK_MESSAGE_GET_LATENCY: {
448 pa_usec_t yl, yr, *usec = data;
449
450 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
451 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
452
453 *usec = yl > yr ? yl - yr : 0;
454 return 0;
455 }
456
457 case SINK_MESSAGE_REQUEST:
458
459 pa_assert(offset > 0);
460 u->requested_bytes += (size_t) offset;
461
462 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
463 send_data(u);
464
465 return 0;
466
467
468 case SINK_MESSAGE_REMOTE_SUSPEND:
469
470 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
471 return 0;
472
473
474 case SINK_MESSAGE_UPDATE_LATENCY: {
475 pa_usec_t y;
476
477 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
478
479 if (y > (pa_usec_t) offset)
480 y -= (pa_usec_t) offset;
481 else
482 y = 0;
483
484 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
485
486 /* We can access this freely here, since the main thread is waiting for us */
487 u->thread_transport_usec = u->transport_usec;
488
489 return 0;
490 }
491
492 case SINK_MESSAGE_POST:
493
494 /* OK, This might be a bit confusing. This message is
495 * delivered to us from the main context -- NOT from the
496 * IO thread context where the rest of the messages are
497 * dispatched. Yeah, ugly, but I am a lazy bastard. */
498
499 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
500
501 u->counter_delta += (int64_t) chunk->length;
502
503 return 0;
504 }
505
506 return pa_sink_process_msg(o, code, data, offset, chunk);
507 }
508
509 /* Called from main context */
510 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
511 struct userdata *u;
512 pa_sink_assert_ref(s);
513 u = s->userdata;
514
515 switch ((pa_sink_state_t) state) {
516
517 case PA_SINK_SUSPENDED:
518 pa_assert(PA_SINK_IS_OPENED(s->state));
519 stream_cork(u, TRUE);
520 break;
521
522 case PA_SINK_IDLE:
523 case PA_SINK_RUNNING:
524 if (s->state == PA_SINK_SUSPENDED)
525 stream_cork(u, FALSE);
526 break;
527
528 case PA_SINK_UNLINKED:
529 case PA_SINK_INIT:
530 case PA_SINK_INVALID_STATE:
531 ;
532 }
533
534 return 0;
535 }
536
537 #else
538
539 /* This function is called from IO context -- except when it is not. */
540 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
541 struct userdata *u = PA_SOURCE(o)->userdata;
542
543 switch (code) {
544
545 case PA_SOURCE_MESSAGE_SET_STATE: {
546 int r;
547
548 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
549 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
550
551 return r;
552 }
553
554 case PA_SOURCE_MESSAGE_GET_LATENCY: {
555 pa_usec_t yr, yl, *usec = data;
556
557 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
558 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
559
560 *usec = yr > yl ? yr - yl : 0;
561 return 0;
562 }
563
564 case SOURCE_MESSAGE_POST:
565
566 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
567 pa_source_post(u->source, chunk);
568
569 u->counter += (int64_t) chunk->length;
570
571 return 0;
572
573 case SOURCE_MESSAGE_REMOTE_SUSPEND:
574
575 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
576 return 0;
577
578 case SOURCE_MESSAGE_UPDATE_LATENCY: {
579 pa_usec_t y;
580
581 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
582 y += (pa_usec_t) offset;
583
584 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
585
586 /* We can access this freely here, since the main thread is waiting for us */
587 u->thread_transport_usec = u->transport_usec;
588
589 return 0;
590 }
591 }
592
593 return pa_source_process_msg(o, code, data, offset, chunk);
594 }
595
596 /* Called from main context */
597 static int source_set_state(pa_source *s, pa_source_state_t state) {
598 struct userdata *u;
599 pa_source_assert_ref(s);
600 u = s->userdata;
601
602 switch ((pa_source_state_t) state) {
603
604 case PA_SOURCE_SUSPENDED:
605 pa_assert(PA_SOURCE_IS_OPENED(s->state));
606 stream_cork(u, TRUE);
607 break;
608
609 case PA_SOURCE_IDLE:
610 case PA_SOURCE_RUNNING:
611 if (s->state == PA_SOURCE_SUSPENDED)
612 stream_cork(u, FALSE);
613 break;
614
615 case PA_SOURCE_UNLINKED:
616 case PA_SOURCE_INIT:
617 case PA_SINK_INVALID_STATE:
618 ;
619 }
620
621 return 0;
622 }
623
624 #endif
625
626 static void thread_func(void *userdata) {
627 struct userdata *u = userdata;
628
629 pa_assert(u);
630
631 pa_log_debug("Thread starting up");
632
633 pa_thread_mq_install(&u->thread_mq);
634 pa_rtpoll_install(u->rtpoll);
635
636 for (;;) {
637 int ret;
638
639 #ifdef TUNNEL_SINK
640 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
641 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
642 pa_sink_process_rewind(u->sink, 0);
643 #endif
644
645 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
646 goto fail;
647
648 if (ret == 0)
649 goto finish;
650 }
651
652 fail:
653 /* If this was no regular exit from the loop we have to continue
654 * processing messages until we received PA_MESSAGE_SHUTDOWN */
655 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
656 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
657
658 finish:
659 pa_log_debug("Thread shutting down");
660 }
661
662 #ifdef TUNNEL_SINK
663 /* Called from main context */
664 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
665 struct userdata *u = userdata;
666 uint32_t bytes, channel;
667
668 pa_assert(pd);
669 pa_assert(command == PA_COMMAND_REQUEST);
670 pa_assert(t);
671 pa_assert(u);
672 pa_assert(u->pdispatch == pd);
673
674 if (pa_tagstruct_getu32(t, &channel) < 0 ||
675 pa_tagstruct_getu32(t, &bytes) < 0) {
676 pa_log("Invalid protocol reply");
677 goto fail;
678 }
679
680 if (channel != u->channel) {
681 pa_log("Recieved data for invalid channel");
682 goto fail;
683 }
684
685 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
686 return;
687
688 fail:
689 pa_module_unload_request(u->module, TRUE);
690 }
691
692 #endif
693
694 /* Called from main context */
695 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
696 struct userdata *u = userdata;
697 pa_usec_t sink_usec, source_usec;
698 pa_bool_t playing;
699 int64_t write_index, read_index;
700 struct timeval local, remote, now;
701 pa_sample_spec *ss;
702 int64_t delay;
703
704 pa_assert(pd);
705 pa_assert(u);
706
707 if (command != PA_COMMAND_REPLY) {
708 if (command == PA_COMMAND_ERROR)
709 pa_log("Failed to get latency.");
710 else
711 pa_log("Protocol error.");
712 goto fail;
713 }
714
715 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
716 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
717 pa_tagstruct_get_boolean(t, &playing) < 0 ||
718 pa_tagstruct_get_timeval(t, &local) < 0 ||
719 pa_tagstruct_get_timeval(t, &remote) < 0 ||
720 pa_tagstruct_gets64(t, &write_index) < 0 ||
721 pa_tagstruct_gets64(t, &read_index) < 0) {
722 pa_log("Invalid reply.");
723 goto fail;
724 }
725
726 #ifdef TUNNEL_SINK
727 if (u->version >= 13) {
728 uint64_t underrun_for = 0, playing_for = 0;
729
730 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
731 pa_tagstruct_getu64(t, &playing_for) < 0) {
732 pa_log("Invalid reply.");
733 goto fail;
734 }
735 }
736 #endif
737
738 if (!pa_tagstruct_eof(t)) {
739 pa_log("Invalid reply.");
740 goto fail;
741 }
742
743 if (tag < u->ignore_latency_before) {
744 return;
745 }
746
747 pa_gettimeofday(&now);
748
749 /* Calculate transport usec */
750 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
751 /* local and remote seem to have synchronized clocks */
752 #ifdef TUNNEL_SINK
753 u->transport_usec = pa_timeval_diff(&remote, &local);
754 #else
755 u->transport_usec = pa_timeval_diff(&now, &remote);
756 #endif
757 } else
758 u->transport_usec = pa_timeval_diff(&now, &local)/2;
759
760 /* First, take the device's delay */
761 #ifdef TUNNEL_SINK
762 delay = (int64_t) sink_usec;
763 ss = &u->sink->sample_spec;
764 #else
765 delay = (int64_t) source_usec;
766 ss = &u->source->sample_spec;
767 #endif
768
769 /* Add the length of our server-side buffer */
770 if (write_index >= read_index)
771 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
772 else
773 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
774
775 /* Our measurements are already out of date, hence correct by the *
776 * transport latency */
777 #ifdef TUNNEL_SINK
778 delay -= (int64_t) u->transport_usec;
779 #else
780 delay += (int64_t) u->transport_usec;
781 #endif
782
783 /* Now correct by what we have have read/written since we requested the update */
784 #ifdef TUNNEL_SINK
785 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
786 #else
787 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
788 #endif
789
790 #ifdef TUNNEL_SINK
791 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
792 #else
793 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
794 #endif
795
796 return;
797
798 fail:
799
800 pa_module_unload_request(u->module, TRUE);
801 }
802
803 /* Called from main context */
804 static void request_latency(struct userdata *u) {
805 pa_tagstruct *t;
806 struct timeval now;
807 uint32_t tag;
808 pa_assert(u);
809
810 t = pa_tagstruct_new(NULL, 0);
811 #ifdef TUNNEL_SINK
812 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
813 #else
814 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
815 #endif
816 pa_tagstruct_putu32(t, tag = u->ctag++);
817 pa_tagstruct_putu32(t, u->channel);
818
819 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
820
821 pa_pstream_send_tagstruct(u->pstream, t);
822 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
823
824 u->ignore_latency_before = tag;
825 u->counter_delta = 0;
826 }
827
828 /* Called from main context */
829 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
830 struct userdata *u = userdata;
831 struct timeval ntv;
832
833 pa_assert(m);
834 pa_assert(e);
835 pa_assert(u);
836
837 request_latency(u);
838
839 pa_gettimeofday(&ntv);
840 ntv.tv_sec += LATENCY_INTERVAL;
841 m->time_restart(e, &ntv);
842 }
843
844 /* Called from main context */
845 static void update_description(struct userdata *u) {
846 char *d;
847 char un[128], hn[128];
848 pa_tagstruct *t;
849
850 pa_assert(u);
851
852 if (!u->server_fqdn || !u->user_name || !u->device_description)
853 return;
854
855 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
856
857 #ifdef TUNNEL_SINK
858 pa_sink_set_description(u->sink, d);
859 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
860 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
861 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
862 #else
863 pa_source_set_description(u->source, d);
864 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
865 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
866 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
867 #endif
868
869 pa_xfree(d);
870
871 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
872 pa_get_user_name(un, sizeof(un)),
873 pa_get_host_name(hn, sizeof(hn)));
874
875 t = pa_tagstruct_new(NULL, 0);
876 #ifdef TUNNEL_SINK
877 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
878 #else
879 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
880 #endif
881 pa_tagstruct_putu32(t, u->ctag++);
882 pa_tagstruct_putu32(t, u->channel);
883 pa_tagstruct_puts(t, d);
884 pa_pstream_send_tagstruct(u->pstream, t);
885
886 pa_xfree(d);
887 }
888
889 /* Called from main context */
890 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
891 struct userdata *u = userdata;
892 pa_sample_spec ss;
893 pa_channel_map cm;
894 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
895 uint32_t cookie;
896
897 pa_assert(pd);
898 pa_assert(u);
899
900 if (command != PA_COMMAND_REPLY) {
901 if (command == PA_COMMAND_ERROR)
902 pa_log("Failed to get info.");
903 else
904 pa_log("Protocol error.");
905 goto fail;
906 }
907
908 if (pa_tagstruct_gets(t, &server_name) < 0 ||
909 pa_tagstruct_gets(t, &server_version) < 0 ||
910 pa_tagstruct_gets(t, &user_name) < 0 ||
911 pa_tagstruct_gets(t, &host_name) < 0 ||
912 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
913 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
914 pa_tagstruct_gets(t, &default_source_name) < 0 ||
915 pa_tagstruct_getu32(t, &cookie) < 0 ||
916 (u->version >= 15 &&
917 pa_tagstruct_get_channel_map(t, &cm) < 0)) {
918
919 pa_log("Parse failure");
920 goto fail;
921 }
922
923 if (!pa_tagstruct_eof(t)) {
924 pa_log("Packet too long");
925 goto fail;
926 }
927
928 pa_xfree(u->server_fqdn);
929 u->server_fqdn = pa_xstrdup(host_name);
930
931 pa_xfree(u->user_name);
932 u->user_name = pa_xstrdup(user_name);
933
934 update_description(u);
935
936 return;
937
938 fail:
939 pa_module_unload_request(u->module, TRUE);
940 }
941
942 #ifdef TUNNEL_SINK
943
944 /* Called from main context */
945 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
946 struct userdata *u = userdata;
947 uint32_t idx, owner_module, monitor_source, flags;
948 const char *name, *description, *monitor_source_name, *driver;
949 pa_sample_spec ss;
950 pa_channel_map cm;
951 pa_cvolume volume;
952 pa_bool_t mute;
953 pa_usec_t latency;
954 pa_proplist *pl;
955
956 pa_assert(pd);
957 pa_assert(u);
958
959 pl = pa_proplist_new();
960
961 if (command != PA_COMMAND_REPLY) {
962 if (command == PA_COMMAND_ERROR)
963 pa_log("Failed to get info.");
964 else
965 pa_log("Protocol error.");
966 goto fail;
967 }
968
969 if (pa_tagstruct_getu32(t, &idx) < 0 ||
970 pa_tagstruct_gets(t, &name) < 0 ||
971 pa_tagstruct_gets(t, &description) < 0 ||
972 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
973 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
974 pa_tagstruct_getu32(t, &owner_module) < 0 ||
975 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
976 pa_tagstruct_get_boolean(t, &mute) < 0 ||
977 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
978 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
979 pa_tagstruct_get_usec(t, &latency) < 0 ||
980 pa_tagstruct_gets(t, &driver) < 0 ||
981 pa_tagstruct_getu32(t, &flags) < 0) {
982
983 pa_log("Parse failure");
984 goto fail;
985 }
986
987 if (u->version >= 13) {
988 pa_usec_t configured_latency;
989
990 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
991 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
992
993 pa_log("Parse failure");
994 goto fail;
995 }
996 }
997
998 if (u->version >= 15) {
999 pa_volume_t base_volume;
1000 uint32_t state, n_volume_steps, card;
1001
1002 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1003 pa_tagstruct_getu32(t, &state) < 0 ||
1004 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1005 pa_tagstruct_getu32(t, &card) < 0) {
1006
1007 pa_log("Parse failure");
1008 goto fail;
1009 }
1010 }
1011
1012 if (!pa_tagstruct_eof(t)) {
1013 pa_log("Packet too long");
1014 goto fail;
1015 }
1016
1017 pa_proplist_free(pl);
1018
1019 if (!u->sink_name || strcmp(name, u->sink_name))
1020 return;
1021
1022 pa_xfree(u->device_description);
1023 u->device_description = pa_xstrdup(description);
1024
1025 update_description(u);
1026
1027 return;
1028
1029 fail:
1030 pa_module_unload_request(u->module, TRUE);
1031 pa_proplist_free(pl);
1032 }
1033
1034 /* Called from main context */
1035 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1036 struct userdata *u = userdata;
1037 uint32_t idx, owner_module, client, sink;
1038 pa_usec_t buffer_usec, sink_usec;
1039 const char *name, *driver, *resample_method;
1040 pa_bool_t mute;
1041 pa_sample_spec sample_spec;
1042 pa_channel_map channel_map;
1043 pa_cvolume volume;
1044 pa_proplist *pl;
1045
1046 pa_assert(pd);
1047 pa_assert(u);
1048
1049 pl = pa_proplist_new();
1050
1051 if (command != PA_COMMAND_REPLY) {
1052 if (command == PA_COMMAND_ERROR)
1053 pa_log("Failed to get info.");
1054 else
1055 pa_log("Protocol error.");
1056 goto fail;
1057 }
1058
1059 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1060 pa_tagstruct_gets(t, &name) < 0 ||
1061 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1062 pa_tagstruct_getu32(t, &client) < 0 ||
1063 pa_tagstruct_getu32(t, &sink) < 0 ||
1064 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1065 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1066 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1067 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1068 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1069 pa_tagstruct_gets(t, &resample_method) < 0 ||
1070 pa_tagstruct_gets(t, &driver) < 0) {
1071
1072 pa_log("Parse failure");
1073 goto fail;
1074 }
1075
1076 if (u->version >= 11) {
1077 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1078
1079 pa_log("Parse failure");
1080 goto fail;
1081 }
1082 }
1083
1084 if (u->version >= 13) {
1085 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1086
1087 pa_log("Parse failure");
1088 goto fail;
1089 }
1090 }
1091
1092 if (!pa_tagstruct_eof(t)) {
1093 pa_log("Packet too long");
1094 goto fail;
1095 }
1096
1097 pa_proplist_free(pl);
1098
1099 if (idx != u->device_index)
1100 return;
1101
1102 pa_assert(u->sink);
1103
1104 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1105 pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1106 return;
1107
1108 pa_sink_volume_changed(u->sink, &volume);
1109
1110 if (u->version >= 11)
1111 pa_sink_mute_changed(u->sink, mute);
1112
1113 return;
1114
1115 fail:
1116 pa_module_unload_request(u->module, TRUE);
1117 pa_proplist_free(pl);
1118 }
1119
1120 #else
1121
1122 /* Called from main context */
1123 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1124 struct userdata *u = userdata;
1125 uint32_t idx, owner_module, monitor_of_sink, flags;
1126 const char *name, *description, *monitor_of_sink_name, *driver;
1127 pa_sample_spec ss;
1128 pa_channel_map cm;
1129 pa_cvolume volume;
1130 pa_bool_t mute;
1131 pa_usec_t latency, configured_latency;
1132 pa_proplist *pl;
1133
1134 pa_assert(pd);
1135 pa_assert(u);
1136
1137 pl = pa_proplist_new();
1138
1139 if (command != PA_COMMAND_REPLY) {
1140 if (command == PA_COMMAND_ERROR)
1141 pa_log("Failed to get info.");
1142 else
1143 pa_log("Protocol error.");
1144 goto fail;
1145 }
1146
1147 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1148 pa_tagstruct_gets(t, &name) < 0 ||
1149 pa_tagstruct_gets(t, &description) < 0 ||
1150 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1151 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1152 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1153 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1154 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1155 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1156 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1157 pa_tagstruct_get_usec(t, &latency) < 0 ||
1158 pa_tagstruct_gets(t, &driver) < 0 ||
1159 pa_tagstruct_getu32(t, &flags) < 0) {
1160
1161 pa_log("Parse failure");
1162 goto fail;
1163 }
1164
1165 if (u->version >= 13) {
1166 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1167 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1168
1169 pa_log("Parse failure");
1170 goto fail;
1171 }
1172 }
1173
1174 if (u->version >= 15) {
1175 pa_volume_t base_volume;
1176 uint32_t state, n_volume_steps, card;
1177
1178 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1179 pa_tagstruct_getu32(t, &state) < 0 ||
1180 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1181 pa_tagstruct_getu32(t, &card) < 0) {
1182
1183 pa_log("Parse failure");
1184 goto fail;
1185 }
1186 }
1187
1188 if (!pa_tagstruct_eof(t)) {
1189 pa_log("Packet too long");
1190 goto fail;
1191 }
1192
1193 pa_proplist_free(pl);
1194
1195 if (!u->source_name || strcmp(name, u->source_name))
1196 return;
1197
1198 pa_xfree(u->device_description);
1199 u->device_description = pa_xstrdup(description);
1200
1201 update_description(u);
1202
1203 return;
1204
1205 fail:
1206 pa_module_unload_request(u->module, TRUE);
1207 pa_proplist_free(pl);
1208 }
1209
1210 #endif
1211
1212 /* Called from main context */
1213 static void request_info(struct userdata *u) {
1214 pa_tagstruct *t;
1215 uint32_t tag;
1216 pa_assert(u);
1217
1218 t = pa_tagstruct_new(NULL, 0);
1219 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1220 pa_tagstruct_putu32(t, tag = u->ctag++);
1221 pa_pstream_send_tagstruct(u->pstream, t);
1222 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1223
1224 #ifdef TUNNEL_SINK
1225 t = pa_tagstruct_new(NULL, 0);
1226 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1227 pa_tagstruct_putu32(t, tag = u->ctag++);
1228 pa_tagstruct_putu32(t, u->device_index);
1229 pa_pstream_send_tagstruct(u->pstream, t);
1230 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1231
1232 if (u->sink_name) {
1233 t = pa_tagstruct_new(NULL, 0);
1234 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1235 pa_tagstruct_putu32(t, tag = u->ctag++);
1236 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1237 pa_tagstruct_puts(t, u->sink_name);
1238 pa_pstream_send_tagstruct(u->pstream, t);
1239 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1240 }
1241 #else
1242 if (u->source_name) {
1243 t = pa_tagstruct_new(NULL, 0);
1244 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1245 pa_tagstruct_putu32(t, tag = u->ctag++);
1246 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1247 pa_tagstruct_puts(t, u->source_name);
1248 pa_pstream_send_tagstruct(u->pstream, t);
1249 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1250 }
1251 #endif
1252 }
1253
1254 /* Called from main context */
1255 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1256 struct userdata *u = userdata;
1257 pa_subscription_event_type_t e;
1258 uint32_t idx;
1259
1260 pa_assert(pd);
1261 pa_assert(t);
1262 pa_assert(u);
1263 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1264
1265 if (pa_tagstruct_getu32(t, &e) < 0 ||
1266 pa_tagstruct_getu32(t, &idx) < 0) {
1267 pa_log("Invalid protocol reply");
1268 pa_module_unload_request(u->module, TRUE);
1269 return;
1270 }
1271
1272 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1273 #ifdef TUNNEL_SINK
1274 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1275 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1276 #else
1277 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1278 #endif
1279 )
1280 return;
1281
1282 request_info(u);
1283 }
1284
1285 /* Called from main context */
1286 static void start_subscribe(struct userdata *u) {
1287 pa_tagstruct *t;
1288 uint32_t tag;
1289 pa_assert(u);
1290
1291 t = pa_tagstruct_new(NULL, 0);
1292 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1293 pa_tagstruct_putu32(t, tag = u->ctag++);
1294 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1295 #ifdef TUNNEL_SINK
1296 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1297 #else
1298 PA_SUBSCRIPTION_MASK_SOURCE
1299 #endif
1300 );
1301
1302 pa_pstream_send_tagstruct(u->pstream, t);
1303 }
1304
1305 /* Called from main context */
1306 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1307 struct userdata *u = userdata;
1308 struct timeval ntv;
1309 #ifdef TUNNEL_SINK
1310 uint32_t bytes;
1311 #endif
1312
1313 pa_assert(pd);
1314 pa_assert(u);
1315 pa_assert(u->pdispatch == pd);
1316
1317 if (command != PA_COMMAND_REPLY) {
1318 if (command == PA_COMMAND_ERROR)
1319 pa_log("Failed to create stream.");
1320 else
1321 pa_log("Protocol error.");
1322 goto fail;
1323 }
1324
1325 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1326 pa_tagstruct_getu32(t, &u->device_index) < 0
1327 #ifdef TUNNEL_SINK
1328 || pa_tagstruct_getu32(t, &bytes) < 0
1329 #endif
1330 )
1331 goto parse_error;
1332
1333 if (u->version >= 9) {
1334 #ifdef TUNNEL_SINK
1335 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1336 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1337 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1338 pa_tagstruct_getu32(t, &u->minreq) < 0)
1339 goto parse_error;
1340 #else
1341 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1342 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1343 goto parse_error;
1344 #endif
1345 }
1346
1347 if (u->version >= 12) {
1348 pa_sample_spec ss;
1349 pa_channel_map cm;
1350 uint32_t device_index;
1351 const char *dn;
1352 pa_bool_t suspended;
1353
1354 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1355 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1356 pa_tagstruct_getu32(t, &device_index) < 0 ||
1357 pa_tagstruct_gets(t, &dn) < 0 ||
1358 pa_tagstruct_get_boolean(t, &suspended) < 0)
1359 goto parse_error;
1360
1361 #ifdef TUNNEL_SINK
1362 pa_xfree(u->sink_name);
1363 u->sink_name = pa_xstrdup(dn);
1364 #else
1365 pa_xfree(u->source_name);
1366 u->source_name = pa_xstrdup(dn);
1367 #endif
1368 }
1369
1370 if (u->version >= 13) {
1371 pa_usec_t usec;
1372
1373 if (pa_tagstruct_get_usec(t, &usec) < 0)
1374 goto parse_error;
1375
1376 /* #ifdef TUNNEL_SINK */
1377 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1378 /* #else */
1379 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1380 /* #endif */
1381 }
1382
1383 if (!pa_tagstruct_eof(t))
1384 goto parse_error;
1385
1386 start_subscribe(u);
1387 request_info(u);
1388
1389 pa_assert(!u->time_event);
1390 pa_gettimeofday(&ntv);
1391 ntv.tv_sec += LATENCY_INTERVAL;
1392 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1393
1394 request_latency(u);
1395
1396 pa_log_debug("Stream created.");
1397
1398 #ifdef TUNNEL_SINK
1399 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1400 #endif
1401
1402 return;
1403
1404 parse_error:
1405 pa_log("Invalid reply. (Create stream)");
1406
1407 fail:
1408 pa_module_unload_request(u->module, TRUE);
1409
1410 }
1411
1412 /* Called from main context */
1413 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1414 struct userdata *u = userdata;
1415 pa_tagstruct *reply;
1416 char name[256], un[128], hn[128];
1417 #ifdef TUNNEL_SINK
1418 pa_cvolume volume;
1419 #endif
1420
1421 pa_assert(pd);
1422 pa_assert(u);
1423 pa_assert(u->pdispatch == pd);
1424
1425 if (command != PA_COMMAND_REPLY ||
1426 pa_tagstruct_getu32(t, &u->version) < 0 ||
1427 !pa_tagstruct_eof(t)) {
1428
1429 if (command == PA_COMMAND_ERROR)
1430 pa_log("Failed to authenticate");
1431 else
1432 pa_log("Protocol error.");
1433
1434 goto fail;
1435 }
1436
1437 /* Minimum supported protocol version */
1438 if (u->version < 8) {
1439 pa_log("Incompatible protocol version");
1440 goto fail;
1441 }
1442
1443 /* Starting with protocol version 13 the MSB of the version tag
1444 reflects if shm is enabled for this connection or not. We don't
1445 support SHM here at all, so we just ignore this. */
1446
1447 if (u->version >= 13)
1448 u->version &= 0x7FFFFFFFU;
1449
1450 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1451
1452 #ifdef TUNNEL_SINK
1453 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1454 pa_sink_update_proplist(u->sink, 0, NULL);
1455
1456 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1457 u->sink_name,
1458 pa_get_user_name(un, sizeof(un)),
1459 pa_get_host_name(hn, sizeof(hn)));
1460 #else
1461 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1462 pa_source_update_proplist(u->source, 0, NULL);
1463
1464 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1465 u->source_name,
1466 pa_get_user_name(un, sizeof(un)),
1467 pa_get_host_name(hn, sizeof(hn)));
1468 #endif
1469
1470 reply = pa_tagstruct_new(NULL, 0);
1471 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1472 pa_tagstruct_putu32(reply, tag = u->ctag++);
1473
1474 if (u->version >= 13) {
1475 pa_proplist *pl;
1476 pl = pa_proplist_new();
1477 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1478 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1479 pa_init_proplist(pl);
1480 pa_tagstruct_put_proplist(reply, pl);
1481 pa_proplist_free(pl);
1482 } else
1483 pa_tagstruct_puts(reply, "PulseAudio");
1484
1485 pa_pstream_send_tagstruct(u->pstream, reply);
1486 /* We ignore the server's reply here */
1487
1488 reply = pa_tagstruct_new(NULL, 0);
1489
1490 if (u->version < 13)
1491 /* Only for older PA versions we need to fill in the maxlength */
1492 u->maxlength = 4*1024*1024;
1493
1494 #ifdef TUNNEL_SINK
1495 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1496 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1497 u->prebuf = u->tlength;
1498 #else
1499 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1500 #endif
1501
1502 #ifdef TUNNEL_SINK
1503 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1504 pa_tagstruct_putu32(reply, tag = u->ctag++);
1505
1506 if (u->version < 13)
1507 pa_tagstruct_puts(reply, name);
1508
1509 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1510 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1511 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1512 pa_tagstruct_puts(reply, u->sink_name);
1513 pa_tagstruct_putu32(reply, u->maxlength);
1514 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1515 pa_tagstruct_putu32(reply, u->tlength);
1516 pa_tagstruct_putu32(reply, u->prebuf);
1517 pa_tagstruct_putu32(reply, u->minreq);
1518 pa_tagstruct_putu32(reply, 0);
1519 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1520 pa_tagstruct_put_cvolume(reply, &volume);
1521 #else
1522 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1523 pa_tagstruct_putu32(reply, tag = u->ctag++);
1524
1525 if (u->version < 13)
1526 pa_tagstruct_puts(reply, name);
1527
1528 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1529 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1530 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1531 pa_tagstruct_puts(reply, u->source_name);
1532 pa_tagstruct_putu32(reply, u->maxlength);
1533 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1534 pa_tagstruct_putu32(reply, u->fragsize);
1535 #endif
1536
1537 if (u->version >= 12) {
1538 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1539 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1540 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1541 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1542 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1543 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1544 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1545 }
1546
1547 if (u->version >= 13) {
1548 pa_proplist *pl;
1549
1550 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1551 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1552
1553 pl = pa_proplist_new();
1554 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1555 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1556 pa_tagstruct_put_proplist(reply, pl);
1557 pa_proplist_free(pl);
1558
1559 #ifndef TUNNEL_SINK
1560 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1561 #endif
1562 }
1563
1564 if (u->version >= 14) {
1565 #ifdef TUNNEL_SINK
1566 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1567 #endif
1568 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1569 }
1570
1571 if (u->version >= 15) {
1572 #ifdef TUNNEL_SINK
1573 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1574 #endif
1575 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1576 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1577 }
1578
1579 pa_pstream_send_tagstruct(u->pstream, reply);
1580 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1581
1582 pa_log_debug("Connection authenticated, creating stream ...");
1583
1584 return;
1585
1586 fail:
1587 pa_module_unload_request(u->module, TRUE);
1588 }
1589
1590 /* Called from main context */
1591 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1592 struct userdata *u = userdata;
1593
1594 pa_assert(p);
1595 pa_assert(u);
1596
1597 pa_log_warn("Stream died.");
1598 pa_module_unload_request(u->module, TRUE);
1599 }
1600
1601 /* Called from main context */
1602 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1603 struct userdata *u = userdata;
1604
1605 pa_assert(p);
1606 pa_assert(packet);
1607 pa_assert(u);
1608
1609 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1610 pa_log("Invalid packet");
1611 pa_module_unload_request(u->module, TRUE);
1612 return;
1613 }
1614 }
1615
1616 #ifndef TUNNEL_SINK
1617 /* Called from main context */
1618 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1619 struct userdata *u = userdata;
1620
1621 pa_assert(p);
1622 pa_assert(chunk);
1623 pa_assert(u);
1624
1625 if (channel != u->channel) {
1626 pa_log("Recieved memory block on bad channel.");
1627 pa_module_unload_request(u->module, TRUE);
1628 return;
1629 }
1630
1631 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1632
1633 u->counter_delta += (int64_t) chunk->length;
1634 }
1635 #endif
1636
1637 /* Called from main context */
1638 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1639 struct userdata *u = userdata;
1640 pa_tagstruct *t;
1641 uint32_t tag;
1642
1643 pa_assert(sc);
1644 pa_assert(u);
1645 pa_assert(u->client == sc);
1646
1647 pa_socket_client_unref(u->client);
1648 u->client = NULL;
1649
1650 if (!io) {
1651 pa_log("Connection failed: %s", pa_cstrerror(errno));
1652 pa_module_unload_request(u->module, TRUE);
1653 return;
1654 }
1655
1656 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1657 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1658
1659 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1660 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1661 #ifndef TUNNEL_SINK
1662 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1663 #endif
1664
1665 t = pa_tagstruct_new(NULL, 0);
1666 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1667 pa_tagstruct_putu32(t, tag = u->ctag++);
1668 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1669
1670 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1671
1672 #ifdef HAVE_CREDS
1673 {
1674 pa_creds ucred;
1675
1676 if (pa_iochannel_creds_supported(io))
1677 pa_iochannel_creds_enable(io);
1678
1679 ucred.uid = getuid();
1680 ucred.gid = getgid();
1681
1682 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1683 }
1684 #else
1685 pa_pstream_send_tagstruct(u->pstream, t);
1686 #endif
1687
1688 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1689
1690 pa_log_debug("Connection established, authenticating ...");
1691 }
1692
1693 #ifdef TUNNEL_SINK
1694
1695 /* Called from main context */
1696 static void sink_set_volume(pa_sink *sink) {
1697 struct userdata *u;
1698 pa_tagstruct *t;
1699 uint32_t tag;
1700
1701 pa_assert(sink);
1702 u = sink->userdata;
1703 pa_assert(u);
1704
1705 t = pa_tagstruct_new(NULL, 0);
1706 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1707 pa_tagstruct_putu32(t, tag = u->ctag++);
1708 pa_tagstruct_putu32(t, u->device_index);
1709 pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1710 pa_pstream_send_tagstruct(u->pstream, t);
1711 }
1712
1713 /* Called from main context */
1714 static void sink_set_mute(pa_sink *sink) {
1715 struct userdata *u;
1716 pa_tagstruct *t;
1717 uint32_t tag;
1718
1719 pa_assert(sink);
1720 u = sink->userdata;
1721 pa_assert(u);
1722
1723 if (u->version < 11)
1724 return;
1725
1726 t = pa_tagstruct_new(NULL, 0);
1727 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1728 pa_tagstruct_putu32(t, tag = u->ctag++);
1729 pa_tagstruct_putu32(t, u->device_index);
1730 pa_tagstruct_put_boolean(t, !!sink->muted);
1731 pa_pstream_send_tagstruct(u->pstream, t);
1732 }
1733
1734 #endif
1735
1736 int pa__init(pa_module*m) {
1737 pa_modargs *ma = NULL;
1738 struct userdata *u = NULL;
1739 pa_sample_spec ss;
1740 pa_channel_map map;
1741 char *dn = NULL;
1742 #ifdef TUNNEL_SINK
1743 pa_sink_new_data data;
1744 #else
1745 pa_source_new_data data;
1746 #endif
1747
1748 pa_assert(m);
1749
1750 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1751 pa_log("Failed to parse module arguments");
1752 goto fail;
1753 }
1754
1755 m->userdata = u = pa_xnew0(struct userdata, 1);
1756 u->core = m->core;
1757 u->module = m;
1758 u->client = NULL;
1759 u->pdispatch = NULL;
1760 u->pstream = NULL;
1761 u->server_name = NULL;
1762 #ifdef TUNNEL_SINK
1763 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1764 u->sink = NULL;
1765 u->requested_bytes = 0;
1766 #else
1767 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1768 u->source = NULL;
1769 #endif
1770 u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
1771 u->ctag = 1;
1772 u->device_index = u->channel = PA_INVALID_INDEX;
1773 u->time_event = NULL;
1774 u->ignore_latency_before = 0;
1775 u->transport_usec = u->thread_transport_usec = 0;
1776 u->remote_suspended = u->remote_corked = FALSE;
1777 u->counter = u->counter_delta = 0;
1778
1779 u->rtpoll = pa_rtpoll_new();
1780 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1781
1782 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1783 goto fail;
1784
1785 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1786 pa_log("No server specified.");
1787 goto fail;
1788 }
1789
1790 ss = m->core->default_sample_spec;
1791 map = m->core->default_channel_map;
1792 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1793 pa_log("Invalid sample format specification");
1794 goto fail;
1795 }
1796
1797 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1798 pa_log("Failed to connect to server '%s'", u->server_name);
1799 goto fail;
1800 }
1801
1802 pa_socket_client_set_callback(u->client, on_connection, u);
1803
1804 #ifdef TUNNEL_SINK
1805
1806 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1807 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1808
1809 pa_sink_new_data_init(&data);
1810 data.driver = __FILE__;
1811 data.module = m;
1812 data.namereg_fail = TRUE;
1813 pa_sink_new_data_set_name(&data, dn);
1814 pa_sink_new_data_set_sample_spec(&data, &ss);
1815 pa_sink_new_data_set_channel_map(&data, &map);
1816 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1817 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1818 if (u->sink_name)
1819 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1820
1821 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1822 pa_sink_new_data_done(&data);
1823
1824 if (!u->sink) {
1825 pa_log("Failed to create sink.");
1826 goto fail;
1827 }
1828
1829 u->sink->parent.process_msg = sink_process_msg;
1830 u->sink->userdata = u;
1831 u->sink->set_state = sink_set_state;
1832 u->sink->set_volume = sink_set_volume;
1833 u->sink->set_mute = sink_set_mute;
1834
1835 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1836
1837 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1838
1839 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1840 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1841
1842 #else
1843
1844 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1845 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1846
1847 pa_source_new_data_init(&data);
1848 data.driver = __FILE__;
1849 data.module = m;
1850 data.namereg_fail = TRUE;
1851 pa_source_new_data_set_name(&data, dn);
1852 pa_source_new_data_set_sample_spec(&data, &ss);
1853 pa_source_new_data_set_channel_map(&data, &map);
1854 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1855 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1856 if (u->source_name)
1857 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1858
1859 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1860 pa_source_new_data_done(&data);
1861
1862 if (!u->source) {
1863 pa_log("Failed to create source.");
1864 goto fail;
1865 }
1866
1867 u->source->parent.process_msg = source_process_msg;
1868 u->source->set_state = source_set_state;
1869 u->source->userdata = u;
1870
1871 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1872
1873 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1874 pa_source_set_rtpoll(u->source, u->rtpoll);
1875 #endif
1876
1877 pa_xfree(dn);
1878
1879 u->time_event = NULL;
1880
1881 u->maxlength = 0;
1882 #ifdef TUNNEL_SINK
1883 u->tlength = u->minreq = u->prebuf = 0;
1884 #else
1885 u->fragsize = 0;
1886 #endif
1887
1888 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1889
1890 if (!(u->thread = pa_thread_new(thread_func, u))) {
1891 pa_log("Failed to create thread.");
1892 goto fail;
1893 }
1894
1895 #ifdef TUNNEL_SINK
1896 pa_sink_put(u->sink);
1897 #else
1898 pa_source_put(u->source);
1899 #endif
1900
1901 pa_modargs_free(ma);
1902
1903 return 0;
1904
1905 fail:
1906 pa__done(m);
1907
1908 if (ma)
1909 pa_modargs_free(ma);
1910
1911 pa_xfree(dn);
1912
1913 return -1;
1914 }
1915
1916 void pa__done(pa_module*m) {
1917 struct userdata* u;
1918
1919 pa_assert(m);
1920
1921 if (!(u = m->userdata))
1922 return;
1923
1924 #ifdef TUNNEL_SINK
1925 if (u->sink)
1926 pa_sink_unlink(u->sink);
1927 #else
1928 if (u->source)
1929 pa_source_unlink(u->source);
1930 #endif
1931
1932 if (u->thread) {
1933 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1934 pa_thread_free(u->thread);
1935 }
1936
1937 pa_thread_mq_done(&u->thread_mq);
1938
1939 #ifdef TUNNEL_SINK
1940 if (u->sink)
1941 pa_sink_unref(u->sink);
1942 #else
1943 if (u->source)
1944 pa_source_unref(u->source);
1945 #endif
1946
1947 if (u->rtpoll)
1948 pa_rtpoll_free(u->rtpoll);
1949
1950 if (u->pstream) {
1951 pa_pstream_unlink(u->pstream);
1952 pa_pstream_unref(u->pstream);
1953 }
1954
1955 if (u->pdispatch)
1956 pa_pdispatch_unref(u->pdispatch);
1957
1958 if (u->client)
1959 pa_socket_client_unref(u->client);
1960
1961 if (u->auth_cookie)
1962 pa_auth_cookie_unref(u->auth_cookie);
1963
1964 if (u->smoother)
1965 pa_smoother_free(u->smoother);
1966
1967 if (u->time_event)
1968 u->core->mainloop->time_free(u->time_event);
1969
1970 #ifdef TUNNEL_SINK
1971 pa_xfree(u->sink_name);
1972 #else
1973 pa_xfree(u->source_name);
1974 #endif
1975 pa_xfree(u->server_name);
1976
1977 pa_xfree(u->device_description);
1978 pa_xfree(u->server_fqdn);
1979 pa_xfree(u->user_name);
1980
1981 pa_xfree(u);
1982 }