4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as published
11 by the Free Software Foundation; either version 2 of the License,
12 or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public License
20 along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
32 #include <sys/types.h>
36 #include <pulse/timeval.h>
37 #include <pulse/util.h>
38 #include <pulse/version.h>
39 #include <pulse/xmalloc.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/modargs.h>
44 #include <pulsecore/log.h>
45 #include <pulsecore/core-subscribe.h>
46 #include <pulsecore/sink-input.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/authkey.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/authkey-prop.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/thread.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/rtclock.h>
58 #include <pulsecore/core-error.h>
61 #include "module-tunnel-sink-symdef.h"
62 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
63 PA_MODULE_LOAD_ONCE(FALSE
);
66 "sink=<remote sink name> "
68 "format=<sample format> "
69 "channels=<number of channels> "
71 "sink_name=<name for the local sink> "
72 "channel_map=<channel map>");
74 #include "module-tunnel-source-symdef.h"
75 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 "source=<remote source name> "
80 "format=<sample format> "
81 "channels=<number of channels> "
83 "source_name=<name for the local source> "
84 "channel_map=<channel map>");
87 PA_MODULE_AUTHOR("Lennart Poettering");
88 PA_MODULE_VERSION(PACKAGE_VERSION
);
/* Default buffer metrics, expressed in milliseconds. pa__init() converts each
 * of them to bytes via pa_usec_to_bytes(PA_USEC_PER_MSEC * X, &ss). */
/* Becomes u->tlength (also copied to u->prebuf); sent with
 * PA_COMMAND_CREATE_PLAYBACK_STREAM. */
90 #define DEFAULT_TLENGTH_MSEC 100
/* Becomes u->minreq; sent with PA_COMMAND_CREATE_PLAYBACK_STREAM. */
91 #define DEFAULT_MINREQ_MSEC 10
/* Becomes u->maxlength; defined as 1.5 times DEFAULT_TLENGTH_MSEC. */
92 #define DEFAULT_MAXLENGTH_MSEC ((DEFAULT_TLENGTH_MSEC*3)/2)
/* Becomes u->fragsize; sent with PA_COMMAND_CREATE_RECORD_STREAM. */
93 #define DEFAULT_FRAGSIZE_MSEC 10
/* Timeout passed to the pa_pdispatch_register_reply() calls visible in this
 * file (presumably seconds -- TODO confirm against pdispatch). */
95 #define DEFAULT_TIMEOUT 5
/* Added to tv_sec (i.e. seconds) when the time event is armed in
 * create_stream_callback() and re-armed in timeout_callback(). */
