]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
pulse: move pa_rtclock_now in pulsecommon
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58
59 #ifdef TUNNEL_SINK
60 #include "module-tunnel-sink-symdef.h"
61 #else
62 #include "module-tunnel-source-symdef.h"
63 #endif
64
/* Module metadata. The same source file is compiled twice: with
 * TUNNEL_SINK defined it describes the sink flavour of the tunnel,
 * otherwise the source flavour. The usage string lists the module
 * arguments accepted by the respective flavour. */
65 #ifdef TUNNEL_SINK
66 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 PA_MODULE_USAGE(
68 "sink_name=<name for the local sink> "
69 "sink_properties=<properties for the local sink> "
70 "server=<address> "
71 "sink=<remote sink name> "
72 "cookie=<filename> "
73 "format=<sample format> "
74 "channels=<number of channels> "
75 "rate=<sample rate> "
76 "channel_map=<channel map>");
77 #else
78 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 PA_MODULE_USAGE(
80 "source_name=<name for the local source> "
81 "source_properties=<properties for the local source> "
82 "server=<address> "
83 "source=<remote source name> "
84 "cookie=<filename> "
85 "format=<sample format> "
86 "channels=<number of channels> "
87 "rate=<sample rate> "
88 "channel_map=<channel map>");
89 #endif
90
91 PA_MODULE_AUTHOR("Lennart Poettering");
92 PA_MODULE_VERSION(PACKAGE_VERSION);
/* More than one instance of this module may be loaded. */
93 PA_MODULE_LOAD_ONCE(FALSE);
94
/* NULL-terminated list of the module argument names this module
 * accepts. The device-specific names depend on whether the file is
 * built as the sink or the source flavour. Keep in sync with the
 * PA_MODULE_USAGE() string. */
95 static const char* const valid_modargs[] = {
96 "server",
97 "cookie",
98 "format",
99 "channels",
100 "rate",
101 #ifdef TUNNEL_SINK
102 "sink_name",
103 "sink_properties",
104 "sink",
105 #else
106 "source_name",
107 "source_properties",
108 "source",
109 #endif
110 "channel_map",
111 NULL,
112 };
113
/* Timeout handed to pa_pdispatch_register_reply() when waiting for a
 * reply from the remote server (presumably seconds -- TODO confirm). */
114 #define DEFAULT_TIMEOUT 5
115
/* Number of seconds between two periodic latency queries; added to
 * tv_sec when the latency timer is re-armed. */
116 #define LATENCY_INTERVAL 10
117
/* NOTE(review): not referenced in the part of the file visible here;
 * by its name a lower bound for the assumed network latency. */
118 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
119
120 #ifdef TUNNEL_SINK
121
/* Private message codes understood by sink_process_msg(), numbered
 * after the generic sink messages. */
122 enum {
123 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
124 SINK_MESSAGE_REMOTE_SUSPEND,
125 SINK_MESSAGE_UPDATE_LATENCY,
126 SINK_MESSAGE_POST
127 };
128
/* NOTE(review): default buffer metrics in milliseconds; their use is
 * outside the part of the file visible here. */
129 #define DEFAULT_TLENGTH_MSEC 150
130 #define DEFAULT_MINREQ_MSEC 25
131
132 #else
133
/* Private message codes understood by source_process_msg(), numbered
 * after the generic source messages. */
134 enum {
135 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
136 SOURCE_MESSAGE_REMOTE_SUSPEND,
137 SOURCE_MESSAGE_UPDATE_LATENCY
138 };
139
/* NOTE(review): default fragment size in milliseconds; its use is
 * outside the part of the file visible here. */
140 #define DEFAULT_FRAGSIZE_MSEC 25
141
142 #endif
143
144 #ifdef TUNNEL_SINK
145 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 #endif
148 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155
/* Dispatch table mapping protocol command codes received from the
 * remote server to their handlers. Entries not listed stay NULL.
 * The REQUEST and STARTED handlers exist only in the sink flavour.
 * Note that both the playback and the record variants of each
 * notification are registered regardless of the flavour. */
156 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
157 #ifdef TUNNEL_SINK
158 [PA_COMMAND_REQUEST] = command_request,
159 [PA_COMMAND_STARTED] = command_started,
160 #endif
161 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
162 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
163 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
164 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
165 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
166 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
167 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
168 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
169 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
170 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
171 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
173 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
174 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
175 };
176
/* Per-instance state of one tunnel. A pointer to this structure is
 * handed as userdata to all protocol, timer and message callbacks. */
