4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as published
11 by the Free Software Foundation; either version 2 of the License,
12 or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public License
20 along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
32 #include <sys/types.h>
36 #include <pulse/timeval.h>
37 #include <pulse/util.h>
38 #include <pulse/version.h>
39 #include <pulse/xmalloc.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/modargs.h>
44 #include <pulsecore/log.h>
45 #include <pulsecore/core-subscribe.h>
46 #include <pulsecore/sink-input.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/authkey.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/authkey-prop.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/thread.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/rtclock.h>
58 #include <pulsecore/core-error.h>
61 #include "module-tunnel-sink-symdef.h"
62 PA_MODULE_DESCRIPTION("Tunnel module for sinks")
65 "sink=<remote sink name> "
67 "format=<sample format> "
68 "channels=<number of channels> "
70 "sink_name=<name for the local sink> "
71 "channel_map=<channel map>")
73 #include "module-tunnel-source-symdef.h"
74 PA_MODULE_DESCRIPTION("Tunnel module for sources")
77 "source=<remote source name> "
79 "format=<sample format> "
80 "channels=<number of channels> "
82 "source_name=<name for the local source> "
83 "channel_map=<channel map>")
86 PA_MODULE_AUTHOR("Lennart Poettering")
87 PA_MODULE_VERSION(PACKAGE_VERSION
)
89 #define DEFAULT_TLENGTH_MSEC 100
90 #define DEFAULT_MINREQ_MSEC 10
91 #define DEFAULT_MAXLENGTH_MSEC ((DEFAULT_TLENGTH_MSEC*3)/2)
92 #define DEFAULT_FRAGSIZE_MSEC 10
94 #define DEFAULT_TIMEOUT 5
96 #define LATENCY_INTERVAL 10
98 static const char* const valid_modargs
[] = {
116 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
120 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
125 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
126 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
128 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
129 static void command_overflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
130 static void command_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
132 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
134 [PA_COMMAND_REQUEST
] = command_request
,
135 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
137 [PA_COMMAND_OVERFLOW
] = command_overflow
,
138 [PA_COMMAND_UNDERFLOW
] = command_underflow
,
139 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
140 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
147 pa_thread_mq thread_mq
;
151 pa_socket_client
*client
;
153 pa_pdispatch
*pdispatch
;
159 uint32_t requested_bytes
;
165 uint8_t auth_cookie
[PA_NATIVE_COOKIE_LENGTH
];
169 uint32_t device_index
;
172 int64_t counter
, counter_delta
;
174 pa_time_event
*time_event
;
176 pa_bool_t auth_cookie_in_property
;
178 pa_smoother
*smoother
;
190 static void command_stream_killed(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
191 struct userdata
*u
= userdata
;
196 pa_assert(u
->pdispatch
== pd
);
198 pa_log_warn("Stream killed");
199 pa_module_unload_request(u
->module
);
202 static void command_overflow(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
203 struct userdata
*u
= userdata
;
208 pa_assert(u
->pdispatch
== pd
);
210 pa_log_warn("Server signalled buffer overrun.");
213 static void command_underflow(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
214 struct userdata
*u
= userdata
;
219 pa_assert(u
->pdispatch
== pd
);
221 pa_log_warn("Server signalled buffer underrun.");
224 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
229 pa_smoother_pause(u
->smoother
, pa_rtclock_usec());
231 pa_smoother_resume(u
->smoother
, pa_rtclock_usec());
233 t
= pa_tagstruct_new(NULL
, 0);
235 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
237 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
239 pa_tagstruct_putu32(t
, u
->ctag
++);
240 pa_tagstruct_putu32(t
, u
->channel
);
241 pa_tagstruct_put_boolean(t
, !!cork
);
242 pa_pstream_send_tagstruct(u
->pstream
, t
);
#ifdef TUNNEL_SINK
/* Render audio from the sink until the byte budget requested by the server
 * (u->requested_bytes) is used up. Every rendered chunk is posted on the
 * thread's outgoing queue as SINK_MESSAGE_POST; our own reference to the
 * memblock is dropped right after posting. */
static void send_data(struct userdata *u) {
    pa_assert(u);

    while (u->requested_bytes > 0) {
        pa_memchunk rendered;

        pa_sink_render(u->sink, u->requested_bytes, &rendered);
        pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &rendered, NULL);
        pa_memblock_unref(rendered.memblock);
        u->requested_bytes -= rendered.length;
    }
}
#endif
#ifdef TUNNEL_SINK
/* This function is called from IO context -- except when it is not.
 *
 * Message handler installed as u->sink->parent.process_msg. It handles the
 * state change message and the two module-private messages and forwards
 * everything else to pa_sink_process_msg(). */
static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK(o)->userdata;

    switch (code) {

        case PA_SINK_MESSAGE_SET_STATE: {
            int r;

            /* First, change the state, because otherwise pa_sink_render() would fail */
            if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0)
                if (PA_SINK_OPENED((pa_sink_state_t) PA_PTR_TO_UINT(data)))
                    send_data(u);

            return r;
        }

        case SINK_MESSAGE_REQUEST:

            /* offset carries the number of bytes the server asked for */
            pa_assert(offset > 0);
            u->requested_bytes += (size_t) offset;

            if (PA_SINK_OPENED(u->sink->thread_info.state))
                send_data(u);

            return 0;

        case SINK_MESSAGE_POST:

            /* OK, This might be a bit confusing. This message is
             * delivered to us from the main context -- NOT from the
             * IO thread context where the rest of the messages are
             * dispatched. Yeah, ugly, but I am a lazy bastard. */

            pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
            u->counter += chunk->length;
            u->counter_delta += chunk->length;

            return 0;
    }

    return pa_sink_process_msg(o, code, data, offset, chunk);
}
#endif
#ifdef TUNNEL_SINK
/* set_state callback of the sink: corks the remote stream when the sink is
 * suspended and uncorks it when the sink leaves the suspended state.
 * Always returns 0. */
static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
    struct userdata *u;

    pa_sink_assert_ref(s);
    u = s->userdata;

    switch ((pa_sink_state_t) state) {

        case PA_SINK_SUSPENDED:
            pa_assert(PA_SINK_OPENED(s->state));
            stream_cork(u, TRUE);
            break;

        case PA_SINK_IDLE:
        case PA_SINK_RUNNING:
            if (s->state == PA_SINK_SUSPENDED)
                stream_cork(u, FALSE);
            break;

        case PA_SINK_UNLINKED:
        case PA_SINK_INIT:
            /* nothing to do for these states */
            break;
    }

    return 0;
}
#endif
330 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
331 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
334 case SOURCE_MESSAGE_POST
:
336 if (PA_SOURCE_OPENED(u
->source
->thread_info
.state
))
337 pa_source_post(u
->source
, chunk
);
341 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
344 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
346 pa_source_assert_ref(s
);
349 switch ((pa_source_state_t
) state
) {
351 case PA_SOURCE_SUSPENDED
:
352 pa_assert(PA_SOURCE_OPENED(s
->state
));
353 stream_cork(u
, TRUE
);
357 case PA_SOURCE_RUNNING
:
358 if (s
->state
== PA_SOURCE_SUSPENDED
)
359 stream_cork(u
, FALSE
);
362 case PA_SOURCE_UNLINKED
:
372 static void thread_func(void *userdata
) {
373 struct userdata
*u
= userdata
;
377 pa_log_debug("Thread starting up");
379 pa_thread_mq_install(&u
->thread_mq
);
380 pa_rtpoll_install(u
->rtpoll
);
385 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
393 /* If this was no regular exit from the loop we have to continue
394 * processing messages until we received PA_MESSAGE_SHUTDOWN */
395 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
396 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
399 pa_log_debug("Thread shutting down");
#ifdef TUNNEL_SINK
/* Handler for PA_COMMAND_REQUEST: the server asks for more playback data.
 *
 * The packet must contain exactly a channel id and a byte count. A malformed
 * packet or a foreign channel id requests a module unload. Otherwise the byte
 * count is forwarded to the sink as SINK_MESSAGE_REQUEST. */
static void command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
    struct userdata *u = userdata;
    uint32_t bytes, channel;

    pa_assert(command == PA_COMMAND_REQUEST);
    pa_assert(u->pdispatch == pd);

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &bytes) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_log("Invalid protocol reply");
        goto fail;
    }

    if (channel != u->channel) {
        pa_log("Recieved data for invalid channel");
        goto fail;
    }

    /* sink_process_msg() asserts that the requested amount is positive, so a
     * zero-byte request from the remote side must not be forwarded. */
    if (bytes > 0)
        pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL);

    return;