97 #define LATENCY_INTERVAL 10
99 static const char* const valid_modargs
[] = {
117 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
121 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
126 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
128 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
129 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
130 static void command_overflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
131 static void command_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
133 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
135 [PA_COMMAND_REQUEST
] = command_request
,
137 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
138 [PA_COMMAND_OVERFLOW
] = command_overflow
,
139 [PA_COMMAND_UNDERFLOW
] = command_underflow
,
140 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
141 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
148 pa_thread_mq thread_mq
;
152 pa_socket_client
*client
;
154 pa_pdispatch
*pdispatch
;
160 uint32_t requested_bytes
;
166 uint8_t auth_cookie
[PA_NATIVE_COOKIE_LENGTH
];
170 uint32_t device_index
;
173 int64_t counter
, counter_delta
;
175 pa_time_event
*time_event
;
177 pa_bool_t auth_cookie_in_property
;
179 pa_smoother
*smoother
;
181 char *device_description
;
195 static void command_stream_killed(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
196 struct userdata
*u
= userdata
;
201 pa_assert(u
->pdispatch
== pd
);
203 pa_log_warn("Stream killed");
204 pa_module_unload_request(u
->module
);
207 static void command_overflow(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
208 struct userdata
*u
= userdata
;
213 pa_assert(u
->pdispatch
== pd
);
215 pa_log_warn("Server signalled buffer overrun.");
218 static void command_underflow(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
219 struct userdata
*u
= userdata
;
224 pa_assert(u
->pdispatch
== pd
);
226 pa_log_warn("Server signalled buffer underrun.");
229 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
234 pa_smoother_pause(u
->smoother
, pa_rtclock_usec());
236 pa_smoother_resume(u
->smoother
, pa_rtclock_usec());
241 t
= pa_tagstruct_new(NULL
, 0);
243 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
245 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
247 pa_tagstruct_putu32(t
, u
->ctag
++);
248 pa_tagstruct_putu32(t
, u
->channel
);
249 pa_tagstruct_put_boolean(t
, !!cork
);
250 pa_pstream_send_tagstruct(u
->pstream
, t
);
255 static void send_data(struct userdata
*u
) {
258 while (u
->requested_bytes
> 0) {
259 pa_memchunk memchunk
;
260 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
261 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
262 pa_memblock_unref(memchunk
.memblock
);
263 u
->requested_bytes
-= memchunk
.length
;
267 /* This function is called from IO context -- except when it is not. */
268 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
269 struct userdata
*u
= PA_SINK(o
)->userdata
;
273 case PA_SINK_MESSAGE_SET_STATE
: {
276 /* First, change the state, because otherwise pa_sink_render() would fail */
277 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
278 if (PA_SINK_OPENED((pa_sink_state_t
) PA_PTR_TO_UINT(data
)))
284 case SINK_MESSAGE_REQUEST
:
286 pa_assert(offset
> 0);
287 u
->requested_bytes
+= (size_t) offset
;
289 if (PA_SINK_OPENED(u
->sink
->thread_info
.state
))
294 case SINK_MESSAGE_POST
:
296 /* OK, This might be a bit confusing. This message is
297 * delivered to us from the main context -- NOT from the
298 * IO thread context where the rest of the messages are
299 * dispatched. Yeah, ugly, but I am a lazy bastard. */
301 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
302 u
->counter
+= chunk
->length
;
303 u
->counter_delta
+= chunk
->length
;
307 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
310 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
312 pa_sink_assert_ref(s
);
315 switch ((pa_sink_state_t
) state
) {
317 case PA_SINK_SUSPENDED
:
318 pa_assert(PA_SINK_OPENED(s
->state
));
319 stream_cork(u
, TRUE
);
323 case PA_SINK_RUNNING
:
324 if (s
->state
== PA_SINK_SUSPENDED
)
325 stream_cork(u
, FALSE
);
328 case PA_SINK_UNLINKED
:
338 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
339 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
342 case SOURCE_MESSAGE_POST
:
344 if (PA_SOURCE_OPENED(u
->source
->thread_info
.state
))
345 pa_source_post(u
->source
, chunk
);
349 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
352 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
354 pa_source_assert_ref(s
);
357 switch ((pa_source_state_t
) state
) {
359 case PA_SOURCE_SUSPENDED
:
360 pa_assert(PA_SOURCE_OPENED(s
->state
));
361 stream_cork(u
, TRUE
);
365 case PA_SOURCE_RUNNING
:
366 if (s
->state
== PA_SOURCE_SUSPENDED
)
367 stream_cork(u
, FALSE
);
370 case PA_SOURCE_UNLINKED
:
380 static void thread_func(void *userdata
) {
381 struct userdata
*u
= userdata
;
385 pa_log_debug("Thread starting up");
387 pa_thread_mq_install(&u
->thread_mq
);
388 pa_rtpoll_install(u
->rtpoll
);
393 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
401 /* If this was not a regular exit from the loop we have to continue
402 * processing messages until we receive PA_MESSAGE_SHUTDOWN */
403 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
404 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
407 pa_log_debug("Thread shutting down");
411 static void command_request(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
412 struct userdata
*u
= userdata
;
413 uint32_t bytes
, channel
;
416 pa_assert(command
== PA_COMMAND_REQUEST
);
419 pa_assert(u
->pdispatch
== pd
);
421 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
422 pa_tagstruct_getu32(t
, &bytes
) < 0) {
423 pa_log("Invalid protocol reply");
427 if (channel
!= u
->channel
) {
428 pa_log("Recieved data for invalid channel");
432 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
);
436 pa_module_unload_request(u
->module
);
/* Reply handler registered by request_latency() for the latency query.
 *
 * Reads, in this order, the sink latency, the source latency, the playing
 * flag, two timestamps (local, remote) and the write/read indexes from the
 * reply tagstruct, derives a transport delay from the timestamps and the
 * current time, and pushes a time value computed from
 * u->counter - u->counter_delta into u->smoother.
 *
 * A command other than PA_COMMAND_REPLY, or a reply that cannot be parsed,
 * is logged. NOTE(review): the pa_module_unload_request() at the end is
 * presumably the failure path; the label/return that separates it from the
 * success path is not visible in this listing.
 */
441 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
442 struct userdata
*u
= userdata
;
443 pa_usec_t sink_usec
, source_usec
, transport_usec
, host_usec
, k
;
445 int64_t write_index
, read_index
;
446 struct timeval local
, remote
, now
;
451 if (command
!= PA_COMMAND_REPLY
) {
452 if (command
== PA_COMMAND_ERROR
)
453 pa_log("Failed to get latency.");
455 pa_log("Protocol error 1.");
459 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
460 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
461 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
462 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
463 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
464 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
465 pa_tagstruct_gets64(t
, &read_index
) < 0) {
466 pa_log("Invalid reply. (latency)");
470 pa_gettimeofday(&now
);
/* Both comparisons must be "< 0": the timestamps are only plausible for
 * synchronized clocks when local < remote < now. The previous bare
 * pa_timeval_cmp(&remote, &now) was true for any remote != now, including a
 * remote timestamp that lies in the future. */
472 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
) < 0) {
473 /* local and remote seem to have synchronized clocks */
475 transport_usec
= pa_timeval_diff(&remote
, &local
);
477 transport_usec
= pa_timeval_diff(&now
, &remote
);
/* Fallback estimate: half of the measured round trip. */
480 transport_usec
= pa_timeval_diff(&now
, &local
)/2;
483 host_usec
= sink_usec
+ transport_usec
;
485 host_usec
= source_usec
+ transport_usec
;
486 if (host_usec
> sink_usec
)
487 host_usec
-= sink_usec
;
493 k
= pa_bytes_to_usec(u
->counter
- u
->counter_delta
, &u
->sink
->sample_spec
);
500 k
= pa_bytes_to_usec(u
->counter
- u
->counter_delta
, &u
->source
->sample_spec
);
504 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), k
);
509 pa_module_unload_request(u
->module
);
512 static void request_latency(struct userdata
*u
) {
518 t
= pa_tagstruct_new(NULL
, 0);
520 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
522 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
524 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
525 pa_tagstruct_putu32(t
, u
->channel
);
527 pa_gettimeofday(&now
);
528 pa_tagstruct_put_timeval(t
, &now
);
530 pa_pstream_send_tagstruct(u
->pstream
, t
);
531 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
533 u
->counter_delta
= 0;
536 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, PA_GCC_UNUSED
const struct timeval
*tv
, void *userdata
) {
537 struct userdata
*u
= userdata
;
546 pa_gettimeofday(&ntv
);
547 ntv
.tv_sec
+= LATENCY_INTERVAL
;
548 m
->time_restart(e
, &ntv
);
552 static pa_usec_t
sink_get_latency(pa_sink
*s
) {
554 struct userdata
*u
= s
->userdata
;
556 pa_sink_assert_ref(s
);
558 c
= pa_bytes_to_usec(u
->counter
, &s
->sample_spec
);
559 t
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
561 return c
> t
? c
- t
: 0;
564 static pa_usec_t
source_get_latency(pa_source
*s
) {
566 struct userdata
*u
= s
->userdata
;
568 pa_source_assert_ref(s
);
570 c
= pa_bytes_to_usec(u
->counter
, &s
->sample_spec
);
571 t
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
573 return t
> c
? t
- c
: 0;
577 static void update_description(struct userdata
*u
) {
579 char un
[128], hn
[128];
584 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
587 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
590 pa_sink_set_description(u
->sink
, d
);
592 pa_source_set_description(u
->source
, d
);
597 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
598 pa_get_user_name(un
, sizeof(un
)),
599 pa_get_host_name(hn
, sizeof(hn
)));
601 t
= pa_tagstruct_new(NULL
, 0);
603 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
605 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
607 pa_tagstruct_putu32(t
, u
->ctag
++);
608 pa_tagstruct_putu32(t
, u
->channel
);
609 pa_tagstruct_puts(t
, d
);
610 pa_pstream_send_tagstruct(u
->pstream
, t
);
615 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
616 struct userdata
*u
= userdata
;
618 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
624 if (command
!= PA_COMMAND_REPLY
) {
625 if (command
== PA_COMMAND_ERROR
)
626 pa_log("Failed to get info.");
628 pa_log("Protocol error 6.");
632 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
633 pa_tagstruct_gets(t
, &server_version
) < 0 ||
634 pa_tagstruct_gets(t
, &user_name
) < 0 ||
635 pa_tagstruct_gets(t
, &host_name
) < 0 ||
636 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
637 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
638 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
639 pa_tagstruct_getu32(t
, &cookie
) < 0) {
640 pa_log("Invalid reply. (get_server_info)");
644 pa_xfree(u
->server_fqdn
);
645 u
->server_fqdn
= pa_xstrdup(host_name
);
647 pa_xfree(u
->user_name
);
648 u
->user_name
= pa_xstrdup(user_name
);
650 update_description(u
);
655 pa_module_unload_request(u
->module
);
660 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
661 struct userdata
*u
= userdata
;
662 uint32_t idx
, owner_module
, monitor_source
, flags
;
663 const char *name
, *description
, *monitor_source_name
, *driver
;
673 if (command
!= PA_COMMAND_REPLY
) {
674 if (command
== PA_COMMAND_ERROR
)
675 pa_log("Failed to get info.");
677 pa_log("Protocol error 5.");
681 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
682 pa_tagstruct_gets(t
, &name
) < 0 ||
683 pa_tagstruct_gets(t
, &description
) < 0 ||
684 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
685 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
686 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
687 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
688 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
689 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
690 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
691 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
692 pa_tagstruct_gets(t
, &driver
) < 0 ||
693 pa_tagstruct_getu32(t
, &flags
) < 0) {
694 pa_log("Invalid reply. (get_sink_info)");
698 if (strcmp(name
, u
->sink_name
))
701 pa_xfree(u
->device_description
);
702 u
->device_description
= pa_xstrdup(description
);
704 update_description(u
);
709 pa_module_unload_request(u
->module
);
712 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
713 struct userdata
*u
= userdata
;
714 uint32_t idx
, owner_module
, client
, sink
;
715 pa_usec_t buffer_usec
, sink_usec
;
716 const char *name
, *driver
, *resample_method
;
718 pa_sample_spec sample_spec
;
719 pa_channel_map channel_map
;
725 if (command
!= PA_COMMAND_REPLY
) {
726 if (command
== PA_COMMAND_ERROR
)
727 pa_log("Failed to get info.");
729 pa_log("Protocol error 2.");
733 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
734 pa_tagstruct_gets(t
, &name
) < 0 ||
735 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
736 pa_tagstruct_getu32(t
, &client
) < 0 ||
737 pa_tagstruct_getu32(t
, &sink
) < 0 ||
738 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
739 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
740 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
741 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
742 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
743 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
744 pa_tagstruct_gets(t
, &driver
) < 0 ||
745 (u
->version
>= 11 && pa_tagstruct_get_boolean(t
, &mute
) < 0)) {
746 pa_log("Invalid reply. (get_info)");
750 if (idx
!= u
->device_index
)
755 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
756 pa_cvolume_equal(&volume
, &u
->sink
->volume
))
759 memcpy(&u
->sink
->volume
, &volume
, sizeof(pa_cvolume
));
761 if (u
->version
>= 11)
762 u
->sink
->muted
= !!mute
;
764 pa_subscription_post(u
->sink
->core
, PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
, u
->sink
->index
);
768 pa_module_unload_request(u
->module
);
773 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
774 struct userdata
*u
= userdata
;
775 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
776 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
786 if (command
!= PA_COMMAND_REPLY
) {
787 if (command
== PA_COMMAND_ERROR
)
788 pa_log("Failed to get info.");
790 pa_log("Protocol error 5.");
794 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
795 pa_tagstruct_gets(t
, &name
) < 0 ||
796 pa_tagstruct_gets(t
, &description
) < 0 ||
797 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
798 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
799 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
800 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
801 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
802 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
803 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
804 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
805 pa_tagstruct_gets(t
, &driver
) < 0 ||
806 pa_tagstruct_getu32(t
, &flags
) < 0) {
807 pa_log("Invalid reply. (get_source_info)");
811 if (strcmp(name
, u
->source_name
))
814 pa_xfree(u
->device_description
);
815 u
->device_description
= pa_xstrdup(description
);
817 update_description(u
);
822 pa_module_unload_request(u
->module
);
827 static void request_info(struct userdata
*u
) {
832 t
= pa_tagstruct_new(NULL
, 0);
833 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
834 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
835 pa_pstream_send_tagstruct(u
->pstream
, t
);
836 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
839 t
= pa_tagstruct_new(NULL
, 0);
840 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
841 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
842 pa_tagstruct_putu32(t
, u
->device_index
);
843 pa_pstream_send_tagstruct(u
->pstream
, t
);
844 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
846 t
= pa_tagstruct_new(NULL
, 0);
847 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
848 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
849 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
850 pa_tagstruct_puts(t
, u
->sink_name
);
851 pa_pstream_send_tagstruct(u
->pstream
, t
);
852 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
854 t
= pa_tagstruct_new(NULL
, 0);
855 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
856 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
857 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
858 pa_tagstruct_puts(t
, u
->source_name
);
859 pa_pstream_send_tagstruct(u
->pstream
, t
);
860 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
864 static void command_subscribe_event(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
865 struct userdata
*u
= userdata
;
866 pa_subscription_event_type_t e
;
872 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
874 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
875 pa_tagstruct_getu32(t
, &idx
) < 0) {
876 pa_log("Invalid protocol reply");
877 pa_module_unload_request(u
->module
);
881 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
883 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
884 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
886 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
894 static void start_subscribe(struct userdata
*u
) {
899 t
= pa_tagstruct_new(NULL
, 0);
900 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
901 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
902 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
904 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
906 PA_SUBSCRIPTION_MASK_SOURCE
910 pa_pstream_send_tagstruct(u
->pstream
, t
);
913 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
914 struct userdata
*u
= userdata
;
922 pa_assert(u
->pdispatch
== pd
);
924 if (command
!= PA_COMMAND_REPLY
) {
925 if (command
== PA_COMMAND_ERROR
)
926 pa_log("Failed to create stream.");
928 pa_log("Protocol error 3.");
932 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
933 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
935 || pa_tagstruct_getu32(t
, &bytes
) < 0
940 if (u
->version
>= 9) {
942 uint32_t maxlength
, tlength
, prebuf
, minreq
;
944 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
945 pa_tagstruct_getu32(t
, &tlength
) < 0 ||
946 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
947 pa_tagstruct_getu32(t
, &minreq
) < 0)
950 uint32_t maxlength
, fragsize
;
952 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
953 pa_tagstruct_getu32(t
, &fragsize
) < 0)
961 pa_assert(!u
->time_event
);
962 pa_gettimeofday(&ntv
);
963 ntv
.tv_sec
+= LATENCY_INTERVAL
;
964 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
968 pa_log_debug("Stream created.");
971 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
977 pa_log("Invalid reply. (Create stream)");
980 pa_module_unload_request(u
->module
);
983 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
984 struct userdata
*u
= userdata
;
986 char name
[256], un
[128], hn
[128];
993 pa_assert(u
->pdispatch
== pd
);
995 if (command
!= PA_COMMAND_REPLY
||
996 pa_tagstruct_getu32(t
, &u
->version
) < 0) {
997 if (command
== PA_COMMAND_ERROR
)
998 pa_log("Failed to authenticate");
1000 pa_log("Protocol error 4.");
1005 /* Minimum supported protocol version */
1006 if (u
->version
< 8) {
1007 pa_log("Incompatible protocol version");
1012 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1014 pa_get_user_name(un
, sizeof(un
)),
1015 pa_get_host_name(hn
, sizeof(hn
)));
1017 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1019 pa_get_user_name(un
, sizeof(un
)),
1020 pa_get_host_name(hn
, sizeof(hn
)));
1023 reply
= pa_tagstruct_new(NULL
, 0);
1024 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1025 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1026 pa_tagstruct_puts(reply
, "PulseAudio");
1027 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1028 /* We ignore the server's reply here */
1030 reply
= pa_tagstruct_new(NULL
, 0);
1033 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1034 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1035 pa_tagstruct_puts(reply
, name
);
1036 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1037 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1038 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1039 pa_tagstruct_puts(reply
, u
->sink_name
);
1040 pa_tagstruct_putu32(reply
, u
->maxlength
);
1041 pa_tagstruct_put_boolean(reply
, !PA_SINK_OPENED(pa_sink_get_state(u
->sink
)));
1042 pa_tagstruct_putu32(reply
, u
->tlength
);
1043 pa_tagstruct_putu32(reply
, u
->prebuf
);
1044 pa_tagstruct_putu32(reply
, u
->minreq
);
1045 pa_tagstruct_putu32(reply
, 0);
1046 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1047 pa_tagstruct_put_cvolume(reply
, &volume
);
1049 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1050 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1051 pa_tagstruct_puts(reply
, name
);
1052 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1053 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1054 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1055 pa_tagstruct_puts(reply
, u
->source_name
);
1056 pa_tagstruct_putu32(reply
, u
->maxlength
);
1057 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_OPENED(pa_source_get_state(u
->source
)));
1058 pa_tagstruct_putu32(reply
, u
->fragsize
);
1061 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1062 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1064 pa_log_debug("Connection authenticated, creating stream ...");
1069 pa_module_unload_request(u
->module
);
1072 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1073 struct userdata
*u
= userdata
;
1078 pa_log_warn("Stream died.");
1079 pa_module_unload_request(u
->module
);
1082 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1083 struct userdata
*u
= userdata
;
1089 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1090 pa_log("Invalid packet");
1091 pa_module_unload_request(u
->module
);
1097 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1098 struct userdata
*u
= userdata
;
1104 if (channel
!= u
->channel
) {
1105 pa_log("Recieved memory block on bad channel.");
1106 pa_module_unload_request(u
->module
);
1110 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1112 u
->counter
+= chunk
->length
;
1113 u
->counter_delta
+= chunk
->length
;
1118 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1119 struct userdata
*u
= userdata
;
1125 pa_assert(u
->client
== sc
);
1127 pa_socket_client_unref(u
->client
);
1131 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1132 pa_module_unload_request(u
->module
);
1136 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1137 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1139 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1140 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1142 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
1145 t
= pa_tagstruct_new(NULL
, 0);
1146 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1147 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1148 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1149 pa_tagstruct_put_arbitrary(t
, u
->auth_cookie
, sizeof(u
->auth_cookie
));
1155 if (pa_iochannel_creds_supported(io
))
1156 pa_iochannel_creds_enable(io
);
1158 ucred
.uid
= getuid();
1159 ucred
.gid
= getgid();
1161 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1164 pa_pstream_send_tagstruct(u
->pstream
, t
);
1167 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1169 pa_log_debug("Connection established, authenticating ...");
1174 static int sink_get_volume(pa_sink
*sink
) {
1178 static int sink_set_volume(pa_sink
*sink
) {
1187 t
= pa_tagstruct_new(NULL
, 0);
1188 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1189 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1190 pa_tagstruct_putu32(t
, u
->device_index
);
1191 pa_tagstruct_put_cvolume(t
, &sink
->volume
);
1192 pa_pstream_send_tagstruct(u
->pstream
, t
);
1197 static int sink_get_mute(pa_sink
*sink
) {
1201 static int sink_set_mute(pa_sink
*sink
) {
1210 if (u
->version
< 11)
1213 t
= pa_tagstruct_new(NULL
, 0);
1214 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1215 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1216 pa_tagstruct_putu32(t
, u
->device_index
);
1217 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1218 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Fill u->auth_cookie with the authentication cookie.
 *
 * If no file name is given and pa_authkey_prop_get() finds the cookie stored
 * under PA_NATIVE_COOKIE_PROPERTY_NAME, that copy is used and a property
 * reference is taken. The remaining visible statements default fn to
 * PA_NATIVE_COOKIE_FILE, load the cookie with pa_authkey_load_auto() and
 * publish it with pa_authkey_prop_put(). (The conditions and returns that
 * separate these paths are not visible in this listing.)
 *
 * u->auth_cookie_in_property records whether a property reference is held;
 * pa__done() releases it with pa_authkey_prop_unref() when the flag is set.
 * pa__init() treats a negative return value as failure.
 */
1225 static int load_key(struct userdata
*u
, const char*fn
) {
1228 u
->auth_cookie_in_property
= FALSE
;
1230 if (!fn
&& pa_authkey_prop_get(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0) {
1231 pa_log_debug("Using already loaded auth cookie.");
1232 pa_authkey_prop_ref(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
/* pa_bool_t flag: use TRUE like every other assignment to this field. */
1233 u
->auth_cookie_in_property
= TRUE
;
1238 fn
= PA_NATIVE_COOKIE_FILE
;
1240 if (pa_authkey_load_auto(fn
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) < 0)
1243 pa_log_debug("Loading cookie from disk.");
1245 if (pa_authkey_prop_put(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0)
1246 u
->auth_cookie_in_property
= TRUE
;
1251 int pa__init(pa_module
*m
) {
1252 pa_modargs
*ma
= NULL
;
1253 struct userdata
*u
= NULL
;
1256 char *t
, *dn
= NULL
;
1260 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1261 pa_log("failed to parse module arguments");
1265 u
= pa_xnew0(struct userdata
, 1);
1270 u
->pdispatch
= NULL
;
1272 u
->server_name
= NULL
;
1274 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1276 u
->requested_bytes
= 0;
1278 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1281 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
);
1283 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1284 u
->auth_cookie_in_property
= FALSE
;
1285 u
->time_event
= NULL
;
1287 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
);
1288 u
->rtpoll
= pa_rtpoll_new();
1289 pa_rtpoll_item_new_asyncmsgq(u
->rtpoll
, PA_RTPOLL_EARLY
, u
->thread_mq
.inq
);
1291 if (load_key(u
, pa_modargs_get_value(ma
, "cookie", NULL
)) < 0)
1294 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1295 pa_log("no server specified.");
1299 ss
= m
->core
->default_sample_spec
;
1300 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1301 pa_log("invalid sample format specification");
1305 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1306 pa_log("failed to connect to server '%s'", u
->server_name
);
1310 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
1314 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1315 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1317 if (!(u
->sink
= pa_sink_new(m
->core
, __FILE__
, dn
, 1, &ss
, &map
))) {
1318 pa_log("Failed to create sink.");
1322 u
->sink
->parent
.process_msg
= sink_process_msg
;
1323 u
->sink
->userdata
= u
;
1324 u
->sink
->set_state
= sink_set_state
;
1325 u
->sink
->get_latency
= sink_get_latency
;
1326 u
->sink
->get_volume
= sink_get_volume
;
1327 u
->sink
->get_mute
= sink_get_mute
;
1328 u
->sink
->set_volume
= sink_set_volume
;
1329 u
->sink
->set_mute
= sink_set_mute
;
1330 u
->sink
->flags
= PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
;
1332 pa_sink_set_module(u
->sink
, m
);
1333 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1334 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
1335 pa_sink_set_description(u
->sink
, t
= pa_sprintf_malloc("%s%s%s", u
->sink_name
? u
->sink_name
: "", u
->sink_name
? " on " : "", u
->server_name
));
1340 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1341 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1343 if (!(u
->source
= pa_source_new(m
->core
, __FILE__
, dn
, 1, &ss
, &map
))) {
1344 pa_log("Failed to create source.");
1348 u
->source
->parent
.process_msg
= source_process_msg
;
1349 u
->source
->userdata
= u
;
1350 u
->source
->set_state
= source_set_state
;
1351 u
->source
->get_latency
= source_get_latency
;
1352 u
->source
->flags
= PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
;
1354 pa_source_set_module(u
->source
, m
);
1355 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1356 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1357 pa_source_set_description(u
->source
, t
= pa_sprintf_malloc("%s%s%s", u
->source_name
? u
->source_name
: "", u
->source_name
? " on " : "", u
->server_name
));
1363 u
->time_event
= NULL
;
1365 u
->maxlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MAXLENGTH_MSEC
, &ss
);
1367 u
->tlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &ss
);
1368 u
->minreq
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &ss
);
1369 u
->prebuf
= u
->tlength
;
1371 u
->fragsize
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &ss
);
1374 u
->counter
= u
->counter_delta
= 0;
1375 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
1377 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1378 pa_log("Failed to create thread.");
1383 pa_sink_put(u
->sink
);
1385 pa_source_put(u
->source
);
1388 pa_modargs_free(ma
);
1396 pa_modargs_free(ma
);
1403 void pa__done(pa_module
*m
) {
1408 if (!(u
= m
->userdata
))
1413 pa_sink_unlink(u
->sink
);
1416 pa_source_unlink(u
->source
);
1420 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1421 pa_thread_free(u
->thread
);
1424 pa_thread_mq_done(&u
->thread_mq
);
1428 pa_sink_unref(u
->sink
);
1431 pa_source_unref(u
->source
);
1435 pa_rtpoll_free(u
->rtpoll
);
1438 pa_pstream_unlink(u
->pstream
);
1439 pa_pstream_unref(u
->pstream
);
1443 pa_pdispatch_unref(u
->pdispatch
);
1446 pa_socket_client_unref(u
->client
);
1448 if (u
->auth_cookie_in_property
)
1449 pa_authkey_prop_unref(m
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1452 pa_smoother_free(u
->smoother
);
1455 u
->core
->mainloop
->time_free(u
->time_event
);
1458 pa_xfree(u
->sink_name
);
1460 pa_xfree(u
->source_name
);
1462 pa_xfree(u
->server_name
);
1464 pa_xfree(u
->device_description
);
1465 pa_xfree(u
->server_fqdn
);
1466 pa_xfree(u
->user_name
);