2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
59 #include "module-tunnel-sink-symdef.h"
61 #include "module-tunnel-source-symdef.h"
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 "sink=<remote sink name> "
70 "format=<sample format> "
71 "channels=<number of channels> "
73 "sink_name=<name for the local sink> "
74 "channel_map=<channel map>");
76 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 "source=<remote source name> "
81 "format=<sample format> "
82 "channels=<number of channels> "
84 "source_name=<name for the local source> "
85 "channel_map=<channel map>");
88 PA_MODULE_AUTHOR("Lennart Poettering");
89 PA_MODULE_VERSION(PACKAGE_VERSION
);
90 PA_MODULE_LOAD_ONCE(FALSE
);
92 static const char* const valid_modargs
[] = {
109 #define DEFAULT_TIMEOUT 5
111 #define LATENCY_INTERVAL 10
113 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
119 SINK_MESSAGE_REMOTE_SUSPEND
,
120 SINK_MESSAGE_UPDATE_LATENCY
,
124 #define DEFAULT_TLENGTH_MSEC 150
125 #define DEFAULT_MINREQ_MSEC 25
130 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
131 SOURCE_MESSAGE_REMOTE_SUSPEND
,
132 SOURCE_MESSAGE_UPDATE_LATENCY
135 #define DEFAULT_FRAGSIZE_MSEC 25
/* Forward declarations of the protocol command handlers, followed by the
 * dispatch table (indexed by PA_COMMAND_* opcode, sized PA_COMMAND_MAX) that
 * maps incoming commands to those handlers. Overflow/underflow, the two
 * *_STREAM_KILLED, *_STREAM_SUSPENDED and *_STREAM_MOVED opcodes, and the
 * three *_EVENT opcodes each share one handler. */
140 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
141 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
143 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
144 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
146 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
152 [PA_COMMAND_REQUEST
] = command_request
,
153 [PA_COMMAND_STARTED
] = command_started
,
155 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
156 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
157 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
158 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
159 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
160 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
161 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
162 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
163 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
164 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
165 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
166 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
173 pa_thread_mq thread_mq
;
177 pa_socket_client
*client
;
179 pa_pdispatch
*pdispatch
;
185 size_t requested_bytes
;
191 pa_auth_cookie
*auth_cookie
;
195 uint32_t device_index
;
198 int64_t counter
, counter_delta
;
200 pa_bool_t remote_corked
:1;
201 pa_bool_t remote_suspended
:1;
203 pa_usec_t transport_usec
; /* maintained in the main thread */
204 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
206 uint32_t ignore_latency_before
;
208 pa_time_event
*time_event
;
210 pa_smoother
*smoother
;
212 char *device_description
;
/* Forward declaration: request_latency() is defined further down but is
 * declared here so earlier code can call it. */
226 static void request_latency(struct userdata
*u
);
228 /* Called from main context.
 * Handler registered in command_table for the PLAYBACK_STREAM_EVENT,
 * RECORD_STREAM_EVENT and CLIENT_EVENT commands. The visible body only emits
 * a debug log line; the packet contents are not read. */
229 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
230 pa_log_debug("Got stream or client event.");
233 /* Called from main context.
 * Handler for the PLAYBACK/RECORD_STREAM_KILLED commands: logs a warning and
 * asks the core to unload this module. userdata is the module's
 * struct userdata; pd must be the dispatcher stored in it. */
234 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
235 struct userdata
*u
= userdata
;
240 pa_assert(u
->pdispatch
== pd
);
242 pa_log_warn("Stream killed");
243 pa_module_unload_request(u
->module
, TRUE
);
246 /* Called from main context.
 * Shared handler for the OVERFLOW and UNDERFLOW commands. In the visible code
 * it only checks that pd is our dispatcher and logs the event at info level. */
247 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
248 struct userdata
*u
= userdata
;
253 pa_assert(u
->pdispatch
== pd
);
255 pa_log_info("Server signalled buffer overrun/underrun.");
259 /* Called from main context.
 * Handler for the PLAYBACK/RECORD_STREAM_SUSPENDED commands. Reads a channel
 * (u32) and a suspended flag from the packet and requires the packet to end
 * there; a malformed packet is logged and leads to a module unload request.
 * The flag is then forwarded to the device with pa_asyncmsgq_send() as a
 * *_MESSAGE_REMOTE_SUSPEND message.
 * NOTE(review): both the sink and the source send appear below; presumably a
 * preprocessor conditional that is not visible here selects one -- confirm. */
260 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
261 struct userdata
*u
= userdata
;
268 pa_assert(u
->pdispatch
== pd
);
270 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
271 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
272 !pa_tagstruct_eof(t
)) {
274 pa_log("Invalid packet.");
275 pa_module_unload_request(u
->module
, TRUE
);
279 pa_log_debug("Server reports device suspend.");
/* The boolean is normalised to 0/1 and carried in the message's data pointer. */
282 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
284 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
290 /* Called from main context.
 * Handler for the PLAYBACK/RECORD_STREAM_MOVED commands. Reads a channel
 * (u32), a device index (u32), a device name (string) and a suspended flag;
 * a malformed packet is logged and leads to a module unload request. The
 * suspended flag of the new device is then forwarded with pa_asyncmsgq_send()
 * as a *_MESSAGE_REMOTE_SUSPEND message.
 * NOTE(review): unlike command_suspended(), no end-of-packet check is visible
 * in this block -- confirm whether trailing fields are handled on lines not
 * shown here. Both the sink and the source send appear; presumably only one
 * is compiled in. */
291 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
292 struct userdata
*u
= userdata
;
293 uint32_t channel
, di
;
300 pa_assert(u
->pdispatch
== pd
);
302 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
303 pa_tagstruct_getu32(t
, &di
) < 0 ||
304 pa_tagstruct_gets(t
, &dn
) < 0 ||
305 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
307 pa_log_error("Invalid packet.");
308 pa_module_unload_request(u
->module
, TRUE
);
312 pa_log_debug("Server reports a stream move.");
315 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
317 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
325 /* Called from main context.
 * Handler for the STARTED command. In the visible code it only checks that pd
 * is our dispatcher and logs that the server reported playback start. */
326 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
327 struct userdata
*u
= userdata
;
332 pa_assert(u
->pdispatch
== pd
);
334 pa_log_debug("Server reports playback started.");
340 /* Called from IO thread context.
 * Pauses or resumes the latency smoother according to the remote stream
 * state, using the current rtclock time shifted by the transport latency as
 * the timestamp. `past` presumably selects whether the transport latency is
 * subtracted or added (the conditional lines are not visible) -- confirm. */
341 static void check_smoother_status(struct userdata
*u
, pa_bool_t past
) {
346 x
= pa_rtclock_usec();
348 /* Correct by the time the issued request needs to travel to the
349 * other side. This is a valid thread-safe access, because the
350 * main thread is waiting for us */
353 x
-= u
->thread_transport_usec
;
355 x
+= u
->thread_transport_usec
;
/* Paused while the remote side is suspended or corked; the resume call below
 * is presumably the else branch (the `else` line is not visible). */
357 if (u
->remote_suspended
|| u
->remote_corked
)
358 pa_smoother_pause(u
->smoother
, x
);
360 pa_smoother_resume(u
->smoother
, x
);
363 /* Called from IO thread context.
 * Records the new remote cork state in u->remote_corked and re-evaluates the
 * smoother via check_smoother_status(u, FALSE). The equality test below
 * presumably returns early when the state is unchanged (the statement it
 * guards is not visible). */
364 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
367 if (u
->remote_corked
== cork
)
370 u
->remote_corked
= cork
;
371 check_smoother_status(u
, FALSE
);
374 /* Called from main context.
 * Sends a cork/uncork command for our stream to the remote server: builds a
 * tagstruct holding the command opcode, a fresh tag (u->ctag is
 * post-incremented), the stream channel and the cork flag, then queues it on
 * the pstream. No reply is registered in the visible code.
 * NOTE(review): both the PLAYBACK and the RECORD opcode appear below;
 * presumably a preprocessor conditional that is not visible selects one. */
375 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
382 t
= pa_tagstruct_new(NULL
, 0);
384 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
386 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
388 pa_tagstruct_putu32(t
, u
->ctag
++);
389 pa_tagstruct_putu32(t
, u
->channel
);
390 pa_tagstruct_put_boolean(t
, !!cork
);
391 pa_pstream_send_tagstruct(u
->pstream
, t
);
396 /* Called from IO thread context.
 * Records the new remote suspend state in u->remote_suspended and
 * re-evaluates the smoother via check_smoother_status(u, TRUE). The equality
 * test below presumably returns early when nothing changed (the statement it
 * guards is not visible). */
397 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
400 if (u
->remote_suspended
== suspend
)
403 u
->remote_suspended
= suspend
;
404 check_smoother_status(u
, TRUE
);
409 /* Called from IO thread context.
 * Satisfies outstanding server requests: while u->requested_bytes is
 * non-zero, renders up to that many bytes from the sink into a memchunk and
 * posts it as SINK_MESSAGE_POST on the thread's outgoing queue. */
410 static void send_data(struct userdata
*u
) {
413 while (u
->requested_bytes
> 0) {
414 pa_memchunk memchunk
;
416 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
417 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
/* Drop our reference after posting. */
418 pa_memblock_unref(memchunk
.memblock
);
/* Account for what was actually rendered, not what was asked for. */
420 u
->requested_bytes
-= memchunk
.length
;
/* u->counter accumulates the total number of bytes rendered so far. */
422 u
->counter
+= (int64_t) memchunk
.length
;
426 /* This function is called from IO context -- except when it is not.
 * Message handler for the tunnel sink. Cases visible here:
 *  - PA_SINK_MESSAGE_SET_STATE: lets the generic handler change the state,
 *    then mirrors the suspended state into the remote cork flag.
 *  - PA_SINK_MESSAGE_GET_LATENCY: bytes rendered so far (as time) minus the
 *    smoother's estimate, clamped at 0, written through `data`.
 *  - SINK_MESSAGE_REQUEST: adds `offset` bytes to the outstanding request.
 *  - SINK_MESSAGE_REMOTE_SUSPEND: applies the suspend flag carried in `data`.
 *  - SINK_MESSAGE_UPDATE_LATENCY: feeds a new sample into the smoother.
 *  - SINK_MESSAGE_POST: hands a rendered chunk to the pstream.
 * Anything else is passed to pa_sink_process_msg(). Several statements
 * (returns, else branches, calls guarded by the IS_OPENED tests) are not
 * visible in this block. */
427 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
428 struct userdata
*u
= PA_SINK(o
)->userdata
;
432 case PA_SINK_MESSAGE_SET_STATE
: {
435 /* First, change the state, because otherwise pa_sink_render() would fail */
436 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
438 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
440 if (PA_SINK_IS_OPENED(u
->sink
->state
))
447 case PA_SINK_MESSAGE_GET_LATENCY
: {
448 pa_usec_t yl
, yr
, *usec
= data
;
/* yl: time equivalent of all bytes rendered; yr: smoother estimate at now. */
450 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
451 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
453 *usec
= yl
> yr
? yl
- yr
: 0;
457 case SINK_MESSAGE_REQUEST
:
459 pa_assert(offset
> 0);
460 u
->requested_bytes
+= (size_t) offset
;
462 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
468 case SINK_MESSAGE_REMOTE_SUSPEND
:
470 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
474 case SINK_MESSAGE_UPDATE_LATENCY
: {
/* `offset` carries the measured delay; it is subtracted only when smaller
 * than y, so the unsigned value cannot wrap (the alternative branch is not
 * visible here). */
477 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
479 if (y
> (pa_usec_t
) offset
)
480 y
-= (pa_usec_t
) offset
;
484 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
486 /* We can access this freely here, since the main thread is waiting for us */
487 u
->thread_transport_usec
= u
->transport_usec
;
492 case SINK_MESSAGE_POST
:
494 /* OK, This might be a bit confusing. This message is
495 * delivered to us from the main context -- NOT from the
496 * IO thread context where the rest of the messages are
497 * dispatched. Yeah, ugly, but I am a lazy bastard. */
499 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
/* counter_delta tracks bytes sent since the last latency request
 * (request_latency() resets it to 0). */
501 u
->counter_delta
+= (int64_t) chunk
->length
;
506 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
509 /* Called from main context.
 * State-change hook for the tunnel sink: corks the remote stream when the
 * sink is being suspended and uncorks it when the sink goes to RUNNING from
 * SUSPENDED. The UNLINKED and INVALID_STATE cases are listed with no visible
 * action. Other case labels and the return statement are not visible in this
 * block. */
510 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
512 pa_sink_assert_ref(s
);
515 switch ((pa_sink_state_t
) state
) {
517 case PA_SINK_SUSPENDED
:
/* Suspending is only expected from an opened state. */
518 pa_assert(PA_SINK_IS_OPENED(s
->state
));
519 stream_cork(u
, TRUE
);
523 case PA_SINK_RUNNING
:
524 if (s
->state
== PA_SINK_SUSPENDED
)
525 stream_cork(u
, FALSE
);
528 case PA_SINK_UNLINKED
:
530 case PA_SINK_INVALID_STATE
:
539 /* This function is called from IO context -- except when it is not.
 * Message handler for the tunnel source. Cases visible here:
 *  - PA_SOURCE_MESSAGE_SET_STATE: lets the generic handler change the state,
 *    then mirrors the suspended state into the remote cork flag.
 *  - PA_SOURCE_MESSAGE_GET_LATENCY: smoother estimate minus the bytes posted
 *    so far (as time), clamped at 0, written through `data`.
 *  - SOURCE_MESSAGE_POST: posts the chunk to the source if it is opened and
 *    adds its length to u->counter.
 *  - SOURCE_MESSAGE_REMOTE_SUSPEND: applies the suspend flag carried in `data`.
 *  - SOURCE_MESSAGE_UPDATE_LATENCY: feeds a new sample into the smoother.
 * Anything else is passed to pa_source_process_msg(). Return statements
 * inside the cases are not visible in this block. */
540 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
541 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
545 case PA_SOURCE_MESSAGE_SET_STATE
: {
548 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
549 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
554 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
555 pa_usec_t yr
, yl
, *usec
= data
;
/* Note the operands are the reverse of the sink case: yr - yl. */
557 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
558 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
560 *usec
= yr
> yl
? yr
- yl
: 0;
564 case SOURCE_MESSAGE_POST
:
566 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
567 pa_source_post(u
->source
, chunk
);
569 u
->counter
+= (int64_t) chunk
->length
;
573 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
575 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
578 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
/* For the source the measured delay in `offset` is added, not subtracted. */
581 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
582 y
+= (pa_usec_t
) offset
;
584 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
586 /* We can access this freely here, since the main thread is waiting for us */
587 u
->thread_transport_usec
= u
->transport_usec
;
593 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
596 /* Called from main context.
 * State-change hook for the tunnel source: corks the remote stream when the
 * source is being suspended and uncorks it when the source goes to RUNNING
 * from SUSPENDED. Other case labels and the return statement are not visible
 * in this block. */
597 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
599 pa_source_assert_ref(s
);
602 switch ((pa_source_state_t
) state
) {
604 case PA_SOURCE_SUSPENDED
:
/* Suspending is only expected from an opened state. */
605 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
606 stream_cork(u
, TRUE
);
610 case PA_SOURCE_RUNNING
:
611 if (s
->state
== PA_SOURCE_SUSPENDED
)
612 stream_cork(u
, FALSE
);
615 case PA_SOURCE_UNLINKED
:
/* NOTE(review): the label below uses the sink enumerator inside a switch over
 * pa_source_state_t; it looks like PA_SOURCE_INVALID_STATE was intended --
 * confirm against the enum definitions. */
617 case PA_SINK_INVALID_STATE
:
/* Entry point of the module's IO thread; userdata is the module's
 * struct userdata. Installs the thread message queue and the rtpoll for this
 * thread, then (in a loop whose control lines are not visible here) processes
 * pending sink rewinds and runs the rtpoll. On an irregular exit it asks the
 * core to unload the module and keeps draining messages until
 * PA_MESSAGE_SHUTDOWN arrives. */
626 static void thread_func(void *userdata
) {
627 struct userdata
*u
= userdata
;
631 pa_log_debug("Thread starting up");
633 pa_thread_mq_install(&u
->thread_mq
);
634 pa_rtpoll_install(u
->rtpoll
);
/* Rewinds are processed with a length of 0. */
640 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
641 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
642 pa_sink_process_rewind(u
->sink
, 0);
645 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
653 /* If this was no regular exit from the loop we have to continue
654 * processing messages until we received PA_MESSAGE_SHUTDOWN */
655 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
656 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
659 pa_log_debug("Thread shutting down");
663 /* Called from main context.
 * Handler for PA_COMMAND_REQUEST: the server asks for more playback data.
 * Reads the channel and the byte count, rejects packets for a channel other
 * than ours, and forwards the byte count to the sink as a
 * SINK_MESSAGE_REQUEST (carried in the message's offset argument). The
 * trailing unload request is presumably the failure path reached from the two
 * error checks (the label and jumps are not visible) -- confirm. */
664 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
665 struct userdata
*u
= userdata
;
666 uint32_t bytes
, channel
;
669 pa_assert(command
== PA_COMMAND_REQUEST
);
672 pa_assert(u
->pdispatch
== pd
);
674 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
675 pa_tagstruct_getu32(t
, &bytes
) < 0) {
676 pa_log("Invalid protocol reply");
680 if (channel
!= u
->channel
) {
681 pa_log("Recieved data for invalid channel");
685 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
689 pa_module_unload_request(u
->module
, TRUE
);
694 /* Called from main context.
 * Reply handler for the latency query sent by request_latency(). Parses the
 * reply (device latencies, playing flag, local and remote timestamps,
 * write/read indexes, plus two extra u64 fields for protocol version >= 13),
 * estimates the transport latency, computes the overall delay and hands it
 * to the IO thread as a *_MESSAGE_UPDATE_LATENCY message.
 * Many control-flow lines (else branches, preprocessor conditionals, the
 * failure label) are not visible here; sink and source variants of several
 * statements appear side by side and presumably only one of each pair is
 * compiled in. The trailing unload request is presumably the failure path. */
695 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
696 struct userdata
*u
= userdata
;
697 pa_usec_t sink_usec
, source_usec
;
699 int64_t write_index
, read_index
;
700 struct timeval local
, remote
, now
;
707 if (command
!= PA_COMMAND_REPLY
) {
708 if (command
== PA_COMMAND_ERROR
)
709 pa_log("Failed to get latency.");
711 pa_log("Protocol error.");
715 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
716 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
717 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
718 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
719 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
720 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
721 pa_tagstruct_gets64(t
, &read_index
) < 0) {
722 pa_log("Invalid reply.");
/* Protocol >= 13 appends two more u64 fields; they are parsed but not used
 * in the visible code. */
727 if (u
->version
>= 13) {
728 uint64_t underrun_for
= 0, playing_for
= 0;
730 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
731 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
732 pa_log("Invalid reply.");
738 if (!pa_tagstruct_eof(t
)) {
739 pa_log("Invalid reply.");
/* Replies to requests older than the most recent one are presumably skipped
 * here (request_latency() stores the newest tag in ignore_latency_before). */
743 if (tag
< u
->ignore_latency_before
) {
747 pa_gettimeofday(&now
);
749 /* Calculate transport usec */
/* NOTE(review): the second pa_timeval_cmp() result is tested for non-zero
 * rather than `< 0`, so it is true whenever remote and now differ -- confirm
 * this is intended. */
750 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
751 /* local and remote seem to have synchronized clocks */
753 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
755 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
/* Fallback estimate: half of the measured round trip. */
758 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
760 /* First, take the device's delay */
762 delay
= (int64_t) sink_usec
;
763 ss
= &u
->sink
->sample_spec
;
765 delay
= (int64_t) source_usec
;
766 ss
= &u
->source
->sample_spec
;
769 /* Add the length of our server-side buffer */
770 if (write_index
>= read_index
)
771 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
773 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
775 /* Our measurements are already out of date, hence correct by the *
776 * transport latency */
778 delay
-= (int64_t) u
->transport_usec
;
780 delay
+= (int64_t) u
->transport_usec
;
783 /* Now correct by what we have read/written since we requested the update */
785 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
787 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
/* The delay travels in the message's offset argument. */
791 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
793 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
800 pa_module_unload_request(u
->module
, TRUE
);
803 /* Called from main context.
 * Sends a latency query for our stream to the remote server and registers
 * stream_get_latency_callback() for the reply with a DEFAULT_TIMEOUT timeout.
 * The request carries a fresh tag, the stream channel and the current
 * time of day.
 * NOTE(review): both the PLAYBACK and the RECORD opcode appear below;
 * presumably a preprocessor conditional that is not visible selects one. */
804 static void request_latency(struct userdata
*u
) {
810 t
= pa_tagstruct_new(NULL
, 0);
812 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
814 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
816 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
817 pa_tagstruct_putu32(t
, u
->channel
);
819 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
821 pa_pstream_send_tagstruct(u
->pstream
, t
);
822 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
/* Remember the newest request tag and restart the byte count accumulated
 * since the last request. */
824 u
->ignore_latency_before
= tag
;
825 u
->counter_delta
= 0;
828 /* Called from main context.
 * Timer callback: re-arms the time event `e` to fire LATENCY_INTERVAL
 * seconds from now. Any work done before re-arming (presumably the latency
 * request itself) is on lines not visible in this block. */
829 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*tv
, void *userdata
) {
830 struct userdata
*u
= userdata
;
839 pa_gettimeofday(&ntv
);
840 ntv
.tv_sec
+= LATENCY_INTERVAL
;
841 m
->time_restart(e
, &ntv
);
844 /* Called from main context.
 * Refreshes the human-readable names once the remote server's host name,
 * user name and device description are all known:
 *  1. sets the local device description to "<device> on <user>@<host>" and
 *     stores the three remote values as tunnel.remote.* properties;
 *  2. renames the remote stream to "<device> for <local user>@<local host>".
 * The test on the three strings presumably returns early while any of them
 * is still missing (the guarded statement is not visible). Sink and source
 * variants of the calls appear side by side; presumably only one set is
 * compiled in. */
845 static void update_description(struct userdata
*u
) {
847 char un
[128], hn
[128];
852 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
855 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
858 pa_sink_set_description(u
->sink
, d
);
859 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
860 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
861 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
863 pa_source_set_description(u
->source
, d
);
864 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
865 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
866 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
/* Second string: the name the remote server shows for our stream, built from
 * the local user and host name. */
871 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
872 pa_get_user_name(un
, sizeof(un
)),
873 pa_get_host_name(hn
, sizeof(hn
)));
875 t
= pa_tagstruct_new(NULL
, 0);
877 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
879 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
881 pa_tagstruct_putu32(t
, u
->ctag
++);
882 pa_tagstruct_putu32(t
, u
->channel
);
883 pa_tagstruct_puts(t
, d
);
884 pa_pstream_send_tagstruct(u
->pstream
, t
);
889 /* Called from main context.
 * Reply handler for GET_SERVER_INFO. Parses the server info packet and keeps
 * copies of the remote host name (u->server_fqdn) and user name
 * (u->user_name), then refreshes the descriptions via update_description().
 * The remaining parsed fields are not used in the visible code. The trailing
 * unload request is presumably the failure path reached from the error
 * checks (label and jumps are not visible). */
890 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
891 struct userdata
*u
= userdata
;
894 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
900 if (command
!= PA_COMMAND_REPLY
) {
901 if (command
== PA_COMMAND_ERROR
)
902 pa_log("Failed to get info.");
904 pa_log("Protocol error.");
/* The channel-map read at the end is part of a parenthesised sub-condition
 * whose opening line is not visible here. */
908 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
909 pa_tagstruct_gets(t
, &server_version
) < 0 ||
910 pa_tagstruct_gets(t
, &user_name
) < 0 ||
911 pa_tagstruct_gets(t
, &host_name
) < 0 ||
912 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
913 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
914 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
915 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
917 pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
919 pa_log("Parse failure");
923 if (!pa_tagstruct_eof(t
)) {
924 pa_log("Packet too long");
/* Replace any previously stored strings with fresh copies. */
928 pa_xfree(u
->server_fqdn
);
929 u
->server_fqdn
= pa_xstrdup(host_name
);
931 pa_xfree(u
->user_name
);
932 u
->user_name
= pa_xstrdup(user_name
);
934 update_description(u
);
939 pa_module_unload_request(u
->module
, TRUE
);
944 /* Called from main context.
 * Reply handler for GET_SINK_INFO. Parses the sink info packet (with extra
 * fields for protocol versions >= 13 and >= 15), and stores a copy of the
 * reported description in u->device_description before calling
 * update_description(). The name comparison presumably skips replies that
 * are not about our sink (the guarded statement is not visible). A property
 * list is allocated up front as a parse target and freed on both the normal
 * and the (presumed) failure path at the end. */
945 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
946 struct userdata
*u
= userdata
;
947 uint32_t idx
, owner_module
, monitor_source
, flags
;
948 const char *name
, *description
, *monitor_source_name
, *driver
;
959 pl
= pa_proplist_new();
961 if (command
!= PA_COMMAND_REPLY
) {
962 if (command
== PA_COMMAND_ERROR
)
963 pa_log("Failed to get info.");
965 pa_log("Protocol error.");
969 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
970 pa_tagstruct_gets(t
, &name
) < 0 ||
971 pa_tagstruct_gets(t
, &description
) < 0 ||
972 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
973 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
974 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
975 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
976 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
977 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
978 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
979 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
980 pa_tagstruct_gets(t
, &driver
) < 0 ||
981 pa_tagstruct_getu32(t
, &flags
) < 0) {
983 pa_log("Parse failure");
/* Protocol >= 13 appends a property list and a configured latency. */
987 if (u
->version
>= 13) {
988 pa_usec_t configured_latency
;
990 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
991 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
993 pa_log("Parse failure");
/* Protocol >= 15 appends base volume, state, volume steps and card index. */
998 if (u
->version
>= 15) {
999 pa_volume_t base_volume
;
1000 uint32_t state
, n_volume_steps
, card
;
1002 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1003 pa_tagstruct_getu32(t
, &state
) < 0 ||
1004 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1005 pa_tagstruct_getu32(t
, &card
) < 0) {
1007 pa_log("Parse failure");
1012 if (!pa_tagstruct_eof(t
)) {
1013 pa_log("Packet too long");
1017 pa_proplist_free(pl
);
1019 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
1022 pa_xfree(u
->device_description
);
1023 u
->device_description
= pa_xstrdup(description
);
1025 update_description(u
);
1030 pa_module_unload_request(u
->module
, TRUE
);
1031 pa_proplist_free(pl
);
1034 /* Called from main context.
 * Reply handler for GET_SINK_INPUT_INFO. Parses the sink input info packet
 * (mute flag for protocol >= 11, property list for >= 13) and, when the reply
 * is about our stream and the reported volume/mute differ from the local
 * sink's, propagates them with pa_sink_volume_changed() and (protocol >= 11)
 * pa_sink_mute_changed(). The index test and the equality test presumably
 * return early (the guarded statements are not visible). A property list is
 * allocated up front as a parse target and freed on both the normal and the
 * (presumed) failure path at the end. */
1035 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1036 struct userdata
*u
= userdata
;
1037 uint32_t idx
, owner_module
, client
, sink
;
1038 pa_usec_t buffer_usec
, sink_usec
;
1039 const char *name
, *driver
, *resample_method
;
1041 pa_sample_spec sample_spec
;
1042 pa_channel_map channel_map
;
1049 pl
= pa_proplist_new();
1051 if (command
!= PA_COMMAND_REPLY
) {
1052 if (command
== PA_COMMAND_ERROR
)
1053 pa_log("Failed to get info.");
1055 pa_log("Protocol error.");
1059 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1060 pa_tagstruct_gets(t
, &name
) < 0 ||
1061 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1062 pa_tagstruct_getu32(t
, &client
) < 0 ||
1063 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1064 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1065 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1066 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1067 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1068 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1069 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1070 pa_tagstruct_gets(t
, &driver
) < 0) {
1072 pa_log("Parse failure");
1076 if (u
->version
>= 11) {
1077 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1079 pa_log("Parse failure");
1084 if (u
->version
>= 13) {
1085 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1087 pa_log("Parse failure");
1092 if (!pa_tagstruct_eof(t
)) {
1093 pa_log("Packet too long");
1097 pa_proplist_free(pl
);
/* u->device_index is the index of our own stream on the remote side. */
1099 if (idx
!= u
->device_index
)
/* Nothing to do when mute (only known for protocol >= 11) and volume already
 * match the local sink. */
1104 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1105 pa_cvolume_equal(&volume
, &u
->sink
->virtual_volume
))
1108 pa_sink_volume_changed(u
->sink
, &volume
);
1110 if (u
->version
>= 11)
1111 pa_sink_mute_changed(u
->sink
, mute
);
1116 pa_module_unload_request(u
->module
, TRUE
);
1117 pa_proplist_free(pl
);
1122 /* Called from main context.
 * Reply handler for GET_SOURCE_INFO. Parses the source info packet (with
 * extra fields for protocol versions >= 13 and >= 15), and stores a copy of
 * the reported description in u->device_description before calling
 * update_description(). The name comparison presumably skips replies that
 * are not about our source (the guarded statement is not visible). A
 * property list is allocated up front as a parse target and freed on both
 * the normal and the (presumed) failure path at the end. */
1123 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1124 struct userdata
*u
= userdata
;
1125 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1126 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1131 pa_usec_t latency
, configured_latency
;
1137 pl
= pa_proplist_new();
1139 if (command
!= PA_COMMAND_REPLY
) {
1140 if (command
== PA_COMMAND_ERROR
)
1141 pa_log("Failed to get info.");
1143 pa_log("Protocol error.");
1147 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1148 pa_tagstruct_gets(t
, &name
) < 0 ||
1149 pa_tagstruct_gets(t
, &description
) < 0 ||
1150 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1151 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1152 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1153 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1154 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1155 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1156 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1157 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1158 pa_tagstruct_gets(t
, &driver
) < 0 ||
1159 pa_tagstruct_getu32(t
, &flags
) < 0) {
1161 pa_log("Parse failure");
/* Protocol >= 13 appends a property list and a configured latency. */
1165 if (u
->version
>= 13) {
1166 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1167 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1169 pa_log("Parse failure");
/* Protocol >= 15 appends base volume, state, volume steps and card index. */
1174 if (u
->version
>= 15) {
1175 pa_volume_t base_volume
;
1176 uint32_t state
, n_volume_steps
, card
;
1178 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1179 pa_tagstruct_getu32(t
, &state
) < 0 ||
1180 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1181 pa_tagstruct_getu32(t
, &card
) < 0) {
1183 pa_log("Parse failure");
1188 if (!pa_tagstruct_eof(t
)) {
1189 pa_log("Packet too long");
1193 pa_proplist_free(pl
);
1195 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1198 pa_xfree(u
->device_description
);
1199 u
->device_description
= pa_xstrdup(description
);
1201 update_description(u
);
1206 pa_module_unload_request(u
->module
, TRUE
);
1207 pa_proplist_free(pl
);
1212 /* Called from main context.
 * Queries the remote server for the information this module mirrors locally.
 * Each request is a tagstruct with the opcode, a fresh tag and its arguments,
 * sent on the pstream with a reply handler registered under that tag and a
 * DEFAULT_TIMEOUT timeout:
 *  - GET_SERVER_INFO                       -> server_info_cb
 *  - GET_SINK_INPUT_INFO (u->device_index) -> sink_input_info_cb
 *  - GET_SINK_INFO by name (u->sink_name)  -> sink_info_cb
 *  - GET_SOURCE_INFO by name, only if u->source_name is set -> source_info_cb
 * NOTE(review): presumably the sink and source requests are alternatives
 * selected by a preprocessor conditional not visible here; also confirm that
 * the sink-name request is guarded against a NULL u->sink_name the way the
 * source request is. */
1213 static void request_info(struct userdata
*u
) {
1218 t
= pa_tagstruct_new(NULL
, 0);
1219 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1220 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1221 pa_pstream_send_tagstruct(u
->pstream
, t
);
1222 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1225 t
= pa_tagstruct_new(NULL
, 0);
1226 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1227 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1228 pa_tagstruct_putu32(t
, u
->device_index
);
1229 pa_pstream_send_tagstruct(u
->pstream
, t
);
1230 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
/* Lookup by name: the index argument is PA_INVALID_INDEX. */
1233 t
= pa_tagstruct_new(NULL
, 0);
1234 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1235 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1236 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1237 pa_tagstruct_puts(t
, u
->sink_name
);
1238 pa_pstream_send_tagstruct(u
->pstream
, t
);
1239 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1242 if (u
->source_name
) {
1243 t
= pa_tagstruct_new(NULL
, 0);
1244 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1245 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1246 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1247 pa_tagstruct_puts(t
, u
->source_name
);
1248 pa_pstream_send_tagstruct(u
->pstream
, t
);
1249 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1254 /* Called from main context.
 * Handler for PA_COMMAND_SUBSCRIBE_EVENT. Reads the event type and the object
 * index; a malformed packet is logged and leads to a module unload request.
 * The event is then compared against CHANGE events for the server, sink
 * input, sink and source facilities; presumably anything else is ignored and
 * a match triggers a refresh, but the guarded statements are not visible in
 * this block -- confirm. */
1255 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1256 struct userdata
*u
= userdata
;
1257 pa_subscription_event_type_t e
;
1263 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1265 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1266 pa_tagstruct_getu32(t
, &idx
) < 0) {
1267 pa_log("Invalid protocol reply");
1268 pa_module_unload_request(u
->module
, TRUE
);
1272 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1274 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1275 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1277 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1285 /* Called from main context.
 * Subscribes to change notifications on the remote server: sends a SUBSCRIBE
 * command with a fresh tag and a mask that includes the server facility plus
 * sink input/sink or source facilities (the lines joining the mask terms are
 * not visible; presumably a preprocessor conditional picks the sink or the
 * source set). No reply handler is registered in the visible code. */
1286 static void start_subscribe(struct userdata
*u
) {
1291 t
= pa_tagstruct_new(NULL
, 0);
1292 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1293 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1294 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1296 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1298 PA_SUBSCRIPTION_MASK_SOURCE
1302 pa_pstream_send_tagstruct(u
->pstream
, t
);
1305 /* Called from main context.
 * Reply handler for the stream creation request. Stores the channel and the
 * remote stream index in u, reads the negotiated buffer metrics (protocol
 * >= 9), the actual sample spec, channel map, device index, device name and
 * suspended flag (protocol >= 12) and a latency value (protocol >= 13), and
 * requires the packet to end there. The device name replaces u->sink_name /
 * u->source_name. On success it creates the periodic latency timer (first
 * expiry LATENCY_INTERVAL seconds from now) and forwards the initially
 * requested byte count to the sink as SINK_MESSAGE_REQUEST.
 * Many control-flow lines are not visible here (failure jumps, else
 * branches, preprocessor conditionals separating the sink and source
 * variants); the final log + unload request is presumably the failure path. */
1306 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1307 struct userdata
*u
= userdata
;
1315 pa_assert(u
->pdispatch
== pd
);
1317 if (command
!= PA_COMMAND_REPLY
) {
1318 if (command
== PA_COMMAND_ERROR
)
1319 pa_log("Failed to create stream.");
1321 pa_log("Protocol error.");
1325 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1326 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1328 || pa_tagstruct_getu32(t
, &bytes
) < 0
1333 if (u
->version
>= 9) {
/* Playback-style metrics (maxlength/tlength/prebuf/minreq) followed by
 * record-style metrics (maxlength/fragsize); presumably alternatives. */
1335 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1336 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1337 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1338 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1341 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1342 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1347 if (u
->version
>= 12) {
1350 uint32_t device_index
;
1352 pa_bool_t suspended
;
1354 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1355 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1356 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1357 pa_tagstruct_gets(t
, &dn
) < 0 ||
1358 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
/* Remember the name of the device the server actually connected us to. */
1362 pa_xfree(u
->sink_name
);
1363 u
->sink_name
= pa_xstrdup(dn
);
1365 pa_xfree(u
->source_name
);
1366 u
->source_name
= pa_xstrdup(dn
);
1370 if (u
->version
>= 13) {
1373 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1376 /* #ifdef TUNNEL_SINK */
1377 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1379 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1383 if (!pa_tagstruct_eof(t
))
/* The timer must not exist yet; it is created exactly once here. */
1389 pa_assert(!u
->time_event
);
1390 pa_gettimeofday(&ntv
);
1391 ntv
.tv_sec
+= LATENCY_INTERVAL
;
1392 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
1396 pa_log_debug("Stream created.");
1399 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1405 pa_log("Invalid reply. (Create stream)");
1408 pa_module_unload_request(u
->module
, TRUE
);
1412 /* Called from main context */
/* Reply handler for the PA_COMMAND_AUTH request; on_connection() registers it
 * via pa_pdispatch_register_reply().
 *
 * It reads the remote protocol version from the reply, records it in the
 * sink/source proplist, sends PA_COMMAND_SET_CLIENT_NAME, then builds and
 * sends a CREATE_PLAYBACK_STREAM or CREATE_RECORD_STREAM request whose reply
 * is handled by create_stream_callback().
 *
 * pd       - dispatcher that delivered the reply; asserted equal to u->pdispatch
 * command  - reply command; must be PA_COMMAND_REPLY
 * tag      - reused below as scratch storage for the tags of outgoing requests
 * t        - reply payload; must contain exactly one u32 (the version)
 * userdata - the module's struct userdata
 *
 * NOTE(review): this listing omits lines (declarations of pl/volume,
 * goto/return statements, preprocessor conditionals, closing braces). The
 * paired sink/source passages below presumably belong to alternative build
 * variants -- confirm against the complete file. */
1413 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1414 struct userdata
*u
= userdata
;
1415 pa_tagstruct
*reply
;
1416 char name
[256], un
[128], hn
[128];
1423 pa_assert(u
->pdispatch
== pd
);
/* The reply must be PA_COMMAND_REPLY carrying the version and nothing else. */
1425 if (command
!= PA_COMMAND_REPLY
||
1426 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1427 !pa_tagstruct_eof(t
)) {
1429 if (command
== PA_COMMAND_ERROR
)
1430 pa_log("Failed to authenticate");
1432 pa_log("Protocol error.");
1437 /* Minimum supported protocol version */
1438 if (u
->version
< 8) {
1439 pa_log("Incompatible protocol version");
1443 /* Starting with protocol version 13 the MSB of the version tag
1444 reflects if shm is enabled for this connection or not. We don't
1445 support SHM here at all, so we just ignore this. */
1447 if (u
->version
>= 13)
1448 u
->version
&= 0x7FFFFFFFU
;
1450 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* Publish the remote version as the "tunnel.remote_version" property and
 * build the stream name used further below.
 * NOTE(review): the first "%s" argument of pa_snprintf() is not visible in
 * this listing. */
1453 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1454 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1456 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1458 pa_get_user_name(un
, sizeof(un
)),
1459 pa_get_host_name(hn
, sizeof(hn
)));
1461 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1462 pa_source_update_proplist(u
->source
, 0, NULL
);
1464 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1466 pa_get_user_name(un
, sizeof(un
)),
1467 pa_get_host_name(hn
, sizeof(hn
)));
/* First request: announce the client name. */
1470 reply
= pa_tagstruct_new(NULL
, 0);
1471 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1472 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* Version 13 and later get a property list (application id and version);
 * the plain string "PulseAudio" below is presumably the pre-13 alternative. */
1474 if (u
->version
>= 13) {
1476 pl
= pa_proplist_new();
1477 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1478 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1479 pa_init_proplist(pl
);
1480 pa_tagstruct_put_proplist(reply
, pl
);
1481 pa_proplist_free(pl
);
1483 pa_tagstruct_puts(reply
, "PulseAudio");
1485 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1486 /* We ignore the server's reply here */
/* Second request: create the stream. A new tagstruct is started. */
1488 reply
= pa_tagstruct_new(NULL
, 0);
1490 if (u
->version
< 13)
1491 /* Only for older PA versions we need to fill in the maxlength */
1492 u
->maxlength
= 4*1024*1024;
/* Buffer metrics are derived from the DEFAULT_*_MSEC constants converted to
 * bytes with the device's sample spec; prebuf is set equal to tlength. */
1495 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1496 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1497 u
->prebuf
= u
->tlength
;
1499 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback variant of the request. Before version 13 the stream name is sent
 * as a plain string. */
1503 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1504 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1506 if (u
->version
< 13)
1507 pa_tagstruct_puts(reply
, name
);
1509 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1510 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
/* The remote device is addressed by name (u->sink_name), not by index. */
1511 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1512 pa_tagstruct_puts(reply
, u
->sink_name
);
1513 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* This boolean is true when the local sink is not currently opened. */
1514 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1515 pa_tagstruct_putu32(reply
, u
->tlength
);
1516 pa_tagstruct_putu32(reply
, u
->prebuf
);
1517 pa_tagstruct_putu32(reply
, u
->minreq
);
1518 pa_tagstruct_putu32(reply
, 0);
/* The volume sent is a reset cvolume sized to the sink's channel count. */
1519 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1520 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record variant of the request, mirroring the playback fields above but
 * using the source's spec/map/name and fragsize. */
1522 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1523 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1525 if (u
->version
< 13)
1526 pa_tagstruct_puts(reply
, name
);
1528 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1529 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1530 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1531 pa_tagstruct_puts(reply
, u
->source_name
);
1532 pa_tagstruct_putu32(reply
, u
->maxlength
);
1533 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1534 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Fields appended only for newer protocol versions; each flag's meaning is
 * given by the trailing comment on its line. */
1537 if (u
->version
>= 12) {
1538 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1539 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1540 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1541 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1542 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1543 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1544 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1547 if (u
->version
>= 13) {
1550 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1551 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
/* From version 13 on the stream name travels in a property list together
 * with the media role "abstract". */
1553 pl
= pa_proplist_new();
1554 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1555 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1556 pa_tagstruct_put_proplist(reply
, pl
);
1557 pa_proplist_free(pl
);
1560 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1564 if (u
->version
>= 14) {
1566 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1568 pa_tagstruct_put_boolean(reply
, TRUE
); /* early requests */
1571 if (u
->version
>= 15) {
1573 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1575 pa_tagstruct_put_boolean(reply
, FALSE
); /* don't inhibit auto suspend */
1576 pa_tagstruct_put_boolean(reply
, FALSE
); /* fail on suspend */
/* Send the request and wait up to DEFAULT_TIMEOUT for the reply, which is
 * dispatched to create_stream_callback(). */
1579 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1580 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1582 pa_log_debug("Connection authenticated, creating stream ...");
/* Failure path: ask the core to unload this module. */
1587 pa_module_unload_request(u
->module
, TRUE
);
1590 /* Called from main context */
/* Die callback installed on u->pstream by on_connection(). Logs a warning and
 * requests that the module be unloaded.
 *
 * p        - the pstream (not used in the visible lines)
 * userdata - the module's struct userdata */
1591 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1592 struct userdata
*u
= userdata
;
1597 pa_log_warn("Stream died.");
1598 pa_module_unload_request(u
->module
, TRUE
);
1601 /* Called from main context */
/* Packet callback installed on u->pstream by on_connection(). Hands every
 * received packet to the protocol dispatcher.
 *
 * p        - the pstream (not used in the visible lines)
 * packet   - the received packet
 * creds    - credentials passed through to the dispatcher
 * userdata - the module's struct userdata */
1602 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1603 struct userdata
*u
= userdata
;
/* A negative return from the dispatcher is treated as fatal for the tunnel:
 * it is logged and the module is asked to unload. */
1609 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1610 pa_log("Invalid packet");
1611 pa_module_unload_request(u
->module
, TRUE
);
1617 /* Called from main context */
/* Memblock callback installed on u->pstream by on_connection(). Forwards
 * received audio data to the source and accounts for its length.
 *
 * p        - the pstream (not used in the visible lines)
 * channel  - channel the block arrived on; must equal u->channel
 * offset   - seek offset, forwarded unchanged in the message
 * seek     - seek mode, forwarded as the message's pointer argument
 * chunk    - the received memchunk
 * userdata - the module's struct userdata */
1618 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1619 struct userdata
*u
= userdata
;
/* Data on any channel other than the one recorded in u->channel is logged and
 * leads to an unload request.
 * NOTE(review): the log text misspells "Received"; it is a runtime string and
 * is deliberately left untouched here. */
1625 if (channel
!= u
->channel
) {
1626 pa_log("Recieved memory block on bad channel.");
1627 pa_module_unload_request(u
->module
, TRUE
);
/* pa_asyncmsgq_send() delivers SOURCE_MESSAGE_POST with the chunk to the
 * source's message queue. */
1631 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
/* Track how many bytes have been received via this callback. */
1633 u
->counter_delta
+= (int64_t) chunk
->length
;
1637 /* Called from main context */
/* Socket-client callback registered in pa__init(). It releases the
 * socket client, wraps the new I/O channel in a pstream/pdispatch pair, and
 * starts authentication by sending PA_COMMAND_AUTH.
 *
 * sc       - the socket client; asserted equal to u->client
 * io       - the I/O channel passed to pa_pstream_new()
 * userdata - the module's struct userdata
 *
 * NOTE(review): this listing omits lines (declarations of t/tag/ucred, the
 * failure test on io, preprocessor conditionals, returns); confirm the exact
 * control flow against the complete file. */
1638 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1639 struct userdata
*u
= userdata
;
1645 pa_assert(u
->client
== sc
);
/* The socket client reference is dropped here. */
1647 pa_socket_client_unref(u
->client
);
/* Failure path: report errno and ask the core to unload this module. */
1651 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1652 pa_module_unload_request(u
->module
, TRUE
);
/* Create the packet stream on the core's main loop and memory pool, and a
 * dispatcher driven by command_table. */
1656 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1657 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
/* Install the die, packet and memblock callbacks, each receiving u. */
1659 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1660 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1662 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* Build the AUTH request: command, tag, local protocol version, cookie. */
1665 t
= pa_tagstruct_new(NULL
, 0);
1666 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1667 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1668 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1670 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
/* Credential passing is enabled on the channel when it supports it, and the
 * request is sent with this process's uid/gid. The plain
 * pa_pstream_send_tagstruct() call further down is presumably the alternative
 * for builds without credential support -- confirm. */
1676 if (pa_iochannel_creds_supported(io
))
1677 pa_iochannel_creds_enable(io
);
1679 ucred
.uid
= getuid();
1680 ucred
.gid
= getgid();
1682 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1685 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* The AUTH reply is handled by setup_complete_callback(), with a timeout of
 * DEFAULT_TIMEOUT. */
1688 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1690 pa_log_debug("Connection established, authenticating ...");
1695 /* Called from main context */
/* Volume hook assigned to u->sink->set_volume in pa__init(). Sends
 * PA_COMMAND_SET_SINK_INPUT_VOLUME for the remote stream identified by
 * u->device_index, carrying the sink's virtual_volume.
 *
 * sink - the local tunnel sink whose volume is forwarded
 *
 * NOTE(review): the declarations of u, t and tag are not visible in this
 * listing; u is presumably obtained from sink->userdata -- confirm. */
1696 static void sink_set_volume(pa_sink
*sink
) {
1705 t
= pa_tagstruct_new(NULL
, 0);
1706 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1707 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1708 pa_tagstruct_putu32(t
, u
->device_index
);
1709 pa_tagstruct_put_cvolume(t
, &sink
->virtual_volume
);
/* No reply handler is registered in the visible code. */
1710 pa_pstream_send_tagstruct(u
->pstream
, t
);
1713 /* Called from main context */
/* Mute hook assigned to u->sink->set_mute in pa__init(). Sends
 * PA_COMMAND_SET_SINK_INPUT_MUTE for the remote stream identified by
 * u->device_index, carrying the sink's muted flag.
 *
 * sink - the local tunnel sink whose mute state is forwarded
 *
 * NOTE(review): the declarations of u, t and tag are not visible in this
 * listing; u is presumably obtained from sink->userdata -- confirm. */
1714 static void sink_set_mute(pa_sink
*sink
) {
/* Remote protocol versions below 11 are special-cased; the statement guarded
 * by this test is not visible here (presumably an early return -- confirm). */
1723 if (u
->version
< 11)
1726 t
= pa_tagstruct_new(NULL
, 0);
1727 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1728 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1729 pa_tagstruct_putu32(t
, u
->device_index
);
/* !! normalises the muted value to 0 or 1 before it is sent. */
1730 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1731 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Module entry point. Parses the module arguments, allocates and initialises
 * the struct userdata, starts the connection to the remote server, creates the
 * local tunnel sink or source, spawns the I/O thread and finally publishes the
 * device.
 *
 * m - the module being loaded; m->userdata receives the new struct userdata
 *
 * NOTE(review): this listing omits lines (some local declarations, goto/return
 * statements, preprocessor conditionals, closing braces), so the return values
 * and the exact failure handling are not visible. The paired sink/source
 * passages presumably belong to alternative build variants -- confirm against
 * the complete file. */
1736 int pa__init(pa_module
*m
) {
1737 pa_modargs
*ma
= NULL
;
1738 struct userdata
*u
= NULL
;
1743 pa_sink_new_data data
;
1745 pa_source_new_data data
;
1750 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1751 pa_log("Failed to parse module arguments");
/* Allocate the per-module state and attach it to the module. */
1755 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1759 u
->pdispatch
= NULL
;
1761 u
->server_name
= NULL
;
/* The remote device name comes from the "sink" / "source" argument and may be
 * NULL when the argument is absent.
 * NOTE(review): the statements end in a doubled ';;' (harmless empty
 * statement). */
1763 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1765 u
->requested_bytes
= 0;
1767 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1770 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
, 10);
/* Until a stream exists, the remote device index and channel are invalid. */
1772 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1773 u
->time_event
= NULL
;
1774 u
->ignore_latency_before
= 0;
1775 u
->transport_usec
= u
->thread_transport_usec
= 0;
1776 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1777 u
->counter
= u
->counter_delta
= 0;
/* Real-time poll object and thread message queues for the I/O thread. */
1779 u
->rtpoll
= pa_rtpoll_new();
1780 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
/* The authentication cookie is taken from the "cookie" argument, defaulting
 * to PA_NATIVE_COOKIE_FILE. */
1782 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), PA_NATIVE_COOKIE_LENGTH
)))
/* The "server" argument is mandatory. */
1785 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1786 pa_log("No server specified.");
/* Start from the core's default sample spec and channel map and let the
 * module arguments override them. */
1790 ss
= m
->core
->default_sample_spec
;
1791 map
= m
->core
->default_channel_map
;
1792 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1793 pa_log("Invalid sample format specification");
/* Create the socket client for the server string, with
 * PA_NATIVE_DEFAULT_PORT as the default port; on_connection() is called with
 * the outcome. */
1797 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1798 pa_log("Failed to connect to server '%s'", u
->server_name
);
1802 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Sink variant: the local sink name is the "sink_name" argument or, when that
 * is absent, "tunnel.<server>". */
1806 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1807 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1809 pa_sink_new_data_init(&data
);
1810 data
.driver
= __FILE__
;
1812 data
.namereg_fail
= TRUE
;
1813 pa_sink_new_data_set_name(&data
, dn
);
1814 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1815 pa_sink_new_data_set_channel_map(&data
, &map
);
/* Description reads "<remote sink> on <server>", or just "<server>" when no
 * remote sink name was given. */
1816 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1817 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1819 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
1821 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
|PA_SINK_HW_MUTE_CTRL
);
1822 pa_sink_new_data_done(&data
);
1825 pa_log("Failed to create sink.");
/* Hook the sink up to this module's message handler and state/volume/mute
 * callbacks. */
1829 u
->sink
->parent
.process_msg
= sink_process_msg
;
1830 u
->sink
->userdata
= u
;
1831 u
->sink
->set_state
= sink_set_state
;
1832 u
->sink
->set_volume
= sink_set_volume
;
1833 u
->sink
->set_mute
= sink_set_mute
;
1835 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
1837 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1839 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1840 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* Source variant: mirrors the sink setup above, using the "source_name"
 * argument and the source APIs. */
1844 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1845 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1847 pa_source_new_data_init(&data
);
1848 data
.driver
= __FILE__
;
1850 data
.namereg_fail
= TRUE
;
1851 pa_source_new_data_set_name(&data
, dn
);
1852 pa_source_new_data_set_sample_spec(&data
, &ss
);
1853 pa_source_new_data_set_channel_map(&data
, &map
);
1854 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1855 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1857 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
1859 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1860 pa_source_new_data_done(&data
);
1863 pa_log("Failed to create source.");
1867 u
->source
->parent
.process_msg
= source_process_msg
;
1868 u
->source
->set_state
= source_set_state
;
1869 u
->source
->userdata
= u
;
1871 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1873 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1874 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
/* NOTE(review): u->time_event was already set to NULL further up; this second
 * assignment looks redundant. */
1879 u
->time_event
= NULL
;
1883 u
->tlength
= u
->minreq
= u
->prebuf
= 0;
/* The smoother's time offset is set to the current rtclock value. */
1888 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
/* Start the I/O thread running thread_func with u. */
1890 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1891 pa_log("Failed to create thread.");
/* Publish the device. */
1896 pa_sink_put(u
->sink
);
1898 pa_source_put(u
->source
);
/* The parsed arguments are freed at two places, presumably once on the
 * success path and once on the failure path -- confirm. */
1901 pa_modargs_free(ma
);
1909 pa_modargs_free(ma
);
1916 void pa__done(pa_module
*m
) {
1921 if (!(u
= m
->userdata
))
1926 pa_sink_unlink(u
->sink
);
1929 pa_source_unlink(u
->source
);
1933 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1934 pa_thread_free(u
->thread
);
1937 pa_thread_mq_done(&u
->thread_mq
);
1941 pa_sink_unref(u
->sink
);
1944 pa_source_unref(u
->source
);
1948 pa_rtpoll_free(u
->rtpoll
);
1951 pa_pstream_unlink(u
->pstream
);
1952 pa_pstream_unref(u
->pstream
);
1956 pa_pdispatch_unref(u
->pdispatch
);
1959 pa_socket_client_unref(u
->client
);
1962 pa_auth_cookie_unref(u
->auth_cookie
);
1965 pa_smoother_free(u
->smoother
);
1968 u
->core
->mainloop
->time_free(u
->time_event
);
1971 pa_xfree(u
->sink_name
);
1973 pa_xfree(u
->source_name
);
1975 pa_xfree(u
->server_name
);
1977 pa_xfree(u
->device_description
);
1978 pa_xfree(u
->server_fqdn
);
1979 pa_xfree(u
->user_name
);