fail:
    pa_module_unload_request(u->module);
}
#endif
434 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
435 struct userdata
*u
= userdata
;
436 pa_usec_t sink_usec
, source_usec
, transport_usec
, host_usec
, k
;
438 int64_t write_index
, read_index
;
439 struct timeval local
, remote
, now
;
444 if (command
!= PA_COMMAND_REPLY
) {
445 if (command
== PA_COMMAND_ERROR
)
446 pa_log("Failed to get latency.");
448 pa_log("Protocol error.");
452 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
453 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
454 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
455 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
456 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
457 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
458 pa_tagstruct_gets64(t
, &read_index
) < 0 ||
459 !pa_tagstruct_eof(t
)) {
460 pa_log("Invalid reply. (latency)");
464 pa_gettimeofday(&now
);
466 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
467 /* local and remote seem to have synchronized clocks */
469 transport_usec
= pa_timeval_diff(&remote
, &local
);
471 transport_usec
= pa_timeval_diff(&now
, &remote
);
474 transport_usec
= pa_timeval_diff(&now
, &local
)/2;
477 host_usec
= sink_usec
+ transport_usec
;
479 host_usec
= source_usec
+ transport_usec
;
480 if (host_usec
> sink_usec
)
481 host_usec
-= sink_usec
;
487 k
= pa_bytes_to_usec(u
->counter
- u
->counter_delta
, &u
->sink
->sample_spec
);
494 k
= pa_bytes_to_usec(u
->counter
- u
->counter_delta
, &u
->source
->sample_spec
);
498 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), k
);
503 pa_module_unload_request(u
->module
);
506 static void request_latency(struct userdata
*u
) {
512 t
= pa_tagstruct_new(NULL
, 0);
514 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
516 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
518 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
519 pa_tagstruct_putu32(t
, u
->channel
);
521 pa_gettimeofday(&now
);
522 pa_tagstruct_put_timeval(t
, &now
);
524 pa_pstream_send_tagstruct(u
->pstream
, t
);
525 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
527 u
->counter_delta
= 0;
530 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, PA_GCC_UNUSED
const struct timeval
*tv
, void *userdata
) {
531 struct userdata
*u
= userdata
;
540 pa_gettimeofday(&ntv
);
541 ntv
.tv_sec
+= LATENCY_INTERVAL
;
542 m
->time_restart(e
, &ntv
);
#ifdef TUNNEL_SINK
/* get_latency callback of the sink.
 *
 * Returns the time equivalent of all bytes sent so far minus the smoothed
 * stream time, clamped at 0. */
static pa_usec_t sink_get_latency(pa_sink *s) {
    struct userdata *u;
    pa_usec_t sent_usec, stream_usec;

    /* validate s before it is dereferenced */
    pa_sink_assert_ref(s);
    u = s->userdata;

    sent_usec = pa_bytes_to_usec(u->counter, &s->sample_spec);
    stream_usec = pa_smoother_get(u->smoother, pa_rtclock_usec());

    return sent_usec > stream_usec ? sent_usec - stream_usec : 0;
}
#endif
558 static pa_usec_t
source_get_latency(pa_source
*s
) {
560 struct userdata
*u
= s
->userdata
;
562 pa_source_assert_ref(s
);
564 c
= pa_bytes_to_usec(u
->counter
, &s
->sample_spec
);
565 t
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
567 return t
> c
? t
- c
: 0;
#ifdef TUNNEL_SINK
/* Reply handler for the GET_SINK_INPUT_INFO query sent by request_info().
 *
 * Parses the sink input description and mirrors the remote volume (and, for
 * protocol version >= 11, the mute flag) into the local sink. A change
 * notification is posted only when something actually changed. Errors and
 * malformed replies request a module unload. */
static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
    struct userdata *u = userdata;
    uint32_t idx, owner_module, client, sink;
    pa_usec_t buffer_usec, sink_usec;
    const char *name, *driver, *resample_method;
    pa_bool_t mute = FALSE; /* only read from the reply for version >= 11 */
    pa_sample_spec sample_spec;
    pa_channel_map channel_map;
    pa_cvolume volume;

    pa_assert(pd);
    pa_assert(u);

    if (command != PA_COMMAND_REPLY) {
        if (command == PA_COMMAND_ERROR)
            pa_log("Failed to get info.");
        else
            pa_log("Protocol error.");
        goto fail;
    }

    if (pa_tagstruct_getu32(t, &idx) < 0 ||
        pa_tagstruct_gets(t, &name) < 0 ||
        pa_tagstruct_getu32(t, &owner_module) < 0 ||
        pa_tagstruct_getu32(t, &client) < 0 ||
        pa_tagstruct_getu32(t, &sink) < 0 ||
        pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
        pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
        pa_tagstruct_get_cvolume(t, &volume) < 0 ||
        pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
        pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
        pa_tagstruct_gets(t, &resample_method) < 0 ||
        pa_tagstruct_gets(t, &driver) < 0 ||
        (u->version >= 11 && pa_tagstruct_get_boolean(t, &mute) < 0) ||
        !pa_tagstruct_eof(t)) {
        pa_log("Invalid reply. (get_info)");
        goto fail;
    }

    pa_assert(u->sink);

    /* Nothing changed: no update, no notification */
    if ((u->version < 11 || !!mute == !!u->sink->muted) &&
        pa_cvolume_equal(&volume, &u->sink->volume))
        return;

    u->sink->volume = volume;

    if (u->version >= 11)
        u->sink->muted = !!mute;

    pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);

    return;

fail:
    pa_module_unload_request(u->module);
}
#endif
#ifdef TUNNEL_SINK
/* Query the server for the sink input identified by u->device_index and
 * register sink_input_info_cb() for the reply (timeout DEFAULT_TIMEOUT). */
