]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
Merge branch 'master' into dbus-work
[pulseaudio] / src / modules / module-tunnel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58
59 #ifdef TUNNEL_SINK
60 #include "module-tunnel-sink-symdef.h"
61 #else
62 #include "module-tunnel-source-symdef.h"
63 #endif
64
65 #ifdef TUNNEL_SINK
66 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 PA_MODULE_USAGE(
68 "sink_name=<name for the local sink> "
69 "sink_properties=<properties for the local sink> "
70 "server=<address> "
71 "sink=<remote sink name> "
72 "cookie=<filename> "
73 "format=<sample format> "
74 "channels=<number of channels> "
75 "rate=<sample rate> "
76 "channel_map=<channel map>");
77 #else
78 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 PA_MODULE_USAGE(
80 "source_name=<name for the local source> "
81 "source_properties=<properties for the local source> "
82 "server=<address> "
83 "source=<remote source name> "
84 "cookie=<filename> "
85 "format=<sample format> "
86 "channels=<number of channels> "
87 "rate=<sample rate> "
88 "channel_map=<channel map>");
89 #endif
90
91 PA_MODULE_AUTHOR("Lennart Poettering");
92 PA_MODULE_VERSION(PACKAGE_VERSION);
93 PA_MODULE_LOAD_ONCE(FALSE);
94
95 static const char* const valid_modargs[] = {
96 "server",
97 "cookie",
98 "format",
99 "channels",
100 "rate",
101 #ifdef TUNNEL_SINK
102 "sink_name",
103 "sink_properties",
104 "sink",
105 #else
106 "source_name",
107 "source_properties",
108 "source",
109 #endif
110 "channel_map",
111 NULL,
112 };
113
114 #define DEFAULT_TIMEOUT 5
115
116 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
117
118 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
119
120 #ifdef TUNNEL_SINK
121
122 enum {
123 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
124 SINK_MESSAGE_REMOTE_SUSPEND,
125 SINK_MESSAGE_UPDATE_LATENCY,
126 SINK_MESSAGE_POST
127 };
128
129 #define DEFAULT_TLENGTH_MSEC 150
130 #define DEFAULT_MINREQ_MSEC 25
131
132 #else
133
134 enum {
135 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
136 SOURCE_MESSAGE_REMOTE_SUSPEND,
137 SOURCE_MESSAGE_UPDATE_LATENCY
138 };
139
140 #define DEFAULT_FRAGSIZE_MSEC 25
141
142 #endif
143
144 #ifdef TUNNEL_SINK
145 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 #endif
148 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155
156 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
157 #ifdef TUNNEL_SINK
158 [PA_COMMAND_REQUEST] = command_request,
159 [PA_COMMAND_STARTED] = command_started,
160 #endif
161 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
162 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
163 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
164 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
165 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
166 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
167 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
168 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
169 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
170 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
171 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
173 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
174 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
175 };
176
177 struct userdata {
178 pa_core *core;
179 pa_module *module;
180
181 pa_thread_mq thread_mq;
182 pa_rtpoll *rtpoll;
183 pa_thread *thread;
184
185 pa_socket_client *client;
186 pa_pstream *pstream;
187 pa_pdispatch *pdispatch;
188
189 char *server_name;
190 #ifdef TUNNEL_SINK
191 char *sink_name;
192 pa_sink *sink;
193 size_t requested_bytes;
194 #else
195 char *source_name;
196 pa_source *source;
197 #endif
198
199 pa_auth_cookie *auth_cookie;
200
201 uint32_t version;
202 uint32_t ctag;
203 uint32_t device_index;
204 uint32_t channel;
205
206 int64_t counter, counter_delta;
207
208 pa_bool_t remote_corked:1;
209 pa_bool_t remote_suspended:1;
210
211 pa_usec_t transport_usec; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
213
214 uint32_t ignore_latency_before;
215
216 pa_time_event *time_event;
217
218 pa_smoother *smoother;
219
220 char *device_description;
221 char *server_fqdn;
222 char *user_name;
223
224 uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226 uint32_t tlength;
227 uint32_t minreq;
228 uint32_t prebuf;
229 #else
230 uint32_t fragsize;
231 #endif
232 };
233
234 static void request_latency(struct userdata *u);
235
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238 pa_log_debug("Got stream or client event.");
239 }
240
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
243 struct userdata *u = userdata;
244
245 pa_assert(pd);
246 pa_assert(t);
247 pa_assert(u);
248 pa_assert(u->pdispatch == pd);
249
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u->module, TRUE);
252 }
253
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
256 struct userdata *u = userdata;
257
258 pa_assert(pd);
259 pa_assert(t);
260 pa_assert(u);
261 pa_assert(u->pdispatch == pd);
262
263 pa_log_info("Server signalled buffer overrun/underrun.");
264 request_latency(u);
265 }
266
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
269 struct userdata *u = userdata;
270 uint32_t channel;
271 pa_bool_t suspended;
272
273 pa_assert(pd);
274 pa_assert(t);
275 pa_assert(u);
276 pa_assert(u->pdispatch == pd);
277
278 if (pa_tagstruct_getu32(t, &channel) < 0 ||
279 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280 !pa_tagstruct_eof(t)) {
281
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u->module, TRUE);
284 return;
285 }
286
287 pa_log_debug("Server reports device suspend.");
288
289 #ifdef TUNNEL_SINK
290 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
294
295 request_latency(u);
296 }
297
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
300 struct userdata *u = userdata;
301 uint32_t channel, di;
302 const char *dn;
303 pa_bool_t suspended;
304
305 pa_assert(pd);
306 pa_assert(t);
307 pa_assert(u);
308 pa_assert(u->pdispatch == pd);
309
310 if (pa_tagstruct_getu32(t, &channel) < 0 ||
311 pa_tagstruct_getu32(t, &di) < 0 ||
312 pa_tagstruct_gets(t, &dn) < 0 ||
313 pa_tagstruct_get_boolean(t, &suspended) < 0) {
314
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u->module, TRUE);
317 return;
318 }
319
320 pa_log_debug("Server reports a stream move.");
321
322 #ifdef TUNNEL_SINK
323 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
327
328 request_latency(u);
329 }
330
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332 struct userdata *u = userdata;
333 uint32_t channel, maxlength, tlength, fragsize, prebuf, minreq;
334 pa_usec_t usec;
335
336 pa_assert(pd);
337 pa_assert(t);
338 pa_assert(u);
339 pa_assert(u->pdispatch == pd);
340
341 if (pa_tagstruct_getu32(t, &channel) < 0 ||
342 pa_tagstruct_getu32(t, &maxlength) < 0) {
343
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u->module, TRUE);
346 return;
347 }
348
349 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351 pa_tagstruct_get_usec(t, &usec) < 0) {
352
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u->module, TRUE);
355 return;
356 }
357 } else {
358 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359 pa_tagstruct_getu32(t, &prebuf) < 0 ||
360 pa_tagstruct_getu32(t, &minreq) < 0 ||
361 pa_tagstruct_get_usec(t, &usec) < 0) {
362
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u->module, TRUE);
365 return;
366 }
367 }
368
369 #ifdef TUNNEL_SINK
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
372
373 request_latency(u);
374 }
375
376 #ifdef TUNNEL_SINK
377
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380 struct userdata *u = userdata;
381
382 pa_assert(pd);
383 pa_assert(t);
384 pa_assert(u);
385 pa_assert(u->pdispatch == pd);
386
387 pa_log_debug("Server reports playback started.");
388 request_latency(u);
389 }
390
391 #endif
392
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
395 pa_usec_t x;
396
397 pa_assert(u);
398
399 x = pa_rtclock_now();
400
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
404
405 if (past)
406 x -= u->thread_transport_usec;
407 else
408 x += u->thread_transport_usec;
409
410 if (u->remote_suspended || u->remote_corked)
411 pa_smoother_pause(u->smoother, x);
412 else
413 pa_smoother_resume(u->smoother, x, TRUE);
414 }
415
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
418 pa_assert(u);
419
420 if (u->remote_corked == cork)
421 return;
422
423 u->remote_corked = cork;
424 check_smoother_status(u, FALSE);
425 }
426
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, pa_bool_t cork) {
429 pa_tagstruct *t;
430 pa_assert(u);
431
432 if (!u->pstream)
433 return;
434
435 t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441 pa_tagstruct_putu32(t, u->ctag++);
442 pa_tagstruct_putu32(t, u->channel);
443 pa_tagstruct_put_boolean(t, !!cork);
444 pa_pstream_send_tagstruct(u->pstream, t);
445
446 request_latency(u);
447 }
448
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
451 pa_assert(u);
452
453 if (u->remote_suspended == suspend)
454 return;
455
456 u->remote_suspended = suspend;
457 check_smoother_status(u, TRUE);
458 }
459
460 #ifdef TUNNEL_SINK
461
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464 pa_assert(u);
465
466 while (u->requested_bytes > 0) {
467 pa_memchunk memchunk;
468
469 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471 pa_memblock_unref(memchunk.memblock);
472
473 u->requested_bytes -= memchunk.length;
474
475 u->counter += (int64_t) memchunk.length;
476 }
477 }
478
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481 struct userdata *u = PA_SINK(o)->userdata;
482
483 switch (code) {
484
485 case PA_SINK_MESSAGE_SET_STATE: {
486 int r;
487
488 /* First, change the state, because otherwide pa_sink_render() would fail */
489 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
490
491 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
492
493 if (PA_SINK_IS_OPENED(u->sink->state))
494 send_data(u);
495 }
496
497 return r;
498 }
499
500 case PA_SINK_MESSAGE_GET_LATENCY: {
501 pa_usec_t yl, yr, *usec = data;
502
503 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
505
506 *usec = yl > yr ? yl - yr : 0;
507 return 0;
508 }
509
510 case SINK_MESSAGE_REQUEST:
511
512 pa_assert(offset > 0);
513 u->requested_bytes += (size_t) offset;
514
515 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516 send_data(u);
517
518 return 0;
519
520
521 case SINK_MESSAGE_REMOTE_SUSPEND:
522
523 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
524 return 0;
525
526
527 case SINK_MESSAGE_UPDATE_LATENCY: {
528 pa_usec_t y;
529
530 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
531
532 if (y > (pa_usec_t) offset)
533 y -= (pa_usec_t) offset;
534 else
535 y = 0;
536
537 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
538
539 /* We can access this freely here, since the main thread is waiting for us */
540 u->thread_transport_usec = u->transport_usec;
541
542 return 0;
543 }
544
545 case SINK_MESSAGE_POST:
546
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
551
552 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
553
554 u->counter_delta += (int64_t) chunk->length;
555
556 return 0;
557 }
558
559 return pa_sink_process_msg(o, code, data, offset, chunk);
560 }
561
562 /* Called from main context */
563 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
564 struct userdata *u;
565 pa_sink_assert_ref(s);
566 u = s->userdata;
567
568 switch ((pa_sink_state_t) state) {
569
570 case PA_SINK_SUSPENDED:
571 pa_assert(PA_SINK_IS_OPENED(s->state));
572 stream_cork(u, TRUE);
573 break;
574
575 case PA_SINK_IDLE:
576 case PA_SINK_RUNNING:
577 if (s->state == PA_SINK_SUSPENDED)
578 stream_cork(u, FALSE);
579 break;
580
581 case PA_SINK_UNLINKED:
582 case PA_SINK_INIT:
583 case PA_SINK_INVALID_STATE:
584 ;
585 }
586
587 return 0;
588 }
589
590 #else
591
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
594 struct userdata *u = PA_SOURCE(o)->userdata;
595
596 switch (code) {
597
598 case PA_SOURCE_MESSAGE_SET_STATE: {
599 int r;
600
601 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
602 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
603
604 return r;
605 }
606
607 case PA_SOURCE_MESSAGE_GET_LATENCY: {
608 pa_usec_t yr, yl, *usec = data;
609
610 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
611 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
612
613 *usec = yr > yl ? yr - yl : 0;
614 return 0;
615 }
616
617 case SOURCE_MESSAGE_POST:
618
619 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
620 pa_source_post(u->source, chunk);
621
622 u->counter += (int64_t) chunk->length;
623
624 return 0;
625
626 case SOURCE_MESSAGE_REMOTE_SUSPEND:
627
628 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
629 return 0;
630
631 case SOURCE_MESSAGE_UPDATE_LATENCY: {
632 pa_usec_t y;
633
634 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
635 y += (pa_usec_t) offset;
636
637 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
638
639 /* We can access this freely here, since the main thread is waiting for us */
640 u->thread_transport_usec = u->transport_usec;
641
642 return 0;
643 }
644 }
645
646 return pa_source_process_msg(o, code, data, offset, chunk);
647 }
648
649 /* Called from main context */
650 static int source_set_state(pa_source *s, pa_source_state_t state) {
651 struct userdata *u;
652 pa_source_assert_ref(s);
653 u = s->userdata;
654
655 switch ((pa_source_state_t) state) {
656
657 case PA_SOURCE_SUSPENDED:
658 pa_assert(PA_SOURCE_IS_OPENED(s->state));
659 stream_cork(u, TRUE);
660 break;
661
662 case PA_SOURCE_IDLE:
663 case PA_SOURCE_RUNNING:
664 if (s->state == PA_SOURCE_SUSPENDED)
665 stream_cork(u, FALSE);
666 break;
667
668 case PA_SOURCE_UNLINKED:
669 case PA_SOURCE_INIT:
670 case PA_SINK_INVALID_STATE:
671 ;
672 }
673
674 return 0;
675 }
676
677 #endif
678
679 static void thread_func(void *userdata) {
680 struct userdata *u = userdata;
681
682 pa_assert(u);
683
684 pa_log_debug("Thread starting up");
685
686 pa_thread_mq_install(&u->thread_mq);
687
688 for (;;) {
689 int ret;
690
691 #ifdef TUNNEL_SINK
692 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
693 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
694 pa_sink_process_rewind(u->sink, 0);
695 #endif
696
697 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
698 goto fail;
699
700 if (ret == 0)
701 goto finish;
702 }
703
704 fail:
705 /* If this was no regular exit from the loop we have to continue
706 * processing messages until we received PA_MESSAGE_SHUTDOWN */
707 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
708 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
709
710 finish:
711 pa_log_debug("Thread shutting down");
712 }
713
714 #ifdef TUNNEL_SINK
715 /* Called from main context */
716 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717 struct userdata *u = userdata;
718 uint32_t bytes, channel;
719
720 pa_assert(pd);
721 pa_assert(command == PA_COMMAND_REQUEST);
722 pa_assert(t);
723 pa_assert(u);
724 pa_assert(u->pdispatch == pd);
725
726 if (pa_tagstruct_getu32(t, &channel) < 0 ||
727 pa_tagstruct_getu32(t, &bytes) < 0) {
728 pa_log("Invalid protocol reply");
729 goto fail;
730 }
731
732 if (channel != u->channel) {
733 pa_log("Received data for invalid channel");
734 goto fail;
735 }
736
737 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
738 return;
739
740 fail:
741 pa_module_unload_request(u->module, TRUE);
742 }
743
744 #endif
745
746 /* Called from main context */
747 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
748 struct userdata *u = userdata;
749 pa_usec_t sink_usec, source_usec;
750 pa_bool_t playing;
751 int64_t write_index, read_index;
752 struct timeval local, remote, now;
753 pa_sample_spec *ss;
754 int64_t delay;
755
756 pa_assert(pd);
757 pa_assert(u);
758
759 if (command != PA_COMMAND_REPLY) {
760 if (command == PA_COMMAND_ERROR)
761 pa_log("Failed to get latency.");
762 else
763 pa_log("Protocol error.");
764 goto fail;
765 }
766
767 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
768 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
769 pa_tagstruct_get_boolean(t, &playing) < 0 ||
770 pa_tagstruct_get_timeval(t, &local) < 0 ||
771 pa_tagstruct_get_timeval(t, &remote) < 0 ||
772 pa_tagstruct_gets64(t, &write_index) < 0 ||
773 pa_tagstruct_gets64(t, &read_index) < 0) {
774 pa_log("Invalid reply.");
775 goto fail;
776 }
777
778 #ifdef TUNNEL_SINK
779 if (u->version >= 13) {
780 uint64_t underrun_for = 0, playing_for = 0;
781
782 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
783 pa_tagstruct_getu64(t, &playing_for) < 0) {
784 pa_log("Invalid reply.");
785 goto fail;
786 }
787 }
788 #endif
789
790 if (!pa_tagstruct_eof(t)) {
791 pa_log("Invalid reply.");
792 goto fail;
793 }
794
795 if (tag < u->ignore_latency_before) {
796 return;
797 }
798
799 pa_gettimeofday(&now);
800
801 /* Calculate transport usec */
802 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
803 /* local and remote seem to have synchronized clocks */
804 #ifdef TUNNEL_SINK
805 u->transport_usec = pa_timeval_diff(&remote, &local);
806 #else
807 u->transport_usec = pa_timeval_diff(&now, &remote);
808 #endif
809 } else
810 u->transport_usec = pa_timeval_diff(&now, &local)/2;
811
812 /* First, take the device's delay */
813 #ifdef TUNNEL_SINK
814 delay = (int64_t) sink_usec;
815 ss = &u->sink->sample_spec;
816 #else
817 delay = (int64_t) source_usec;
818 ss = &u->source->sample_spec;
819 #endif
820
821 /* Add the length of our server-side buffer */
822 if (write_index >= read_index)
823 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
824 else
825 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
826
827 /* Our measurements are already out of date, hence correct by the *
828 * transport latency */
829 #ifdef TUNNEL_SINK
830 delay -= (int64_t) u->transport_usec;
831 #else
832 delay += (int64_t) u->transport_usec;
833 #endif
834
835 /* Now correct by what we have have read/written since we requested the update */
836 #ifdef TUNNEL_SINK
837 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
838 #else
839 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
840 #endif
841
842 #ifdef TUNNEL_SINK
843 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
844 #else
845 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
846 #endif
847
848 return;
849
850 fail:
851
852 pa_module_unload_request(u->module, TRUE);
853 }
854
855 /* Called from main context */
856 static void request_latency(struct userdata *u) {
857 pa_tagstruct *t;
858 struct timeval now;
859 uint32_t tag;
860 pa_assert(u);
861
862 t = pa_tagstruct_new(NULL, 0);
863 #ifdef TUNNEL_SINK
864 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
865 #else
866 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
867 #endif
868 pa_tagstruct_putu32(t, tag = u->ctag++);
869 pa_tagstruct_putu32(t, u->channel);
870
871 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
872
873 pa_pstream_send_tagstruct(u->pstream, t);
874 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
875
876 u->ignore_latency_before = tag;
877 u->counter_delta = 0;
878 }
879
880 /* Called from main context */
881 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
882 struct userdata *u = userdata;
883
884 pa_assert(m);
885 pa_assert(e);
886 pa_assert(u);
887
888 request_latency(u);
889
890 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
891 }
892
893 /* Called from main context */
894 static void update_description(struct userdata *u) {
895 char *d;
896 char un[128], hn[128];
897 pa_tagstruct *t;
898
899 pa_assert(u);
900
901 if (!u->server_fqdn || !u->user_name || !u->device_description)
902 return;
903
904 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
905
906 #ifdef TUNNEL_SINK
907 pa_sink_set_description(u->sink, d);
908 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
909 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
910 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
911 #else
912 pa_source_set_description(u->source, d);
913 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
914 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
915 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
916 #endif
917
918 pa_xfree(d);
919
920 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
921 pa_get_user_name(un, sizeof(un)),
922 pa_get_host_name(hn, sizeof(hn)));
923
924 t = pa_tagstruct_new(NULL, 0);
925 #ifdef TUNNEL_SINK
926 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
927 #else
928 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
929 #endif
930 pa_tagstruct_putu32(t, u->ctag++);
931 pa_tagstruct_putu32(t, u->channel);
932 pa_tagstruct_puts(t, d);
933 pa_pstream_send_tagstruct(u->pstream, t);
934
935 pa_xfree(d);
936 }
937
938 /* Called from main context */
939 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
940 struct userdata *u = userdata;
941 pa_sample_spec ss;
942 pa_channel_map cm;
943 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
944 uint32_t cookie;
945
946 pa_assert(pd);
947 pa_assert(u);
948
949 if (command != PA_COMMAND_REPLY) {
950 if (command == PA_COMMAND_ERROR)
951 pa_log("Failed to get info.");
952 else
953 pa_log("Protocol error.");
954 goto fail;
955 }
956
957 if (pa_tagstruct_gets(t, &server_name) < 0 ||
958 pa_tagstruct_gets(t, &server_version) < 0 ||
959 pa_tagstruct_gets(t, &user_name) < 0 ||
960 pa_tagstruct_gets(t, &host_name) < 0 ||
961 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
962 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
963 pa_tagstruct_gets(t, &default_source_name) < 0 ||
964 pa_tagstruct_getu32(t, &cookie) < 0 ||
965 (u->version >= 15 &&
966 pa_tagstruct_get_channel_map(t, &cm) < 0)) {
967
968 pa_log("Parse failure");
969 goto fail;
970 }
971
972 if (!pa_tagstruct_eof(t)) {
973 pa_log("Packet too long");
974 goto fail;
975 }
976
977 pa_xfree(u->server_fqdn);
978 u->server_fqdn = pa_xstrdup(host_name);
979
980 pa_xfree(u->user_name);
981 u->user_name = pa_xstrdup(user_name);
982
983 update_description(u);
984
985 return;
986
987 fail:
988 pa_module_unload_request(u->module, TRUE);
989 }
990
991 #ifdef TUNNEL_SINK
992
993 /* Called from main context */
994 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
995 struct userdata *u = userdata;
996 uint32_t idx, owner_module, monitor_source, flags;
997 const char *name, *description, *monitor_source_name, *driver;
998 pa_sample_spec ss;
999 pa_channel_map cm;
1000 pa_cvolume volume;
1001 pa_bool_t mute;
1002 pa_usec_t latency;
1003 pa_proplist *pl;
1004
1005 pa_assert(pd);
1006 pa_assert(u);
1007
1008 pl = pa_proplist_new();
1009
1010 if (command != PA_COMMAND_REPLY) {
1011 if (command == PA_COMMAND_ERROR)
1012 pa_log("Failed to get info.");
1013 else
1014 pa_log("Protocol error.");
1015 goto fail;
1016 }
1017
1018 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1019 pa_tagstruct_gets(t, &name) < 0 ||
1020 pa_tagstruct_gets(t, &description) < 0 ||
1021 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1022 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1023 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1024 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1025 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1026 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1027 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1028 pa_tagstruct_get_usec(t, &latency) < 0 ||
1029 pa_tagstruct_gets(t, &driver) < 0 ||
1030 pa_tagstruct_getu32(t, &flags) < 0) {
1031
1032 pa_log("Parse failure");
1033 goto fail;
1034 }
1035
1036 if (u->version >= 13) {
1037 pa_usec_t configured_latency;
1038
1039 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1040 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1041
1042 pa_log("Parse failure");
1043 goto fail;
1044 }
1045 }
1046
1047 if (u->version >= 15) {
1048 pa_volume_t base_volume;
1049 uint32_t state, n_volume_steps, card;
1050
1051 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1052 pa_tagstruct_getu32(t, &state) < 0 ||
1053 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1054 pa_tagstruct_getu32(t, &card) < 0) {
1055
1056 pa_log("Parse failure");
1057 goto fail;
1058 }
1059 }
1060
1061 if (!pa_tagstruct_eof(t)) {
1062 pa_log("Packet too long");
1063 goto fail;
1064 }
1065
1066 pa_proplist_free(pl);
1067
1068 if (!u->sink_name || strcmp(name, u->sink_name))
1069 return;
1070
1071 pa_xfree(u->device_description);
1072 u->device_description = pa_xstrdup(description);
1073
1074 update_description(u);
1075
1076 return;
1077
1078 fail:
1079 pa_module_unload_request(u->module, TRUE);
1080 pa_proplist_free(pl);
1081 }
1082
1083 /* Called from main context */
1084 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1085 struct userdata *u = userdata;
1086 uint32_t idx, owner_module, client, sink;
1087 pa_usec_t buffer_usec, sink_usec;
1088 const char *name, *driver, *resample_method;
1089 pa_bool_t mute;
1090 pa_sample_spec sample_spec;
1091 pa_channel_map channel_map;
1092 pa_cvolume volume;
1093 pa_proplist *pl;
1094
1095 pa_assert(pd);
1096 pa_assert(u);
1097
1098 pl = pa_proplist_new();
1099
1100 if (command != PA_COMMAND_REPLY) {
1101 if (command == PA_COMMAND_ERROR)
1102 pa_log("Failed to get info.");
1103 else
1104 pa_log("Protocol error.");
1105 goto fail;
1106 }
1107
1108 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1109 pa_tagstruct_gets(t, &name) < 0 ||
1110 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1111 pa_tagstruct_getu32(t, &client) < 0 ||
1112 pa_tagstruct_getu32(t, &sink) < 0 ||
1113 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1114 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1115 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1116 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1117 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1118 pa_tagstruct_gets(t, &resample_method) < 0 ||
1119 pa_tagstruct_gets(t, &driver) < 0) {
1120
1121 pa_log("Parse failure");
1122 goto fail;
1123 }
1124
1125 if (u->version >= 11) {
1126 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1127
1128 pa_log("Parse failure");
1129 goto fail;
1130 }
1131 }
1132
1133 if (u->version >= 13) {
1134 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1135
1136 pa_log("Parse failure");
1137 goto fail;
1138 }
1139 }
1140
1141 if (!pa_tagstruct_eof(t)) {
1142 pa_log("Packet too long");
1143 goto fail;
1144 }
1145
1146 pa_proplist_free(pl);
1147
1148 if (idx != u->device_index)
1149 return;
1150
1151 pa_assert(u->sink);
1152
1153 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1154 pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1155 return;
1156
1157 pa_sink_volume_changed(u->sink, &volume, FALSE);
1158
1159 if (u->version >= 11)
1160 pa_sink_mute_changed(u->sink, mute, FALSE);
1161
1162 return;
1163
1164 fail:
1165 pa_module_unload_request(u->module, TRUE);
1166 pa_proplist_free(pl);
1167 }
1168
1169 #else
1170
1171 /* Called from main context */
1172 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1173 struct userdata *u = userdata;
1174 uint32_t idx, owner_module, monitor_of_sink, flags;
1175 const char *name, *description, *monitor_of_sink_name, *driver;
1176 pa_sample_spec ss;
1177 pa_channel_map cm;
1178 pa_cvolume volume;
1179 pa_bool_t mute;
1180 pa_usec_t latency, configured_latency;
1181 pa_proplist *pl;
1182
1183 pa_assert(pd);
1184 pa_assert(u);
1185
1186 pl = pa_proplist_new();
1187
1188 if (command != PA_COMMAND_REPLY) {
1189 if (command == PA_COMMAND_ERROR)
1190 pa_log("Failed to get info.");
1191 else
1192 pa_log("Protocol error.");
1193 goto fail;
1194 }
1195
1196 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1197 pa_tagstruct_gets(t, &name) < 0 ||
1198 pa_tagstruct_gets(t, &description) < 0 ||
1199 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1200 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1201 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1202 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1203 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1204 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1205 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1206 pa_tagstruct_get_usec(t, &latency) < 0 ||
1207 pa_tagstruct_gets(t, &driver) < 0 ||
1208 pa_tagstruct_getu32(t, &flags) < 0) {
1209
1210 pa_log("Parse failure");
1211 goto fail;
1212 }
1213
1214 if (u->version >= 13) {
1215 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1216 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1217
1218 pa_log("Parse failure");
1219 goto fail;
1220 }
1221 }
1222
1223 if (u->version >= 15) {
1224 pa_volume_t base_volume;
1225 uint32_t state, n_volume_steps, card;
1226
1227 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1228 pa_tagstruct_getu32(t, &state) < 0 ||
1229 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1230 pa_tagstruct_getu32(t, &card) < 0) {
1231
1232 pa_log("Parse failure");
1233 goto fail;
1234 }
1235 }
1236
1237 if (!pa_tagstruct_eof(t)) {
1238 pa_log("Packet too long");
1239 goto fail;
1240 }
1241
1242 pa_proplist_free(pl);
1243
1244 if (!u->source_name || strcmp(name, u->source_name))
1245 return;
1246
1247 pa_xfree(u->device_description);
1248 u->device_description = pa_xstrdup(description);
1249
1250 update_description(u);
1251
1252 return;
1253
1254 fail:
1255 pa_module_unload_request(u->module, TRUE);
1256 pa_proplist_free(pl);
1257 }
1258
1259 #endif
1260
1261 /* Called from main context */
1262 static void request_info(struct userdata *u) {
1263 pa_tagstruct *t;
1264 uint32_t tag;
1265 pa_assert(u);
1266
1267 t = pa_tagstruct_new(NULL, 0);
1268 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1269 pa_tagstruct_putu32(t, tag = u->ctag++);
1270 pa_pstream_send_tagstruct(u->pstream, t);
1271 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1272
1273 #ifdef TUNNEL_SINK
1274 t = pa_tagstruct_new(NULL, 0);
1275 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1276 pa_tagstruct_putu32(t, tag = u->ctag++);
1277 pa_tagstruct_putu32(t, u->device_index);
1278 pa_pstream_send_tagstruct(u->pstream, t);
1279 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1280
1281 if (u->sink_name) {
1282 t = pa_tagstruct_new(NULL, 0);
1283 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1284 pa_tagstruct_putu32(t, tag = u->ctag++);
1285 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1286 pa_tagstruct_puts(t, u->sink_name);
1287 pa_pstream_send_tagstruct(u->pstream, t);
1288 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1289 }
1290 #else
1291 if (u->source_name) {
1292 t = pa_tagstruct_new(NULL, 0);
1293 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1294 pa_tagstruct_putu32(t, tag = u->ctag++);
1295 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1296 pa_tagstruct_puts(t, u->source_name);
1297 pa_pstream_send_tagstruct(u->pstream, t);
1298 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1299 }
1300 #endif
1301 }
1302
1303 /* Called from main context */
1304 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1305 struct userdata *u = userdata;
1306 pa_subscription_event_type_t e;
1307 uint32_t idx;
1308
1309 pa_assert(pd);
1310 pa_assert(t);
1311 pa_assert(u);
1312 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1313
1314 if (pa_tagstruct_getu32(t, &e) < 0 ||
1315 pa_tagstruct_getu32(t, &idx) < 0) {
1316 pa_log("Invalid protocol reply");
1317 pa_module_unload_request(u->module, TRUE);
1318 return;
1319 }
1320
1321 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1322 #ifdef TUNNEL_SINK
1323 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1324 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1325 #else
1326 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1327 #endif
1328 )
1329 return;
1330
1331 request_info(u);
1332 }
1333
1334 /* Called from main context */
1335 static void start_subscribe(struct userdata *u) {
1336 pa_tagstruct *t;
1337 uint32_t tag;
1338 pa_assert(u);
1339
1340 t = pa_tagstruct_new(NULL, 0);
1341 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1342 pa_tagstruct_putu32(t, tag = u->ctag++);
1343 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1344 #ifdef TUNNEL_SINK
1345 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1346 #else
1347 PA_SUBSCRIPTION_MASK_SOURCE
1348 #endif
1349 );
1350
1351 pa_pstream_send_tagstruct(u->pstream, t);
1352 }
1353
1354 /* Called from main context */
1355 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1356 struct userdata *u = userdata;
1357 #ifdef TUNNEL_SINK
1358 uint32_t bytes;
1359 #endif
1360
1361 pa_assert(pd);
1362 pa_assert(u);
1363 pa_assert(u->pdispatch == pd);
1364
1365 if (command != PA_COMMAND_REPLY) {
1366 if (command == PA_COMMAND_ERROR)
1367 pa_log("Failed to create stream.");
1368 else
1369 pa_log("Protocol error.");
1370 goto fail;
1371 }
1372
1373 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1374 pa_tagstruct_getu32(t, &u->device_index) < 0
1375 #ifdef TUNNEL_SINK
1376 || pa_tagstruct_getu32(t, &bytes) < 0
1377 #endif
1378 )
1379 goto parse_error;
1380
1381 if (u->version >= 9) {
1382 #ifdef TUNNEL_SINK
1383 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1384 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1385 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1386 pa_tagstruct_getu32(t, &u->minreq) < 0)
1387 goto parse_error;
1388 #else
1389 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1390 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1391 goto parse_error;
1392 #endif
1393 }
1394
1395 if (u->version >= 12) {
1396 pa_sample_spec ss;
1397 pa_channel_map cm;
1398 uint32_t device_index;
1399 const char *dn;
1400 pa_bool_t suspended;
1401
1402 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1403 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1404 pa_tagstruct_getu32(t, &device_index) < 0 ||
1405 pa_tagstruct_gets(t, &dn) < 0 ||
1406 pa_tagstruct_get_boolean(t, &suspended) < 0)
1407 goto parse_error;
1408
1409 #ifdef TUNNEL_SINK
1410 pa_xfree(u->sink_name);
1411 u->sink_name = pa_xstrdup(dn);
1412 #else
1413 pa_xfree(u->source_name);
1414 u->source_name = pa_xstrdup(dn);
1415 #endif
1416 }
1417
1418 if (u->version >= 13) {
1419 pa_usec_t usec;
1420
1421 if (pa_tagstruct_get_usec(t, &usec) < 0)
1422 goto parse_error;
1423
1424 /* #ifdef TUNNEL_SINK */
1425 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1426 /* #else */
1427 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1428 /* #endif */
1429 }
1430
1431 if (!pa_tagstruct_eof(t))
1432 goto parse_error;
1433
1434 start_subscribe(u);
1435 request_info(u);
1436
1437 pa_assert(!u->time_event);
1438 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1439
1440 request_latency(u);
1441
1442 pa_log_debug("Stream created.");
1443
1444 #ifdef TUNNEL_SINK
1445 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1446 #endif
1447
1448 return;
1449
1450 parse_error:
1451 pa_log("Invalid reply. (Create stream)");
1452
1453 fail:
1454 pa_module_unload_request(u->module, TRUE);
1455
1456 }
1457
1458 /* Called from main context */
1459 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1460 struct userdata *u = userdata;
1461 pa_tagstruct *reply;
1462 char name[256], un[128], hn[128];
1463 #ifdef TUNNEL_SINK
1464 pa_cvolume volume;
1465 #endif
1466
1467 pa_assert(pd);
1468 pa_assert(u);
1469 pa_assert(u->pdispatch == pd);
1470
1471 if (command != PA_COMMAND_REPLY ||
1472 pa_tagstruct_getu32(t, &u->version) < 0 ||
1473 !pa_tagstruct_eof(t)) {
1474
1475 if (command == PA_COMMAND_ERROR)
1476 pa_log("Failed to authenticate");
1477 else
1478 pa_log("Protocol error.");
1479
1480 goto fail;
1481 }
1482
1483 /* Minimum supported protocol version */
1484 if (u->version < 8) {
1485 pa_log("Incompatible protocol version");
1486 goto fail;
1487 }
1488
1489 /* Starting with protocol version 13 the MSB of the version tag
1490 reflects if shm is enabled for this connection or not. We don't
1491 support SHM here at all, so we just ignore this. */
1492
1493 if (u->version >= 13)
1494 u->version &= 0x7FFFFFFFU;
1495
1496 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1497
1498 #ifdef TUNNEL_SINK
1499 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1500 pa_sink_update_proplist(u->sink, 0, NULL);
1501
1502 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1503 u->sink_name,
1504 pa_get_user_name(un, sizeof(un)),
1505 pa_get_host_name(hn, sizeof(hn)));
1506 #else
1507 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1508 pa_source_update_proplist(u->source, 0, NULL);
1509
1510 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1511 u->source_name,
1512 pa_get_user_name(un, sizeof(un)),
1513 pa_get_host_name(hn, sizeof(hn)));
1514 #endif
1515
1516 reply = pa_tagstruct_new(NULL, 0);
1517 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1518 pa_tagstruct_putu32(reply, tag = u->ctag++);
1519
1520 if (u->version >= 13) {
1521 pa_proplist *pl;
1522 pl = pa_proplist_new();
1523 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1524 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1525 pa_init_proplist(pl);
1526 pa_tagstruct_put_proplist(reply, pl);
1527 pa_proplist_free(pl);
1528 } else
1529 pa_tagstruct_puts(reply, "PulseAudio");
1530
1531 pa_pstream_send_tagstruct(u->pstream, reply);
1532 /* We ignore the server's reply here */
1533
1534 reply = pa_tagstruct_new(NULL, 0);
1535
1536 if (u->version < 13)
1537 /* Only for older PA versions we need to fill in the maxlength */
1538 u->maxlength = 4*1024*1024;
1539
1540 #ifdef TUNNEL_SINK
1541 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1542 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1543 u->prebuf = u->tlength;
1544 #else
1545 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1546 #endif
1547
1548 #ifdef TUNNEL_SINK
1549 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1550 pa_tagstruct_putu32(reply, tag = u->ctag++);
1551
1552 if (u->version < 13)
1553 pa_tagstruct_puts(reply, name);
1554
1555 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1556 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1557 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1558 pa_tagstruct_puts(reply, u->sink_name);
1559 pa_tagstruct_putu32(reply, u->maxlength);
1560 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1561 pa_tagstruct_putu32(reply, u->tlength);
1562 pa_tagstruct_putu32(reply, u->prebuf);
1563 pa_tagstruct_putu32(reply, u->minreq);
1564 pa_tagstruct_putu32(reply, 0);
1565 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1566 pa_tagstruct_put_cvolume(reply, &volume);
1567 #else
1568 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1569 pa_tagstruct_putu32(reply, tag = u->ctag++);
1570
1571 if (u->version < 13)
1572 pa_tagstruct_puts(reply, name);
1573
1574 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1575 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1576 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1577 pa_tagstruct_puts(reply, u->source_name);
1578 pa_tagstruct_putu32(reply, u->maxlength);
1579 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1580 pa_tagstruct_putu32(reply, u->fragsize);
1581 #endif
1582
1583 if (u->version >= 12) {
1584 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1585 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1586 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1587 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1588 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1589 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1590 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1591 }
1592
1593 if (u->version >= 13) {
1594 pa_proplist *pl;
1595
1596 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1597 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1598
1599 pl = pa_proplist_new();
1600 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1601 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1602 pa_tagstruct_put_proplist(reply, pl);
1603 pa_proplist_free(pl);
1604
1605 #ifndef TUNNEL_SINK
1606 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1607 #endif
1608 }
1609
1610 if (u->version >= 14) {
1611 #ifdef TUNNEL_SINK
1612 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1613 #endif
1614 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1615 }
1616
1617 if (u->version >= 15) {
1618 #ifdef TUNNEL_SINK
1619 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1620 #endif
1621 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1622 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1623 }
1624
1625 pa_pstream_send_tagstruct(u->pstream, reply);
1626 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1627
1628 pa_log_debug("Connection authenticated, creating stream ...");
1629
1630 return;
1631
1632 fail:
1633 pa_module_unload_request(u->module, TRUE);
1634 }
1635
1636 /* Called from main context */
1637 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1638 struct userdata *u = userdata;
1639
1640 pa_assert(p);
1641 pa_assert(u);
1642
1643 pa_log_warn("Stream died.");
1644 pa_module_unload_request(u->module, TRUE);
1645 }
1646
1647 /* Called from main context */
1648 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1649 struct userdata *u = userdata;
1650
1651 pa_assert(p);
1652 pa_assert(packet);
1653 pa_assert(u);
1654
1655 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1656 pa_log("Invalid packet");
1657 pa_module_unload_request(u->module, TRUE);
1658 return;
1659 }
1660 }
1661
1662 #ifndef TUNNEL_SINK
1663 /* Called from main context */
1664 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1665 struct userdata *u = userdata;
1666
1667 pa_assert(p);
1668 pa_assert(chunk);
1669 pa_assert(u);
1670
1671 if (channel != u->channel) {
1672 pa_log("Received memory block on bad channel.");
1673 pa_module_unload_request(u->module, TRUE);
1674 return;
1675 }
1676
1677 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1678
1679 u->counter_delta += (int64_t) chunk->length;
1680 }
1681 #endif
1682
1683 /* Called from main context */
1684 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1685 struct userdata *u = userdata;
1686 pa_tagstruct *t;
1687 uint32_t tag;
1688
1689 pa_assert(sc);
1690 pa_assert(u);
1691 pa_assert(u->client == sc);
1692
1693 pa_socket_client_unref(u->client);
1694 u->client = NULL;
1695
1696 if (!io) {
1697 pa_log("Connection failed: %s", pa_cstrerror(errno));
1698 pa_module_unload_request(u->module, TRUE);
1699 return;
1700 }
1701
1702 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1703 u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1704
1705 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1706 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1707 #ifndef TUNNEL_SINK
1708 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1709 #endif
1710
1711 t = pa_tagstruct_new(NULL, 0);
1712 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1713 pa_tagstruct_putu32(t, tag = u->ctag++);
1714 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1715
1716 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1717
1718 #ifdef HAVE_CREDS
1719 {
1720 pa_creds ucred;
1721
1722 if (pa_iochannel_creds_supported(io))
1723 pa_iochannel_creds_enable(io);
1724
1725 ucred.uid = getuid();
1726 ucred.gid = getgid();
1727
1728 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1729 }
1730 #else
1731 pa_pstream_send_tagstruct(u->pstream, t);
1732 #endif
1733
1734 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1735
1736 pa_log_debug("Connection established, authenticating ...");
1737 }
1738
1739 #ifdef TUNNEL_SINK
1740
1741 /* Called from main context */
1742 static void sink_set_volume(pa_sink *sink) {
1743 struct userdata *u;
1744 pa_tagstruct *t;
1745 uint32_t tag;
1746
1747 pa_assert(sink);
1748 u = sink->userdata;
1749 pa_assert(u);
1750
1751 t = pa_tagstruct_new(NULL, 0);
1752 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1753 pa_tagstruct_putu32(t, tag = u->ctag++);
1754 pa_tagstruct_putu32(t, u->device_index);
1755 pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1756 pa_pstream_send_tagstruct(u->pstream, t);
1757 }
1758
1759 /* Called from main context */
1760 static void sink_set_mute(pa_sink *sink) {
1761 struct userdata *u;
1762 pa_tagstruct *t;
1763 uint32_t tag;
1764
1765 pa_assert(sink);
1766 u = sink->userdata;
1767 pa_assert(u);
1768
1769 if (u->version < 11)
1770 return;
1771
1772 t = pa_tagstruct_new(NULL, 0);
1773 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1774 pa_tagstruct_putu32(t, tag = u->ctag++);
1775 pa_tagstruct_putu32(t, u->device_index);
1776 pa_tagstruct_put_boolean(t, !!sink->muted);
1777 pa_pstream_send_tagstruct(u->pstream, t);
1778 }
1779
1780 #endif
1781
1782 int pa__init(pa_module*m) {
1783 pa_modargs *ma = NULL;
1784 struct userdata *u = NULL;
1785 pa_sample_spec ss;
1786 pa_channel_map map;
1787 char *dn = NULL;
1788 #ifdef TUNNEL_SINK
1789 pa_sink_new_data data;
1790 #else
1791 pa_source_new_data data;
1792 #endif
1793
1794 pa_assert(m);
1795
1796 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1797 pa_log("Failed to parse module arguments");
1798 goto fail;
1799 }
1800
1801 m->userdata = u = pa_xnew0(struct userdata, 1);
1802 u->core = m->core;
1803 u->module = m;
1804 u->client = NULL;
1805 u->pdispatch = NULL;
1806 u->pstream = NULL;
1807 u->server_name = NULL;
1808 #ifdef TUNNEL_SINK
1809 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1810 u->sink = NULL;
1811 u->requested_bytes = 0;
1812 #else
1813 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1814 u->source = NULL;
1815 #endif
1816 u->smoother = pa_smoother_new(
1817 PA_USEC_PER_SEC,
1818 PA_USEC_PER_SEC*2,
1819 TRUE,
1820 TRUE,
1821 10,
1822 pa_rtclock_now(),
1823 FALSE);
1824 u->ctag = 1;
1825 u->device_index = u->channel = PA_INVALID_INDEX;
1826 u->time_event = NULL;
1827 u->ignore_latency_before = 0;
1828 u->transport_usec = u->thread_transport_usec = 0;
1829 u->remote_suspended = u->remote_corked = FALSE;
1830 u->counter = u->counter_delta = 0;
1831
1832 u->rtpoll = pa_rtpoll_new();
1833 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1834
1835 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1836 goto fail;
1837
1838 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1839 pa_log("No server specified.");
1840 goto fail;
1841 }
1842
1843 ss = m->core->default_sample_spec;
1844 map = m->core->default_channel_map;
1845 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1846 pa_log("Invalid sample format specification");
1847 goto fail;
1848 }
1849
1850 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1851 pa_log("Failed to connect to server '%s'", u->server_name);
1852 goto fail;
1853 }
1854
1855 pa_socket_client_set_callback(u->client, on_connection, u);
1856
1857 #ifdef TUNNEL_SINK
1858
1859 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1860 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1861
1862 pa_sink_new_data_init(&data);
1863 data.driver = __FILE__;
1864 data.module = m;
1865 data.namereg_fail = TRUE;
1866 pa_sink_new_data_set_name(&data, dn);
1867 pa_sink_new_data_set_sample_spec(&data, &ss);
1868 pa_sink_new_data_set_channel_map(&data, &map);
1869 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1870 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1871 if (u->sink_name)
1872 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1873
1874 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1875 pa_log("Invalid properties");
1876 pa_sink_new_data_done(&data);
1877 goto fail;
1878 }
1879
1880 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1881 pa_sink_new_data_done(&data);
1882
1883 if (!u->sink) {
1884 pa_log("Failed to create sink.");
1885 goto fail;
1886 }
1887
1888 u->sink->parent.process_msg = sink_process_msg;
1889 u->sink->userdata = u;
1890 u->sink->set_state = sink_set_state;
1891 u->sink->set_volume = sink_set_volume;
1892 u->sink->set_mute = sink_set_mute;
1893
1894 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1895
1896 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1897
1898 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1899 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1900
1901 #else
1902
1903 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1904 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1905
1906 pa_source_new_data_init(&data);
1907 data.driver = __FILE__;
1908 data.module = m;
1909 data.namereg_fail = TRUE;
1910 pa_source_new_data_set_name(&data, dn);
1911 pa_source_new_data_set_sample_spec(&data, &ss);
1912 pa_source_new_data_set_channel_map(&data, &map);
1913 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1914 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1915 if (u->source_name)
1916 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1917
1918 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1919 pa_log("Invalid properties");
1920 pa_source_new_data_done(&data);
1921 goto fail;
1922 }
1923
1924 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1925 pa_source_new_data_done(&data);
1926
1927 if (!u->source) {
1928 pa_log("Failed to create source.");
1929 goto fail;
1930 }
1931
1932 u->source->parent.process_msg = source_process_msg;
1933 u->source->set_state = source_set_state;
1934 u->source->userdata = u;
1935
1936 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1937
1938 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1939 pa_source_set_rtpoll(u->source, u->rtpoll);
1940 #endif
1941
1942 pa_xfree(dn);
1943
1944 u->time_event = NULL;
1945
1946 u->maxlength = (uint32_t) -1;
1947 #ifdef TUNNEL_SINK
1948 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
1949 #else
1950 u->fragsize = (uint32_t) -1;
1951 #endif
1952
1953 if (!(u->thread = pa_thread_new(thread_func, u))) {
1954 pa_log("Failed to create thread.");
1955 goto fail;
1956 }
1957
1958 #ifdef TUNNEL_SINK
1959 pa_sink_put(u->sink);
1960 #else
1961 pa_source_put(u->source);
1962 #endif
1963
1964 pa_modargs_free(ma);
1965
1966 return 0;
1967
1968 fail:
1969 pa__done(m);
1970
1971 if (ma)
1972 pa_modargs_free(ma);
1973
1974 pa_xfree(dn);
1975
1976 return -1;
1977 }
1978
1979 void pa__done(pa_module*m) {
1980 struct userdata* u;
1981
1982 pa_assert(m);
1983
1984 if (!(u = m->userdata))
1985 return;
1986
1987 #ifdef TUNNEL_SINK
1988 if (u->sink)
1989 pa_sink_unlink(u->sink);
1990 #else
1991 if (u->source)
1992 pa_source_unlink(u->source);
1993 #endif
1994
1995 if (u->thread) {
1996 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1997 pa_thread_free(u->thread);
1998 }
1999
2000 pa_thread_mq_done(&u->thread_mq);
2001
2002 #ifdef TUNNEL_SINK
2003 if (u->sink)
2004 pa_sink_unref(u->sink);
2005 #else
2006 if (u->source)
2007 pa_source_unref(u->source);
2008 #endif
2009
2010 if (u->rtpoll)
2011 pa_rtpoll_free(u->rtpoll);
2012
2013 if (u->pstream) {
2014 pa_pstream_unlink(u->pstream);
2015 pa_pstream_unref(u->pstream);
2016 }
2017
2018 if (u->pdispatch)
2019 pa_pdispatch_unref(u->pdispatch);
2020
2021 if (u->client)
2022 pa_socket_client_unref(u->client);
2023
2024 if (u->auth_cookie)
2025 pa_auth_cookie_unref(u->auth_cookie);
2026
2027 if (u->smoother)
2028 pa_smoother_free(u->smoother);
2029
2030 if (u->time_event)
2031 u->core->mainloop->time_free(u->time_event);
2032
2033 #ifdef TUNNEL_SINK
2034 pa_xfree(u->sink_name);
2035 #else
2036 pa_xfree(u->source_name);
2037 #endif
2038 pa_xfree(u->server_name);
2039
2040 pa_xfree(u->device_description);
2041 pa_xfree(u->server_fqdn);
2042 pa_xfree(u->user_name);
2043
2044 pa_xfree(u);
2045 }