2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/authkey-prop.h>
52 #include <pulsecore/time-smoother.h>
53 #include <pulsecore/thread.h>
54 #include <pulsecore/thread-mq.h>
55 #include <pulsecore/rtclock.h>
56 #include <pulsecore/core-error.h>
57 #include <pulsecore/proplist-util.h>
60 #include "module-tunnel-sink-symdef.h"
62 #include "module-tunnel-source-symdef.h"
/* Module metadata: description, usage string, author, version and the
 * load-once flag.
 * NOTE(review): a sink variant and a source variant of the description and
 * of the usage string are both visible here, and the macro call that opens
 * each usage string is not. Presumably one variant is selected by
 * conditional compilation -- confirm against the full file. */
66 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
69 "sink=<remote sink name> "
71 "format=<sample format> "
72 "channels=<number of channels> "
74 "sink_name=<name for the local sink> "
75 "channel_map=<channel map>");
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 "source=<remote source name> "
82 "format=<sample format> "
83 "channels=<number of channels> "
85 "source_name=<name for the local source> "
86 "channel_map=<channel map>");
89 PA_MODULE_AUTHOR("Lennart Poettering");
90 PA_MODULE_VERSION(PACKAGE_VERSION
);
91 PA_MODULE_LOAD_ONCE(FALSE
);
/* Table of module argument names.
 * NOTE(review): the initializer entries and the closing brace are not
 * visible in this listing; presumably they match the keys named in the
 * usage strings -- confirm against the full file. */
