X-Git-Url: https://code.delx.au/pulseaudio/blobdiff_plain/6f59ae1763ee48f27448a7de9d635f61886052e1..fa499dad06ba6558111cdef64c18f2401e803cff:/polyp/module-tunnel.c diff --git a/polyp/module-tunnel.c b/polyp/module-tunnel.c index 0c78b138..368ae422 100644 --- a/polyp/module-tunnel.c +++ b/polyp/module-tunnel.c @@ -4,7 +4,7 @@ This file is part of polypaudio. polypaudio is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published + it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. @@ -13,7 +13,7 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - You should have received a copy of the GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with polypaudio; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. @@ -45,38 +45,60 @@ #include "authkey.h" #include "socket-client.h" #include "socket-util.h" +#include "authkey-prop.h" + +#ifdef TUNNEL_SINK +#include "module-tunnel-sink-symdef.h" +PA_MODULE_DESCRIPTION("Tunnel module for sinks") +PA_MODULE_USAGE("server=
sink= cookie= format= channels= rate= sink_name=") +#else +#include "module-tunnel-source-symdef.h" +PA_MODULE_DESCRIPTION("Tunnel module for sources") +PA_MODULE_USAGE("server=
source= cookie= format= channels= rate= source_name=") +#endif PA_MODULE_AUTHOR("Lennart Poettering") -PA_MODULE_DESCRIPTION("Tunnel module") -PA_MODULE_USAGE("server= sink= cookie= format= channels= rate= sink_name=") PA_MODULE_VERSION(PACKAGE_VERSION) #define DEFAULT_SINK_NAME "tunnel" +#define DEFAULT_SOURCE_NAME "tunnel" -#define DEFAULT_TLENGTH (10240*8) +#define DEFAULT_TLENGTH (44100*2*2/10) //(10240*8) #define DEFAULT_MAXLENGTH ((DEFAULT_TLENGTH*3)/2) -#define DEFAULT_PREBUF DEFAULT_TLENGTH #define DEFAULT_MINREQ 512 +#define DEFAULT_PREBUF (DEFAULT_TLENGTH-DEFAULT_MINREQ) #define DEFAULT_FRAGSIZE 1024 #define DEFAULT_TIMEOUT 5 +#define LATENCY_INTERVAL 10 + static const char* const valid_modargs[] = { "server", - "sink", "cookie", "format", "channels", "rate", +#ifdef TUNNEL_SINK "sink_name", + "sink", +#else + "source_name", + "source", +#endif NULL, }; static void command_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); + +#ifdef TUNNEL_SINK static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +#endif static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { +#ifdef TUNNEL_SINK [PA_COMMAND_REQUEST] = { command_request }, +#endif [PA_COMMAND_PLAYBACK_STREAM_KILLED] = { command_stream_killed }, [PA_COMMAND_RECORD_STREAM_KILLED] = { command_stream_killed }, }; @@ -86,9 +108,16 @@ struct userdata { struct pa_pstream *pstream; struct pa_pdispatch *pdispatch; - char *server_name, *sink_name; - + char *server_name; +#ifdef TUNNEL_SINK + char *sink_name; struct pa_sink *sink; + uint32_t requested_bytes; +#else + char *source_name; + struct pa_source *source; +#endif + struct pa_module *module; struct pa_core *core; @@ -96,12 +125,18 @@ struct userdata { uint32_t ctag; uint32_t device_index; - uint32_t requested_bytes; uint32_t channel; -}; + + pa_usec_t host_latency; + struct pa_time_event *time_event; + + int auth_cookie_in_property; +}; static void close_stuff(struct userdata *u) { + assert(u); + if (u->pstream) { pa_pstream_close(u->pstream); pa_pstream_unref(u->pstream); @@ -118,11 +153,24 @@ static void close_stuff(struct userdata *u) { u->client = NULL; } +#ifdef TUNNEL_SINK if (u->sink) { pa_sink_disconnect(u->sink); pa_sink_unref(u->sink); u->sink = NULL; } +#else + if (u->source) { + pa_source_disconnect(u->source); + pa_source_unref(u->source); + u->source = NULL; + } +#endif + + if (u->time_event) { + u->core->mainloop->time_free(u->time_event); + u->time_event = NULL; + } } static void die(struct userdata *u) { @@ -131,16 +179,41 @@ static void die(struct userdata *u) { pa_module_unload_request(u->module); } -static void request_bytes(struct userdata *u) { +static void command_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct userdata *u = userdata; + assert(pd && t && u && u->pdispatch == pd); + + pa_log(__FILE__": stream killed\n"); + die(u); +} + +#ifdef TUNNEL_SINK +static void send_prebuf_request(struct userdata *u) { + struct pa_tagstruct *t; + + t = pa_tagstruct_new(NULL, 0); + pa_tagstruct_putu32(t, PA_COMMAND_PREBUF_PLAYBACK_STREAM); + pa_tagstruct_putu32(t, u->ctag++); + pa_tagstruct_putu32(t, u->channel); + pa_pstream_send_tagstruct(u->pstream, t); +} + +static void send_bytes(struct userdata *u) { assert(u); if (!u->pstream) return; - + while (u->requested_bytes > 0) { struct pa_memchunk chunk; - if (pa_sink_render(u->sink, u->requested_bytes, &chunk) < 0) + if (pa_sink_render(u->sink, u->requested_bytes, &chunk) < 0) { + + + if (u->requested_bytes >= DEFAULT_TLENGTH-DEFAULT_PREBUF) + send_prebuf_request(u); + return; + } pa_pstream_send_memblock(u->pstream, u->channel, 0, &chunk); pa_memblock_unref(chunk.memblock); @@ -152,14 +225,6 @@ static void request_bytes(struct userdata *u) { } } -static void command_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { - struct userdata *u = userdata; - assert(pd && t && u && u->pdispatch == pd); - - pa_log(__FILE__": stream killed\n"); - die(u); -} - static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; uint32_t bytes, channel; @@ -180,9 +245,87 @@ static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t } u->requested_bytes += bytes; - request_bytes(u); + send_bytes(u); +} + +#endif + +static void stream_get_latency_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct userdata *u = userdata; + pa_usec_t buffer_usec, sink_usec, source_usec, transport_usec; + int playing; + uint32_t queue_length; + struct timeval local, remote, now; + assert(pd && u && t); + + if (command != PA_COMMAND_REPLY) { + if (command == PA_COMMAND_ERROR) + pa_log(__FILE__": failed to get latency.\n"); + else + pa_log(__FILE__": protocol error.\n"); + die(u); + return; + } + + if (pa_tagstruct_get_usec(t, &buffer_usec) < 0 || + pa_tagstruct_get_usec(t, &sink_usec) < 0 || + pa_tagstruct_get_usec(t, &source_usec) < 0 || + pa_tagstruct_get_boolean(t, &playing) < 0 || + pa_tagstruct_getu32(t, &queue_length) < 0 || + pa_tagstruct_get_timeval(t, &local) < 0 || + pa_tagstruct_get_timeval(t, &remote) < 0 || + !pa_tagstruct_eof(t)) { + pa_log(__FILE__": invalid reply.\n"); + die(u); + return; + } + + gettimeofday(&now, NULL); + + if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) { + /* local and remote seem to have synchronized clocks */ +#ifdef TUNNEL_SINK + transport_usec = pa_timeval_diff(&remote, &local); +#else + transport_usec = pa_timeval_diff(&now, &remote); +#endif + } else + transport_usec = pa_timeval_diff(&now, &local)/2; + +#ifdef TUNNEL_SINK + u->host_latency = sink_usec + transport_usec; +#else + u->host_latency = source_usec + transport_usec; + if (u->host_latency > sink_usec) + u->host_latency -= sink_usec; + else + u->host_latency = 0; +#endif + +/* pa_log(__FILE__": estimated host latency: %0.0f usec\n", (double) u->host_latency); */ } +static void request_latency(struct userdata *u) { + struct pa_tagstruct *t; + struct timeval now; + uint32_t tag; + assert(u); + + t = pa_tagstruct_new(NULL, 0); +#ifdef TUNNEL_SINK + pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY); +#else + pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY); +#endif + pa_tagstruct_putu32(t, tag = u->ctag++); + pa_tagstruct_putu32(t, u->channel); + + gettimeofday(&now, NULL); + pa_tagstruct_put_timeval(t, &now); + + pa_pstream_send_tagstruct(u->pstream, t); + pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u); +} static void create_stream_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; @@ -199,14 +342,19 @@ static void create_stream_callback(struct pa_pdispatch *pd, uint32_t command, ui if (pa_tagstruct_getu32(t, &u->channel) < 0 || pa_tagstruct_getu32(t, &u->device_index) < 0 || +#ifdef TUNNEL_SINK pa_tagstruct_getu32(t, &u->requested_bytes) < 0 || +#endif !pa_tagstruct_eof(t)) { pa_log(__FILE__": invalid reply.\n"); die(u); return; } - request_bytes(u); + request_latency(u); +#ifdef TUNNEL_SINK + send_bytes(u); +#endif } static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { @@ -223,11 +371,17 @@ static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, u die(u); return; } - +#ifdef TUNNEL_SINK snprintf(name, sizeof(name), "Tunnel from host '%s', user '%s', sink '%s'", pa_get_host_name(hn, sizeof(hn)), pa_get_user_name(un, sizeof(un)), u->sink->name); +#else + snprintf(name, sizeof(name), "Tunnel from host '%s', user '%s', source '%s'", + pa_get_host_name(hn, sizeof(hn)), + pa_get_user_name(un, sizeof(un)), + u->source->name); +#endif reply = pa_tagstruct_new(NULL, 0); pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME); @@ -237,6 +391,7 @@ static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, u /* We ignore the server's reply here */ reply = pa_tagstruct_new(NULL, 0); +#ifdef TUNNEL_SINK pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM); pa_tagstruct_putu32(reply, tag = u->ctag++); pa_tagstruct_puts(reply, name); @@ -249,6 +404,17 @@ static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, u pa_tagstruct_putu32(reply, DEFAULT_PREBUF); pa_tagstruct_putu32(reply, DEFAULT_MINREQ); pa_tagstruct_putu32(reply, PA_VOLUME_NORM); +#else + pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM); + pa_tagstruct_putu32(reply, tag = u->ctag++); + pa_tagstruct_puts(reply, name); + pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec); + pa_tagstruct_putu32(reply, PA_INVALID_INDEX); + pa_tagstruct_puts(reply, u->source_name); + pa_tagstruct_putu32(reply, DEFAULT_MAXLENGTH); + pa_tagstruct_put_boolean(reply, 0); + pa_tagstruct_putu32(reply, DEFAULT_FRAGSIZE); +#endif pa_pstream_send_tagstruct(u->pstream, reply); pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u); @@ -273,11 +439,26 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack } } +#ifndef TUNNEL_SINK +static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) { + struct userdata *u = userdata; + assert(p && chunk && u); + + if (channel != u->channel) { + pa_log(__FILE__": recieved memory block on bad channel.\n"); + die(u); + return; + } + + pa_source_post(u->source, chunk); +} +#endif + static void on_connection(struct pa_socket_client *sc, struct pa_iochannel *io, void *userdata) { struct userdata *u = userdata; struct pa_tagstruct *t; uint32_t tag; - assert(sc && io && u && u->client == sc); + assert(sc && u && u->client == sc); pa_socket_client_unref(u->client); u->client = NULL; @@ -293,6 +474,9 @@ static void on_connection(struct pa_socket_client *sc, struct pa_iochannel *io, pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u); pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u); +#ifndef TUNNEL_SINK + pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u); +#endif t = pa_tagstruct_new(NULL, 0); pa_tagstruct_putu32(t, PA_COMMAND_AUTH); @@ -303,19 +487,78 @@ static void on_connection(struct pa_socket_client *sc, struct pa_iochannel *io, } +#ifdef TUNNEL_SINK static void sink_notify(struct pa_sink*sink) { struct userdata *u; assert(sink && sink->userdata); u = sink->userdata; - request_bytes(u); + send_bytes(u); } static pa_usec_t sink_get_latency(struct pa_sink *sink) { struct userdata *u; + uint32_t l; + pa_usec_t usec = 0; assert(sink && sink->userdata); u = sink->userdata; + l = DEFAULT_TLENGTH; + + if (l > u->requested_bytes) { + l -= u->requested_bytes; + usec += pa_bytes_to_usec(l, &u->sink->sample_spec); + } + + usec += u->host_latency; + + return usec; +} +#else +static pa_usec_t source_get_latency(struct pa_source *source) { + struct userdata *u; + assert(source && source->userdata); + u = source->userdata; + + return u->host_latency; +} +#endif + +static void timeout_callback(struct pa_mainloop_api *m, struct pa_time_event*e, const struct timeval *tv, void *userdata) { + struct userdata *u = userdata; + struct timeval ntv; + assert(m && e && u); + + request_latency(u); + + gettimeofday(&ntv, NULL); + ntv.tv_sec += LATENCY_INTERVAL; + m->time_restart(e, &ntv); +} + +static int load_key(struct userdata *u, const char*fn) { + assert(u); + + u->auth_cookie_in_property = 0; + + if (!fn && pa_authkey_prop_get(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0) { + pa_log(__FILE__": using already loaded auth cookie.\n"); + pa_authkey_prop_ref(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME); + u->auth_cookie_in_property = 1; + return 0; + } + + if (!fn) + fn = PA_NATIVE_COOKIE_FILE; + + if (pa_authkey_load_from_home(fn, u->auth_cookie, sizeof(u->auth_cookie)) < 0) + return -1; + + pa_log(__FILE__": loading cookie from disk.\n"); + + if (pa_authkey_prop_put(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0) + u->auth_cookie_in_property = 1; + return 0; } @@ -323,11 +566,12 @@ int pa__init(struct pa_core *c, struct pa_module*m) { struct pa_modargs *ma = NULL; struct userdata *u = NULL; struct pa_sample_spec ss; + struct timeval ntv; assert(c && m); if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { pa_log(__FILE__": failed to parse module arguments\n"); - + goto fail; } u = pa_xmalloc(sizeof(struct userdata)); @@ -337,24 +581,29 @@ int pa__init(struct pa_core *c, struct pa_module*m) { u->client = NULL; u->pdispatch = NULL; u->pstream = NULL; - u->server_name = u->sink_name = NULL; + u->server_name = NULL; +#ifdef TUNNEL_SINK + u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));; u->sink = NULL; + u->requested_bytes = 0; +#else + u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));; + u->source = NULL; +#endif u->ctag = 1; u->device_index = u->channel = PA_INVALID_INDEX; - u->requested_bytes = 0; - - if (pa_authkey_load_from_home(pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), u->auth_cookie, sizeof(u->auth_cookie)) < 0) { - pa_log(__FILE__": failed to load cookie.\n"); + u->host_latency = 0; + u->auth_cookie_in_property = 0; + u->time_event = NULL; + + if (load_key(u, pa_modargs_get_value(ma, "cookie", NULL)) < 0) goto fail; - } if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) { pa_log(__FILE__": no server specified.\n"); goto fail; } - u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); - ss = c->default_sample_spec; if (pa_modargs_get_sample_spec(ma, &ss) < 0) { pa_log(__FILE__": invalid sample format specification\n"); @@ -380,7 +629,8 @@ int pa__init(struct pa_core *c, struct pa_module*m) { goto fail; pa_socket_client_set_callback(u->client, on_connection, u); - + +#ifdef TUNNEL_SINK if (!(u->sink = pa_sink_new(c, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss))) { pa_log(__FILE__": failed to create sink.\n"); goto fail; @@ -392,6 +642,22 @@ int pa__init(struct pa_core *c, struct pa_module*m) { u->sink->description = pa_sprintf_malloc("Tunnel to '%s%s%s'", u->sink_name ? u->sink_name : "", u->sink_name ? "@" : "", u->server_name); pa_sink_set_owner(u->sink, m); +#else + if (!(u->source = pa_source_new(c, pa_modargs_get_value(ma, "source_name", DEFAULT_SOURCE_NAME), 0, &ss))) { + pa_log(__FILE__": failed to create source.\n"); + goto fail; + } + + u->source->get_latency = source_get_latency; + u->source->userdata = u; + u->source->description = pa_sprintf_malloc("Tunnel to '%s%s%s'", u->source_name ? u->source_name : "", u->source_name ? "@" : "", u->server_name); + + pa_source_set_owner(u->source, m); +#endif + + gettimeofday(&ntv, NULL); + ntv.tv_sec += LATENCY_INTERVAL; + u->time_event = c->mainloop->time_new(c->mainloop, &ntv, timeout_callback, u); pa_modargs_free(ma); @@ -414,7 +680,14 @@ void pa__done(struct pa_core *c, struct pa_module*m) { close_stuff(u); + if (u->auth_cookie_in_property) + pa_authkey_prop_unref(c, PA_NATIVE_COOKIE_PROPERTY_NAME); + +#ifdef TUNNEL_SINK pa_xfree(u->sink_name); +#else + pa_xfree(u->source_name); +#endif pa_xfree(u->server_name); pa_xfree(u);