static void request_info(struct userdata *u) {
    pa_tagstruct *t;
    uint32_t tag;

    pa_assert(u);

    t = pa_tagstruct_new(NULL, 0);
    pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
    pa_tagstruct_putu32(t, tag = u->ctag++);
    pa_tagstruct_putu32(t, u->device_index);
    pa_pstream_send_tagstruct(u->pstream, t);

    pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
}
#endif
#ifdef TUNNEL_SINK
/* Handler for PA_COMMAND_SUBSCRIBE_EVENT.
 *
 * The packet must contain exactly an event type and an index. A malformed
 * packet requests a module unload. Only "sink input changed" events are
 * acted upon: they trigger a fresh request_info(). */
static void command_subscribe_event(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
    struct userdata *u = userdata;
    pa_subscription_event_type_t e;
    uint32_t raw_event, idx;

    pa_assert(pd);
    pa_assert(t);
    pa_assert(u);
    pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);

    /* Read into a real uint32_t: handing the address of an enum object to
     * pa_tagstruct_getu32() is an incompatible pointer conversion. */
    if (pa_tagstruct_getu32(t, &raw_event) < 0 ||
        pa_tagstruct_getu32(t, &idx) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_log("Invalid protocol reply");
        pa_module_unload_request(u->module);
        return;
    }

    e = (pa_subscription_event_type_t) raw_event;

    if (e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE))
        return;

    request_info(u);
}
#endif
#ifdef TUNNEL_SINK
/* Subscribe to sink input events on the server. No reply handler is
 * registered for this command. */