93 static const char* const valid_modargs
[] = {
/* Timeout handed to pa_pdispatch_register_reply() for every request this
 * module sends. NOTE(review): unit presumably seconds -- confirm. */
110 #define DEFAULT_TIMEOUT 5
/* Seconds between two runs of the latency timer (added to tv_sec when the
 * timer is armed). */
112 #define LATENCY_INTERVAL 10
/* Added on top of the latency value the server reports when the local
 * device's latency range is set. */
114 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
/* Private sink message codes; numbering starts at PA_SINK_MESSAGE_MAX. They
 * are handled in sink_process_msg(). */
119 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
120 SINK_MESSAGE_REMOTE_SUSPEND
,
121 SINK_MESSAGE_UPDATE_LATENCY
,
/* Playback buffer metrics in milliseconds; converted to bytes with the
 * sink's sample spec when the playback stream is requested. */
125 #define DEFAULT_TLENGTH_MSEC 150
126 #define DEFAULT_MINREQ_MSEC 25
/* Private source message codes; numbering starts at PA_SOURCE_MESSAGE_MAX.
 * They are handled in source_process_msg(). */
131 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
132 SOURCE_MESSAGE_REMOTE_SUSPEND
,
133 SOURCE_MESSAGE_UPDATE_LATENCY
/* Record fragment size in milliseconds; converted to bytes with the
 * source's sample spec. */
136 #define DEFAULT_FRAGSIZE_MSEC 25
/* Forward declarations of the protocol command handlers that command_table
 * refers to. All of them share the pdispatch callback signature
 * (dispatcher, command code, tag, payload tagstruct, userdata). */
141 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
142 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
144 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
146 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
/* Maps protocol command codes to their handlers. The playback and the
 * record flavour of the "killed", "suspended" and "moved" notifications
 * share one handler each, and so do overflow and underflow.
 * NOTE(review): the closing brace of the initializer is not visible in this
 * listing. */
150 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
152 [PA_COMMAND_REQUEST
] = command_request
,
153 [PA_COMMAND_STARTED
] = command_started
,
155 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
156 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
157 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
158 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
159 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
160 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
161 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
162 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
163 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
/* Members of the per-module state that the functions in this file reach
 * through "struct userdata *u".
 * NOTE(review): the struct header and several members used elsewhere in the
 * file (module, core, sink, source, pstream, rtpoll, ctag, channel,
 * version, ...) are not visible in this listing. */
/* Message queue pair installed by the IO thread; its outq carries messages
 * posted by the IO thread. */
170 pa_thread_mq thread_mq
;
174 pa_socket_client
*client
;
/* Dispatcher the command handlers assert they were called from. */
176 pa_pdispatch
*pdispatch
;
/* Bytes the server has asked for that were not rendered yet: grown by
 * SINK_MESSAGE_REQUEST, consumed by send_data(). */
182 int32_t requested_bytes
;
188 uint8_t auth_cookie
[PA_NATIVE_COOKIE_LENGTH
];
/* Index returned by the server when the stream was created; info replies
 * for other sink inputs are told apart with it. */
192 uint32_t device_index
;
/* counter: bytes rendered (sink) or posted (source) in the IO thread.
 * counter_delta: bytes sent since the last latency request; it is reset to
 * zero by request_latency(). */
195 int64_t counter
, counter_delta
;
/* State of the remote stream; the smoother is paused while either is set. */
197 pa_bool_t remote_corked
:1;
198 pa_bool_t remote_suspended
:1;
/* One-way transport time estimate, written by the latency reply handler;
 * only meaningful once transport_usec_valid is set. */
200 pa_usec_t transport_usec
;
201 pa_bool_t transport_usec_valid
;
/* Tag of the most recent latency request; compared against the tag of each
 * latency reply. */
203 uint32_t ignore_latency_before
;
/* Timer created once the stream exists; re-armed every LATENCY_INTERVAL
 * seconds. */
205 pa_time_event
*time_event
;
207 pa_bool_t auth_cookie_in_property
;
/* Clock smoother queried by the GET_LATENCY message handlers. */
209 pa_smoother
*smoother
;
/* Description of the remote device, copied from the sink/source info
 * reply; owned by this struct (replaced via pa_xfree + pa_xstrdup). */
211 char *device_description
;
/* Forward declaration; the definition follows the latency reply handler. */
225 static void request_latency(struct userdata
*u
);
227 /* Called from main context */
/* Handler for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED (see command_table). The payload is not
 * parsed: a warning is logged and the module asks to be unloaded. */
228 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
229 struct userdata
*u
= userdata
;
234 pa_assert(u
->pdispatch
== pd
);
236 pa_log_warn("Stream killed");
237 pa_module_unload_request(u
->module
);
240 /* Called from main context */
/* Handler for PA_COMMAND_OVERFLOW and PA_COMMAND_UNDERFLOW (see
 * command_table). Purely informational: the event is logged and nothing
 * else visible here reacts to it. */
241 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
242 struct userdata
*u
= userdata
;
247 pa_assert(u
->pdispatch
== pd
);
249 pa_log_info("Server signalled buffer overrun/underrun.");
253 /* Called from main context */
/* Handler for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED (see command_table).
 * Payload: u32 channel, boolean suspended, then end of packet. A malformed
 * packet is logged and the module asks to be unloaded. Otherwise the flag is
 * handed to the device's message queue as a *_MESSAGE_REMOTE_SUSPEND
 * message, with the boolean packed into the data pointer.
 * NOTE(review): the sink and the source delivery both appear below;
 * presumably only one is compiled per module flavour -- confirm against the
 * full file. */
254 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
255 struct userdata
*u
= userdata
;
262 pa_assert(u
->pdispatch
== pd
);
264 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
265 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
266 !pa_tagstruct_eof(t
)) {
267 pa_log("Invalid packet");
268 pa_module_unload_request(u
->module
);
273 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
275 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
281 /* Called from main context */
/* Handler for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED (see command_table). The visible code only
 * writes a debug log entry. */
282 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
283 struct userdata
*u
= userdata
;
288 pa_assert(u
->pdispatch
== pd
);
290 pa_log_debug("Server reports a stream move.");
296 /* Called from main context */
/* Handler for PA_COMMAND_STARTED (see command_table). The visible code only
 * writes a debug log entry. */
297 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
298 struct userdata
*u
= userdata
;
303 pa_assert(u
->pdispatch
== pd
);
305 pa_log_debug("Server reports playback started.");
311 /* Called from IO thread context */
/* Records a change of the remote cork state and pauses or resumes the
 * latency smoother accordingly.
 *
 * u:    module state.
 * cork: new cork state; when it equals u->remote_corked the state is left
 *       as it is.
 *
 * The timestamp x handed to the smoother is "now" plus the transport time
 * estimate (when one is available), because the cork request still has to
 * reach the server. The smoother is paused while the stream is remotely
 * corked or remotely suspended, and resumed otherwise.
 * NOTE(review): the declaration of x and the statements between the listed
 * lines (such as the early return and the else) are not visible in this
 * listing. */
312 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
316 if (u
->remote_corked
== cork
)
319 u
->remote_corked
= cork
;
320 x
= pa_rtclock_usec();
322 /* Correct by the time this needs to travel to the other side.
323 * This is a valid thread-safe access, because the main thread is
324 * waiting for us */
325 if (u
->transport_usec_valid
)
326 x
+= u
->transport_usec
;
328 if (u
->remote_suspended
|| u
->remote_corked
)
329 pa_smoother_pause(u
->smoother
, x
);
331 pa_smoother_resume(u
->smoother
, x
);
334 /* Called from main context */
/* Sends a cork/uncork request for our stream to the server.
 *
 * u:    module state; u->ctag supplies the request tag (post-incremented)
 *       and u->channel names the stream.
 * cork: TRUE to cork, FALSE to uncork; normalized with !! before it is put
 *       on the wire.
 *
 * No reply handler is registered in the visible code.
 * NOTE(review): the playback and the record command code are both written
 * below; presumably only one is compiled per module flavour -- confirm
 * against the full file. */
335 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
342 t
= pa_tagstruct_new(NULL
, 0);
344 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
346 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
348 pa_tagstruct_putu32(t
, u
->ctag
++);
349 pa_tagstruct_putu32(t
, u
->channel
);
350 pa_tagstruct_put_boolean(t
, !!cork
);
351 pa_pstream_send_tagstruct(u
->pstream
, t
);
356 /* Called from IO thread context */
/* Records a change of the remote suspend state and pauses or resumes the
 * latency smoother accordingly.
 *
 * u:       module state.
 * suspend: new suspend state as reported by the server; when it equals
 *          u->remote_suspended the state is left as it is.
 *
 * The timestamp x handed to the smoother is "now" minus the transport time
 * estimate (when one is available), because the notification already
 * travelled from the server to us. The smoother is paused while the stream
 * is remotely suspended or remotely corked, and resumed otherwise.
 * NOTE(review): the declaration of x and the statements between the listed
 * lines (such as the early return and the else) are not visible in this
 * listing. */
357 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
361 if (u
->remote_suspended
== suspend
)
364 u
->remote_suspended
= suspend
;
366 x
= pa_rtclock_usec();
368 /* Correct by the time this needed to travel from the other side.
369 * This is a valid thread-safe access, because the main thread is
370 * waiting for us */
371 if (u
->transport_usec_valid
)
372 x
-= u
->transport_usec
;
374 if (u
->remote_suspended
|| u
->remote_corked
)
375 pa_smoother_pause(u
->smoother
, x
);
377 pa_smoother_resume(u
->smoother
, x
);
382 /* Called from IO thread context */
/* Renders audio from the local sink until the server's outstanding request
 * is satisfied.
 *
 * Each iteration renders at most u->requested_bytes into a memchunk, posts
 * it on the thread's outgoing queue as SINK_MESSAGE_POST (handled in
 * sink_process_msg(), which puts it on the wire), drops the local memblock
 * reference, and accounts for the chunk length in both
 * u->requested_bytes and u->counter. */
383 static void send_data(struct userdata
*u
) {
386 while (u
->requested_bytes
> 0) {
387 pa_memchunk memchunk
;
389 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
390 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
391 pa_memblock_unref(memchunk
.memblock
);
393 u
->requested_bytes
-= memchunk
.length
;
395 u
->counter
+= memchunk
.length
;
399 /* This function is called from IO context -- except when it is not. */
/* Message handler of the tunnel sink.
 *
 * o:      the sink, as a message object; the module state is taken from its
 *         userdata.
 * code:   message code; the private SINK_MESSAGE_* codes are handled here,
 *         anything that reaches the end is passed to pa_sink_process_msg().
 * data, offset, chunk: message payload, meaning depends on the code.
 *
 * NOTE(review): the switch statement itself, the local declarations (r, y),
 * the break/return statements and the closing braces are not visible in
 * this listing. */
400 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
401 struct userdata
*u
= PA_SINK(o
)->userdata
;
/* State change: let the generic handler apply it first, then mirror a
 * suspend as a cork of the remote stream. */
405 case PA_SINK_MESSAGE_SET_STATE
: {
408 /* First, change the state, because otherwise pa_sink_render() would fail */
409 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
411 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
413 if (PA_SINK_IS_OPENED(u
->sink
->state
))
/* Latency query: time equivalent of everything rendered so far (yl) minus
 * the smoother's value for the current time (yr), clamped at zero. The
 * result is written through data. */
420 case PA_SINK_MESSAGE_GET_LATENCY
: {
421 pa_usec_t yl
, yr
, *usec
= data
;
423 yl
= pa_bytes_to_usec(u
->counter
, &u
->sink
->sample_spec
);
424 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
426 *usec
= yl
> yr
? yl
- yr
: 0;
/* The server asked for more data; offset carries the byte count. */
430 case SINK_MESSAGE_REQUEST
:
432 pa_assert(offset
> 0);
433 u
->requested_bytes
+= (size_t) offset
;
435 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
/* The server suspended or resumed the stream; data carries the boolean. */
441 case SINK_MESSAGE_REMOTE_SUSPEND
:
443 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
/* New latency measurement; offset carries the measured delay. y starts out
 * as the time equivalent of everything rendered so far and is fed to the
 * smoother together with the current time. */
447 case SINK_MESSAGE_UPDATE_LATENCY
: {
450 y
= pa_bytes_to_usec(u
->counter
, &u
->sink
->sample_spec
);
452 if (y
> (pa_usec_t
) offset
|| offset
< 0)
457 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
/* A rendered chunk to be written to the server (posted by send_data()). */
462 case SINK_MESSAGE_POST
:
464 /* OK, This might be a bit confusing. This message is
465 * delivered to us from the main context -- NOT from the
466 * IO thread context where the rest of the messages are
467 * dispatched. Yeah, ugly, but I am a lazy bastard. */
469 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
471 u
->counter_delta
+= chunk
->length
;
476 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
479 /* Called from main context */
/* State-change hook of the tunnel sink.
 *
 * s:     the sink (its current state is still s->state).
 * state: the state being entered.
 *
 * Entering SUSPENDED corks the remote stream; entering RUNNING from
 * SUSPENDED uncorks it.
 * NOTE(review): the assignment of u, the remaining case labels, the
 * break statements and the return value are not visible in this listing. */
480 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
482 pa_sink_assert_ref(s
);
485 switch ((pa_sink_state_t
) state
) {
487 case PA_SINK_SUSPENDED
:
488 pa_assert(PA_SINK_IS_OPENED(s
->state
));
489 stream_cork(u
, TRUE
);
493 case PA_SINK_RUNNING
:
494 if (s
->state
== PA_SINK_SUSPENDED
)
495 stream_cork(u
, FALSE
);
498 case PA_SINK_UNLINKED
:
508 /* This function is called from IO context -- except when it is not. */
/* Message handler of the tunnel source.
 *
 * o:      the source, as a message object; the module state is taken from
 *         its userdata.
 * code:   message code; the private SOURCE_MESSAGE_* codes are handled
 *         here, anything that reaches the end is passed to
 *         pa_source_process_msg().
 * data, offset, chunk: message payload, meaning depends on the code.
 *
 * Two sink-specific names were used on this source object and are replaced:
 * the state-change case label now uses the source message code, and the
 * latency case now takes the sample spec from u->source (as the
 * UPDATE_LATENCY case does) instead of casting the source object with
 * PA_SINK().
 * NOTE(review): the switch statement itself, the local declarations (r, y),
 * the break/return statements and the closing braces are not visible in
 * this listing. */
509 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
510 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
/* State change: let the generic handler apply it first, then mirror a
 * suspend as a cork of the remote stream. */
514 case PA_SOURCE_MESSAGE_SET_STATE
: {
517 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
518 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
/* Latency query: the smoother's value for the current time (yr) minus the
 * time equivalent of everything received so far (yl), clamped at zero. The
 * result is written through data. */
523 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
524 pa_usec_t yr
, yl
, *usec
= data
;
526 yl
= pa_bytes_to_usec(u
->counter
, &u
->source
->sample_spec
);
527 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
529 *usec
= yr
> yl
? yr
- yl
: 0;
/* A chunk of recorded data: hand it to the source if it is open and count
 * its length in either case. */
533 case SOURCE_MESSAGE_POST
:
535 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
536 pa_source_post(u
->source
, chunk
);
538 u
->counter
+= chunk
->length
;
/* The server suspended or resumed the stream; data carries the boolean. */
542 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
544 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
/* New latency measurement; offset carries the measured delay. y starts out
 * as the time equivalent of everything received so far and is fed to the
 * smoother together with the current time. */
547 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
550 y
= pa_bytes_to_usec(u
->counter
, &u
->source
->sample_spec
);
552 if (offset
>= 0 || y
> (pa_usec_t
) -offset
)
557 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
563 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
566 /* Called from main context */
/* State-change hook of the tunnel source.
 *
 * s:     the source (its current state is still s->state).
 * state: the state being entered.
 *
 * Entering SUSPENDED corks the remote stream; entering RUNNING from
 * SUSPENDED uncorks it.
 * NOTE(review): the assignment of u, the remaining case labels, the
 * break statements and the return value are not visible in this listing. */
567 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
569 pa_source_assert_ref(s
);
572 switch ((pa_source_state_t
) state
) {
574 case PA_SOURCE_SUSPENDED
:
575 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
576 stream_cork(u
, TRUE
);
580 case PA_SOURCE_RUNNING
:
581 if (s
->state
== PA_SOURCE_SUSPENDED
)
582 stream_cork(u
, FALSE
);
585 case PA_SOURCE_UNLINKED
:
/* Entry point of the IO thread.
 *
 * userdata: the module state (struct userdata *).
 *
 * Installs the thread's message queues and its rtpoll, processes a pending
 * rewind request of the sink while the sink is open, and runs the rtpoll.
 * When pa_rtpoll_run() reports an error, the thread asks the core to unload
 * the module and then waits for PA_MESSAGE_SHUTDOWN before it finishes.
 * NOTE(review): the loop around the rewind/poll steps, the declaration of
 * ret and the labels separating the error path from the regular exit are
 * not visible in this listing. */
595 static void thread_func(void *userdata
) {
596 struct userdata
*u
= userdata
;
600 pa_log_debug("Thread starting up");
602 pa_thread_mq_install(&u
->thread_mq
);
603 pa_rtpoll_install(u
->rtpoll
);
609 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
610 if (u
->sink
->thread_info
.rewind_requested
)
611 pa_sink_process_rewind(u
->sink
, 0);
614 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
622 /* If this was no regular exit from the loop we have to continue
623 * processing messages until we received PA_MESSAGE_SHUTDOWN */
624 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
625 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
628 pa_log_debug("Thread shutting down");
632 /* Called from main context */
/* Handler for PA_COMMAND_REQUEST (see command_table): the server asks for
 * more playback data.
 *
 * Payload: u32 channel, u32 byte count. The channel has to be ours. The
 * byte count is forwarded to the sink's message queue as
 * SINK_MESSAGE_REQUEST, with the count in the offset argument. The visible
 * error handling logs the problem; the last visible statement asks for the
 * module to be unloaded.
 * NOTE(review): the jumps connecting the error branches to the unload
 * request are not visible in this listing; end-of-packet is not checked in
 * the visible code. */
633 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
634 struct userdata
*u
= userdata
;
635 uint32_t bytes
, channel
;
638 pa_assert(command
== PA_COMMAND_REQUEST
);
641 pa_assert(u
->pdispatch
== pd
);
643 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
644 pa_tagstruct_getu32(t
, &bytes
) < 0) {
645 pa_log("Invalid protocol reply");
649 if (channel
!= u
->channel
) {
650 pa_log("Recieved data for invalid channel");
654 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
658 pa_module_unload_request(u
->module
);
663 /* Called from main context */
/* Reply handler for the latency request sent by request_latency().
 *
 * Reply payload: sink usec, source usec, playing flag, the timestamp we
 * sent ("local"), the server's timestamp ("remote"), write index and read
 * index; from protocol version 13 on, two u64 values follow (underrun_for,
 * playing_for). The packet has to end there.
 *
 * From the timestamps a one-way transport time is derived and stored in
 * u->transport_usec. If local < remote < now the clocks are taken to be
 * synchronized and the difference to the remote stamp is used directly;
 * otherwise half the round trip is used. The resulting delay (device delay,
 * plus or minus the server-side buffer fill, corrected by the transport
 * time and by what was transferred since the request) is delivered to the
 * device as a *_MESSAGE_UPDATE_LATENCY message with the delay in the offset
 * argument.
 *
 * Two defects are fixed here: the transport correction now reads
 * u->transport_usec, which is the value computed above (the former local
 * variable of the same name was never assigned), and the second timestamp
 * comparison is an ordering test instead of a mere inequality test.
 *
 * NOTE(review): the sink and the source variant of several steps both
 * appear below; presumably only one is compiled per module flavour. The
 * declarations of playing, ss and delay, the else keywords, the jumps to
 * the failure path and the closing braces are not visible in this listing
 * -- confirm against the full file. */
664 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
665 struct userdata
*u
= userdata
;
666 pa_usec_t sink_usec
, source_usec
;
668 int64_t write_index
, read_index
;
669 struct timeval local
, remote
, now
;
676 if (command
!= PA_COMMAND_REPLY
) {
677 if (command
== PA_COMMAND_ERROR
)
678 pa_log("Failed to get latency.");
680 pa_log("Protocol error.");
684 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
685 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
686 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
687 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
688 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
689 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
690 pa_tagstruct_gets64(t
, &read_index
) < 0) {
691 pa_log("Invalid reply.");
696 if (u
->version
>= 13) {
697 uint64_t underrun_for
= 0, playing_for
= 0;
699 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
700 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
701 pa_log("Invalid reply.");
707 if (!pa_tagstruct_eof(t
)) {
708 pa_log("Invalid reply.");
/* Replies to requests older than the most recent one are recognized by
 * their tag. */
712 if (tag
< u
->ignore_latency_before
) {
717 pa_gettimeofday(&now
);
719 /* Calculate transport usec */
720 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
) < 0) {
721 /* local and remote seem to have synchronized clocks */
723 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
725 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
728 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
729 u
->transport_usec_valid
= TRUE
;
731 /* First, take the device's delay */
733 delay
= (int64_t) sink_usec
;
734 ss
= &u
->sink
->sample_spec
;
736 delay
= (int64_t) source_usec
;
737 ss
= &u
->source
->sample_spec
;
740 /* Add the length of our server-side buffer */
741 if (write_index
>= read_index
)
742 delay
+= (int64_t) pa_bytes_to_usec(write_index
-read_index
, ss
);
744 delay
-= (int64_t) pa_bytes_to_usec(read_index
-write_index
, ss
);
746 /* Our measurements are already out of date, hence correct by the *
747 * transport latency */
749 delay
-= (int64_t) u
->transport_usec
;
751 delay
+= (int64_t) u
->transport_usec
;
754 /* Now correct by what we have read/written since we requested the update */
756 delay
+= (int64_t) pa_bytes_to_usec(u
->counter_delta
, ss
);
758 delay
-= (int64_t) pa_bytes_to_usec(u
->counter_delta
, ss
);
762 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
764 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
/* Failure path: give up on the tunnel. */
771 pa_module_unload_request(u
->module
);
774 /* Called from main context */
/* Asks the server for the current latency of our stream.
 *
 * The request carries a fresh tag (u->ctag, post-incremented), our channel
 * and the current time of day. stream_get_latency_callback() is registered
 * for the reply with DEFAULT_TIMEOUT. The tag is remembered in
 * u->ignore_latency_before and u->counter_delta restarts at zero, so the
 * reply handler can account for what is transferred from now on.
 * NOTE(review): the playback and the record command code are both written
 * below; presumably only one is compiled per module flavour -- confirm
 * against the full file. */
775 static void request_latency(struct userdata
*u
) {
781 t
= pa_tagstruct_new(NULL
, 0);
783 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
785 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
787 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
788 pa_tagstruct_putu32(t
, u
->channel
);
790 pa_gettimeofday(&now
);
791 pa_tagstruct_put_timeval(t
, &now
);
793 pa_pstream_send_tagstruct(u
->pstream
, t
);
794 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
796 u
->ignore_latency_before
= tag
;
797 u
->counter_delta
= 0;
800 /* Called from main context */
/* Timer callback of u->time_event.
 *
 * m:  main loop API the timer belongs to.
 * e:  the timer event itself.
 * tv: time the timer was set for (not used in the visible code).
 *
 * The visible code re-arms the timer for LATENCY_INTERVAL seconds from now.
 * NOTE(review): whatever the callback does before re-arming, and the
 * declaration of ntv, are not visible in this listing. */
801 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*tv
, void *userdata
) {
802 struct userdata
*u
= userdata
;
811 pa_gettimeofday(&ntv
);
812 ntv
.tv_sec
+= LATENCY_INTERVAL
;
813 m
->time_restart(e
, &ntv
);
816 /* Called from main context */
/* Refreshes the description of the local device and the name of the remote
 * stream once the remote user, host and device description are all known.
 *
 * Local side: the description becomes "<device> on <user>@<host>" and the
 * three values are also stored in the device's property list under
 * tunnel.remote.user, tunnel.remote.fqdn and tunnel.remote.description.
 * Remote side: a set-stream-name request is sent with
 * "<device> for <local user>@<local host>".
 * NOTE(review): the sink and the source variant both appear below;
 * presumably only one is compiled per module flavour. The early return, the
 * declarations of d and t and the release of d are not visible in this
 * listing -- confirm against the full file. */
817 static void update_description(struct userdata
*u
) {
819 char un
[128], hn
[128];
824 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
827 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
830 pa_sink_set_description(u
->sink
, d
);
831 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
832 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
833 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
835 pa_source_set_description(u
->source
, d
);
836 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
837 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
838 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
843 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
844 pa_get_user_name(un
, sizeof(un
)),
845 pa_get_host_name(hn
, sizeof(hn
)));
847 t
= pa_tagstruct_new(NULL
, 0);
849 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
851 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
853 pa_tagstruct_putu32(t
, u
->ctag
++);
854 pa_tagstruct_putu32(t
, u
->channel
);
855 pa_tagstruct_puts(t
, d
);
856 pa_pstream_send_tagstruct(u
->pstream
, t
);
861 /* Called from main context */
/* Reply handler for PA_COMMAND_GET_SERVER_INFO (registered by
 * request_info()).
 *
 * Reply payload: server name, server version, user name, host name, sample
 * spec, default sink name, default source name, u32 cookie; the packet has
 * to end there. The host and user name replace u->server_fqdn and
 * u->user_name (old values are freed, new ones duplicated), then the
 * descriptions are refreshed. The last visible statement, the failure path,
 * asks for the module to be unloaded.
 * NOTE(review): the declarations of ss and cookie and the jumps to the
 * failure path are not visible in this listing. */
862 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
863 struct userdata
*u
= userdata
;
865 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
871 if (command
!= PA_COMMAND_REPLY
) {
872 if (command
== PA_COMMAND_ERROR
)
873 pa_log("Failed to get info.");
875 pa_log("Protocol error.");
879 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
880 pa_tagstruct_gets(t
, &server_version
) < 0 ||
881 pa_tagstruct_gets(t
, &user_name
) < 0 ||
882 pa_tagstruct_gets(t
, &host_name
) < 0 ||
883 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
884 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
885 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
886 pa_tagstruct_getu32(t
, &cookie
) < 0) {
888 pa_log("Parse failure");
892 if (!pa_tagstruct_eof(t
)) {
893 pa_log("Packet too long");
897 pa_xfree(u
->server_fqdn
);
898 u
->server_fqdn
= pa_xstrdup(host_name
);
900 pa_xfree(u
->user_name
);
901 u
->user_name
= pa_xstrdup(user_name
);
903 update_description(u
);
908 pa_module_unload_request(u
->module
);
913 /* Called from main context */
/* Reply handler for PA_COMMAND_GET_SINK_INFO (registered by
 * request_info()).
 *
 * Reply payload: index, name, description, sample spec, channel map, owner
 * module, volume, mute, monitor source index and name, latency, driver,
 * flags; from protocol version 13 on a property list and a configured
 * latency follow. The packet has to end there. The property list is only
 * parsed into the temporary pl, which is freed on both the success and the
 * failure path.
 *
 * The reply's name is compared with u->sink_name before the description
 * replaces u->device_description and the descriptions are refreshed.
 * NOTE(review): the declarations of pl, ss, cm, volume, mute and latency,
 * the statement guarded by the name comparison and the jumps to the failure
 * path are not visible in this listing. */
914 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
915 struct userdata
*u
= userdata
;
916 uint32_t idx
, owner_module
, monitor_source
, flags
;
917 const char *name
, *description
, *monitor_source_name
, *driver
;
928 pl
= pa_proplist_new();
930 if (command
!= PA_COMMAND_REPLY
) {
931 if (command
== PA_COMMAND_ERROR
)
932 pa_log("Failed to get info.");
934 pa_log("Protocol error.");
938 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
939 pa_tagstruct_gets(t
, &name
) < 0 ||
940 pa_tagstruct_gets(t
, &description
) < 0 ||
941 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
942 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
943 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
944 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
945 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
946 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
947 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
948 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
949 pa_tagstruct_gets(t
, &driver
) < 0 ||
950 pa_tagstruct_getu32(t
, &flags
) < 0) {
952 pa_log("Parse failure");
956 if (u
->version
>= 13) {
957 pa_usec_t configured_latency
;
959 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
960 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
962 pa_log("Parse failure");
967 if (!pa_tagstruct_eof(t
)) {
968 pa_log("Packet too long");
972 pa_proplist_free(pl
);
974 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
977 pa_xfree(u
->device_description
);
978 u
->device_description
= pa_xstrdup(description
);
980 update_description(u
);
/* Failure path: give up on the tunnel and release the temporary list. */
985 pa_module_unload_request(u
->module
);
986 pa_proplist_free(pl
);
989 /* Called from main context */
/* Reply handler for PA_COMMAND_GET_SINK_INPUT_INFO (registered by
 * request_info()); mirrors the remote stream's volume and mute state onto
 * the local sink.
 *
 * Reply payload: index, name, owner module, client, sink, sample spec,
 * channel map, volume, buffer usec, sink usec, resample method, driver;
 * a mute flag follows from protocol version 11 on and a property list from
 * version 13 on. The packet has to end there. The property list is only
 * parsed into the temporary pl, which is freed on both the success and the
 * failure path.
 *
 * The reply's index is compared with u->device_index. Volume and (from
 * version 11 on) mute are compared with the sink's current values; the new
 * values are then copied into the sink and a sink change event is posted.
 * NOTE(review): the declarations of pl, volume and mute, the statements
 * guarded by the index and the equality comparisons, and the jumps to the
 * failure path are not visible in this listing. */
990 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
991 struct userdata
*u
= userdata
;
992 uint32_t idx
, owner_module
, client
, sink
;
993 pa_usec_t buffer_usec
, sink_usec
;
994 const char *name
, *driver
, *resample_method
;
996 pa_sample_spec sample_spec
;
997 pa_channel_map channel_map
;
1004 pl
= pa_proplist_new();
1006 if (command
!= PA_COMMAND_REPLY
) {
1007 if (command
== PA_COMMAND_ERROR
)
1008 pa_log("Failed to get info.");
1010 pa_log("Protocol error.");
1014 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1015 pa_tagstruct_gets(t
, &name
) < 0 ||
1016 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1017 pa_tagstruct_getu32(t
, &client
) < 0 ||
1018 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1019 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1020 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1021 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1022 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1023 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1024 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1025 pa_tagstruct_gets(t
, &driver
) < 0) {
1027 pa_log("Parse failure");
1031 if (u
->version
>= 11) {
1032 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1034 pa_log("Parse failure");
1039 if (u
->version
>= 13) {
1040 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1042 pa_log("Parse failure");
1047 if (!pa_tagstruct_eof(t
)) {
1048 pa_log("Packet too long");
1052 pa_proplist_free(pl
);
1054 if (idx
!= u
->device_index
)
1059 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1060 pa_cvolume_equal(&volume
, &u
->sink
->volume
))
1063 memcpy(&u
->sink
->volume
, &volume
, sizeof(pa_cvolume
));
1065 if (u
->version
>= 11)
1066 u
->sink
->muted
= !!mute
;
1068 pa_subscription_post(u
->sink
->core
, PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
, u
->sink
->index
);
/* Failure path: give up on the tunnel and release the temporary list. */
1072 pa_module_unload_request(u
->module
);
1073 pa_proplist_free(pl
);
1078 /* Called from main context */
/* Reply handler for PA_COMMAND_GET_SOURCE_INFO (registered by
 * request_info()).
 *
 * Reply payload: index, name, description, sample spec, channel map, owner
 * module, volume, mute, monitored sink index and name, latency, driver,
 * flags; from protocol version 13 on a property list and a configured
 * latency follow. The packet has to end there. The property list is only
 * parsed into the temporary pl, which is freed on both the success and the
 * failure path.
 *
 * The reply's name is compared with u->source_name before the description
 * replaces u->device_description and the descriptions are refreshed.
 * NOTE(review): the declarations of pl, ss, cm, volume and mute, the
 * statement guarded by the name comparison and the jumps to the failure
 * path are not visible in this listing. */
1079 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1080 struct userdata
*u
= userdata
;
1081 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1082 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1087 pa_usec_t latency
, configured_latency
;
1093 pl
= pa_proplist_new();
1095 if (command
!= PA_COMMAND_REPLY
) {
1096 if (command
== PA_COMMAND_ERROR
)
1097 pa_log("Failed to get info.");
1099 pa_log("Protocol error.");
1103 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1104 pa_tagstruct_gets(t
, &name
) < 0 ||
1105 pa_tagstruct_gets(t
, &description
) < 0 ||
1106 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1107 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1108 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1109 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1110 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1111 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1112 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1113 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1114 pa_tagstruct_gets(t
, &driver
) < 0 ||
1115 pa_tagstruct_getu32(t
, &flags
) < 0) {
1117 pa_log("Parse failure");
1121 if (u
->version
>= 13) {
1122 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1123 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1125 pa_log("Parse failure");
1130 if (!pa_tagstruct_eof(t
)) {
1131 pa_log("Packet too long");
1135 pa_proplist_free(pl
);
1137 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1140 pa_xfree(u
->device_description
);
1141 u
->device_description
= pa_xstrdup(description
);
1143 update_description(u
);
/* Failure path: give up on the tunnel and release the temporary list. */
1148 pa_module_unload_request(u
->module
);
1149 pa_proplist_free(pl
);
1154 /* Called from main context */
/* Queries the server for everything the local device mirrors.
 *
 * Each request gets a fresh tag (u->ctag, post-incremented) and a reply
 * handler registered with DEFAULT_TIMEOUT:
 *   - server info                                 -> server_info_cb()
 *   - sink input info for u->device_index          -> sink_input_info_cb()
 *   - sink info, looked up by name (u->sink_name)  -> sink_info_cb()
 *   - source info, looked up by name, only when u->source_name is set
 *                                                  -> source_info_cb()
 * NOTE(review): sink-side and source-side requests both appear below;
 * presumably they are split by module flavour. The declarations of t and
 * tag and any guard around the sink info request are not visible in this
 * listing -- confirm against the full file. */
1155 static void request_info(struct userdata
*u
) {
1160 t
= pa_tagstruct_new(NULL
, 0);
1161 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1162 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1163 pa_pstream_send_tagstruct(u
->pstream
, t
);
1164 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1167 t
= pa_tagstruct_new(NULL
, 0);
1168 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1169 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1170 pa_tagstruct_putu32(t
, u
->device_index
);
1171 pa_pstream_send_tagstruct(u
->pstream
, t
);
1172 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1175 t
= pa_tagstruct_new(NULL
, 0);
1176 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1177 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1178 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1179 pa_tagstruct_puts(t
, u
->sink_name
);
1180 pa_pstream_send_tagstruct(u
->pstream
, t
);
1181 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1184 if (u
->source_name
) {
1185 t
= pa_tagstruct_new(NULL
, 0);
1186 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1187 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1188 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1189 pa_tagstruct_puts(t
, u
->source_name
);
1190 pa_pstream_send_tagstruct(u
->pstream
, t
);
1191 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1196 /* Called from main context */
/* Handler for PA_COMMAND_SUBSCRIBE_EVENT (see command_table).
 *
 * Payload: u32 event type, u32 object index. A malformed packet is logged
 * and the module asks to be unloaded. The event type is then tested against
 * the change events for server, sink input, sink and source.
 * NOTE(review): the declaration of idx, the operators joining the last two
 * comparisons, and the action taken once an event passes the filter are not
 * visible in this listing. The sink-side and source-side comparisons are
 * presumably split by module flavour -- confirm against the full file. */
1197 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1198 struct userdata
*u
= userdata
;
1199 pa_subscription_event_type_t e
;
1205 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1207 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1208 pa_tagstruct_getu32(t
, &idx
) < 0) {
1209 pa_log("Invalid protocol reply");
1210 pa_module_unload_request(u
->module
);
1214 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1216 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1217 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1219 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1227 /* Called from main context */
/* Subscribes to change notifications on the server.
 *
 * Sends PA_COMMAND_SUBSCRIBE with a fresh tag (u->ctag, post-incremented)
 * and a mask built from the server mask plus device-related masks. No reply
 * handler is registered in the visible code; notifications arrive through
 * command_subscribe_event().
 * NOTE(review): the mask expression is incomplete in this listing (the
 * sink-side and source-side masks are presumably split by module flavour,
 * and the closing of the call is not visible) -- confirm against the full
 * file. */
1228 static void start_subscribe(struct userdata
*u
) {
1233 t
= pa_tagstruct_new(NULL
, 0);
1234 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1235 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1236 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1238 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1240 PA_SUBSCRIPTION_MASK_SOURCE
1244 pa_pstream_send_tagstruct(u
->pstream
, t
);
1247 /* Called from main context */
/* Reply handler for the create-stream request.
 *
 * Reply payload, as far as visible: channel and device index (stored in
 * u->channel and u->device_index) and a u32 byte count; from protocol
 * version 9 on the negotiated buffer metrics (maxlength, tlength, prebuf,
 * minreq for playback; maxlength, fragsize for record); from version 12 on
 * sample spec, channel map, device index, device name and a suspended
 * flag, where the device name replaces u->sink_name / u->source_name; from
 * version 13 on a latency value, which is raised by
 * MIN_NETWORK_LATENCY_USEC and set as the lower bound of the local device's
 * latency range. The packet has to end there.
 *
 * On success the latency timer is created (first run LATENCY_INTERVAL
 * seconds from now, callback timeout_callback()) and the initial byte count
 * is forwarded to the sink as SINK_MESSAGE_REQUEST. The last visible
 * statements log "Invalid reply. (Create stream)" and ask for the module to
 * be unloaded.
 * NOTE(review): sink and source variants both appear below; presumably only
 * one is compiled per module flavour. Several local declarations (bytes,
 * ss, cm, dn, usec, ntv), the jumps to the failure paths and the closing
 * braces are not visible in this listing -- confirm against the full file. */
1248 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1249 struct userdata
*u
= userdata
;
1257 pa_assert(u
->pdispatch
== pd
);
1259 if (command
!= PA_COMMAND_REPLY
) {
1260 if (command
== PA_COMMAND_ERROR
)
1261 pa_log("Failed to create stream.");
1263 pa_log("Protocol error.");
1267 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1268 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1270 || pa_tagstruct_getu32(t
, &bytes
) < 0
1275 if (u
->version
>= 9) {
1277 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1278 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1279 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1280 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1283 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1284 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1289 if (u
->version
>= 12) {
1292 uint32_t device_index
;
1294 pa_bool_t suspended
;
1296 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1297 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1298 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1299 pa_tagstruct_gets(t
, &dn
) < 0 ||
1300 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
1304 pa_xfree(u
->sink_name
);
1305 u
->sink_name
= pa_xstrdup(dn
);
1307 pa_xfree(u
->source_name
);
1308 u
->source_name
= pa_xstrdup(dn
);
1312 if (u
->version
>= 13) {
1315 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1319 pa_sink_set_latency_range(u
->sink
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1321 pa_source_set_latency_range(u
->source
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1325 if (!pa_tagstruct_eof(t
))
/* The timer must not exist yet; it is created exactly once, here. */
1331 pa_assert(!u
->time_event
);
1332 pa_gettimeofday(&ntv
);
1333 ntv
.tv_sec
+= LATENCY_INTERVAL
;
1334 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
1338 pa_log_debug("Stream created.");
1341 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1347 pa_log("Invalid reply. (Create stream)");
1350 pa_module_unload_request(u
->module
);
1354 /* Called from main context */
/* Reply handler for the PA_COMMAND_AUTH request.
 *
 * Reads the remote protocol version from the reply tagstruct t into
 * u->version, rejects versions below 8, announces a client name and then
 * queues a stream creation request whose reply is handled by
 * create_stream_callback().
 *
 * pd       dispatcher delivering the reply; asserted to be u->pdispatch
 * command  reply command; anything but PA_COMMAND_REPLY is logged as an error
 * tag      overwritten locally with freshly allocated tags from u->ctag
 * t        reply payload: one u32 (the version) followed by EOF
 * userdata the module's struct userdata
 *
 * NOTE(review): both a playback (sink) and a record (source) request
 * sequence appear below; presumably a build-time conditional selects one of
 * them -- confirm. */
1355 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1356 struct userdata
*u
= userdata
;
1357 pa_tagstruct
*reply
;
1358 char name
[256], un
[128], hn
[128];
1365 pa_assert(u
->pdispatch
== pd
);
/* The reply must be a PA_COMMAND_REPLY carrying exactly one u32. */
1367 if (command
!= PA_COMMAND_REPLY
||
1368 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1369 !pa_tagstruct_eof(t
)) {
1371 if (command
== PA_COMMAND_ERROR
)
1372 pa_log("Failed to authenticate");
1374 pa_log("Protocol error.");
1379 /* Minimum supported protocol version */
1380 if (u
->version
< 8) {
1381 pa_log("Incompatible protocol version");
1385 /* Starting with protocol version 13 the MSB of the version tag
1386 reflects if shm is enabled for this connection or not. We don't
1387 support SHM here at all, so we just ignore this. */
1389 if (u
->version
>= 13)
1390 u
->version
&= 0x7FFFFFFFU
;
1392 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* Build the stream name as "<something> for <user>@<host>".
 * NOTE(review): the first format argument is not visible here, and the
 * call appears twice, presumably once per sink/source variant -- confirm. */
1395 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1397 pa_get_user_name(un
, sizeof(un
)),
1398 pa_get_host_name(hn
, sizeof(hn
)));
1400 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1402 pa_get_user_name(un
, sizeof(un
)),
1403 pa_get_host_name(hn
, sizeof(hn
)));
/* First request: PA_COMMAND_SET_CLIENT_NAME with a fresh tag. */
1406 reply
= pa_tagstruct_new(NULL
, 0);
1407 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1408 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* Servers speaking version >= 13 get a property list with application id
 * and version; the plain string "PulseAudio" is presumably the fallback
 * for older servers -- confirm. */
1410 if (u
->version
>= 13) {
1412 pl
= pa_proplist_new();
1413 pa_init_proplist(pl
);
1414 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1415 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1416 pa_tagstruct_put_proplist(reply
, pl
);
1417 pa_proplist_free(pl
);
1419 pa_tagstruct_puts(reply
, "PulseAudio");
1421 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1422 /* We ignore the server's reply here */
/* Second request: stream creation. */
1424 reply
= pa_tagstruct_new(NULL
, 0);
1426 if (u
->version
< 13)
1427 /* Only for older PA versions we need to fill in the maxlength */
1428 u
->maxlength
= 4*1024*1024;
/* Playback buffer metrics, converted from the DEFAULT_*_MSEC constants to
 * bytes using the sink's sample spec; prebuf is set equal to tlength. */
1431 u
->tlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1432 u
->minreq
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1433 u
->prebuf
= u
->tlength
;
/* Record fragment size, converted using the source's sample spec. */
1435 u
->fragsize
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback variant: PA_COMMAND_CREATE_PLAYBACK_STREAM. */
1439 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1440 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* Servers older than version 13 get the stream name as a plain string. */
1442 if (u
->version
< 13)
1443 pa_tagstruct_puts(reply
, name
);
1445 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1446 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1447 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1448 pa_tagstruct_puts(reply
, u
->sink_name
);
1449 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* The boolean written here is TRUE when the local sink is not opened. */
1450 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1451 pa_tagstruct_putu32(reply
, u
->tlength
);
1452 pa_tagstruct_putu32(reply
, u
->prebuf
);
1453 pa_tagstruct_putu32(reply
, u
->minreq
);
1454 pa_tagstruct_putu32(reply
, 0);
/* Send a reset volume with one entry per sink channel. */
1455 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1456 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record variant: PA_COMMAND_CREATE_RECORD_STREAM. */
1458 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1459 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1461 if (u
->version
< 13)
1462 pa_tagstruct_puts(reply
, name
);
1464 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1465 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1466 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1467 pa_tagstruct_puts(reply
, u
->source_name
);
1468 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* The boolean written here is TRUE when the local source is not opened. */
1469 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1470 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Extra stream flags appended for servers speaking version >= 12. */
1473 if (u
->version
>= 12) {
1474 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1475 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1476 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1477 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1478 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1479 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1480 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
/* Version >= 13 additions: two more flags and a property list carrying the
 * stream name as media name and "abstract" as media role. */
1483 if (u
->version
>= 13) {
1486 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1487 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1489 pl
= pa_proplist_new();
1490 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1491 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1492 pa_tagstruct_put_proplist(reply
, pl
);
1493 pa_proplist_free(pl
);
1496 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
/* Send the creation request and have create_stream_callback() handle the
 * reply for the tag allocated above, with DEFAULT_TIMEOUT. */
1500 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1501 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1503 pa_log_debug("Connection authenticated, creating stream ...");
/* NOTE(review): this unload request is presumably the shared failure exit
 * for the error branches above -- confirm. */
1508 pa_module_unload_request(u
->module
);
1511 /* Called from main context */
/* Die callback installed on u->pstream by on_connection(): logs a warning
 * and requests that the module owning this userdata be unloaded.
 *
 * p        the pstream that died
 * userdata the module's struct userdata */
1512 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1513 struct userdata
*u
= userdata
;
1518 pa_log_warn("Stream died.");
1519 pa_module_unload_request(u
->module
);
1522 /* Called from main context */
/* Packet callback installed on u->pstream by on_connection(): hands every
 * received packet to the dispatcher u->pdispatch.
 *
 * p        the pstream the packet arrived on
 * packet   the received packet
 * creds    credentials passed through to pa_pdispatch_run()
 * userdata the module's struct userdata */
1523 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1524 struct userdata
*u
= userdata
;
/* A negative result from the dispatcher is treated as fatal: log it and
 * request unloading of the module. */
1530 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1531 pa_log("Invalid packet");
1532 pa_module_unload_request(u
->module
);
1538 /* Called from main context */
/* Memblock callback installed on u->pstream by on_connection(): forwards
 * audio data received from the remote server to the local source.
 *
 * p        the pstream the data arrived on
 * channel  channel the block was received on; must equal u->channel
 * offset   seek offset, passed on with the message
 * seek     seek mode, passed on as the message's pointer argument
 * chunk    the received memory chunk
 * userdata the module's struct userdata */
1539 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1540 struct userdata
*u
= userdata
;
/* Data on any channel other than ours is logged and triggers a module
 * unload request.
 * NOTE(review): presumably the function bails out after this -- confirm. */
1546 if (channel
!= u
->channel
) {
1547 pa_log("Recieved memory block on bad channel.");
1548 pa_module_unload_request(u
->module
);
/* Post the chunk to the source object through its message queue. */
1552 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
/* Account for the bytes just received. */
1554 u
->counter_delta
+= chunk
->length
;
1559 /* Called from main context */
/* Socket client callback registered by pa__init(): runs when the connection
 * attempt to the remote server has finished.
 *
 * Drops the reference to the socket client, wraps the IO channel in a
 * pstream plus a dispatcher, installs the pstream callbacks and sends the
 * PA_COMMAND_AUTH request, whose reply is handled by
 * setup_complete_callback().
 *
 * sc       the socket client; asserted to be u->client
 * io       IO channel of the new connection
 * userdata the module's struct userdata */
1560 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1561 struct userdata
*u
= userdata
;
1567 pa_assert(u
->client
== sc
);
1569 pa_socket_client_unref(u
->client
);
/* NOTE(review): this failure report is presumably guarded by a check that
 * io is missing -- confirm. */
1573 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1574 pa_module_unload_request(u
->module
);
/* Set up the packet stream and the reply dispatcher on the core mainloop. */
1578 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1579 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1581 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1582 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1584 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* Build the authentication request: command, fresh tag, our protocol
 * version and the auth cookie. */
1587 t
= pa_tagstruct_new(NULL
, 0);
1588 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1589 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1590 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1591 pa_tagstruct_put_arbitrary(t
, u
->auth_cookie
, sizeof(u
->auth_cookie
));
/* Enable credential passing on the channel when it is supported. */
1597 if (pa_iochannel_creds_supported(io
))
1598 pa_iochannel_creds_enable(io
);
/* Send the request together with our own uid/gid.
 * NOTE(review): the two send calls below are presumably alternatives
 * selected by a build-time conditional on credential support -- confirm. */
1600 ucred
.uid
= getuid();
1601 ucred
.gid
= getgid();
1603 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1606 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Route the reply for this tag to setup_complete_callback(). */
1609 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1611 pa_log_debug("Connection established, authenticating ...");
1616 /* Called from main context */
/* Volume hook installed as u->sink->set_volume by pa__init(): sends the
 * sink's current volume to the remote server as the volume of the remote
 * sink input identified by u->device_index.
 *
 * sink  the local tunnel sink whose volume field is transmitted
 *
 * NOTE(review): u is presumably taken from sink->userdata -- confirm. */
1617 static int sink_set_volume(pa_sink
*sink
) {
/* Request layout: command, fresh tag, remote index, channel volumes. */
1626 t
= pa_tagstruct_new(NULL
, 0);
1627 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1628 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1629 pa_tagstruct_putu32(t
, u
->device_index
);
1630 pa_tagstruct_put_cvolume(t
, &sink
->volume
);
1631 pa_pstream_send_tagstruct(u
->pstream
, t
);
1636 /* Called from main context */
/* Mute hook installed as u->sink->set_mute by pa__init(): sends the sink's
 * current mute state to the remote server as the mute state of the remote
 * sink input identified by u->device_index.
 *
 * sink  the local tunnel sink whose muted field is transmitted
 *
 * NOTE(review): u is presumably taken from sink->userdata -- confirm. */
1637 static int sink_set_mute(pa_sink
*sink
) {
/* NOTE(review): servers older than protocol version 11 are special-cased
 * here; presumably the request is skipped for them -- confirm. */
1646 if (u
->version
< 11)
/* Request layout: command, fresh tag, remote index, mute flag. */
1649 t
= pa_tagstruct_new(NULL
, 0);
1650 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1651 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1652 pa_tagstruct_putu32(t
, u
->device_index
);
/* The double negation normalises the muted field to 0 or 1. */
1653 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1654 pa_pstream_send_tagstruct(u
->pstream
, t
);
1661 /* Called from main context */
/* Fills u->auth_cookie with the authentication cookie.
 *
 * u   the module's struct userdata
 * fn  cookie file name, or NULL to first try the cookie already stored
 *     under PA_NATIVE_COOKIE_PROPERTY_NAME
 *
 * u->auth_cookie_in_property records whether this module holds a reference
 * on that property. pa__init() treats a negative return value as failure. */
1662 static int load_key(struct userdata
*u
, const char*fn
) {
1665 u
->auth_cookie_in_property
= FALSE
;
/* No file name given: try the cookie already stored as a core property and
 * take a reference on it if that succeeds. */
1667 if (!fn
&& pa_authkey_prop_get(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0) {
1668 pa_log_debug("Using already loaded auth cookie.");
1669 pa_authkey_prop_ref(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1670 u
->auth_cookie_in_property
= 1;
/* NOTE(review): the default file name is presumably applied only when fn
 * is NULL -- confirm. */
1675 fn
= PA_NATIVE_COOKIE_FILE
;
/* Read the cookie from the file. */
1677 if (pa_authkey_load_auto(fn
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) < 0)
1680 pa_log_debug("Loading cookie from disk.");
/* Store the freshly loaded cookie as a core property; remember on success
 * that the property is in use by us. */
1682 if (pa_authkey_prop_put(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0)
1683 u
->auth_cookie_in_property
= TRUE
;
/* Module entry point.
 *
 * Parses the module arguments against valid_modargs, allocates the module's
 * struct userdata (stored in m->userdata), loads the authentication cookie
 * via load_key(), starts a connection to the server named by the "server"
 * argument with on_connection() as completion callback, creates the local
 * tunnel sink or source and starts a thread running thread_func.
 *
 * m  the module being initialised
 *
 * NOTE(review): a sink and a source setup sequence both appear below,
 * including two declarations named data; presumably a build-time
 * conditional selects one of them -- confirm.
 * NOTE(review): pa_modargs_free(ma) appears twice at the end, presumably
 * once for the success path and once for the failure path -- confirm. */
1688 int pa__init(pa_module
*m
) {
1689 pa_modargs
*ma
= NULL
;
1690 struct userdata
*u
= NULL
;
1695 pa_sink_new_data data
;
1697 pa_source_new_data data
;
1702 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1703 pa_log("Failed to parse module arguments");
/* Allocate the zero-initialised per-module state and set its fields. */
1707 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1711 u
->pdispatch
= NULL
;
1713 u
->server_name
= NULL
;
/* Name of the remote sink to connect to (module argument "sink"). */
1715 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1717 u
->requested_bytes
= 0;
/* Name of the remote source to connect to (module argument "source"). */
1719 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1722 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
, 10);
1724 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1725 u
->auth_cookie_in_property
= FALSE
;
1726 u
->time_event
= NULL
;
1727 u
->ignore_latency_before
= 0;
1728 u
->transport_usec
= 0;
1729 u
->transport_usec_valid
= FALSE
;
1730 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1731 u
->counter
= u
->counter_delta
= 0;
/* Real-time poll object and the message queues used to talk to the thread. */
1733 u
->rtpoll
= pa_rtpoll_new();
1734 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
/* Load the auth cookie; the optional "cookie" argument names the file. */
1736 if (load_key(u
, pa_modargs_get_value(ma
, "cookie", NULL
)) < 0)
/* The "server" argument is mandatory. */
1739 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1740 pa_log("No server specified.");
/* Start from the core's default sample spec and let the module arguments
 * override it. */
1744 ss
= m
->core
->default_sample_spec
;
1745 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1746 pa_log("Invalid sample format specification");
/* Create the socket client for the server; on_connection() is called with
 * the result. */
1750 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1751 pa_log("Failed to connect to server '%s'", u
->server_name
);
1755 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Sink variant: the local name comes from "sink_name" and falls back to
 * "tunnel.<server>". */
1759 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1760 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1762 pa_sink_new_data_init(&data
);
1763 data
.driver
= __FILE__
;
1765 data
.namereg_fail
= TRUE
;
1766 pa_sink_new_data_set_name(&data
, dn
);
1767 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1768 pa_sink_new_data_set_channel_map(&data
, &map
);
/* Description is "<remote sink> on <server>", or just the server name when
 * no remote sink was given. */
1769 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1770 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1772 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
1774 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
);
1775 pa_sink_new_data_done(&data
);
1778 pa_log("Failed to create sink.");
/* Install the sink's message handler and state/volume/mute hooks. */
1782 u
->sink
->parent
.process_msg
= sink_process_msg
;
1783 u
->sink
->userdata
= u
;
1784 u
->sink
->set_state
= sink_set_state
;
1785 u
->sink
->set_volume
= sink_set_volume
;
1786 u
->sink
->set_mute
= sink_set_mute
;
1788 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
1790 pa_sink_set_latency_range(u
->sink
, MIN_NETWORK_LATENCY_USEC
, 0);
1792 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1793 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* Source variant: the local name comes from "source_name" and falls back
 * to "tunnel.<server>". */
1797 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1798 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1800 pa_source_new_data_init(&data
);
1801 data
.driver
= __FILE__
;
1803 data
.namereg_fail
= TRUE
;
1804 pa_source_new_data_set_name(&data
, dn
);
1805 pa_source_new_data_set_sample_spec(&data
, &ss
);
1806 pa_source_new_data_set_channel_map(&data
, &map
);
/* Description is "<remote source> on <server>", or just the server name
 * when no remote source was given. */
1807 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1808 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1810 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
1812 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1813 pa_source_new_data_done(&data
);
1816 pa_log("Failed to create source.");
/* Install the source's message handler and state hook. */
1820 u
->source
->parent
.process_msg
= source_process_msg
;
1821 u
->source
->set_state
= source_set_state
;
1822 u
->source
->userdata
= u
;
1824 pa_source_set_latency_range(u
->source
, MIN_NETWORK_LATENCY_USEC
, 0);
1826 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1827 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1832 u
->time_event
= NULL
;
/* The buffer metrics are filled in later by setup_complete_callback(). */
1836 u
->tlength
= u
->minreq
= u
->prebuf
= 0;
1841 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
/* Start the thread running thread_func with u as its argument. */
1843 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1844 pa_log("Failed to create thread.");
/* Finish registration of the new device via pa_sink_put()/pa_source_put(). */
1849 pa_sink_put(u
->sink
);
1851 pa_source_put(u
->source
);
1854 pa_modargs_free(ma
);
1862 pa_modargs_free(ma
);
1869 void pa__done(pa_module
*m
) {
1874 if (!(u
= m
->userdata
))
1879 pa_sink_unlink(u
->sink
);
1882 pa_source_unlink(u
->source
);
1886 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1887 pa_thread_free(u
->thread
);
1890 pa_thread_mq_done(&u
->thread_mq
);
1894 pa_sink_unref(u
->sink
);
1897 pa_source_unref(u
->source
);
1901 pa_rtpoll_free(u
->rtpoll
);
1904 pa_pstream_unlink(u
->pstream
);
1905 pa_pstream_unref(u
->pstream
);
1909 pa_pdispatch_unref(u
->pdispatch
);
1912 pa_socket_client_unref(u
->client
);
1914 if (u
->auth_cookie_in_property
)
1915 pa_authkey_prop_unref(m
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1918 pa_smoother_free(u
->smoother
);
1921 u
->core
->mainloop
->time_free(u
->time_event
);
1924 pa_xfree(u
->sink_name
);
1926 pa_xfree(u
->source_name
);
1928 pa_xfree(u
->server_name
);
1930 pa_xfree(u
->device_description
);
1931 pa_xfree(u
->server_fqdn
);
1932 pa_xfree(u
->user_name
);