4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as published
11 by the Free Software Foundation; either version 2 of the License,
12 or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public License
20 along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
32 #include <sys/types.h>
36 #include <pulse/timeval.h>
37 #include <pulse/util.h>
38 #include <pulse/version.h>
39 #include <pulse/xmalloc.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/modargs.h>
44 #include <pulsecore/log.h>
45 #include <pulsecore/core-subscribe.h>
46 #include <pulsecore/sink-input.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/authkey.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/authkey-prop.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/thread.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/rtclock.h>
58 #include <pulsecore/core-error.h>
61 #include "module-tunnel-sink-symdef.h"
/* Module metadata: description, usage string fragments, author and version.
 * This listing shows the sink flavour (sink=, sink_name=) and the source
 * flavour (source=, source_name=) back to back. The preprocessor
 * conditionals that presumably select one of them, and the beginning of
 * each usage string, are not visible here. */
62 PA_MODULE_DESCRIPTION("Tunnel module for sinks")
65 "sink=<remote sink name> "
67 "format=<sample format> "
68 "channels=<number of channels> "
70 "sink_name=<name for the local sink> "
71 "channel_map=<channel map>")
73 #include "module-tunnel-source-symdef.h"
74 PA_MODULE_DESCRIPTION("Tunnel module for sources")
77 "source=<remote source name> "
79 "format=<sample format> "
80 "channels=<number of channels> "
82 "source_name=<name for the local source> "
83 "channel_map=<channel map>")
86 PA_MODULE_AUTHOR("Lennart Poettering")
87 PA_MODULE_VERSION(PACKAGE_VERSION
)
/* Default stream buffer metrics, in milliseconds. pa__init() converts them
 * to bytes with pa_usec_to_bytes(PA_USEC_PER_MSEC * <value>, &ss) to fill
 * u->tlength, u->minreq, u->maxlength and u->fragsize. */
89 #define DEFAULT_TLENGTH_MSEC 100
90 #define DEFAULT_MINREQ_MSEC 10
/* One and a half times the default target length. */
91 #define DEFAULT_MAXLENGTH_MSEC ((DEFAULT_TLENGTH_MSEC*3)/2)
92 #define DEFAULT_FRAGSIZE_MSEC 10
/* Timeout passed to pa_pdispatch_register_reply() for the requests this
 * module sends (presumably seconds -- confirm against pdispatch). */
94 #define DEFAULT_TIMEOUT 5
/* Added to tv_sec whenever the latency timer is armed, i.e. seconds between
 * timer expirations. */
96 #define LATENCY_INTERVAL 10
/* Module argument names handed to pa_modargs_new() in pa__init(). The
 * entries of the array are not visible in this listing. */
