4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as published
11 by the Free Software Foundation; either version 2 of the License,
12 or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public License
20 along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
32 #include <sys/types.h>
36 #include <pulse/timeval.h>
37 #include <pulse/util.h>
38 #include <pulse/version.h>
39 #include <pulse/xmalloc.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/modargs.h>
44 #include <pulsecore/log.h>
45 #include <pulsecore/core-subscribe.h>
46 #include <pulsecore/sink-input.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/authkey.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/authkey-prop.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/thread.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/rtclock.h>
58 #include <pulsecore/core-error.h>
61 #include "module-tunnel-sink-symdef.h"
62 PA_MODULE_DESCRIPTION("Tunnel module for sinks")
65 "sink=<remote sink name> "
67 "format=<sample format> "
68 "channels=<number of channels> "
70 "sink_name=<name for the local sink> "
71 "channel_map=<channel map>")
73 #include "module-tunnel-source-symdef.h"
74 PA_MODULE_DESCRIPTION("Tunnel module for sources")
77 "source=<remote source name> "
79 "format=<sample format> "
80 "channels=<number of channels> "
82 "source_name=<name for the local source> "
83 "channel_map=<channel map>")
86 PA_MODULE_AUTHOR("Lennart Poettering")
87 PA_MODULE_VERSION(PACKAGE_VERSION
)
89 #define DEFAULT_TLENGTH_MSEC 100
90 #define DEFAULT_MINREQ_MSEC 10
91 #define DEFAULT_MAXLENGTH_MSEC ((DEFAULT_TLENGTH_MSEC*3)/2)
92 #define DEFAULT_FRAGSIZE_MSEC 10
94 #define DEFAULT_TIMEOUT 5
96 #define LATENCY_INTERVAL 10
98 static const char* const valid_modargs
[] = {
116 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
120 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
125 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
127 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
128 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
129 static void command_overflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
130 static void command_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
132 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
134 [PA_COMMAND_REQUEST
] = command_request
,
136 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
137 [PA_COMMAND_OVERFLOW
] = command_overflow
,
138 [PA_COMMAND_UNDERFLOW
] = command_underflow
,
139 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
140 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
147 pa_thread_mq thread_mq
;
151 pa_socket_client
*client
;
153 pa_pdispatch
*pdispatch
;
159 uint32_t requested_bytes
;
165 uint8_t auth_cookie
[PA_NATIVE_COOKIE_LENGTH
];
169 uint32_t device_index
;
172 int64_t counter
, counter_delta
;
174 pa_time_event
*time_event
;
176 pa_bool_t auth_cookie_in_property
;
178 pa_smoother
*smoother
;
180 char *device_description
;
194 static void command_stream_killed(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
195 struct userdata
*u
= userdata
;
200 pa_assert(u
->pdispatch
== pd
);
202 pa_log_warn("Stream killed");
203 pa_module_unload_request(u
->module
);
206 static void command_overflow(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
207 struct userdata
*u
= userdata
;
212 pa_assert(u
->pdispatch
== pd
);
214 pa_log_warn("Server signalled buffer overrun.");
217 static void command_underflow(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
218 struct userdata
*u
= userdata
;
223 pa_assert(u
->pdispatch
== pd
);
225 pa_log_warn("Server signalled buffer underrun.");
228 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
233 pa_smoother_pause(u
->smoother
, pa_rtclock_usec());
235 pa_smoother_resume(u
->smoother
, pa_rtclock_usec());
240 t
= pa_tagstruct_new(NULL
, 0);
242 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
244 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
246 pa_tagstruct_putu32(t
, u
->ctag
++);
247 pa_tagstruct_putu32(t
, u
->channel
);
248 pa_tagstruct_put_boolean(t
, !!cork
);
249 pa_pstream_send_tagstruct(u
->pstream
, t
);
254 static void send_data(struct userdata
*u
) {
257 while (u
->requested_bytes
> 0) {
258 pa_memchunk memchunk
;
259 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
260 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
261 pa_memblock_unref(memchunk
.memblock
);
262 u
->requested_bytes
-= memchunk
.length
;
266 /* This function is called from IO context -- except when it is not. */
267 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
268 struct userdata
*u
= PA_SINK(o
)->userdata
;
272 case PA_SINK_MESSAGE_SET_STATE
: {
275 /* First, change the state, because otherwide pa_sink_render() would fail */
276 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
277 if (PA_SINK_OPENED((pa_sink_state_t
) PA_PTR_TO_UINT(data
)))
283 case SINK_MESSAGE_REQUEST
:
285 pa_assert(offset
> 0);
286 u
->requested_bytes
+= (size_t) offset
;
288 if (PA_SINK_OPENED(u
->sink
->thread_info
.state
))
293 case SINK_MESSAGE_POST
:
295 /* OK, This might be a bit confusing. This message is
296 * delivered to us from the main context -- NOT from the
297 * IO thread context where the rest of the messages are
298 * dispatched. Yeah, ugly, but I am a lazy bastard. */
300 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
301 u
->counter
+= chunk
->length
;
302 u
->counter_delta
+= chunk
->length
;
306 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
309 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
311 pa_sink_assert_ref(s
);
314 switch ((pa_sink_state_t
) state
) {
316 case PA_SINK_SUSPENDED
:
317 pa_assert(PA_SINK_OPENED(s
->state
));
318 stream_cork(u
, TRUE
);
322 case PA_SINK_RUNNING
:
323 if (s
->state
== PA_SINK_SUSPENDED
)
324 stream_cork(u
, FALSE
);
327 case PA_SINK_UNLINKED
:
337 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
338 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
341 case SOURCE_MESSAGE_POST
:
343 if (PA_SOURCE_OPENED(u
->source
->thread_info
.state
))
344 pa_source_post(u
->source
, chunk
);
348 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
351 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
353 pa_source_assert_ref(s
);
356 switch ((pa_source_state_t
) state
) {
358 case PA_SOURCE_SUSPENDED
:
359 pa_assert(PA_SOURCE_OPENED(s
->state
));
360 stream_cork(u
, TRUE
);
364 case PA_SOURCE_RUNNING
:
365 if (s
->state
== PA_SOURCE_SUSPENDED
)
366 stream_cork(u
, FALSE
);
369 case PA_SOURCE_UNLINKED
:
379 static void thread_func(void *userdata
) {
380 struct userdata
*u
= userdata
;
384 pa_log_debug("Thread starting up");
386 pa_thread_mq_install(&u
->thread_mq
);
387 pa_rtpoll_install(u
->rtpoll
);
392 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
400 /* If this was no regular exit from the loop we have to continue
401 * processing messages until we received PA_MESSAGE_SHUTDOWN */
402 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
403 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
406 pa_log_debug("Thread shutting down");
410 static void command_request(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
411 struct userdata
*u
= userdata
;
412 uint32_t bytes
, channel
;
415 pa_assert(command
== PA_COMMAND_REQUEST
);
418 pa_assert(u
->pdispatch
== pd
);
420 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
421 pa_tagstruct_getu32(t
, &bytes
) < 0) {
422 pa_log("Invalid protocol reply");
426 if (channel
!= u
->channel
) {
427 pa_log("Recieved data for invalid channel");
431 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
);
435 pa_module_unload_request(u
->module
);
440 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
441 struct userdata
*u
= userdata
;
442 pa_usec_t sink_usec
, source_usec
, transport_usec
, host_usec
, k
;
444 int64_t write_index
, read_index
;
445 struct timeval local
, remote
, now
;
450 if (command
!= PA_COMMAND_REPLY
) {
451 if (command
== PA_COMMAND_ERROR
)
452 pa_log("Failed to get latency.");
454 pa_log("Protocol error 1.");
458 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
459 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
460 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
461 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
462 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
463 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
464 pa_tagstruct_gets64(t
, &read_index
) < 0) {
465 pa_log("Invalid reply. (latency)");
469 pa_gettimeofday(&now
);
471 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
472 /* local and remote seem to have synchronized clocks */
474 transport_usec
= pa_timeval_diff(&remote
, &local
);
476 transport_usec
= pa_timeval_diff(&now
, &remote
);
479 transport_usec
= pa_timeval_diff(&now
, &local
)/2;
482 host_usec
= sink_usec
+ transport_usec
;
484 host_usec
= source_usec
+ transport_usec
;
485 if (host_usec
> sink_usec
)
486 host_usec
-= sink_usec
;
492 k
= pa_bytes_to_usec(u
->counter
- u
->counter_delta
, &u
->sink
->sample_spec
);
499 k
= pa_bytes_to_usec(u
->counter
- u
->counter_delta
, &u
->source
->sample_spec
);
503 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), k
);
508 pa_module_unload_request(u
->module
);
511 static void request_latency(struct userdata
*u
) {
517 t
= pa_tagstruct_new(NULL
, 0);
519 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
521 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
523 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
524 pa_tagstruct_putu32(t
, u
->channel
);
526 pa_gettimeofday(&now
);
527 pa_tagstruct_put_timeval(t
, &now
);
529 pa_pstream_send_tagstruct(u
->pstream
, t
);
530 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
532 u
->counter_delta
= 0;
535 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, PA_GCC_UNUSED
const struct timeval
*tv
, void *userdata
) {
536 struct userdata
*u
= userdata
;
545 pa_gettimeofday(&ntv
);
546 ntv
.tv_sec
+= LATENCY_INTERVAL
;
547 m
->time_restart(e
, &ntv
);
551 static pa_usec_t
sink_get_latency(pa_sink
*s
) {
553 struct userdata
*u
= s
->userdata
;
555 pa_sink_assert_ref(s
);
557 c
= pa_bytes_to_usec(u
->counter
, &s
->sample_spec
);
558 t
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
560 return c
> t
? c
- t
: 0;
563 static pa_usec_t
source_get_latency(pa_source
*s
) {
565 struct userdata
*u
= s
->userdata
;
567 pa_source_assert_ref(s
);
569 c
= pa_bytes_to_usec(u
->counter
, &s
->sample_spec
);
570 t
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
572 return t
> c
? t
- c
: 0;
576 static void update_description(struct userdata
*u
) {
578 char un
[128], hn
[128];
583 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
586 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
589 pa_sink_set_description(u
->sink
, d
);
591 pa_source_set_description(u
->source
, d
);
596 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
597 pa_get_user_name(un
, sizeof(un
)),
598 pa_get_host_name(hn
, sizeof(hn
)));
600 t
= pa_tagstruct_new(NULL
, 0);
602 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
604 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
606 pa_tagstruct_putu32(t
, u
->ctag
++);
607 pa_tagstruct_putu32(t
, u
->channel
);
608 pa_tagstruct_puts(t
, d
);
609 pa_pstream_send_tagstruct(u
->pstream
, t
);
614 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
615 struct userdata
*u
= userdata
;
617 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
623 if (command
!= PA_COMMAND_REPLY
) {
624 if (command
== PA_COMMAND_ERROR
)
625 pa_log("Failed to get info.");
627 pa_log("Protocol error 6.");
631 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
632 pa_tagstruct_gets(t
, &server_version
) < 0 ||
633 pa_tagstruct_gets(t
, &user_name
) < 0 ||
634 pa_tagstruct_gets(t
, &host_name
) < 0 ||
635 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
636 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
637 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
638 pa_tagstruct_getu32(t
, &cookie
) < 0) {
639 pa_log("Invalid reply. (get_server_info)");
643 pa_xfree(u
->server_fqdn
);
644 u
->server_fqdn
= pa_xstrdup(host_name
);
646 pa_xfree(u
->user_name
);
647 u
->user_name
= pa_xstrdup(user_name
);
649 update_description(u
);
654 pa_module_unload_request(u
->module
);
659 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
660 struct userdata
*u
= userdata
;
661 uint32_t idx
, owner_module
, monitor_source
, flags
;
662 const char *name
, *description
, *monitor_source_name
, *driver
;
672 if (command
!= PA_COMMAND_REPLY
) {
673 if (command
== PA_COMMAND_ERROR
)
674 pa_log("Failed to get info.");
676 pa_log("Protocol error 5.");
680 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
681 pa_tagstruct_gets(t
, &name
) < 0 ||
682 pa_tagstruct_gets(t
, &description
) < 0 ||
683 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
684 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
685 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
686 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
687 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
688 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
689 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
690 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
691 pa_tagstruct_gets(t
, &driver
) < 0 ||
692 pa_tagstruct_getu32(t
, &flags
) < 0) {
693 pa_log("Invalid reply. (get_sink_info)");
697 if (strcmp(name
, u
->sink_name
))
700 pa_xfree(u
->device_description
);
701 u
->device_description
= pa_xstrdup(description
);
703 update_description(u
);
708 pa_module_unload_request(u
->module
);
711 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
712 struct userdata
*u
= userdata
;
713 uint32_t idx
, owner_module
, client
, sink
;
714 pa_usec_t buffer_usec
, sink_usec
;
715 const char *name
, *driver
, *resample_method
;
717 pa_sample_spec sample_spec
;
718 pa_channel_map channel_map
;
724 if (command
!= PA_COMMAND_REPLY
) {
725 if (command
== PA_COMMAND_ERROR
)
726 pa_log("Failed to get info.");
728 pa_log("Protocol error 2.");
732 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
733 pa_tagstruct_gets(t
, &name
) < 0 ||
734 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
735 pa_tagstruct_getu32(t
, &client
) < 0 ||
736 pa_tagstruct_getu32(t
, &sink
) < 0 ||
737 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
738 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
739 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
740 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
741 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
742 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
743 pa_tagstruct_gets(t
, &driver
) < 0 ||
744 (u
->version
>= 11 && pa_tagstruct_get_boolean(t
, &mute
) < 0)) {
745 pa_log("Invalid reply. (get_info)");
749 if (idx
!= u
->device_index
)
754 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
755 pa_cvolume_equal(&volume
, &u
->sink
->volume
))
758 memcpy(&u
->sink
->volume
, &volume
, sizeof(pa_cvolume
));
760 if (u
->version
>= 11)
761 u
->sink
->muted
= !!mute
;
763 pa_subscription_post(u
->sink
->core
, PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
, u
->sink
->index
);
767 pa_module_unload_request(u
->module
);
772 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
773 struct userdata
*u
= userdata
;
774 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
775 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
785 if (command
!= PA_COMMAND_REPLY
) {
786 if (command
== PA_COMMAND_ERROR
)
787 pa_log("Failed to get info.");
789 pa_log("Protocol error 5.");
793 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
794 pa_tagstruct_gets(t
, &name
) < 0 ||
795 pa_tagstruct_gets(t
, &description
) < 0 ||
796 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
797 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
798 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
799 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
800 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
801 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
802 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
803 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
804 pa_tagstruct_gets(t
, &driver
) < 0 ||
805 pa_tagstruct_getu32(t
, &flags
) < 0) {
806 pa_log("Invalid reply. (get_source_info)");
810 if (strcmp(name
, u
->source_name
))
813 pa_xfree(u
->device_description
);
814 u
->device_description
= pa_xstrdup(description
);
816 update_description(u
);
821 pa_module_unload_request(u
->module
);
826 static void request_info(struct userdata
*u
) {
831 t
= pa_tagstruct_new(NULL
, 0);
832 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
833 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
834 pa_pstream_send_tagstruct(u
->pstream
, t
);
835 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
838 t
= pa_tagstruct_new(NULL
, 0);
839 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
840 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
841 pa_tagstruct_putu32(t
, u
->device_index
);
842 pa_pstream_send_tagstruct(u
->pstream
, t
);
843 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
845 t
= pa_tagstruct_new(NULL
, 0);
846 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
847 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
848 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
849 pa_tagstruct_puts(t
, u
->sink_name
);
850 pa_pstream_send_tagstruct(u
->pstream
, t
);
851 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
853 t
= pa_tagstruct_new(NULL
, 0);
854 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
855 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
856 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
857 pa_tagstruct_puts(t
, u
->source_name
);
858 pa_pstream_send_tagstruct(u
->pstream
, t
);
859 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
863 static void command_subscribe_event(pa_pdispatch
*pd
, PA_GCC_UNUSED
uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
864 struct userdata
*u
= userdata
;
865 pa_subscription_event_type_t e
;
871 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
873 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
874 pa_tagstruct_getu32(t
, &idx
) < 0) {
875 pa_log("Invalid protocol reply");
876 pa_module_unload_request(u
->module
);
880 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
882 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
883 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
885 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
893 static void start_subscribe(struct userdata
*u
) {
898 t
= pa_tagstruct_new(NULL
, 0);
899 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
900 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
901 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
903 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
905 PA_SUBSCRIPTION_MASK_SOURCE
909 pa_pstream_send_tagstruct(u
->pstream
, t
);
912 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, PA_GCC_UNUSED
uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
913 struct userdata
*u
= userdata
;
921 pa_assert(u
->pdispatch
== pd
);
923 if (command
!= PA_COMMAND_REPLY
) {
924 if (command
== PA_COMMAND_ERROR
)
925 pa_log("Failed to create stream.");
927 pa_log("Protocol error 3.");
931 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
932 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
934 || pa_tagstruct_getu32(t
, &bytes
) < 0
939 if (u
->version
>= 9) {
941 uint32_t maxlength
, tlength
, prebuf
, minreq
;
943 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
944 pa_tagstruct_getu32(t
, &tlength
) < 0 ||
945 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
946 pa_tagstruct_getu32(t
, &minreq
) < 0)
949 uint32_t maxlength
, fragsize
;
951 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
952 pa_tagstruct_getu32(t
, &fragsize
) < 0)
960 pa_assert(!u
->time_event
);
961 pa_gettimeofday(&ntv
);
962 ntv
.tv_sec
+= LATENCY_INTERVAL
;
963 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
967 pa_log_debug("Stream created.");
970 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
976 pa_log("Invalid reply. (Create stream)");
979 pa_module_unload_request(u
->module
);
982 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
983 struct userdata
*u
= userdata
;
985 char name
[256], un
[128], hn
[128];
992 pa_assert(u
->pdispatch
== pd
);
994 if (command
!= PA_COMMAND_REPLY
||
995 pa_tagstruct_getu32(t
, &u
->version
) < 0) {
996 if (command
== PA_COMMAND_ERROR
)
997 pa_log("Failed to authenticate");
999 pa_log("Protocol error 4.");
1004 /* Minimum supported protocol version */
1005 if (u
->version
< 8) {
1006 pa_log("Incompatible protocol version");
1011 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1013 pa_get_user_name(un
, sizeof(un
)),
1014 pa_get_host_name(hn
, sizeof(hn
)));
1016 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1018 pa_get_user_name(un
, sizeof(un
)),
1019 pa_get_host_name(hn
, sizeof(hn
)));
1022 reply
= pa_tagstruct_new(NULL
, 0);
1023 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1024 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1025 pa_tagstruct_puts(reply
, "PulseAudio");
1026 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1027 /* We ignore the server's reply here */
1029 reply
= pa_tagstruct_new(NULL
, 0);
1032 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1033 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1034 pa_tagstruct_puts(reply
, name
);
1035 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1036 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1037 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1038 pa_tagstruct_puts(reply
, u
->sink_name
);
1039 pa_tagstruct_putu32(reply
, u
->maxlength
);
1040 pa_tagstruct_put_boolean(reply
, !PA_SINK_OPENED(pa_sink_get_state(u
->sink
)));
1041 pa_tagstruct_putu32(reply
, u
->tlength
);
1042 pa_tagstruct_putu32(reply
, u
->prebuf
);
1043 pa_tagstruct_putu32(reply
, u
->minreq
);
1044 pa_tagstruct_putu32(reply
, 0);
1045 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1046 pa_tagstruct_put_cvolume(reply
, &volume
);
1048 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1049 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1050 pa_tagstruct_puts(reply
, name
);
1051 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1052 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1053 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1054 pa_tagstruct_puts(reply
, u
->source_name
);
1055 pa_tagstruct_putu32(reply
, u
->maxlength
);
1056 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_OPENED(pa_source_get_state(u
->source
)));
1057 pa_tagstruct_putu32(reply
, u
->fragsize
);
1060 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1061 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1063 pa_log_debug("Connection authenticated, creating stream ...");
1068 pa_module_unload_request(u
->module
);
1071 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1072 struct userdata
*u
= userdata
;
1077 pa_log_warn("Stream died.");
1078 pa_module_unload_request(u
->module
);
1081 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1082 struct userdata
*u
= userdata
;
1088 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1089 pa_log("Invalid packet");
1090 pa_module_unload_request(u
->module
);
1096 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1097 struct userdata
*u
= userdata
;
1103 if (channel
!= u
->channel
) {
1104 pa_log("Recieved memory block on bad channel.");
1105 pa_module_unload_request(u
->module
);
1109 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1111 u
->counter
+= chunk
->length
;
1112 u
->counter_delta
+= chunk
->length
;
1117 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1118 struct userdata
*u
= userdata
;
1124 pa_assert(u
->client
== sc
);
1126 pa_socket_client_unref(u
->client
);
1130 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1131 pa_module_unload_request(u
->module
);
1135 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1136 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1138 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1139 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1141 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
1144 t
= pa_tagstruct_new(NULL
, 0);
1145 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1146 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1147 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1148 pa_tagstruct_put_arbitrary(t
, u
->auth_cookie
, sizeof(u
->auth_cookie
));
1154 if (pa_iochannel_creds_supported(io
))
1155 pa_iochannel_creds_enable(io
);
1157 ucred
.uid
= getuid();
1158 ucred
.gid
= getgid();
1160 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1163 pa_pstream_send_tagstruct(u
->pstream
, t
);
1166 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1168 pa_log_debug("Connection established, authenticating ...");
1173 static int sink_get_volume(pa_sink
*sink
) {
1177 static int sink_set_volume(pa_sink
*sink
) {
1186 t
= pa_tagstruct_new(NULL
, 0);
1187 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1188 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1189 pa_tagstruct_putu32(t
, u
->device_index
);
1190 pa_tagstruct_put_cvolume(t
, &sink
->volume
);
1191 pa_pstream_send_tagstruct(u
->pstream
, t
);
1196 static int sink_get_mute(pa_sink
*sink
) {
1200 static int sink_set_mute(pa_sink
*sink
) {
1209 if (u
->version
< 11)
1212 t
= pa_tagstruct_new(NULL
, 0);
1213 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1214 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1215 pa_tagstruct_putu32(t
, u
->device_index
);
1216 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1217 pa_pstream_send_tagstruct(u
->pstream
, t
);
1224 static int load_key(struct userdata
*u
, const char*fn
) {
1227 u
->auth_cookie_in_property
= FALSE
;
1229 if (!fn
&& pa_authkey_prop_get(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0) {
1230 pa_log_debug("Using already loaded auth cookie.");
1231 pa_authkey_prop_ref(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1232 u
->auth_cookie_in_property
= 1;
1237 fn
= PA_NATIVE_COOKIE_FILE
;
1239 if (pa_authkey_load_auto(fn
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) < 0)
1242 pa_log_debug("Loading cookie from disk.");
1244 if (pa_authkey_prop_put(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0)
1245 u
->auth_cookie_in_property
= TRUE
;
1250 int pa__init(pa_module
*m
) {
1251 pa_modargs
*ma
= NULL
;
1252 struct userdata
*u
= NULL
;
1255 char *t
, *dn
= NULL
;
1259 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1260 pa_log("failed to parse module arguments");
1264 u
= pa_xnew0(struct userdata
, 1);
1269 u
->pdispatch
= NULL
;
1271 u
->server_name
= NULL
;
1273 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1275 u
->requested_bytes
= 0;
1277 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1280 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
);
1282 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1283 u
->auth_cookie_in_property
= FALSE
;
1284 u
->time_event
= NULL
;
1286 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
);
1287 u
->rtpoll
= pa_rtpoll_new();
1288 pa_rtpoll_item_new_asyncmsgq(u
->rtpoll
, PA_RTPOLL_EARLY
, u
->thread_mq
.inq
);
1290 if (load_key(u
, pa_modargs_get_value(ma
, "cookie", NULL
)) < 0)
1293 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1294 pa_log("no server specified.");
1298 ss
= m
->core
->default_sample_spec
;
1299 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1300 pa_log("invalid sample format specification");
1304 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1305 pa_log("failed to connect to server '%s'", u
->server_name
);
1309 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
1313 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1314 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1316 if (!(u
->sink
= pa_sink_new(m
->core
, __FILE__
, dn
, 1, &ss
, &map
))) {
1317 pa_log("Failed to create sink.");
1321 u
->sink
->parent
.process_msg
= sink_process_msg
;
1322 u
->sink
->userdata
= u
;
1323 u
->sink
->set_state
= sink_set_state
;
1324 u
->sink
->get_latency
= sink_get_latency
;
1325 u
->sink
->get_volume
= sink_get_volume
;
1326 u
->sink
->get_mute
= sink_get_mute
;
1327 u
->sink
->set_volume
= sink_set_volume
;
1328 u
->sink
->set_mute
= sink_set_mute
;
1329 u
->sink
->flags
= PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
;
1331 pa_sink_set_module(u
->sink
, m
);
1332 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1333 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
1334 pa_sink_set_description(u
->sink
, t
= pa_sprintf_malloc("%s%s%s", u
->sink_name
? u
->sink_name
: "", u
->sink_name
? " on " : "", u
->server_name
));
1339 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1340 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1342 if (!(u
->source
= pa_source_new(m
->core
, __FILE__
, dn
, 1, &ss
, &map
))) {
1343 pa_log("Failed to create source.");
1347 u
->source
->parent
.process_msg
= source_process_msg
;
1348 u
->source
->userdata
= u
;
1349 u
->source
->set_state
= source_set_state
;
1350 u
->source
->get_latency
= source_get_latency
;
1351 u
->source
->flags
= PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
;
1353 pa_source_set_module(u
->source
, m
);
1354 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1355 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1356 pa_source_set_description(u
->source
, t
= pa_sprintf_malloc("%s%s%s", u
->source_name
? u
->source_name
: "", u
->source_name
? " on " : "", u
->server_name
));
1362 u
->time_event
= NULL
;
1364 u
->maxlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MAXLENGTH_MSEC
, &ss
);
1366 u
->tlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &ss
);
1367 u
->minreq
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &ss
);
1368 u
->prebuf
= u
->tlength
;
1370 u
->fragsize
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &ss
);
1373 u
->counter
= u
->counter_delta
= 0;
1374 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
1376 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1377 pa_log("Failed to create thread.");
1382 pa_sink_put(u
->sink
);
1384 pa_source_put(u
->source
);
1387 pa_modargs_free(ma
);
1395 pa_modargs_free(ma
);
1402 void pa__done(pa_module
*m
) {
1407 if (!(u
= m
->userdata
))
1412 pa_sink_unlink(u
->sink
);
1415 pa_source_unlink(u
->source
);
1419 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1420 pa_thread_free(u
->thread
);
1423 pa_thread_mq_done(&u
->thread_mq
);
1427 pa_sink_unref(u
->sink
);
1430 pa_source_unref(u
->source
);
1434 pa_rtpoll_free(u
->rtpoll
);
1437 pa_pstream_unlink(u
->pstream
);
1438 pa_pstream_unref(u
->pstream
);
1442 pa_pdispatch_unref(u
->pdispatch
);
1445 pa_socket_client_unref(u
->client
);
1447 if (u
->auth_cookie_in_property
)
1448 pa_authkey_prop_unref(m
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1451 pa_smoother_free(u
->smoother
);
1454 u
->core
->mainloop
->time_free(u
->time_event
);
1457 pa_xfree(u
->sink_name
);
1459 pa_xfree(u
->source_name
);
1461 pa_xfree(u
->server_name
);
1463 pa_xfree(u
->device_description
);
1464 pa_xfree(u
->server_fqdn
);
1465 pa_xfree(u
->user_name
);