177 struct userdata {
178 pa_core *core;
179 pa_module *module;
180
/* Message queues, poll object and handle of the IO thread that runs
 * thread_func(). */
181 pa_thread_mq thread_mq;
182 pa_rtpoll *rtpoll;
183 pa_thread *thread;
184
/* Connection to the remote server: pstream carries tagstructs and
 * memblocks, pdispatch routes incoming commands and replies. */
185 pa_socket_client *client;
186 pa_pstream *pstream;
187 pa_pdispatch *pdispatch;
188
189 char *server_name;
190 #ifdef TUNNEL_SINK
/* sink_name is compared against the name reported by the server in
 * sink_info_cb(). requested_bytes counts bytes the server asked for
 * that have not yet been rendered and sent. */
191 char *sink_name;
192 pa_sink *sink;
193 size_t requested_bytes;
194 #else
195 char *source_name;
196 pa_source *source;
197 #endif
198
199 pa_auth_cookie *auth_cookie;
200
/* version gates parsing of optional reply fields; ctag is the next
 * command tag (post-incremented on every request); channel identifies
 * our stream in commands sent to and received from the server. */
201 uint32_t version;
202 uint32_t ctag;
203 uint32_t device_index;
204 uint32_t channel;
205
/* counter: total bytes rendered (sink) or posted (source), updated in
 * the message handlers. counter_delta: bytes sent since the last
 * latency request; reset to 0 by request_latency(). */
206 int64_t counter, counter_delta;
207
/* Whether the remote stream is corked by us / suspended by the server;
 * either one pauses the smoother. */
208 pa_bool_t remote_corked:1;
209 pa_bool_t remote_suspended:1;
210
211 pa_usec_t transport_usec; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
213
/* Latency replies whose tag is lower than this value are dropped. */
214 uint32_t ignore_latency_before;
215
/* NOTE(review): presumably the periodic timer driving
 * timeout_callback(); its creation is not visible here. */
216 pa_time_event *time_event;
217
/* Interpolates the remote playback/record position between latency
 * updates. */
218 pa_smoother *smoother;
219
/* Strings reported by the remote server; combined by
 * update_description() into the local device description. */
220 char *device_description;
221 char *server_fqdn;
222 char *user_name;
223
/* NOTE(review): buffer metrics of the remote stream -- set outside the
 * part of the file visible here. */
224 uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226 uint32_t tlength;
227 uint32_t minreq;
228 uint32_t prebuf;
229 #else
230 uint32_t fragsize;
231 #endif
232 };
233
234 static void request_latency(struct userdata *u);
235
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238 pa_log_debug("Got stream or client event.");
239 }
240
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
243 struct userdata *u = userdata;
244
245 pa_assert(pd);
246 pa_assert(t);
247 pa_assert(u);
248 pa_assert(u->pdispatch == pd);
249
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u->module, TRUE);
252 }
253
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
256 struct userdata *u = userdata;
257
258 pa_assert(pd);
259 pa_assert(t);
260 pa_assert(u);
261 pa_assert(u->pdispatch == pd);
262
263 pa_log_info("Server signalled buffer overrun/underrun.");
264 request_latency(u);
265 }
266
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
269 struct userdata *u = userdata;
270 uint32_t channel;
271 pa_bool_t suspended;
272
273 pa_assert(pd);
274 pa_assert(t);
275 pa_assert(u);
276 pa_assert(u->pdispatch == pd);
277
278 if (pa_tagstruct_getu32(t, &channel) < 0 ||
279 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280 !pa_tagstruct_eof(t)) {
281
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u->module, TRUE);
284 return;
285 }
286
287 pa_log_debug("Server reports device suspend.");
288
289 #ifdef TUNNEL_SINK
290 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
294
295 request_latency(u);
296 }
297
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
300 struct userdata *u = userdata;
301 uint32_t channel, di;
302 const char *dn;
303 pa_bool_t suspended;
304
305 pa_assert(pd);
306 pa_assert(t);
307 pa_assert(u);
308 pa_assert(u->pdispatch == pd);
309
310 if (pa_tagstruct_getu32(t, &channel) < 0 ||
311 pa_tagstruct_getu32(t, &di) < 0 ||
312 pa_tagstruct_gets(t, &dn) < 0 ||
313 pa_tagstruct_get_boolean(t, &suspended) < 0) {
314
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u->module, TRUE);
317 return;
318 }
319
320 pa_log_debug("Server reports a stream move.");
321
322 #ifdef TUNNEL_SINK
323 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
327
328 request_latency(u);
329 }
330
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332 struct userdata *u = userdata;
333 uint32_t channel, maxlength, tlength, fragsize, prebuf, minreq;
334 pa_usec_t usec;
335
336 pa_assert(pd);
337 pa_assert(t);
338 pa_assert(u);
339 pa_assert(u->pdispatch == pd);
340
341 if (pa_tagstruct_getu32(t, &channel) < 0 ||
342 pa_tagstruct_getu32(t, &maxlength) < 0) {
343
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u->module, TRUE);
346 return;
347 }
348
349 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351 pa_tagstruct_get_usec(t, &usec) < 0) {
352
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u->module, TRUE);
355 return;
356 }
357 } else {
358 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359 pa_tagstruct_getu32(t, &prebuf) < 0 ||
360 pa_tagstruct_getu32(t, &minreq) < 0 ||
361 pa_tagstruct_get_usec(t, &usec) < 0) {
362
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u->module, TRUE);
365 return;
366 }
367 }
368
369 #ifdef TUNNEL_SINK
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
372
373 request_latency(u);
374 }
375
376 #ifdef TUNNEL_SINK
377
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380 struct userdata *u = userdata;
381
382 pa_assert(pd);
383 pa_assert(t);
384 pa_assert(u);
385 pa_assert(u->pdispatch == pd);
386
387 pa_log_debug("Server reports playback started.");
388 request_latency(u);
389 }
390
391 #endif
392
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
395 pa_usec_t x;
396
397 pa_assert(u);
398
399 x = pa_rtclock_now();
400
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
404
405 if (past)
406 x -= u->thread_transport_usec;
407 else
408 x += u->thread_transport_usec;
409
410 if (u->remote_suspended || u->remote_corked)
411 pa_smoother_pause(u->smoother, x);
412 else
413 pa_smoother_resume(u->smoother, x, TRUE);
414 }
415
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
418 pa_assert(u);
419
420 if (u->remote_corked == cork)
421 return;
422
423 u->remote_corked = cork;
424 check_smoother_status(u, FALSE);
425 }
426
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, pa_bool_t cork) {
429 pa_tagstruct *t;
430 pa_assert(u);
431
432 if (!u->pstream)
433 return;
434
435 t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441 pa_tagstruct_putu32(t, u->ctag++);
442 pa_tagstruct_putu32(t, u->channel);
443 pa_tagstruct_put_boolean(t, !!cork);
444 pa_pstream_send_tagstruct(u->pstream, t);
445
446 request_latency(u);
447 }
448
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
451 pa_assert(u);
452
453 if (u->remote_suspended == suspend)
454 return;
455
456 u->remote_suspended = suspend;
457 check_smoother_status(u, TRUE);
458 }
459
460 #ifdef TUNNEL_SINK
461
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464 pa_assert(u);
465
466 while (u->requested_bytes > 0) {
467 pa_memchunk memchunk;
468
469 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471 pa_memblock_unref(memchunk.memblock);
472
473 u->requested_bytes -= memchunk.length;
474
475 u->counter += (int64_t) memchunk.length;
476 }
477 }
478
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481 struct userdata *u = PA_SINK(o)->userdata;
482
483 switch (code) {
484
485 case PA_SINK_MESSAGE_SET_STATE: {
486 int r;
487
488 /* First, change the state, because otherwide pa_sink_render() would fail */
489 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
490
491 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
492
493 if (PA_SINK_IS_OPENED(u->sink->state))
494 send_data(u);
495 }
496
497 return r;
498 }
499
500 case PA_SINK_MESSAGE_GET_LATENCY: {
501 pa_usec_t yl, yr, *usec = data;
502
503 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
505
506 *usec = yl > yr ? yl - yr : 0;
507 return 0;
508 }
509
510 case SINK_MESSAGE_REQUEST:
511
512 pa_assert(offset > 0);
513 u->requested_bytes += (size_t) offset;
514
515 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516 send_data(u);
517
518 return 0;
519
520
521 case SINK_MESSAGE_REMOTE_SUSPEND:
522
523 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
524 return 0;
525
526
527 case SINK_MESSAGE_UPDATE_LATENCY: {
528 pa_usec_t y;
529
530 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
531
532 if (y > (pa_usec_t) offset)
533 y -= (pa_usec_t) offset;
534 else
535 y = 0;
536
537 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
538
539 /* We can access this freely here, since the main thread is waiting for us */
540 u->thread_transport_usec = u->transport_usec;
541
542 return 0;
543 }
544
545 case SINK_MESSAGE_POST:
546
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
551
552 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
553
554 u->counter_delta += (int64_t) chunk->length;
555
556 return 0;
557 }
558
559 return pa_sink_process_msg(o, code, data, offset, chunk);
560 }
561
562 /* Called from main context */
563 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
564 struct userdata *u;
565 pa_sink_assert_ref(s);
566 u = s->userdata;
567
568 switch ((pa_sink_state_t) state) {
569
570 case PA_SINK_SUSPENDED:
571 pa_assert(PA_SINK_IS_OPENED(s->state));
572 stream_cork(u, TRUE);
573 break;
574
575 case PA_SINK_IDLE:
576 case PA_SINK_RUNNING:
577 if (s->state == PA_SINK_SUSPENDED)
578 stream_cork(u, FALSE);
579 break;
580
581 case PA_SINK_UNLINKED:
582 case PA_SINK_INIT:
583 case PA_SINK_INVALID_STATE:
584 ;
585 }
586
587 return 0;
588 }
589
590 #else
591
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
594 struct userdata *u = PA_SOURCE(o)->userdata;
595
596 switch (code) {
597
598 case PA_SOURCE_MESSAGE_SET_STATE: {
599 int r;
600
601 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
602 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
603
604 return r;
605 }
606
607 case PA_SOURCE_MESSAGE_GET_LATENCY: {
608 pa_usec_t yr, yl, *usec = data;
609
610 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
611 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
612
613 *usec = yr > yl ? yr - yl : 0;
614 return 0;
615 }
616
617 case SOURCE_MESSAGE_POST:
618
619 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
620 pa_source_post(u->source, chunk);
621
622 u->counter += (int64_t) chunk->length;
623
624 return 0;
625
626 case SOURCE_MESSAGE_REMOTE_SUSPEND:
627
628 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
629 return 0;
630
631 case SOURCE_MESSAGE_UPDATE_LATENCY: {
632 pa_usec_t y;
633
634 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
635 y += (pa_usec_t) offset;
636
637 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
638
639 /* We can access this freely here, since the main thread is waiting for us */
640 u->thread_transport_usec = u->transport_usec;
641
642 return 0;
643 }
644 }
645
646 return pa_source_process_msg(o, code, data, offset, chunk);
647 }
648
649 /* Called from main context */
650 static int source_set_state(pa_source *s, pa_source_state_t state) {
651 struct userdata *u;
652 pa_source_assert_ref(s);
653 u = s->userdata;
654
655 switch ((pa_source_state_t) state) {
656
657 case PA_SOURCE_SUSPENDED:
658 pa_assert(PA_SOURCE_IS_OPENED(s->state));
659 stream_cork(u, TRUE);
660 break;
661
662 case PA_SOURCE_IDLE:
663 case PA_SOURCE_RUNNING:
664 if (s->state == PA_SOURCE_SUSPENDED)
665 stream_cork(u, FALSE);
666 break;
667
668 case PA_SOURCE_UNLINKED:
669 case PA_SOURCE_INIT:
670 case PA_SINK_INVALID_STATE:
671 ;
672 }
673
674 return 0;
675 }
676
677 #endif
678
679 static void thread_func(void *userdata) {
680 struct userdata *u = userdata;
681
682 pa_assert(u);
683
684 pa_log_debug("Thread starting up");
685
686 pa_thread_mq_install(&u->thread_mq);
687 pa_rtpoll_install(u->rtpoll);
688
689 for (;;) {
690 int ret;
691
692 #ifdef TUNNEL_SINK
693 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
694 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
695 pa_sink_process_rewind(u->sink, 0);
696 #endif
697
698 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
699 goto fail;
700
701 if (ret == 0)
702 goto finish;
703 }
704
705 fail:
706 /* If this was no regular exit from the loop we have to continue
707 * processing messages until we received PA_MESSAGE_SHUTDOWN */
708 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
709 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
710
711 finish:
712 pa_log_debug("Thread shutting down");
713 }
714
715 #ifdef TUNNEL_SINK
716 /* Called from main context */
717 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
718 struct userdata *u = userdata;
719 uint32_t bytes, channel;
720
721 pa_assert(pd);
722 pa_assert(command == PA_COMMAND_REQUEST);
723 pa_assert(t);
724 pa_assert(u);
725 pa_assert(u->pdispatch == pd);
726
727 if (pa_tagstruct_getu32(t, &channel) < 0 ||
728 pa_tagstruct_getu32(t, &bytes) < 0) {
729 pa_log("Invalid protocol reply");
730 goto fail;
731 }
732
733 if (channel != u->channel) {
734 pa_log("Received data for invalid channel");
735 goto fail;
736 }
737
738 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
739 return;
740
741 fail:
742 pa_module_unload_request(u->module, TRUE);
743 }
744
745 #endif
746
747 /* Called from main context */
748 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
749 struct userdata *u = userdata;
750 pa_usec_t sink_usec, source_usec;
751 pa_bool_t playing;
752 int64_t write_index, read_index;
753 struct timeval local, remote, now;
754 pa_sample_spec *ss;
755 int64_t delay;
756
757 pa_assert(pd);
758 pa_assert(u);
759
760 if (command != PA_COMMAND_REPLY) {
761 if (command == PA_COMMAND_ERROR)
762 pa_log("Failed to get latency.");
763 else
764 pa_log("Protocol error.");
765 goto fail;
766 }
767
768 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
769 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
770 pa_tagstruct_get_boolean(t, &playing) < 0 ||
771 pa_tagstruct_get_timeval(t, &local) < 0 ||
772 pa_tagstruct_get_timeval(t, &remote) < 0 ||
773 pa_tagstruct_gets64(t, &write_index) < 0 ||
774 pa_tagstruct_gets64(t, &read_index) < 0) {
775 pa_log("Invalid reply.");
776 goto fail;
777 }
778
779 #ifdef TUNNEL_SINK
780 if (u->version >= 13) {
781 uint64_t underrun_for = 0, playing_for = 0;
782
783 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
784 pa_tagstruct_getu64(t, &playing_for) < 0) {
785 pa_log("Invalid reply.");
786 goto fail;
787 }
788 }
789 #endif
790
791 if (!pa_tagstruct_eof(t)) {
792 pa_log("Invalid reply.");
793 goto fail;
794 }
795
796 if (tag < u->ignore_latency_before) {
797 return;
798 }
799
800 pa_gettimeofday(&now);
801
802 /* Calculate transport usec */
803 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
804 /* local and remote seem to have synchronized clocks */
805 #ifdef TUNNEL_SINK
806 u->transport_usec = pa_timeval_diff(&remote, &local);
807 #else
808 u->transport_usec = pa_timeval_diff(&now, &remote);
809 #endif
810 } else
811 u->transport_usec = pa_timeval_diff(&now, &local)/2;
812
813 /* First, take the device's delay */
814 #ifdef TUNNEL_SINK
815 delay = (int64_t) sink_usec;
816 ss = &u->sink->sample_spec;
817 #else
818 delay = (int64_t) source_usec;
819 ss = &u->source->sample_spec;
820 #endif
821
822 /* Add the length of our server-side buffer */
823 if (write_index >= read_index)
824 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
825 else
826 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
827
828 /* Our measurements are already out of date, hence correct by the *
829 * transport latency */
830 #ifdef TUNNEL_SINK
831 delay -= (int64_t) u->transport_usec;
832 #else
833 delay += (int64_t) u->transport_usec;
834 #endif
835
836 /* Now correct by what we have have read/written since we requested the update */
837 #ifdef TUNNEL_SINK
838 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
839 #else
840 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
841 #endif
842
843 #ifdef TUNNEL_SINK
844 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
845 #else
846 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
847 #endif
848
849 return;
850
851 fail:
852
853 pa_module_unload_request(u->module, TRUE);
854 }
855
856 /* Called from main context */
857 static void request_latency(struct userdata *u) {
858 pa_tagstruct *t;
859 struct timeval now;
860 uint32_t tag;
861 pa_assert(u);
862
863 t = pa_tagstruct_new(NULL, 0);
864 #ifdef TUNNEL_SINK
865 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
866 #else
867 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
868 #endif
869 pa_tagstruct_putu32(t, tag = u->ctag++);
870 pa_tagstruct_putu32(t, u->channel);
871
872 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
873
874 pa_pstream_send_tagstruct(u->pstream, t);
875 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
876
877 u->ignore_latency_before = tag;
878 u->counter_delta = 0;
879 }
880
881 /* Called from main context */
882 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
883 struct userdata *u = userdata;
884 struct timeval ntv;
885
886 pa_assert(m);
887 pa_assert(e);
888 pa_assert(u);
889
890 request_latency(u);
891
892 pa_gettimeofday(&ntv);
893 ntv.tv_sec += LATENCY_INTERVAL;
894 m->time_restart(e, &ntv);
895 }
896
897 /* Called from main context */
898 static void update_description(struct userdata *u) {
899 char *d;
900 char un[128], hn[128];
901 pa_tagstruct *t;
902
903 pa_assert(u);
904
905 if (!u->server_fqdn || !u->user_name || !u->device_description)
906 return;
907
908 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
909
910 #ifdef TUNNEL_SINK
911 pa_sink_set_description(u->sink, d);
912 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
913 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
914 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
915 #else
916 pa_source_set_description(u->source, d);
917 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
918 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
919 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
920 #endif
921
922 pa_xfree(d);
923
924 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
925 pa_get_user_name(un, sizeof(un)),
926 pa_get_host_name(hn, sizeof(hn)));
927
928 t = pa_tagstruct_new(NULL, 0);
929 #ifdef TUNNEL_SINK
930 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
931 #else
932 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
933 #endif
934 pa_tagstruct_putu32(t, u->ctag++);
935 pa_tagstruct_putu32(t, u->channel);
936 pa_tagstruct_puts(t, d);
937 pa_pstream_send_tagstruct(u->pstream, t);
938
939 pa_xfree(d);
940 }
941
942 /* Called from main context */
943 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
944 struct userdata *u = userdata;
945 pa_sample_spec ss;
946 pa_channel_map cm;
947 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
948 uint32_t cookie;
949
950 pa_assert(pd);
951 pa_assert(u);
952
953 if (command != PA_COMMAND_REPLY) {
954 if (command == PA_COMMAND_ERROR)
955 pa_log("Failed to get info.");
956 else
957 pa_log("Protocol error.");
958 goto fail;
959 }
960
961 if (pa_tagstruct_gets(t, &server_name) < 0 ||
962 pa_tagstruct_gets(t, &server_version) < 0 ||
963 pa_tagstruct_gets(t, &user_name) < 0 ||
964 pa_tagstruct_gets(t, &host_name) < 0 ||
965 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
966 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
967 pa_tagstruct_gets(t, &default_source_name) < 0 ||
968 pa_tagstruct_getu32(t, &cookie) < 0 ||
969 (u->version >= 15 &&
970 pa_tagstruct_get_channel_map(t, &cm) < 0)) {
971
972 pa_log("Parse failure");
973 goto fail;
974 }
975
976 if (!pa_tagstruct_eof(t)) {
977 pa_log("Packet too long");
978 goto fail;
979 }
980
981 pa_xfree(u->server_fqdn);
982 u->server_fqdn = pa_xstrdup(host_name);
983
984 pa_xfree(u->user_name);
985 u->user_name = pa_xstrdup(user_name);
986
987 update_description(u);
988
989 return;
990
991 fail:
992 pa_module_unload_request(u->module, TRUE);
993 }
994
995 #ifdef TUNNEL_SINK
996
997 /* Called from main context */
998 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
999 struct userdata *u = userdata;
1000 uint32_t idx, owner_module, monitor_source, flags;
1001 const char *name, *description, *monitor_source_name, *driver;
1002 pa_sample_spec ss;
1003 pa_channel_map cm;
1004 pa_cvolume volume;
1005 pa_bool_t mute;
1006 pa_usec_t latency;
1007 pa_proplist *pl;
1008
1009 pa_assert(pd);
1010 pa_assert(u);
1011
1012 pl = pa_proplist_new();
1013
1014 if (command != PA_COMMAND_REPLY) {
1015 if (command == PA_COMMAND_ERROR)
1016 pa_log("Failed to get info.");
1017 else
1018 pa_log("Protocol error.");
1019 goto fail;
1020 }
1021
1022 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1023 pa_tagstruct_gets(t, &name) < 0 ||
1024 pa_tagstruct_gets(t, &description) < 0 ||
1025 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1026 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1027 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1028 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1029 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1030 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1031 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1032 pa_tagstruct_get_usec(t, &latency) < 0 ||
1033 pa_tagstruct_gets(t, &driver) < 0 ||
1034 pa_tagstruct_getu32(t, &flags) < 0) {
1035
1036 pa_log("Parse failure");
1037 goto fail;
1038 }
1039
1040 if (u->version >= 13) {
1041 pa_usec_t configured_latency;
1042
1043 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1044 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1045
1046 pa_log("Parse failure");
1047 goto fail;
1048 }
1049 }
1050
1051 if (u->version >= 15) {
1052 pa_volume_t base_volume;
1053 uint32_t state, n_volume_steps, card;
1054
1055 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1056 pa_tagstruct_getu32(t, &state) < 0 ||
1057 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1058 pa_tagstruct_getu32(t, &card) < 0) {
1059
1060 pa_log("Parse failure");
1061 goto fail;
1062 }
1063 }
1064
1065 if (!pa_tagstruct_eof(t)) {
1066 pa_log("Packet too long");
1067 goto fail;
1068 }
1069
1070 pa_proplist_free(pl);
1071
1072 if (!u->sink_name || strcmp(name, u->sink_name))
1073 return;
1074
1075 pa_xfree(u->device_description);
1076 u->device_description = pa_xstrdup(description);
1077
1078 update_description(u);
1079
1080 return;
1081
1082 fail:
1083 pa_module_unload_request(u->module, TRUE);
1084 pa_proplist_free(pl);
1085 }
1086
1087 /* Called from main context */
1088 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1089 struct userdata *u = userdata;
1090 uint32_t idx, owner_module, client, sink;
1091 pa_usec_t buffer_usec, sink_usec;
1092 const char *name, *driver, *resample_method;
1093 pa_bool_t mute;
1094 pa_sample_spec sample_spec;
1095 pa_channel_map channel_map;
1096 pa_cvolume volume;
1097 pa_proplist *pl;
1098
1099 pa_assert(pd);
1100 pa_assert(u);
1101
1102 pl = pa_proplist_new();
1103
1104 if (command != PA_COMMAND_REPLY) {
1105 if (command == PA_COMMAND_ERROR)
1106 pa_log("Failed to get info.");
1107 else
1108 pa_log("Protocol error.");
1109 goto fail;
1110 }
1111
1112 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1113 pa_tagstruct_gets(t, &name) < 0 ||
1114 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1115 pa_tagstruct_getu32(t, &client) < 0 ||
1116 pa_tagstruct_getu32(t, &sink) < 0 ||
1117 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1118 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1119 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1120 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1121 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1122 pa_tagstruct_gets(t, &resample_method) < 0 ||
1123 pa_tagstruct_gets(t, &driver) < 0) {
1124
1125 pa_log("Parse failure");
1126 goto fail;
1127 }
1128
1129 if (u->version >= 11) {
1130 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1131
1132 pa_log("Parse failure");
1133 goto fail;
1134 }
1135 }
1136
1137 if (u->version >= 13) {
1138 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1139
1140 pa_log("Parse failure");
1141 goto fail;
1142 }
1143 }
1144
1145 if (!pa_tagstruct_eof(t)) {
1146 pa_log("Packet too long");
1147 goto fail;
1148 }
1149
1150 pa_proplist_free(pl);
1151
1152 if (idx != u->device_index)
1153 return;
1154
1155 pa_assert(u->sink);
1156
1157 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1158 pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1159 return;
1160
1161 pa_sink_volume_changed(u->sink, &volume, FALSE);
1162
1163 if (u->version >= 11)
1164 pa_sink_mute_changed(u->sink, mute, FALSE);
1165
1166 return;
1167
1168 fail:
1169 pa_module_unload_request(u->module, TRUE);
1170 pa_proplist_free(pl);
1171 }
1172
1173 #else
1174
1175 /* Called from main context */
1176 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1177 struct userdata *u = userdata;
1178 uint32_t idx, owner_module, monitor_of_sink, flags;
1179 const char *name, *description, *monitor_of_sink_name, *driver;
1180 pa_sample_spec ss;
1181 pa_channel_map cm;
1182 pa_cvolume volume;
1183 pa_bool_t mute;
1184 pa_usec_t latency, configured_latency;
1185 pa_proplist *pl;
1186
1187 pa_assert(pd);
1188 pa_assert(u);
1189
1190 pl = pa_proplist_new();
1191
1192 if (command != PA_COMMAND_REPLY) {
1193 if (command == PA_COMMAND_ERROR)
1194 pa_log("Failed to get info.");
1195 else
1196 pa_log("Protocol error.");
1197 goto fail;
1198 }
1199
1200 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1201 pa_tagstruct_gets(t, &name) < 0 ||
1202 pa_tagstruct_gets(t, &description) < 0 ||
1203 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1204 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1205 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1206 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1207 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1208 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1209 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1210 pa_tagstruct_get_usec(t, &latency) < 0 ||
1211 pa_tagstruct_gets(t, &driver) < 0 ||
1212 pa_tagstruct_getu32(t, &flags) < 0) {
1213
1214 pa_log("Parse failure");
1215 goto fail;
1216 }
1217
1218 if (u->version >= 13) {
1219 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1220 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1221
1222 pa_log("Parse failure");
1223 goto fail;
1224 }
1225 }
1226
1227 if (u->version >= 15) {
1228 pa_volume_t base_volume;
1229 uint32_t state, n_volume_steps, card;
1230
1231 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1232 pa_tagstruct_getu32(t, &state) < 0 ||
1233 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1234 pa_tagstruct_getu32(t, &card) < 0) {
1235
1236 pa_log("Parse failure");
1237 goto fail;
1238 }
1239 }
1240
1241 if (!pa_tagstruct_eof(t)) {
1242 pa_log("Packet too long");
1243 goto fail;
1244 }
1245
1246 pa_proplist_free(pl);
1247
1248 if (!u->source_name || strcmp(name, u->source_name))
1249 return;
1250
1251 pa_xfree(u->device_description);
1252 u->device_description = pa_xstrdup(description);
1253
1254 update_description(u);
1255
1256 return;
1257
1258 fail:
1259 pa_module_unload_request(u->module, TRUE);
1260 pa_proplist_free(pl);
1261 }
1262
1263 #endif
1264
1265 /* Called from main context */
1266 static void request_info(struct userdata *u) {
1267 pa_tagstruct *t;
1268 uint32_t tag;
1269 pa_assert(u);
1270
1271 t = pa_tagstruct_new(NULL, 0);
1272 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1273 pa_tagstruct_putu32(t, tag = u->ctag++);
1274 pa_pstream_send_tagstruct(u->pstream, t);
1275 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1276
1277 #ifdef TUNNEL_SINK
1278 t = pa_tagstruct_new(NULL, 0);
1279 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1280 pa_tagstruct_putu32(t, tag = u->ctag++);
1281 pa_tagstruct_putu32(t, u->device_index);
1282 pa_pstream_send_tagstruct(u->pstream, t);
1283 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1284
1285 if (u->sink_name) {
1286 t = pa_tagstruct_new(NULL, 0);
1287 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1288 pa_tagstruct_putu32(t, tag = u->ctag++);
1289 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1290 pa_tagstruct_puts(t, u->sink_name);
1291 pa_pstream_send_tagstruct(u->pstream, t);
1292 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1293 }
1294 #else
1295 if (u->source_name) {
1296 t = pa_tagstruct_new(NULL, 0);
1297 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1298 pa_tagstruct_putu32(t, tag = u->ctag++);
1299 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1300 pa_tagstruct_puts(t, u->source_name);
1301 pa_pstream_send_tagstruct(u->pstream, t);
1302 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1303 }
1304 #endif
1305 }
1306
1307 /* Called from main context */
1308 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1309 struct userdata *u = userdata;
1310 pa_subscription_event_type_t e;
1311 uint32_t idx;
1312
1313 pa_assert(pd);
1314 pa_assert(t);
1315 pa_assert(u);
1316 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1317
1318 if (pa_tagstruct_getu32(t, &e) < 0 ||
1319 pa_tagstruct_getu32(t, &idx) < 0) {
1320 pa_log("Invalid protocol reply");
1321 pa_module_unload_request(u->module, TRUE);
1322 return;
1323 }
1324
1325 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1326 #ifdef TUNNEL_SINK
1327 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1328 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1329 #else
1330 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1331 #endif
1332 )
1333 return;
1334
1335 request_info(u);
1336 }
1337
1338 /* Called from main context */
1339 static void start_subscribe(struct userdata *u) {
1340 pa_tagstruct *t;
1341 uint32_t tag;
1342 pa_assert(u);
1343
1344 t = pa_tagstruct_new(NULL, 0);
1345 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1346 pa_tagstruct_putu32(t, tag = u->ctag++);
1347 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1348 #ifdef TUNNEL_SINK
1349 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1350 #else
1351 PA_SUBSCRIPTION_MASK_SOURCE
1352 #endif
1353 );
1354
1355 pa_pstream_send_tagstruct(u->pstream, t);
1356 }
1357
1358 /* Called from main context */
1359 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1360 struct userdata *u = userdata;
1361 struct timeval ntv;
1362 #ifdef TUNNEL_SINK
1363 uint32_t bytes;
1364 #endif
1365
1366 pa_assert(pd);
1367 pa_assert(u);
1368 pa_assert(u->pdispatch == pd);
1369
1370 if (command != PA_COMMAND_REPLY) {
1371 if (command == PA_COMMAND_ERROR)
1372 pa_log("Failed to create stream.");
1373 else
1374 pa_log("Protocol error.");
1375 goto fail;
1376 }
1377
1378 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1379 pa_tagstruct_getu32(t, &u->device_index) < 0
1380 #ifdef TUNNEL_SINK
1381 || pa_tagstruct_getu32(t, &bytes) < 0
1382 #endif
1383 )
1384 goto parse_error;
1385
1386 if (u->version >= 9) {
1387 #ifdef TUNNEL_SINK
1388 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1389 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1390 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1391 pa_tagstruct_getu32(t, &u->minreq) < 0)
1392 goto parse_error;
1393 #else
1394 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1395 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1396 goto parse_error;
1397 #endif
1398 }
1399
1400 if (u->version >= 12) {
1401 pa_sample_spec ss;
1402 pa_channel_map cm;
1403 uint32_t device_index;
1404 const char *dn;
1405 pa_bool_t suspended;
1406
1407 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1408 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1409 pa_tagstruct_getu32(t, &device_index) < 0 ||
1410 pa_tagstruct_gets(t, &dn) < 0 ||
1411 pa_tagstruct_get_boolean(t, &suspended) < 0)
1412 goto parse_error;
1413
1414 #ifdef TUNNEL_SINK
1415 pa_xfree(u->sink_name);
1416 u->sink_name = pa_xstrdup(dn);
1417 #else
1418 pa_xfree(u->source_name);
1419 u->source_name = pa_xstrdup(dn);
1420 #endif
1421 }
1422
1423 if (u->version >= 13) {
1424 pa_usec_t usec;
1425
1426 if (pa_tagstruct_get_usec(t, &usec) < 0)
1427 goto parse_error;
1428
1429 /* #ifdef TUNNEL_SINK */
1430 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1431 /* #else */
1432 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1433 /* #endif */
1434 }
1435
1436 if (!pa_tagstruct_eof(t))
1437 goto parse_error;
1438
1439 start_subscribe(u);
1440 request_info(u);
1441
1442 pa_assert(!u->time_event);
1443 pa_gettimeofday(&ntv);
1444 ntv.tv_sec += LATENCY_INTERVAL;
1445 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1446
1447 request_latency(u);
1448
1449 pa_log_debug("Stream created.");
1450
1451 #ifdef TUNNEL_SINK
1452 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1453 #endif
1454
1455 return;
1456
1457 parse_error:
1458 pa_log("Invalid reply. (Create stream)");
1459
1460 fail:
1461 pa_module_unload_request(u->module, TRUE);
1462
1463 }
1464
1465 /* Called from main context */
1466 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1467 struct userdata *u = userdata;
1468 pa_tagstruct *reply;
1469 char name[256], un[128], hn[128];
1470 #ifdef TUNNEL_SINK
1471 pa_cvolume volume;
1472 #endif
1473
1474 pa_assert(pd);
1475 pa_assert(u);
1476 pa_assert(u->pdispatch == pd);
1477
1478 if (command != PA_COMMAND_REPLY ||
1479 pa_tagstruct_getu32(t, &u->version) < 0 ||
1480 !pa_tagstruct_eof(t)) {
1481
1482 if (command == PA_COMMAND_ERROR)
1483 pa_log("Failed to authenticate");
1484 else
1485 pa_log("Protocol error.");
1486
1487 goto fail;
1488 }
1489
1490 /* Minimum supported protocol version */
1491 if (u->version < 8) {
1492 pa_log("Incompatible protocol version");
1493 goto fail;
1494 }
1495
1496 /* Starting with protocol version 13 the MSB of the version tag
1497 reflects if shm is enabled for this connection or not. We don't
1498 support SHM here at all, so we just ignore this. */
1499
1500 if (u->version >= 13)
1501 u->version &= 0x7FFFFFFFU;
1502
1503 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1504
1505 #ifdef TUNNEL_SINK
1506 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1507 pa_sink_update_proplist(u->sink, 0, NULL);
1508
1509 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1510 u->sink_name,
1511 pa_get_user_name(un, sizeof(un)),
1512 pa_get_host_name(hn, sizeof(hn)));
1513 #else
1514 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1515 pa_source_update_proplist(u->source, 0, NULL);
1516
1517 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1518 u->source_name,
1519 pa_get_user_name(un, sizeof(un)),
1520 pa_get_host_name(hn, sizeof(hn)));
1521 #endif
1522
1523 reply = pa_tagstruct_new(NULL, 0);
1524 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1525 pa_tagstruct_putu32(reply, tag = u->ctag++);
1526
1527 if (u->version >= 13) {
1528 pa_proplist *pl;
1529 pl = pa_proplist_new();
1530 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1531 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1532 pa_init_proplist(pl);
1533 pa_tagstruct_put_proplist(reply, pl);
1534 pa_proplist_free(pl);
1535 } else
1536 pa_tagstruct_puts(reply, "PulseAudio");
1537
1538 pa_pstream_send_tagstruct(u->pstream, reply);
1539 /* We ignore the server's reply here */
1540
1541 reply = pa_tagstruct_new(NULL, 0);
1542
1543 if (u->version < 13)
1544 /* Only for older PA versions we need to fill in the maxlength */
1545 u->maxlength = 4*1024*1024;
1546
1547 #ifdef TUNNEL_SINK
1548 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1549 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1550 u->prebuf = u->tlength;
1551 #else
1552 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1553 #endif
1554
1555 #ifdef TUNNEL_SINK
1556 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1557 pa_tagstruct_putu32(reply, tag = u->ctag++);
1558
1559 if (u->version < 13)
1560 pa_tagstruct_puts(reply, name);
1561
1562 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1563 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1564 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1565 pa_tagstruct_puts(reply, u->sink_name);
1566 pa_tagstruct_putu32(reply, u->maxlength);
1567 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1568 pa_tagstruct_putu32(reply, u->tlength);
1569 pa_tagstruct_putu32(reply, u->prebuf);
1570 pa_tagstruct_putu32(reply, u->minreq);
1571 pa_tagstruct_putu32(reply, 0);
1572 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1573 pa_tagstruct_put_cvolume(reply, &volume);
1574 #else
1575 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1576 pa_tagstruct_putu32(reply, tag = u->ctag++);
1577
1578 if (u->version < 13)
1579 pa_tagstruct_puts(reply, name);
1580
1581 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1582 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1583 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1584 pa_tagstruct_puts(reply, u->source_name);
1585 pa_tagstruct_putu32(reply, u->maxlength);
1586 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1587 pa_tagstruct_putu32(reply, u->fragsize);
1588 #endif
1589
1590 if (u->version >= 12) {
1591 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1592 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1593 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1594 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1595 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1596 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1597 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1598 }
1599
1600 if (u->version >= 13) {
1601 pa_proplist *pl;
1602
1603 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1604 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1605
1606 pl = pa_proplist_new();
1607 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1608 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1609 pa_tagstruct_put_proplist(reply, pl);
1610 pa_proplist_free(pl);
1611
1612 #ifndef TUNNEL_SINK
1613 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1614 #endif
1615 }
1616
1617 if (u->version >= 14) {
1618 #ifdef TUNNEL_SINK
1619 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1620 #endif
1621 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1622 }
1623
1624 if (u->version >= 15) {
1625 #ifdef TUNNEL_SINK
1626 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1627 #endif
1628 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1629 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1630 }
1631
1632 pa_pstream_send_tagstruct(u->pstream, reply);
1633 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1634
1635 pa_log_debug("Connection authenticated, creating stream ...");
1636
1637 return;
1638
1639 fail:
1640 pa_module_unload_request(u->module, TRUE);
1641 }
1642
1643 /* Called from main context */
1644 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1645 struct userdata *u = userdata;
1646
1647 pa_assert(p);
1648 pa_assert(u);
1649
1650 pa_log_warn("Stream died.");
1651 pa_module_unload_request(u->module, TRUE);
1652 }
1653
1654 /* Called from main context */
1655 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1656 struct userdata *u = userdata;
1657
1658 pa_assert(p);
1659 pa_assert(packet);
1660 pa_assert(u);
1661
1662 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1663 pa_log("Invalid packet");
1664 pa_module_unload_request(u->module, TRUE);
1665 return;
1666 }
1667 }
1668
1669 #ifndef TUNNEL_SINK
1670 /* Called from main context */
1671 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1672 struct userdata *u = userdata;
1673
1674 pa_assert(p);
1675 pa_assert(chunk);
1676 pa_assert(u);
1677
1678 if (channel != u->channel) {
1679 pa_log("Received memory block on bad channel.");
1680 pa_module_unload_request(u->module, TRUE);
1681 return;
1682 }
1683
1684 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1685
1686 u->counter_delta += (int64_t) chunk->length;
1687 }
1688 #endif
1689
1690 /* Called from main context */
1691 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1692 struct userdata *u = userdata;
1693 pa_tagstruct *t;
1694 uint32_t tag;
1695
1696 pa_assert(sc);
1697 pa_assert(u);
1698 pa_assert(u->client == sc);
1699
1700 pa_socket_client_unref(u->client);
1701 u->client = NULL;
1702
1703 if (!io) {
1704 pa_log("Connection failed: %s", pa_cstrerror(errno));
1705 pa_module_unload_request(u->module, TRUE);
1706 return;
1707 }
1708
1709 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1710 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1711
1712 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1713 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1714 #ifndef TUNNEL_SINK
1715 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1716 #endif
1717
1718 t = pa_tagstruct_new(NULL, 0);
1719 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1720 pa_tagstruct_putu32(t, tag = u->ctag++);
1721 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1722
1723 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1724
1725 #ifdef HAVE_CREDS
1726 {
1727 pa_creds ucred;
1728
1729 if (pa_iochannel_creds_supported(io))
1730 pa_iochannel_creds_enable(io);
1731
1732 ucred.uid = getuid();
1733 ucred.gid = getgid();
1734
1735 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1736 }
1737 #else
1738 pa_pstream_send_tagstruct(u->pstream, t);
1739 #endif
1740
1741 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1742
1743 pa_log_debug("Connection established, authenticating ...");
1744 }
1745
1746 #ifdef TUNNEL_SINK
1747
1748 /* Called from main context */
1749 static void sink_set_volume(pa_sink *sink) {
1750 struct userdata *u;
1751 pa_tagstruct *t;
1752 uint32_t tag;
1753
1754 pa_assert(sink);
1755 u = sink->userdata;
1756 pa_assert(u);
1757
1758 t = pa_tagstruct_new(NULL, 0);
1759 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1760 pa_tagstruct_putu32(t, tag = u->ctag++);
1761 pa_tagstruct_putu32(t, u->device_index);
1762 pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1763 pa_pstream_send_tagstruct(u->pstream, t);
1764 }
1765
1766 /* Called from main context */
1767 static void sink_set_mute(pa_sink *sink) {
1768 struct userdata *u;
1769 pa_tagstruct *t;
1770 uint32_t tag;
1771
1772 pa_assert(sink);
1773 u = sink->userdata;
1774 pa_assert(u);
1775
1776 if (u->version < 11)
1777 return;
1778
1779 t = pa_tagstruct_new(NULL, 0);
1780 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1781 pa_tagstruct_putu32(t, tag = u->ctag++);
1782 pa_tagstruct_putu32(t, u->device_index);
1783 pa_tagstruct_put_boolean(t, !!sink->muted);
1784 pa_pstream_send_tagstruct(u->pstream, t);
1785 }
1786
1787 #endif
1788
1789 int pa__init(pa_module*m) {
1790 pa_modargs *ma = NULL;
1791 struct userdata *u = NULL;
1792 pa_sample_spec ss;
1793 pa_channel_map map;
1794 char *dn = NULL;
1795 #ifdef TUNNEL_SINK
1796 pa_sink_new_data data;
1797 #else
1798 pa_source_new_data data;
1799 #endif
1800
1801 pa_assert(m);
1802
1803 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1804 pa_log("Failed to parse module arguments");
1805 goto fail;
1806 }
1807
1808 m->userdata = u = pa_xnew0(struct userdata, 1);
1809 u->core = m->core;
1810 u->module = m;
1811 u->client = NULL;
1812 u->pdispatch = NULL;
1813 u->pstream = NULL;
1814 u->server_name = NULL;
1815 #ifdef TUNNEL_SINK
1816 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1817 u->sink = NULL;
1818 u->requested_bytes = 0;
1819 #else
1820 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1821 u->source = NULL;
1822 #endif
1823 u->smoother = pa_smoother_new(
1824 PA_USEC_PER_SEC,
1825 PA_USEC_PER_SEC*2,
1826 TRUE,
1827 TRUE,
1828 10,
1829 pa_rtclock_now(),
1830 FALSE);
1831 u->ctag = 1;
1832 u->device_index = u->channel = PA_INVALID_INDEX;
1833 u->time_event = NULL;
1834 u->ignore_latency_before = 0;
1835 u->transport_usec = u->thread_transport_usec = 0;
1836 u->remote_suspended = u->remote_corked = FALSE;
1837 u->counter = u->counter_delta = 0;
1838
1839 u->rtpoll = pa_rtpoll_new();
1840 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1841
1842 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1843 goto fail;
1844
1845 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1846 pa_log("No server specified.");
1847 goto fail;
1848 }
1849
1850 ss = m->core->default_sample_spec;
1851 map = m->core->default_channel_map;
1852 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1853 pa_log("Invalid sample format specification");
1854 goto fail;
1855 }
1856
1857 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1858 pa_log("Failed to connect to server '%s'", u->server_name);
1859 goto fail;
1860 }
1861
1862 pa_socket_client_set_callback(u->client, on_connection, u);
1863
1864 #ifdef TUNNEL_SINK
1865
1866 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1867 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1868
1869 pa_sink_new_data_init(&data);
1870 data.driver = __FILE__;
1871 data.module = m;
1872 data.namereg_fail = TRUE;
1873 pa_sink_new_data_set_name(&data, dn);
1874 pa_sink_new_data_set_sample_spec(&data, &ss);
1875 pa_sink_new_data_set_channel_map(&data, &map);
1876 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1877 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1878 if (u->sink_name)
1879 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1880
1881 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1882 pa_log("Invalid properties");
1883 pa_sink_new_data_done(&data);
1884 goto fail;
1885 }
1886
1887 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1888 pa_sink_new_data_done(&data);
1889
1890 if (!u->sink) {
1891 pa_log("Failed to create sink.");
1892 goto fail;
1893 }
1894
1895 u->sink->parent.process_msg = sink_process_msg;
1896 u->sink->userdata = u;
1897 u->sink->set_state = sink_set_state;
1898 u->sink->set_volume = sink_set_volume;
1899 u->sink->set_mute = sink_set_mute;
1900
1901 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1902
1903 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1904
1905 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1906 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1907
1908 #else
1909
1910 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1911 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1912
1913 pa_source_new_data_init(&data);
1914 data.driver = __FILE__;
1915 data.module = m;
1916 data.namereg_fail = TRUE;
1917 pa_source_new_data_set_name(&data, dn);
1918 pa_source_new_data_set_sample_spec(&data, &ss);
1919 pa_source_new_data_set_channel_map(&data, &map);
1920 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1921 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1922 if (u->source_name)
1923 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1924
1925 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1926 pa_log("Invalid properties");
1927 pa_source_new_data_done(&data);
1928 goto fail;
1929 }
1930
1931 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1932 pa_source_new_data_done(&data);
1933
1934 if (!u->source) {
1935 pa_log("Failed to create source.");
1936 goto fail;
1937 }
1938
1939 u->source->parent.process_msg = source_process_msg;
1940 u->source->set_state = source_set_state;
1941 u->source->userdata = u;
1942
1943 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1944
1945 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1946 pa_source_set_rtpoll(u->source, u->rtpoll);
1947 #endif
1948
1949 pa_xfree(dn);
1950
1951 u->time_event = NULL;
1952
1953 u->maxlength = (uint32_t) -1;
1954 #ifdef TUNNEL_SINK
1955 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
1956 #else
1957 u->fragsize = (uint32_t) -1;
1958 #endif
1959
1960 if (!(u->thread = pa_thread_new(thread_func, u))) {
1961 pa_log("Failed to create thread.");
1962 goto fail;
1963 }
1964
1965 #ifdef TUNNEL_SINK
1966 pa_sink_put(u->sink);
1967 #else
1968 pa_source_put(u->source);
1969 #endif
1970
1971 pa_modargs_free(ma);
1972
1973 return 0;
1974
1975 fail:
1976 pa__done(m);
1977
1978 if (ma)
1979 pa_modargs_free(ma);
1980
1981 pa_xfree(dn);
1982
1983 return -1;
1984 }
1985
1986 void pa__done(pa_module*m) {
1987 struct userdata* u;
1988
1989 pa_assert(m);
1990
1991 if (!(u = m->userdata))
1992 return;
1993
1994 #ifdef TUNNEL_SINK
1995 if (u->sink)
1996 pa_sink_unlink(u->sink);
1997 #else
1998 if (u->source)
1999 pa_source_unlink(u->source);
2000 #endif
2001
2002 if (u->thread) {
2003 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2004 pa_thread_free(u->thread);
2005 }
2006
2007 pa_thread_mq_done(&u->thread_mq);
2008
2009 #ifdef TUNNEL_SINK
2010 if (u->sink)
2011 pa_sink_unref(u->sink);
2012 #else
2013 if (u->source)
2014 pa_source_unref(u->source);
2015 #endif
2016
2017 if (u->rtpoll)
2018 pa_rtpoll_free(u->rtpoll);
2019
2020 if (u->pstream) {
2021 pa_pstream_unlink(u->pstream);
2022 pa_pstream_unref(u->pstream);
2023 }
2024
2025 if (u->pdispatch)
2026 pa_pdispatch_unref(u->pdispatch);
2027
2028 if (u->client)
2029 pa_socket_client_unref(u->client);
2030
2031 if (u->auth_cookie)
2032 pa_auth_cookie_unref(u->auth_cookie);
2033
2034 if (u->smoother)
2035 pa_smoother_free(u->smoother);
2036
2037 if (u->time_event)
2038 u->core->mainloop->time_free(u->time_event);
2039
2040 #ifdef TUNNEL_SINK
2041 pa_xfree(u->sink_name);
2042 #else
2043 pa_xfree(u->source_name);
2044 #endif
2045 pa_xfree(u->server_name);
2046
2047 pa_xfree(u->device_description);
2048 pa_xfree(u->server_fqdn);
2049 pa_xfree(u->user_name);
2050
2051 pa_xfree(u);
2052 }