98 static const char* const valid_modargs
[] = {
/* Module-private message codes. They are numbered starting at
 * PA_SOURCE_MESSAGE_MAX / PA_SINK_MESSAGE_MAX. The enclosing enum
 * declarations and the remaining enumerators (SINK_MESSAGE_POST is used
 * elsewhere in this file) are not visible in this listing. */
116 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
120 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
/* Forward declarations of the protocol command handlers that are stored in
 * command_table. All share the pa_pdispatch callback signature; userdata is
 * the module's struct userdata. */
125 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
126 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
128 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
129 static void command_overflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
130 static void command_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
/* Dispatch table, indexed by protocol command code, that on_connection()
 * hands to pa_pdispatch_new(). Both "stream killed" commands share one
 * handler. The closing of the initializer is not visible in this listing. */
132 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
134 [PA_COMMAND_REQUEST
] = command_request
,
135 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
137 [PA_COMMAND_OVERFLOW
] = command_overflow
,
138 [PA_COMMAND_UNDERFLOW
] = command_underflow
,
139 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
140 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
/* Fields of the per-module state (struct userdata). The struct header and
 * several members used elsewhere in this file (core, module, sink, source,
 * pstream, rtpoll, thread, ctag, channel, version, buffer metrics, names)
 * are not visible in this listing. */
/* Message queues between the main loop and the I/O thread. */
147 pa_thread_mq thread_mq
;
/* Pending connection attempt; unreferenced in on_connection(). */
151 pa_socket_client
*client
;
/* Command dispatcher created in on_connection(). */
153 pa_pdispatch
*pdispatch
;
/* Bytes the server asked for that have not been rendered yet: increased on
 * SINK_MESSAGE_REQUEST, decreased in send_data(). */
159 uint32_t requested_bytes
;
/* Authentication cookie sent with PA_COMMAND_AUTH. */
165 uint8_t auth_cookie
[PA_NATIVE_COOKIE_LENGTH
];
/* Filled from the create-stream reply; used as the index in the sink input
 * info/volume/mute requests. */
169 uint32_t device_index
;
/* counter: total bytes transferred over the stream. counter_delta: bytes
 * transferred since the last latency request (reset in request_latency()). */
172 int64_t counter
, counter_delta
;
/* Periodic timer created in create_stream_callback(). */
174 pa_time_event
*time_event
;
/* Set when load_key() took or stored the cookie in the shared property;
 * pa__done() drops that reference when it is set. */
176 pa_bool_t auth_cookie_in_property
;
/* Latency smoother fed by stream_get_latency_callback(). */
178 pa_smoother
*smoother
;
/* Dispatcher callback registered in command_table for both
 * PA_COMMAND_PLAYBACK_STREAM_KILLED and PA_COMMAND_RECORD_STREAM_KILLED.
 * Logs a warning and requests that this module be unloaded.
 * userdata is the module's struct userdata; pd must be its dispatcher. */
190 static void command_stream_killed(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
191 struct userdata
*u
= userdata
;
196 pa_assert(u
->pdispatch
== pd
);
198 pa_log_warn("Stream killed");
199 pa_module_unload_request(u
->module
);
/* Dispatcher callback for PA_COMMAND_OVERFLOW. In the visible lines it only
 * checks that pd is this module's dispatcher and logs a warning. */
202 static void command_overflow(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
203 struct userdata
*u
= userdata
;
208 pa_assert(u
->pdispatch
== pd
);
210 pa_log_warn("Server signalled buffer overrun.");
/* Dispatcher callback for PA_COMMAND_UNDERFLOW. In the visible lines it only
 * checks that pd is this module's dispatcher and logs a warning. */
213 static void command_underflow(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
214 struct userdata
*u
= userdata
;
219 pa_assert(u
->pdispatch
== pd
);
221 pa_log_warn("Server signalled buffer underrun.");
/* Corks or uncorks the remote stream. Called from the sink/source
 * set_state hooks with TRUE on suspend and FALSE on resume.
 *
 * Pauses or resumes the latency smoother at the current rtclock time and
 * sends a cork request carrying a fresh tag, the stream channel and the
 * cork flag. Both smoother calls and both command codes (playback/record)
 * appear below; the conditionals choosing between them are not visible in
 * this listing (presumably `cork` and the sink/source build flavour). */
224 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
229 pa_smoother_pause(u
->smoother
, pa_rtclock_usec());
231 pa_smoother_resume(u
->smoother
, pa_rtclock_usec());
236 t
= pa_tagstruct_new(NULL
, 0);
238 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
240 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
242 pa_tagstruct_putu32(t
, u
->ctag
++);
243 pa_tagstruct_putu32(t
, u
->channel
);
/* !! normalises the flag to 0 or 1 before it goes on the wire. */
244 pa_tagstruct_put_boolean(t
, !!cork
);
245 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Renders audio from the local sink while the server still has outstanding
 * requested bytes. Each rendered chunk is posted on the thread's outgoing
 * queue as SINK_MESSAGE_POST; sink_process_msg() then writes it to the
 * pstream. */
250 static void send_data(struct userdata
*u
) {
253 while (u
->requested_bytes
> 0) {
254 pa_memchunk memchunk
;
255 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
256 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
/* Drop the local reference once the chunk has been handed to the queue. */
257 pa_memblock_unref(memchunk
.memblock
);
/* NOTE(review): requested_bytes is unsigned; this relies on pa_sink_render()
 * never returning more than was asked for -- confirm. */
258 u
->requested_bytes
-= memchunk
.length
;
/* Message handler installed as u->sink->parent.process_msg in pa__init().
 *
 * Visible cases:
 *  - PA_SINK_MESSAGE_SET_STATE: runs the generic handler first, then tests
 *    whether the new state is an opened one (the action taken is not
 *    visible in this listing).
 *  - SINK_MESSAGE_REQUEST: adds `offset` (must be > 0) to
 *    u->requested_bytes, then tests whether the sink is opened (action not
 *    visible).
 *  - SINK_MESSAGE_POST: writes `chunk` to the pstream and accounts for its
 *    length in u->counter and u->counter_delta.
 * The last visible statement delegates to the generic
 * pa_sink_process_msg(). */
262 /* This function is called from IO context -- except when it is not. */
263 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
264 struct userdata
*u
= PA_SINK(o
)->userdata
;
268 case PA_SINK_MESSAGE_SET_STATE
: {
271 /* First, change the state, because otherwise pa_sink_render() would fail */
272 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
273 if (PA_SINK_OPENED((pa_sink_state_t
) PA_PTR_TO_UINT(data
)))
279 case SINK_MESSAGE_REQUEST
:
281 pa_assert(offset
> 0);
282 u
->requested_bytes
+= (size_t) offset
;
284 if (PA_SINK_OPENED(u
->sink
->thread_info
.state
))
289 case SINK_MESSAGE_POST
:
291 /* OK, This might be a bit confusing. This message is
292 * delivered to us from the main context -- NOT from the
293 * IO thread context where the rest of the messages are
294 * dispatched. Yeah, ugly, but I am a lazy bastard. */
296 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
297 u
->counter
+= chunk
->length
;
298 u
->counter_delta
+= chunk
->length
;
302 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
/* State-change hook installed as u->sink->set_state in pa__init().
 *
 *  - PA_SINK_SUSPENDED: asserts the sink is currently opened and corks the
 *    remote stream.
 *  - PA_SINK_RUNNING: uncorks the remote stream when coming from
 *    PA_SINK_SUSPENDED.
 *  - PA_SINK_UNLINKED: case label visible, body not visible in this listing.
 * The declaration of `u` and the return statement are not visible either. */
305 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
307 pa_sink_assert_ref(s
);
310 switch ((pa_sink_state_t
) state
) {
312 case PA_SINK_SUSPENDED
:
313 pa_assert(PA_SINK_OPENED(s
->state
));
314 stream_cork(u
, TRUE
);
318 case PA_SINK_RUNNING
:
319 if (s
->state
== PA_SINK_SUSPENDED
)
320 stream_cork(u
, FALSE
);
323 case PA_SINK_UNLINKED
:
/* Message handler installed as u->source->parent.process_msg in pa__init().
 *
 * SOURCE_MESSAGE_POST (sent by pstream_memblock_callback()) pushes `chunk`
 * into the local source, but only while the source is in an opened state.
 * The last visible statement delegates to the generic
 * pa_source_process_msg(). */
333 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
334 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
337 case SOURCE_MESSAGE_POST
:
339 if (PA_SOURCE_OPENED(u
->source
->thread_info
.state
))
340 pa_source_post(u
->source
, chunk
);
344 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
/* State-change hook installed as u->source->set_state in pa__init().
 *
 *  - PA_SOURCE_SUSPENDED: asserts the source is currently opened and corks
 *    the remote stream.
 *  - PA_SOURCE_RUNNING: uncorks the remote stream when coming from
 *    PA_SOURCE_SUSPENDED.
 *  - PA_SOURCE_UNLINKED: case label visible, body not visible in this
 *    listing.
 * The declaration of `u` and the return statement are not visible either. */
347 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
349 pa_source_assert_ref(s
);
352 switch ((pa_source_state_t
) state
) {
354 case PA_SOURCE_SUSPENDED
:
355 pa_assert(PA_SOURCE_OPENED(s
->state
));
356 stream_cork(u
, TRUE
);
360 case PA_SOURCE_RUNNING
:
361 if (s
->state
== PA_SOURCE_SUSPENDED
)
362 stream_cork(u
, FALSE
);
365 case PA_SOURCE_UNLINKED
:
/* Entry point of the thread created in pa__init(); userdata is the module's
 * struct userdata.
 *
 * Installs the thread message queues and the rtpoll object for this thread
 * and runs pa_rtpoll_run(). The surrounding loop and the handling of its
 * return value are not visible in this listing. */
375 static void thread_func(void *userdata
) {
376 struct userdata
*u
= userdata
;
380 pa_log_debug("Thread starting up");
382 pa_thread_mq_install(&u
->thread_mq
);
383 pa_rtpoll_install(u
->rtpoll
);
388 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
396 /* If this was no regular exit from the loop we have to continue
397 * processing messages until we received PA_MESSAGE_SHUTDOWN */
/* Ask the core (via the outgoing queue) to unload this module, then block
 * until the shutdown message arrives on the incoming queue. */
398 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
399 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
402 pa_log_debug("Thread shutting down");
/* Dispatcher callback for PA_COMMAND_REQUEST: the server asks for more
 * playback data.
 *
 * Reads the channel and the byte count from the tagstruct, rejects malformed
 * packets and packets for a channel other than u->channel, and forwards the
 * byte count to the sink as SINK_MESSAGE_REQUEST (in the `offset`
 * argument). The pa_module_unload_request() at the end is presumably the
 * shared failure path; the label and the return separating it from the
 * success path are not visible in this listing. */
406 static void command_request(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
407 struct userdata
*u
= userdata
;
408 uint32_t bytes
, channel
;
411 pa_assert(command
== PA_COMMAND_REQUEST
);
414 pa_assert(u
->pdispatch
== pd
);
416 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
417 pa_tagstruct_getu32(t
, &bytes
) < 0 ||
418 !pa_tagstruct_eof(t
)) {
419 pa_log("Invalid protocol reply");
423 if (channel
!= u
->channel
) {
/* NOTE(review): "Recieved" in the message below is a misspelling of
 * "Received"; left untouched because it is runtime text. */
424 pa_log("Recieved data for invalid channel");
/* pa_asyncmsgq_send() is used here, whereas create_stream_callback() uses
 * pa_asyncmsgq_post() for the same message. */
428 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
);
432 pa_module_unload_request(u
->module
);
/* Reply handler registered by request_latency().
 *
 * Parses the latency reply (sink latency, source latency, playing flag, two
 * timestamps, write and read index), estimates the transport delay, derives
 * a time value k from the byte counters and feeds (current rtclock time, k)
 * to the smoother.
 *
 * Sink and source variants of several statements appear back to back; the
 * conditionals selecting between them, the else branches and the failure
 * label are not visible in this listing. The pa_module_unload_request() at
 * the end is presumably the shared failure path. */
437 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
438 struct userdata
*u
= userdata
;
439 pa_usec_t sink_usec
, source_usec
, transport_usec
, host_usec
, k
;
441 int64_t write_index
, read_index
;
442 struct timeval local
, remote
, now
;
447 if (command
!= PA_COMMAND_REPLY
) {
448 if (command
== PA_COMMAND_ERROR
)
449 pa_log("Failed to get latency.");
451 pa_log("Protocol error.");
455 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
456 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
457 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
458 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
459 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
460 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
461 pa_tagstruct_gets64(t
, &read_index
) < 0 ||
462 !pa_tagstruct_eof(t
)) {
463 pa_log("Invalid reply. (latency)");
467 pa_gettimeofday(&now
);
/* NOTE(review): the second pa_timeval_cmp() result is used as a plain truth
 * value (no "< 0"), so it is true whenever remote differs from now, not only
 * when remote is earlier than now. Given the first comparison this looks
 * unintended -- confirm. */
469 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
470 /* local and remote seem to have synchronized clocks */
472 transport_usec
= pa_timeval_diff(&remote
, &local
);
474 transport_usec
= pa_timeval_diff(&now
, &remote
);
/* Alternative estimate: half of the request/reply round trip. */
477 transport_usec
= pa_timeval_diff(&now
, &local
)/2;
480 host_usec
= sink_usec
+ transport_usec
;
482 host_usec
= source_usec
+ transport_usec
;
483 if (host_usec
> sink_usec
)
484 host_usec
-= sink_usec
;
/* request_latency() zeroes counter_delta after sending, so
 * counter - counter_delta is the byte counter as of that reset. */
490 k
= pa_bytes_to_usec(u
->counter
- u
->counter_delta
, &u
->sink
->sample_spec
);
497 k
= pa_bytes_to_usec(u
->counter
- u
->counter_delta
, &u
->source
->sample_spec
);
501 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), k
);
506 pa_module_unload_request(u
->module
);
/* Sends a latency query for this module's stream and registers
 * stream_get_latency_callback() for the reply.
 *
 * The request carries the command code (playback and record variants both
 * appear; the selecting conditional is not visible in this listing), a
 * fresh tag, the stream channel and the current wall-clock time. After
 * sending, counter_delta is reset so that it counts bytes transferred since
 * this request. */
509 static void request_latency(struct userdata
*u
) {
515 t
= pa_tagstruct_new(NULL
, 0);
517 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
519 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
521 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
522 pa_tagstruct_putu32(t
, u
->channel
);
524 pa_gettimeofday(&now
);
525 pa_tagstruct_put_timeval(t
, &now
);
527 pa_pstream_send_tagstruct(u
->pstream
, t
);
528 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
530 u
->counter_delta
= 0;
/* Time event callback created in create_stream_callback(). The visible
 * lines re-arm the event LATENCY_INTERVAL seconds from now; whatever work
 * is done on each expiry (presumably request_latency() -- confirm) is not
 * visible in this listing. */
533 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, PA_GCC_UNUSED
const struct timeval
*tv
, void *userdata
) {
534 struct userdata
*u
= userdata
;
543 pa_gettimeofday(&ntv
);
544 ntv
.tv_sec
+= LATENCY_INTERVAL
;
545 m
->time_restart(e
, &ntv
);
/* Latency hook installed as u->sink->get_latency in pa__init().
 *
 * c is the total number of bytes written so far converted to time, t is the
 * smoother's value for the current rtclock time. Returns c - t, clamped at
 * zero when t is not smaller than c. The declarations of c and t are not
 * visible in this listing. */
549 static pa_usec_t
sink_get_latency(pa_sink
*s
) {
551 struct userdata
*u
= s
->userdata
;
553 pa_sink_assert_ref(s
);
555 c
= pa_bytes_to_usec(u
->counter
, &s
->sample_spec
);
556 t
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
558 return c
> t
? c
- t
: 0;
/* Latency hook installed as u->source->get_latency in pa__init().
 *
 * c is the total number of bytes received so far converted to time, t is
 * the smoother's value for the current rtclock time. Returns t - c (the
 * opposite order from the sink variant), clamped at zero when c is not
 * smaller than t. The declarations of c and t are not visible in this
 * listing. */
561 static pa_usec_t
source_get_latency(pa_source
*s
) {
563 struct userdata
*u
= s
->userdata
;
565 pa_source_assert_ref(s
);
567 c
= pa_bytes_to_usec(u
->counter
, &s
->sample_spec
);
568 t
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
570 return t
> c
? t
- c
: 0;
/* Reply handler registered by request_info().
 *
 * Parses one sink input info record from the server and mirrors the remote
 * volume (and, for protocol version 11 or later, the mute flag) into the
 * local sink, then posts a sink CHANGE subscription event.
 *
 * The declarations of `volume` and `mute`, the early return taken when
 * nothing changed and the failure label are not visible in this listing.
 * The pa_module_unload_request() at the end is presumably the shared
 * failure path. */
576 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
577 struct userdata
*u
= userdata
;
578 uint32_t idx
, owner_module
, client
, sink
;
579 pa_usec_t buffer_usec
, sink_usec
;
580 const char *name
, *driver
, *resample_method
;
582 pa_sample_spec sample_spec
;
583 pa_channel_map channel_map
;
589 if (command
!= PA_COMMAND_REPLY
) {
590 if (command
== PA_COMMAND_ERROR
)
591 pa_log("Failed to get info.");
593 pa_log("Protocol error.");
597 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
598 pa_tagstruct_gets(t
, &name
) < 0 ||
599 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
600 pa_tagstruct_getu32(t
, &client
) < 0 ||
601 pa_tagstruct_getu32(t
, &sink
) < 0 ||
602 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
603 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
604 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
605 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
606 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
607 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
608 pa_tagstruct_gets(t
, &driver
) < 0 ||
/* The mute flag is only read from the reply for protocol version >= 11. */
609 (u
->version
>= 11 && pa_tagstruct_get_boolean(t
, &mute
) < 0) ||
610 !pa_tagstruct_eof(t
)) {
611 pa_log("Invalid reply. (get_info)");
/* True when neither mute (if available) nor volume differs from the local
 * sink; the statement guarded by this test is not visible here. */
617 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
618 pa_cvolume_equal(&volume
, &u
->sink
->volume
))
621 memcpy(&u
->sink
->volume
, &volume
, sizeof(pa_cvolume
));
623 if (u
->version
>= 11)
624 u
->sink
->muted
= !!mute
;
626 pa_subscription_post(u
->sink
->core
, PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
, u
->sink
->index
);
630 pa_module_unload_request(u
->module
);
/* Asks the server for the info record of the sink input identified by
 * u->device_index and registers sink_input_info_cb() for the reply. The
 * declarations of `t` and `tag` are not visible in this listing. */
633 static void request_info(struct userdata
*u
) {
638 t
= pa_tagstruct_new(NULL
, 0);
639 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
640 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
641 pa_tagstruct_putu32(t
, u
->device_index
);
642 pa_pstream_send_tagstruct(u
->pstream
, t
);
643 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
/* Dispatcher callback for PA_COMMAND_SUBSCRIBE_EVENT.
 *
 * Reads the event type and the object index; a malformed packet triggers a
 * module unload request. The final visible test compares the event against
 * "sink input changed"; the statements that follow each outcome (presumably
 * an early return and a request_info() call -- confirm) are not visible in
 * this listing. */
646 static void command_subscribe_event(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
647 struct userdata
*u
= userdata
;
648 pa_subscription_event_type_t e
;
654 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
/* NOTE(review): `e` has the enum type pa_subscription_event_type_t but its
 * address is passed where the other call sites pass a uint32_t pointer --
 * check that the types are compatible. */
656 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
657 pa_tagstruct_getu32(t
, &idx
) < 0 ||
658 !pa_tagstruct_eof(t
)) {
659 pa_log("Invalid protocol reply");
660 pa_module_unload_request(u
->module
);
664 if (e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
))
/* Subscribes to sink input events on the server
 * (PA_SUBSCRIPTION_MASK_SINK_INPUT). No reply handler is registered in the
 * visible lines; the events themselves arrive through
 * command_subscribe_event(). */
670 static void start_subscribe(struct userdata
*u
) {
675 t
= pa_tagstruct_new(NULL
, 0);
676 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
677 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
678 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SINK_INPUT
);
679 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Reply handler for the create-stream request sent by
 * setup_complete_callback().
 *
 * Stores the stream channel and device index from the reply, reads the
 * initially requested byte count, consumes the buffer metrics that servers
 * with protocol version >= 9 append (four values in one variant, two in the
 * other), checks for end of packet, creates the periodic latency timer and
 * posts the initial SINK_MESSAGE_REQUEST to the sink.
 *
 * The conditionals separating the playback and record variants, the
 * declarations of `bytes` and `ntv`, and the failure label are not visible
 * in this listing. The log call and pa_module_unload_request() at the end
 * are presumably the shared failure path. */
683 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
684 struct userdata
*u
= userdata
;
692 pa_assert(u
->pdispatch
== pd
);
694 if (command
!= PA_COMMAND_REPLY
) {
695 if (command
== PA_COMMAND_ERROR
)
696 pa_log("Failed to create stream.");
698 pa_log("Protocol error.");
702 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
703 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
705 || pa_tagstruct_getu32(t
, &bytes
) < 0
710 if (u
->version
>= 9) {
/* The parsed buffer metrics are only read off the wire here; no visible
 * line stores them. */
712 uint32_t maxlength
, tlength
, prebuf
, minreq
;
714 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
715 pa_tagstruct_getu32(t
, &tlength
) < 0 ||
716 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
717 pa_tagstruct_getu32(t
, &minreq
) < 0)
720 uint32_t maxlength
, fragsize
;
722 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
723 pa_tagstruct_getu32(t
, &fragsize
) < 0)
728 if (!pa_tagstruct_eof(t
))
/* Arm the latency timer for the first time, LATENCY_INTERVAL seconds from
 * now; timeout_callback() re-arms it afterwards. */
736 pa_assert(!u
->time_event
);
737 pa_gettimeofday(&ntv
);
738 ntv
.tv_sec
+= LATENCY_INTERVAL
;
739 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
743 pa_log_debug("Stream created.");
746 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
752 pa_log("Invalid reply. (Create stream)");
755 pa_module_unload_request(u
->module
);
/* Reply handler for the PA_COMMAND_AUTH request sent by on_connection().
 *
 * Reads the server's protocol version (rejecting anything below 8), sets a
 * descriptive client name, and sends the request that creates the remote
 * playback or record stream, registering create_stream_callback() for the
 * reply.
 *
 * Playback and record variants of the name and of the create-stream request
 * appear back to back; the conditionals selecting between them, the
 * declarations of `reply` and `volume`, the last pa_snprintf() argument and
 * the failure label are not visible in this listing. The
 * pa_module_unload_request() at the end is presumably the shared failure
 * path. */
758 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
759 struct userdata
*u
= userdata
;
761 char name
[256], un
[128], hn
[128];
768 pa_assert(u
->pdispatch
== pd
);
770 if (command
!= PA_COMMAND_REPLY
||
771 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
772 !pa_tagstruct_eof(t
)) {
773 if (command
== PA_COMMAND_ERROR
)
774 pa_log("Failed to authenticate");
776 pa_log("Protocol error.");
781 /* Minimum supported protocol version */
782 if (u
->version
< 8) {
783 pa_log("Incompatible protocol version");
788 pa_snprintf(name
, sizeof(name
), "Tunnel from host %s, user %s, sink %s",
789 pa_get_host_name(hn
, sizeof(hn
)),
790 pa_get_user_name(un
, sizeof(un
)),
793 pa_snprintf(name
, sizeof(name
), "Tunnel from host %s, user %s, source %s",
794 pa_get_host_name(hn
, sizeof(hn
)),
795 pa_get_user_name(un
, sizeof(un
)),
799 reply
= pa_tagstruct_new(NULL
, 0);
800 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
801 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
802 pa_tagstruct_puts(reply
, name
);
803 pa_pstream_send_tagstruct(u
->pstream
, reply
);
804 /* We ignore the server's reply here */
806 reply
= pa_tagstruct_new(NULL
, 0);
/* Playback variant: stream name, sample spec, channel map, an invalid sink
 * index plus the remote sink name, buffer metrics, the initial cork flag
 * and a reset volume. */
809 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
810 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
811 pa_tagstruct_puts(reply
, name
);
812 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
813 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
814 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
815 pa_tagstruct_puts(reply
, u
->sink_name
);
816 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* Start corked when the local sink is not in an opened state. */
817 pa_tagstruct_put_boolean(reply
, !PA_SINK_OPENED(pa_sink_get_state(u
->sink
)));
818 pa_tagstruct_putu32(reply
, u
->tlength
);
819 pa_tagstruct_putu32(reply
, u
->prebuf
);
820 pa_tagstruct_putu32(reply
, u
->minreq
);
821 pa_tagstruct_putu32(reply
, 0);
822 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
823 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record variant: same layout up to the cork flag, followed by the fragment
 * size. */
825 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
826 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
827 pa_tagstruct_puts(reply
, name
);
828 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
829 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
830 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
831 pa_tagstruct_puts(reply
, u
->source_name
);
832 pa_tagstruct_putu32(reply
, u
->maxlength
);
833 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_OPENED(pa_source_get_state(u
->source
)));
834 pa_tagstruct_putu32(reply
, u
->fragsize
);
837 pa_pstream_send_tagstruct(u
->pstream
, reply
);
838 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
840 pa_log_debug("Connection authenticated, creating stream ...");
845 pa_module_unload_request(u
->module
);
/* Installed on the pstream in on_connection(); called when the packet
 * stream dies. Logs a warning and requests that this module be unloaded. */
848 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
849 struct userdata
*u
= userdata
;
854 pa_log_warn("Stream died.");
855 pa_module_unload_request(u
->module
);
/* Installed on the pstream in on_connection() as the packet receive
 * callback. Hands every control packet to the dispatcher; when the
 * dispatcher reports an error it logs and requests a module unload. */
858 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
859 struct userdata
*u
= userdata
;
865 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
866 pa_log("Invalid packet");
867 pa_module_unload_request(u
->module
);
/* Installed on the pstream in on_connection() as the memblock receive
 * callback (recorded audio arriving from the server).
 *
 * A block on a channel other than u->channel is treated as an error (log,
 * module unload request). Otherwise the chunk is passed to the source as
 * SOURCE_MESSAGE_POST and its length is added to u->counter and
 * u->counter_delta. */
873 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
874 struct userdata
*u
= userdata
;
880 if (channel
!= u
->channel
) {
/* NOTE(review): "Recieved" in the message below is a misspelling of
 * "Received"; left untouched because it is runtime text. */
881 pa_log("Recieved memory block on bad channel.");
882 pa_module_unload_request(u
->module
);
886 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
888 u
->counter
+= chunk
->length
;
889 u
->counter_delta
+= chunk
->length
;
/* Callback set on the socket client in pa__init(); invoked once the
 * connection attempt has finished.
 *
 * Releases the socket client, and on failure logs the errno text and
 * requests a module unload. On success it wraps the I/O channel in a
 * pstream, creates the command dispatcher from command_table, installs the
 * pstream callbacks and sends PA_COMMAND_AUTH (protocol version plus
 * cookie), registering setup_complete_callback() for the reply.
 *
 * The test that detects a failed connection, the declarations of `t`, `tag`
 * and `ucred`, and the conditionals around the credentials path and the
 * memblock callback are not visible in this listing. */
894 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
895 struct userdata
*u
= userdata
;
901 pa_assert(u
->client
== sc
);
/* NOTE(review): pa__done() also unrefs u->client; presumably the pointer is
 * cleared on a line that is not visible here -- confirm. */
903 pa_socket_client_unref(u
->client
);
907 pa_log("Connection failed: %s", pa_cstrerror(errno
));
908 pa_module_unload_request(u
->module
);
912 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
913 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
915 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
916 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
918 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
921 t
= pa_tagstruct_new(NULL
, 0);
922 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
923 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
924 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
925 pa_tagstruct_put_arbitrary(t
, u
->auth_cookie
, sizeof(u
->auth_cookie
));
/* Credentials path: enable credential passing on the channel when it is
 * supported and send this process's uid/gid along with the request. */
931 if (pa_iochannel_creds_supported(io
))
932 pa_iochannel_creds_enable(io
);
934 ucred
.uid
= getuid();
935 ucred
.gid
= getgid();
937 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
/* Plain path without credentials. */
940 pa_pstream_send_tagstruct(u
->pstream
, t
);
943 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
945 pa_log_debug("Connection established, authenticating ...");
/* Installed as u->sink->get_volume in pa__init(). The body is not visible
 * in this listing. */
950 static int sink_get_volume(pa_sink
*sink
) {
/* Installed as u->sink->set_volume in pa__init(). Sends the local sink's
 * volume to the server as the volume of the remote sink input identified
 * by u->device_index. No reply handler is registered in the visible lines;
 * the declarations of `u`, `t`, `tag` and the return statement are not
 * visible in this listing. */
954 static int sink_set_volume(pa_sink
*sink
) {
963 t
= pa_tagstruct_new(NULL
, 0);
964 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
965 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
966 pa_tagstruct_putu32(t
, u
->device_index
);
967 pa_tagstruct_put_cvolume(t
, &sink
->volume
);
968 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Installed as u->sink->get_mute in pa__init(). The body is not visible in
 * this listing. */
973 static int sink_get_mute(pa_sink
*sink
) {
/* Installed as u->sink->set_mute in pa__init(). Sends the local sink's mute
 * flag (normalised to 0/1) to the server as the mute state of the remote
 * sink input identified by u->device_index. No reply handler is registered
 * in the visible lines; lines 978-988 of the original file, including the
 * declarations of `u`, `t` and `tag`, are not visible in this listing. */
977 static int sink_set_mute(pa_sink
*sink
) {
989 t
= pa_tagstruct_new(NULL
, 0);
990 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
991 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
992 pa_tagstruct_putu32(t
, u
->device_index
);
993 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
994 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Fills u->auth_cookie.
 *
 * fn is the cookie file given in the "cookie" module argument, or NULL.
 * When no file is given and the cookie is already stored in the core's
 * shared property, that copy is used and a reference on the property is
 * taken. Otherwise the cookie is loaded from fn (PA_NATIVE_COOKIE_FILE is
 * assigned as a default on a visible line) and then stored in the shared
 * property. u->auth_cookie_in_property records whether a property reference
 * is held. pa__init() treats a negative return value as failure; the return
 * statements themselves are not visible in this listing. */
1001 static int load_key(struct userdata
*u
, const char*fn
) {
1004 u
->auth_cookie_in_property
= FALSE
;
1006 if (!fn
&& pa_authkey_prop_get(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0) {
1007 pa_log_debug("using already loaded auth cookie.");
1008 pa_authkey_prop_ref(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
/* NOTE(review): literal 1 here, TRUE/FALSE everywhere else for this flag. */
1009 u
->auth_cookie_in_property
= 1;
1014 fn
= PA_NATIVE_COOKIE_FILE
;
1016 if (pa_authkey_load_auto(fn
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) < 0)
1019 pa_log_debug("loading cookie from disk.");
1021 if (pa_authkey_prop_put(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0)
1022 u
->auth_cookie_in_property
= TRUE
;
/* Module entry point.
 *
 * Parses the module arguments, allocates and initialises the per-module
 * state, loads the authentication cookie, starts the asynchronous
 * connection to the server named in the "server" argument, creates the
 * local sink or source and wires up its callbacks, computes the default
 * buffer metrics in bytes, starts the I/O thread and finally publishes the
 * sink or source.
 *
 * Sink and source variants appear back to back; the conditionals selecting
 * between them, the failure label, the return statements and the
 * declarations of `ss` and `map` are not visible in this listing. */
1027 int pa__init(pa_module
*m
) {
1028 pa_modargs
*ma
= NULL
;
1029 struct userdata
*u
= NULL
;
1032 char *t
, *dn
= NULL
;
1036 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1037 pa_log("failed to parse module arguments");
1041 u
= pa_xnew(struct userdata
, 1);
1046 u
->pdispatch
= NULL
;
1048 u
->server_name
= NULL
;
/* NOTE(review): stray second semicolon at the end of this statement
 * (harmless empty statement). */
1050 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1052 u
->requested_bytes
= 0;
/* NOTE(review): stray second semicolon here as well. */
1054 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1057 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
);
1059 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1060 u
->auth_cookie_in_property
= FALSE
;
1061 u
->time_event
= NULL
;
/* The I/O thread is driven by an rtpoll object that watches the incoming
 * message queue. */
1063 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
);
1064 u
->rtpoll
= pa_rtpoll_new();
1065 pa_rtpoll_item_new_asyncmsgq(u
->rtpoll
, PA_RTPOLL_EARLY
, u
->thread_mq
.inq
);
1067 if (load_key(u
, pa_modargs_get_value(ma
, "cookie", NULL
)) < 0)
1070 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1071 pa_log("no server specified.");
/* Start from the core's default sample spec and let the module arguments
 * override it. */
1075 ss
= m
->core
->default_sample_spec
;
1076 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1077 pa_log("invalid sample format specification");
1081 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1082 pa_log("failed to connect to server '%s'", u
->server_name
);
1086 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Sink variant: the local name defaults to "tunnel.<server>" when sink_name
 * is not given. */
1090 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1091 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1093 if (!(u
->sink
= pa_sink_new(m
->core
, __FILE__
, dn
, 1, &ss
, &map
))) {
1094 pa_log("Failed to create sink.");
1098 u
->sink
->parent
.process_msg
= sink_process_msg
;
1099 u
->sink
->userdata
= u
;
1100 u
->sink
->set_state
= sink_set_state
;
1101 u
->sink
->get_latency
= sink_get_latency
;
1102 u
->sink
->get_volume
= sink_get_volume
;
1103 u
->sink
->get_mute
= sink_get_mute
;
1104 u
->sink
->set_volume
= sink_set_volume
;
1105 u
->sink
->set_mute
= sink_set_mute
;
1106 u
->sink
->flags
= PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
;
1108 pa_sink_set_module(u
->sink
, m
);
1109 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1110 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
1111 pa_sink_set_description(u
->sink
, t
= pa_sprintf_malloc("Tunnel to %s%s%s", u
->sink_name
? u
->sink_name
: "", u
->sink_name
? " on " : "", u
->server_name
));
/* Source variant: same pattern, without volume/mute hooks. */
1116 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1117 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1119 if (!(u
->source
= pa_source_new(m
->core
, __FILE__
, dn
, 1, &ss
, &map
))) {
1120 pa_log("Failed to create source.");
1124 u
->source
->parent
.process_msg
= source_process_msg
;
1125 u
->source
->userdata
= u
;
1126 u
->source
->set_state
= source_set_state
;
1127 u
->source
->get_latency
= source_get_latency
;
1128 u
->source
->flags
= PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
;
1130 pa_source_set_module(u
->source
, m
);
1131 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1132 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1133 pa_source_set_description(u
->source
, t
= pa_sprintf_malloc("Tunnel to %s%s%s", u
->source_name
? u
->source_name
: "", u
->source_name
? " on " : "", u
->server_name
));
/* NOTE(review): time_event was already set to NULL above; this second
 * assignment looks redundant. */
1139 u
->time_event
= NULL
;
/* Convert the millisecond defaults to byte counts for the chosen sample
 * spec. */
1141 u
->maxlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MAXLENGTH_MSEC
, &ss
);
1143 u
->tlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &ss
);
1144 u
->minreq
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &ss
);
1145 u
->prebuf
= u
->tlength
;
1147 u
->fragsize
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &ss
);
1150 u
->counter
= u
->counter_delta
= 0;
1151 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
1153 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1154 pa_log("Failed to create thread.");
1159 pa_sink_put(u
->sink
);
1161 pa_source_put(u
->source
);
/* The argument parser is freed twice in the visible lines, presumably once
 * on the success path and once on the failure path. */
1164 pa_modargs_free(ma
);
1172 pa_modargs_free(ma
);
/* Module teardown.
 *
 * Visible order: unlink the sink/source, send PA_MESSAGE_SHUTDOWN to the
 * I/O thread and free it, tear down the thread message queues, drop the
 * sink/source reference, free the rtpoll, unlink and release the pstream,
 * release the dispatcher and the socket client, drop the cookie property
 * reference if one is held, free the smoother and the time event, and free
 * the name strings.
 *
 * The NULL guards that presumably protect most of these calls, the
 * sink/source conditionals, the early return when m->userdata is unset and
 * the final free of `u` are not visible in this listing. */
1179 void pa__done(pa_module
*m
) {
1184 if (!(u
= m
->userdata
))
1189 pa_sink_unlink(u
->sink
);
1192 pa_source_unlink(u
->source
);
/* pa_asyncmsgq_send() rather than post is used for the shutdown message,
 * immediately before the thread is freed. */
1196 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1197 pa_thread_free(u
->thread
);
1200 pa_thread_mq_done(&u
->thread_mq
);
1204 pa_sink_unref(u
->sink
);
1207 pa_source_unref(u
->source
);
1211 pa_rtpoll_free(u
->rtpoll
);
1214 pa_pstream_unlink(u
->pstream
);
1215 pa_pstream_unref(u
->pstream
);
1219 pa_pdispatch_unref(u
->pdispatch
);
1222 pa_socket_client_unref(u
->client
);
1224 if (u
->auth_cookie_in_property
)
1225 pa_authkey_prop_unref(m
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1228 pa_smoother_free(u
->smoother
);
1231 u
->core
->mainloop
->time_free(u
->time_event
);
1234 pa_xfree(u
->sink_name
);
1236 pa_xfree(u
->source_name
);
1238 pa_xfree(u
->server_name
);