static void start_subscribe(struct userdata *u) {
    pa_tagstruct *t;

    pa_assert(u);

    t = pa_tagstruct_new(NULL, 0);
    pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
    pa_tagstruct_putu32(t, u->ctag++);
    pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SINK_INPUT);
    pa_pstream_send_tagstruct(u->pstream, t);
}
#endif
680 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
681 struct userdata
*u
= userdata
;
689 pa_assert(u
->pdispatch
== pd
);
691 if (command
!= PA_COMMAND_REPLY
) {
692 if (command
== PA_COMMAND_ERROR
)
693 pa_log("Failed to create stream.");
695 pa_log("Protocol error.");
699 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
700 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
702 || pa_tagstruct_getu32(t
, &bytes
) < 0
707 if (u
->version
>= 9) {
709 uint32_t maxlength
, tlength
, prebuf
, minreq
;
711 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
712 pa_tagstruct_getu32(t
, &tlength
) < 0 ||
713 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
714 pa_tagstruct_getu32(t
, &minreq
) < 0)
717 uint32_t maxlength
, fragsize
;
719 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
720 pa_tagstruct_getu32(t
, &fragsize
) < 0)
725 if (!pa_tagstruct_eof(t
))
733 pa_assert(!u
->time_event
);
734 pa_gettimeofday(&ntv
);
735 ntv
.tv_sec
+= LATENCY_INTERVAL
;
736 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
740 pa_log_debug("Stream created.");
743 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
749 pa_log("Invalid reply. (Create stream)");
752 pa_module_unload_request(u
->module
);
755 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
756 struct userdata
*u
= userdata
;
758 char name
[256], un
[128], hn
[128];
765 pa_assert(u
->pdispatch
== pd
);
767 if (command
!= PA_COMMAND_REPLY
||
768 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
769 !pa_tagstruct_eof(t
)) {
770 if (command
== PA_COMMAND_ERROR
)
771 pa_log("Failed to authenticate");
773 pa_log("Protocol error.");
778 /* Minimum supported protocol version */
779 if (u
->version
< 8) {
780 pa_log("Incompatible protocol version");
785 pa_snprintf(name
, sizeof(name
), "Tunnel from host %s, user %s, sink %s",
786 pa_get_host_name(hn
, sizeof(hn
)),
787 pa_get_user_name(un
, sizeof(un
)),
790 pa_snprintf(name
, sizeof(name
), "Tunnel from host %s, user %s, source %s",
791 pa_get_host_name(hn
, sizeof(hn
)),
792 pa_get_user_name(un
, sizeof(un
)),
796 reply
= pa_tagstruct_new(NULL
, 0);
797 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
798 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
799 pa_tagstruct_puts(reply
, name
);
800 pa_pstream_send_tagstruct(u
->pstream
, reply
);
801 /* We ignore the server's reply here */
803 reply
= pa_tagstruct_new(NULL
, 0);
806 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
807 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
808 pa_tagstruct_puts(reply
, name
);
809 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
810 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
811 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
812 pa_tagstruct_puts(reply
, u
->sink_name
);
813 pa_tagstruct_putu32(reply
, u
->maxlength
);
814 pa_tagstruct_put_boolean(reply
, FALSE
);
815 pa_tagstruct_putu32(reply
, u
->tlength
);
816 pa_tagstruct_putu32(reply
, u
->prebuf
);
817 pa_tagstruct_putu32(reply
, u
->minreq
);
818 pa_tagstruct_putu32(reply
, 0);
819 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
820 pa_tagstruct_put_cvolume(reply
, &volume
);
822 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
823 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
824 pa_tagstruct_puts(reply
, name
);
825 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
826 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
827 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
828 pa_tagstruct_puts(reply
, u
->source_name
);
829 pa_tagstruct_putu32(reply
, u
->maxlength
);
830 pa_tagstruct_put_boolean(reply
, 0);
831 pa_tagstruct_putu32(reply
, u
->fragsize
);
834 pa_pstream_send_tagstruct(u
->pstream
, reply
);
835 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
837 pa_log_debug("Connection authenticated, creating stream ...");
842 pa_module_unload_request(u
->module
);
845 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
846 struct userdata
*u
= userdata
;
851 pa_log_warn("Stream died.");
852 pa_module_unload_request(u
->module
);
855 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
856 struct userdata
*u
= userdata
;
862 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
863 pa_log("Invalid packet");
864 pa_module_unload_request(u
->module
);
870 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
871 struct userdata
*u
= userdata
;
877 if (channel
!= u
->channel
) {
878 pa_log("Recieved memory block on bad channel.");
879 pa_module_unload_request(u
->module
);
883 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
885 u
->counter
+= chunk
->length
;
886 u
->counter_delta
+= chunk
->length
;
891 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
892 struct userdata
*u
= userdata
;
898 pa_assert(u
->client
== sc
);
900 pa_socket_client_unref(u
->client
);
904 pa_log("Connection failed: %s", pa_cstrerror(errno
));
905 pa_module_unload_request(u
->module
);
909 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
910 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
912 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
913 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
915 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
918 t
= pa_tagstruct_new(NULL
, 0);
919 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
920 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
921 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
922 pa_tagstruct_put_arbitrary(t
, u
->auth_cookie
, sizeof(u
->auth_cookie
));
928 if (pa_iochannel_creds_supported(io
))
929 pa_iochannel_creds_enable(io
);
931 ucred
.uid
= getuid();
932 ucred
.gid
= getgid();
934 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
937 pa_pstream_send_tagstruct(u
->pstream
, t
);
940 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
942 pa_log_debug("Connection established, authenticating ...");
947 static int sink_get_volume(pa_sink
*sink
) {
#ifdef TUNNEL_SINK
/* set_volume callback of the sink: sends the sink's current volume to the
 * server as the volume of our remote sink input (u->device_index). No reply
 * handler is registered. Always returns 0. */
static int sink_set_volume(pa_sink *sink) {
    struct userdata *u;
    pa_tagstruct *t;

    pa_assert(sink);
    u = sink->userdata;
    pa_assert(u);

    t = pa_tagstruct_new(NULL, 0);
    pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
    pa_tagstruct_putu32(t, u->ctag++);
    pa_tagstruct_putu32(t, u->device_index);
    pa_tagstruct_put_cvolume(t, &sink->volume);
    pa_pstream_send_tagstruct(u->pstream, t);

    return 0;
}
#endif
970 static int sink_get_mute(pa_sink
*sink
) {
#ifdef TUNNEL_SINK
/* set_mute callback of the sink: sends the sink's current mute flag to the
 * server as the mute state of our remote sink input (u->device_index).
 *
 * Returns -1 without sending anything when the server speaks a protocol
 * version below 11 (the same version gate sink_input_info_cb() applies to the
 * mute field), 0 otherwise. */
static int sink_set_mute(pa_sink *sink) {
    struct userdata *u;
    pa_tagstruct *t;

    pa_assert(sink);
    u = sink->userdata;
    pa_assert(u);

    if (u->version < 11)
        return -1;

    t = pa_tagstruct_new(NULL, 0);
    pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
    pa_tagstruct_putu32(t, u->ctag++);
    pa_tagstruct_putu32(t, u->device_index);
    pa_tagstruct_put_boolean(t, !!sink->muted);
    pa_pstream_send_tagstruct(u->pstream, t);

    return 0;
}
#endif
998 static int load_key(struct userdata
*u
, const char*fn
) {
1001 u
->auth_cookie_in_property
= FALSE
;
1003 if (!fn
&& pa_authkey_prop_get(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0) {
1004 pa_log_debug("using already loaded auth cookie.");
1005 pa_authkey_prop_ref(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1006 u
->auth_cookie_in_property
= 1;
1011 fn
= PA_NATIVE_COOKIE_FILE
;
1013 if (pa_authkey_load_auto(fn
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) < 0)
1016 pa_log_debug("loading cookie from disk.");
1018 if (pa_authkey_prop_put(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0)
1019 u
->auth_cookie_in_property
= TRUE
;
1024 int pa__init(pa_module
*m
) {
1025 pa_modargs
*ma
= NULL
;
1026 struct userdata
*u
= NULL
;
1029 char *t
, *dn
= NULL
;
1033 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1034 pa_log("failed to parse module arguments");
1038 u
= pa_xnew(struct userdata
, 1);
1043 u
->pdispatch
= NULL
;
1045 u
->server_name
= NULL
;
1047 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1049 u
->requested_bytes
= 0;
1051 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1054 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
);
1056 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1057 u
->auth_cookie_in_property
= FALSE
;
1058 u
->time_event
= NULL
;
1060 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
);
1061 u
->rtpoll
= pa_rtpoll_new();
1062 pa_rtpoll_item_new_asyncmsgq(u
->rtpoll
, PA_RTPOLL_EARLY
, u
->thread_mq
.inq
);
1064 if (load_key(u
, pa_modargs_get_value(ma
, "cookie", NULL
)) < 0)
1067 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1068 pa_log("no server specified.");
1072 ss
= m
->core
->default_sample_spec
;
1073 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1074 pa_log("invalid sample format specification");
1078 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1079 pa_log("failed to connect to server '%s'", u
->server_name
);
1083 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
1087 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1088 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1090 if (!(u
->sink
= pa_sink_new(m
->core
, __FILE__
, dn
, 1, &ss
, &map
))) {
1091 pa_log("Failed to create sink.");
1095 u
->sink
->parent
.process_msg
= sink_process_msg
;
1096 u
->sink
->userdata
= u
;
1097 u
->sink
->set_state
= sink_set_state
;
1098 u
->sink
->get_latency
= sink_get_latency
;
1099 u
->sink
->get_volume
= sink_get_volume
;
1100 u
->sink
->get_mute
= sink_get_mute
;
1101 u
->sink
->set_volume
= sink_set_volume
;
1102 u
->sink
->set_mute
= sink_set_mute
;
1103 u
->sink
->flags
= PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
;
1105 pa_sink_set_module(u
->sink
, m
);
1106 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1107 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
1108 pa_sink_set_description(u
->sink
, t
= pa_sprintf_malloc("Tunnel to %s%s%s", u
->sink_name
? u
->sink_name
: "", u
->sink_name
? " on " : "", u
->server_name
));
1113 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1114 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1116 if (!(u
->source
= pa_source_new(m
->core
, __FILE__
, dn
, 1, &ss
, &map
))) {
1117 pa_log("Failed to create source.");
1121 u
->source
->parent
.process_msg
= source_process_msg
;
1122 u
->source
->userdata
= u
;
1123 u
->source
->set_state
= source_set_state
;
1124 u
->source
->get_latency
= source_get_latency
;
1125 u
->source
->flags
= PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
;
1127 pa_source_set_module(u
->source
, m
);
1128 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1129 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1130 pa_source_set_description(u
->source
, t
= pa_sprintf_malloc("Tunnel to %s%s%s", u
->source_name
? u
->source_name
: "", u
->source_name
? " on " : "", u
->server_name
));
1136 u
->time_event
= NULL
;
1138 u
->maxlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MAXLENGTH_MSEC
, &ss
);
1140 u
->tlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &ss
);
1141 u
->minreq
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &ss
);
1142 u
->prebuf
= u
->tlength
;
1144 u
->fragsize
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &ss
);
1147 u
->counter
= u
->counter_delta
= 0;
1148 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
1150 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1151 pa_log("Failed to create thread.");
1156 pa_sink_put(u
->sink
);
1158 pa_source_put(u
->source
);
1161 pa_modargs_free(ma
);
1169 pa_modargs_free(ma
);
1176 void pa__done(pa_module
*m
) {
1181 if (!(u
= m
->userdata
))
1186 pa_sink_unlink(u
->sink
);
1189 pa_source_unlink(u
->source
);
1193 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1194 pa_thread_free(u
->thread
);
1197 pa_thread_mq_done(&u
->thread_mq
);
1201 pa_sink_unref(u
->sink
);
1204 pa_source_unref(u
->source
);
1208 pa_rtpoll_free(u
->rtpoll
);
1211 pa_pstream_unlink(u
->pstream
);
1212 pa_pstream_unref(u
->pstream
);
1216 pa_pdispatch_unref(u
->pdispatch
);
1219 pa_socket_client_unref(u
->client
);
1221 if (u
->auth_cookie_in_property
)
1222 pa_authkey_prop_unref(m
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1225 pa_smoother_free(u
->smoother
);
1228 u
->core
->mainloop
->time_free(u
->time_event
);
1231 pa_xfree(u
->sink_name
);
1233 pa_xfree(u
->source_name
);
1235 pa_xfree(u
->server_name
);