]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
Base mainloop on pa_rtclock_now()
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58
59 #ifdef TUNNEL_SINK
60 #include "module-tunnel-sink-symdef.h"
61 #else
62 #include "module-tunnel-source-symdef.h"
63 #endif
64
65 #ifdef TUNNEL_SINK
66 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 PA_MODULE_USAGE(
68 "sink_name=<name for the local sink> "
69 "sink_properties=<properties for the local sink> "
70 "server=<address> "
71 "sink=<remote sink name> "
72 "cookie=<filename> "
73 "format=<sample format> "
74 "channels=<number of channels> "
75 "rate=<sample rate> "
76 "channel_map=<channel map>");
77 #else
78 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 PA_MODULE_USAGE(
80 "source_name=<name for the local source> "
81 "source_properties=<properties for the local source> "
82 "server=<address> "
83 "source=<remote source name> "
84 "cookie=<filename> "
85 "format=<sample format> "
86 "channels=<number of channels> "
87 "rate=<sample rate> "
88 "channel_map=<channel map>");
89 #endif
90
91 PA_MODULE_AUTHOR("Lennart Poettering");
92 PA_MODULE_VERSION(PACKAGE_VERSION);
93 PA_MODULE_LOAD_ONCE(FALSE);
94
95 static const char* const valid_modargs[] = {
96 "server",
97 "cookie",
98 "format",
99 "channels",
100 "rate",
101 #ifdef TUNNEL_SINK
102 "sink_name",
103 "sink_properties",
104 "sink",
105 #else
106 "source_name",
107 "source_properties",
108 "source",
109 #endif
110 "channel_map",
111 NULL,
112 };
113
114 #define DEFAULT_TIMEOUT 5
115
116 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
117
118 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
119
120 #ifdef TUNNEL_SINK
121
122 enum {
123 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
124 SINK_MESSAGE_REMOTE_SUSPEND,
125 SINK_MESSAGE_UPDATE_LATENCY,
126 SINK_MESSAGE_POST
127 };
128
129 #define DEFAULT_TLENGTH_MSEC 150
130 #define DEFAULT_MINREQ_MSEC 25
131
132 #else
133
134 enum {
135 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
136 SOURCE_MESSAGE_REMOTE_SUSPEND,
137 SOURCE_MESSAGE_UPDATE_LATENCY
138 };
139
140 #define DEFAULT_FRAGSIZE_MSEC 25
141
142 #endif
143
144 #ifdef TUNNEL_SINK
145 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 #endif
148 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155
156 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
157 #ifdef TUNNEL_SINK
158 [PA_COMMAND_REQUEST] = command_request,
159 [PA_COMMAND_STARTED] = command_started,
160 #endif
161 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
162 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
163 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
164 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
165 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
166 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
167 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
168 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
169 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
170 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
171 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
173 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
174 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
175 };
176
177 struct userdata {
178 pa_core *core;
179 pa_module *module;
180
181 pa_thread_mq thread_mq;
182 pa_rtpoll *rtpoll;
183 pa_thread *thread;
184
185 pa_socket_client *client;
186 pa_pstream *pstream;
187 pa_pdispatch *pdispatch;
188
189 char *server_name;
190 #ifdef TUNNEL_SINK
191 char *sink_name;
192 pa_sink *sink;
193 size_t requested_bytes;
194 #else
195 char *source_name;
196 pa_source *source;
197 #endif
198
199 pa_auth_cookie *auth_cookie;
200
201 uint32_t version;
202 uint32_t ctag;
203 uint32_t device_index;
204 uint32_t channel;
205
206 int64_t counter, counter_delta;
207
208 pa_bool_t remote_corked:1;
209 pa_bool_t remote_suspended:1;
210
211 pa_usec_t transport_usec; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
213
214 uint32_t ignore_latency_before;
215
216 pa_time_event *time_event;
217
218 pa_smoother *smoother;
219
220 char *device_description;
221 char *server_fqdn;
222 char *user_name;
223
224 uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226 uint32_t tlength;
227 uint32_t minreq;
228 uint32_t prebuf;
229 #else
230 uint32_t fragsize;
231 #endif
232 };
233
234 static void request_latency(struct userdata *u);
235
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238 pa_log_debug("Got stream or client event.");
239 }
240
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
243 struct userdata *u = userdata;
244
245 pa_assert(pd);
246 pa_assert(t);
247 pa_assert(u);
248 pa_assert(u->pdispatch == pd);
249
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u->module, TRUE);
252 }
253
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
256 struct userdata *u = userdata;
257
258 pa_assert(pd);
259 pa_assert(t);
260 pa_assert(u);
261 pa_assert(u->pdispatch == pd);
262
263 pa_log_info("Server signalled buffer overrun/underrun.");
264 request_latency(u);
265 }
266
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
269 struct userdata *u = userdata;
270 uint32_t channel;
271 pa_bool_t suspended;
272
273 pa_assert(pd);
274 pa_assert(t);
275 pa_assert(u);
276 pa_assert(u->pdispatch == pd);
277
278 if (pa_tagstruct_getu32(t, &channel) < 0 ||
279 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280 !pa_tagstruct_eof(t)) {
281
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u->module, TRUE);
284 return;
285 }
286
287 pa_log_debug("Server reports device suspend.");
288
289 #ifdef TUNNEL_SINK
290 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
294
295 request_latency(u);
296 }
297
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
300 struct userdata *u = userdata;
301 uint32_t channel, di;
302 const char *dn;
303 pa_bool_t suspended;
304
305 pa_assert(pd);
306 pa_assert(t);
307 pa_assert(u);
308 pa_assert(u->pdispatch == pd);
309
310 if (pa_tagstruct_getu32(t, &channel) < 0 ||
311 pa_tagstruct_getu32(t, &di) < 0 ||
312 pa_tagstruct_gets(t, &dn) < 0 ||
313 pa_tagstruct_get_boolean(t, &suspended) < 0) {
314
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u->module, TRUE);
317 return;
318 }
319
320 pa_log_debug("Server reports a stream move.");
321
322 #ifdef TUNNEL_SINK
323 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
327
328 request_latency(u);
329 }
330
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332 struct userdata *u = userdata;
333 uint32_t channel, maxlength, tlength, fragsize, prebuf, minreq;
334 pa_usec_t usec;
335
336 pa_assert(pd);
337 pa_assert(t);
338 pa_assert(u);
339 pa_assert(u->pdispatch == pd);
340
341 if (pa_tagstruct_getu32(t, &channel) < 0 ||
342 pa_tagstruct_getu32(t, &maxlength) < 0) {
343
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u->module, TRUE);
346 return;
347 }
348
349 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351 pa_tagstruct_get_usec(t, &usec) < 0) {
352
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u->module, TRUE);
355 return;
356 }
357 } else {
358 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359 pa_tagstruct_getu32(t, &prebuf) < 0 ||
360 pa_tagstruct_getu32(t, &minreq) < 0 ||
361 pa_tagstruct_get_usec(t, &usec) < 0) {
362
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u->module, TRUE);
365 return;
366 }
367 }
368
369 #ifdef TUNNEL_SINK
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
372
373 request_latency(u);
374 }
375
376 #ifdef TUNNEL_SINK
377
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380 struct userdata *u = userdata;
381
382 pa_assert(pd);
383 pa_assert(t);
384 pa_assert(u);
385 pa_assert(u->pdispatch == pd);
386
387 pa_log_debug("Server reports playback started.");
388 request_latency(u);
389 }
390
391 #endif
392
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
395 pa_usec_t x;
396
397 pa_assert(u);
398
399 x = pa_rtclock_now();
400
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
404
405 if (past)
406 x -= u->thread_transport_usec;
407 else
408 x += u->thread_transport_usec;
409
410 if (u->remote_suspended || u->remote_corked)
411 pa_smoother_pause(u->smoother, x);
412 else
413 pa_smoother_resume(u->smoother, x, TRUE);
414 }
415
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
418 pa_assert(u);
419
420 if (u->remote_corked == cork)
421 return;
422
423 u->remote_corked = cork;
424 check_smoother_status(u, FALSE);
425 }
426
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, pa_bool_t cork) {
429 pa_tagstruct *t;
430 pa_assert(u);
431
432 if (!u->pstream)
433 return;
434
435 t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441 pa_tagstruct_putu32(t, u->ctag++);
442 pa_tagstruct_putu32(t, u->channel);
443 pa_tagstruct_put_boolean(t, !!cork);
444 pa_pstream_send_tagstruct(u->pstream, t);
445
446 request_latency(u);
447 }
448
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
451 pa_assert(u);
452
453 if (u->remote_suspended == suspend)
454 return;
455
456 u->remote_suspended = suspend;
457 check_smoother_status(u, TRUE);
458 }
459
460 #ifdef TUNNEL_SINK
461
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464 pa_assert(u);
465
466 while (u->requested_bytes > 0) {
467 pa_memchunk memchunk;
468
469 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471 pa_memblock_unref(memchunk.memblock);
472
473 u->requested_bytes -= memchunk.length;
474
475 u->counter += (int64_t) memchunk.length;
476 }
477 }
478
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481 struct userdata *u = PA_SINK(o)->userdata;
482
483 switch (code) {
484
485 case PA_SINK_MESSAGE_SET_STATE: {
486 int r;
487
488 /* First, change the state, because otherwide pa_sink_render() would fail */
489 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
490
491 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
492
493 if (PA_SINK_IS_OPENED(u->sink->state))
494 send_data(u);
495 }
496
497 return r;
498 }
499
500 case PA_SINK_MESSAGE_GET_LATENCY: {
501 pa_usec_t yl, yr, *usec = data;
502
503 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
505
506 *usec = yl > yr ? yl - yr : 0;
507 return 0;
508 }
509
510 case SINK_MESSAGE_REQUEST:
511
512 pa_assert(offset > 0);
513 u->requested_bytes += (size_t) offset;
514
515 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516 send_data(u);
517
518 return 0;
519
520
521 case SINK_MESSAGE_REMOTE_SUSPEND:
522
523 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
524 return 0;
525
526
527 case SINK_MESSAGE_UPDATE_LATENCY: {
528 pa_usec_t y;
529
530 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
531
532 if (y > (pa_usec_t) offset)
533 y -= (pa_usec_t) offset;
534 else
535 y = 0;
536
537 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
538
539 /* We can access this freely here, since the main thread is waiting for us */
540 u->thread_transport_usec = u->transport_usec;
541
542 return 0;
543 }
544
545 case SINK_MESSAGE_POST:
546
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
551
552 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
553
554 u->counter_delta += (int64_t) chunk->length;
555
556 return 0;
557 }
558
559 return pa_sink_process_msg(o, code, data, offset, chunk);
560 }
561
562 /* Called from main context */
563 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
564 struct userdata *u;
565 pa_sink_assert_ref(s);
566 u = s->userdata;
567
568 switch ((pa_sink_state_t) state) {
569
570 case PA_SINK_SUSPENDED:
571 pa_assert(PA_SINK_IS_OPENED(s->state));
572 stream_cork(u, TRUE);
573 break;
574
575 case PA_SINK_IDLE:
576 case PA_SINK_RUNNING:
577 if (s->state == PA_SINK_SUSPENDED)
578 stream_cork(u, FALSE);
579 break;
580
581 case PA_SINK_UNLINKED:
582 case PA_SINK_INIT:
583 case PA_SINK_INVALID_STATE:
584 ;
585 }
586
587 return 0;
588 }
589
590 #else
591
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
594 struct userdata *u = PA_SOURCE(o)->userdata;
595
596 switch (code) {
597
598 case PA_SOURCE_MESSAGE_SET_STATE: {
599 int r;
600
601 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
602 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
603
604 return r;
605 }
606
607 case PA_SOURCE_MESSAGE_GET_LATENCY: {
608 pa_usec_t yr, yl, *usec = data;
609
610 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
611 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
612
613 *usec = yr > yl ? yr - yl : 0;
614 return 0;
615 }
616
617 case SOURCE_MESSAGE_POST:
618
619 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
620 pa_source_post(u->source, chunk);
621
622 u->counter += (int64_t) chunk->length;
623
624 return 0;
625
626 case SOURCE_MESSAGE_REMOTE_SUSPEND:
627
628 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
629 return 0;
630
631 case SOURCE_MESSAGE_UPDATE_LATENCY: {
632 pa_usec_t y;
633
634 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
635 y += (pa_usec_t) offset;
636
637 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
638
639 /* We can access this freely here, since the main thread is waiting for us */
640 u->thread_transport_usec = u->transport_usec;
641
642 return 0;
643 }
644 }
645
646 return pa_source_process_msg(o, code, data, offset, chunk);
647 }
648
649 /* Called from main context */
650 static int source_set_state(pa_source *s, pa_source_state_t state) {
651 struct userdata *u;
652 pa_source_assert_ref(s);
653 u = s->userdata;
654
655 switch ((pa_source_state_t) state) {
656
657 case PA_SOURCE_SUSPENDED:
658 pa_assert(PA_SOURCE_IS_OPENED(s->state));
659 stream_cork(u, TRUE);
660 break;
661
662 case PA_SOURCE_IDLE:
663 case PA_SOURCE_RUNNING:
664 if (s->state == PA_SOURCE_SUSPENDED)
665 stream_cork(u, FALSE);
666 break;
667
668 case PA_SOURCE_UNLINKED:
669 case PA_SOURCE_INIT:
670 case PA_SINK_INVALID_STATE:
671 ;
672 }
673
674 return 0;
675 }
676
677 #endif
678
679 static void thread_func(void *userdata) {
680 struct userdata *u = userdata;
681
682 pa_assert(u);
683
684 pa_log_debug("Thread starting up");
685
686 pa_thread_mq_install(&u->thread_mq);
687 pa_rtpoll_install(u->rtpoll);
688
689 for (;;) {
690 int ret;
691
692 #ifdef TUNNEL_SINK
693 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
694 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
695 pa_sink_process_rewind(u->sink, 0);
696 #endif
697
698 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
699 goto fail;
700
701 if (ret == 0)
702 goto finish;
703 }
704
705 fail:
706 /* If this was no regular exit from the loop we have to continue
707 * processing messages until we received PA_MESSAGE_SHUTDOWN */
708 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
709 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
710
711 finish:
712 pa_log_debug("Thread shutting down");
713 }
714
715 #ifdef TUNNEL_SINK
716 /* Called from main context */
717 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
718 struct userdata *u = userdata;
719 uint32_t bytes, channel;
720
721 pa_assert(pd);
722 pa_assert(command == PA_COMMAND_REQUEST);
723 pa_assert(t);
724 pa_assert(u);
725 pa_assert(u->pdispatch == pd);
726
727 if (pa_tagstruct_getu32(t, &channel) < 0 ||
728 pa_tagstruct_getu32(t, &bytes) < 0) {
729 pa_log("Invalid protocol reply");
730 goto fail;
731 }
732
733 if (channel != u->channel) {
734 pa_log("Received data for invalid channel");
735 goto fail;
736 }
737
738 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
739 return;
740
741 fail:
742 pa_module_unload_request(u->module, TRUE);
743 }
744
745 #endif
746
747 /* Called from main context */
748 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
749 struct userdata *u = userdata;
750 pa_usec_t sink_usec, source_usec;
751 pa_bool_t playing;
752 int64_t write_index, read_index;
753 struct timeval local, remote, now;
754 pa_sample_spec *ss;
755 int64_t delay;
756
757 pa_assert(pd);
758 pa_assert(u);
759
760 if (command != PA_COMMAND_REPLY) {
761 if (command == PA_COMMAND_ERROR)
762 pa_log("Failed to get latency.");
763 else
764 pa_log("Protocol error.");
765 goto fail;
766 }
767
768 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
769 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
770 pa_tagstruct_get_boolean(t, &playing) < 0 ||
771 pa_tagstruct_get_timeval(t, &local) < 0 ||
772 pa_tagstruct_get_timeval(t, &remote) < 0 ||
773 pa_tagstruct_gets64(t, &write_index) < 0 ||
774 pa_tagstruct_gets64(t, &read_index) < 0) {
775 pa_log("Invalid reply.");
776 goto fail;
777 }
778
779 #ifdef TUNNEL_SINK
780 if (u->version >= 13) {
781 uint64_t underrun_for = 0, playing_for = 0;
782
783 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
784 pa_tagstruct_getu64(t, &playing_for) < 0) {
785 pa_log("Invalid reply.");
786 goto fail;
787 }
788 }
789 #endif
790
791 if (!pa_tagstruct_eof(t)) {
792 pa_log("Invalid reply.");
793 goto fail;
794 }
795
796 if (tag < u->ignore_latency_before) {
797 return;
798 }
799
800 pa_gettimeofday(&now);
801
802 /* Calculate transport usec */
803 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
804 /* local and remote seem to have synchronized clocks */
805 #ifdef TUNNEL_SINK
806 u->transport_usec = pa_timeval_diff(&remote, &local);
807 #else
808 u->transport_usec = pa_timeval_diff(&now, &remote);
809 #endif
810 } else
811 u->transport_usec = pa_timeval_diff(&now, &local)/2;
812
813 /* First, take the device's delay */
814 #ifdef TUNNEL_SINK
815 delay = (int64_t) sink_usec;
816 ss = &u->sink->sample_spec;
817 #else
818 delay = (int64_t) source_usec;
819 ss = &u->source->sample_spec;
820 #endif
821
822 /* Add the length of our server-side buffer */
823 if (write_index >= read_index)
824 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
825 else
826 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
827
828 /* Our measurements are already out of date, hence correct by the *
829 * transport latency */
830 #ifdef TUNNEL_SINK
831 delay -= (int64_t) u->transport_usec;
832 #else
833 delay += (int64_t) u->transport_usec;
834 #endif
835
836 /* Now correct by what we have have read/written since we requested the update */
837 #ifdef TUNNEL_SINK
838 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
839 #else
840 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
841 #endif
842
843 #ifdef TUNNEL_SINK
844 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
845 #else
846 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
847 #endif
848
849 return;
850
851 fail:
852
853 pa_module_unload_request(u->module, TRUE);
854 }
855
856 /* Called from main context */
857 static void request_latency(struct userdata *u) {
858 pa_tagstruct *t;
859 struct timeval now;
860 uint32_t tag;
861 pa_assert(u);
862
863 t = pa_tagstruct_new(NULL, 0);
864 #ifdef TUNNEL_SINK
865 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
866 #else
867 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
868 #endif
869 pa_tagstruct_putu32(t, tag = u->ctag++);
870 pa_tagstruct_putu32(t, u->channel);
871
872 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
873
874 pa_pstream_send_tagstruct(u->pstream, t);
875 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
876
877 u->ignore_latency_before = tag;
878 u->counter_delta = 0;
879 }
880
881 /* Called from main context */
882 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
883 struct userdata *u = userdata;
884
885 pa_assert(m);
886 pa_assert(e);
887 pa_assert(u);
888
889 request_latency(u);
890
891 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
892 }
893
894 /* Called from main context */
895 static void update_description(struct userdata *u) {
896 char *d;
897 char un[128], hn[128];
898 pa_tagstruct *t;
899
900 pa_assert(u);
901
902 if (!u->server_fqdn || !u->user_name || !u->device_description)
903 return;
904
905 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
906
907 #ifdef TUNNEL_SINK
908 pa_sink_set_description(u->sink, d);
909 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
910 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
911 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
912 #else
913 pa_source_set_description(u->source, d);
914 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
915 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
916 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
917 #endif
918
919 pa_xfree(d);
920
921 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
922 pa_get_user_name(un, sizeof(un)),
923 pa_get_host_name(hn, sizeof(hn)));
924
925 t = pa_tagstruct_new(NULL, 0);
926 #ifdef TUNNEL_SINK
927 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
928 #else
929 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
930 #endif
931 pa_tagstruct_putu32(t, u->ctag++);
932 pa_tagstruct_putu32(t, u->channel);
933 pa_tagstruct_puts(t, d);
934 pa_pstream_send_tagstruct(u->pstream, t);
935
936 pa_xfree(d);
937 }
938
939 /* Called from main context */
940 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
941 struct userdata *u = userdata;
942 pa_sample_spec ss;
943 pa_channel_map cm;
944 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
945 uint32_t cookie;
946
947 pa_assert(pd);
948 pa_assert(u);
949
950 if (command != PA_COMMAND_REPLY) {
951 if (command == PA_COMMAND_ERROR)
952 pa_log("Failed to get info.");
953 else
954 pa_log("Protocol error.");
955 goto fail;
956 }
957
958 if (pa_tagstruct_gets(t, &server_name) < 0 ||
959 pa_tagstruct_gets(t, &server_version) < 0 ||
960 pa_tagstruct_gets(t, &user_name) < 0 ||
961 pa_tagstruct_gets(t, &host_name) < 0 ||
962 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
963 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
964 pa_tagstruct_gets(t, &default_source_name) < 0 ||
965 pa_tagstruct_getu32(t, &cookie) < 0 ||
966 (u->version >= 15 &&
967 pa_tagstruct_get_channel_map(t, &cm) < 0)) {
968
969 pa_log("Parse failure");
970 goto fail;
971 }
972
973 if (!pa_tagstruct_eof(t)) {
974 pa_log("Packet too long");
975 goto fail;
976 }
977
978 pa_xfree(u->server_fqdn);
979 u->server_fqdn = pa_xstrdup(host_name);
980
981 pa_xfree(u->user_name);
982 u->user_name = pa_xstrdup(user_name);
983
984 update_description(u);
985
986 return;
987
988 fail:
989 pa_module_unload_request(u->module, TRUE);
990 }
991
992 #ifdef TUNNEL_SINK
993
994 /* Called from main context */
995 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
996 struct userdata *u = userdata;
997 uint32_t idx, owner_module, monitor_source, flags;
998 const char *name, *description, *monitor_source_name, *driver;
999 pa_sample_spec ss;
1000 pa_channel_map cm;
1001 pa_cvolume volume;
1002 pa_bool_t mute;
1003 pa_usec_t latency;
1004 pa_proplist *pl;
1005
1006 pa_assert(pd);
1007 pa_assert(u);
1008
1009 pl = pa_proplist_new();
1010
1011 if (command != PA_COMMAND_REPLY) {
1012 if (command == PA_COMMAND_ERROR)
1013 pa_log("Failed to get info.");
1014 else
1015 pa_log("Protocol error.");
1016 goto fail;
1017 }
1018
1019 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1020 pa_tagstruct_gets(t, &name) < 0 ||
1021 pa_tagstruct_gets(t, &description) < 0 ||
1022 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1023 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1024 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1025 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1026 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1027 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1028 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1029 pa_tagstruct_get_usec(t, &latency) < 0 ||
1030 pa_tagstruct_gets(t, &driver) < 0 ||
1031 pa_tagstruct_getu32(t, &flags) < 0) {
1032
1033 pa_log("Parse failure");
1034 goto fail;
1035 }
1036
1037 if (u->version >= 13) {
1038 pa_usec_t configured_latency;
1039
1040 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1041 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1042
1043 pa_log("Parse failure");
1044 goto fail;
1045 }
1046 }
1047
1048 if (u->version >= 15) {
1049 pa_volume_t base_volume;
1050 uint32_t state, n_volume_steps, card;
1051
1052 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1053 pa_tagstruct_getu32(t, &state) < 0 ||
1054 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1055 pa_tagstruct_getu32(t, &card) < 0) {
1056
1057 pa_log("Parse failure");
1058 goto fail;
1059 }
1060 }
1061
1062 if (!pa_tagstruct_eof(t)) {
1063 pa_log("Packet too long");
1064 goto fail;
1065 }
1066
1067 pa_proplist_free(pl);
1068
1069 if (!u->sink_name || strcmp(name, u->sink_name))
1070 return;
1071
1072 pa_xfree(u->device_description);
1073 u->device_description = pa_xstrdup(description);
1074
1075 update_description(u);
1076
1077 return;
1078
1079 fail:
1080 pa_module_unload_request(u->module, TRUE);
1081 pa_proplist_free(pl);
1082 }
1083
1084 /* Called from main context */
1085 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1086 struct userdata *u = userdata;
1087 uint32_t idx, owner_module, client, sink;
1088 pa_usec_t buffer_usec, sink_usec;
1089 const char *name, *driver, *resample_method;
1090 pa_bool_t mute;
1091 pa_sample_spec sample_spec;
1092 pa_channel_map channel_map;
1093 pa_cvolume volume;
1094 pa_proplist *pl;
1095
1096 pa_assert(pd);
1097 pa_assert(u);
1098
1099 pl = pa_proplist_new();
1100
1101 if (command != PA_COMMAND_REPLY) {
1102 if (command == PA_COMMAND_ERROR)
1103 pa_log("Failed to get info.");
1104 else
1105 pa_log("Protocol error.");
1106 goto fail;
1107 }
1108
1109 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1110 pa_tagstruct_gets(t, &name) < 0 ||
1111 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1112 pa_tagstruct_getu32(t, &client) < 0 ||
1113 pa_tagstruct_getu32(t, &sink) < 0 ||
1114 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1115 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1116 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1117 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1118 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1119 pa_tagstruct_gets(t, &resample_method) < 0 ||
1120 pa_tagstruct_gets(t, &driver) < 0) {
1121
1122 pa_log("Parse failure");
1123 goto fail;
1124 }
1125
1126 if (u->version >= 11) {
1127 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1128
1129 pa_log("Parse failure");
1130 goto fail;
1131 }
1132 }
1133
1134 if (u->version >= 13) {
1135 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1136
1137 pa_log("Parse failure");
1138 goto fail;
1139 }
1140 }
1141
1142 if (!pa_tagstruct_eof(t)) {
1143 pa_log("Packet too long");
1144 goto fail;
1145 }
1146
1147 pa_proplist_free(pl);
1148
1149 if (idx != u->device_index)
1150 return;
1151
1152 pa_assert(u->sink);
1153
1154 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1155 pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1156 return;
1157
1158 pa_sink_volume_changed(u->sink, &volume, FALSE);
1159
1160 if (u->version >= 11)
1161 pa_sink_mute_changed(u->sink, mute, FALSE);
1162
1163 return;
1164
1165 fail:
1166 pa_module_unload_request(u->module, TRUE);
1167 pa_proplist_free(pl);
1168 }
1169
1170 #else
1171
1172 /* Called from main context */
1173 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1174 struct userdata *u = userdata;
1175 uint32_t idx, owner_module, monitor_of_sink, flags;
1176 const char *name, *description, *monitor_of_sink_name, *driver;
1177 pa_sample_spec ss;
1178 pa_channel_map cm;
1179 pa_cvolume volume;
1180 pa_bool_t mute;
1181 pa_usec_t latency, configured_latency;
1182 pa_proplist *pl;
1183
1184 pa_assert(pd);
1185 pa_assert(u);
1186
1187 pl = pa_proplist_new();
1188
1189 if (command != PA_COMMAND_REPLY) {
1190 if (command == PA_COMMAND_ERROR)
1191 pa_log("Failed to get info.");
1192 else
1193 pa_log("Protocol error.");
1194 goto fail;
1195 }
1196
1197 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1198 pa_tagstruct_gets(t, &name) < 0 ||
1199 pa_tagstruct_gets(t, &description) < 0 ||
1200 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1201 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1202 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1203 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1204 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1205 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1206 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1207 pa_tagstruct_get_usec(t, &latency) < 0 ||
1208 pa_tagstruct_gets(t, &driver) < 0 ||
1209 pa_tagstruct_getu32(t, &flags) < 0) {
1210
1211 pa_log("Parse failure");
1212 goto fail;
1213 }
1214
1215 if (u->version >= 13) {
1216 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1217 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1218
1219 pa_log("Parse failure");
1220 goto fail;
1221 }
1222 }
1223
1224 if (u->version >= 15) {
1225 pa_volume_t base_volume;
1226 uint32_t state, n_volume_steps, card;
1227
1228 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1229 pa_tagstruct_getu32(t, &state) < 0 ||
1230 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1231 pa_tagstruct_getu32(t, &card) < 0) {
1232
1233 pa_log("Parse failure");
1234 goto fail;
1235 }
1236 }
1237
1238 if (!pa_tagstruct_eof(t)) {
1239 pa_log("Packet too long");
1240 goto fail;
1241 }
1242
1243 pa_proplist_free(pl);
1244
1245 if (!u->source_name || strcmp(name, u->source_name))
1246 return;
1247
1248 pa_xfree(u->device_description);
1249 u->device_description = pa_xstrdup(description);
1250
1251 update_description(u);
1252
1253 return;
1254
1255 fail:
1256 pa_module_unload_request(u->module, TRUE);
1257 pa_proplist_free(pl);
1258 }
1259
1260 #endif
1261
1262 /* Called from main context */
1263 static void request_info(struct userdata *u) {
1264 pa_tagstruct *t;
1265 uint32_t tag;
1266 pa_assert(u);
1267
1268 t = pa_tagstruct_new(NULL, 0);
1269 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1270 pa_tagstruct_putu32(t, tag = u->ctag++);
1271 pa_pstream_send_tagstruct(u->pstream, t);
1272 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1273
1274 #ifdef TUNNEL_SINK
1275 t = pa_tagstruct_new(NULL, 0);
1276 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1277 pa_tagstruct_putu32(t, tag = u->ctag++);
1278 pa_tagstruct_putu32(t, u->device_index);
1279 pa_pstream_send_tagstruct(u->pstream, t);
1280 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1281
1282 if (u->sink_name) {
1283 t = pa_tagstruct_new(NULL, 0);
1284 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1285 pa_tagstruct_putu32(t, tag = u->ctag++);
1286 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1287 pa_tagstruct_puts(t, u->sink_name);
1288 pa_pstream_send_tagstruct(u->pstream, t);
1289 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1290 }
1291 #else
1292 if (u->source_name) {
1293 t = pa_tagstruct_new(NULL, 0);
1294 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1295 pa_tagstruct_putu32(t, tag = u->ctag++);
1296 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1297 pa_tagstruct_puts(t, u->source_name);
1298 pa_pstream_send_tagstruct(u->pstream, t);
1299 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1300 }
1301 #endif
1302 }
1303
1304 /* Called from main context */
1305 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1306 struct userdata *u = userdata;
1307 pa_subscription_event_type_t e;
1308 uint32_t idx;
1309
1310 pa_assert(pd);
1311 pa_assert(t);
1312 pa_assert(u);
1313 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1314
1315 if (pa_tagstruct_getu32(t, &e) < 0 ||
1316 pa_tagstruct_getu32(t, &idx) < 0) {
1317 pa_log("Invalid protocol reply");
1318 pa_module_unload_request(u->module, TRUE);
1319 return;
1320 }
1321
1322 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1323 #ifdef TUNNEL_SINK
1324 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1325 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1326 #else
1327 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1328 #endif
1329 )
1330 return;
1331
1332 request_info(u);
1333 }
1334
1335 /* Called from main context */
1336 static void start_subscribe(struct userdata *u) {
1337 pa_tagstruct *t;
1338 uint32_t tag;
1339 pa_assert(u);
1340
1341 t = pa_tagstruct_new(NULL, 0);
1342 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1343 pa_tagstruct_putu32(t, tag = u->ctag++);
1344 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1345 #ifdef TUNNEL_SINK
1346 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1347 #else
1348 PA_SUBSCRIPTION_MASK_SOURCE
1349 #endif
1350 );
1351
1352 pa_pstream_send_tagstruct(u->pstream, t);
1353 }
1354
1355 /* Called from main context */
1356 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1357 struct userdata *u = userdata;
1358 #ifdef TUNNEL_SINK
1359 uint32_t bytes;
1360 #endif
1361
1362 pa_assert(pd);
1363 pa_assert(u);
1364 pa_assert(u->pdispatch == pd);
1365
1366 if (command != PA_COMMAND_REPLY) {
1367 if (command == PA_COMMAND_ERROR)
1368 pa_log("Failed to create stream.");
1369 else
1370 pa_log("Protocol error.");
1371 goto fail;
1372 }
1373
1374 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1375 pa_tagstruct_getu32(t, &u->device_index) < 0
1376 #ifdef TUNNEL_SINK
1377 || pa_tagstruct_getu32(t, &bytes) < 0
1378 #endif
1379 )
1380 goto parse_error;
1381
1382 if (u->version >= 9) {
1383 #ifdef TUNNEL_SINK
1384 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1385 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1386 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1387 pa_tagstruct_getu32(t, &u->minreq) < 0)
1388 goto parse_error;
1389 #else
1390 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1391 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1392 goto parse_error;
1393 #endif
1394 }
1395
1396 if (u->version >= 12) {
1397 pa_sample_spec ss;
1398 pa_channel_map cm;
1399 uint32_t device_index;
1400 const char *dn;
1401 pa_bool_t suspended;
1402
1403 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1404 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1405 pa_tagstruct_getu32(t, &device_index) < 0 ||
1406 pa_tagstruct_gets(t, &dn) < 0 ||
1407 pa_tagstruct_get_boolean(t, &suspended) < 0)
1408 goto parse_error;
1409
1410 #ifdef TUNNEL_SINK
1411 pa_xfree(u->sink_name);
1412 u->sink_name = pa_xstrdup(dn);
1413 #else
1414 pa_xfree(u->source_name);
1415 u->source_name = pa_xstrdup(dn);
1416 #endif
1417 }
1418
1419 if (u->version >= 13) {
1420 pa_usec_t usec;
1421
1422 if (pa_tagstruct_get_usec(t, &usec) < 0)
1423 goto parse_error;
1424
1425 /* #ifdef TUNNEL_SINK */
1426 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1427 /* #else */
1428 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1429 /* #endif */
1430 }
1431
1432 if (!pa_tagstruct_eof(t))
1433 goto parse_error;
1434
1435 start_subscribe(u);
1436 request_info(u);
1437
1438 pa_assert(!u->time_event);
1439 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1440
1441 request_latency(u);
1442
1443 pa_log_debug("Stream created.");
1444
1445 #ifdef TUNNEL_SINK
1446 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1447 #endif
1448
1449 return;
1450
1451 parse_error:
1452 pa_log("Invalid reply. (Create stream)");
1453
1454 fail:
1455 pa_module_unload_request(u->module, TRUE);
1456
1457 }
1458
1459 /* Called from main context */
1460 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1461 struct userdata *u = userdata;
1462 pa_tagstruct *reply;
1463 char name[256], un[128], hn[128];
1464 #ifdef TUNNEL_SINK
1465 pa_cvolume volume;
1466 #endif
1467
1468 pa_assert(pd);
1469 pa_assert(u);
1470 pa_assert(u->pdispatch == pd);
1471
1472 if (command != PA_COMMAND_REPLY ||
1473 pa_tagstruct_getu32(t, &u->version) < 0 ||
1474 !pa_tagstruct_eof(t)) {
1475
1476 if (command == PA_COMMAND_ERROR)
1477 pa_log("Failed to authenticate");
1478 else
1479 pa_log("Protocol error.");
1480
1481 goto fail;
1482 }
1483
1484 /* Minimum supported protocol version */
1485 if (u->version < 8) {
1486 pa_log("Incompatible protocol version");
1487 goto fail;
1488 }
1489
1490 /* Starting with protocol version 13 the MSB of the version tag
1491 reflects if shm is enabled for this connection or not. We don't
1492 support SHM here at all, so we just ignore this. */
1493
1494 if (u->version >= 13)
1495 u->version &= 0x7FFFFFFFU;
1496
1497 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1498
1499 #ifdef TUNNEL_SINK
1500 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1501 pa_sink_update_proplist(u->sink, 0, NULL);
1502
1503 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1504 u->sink_name,
1505 pa_get_user_name(un, sizeof(un)),
1506 pa_get_host_name(hn, sizeof(hn)));
1507 #else
1508 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1509 pa_source_update_proplist(u->source, 0, NULL);
1510
1511 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1512 u->source_name,
1513 pa_get_user_name(un, sizeof(un)),
1514 pa_get_host_name(hn, sizeof(hn)));
1515 #endif
1516
1517 reply = pa_tagstruct_new(NULL, 0);
1518 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1519 pa_tagstruct_putu32(reply, tag = u->ctag++);
1520
1521 if (u->version >= 13) {
1522 pa_proplist *pl;
1523 pl = pa_proplist_new();
1524 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1525 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1526 pa_init_proplist(pl);
1527 pa_tagstruct_put_proplist(reply, pl);
1528 pa_proplist_free(pl);
1529 } else
1530 pa_tagstruct_puts(reply, "PulseAudio");
1531
1532 pa_pstream_send_tagstruct(u->pstream, reply);
1533 /* We ignore the server's reply here */
1534
1535 reply = pa_tagstruct_new(NULL, 0);
1536
1537 if (u->version < 13)
1538 /* Only for older PA versions we need to fill in the maxlength */
1539 u->maxlength = 4*1024*1024;
1540
1541 #ifdef TUNNEL_SINK
1542 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1543 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1544 u->prebuf = u->tlength;
1545 #else
1546 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1547 #endif
1548
1549 #ifdef TUNNEL_SINK
1550 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1551 pa_tagstruct_putu32(reply, tag = u->ctag++);
1552
1553 if (u->version < 13)
1554 pa_tagstruct_puts(reply, name);
1555
1556 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1557 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1558 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1559 pa_tagstruct_puts(reply, u->sink_name);
1560 pa_tagstruct_putu32(reply, u->maxlength);
1561 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1562 pa_tagstruct_putu32(reply, u->tlength);
1563 pa_tagstruct_putu32(reply, u->prebuf);
1564 pa_tagstruct_putu32(reply, u->minreq);
1565 pa_tagstruct_putu32(reply, 0);
1566 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1567 pa_tagstruct_put_cvolume(reply, &volume);
1568 #else
1569 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1570 pa_tagstruct_putu32(reply, tag = u->ctag++);
1571
1572 if (u->version < 13)
1573 pa_tagstruct_puts(reply, name);
1574
1575 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1576 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1577 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1578 pa_tagstruct_puts(reply, u->source_name);
1579 pa_tagstruct_putu32(reply, u->maxlength);
1580 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1581 pa_tagstruct_putu32(reply, u->fragsize);
1582 #endif
1583
1584 if (u->version >= 12) {
1585 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1586 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1587 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1588 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1589 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1590 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1591 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1592 }
1593
1594 if (u->version >= 13) {
1595 pa_proplist *pl;
1596
1597 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1598 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1599
1600 pl = pa_proplist_new();
1601 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1602 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1603 pa_tagstruct_put_proplist(reply, pl);
1604 pa_proplist_free(pl);
1605
1606 #ifndef TUNNEL_SINK
1607 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1608 #endif
1609 }
1610
1611 if (u->version >= 14) {
1612 #ifdef TUNNEL_SINK
1613 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1614 #endif
1615 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1616 }
1617
1618 if (u->version >= 15) {
1619 #ifdef TUNNEL_SINK
1620 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1621 #endif
1622 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1623 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1624 }
1625
1626 pa_pstream_send_tagstruct(u->pstream, reply);
1627 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1628
1629 pa_log_debug("Connection authenticated, creating stream ...");
1630
1631 return;
1632
1633 fail:
1634 pa_module_unload_request(u->module, TRUE);
1635 }
1636
1637 /* Called from main context */
1638 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1639 struct userdata *u = userdata;
1640
1641 pa_assert(p);
1642 pa_assert(u);
1643
1644 pa_log_warn("Stream died.");
1645 pa_module_unload_request(u->module, TRUE);
1646 }
1647
1648 /* Called from main context */
1649 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1650 struct userdata *u = userdata;
1651
1652 pa_assert(p);
1653 pa_assert(packet);
1654 pa_assert(u);
1655
1656 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1657 pa_log("Invalid packet");
1658 pa_module_unload_request(u->module, TRUE);
1659 return;
1660 }
1661 }
1662
1663 #ifndef TUNNEL_SINK
1664 /* Called from main context */
1665 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1666 struct userdata *u = userdata;
1667
1668 pa_assert(p);
1669 pa_assert(chunk);
1670 pa_assert(u);
1671
1672 if (channel != u->channel) {
1673 pa_log("Received memory block on bad channel.");
1674 pa_module_unload_request(u->module, TRUE);
1675 return;
1676 }
1677
1678 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1679
1680 u->counter_delta += (int64_t) chunk->length;
1681 }
1682 #endif
1683
1684 /* Called from main context */
1685 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1686 struct userdata *u = userdata;
1687 pa_tagstruct *t;
1688 uint32_t tag;
1689
1690 pa_assert(sc);
1691 pa_assert(u);
1692 pa_assert(u->client == sc);
1693
1694 pa_socket_client_unref(u->client);
1695 u->client = NULL;
1696
1697 if (!io) {
1698 pa_log("Connection failed: %s", pa_cstrerror(errno));
1699 pa_module_unload_request(u->module, TRUE);
1700 return;
1701 }
1702
1703 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1704 u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1705
1706 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1707 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1708 #ifndef TUNNEL_SINK
1709 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1710 #endif
1711
1712 t = pa_tagstruct_new(NULL, 0);
1713 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1714 pa_tagstruct_putu32(t, tag = u->ctag++);
1715 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1716
1717 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1718
1719 #ifdef HAVE_CREDS
1720 {
1721 pa_creds ucred;
1722
1723 if (pa_iochannel_creds_supported(io))
1724 pa_iochannel_creds_enable(io);
1725
1726 ucred.uid = getuid();
1727 ucred.gid = getgid();
1728
1729 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1730 }
1731 #else
1732 pa_pstream_send_tagstruct(u->pstream, t);
1733 #endif
1734
1735 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1736
1737 pa_log_debug("Connection established, authenticating ...");
1738 }
1739
1740 #ifdef TUNNEL_SINK
1741
1742 /* Called from main context */
1743 static void sink_set_volume(pa_sink *sink) {
1744 struct userdata *u;
1745 pa_tagstruct *t;
1746 uint32_t tag;
1747
1748 pa_assert(sink);
1749 u = sink->userdata;
1750 pa_assert(u);
1751
1752 t = pa_tagstruct_new(NULL, 0);
1753 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1754 pa_tagstruct_putu32(t, tag = u->ctag++);
1755 pa_tagstruct_putu32(t, u->device_index);
1756 pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1757 pa_pstream_send_tagstruct(u->pstream, t);
1758 }
1759
1760 /* Called from main context */
1761 static void sink_set_mute(pa_sink *sink) {
1762 struct userdata *u;
1763 pa_tagstruct *t;
1764 uint32_t tag;
1765
1766 pa_assert(sink);
1767 u = sink->userdata;
1768 pa_assert(u);
1769
1770 if (u->version < 11)
1771 return;
1772
1773 t = pa_tagstruct_new(NULL, 0);
1774 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1775 pa_tagstruct_putu32(t, tag = u->ctag++);
1776 pa_tagstruct_putu32(t, u->device_index);
1777 pa_tagstruct_put_boolean(t, !!sink->muted);
1778 pa_pstream_send_tagstruct(u->pstream, t);
1779 }
1780
1781 #endif
1782
1783 int pa__init(pa_module*m) {
1784 pa_modargs *ma = NULL;
1785 struct userdata *u = NULL;
1786 pa_sample_spec ss;
1787 pa_channel_map map;
1788 char *dn = NULL;
1789 #ifdef TUNNEL_SINK
1790 pa_sink_new_data data;
1791 #else
1792 pa_source_new_data data;
1793 #endif
1794
1795 pa_assert(m);
1796
1797 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1798 pa_log("Failed to parse module arguments");
1799 goto fail;
1800 }
1801
1802 m->userdata = u = pa_xnew0(struct userdata, 1);
1803 u->core = m->core;
1804 u->module = m;
1805 u->client = NULL;
1806 u->pdispatch = NULL;
1807 u->pstream = NULL;
1808 u->server_name = NULL;
1809 #ifdef TUNNEL_SINK
1810 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1811 u->sink = NULL;
1812 u->requested_bytes = 0;
1813 #else
1814 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1815 u->source = NULL;
1816 #endif
1817 u->smoother = pa_smoother_new(
1818 PA_USEC_PER_SEC,
1819 PA_USEC_PER_SEC*2,
1820 TRUE,
1821 TRUE,
1822 10,
1823 pa_rtclock_now(),
1824 FALSE);
1825 u->ctag = 1;
1826 u->device_index = u->channel = PA_INVALID_INDEX;
1827 u->time_event = NULL;
1828 u->ignore_latency_before = 0;
1829 u->transport_usec = u->thread_transport_usec = 0;
1830 u->remote_suspended = u->remote_corked = FALSE;
1831 u->counter = u->counter_delta = 0;
1832
1833 u->rtpoll = pa_rtpoll_new();
1834 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1835
1836 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1837 goto fail;
1838
1839 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1840 pa_log("No server specified.");
1841 goto fail;
1842 }
1843
1844 ss = m->core->default_sample_spec;
1845 map = m->core->default_channel_map;
1846 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1847 pa_log("Invalid sample format specification");
1848 goto fail;
1849 }
1850
1851 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1852 pa_log("Failed to connect to server '%s'", u->server_name);
1853 goto fail;
1854 }
1855
1856 pa_socket_client_set_callback(u->client, on_connection, u);
1857
1858 #ifdef TUNNEL_SINK
1859
1860 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1861 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1862
1863 pa_sink_new_data_init(&data);
1864 data.driver = __FILE__;
1865 data.module = m;
1866 data.namereg_fail = TRUE;
1867 pa_sink_new_data_set_name(&data, dn);
1868 pa_sink_new_data_set_sample_spec(&data, &ss);
1869 pa_sink_new_data_set_channel_map(&data, &map);
1870 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1871 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1872 if (u->sink_name)
1873 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1874
1875 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1876 pa_log("Invalid properties");
1877 pa_sink_new_data_done(&data);
1878 goto fail;
1879 }
1880
1881 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1882 pa_sink_new_data_done(&data);
1883
1884 if (!u->sink) {
1885 pa_log("Failed to create sink.");
1886 goto fail;
1887 }
1888
1889 u->sink->parent.process_msg = sink_process_msg;
1890 u->sink->userdata = u;
1891 u->sink->set_state = sink_set_state;
1892 u->sink->set_volume = sink_set_volume;
1893 u->sink->set_mute = sink_set_mute;
1894
1895 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1896
1897 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1898
1899 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1900 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1901
1902 #else
1903
1904 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1905 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1906
1907 pa_source_new_data_init(&data);
1908 data.driver = __FILE__;
1909 data.module = m;
1910 data.namereg_fail = TRUE;
1911 pa_source_new_data_set_name(&data, dn);
1912 pa_source_new_data_set_sample_spec(&data, &ss);
1913 pa_source_new_data_set_channel_map(&data, &map);
1914 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1915 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1916 if (u->source_name)
1917 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1918
1919 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1920 pa_log("Invalid properties");
1921 pa_source_new_data_done(&data);
1922 goto fail;
1923 }
1924
1925 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1926 pa_source_new_data_done(&data);
1927
1928 if (!u->source) {
1929 pa_log("Failed to create source.");
1930 goto fail;
1931 }
1932
1933 u->source->parent.process_msg = source_process_msg;
1934 u->source->set_state = source_set_state;
1935 u->source->userdata = u;
1936
1937 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1938
1939 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1940 pa_source_set_rtpoll(u->source, u->rtpoll);
1941 #endif
1942
1943 pa_xfree(dn);
1944
1945 u->time_event = NULL;
1946
1947 u->maxlength = (uint32_t) -1;
1948 #ifdef TUNNEL_SINK
1949 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
1950 #else
1951 u->fragsize = (uint32_t) -1;
1952 #endif
1953
1954 if (!(u->thread = pa_thread_new(thread_func, u))) {
1955 pa_log("Failed to create thread.");
1956 goto fail;
1957 }
1958
1959 #ifdef TUNNEL_SINK
1960 pa_sink_put(u->sink);
1961 #else
1962 pa_source_put(u->source);
1963 #endif
1964
1965 pa_modargs_free(ma);
1966
1967 return 0;
1968
1969 fail:
1970 pa__done(m);
1971
1972 if (ma)
1973 pa_modargs_free(ma);
1974
1975 pa_xfree(dn);
1976
1977 return -1;
1978 }
1979
1980 void pa__done(pa_module*m) {
1981 struct userdata* u;
1982
1983 pa_assert(m);
1984
1985 if (!(u = m->userdata))
1986 return;
1987
1988 #ifdef TUNNEL_SINK
1989 if (u->sink)
1990 pa_sink_unlink(u->sink);
1991 #else
1992 if (u->source)
1993 pa_source_unlink(u->source);
1994 #endif
1995
1996 if (u->thread) {
1997 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1998 pa_thread_free(u->thread);
1999 }
2000
2001 pa_thread_mq_done(&u->thread_mq);
2002
2003 #ifdef TUNNEL_SINK
2004 if (u->sink)
2005 pa_sink_unref(u->sink);
2006 #else
2007 if (u->source)
2008 pa_source_unref(u->source);
2009 #endif
2010
2011 if (u->rtpoll)
2012 pa_rtpoll_free(u->rtpoll);
2013
2014 if (u->pstream) {
2015 pa_pstream_unlink(u->pstream);
2016 pa_pstream_unref(u->pstream);
2017 }
2018
2019 if (u->pdispatch)
2020 pa_pdispatch_unref(u->pdispatch);
2021
2022 if (u->client)
2023 pa_socket_client_unref(u->client);
2024
2025 if (u->auth_cookie)
2026 pa_auth_cookie_unref(u->auth_cookie);
2027
2028 if (u->smoother)
2029 pa_smoother_free(u->smoother);
2030
2031 if (u->time_event)
2032 u->core->mainloop->time_free(u->time_event);
2033
2034 #ifdef TUNNEL_SINK
2035 pa_xfree(u->sink_name);
2036 #else
2037 pa_xfree(u->source_name);
2038 #endif
2039 pa_xfree(u->server_name);
2040
2041 pa_xfree(u->device_description);
2042 pa_xfree(u->server_fqdn);
2043 pa_xfree(u->user_name);
2044
2045 pa_xfree(u);
2046 }