]> code.delx.au - pulseaudio/blob - src/modules/module-tunnel.c
rename stream names too, when the sink name changes
[pulseaudio] / src / modules / module-tunnel.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as published
11 by the Free Software Foundation; either version 2 of the License,
12 or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public License
20 along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <unistd.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <sys/types.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35
36 #include <pulse/timeval.h>
37 #include <pulse/util.h>
38 #include <pulse/version.h>
39 #include <pulse/xmalloc.h>
40
41 #include <pulsecore/module.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/modargs.h>
44 #include <pulsecore/log.h>
45 #include <pulsecore/core-subscribe.h>
46 #include <pulsecore/sink-input.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/authkey.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/authkey-prop.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/thread.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/rtclock.h>
58 #include <pulsecore/core-error.h>
59
/* Module metadata. Depending on whether TUNNEL_SINK is defined, this file is
 * built as the sink flavour or the source flavour of the tunnel module; each
 * flavour pulls in its own symdef header and advertises its own description
 * and argument usage string. */
60 #ifdef TUNNEL_SINK
61 #include "module-tunnel-sink-symdef.h"
62 PA_MODULE_DESCRIPTION("Tunnel module for sinks")
63 PA_MODULE_USAGE(
64 "server=<address> "
65 "sink=<remote sink name> "
66 "cookie=<filename> "
67 "format=<sample format> "
68 "channels=<number of channels> "
69 "rate=<sample rate> "
70 "sink_name=<name for the local sink> "
71 "channel_map=<channel map>")
72 #else
73 #include "module-tunnel-source-symdef.h"
74 PA_MODULE_DESCRIPTION("Tunnel module for sources")
75 PA_MODULE_USAGE(
76 "server=<address> "
77 "source=<remote source name> "
78 "cookie=<filename> "
79 "format=<sample format> "
80 "channels=<number of channels> "
81 "rate=<sample rate> "
82 "source_name=<name for the local source> "
83 "channel_map=<channel map>")
84 #endif
85
/* Author and version strings are shared by both flavours. */
86 PA_MODULE_AUTHOR("Lennart Poettering")
87 PA_MODULE_VERSION(PACKAGE_VERSION)
88
/* Default buffer metrics; the _MSEC suffix indicates milliseconds. Their use
 * is not visible in this part of the file. DEFAULT_MAXLENGTH_MSEC is derived
 * as 1.5 times DEFAULT_TLENGTH_MSEC. */
89 #define DEFAULT_TLENGTH_MSEC 100
90 #define DEFAULT_MINREQ_MSEC 10
91 #define DEFAULT_MAXLENGTH_MSEC ((DEFAULT_TLENGTH_MSEC*3)/2)
92 #define DEFAULT_FRAGSIZE_MSEC 10
93
/* Timeout passed to pa_pdispatch_register_reply() for every request that
 * expects a reply (presumably seconds -- confirm against pdispatch). */
94 #define DEFAULT_TIMEOUT 5
95
/* Added to tv_sec when (re)arming the latency timer, i.e. the number of
 * seconds between two latency queries. */
96 #define LATENCY_INTERVAL 10
97
/* NULL-terminated list of module argument names. It mirrors the keys listed
 * in PA_MODULE_USAGE; the device-related keys depend on the build flavour.
 * NOTE(review): presumably handed to the modargs parser at module init, which
 * is not visible in this part of the file. */
98 static const char* const valid_modargs[] = {
99 "server",
100 "cookie",
101 "format",
102 "channels",
103 "rate",
104 #ifdef TUNNEL_SINK
105 "sink_name",
106 "sink",
107 #else
108 "source_name",
109 "source",
110 #endif
111 "channel_map",
112 NULL,
113 };
114
/* Private message code for the source object, numbered after the generic
 * source messages. SOURCE_MESSAGE_POST is sent by pstream_memblock_callback()
 * and handled in source_process_msg(). */
115 enum {
116 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX
117 };
118
/* Private message codes for the sink object, numbered after the generic sink
 * messages. SINK_MESSAGE_REQUEST carries a byte count in the message offset;
 * SINK_MESSAGE_POST carries a rendered memchunk. Both are handled in
 * sink_process_msg(). */
119 enum {
120 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
121 SINK_MESSAGE_POST
122 };
123
/* Forward declarations of the handlers referenced by command_table below. */
124 #ifdef TUNNEL_SINK
125 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
126 #endif
127 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
128 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
129 static void command_overflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
130 static void command_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
131
/* Handler table indexed by command code; it is passed to pa_pdispatch_new()
 * in on_connection(). Entries not listed here are left zero-initialized.
 * PA_COMMAND_REQUEST is only handled in the sink flavour. Both "stream
 * killed" commands share one handler. */
132 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
133 #ifdef TUNNEL_SINK
134 [PA_COMMAND_REQUEST] = command_request,
135 #endif
136 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
137 [PA_COMMAND_OVERFLOW] = command_overflow,
138 [PA_COMMAND_UNDERFLOW] = command_underflow,
139 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
140 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
141 };
142
/* Per-module state. A pointer to this structure is handed as "userdata" to
 * every callback in this file. */
