2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
59 #include "module-tunnel-sink-symdef.h"
61 #include "module-tunnel-source-symdef.h"
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 "sink=<remote sink name> "
70 "format=<sample format> "
71 "channels=<number of channels> "
73 "sink_name=<name for the local sink> "
74 "channel_map=<channel map>");
76 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 "source=<remote source name> "
81 "format=<sample format> "
82 "channels=<number of channels> "
84 "source_name=<name for the local source> "
85 "channel_map=<channel map>");
88 PA_MODULE_AUTHOR("Lennart Poettering");
89 PA_MODULE_VERSION(PACKAGE_VERSION
);
90 PA_MODULE_LOAD_ONCE(FALSE
);
92 static const char* const valid_modargs
[] = {
109 #define DEFAULT_TIMEOUT 5
111 #define LATENCY_INTERVAL 10
113 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
119 SINK_MESSAGE_REMOTE_SUSPEND
,
120 SINK_MESSAGE_UPDATE_LATENCY
,
124 #define DEFAULT_TLENGTH_MSEC 150
125 #define DEFAULT_MINREQ_MSEC 25
130 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
131 SOURCE_MESSAGE_REMOTE_SUSPEND
,
132 SOURCE_MESSAGE_UPDATE_LATENCY
135 #define DEFAULT_FRAGSIZE_MSEC 25
/* Forward declarations of the pdispatch command handlers. All of them share
 * the same signature: the dispatcher, the command code, the tag, the
 * tagstruct carrying the payload, and the opaque userdata pointer. */
140 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
141 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
143 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
144 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
146 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
/* Dispatch table indexed by PA_COMMAND_* code. Overflow and underflow share
 * one handler, and the playback/record variants of the "killed",
 * "suspended" and "moved" notifications each share one handler as well. */
149 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
151 [PA_COMMAND_REQUEST
] = command_request
,
152 [PA_COMMAND_STARTED
] = command_started
,
154 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
155 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
156 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
157 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
158 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
159 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
160 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
161 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
162 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
169 pa_thread_mq thread_mq
;
173 pa_socket_client
*client
;
175 pa_pdispatch
*pdispatch
;
181 size_t requested_bytes
;
187 pa_auth_cookie
*auth_cookie
;
191 uint32_t device_index
;
194 int64_t counter
, counter_delta
;
196 pa_bool_t remote_corked
:1;
197 pa_bool_t remote_suspended
:1;
199 pa_usec_t transport_usec
;
200 pa_bool_t transport_usec_valid
;
202 uint32_t ignore_latency_before
;
204 pa_time_event
*time_event
;
206 pa_smoother
*smoother
;
208 char *device_description
;
/* Forward declaration; the definition appears further down in this file. */
222 static void request_latency(struct userdata
*u
);
224 /* Called from main context.
 *
 * Handler for the PLAYBACK/RECORD_STREAM_KILLED commands: asserts that pd
 * is the dispatcher stored in the userdata, logs a warning and requests
 * that this module be unloaded. */
225 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
226 struct userdata
*u
= userdata
;
231 pa_assert(u
->pdispatch
== pd
);
233 pa_log_warn("Stream killed");
234 pa_module_unload_request(u
->module
, TRUE
);
237 /* Called from main context.
 *
 * Shared handler for PA_COMMAND_OVERFLOW and PA_COMMAND_UNDERFLOW. In the
 * visible code it only checks the dispatcher and logs an informational
 * message; the payload is not parsed. */
238 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
239 struct userdata
*u
= userdata
;
244 pa_assert(u
->pdispatch
== pd
);
246 pa_log_info("Server signalled buffer overrun/underrun.");
250 /* Called from main context.
 *
 * Handler for the PLAYBACK/RECORD_STREAM_SUSPENDED commands. Parses a
 * channel (u32) and a suspended flag (boolean) and requires the packet to
 * end there; otherwise it logs "Invalid packet" and requests a module
 * unload. The flag, normalised to 0/1, is then passed synchronously to the
 * device's message queue as a *_MESSAGE_REMOTE_SUSPEND message.
 *
 * NOTE(review): both the sink and the source send are visible below;
 * presumably only one is compiled in, selected by a conditional that is
 * not visible in this listing -- confirm. */
251 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
252 struct userdata
*u
= userdata
;
259 pa_assert(u
->pdispatch
== pd
);
261 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
262 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
263 !pa_tagstruct_eof(t
)) {
264 pa_log("Invalid packet");
265 pa_module_unload_request(u
->module
, TRUE
);
270 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
272 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
278 /* Called from main context.
 *
 * Handler for the PLAYBACK/RECORD_STREAM_MOVED commands. The visible code
 * checks the dispatcher and emits a debug log line. */
279 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
280 struct userdata
*u
= userdata
;
285 pa_assert(u
->pdispatch
== pd
);
287 pa_log_debug("Server reports a stream move.");
293 /* Called from main context.
 *
 * Handler for PA_COMMAND_STARTED. The visible code checks the dispatcher
 * and emits a debug log line. */
294 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
295 struct userdata
*u
= userdata
;
300 pa_assert(u
->pdispatch
== pd
);
302 pa_log_debug("Server reports playback started.");
308 /* Called from IO thread context.
 *
 * Updates u->remote_corked and keeps the latency smoother in step with it.
 * The new state is first compared with the stored one (the statement
 * guarded by that comparison is not visible in this listing). After the
 * flag is stored, the current rtclock time is taken and, when a valid
 * transport latency is known, moved forward by u->transport_usec. The
 * smoother is paused at that timestamp when the stream is remotely
 * suspended or corked; pa_smoother_resume() with the same timestamp is the
 * alternative. */
309 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
313 if (u
->remote_corked
== cork
)
316 u
->remote_corked
= cork
;
317 x
= pa_rtclock_usec();
319 /* Correct by the time this needs to travel to the other side.
320 * This is a valid thread-safe access, because the main thread is
322 if (u
->transport_usec_valid
)
323 x
+= u
->transport_usec
;
325 if (u
->remote_suspended
|| u
->remote_corked
)
326 pa_smoother_pause(u
->smoother
, x
);
328 pa_smoother_resume(u
->smoother
, x
);
331 /* Called from main context.
 *
 * Sends a cork/uncork command for this module's stream to the remote
 * server. The packet is: command code, a fresh tag (u->ctag is
 * post-incremented), the stream channel, and the cork flag normalised to
 * a boolean.
 *
 * NOTE(review): both the PLAYBACK and the RECORD command code are written
 * in the visible lines; presumably only one is compiled in, selected by a
 * conditional not visible in this listing -- confirm. */
332 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
339 t
= pa_tagstruct_new(NULL
, 0);
341 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
343 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
345 pa_tagstruct_putu32(t
, u
->ctag
++);
346 pa_tagstruct_putu32(t
, u
->channel
);
347 pa_tagstruct_put_boolean(t
, !!cork
);
348 pa_pstream_send_tagstruct(u
->pstream
, t
);
353 /* Called from IO thread context.
 *
 * Counterpart of stream_cork_within_thread() for the remote suspend state:
 * updates u->remote_suspended and pauses or resumes the latency smoother.
 * Here the timestamp is moved backwards by u->transport_usec (when valid),
 * since the notification already travelled from the other side. The
 * statement guarded by the initial "state unchanged" comparison is not
 * visible in this listing. */
354 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
358 if (u
->remote_suspended
== suspend
)
361 u
->remote_suspended
= suspend
;
363 x
= pa_rtclock_usec();
365 /* Correct by the time this needed to travel from the other side.
366 * This is a valid thread-safe access, because the main thread is
368 if (u
->transport_usec_valid
)
369 x
-= u
->transport_usec
;
371 if (u
->remote_suspended
|| u
->remote_corked
)
372 pa_smoother_pause(u
->smoother
, x
);
374 pa_smoother_resume(u
->smoother
, x
);
379 /* Called from IO thread context.
 *
 * Renders audio from the local sink until the number of bytes requested by
 * the remote side (u->requested_bytes) has been satisfied. Each rendered
 * chunk is posted on the thread's outgoing message queue as
 * SINK_MESSAGE_POST, after which the local memblock reference is dropped. */
380 static void send_data(struct userdata
*u
) {
383 while (u
->requested_bytes
> 0) {
384 pa_memchunk memchunk
;
386 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
387 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
388 pa_memblock_unref(memchunk
.memblock
);
/* Account for what was rendered: less is outstanding, more has been
 * written in total. */
390 u
->requested_bytes
-= memchunk
.length
;
392 u
->counter
+= (int64_t) memchunk
.length
;
396 /* This function is called from IO context -- except when it is not.
 *
 * Message handler for the tunnel sink. Handles the state change, latency
 * query and module-private SINK_MESSAGE_* codes visible below; the final
 * statement hands the message to the generic pa_sink_process_msg(). */
397 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
398 struct userdata
*u
= PA_SINK(o
)->userdata
;
402 case PA_SINK_MESSAGE_SET_STATE
: {
405 /* First, change the state, because otherwise pa_sink_render() would fail */
406 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
/* Cork the remote stream exactly when the sink is now suspended. */
408 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
410 if (PA_SINK_IS_OPENED(u
->sink
->state
))
417 case PA_SINK_MESSAGE_GET_LATENCY
: {
418 pa_usec_t yl
, yr
, *usec
= data
;
/* yl: time equivalent of the bytes written so far; yr: the smoother's
 * estimate for the current time. The latency is their difference,
 * clamped at zero. */
420 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
421 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
423 *usec
= yl
> yr
? yl
- yr
: 0;
427 case SINK_MESSAGE_REQUEST
:
/* The number of newly requested bytes arrives in the offset argument. */
429 pa_assert(offset
> 0);
430 u
->requested_bytes
+= (size_t) offset
;
432 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
438 case SINK_MESSAGE_REMOTE_SUSPEND
:
440 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
444 case SINK_MESSAGE_UPDATE_LATENCY
: {
/* Feed the smoother a new sample: the time equivalent of the byte
 * counter, reduced by the delay passed in the offset argument when the
 * condition below holds. */
447 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
449 if (y
> (pa_usec_t
) offset
|| offset
< 0)
450 y
-= (pa_usec_t
) offset
;
454 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
459 case SINK_MESSAGE_POST
:
461 /* OK, This might be a bit confusing. This message is
462 * delivered to us from the main context -- NOT from the
463 * IO thread context where the rest of the messages are
464 * dispatched. Yeah, ugly, but I am a lazy bastard. */
466 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
/* Track what was sent since the last latency request. */
468 u
->counter_delta
+= (int64_t) chunk
->length
;
473 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
476 /* Called from main context.
 *
 * State-change hook for the tunnel sink. Going to SUSPENDED corks the
 * remote stream (the sink must currently be opened); going to RUNNING from
 * SUSPENDED uncorks it. No action is visible for the UNLINKED and
 * INVALID_STATE cases. */
477 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
479 pa_sink_assert_ref(s
);
482 switch ((pa_sink_state_t
) state
) {
484 case PA_SINK_SUSPENDED
:
485 pa_assert(PA_SINK_IS_OPENED(s
->state
));
486 stream_cork(u
, TRUE
);
490 case PA_SINK_RUNNING
:
491 if (s
->state
== PA_SINK_SUSPENDED
)
492 stream_cork(u
, FALSE
);
495 case PA_SINK_UNLINKED
:
497 case PA_SINK_INVALID_STATE
:
506 /* This function is called from IO context -- except when it is not.
 *
 * Message handler for the tunnel source. Handles the state change, latency
 * query and module-private SOURCE_MESSAGE_* codes visible below; the final
 * statement hands the message to the generic pa_source_process_msg(). */
507 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
508 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
512 case PA_SOURCE_MESSAGE_SET_STATE
: {
/* Let the generic handler change the state first, then cork the remote
 * stream exactly when the source is now suspended. */
515 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
516 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
521 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
522 pa_usec_t yr
, yl
, *usec
= data
;
/* Mirror image of the sink case: here the latency is the smoother's
 * estimate minus the time equivalent of the bytes received, clamped at
 * zero. */
524 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
525 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
527 *usec
= yr
> yl
? yr
- yl
: 0;
531 case SOURCE_MESSAGE_POST
:
/* The chunk is only passed on while the source is opened, but it is
 * counted either way. */
533 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
534 pa_source_post(u
->source
, chunk
);
536 u
->counter
+= (int64_t) chunk
->length
;
540 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
542 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
545 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
/* Feed the smoother a new sample: the time equivalent of the byte
 * counter, increased by the delay passed in the offset argument when
 * the condition below holds. */
548 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
550 if (offset
>= 0 || y
> (pa_usec_t
) -offset
)
551 y
+= (pa_usec_t
) offset
;
555 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
561 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
564 /* Called from main context.
 *
 * State-change hook for the tunnel source. Going to SUSPENDED corks the
 * remote stream (the source must currently be opened); going to RUNNING
 * from SUSPENDED uncorks it. No action is visible for the remaining cases.
 *
 * NOTE(review): the last case label below uses PA_SINK_INVALID_STATE
 * although the switch is over pa_source_state_t; PA_SOURCE_INVALID_STATE
 * looks like the intended constant -- confirm. */
565 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
567 pa_source_assert_ref(s
);
570 switch ((pa_source_state_t
) state
) {
572 case PA_SOURCE_SUSPENDED
:
573 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
574 stream_cork(u
, TRUE
);
578 case PA_SOURCE_RUNNING
:
579 if (s
->state
== PA_SOURCE_SUSPENDED
)
580 stream_cork(u
, FALSE
);
583 case PA_SOURCE_UNLINKED
:
585 case PA_SINK_INVALID_STATE
:
/* Entry point of the module's IO thread. Installs the thread message queue
 * and the rtpoll object for this thread and then runs the poll loop. The
 * visible sink-related step handles a pending rewind request with a zero
 * byte rewind while the sink is opened. When the loop ends abnormally the
 * thread asks the core to unload the module and keeps waiting for
 * PA_MESSAGE_SHUTDOWN before it finishes. */
594 static void thread_func(void *userdata
) {
595 struct userdata
*u
= userdata
;
599 pa_log_debug("Thread starting up");
601 pa_thread_mq_install(&u
->thread_mq
);
602 pa_rtpoll_install(u
->rtpoll
);
608 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
609 if (u
->sink
->thread_info
.rewind_requested
)
610 pa_sink_process_rewind(u
->sink
, 0);
613 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
621 /* If this was no regular exit from the loop we have to continue
622 * processing messages until we received PA_MESSAGE_SHUTDOWN */
623 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
624 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
627 pa_log_debug("Thread shutting down");
631 /* Called from main context.
 *
 * Handler for PA_COMMAND_REQUEST: the server asks for more playback data.
 * Parses the channel and the byte count, checks that the channel is this
 * module's stream, and posts the byte count (as the message offset) to the
 * sink as SINK_MESSAGE_REQUEST. The final statement requests a module
 * unload; presumably it is the failure path of the two checks above (the
 * control flow leading there is not visible in this listing). */
632 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
633 struct userdata
*u
= userdata
;
634 uint32_t bytes
, channel
;
637 pa_assert(command
== PA_COMMAND_REQUEST
);
640 pa_assert(u
->pdispatch
== pd
);
642 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
643 pa_tagstruct_getu32(t
, &bytes
) < 0) {
644 pa_log("Invalid protocol reply");
648 if (channel
!= u
->channel
) {
649 pa_log("Recieved data for invalid channel");
653 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
657 pa_module_unload_request(u
->module
, TRUE
);
662 /* Called from main context.
 *
 * Reply handler for the latency request sent by request_latency(). Parses
 * the reply (sink/source latency, playing flag, two timestamps, write and
 * read index, plus two extra u64 fields for protocol version >= 13),
 * estimates the transport latency, derives a total delay and passes it to
 * the device as a *_MESSAGE_UPDATE_LATENCY message. The final statement
 * requests a module unload; presumably it is the failure path (the control
 * flow leading there is not visible in this listing).
 *
 * NOTE(review): sink and source variants of several statements are both
 * visible below; presumably only one of each pair is compiled in, selected
 * by a conditional not visible in this listing -- confirm. */
663 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
664 struct userdata
*u
= userdata
;
665 pa_usec_t sink_usec
, source_usec
, transport_usec
= 0;
667 int64_t write_index
, read_index
;
668 struct timeval local
, remote
, now
;
675 if (command
!= PA_COMMAND_REPLY
) {
676 if (command
== PA_COMMAND_ERROR
)
677 pa_log("Failed to get latency.");
679 pa_log("Protocol error.");
683 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
684 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
685 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
686 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
687 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
688 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
689 pa_tagstruct_gets64(t
, &read_index
) < 0) {
690 pa_log("Invalid reply.");
695 if (u
->version
>= 13) {
696 uint64_t underrun_for
= 0, playing_for
= 0;
698 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
699 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
700 pa_log("Invalid reply.");
706 if (!pa_tagstruct_eof(t
)) {
707 pa_log("Invalid reply.");
/* Replies to requests older than the most recent one are detected here
 * via their tag (the statement guarded by this check is not visible). */
711 if (tag
< u
->ignore_latency_before
) {
716 pa_gettimeofday(&now
);
718 /* Calculate transport usec.
 *
 * NOTE(review): the second pa_timeval_cmp() result below is tested for
 * being non-zero rather than compared with "< 0" like the first one --
 * confirm that this is intended. */
719 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
720 /* local and remote seem to have synchronized clocks */
722 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
724 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
727 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
728 u
->transport_usec_valid
= TRUE
;
730 /* First, take the device's delay */
732 delay
= (int64_t) sink_usec
;
733 ss
= &u
->sink
->sample_spec
;
735 delay
= (int64_t) source_usec
;
736 ss
= &u
->source
->sample_spec
;
739 /* Add the length of our server-side buffer */
740 if (write_index
>= read_index
)
741 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
743 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
745 /* Our measurements are already out of date, hence correct by the *
746 * transport latency.
 *
 * NOTE(review): the correction uses the local variable transport_usec,
 * which is initialised to 0 and not assigned in the visible lines, while
 * the measured value is stored in u->transport_usec -- confirm which one
 * was intended. */
748 delay
-= (int64_t) transport_usec
;
750 delay
+= (int64_t) transport_usec
;
753 /* Now correct by what we have read/written since we requested the update */
755 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
757 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
761 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
763 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
770 pa_module_unload_request(u
->module
, TRUE
);
773 /* Called from main context.
 *
 * Sends a latency query for this module's stream to the remote server and
 * registers stream_get_latency_callback() for the reply, with a timeout of
 * DEFAULT_TIMEOUT. The packet carries the command code, a fresh tag, the
 * stream channel and the current local time. Afterwards the tag is stored
 * in u->ignore_latency_before and u->counter_delta is reset to zero.
 *
 * NOTE(review): both the PLAYBACK and the RECORD command code are written
 * in the visible lines; presumably only one is compiled in, selected by a
 * conditional not visible in this listing -- confirm. */
774 static void request_latency(struct userdata
*u
) {
780 t
= pa_tagstruct_new(NULL
, 0);
782 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
784 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
786 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
787 pa_tagstruct_putu32(t
, u
->channel
);
789 pa_gettimeofday(&now
);
790 pa_tagstruct_put_timeval(t
, &now
);
792 pa_pstream_send_tagstruct(u
->pstream
, t
);
793 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
795 u
->ignore_latency_before
= tag
;
796 u
->counter_delta
= 0;
799 /* Called from main context.
 *
 * Time event callback. The visible code re-arms the event so that it fires
 * again LATENCY_INTERVAL seconds from now. */
800 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*tv
, void *userdata
) {
801 struct userdata
*u
= userdata
;
810 pa_gettimeofday(&ntv
);
811 ntv
.tv_sec
+= LATENCY_INTERVAL
;
812 m
->time_restart(e
, &ntv
);
815 /* Called from main context.
 *
 * Refreshes the human readable names once the remote server's host name,
 * the remote user name and the remote device description are all known
 * (the statement guarded by the initial check is not visible here).
 *
 * The local device gets the description "<device> on <user>@<host>" and
 * three "tunnel.remote.*" properties. The remote stream is then renamed to
 * "<device> for <local user>@<local host>" via a SET_*_STREAM_NAME command.
 *
 * NOTE(review): sink and source variants of the statements are both
 * visible below; presumably only one set is compiled in, selected by a
 * conditional not visible in this listing -- confirm. */
816 static void update_description(struct userdata
*u
) {
818 char un
[128], hn
[128];
823 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
826 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
829 pa_sink_set_description(u
->sink
, d
);
830 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
831 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
832 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
834 pa_source_set_description(u
->source
, d
);
835 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
836 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
837 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
/* Name of our stream as shown on the remote side. */
842 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
843 pa_get_user_name(un
, sizeof(un
)),
844 pa_get_host_name(hn
, sizeof(hn
)));
846 t
= pa_tagstruct_new(NULL
, 0);
848 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
850 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
852 pa_tagstruct_putu32(t
, u
->ctag
++);
853 pa_tagstruct_putu32(t
, u
->channel
);
854 pa_tagstruct_puts(t
, d
);
855 pa_pstream_send_tagstruct(u
->pstream
, t
);
860 /* Called from main context.
 *
 * Reply handler for PA_COMMAND_GET_SERVER_INFO. Parses the server info
 * record and requires the packet to end after it. Of the parsed fields
 * only host_name and user_name are kept: they replace u->server_fqdn and
 * u->user_name (as freshly allocated copies), after which the descriptions
 * are refreshed via update_description(). The final statement requests a
 * module unload; presumably it is the failure path (the control flow
 * leading there is not visible in this listing). */
861 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
862 struct userdata
*u
= userdata
;
864 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
870 if (command
!= PA_COMMAND_REPLY
) {
871 if (command
== PA_COMMAND_ERROR
)
872 pa_log("Failed to get info.");
874 pa_log("Protocol error.");
878 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
879 pa_tagstruct_gets(t
, &server_version
) < 0 ||
880 pa_tagstruct_gets(t
, &user_name
) < 0 ||
881 pa_tagstruct_gets(t
, &host_name
) < 0 ||
882 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
883 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
884 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
885 pa_tagstruct_getu32(t
, &cookie
) < 0) {
887 pa_log("Parse failure");
891 if (!pa_tagstruct_eof(t
)) {
892 pa_log("Packet too long");
896 pa_xfree(u
->server_fqdn
);
897 u
->server_fqdn
= pa_xstrdup(host_name
);
899 pa_xfree(u
->user_name
);
900 u
->user_name
= pa_xstrdup(user_name
);
902 update_description(u
);
907 pa_module_unload_request(u
->module
, TRUE
);
912 /* Called from main context.
 *
 * Reply handler for PA_COMMAND_GET_SINK_INFO. Parses the sink info record
 * (with a property list and a configured latency appended for protocol
 * version >= 13) and requires the packet to end after it. The reply's sink
 * name is compared with u->sink_name (the statement guarded by that check
 * is not visible here); the reply's description then replaces
 * u->device_description and update_description() is called.
 *
 * The temporary property list is freed both before the name check and
 * after the unload request at the end; presumably these are the success
 * and failure paths (the control flow between them is not visible). */
913 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
914 struct userdata
*u
= userdata
;
915 uint32_t idx
, owner_module
, monitor_source
, flags
;
916 const char *name
, *description
, *monitor_source_name
, *driver
;
927 pl
= pa_proplist_new();
929 if (command
!= PA_COMMAND_REPLY
) {
930 if (command
== PA_COMMAND_ERROR
)
931 pa_log("Failed to get info.");
933 pa_log("Protocol error.");
937 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
938 pa_tagstruct_gets(t
, &name
) < 0 ||
939 pa_tagstruct_gets(t
, &description
) < 0 ||
940 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
941 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
942 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
943 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
944 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
945 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
946 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
947 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
948 pa_tagstruct_gets(t
, &driver
) < 0 ||
949 pa_tagstruct_getu32(t
, &flags
) < 0) {
951 pa_log("Parse failure");
955 if (u
->version
>= 13) {
956 pa_usec_t configured_latency
;
958 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
959 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
961 pa_log("Parse failure");
966 if (!pa_tagstruct_eof(t
)) {
967 pa_log("Packet too long");
971 pa_proplist_free(pl
);
973 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
976 pa_xfree(u
->device_description
);
977 u
->device_description
= pa_xstrdup(description
);
979 update_description(u
);
984 pa_module_unload_request(u
->module
, TRUE
);
985 pa_proplist_free(pl
);
988 /* Called from main context.
 *
 * Reply handler for PA_COMMAND_GET_SINK_INPUT_INFO. Parses the sink input
 * record (mute flag appended for protocol version >= 11, property list for
 * version >= 13) and requires the packet to end after it. The reply's
 * index is compared with u->device_index, and the reported volume/mute
 * with the local sink's values (the statements guarded by these checks are
 * not visible here). The reported volume is then copied into the local
 * sink's virtual_volume, the mute flag is taken over for version >= 11,
 * and a sink change event is posted.
 *
 * The temporary property list is freed both before the index check and
 * after the unload request at the end; presumably these are the success
 * and failure paths (the control flow between them is not visible). */
989 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
990 struct userdata
*u
= userdata
;
991 uint32_t idx
, owner_module
, client
, sink
;
992 pa_usec_t buffer_usec
, sink_usec
;
993 const char *name
, *driver
, *resample_method
;
995 pa_sample_spec sample_spec
;
996 pa_channel_map channel_map
;
1003 pl
= pa_proplist_new();
1005 if (command
!= PA_COMMAND_REPLY
) {
1006 if (command
== PA_COMMAND_ERROR
)
1007 pa_log("Failed to get info.");
1009 pa_log("Protocol error.");
1013 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1014 pa_tagstruct_gets(t
, &name
) < 0 ||
1015 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1016 pa_tagstruct_getu32(t
, &client
) < 0 ||
1017 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1018 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1019 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1020 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1021 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1022 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1023 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1024 pa_tagstruct_gets(t
, &driver
) < 0) {
1026 pa_log("Parse failure");
1030 if (u
->version
>= 11) {
1031 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1033 pa_log("Parse failure");
1038 if (u
->version
>= 13) {
1039 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1041 pa_log("Parse failure");
1046 if (!pa_tagstruct_eof(t
)) {
1047 pa_log("Packet too long");
1051 pa_proplist_free(pl
);
1053 if (idx
!= u
->device_index
)
/* "Nothing changed" test: mute is only considered for version >= 11. */
1058 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1059 pa_cvolume_equal(&volume
, &u
->sink
->virtual_volume
))
1062 memcpy(&u
->sink
->virtual_volume
, &volume
, sizeof(pa_cvolume
));
1064 if (u
->version
>= 11)
1065 u
->sink
->muted
= !!mute
;
1067 pa_subscription_post(u
->sink
->core
, PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
, u
->sink
->index
);
1071 pa_module_unload_request(u
->module
, TRUE
);
1072 pa_proplist_free(pl
);
1077 /* Called from main context.
 *
 * Reply handler for PA_COMMAND_GET_SOURCE_INFO. Parses the source info
 * record (with a property list and a configured latency appended for
 * protocol version >= 13) and requires the packet to end after it. The
 * reply's source name is compared with u->source_name (the statement
 * guarded by that check is not visible here); the reply's description then
 * replaces u->device_description and update_description() is called.
 *
 * The temporary property list is freed both before the name check and
 * after the unload request at the end; presumably these are the success
 * and failure paths (the control flow between them is not visible). */
1078 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1079 struct userdata
*u
= userdata
;
1080 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1081 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1086 pa_usec_t latency
, configured_latency
;
1092 pl
= pa_proplist_new();
1094 if (command
!= PA_COMMAND_REPLY
) {
1095 if (command
== PA_COMMAND_ERROR
)
1096 pa_log("Failed to get info.");
1098 pa_log("Protocol error.");
1102 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1103 pa_tagstruct_gets(t
, &name
) < 0 ||
1104 pa_tagstruct_gets(t
, &description
) < 0 ||
1105 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1106 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1107 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1108 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1109 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1110 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1111 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1112 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1113 pa_tagstruct_gets(t
, &driver
) < 0 ||
1114 pa_tagstruct_getu32(t
, &flags
) < 0) {
1116 pa_log("Parse failure");
1120 if (u
->version
>= 13) {
1121 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1122 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1124 pa_log("Parse failure");
1129 if (!pa_tagstruct_eof(t
)) {
1130 pa_log("Packet too long");
1134 pa_proplist_free(pl
);
1136 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1139 pa_xfree(u
->device_description
);
1140 u
->device_description
= pa_xstrdup(description
);
1142 update_description(u
);
1147 pa_module_unload_request(u
->module
, TRUE
);
1148 pa_proplist_free(pl
);
1153 /* Called from main context.
 *
 * Queries the remote server for the information that feeds the local
 * description and volume state. Each query is sent with a fresh tag and
 * has its reply handler registered with a timeout of DEFAULT_TIMEOUT:
 *
 *   GET_SERVER_INFO      -> server_info_cb
 *   GET_SINK_INPUT_INFO  -> sink_input_info_cb  (by u->device_index)
 *   GET_SINK_INFO        -> sink_info_cb        (by name, u->sink_name)
 *   GET_SOURCE_INFO      -> source_info_cb      (by name, only when
 *                                                u->source_name is set)
 *
 * NOTE(review): the sink and the source queries are both visible;
 * presumably they belong to different build variants, selected by a
 * conditional not visible in this listing -- confirm. */
1154 static void request_info(struct userdata
*u
) {
1159 t
= pa_tagstruct_new(NULL
, 0);
1160 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1161 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1162 pa_pstream_send_tagstruct(u
->pstream
, t
);
1163 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1166 t
= pa_tagstruct_new(NULL
, 0);
1167 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1168 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1169 pa_tagstruct_putu32(t
, u
->device_index
);
1170 pa_pstream_send_tagstruct(u
->pstream
, t
);
1171 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1174 t
= pa_tagstruct_new(NULL
, 0);
1175 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1176 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1177 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1178 pa_tagstruct_puts(t
, u
->sink_name
);
1179 pa_pstream_send_tagstruct(u
->pstream
, t
);
1180 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1183 if (u
->source_name
) {
1184 t
= pa_tagstruct_new(NULL
, 0);
1185 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1186 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1187 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1188 pa_tagstruct_puts(t
, u
->source_name
);
1189 pa_pstream_send_tagstruct(u
->pstream
, t
);
1190 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1195 /* Called from main context.
 *
 * Handler for PA_COMMAND_SUBSCRIBE_EVENT. Parses the event type and the
 * object index; a malformed packet is logged and leads to a module unload
 * request. The event is then tested against the CHANGE events for the
 * server, sink inputs, sinks and sources (the statements that follow the
 * test are not visible in this listing). */
1196 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1197 struct userdata
*u
= userdata
;
1198 pa_subscription_event_type_t e
;
1204 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1206 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1207 pa_tagstruct_getu32(t
, &idx
) < 0) {
1208 pa_log("Invalid protocol reply");
1209 pa_module_unload_request(u
->module
, TRUE
);
1213 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1215 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1216 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1218 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1226 /* Called from main context.
 *
 * Sends a PA_COMMAND_SUBSCRIBE request with a fresh tag to the remote
 * server. The subscription mask always contains the server mask; the sink
 * input/sink masks and the source mask are both visible below and
 * presumably belong to different build variants (the selecting conditional
 * is not visible in this listing -- confirm). */
1227 static void start_subscribe(struct userdata
*u
) {
1232 t
= pa_tagstruct_new(NULL
, 0);
1233 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1234 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1235 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1237 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1239 PA_SUBSCRIPTION_MASK_SOURCE
1243 pa_pstream_send_tagstruct(u
->pstream
, t
);
1246 /* Called from main context.
 *
 * Reply handler for the stream creation request. Parses the reply in
 * protocol-version dependent stages:
 *
 *   always        : channel, device index (and an initial byte count)
 *   version >= 9  : buffer metrics (maxlength/tlength/prebuf/minreq, or
 *                   maxlength/fragsize)
 *   version >= 12 : sample spec, channel map, device index, device name
 *                   and suspended flag; the device name replaces
 *                   u->sink_name / u->source_name
 *   version >= 13 : a latency value, which (plus MIN_NETWORK_LATENCY_USEC)
 *                   is passed to the device's set_latency_range call
 *
 * On success a time event firing LATENCY_INTERVAL seconds from now is
 * created with timeout_callback, and the initial byte count is posted to
 * the sink as SINK_MESSAGE_REQUEST. The last two statements log an invalid
 * reply and request a module unload; presumably they form the failure path
 * (the control flow leading there is not visible in this listing).
 *
 * NOTE(review): sink and source variants of several statements are both
 * visible below; presumably only one of each is compiled in, selected by a
 * conditional not visible in this listing -- confirm. */
1247 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1248 struct userdata
*u
= userdata
;
1256 pa_assert(u
->pdispatch
== pd
);
1258 if (command
!= PA_COMMAND_REPLY
) {
1259 if (command
== PA_COMMAND_ERROR
)
1260 pa_log("Failed to create stream.");
1262 pa_log("Protocol error.");
1266 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1267 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1269 || pa_tagstruct_getu32(t
, &bytes
) < 0
1274 if (u
->version
>= 9) {
1276 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1277 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1278 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1279 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1282 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1283 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1288 if (u
->version
>= 12) {
1291 uint32_t device_index
;
1293 pa_bool_t suspended
;
1295 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1296 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1297 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1298 pa_tagstruct_gets(t
, &dn
) < 0 ||
1299 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
1303 pa_xfree(u
->sink_name
);
1304 u
->sink_name
= pa_xstrdup(dn
);
1306 pa_xfree(u
->source_name
);
1307 u
->source_name
= pa_xstrdup(dn
);
1311 if (u
->version
>= 13) {
1314 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1318 pa_sink_set_latency_range(u
->sink
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1320 pa_source_set_latency_range(u
->source
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1324 if (!pa_tagstruct_eof(t
))
/* Create the timer; it must not exist yet. */
1330 pa_assert(!u
->time_event
);
1331 pa_gettimeofday(&ntv
);
1332 ntv
.tv_sec
+= LATENCY_INTERVAL
;
1333 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
1337 pa_log_debug("Stream created.");
1340 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1346 pa_log("Invalid reply. (Create stream)");
1349 pa_module_unload_request(u
->module
, TRUE
);
1353 /* Called from main context */
/* Reply handler for the AUTH request (registered in on_connection()).
 *
 * Reads the remote protocol version out of the reply, refuses versions
 * below 8, announces our client name and then sends the request that
 * creates the remote stream: CREATE_PLAYBACK_STREAM for the sink flavour,
 * CREATE_RECORD_STREAM for the source flavour. The answer to that request
 * is routed to create_stream_callback().
 *
 * pd       dispatcher that delivered the reply; must equal u->pdispatch
 * command  command code of the received packet
 * tag      tag of the received packet; reused below as scratch for the
 *          tags of the requests sent from here
 * t        payload of the reply
 * userdata the module's struct userdata
 *
 * NOTE(review): the playback and record variants both appear in this body;
 * presumably only one is compiled per build (sink vs. source) - confirm. */
1354 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1355 struct userdata
*u
= userdata
;
1356 pa_tagstruct
*reply
;
1357 char name
[256], un
[128], hn
[128];
1364 pa_assert(u
->pdispatch
== pd
);
/* A valid answer is a REPLY that carries exactly one u32, the remote
 * protocol version, which is stored in u->version. */
1366 if (command
!= PA_COMMAND_REPLY
||
1367 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1368 !pa_tagstruct_eof(t
)) {
1370 if (command
== PA_COMMAND_ERROR
)
1371 pa_log("Failed to authenticate");
1373 pa_log("Protocol error.");
1378 /* Minimum supported protocol version */
1379 if (u
->version
< 8) {
1380 pa_log("Incompatible protocol version");
1384 /* Starting with protocol version 13 the MSB of the version tag
1385 reflects if shm is enabled for this connection or not. We don't
1386 support SHM here at all, so we just ignore this. */
1388 if (u
->version
>= 13)
1389 u
->version
&= 0x7FFFFFFFU
;
1391 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* Build the stream name "<something> for <user>@<host>"; it is sent either
 * as the stream name (version < 13) or as PA_PROP_MEDIA_NAME (version >= 13)
 * further down. */
1394 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1396 pa_get_user_name(un
, sizeof(un
)),
1397 pa_get_host_name(hn
, sizeof(hn
)));
1399 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1401 pa_get_user_name(un
, sizeof(un
)),
1402 pa_get_host_name(hn
, sizeof(hn
)));
/* First request: SET_CLIENT_NAME. With protocol 13 and later the client is
 * described by a property list; the plain string "PulseAudio" is
 * presumably the fallback for older servers - confirm. */
1405 reply
= pa_tagstruct_new(NULL
, 0);
1406 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1407 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1409 if (u
->version
>= 13) {
1411 pl
= pa_proplist_new();
1412 pa_init_proplist(pl
);
1413 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1414 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1415 pa_tagstruct_put_proplist(reply
, pl
);
1416 pa_proplist_free(pl
);
1418 pa_tagstruct_puts(reply
, "PulseAudio");
1420 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1421 /* We ignore the server's reply here */
/* Second request: create the remote stream. Start with the buffer metrics. */
1423 reply
= pa_tagstruct_new(NULL
, 0);
1425 if (u
->version
< 13)
1426 /* Only for older PA versions we need to fill in the maxlength */
1427 u
->maxlength
= 4*1024*1024;
/* Convert the default durations (in msec) into byte counts using the
 * sample spec of the local sink (tlength, minreq) or source (fragsize). */
1430 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1431 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1432 u
->prebuf
= u
->tlength
;
1434 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback variant of the request. */
1438 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1439 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1441 if (u
->version
< 13)
1442 pa_tagstruct_puts(reply
, name
);
1444 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1445 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1446 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1447 pa_tagstruct_puts(reply
, u
->sink_name
);
1448 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* TRUE when the local sink is currently not opened. */
1449 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1450 pa_tagstruct_putu32(reply
, u
->tlength
);
1451 pa_tagstruct_putu32(reply
, u
->prebuf
);
1452 pa_tagstruct_putu32(reply
, u
->minreq
);
1453 pa_tagstruct_putu32(reply
, 0);
/* The stream is created with a freshly reset volume for the sink's
 * channel count. */
1454 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1455 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record variant of the request. */
1457 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1458 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1460 if (u
->version
< 13)
1461 pa_tagstruct_puts(reply
, name
);
1463 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1464 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1465 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1466 pa_tagstruct_puts(reply
, u
->source_name
);
1467 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* TRUE when the local source is currently not opened. */
1468 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1469 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Fields that are only appended when the remote side is new enough to
 * understand them (protocol 12, 13 and 14 respectively). */
1472 if (u
->version
>= 12) {
1473 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1474 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1475 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1476 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1477 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1478 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1479 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1482 if (u
->version
>= 13) {
1485 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1486 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1488 pl
= pa_proplist_new();
1489 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1490 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1491 pa_tagstruct_put_proplist(reply
, pl
);
1492 pa_proplist_free(pl
);
1495 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1499 if (u
->version
>= 14) {
1501 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1503 pa_tagstruct_put_boolean(reply
, TRUE
); /* early requests */
/* Send the request and have create_stream_callback() handle the answer,
 * which must arrive within DEFAULT_TIMEOUT. */
1506 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1507 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1509 pa_log_debug("Connection authenticated, creating stream ...");
/* Ask the core to unload this module. NOTE(review): presumably the shared
 * failure exit of this function - confirm. */
1514 pa_module_unload_request(u
->module
, TRUE
);
1517 /* Called from main context */
/* Die callback of the packet stream (installed in on_connection() with
 * pa_pstream_set_die_callback()). Logs a warning and asks the core to
 * unload this module.
 *
 * p        the packet stream that died
 * userdata the module's struct userdata */
1518 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1519 struct userdata
*u
= userdata
;
1524 pa_log_warn("Stream died.");
1525 pa_module_unload_request(u
->module
, TRUE
);
1528 /* Called from main context */
/* Packet callback of the packet stream. Every received control packet is
 * handed to the dispatcher u->pdispatch; if the dispatcher reports an error
 * the packet is logged as invalid and the module asks to be unloaded.
 *
 * p        the packet stream the packet arrived on
 * packet   the received packet
 * creds    credentials that came with the packet, passed on unchanged
 * userdata the module's struct userdata */
1529 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1530 struct userdata
*u
= userdata
;
1536 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1537 pa_log("Invalid packet");
1538 pa_module_unload_request(u
->module
, TRUE
);
1544 /* Called from main context */
/* Memblock callback of the packet stream: audio data received from the
 * remote server. The data is forwarded to the local source with a
 * SOURCE_MESSAGE_POST message and its length is added to u->counter_delta.
 *
 * p        the packet stream the data arrived on
 * channel  channel the data was received on; must match u->channel
 * offset   offset that came with the block, forwarded in the message
 * seek     seek mode that came with the block, forwarded in the message
 * chunk    the received audio data
 * userdata the module's struct userdata */
1545 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1546 struct userdata
*u
= userdata
;
/* Data for any channel other than the one of our stream is an error that
 * takes the whole module down. */
1552 if (channel
!= u
->channel
) {
1553 pa_log("Recieved memory block on bad channel.");
1554 pa_module_unload_request(u
->module
, TRUE
);
/* Pass the chunk to the source through its message queue; the seek mode
 * travels in the message's pointer argument, the offset in its integer
 * argument. */
1558 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1560 u
->counter_delta
+= (int64_t) chunk
->length
;
1565 /* Called from main context */
/* Callback of the socket client created in pa__init(). Releases the socket
 * client, wraps the new IO channel in a packet stream plus dispatcher,
 * installs the stream callbacks and sends the AUTH request. The answer to
 * AUTH is handled by setup_complete_callback().
 *
 * sc       the socket client; must equal u->client
 * io       IO channel of the connection
 * userdata the module's struct userdata */
1566 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1567 struct userdata
*u
= userdata
;
1573 pa_assert(u
->client
== sc
);
/* The socket client has done its job; drop our reference to it. */
1575 pa_socket_client_unref(u
->client
);
/* Connection failure: report errno and ask to be unloaded.
 * NOTE(review): presumably guarded by a check that io is NULL - confirm. */
1579 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1580 pa_module_unload_request(u
->module
, TRUE
);
/* Set up the protocol layer on top of the IO channel. */
1584 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1585 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1587 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1588 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1590 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* AUTH request: command, tag, our protocol version and the
 * authentication cookie (PA_NATIVE_COOKIE_LENGTH bytes). */
1593 t
= pa_tagstruct_new(NULL
, 0);
1594 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1595 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1596 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1598 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
/* Send the request together with our uid/gid as credentials.
 * NOTE(review): the plain pa_pstream_send_tagstruct() below is presumably
 * the alternative for builds without credential support - confirm. */
1604 if (pa_iochannel_creds_supported(io
))
1605 pa_iochannel_creds_enable(io
);
1607 ucred
.uid
= getuid();
1608 ucred
.gid
= getgid();
1610 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1613 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* The reply must arrive within DEFAULT_TIMEOUT. */
1616 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1618 pa_log_debug("Connection established, authenticating ...");
1623 /* Called from main context */
/* set_volume hook of the local sink (installed in pa__init()). Forwards the
 * sink's virtual volume to the remote server by sending
 * SET_SINK_INPUT_VOLUME for the remote stream identified by
 * u->device_index.
 *
 * sink the local tunnel sink whose volume changed */
1624 static void sink_set_volume(pa_sink
*sink
) {
1633 t
= pa_tagstruct_new(NULL
, 0);
1634 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1635 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1636 pa_tagstruct_putu32(t
, u
->device_index
);
1637 pa_tagstruct_put_cvolume(t
, &sink
->virtual_volume
);
1638 pa_pstream_send_tagstruct(u
->pstream
, t
);
1641 /* Called from main context */
/* set_mute hook of the local sink (installed in pa__init()). Forwards the
 * sink's mute state to the remote server by sending SET_SINK_INPUT_MUTE
 * for the remote stream identified by u->device_index.
 *
 * sink the local tunnel sink whose mute state changed */
1642 static void sink_set_mute(pa_sink
*sink
) {
/* Remote protocol versions below 11 are special-cased here.
 * NOTE(review): presumably the request is skipped for them - confirm. */
1651 if (u
->version
< 11)
1654 t
= pa_tagstruct_new(NULL
, 0);
1655 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1656 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1657 pa_tagstruct_putu32(t
, u
->device_index
);
/* !! normalises the mute flag to exactly 0 or 1. */
1658 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1659 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Module entry point.
 *
 * Parses the module arguments, allocates and initialises the module's
 * struct userdata, loads the authentication cookie, starts connecting to
 * the server named by the "server" argument (on_connection() is called with
 * the outcome), creates the local sink or source, starts the IO thread
 * running thread_func() and finally publishes the sink or source.
 *
 * m the module instance being loaded; m->userdata is set to the new
 *   struct userdata
 *
 * NOTE(review): sink and source set-up both appear in this body, including
 * two declarations of `data` with different types; presumably only one
 * flavour is compiled per build - confirm. */
1664 int pa__init(pa_module
*m
) {
1665 pa_modargs
*ma
= NULL
;
1666 struct userdata
*u
= NULL
;
1671 pa_sink_new_data data
;
1673 pa_source_new_data data
;
1678 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1679 pa_log("Failed to parse module arguments");
/* pa_xnew0() hands back zeroed memory; the explicit assignments below
 * spell out the initial state anyway. */
1683 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1687 u
->pdispatch
= NULL
;
1689 u
->server_name
= NULL
;
/* Name of the remote sink to connect to; NULL when "sink" is not given. */
1691 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1693 u
->requested_bytes
= 0;
/* Name of the remote source to connect to; NULL when "source" is not given. */
1695 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1698 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
, 10);
/* No remote stream exists yet, so its index and channel are invalid. */
1700 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1701 u
->time_event
= NULL
;
1702 u
->ignore_latency_before
= 0;
1703 u
->transport_usec
= 0;
1704 u
->transport_usec_valid
= FALSE
;
1705 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1706 u
->counter
= u
->counter_delta
= 0;
1708 u
->rtpoll
= pa_rtpoll_new();
1709 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
/* Cookie file comes from the "cookie" argument, defaulting to
 * PA_NATIVE_COOKIE_FILE. */
1711 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), PA_NATIVE_COOKIE_LENGTH
)))
/* The "server" argument is mandatory. */
1714 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1715 pa_log("No server specified.");
/* Start from the core's default sample spec and channel map and let the
 * module arguments adjust them. */
1719 ss
= m
->core
->default_sample_spec
;
1720 map
= m
->core
->default_channel_map
;
1721 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1722 pa_log("Invalid sample format specification");
/* Start connecting; on_connection() is invoked with the result. */
1726 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1727 pa_log("Failed to connect to server '%s'", u
->server_name
);
1731 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Sink flavour: the local sink is named by "sink_name", or
 * "tunnel.<server>" when that argument is absent. */
1735 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1736 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1738 pa_sink_new_data_init(&data
);
1739 data
.driver
= __FILE__
;
1741 data
.namereg_fail
= TRUE
;
1742 pa_sink_new_data_set_name(&data
, dn
);
1743 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1744 pa_sink_new_data_set_channel_map(&data
, &map
);
/* Description is "<remote sink> on <server>", or just "<server>" when no
 * remote sink name was given. */
1745 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1746 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1748 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
1750 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
);
1751 pa_sink_new_data_done(&data
);
1754 pa_log("Failed to create sink.");
/* Hook the sink up to this module's handlers. */
1758 u
->sink
->parent
.process_msg
= sink_process_msg
;
1759 u
->sink
->userdata
= u
;
1760 u
->sink
->set_state
= sink_set_state
;
1761 u
->sink
->set_volume
= sink_set_volume
;
1762 u
->sink
->set_mute
= sink_set_mute
;
1764 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
1766 pa_sink_set_latency_range(u
->sink
, MIN_NETWORK_LATENCY_USEC
, 0);
/* The sink is served by this module's thread message queue and rtpoll. */
1768 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1769 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* Source flavour: the local source is named by "source_name", or
 * "tunnel.<server>" when that argument is absent. */
1773 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1774 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1776 pa_source_new_data_init(&data
);
1777 data
.driver
= __FILE__
;
1779 data
.namereg_fail
= TRUE
;
1780 pa_source_new_data_set_name(&data
, dn
);
1781 pa_source_new_data_set_sample_spec(&data
, &ss
);
1782 pa_source_new_data_set_channel_map(&data
, &map
);
/* Description is "<remote source> on <server>", or just "<server>" when no
 * remote source name was given. */
1783 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1784 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1786 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
1788 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1789 pa_source_new_data_done(&data
);
1792 pa_log("Failed to create source.");
/* Hook the source up to this module's handlers. */
1796 u
->source
->parent
.process_msg
= source_process_msg
;
1797 u
->source
->set_state
= source_set_state
;
1798 u
->source
->userdata
= u
;
1800 pa_source_set_latency_range(u
->source
, MIN_NETWORK_LATENCY_USEC
, 0);
/* The source is served by this module's thread message queue and rtpoll. */
1802 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1803 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1808 u
->time_event
= NULL
;
/* The buffer metrics stay zero here; setup_complete_callback() fills in
 * tlength, minreq and prebuf. */
1812 u
->tlength
= u
->minreq
= u
->prebuf
= 0;
1817 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
/* Start the IO thread. */
1819 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1820 pa_log("Failed to create thread.");
/* Publish the device. */
1825 pa_sink_put(u
->sink
);
1827 pa_source_put(u
->source
);
/* NOTE(review): the module arguments are released at two points,
 * presumably once on the success exit and once on the failure exit -
 * confirm. */
1830 pa_modargs_free(ma
);
1838 pa_modargs_free(ma
);
1845 void pa__done(pa_module
*m
) {
1850 if (!(u
= m
->userdata
))
1855 pa_sink_unlink(u
->sink
);
1858 pa_source_unlink(u
->source
);
1862 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1863 pa_thread_free(u
->thread
);
1866 pa_thread_mq_done(&u
->thread_mq
);
1870 pa_sink_unref(u
->sink
);
1873 pa_source_unref(u
->source
);
1877 pa_rtpoll_free(u
->rtpoll
);
1880 pa_pstream_unlink(u
->pstream
);
1881 pa_pstream_unref(u
->pstream
);
1885 pa_pdispatch_unref(u
->pdispatch
);
1888 pa_socket_client_unref(u
->client
);
1891 pa_auth_cookie_unref(u
->auth_cookie
);
1894 pa_smoother_free(u
->smoother
);
1897 u
->core
->mainloop
->time_free(u
->time_event
);
1900 pa_xfree(u
->sink_name
);
1902 pa_xfree(u
->source_name
);
1904 pa_xfree(u
->server_name
);
1906 pa_xfree(u
->device_description
);
1907 pa_xfree(u
->server_fqdn
);
1908 pa_xfree(u
->user_name
);