2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/authkey-prop.h>
52 #include <pulsecore/time-smoother.h>
53 #include <pulsecore/thread.h>
54 #include <pulsecore/thread-mq.h>
55 #include <pulsecore/rtclock.h>
56 #include <pulsecore/core-error.h>
57 #include <pulsecore/proplist-util.h>
60 #include "module-tunnel-sink-symdef.h"
62 #include "module-tunnel-source-symdef.h"
66 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
69 "sink=<remote sink name> "
71 "format=<sample format> "
72 "channels=<number of channels> "
74 "sink_name=<name for the local sink> "
75 "channel_map=<channel map>");
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 "source=<remote source name> "
82 "format=<sample format> "
83 "channels=<number of channels> "
85 "source_name=<name for the local source> "
86 "channel_map=<channel map>");
89 PA_MODULE_AUTHOR("Lennart Poettering");
90 PA_MODULE_VERSION(PACKAGE_VERSION
);
91 PA_MODULE_LOAD_ONCE(FALSE
);
93 static const char* const valid_modargs
[] = {
110 #define DEFAULT_TIMEOUT 5
112 #define LATENCY_INTERVAL 10
114 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
119 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
120 SINK_MESSAGE_REMOTE_SUSPEND
,
121 SINK_MESSAGE_UPDATE_LATENCY
,
125 #define DEFAULT_TLENGTH_MSEC 150
126 #define DEFAULT_MINREQ_MSEC 25
131 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
132 SOURCE_MESSAGE_REMOTE_SUSPEND
,
133 SOURCE_MESSAGE_UPDATE_LATENCY
136 #define DEFAULT_FRAGSIZE_MSEC 25
/* Forward declarations of the pdispatch command handlers. They are needed
 * because command_table refers to them before their definitions appear.
 * All of them share the same callback signature. */
141 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
142 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
144 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
146 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
/* Dispatch table indexed by command code. Overflow and underflow share one
 * handler, and the playback/record variants of "killed", "suspended" and
 * "moved" each share one handler as well. */
150 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
152 [PA_COMMAND_REQUEST
] = command_request
,
153 [PA_COMMAND_STARTED
] = command_started
,
155 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
156 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
157 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
158 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
159 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
160 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
161 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
162 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
163 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
170 pa_thread_mq thread_mq
;
174 pa_socket_client
*client
;
176 pa_pdispatch
*pdispatch
;
182 int32_t requested_bytes
;
188 uint8_t auth_cookie
[PA_NATIVE_COOKIE_LENGTH
];
192 uint32_t device_index
;
195 int64_t counter
, counter_delta
;
197 pa_bool_t remote_corked
:1;
198 pa_bool_t remote_suspended
:1;
200 pa_usec_t transport_usec
;
201 pa_bool_t transport_usec_valid
;
203 uint32_t ignore_latency_before
;
205 pa_time_event
*time_event
;
207 pa_bool_t auth_cookie_in_property
;
209 pa_smoother
*smoother
;
211 char *device_description
;
/* Forward declaration; the definition appears further down in this file. */
225 static void request_latency(struct userdata
*u
);
/* Handler registered in command_table for PLAYBACK_STREAM_KILLED and
 * RECORD_STREAM_KILLED: logs a warning and requests that this module be
 * unloaded. userdata is the module's struct userdata. */
227 /* Called from main context */
228 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
229 struct userdata
*u
= userdata
;
/* The callback must come from our own dispatcher. */
234 pa_assert(u
->pdispatch
== pd
);
236 pa_log_warn("Stream killed");
237 pa_module_unload_request(u
->module
);
/* Handler registered in command_table for both PA_COMMAND_OVERFLOW and
 * PA_COMMAND_UNDERFLOW. In the lines shown it only logs the event at info
 * level; no state is changed. */
240 /* Called from main context */
241 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
242 struct userdata
*u
= userdata
;
247 pa_assert(u
->pdispatch
== pd
);
249 pa_log_info("Server signalled buffer overrun/underrun.");
/* Handler for PLAYBACK/RECORD_STREAM_SUSPENDED. Reads a channel index and a
 * boolean from the packet and requires that nothing else follows; a
 * malformed packet is logged and triggers a module unload request. The
 * suspended flag is then forwarded to the device via a *_REMOTE_SUSPEND
 * message.
 * NOTE(review): both the sink and the source send are visible here;
 * presumably only one of them is compiled per build variant -- confirm. */
253 /* Called from main context */
254 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
255 struct userdata
*u
= userdata
;
262 pa_assert(u
->pdispatch
== pd
);
264 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
265 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
266 !pa_tagstruct_eof(t
)) {
267 pa_log("Invalid packet");
268 pa_module_unload_request(u
->module
);
/* The flag is normalised to 0/1 and carried in the message's data pointer. */
273 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
275 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
/* Handler for PLAYBACK/RECORD_STREAM_MOVED. In the lines shown it only
 * writes a debug log entry. */
281 /* Called from main context */
282 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
283 struct userdata
*u
= userdata
;
288 pa_assert(u
->pdispatch
== pd
);
290 pa_log_debug("Server reports a stream move.");
/* Handler for PA_COMMAND_STARTED. In the lines shown it only writes a debug
 * log entry. */
296 /* Called from main context */
297 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
298 struct userdata
*u
= userdata
;
303 pa_assert(u
->pdispatch
== pd
);
305 pa_log_debug("Server reports playback started.");
311 /* Called from IO thread context */
312 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
316 if (u
->remote_corked
== cork
)
319 u
->remote_corked
= cork
;
320 x
= pa_rtclock_usec();
322 /* Correct by the time this needs to travel to the other side.
323 * This is a valid thread-safe access, because the main thread is
325 if (u
->transport_usec_valid
)
326 x
+= u
->transport_usec
;
328 if (u
->remote_suspended
|| u
->remote_corked
)
329 pa_smoother_pause(u
->smoother
, x
);
331 pa_smoother_resume(u
->smoother
, x
);
334 /* Called from main context */
/* Sends a cork/uncork request for our stream to the server: a tagstruct
 * holding the command code, a fresh tag taken from u->ctag, the stream
 * channel and the normalised cork flag is handed to the pstream.
 * NOTE(review): both the PLAYBACK and the RECORD command code are visible;
 * presumably only one is compiled per build variant -- confirm. */
335 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
342 t
= pa_tagstruct_new(NULL
, 0);
344 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
346 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
348 pa_tagstruct_putu32(t
, u
->ctag
++);
349 pa_tagstruct_putu32(t
, u
->channel
);
350 pa_tagstruct_put_boolean(t
, !!cork
);
351 pa_pstream_send_tagstruct(u
->pstream
, t
);
356 /* Called from IO thread context */
357 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
361 if (u
->remote_suspended
== suspend
)
364 u
->remote_suspended
= suspend
;
366 x
= pa_rtclock_usec();
368 /* Correct by the time this needed to travel from the other side.
369 * This is a valid thread-safe access, because the main thread is
371 if (u
->transport_usec_valid
)
372 x
-= u
->transport_usec
;
374 if (u
->remote_suspended
|| u
->remote_corked
)
375 pa_smoother_pause(u
->smoother
, x
);
377 pa_smoother_resume(u
->smoother
, x
);
382 /* Called from IO thread context */
/* Renders audio from the sink for as long as the server has outstanding
 * requested bytes. Each rendered chunk is posted on the thread's outgoing
 * queue as SINK_MESSAGE_POST, the local reference to its memblock is
 * dropped, and the chunk length is subtracted from requested_bytes and
 * added to the running byte counter. */
383 static void send_data(struct userdata
*u
) {
386 while (u
->requested_bytes
> 0) {
387 pa_memchunk memchunk
;
389 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
390 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
391 pa_memblock_unref(memchunk
.memblock
);
393 u
->requested_bytes
-= memchunk
.length
;
395 u
->counter
+= memchunk
.length
;
/* Message handler of the tunnel sink. It handles the standard SET_STATE and
 * GET_LATENCY messages plus the module-private SINK_MESSAGE_* codes;
 * everything else is passed on to pa_sink_process_msg().
 *
 * o      the sink, as a message object
 * code   message code
 * data   per-message payload (latency out-pointer, suspend flag, ...)
 * offset per-message integer (requested bytes, latency delta)
 * chunk  audio data for SINK_MESSAGE_POST */
399 /* This function is called from IO context -- except when it is not. */
400 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
401 struct userdata
*u
= PA_SINK(o
)->userdata
;
405 case PA_SINK_MESSAGE_SET_STATE
: {
408 /* First, change the state, because otherwise pa_sink_render() would fail */
409 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
/* Cork the remote stream exactly when the sink is now suspended. */
411 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
413 if (PA_SINK_IS_OPENED(u
->sink
->state
))
420 case PA_SINK_MESSAGE_GET_LATENCY
: {
421 pa_usec_t yl
, yr
, *usec
= data
;
/* yl: time equivalent of all bytes counted so far; yr: smoother estimate
 * for the current time. The reported latency is yl - yr, clamped at 0. */
423 yl
= pa_bytes_to_usec(u
->counter
, &u
->sink
->sample_spec
);
424 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
426 *usec
= yl
> yr
? yl
- yr
: 0;
430 case SINK_MESSAGE_REQUEST
:
/* The number of requested bytes travels in offset and must be positive. */
432 pa_assert(offset
> 0);
433 u
->requested_bytes
+= (size_t) offset
;
435 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
441 case SINK_MESSAGE_REMOTE_SUSPEND
:
443 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
447 case SINK_MESSAGE_UPDATE_LATENCY
: {
450 y
= pa_bytes_to_usec(u
->counter
, &u
->sink
->sample_spec
);
452 if (y
> (pa_usec_t
) offset
|| offset
< 0)
/* Feed the resulting sample into the smoother at the current time. */
457 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
462 case SINK_MESSAGE_POST
:
464 /* OK, This might be a bit confusing. This message is
465 * delivered to us from the main context -- NOT from the
466 * IO thread context where the rest of the messages are
467 * dispatched. Yeah, ugly, but I am a lazy bastard. */
469 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
/* Track what has been written since the last latency request. */
471 u
->counter_delta
+= chunk
->length
;
476 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
/* State-change hook of the tunnel sink: corks the remote stream when the
 * sink becomes suspended (asserting that it was opened before) and uncorks
 * it when the sink goes to RUNNING from SUSPENDED. */
479 /* Called from main context */
480 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
482 pa_sink_assert_ref(s
);
485 switch ((pa_sink_state_t
) state
) {
487 case PA_SINK_SUSPENDED
:
488 pa_assert(PA_SINK_IS_OPENED(s
->state
));
489 stream_cork(u
, TRUE
);
493 case PA_SINK_RUNNING
:
494 if (s
->state
== PA_SINK_SUSPENDED
)
495 stream_cork(u
, FALSE
);
498 case PA_SINK_UNLINKED
:
508 /* This function is called from IO context -- except when it is not. */
509 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
510 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
514 case PA_SINK_MESSAGE_SET_STATE
: {
517 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
518 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
523 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
524 pa_usec_t yr
, yl
, *usec
= data
;
526 yl
= pa_bytes_to_usec(u
->counter
, &PA_SINK(o
)->sample_spec
);
527 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
529 *usec
= yr
> yl
? yr
- yl
: 0;
533 case SOURCE_MESSAGE_POST
:
535 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
536 pa_source_post(u
->source
, chunk
);
538 u
->counter
+= chunk
->length
;
542 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
544 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
547 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
550 y
= pa_bytes_to_usec(u
->counter
, &u
->source
->sample_spec
);
552 if (offset
>= 0 || y
> (pa_usec_t
) -offset
)
557 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
563 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
/* State-change hook of the tunnel source: corks the remote stream when the
 * source becomes suspended (asserting that it was opened before) and
 * uncorks it when the source goes to RUNNING from SUSPENDED. */
566 /* Called from main context */
567 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
569 pa_source_assert_ref(s
);
572 switch ((pa_source_state_t
) state
) {
574 case PA_SOURCE_SUSPENDED
:
575 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
576 stream_cork(u
, TRUE
);
580 case PA_SOURCE_RUNNING
:
581 if (s
->state
== PA_SOURCE_SUSPENDED
)
582 stream_cork(u
, FALSE
);
585 case PA_SOURCE_UNLINKED
:
/* Entry point of the module's IO thread. Installs the thread message queue
 * and the rtpoll object for this thread and runs the rtpoll. userdata is
 * the module's struct userdata. */
595 static void thread_func(void *userdata
) {
596 struct userdata
*u
= userdata
;
600 pa_log_debug("Thread starting up");
602 pa_thread_mq_install(&u
->thread_mq
);
603 pa_rtpoll_install(u
->rtpoll
);
/* A negative return value from pa_rtpoll_run() is treated as a failure. */
608 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
616 /* If this was no regular exit from the loop we have to continue
617 * processing messages until we received PA_MESSAGE_SHUTDOWN */
618 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
619 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
622 pa_log_debug("Thread shutting down");
626 /* Called from main context */
627 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
628 struct userdata
*u
= userdata
;
629 uint32_t bytes
, channel
;
632 pa_assert(command
== PA_COMMAND_REQUEST
);
635 pa_assert(u
->pdispatch
== pd
);
637 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
638 pa_tagstruct_getu32(t
, &bytes
) < 0) {
639 pa_log("Invalid protocol reply");
643 if (channel
!= u
->channel
) {
644 pa_log("Recieved data for invalid channel");
648 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
652 pa_module_unload_request(u
->module
);
657 /* Called from main context */
658 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
659 struct userdata
*u
= userdata
;
660 pa_usec_t sink_usec
, source_usec
, transport_usec
;
662 int64_t write_index
, read_index
;
663 struct timeval local
, remote
, now
;
670 if (command
!= PA_COMMAND_REPLY
) {
671 if (command
== PA_COMMAND_ERROR
)
672 pa_log("Failed to get latency.");
674 pa_log("Protocol error.");
678 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
679 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
680 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
681 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
682 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
683 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
684 pa_tagstruct_gets64(t
, &read_index
) < 0) {
685 pa_log("Invalid reply.");
690 if (u
->version
>= 13) {
691 uint64_t underrun_for
= 0, playing_for
= 0;
693 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
694 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
695 pa_log("Invalid reply.");
701 if (!pa_tagstruct_eof(t
)) {
702 pa_log("Invalid reply.");
706 if (tag
< u
->ignore_latency_before
) {
711 pa_gettimeofday(&now
);
713 /* Calculate transport usec */
714 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
715 /* local and remote seem to have synchronized clocks */
717 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
719 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
722 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
723 u
->transport_usec_valid
= TRUE
;
725 /* First, take the device's delay */
727 delay
= (int64_t) sink_usec
;
728 ss
= &u
->sink
->sample_spec
;
730 delay
= (int64_t) source_usec
;
731 ss
= &u
->source
->sample_spec
;
734 /* Add the length of our server-side buffer */
735 if (write_index
>= read_index
)
736 delay
+= (int64_t) pa_bytes_to_usec(write_index
-read_index
, ss
);
738 delay
-= (int64_t) pa_bytes_to_usec(read_index
-write_index
, ss
);
740 /* Our measurements are already out of date, hence correct by the *
741 * transport latency */
743 delay
-= (int64_t) transport_usec
;
745 delay
+= (int64_t) transport_usec
;
748 /* Now correct by what we have have read/written since we requested the update */
750 delay
+= (int64_t) pa_bytes_to_usec(u
->counter_delta
, ss
);
752 delay
-= (int64_t) pa_bytes_to_usec(u
->counter_delta
, ss
);
756 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
758 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
765 pa_module_unload_request(u
->module
);
/* Sends a latency query for our stream to the server and registers
 * stream_get_latency_callback for the reply, with DEFAULT_TIMEOUT as the
 * timeout argument. The request carries a fresh tag, the stream channel and
 * the current local time. Afterwards the tag is remembered in
 * ignore_latency_before and counter_delta is reset to zero.
 * NOTE(review): both the PLAYBACK and the RECORD command code are visible;
 * presumably only one is compiled per build variant -- confirm. */
768 /* Called from main context */
769 static void request_latency(struct userdata
*u
) {
775 t
= pa_tagstruct_new(NULL
, 0);
777 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
779 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
781 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
782 pa_tagstruct_putu32(t
, u
->channel
);
784 pa_gettimeofday(&now
);
785 pa_tagstruct_put_timeval(t
, &now
);
787 pa_pstream_send_tagstruct(u
->pstream
, t
);
788 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
790 u
->ignore_latency_before
= tag
;
791 u
->counter_delta
= 0;
/* Timer callback. In the lines shown it re-arms the time event e so that it
 * fires again LATENCY_INTERVAL seconds from now. userdata is the module's
 * struct userdata. */
794 /* Called from main context */
795 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*tv
, void *userdata
) {
796 struct userdata
*u
= userdata
;
805 pa_gettimeofday(&ntv
);
806 ntv
.tv_sec
+= LATENCY_INTERVAL
;
807 m
->time_restart(e
, &ntv
);
/* Refreshes the human-readable names on both ends of the tunnel.
 * Locally, the device description becomes "<remote description> on
 * <remote user>@<remote host>", and the three remote values are also stored
 * as tunnel.remote.* properties. Remotely, the stream is renamed to
 * "<remote description> for <local user>@<local host>".
 * The function depends on server_fqdn, user_name and device_description all
 * being set (see the check at the top).
 * NOTE(review): sink and source variants are both visible; presumably only
 * one is compiled per build variant -- confirm. */
810 /* Called from main context */
811 static void update_description(struct userdata
*u
) {
813 char un
[128], hn
[128];
818 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
821 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
824 pa_sink_set_description(u
->sink
, d
);
825 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
826 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
827 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
829 pa_source_set_description(u
->source
, d
);
830 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
831 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
832 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
/* Name shown on the server side, built from the local user and host name. */
837 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
838 pa_get_user_name(un
, sizeof(un
)),
839 pa_get_host_name(hn
, sizeof(hn
)));
841 t
= pa_tagstruct_new(NULL
, 0);
843 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
845 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
847 pa_tagstruct_putu32(t
, u
->ctag
++);
848 pa_tagstruct_putu32(t
, u
->channel
);
849 pa_tagstruct_puts(t
, d
);
850 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Reply handler for PA_COMMAND_GET_SERVER_INFO (registered by
 * request_info). Parses the server info record; a non-reply command, a
 * parse failure or trailing data is logged. On success the remote host
 * name and user name replace u->server_fqdn and u->user_name (the old
 * strings are freed first) and update_description() is called. The lines
 * shown end with a module unload request, presumably the failure path --
 * confirm. */
855 /* Called from main context */
856 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
857 struct userdata
*u
= userdata
;
859 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
865 if (command
!= PA_COMMAND_REPLY
) {
866 if (command
== PA_COMMAND_ERROR
)
867 pa_log("Failed to get info.");
869 pa_log("Protocol error.");
873 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
874 pa_tagstruct_gets(t
, &server_version
) < 0 ||
875 pa_tagstruct_gets(t
, &user_name
) < 0 ||
876 pa_tagstruct_gets(t
, &host_name
) < 0 ||
877 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
878 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
879 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
880 pa_tagstruct_getu32(t
, &cookie
) < 0) {
882 pa_log("Parse failure");
886 if (!pa_tagstruct_eof(t
)) {
887 pa_log("Packet too long");
/* Keep private copies of the two strings. */
891 pa_xfree(u
->server_fqdn
);
892 u
->server_fqdn
= pa_xstrdup(host_name
);
894 pa_xfree(u
->user_name
);
895 u
->user_name
= pa_xstrdup(user_name
);
897 update_description(u
);
902 pa_module_unload_request(u
->module
);
/* Reply handler for PA_COMMAND_GET_SINK_INFO (registered by request_info).
 * Parses the sink record, including a property list and a configured
 * latency when the protocol version is at least 13. Failures are logged.
 * The record is checked against u->sink_name; the remote description then
 * replaces u->device_description and update_description() is called.
 * A temporary proplist is allocated up front and freed on the two paths
 * visible below. */
907 /* Called from main context */
908 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
909 struct userdata
*u
= userdata
;
910 uint32_t idx
, owner_module
, monitor_source
, flags
;
911 const char *name
, *description
, *monitor_source_name
, *driver
;
922 pl
= pa_proplist_new();
924 if (command
!= PA_COMMAND_REPLY
) {
925 if (command
== PA_COMMAND_ERROR
)
926 pa_log("Failed to get info.");
928 pa_log("Protocol error.");
932 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
933 pa_tagstruct_gets(t
, &name
) < 0 ||
934 pa_tagstruct_gets(t
, &description
) < 0 ||
935 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
936 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
937 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
938 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
939 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
940 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
941 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
942 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
943 pa_tagstruct_gets(t
, &driver
) < 0 ||
944 pa_tagstruct_getu32(t
, &flags
) < 0) {
946 pa_log("Parse failure");
/* Fields that only exist from protocol version 13 on. */
950 if (u
->version
>= 13) {
951 pa_usec_t configured_latency
;
953 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
954 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
956 pa_log("Parse failure");
961 if (!pa_tagstruct_eof(t
)) {
962 pa_log("Packet too long");
966 pa_proplist_free(pl
);
968 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
971 pa_xfree(u
->device_description
);
972 u
->device_description
= pa_xstrdup(description
);
974 update_description(u
);
979 pa_module_unload_request(u
->module
);
980 pa_proplist_free(pl
);
/* Reply handler for PA_COMMAND_GET_SINK_INPUT_INFO (registered by
 * request_info). Parses the sink-input record; the mute flag is only
 * present from protocol version 11 and the property list from version 13.
 * Failures are logged. The record index is compared with u->device_index.
 * The received volume (and mute state, from version 11) is compared with
 * the local sink's values, then copied into the sink, and a sink change
 * event is posted. */
983 /* Called from main context */
984 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
985 struct userdata
*u
= userdata
;
986 uint32_t idx
, owner_module
, client
, sink
;
987 pa_usec_t buffer_usec
, sink_usec
;
988 const char *name
, *driver
, *resample_method
;
990 pa_sample_spec sample_spec
;
991 pa_channel_map channel_map
;
998 pl
= pa_proplist_new();
1000 if (command
!= PA_COMMAND_REPLY
) {
1001 if (command
== PA_COMMAND_ERROR
)
1002 pa_log("Failed to get info.");
1004 pa_log("Protocol error.");
1008 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1009 pa_tagstruct_gets(t
, &name
) < 0 ||
1010 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1011 pa_tagstruct_getu32(t
, &client
) < 0 ||
1012 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1013 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1014 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1015 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1016 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1017 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1018 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1019 pa_tagstruct_gets(t
, &driver
) < 0) {
1021 pa_log("Parse failure");
1025 if (u
->version
>= 11) {
1026 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1028 pa_log("Parse failure");
1033 if (u
->version
>= 13) {
1034 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1036 pa_log("Parse failure");
1041 if (!pa_tagstruct_eof(t
)) {
1042 pa_log("Packet too long");
1046 pa_proplist_free(pl
);
1048 if (idx
!= u
->device_index
)
/* True when volume and, from version 11 on, the mute state already match
 * the local sink. mute is only read when version >= 11. */
1053 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1054 pa_cvolume_equal(&volume
, &u
->sink
->volume
))
1057 memcpy(&u
->sink
->volume
, &volume
, sizeof(pa_cvolume
));
1059 if (u
->version
>= 11)
1060 u
->sink
->muted
= !!mute
;
1062 pa_subscription_post(u
->sink
->core
, PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
, u
->sink
->index
);
1066 pa_module_unload_request(u
->module
);
1067 pa_proplist_free(pl
);
/* Reply handler for PA_COMMAND_GET_SOURCE_INFO (registered by
 * request_info). Parses the source record, including a property list and a
 * configured latency when the protocol version is at least 13. Failures
 * are logged. The record is checked against u->source_name; the remote
 * description then replaces u->device_description and
 * update_description() is called. */
1072 /* Called from main context */
1073 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1074 struct userdata
*u
= userdata
;
1075 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1076 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1081 pa_usec_t latency
, configured_latency
;
1087 pl
= pa_proplist_new();
1089 if (command
!= PA_COMMAND_REPLY
) {
1090 if (command
== PA_COMMAND_ERROR
)
1091 pa_log("Failed to get info.");
1093 pa_log("Protocol error.");
1097 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1098 pa_tagstruct_gets(t
, &name
) < 0 ||
1099 pa_tagstruct_gets(t
, &description
) < 0 ||
1100 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1101 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1102 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1103 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1104 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1105 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1106 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1107 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1108 pa_tagstruct_gets(t
, &driver
) < 0 ||
1109 pa_tagstruct_getu32(t
, &flags
) < 0) {
1111 pa_log("Parse failure");
/* Fields that only exist from protocol version 13 on. */
1115 if (u
->version
>= 13) {
1116 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1117 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1119 pa_log("Parse failure");
1124 if (!pa_tagstruct_eof(t
)) {
1125 pa_log("Packet too long");
1129 pa_proplist_free(pl
);
1131 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1134 pa_xfree(u
->device_description
);
1135 u
->device_description
= pa_xstrdup(description
);
1137 update_description(u
);
1142 pa_module_unload_request(u
->module
);
1143 pa_proplist_free(pl
);
/* Queries the server for fresh metadata. Each query is a tagstruct with the
 * command code and a fresh tag from u->ctag, sent over the pstream, with a
 * reply callback registered using DEFAULT_TIMEOUT:
 *   GET_SERVER_INFO      -> server_info_cb
 *   GET_SINK_INPUT_INFO  -> sink_input_info_cb (by u->device_index)
 *   GET_SINK_INFO        -> sink_info_cb       (by name, index invalid)
 *   GET_SOURCE_INFO      -> source_info_cb     (by name, only if
 *                                               u->source_name is set)
 * NOTE(review): sink and source queries are both visible; presumably each
 * build variant compiles only its own set -- confirm. */
1148 /* Called from main context */
1149 static void request_info(struct userdata
*u
) {
1154 t
= pa_tagstruct_new(NULL
, 0);
1155 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1156 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1157 pa_pstream_send_tagstruct(u
->pstream
, t
);
1158 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1161 t
= pa_tagstruct_new(NULL
, 0);
1162 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1163 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1164 pa_tagstruct_putu32(t
, u
->device_index
);
1165 pa_pstream_send_tagstruct(u
->pstream
, t
);
1166 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1169 t
= pa_tagstruct_new(NULL
, 0);
1170 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1171 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1172 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1173 pa_tagstruct_puts(t
, u
->sink_name
);
1174 pa_pstream_send_tagstruct(u
->pstream
, t
);
1175 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1178 if (u
->source_name
) {
1179 t
= pa_tagstruct_new(NULL
, 0);
1180 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1181 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1182 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1183 pa_tagstruct_puts(t
, u
->source_name
);
1184 pa_pstream_send_tagstruct(u
->pstream
, t
);
1185 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
/* Handler for PA_COMMAND_SUBSCRIBE_EVENT. Reads the event type and the
 * object index from the packet; a malformed packet is logged and triggers a
 * module unload request. The event is then compared against the CHANGE
 * events for server, sink input, sink and source.
 * NOTE(review): what happens after the comparison is not visible in this
 * excerpt. */
1190 /* Called from main context */
1191 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1192 struct userdata
*u
= userdata
;
1193 pa_subscription_event_type_t e
;
1199 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1201 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1202 pa_tagstruct_getu32(t
, &idx
) < 0) {
1203 pa_log("Invalid protocol reply");
1204 pa_module_unload_request(u
->module
);
1208 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1210 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1211 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1213 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
/* Sends PA_COMMAND_SUBSCRIBE to the server with a fresh tag and a
 * subscription mask made of the server mask combined with the sink-input
 * and sink masks or the source mask.
 * NOTE(review): both mask variants are visible; presumably only one is
 * compiled per build variant -- confirm. */
1221 /* Called from main context */
1222 static void start_subscribe(struct userdata
*u
) {
1227 t
= pa_tagstruct_new(NULL
, 0);
1228 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1229 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1230 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1232 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1234 PA_SUBSCRIPTION_MASK_SOURCE
1238 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Reply handler for the stream creation request. Parses the reply in
 * protocol-version dependent stages:
 *   always        channel, device index (plus an initial byte count in
 *                 one variant)
 *   version >= 9  the buffer metrics actually chosen by the server
 *   version >= 12 sample spec, channel map, device index, device name and
 *                 suspended flag; the device name replaces
 *                 u->sink_name / u->source_name
 *   version >= 13 a latency value that, increased by
 *                 MIN_NETWORK_LATENCY_USEC, is passed as the first range
 *                 argument to the local device's latency range
 * It then creates the periodic timer event (first expiry LATENCY_INTERVAL
 * seconds from now) and, in the sink variant, forwards the initial byte
 * count as SINK_MESSAGE_REQUEST. A failure is logged and triggers a module
 * unload request.
 * NOTE(review): sink and source variants are both visible; presumably only
 * one is compiled per build variant -- confirm. */
1241 /* Called from main context */
1242 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1243 struct userdata
*u
= userdata
;
1251 pa_assert(u
->pdispatch
== pd
);
1253 if (command
!= PA_COMMAND_REPLY
) {
1254 if (command
== PA_COMMAND_ERROR
)
1255 pa_log("Failed to create stream.");
1257 pa_log("Protocol error.");
1261 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1262 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1264 || pa_tagstruct_getu32(t
, &bytes
) < 0
1269 if (u
->version
>= 9) {
1271 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1272 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1273 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1274 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1277 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1278 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1283 if (u
->version
>= 12) {
1286 uint32_t device_index
;
1288 pa_bool_t suspended
;
1290 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1291 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1292 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1293 pa_tagstruct_gets(t
, &dn
) < 0 ||
1294 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
/* Remember the device name reported by the server. */
1298 pa_xfree(u
->sink_name
);
1299 u
->sink_name
= pa_xstrdup(dn
);
1301 pa_xfree(u
->source_name
);
1302 u
->source_name
= pa_xstrdup(dn
);
1306 if (u
->version
>= 13) {
1309 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1313 pa_sink_set_latency_range(u
->sink
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1315 pa_source_set_latency_range(u
->source
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1319 if (!pa_tagstruct_eof(t
))
/* The timer must not exist yet; it is created exactly once here. */
1325 pa_assert(!u
->time_event
);
1326 pa_gettimeofday(&ntv
);
1327 ntv
.tv_sec
+= LATENCY_INTERVAL
;
1328 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
1332 pa_log_debug("Stream created.");
1335 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1341 pa_log("Invalid reply. (Create stream)");
1344 pa_module_unload_request(u
->module
);
1348 /* Called from main context */
/*
 * Dispatcher reply handler for the PA_COMMAND_AUTH request (it is registered
 * for that request's tag in on_connection()).
 *
 * Reads the remote protocol version from the reply, sends
 * PA_COMMAND_SET_CLIENT_NAME, then builds and sends a create-stream request
 * and registers create_stream_callback() for its reply.
 *
 * pd       - dispatcher delivering the reply; asserted equal to u->pdispatch
 * command  - reply command code; anything but PA_COMMAND_REPLY is rejected
 * tag      - tag of the answered request; reused below as scratch storage for
 *            the tags allocated from u->ctag
 * t        - reply payload; must contain exactly one u32 (the version)
 * userdata - the module's struct userdata
 *
 * NOTE(review): both playback (sink) and record (source) statements appear
 * below. Presumably preprocessor conditionals that are not visible in this
 * excerpt select one set per build - confirm against the full file.
 */
1349 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1350 struct userdata
*u
= userdata
;
1351 pa_tagstruct
*reply
;
/* name: stream name sent to the server; un/hn: scratch buffers for the
 * local user and host name used to build it. */
1352 char name
[256], un
[128], hn
[128];
1359 pa_assert(u
->pdispatch
== pd
);
/* The reply is only accepted if it is a REPLY, a u32 can be read into
 * u->version, and nothing else follows in the tagstruct. */
1361 if (command
!= PA_COMMAND_REPLY
||
1362 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1363 !pa_tagstruct_eof(t
)) {
/* Log an explicit server error differently from a malformed reply. */
1365 if (command
== PA_COMMAND_ERROR
)
1366 pa_log("Failed to authenticate");
1368 pa_log("Protocol error.");
1373 /* Minimum supported protocol version */
1374 if (u
->version
< 8) {
1375 pa_log("Incompatible protocol version");
1379 /* Starting with protocol version 13 the MSB of the version tag
1380 reflects if shm is enabled for this connection or not. We don't
1381 support SHM here at all, so we just ignore this. */
1383 if (u
->version
>= 13)
1384 u
->version
&= 0x7FFFFFFFU
;
1386 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* Build the stream name in the form "<x> for <user>@<host>". The first
 * format argument is not visible in this excerpt. */
1389 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1391 pa_get_user_name(un
, sizeof(un
)),
1392 pa_get_host_name(hn
, sizeof(hn
)));
1394 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1396 pa_get_user_name(un
, sizeof(un
)),
1397 pa_get_host_name(hn
, sizeof(hn
)));
/* First request: announce this client to the server. */
1400 reply
= pa_tagstruct_new(NULL
, 0);
1401 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1402 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* From protocol version 13 on the client identity is sent as a property
 * list carrying the application id and version. */
1404 if (u
->version
>= 13) {
1406 pl
= pa_proplist_new();
1407 pa_init_proplist(pl
);
1408 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1409 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1410 pa_tagstruct_put_proplist(reply
, pl
);
1411 pa_proplist_free(pl
);
/* Plain string form of the client name; presumably the branch taken for
 * versions below 13 (the else is not visible here) - confirm. */
1413 pa_tagstruct_puts(reply
, "PulseAudio");
1415 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1416 /* We ignore the server's reply here */
/* Second request: create the remote stream. */
1418 reply
= pa_tagstruct_new(NULL
, 0);
1420 if (u
->version
< 13)
1421 /* Only for older PA versions we need to fill in the maxlength */
1422 u
->maxlength
= 4*1024*1024;
/* Playback buffer metrics, converted from the default millisecond values
 * to bytes using the local sink's sample spec. */
1425 u
->tlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1426 u
->minreq
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1427 u
->prebuf
= u
->tlength
;
/* Record fragment size, converted the same way using the local source's
 * sample spec. */
1429 u
->fragsize
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback variant of the create-stream request. */
1433 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1434 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* Servers older than version 13 get the stream name as a plain string. */
1436 if (u
->version
< 13)
1437 pa_tagstruct_puts(reply
, name
);
1439 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1440 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1441 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1442 pa_tagstruct_puts(reply
, u
->sink_name
);
1443 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* Boolean sent is TRUE whenever the local sink is currently not opened. */
1444 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1445 pa_tagstruct_putu32(reply
, u
->tlength
);
1446 pa_tagstruct_putu32(reply
, u
->prebuf
);
1447 pa_tagstruct_putu32(reply
, u
->minreq
);
1448 pa_tagstruct_putu32(reply
, 0);
/* Send a reset volume sized to the sink's channel count. */
1449 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1450 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record variant of the create-stream request. */
1452 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1453 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1455 if (u
->version
< 13)
1456 pa_tagstruct_puts(reply
, name
);
1458 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1459 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1460 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1461 pa_tagstruct_puts(reply
, u
->source_name
);
1462 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* Boolean sent is TRUE whenever the local source is currently not opened. */
1463 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1464 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Extra stream flags are appended only when the remote version is >= 12. */
1467 if (u
->version
>= 12) {
1468 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1469 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1470 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1471 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1472 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1473 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1474 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
/* Further fields appended only when the remote version is >= 13,
 * including a property list with the media name and role. */
1477 if (u
->version
>= 13) {
1480 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1481 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1483 pl
= pa_proplist_new();
1484 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1485 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1486 pa_tagstruct_put_proplist(reply
, pl
);
1487 pa_proplist_free(pl
);
1490 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
/* Send the request and route its reply (matched by tag, with
 * DEFAULT_TIMEOUT) to create_stream_callback(). */
1494 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1495 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1497 pa_log_debug("Connection authenticated, creating stream ...");
/* Presumably the shared failure exit for the error branches above (its
 * label is not visible in this excerpt): ask the core to unload us. */
1502 pa_module_unload_request(u
->module
);
1505 /* Called from main context */
/*
 * Die callback installed on u->pstream by on_connection(). Logs a warning
 * and requests that the module be unloaded.
 *
 * p        - the pstream reporting the failure (not used in the visible code)
 * userdata - the module's struct userdata
 */
1506 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1507 struct userdata
*u
= userdata
;
1512 pa_log_warn("Stream died.");
1513 pa_module_unload_request(u
->module
);
1516 /* Called from main context */
/*
 * Packet callback installed on u->pstream by on_connection(). Hands every
 * received packet to the dispatcher; when the dispatcher reports an error
 * the module is asked to unload.
 *
 * p        - the pstream that received the packet (not used in the visible code)
 * packet   - the received packet
 * creds    - credentials passed through to the dispatcher
 * userdata - the module's struct userdata
 */
1517 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1518 struct userdata
*u
= userdata
;
/* A negative return from pa_pdispatch_run() is treated as a fatal error. */
1524 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1525 pa_log("Invalid packet");
1526 pa_module_unload_request(u
->module
);
1532 /* Called from main context */
/*
 * Memblock callback installed on u->pstream by on_connection(). Forwards
 * audio data received from the remote server to the local source and
 * accounts for the number of bytes received.
 *
 * p        - the pstream that received the block (not used in the visible code)
 * channel  - channel the block arrived on; must equal u->channel
 * offset   - seek offset, passed on with the message
 * seek     - seek mode, passed on as the message's pointer argument
 * chunk    - the received audio data
 * userdata - the module's struct userdata
 */
1533 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1534 struct userdata
*u
= userdata
;
/* Data on any channel other than ours is a protocol violation: log it and
 * request a module unload. */
1540 if (channel
!= u
->channel
) {
1541 pa_log("Received memory block on bad channel.");
1542 pa_module_unload_request(u
->module
);
/* Hand the chunk to the source via its message queue using
 * SOURCE_MESSAGE_POST. */
1546 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
/* Track how many bytes have been received. */
1548 u
->counter_delta
+= chunk
->length
;
1553 /* Called from main context */
/*
 * Connect callback installed with pa_socket_client_set_callback() in
 * pa__init(). Releases the socket client, wraps the I/O channel in a pstream
 * and a dispatcher, installs the pstream callbacks and sends
 * PA_COMMAND_AUTH with our protocol version and auth cookie. The reply is
 * handled by setup_complete_callback().
 *
 * sc       - the socket client that finished connecting; asserted equal to
 *            u->client
 * io       - I/O channel of the new connection
 * userdata - the module's struct userdata
 */
1554 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1555 struct userdata
*u
= userdata
;
1561 pa_assert(u
->client
== sc
);
/* The socket client is no longer needed once the callback has fired. */
1563 pa_socket_client_unref(u
->client
);
/* Failure path; the condition guarding it is not visible in this excerpt.
 * NOTE(review): errno is read here after the pa_socket_client_unref() call
 * above - confirm nothing in between can overwrite it. */
1567 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1568 pa_module_unload_request(u
->module
);
/* Set up the packet stream and the command dispatcher on top of io. */
1572 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1573 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1575 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1576 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1578 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* Build the AUTH request: command, fresh tag, our protocol version and the
 * cookie loaded by load_key(). */
1581 t
= pa_tagstruct_new(NULL
, 0);
1582 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1583 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1584 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1585 pa_tagstruct_put_arbitrary(t
, u
->auth_cookie
, sizeof(u
->auth_cookie
));
/* Enable credential passing when the channel supports it and send our
 * uid/gid along with the request. */
1591 if (pa_iochannel_creds_supported(io
))
1592 pa_iochannel_creds_enable(io
);
1594 ucred
.uid
= getuid();
1595 ucred
.gid
= getgid();
1597 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
/* Plain send without credentials; presumably the alternative used when
 * credential support is not compiled in (the conditionals are not visible
 * here) - confirm. */
1600 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Route the AUTH reply (matched by tag, with DEFAULT_TIMEOUT) to
 * setup_complete_callback(). */
1603 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1605 pa_log_debug("Connection established, authenticating ...");
1610 /* Called from main context */
/*
 * set_volume hook of the tunnel sink (assigned in pa__init()). Sends
 * PA_COMMAND_SET_SINK_INPUT_VOLUME for u->device_index carrying the sink's
 * current volume. No reply handler is registered in the visible code.
 *
 * sink - the local tunnel sink whose volume changed
 *
 * The declarations of u, t and tag and the return statement are not visible
 * in this excerpt; u is presumably sink->userdata, which pa__init() sets.
 */
1611 static int sink_set_volume(pa_sink
*sink
) {
1620 t
= pa_tagstruct_new(NULL
, 0);
1621 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1622 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1623 pa_tagstruct_putu32(t
, u
->device_index
);
1624 pa_tagstruct_put_cvolume(t
, &sink
->volume
);
1625 pa_pstream_send_tagstruct(u
->pstream
, t
);
1630 /* Called from main context */
/*
 * set_mute hook of the tunnel sink (assigned in pa__init()). Sends
 * PA_COMMAND_SET_SINK_INPUT_MUTE for u->device_index carrying the sink's
 * current mute state. No reply handler is registered in the visible code.
 *
 * sink - the local tunnel sink whose mute state changed
 *
 * The declarations of u, t and tag and the return statements are not visible
 * in this excerpt; u is presumably sink->userdata, which pa__init() sets.
 */
1631 static int sink_set_mute(pa_sink
*sink
) {
/* The statement guarded by this version check is not visible here;
 * presumably an early exit for remote servers older than protocol
 * version 11 - confirm. */
1640 if (u
->version
< 11)
1643 t
= pa_tagstruct_new(NULL
, 0);
1644 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1645 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1646 pa_tagstruct_putu32(t
, u
->device_index
);
/* !! normalises the muted flag to exactly 0 or 1 before sending. */
1647 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1648 pa_pstream_send_tagstruct(u
->pstream
, t
);
1655 /* Called from main context */
/*
 * Fills u->auth_cookie with the authentication cookie sent to the remote
 * server and records in u->auth_cookie_in_property whether this module holds
 * a reference on the shared cookie property.
 *
 * u  - the module's struct userdata
 * fn - cookie file name, or NULL to use the shared property / default file
 *
 * Returns an int that pa__init() tests with "< 0"; the return statements
 * themselves are not visible in this excerpt.
 */
1656 static int load_key(struct userdata
*u
, const char*fn
) {
1659 u
->auth_cookie_in_property
= FALSE
;
/* Without an explicit file name, prefer a cookie already stored as the
 * core property and take a reference on it. */
1661 if (!fn
&& pa_authkey_prop_get(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0) {
1662 pa_log_debug("Using already loaded auth cookie.");
1663 pa_authkey_prop_ref(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
/* TRUE rather than a bare 1, matching the other assignments to this flag. */
1664 u
->auth_cookie_in_property
= TRUE
;
/* Default cookie file; presumably applied only when fn is NULL (the guard
 * is not visible in this excerpt) - confirm. */
1669 fn
= PA_NATIVE_COOKIE_FILE
;
/* Load the cookie from the file. */
1671 if (pa_authkey_load_auto(fn
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) < 0)
1674 pa_log_debug("Loading cookie from disk.");
/* Publish the cookie as a core property; remember it if that succeeded. */
1676 if (pa_authkey_prop_put(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0)
1677 u
->auth_cookie_in_property
= TRUE
;
/*
 * Module entry point. Parses the module arguments, allocates and initialises
 * the module's struct userdata, loads the auth cookie, starts connecting to
 * the remote server, creates the local tunnel sink or source, and starts the
 * I/O thread.
 *
 * m - the module being loaded; m->argument holds its argument string and
 *     m->userdata receives the new struct userdata
 *
 * Returns an int; the return statements and the failure label are not
 * visible in this excerpt.
 *
 * NOTE(review): sink and source statements both appear below (including two
 * declarations named "data"). Presumably preprocessor conditionals that are
 * not visible here select one set per build - confirm against the full file.
 */
1682 int pa__init(pa_module
*m
) {
1683 pa_modargs
*ma
= NULL
;
1684 struct userdata
*u
= NULL
;
1689 pa_sink_new_data data
;
1691 pa_source_new_data data
;
1696 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1697 pa_log("Failed to parse module arguments");
/* Allocate the per-module state. pa_xnew0 presumably zero-fills, in which
 * case the NULL/0/FALSE assignments below only restate the defaults. */
1701 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1705 u
->pdispatch
= NULL
;
1707 u
->server_name
= NULL
;
/* Name of the remote sink, from the "sink" argument (may be NULL).
 * NOTE(review): the second ';' below is a stray empty statement. */
1709 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1711 u
->requested_bytes
= 0;
/* Name of the remote source, from the "source" argument (may be NULL).
 * NOTE(review): the second ';' below is a stray empty statement. */
1713 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1716 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
, 10);
1718 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1719 u
->auth_cookie_in_property
= FALSE
;
1720 u
->time_event
= NULL
;
1721 u
->ignore_latency_before
= 0;
1722 u
->transport_usec
= 0;
1723 u
->transport_usec_valid
= FALSE
;
1724 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1725 u
->counter
= u
->counter_delta
= 0;
/* Real-time poll object and the message queues linking the main loop with
 * the thread started further down. */
1727 u
->rtpoll
= pa_rtpoll_new();
1728 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
/* Load the auth cookie, optionally from the file named by "cookie". */
1730 if (load_key(u
, pa_modargs_get_value(ma
, "cookie", NULL
)) < 0)
/* The "server" argument is mandatory. */
1733 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1734 pa_log("No server specified.");
/* Start from the core's default sample spec and let the module arguments
 * override it. */
1738 ss
= m
->core
->default_sample_spec
;
1739 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1740 pa_log("Invalid sample format specification");
/* Create the socket client for the server; on_connection() is invoked
 * through the callback installed below. */
1744 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1745 pa_log("Failed to connect to server '%s'", u
->server_name
);
1749 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Local sink name: the "sink_name" argument, or "tunnel.<server>" when it
 * is not given. */
1753 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1754 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
/* Describe the sink to be created. */
1756 pa_sink_new_data_init(&data
);
1757 data
.driver
= __FILE__
;
1759 data
.namereg_fail
= TRUE
;
1760 pa_sink_new_data_set_name(&data
, dn
);
1761 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1762 pa_sink_new_data_set_channel_map(&data
, &map
);
/* Description reads "<remote sink> on <server>", or just "<server>" when
 * no remote sink was named. */
1763 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1764 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1766 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
1768 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
);
1769 pa_sink_new_data_done(&data
);
1772 pa_log("Failed to create sink.");
/* Hook up the sink's message handler and its state/volume/mute callbacks. */
1776 u
->sink
->parent
.process_msg
= sink_process_msg
;
1777 u
->sink
->userdata
= u
;
1778 u
->sink
->set_state
= sink_set_state
;
1779 u
->sink
->set_volume
= sink_set_volume
;
1780 u
->sink
->set_mute
= sink_set_mute
;
1782 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
1784 pa_sink_set_latency_range(u
->sink
, MIN_NETWORK_LATENCY_USEC
, 0);
/* Attach the sink to the thread's incoming queue and poll object. */
1786 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1787 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* Local source name: the "source_name" argument, or "tunnel.<server>" when
 * it is not given. */
1791 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1792 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
/* Describe the source to be created. */
1794 pa_source_new_data_init(&data
);
1795 data
.driver
= __FILE__
;
1797 data
.namereg_fail
= TRUE
;
1798 pa_source_new_data_set_name(&data
, dn
);
1799 pa_source_new_data_set_sample_spec(&data
, &ss
);
1800 pa_source_new_data_set_channel_map(&data
, &map
);
/* Description reads "<remote source> on <server>", or just "<server>" when
 * no remote source was named. */
1801 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1802 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1804 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
1806 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1807 pa_source_new_data_done(&data
);
1810 pa_log("Failed to create source.");
/* Hook up the source's message handler and state callback. */
1814 u
->source
->parent
.process_msg
= source_process_msg
;
1815 u
->source
->set_state
= source_set_state
;
1816 u
->source
->userdata
= u
;
1818 pa_source_set_latency_range(u
->source
, MIN_NETWORK_LATENCY_USEC
, 0);
/* Attach the source to the thread's incoming queue and poll object. */
1820 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1821 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
/* NOTE(review): time_event was already set to NULL above; this second
 * assignment looks redundant. */
1826 u
->time_event
= NULL
;
1830 u
->tlength
= u
->minreq
= u
->prebuf
= 0;
1835 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
/* Start the thread running thread_func. */
1837 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1838 pa_log("Failed to create thread.");
/* Publish the new device. */
1843 pa_sink_put(u
->sink
);
1845 pa_source_put(u
->source
);
/* The parsed arguments are freed twice in the visible text; presumably once
 * on the success exit and once on the failure exit (the control flow between
 * them is not visible in this excerpt). */
1848 pa_modargs_free(ma
);
1856 pa_modargs_free(ma
);
1863 void pa__done(pa_module
*m
) {
1868 if (!(u
= m
->userdata
))
1873 pa_sink_unlink(u
->sink
);
1876 pa_source_unlink(u
->source
);
1880 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1881 pa_thread_free(u
->thread
);
1884 pa_thread_mq_done(&u
->thread_mq
);
1888 pa_sink_unref(u
->sink
);
1891 pa_source_unref(u
->source
);
1895 pa_rtpoll_free(u
->rtpoll
);
1898 pa_pstream_unlink(u
->pstream
);
1899 pa_pstream_unref(u
->pstream
);
1903 pa_pdispatch_unref(u
->pdispatch
);
1906 pa_socket_client_unref(u
->client
);
1908 if (u
->auth_cookie_in_property
)
1909 pa_authkey_prop_unref(m
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1912 pa_smoother_free(u
->smoother
);
1915 u
->core
->mainloop
->time_free(u
->time_event
);
1918 pa_xfree(u
->sink_name
);
1920 pa_xfree(u
->source_name
);
1922 pa_xfree(u
->server_name
);
1924 pa_xfree(u
->device_description
);
1925 pa_xfree(u
->server_fqdn
);
1926 pa_xfree(u
->user_name
);