143 struct userdata {
144 pa_core *core; /* provides the mainloop and mempool used in this file */
145 pa_module *module; /* passed to pa_module_unload_request() on fatal errors */
146
147 pa_thread_mq thread_mq; /* installed by thread_func(); outq/inq are used for posting */
148 pa_rtpoll *rtpoll; /* installed and run by thread_func() */
149 pa_thread *thread; /* NOTE(review): presumably the thread running thread_func(); creation not visible here */
150
151 pa_socket_client *client; /* unreferenced and reset to NULL in on_connection() */
152 pa_pstream *pstream; /* created in on_connection(); may be NULL (see stream_cork()) */
153 pa_pdispatch *pdispatch; /* created in on_connection() from command_table */
154
155 char *server_name;
156 #ifdef TUNNEL_SINK
157 char *sink_name; /* remote sink name sent when creating the stream and querying sink info */
158 pa_sink *sink; /* the local sink object */
159 uint32_t requested_bytes; /* increased by SINK_MESSAGE_REQUEST, consumed by send_data() */
160 #else
161 char *source_name; /* remote source name sent when creating the stream and querying source info */
162 pa_source *source; /* the local source object */
163 #endif
164
165 uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH]; /* sent with PA_COMMAND_AUTH */
166
167 uint32_t version; /* read from the auth reply; values below 8 are rejected */
168 uint32_t ctag; /* next request tag; post-incremented for each request sent */
169 uint32_t device_index; /* read from the create-stream reply */
170 uint32_t channel; /* read from the create-stream reply; identifies our stream */
171
172 int64_t counter, counter_delta; /* bytes transferred; counter_delta is reset by request_latency() */
173
174 pa_time_event *time_event; /* periodic latency timer, created in create_stream_callback() */
175
176 pa_bool_t auth_cookie_in_property; /* not used in the visible part of the file */
177
178 pa_smoother *smoother; /* fed by stream_get_latency_callback(), paused/resumed by stream_cork() */
179
180 char *device_description; /* copied from the sink/source info reply */
181 char *server_fqdn; /* host name copied from the server info reply */
182 char *user_name; /* user name copied from the server info reply */
183
184 uint32_t maxlength; /* buffer metric sent in the create-stream request */
185 #ifdef TUNNEL_SINK
186 uint32_t tlength; /* buffer metric sent in the create-stream request */
187 uint32_t minreq; /* buffer metric sent in the create-stream request */
188 uint32_t prebuf; /* buffer metric sent in the create-stream request */
189 #else
190 uint32_t fragsize; /* buffer metric sent in the create-stream request */
191 #endif
192 };
193
194 static void command_stream_killed(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
195 struct userdata *u = userdata;
196
197 pa_assert(pd);
198 pa_assert(t);
199 pa_assert(u);
200 pa_assert(u->pdispatch == pd);
201
202 pa_log_warn("Stream killed");
203 pa_module_unload_request(u->module);
204 }
205
206 static void command_overflow(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
207 struct userdata *u = userdata;
208
209 pa_assert(pd);
210 pa_assert(t);
211 pa_assert(u);
212 pa_assert(u->pdispatch == pd);
213
214 pa_log_warn("Server signalled buffer overrun.");
215 }
216
217 static void command_underflow(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
218 struct userdata *u = userdata;
219
220 pa_assert(pd);
221 pa_assert(t);
222 pa_assert(u);
223 pa_assert(u->pdispatch == pd);
224
225 pa_log_warn("Server signalled buffer underrun.");
226 }
227
228 static void stream_cork(struct userdata *u, pa_bool_t cork) {
229 pa_tagstruct *t;
230 pa_assert(u);
231
232 if (cork)
233 pa_smoother_pause(u->smoother, pa_rtclock_usec());
234 else
235 pa_smoother_resume(u->smoother, pa_rtclock_usec());
236
237 if (!u->pstream)
238 return;
239
240 t = pa_tagstruct_new(NULL, 0);
241 #ifdef TUNNEL_SINK
242 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
243 #else
244 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
245 #endif
246 pa_tagstruct_putu32(t, u->ctag++);
247 pa_tagstruct_putu32(t, u->channel);
248 pa_tagstruct_put_boolean(t, !!cork);
249 pa_pstream_send_tagstruct(u->pstream, t);
250 }
251
252 #ifdef TUNNEL_SINK
253
254 static void send_data(struct userdata *u) {
255 pa_assert(u);
256
257 while (u->requested_bytes > 0) {
258 pa_memchunk memchunk;
259 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
260 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
261 pa_memblock_unref(memchunk.memblock);
262 u->requested_bytes -= memchunk.length;
263 }
264 }
265
266 /* This function is called from IO context -- except when it is not. */
267 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
268 struct userdata *u = PA_SINK(o)->userdata;
269
270 switch (code) {
271
272 case PA_SINK_MESSAGE_SET_STATE: {
273 int r;
274
275 /* First, change the state, because otherwide pa_sink_render() would fail */
276 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0)
277 if (PA_SINK_OPENED((pa_sink_state_t) PA_PTR_TO_UINT(data)))
278 send_data(u);
279
280 return r;
281 }
282
283 case SINK_MESSAGE_REQUEST:
284
285 pa_assert(offset > 0);
286 u->requested_bytes += (size_t) offset;
287
288 if (PA_SINK_OPENED(u->sink->thread_info.state))
289 send_data(u);
290
291 return 0;
292
293 case SINK_MESSAGE_POST:
294
295 /* OK, This might be a bit confusing. This message is
296 * delivered to us from the main context -- NOT from the
297 * IO thread context where the rest of the messages are
298 * dispatched. Yeah, ugly, but I am a lazy bastard. */
299
300 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
301 u->counter += chunk->length;
302 u->counter_delta += chunk->length;
303 return 0;
304 }
305
306 return pa_sink_process_msg(o, code, data, offset, chunk);
307 }
308
309 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
310 struct userdata *u;
311 pa_sink_assert_ref(s);
312 u = s->userdata;
313
314 switch ((pa_sink_state_t) state) {
315
316 case PA_SINK_SUSPENDED:
317 pa_assert(PA_SINK_OPENED(s->state));
318 stream_cork(u, TRUE);
319 break;
320
321 case PA_SINK_IDLE:
322 case PA_SINK_RUNNING:
323 if (s->state == PA_SINK_SUSPENDED)
324 stream_cork(u, FALSE);
325 break;
326
327 case PA_SINK_UNLINKED:
328 case PA_SINK_INIT:
329 ;
330 }
331
332 return 0;
333 }
334
335 #else
336
337 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
338 struct userdata *u = PA_SOURCE(o)->userdata;
339
340 switch (code) {
341 case SOURCE_MESSAGE_POST:
342
343 if (PA_SOURCE_OPENED(u->source->thread_info.state))
344 pa_source_post(u->source, chunk);
345 return 0;
346 }
347
348 return pa_source_process_msg(o, code, data, offset, chunk);
349 }
350
351 static int source_set_state(pa_source *s, pa_source_state_t state) {
352 struct userdata *u;
353 pa_source_assert_ref(s);
354 u = s->userdata;
355
356 switch ((pa_source_state_t) state) {
357
358 case PA_SOURCE_SUSPENDED:
359 pa_assert(PA_SOURCE_OPENED(s->state));
360 stream_cork(u, TRUE);
361 break;
362
363 case PA_SOURCE_IDLE:
364 case PA_SOURCE_RUNNING:
365 if (s->state == PA_SOURCE_SUSPENDED)
366 stream_cork(u, FALSE);
367 break;
368
369 case PA_SOURCE_UNLINKED:
370 case PA_SOURCE_INIT:
371 ;
372 }
373
374 return 0;
375 }
376
377 #endif
378
379 static void thread_func(void *userdata) {
380 struct userdata *u = userdata;
381
382 pa_assert(u);
383
384 pa_log_debug("Thread starting up");
385
386 pa_thread_mq_install(&u->thread_mq);
387 pa_rtpoll_install(u->rtpoll);
388
389 for (;;) {
390 int ret;
391
392 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
393 goto fail;
394
395 if (ret == 0)
396 goto finish;
397 }
398
399 fail:
400 /* If this was no regular exit from the loop we have to continue
401 * processing messages until we received PA_MESSAGE_SHUTDOWN */
402 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
403 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
404
405 finish:
406 pa_log_debug("Thread shutting down");
407 }
408
409 #ifdef TUNNEL_SINK
410 static void command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
411 struct userdata *u = userdata;
412 uint32_t bytes, channel;
413
414 pa_assert(pd);
415 pa_assert(command == PA_COMMAND_REQUEST);
416 pa_assert(t);
417 pa_assert(u);
418 pa_assert(u->pdispatch == pd);
419
420 if (pa_tagstruct_getu32(t, &channel) < 0 ||
421 pa_tagstruct_getu32(t, &bytes) < 0) {
422 pa_log("Invalid protocol reply");
423 goto fail;
424 }
425
426 if (channel != u->channel) {
427 pa_log("Recieved data for invalid channel");
428 goto fail;
429 }
430
431 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL);
432 return;
433
434 fail:
435 pa_module_unload_request(u->module);
436 }
437
438 #endif
439
440 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
441 struct userdata *u = userdata;
442 pa_usec_t sink_usec, source_usec, transport_usec, host_usec, k;
443 int playing;
444 int64_t write_index, read_index;
445 struct timeval local, remote, now;
446
447 pa_assert(pd);
448 pa_assert(u);
449
450 if (command != PA_COMMAND_REPLY) {
451 if (command == PA_COMMAND_ERROR)
452 pa_log("Failed to get latency.");
453 else
454 pa_log("Protocol error 1.");
455 goto fail;
456 }
457
458 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
459 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
460 pa_tagstruct_get_boolean(t, &playing) < 0 ||
461 pa_tagstruct_get_timeval(t, &local) < 0 ||
462 pa_tagstruct_get_timeval(t, &remote) < 0 ||
463 pa_tagstruct_gets64(t, &write_index) < 0 ||
464 pa_tagstruct_gets64(t, &read_index) < 0) {
465 pa_log("Invalid reply. (latency)");
466 goto fail;
467 }
468
469 pa_gettimeofday(&now);
470
471 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
472 /* local and remote seem to have synchronized clocks */
473 #ifdef TUNNEL_SINK
474 transport_usec = pa_timeval_diff(&remote, &local);
475 #else
476 transport_usec = pa_timeval_diff(&now, &remote);
477 #endif
478 } else
479 transport_usec = pa_timeval_diff(&now, &local)/2;
480
481 #ifdef TUNNEL_SINK
482 host_usec = sink_usec + transport_usec;
483 #else
484 host_usec = source_usec + transport_usec;
485 if (host_usec > sink_usec)
486 host_usec -= sink_usec;
487 else
488 host_usec = 0;
489 #endif
490
491 #ifdef TUNNEL_SINK
492 k = pa_bytes_to_usec(u->counter - u->counter_delta, &u->sink->sample_spec);
493
494 if (k > host_usec)
495 k -= host_usec;
496 else
497 k = 0;
498 #else
499 k = pa_bytes_to_usec(u->counter - u->counter_delta, &u->source->sample_spec);
500 k += host_usec;
501 #endif
502
503 pa_smoother_put(u->smoother, pa_rtclock_usec(), k);
504
505 return;
506
507 fail:
508 pa_module_unload_request(u->module);
509 }
510
511 static void request_latency(struct userdata *u) {
512 pa_tagstruct *t;
513 struct timeval now;
514 uint32_t tag;
515 pa_assert(u);
516
517 t = pa_tagstruct_new(NULL, 0);
518 #ifdef TUNNEL_SINK
519 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
520 #else
521 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
522 #endif
523 pa_tagstruct_putu32(t, tag = u->ctag++);
524 pa_tagstruct_putu32(t, u->channel);
525
526 pa_gettimeofday(&now);
527 pa_tagstruct_put_timeval(t, &now);
528
529 pa_pstream_send_tagstruct(u->pstream, t);
530 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
531
532 u->counter_delta = 0;
533 }
534
535 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) {
536 struct userdata *u = userdata;
537 struct timeval ntv;
538
539 pa_assert(m);
540 pa_assert(e);
541 pa_assert(u);
542
543 request_latency(u);
544
545 pa_gettimeofday(&ntv);
546 ntv.tv_sec += LATENCY_INTERVAL;
547 m->time_restart(e, &ntv);
548 }
549
550 #ifdef TUNNEL_SINK
551 static pa_usec_t sink_get_latency(pa_sink *s) {
552 pa_usec_t t, c;
553 struct userdata *u = s->userdata;
554
555 pa_sink_assert_ref(s);
556
557 c = pa_bytes_to_usec(u->counter, &s->sample_spec);
558 t = pa_smoother_get(u->smoother, pa_rtclock_usec());
559
560 return c > t ? c - t : 0;
561 }
562 #else
563 static pa_usec_t source_get_latency(pa_source *s) {
564 pa_usec_t t, c;
565 struct userdata *u = s->userdata;
566
567 pa_source_assert_ref(s);
568
569 c = pa_bytes_to_usec(u->counter, &s->sample_spec);
570 t = pa_smoother_get(u->smoother, pa_rtclock_usec());
571
572 return t > c ? t - c : 0;
573 }
574 #endif
575
576 static void update_description(struct userdata *u) {
577 char *d;
578 char un[128], hn[128];
579 pa_tagstruct *t;
580
581 pa_assert(u);
582
583 if (!u->server_fqdn || !u->user_name || !u->device_description)
584 return;
585
586 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
587
588 #ifdef TUNNEL_SINK
589 pa_sink_set_description(u->sink, d);
590 #else
591 pa_source_set_description(u->source, d);
592 #endif
593
594 pa_xfree(d);
595
596 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
597 pa_get_user_name(un, sizeof(un)),
598 pa_get_host_name(hn, sizeof(hn)));
599
600 t = pa_tagstruct_new(NULL, 0);
601 #ifdef TUNNEL_SINK
602 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
603 #else
604 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
605 #endif
606 pa_tagstruct_putu32(t, u->ctag++);
607 pa_tagstruct_putu32(t, u->channel);
608 pa_tagstruct_puts(t, d);
609 pa_pstream_send_tagstruct(u->pstream, t);
610
611 pa_xfree(d);
612 }
613
614 static void server_info_cb(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
615 struct userdata *u = userdata;
616 pa_sample_spec ss;
617 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
618 uint32_t cookie;
619
620 pa_assert(pd);
621 pa_assert(u);
622
623 if (command != PA_COMMAND_REPLY) {
624 if (command == PA_COMMAND_ERROR)
625 pa_log("Failed to get info.");
626 else
627 pa_log("Protocol error 6.");
628 goto fail;
629 }
630
631 if (pa_tagstruct_gets(t, &server_name) < 0 ||
632 pa_tagstruct_gets(t, &server_version) < 0 ||
633 pa_tagstruct_gets(t, &user_name) < 0 ||
634 pa_tagstruct_gets(t, &host_name) < 0 ||
635 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
636 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
637 pa_tagstruct_gets(t, &default_source_name) < 0 ||
638 pa_tagstruct_getu32(t, &cookie) < 0) {
639 pa_log("Invalid reply. (get_server_info)");
640 goto fail;
641 }
642
643 pa_xfree(u->server_fqdn);
644 u->server_fqdn = pa_xstrdup(host_name);
645
646 pa_xfree(u->user_name);
647 u->user_name = pa_xstrdup(user_name);
648
649 update_description(u);
650
651 return;
652
653 fail:
654 pa_module_unload_request(u->module);
655 }
656
657 #ifdef TUNNEL_SINK
658
659 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
660 struct userdata *u = userdata;
661 uint32_t idx, owner_module, monitor_source, flags;
662 const char *name, *description, *monitor_source_name, *driver;
663 pa_sample_spec ss;
664 pa_channel_map cm;
665 pa_cvolume volume;
666 int mute;
667 pa_usec_t latency;
668
669 pa_assert(pd);
670 pa_assert(u);
671
672 if (command != PA_COMMAND_REPLY) {
673 if (command == PA_COMMAND_ERROR)
674 pa_log("Failed to get info.");
675 else
676 pa_log("Protocol error 5.");
677 goto fail;
678 }
679
680 if (pa_tagstruct_getu32(t, &idx) < 0 ||
681 pa_tagstruct_gets(t, &name) < 0 ||
682 pa_tagstruct_gets(t, &description) < 0 ||
683 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
684 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
685 pa_tagstruct_getu32(t, &owner_module) < 0 ||
686 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
687 pa_tagstruct_get_boolean(t, &mute) < 0 ||
688 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
689 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
690 pa_tagstruct_get_usec(t, &latency) < 0 ||
691 pa_tagstruct_gets(t, &driver) < 0 ||
692 pa_tagstruct_getu32(t, &flags) < 0) {
693 pa_log("Invalid reply. (get_sink_info)");
694 goto fail;
695 }
696
697 if (strcmp(name, u->sink_name))
698 return;
699
700 pa_xfree(u->device_description);
701 u->device_description = pa_xstrdup(description);
702
703 update_description(u);
704
705 return;
706
707 fail:
708 pa_module_unload_request(u->module);
709 }
710
711 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
712 struct userdata *u = userdata;
713 uint32_t idx, owner_module, client, sink;
714 pa_usec_t buffer_usec, sink_usec;
715 const char *name, *driver, *resample_method;
716 int mute;
717 pa_sample_spec sample_spec;
718 pa_channel_map channel_map;
719 pa_cvolume volume;
720
721 pa_assert(pd);
722 pa_assert(u);
723
724 if (command != PA_COMMAND_REPLY) {
725 if (command == PA_COMMAND_ERROR)
726 pa_log("Failed to get info.");
727 else
728 pa_log("Protocol error 2.");
729 goto fail;
730 }
731
732 if (pa_tagstruct_getu32(t, &idx) < 0 ||
733 pa_tagstruct_gets(t, &name) < 0 ||
734 pa_tagstruct_getu32(t, &owner_module) < 0 ||
735 pa_tagstruct_getu32(t, &client) < 0 ||
736 pa_tagstruct_getu32(t, &sink) < 0 ||
737 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
738 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
739 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
740 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
741 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
742 pa_tagstruct_gets(t, &resample_method) < 0 ||
743 pa_tagstruct_gets(t, &driver) < 0 ||
744 (u->version >= 11 && pa_tagstruct_get_boolean(t, &mute) < 0)) {
745 pa_log("Invalid reply. (get_info)");
746 goto fail;
747 }
748
749 if (idx != u->device_index)
750 return;
751
752 pa_assert(u->sink);
753
754 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
755 pa_cvolume_equal(&volume, &u->sink->volume))
756 return;
757
758 memcpy(&u->sink->volume, &volume, sizeof(pa_cvolume));
759
760 if (u->version >= 11)
761 u->sink->muted = !!mute;
762
763 pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);
764 return;
765
766 fail:
767 pa_module_unload_request(u->module);
768 }
769
770 #else
771
772 static void source_info_cb(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
773 struct userdata *u = userdata;
774 uint32_t idx, owner_module, monitor_of_sink, flags;
775 const char *name, *description, *monitor_of_sink_name, *driver;
776 pa_sample_spec ss;
777 pa_channel_map cm;
778 pa_cvolume volume;
779 int mute;
780 pa_usec_t latency;
781
782 pa_assert(pd);
783 pa_assert(u);
784
785 if (command != PA_COMMAND_REPLY) {
786 if (command == PA_COMMAND_ERROR)
787 pa_log("Failed to get info.");
788 else
789 pa_log("Protocol error 5.");
790 goto fail;
791 }
792
793 if (pa_tagstruct_getu32(t, &idx) < 0 ||
794 pa_tagstruct_gets(t, &name) < 0 ||
795 pa_tagstruct_gets(t, &description) < 0 ||
796 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
797 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
798 pa_tagstruct_getu32(t, &owner_module) < 0 ||
799 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
800 pa_tagstruct_get_boolean(t, &mute) < 0 ||
801 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
802 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
803 pa_tagstruct_get_usec(t, &latency) < 0 ||
804 pa_tagstruct_gets(t, &driver) < 0 ||
805 pa_tagstruct_getu32(t, &flags) < 0) {
806 pa_log("Invalid reply. (get_source_info)");
807 goto fail;
808 }
809
810 if (strcmp(name, u->source_name))
811 return;
812
813 pa_xfree(u->device_description);
814 u->device_description = pa_xstrdup(description);
815
816 update_description(u);
817
818 return;
819
820 fail:
821 pa_module_unload_request(u->module);
822 }
823
824 #endif
825
826 static void request_info(struct userdata *u) {
827 pa_tagstruct *t;
828 uint32_t tag;
829 pa_assert(u);
830
831 t = pa_tagstruct_new(NULL, 0);
832 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
833 pa_tagstruct_putu32(t, tag = u->ctag++);
834 pa_pstream_send_tagstruct(u->pstream, t);
835 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
836
837 #ifdef TUNNEL_SINK
838 t = pa_tagstruct_new(NULL, 0);
839 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
840 pa_tagstruct_putu32(t, tag = u->ctag++);
841 pa_tagstruct_putu32(t, u->device_index);
842 pa_pstream_send_tagstruct(u->pstream, t);
843 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
844
845 t = pa_tagstruct_new(NULL, 0);
846 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
847 pa_tagstruct_putu32(t, tag = u->ctag++);
848 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
849 pa_tagstruct_puts(t, u->sink_name);
850 pa_pstream_send_tagstruct(u->pstream, t);
851 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
852 #else
853 t = pa_tagstruct_new(NULL, 0);
854 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
855 pa_tagstruct_putu32(t, tag = u->ctag++);
856 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
857 pa_tagstruct_puts(t, u->source_name);
858 pa_pstream_send_tagstruct(u->pstream, t);
859 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
860 #endif
861 }
862
863 static void command_subscribe_event(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
864 struct userdata *u = userdata;
865 pa_subscription_event_type_t e;
866 uint32_t idx;
867
868 pa_assert(pd);
869 pa_assert(t);
870 pa_assert(u);
871 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
872
873 if (pa_tagstruct_getu32(t, &e) < 0 ||
874 pa_tagstruct_getu32(t, &idx) < 0) {
875 pa_log("Invalid protocol reply");
876 pa_module_unload_request(u->module);
877 return;
878 }
879
880 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
881 #ifdef TUNNEL_SINK
882 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
883 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
884 #else
885 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
886 #endif
887 )
888 return;
889
890 request_info(u);
891 }
892
893 static void start_subscribe(struct userdata *u) {
894 pa_tagstruct *t;
895 uint32_t tag;
896 pa_assert(u);
897
898 t = pa_tagstruct_new(NULL, 0);
899 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
900 pa_tagstruct_putu32(t, tag = u->ctag++);
901 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
902 #ifdef TUNNEL_SINK
903 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
904 #else
905 PA_SUBSCRIPTION_MASK_SOURCE
906 #endif
907 );
908
909 pa_pstream_send_tagstruct(u->pstream, t);
910 }
911
912 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
913 struct userdata *u = userdata;
914 struct timeval ntv;
915 #ifdef TUNNEL_SINK
916 uint32_t bytes;
917 #endif
918
919 pa_assert(pd);
920 pa_assert(u);
921 pa_assert(u->pdispatch == pd);
922
923 if (command != PA_COMMAND_REPLY) {
924 if (command == PA_COMMAND_ERROR)
925 pa_log("Failed to create stream.");
926 else
927 pa_log("Protocol error 3.");
928 goto fail;
929 }
930
931 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
932 pa_tagstruct_getu32(t, &u->device_index) < 0
933 #ifdef TUNNEL_SINK
934 || pa_tagstruct_getu32(t, &bytes) < 0
935 #endif
936 )
937 goto parse_error;
938
939 if (u->version >= 9) {
940 #ifdef TUNNEL_SINK
941 uint32_t maxlength, tlength, prebuf, minreq;
942
943 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
944 pa_tagstruct_getu32(t, &tlength) < 0 ||
945 pa_tagstruct_getu32(t, &prebuf) < 0 ||
946 pa_tagstruct_getu32(t, &minreq) < 0)
947 goto parse_error;
948 #else
949 uint32_t maxlength, fragsize;
950
951 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
952 pa_tagstruct_getu32(t, &fragsize) < 0)
953 goto parse_error;
954 #endif
955 }
956
957 start_subscribe(u);
958 request_info(u);
959
960 pa_assert(!u->time_event);
961 pa_gettimeofday(&ntv);
962 ntv.tv_sec += LATENCY_INTERVAL;
963 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
964
965 request_latency(u);
966
967 pa_log_debug("Stream created.");
968
969 #ifdef TUNNEL_SINK
970 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
971 #endif
972
973 return;
974
975 parse_error:
976 pa_log("Invalid reply. (Create stream)");
977
978 fail:
979 pa_module_unload_request(u->module);
980 }
981
982 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
983 struct userdata *u = userdata;
984 pa_tagstruct *reply;
985 char name[256], un[128], hn[128];
986 #ifdef TUNNEL_SINK
987 pa_cvolume volume;
988 #endif
989
990 pa_assert(pd);
991 pa_assert(u);
992 pa_assert(u->pdispatch == pd);
993
994 if (command != PA_COMMAND_REPLY ||
995 pa_tagstruct_getu32(t, &u->version) < 0) {
996 if (command == PA_COMMAND_ERROR)
997 pa_log("Failed to authenticate");
998 else
999 pa_log("Protocol error 4.");
1000
1001 goto fail;
1002 }
1003
1004 /* Minimum supported protocol version */
1005 if (u->version < 8) {
1006 pa_log("Incompatible protocol version");
1007 goto fail;
1008 }
1009
1010 #ifdef TUNNEL_SINK
1011 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1012 u->sink_name,
1013 pa_get_user_name(un, sizeof(un)),
1014 pa_get_host_name(hn, sizeof(hn)));
1015 #else
1016 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1017 u->source_name,
1018 pa_get_user_name(un, sizeof(un)),
1019 pa_get_host_name(hn, sizeof(hn)));
1020 #endif
1021
1022 reply = pa_tagstruct_new(NULL, 0);
1023 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1024 pa_tagstruct_putu32(reply, tag = u->ctag++);
1025 pa_tagstruct_puts(reply, "PulseAudio");
1026 pa_pstream_send_tagstruct(u->pstream, reply);
1027 /* We ignore the server's reply here */
1028
1029 reply = pa_tagstruct_new(NULL, 0);
1030
1031 #ifdef TUNNEL_SINK
1032 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1033 pa_tagstruct_putu32(reply, tag = u->ctag++);
1034 pa_tagstruct_puts(reply, name);
1035 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1036 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1037 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1038 pa_tagstruct_puts(reply, u->sink_name);
1039 pa_tagstruct_putu32(reply, u->maxlength);
1040 pa_tagstruct_put_boolean(reply, !PA_SINK_OPENED(pa_sink_get_state(u->sink)));
1041 pa_tagstruct_putu32(reply, u->tlength);
1042 pa_tagstruct_putu32(reply, u->prebuf);
1043 pa_tagstruct_putu32(reply, u->minreq);
1044 pa_tagstruct_putu32(reply, 0);
1045 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1046 pa_tagstruct_put_cvolume(reply, &volume);
1047 #else
1048 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1049 pa_tagstruct_putu32(reply, tag = u->ctag++);
1050 pa_tagstruct_puts(reply, name);
1051 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1052 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1053 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1054 pa_tagstruct_puts(reply, u->source_name);
1055 pa_tagstruct_putu32(reply, u->maxlength);
1056 pa_tagstruct_put_boolean(reply, !PA_SOURCE_OPENED(pa_source_get_state(u->source)));
1057 pa_tagstruct_putu32(reply, u->fragsize);
1058 #endif
1059
1060 pa_pstream_send_tagstruct(u->pstream, reply);
1061 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1062
1063 pa_log_debug("Connection authenticated, creating stream ...");
1064
1065 return;
1066
1067 fail:
1068 pa_module_unload_request(u->module);
1069 }
1070
1071 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1072 struct userdata *u = userdata;
1073
1074 pa_assert(p);
1075 pa_assert(u);
1076
1077 pa_log_warn("Stream died.");
1078 pa_module_unload_request(u->module);
1079 }
1080
1081 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1082 struct userdata *u = userdata;
1083
1084 pa_assert(p);
1085 pa_assert(packet);
1086 pa_assert(u);
1087
1088 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1089 pa_log("Invalid packet");
1090 pa_module_unload_request(u->module);
1091 return;
1092 }
1093 }
1094
1095 #ifndef TUNNEL_SINK
1096 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1097 struct userdata *u = userdata;
1098
1099 pa_assert(p);
1100 pa_assert(chunk);
1101 pa_assert(u);
1102
1103 if (channel != u->channel) {
1104 pa_log("Recieved memory block on bad channel.");
1105 pa_module_unload_request(u->module);
1106 return;
1107 }
1108
1109 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1110
1111 u->counter += chunk->length;
1112 u->counter_delta += chunk->length;
1113 }
1114
1115 #endif
1116
1117 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1118 struct userdata *u = userdata;
1119 pa_tagstruct *t;
1120 uint32_t tag;
1121
1122 pa_assert(sc);
1123 pa_assert(u);
1124 pa_assert(u->client == sc);
1125
1126 pa_socket_client_unref(u->client);
1127 u->client = NULL;
1128
1129 if (!io) {
1130 pa_log("Connection failed: %s", pa_cstrerror(errno));
1131 pa_module_unload_request(u->module);
1132 return;
1133 }
1134
1135 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1136 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1137
1138 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1139 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1140 #ifndef TUNNEL_SINK
1141 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1142 #endif
1143
1144 t = pa_tagstruct_new(NULL, 0);
1145 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1146 pa_tagstruct_putu32(t, tag = u->ctag++);
1147 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1148 pa_tagstruct_put_arbitrary(t, u->auth_cookie, sizeof(u->auth_cookie));
1149
1150 #ifdef HAVE_CREDS
1151 {
1152 pa_creds ucred;
1153
1154 if (pa_iochannel_creds_supported(io))
1155 pa_iochannel_creds_enable(io);
1156
1157 ucred.uid = getuid();
1158 ucred.gid = getgid();
1159
1160 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1161 }
1162 #else
1163 pa_pstream_send_tagstruct(u->pstream, t);
1164 #endif
1165
1166 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1167
1168 pa_log_debug("Connection established, authenticating ...");
1169 }
1170
1171 #ifdef TUNNEL_SINK
1172
1173 static int sink_get_volume(pa_sink *sink) {
1174 return 0;
1175 }
1176
1177 static int sink_set_volume(pa_sink *sink) {
1178 struct userdata *u;
1179 pa_tagstruct *t;
1180 uint32_t tag;
1181
1182 pa_assert(sink);
1183 u = sink->userdata;
1184 pa_assert(u);
1185
1186 t = pa_tagstruct_new(NULL, 0);
1187 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1188 pa_tagstruct_putu32(t, tag = u->ctag++);
1189 pa_tagstruct_putu32(t, u->device_index);
1190 pa_tagstruct_put_cvolume(t, &sink->volume);
1191 pa_pstream_send_tagstruct(u->pstream, t);
1192
1193 return 0;
1194 }
1195
1196 static int sink_get_mute(pa_sink *sink) {
1197 return 0;
1198 }
1199
1200 static int sink_set_mute(pa_sink *sink) {
1201 struct userdata *u;
1202 pa_tagstruct *t;
1203 uint32_t tag;
1204
1205 pa_assert(sink);
1206 u = sink->userdata;
1207 pa_assert(u);
1208
1209 if (u->version < 11)
1210 return -1;
1211
1212 t = pa_tagstruct_new(NULL, 0);
1213 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1214 pa_tagstruct_putu32(t, tag = u->ctag++);
1215 pa_tagstruct_putu32(t, u->device_index);
1216 pa_tagstruct_put_boolean(t, !!sink->muted);
1217 pa_pstream_send_tagstruct(u->pstream, t);
1218
1219 return 0;
1220 }
1221
1222 #endif
1223
1224 static int load_key(struct userdata *u, const char*fn) {
1225 pa_assert(u);
1226
1227 u->auth_cookie_in_property = FALSE;
1228
1229 if (!fn && pa_authkey_prop_get(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0) {
1230 pa_log_debug("Using already loaded auth cookie.");
1231 pa_authkey_prop_ref(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1232 u->auth_cookie_in_property = 1;
1233 return 0;
1234 }
1235
1236 if (!fn)
1237 fn = PA_NATIVE_COOKIE_FILE;
1238
1239 if (pa_authkey_load_auto(fn, u->auth_cookie, sizeof(u->auth_cookie)) < 0)
1240 return -1;
1241
1242 pa_log_debug("Loading cookie from disk.");
1243
1244 if (pa_authkey_prop_put(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0)
1245 u->auth_cookie_in_property = TRUE;
1246
1247 return 0;
1248 }
1249
1250 int pa__init(pa_module*m) {
1251 pa_modargs *ma = NULL;
1252 struct userdata *u = NULL;
1253 pa_sample_spec ss;
1254 pa_channel_map map;
1255 char *t, *dn = NULL;
1256
1257 pa_assert(m);
1258
1259 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1260 pa_log("failed to parse module arguments");
1261 goto fail;
1262 }
1263
1264 u = pa_xnew0(struct userdata, 1);
1265 m->userdata = u;
1266 u->module = m;
1267 u->core = m->core;
1268 u->client = NULL;
1269 u->pdispatch = NULL;
1270 u->pstream = NULL;
1271 u->server_name = NULL;
1272 #ifdef TUNNEL_SINK
1273 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1274 u->sink = NULL;
1275 u->requested_bytes = 0;
1276 #else
1277 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1278 u->source = NULL;
1279 #endif
1280 u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE);
1281 u->ctag = 1;
1282 u->device_index = u->channel = PA_INVALID_INDEX;
1283 u->auth_cookie_in_property = FALSE;
1284 u->time_event = NULL;
1285
1286 pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
1287 u->rtpoll = pa_rtpoll_new();
1288 pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
1289
1290 if (load_key(u, pa_modargs_get_value(ma, "cookie", NULL)) < 0)
1291 goto fail;
1292
1293 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1294 pa_log("no server specified.");
1295 goto fail;
1296 }
1297
1298 ss = m->core->default_sample_spec;
1299 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1300 pa_log("invalid sample format specification");
1301 goto fail;
1302 }
1303
1304 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1305 pa_log("failed to connect to server '%s'", u->server_name);
1306 goto fail;
1307 }
1308
1309 pa_socket_client_set_callback(u->client, on_connection, u);
1310
1311 #ifdef TUNNEL_SINK
1312
1313 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1314 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1315
1316 if (!(u->sink = pa_sink_new(m->core, __FILE__, dn, 1, &ss, &map))) {
1317 pa_log("Failed to create sink.");
1318 goto fail;
1319 }
1320
1321 u->sink->parent.process_msg = sink_process_msg;
1322 u->sink->userdata = u;
1323 u->sink->set_state = sink_set_state;
1324 u->sink->get_latency = sink_get_latency;
1325 u->sink->get_volume = sink_get_volume;
1326 u->sink->get_mute = sink_get_mute;
1327 u->sink->set_volume = sink_set_volume;
1328 u->sink->set_mute = sink_set_mute;
1329 u->sink->flags = PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL;
1330
1331 pa_sink_set_module(u->sink, m);
1332 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1333 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1334 pa_sink_set_description(u->sink, t = pa_sprintf_malloc("%s%s%s", u->sink_name ? u->sink_name : "", u->sink_name ? " on " : "", u->server_name));
1335 pa_xfree(t);
1336
1337 #else
1338
1339 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1340 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1341
1342 if (!(u->source = pa_source_new(m->core, __FILE__, dn, 1, &ss, &map))) {
1343 pa_log("Failed to create source.");
1344 goto fail;
1345 }
1346
1347 u->source->parent.process_msg = source_process_msg;
1348 u->source->userdata = u;
1349 u->source->set_state = source_set_state;
1350 u->source->get_latency = source_get_latency;
1351 u->source->flags = PA_SOURCE_NETWORK|PA_SOURCE_LATENCY;
1352
1353 pa_source_set_module(u->source, m);
1354 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1355 pa_source_set_rtpoll(u->source, u->rtpoll);
1356 pa_source_set_description(u->source, t = pa_sprintf_malloc("%s%s%s", u->source_name ? u->source_name : "", u->source_name ? " on " : "", u->server_name));
1357 pa_xfree(t);
1358 #endif
1359
1360 pa_xfree(dn);
1361
1362 u->time_event = NULL;
1363
1364 u->maxlength = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MAXLENGTH_MSEC, &ss);
1365 #ifdef TUNNEL_SINK
1366 u->tlength = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &ss);
1367 u->minreq = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &ss);
1368 u->prebuf = u->tlength;
1369 #else
1370 u->fragsize = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &ss);
1371 #endif
1372
1373 u->counter = u->counter_delta = 0;
1374 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1375
1376 if (!(u->thread = pa_thread_new(thread_func, u))) {
1377 pa_log("Failed to create thread.");
1378 goto fail;
1379 }
1380
1381 #ifdef TUNNEL_SINK
1382 pa_sink_put(u->sink);
1383 #else
1384 pa_source_put(u->source);
1385 #endif
1386
1387 pa_modargs_free(ma);
1388
1389 return 0;
1390
1391 fail:
1392 pa__done(m);
1393
1394 if (ma)
1395 pa_modargs_free(ma);
1396
1397 pa_xfree(dn);
1398
1399 return -1;
1400 }
1401
1402 void pa__done(pa_module*m) {
1403 struct userdata* u;
1404
1405 pa_assert(m);
1406
1407 if (!(u = m->userdata))
1408 return;
1409
1410 #ifdef TUNNEL_SINK
1411 if (u->sink)
1412 pa_sink_unlink(u->sink);
1413 #else
1414 if (u->source)
1415 pa_source_unlink(u->source);
1416 #endif
1417
1418 if (u->thread) {
1419 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1420 pa_thread_free(u->thread);
1421 }
1422
1423 pa_thread_mq_done(&u->thread_mq);
1424
1425 #ifdef TUNNEL_SINK
1426 if (u->sink)
1427 pa_sink_unref(u->sink);
1428 #else
1429 if (u->source)
1430 pa_source_unref(u->source);
1431 #endif
1432
1433 if (u->rtpoll)
1434 pa_rtpoll_free(u->rtpoll);
1435
1436 if (u->pstream) {
1437 pa_pstream_unlink(u->pstream);
1438 pa_pstream_unref(u->pstream);
1439 }
1440
1441 if (u->pdispatch)
1442 pa_pdispatch_unref(u->pdispatch);
1443
1444 if (u->client)
1445 pa_socket_client_unref(u->client);
1446
1447 if (u->auth_cookie_in_property)
1448 pa_authkey_prop_unref(m->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1449
1450 if (u->smoother)
1451 pa_smoother_free(u->smoother);
1452
1453 if (u->time_event)
1454 u->core->mainloop->time_free(u->time_event);
1455
1456 #ifdef TUNNEL_SINK
1457 pa_xfree(u->sink_name);
1458 #else
1459 pa_xfree(u->source_name);
1460 #endif
1461 pa_xfree(u->server_name);
1462
1463 pa_xfree(u->device_description);
1464 pa_xfree(u->server_fqdn);
1465 pa_xfree(u->user_name);
1466
1467 pa_xfree(u);
1468 }