From: Lennart Poettering Date: Sat, 15 Mar 2008 15:19:40 +0000 (+0000) Subject: commit glitch-free work X-Git-Url: https://code.delx.au/pulseaudio/commitdiff_plain/12c01e942d23bd477e14b467e66352e6ce0557a9 commit glitch-free work git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/glitch-free@2122 fefdeb5f-60dc-0310-8127-8f9354f1896f --- diff --git a/src/pulsecore/asyncmsgq.h b/src/pulsecore/asyncmsgq.h index 5d3867ba..575f760f 100644 --- a/src/pulsecore/asyncmsgq.h +++ b/src/pulsecore/asyncmsgq.h @@ -56,6 +56,7 @@ typedef struct pa_asyncmsgq pa_asyncmsgq; pa_asyncmsgq* pa_asyncmsgq_new(unsigned size); pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q); + void pa_asyncmsgq_unref(pa_asyncmsgq* q); void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb); diff --git a/src/pulsecore/cli-command.c b/src/pulsecore/cli-command.c index 3110a271..73b6619c 100644 --- a/src/pulsecore/cli-command.c +++ b/src/pulsecore/cli-command.c @@ -779,6 +779,7 @@ static int pa_cli_command_scache_list(pa_core *c, pa_tokenizer *t, pa_strbuf *bu static int pa_cli_command_scache_play(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, pa_bool_t *fail) { const char *n, *sink_name; pa_sink *sink; + uint32_t idx; pa_core_assert_ref(c); pa_assert(t); @@ -795,11 +796,13 @@ static int pa_cli_command_scache_play(pa_core *c, pa_tokenizer *t, pa_strbuf *bu return -1; } - if (pa_scache_play_item(c, n, sink, PA_VOLUME_NORM) < 0) { + if (pa_scache_play_item(c, n, sink, PA_VOLUME_NORM, NULL, &idx) < 0) { pa_strbuf_puts(buf, "Failed to play sample.\n"); return -1; } + pa_strbuf_printf(buf, "Playing on sink input #%i\n", idx); + return 0; } diff --git a/src/pulsecore/cli-text.c b/src/pulsecore/cli-text.c index b64cafe2..8bb567b7 100644 --- a/src/pulsecore/cli-text.c +++ b/src/pulsecore/cli-text.c @@ -41,6 +41,7 @@ #include #include #include +#include #include "cli-text.h" @@ -78,10 +79,20 @@ 
char *pa_client_list_to_string(pa_core *c) { pa_strbuf_printf(s, "%u client(s) logged in.\n", pa_idxset_size(c->clients)); for (client = pa_idxset_first(c->clients, &idx); client; client = pa_idxset_next(c->clients, &idx)) { - pa_strbuf_printf(s, " index: %u\n\tname: <%s>\n\tdriver: <%s>\n", client->index, client->name, client->driver); + char *t; + pa_strbuf_printf( + s, + " index: %u\n" + "\tdriver: <%s>\n", + client->index, + client->driver); - if (client->owner) - pa_strbuf_printf(s, "\towner module: <%u>\n", client->owner->index); + if (client->module) + pa_strbuf_printf(s, "\towner module: <%u>\n", client->module->index); + + t = pa_proplist_to_string(client->proplist); + pa_strbuf_printf(s, "\tproperties:\n%s", t); + pa_xfree(t); } return pa_strbuf_tostring_free(s); @@ -92,6 +103,7 @@ char *pa_sink_list_to_string(pa_core *c) { pa_sink *sink; uint32_t idx = PA_IDXSET_INVALID; static const char* const state_table[] = { + [PA_SINK_INIT] = "INIT", [PA_SINK_RUNNING] = "RUNNING", [PA_SINK_SUSPENDED] = "SUSPENDED", [PA_SINK_IDLE] = "IDLE", @@ -104,14 +116,14 @@ char *pa_sink_list_to_string(pa_core *c) { pa_strbuf_printf(s, "%u sink(s) available.\n", pa_idxset_size(c->sinks)); for (sink = pa_idxset_first(c->sinks, &idx); sink; sink = pa_idxset_next(c->sinks, &idx)) { - char ss[PA_SAMPLE_SPEC_SNPRINT_MAX], cv[PA_CVOLUME_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX]; + char ss[PA_SAMPLE_SPEC_SNPRINT_MAX], cv[PA_CVOLUME_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX], *t; pa_strbuf_printf( s, " %c index: %u\n" "\tname: <%s>\n" "\tdriver: <%s>\n" - "\tflags: %s%s%s%s\n" + "\tflags: %s%s%s%s%s%s\n" "\tstate: %s\n" "\tvolume: <%s>\n" "\tmute: <%i>\n" @@ -125,7 +137,9 @@ char *pa_sink_list_to_string(pa_core *c) { sink->index, sink->name, sink->driver, + sink->flags & PA_SINK_HW_MUTE_CTRL ? "HW_MUTE_CTRL " : "", sink->flags & PA_SINK_HW_VOLUME_CTRL ? "HW_VOLUME_CTRL " : "", + sink->flags & PA_SINK_DECIBEL_VOLUME ? "DECIBEL_VOLUME " : "", sink->flags & PA_SINK_LATENCY ? 
"LATENCY " : "", sink->flags & PA_SINK_HARDWARE ? "HARDWARE " : "", sink->flags & PA_SINK_NETWORK ? "NETWORK " : "", @@ -141,8 +155,10 @@ char *pa_sink_list_to_string(pa_core *c) { if (sink->module) pa_strbuf_printf(s, "\tmodule: <%u>\n", sink->module->index); - if (sink->description) - pa_strbuf_printf(s, "\tdescription: <%s>\n", sink->description); + + t = pa_proplist_to_string(sink->proplist); + pa_strbuf_printf(s, "\tproperties:\n%s", t); + pa_xfree(t); } return pa_strbuf_tostring_free(s); @@ -153,6 +169,7 @@ char *pa_source_list_to_string(pa_core *c) { pa_source *source; uint32_t idx = PA_IDXSET_INVALID; static const char* const state_table[] = { + [PA_SOURCE_INIT] = "INIT", [PA_SOURCE_RUNNING] = "RUNNING", [PA_SOURCE_SUSPENDED] = "SUSPENDED", [PA_SOURCE_IDLE] = "IDLE", @@ -165,15 +182,14 @@ char *pa_source_list_to_string(pa_core *c) { pa_strbuf_printf(s, "%u source(s) available.\n", pa_idxset_size(c->sources)); for (source = pa_idxset_first(c->sources, &idx); source; source = pa_idxset_next(c->sources, &idx)) { - char ss[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX], cv[PA_CVOLUME_SNPRINT_MAX]; - + char ss[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX], cv[PA_CVOLUME_SNPRINT_MAX], *t; pa_strbuf_printf( s, " %c index: %u\n" "\tname: <%s>\n" "\tdriver: <%s>\n" - "\tflags: %s%s%s%s\n" + "\tflags: %s%s%s%s%s%s\n" "\tstate: %s\n" "\tvolume: <%s>\n" "\tmute: <%u>\n" @@ -186,7 +202,9 @@ char *pa_source_list_to_string(pa_core *c) { source->index, source->name, source->driver, + source->flags & PA_SOURCE_HW_MUTE_CTRL ? "HW_MUTE_CTRL " : "", source->flags & PA_SOURCE_HW_VOLUME_CTRL ? "HW_VOLUME_CTRL " : "", + source->flags & PA_SOURCE_DECIBEL_VOLUME ? "DECIBEL_VOLUME " : "", source->flags & PA_SOURCE_LATENCY ? "LATENCY " : "", source->flags & PA_SOURCE_HARDWARE ? "HARDWARE " : "", source->flags & PA_SOURCE_NETWORK ? 
"NETWORK " : "", @@ -203,8 +221,10 @@ char *pa_source_list_to_string(pa_core *c) { pa_strbuf_printf(s, "\tmonitor_of: <%u>\n", source->monitor_of->index); if (source->module) pa_strbuf_printf(s, "\tmodule: <%u>\n", source->module->index); - if (source->description) - pa_strbuf_printf(s, "\tdescription: <%s>\n", source->description); + + t = pa_proplist_to_string(source->proplist); + pa_strbuf_printf(s, "\tproperties:\n%s", t); + pa_xfree(t); } return pa_strbuf_tostring_free(s); @@ -216,6 +236,7 @@ char *pa_source_output_list_to_string(pa_core *c) { pa_source_output *o; uint32_t idx = PA_IDXSET_INVALID; static const char* const state_table[] = { + [PA_SOURCE_OUTPUT_INIT] = "INIT", [PA_SOURCE_OUTPUT_RUNNING] = "RUNNING", [PA_SOURCE_OUTPUT_CORKED] = "CORKED", [PA_SOURCE_OUTPUT_UNLINKED] = "UNLINKED" @@ -227,16 +248,15 @@ char *pa_source_output_list_to_string(pa_core *c) { pa_strbuf_printf(s, "%u source outputs(s) available.\n", pa_idxset_size(c->source_outputs)); for (o = pa_idxset_first(c->source_outputs, &idx); o; o = pa_idxset_next(c->source_outputs, &idx)) { - char ss[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX]; + char ss[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX], *t; pa_assert(o->source); pa_strbuf_printf( s, " index: %u\n" - "\tname: '%s'\n" "\tdriver: <%s>\n" - "\tflags: %s%s%s%s%s%s%s\n" + "\tflags: %s%s%s%s%s%s%s%s\n" "\tstate: %s\n" "\tsource: <%u> '%s'\n" "\tlatency: <%0.0f usec>\n" @@ -244,10 +264,10 @@ char *pa_source_output_list_to_string(pa_core *c) { "\tchannel map: <%s>\n" "\tresample method: %s\n", o->index, - o->name, o->driver, o->flags & PA_SOURCE_OUTPUT_VARIABLE_RATE ? "VARIABLE_RATE " : "", o->flags & PA_SOURCE_OUTPUT_DONT_MOVE ? "DONT_MOVE " : "", + o->flags & PA_SOURCE_OUTPUT_START_CORKED ? "START_CORKED " : "", o->flags & PA_SOURCE_OUTPUT_NO_REMAP ? "NO_REMAP " : "", o->flags & PA_SOURCE_OUTPUT_NO_REMIX ? "NO_REMIX " : "", o->flags & PA_SOURCE_OUTPUT_FIX_FORMAT ? 
"FIX_FORMAT " : "", @@ -262,7 +282,11 @@ char *pa_source_output_list_to_string(pa_core *c) { if (o->module) pa_strbuf_printf(s, "\towner module: <%u>\n", o->module->index); if (o->client) - pa_strbuf_printf(s, "\tclient: <%u> '%s'\n", o->client->index, o->client->name); + pa_strbuf_printf(s, "\tclient: <%u> '%s'\n", o->client->index, pa_strnull(pa_proplist_gets(o->client->proplist, PA_PROP_APPLICATION_NAME))); + + t = pa_proplist_to_string(o->proplist); + pa_strbuf_printf(s, "\tproperties:\n%s", t); + pa_xfree(t); } return pa_strbuf_tostring_free(s); @@ -273,6 +297,7 @@ char *pa_sink_input_list_to_string(pa_core *c) { pa_sink_input *i; uint32_t idx = PA_IDXSET_INVALID; static const char* const state_table[] = { + [PA_SINK_INPUT_INIT] = "INIT", [PA_SINK_INPUT_RUNNING] = "RUNNING", [PA_SINK_INPUT_DRAINED] = "DRAINED", [PA_SINK_INPUT_CORKED] = "CORKED", @@ -285,16 +310,15 @@ char *pa_sink_input_list_to_string(pa_core *c) { pa_strbuf_printf(s, "%u sink input(s) available.\n", pa_idxset_size(c->sink_inputs)); for (i = pa_idxset_first(c->sink_inputs, &idx); i; i = pa_idxset_next(c->sink_inputs, &idx)) { - char ss[PA_SAMPLE_SPEC_SNPRINT_MAX], cv[PA_CVOLUME_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX]; + char ss[PA_SAMPLE_SPEC_SNPRINT_MAX], cv[PA_CVOLUME_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX], *t; pa_assert(i->sink); pa_strbuf_printf( s, " index: %u\n" - "\tname: <%s>\n" "\tdriver: <%s>\n" - "\tflags: %s%s%s%s%s%s%s\n" + "\tflags: %s%s%s%s%s%s%s%s\n" "\tstate: %s\n" "\tsink: <%u> '%s'\n" "\tvolume: <%s>\n" @@ -304,10 +328,10 @@ char *pa_sink_input_list_to_string(pa_core *c) { "\tchannel map: <%s>\n" "\tresample method: %s\n", i->index, - i->name, i->driver, i->flags & PA_SINK_INPUT_VARIABLE_RATE ? "VARIABLE_RATE " : "", i->flags & PA_SINK_INPUT_DONT_MOVE ? "DONT_MOVE " : "", + i->flags & PA_SINK_INPUT_START_CORKED ? "START_CORKED " : "", i->flags & PA_SINK_INPUT_NO_REMAP ? "NO_REMAP " : "", i->flags & PA_SINK_INPUT_NO_REMIX ? 
"NO_REMIX " : "", i->flags & PA_SINK_INPUT_FIX_FORMAT ? "FIX_FORMAT " : "", @@ -325,7 +349,11 @@ char *pa_sink_input_list_to_string(pa_core *c) { if (i->module) pa_strbuf_printf(s, "\tmodule: <%u>\n", i->module->index); if (i->client) - pa_strbuf_printf(s, "\tclient: <%u> '%s'\n", i->client->index, i->client->name); + pa_strbuf_printf(s, "\tclient: <%u> '%s'\n", i->client->index, pa_strnull(pa_proplist_gets(i->client->proplist, PA_PROP_APPLICATION_NAME))); + + t = pa_proplist_to_string(i->proplist); + pa_strbuf_printf(s, "\tproperties:\n%s", t); + pa_xfree(t); } return pa_strbuf_tostring_free(s); @@ -345,7 +373,7 @@ char *pa_scache_list_to_string(pa_core *c) { for (e = pa_idxset_first(c->scache, &idx); e; e = pa_idxset_next(c->scache, &idx)) { double l = 0; - char ss[PA_SAMPLE_SPEC_SNPRINT_MAX] = "n/a", cv[PA_CVOLUME_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX] = "n/a"; + char ss[PA_SAMPLE_SPEC_SNPRINT_MAX] = "n/a", cv[PA_CVOLUME_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX] = "n/a", *t; if (e->memchunk.memblock) { pa_sample_spec_snprint(ss, sizeof(ss), &e->sample_spec); @@ -371,8 +399,12 @@ char *pa_scache_list_to_string(pa_core *c) { (long unsigned)(e->memchunk.memblock ? e->memchunk.length : 0), l, pa_cvolume_snprint(cv, sizeof(cv), &e->volume), - e->lazy ? "yes" : "no", + pa_yes_no(e->lazy), e->filename ? e->filename : "n/a"); + + t = pa_proplist_to_string(e->proplist); + pa_strbuf_printf(s, "\tproperties:\n%s", t); + pa_xfree(t); } } diff --git a/src/pulsecore/cli.c b/src/pulsecore/cli.c index 85e08634..47712d30 100644 --- a/src/pulsecore/cli.c +++ b/src/pulsecore/cli.c @@ -82,7 +82,7 @@ pa_cli* pa_cli_new(pa_core *core, pa_iochannel *io, pa_module *m) { pa_assert_se(c->client = pa_client_new(core, __FILE__, cname)); c->client->kill = client_kill; c->client->userdata = c; - c->client->owner = m; + c->client->module = m; pa_ioline_set_callback(c->line, line_callback, c); pa_ioline_puts(c->line, "Welcome to PulseAudio! 
Use \"help\" for usage information.\n"PROMPT); diff --git a/src/pulsecore/client.c b/src/pulsecore/client.c index 319b8387..6a087d46 100644 --- a/src/pulsecore/client.c +++ b/src/pulsecore/client.c @@ -44,17 +44,19 @@ pa_client *pa_client_new(pa_core *core, const char *driver, const char *name) { pa_core_assert_ref(core); c = pa_xnew(pa_client, 1); - c->name = pa_xstrdup(name); - c->driver = pa_xstrdup(driver); - c->owner = NULL; c->core = core; + c->proplist = pa_proplist_new(); + if (name) + pa_proplist_sets(c->proplist, PA_PROP_APPLICATION_NAME, name); + c->driver = pa_xstrdup(driver); + c->module = NULL; c->kill = NULL; c->userdata = NULL; pa_assert_se(pa_idxset_put(core->clients, c, &c->index) >= 0); - pa_log_info("Created %u \"%s\"", c->index, c->name); + pa_log_info("Created %u \"%s\"", c->index, pa_strnull(name)); pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_NEW, c->index); pa_core_check_quit(core); @@ -70,9 +72,9 @@ void pa_client_free(pa_client *c) { pa_core_check_quit(c->core); - pa_log_info("Freed %u \"%s\"", c->index, c->name); + pa_log_info("Freed %u \"%s\"", c->index, pa_strnull(pa_proplist_gets(c->proplist, PA_PROP_APPLICATION_NAME))); pa_subscription_post(c->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_REMOVE, c->index); - pa_xfree(c->name); + pa_proplist_free(c->proplist); pa_xfree(c->driver); pa_xfree(c); } @@ -91,10 +93,7 @@ void pa_client_kill(pa_client *c) { void pa_client_set_name(pa_client *c, const char *name) { pa_assert(c); - pa_log_info("Client %u changed name from \"%s\" to \"%s\"", c->index, c->name, name); - - pa_xfree(c->name); - c->name = pa_xstrdup(name); - + pa_log_info("Client %u changed name from \"%s\" to \"%s\"", c->index, pa_strnull(pa_proplist_gets(c->proplist, PA_PROP_APPLICATION_NAME)), name); + pa_proplist_sets(c->proplist, PA_PROP_APPLICATION_NAME, name); pa_subscription_post(c->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->index); } diff --git 
a/src/pulsecore/client.h b/src/pulsecore/client.h index 6d09b999..bff057ed 100644 --- a/src/pulsecore/client.h +++ b/src/pulsecore/client.h @@ -28,6 +28,7 @@ typedef struct pa_client pa_client; +#include #include #include @@ -37,11 +38,12 @@ typedef struct pa_client pa_client; struct pa_client { uint32_t index; - - pa_module *owner; - char *name, *driver; pa_core *core; + pa_proplist *proplist; + pa_module *module; + char *driver; + void (*kill)(pa_client *c); void *userdata; }; diff --git a/src/pulsecore/core-scache.c b/src/pulsecore/core-scache.c index 46444a90..4036a55b 100644 --- a/src/pulsecore/core-scache.c +++ b/src/pulsecore/core-scache.c @@ -63,7 +63,7 @@ #include "core-scache.h" -#define UNLOAD_POLL_TIME 2 +#define UNLOAD_POLL_TIME 5 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) { pa_core *c = userdata; @@ -89,6 +89,8 @@ static void free_entry(pa_scache_entry *e) { pa_xfree(e->filename); if (e->memchunk.memblock) pa_memblock_unref(e->memchunk.memblock); + if (e->proplist) + pa_proplist_free(e->proplist); pa_xfree(e); } @@ -103,6 +105,7 @@ static pa_scache_entry* scache_add_item(pa_core *c, const char *name) { pa_memblock_unref(e->memchunk.memblock); pa_xfree(e->filename); + pa_proplist_clear(e->proplist); pa_assert(e->core == c); @@ -117,11 +120,10 @@ static pa_scache_entry* scache_add_item(pa_core *c, const char *name) { e->name = pa_xstrdup(name); e->core = c; + e->proplist = pa_proplist_new(); - if (!c->scache) { + if (!c->scache) c->scache = pa_idxset_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func); - pa_assert(c->scache); - } pa_idxset_put(c->scache, e, &e->index); @@ -132,7 +134,7 @@ static pa_scache_entry* scache_add_item(pa_core *c, const char *name) { e->memchunk.memblock = NULL; e->memchunk.index = e->memchunk.length = 0; e->filename = NULL; - e->lazy = 0; + e->lazy = FALSE; e->last_used_time = 0; memset(&e->sample_spec, 0, sizeof(e->sample_spec)); @@ 
-142,7 +144,7 @@ static pa_scache_entry* scache_add_item(pa_core *c, const char *name) { return e; } -int pa_scache_add_item(pa_core *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map, const pa_memchunk *chunk, uint32_t *idx) { +int pa_scache_add_item(pa_core *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map, const pa_memchunk *chunk, pa_proplist *p, uint32_t *idx) { pa_scache_entry *e; char st[PA_SAMPLE_SPEC_SNPRINT_MAX]; pa_channel_map tmap; @@ -178,6 +180,9 @@ int pa_scache_add_item(pa_core *c, const char *name, const pa_sample_spec *ss, c pa_memblock_ref(e->memchunk.memblock); } + if (p) + pa_proplist_update(e->proplist, PA_UPDATE_REPLACE, p); + if (idx) *idx = e->index; @@ -208,7 +213,7 @@ int pa_scache_add_file(pa_core *c, const char *name, const char *filename, uint3 if (pa_sound_file_load(c->mempool, filename, &ss, &map, &chunk) < 0) return -1; - r = pa_scache_add_item(c, name, &ss, &map, &chunk, idx); + r = pa_scache_add_item(c, name, &ss, &map, &chunk, NULL, idx); pa_memblock_unref(chunk.memblock); return r; @@ -231,7 +236,7 @@ int pa_scache_add_file_lazy(pa_core *c, const char *name, const char *filename, if (!(e = scache_add_item(c, name))) return -1; - e->lazy = 1; + e->lazy = TRUE; e->filename = pa_xstrdup(filename); if (!c->scache_auto_unload_event) { @@ -285,10 +290,11 @@ void pa_scache_free(pa_core *c) { c->mainloop->time_free(c->scache_auto_unload_event); } -int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t volume) { +int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t volume, pa_proplist *p, uint32_t *sink_input_idx) { pa_scache_entry *e; char *t; pa_cvolume r; + pa_proplist *merged; pa_assert(c); pa_assert(name); @@ -312,17 +318,24 @@ int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t pa_log_debug("Playing sample \"%s\" on \"%s\"", name, sink->name); - t = pa_sprintf_malloc("sample:%s", name); - 
pa_cvolume_set(&r, e->volume.channels, volume); pa_sw_cvolume_multiply(&r, &r, &e->volume); - if (pa_play_memchunk(sink, t, &e->sample_spec, &e->channel_map, &e->memchunk, &r) < 0) { - pa_xfree(t); + merged = pa_proplist_new(); + + t = pa_sprintf_malloc("sample:%s", name); + pa_proplist_sets(merged, PA_PROP_MEDIA_NAME, t); + pa_xfree(t); + + pa_proplist_update(merged, PA_UPDATE_REPLACE, e->proplist); + pa_proplist_update(merged, PA_UPDATE_REPLACE, p); + + if (pa_play_memchunk(sink, &e->sample_spec, &e->channel_map, &e->memchunk, &r, merged, sink_input_idx) < 0) { + pa_proplist_free(merged); return -1; } - pa_xfree(t); + pa_proplist_free(merged); if (e->lazy) time(&e->last_used_time); @@ -330,7 +343,7 @@ int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t return 0; } -int pa_scache_play_item_by_name(pa_core *c, const char *name, const char*sink_name, pa_volume_t volume, int autoload) { +int pa_scache_play_item_by_name(pa_core *c, const char *name, const char*sink_name, pa_bool_t autoload, pa_volume_t volume, pa_proplist *p, uint32_t *sink_input_idx) { pa_sink *sink; pa_assert(c); @@ -339,10 +352,10 @@ int pa_scache_play_item_by_name(pa_core *c, const char *name, const char*sink_na if (!(sink = pa_namereg_get(c, sink_name, PA_NAMEREG_SINK, autoload))) return -1; - return pa_scache_play_item(c, name, sink, volume); + return pa_scache_play_item(c, name, sink, volume, p, sink_input_idx); } -const char * pa_scache_get_name_by_id(pa_core *c, uint32_t id) { +const char *pa_scache_get_name_by_id(pa_core *c, uint32_t id) { pa_scache_entry *e; pa_assert(c); @@ -366,9 +379,10 @@ uint32_t pa_scache_get_id_by_name(pa_core *c, const char *name) { return e->index; } -uint32_t pa_scache_total_size(pa_core *c) { +size_t pa_scache_total_size(pa_core *c) { pa_scache_entry *e; - uint32_t idx, sum = 0; + uint32_t idx; + size_t sum = 0; pa_assert(c); @@ -403,8 +417,7 @@ void pa_scache_unload_unused(pa_core *c) { continue; 
pa_memblock_unref(e->memchunk.memblock); - e->memchunk.memblock = NULL; - e->memchunk.index = e->memchunk.length = 0; + pa_memchunk_reset(&e->memchunk); pa_subscription_post(c, PA_SUBSCRIPTION_EVENT_SAMPLE_CACHE|PA_SUBSCRIPTION_EVENT_CHANGE, e->index); } @@ -467,8 +480,9 @@ int pa_scache_add_directory_lazy(pa_core *c, const char *pathname) { pa_snprintf(p, sizeof(p), "%s/%s", pathname, e->d_name); add_file(c, p); } + + closedir(dir); } - closedir(dir); return 0; } diff --git a/src/pulsecore/core-scache.h b/src/pulsecore/core-scache.h index ab7ec0ef..31f3ff32 100644 --- a/src/pulsecore/core-scache.h +++ b/src/pulsecore/core-scache.h @@ -29,11 +29,12 @@ #include #include -#define PA_SCACHE_ENTRY_SIZE_MAX (1024*1024*2) +#define PA_SCACHE_ENTRY_SIZE_MAX (1024*1024*16) typedef struct pa_scache_entry { - pa_core *core; uint32_t index; + pa_core *core; + char *name; pa_cvolume volume; @@ -43,25 +44,27 @@ typedef struct pa_scache_entry { char *filename; - int lazy; + pa_bool_t lazy; time_t last_used_time; + + pa_proplist *proplist; } pa_scache_entry; -int pa_scache_add_item(pa_core *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map, const pa_memchunk *chunk, uint32_t *idx); +int pa_scache_add_item(pa_core *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map, const pa_memchunk *chunk, pa_proplist *p, uint32_t *idx); int pa_scache_add_file(pa_core *c, const char *name, const char *filename, uint32_t *idx); int pa_scache_add_file_lazy(pa_core *c, const char *name, const char *filename, uint32_t *idx); int pa_scache_add_directory_lazy(pa_core *c, const char *pathname); int pa_scache_remove_item(pa_core *c, const char *name); -int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t volume); -int pa_scache_play_item_by_name(pa_core *c, const char *name, const char*sink_name, pa_volume_t volume, int autoload); +int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t volume, pa_proplist 
*p, uint32_t *sink_input_idx); +int pa_scache_play_item_by_name(pa_core *c, const char *name, const char*sink_name, pa_bool_t autoload, pa_volume_t volume, pa_proplist *p, uint32_t *sink_input_idx); void pa_scache_free(pa_core *c); const char *pa_scache_get_name_by_id(pa_core *c, uint32_t id); uint32_t pa_scache_get_id_by_name(pa_core *c, const char *name); -uint32_t pa_scache_total_size(pa_core *c); +size_t pa_scache_total_size(pa_core *c); void pa_scache_unload_unused(pa_core *c); diff --git a/src/pulsecore/core.h b/src/pulsecore/core.h index ce45e300..6be1a0c5 100644 --- a/src/pulsecore/core.h +++ b/src/pulsecore/core.h @@ -43,16 +43,20 @@ typedef struct pa_core pa_core; #include typedef enum pa_core_hook { - PA_CORE_HOOK_SINK_NEW_POST, + PA_CORE_HOOK_SINK_NEW, + PA_CORE_HOOK_SINK_FIXATE, + PA_CORE_HOOK_SINK_PUT, PA_CORE_HOOK_SINK_UNLINK, PA_CORE_HOOK_SINK_UNLINK_POST, PA_CORE_HOOK_SINK_STATE_CHANGED, - PA_CORE_HOOK_SINK_DESCRIPTION_CHANGED, - PA_CORE_HOOK_SOURCE_NEW_POST, + PA_CORE_HOOK_SINK_PROPLIST_CHANGED, + PA_CORE_HOOK_SOURCE_NEW, + PA_CORE_HOOK_SOURCE_FIXATE, + PA_CORE_HOOK_SOURCE_PUT, PA_CORE_HOOK_SOURCE_UNLINK, PA_CORE_HOOK_SOURCE_UNLINK_POST, PA_CORE_HOOK_SOURCE_STATE_CHANGED, - PA_CORE_HOOK_SOURCE_DESCRIPTION_CHANGED, + PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED, PA_CORE_HOOK_SINK_INPUT_NEW, PA_CORE_HOOK_SINK_INPUT_FIXATE, PA_CORE_HOOK_SINK_INPUT_PUT, @@ -60,8 +64,8 @@ typedef enum pa_core_hook { PA_CORE_HOOK_SINK_INPUT_UNLINK_POST, PA_CORE_HOOK_SINK_INPUT_MOVE, PA_CORE_HOOK_SINK_INPUT_MOVE_POST, - PA_CORE_HOOK_SINK_INPUT_NAME_CHANGED, PA_CORE_HOOK_SINK_INPUT_STATE_CHANGED, + PA_CORE_HOOK_SINK_INPUT_PROPLIST_CHANGED, PA_CORE_HOOK_SOURCE_OUTPUT_NEW, PA_CORE_HOOK_SOURCE_OUTPUT_FIXATE, PA_CORE_HOOK_SOURCE_OUTPUT_PUT, @@ -69,8 +73,8 @@ typedef enum pa_core_hook { PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK_POST, PA_CORE_HOOK_SOURCE_OUTPUT_MOVE, PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_POST, - PA_CORE_HOOK_SOURCE_OUTPUT_NAME_CHANGED, PA_CORE_HOOK_SOURCE_OUTPUT_STATE_CHANGED, + 
PA_CORE_HOOK_SOURCE_OUTPUT_PROPLIST_CHANGED, PA_CORE_HOOK_MAX } pa_core_hook_t; diff --git a/src/pulsecore/envelope.c b/src/pulsecore/envelope.c index 571f8754..2f5da5a0 100644 --- a/src/pulsecore/envelope.c +++ b/src/pulsecore/envelope.c @@ -381,7 +381,7 @@ static void envelope_merge(pa_envelope *e, int v) { break; if (e->points[v].n_points >= e->points[v].n_allocated) { - e->points[v].n_allocated = MAX(e->points[v].n_points*2, PA_ENVELOPE_POINTS_MAX); + e->points[v].n_allocated = PA_MAX(e->points[v].n_points*2, PA_ENVELOPE_POINTS_MAX); e->points[v].x = pa_xrealloc(e->points[v].x, sizeof(size_t) * e->points[v].n_allocated); e->points[v].y.i = pa_xrealloc(e->points[v].y.i, sizeof(int32_t) * e->points[v].n_allocated); diff --git a/src/pulsecore/envelope.h b/src/pulsecore/envelope.h index 23be8f6a..c54c137a 100644 --- a/src/pulsecore/envelope.h +++ b/src/pulsecore/envelope.h @@ -29,7 +29,7 @@ #include -#define PA_ENVELOPE_POINTS_MAX 4 +#define PA_ENVELOPE_POINTS_MAX 4U typedef struct pa_envelope pa_envelope; typedef struct pa_envelope_item pa_envelope_item; diff --git a/src/pulsecore/fdsem.c b/src/pulsecore/fdsem.c index 59eec18e..22d2a850 100644 --- a/src/pulsecore/fdsem.c +++ b/src/pulsecore/fdsem.c @@ -78,21 +78,19 @@ struct pa_fdsem { #ifdef HAVE_EVENTFD int efd; #endif - pa_atomic_t waiting; - pa_atomic_t signalled; - pa_atomic_t in_pipe; + + pa_fdsem_data *data; }; pa_fdsem *pa_fdsem_new(void) { pa_fdsem *f; - f = pa_xnew(pa_fdsem, 1); + f = pa_xmalloc(PA_ALIGN(sizeof(pa_fdsem)) + PA_ALIGN(sizeof(pa_fdsem_data))); #ifdef HAVE_EVENTFD if ((f->efd = eventfd(0)) >= 0) { pa_make_fd_cloexec(f->efd); f->fds[0] = f->fds[1] = -1; - } else #endif { @@ -105,9 +103,57 @@ pa_fdsem *pa_fdsem_new(void) { pa_make_fd_cloexec(f->fds[1]); } - pa_atomic_store(&f->waiting, 0); - pa_atomic_store(&f->signalled, 0); - pa_atomic_store(&f->in_pipe, 0); + f->data = (pa_fdsem_data*) ((uint8_t*) f + PA_ALIGN(sizeof(pa_fdsem))); + + pa_atomic_store(&f->data->waiting, 0); + 
pa_atomic_store(&f->data->signalled, 0); + pa_atomic_store(&f->data->in_pipe, 0); + + return f; +} + +pa_fdsem *pa_fdsem_open_shm(pa_fdsem_data *data, int event_fd) { + pa_fdsem *f = NULL; + + pa_assert(data); + pa_assert(event_fd >= 0); + +#ifdef HAVE_EVENTFD + f = pa_xnew(pa_fdsem, 1); + + f->efd = event_fd; + pa_make_fd_cloexec(f->efd); + f->fds[0] = f->fds[1] = -1; + f->data = data; +#endif + + return f; +} + +pa_fdsem *pa_fdsem_new_shm(pa_fdsem_data *data, int* event_fd) { + pa_fdsem *f = NULL; + + pa_assert(data); + pa_assert(event_fd); + +#ifdef HAVE_EVENTFD + + f = pa_xnew(pa_fdsem, 1); + + if ((f->efd = eventfd(0)) < 0) { + pa_xfree(f); + return NULL; + } + + pa_make_fd_cloexec(f->efd); + f->fds[0] = f->fds[1] = -1; + f->data = data; + + pa_atomic_store(&f->data->waiting, 0); + pa_atomic_store(&f->data->signalled, 0); + pa_atomic_store(&f->data->in_pipe, 0); + +#endif return f; } @@ -128,7 +174,7 @@ static void flush(pa_fdsem *f) { ssize_t r; pa_assert(f); - if (pa_atomic_load(&f->in_pipe) <= 0) + if (pa_atomic_load(&f->data->in_pipe) <= 0) return; do { @@ -151,19 +197,19 @@ static void flush(pa_fdsem *f) { continue; } - } while (pa_atomic_sub(&f->in_pipe, r) > r); + } while (pa_atomic_sub(&f->data->in_pipe, r) > r); } void pa_fdsem_post(pa_fdsem *f) { pa_assert(f); - if (pa_atomic_cmpxchg(&f->signalled, 0, 1)) { + if (pa_atomic_cmpxchg(&f->data->signalled, 0, 1)) { - if (pa_atomic_load(&f->waiting)) { + if (pa_atomic_load(&f->data->waiting)) { ssize_t r; char x = 'x'; - pa_atomic_inc(&f->in_pipe); + pa_atomic_inc(&f->data->in_pipe); for (;;) { @@ -194,12 +240,12 @@ void pa_fdsem_wait(pa_fdsem *f) { flush(f); - if (pa_atomic_cmpxchg(&f->signalled, 1, 0)) + if (pa_atomic_cmpxchg(&f->data->signalled, 1, 0)) return; - pa_atomic_inc(&f->waiting); + pa_atomic_inc(&f->data->waiting); - while (!pa_atomic_cmpxchg(&f->signalled, 1, 0)) { + while (!pa_atomic_cmpxchg(&f->data->signalled, 1, 0)) { char x[10]; ssize_t r; @@ -221,10 +267,10 @@ void 
pa_fdsem_wait(pa_fdsem *f) { continue; } - pa_atomic_sub(&f->in_pipe, r); + pa_atomic_sub(&f->data->in_pipe, r); } - pa_assert_se(pa_atomic_dec(&f->waiting) >= 1); + pa_assert_se(pa_atomic_dec(&f->data->waiting) >= 1); } int pa_fdsem_try(pa_fdsem *f) { @@ -232,7 +278,7 @@ int pa_fdsem_try(pa_fdsem *f) { flush(f); - if (pa_atomic_cmpxchg(&f->signalled, 1, 0)) + if (pa_atomic_cmpxchg(&f->data->signalled, 1, 0)) return 1; return 0; @@ -254,13 +300,13 @@ int pa_fdsem_before_poll(pa_fdsem *f) { flush(f); - if (pa_atomic_cmpxchg(&f->signalled, 1, 0)) + if (pa_atomic_cmpxchg(&f->data->signalled, 1, 0)) return -1; - pa_atomic_inc(&f->waiting); + pa_atomic_inc(&f->data->waiting); - if (pa_atomic_cmpxchg(&f->signalled, 1, 0)) { - pa_assert_se(pa_atomic_dec(&f->waiting) >= 1); + if (pa_atomic_cmpxchg(&f->data->signalled, 1, 0)) { + pa_assert_se(pa_atomic_dec(&f->data->waiting) >= 1); return -1; } return 0; @@ -269,11 +315,11 @@ int pa_fdsem_before_poll(pa_fdsem *f) { int pa_fdsem_after_poll(pa_fdsem *f) { pa_assert(f); - pa_assert_se(pa_atomic_dec(&f->waiting) >= 1); + pa_assert_se(pa_atomic_dec(&f->data->waiting) >= 1); flush(f); - if (pa_atomic_cmpxchg(&f->signalled, 1, 0)) + if (pa_atomic_cmpxchg(&f->data->signalled, 1, 0)) return 1; return 0; diff --git a/src/pulsecore/fdsem.h b/src/pulsecore/fdsem.h index f38ef205..f4f7b99a 100644 --- a/src/pulsecore/fdsem.h +++ b/src/pulsecore/fdsem.h @@ -33,7 +33,15 @@ typedef struct pa_fdsem pa_fdsem; +typedef struct pa_fdsem_data { + pa_atomic_t waiting; + pa_atomic_t signalled; + pa_atomic_t in_pipe; +} pa_fdsem_data; + pa_fdsem *pa_fdsem_new(void); +pa_fdsem *pa_fdsem_open_shm(pa_fdsem_data *data, int event_fd); +pa_fdsem *pa_fdsem_new_shm(pa_fdsem_data *data, int* event_fd); void pa_fdsem_free(pa_fdsem *f); void pa_fdsem_post(pa_fdsem *f); diff --git a/src/pulsecore/macro.h b/src/pulsecore/macro.h index 41af19c9..60ab025c 100644 --- a/src/pulsecore/macro.h +++ b/src/pulsecore/macro.h @@ -65,19 +65,53 @@ static inline size_t 
pa_page_align(size_t l) { #define PA_ELEMENTSOF(x) (sizeof(x)/sizeof((x)[0])) -#ifndef MAX -#define MAX(a, b) ((a) > (b) ? (a) : (b)) +/* The users of PA_MIN and PA_MAX should be aware that these macros on + * non-GCC executed code with side effects twice. It is thus + * considered misuse to use code with side effects as arguments to MIN + * and MAX. */ + +#ifdef __GNUC__ +#define PA_MAX(a,b) \ + __extension__ ({ typeof(a) _a = (a); \ + typeof(b) _b = (b); \ + _a > _b ? _a : _b; \ + }) +#else +#define PA_MAX(a, b) ((a) > (b) ? (a) : (b)) #endif -#ifndef MIN -#define MIN(a, b) ((a) < (b) ? (a) : (b)) +#ifdef __GNUC__ +#define PA_MIN(a,b) \ + __extension__ ({ typeof(a) _a = (a); \ + typeof(b) _b = (b); \ + _a < _b ? _a : _b; \ + }) +#else +#define PA_MIN(a, b) ((a) < (b) ? (a) : (b)) #endif -#ifndef CLAMP -#define CLAMP(x, low, high) (((x) > (high)) ? (high) : (((x) < (low)) ? (low) : (x))) +#ifdef __GNUC__ +#define PA_CLAMP(x, low, high) \ + __extension__ ({ typeof(x) _x = (x); \ + typeof(low) _low = (low); \ + typeof(high) _high = (high); \ + ((_x > _high) ? _high : ((_x < _low) ? _low : _x)); \ + }) +#else +#define PA_CLAMP(x, low, high) (((x) > (high)) ? (high) : (((x) < (low)) ? (low) : (x))) #endif +#ifdef __GNUC__ +#define PA_CLAMP_UNLIKELY(x, low, high) \ + __extension__ ({ typeof(x) _x = (x); \ + typeof(low) _low = (low); \ + typeof(high) _high = (high); \ + (PA_UNLIKELY(_x > _high) ? _high : (PA_UNLIKELY(_x < _low) ? _low : _x)); \ + }) +#else #define PA_CLAMP_UNLIKELY(x, low, high) (PA_UNLIKELY((x) > (high)) ? (high) : (PA_UNLIKELY((x) < (low)) ? 
(low) : (x))) +#endif + /* We don't define a PA_CLAMP_LIKELY here, because it doesn't really * make sense: we cannot know if it is more likely that the values is * lower or greater than the boundaries.*/ diff --git a/src/pulsecore/mcalign.c b/src/pulsecore/mcalign.c index 8ca7c962..e12f84f8 100644 --- a/src/pulsecore/mcalign.c +++ b/src/pulsecore/mcalign.c @@ -197,7 +197,6 @@ int pa_mcalign_pop(pa_mcalign *m, pa_memchunk *c) { /* There's simply nothing */ return -1; - } size_t pa_mcalign_csize(pa_mcalign *m, size_t l) { @@ -211,3 +210,11 @@ size_t pa_mcalign_csize(pa_mcalign *m, size_t l) { return (l/m->base)*m->base; } + +void pa_mcalign_flush(pa_mcalign *m) { + pa_memchunk chunk; + pa_assert(m); + + while (pa_mcalign_pop(m, &chunk) >= 0) + pa_memblock_unref(chunk.memblock); +} diff --git a/src/pulsecore/mcalign.h b/src/pulsecore/mcalign.h index 6ff8f94e..6c8b8d5f 100644 --- a/src/pulsecore/mcalign.h +++ b/src/pulsecore/mcalign.h @@ -79,4 +79,7 @@ int pa_mcalign_pop(pa_mcalign *m, pa_memchunk *c); /* If we pass l bytes in now, how many bytes would we get out? 
*/ size_t pa_mcalign_csize(pa_mcalign *m, size_t l); +/* Flush what's still stored in the aligner */ +void pa_mcalign_flush(pa_mcalign *m); + #endif diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c index 99b5a13f..b76980ff 100644 --- a/src/pulsecore/memblock.c +++ b/src/pulsecore/memblock.c @@ -59,7 +59,7 @@ struct pa_memblock { pa_mempool *pool; pa_memblock_type_t type; - int read_only; /* boolean */ + pa_bool_t read_only, is_silence; pa_atomic_ptr_t data; size_t length; @@ -226,7 +226,8 @@ static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) { PA_REFCNT_INIT(b); b->pool = p; b->type = PA_MEMBLOCK_APPENDED; - b->read_only = 0; + b->read_only = FALSE; + b->is_silence = FALSE; pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock))); b->length = length; pa_atomic_store(&b->n_acquired, 0); @@ -330,7 +331,8 @@ pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) { PA_REFCNT_INIT(b); b->pool = p; - b->read_only = 0; + b->read_only = FALSE; + b->is_silence = FALSE; b->length = length; pa_atomic_store(&b->n_acquired, 0); pa_atomic_store(&b->please_signal, 0); @@ -340,7 +342,7 @@ pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) { } /* No lock necessary */ -pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) { +pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, pa_bool_t read_only) { pa_memblock *b; pa_assert(p); @@ -354,6 +356,7 @@ pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int re b->pool = p; b->type = PA_MEMBLOCK_FIXED; b->read_only = read_only; + b->is_silence = FALSE; pa_atomic_ptr_store(&b->data, d); b->length = length; pa_atomic_store(&b->n_acquired, 0); @@ -364,7 +367,7 @@ pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int re } /* No lock necessary */ -pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) { 
+pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, pa_free_cb_t free_cb, pa_bool_t read_only) { pa_memblock *b; pa_assert(p); @@ -379,6 +382,7 @@ pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (* b->pool = p; b->type = PA_MEMBLOCK_USER; b->read_only = read_only; + b->is_silence = FALSE; pa_atomic_ptr_store(&b->data, d); b->length = length; pa_atomic_store(&b->n_acquired, 0); @@ -391,7 +395,7 @@ pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (* } /* No lock necessary */ -int pa_memblock_is_read_only(pa_memblock *b) { +pa_bool_t pa_memblock_is_read_only(pa_memblock *b) { pa_assert(b); pa_assert(PA_REFCNT_VALUE(b) > 0); @@ -399,13 +403,28 @@ int pa_memblock_is_read_only(pa_memblock *b) { } /* No lock necessary */ -int pa_memblock_ref_is_one(pa_memblock *b) { +pa_bool_t pa_memblock_is_silence(pa_memblock *b) { + pa_assert(b); + pa_assert(PA_REFCNT_VALUE(b) > 0); + + return b->is_silence; +} + +/* No lock necessary */ +void pa_memblock_set_is_silence(pa_memblock *b, pa_bool_t v) { + pa_assert(b); + pa_assert(PA_REFCNT_VALUE(b) > 0); + + b->is_silence = v; +} + +/* No lock necessary */ +pa_bool_t pa_memblock_ref_is_one(pa_memblock *b) { int r; pa_assert(b); - r = PA_REFCNT_VALUE(b); - pa_assert(r > 0); + pa_assert_se((r = PA_REFCNT_VALUE(b)) > 0); return r == 1; } @@ -767,7 +786,7 @@ int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) { } /* No lock necessary */ -int pa_mempool_is_shared(pa_mempool *p) { +pa_bool_t pa_mempool_is_shared(pa_mempool *p) { pa_assert(p); return !!p->memory.shared; diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h index c704014a..338d1a13 100644 --- a/src/pulsecore/memblock.h +++ b/src/pulsecore/memblock.h @@ -87,13 +87,13 @@ pa_memblock *pa_memblock_new(pa_mempool *, size_t length); pa_memblock *pa_memblock_new_pool(pa_mempool *, size_t length); /* Allocate a new memory block of type PA_MEMBLOCK_USER */ -pa_memblock 
*pa_memblock_new_user(pa_mempool *, void *data, size_t length, void (*free_cb)(void *p), int read_only); +pa_memblock *pa_memblock_new_user(pa_mempool *, void *data, size_t length, pa_free_cb_t free_cb, pa_bool_t read_only); /* A special case of pa_memblock_new_user: take a memory buffer previously allocated with pa_xmalloc() */ #define pa_memblock_new_malloced(p,data,length) pa_memblock_new_user(p, data, length, pa_xfree, 0) /* Allocate a new memory block of type PA_MEMBLOCK_FIXED */ -pa_memblock *pa_memblock_new_fixed(pa_mempool *, void *data, size_t length, int read_only); +pa_memblock *pa_memblock_new_fixed(pa_mempool *, void *data, size_t length, pa_bool_t read_only); void pa_memblock_unref(pa_memblock*b); pa_memblock* pa_memblock_ref(pa_memblock*b); @@ -106,8 +106,11 @@ function is not multiple caller safe, i.e. needs to be locked manually if called from more than one thread at the same time. */ void pa_memblock_unref_fixed(pa_memblock*b); -int pa_memblock_is_read_only(pa_memblock *b); -int pa_memblock_ref_is_one(pa_memblock *b); +pa_bool_t pa_memblock_is_read_only(pa_memblock *b); +pa_bool_t pa_memblock_is_silence(pa_memblock *b); +pa_bool_t pa_memblock_ref_is_one(pa_memblock *b); +void pa_memblock_set_is_silence(pa_memblock *b, pa_bool_t v); + void* pa_memblock_acquire(pa_memblock *b); void pa_memblock_release(pa_memblock *b); size_t pa_memblock_get_length(pa_memblock *b); @@ -121,7 +124,7 @@ void pa_mempool_free(pa_mempool *p); const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p); void pa_mempool_vacuum(pa_mempool *p); int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id); -int pa_mempool_is_shared(pa_mempool *p); +pa_bool_t pa_mempool_is_shared(pa_mempool *p); size_t pa_mempool_block_size_max(pa_mempool *p); /* For recieving blocks from other nodes */ diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c index 8247feab..cc5e9e13 100644 --- a/src/pulsecore/memblockq.c +++ b/src/pulsecore/memblockq.c @@ -50,8 +50,9 @@ 
PA_STATIC_FLIST_DECLARE(list_items, 0, pa_xfree); struct pa_memblockq { struct list_item *blocks, *blocks_tail; + struct list_item *current_read, *current_write; unsigned n_blocks; - size_t maxlength, tlength, base, prebuf, minreq; + size_t maxlength, tlength, base, prebuf, minreq, maxrewind; int64_t read_index, write_index; pa_bool_t in_prebuf; pa_memblock *silence; @@ -67,6 +68,7 @@ pa_memblockq* pa_memblockq_new( size_t base, size_t prebuf, size_t minreq, + size_t maxrewind, pa_memblock *silence) { pa_memblockq* bq; @@ -75,27 +77,29 @@ pa_memblockq* pa_memblockq_new( bq = pa_xnew(pa_memblockq, 1); bq->blocks = bq->blocks_tail = NULL; + bq->current_read = bq->current_write = NULL; bq->n_blocks = 0; bq->base = base; bq->read_index = bq->write_index = idx; - pa_log_debug("memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu", - (unsigned long) maxlength, (unsigned long) tlength, (unsigned long) base, (unsigned long) prebuf, (unsigned long) minreq); + pa_log_debug("memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu maxrewind=%lu", + (unsigned long) maxlength, (unsigned long) tlength, (unsigned long) base, (unsigned long) prebuf, (unsigned long) minreq, (unsigned long) maxrewind); - bq->missing = bq->requested = bq->maxlength = bq->tlength = bq->prebuf = bq->minreq = 0; + bq->missing = bq->requested = bq->maxlength = bq->tlength = bq->prebuf = bq->minreq = bq->maxrewind = 0; bq->in_prebuf = TRUE; pa_memblockq_set_maxlength(bq, maxlength); pa_memblockq_set_tlength(bq, tlength); pa_memblockq_set_prebuf(bq, prebuf); pa_memblockq_set_minreq(bq, minreq); + pa_memblockq_set_maxrewind(bq, maxrewind); - pa_log_debug("memblockq sanitized: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu", - (unsigned long)bq->maxlength, (unsigned long)bq->tlength, (unsigned long)bq->base, (unsigned long)bq->prebuf, (unsigned long)bq->minreq); + pa_log_debug("memblockq sanitized: maxlength=%lu, tlength=%lu, base=%lu, 
prebuf=%lu, minreq=%lu maxrewind=%lu", + (unsigned long) bq->maxlength, (unsigned long) bq->tlength, (unsigned long) bq->base, (unsigned long) bq->prebuf, (unsigned long) bq->minreq, (unsigned long) bq->maxrewind); bq->silence = silence ? pa_memblock_ref(silence) : NULL; - bq->mcalign = NULL; + bq->mcalign = pa_mcalign_new(bq->base); return bq; } @@ -114,6 +118,62 @@ void pa_memblockq_free(pa_memblockq* bq) { pa_xfree(bq); } +static void fix_current_read(pa_memblockq *bq) { + pa_assert(bq); + + if (PA_UNLIKELY(!bq->blocks)) { + bq->current_read = NULL; + return; + } + + if (PA_UNLIKELY(!bq->current_read)) + bq->current_read = bq->blocks; + + /* Scan left */ + while (PA_UNLIKELY(bq->current_read->index > bq->read_index)) + + if (bq->current_read->prev) + bq->current_read = bq->current_read->prev; + else + break; + + /* Scan right */ + while (PA_LIKELY(bq->current_read != NULL) && PA_UNLIKELY(bq->current_read->index + bq->current_read->chunk.length <= bq->read_index)) + bq->current_read = bq->current_read->next; + + /* At this point current_read will either point at or left of the + next block to play. It may be NULL in case everything in + the queue was already played */ +} + +static void fix_current_write(pa_memblockq *bq) { + pa_assert(bq); + + if (PA_UNLIKELY(!bq->blocks)) { + bq->current_write = NULL; + return; + } + + if (PA_UNLIKELY(!bq->current_write)) + bq->current_write = bq->blocks_tail; + + /* Scan right */ + while (PA_UNLIKELY(bq->current_write->index + bq->current_write->chunk.length <= bq->write_index)) + + if (bq->current_write->next) + bq->current_write = bq->current_write->next; + else + break; + + /* Scan left */ + while (PA_LIKELY(bq->current_write != NULL) && PA_UNLIKELY(bq->current_write->index > bq->write_index)) + bq->current_write = bq->current_write->prev; + + /* At this point current_write will either point at or right of + the next block to write data to. 
It may be NULL in case + everything in the queue is still to be played */ +} + static void drop_block(pa_memblockq *bq, struct list_item *q) { pa_assert(bq); pa_assert(q); @@ -122,13 +182,23 @@ static void drop_block(pa_memblockq *bq, struct list_item *q) { if (q->prev) q->prev->next = q->next; - else + else { + pa_assert(bq->blocks == q); bq->blocks = q->next; + } if (q->next) q->next->prev = q->prev; - else + else { + pa_assert(bq->blocks_tail == q); bq->blocks_tail = q->prev; + } + + if (bq->current_write == q) + bq->current_write = q->prev; + + if (bq->current_read == q) + bq->current_read = q->next; pa_memblock_unref(q->chunk.memblock); @@ -138,6 +208,16 @@ static void drop_block(pa_memblockq *bq, struct list_item *q) { bq->n_blocks--; } +static void drop_backlog(pa_memblockq *bq) { + int64_t boundary; + pa_assert(bq); + + boundary = bq->read_index - bq->maxrewind; + + while (bq->blocks && (bq->blocks->index + bq->blocks->chunk.length <= boundary)) + drop_block(bq, bq->blocks); +} + static pa_bool_t can_push(pa_memblockq *bq, size_t l) { int64_t end; @@ -152,10 +232,10 @@ static pa_bool_t can_push(pa_memblockq *bq, size_t l) { return TRUE; } - end = bq->blocks_tail ? bq->blocks_tail->index + bq->blocks_tail->chunk.length : 0; + end = bq->blocks_tail ? 
bq->blocks_tail->index + bq->blocks_tail->chunk.length : bq->write_index; /* Make sure that the list doesn't get too long */ - if (bq->write_index + (int64_t)l > end) + if (bq->write_index + l > end) if (bq->write_index + l - bq->read_index > bq->maxlength) return FALSE; @@ -182,34 +262,32 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { old = bq->write_index; chunk = *uchunk; - if (bq->read_index > bq->write_index) { + fix_current_write(bq); + q = bq->current_write; - /* We currently have a buffer underflow, we need to drop some - * incoming data */ + /* First we advance the q pointer right of where we want to + * write to */ - size_t d = bq->read_index - bq->write_index; - - if (chunk.length > d) { - chunk.index += d; - chunk.length -= d; - bq->write_index += d; - } else { - /* We drop the incoming data completely */ - bq->write_index += chunk.length; - goto finish; - } + if (q) { + while (bq->write_index + chunk.length > q->index) + if (q->next) + q = q->next; + else + break; } + if (!q) + q = bq->blocks_tail; + /* We go from back to front to look for the right place to add * this new entry. 
Drop data we will overwrite on the way */ - q = bq->blocks_tail; while (q) { - if (bq->write_index >= q->index + (int64_t) q->chunk.length) + if (bq->write_index >= q->index + q->chunk.length) /* We found the entry where we need to place the new entry immediately after */ break; - else if (bq->write_index + (int64_t) chunk.length <= q->index) { + else if (bq->write_index + chunk.length <= q->index) { /* This entry isn't touched at all, let's skip it */ q = q->prev; } else if (bq->write_index <= q->index && @@ -364,6 +442,7 @@ static pa_bool_t memblockq_check_prebuf(pa_memblockq *bq) { } int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) { + int64_t d; pa_assert(bq); pa_assert(chunk); @@ -371,27 +450,35 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) { if (memblockq_check_prebuf(bq)) return -1; + fix_current_read(bq); + /* Do we need to spit out silence? */ - if (!bq->blocks || bq->blocks->index > bq->read_index) { + if (!bq->current_read || bq->current_read->index > bq->read_index) { size_t length; /* How much silence shall we return? */ - length = bq->blocks ? bq->blocks->index - bq->read_index : 0; + if (bq->current_read) + length = bq->current_read->index - bq->read_index; + else if (bq->write_index > bq->read_index) + length = (size_t) (bq->write_index - bq->read_index); + else + length = 0; /* We need to return silence, since no data is yet available */ if (bq->silence) { + size_t l; + chunk->memblock = pa_memblock_ref(bq->silence); - if (!length || length > pa_memblock_get_length(chunk->memblock)) - length = pa_memblock_get_length(chunk->memblock); + l = pa_memblock_get_length(chunk->memblock); + chunk->length = (length <= 0 || length > l) ? 
l : length; - chunk->length = length; } else { /* If the memblockq is empty, return -1, otherwise return * the time to sleep */ - if (!bq->blocks) + if (length <= 0) return -1; chunk->memblock = NULL; @@ -403,11 +490,14 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) { } /* Ok, let's pass real data to the caller */ - pa_assert(bq->blocks->index == bq->read_index); - - *chunk = bq->blocks->chunk; + *chunk = bq->current_read->chunk; pa_memblock_ref(chunk->memblock); + pa_assert(bq->read_index >= bq->current_read->index); + d = bq->read_index - bq->current_read->index; + chunk->index += d; + chunk->length -= d; + return 0; } @@ -424,42 +514,23 @@ void pa_memblockq_drop(pa_memblockq *bq, size_t length) { if (memblockq_check_prebuf(bq)) break; - if (bq->blocks) { - size_t d; - - pa_assert(bq->blocks->index >= bq->read_index); - - d = (size_t) (bq->blocks->index - bq->read_index); - - if (d >= length) { - /* The first block is too far in the future */ - - bq->read_index += length; - break; - } else { - - length -= d; - bq->read_index += d; - } + fix_current_read(bq); - pa_assert(bq->blocks->index == bq->read_index); + if (bq->current_read) { + int64_t p, d; - if (bq->blocks->chunk.length <= length) { - /* We need to drop the full block */ + /* We go through this piece by piece to make sure we don't + * drop more than allowed by prebuf */ - length -= bq->blocks->chunk.length; - bq->read_index += bq->blocks->chunk.length; + p = bq->current_read->index + bq->current_read->chunk.length; + pa_assert(p >= bq->read_index); + d = p - bq->read_index; - drop_block(bq, bq->blocks); - } else { - /* Only the start of this block needs to be dropped */ + if (d > length) + d = length; - bq->blocks->chunk.index += length; - bq->blocks->chunk.length -= length; - bq->blocks->index += length; - bq->read_index += length; - break; - } + bq->read_index += d; + length -= d; } else { @@ -469,20 +540,22 @@ void pa_memblockq_drop(pa_memblockq *bq, size_t length) { } } + 
drop_backlog(bq); + delta = bq->read_index - old; bq->missing += delta; } -int pa_memblockq_is_readable(pa_memblockq *bq) { +pa_bool_t pa_memblockq_is_readable(pa_memblockq *bq) { pa_assert(bq); if (memblockq_check_prebuf(bq)) - return 0; + return FALSE; if (pa_memblockq_get_length(bq) <= 0) - return 0; + return FALSE; - return 1; + return TRUE; } size_t pa_memblockq_get_length(pa_memblockq *bq) { @@ -506,12 +579,6 @@ size_t pa_memblockq_missing(pa_memblockq *bq) { return l >= bq->minreq ? l : 0; } -size_t pa_memblockq_get_minreq(pa_memblockq *bq) { - pa_assert(bq); - - return bq->minreq; -} - void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) { int64_t old, delta; pa_assert(bq); @@ -535,6 +602,8 @@ void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) { pa_assert_not_reached(); } + drop_backlog(bq); + delta = bq->write_index - old; if (delta >= bq->requested) { @@ -564,7 +633,7 @@ void pa_memblockq_flush(pa_memblockq *bq) { delta = bq->write_index - old; - if (delta > bq->requested) { + if (delta >= bq->requested) { delta -= bq->requested; bq->requested = 0; } else if (delta >= 0) { @@ -581,13 +650,21 @@ size_t pa_memblockq_get_tlength(pa_memblockq *bq) { return bq->tlength; } +size_t pa_memblockq_get_minreq(pa_memblockq *bq) { + pa_assert(bq); + + return bq->minreq; +} + int64_t pa_memblockq_get_read_index(pa_memblockq *bq) { pa_assert(bq); + return bq->read_index; } int64_t pa_memblockq_get_write_index(pa_memblockq *bq) { pa_assert(bq); + return bq->write_index; } @@ -600,9 +677,6 @@ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) { if (bq->base == 1) return pa_memblockq_push(bq, chunk); - if (!bq->mcalign) - bq->mcalign = pa_mcalign_new(bq->base); - if (!can_push(bq, pa_mcalign_csize(bq->mcalign, chunk->length))) return -1; @@ -613,23 +687,15 @@ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) { r = pa_memblockq_push(bq, &rchunk); 
pa_memblock_unref(rchunk.memblock); - if (r < 0) + if (r < 0) { + pa_mcalign_flush(bq->mcalign); return -1; + } } return 0; } -void pa_memblockq_shorten(pa_memblockq *bq, size_t length) { - size_t l; - pa_assert(bq); - - l = pa_memblockq_get_length(bq); - - if (l > length) - pa_memblockq_drop(bq, l - length); -} - void pa_memblockq_prebuf_disable(pa_memblockq *bq) { pa_assert(bq); @@ -639,7 +705,7 @@ void pa_memblockq_prebuf_disable(pa_memblockq *bq) { void pa_memblockq_prebuf_force(pa_memblockq *bq) { pa_assert(bq); - if (!bq->in_prebuf && bq->prebuf > 0) + if (bq->prebuf > 0) bq->in_prebuf = TRUE; } @@ -710,7 +776,7 @@ void pa_memblockq_set_tlength(pa_memblockq *bq, size_t tlength) { void pa_memblockq_set_prebuf(pa_memblockq *bq, size_t prebuf) { pa_assert(bq); - bq->prebuf = (prebuf == (size_t) -1) ? bq->tlength/2 : prebuf; + bq->prebuf = (prebuf == (size_t) -1) ? bq->tlength : prebuf; bq->prebuf = ((bq->prebuf+bq->base-1)/bq->base)*bq->base; if (prebuf > 0 && bq->prebuf < bq->base) @@ -737,3 +803,73 @@ void pa_memblockq_set_minreq(pa_memblockq *bq, size_t minreq) { if (bq->minreq < bq->base) bq->minreq = bq->base; } + +void pa_memblockq_set_maxrewind(pa_memblockq *bq, size_t maxrewind) { + pa_assert(bq); + + bq->maxrewind = (maxrewind/bq->base)*bq->base; +} + +void pa_memblockq_rewind(pa_memblockq *bq, size_t length) { + pa_assert(bq); + pa_assert(length % bq->base == 0); + + bq->read_index -= length; + bq->missing -= length; +} + +int pa_memblockq_splice(pa_memblockq *bq, pa_memblockq *source) { + + pa_assert(bq); + pa_assert(source); + + pa_memblockq_prebuf_disable(bq); + + for (;;) { + pa_memchunk chunk; + + if (pa_memblockq_peek(source, &chunk) < 0) + return 0; + + pa_assert(chunk.length > 0); + + if (chunk.memblock) { + + if (pa_memblockq_push_align(bq, &chunk) < 0) { + pa_memblock_unref(chunk.memblock); + return -1; + } + + pa_memblock_unref(chunk.memblock); + } else + pa_memblockq_seek(bq, chunk.length, PA_SEEK_RELATIVE); + + pa_memblockq_drop(bq, 
chunk.length); + } +} + +void pa_memblockq_willneed(pa_memblockq *bq) { + struct list_item *q; + + pa_assert(bq); + + fix_current_read(bq); + + for (q = bq->current_read; q; q = q->next) + pa_memchunk_will_need(&q->chunk); +} + +void pa_memblockq_set_silence(pa_memblockq *bq, pa_memblock *silence) { + pa_assert(bq); + + if (bq->silence) + pa_memblock_unref(bq->silence); + + bq->silence = silence ? pa_memblock_ref(silence) : NULL; +} + +pa_bool_t pa_memblockq_is_empty(pa_memblockq *bq) { + pa_assert(bq); + + return !bq->blocks; +} diff --git a/src/pulsecore/memblockq.h b/src/pulsecore/memblockq.h index 46637f10..8610a1aa 100644 --- a/src/pulsecore/memblockq.h +++ b/src/pulsecore/memblockq.h @@ -62,6 +62,8 @@ typedef struct pa_memblockq pa_memblockq; - minreq: pa_memblockq_missing() will only return values greater than this value. Pass 0 for the default. + - maxrewind: how many bytes of history to keep in the queue + - silence: return this memblock when reading uninitialized data */ pa_memblockq* pa_memblockq_new( @@ -71,6 +73,7 @@ pa_memblockq* pa_memblockq_new( size_t base, size_t prebuf, size_t minreq, + size_t maxrewind, pa_memblock *silence); void pa_memblockq_free(pa_memblockq*bq); @@ -95,7 +98,7 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk); void pa_memblockq_drop(pa_memblockq *bq, size_t length); /* Test if the pa_memblockq is currently readable, that is, more data than base */ -int pa_memblockq_is_readable(pa_memblockq *bq); +pa_bool_t pa_memblockq_is_readable(pa_memblockq *bq); /* Return the length of the queue in bytes */ size_t pa_memblockq_get_length(pa_memblockq *bq); @@ -107,6 +110,9 @@ size_t pa_memblockq_missing(pa_memblockq *bq); * this function, reset the internal counter to 0. 
*/ size_t pa_memblockq_pop_missing(pa_memblockq *bq); +/* Directly moves the data from the source memblockq into bq */ +int pa_memblockq_splice(pa_memblockq *bq, pa_memblockq *source); + /* Returns the minimal request value */ size_t pa_memblockq_get_minreq(pa_memblockq *bq); @@ -125,10 +131,8 @@ int64_t pa_memblockq_get_read_index(pa_memblockq *bq); /* Return the current write index */ int64_t pa_memblockq_get_write_index(pa_memblockq *bq); -/* Shorten the pa_memblockq to the specified length by dropping data - * at the read end of the queue. The read index is increased until the - * queue has the specified length */ -void pa_memblockq_shorten(pa_memblockq *bq, size_t length); +/* Rewind the read index. If the history is shorter than the specified length we'll point to silence afterwards. */ +void pa_memblockq_rewind(pa_memblockq *bq, size_t length); /* Ignore prebuf for now */ void pa_memblockq_prebuf_disable(pa_memblockq *bq); @@ -142,10 +146,20 @@ size_t pa_memblockq_get_maxlength(pa_memblockq *bq); /* Return the prebuffer length in bytes */ size_t pa_memblockq_get_prebuf(pa_memblockq *bq); -/* Change metrics. */ -void pa_memblockq_set_maxlength(pa_memblockq *memblockq, size_t maxlength); -void pa_memblockq_set_tlength(pa_memblockq *memblockq, size_t tlength); -void pa_memblockq_set_prebuf(pa_memblockq *memblockq, size_t prebuf); +/* Change metrics. Always call in order. 
*/ +void pa_memblockq_set_maxlength(pa_memblockq *memblockq, size_t maxlength); /* might modify tlength, prebuf, minreq too */ +void pa_memblockq_set_tlength(pa_memblockq *memblockq, size_t tlength); /* might modify minreq, too */ +void pa_memblockq_set_prebuf(pa_memblockq *memblockq, size_t prebuf); /* might modify minreq, too */ void pa_memblockq_set_minreq(pa_memblockq *memblockq, size_t minreq); +void pa_memblockq_set_maxrewind(pa_memblockq *memblockq, size_t rewind); /* Set the maximum history size */ +void pa_memblockq_set_silence(pa_memblockq *memblockq, pa_memblock *silence); + +/* Call pa_memchunk_will_need() for every chunk in the queue from the current read pointer to the end */ +void pa_memblockq_willneed(pa_memblockq *bq); + +/* Check whether the memblockq is completely empty, i.e. no data + * either left or right of the read pointer, and hence neither buffered + * data for the future nor data in the backlog. */ +pa_bool_t pa_memblockq_is_empty(pa_memblockq *bq); #endif diff --git a/src/pulsecore/namereg.c b/src/pulsecore/namereg.c index fe520384..1b0977d7 100644 --- a/src/pulsecore/namereg.c +++ b/src/pulsecore/namereg.c @@ -179,7 +179,7 @@ void pa_namereg_unregister(pa_core *c, const char *name) { pa_xfree(e); } -void* pa_namereg_get(pa_core *c, const char *name, pa_namereg_type_t type, int autoload) { +void* pa_namereg_get(pa_core *c, const char *name, pa_namereg_type_t type, pa_bool_t autoload) { struct namereg_entry *e; uint32_t idx; pa_assert(c); diff --git a/src/pulsecore/namereg.h b/src/pulsecore/namereg.h index d0db9e81..0f5b4d4d 100644 --- a/src/pulsecore/namereg.h +++ b/src/pulsecore/namereg.h @@ -39,7 +39,7 @@ void pa_namereg_free(pa_core *c); const char *pa_namereg_register(pa_core *c, const char *name, pa_namereg_type_t type, void *data, int fail); void pa_namereg_unregister(pa_core *c, const char *name); -void* pa_namereg_get(pa_core *c, const char *name, pa_namereg_type_t type, int autoload); +void* pa_namereg_get(pa_core *c, const char 
*name, pa_namereg_type_t type, pa_bool_t autoload); int pa_namereg_set_default(pa_core*c, const char *name, pa_namereg_type_t type); const char *pa_namereg_get_default_sink_name(pa_core *c); diff --git a/src/pulsecore/native-common.h b/src/pulsecore/native-common.h index 3ab2361b..51f2b309 100644 --- a/src/pulsecore/native-common.h +++ b/src/pulsecore/native-common.h @@ -126,7 +126,7 @@ enum { PA_COMMAND_SUSPEND_SINK, PA_COMMAND_SUSPEND_SOURCE, - /* Supported since protocol v13 (0.9.8) */ + /* Supported since protocol v12 (0.9.8) */ PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR, PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR, @@ -139,6 +139,14 @@ enum { PA_COMMAND_PLAYBACK_STREAM_MOVED, PA_COMMAND_RECORD_STREAM_MOVED, + /* Supported since protocol v13 (0.9.10) */ + PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST, + PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST, + PA_COMMAND_UPDATE_CLIENT_PROPLIST, + PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST, + PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST, + PA_COMMAND_REMOVE_CLIENT_PROPLIST, + PA_COMMAND_MAX }; diff --git a/src/pulsecore/play-memblockq.c b/src/pulsecore/play-memblockq.c index 5d3c2d39..bad60ae5 100644 --- a/src/pulsecore/play-memblockq.c +++ b/src/pulsecore/play-memblockq.c @@ -34,6 +34,7 @@ #include #include #include +#include #include "play-memblockq.h" @@ -59,7 +60,6 @@ static void memblockq_stream_unlink(memblockq_stream *u) { return; pa_sink_input_unlink(u->sink_input); - pa_sink_input_unref(u->sink_input); u->sink_input = NULL; @@ -70,8 +70,6 @@ static void memblockq_stream_free(pa_object *o) { memblockq_stream *u = MEMBLOCKQ_STREAM(o); pa_assert(u); - memblockq_stream_unlink(u); - if (u->memblockq) pa_memblockq_free(u->memblockq); @@ -92,15 +90,19 @@ static int memblockq_stream_process_msg(pa_msgobject *o, int code, void*userdata } static void sink_input_kill_cb(pa_sink_input *i) { + memblockq_stream *u; + pa_sink_input_assert_ref(i); + u = MEMBLOCKQ_STREAM(i->userdata); + memblockq_stream_assert_ref(u); - 
memblockq_stream_unlink(MEMBLOCKQ_STREAM(i->userdata)); + memblockq_stream_unlink(u); } -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { +static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) { memblockq_stream *u; - pa_assert(i); + pa_sink_input_assert_ref(i); pa_assert(chunk); u = MEMBLOCKQ_STREAM(i->userdata); memblockq_stream_assert_ref(u); @@ -109,36 +111,56 @@ static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chun return -1; if (pa_memblockq_peek(u->memblockq, chunk) < 0) { - pa_memblockq_free(u->memblockq); - u->memblockq = NULL; - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u), MEMBLOCKQ_STREAM_MESSAGE_UNLINK, NULL, 0, NULL, NULL); + + if (pa_sink_input_safe_to_remove(i)) { + + pa_memblockq_free(u->memblockq); + u->memblockq = NULL; + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u), MEMBLOCKQ_STREAM_MESSAGE_UNLINK, NULL, 0, NULL, NULL); + } + return -1; } + pa_memblockq_drop(u->memblockq, chunk->length); + return 0; } -static void sink_input_drop_cb(pa_sink_input *i, size_t length) { +static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes) { + memblockq_stream *u; + + pa_sink_input_assert_ref(i); + pa_assert(nbytes > 0); + u = MEMBLOCKQ_STREAM(i->userdata); + memblockq_stream_assert_ref(u); + + if (!u->memblockq) + return; + + pa_memblockq_rewind(u->memblockq, nbytes); +} + +static void sink_input_set_max_rewind(pa_sink_input *i, size_t nbytes) { memblockq_stream *u; - pa_assert(i); - pa_assert(length > 0); + pa_sink_input_assert_ref(i); u = MEMBLOCKQ_STREAM(i->userdata); memblockq_stream_assert_ref(u); if (!u->memblockq) return; - pa_memblockq_drop(u->memblockq, length); + pa_memblockq_set_maxrewind(u->memblockq, nbytes); } pa_sink_input* pa_memblockq_sink_input_new( pa_sink *sink, - const char *name, const pa_sample_spec *ss, const pa_channel_map *map, pa_memblockq *q, - pa_cvolume *volume) { + pa_cvolume *volume, + pa_proplist *p) 
{ memblockq_stream *u = NULL; pa_sink_input_new_data data; @@ -149,41 +171,35 @@ pa_sink_input* pa_memblockq_sink_input_new( /* We allow creating this stream with no q set, so that it can be * filled in later */ - if (q && pa_memblockq_get_length(q) <= 0) { - pa_memblockq_free(q); - return NULL; - } - - if (volume && pa_cvolume_is_muted(volume)) { - pa_memblockq_free(q); - return NULL; - } - u = pa_msgobject_new(memblockq_stream); u->parent.parent.free = memblockq_stream_free; u->parent.process_msg = memblockq_stream_process_msg; u->core = sink->core; u->sink_input = NULL; - u->memblockq = q; + u->memblockq = NULL; pa_sink_input_new_data_init(&data); data.sink = sink; - data.name = name; data.driver = __FILE__; pa_sink_input_new_data_set_sample_spec(&data, ss); pa_sink_input_new_data_set_channel_map(&data, map); pa_sink_input_new_data_set_volume(&data, volume); + pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p); - if (!(u->sink_input = pa_sink_input_new(sink->core, &data, 0))) + u->sink_input = pa_sink_input_new(sink->core, &data, 0); + pa_sink_input_new_data_done(&data); + + if (!u->sink_input) goto fail; - u->sink_input->peek = sink_input_peek_cb; - u->sink_input->drop = sink_input_drop_cb; + u->sink_input->pop = sink_input_pop_cb; + u->sink_input->rewind = sink_input_rewind_cb; + u->sink_input->set_max_rewind = sink_input_set_max_rewind; u->sink_input->kill = sink_input_kill_cb; u->sink_input->userdata = u; if (q) - pa_memblockq_prebuf_disable(q); + pa_memblockq_sink_input_set_queue(u->sink_input, q); /* The reference to u is dangling here, because we want * to keep this stream around until it is fully played. 
*/ @@ -202,11 +218,12 @@ fail: int pa_play_memblockq( pa_sink *sink, - const char *name, const pa_sample_spec *ss, const pa_channel_map *map, pa_memblockq *q, - pa_cvolume *volume) { + pa_cvolume *volume, + pa_proplist *p, + uint32_t *sink_input_index) { pa_sink_input *i; @@ -214,10 +231,14 @@ int pa_play_memblockq( pa_assert(ss); pa_assert(q); - if (!(i = pa_memblockq_sink_input_new(sink, name, ss, map, q, volume))) + if (!(i = pa_memblockq_sink_input_new(sink, ss, map, q, volume, p))) return -1; pa_sink_input_put(i); + + if (sink_input_index) + *sink_input_index = i->index; + pa_sink_input_unref(i); return 0; @@ -232,5 +253,20 @@ void pa_memblockq_sink_input_set_queue(pa_sink_input *i, pa_memblockq *q) { if (u->memblockq) pa_memblockq_free(u->memblockq); - u->memblockq = q; + + if ((u->memblockq = q)) { + pa_memblock *silence; + + pa_memblockq_set_prebuf(q, 0); + + silence = pa_silence_memblock_new( + i->sink->core->mempool, + &i->sample_spec, + i->thread_info.resampler ? pa_resampler_max_block_size(i->thread_info.resampler) : 0); + + pa_memblockq_set_silence(q, silence); + pa_memblock_unref(silence); + + pa_memblockq_willneed(q); + } } diff --git a/src/pulsecore/play-memblockq.h b/src/pulsecore/play-memblockq.h index d8790316..9ecf7700 100644 --- a/src/pulsecore/play-memblockq.h +++ b/src/pulsecore/play-memblockq.h @@ -29,20 +29,21 @@ pa_sink_input* pa_memblockq_sink_input_new( pa_sink *sink, - const char *name, const pa_sample_spec *ss, const pa_channel_map *map, pa_memblockq *q, - pa_cvolume *volume); + pa_cvolume *volume, + pa_proplist *p); void pa_memblockq_sink_input_set_queue(pa_sink_input *i, pa_memblockq *q); int pa_play_memblockq( pa_sink *sink, - const char *name, const pa_sample_spec *ss, const pa_channel_map *map, pa_memblockq *q, - pa_cvolume *cvolume); + pa_cvolume *cvolume, + pa_proplist *p, + uint32_t *sink_input_index); #endif diff --git a/src/pulsecore/play-memchunk.c b/src/pulsecore/play-memchunk.c index 6aaec567..c528b1d0 100644 --- 
a/src/pulsecore/play-memchunk.c +++ b/src/pulsecore/play-memchunk.c @@ -34,163 +34,33 @@ #include #include #include +#include #include "play-memchunk.h" -typedef struct memchunk_stream { - pa_msgobject parent; - pa_core *core; - pa_sink_input *sink_input; - pa_memchunk memchunk; -} memchunk_stream; - -enum { - MEMCHUNK_STREAM_MESSAGE_UNLINK, -}; - -PA_DECLARE_CLASS(memchunk_stream); -#define MEMCHUNK_STREAM(o) (memchunk_stream_cast(o)) -static PA_DEFINE_CHECK_TYPE(memchunk_stream, pa_msgobject); - -static void memchunk_stream_unlink(memchunk_stream *u) { - pa_assert(u); - - if (!u->sink_input) - return; - - pa_sink_input_unlink(u->sink_input); - - pa_sink_input_unref(u->sink_input); - u->sink_input = NULL; - - memchunk_stream_unref(u); -} - -static void memchunk_stream_free(pa_object *o) { - memchunk_stream *u = MEMCHUNK_STREAM(o); - pa_assert(u); - - memchunk_stream_unlink(u); - - if (u->memchunk.memblock) - pa_memblock_unref(u->memchunk.memblock); - - pa_xfree(u); -} - -static int memchunk_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) { - memchunk_stream *u = MEMCHUNK_STREAM(o); - memchunk_stream_assert_ref(u); - - switch (code) { - case MEMCHUNK_STREAM_MESSAGE_UNLINK: - memchunk_stream_unlink(u); - break; - } - - return 0; -} - -static void sink_input_kill_cb(pa_sink_input *i) { - pa_sink_input_assert_ref(i); - - memchunk_stream_unlink(MEMCHUNK_STREAM(i->userdata)); -} - -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { - memchunk_stream *u; - - pa_assert(i); - pa_assert(chunk); - u = MEMCHUNK_STREAM(i->userdata); - memchunk_stream_assert_ref(u); - - if (!u->memchunk.memblock) - return -1; - - if (u->memchunk.length <= 0) { - pa_memblock_unref(u->memchunk.memblock); - u->memchunk.memblock = NULL; - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u), MEMCHUNK_STREAM_MESSAGE_UNLINK, NULL, 0, NULL, NULL); - return -1; - } - - pa_assert(u->memchunk.memblock); - *chunk = 
u->memchunk; - pa_memblock_ref(chunk->memblock); - - return 0; -} - -static void sink_input_drop_cb(pa_sink_input *i, size_t length) { - memchunk_stream *u; - - pa_assert(i); - pa_assert(length > 0); - u = MEMCHUNK_STREAM(i->userdata); - memchunk_stream_assert_ref(u); - - if (length < u->memchunk.length) { - u->memchunk.length -= length; - u->memchunk.index += length; - } else - u->memchunk.length = 0; -} - int pa_play_memchunk( pa_sink *sink, - const char *name, const pa_sample_spec *ss, const pa_channel_map *map, const pa_memchunk *chunk, - pa_cvolume *volume) { + pa_cvolume *volume, + pa_proplist *p, + uint32_t *sink_input_index) { - memchunk_stream *u = NULL; - pa_sink_input_new_data data; + pa_memblockq *q; + int r; pa_assert(sink); pa_assert(ss); pa_assert(chunk); - if (volume && pa_cvolume_is_muted(volume)) - return 0; - - pa_memchunk_will_need(chunk); - - u = pa_msgobject_new(memchunk_stream); - u->parent.parent.free = memchunk_stream_free; - u->parent.process_msg = memchunk_stream_process_msg; - u->core = sink->core; - u->memchunk = *chunk; - pa_memblock_ref(u->memchunk.memblock); - - pa_sink_input_new_data_init(&data); - data.sink = sink; - data.driver = __FILE__; - data.name = name; - pa_sink_input_new_data_set_sample_spec(&data, ss); - pa_sink_input_new_data_set_channel_map(&data, map); - pa_sink_input_new_data_set_volume(&data, volume); - - if (!(u->sink_input = pa_sink_input_new(sink->core, &data, 0))) - goto fail; - - u->sink_input->peek = sink_input_peek_cb; - u->sink_input->drop = sink_input_drop_cb; - u->sink_input->kill = sink_input_kill_cb; - u->sink_input->userdata = u; + q = pa_memblockq_new(0, chunk->length, 0, pa_frame_size(ss), 0, 0, 0, NULL); + pa_assert_se(pa_memblockq_push(q, chunk) >= 0); - pa_sink_input_put(u->sink_input); - - /* The reference to u is dangling here, because we want to keep - * this stream around until it is fully played. 
*/ + if ((r = pa_play_memblockq(sink, ss, map, q, volume, p, sink_input_index)) < 0) { + pa_memblockq_free(q); + return r; + } return 0; - -fail: - if (u) - memchunk_stream_unref(u); - - return -1; } - diff --git a/src/pulsecore/play-memchunk.h b/src/pulsecore/play-memchunk.h index 5afb094c..f7c9d178 100644 --- a/src/pulsecore/play-memchunk.h +++ b/src/pulsecore/play-memchunk.h @@ -29,10 +29,11 @@ int pa_play_memchunk( pa_sink *sink, - const char *name, const pa_sample_spec *ss, const pa_channel_map *map, const pa_memchunk *chunk, - pa_cvolume *cvolume); + pa_cvolume *cvolume, + pa_proplist *p, + uint32_t *sink_input_index); #endif diff --git a/src/pulsecore/protocol-esound.c b/src/pulsecore/protocol-esound.c index 004e535e..972e8e1a 100644 --- a/src/pulsecore/protocol-esound.c +++ b/src/pulsecore/protocol-esound.c @@ -149,8 +149,7 @@ typedef struct proto_handler { const char *description; } esd_proto_handler_info_t; -static void sink_input_drop_cb(pa_sink_input *i, size_t length); -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk); +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk); static void sink_input_kill_cb(pa_sink_input *i); static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk); static pa_usec_t source_output_get_latency_cb(pa_source_output *o); @@ -410,14 +409,16 @@ static int esd_proto_stream_play(connection *c, PA_GCC_UNUSED esd_proto_t reques pa_assert(!c->sink_input && !c->input_memblockq); pa_sink_input_new_data_init(&sdata); - sdata.sink = sink; sdata.driver = __FILE__; - sdata.name = c->client->name; - pa_sink_input_new_data_set_sample_spec(&sdata, &ss); sdata.module = c->protocol->module; sdata.client = c->client; + sdata.sink = sink; + pa_proplist_update(sdata.proplist, PA_UPDATE_MERGE, c->client->proplist); + pa_sink_input_new_data_set_sample_spec(&sdata, &ss); c->sink_input = pa_sink_input_new(c->protocol->core, &sdata, 0); 
+ pa_sink_input_new_data_done(&sdata); + CHECK_VALIDITY(c->sink_input, "Failed to create sink input."); l = (size_t) (pa_bytes_per_second(&ss)*PLAYBACK_BUFFER_SECONDS); @@ -428,13 +429,13 @@ static int esd_proto_stream_play(connection *c, PA_GCC_UNUSED esd_proto_t reques pa_frame_size(&ss), (size_t) -1, l/PLAYBACK_BUFFER_FRAGMENTS, + 0, NULL); pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2); c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS; c->sink_input->parent.process_msg = sink_input_process_msg; - c->sink_input->peek = sink_input_peek_cb; - c->sink_input->drop = sink_input_drop_cb; + c->sink_input->pop = sink_input_pop_cb; c->sink_input->kill = sink_input_kill_cb; c->sink_input->userdata = c; @@ -509,14 +510,16 @@ static int esd_proto_stream_record(connection *c, esd_proto_t request, const voi pa_assert(!c->output_memblockq && !c->source_output); pa_source_output_new_data_init(&sdata); - sdata.source = source; sdata.driver = __FILE__; - sdata.name = c->client->name; - pa_source_output_new_data_set_sample_spec(&sdata, &ss); sdata.module = c->protocol->module; sdata.client = c->client; + sdata.source = source; + pa_proplist_update(sdata.proplist, PA_UPDATE_MERGE, c->client->proplist); + pa_source_output_new_data_set_sample_spec(&sdata, &ss); + + c->source_output = pa_source_output_new(c->protocol->core, &sdata, 0); + pa_source_output_new_data_done(&sdata); - c->source_output = pa_source_output_new(c->protocol->core, &sdata, 9); CHECK_VALIDITY(c->source_output, "Failed to create source_output."); l = (size_t) (pa_bytes_per_second(&ss)*RECORD_BUFFER_SECONDS); @@ -527,6 +530,7 @@ static int esd_proto_stream_record(connection *c, esd_proto_t request, const voi pa_frame_size(&ss), 1, 0, + 0, NULL); pa_iochannel_socket_set_sndbuf(c->io, l/RECORD_BUFFER_FRAGMENTS*2); @@ -638,8 +642,8 @@ static int esd_proto_all_info(connection *c, esd_proto_t request, const void *da memset(name, 0, ESD_NAME_MAX); /* don't leak old data */ if 
(conn->original_name) strncpy(name, conn->original_name, ESD_NAME_MAX); - else if (conn->client && conn->client->name) - strncpy(name, conn->client->name, ESD_NAME_MAX); + else if (conn->client && pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME)) + strncpy(name, pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME), ESD_NAME_MAX); connection_write(c, name, ESD_NAME_MAX); /* rate */ @@ -800,7 +804,7 @@ static int esd_proto_sample_cache(connection *c, PA_GCC_UNUSED esd_proto_t reque c->state = ESD_CACHING_SAMPLE; - pa_scache_add_item(c->protocol->core, c->scache.name, NULL, NULL, NULL, &idx); + pa_scache_add_item(c->protocol->core, c->scache.name, NULL, NULL, NULL, c->client->proplist, &idx); idx += 1; connection_write(c, &idx, sizeof(uint32_t)); @@ -851,7 +855,7 @@ static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, con pa_sink *sink; if ((sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, 1))) - if (pa_scache_play_item(c->protocol->core, name, sink, PA_VOLUME_NORM) >= 0) + if (pa_scache_play_item(c->protocol->core, name, sink, PA_VOLUME_NORM, c->client->proplist, NULL) >= 0) ok = idx + 1; } else { pa_assert(request == ESD_PROTO_SAMPLE_FREE); @@ -992,7 +996,7 @@ static int do_read(connection *c) { uint32_t idx; c->scache.memchunk.index = 0; - pa_scache_add_item(c->protocol->core, c->scache.name, &c->scache.sample_spec, NULL, &c->scache.memchunk, &idx); + pa_scache_add_item(c->protocol->core, c->scache.name, &c->scache.sample_spec, NULL, &c->scache.memchunk, c->client->proplist, &idx); pa_memblock_unref(c->scache.memchunk.memblock); c->scache.memchunk.memblock = NULL; @@ -1237,7 +1241,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int } /* Called from thread context */ -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { 
connection*c; int r; @@ -1246,32 +1250,25 @@ static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chun connection_assert_ref(c); pa_assert(chunk); - if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0 && c->dead) - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); + if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0) { - return r; -} - -/* Called from thread context */ -static void sink_input_drop_cb(pa_sink_input *i, size_t length) { - connection*c; - size_t old, new; - - pa_assert(i); - c = CONNECTION(i->userdata); - connection_assert_ref(c); - pa_assert(length); - /* pa_log("DROP"); */ + if (c->dead && pa_sink_input_safe_to_remove(i)) + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); + } else { + size_t old, new; - old = pa_memblockq_missing(c->input_memblockq); - pa_memblockq_drop(c->input_memblockq, length); - new = pa_memblockq_missing(c->input_memblockq); + old = pa_memblockq_missing(c->input_memblockq); + pa_memblockq_drop(c->input_memblockq, chunk->length); + new = pa_memblockq_missing(c->input_memblockq); - if (new > old) { - if (pa_atomic_add(&c->playback.missing, new - old) <= 0) - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); + if (new > old) { + if (pa_atomic_add(&c->playback.missing, new - old) <= 0) + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); + } } + + return r; } static void sink_input_kill_cb(pa_sink_input *i) { @@ -1349,7 +1346,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname)); pa_snprintf(cname, sizeof(cname), "EsounD client (%s)", pname); c->client = pa_client_new(p->core, __FILE__, cname); - c->client->owner = p->module; + 
c->client->module = p->module; c->client->kill = client_kill_cb; c->client->userdata = c; diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index 4f582798..811cc805 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -98,17 +98,17 @@ typedef struct playback_stream { pa_sink_input *sink_input; pa_memblockq *memblockq; - int drain_request; + pa_bool_t drain_request; uint32_t drain_tag; uint32_t syncid; - int underrun; + pa_bool_t underrun; pa_atomic_t missing; size_t minreq; /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */ int64_t read_index, write_index; - size_t resampled_chunk_length; + size_t render_memblockq_length; } playback_stream; typedef struct upload_stream { @@ -122,12 +122,13 @@ typedef struct upload_stream { char *name; pa_sample_spec sample_spec; pa_channel_map channel_map; + pa_proplist *proplist; } upload_stream; struct connection { pa_msgobject parent; - int authorized; + pa_bool_t authorized; uint32_t version; pa_protocol_native *protocol; pa_client *client; @@ -162,11 +163,11 @@ static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject); struct pa_protocol_native { pa_module *module; pa_core *core; - int public; + pa_bool_t public; pa_socket_server *server; pa_idxset *connections; uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH]; - int auth_cookie_in_property; + pa_bool_t auth_cookie_in_property; #ifdef HAVE_CREDS char *auth_group; #endif @@ -199,8 +200,7 @@ enum { CONNECTION_MESSAGE_REVOKE }; -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk); -static void sink_input_drop_cb(pa_sink_input *i, size_t length); +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk); static void sink_input_kill_cb(pa_sink_input *i); static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend); static void sink_input_moved_cb(pa_sink_input *i); @@ -254,6 +254,8 @@ static void command_move_stream(pa_pdispatch *pd, uint32_t command, 
uint32_t tag static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); +static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); +static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = { [PA_COMMAND_ERROR] = NULL, @@ -335,7 +337,15 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = { [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr, [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate, - [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate + [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate, + + [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist, + [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist, + [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist, + + [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist, + [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist, + [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist, }; /* structure management */ @@ -359,6 +369,9 @@ static void upload_stream_free(pa_object *o) { pa_xfree(s->name); + if (s->proplist) + pa_proplist_free(s->proplist); + if (s->memchunk.memblock) pa_memblock_unref(s->memchunk.memblock); @@ -369,7 +382,9 @@ static upload_stream* upload_stream_new( connection *c, const pa_sample_spec *ss, const pa_channel_map *map, - const char *name, size_t length) { + const char *name, + size_t length, 
+ pa_proplist *p) { upload_stream *s; @@ -377,6 +392,7 @@ static upload_stream* upload_stream_new( pa_assert(ss); pa_assert(name); pa_assert(length > 0); + pa_assert(p); s = pa_msgobject_new(upload_stream); s->parent.parent.parent.free = upload_stream_free; @@ -386,6 +402,8 @@ static upload_stream* upload_stream_new( s->name = pa_xstrdup(name); pa_memchunk_reset(&s->memchunk); s->length = length; + s->proplist = pa_proplist_copy(p); + pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist); pa_idxset_put(c->output_streams, s, &s->index); @@ -452,7 +470,8 @@ static record_stream* record_stream_new( const char *name, uint32_t *maxlength, uint32_t fragment_size, - pa_source_output_flags_t flags) { + pa_source_output_flags_t flags, + pa_proplist *p) { record_stream *s; pa_source_output *source_output; @@ -464,17 +483,24 @@ static record_stream* record_stream_new( pa_assert(name); pa_assert(maxlength); pa_assert(*maxlength > 0); + pa_assert(p); pa_source_output_new_data_init(&data); + + pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p); + pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist); + data.driver = __FILE__; data.module = c->protocol->module; data.client = c->client; data.source = source; - data.driver = __FILE__; - data.name = name; pa_source_output_new_data_set_sample_spec(&data, ss); pa_source_output_new_data_set_channel_map(&data, map); - if (!(source_output = pa_source_output_new(c->protocol->core, &data, flags))) + source_output = pa_source_output_new(c->protocol->core, &data, flags); + + pa_source_output_new_data_done(&data); + + if (!source_output) return NULL; s = pa_msgobject_new(record_stream); @@ -496,6 +522,7 @@ static record_stream* record_stream_new( base = pa_frame_size(&s->source_output->sample_spec), 1, 0, + 0, NULL); *maxlength = pa_memblockq_get_maxlength(s->memblockq); @@ -633,7 +660,8 @@ static playback_stream* playback_stream_new( pa_cvolume *volume, uint32_t syncid, uint32_t *missing, - 
pa_sink_input_flags_t flags) { + pa_sink_input_flags_t flags, + pa_proplist *p) { playback_stream *s, *ssync; pa_sink_input *sink_input; @@ -646,6 +674,7 @@ static playback_stream* playback_stream_new( pa_assert(ss); pa_assert(name); pa_assert(maxlength); + pa_assert(p); /* Find syncid group */ for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) { @@ -667,17 +696,23 @@ static playback_stream* playback_stream_new( } pa_sink_input_new_data_init(&data); - data.sink = sink; + + pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p); + pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist); data.driver = __FILE__; - data.name = name; + data.module = c->protocol->module; + data.client = c->client; + data.sink = sink; pa_sink_input_new_data_set_sample_spec(&data, ss); pa_sink_input_new_data_set_channel_map(&data, map); pa_sink_input_new_data_set_volume(&data, volume); - data.module = c->protocol->module; - data.client = c->client; data.sync_base = ssync ? 
ssync->sink_input : NULL; - if (!(sink_input = pa_sink_input_new(c->protocol->core, &data, flags))) + sink_input = pa_sink_input_new(c->protocol->core, &data, flags); + + pa_sink_input_new_data_done(&data); + + if (!sink_input) return NULL; s = pa_msgobject_new(playback_stream); @@ -686,11 +721,10 @@ static playback_stream* playback_stream_new( s->connection = c; s->syncid = syncid; s->sink_input = sink_input; - s->underrun = 1; + s->underrun = TRUE; s->sink_input->parent.process_msg = sink_input_process_msg; - s->sink_input->peek = sink_input_peek_cb; - s->sink_input->drop = sink_input_drop_cb; + s->sink_input->pop = sink_input_pop_cb; s->sink_input->kill = sink_input_kill_cb; s->sink_input->moved = sink_input_moved_cb; s->sink_input->suspend = sink_input_suspend_cb; @@ -707,6 +741,7 @@ static playback_stream* playback_stream_new( pa_frame_size(&s->sink_input->sample_spec), *prebuf, *minreq, + 0, silence); pa_memblock_unref(silence); @@ -722,7 +757,7 @@ static playback_stream* playback_stream_new( s->minreq = pa_memblockq_get_minreq(s->memblockq); pa_atomic_store(&s->missing, 0); - s->drain_request = 0; + s->drain_request = FALSE; pa_idxset_put(c->output_streams, s, &s->index); @@ -909,7 +944,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int request_bytes(s); - s->underrun = 0; + s->underrun = FALSE; return 0; } @@ -921,7 +956,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL); else { s->drain_tag = PA_PTR_TO_UINT(userdata); - s->drain_request = 1; + s->drain_request = TRUE; } request_bytes(s); @@ -953,21 +988,21 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int } func(s->memblockq); - s->underrun = 0; + s->underrun = FALSE; request_bytes(s); /* Do the same for all other members in the sync group */ for (isync = i->sync_prev; isync; isync = 
isync->sync_prev) { playback_stream *ssync = PLAYBACK_STREAM(isync->userdata); func(ssync->memblockq); - ssync->underrun = 0; + ssync->underrun = FALSE; request_bytes(ssync); } for (isync = i->sync_next; isync; isync = isync->sync_next) { playback_stream *ssync = PLAYBACK_STREAM(isync->userdata); func(ssync->memblockq); - ssync->underrun = 0; + ssync->underrun = FALSE; request_bytes(ssync); } @@ -978,7 +1013,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int s->read_index = pa_memblockq_get_read_index(s->memblockq); s->write_index = pa_memblockq_get_write_index(s->memblockq); - s->resampled_chunk_length = s->sink_input->thread_info.resampled_chunk.memblock ? s->sink_input->thread_info.resampled_chunk.length : 0; + s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq); return 0; case PA_SINK_INPUT_MESSAGE_SET_STATE: @@ -1002,7 +1037,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int } /* Called from thread context */ -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { playback_stream *s; pa_sink_input_assert_ref(i); @@ -1010,42 +1045,23 @@ static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chun playback_stream_assert_ref(s); pa_assert(chunk); - if (pa_memblockq_get_length(s->memblockq) <= 0 && !s->underrun) { - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL); - s->underrun = 1; - } - if (pa_memblockq_peek(s->memblockq, chunk) < 0) { -/* pa_log("peek: failure"); */ - return -1; - } - -/* pa_log("peek: %u", chunk->length); */ - request_bytes(s); - - return 0; -} - -/* Called from thread context */ -static void sink_input_drop_cb(pa_sink_input *i, size_t length) { - playback_stream *s; - - pa_sink_input_assert_ref(i); - s = 
PLAYBACK_STREAM(i->userdata); - playback_stream_assert_ref(s); - pa_assert(length > 0); - - pa_memblockq_drop(s->memblockq, length); + if (s->drain_request && pa_sink_input_safe_to_remove(i)) { + s->drain_request = FALSE; + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL); + } else if (!s->underrun) { + s->underrun = TRUE; + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL); + } - if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) { - s->drain_request = 0; - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL); + return -1; } + pa_memblockq_drop(s->memblockq, chunk->length); request_bytes(s); -/* pa_log("after_drop: %u %u", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq)); */ + return 0; } /* Called from main context */ @@ -1207,7 +1223,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC connection *c = CONNECTION(userdata); playback_stream *s; uint32_t maxlength, tlength, prebuf, minreq, sink_index, syncid, missing; - const char *name, *sink_name; + const char *name = NULL, *sink_name; pa_sample_spec ss; pa_channel_map map; pa_tagstruct *reply; @@ -1216,29 +1232,45 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC int corked; int no_remap = 0, no_remix = 0, fix_format = 0, fix_rate = 0, fix_channels = 0, no_move = 0, variable_rate = 0; pa_sink_input_flags_t flags = 0; + pa_proplist *p; connection_assert_ref(c); pa_assert(t); - if (pa_tagstruct_get( - t, - PA_TAG_STRING, &name, - PA_TAG_SAMPLE_SPEC, &ss, - PA_TAG_CHANNEL_MAP, &map, - PA_TAG_U32, &sink_index, - PA_TAG_STRING, &sink_name, - PA_TAG_U32, &maxlength, - PA_TAG_BOOLEAN, &corked, - PA_TAG_U32, &tlength, - PA_TAG_U32, &prebuf, - PA_TAG_U32, &minreq, - 
PA_TAG_U32, &syncid, - PA_TAG_CVOLUME, &volume, - PA_TAG_INVALID) < 0 || !name) { + if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) || + pa_tagstruct_get( + t, + PA_TAG_SAMPLE_SPEC, &ss, + PA_TAG_CHANNEL_MAP, &map, + PA_TAG_U32, &sink_index, + PA_TAG_STRING, &sink_name, + PA_TAG_U32, &maxlength, + PA_TAG_BOOLEAN, &corked, + PA_TAG_U32, &tlength, + PA_TAG_U32, &prebuf, + PA_TAG_U32, &minreq, + PA_TAG_U32, &syncid, + PA_TAG_CVOLUME, &volume, + PA_TAG_INVALID) < 0) { + protocol_error(c); return; } + CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); + CHECK_VALIDITY(c->pstream, sink_index != PA_INVALID_INDEX || !sink_name || (*sink_name && pa_utf8_valid(sink_name)), tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, maxlength > 0, tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, maxlength <= MAX_MEMBLOCKQ_LENGTH, tag, PA_ERR_INVALID); + + p = pa_proplist_new(); + + if (name) + pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name); + if (c->version >= 12) { /* Since 0.9.8 the user can ask for a couple of additional flags */ @@ -1249,32 +1281,43 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC pa_tagstruct_get_boolean(t, &fix_channels) < 0 || pa_tagstruct_get_boolean(t, &no_move) < 0 || pa_tagstruct_get_boolean(t, &variable_rate) < 0) { + + protocol_error(c); + pa_proplist_free(p); + return; + } + } + + if (c->version >= 13) { + + if (pa_tagstruct_get_proplist(t, p) < 0) { protocol_error(c); + pa_proplist_free(p); return; } } if (!pa_tagstruct_eof(t)) { protocol_error(c); + pa_proplist_free(p); return; } - CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); 
- CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, sink_index != PA_INVALID_INDEX || !sink_name || (*sink_name && pa_utf8_valid(name)), tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, maxlength > 0, tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, maxlength <= MAX_MEMBLOCKQ_LENGTH, tag, PA_ERR_INVALID); - if (sink_index != PA_INVALID_INDEX) { - sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index); - CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY); + + if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) { + pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY); + pa_proplist_free(p); + return; + } + } else if (sink_name) { - sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK, 1); - CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY); + + if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK, 1))) { + pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY); + pa_proplist_free(p); + return; + } } flags = @@ -1287,7 +1330,9 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC (no_move ? PA_SINK_INPUT_DONT_MOVE : 0) | (variable_rate ? 
PA_SINK_INPUT_VARIABLE_RATE : 0); - s = playback_stream_new(c, sink, &ss, &map, name, &maxlength, &tlength, &prebuf, &minreq, &volume, syncid, &missing, flags); + s = playback_stream_new(c, sink, &ss, &map, name, &maxlength, &tlength, &prebuf, &minreq, &volume, syncid, &missing, flags, p); + pa_proplist_free(p); + CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID); reply = reply_new(tag); @@ -1395,11 +1440,12 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ int corked; int no_remap = 0, no_remix = 0, fix_format = 0, fix_rate = 0, fix_channels = 0, no_move = 0, variable_rate = 0; pa_source_output_flags_t flags = 0; + pa_proplist *p; connection_assert_ref(c); pa_assert(t); - if (pa_tagstruct_gets(t, &name) < 0 || + if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) || pa_tagstruct_get_sample_spec(t, &ss) < 0 || pa_tagstruct_get_channel_map(t, &map) < 0 || pa_tagstruct_getu32(t, &source_index) < 0 || @@ -1411,6 +1457,19 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ return; } + CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); + CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, source_index != PA_INVALID_INDEX || !source_name || (*source_name && pa_utf8_valid(source_name)), tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, maxlength > 0, tag, PA_ERR_INVALID); + CHECK_VALIDITY(c->pstream, maxlength <= MAX_MEMBLOCKQ_LENGTH, tag, PA_ERR_INVALID); + + p = pa_proplist_new(); + + if (name) + pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name); + if (c->version >= 12) { /* Since 0.9.8 the user can ask for a couple of additional flags */ @@ -1421,16 +1480,45 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ pa_tagstruct_get_boolean(t, 
&fix_channels) < 0 || pa_tagstruct_get_boolean(t, &no_move) < 0 || pa_tagstruct_get_boolean(t, &variable_rate) < 0) { + + protocol_error(c); + pa_proplist_free(p); + return; + } + } + + if (c->version >= 13) { + + if (pa_tagstruct_get_proplist(t, p) < 0) { protocol_error(c); + pa_proplist_free(p); return; } } if (!pa_tagstruct_eof(t)) { protocol_error(c); + pa_proplist_free(p); return; } + if (source_index != PA_INVALID_INDEX) { + + if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) { + pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY); + pa_proplist_free(p); + return; + } + + } else if (source_name) { + + if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE, 1))) { + pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY); + pa_proplist_free(p); + return; + } + } + flags = (corked ? PA_SOURCE_OUTPUT_START_CORKED : 0) | (no_remap ? PA_SOURCE_OUTPUT_NO_REMAP : 0) | @@ -1441,24 +1529,9 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ (no_move ? PA_SOURCE_OUTPUT_DONT_MOVE : 0) | (variable_rate ? 
PA_SOURCE_OUTPUT_VARIABLE_RATE : 0); - CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); - CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, source_index != PA_INVALID_INDEX || !source_name || (*source_name && pa_utf8_valid(source_name)), tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, maxlength > 0, tag, PA_ERR_INVALID); - CHECK_VALIDITY(c->pstream, maxlength <= MAX_MEMBLOCKQ_LENGTH, tag, PA_ERR_INVALID); - - if (source_index != PA_INVALID_INDEX) { - source = pa_idxset_get_by_index(c->protocol->core->sources, source_index); - CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY); - } else if (source_name) { - source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE, 1); - CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY); - } + s = record_stream_new(c, source, &ss, &map, name, &maxlength, fragment_size, flags, p); + pa_proplist_free(p); - s = record_stream_new(c, source, &ss, &map, name, &maxlength, fragment_size, flags); CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID); reply = reply_new(tag); @@ -1511,6 +1584,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t connection *c = CONNECTION(userdata); const void*cookie; pa_tagstruct *reply; + char tmp[16]; connection_assert_ref(c); pa_assert(t); @@ -1528,6 +1602,9 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t return; } + pa_snprintf(tmp, sizeof(tmp), "%u", c->version); + pa_proplist_sets(c->client->proplist, "native-protocol.version", tmp); + if (!c->authorized) { int success = 0; @@ -1579,7 +1656,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t return; } - c->authorized = 1; + 
c->authorized = TRUE; if (c->auth_timeout_event) { c->protocol->core->mainloop->time_free(c->auth_timeout_event); c->auth_timeout_event = NULL; @@ -1607,21 +1684,42 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t static void command_set_client_name(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { connection *c = CONNECTION(userdata); - const char *name; + const char *name = NULL; + pa_proplist *p; + pa_tagstruct *reply; connection_assert_ref(c); pa_assert(t); - if (pa_tagstruct_gets(t, &name) < 0 || + p = pa_proplist_new(); + + if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) || + (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) || !pa_tagstruct_eof(t)) { + protocol_error(c); + pa_proplist_free(p); return; } - CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID); + if (name) + if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) { + pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID); + pa_proplist_free(p); + return; + } - pa_client_set_name(c->client, name); - pa_pstream_send_simple_ack(c->pstream, tag); + pa_proplist_update(c->client->proplist, PA_UPDATE_REPLACE, p); + pa_proplist_free(p); + + pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index); + + reply = reply_new(tag); + + if (c->version >= 13) + pa_tagstruct_putu32(reply, c->client->index); + + pa_pstream_send_tagstruct(c->pstream, reply); } static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { @@ -1737,7 +1835,7 @@ static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ reply = reply_new(tag); latency = pa_sink_get_latency(s->sink_input->sink); - latency += pa_bytes_to_usec(s->resampled_chunk_length, &s->sink_input->sample_spec); + latency += pa_bytes_to_usec(s->render_memblockq_length, 
&s->sink_input->sample_spec); pa_tagstruct_put_usec(reply, latency); @@ -1786,19 +1884,19 @@ static void command_create_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ connection *c = CONNECTION(userdata); upload_stream *s; uint32_t length; - const char *name; + const char *name = NULL; pa_sample_spec ss; pa_channel_map map; pa_tagstruct *reply; + pa_proplist *p; connection_assert_ref(c); pa_assert(t); - if (pa_tagstruct_gets(t, &name) < 0 || + if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) || pa_tagstruct_get_sample_spec(t, &ss) < 0 || pa_tagstruct_get_channel_map(t, &map) < 0 || - pa_tagstruct_getu32(t, &length) < 0 || - !pa_tagstruct_eof(t)) { + pa_tagstruct_getu32(t, &length) < 0) { protocol_error(c); return; } @@ -1809,9 +1907,24 @@ static void command_create_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID); CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID); CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE); - CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name), tag, PA_ERR_INVALID); - s = upload_stream_new(c, &ss, &map, name, length); + if (c->version < 13) + CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name), tag, PA_ERR_INVALID); + + p = pa_proplist_new(); + + if (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) { + protocol_error(c); + pa_proplist_free(p); + return; + } + + if (c->version < 13) + pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name); + + s = upload_stream_new(c, &ss, &map, name, length, p); + pa_proplist_free(p); + CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID); reply = reply_new(tag); @@ -1841,7 +1954,7 @@ static void command_finish_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY); CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY); - if 
(pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, &idx) < 0) + if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0) pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL); else pa_pstream_send_simple_ack(c->pstream, tag); @@ -1855,20 +1968,23 @@ static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui pa_volume_t volume; pa_sink *sink; const char *name, *sink_name; + uint32_t idx; + pa_proplist *p; + pa_tagstruct *reply; connection_assert_ref(c); pa_assert(t); + CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); + if (pa_tagstruct_getu32(t, &sink_index) < 0 || pa_tagstruct_gets(t, &sink_name) < 0 || pa_tagstruct_getu32(t, &volume) < 0 || - pa_tagstruct_gets(t, &name) < 0 || - !pa_tagstruct_eof(t)) { + pa_tagstruct_gets(t, &name) < 0) { protocol_error(c); return; } - CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); CHECK_VALIDITY(c->pstream, sink_index != PA_INVALID_INDEX || !sink_name || (*sink_name && pa_utf8_valid(name)), tag, PA_ERR_INVALID); CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name), tag, PA_ERR_INVALID); @@ -1879,12 +1995,29 @@ static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY); - if (pa_scache_play_item(c->protocol->core, name, sink, volume) < 0) { + p = pa_proplist_new(); + + if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) || + !pa_tagstruct_eof(t)) { + protocol_error(c); + pa_proplist_free(p); + return; + } + + if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) { pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY); + pa_proplist_free(p); return; } - pa_pstream_send_simple_ack(c->pstream, tag); + pa_proplist_free(p); + + reply = reply_new(tag); + + if (c->version >= 13) + pa_tagstruct_putu32(reply, idx); + + pa_pstream_send_tagstruct(c->pstream, 
reply); } static void command_remove_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { @@ -1941,7 +2074,7 @@ static void sink_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink *sink) { t, PA_TAG_U32, sink->index, PA_TAG_STRING, sink->name, - PA_TAG_STRING, sink->description, + PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)), PA_TAG_SAMPLE_SPEC, &fixed_ss, PA_TAG_CHANNEL_MAP, &sink->channel_map, PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX, @@ -1953,6 +2086,9 @@ static void sink_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink *sink) { PA_TAG_STRING, sink->driver, PA_TAG_U32, sink->flags, PA_TAG_INVALID); + + if (c->version >= 13) + pa_tagstruct_put_proplist(t, sink->proplist); } static void source_fill_tagstruct(connection *c, pa_tagstruct *t, pa_source *source) { @@ -1967,7 +2103,7 @@ static void source_fill_tagstruct(connection *c, pa_tagstruct *t, pa_source *sou t, PA_TAG_U32, source->index, PA_TAG_STRING, source->name, - PA_TAG_STRING, source->description, + PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)), PA_TAG_SAMPLE_SPEC, &fixed_ss, PA_TAG_CHANNEL_MAP, &source->channel_map, PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX, @@ -1979,16 +2115,23 @@ static void source_fill_tagstruct(connection *c, pa_tagstruct *t, pa_source *sou PA_TAG_STRING, source->driver, PA_TAG_U32, source->flags, PA_TAG_INVALID); + + if (c->version >= 13) + pa_tagstruct_put_proplist(t, source->proplist); } -static void client_fill_tagstruct(pa_tagstruct *t, pa_client *client) { +static void client_fill_tagstruct(connection *c, pa_tagstruct *t, pa_client *client) { pa_assert(t); pa_assert(client); pa_tagstruct_putu32(t, client->index); - pa_tagstruct_puts(t, client->name); - pa_tagstruct_putu32(t, client->owner ? 
client->owner->index : PA_INVALID_INDEX); + pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME))); + pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX); pa_tagstruct_puts(t, client->driver); + + if (c->version >= 13) + pa_tagstruct_put_proplist(t, client->proplist); + } static void module_fill_tagstruct(pa_tagstruct *t, pa_module *module) { @@ -2011,7 +2154,7 @@ static void sink_input_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink_in fixup_sample_spec(c, &fixed_ss, &s->sample_spec); pa_tagstruct_putu32(t, s->index); - pa_tagstruct_puts(t, s->name); + pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME))); pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX); pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX); pa_tagstruct_putu32(t, s->sink->index); @@ -2024,6 +2167,8 @@ static void sink_input_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink_in pa_tagstruct_puts(t, s->driver); if (c->version >= 11) pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s)); + if (c->version >= 13) + pa_tagstruct_put_proplist(t, s->proplist); } static void source_output_fill_tagstruct(connection *c, pa_tagstruct *t, pa_source_output *s) { @@ -2035,7 +2180,7 @@ static void source_output_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sour fixup_sample_spec(c, &fixed_ss, &s->sample_spec); pa_tagstruct_putu32(t, s->index); - pa_tagstruct_puts(t, s->name); + pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME))); pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX); pa_tagstruct_putu32(t, s->client ? 
s->client->index : PA_INVALID_INDEX); pa_tagstruct_putu32(t, s->source->index); @@ -2045,6 +2190,9 @@ static void source_output_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sour pa_tagstruct_put_usec(t, pa_source_get_latency(s->source)); pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s))); pa_tagstruct_puts(t, s->driver); + + if (c->version >= 13) + pa_tagstruct_put_proplist(t, s->proplist); } static void scache_fill_tagstruct(connection *c, pa_tagstruct *t, pa_scache_entry *e) { @@ -2064,6 +2212,9 @@ static void scache_fill_tagstruct(connection *c, pa_tagstruct *t, pa_scache_entr pa_tagstruct_putu32(t, e->memchunk.length); pa_tagstruct_put_boolean(t, e->lazy); pa_tagstruct_puts(t, e->filename); + + if (c->version >= 13) + pa_tagstruct_put_proplist(t, e->proplist); } static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { @@ -2133,7 +2284,7 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u else if (source) source_fill_tagstruct(c, reply, source); else if (client) - client_fill_tagstruct(reply, client); + client_fill_tagstruct(c, reply, client); else if (module) module_fill_tagstruct(reply, module); else if (si) @@ -2188,7 +2339,7 @@ static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST) source_fill_tagstruct(c, reply, p); else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST) - client_fill_tagstruct(reply, p); + client_fill_tagstruct(c, reply, p); else if (command == PA_COMMAND_GET_MODULE_INFO_LIST) module_fill_tagstruct(reply, p); else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST) @@ -2660,6 +2811,168 @@ static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command pa_pstream_send_simple_ack(c->pstream, tag); } +static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, 
void *userdata) { + connection *c = CONNECTION(userdata); + uint32_t idx; + uint32_t mode; + pa_proplist *p; + + connection_assert_ref(c); + pa_assert(t); + + CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); + + p = pa_proplist_new(); + + if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) { + + if (pa_tagstruct_getu32(t, &mode) < 0 || + pa_tagstruct_get_proplist(t, p) < 0 || + !pa_tagstruct_eof(t)) { + protocol_error(c); + pa_proplist_free(p); + return; + } + + } else { + + if (pa_tagstruct_getu32(t, &idx) < 0 || + pa_tagstruct_getu32(t, &mode) < 0 || + pa_tagstruct_get_proplist(t, p) < 0 || + !pa_tagstruct_eof(t)) { + protocol_error(c); + pa_proplist_free(p); + return; + } + } + + CHECK_VALIDITY(c->pstream, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, tag, PA_ERR_INVALID); + + if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) { + playback_stream *s; + + s = pa_idxset_get_by_index(c->output_streams, idx); + CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY); + CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY); + + pa_proplist_update(s->sink_input->proplist, mode, p); + pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index); + + } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) { + record_stream *s; + + s = pa_idxset_get_by_index(c->record_streams, idx); + CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY); + + pa_proplist_update(s->source_output->proplist, mode, p); + pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index); + } else { + pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST); + + pa_proplist_update(c->client->proplist, mode, p); + pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index); + } + + pa_pstream_send_simple_ack(c->pstream, tag); +} + 
+static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { + connection *c = CONNECTION(userdata); + uint32_t idx; + unsigned changed = 0; + pa_proplist *p; + pa_strlist *l = NULL; + + connection_assert_ref(c); + pa_assert(t); + + CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); + + if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) { + + if (pa_tagstruct_getu32(t, &idx) < 0) { + protocol_error(c); + return; + } + } + + if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) { + playback_stream *s; + + s = pa_idxset_get_by_index(c->output_streams, idx); + CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY); + CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY); + + p = s->sink_input->proplist; + + } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) { + record_stream *s; + + s = pa_idxset_get_by_index(c->record_streams, idx); + CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY); + + p = s->source_output->proplist; + } else { + pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST); + + p = c->client->proplist; + } + + for (;;) { + const char *k; + + if (pa_tagstruct_gets(t, &k) < 0) { + protocol_error(c); + pa_strlist_free(l); + return; + } + + if (!k) + break; + + l = pa_strlist_prepend(l, k); + } + + if (!pa_tagstruct_eof(t)) { + protocol_error(c); + pa_strlist_free(l); + return; + } + + for (;;) { + char *z; + + l = pa_strlist_pop(l, &z); + + if (!z) + break; + + changed += pa_proplist_unset(p, z) >= 0; + pa_xfree(z); + } + + pa_pstream_send_simple_ack(c->pstream, tag); + + if (changed) { + if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) { + playback_stream *s; + + s = pa_idxset_get_by_index(c->output_streams, idx); + pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index); + + } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) { + record_stream *s; + + 
s = pa_idxset_get_by_index(c->record_streams, idx); + pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index); + + } else { + pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST); + pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index); + } + } +} + static void command_set_default_sink_or_source(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { connection *c = CONNECTION(userdata); const char *s; @@ -3246,11 +3559,11 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo c->parent.parent.free = connection_free; c->parent.process_msg = connection_process_msg; - c->authorized = !!p->public; + c->authorized = p->public; if (!c->authorized && p->auth_ip_acl && pa_ip_acl_check(p->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) { pa_log_info("Client authenticated by IP ACL."); - c->authorized = 1; + c->authorized = TRUE; } if (!c->authorized) { @@ -3268,7 +3581,7 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo c->client = pa_client_new(p->core, __FILE__, cname); c->client->kill = client_kill_cb; c->client->userdata = c; - c->client->owner = p->module; + c->client->module = p->module; c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool); @@ -3301,12 +3614,12 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo static int load_key(pa_protocol_native*p, const char*fn) { pa_assert(p); - p->auth_cookie_in_property = 0; + p->auth_cookie_in_property = FALSE; if (!fn && pa_authkey_prop_get(p->core, PA_NATIVE_COOKIE_PROPERTY_NAME, p->auth_cookie, sizeof(p->auth_cookie)) >= 0) { pa_log_info("using already loaded auth cookie."); pa_authkey_prop_ref(p->core, PA_NATIVE_COOKIE_PROPERTY_NAME); - p->auth_cookie_in_property = 1; + p->auth_cookie_in_property = TRUE; return 0; } @@ -3319,7 
+3632,7 @@ static int load_key(pa_protocol_native*p, const char*fn) { pa_log_info("loading cookie from disk."); if (pa_authkey_prop_put(p->core, PA_NATIVE_COOKIE_PROPERTY_NAME, p->auth_cookie, sizeof(p->auth_cookie)) >= 0) - p->auth_cookie_in_property = 1; + p->auth_cookie_in_property = TRUE; return 0; } diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 777def30..3ee2a058 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -343,7 +343,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int } /* Called from thread context */ -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { connection *c; int r; @@ -352,34 +352,25 @@ static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chun connection_assert_ref(c); pa_assert(chunk); - r = pa_memblockq_peek(c->input_memblockq, chunk); + if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0) { -/* pa_log("peeked %u %i", r >= 0 ? 
chunk->length: 0, r); */ + if (c->dead && pa_sink_input_safe_to_remove(i)) + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); - if (c->dead && r < 0) - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); + } else { + size_t old, new; - return r; -} - -/* Called from thread context */ -static void sink_input_drop_cb(pa_sink_input *i, size_t length) { - connection *c; - size_t old, new; - - pa_assert(i); - c = CONNECTION(i->userdata); - connection_assert_ref(c); - pa_assert(length); + old = pa_memblockq_missing(c->input_memblockq); + pa_memblockq_drop(c->input_memblockq, chunk->length); + new = pa_memblockq_missing(c->input_memblockq); - old = pa_memblockq_missing(c->input_memblockq); - pa_memblockq_drop(c->input_memblockq, length); - new = pa_memblockq_missing(c->input_memblockq); - - if (new > old) { - if (pa_atomic_add(&c->playback.missing, new - old) <= 0) - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); + if (new > old) { + if (pa_atomic_add(&c->playback.missing, new - old) <= 0) + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); + } } + + return r; } /* Called from main context */ @@ -477,29 +468,38 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname)); pa_assert_se(c->client = pa_client_new(p->core, __FILE__, cname)); - c->client->owner = p->module; + c->client->module = p->module; c->client->kill = client_kill_cb; c->client->userdata = c; if (p->mode & PLAYBACK) { pa_sink_input_new_data data; size_t l; + pa_sink *sink; + + if (!(sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, TRUE))) { + pa_log("Failed to get sink."); + goto fail; + } 
pa_sink_input_new_data_init(&data); data.driver = __FILE__; - data.name = c->client->name; - pa_sink_input_new_data_set_sample_spec(&data, &p->sample_spec); data.module = p->module; data.client = c->client; + data.sink = sink; + pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist); + pa_sink_input_new_data_set_sample_spec(&data, &p->sample_spec); - if (!(c->sink_input = pa_sink_input_new(p->core, &data, 0))) { + c->sink_input = pa_sink_input_new(p->core, &data, 0); + pa_sink_input_new_data_done(&data); + + if (!c->sink_input) { pa_log("Failed to create sink input."); goto fail; } c->sink_input->parent.process_msg = sink_input_process_msg; - c->sink_input->peek = sink_input_peek_cb; - c->sink_input->drop = sink_input_drop_cb; + c->sink_input->pop = sink_input_pop_cb; c->sink_input->kill = sink_input_kill_cb; c->sink_input->userdata = c; @@ -511,6 +511,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_frame_size(&p->sample_spec), (size_t) -1, l/PLAYBACK_BUFFER_FRAGMENTS, + 0, NULL); pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS; @@ -523,15 +524,25 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) if (p->mode & RECORD) { pa_source_output_new_data data; size_t l; + pa_source *source; + + if (!(source = pa_namereg_get(c->protocol->core, c->protocol->source_name, PA_NAMEREG_SOURCE, TRUE))) { + pa_log("Failed to get source."); + goto fail; + } pa_source_output_new_data_init(&data); data.driver = __FILE__; - data.name = c->client->name; - pa_source_output_new_data_set_sample_spec(&data, &p->sample_spec); data.module = p->module; data.client = c->client; + data.source = source; + pa_proplist_update(data.proplist, PA_UPDATE_MERGE, c->client->proplist); + pa_source_output_new_data_set_sample_spec(&data, &p->sample_spec); + + c->source_output = pa_source_output_new(p->core, &data, 0); + 
pa_source_output_new_data_done(&data); - if (!(c->source_output = pa_source_output_new(p->core, &data, 0))) { + if (!c->source_output) { pa_log("Failed to create source output."); goto fail; } @@ -548,6 +559,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_frame_size(&p->sample_spec), 1, 0, + 0, NULL); pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 9d32a363..97ac2b08 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -374,7 +374,7 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa i = pa_xnew(struct item_info, 1); i->type = PA_PSTREAM_ITEM_MEMBLOCK; - n = MIN(length, bsm); + n = PA_MIN(length, bsm); i->chunk.index = chunk->index + idx; i->chunk.length = n; i->chunk.memblock = pa_memblock_ref(chunk->memblock); diff --git a/src/pulsecore/resampler.c b/src/pulsecore/resampler.c index fe7f1ad2..3d7e5364 100644 --- a/src/pulsecore/resampler.c +++ b/src/pulsecore/resampler.c @@ -47,7 +47,7 @@ #include "resampler.h" /* Number of samples of extra space we allow the resamplers to return */ -#define EXTRA_SAMPLES 128 +#define EXTRA_FRAMES 128 struct pa_resampler { pa_resample_method_t method; @@ -79,6 +79,15 @@ struct pa_resampler { unsigned i_counter; } trivial; + struct { /* data specific to the peak finder pseudo resampler */ + unsigned o_counter; + unsigned i_counter; + + float max_f[PA_CHANNELS_MAX]; + int16_t max_i[PA_CHANNELS_MAX]; + + } peaks; + #ifdef HAVE_LIBSAMPLERATE struct { /* data specific to libsamplerate */ SRC_STATE *state; @@ -99,6 +108,7 @@ static int copy_init(pa_resampler *r); static int trivial_init(pa_resampler*r); static int speex_init(pa_resampler*r); static int ffmpeg_init(pa_resampler*r); +static int peaks_init(pa_resampler*r); #ifdef HAVE_LIBSAMPLERATE static int libsamplerate_init(pa_resampler*r); #endif @@ -144,7 +154,8 @@ static int (* const 
init_table[])(pa_resampler*r) = { [PA_RESAMPLER_SPEEX_FIXED_BASE+10] = speex_init, [PA_RESAMPLER_FFMPEG] = ffmpeg_init, [PA_RESAMPLER_AUTO] = NULL, - [PA_RESAMPLER_COPY] = copy_init + [PA_RESAMPLER_COPY] = copy_init, + [PA_RESAMPLER_PEAKS] = peaks_init, }; static inline size_t sample_size(pa_sample_format_t f) { @@ -242,9 +253,9 @@ pa_resampler* pa_resampler_new( if ((method >= PA_RESAMPLER_SPEEX_FIXED_BASE && method <= PA_RESAMPLER_SPEEX_FIXED_MAX) || (method == PA_RESAMPLER_FFMPEG)) r->work_format = PA_SAMPLE_S16NE; - else if (method == PA_RESAMPLER_TRIVIAL || method == PA_RESAMPLER_COPY) { + else if (method == PA_RESAMPLER_TRIVIAL || method == PA_RESAMPLER_COPY || method == PA_RESAMPLER_PEAKS) { - if (r->map_required || a->format != b->format) { + if (r->map_required || a->format != b->format || method == PA_RESAMPLER_PEAKS) { if (a->format == PA_SAMPLE_S32NE || a->format == PA_SAMPLE_S32RE || a->format == PA_SAMPLE_FLOAT32NE || a->format == PA_SAMPLE_FLOAT32RE || @@ -347,6 +358,12 @@ size_t pa_resampler_request(pa_resampler *r, size_t out_length) { return (((out_length / r->o_fz)*r->i_ss.rate)/r->o_ss.rate) * r->i_fz; } +size_t pa_resampler_result(pa_resampler *r, size_t in_length) { + pa_assert(r); + + return (((in_length / r->i_fz)*r->o_ss.rate)/r->i_ss.rate) * r->o_fz; +} + size_t pa_resampler_max_block_size(pa_resampler *r) { size_t block_size_max; pa_sample_spec ss; @@ -358,22 +375,17 @@ size_t pa_resampler_max_block_size(pa_resampler *r) { /* We deduce the "largest" sample spec we're using during the * conversion */ - ss = r->i_ss; - if (r->o_ss.channels > ss.channels) - ss.channels = r->o_ss.channels; + ss.channels = PA_MAX(r->i_ss.channels, r->o_ss.channels); /* We silently assume that the format enum is ordered by size */ - if (r->o_ss.format > ss.format) - ss.format = r->o_ss.format; - if (r->work_format > ss.format) - ss.format = r->work_format; + ss.format = PA_MAX(r->i_ss.format, r->o_ss.format); + ss.format = PA_MAX(ss.format, r->work_format); - 
if (r->o_ss.rate > ss.rate) - ss.rate = r->o_ss.rate; + ss.rate = PA_MAX(r->i_ss.rate, r->o_ss.rate); fs = pa_frame_size(&ss); - return (((block_size_max/fs + EXTRA_SAMPLES)*r->i_ss.rate)/ss.rate)*r->i_fz; + return (((block_size_max/fs - EXTRA_FRAMES)*r->i_ss.rate)/ss.rate)*r->i_fz; } void pa_resampler_reset(pa_resampler *r) { @@ -420,7 +432,8 @@ static const char * const resample_methods[] = { "speex-fixed-10", "ffmpeg", "auto", - "copy" + "copy", + "peaks" }; const char *pa_resample_method_to_string(pa_resample_method_t m) { @@ -1069,7 +1082,7 @@ static pa_memchunk *resample(pa_resampler *r, pa_memchunk *input) { in_n_samples = input->length / r->w_sz; in_n_frames = in_n_samples / r->o_ss.channels; - out_n_frames = ((in_n_frames*r->o_ss.rate)/r->i_ss.rate)+EXTRA_SAMPLES; + out_n_frames = ((in_n_frames*r->o_ss.rate)/r->i_ss.rate)+EXTRA_FRAMES; out_n_samples = out_n_frames * r->o_ss.channels; r->buf3.index = 0; @@ -1400,6 +1413,114 @@ static int trivial_init(pa_resampler*r) { return 0; } +/* Peak finder implementation */ + +static void peaks_resample(pa_resampler *r, const pa_memchunk *input, unsigned in_n_frames, pa_memchunk *output, unsigned *out_n_frames) { + size_t fz; + unsigned o_index; + void *src, *dst; + unsigned start = 0; + + pa_assert(r); + pa_assert(input); + pa_assert(output); + pa_assert(out_n_frames); + + fz = r->w_sz * r->o_ss.channels; + + src = (uint8_t*) pa_memblock_acquire(input->memblock) + input->index; + dst = (uint8_t*) pa_memblock_acquire(output->memblock) + output->index; + + for (o_index = 0;; o_index++, r->peaks.o_counter++) { + unsigned j; + + j = ((r->peaks.o_counter * r->i_ss.rate) / r->o_ss.rate); + j = j > r->peaks.i_counter ? 
j - r->peaks.i_counter : 0; + + if (j >= in_n_frames) + break; + + pa_assert(o_index * fz < pa_memblock_get_length(output->memblock)); + + + if (r->work_format == PA_SAMPLE_S16NE) { + unsigned i, c; + int16_t *s = (int16_t*) ((uint8_t*) src + fz * j); + int16_t *d = (int16_t*) ((uint8_t*) dst + fz * o_index); + + for (i = start; i <= j; i++) + for (c = 0; c < r->o_ss.channels; c++, s++) { + int16_t n; + + n = *s < 0 ? -*s : *s; + + if (n > r->peaks.max_i[c]) + r->peaks.max_i[c] = n; + } + + for (c = 0; c < r->o_ss.channels; c++, d++) + *d = r->peaks.max_i[c]; + + memset(r->peaks.max_i, 0, sizeof(r->peaks.max_i)); + } else { + unsigned i, c; + float *s = (float*) ((uint8_t*) src + fz * j); + float *d = (float*) ((uint8_t*) dst + fz * o_index); + + pa_assert(r->work_format == PA_SAMPLE_FLOAT32NE); + + for (i = start; i <= j; i++) + for (c = 0; c < r->o_ss.channels; c++, s++) { + float n = fabsf(*s); + + if (n > r->peaks.max_f[c]) + r->peaks.max_f[c] = n; + } + + for (c = 0; c < r->o_ss.channels; c++, d++) + *d = r->peaks.max_f[c]; + + memset(r->peaks.max_f, 0, sizeof(r->peaks.max_f)); + } + } + + pa_memblock_release(input->memblock); + pa_memblock_release(output->memblock); + + *out_n_frames = o_index; + + r->peaks.i_counter += in_n_frames; + + /* Normalize counters */ + while (r->peaks.i_counter >= r->i_ss.rate) { + pa_assert(r->peaks.o_counter >= r->o_ss.rate); + + r->peaks.i_counter -= r->i_ss.rate; + r->peaks.o_counter -= r->o_ss.rate; + } +} + +static void peaks_update_rates_or_reset(pa_resampler *r) { + pa_assert(r); + + r->peaks.i_counter = 0; + r->peaks.o_counter = 0; +} + +static int peaks_init(pa_resampler*r) { + pa_assert(r); + + r->peaks.o_counter = r->peaks.i_counter = 0; + memset(r->peaks.max_i, 0, sizeof(r->peaks.max_i)); + memset(r->peaks.max_f, 0, sizeof(r->peaks.max_f)); + + r->impl_resample = peaks_resample; + r->impl_update_rates = peaks_update_rates_or_reset; + r->impl_reset = peaks_update_rates_or_reset; + + return 0; +} + /*** ffmpeg based 
implementation ***/ static void ffmpeg_resample(pa_resampler *r, const pa_memchunk *input, unsigned in_n_frames, pa_memchunk *output, unsigned *out_n_frames) { diff --git a/src/pulsecore/resampler.h b/src/pulsecore/resampler.h index 82d01082..8534f5b5 100644 --- a/src/pulsecore/resampler.h +++ b/src/pulsecore/resampler.h @@ -46,6 +46,7 @@ typedef enum pa_resample_method { PA_RESAMPLER_FFMPEG, PA_RESAMPLER_AUTO, /* automatic select based on sample format */ PA_RESAMPLER_COPY, + PA_RESAMPLER_PEAKS, PA_RESAMPLER_MAX } pa_resample_method_t; @@ -69,6 +70,9 @@ void pa_resampler_free(pa_resampler *r); /* Returns the size of an input memory block which is required to return the specified amount of output data */ size_t pa_resampler_request(pa_resampler *r, size_t out_length); +/* Inverse of pa_resampler_request() */ +size_t pa_resampler_result(pa_resampler *r, size_t in_length); + /* Returns the maximum size of input blocks we can process without needing bounce buffers larger than the mempool tile size. 
*/ size_t pa_resampler_max_block_size(pa_resampler *r); diff --git a/src/pulsecore/rtpoll.c b/src/pulsecore/rtpoll.c index 83008266..f7773be3 100644 --- a/src/pulsecore/rtpoll.c +++ b/src/pulsecore/rtpoll.c @@ -99,7 +99,7 @@ struct pa_rtpoll_item { PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree); -static void signal_handler_noop(int s) { } +static void signal_handler_noop(int s) { write(2, "signal\n", 7); } pa_rtpoll *pa_rtpoll_new(void) { pa_rtpoll *p; diff --git a/src/pulsecore/sample-util.c b/src/pulsecore/sample-util.c index 4ea5d446..4a532f29 100644 --- a/src/pulsecore/sample-util.c +++ b/src/pulsecore/sample-util.c @@ -43,26 +43,25 @@ #define PA_SILENCE_MAX (PA_PAGE_SIZE*16) pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length) { - size_t fs; + pa_memblock *b; + pa_assert(pool); pa_assert(spec); if (length <= 0) - length = pa_bytes_per_second(spec)/20; /* 50 ms */ + length = PA_MIN(pa_bytes_per_second(spec)/20, /* 50 ms */ + pa_mempool_block_size_max(pool)); if (length > PA_SILENCE_MAX) length = PA_SILENCE_MAX; - fs = pa_frame_size(spec); + length = pa_frame_align(length, spec); - length = (length+fs-1)/fs; - - if (length <= 0) - length = 1; + b = pa_silence_memblock(pa_memblock_new(pool, length), spec); - length *= fs; + pa_memblock_set_is_silence(b, TRUE); - return pa_silence_memblock(pa_memblock_new(pool, length), spec); + return b; } pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) { @@ -74,6 +73,7 @@ pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) { data = pa_memblock_acquire(b); pa_silence_memory(data, pa_memblock_get_length(b), spec); pa_memblock_release(b); + return b; } @@ -631,6 +631,9 @@ void pa_volume_memchunk( pa_assert(c->length % pa_frame_size(spec) == 0); pa_assert(volume); + if (pa_memblock_is_silence(c->memblock)) + return; + if (pa_cvolume_channels_equal_to(volume, PA_VOLUME_NORM)) return; diff --git a/src/pulsecore/shmasyncq.c 
b/src/pulsecore/shmasyncq.c new file mode 100644 index 00000000..7af2985c --- /dev/null +++ b/src/pulsecore/shmasyncq.c @@ -0,0 +1,222 @@ +/* $Id$ */ + +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation; either version 2.1 of the + License, or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with PulseAudio; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + USA. +***/ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "fdsem.h" + +/* For debugging purposes we can define _Y to put and extra thread + * yield between each operation. 
*/ + +/* #define PROFILE */ + +#ifdef PROFILE +#define _Y pa_thread_yield() +#else +#define _Y do { } while(0) +#endif + + +struct pa_shmasyncq { + pa_fdsem *read_fdsem, *write_fdsem; + pa_shmasyncq_data *data; +}; + +static int is_power_of_two(unsigned size) { + return !(size & (size - 1)); +} + +static int reduce(pa_shmasyncq *l, int value) { + return value & (unsigned) (l->n_elements - 1); +} + +static pa_atomic_t* get_cell(pa_shmasyncq *l, unsigned i) { + pa_assert(i < l->data->n_elements); + + return (pa_atomic_t*) ((uint8*t) l->data + PA_ALIGN(sizeof(pa_shmasyncq_data)) + i * (PA_ALIGN(sizeof(pa_atomic_t)) + PA_ALIGN(element_size))) +} + +static void *get_cell_data(pa_atomic_t *a) { + return (uint8_t*) a + PA_ALIGN(sizeof(atomic_t)); +} + +pa_shmasyncq *pa_shmasyncq_new(unsigned n_elements, size_t element_size, void *data, int fd[2]) { + pa_shmasyncq *l; + + pa_assert(n_elements > 0); + pa_assert(is_power_of_two(n_elements)); + pa_assert(element_size > 0); + pa_assert(data); + pa_assert(fd); + + l = pa_xnew(pa_shmasyncq, 1); + + l->data = data; + memset(data, 0, PA_SHMASYNCQ_SIZE(n_elements, element_size)); + + l->data->n_elements = n_elements; + l->data->element_size = element_size; + + if (!(l->read_fdsem = pa_fdsem_new_shm(&d->read_fdsem_data, &fd[0]))) { + pa_xfree(l); + return NULL; + } + + if (!(l->write_fdsem = pa_fdsem_new(&d->write_fdsem_data, &fd[1]))) { + pa_fdsem_free(l->read_fdsem); + pa_xfree(l); + return NULL; + } + + return l; +} + +void pa_shmasyncq_free(pa_shmasyncq *l, pa_free_cb_t free_cb) { + pa_assert(l); + + if (free_cb) { + void *p; + + while ((p = pa_shmasyncq_pop(l, 0))) + free_cb(p); + } + + pa_fdsem_free(l->read_fdsem); + pa_fdsem_free(l->write_fdsem); + pa_xfree(l); +} + +int pa_shmasyncq_push(pa_shmasyncq*l, void *p, int wait) { + int idx; + pa_atomic_ptr_t *cells; + + pa_assert(l); + pa_assert(p); + + cells = PA_SHMASYNCQ_CELLS(l); + + _Y; + idx = reduce(l, l->write_idx); + + if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { + 
+ if (!wait) + return -1; + +/* pa_log("sleeping on push"); */ + + do { + pa_fdsem_wait(l->read_fdsem); + } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)); + } + + _Y; + l->write_idx++; + + pa_fdsem_post(l->write_fdsem); + + return 0; +} + +void* pa_shmasyncq_pop(pa_shmasyncq*l, int wait) { + int idx; + void *ret; + pa_atomic_ptr_t *cells; + + pa_assert(l); + + cells = PA_SHMASYNCQ_CELLS(l); + + _Y; + idx = reduce(l, l->read_idx); + + if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { + + if (!wait) + return NULL; + +/* pa_log("sleeping on pop"); */ + + do { + pa_fdsem_wait(l->write_fdsem); + } while (!(ret = pa_atomic_ptr_load(&cells[idx]))); + } + + pa_assert(ret); + + /* Guaranteed to succeed if we only have a single reader */ + pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL)); + + _Y; + l->read_idx++; + + pa_fdsem_post(l->read_fdsem); + + return ret; +} + +int pa_shmasyncq_get_fd(pa_shmasyncq *q) { + pa_assert(q); + + return pa_fdsem_get(q->write_fdsem); +} + +int pa_shmasyncq_before_poll(pa_shmasyncq *l) { + int idx; + pa_atomic_ptr_t *cells; + + pa_assert(l); + + cells = PA_SHMASYNCQ_CELLS(l); + + _Y; + idx = reduce(l, l->read_idx); + + for (;;) { + if (pa_atomic_ptr_load(&cells[idx])) + return -1; + + if (pa_fdsem_before_poll(l->write_fdsem) >= 0) + return 0; + } + + return 0; +} + +void pa_shmasyncq_after_poll(pa_shmasyncq *l) { + pa_assert(l); + + pa_fdsem_after_poll(l->write_fdsem); +} diff --git a/src/pulsecore/shmasyncq.h b/src/pulsecore/shmasyncq.h new file mode 100644 index 00000000..ca35ffd2 --- /dev/null +++ b/src/pulsecore/shmasyncq.h @@ -0,0 +1,62 @@ +#ifndef foopulseshmasyncqhfoo +#define foopulseshmasyncqhfoo + +/* $Id$ */ + +/*** + This file is part of PulseAudio. 
+ + Copyright 2004-2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation; either version 2.1 of the + License, or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with PulseAudio; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + USA. +***/ + +#include + +#include +#include + +/* Similar to pa_asyncq, but stores data in a shared memory segment */ + + +struct pa_shmasyncq_data { + unsigned n_elements; + size_t element_size; + unsigned read_idx; + unsigned write_idx; + pa_fdsem_data read_fdsem_data, write_fdsem_data; +}; + +#define PA_SHMASYNCQ_DEFAULT_N_ELEMENTS 128 +#define PA_SHMASYNCQ_SIZE(n_elements, element_size) (PA_ALIGN(sizeof(pa_shmasyncq_data)) + (((n_elements) * (PA_ALIGN(sizeof(pa_atomic_t)) + PA_ALIGN(element_size))))) +#define PA_SHMASYNCQ_DEFAULT_SIZE(element_size) PA_SHMASYNCQ_SIZE(PA_SHMASYNCQ_DEFAULT_N_ELEMENTS, element_size) + +typedef struct pa_shmasyncq pa_shmasyncq; + +pa_shmasyncq *pa_shmasyncq_new(unsigned n_elements, size_t element_size, void *data, int fd[2]); +void pa_shmasyncq_free(pa_shmasyncq* q, pa_free_cb_t free_cb); + +void* pa_shmasyncq_pop_begin(pa_shmasyncq *q, pa_bool_t wait); +void pa_shmasyncq_pop_commit(pa_shmasyncq *q); + +int* pa_shmasyncq_push_begin(pa_shmasyncq *q, pa_bool_t wait); +void pa_shmasyncq_push_commit(pa_shmasyncq *q); + +int pa_shmasyncq_get_fd(pa_shmasyncq *q); +int pa_shmasyncq_before_poll(pa_shmasyncq *a); +void pa_shmasyncq_after_poll(pa_shmasyncq *a); + +#endif diff --git 
a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index 07ddb83a..e15aa7bb 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -41,8 +41,8 @@ #include "sink-input.h" +#define MEMBLOCKQ_MAXLENGTH (16*1024*1024) #define CONVERT_BUFFER_LENGTH (PA_PAGE_SIZE) -#define SILENCE_BUFFER_LENGTH (PA_PAGE_SIZE*12) #define MOVE_BUFFER_LENGTH (PA_PAGE_SIZE*256) static PA_DEFINE_CHECK_TYPE(pa_sink_input, pa_msgobject); @@ -54,10 +54,18 @@ pa_sink_input_new_data* pa_sink_input_new_data_init(pa_sink_input_new_data *data memset(data, 0, sizeof(*data)); data->resample_method = PA_RESAMPLER_INVALID; + data->proplist = pa_proplist_new(); return data; } +void pa_sink_input_new_data_set_sample_spec(pa_sink_input_new_data *data, const pa_sample_spec *spec) { + pa_assert(data); + + if ((data->sample_spec_is_set = !!spec)) + data->sample_spec = *spec; +} + void pa_sink_input_new_data_set_channel_map(pa_sink_input_new_data *data, const pa_channel_map *map) { pa_assert(data); @@ -72,18 +80,17 @@ void pa_sink_input_new_data_set_volume(pa_sink_input_new_data *data, const pa_cv data->volume = *volume; } -void pa_sink_input_new_data_set_sample_spec(pa_sink_input_new_data *data, const pa_sample_spec *spec) { +void pa_sink_input_new_data_set_muted(pa_sink_input_new_data *data, pa_bool_t mute) { pa_assert(data); - if ((data->sample_spec_is_set = !!spec)) - data->sample_spec = *spec; + data->muted_is_set = TRUE; + data->muted = !!mute; } -void pa_sink_input_new_data_set_muted(pa_sink_input_new_data *data, pa_bool_t mute) { +void pa_sink_input_new_data_done(pa_sink_input_new_data *data) { pa_assert(data); - data->muted_is_set = TRUE; - data->muted = !!mute; + pa_proplist_free(data->proplist); } pa_sink_input* pa_sink_input_new( @@ -94,6 +101,7 @@ pa_sink_input* pa_sink_input_new( pa_sink_input *i; pa_resampler *resampler = NULL; char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX]; + pa_memblock *silence; pa_assert(core); pa_assert(data); @@ -102,7 +110,6 
@@ pa_sink_input* pa_sink_input_new( return NULL; pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver)); - pa_return_null_if_fail(!data->name || pa_utf8_valid(data->name)); if (!data->sink) data->sink = pa_namereg_get(core, NULL, PA_NAMEREG_SINK, 1); @@ -132,6 +139,9 @@ pa_sink_input* pa_sink_input_new( pa_return_null_if_fail(pa_cvolume_valid(&data->volume)); pa_return_null_if_fail(data->volume.channels == data->sample_spec.channels); + if (!data->muted_is_set) + data->muted = FALSE; + if (flags & PA_SINK_INPUT_FIX_FORMAT) data->sample_spec.format = data->sink->sample_spec.format; @@ -150,9 +160,6 @@ pa_sink_input* pa_sink_input_new( if (data->volume.channels != data->sample_spec.channels) pa_cvolume_set(&data->volume, data->sample_spec.channels, pa_cvolume_avg(&data->volume)); - if (!data->muted_is_set) - data->muted = 0; - if (data->resample_method == PA_RESAMPLER_INVALID) data->resample_method = core->resample_method; @@ -192,7 +199,7 @@ pa_sink_input* pa_sink_input_new( i->core = core; i->state = PA_SINK_INPUT_INIT; i->flags = flags; - i->name = pa_xstrdup(data->name); + i->proplist = pa_proplist_copy(data->proplist); i->driver = pa_xstrdup(data->driver); i->module = data->module; i->sink = data->sink; @@ -215,8 +222,9 @@ pa_sink_input* pa_sink_input_new( } else i->sync_next = i->sync_prev = NULL; - i->peek = NULL; - i->drop = NULL; + i->pop = NULL; + i->rewind = NULL; + i->set_max_rewind = NULL; i->kill = NULL; i->get_latency = NULL; i->attach = NULL; @@ -226,22 +234,37 @@ pa_sink_input* pa_sink_input_new( i->userdata = NULL; i->thread_info.state = i->state; + i->thread_info.attached = FALSE; pa_atomic_store(&i->thread_info.drained, 1); + pa_atomic_store(&i->thread_info.render_memblockq_is_empty, 0); i->thread_info.sample_spec = i->sample_spec; - i->thread_info.silence_memblock = NULL; - i->thread_info.move_silence = 0; - pa_memchunk_reset(&i->thread_info.resampled_chunk); i->thread_info.resampler = resampler; i->thread_info.volume = i->volume; 
i->thread_info.muted = i->muted; - i->thread_info.attached = FALSE; + i->thread_info.requested_sink_latency = 0; + i->thread_info.rewrite_nbytes = 0; + i->thread_info.ignore_rewind = FALSE; + + silence = pa_silence_memblock_new(i->sink->core->mempool, &i->sink->sample_spec, 0); + + i->thread_info.render_memblockq = pa_memblockq_new( + 0, + MEMBLOCKQ_MAXLENGTH, + 0, + pa_frame_size(&i->sink->sample_spec), + 0, + 1, + 0, + silence); + + pa_memblock_unref(silence); pa_assert_se(pa_idxset_put(core->sink_inputs, pa_sink_input_ref(i), &i->index) == 0); pa_assert_se(pa_idxset_put(i->sink->inputs, i, NULL) == 0); pa_log_info("Created input %u \"%s\" on %s with sample spec %s and channel map %s", i->index, - i->name, + pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), i->sink->name, pa_sample_spec_snprint(st, sizeof(st), &i->sample_spec), pa_channel_map_snprint(cm, sizeof(cm), &i->channel_map)); @@ -325,8 +348,9 @@ void pa_sink_input_unlink(pa_sink_input *i) { } else i->state = PA_SINK_INPUT_UNLINKED; - i->peek = NULL; - i->drop = NULL; + i->pop = NULL; + i->rewind = NULL; + i->set_max_rewind = NULL; i->kill = NULL; i->get_latency = NULL; i->attach = NULL; @@ -352,20 +376,19 @@ static void sink_input_free(pa_object *o) { if (PA_SINK_INPUT_LINKED(i->state)) pa_sink_input_unlink(i); - pa_log_info("Freeing output %u \"%s\"", i->index, i->name); + pa_log_info("Freeing input %u \"%s\"", i->index, pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME))); pa_assert(!i->thread_info.attached); - if (i->thread_info.resampled_chunk.memblock) - pa_memblock_unref(i->thread_info.resampled_chunk.memblock); + if (i->thread_info.render_memblockq) + pa_memblockq_free(i->thread_info.render_memblockq); if (i->thread_info.resampler) pa_resampler_free(i->thread_info.resampler); - if (i->thread_info.silence_memblock) - pa_memblock_unref(i->thread_info.silence_memblock); + if (i->proplist) + pa_proplist_free(i->proplist); - pa_xfree(i->name); pa_xfree(i->driver); pa_xfree(i); } 
@@ -374,8 +397,8 @@ void pa_sink_input_put(pa_sink_input *i) { pa_sink_input_assert_ref(i); pa_assert(i->state == PA_SINK_INPUT_INIT); - pa_assert(i->peek); - pa_assert(i->drop); + pa_assert(i->pop); + pa_assert(i->rewind); i->thread_info.state = i->state = i->flags & PA_SINK_INPUT_START_CORKED ? PA_SINK_INPUT_CORKED : PA_SINK_INPUT_RUNNING; i->thread_info.volume = i->volume; @@ -384,8 +407,8 @@ void pa_sink_input_put(pa_sink_input *i) { if (i->state == PA_SINK_INPUT_CORKED) i->sink->n_corked++; - pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, i, 0, NULL); pa_sink_update_status(i->sink); + pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, i, 0, NULL); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index); pa_hook_fire(&i->sink->core->hooks[PA_CORE_HOOK_SINK_INPUT_PUT], i); @@ -419,85 +442,83 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i) { } /* Called from thread context */ -int pa_sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk, pa_cvolume *volume) { - int ret = -1; - int do_volume_adj_here; - int volume_is_norm; - size_t block_size_max; +int pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa_memchunk *chunk, pa_cvolume *volume) { + pa_bool_t do_volume_adj_here; + pa_bool_t volume_is_norm; + size_t block_size_max_sink, block_size_max_sink_input; + size_t ilength; pa_sink_input_assert_ref(i); pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state)); - pa_assert(pa_frame_aligned(length, &i->sink->sample_spec)); + pa_assert(pa_frame_aligned(slength, &i->sink->sample_spec)); pa_assert(chunk); pa_assert(volume); - if (!i->peek || !i->drop || i->thread_info.state == PA_SINK_INPUT_CORKED) - goto finish; + pa_log_debug("peek"); + + if (!i->pop || i->thread_info.state == PA_SINK_INPUT_CORKED) + return -1; pa_assert(i->thread_info.state == PA_SINK_INPUT_RUNNING || i->thread_info.state == 
PA_SINK_INPUT_DRAINED); + /* If there's still some rewrite request the handle, but the sink + didn't do this for us, we do it here. However, since the sink + apparently doesn't support rewinding, we pass 0 here. This still + allows rewinding through the render buffer. */ + pa_sink_input_rewind(i, 0); + + block_size_max_sink_input = i->thread_info.resampler ? + pa_resampler_max_block_size(i->thread_info.resampler) : + pa_frame_align(pa_mempool_block_size_max(i->sink->core->mempool), &i->sample_spec); + + block_size_max_sink = pa_frame_align(pa_mempool_block_size_max(i->sink->core->mempool), &i->sink->sample_spec); + /* Default buffer size */ - if (length <= 0) - length = pa_frame_align(CONVERT_BUFFER_LENGTH, &i->sink->sample_spec); - - /* Make sure the buffer fits in the mempool tile */ - block_size_max = pa_mempool_block_size_max(i->sink->core->mempool); - if (length > block_size_max) - length = pa_frame_align(block_size_max, &i->sink->sample_spec); - - if (i->thread_info.move_silence > 0) { - size_t l; - - /* We have just been moved and shall play some silence for a - * while until the old sink has drained its playback buffer */ - - if (!i->thread_info.silence_memblock) - i->thread_info.silence_memblock = pa_silence_memblock_new( - i->sink->core->mempool, - &i->sink->sample_spec, - pa_frame_align(SILENCE_BUFFER_LENGTH, &i->sink->sample_spec)); - - chunk->memblock = pa_memblock_ref(i->thread_info.silence_memblock); - chunk->index = 0; - l = pa_memblock_get_length(chunk->memblock); - chunk->length = i->thread_info.move_silence < l ? i->thread_info.move_silence : l; - - ret = 0; - do_volume_adj_here = 1; - goto finish; - } + if (slength <= 0) + slength = pa_frame_align(CONVERT_BUFFER_LENGTH, &i->sink->sample_spec); - if (!i->thread_info.resampler) { - do_volume_adj_here = 0; /* FIXME??? 
*/ - ret = i->peek(i, length, chunk); - goto finish; - } + if (slength > block_size_max_sink) + slength = block_size_max_sink; + + if (i->thread_info.resampler) { + ilength = pa_resampler_request(i->thread_info.resampler, slength); + + if (ilength <= 0) + ilength = pa_frame_align(CONVERT_BUFFER_LENGTH, &i->sample_spec); + } else + ilength = slength; + + if (ilength > block_size_max_sink_input) + ilength = block_size_max_sink_input; + + /* If the channel maps of the sink and this stream differ, we need + * to adjust the volume *before* we resample. Otherwise we can do + * it after and leave it for the sink code */ do_volume_adj_here = !pa_channel_map_equal(&i->channel_map, &i->sink->channel_map); volume_is_norm = pa_cvolume_is_norm(&i->thread_info.volume) && !i->thread_info.muted; - while (!i->thread_info.resampled_chunk.memblock) { + while (!pa_memblockq_is_readable(i->thread_info.render_memblockq)) { pa_memchunk tchunk; - size_t l, rmbs; - l = pa_resampler_request(i->thread_info.resampler, length); + /* There's nothing in our render queue. We need to fill it up + * with data from the implementor. 
*/ - if (l <= 0) - l = pa_frame_align(CONVERT_BUFFER_LENGTH, &i->sample_spec); + if (i->pop(i, ilength, &tchunk) < 0) { + pa_atomic_store(&i->thread_info.drained, 1); - rmbs = pa_resampler_max_block_size(i->thread_info.resampler); - if (l > rmbs) - l = rmbs; + /* OK, we got no data from the implementor, so let's just skip ahead */ + pa_memblockq_seek(i->thread_info.render_memblockq, slength, PA_SEEK_RELATIVE_ON_READ); + break; + } - if ((ret = i->peek(i, l, &tchunk)) < 0) - goto finish; + pa_atomic_store(&i->thread_info.drained, 0); pa_assert(tchunk.length > 0); + pa_assert(tchunk.memblock); - if (tchunk.length > l) - tchunk.length = l; - - i->drop(i, tchunk.length); + if (tchunk.length > block_size_max_sink_input) + tchunk.length = block_size_max_sink_input; /* It might be necessary to adjust the volume here */ if (do_volume_adj_here && !volume_is_norm) { @@ -509,137 +530,146 @@ int pa_sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk, pa_c pa_volume_memchunk(&tchunk, &i->thread_info.sample_spec, &i->thread_info.volume); } - pa_resampler_run(i->thread_info.resampler, &tchunk, &i->thread_info.resampled_chunk); + if (!i->thread_info.resampler) + pa_memblockq_push_align(i->thread_info.render_memblockq, &tchunk); + else { + pa_memchunk rchunk; + pa_resampler_run(i->thread_info.resampler, &tchunk, &rchunk); + + if (rchunk.memblock) { + pa_memblockq_push_align(i->thread_info.render_memblockq, &rchunk); + pa_memblock_unref(rchunk.memblock); + } + } + pa_memblock_unref(tchunk.memblock); } - pa_assert(i->thread_info.resampled_chunk.memblock); - pa_assert(i->thread_info.resampled_chunk.length > 0); + pa_assert_se(pa_memblockq_peek(i->thread_info.render_memblockq, chunk) >= 0); - *chunk = i->thread_info.resampled_chunk; - pa_memblock_ref(i->thread_info.resampled_chunk.memblock); + pa_assert(chunk->length > 0); + pa_assert(chunk->memblock); - ret = 0; + if (chunk->length > block_size_max_sink) + chunk->length = block_size_max_sink; -finish: + /* Let's see if 
we had to apply the volume adjustment ourselves, + * or if this can be done by the sink for us */ - if (ret >= 0) - pa_atomic_store(&i->thread_info.drained, 0); - else if (ret < 0) - pa_atomic_store(&i->thread_info.drained, 1); - - if (ret >= 0) { - /* Let's see if we had to apply the volume adjustment - * ourselves, or if this can be done by the sink for us */ - - if (do_volume_adj_here) - /* We had different channel maps, so we already did the adjustment */ - pa_cvolume_reset(volume, i->sink->sample_spec.channels); - else if (i->thread_info.muted) - /* We've both the same channel map, so let's have the sink do the adjustment for us*/ - pa_cvolume_mute(volume, i->sink->sample_spec.channels); - else - *volume = i->thread_info.volume; - } + if (do_volume_adj_here) + /* We had different channel maps, so we already did the adjustment */ + pa_cvolume_reset(volume, i->sink->sample_spec.channels); + else if (i->thread_info.muted) + /* We've both the same channel map, so let's have the sink do the adjustment for us*/ + pa_cvolume_mute(volume, i->sink->sample_spec.channels); + else + *volume = i->thread_info.volume; + + pa_atomic_store(&i->thread_info.render_memblockq_is_empty, pa_memblockq_is_empty(i->thread_info.render_memblockq)); - return ret; + return 0; } /* Called from thread context */ -void pa_sink_input_drop(pa_sink_input *i, size_t length) { +void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec */) { + pa_sink_input_assert_ref(i); + pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state)); - pa_assert(pa_frame_aligned(length, &i->sink->sample_spec)); - pa_assert(length > 0); + pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec)); + pa_assert(nbytes > 0); - if (!i->peek || !i->drop || i->thread_info.state == PA_SINK_INPUT_CORKED) + if (i->thread_info.state == PA_SINK_INPUT_CORKED) return; - if (i->thread_info.move_silence > 0) { + /* If there's still some rewrite request the handle, but the sink + didn't do this for us, we do it here. 
However, since the sink + apparently doesn't support rewinding, we pass 0 here. This still + allows rewinding through the render buffer. */ + if (i->thread_info.rewrite_nbytes > 0) + pa_sink_input_rewind(i, 0); - if (i->thread_info.move_silence >= length) { - i->thread_info.move_silence -= length; - length = 0; - } else { - length -= i->thread_info.move_silence; - i->thread_info.move_silence = 0; - } - - if (i->thread_info.move_silence <= 0) { - if (i->thread_info.silence_memblock) { - pa_memblock_unref(i->thread_info.silence_memblock); - i->thread_info.silence_memblock = NULL; - } - } + pa_memblockq_drop(i->thread_info.render_memblockq, nbytes); - if (length <= 0) - return; - } + pa_atomic_store(&i->thread_info.render_memblockq_is_empty, pa_memblockq_is_empty(i->thread_info.render_memblockq)); +} - if (i->thread_info.resampled_chunk.memblock) { - size_t l = length; +/* Called from thread context */ +void pa_sink_input_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec */) { + pa_sink_input_assert_ref(i); - if (l > i->thread_info.resampled_chunk.length) - l = i->thread_info.resampled_chunk.length; + pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state)); + pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec)); - i->thread_info.resampled_chunk.index += l; - i->thread_info.resampled_chunk.length -= l; + pa_log_debug("rewind(%u, %u)", nbytes, i->thread_info.rewrite_nbytes); - if (i->thread_info.resampled_chunk.length <= 0) { - pa_memblock_unref(i->thread_info.resampled_chunk.memblock); - pa_memchunk_reset(&i->thread_info.resampled_chunk); - } + if (i->thread_info.state == PA_SINK_INPUT_CORKED) + return; - length -= l; + if (i->thread_info.ignore_rewind) { + i->thread_info.rewrite_nbytes = 0; + i->thread_info.ignore_rewind = FALSE; + return; } - if (length > 0) { + if (nbytes > 0) + pa_log_debug("Have to rewind %u bytes.", nbytes); - if (i->thread_info.resampler) { - /* So, we have a resampler. 
To avoid discontinuities we - * have to actually read all data that could be read and - * pass it through the resampler. */ + if (i->thread_info.rewrite_nbytes > 0) { + size_t max_rewrite; - while (length > 0) { - pa_memchunk chunk; - pa_cvolume volume; + /* Calculate how much make sense to rewrite at most */ + if ((max_rewrite = nbytes + pa_memblockq_get_length(i->thread_info.render_memblockq)) > 0) { + size_t amount, r; - if (pa_sink_input_peek(i, length, &chunk, &volume) >= 0) { - size_t l; + /* Transform into local domain */ + if (i->thread_info.resampler) + max_rewrite = pa_resampler_request(i->thread_info.resampler, max_rewrite); - pa_memblock_unref(chunk.memblock); + /* Calculate how much of the rewinded data should actually be rewritten */ + amount = PA_MIN(max_rewrite, i->thread_info.rewrite_nbytes); - l = chunk.length; - if (l > length) - l = length; + /* Convert back to to sink domain */ + r = i->thread_info.resampler ? pa_resampler_result(i->thread_info.resampler, amount) : amount; - pa_sink_input_drop(i, l); - length -= l; + /* Ok, now update the write pointer */ + pa_memblockq_seek(i->thread_info.render_memblockq, -r, PA_SEEK_RELATIVE); - } else { - size_t l; + /* Tell the implementor */ + if (i->rewind) + i->rewind(i, amount); - l = pa_resampler_request(i->thread_info.resampler, length); + /* And reset the resampler */ + if (i->thread_info.resampler) + pa_resampler_reset(i->thread_info.resampler); + } - /* Hmmm, peeking failed, so let's at least drop - * the right amount of data */ - if (l > 0) - if (i->drop) - i->drop(i, l); + i->thread_info.rewrite_nbytes = 0; + } - break; - } - } + if (nbytes > 0) + pa_memblockq_rewind(i->thread_info.render_memblockq, nbytes); +} - } else { +/* Called from thread context */ +void pa_sink_input_set_max_rewind(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */) { + pa_sink_input_assert_ref(i); - /* We have no resampler, hence let's just drop the data */ + 
pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state)); + pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec)); - if (i->drop) - i->drop(i, length); - } - } + pa_memblockq_set_maxrewind(i->thread_info.render_memblockq, nbytes); + + if (i->set_max_rewind) + i->set_max_rewind(i, i->thread_info.resampler ? pa_resampler_request(i->thread_info.resampler, nbytes) : nbytes); +} + +void pa_sink_input_set_requested_latency(pa_sink_input *i, pa_usec_t usec) { + pa_sink_input_assert_ref(i); + pa_assert(PA_SINK_INPUT_LINKED(i->state)); + + pa_asyncmsgq_post(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, (int64_t) usec, NULL, NULL); } void pa_sink_input_set_volume(pa_sink_input *i, const pa_cvolume *volume) { @@ -707,19 +737,24 @@ int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate) { } void pa_sink_input_set_name(pa_sink_input *i, const char *name) { + const char *old; pa_sink_input_assert_ref(i); - if (!i->name && !name) + if (!name && !pa_proplist_contains(i->proplist, PA_PROP_MEDIA_NAME)) return; - if (i->name && name && !strcmp(i->name, name)) + old = pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME); + + if (old && name && !strcmp(old, name)) return; - pa_xfree(i->name); - i->name = pa_xstrdup(name); + if (name) + pa_proplist_sets(i->proplist, PA_PROP_MEDIA_NAME, name); + else + pa_proplist_unset(i->proplist, PA_PROP_MEDIA_NAME); if (PA_SINK_INPUT_LINKED(i->state)) { - pa_hook_fire(&i->sink->core->hooks[PA_CORE_HOOK_SINK_INPUT_NAME_CHANGED], i); + pa_hook_fire(&i->sink->core->hooks[PA_CORE_HOOK_SINK_INPUT_PROPLIST_CHANGED], i); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); } } @@ -829,20 +864,29 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { /* Okey, let's move it */ if (info.buffer_bytes > 0) { + pa_proplist *p; + + p = pa_proplist_new(); + pa_proplist_sets(p, PA_PROP_MEDIA_NAME, "Ghost For Moved Stream"); + 
pa_proplist_sets(p, PA_PROP_MEDIA_ROLE, "routing"); info.ghost_sink_input = pa_memblockq_sink_input_new( origin, - "Ghost Stream", &origin->sample_spec, &origin->channel_map, NULL, - NULL); + NULL, + p); + + pa_proplist_free(p); - info.ghost_sink_input->thread_info.state = info.ghost_sink_input->state = PA_SINK_INPUT_RUNNING; - info.ghost_sink_input->thread_info.volume = info.ghost_sink_input->volume; - info.ghost_sink_input->thread_info.muted = info.ghost_sink_input->muted; + if (info.ghost_sink_input) { + info.ghost_sink_input->thread_info.state = info.ghost_sink_input->state = PA_SINK_INPUT_RUNNING; + info.ghost_sink_input->thread_info.volume = info.ghost_sink_input->volume; + info.ghost_sink_input->thread_info.muted = info.ghost_sink_input->muted; - info.buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL); + info.buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, 0, NULL); + } } } @@ -867,34 +911,26 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { /* Replace resampler */ if (new_resampler != i->thread_info.resampler) { + pa_memblock *silence; + if (i->thread_info.resampler) pa_resampler_free(i->thread_info.resampler); i->thread_info.resampler = new_resampler; /* if the resampler changed, the silence memblock is * probably invalid now, too */ - if (i->thread_info.silence_memblock) { - pa_memblock_unref(i->thread_info.silence_memblock); - i->thread_info.silence_memblock = NULL; - } - } - /* Dump already resampled data */ - if (i->thread_info.resampled_chunk.memblock) { - /* Hmm, this data has already been added to the ghost queue, presumably, hence let's sleep a little bit longer */ - silence_usec += pa_bytes_to_usec(i->thread_info.resampled_chunk.length, &origin->sample_spec); - pa_memblock_unref(i->thread_info.resampled_chunk.memblock); - pa_memchunk_reset(&i->thread_info.resampled_chunk); + silence = 
pa_silence_memblock_new(i->sink->core->mempool, &dest->sample_spec, new_resampler ? pa_resampler_max_block_size(new_resampler) : 0); + pa_memblockq_set_silence(i->thread_info.render_memblockq, silence); + pa_memblock_unref(silence); + } + pa_memblockq_flush(i->thread_info.render_memblockq); + /* Calculate the new sleeping time */ - if (immediately) - i->thread_info.move_silence = 0; - else - i->thread_info.move_silence = pa_usec_to_bytes( - pa_bytes_to_usec(i->thread_info.move_silence, &origin->sample_spec) + - silence_usec, - &dest->sample_spec); + if (!immediately) + pa_memblockq_seek(i->thread_info.render_memblockq, pa_usec_to_bytes(silence_usec, &dest->sample_spec), PA_SEEK_RELATIVE); pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, i, 0, NULL); @@ -924,20 +960,18 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t switch (code) { case PA_SINK_INPUT_MESSAGE_SET_VOLUME: i->thread_info.volume = *((pa_cvolume*) userdata); + pa_sink_input_request_rewrite(i, 0); return 0; case PA_SINK_INPUT_MESSAGE_SET_MUTE: i->thread_info.muted = PA_PTR_TO_UINT(userdata); + pa_sink_input_request_rewrite(i, 0); return 0; case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { pa_usec_t *r = userdata; - if (i->thread_info.resampled_chunk.memblock) - *r += pa_bytes_to_usec(i->thread_info.resampled_chunk.length, &i->sink->sample_spec); - - if (i->thread_info.move_silence) - *r += pa_bytes_to_usec(i->thread_info.move_silence, &i->sink->sample_spec); + *r += pa_bytes_to_usec(pa_memblockq_get_length(i->thread_info.render_memblockq), &i->sink->sample_spec); return 0; } @@ -974,6 +1008,13 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t return 0; } + + case PA_SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY: + + i->thread_info.requested_sink_latency = (pa_usec_t) offset; + pa_sink_invalidate_requested_latency(i->sink); + + return 0; } return -1; @@ -987,3 +1028,38 @@ pa_sink_input_state_t 
pa_sink_input_get_state(pa_sink_input *i) { return i->state; } + +pa_bool_t pa_sink_input_safe_to_remove(pa_sink_input *i) { + pa_sink_input_assert_ref(i); + + if (i->state == PA_SINK_INPUT_RUNNING || i->state == PA_SINK_INPUT_DRAINED || i->state == PA_SINK_INPUT_CORKED) + return pa_atomic_load(&i->thread_info.render_memblockq_is_empty); + + return TRUE; +} + +void pa_sink_input_request_rewrite(pa_sink_input *i, size_t nbytes /* in our sample spec */) { + size_t l, lbq; + + pa_sink_input_assert_ref(i); + + lbq = pa_memblockq_get_length(i->thread_info.render_memblockq); + + if (nbytes <= 0) { + nbytes = + i->thread_info.resampler ? + pa_resampler_request(i->thread_info.resampler, i->sink->thread_info.max_rewind + lbq) : + (i->sink->thread_info.max_rewind + lbq); + } + + i->thread_info.rewrite_nbytes = PA_MAX(nbytes, i->thread_info.rewrite_nbytes); + + /* Transform to sink domain */ + l = i->thread_info.resampler ? pa_resampler_result(i->thread_info.resampler, nbytes) : nbytes; + + if (l <= 0) + return; + + if (l > lbq) + pa_sink_request_rewind(i->sink, l - lbq); +} diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index 8975db9e..c74b8912 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -73,7 +73,8 @@ struct pa_sink_input { pa_sink_input_state_t state; pa_sink_input_flags_t flags; - char *name, *driver; /* may be NULL */ + pa_proplist *proplist; + char *driver; /* may be NULL */ pa_module *module; /* may be NULL */ pa_client *client; /* may be NULL */ @@ -87,17 +88,26 @@ struct pa_sink_input { pa_cvolume volume; pa_bool_t muted; - /* Returns the chunk of audio data (but doesn't drop it - * yet!). Returns -1 on failure. Called from IO thread context. If - * data needs to be generated from scratch then please in the - * specified length. This is an optimization only. If less data is - * available, it's fine to return a smaller block. 
If more data is - * already ready, it is better to return the full block.*/ - int (*peek) (pa_sink_input *i, size_t length, pa_memchunk *chunk); + pa_resample_method_t resample_method; - /* Drops the specified number of bytes, usually called right after - * peek(), but not necessarily. Called from IO thread context. */ - void (*drop) (pa_sink_input *i, size_t length); + /* Returns the chunk of audio data and drops it from the + * queue. Returns -1 on failure. Called from IO thread context. If + * data needs to be generated from scratch then please in the + * specified length request_nbytes. This is an optimization + * only. If less data is available, it's fine to return a smaller + * block. If more data is already ready, it is better to return + * the full block. */ + int (*pop) (pa_sink_input *i, size_t request_nbytes, pa_memchunk *chunk); + + /* Rewind the queue by the specified number of bytes. Called just + * before peek() if it is called at all. Only called if the sink + * input driver ever plans to call + * pa_sink_input_request_rewrite(). Called from IO context. */ + void (*rewind) (pa_sink_input *i, size_t nbytes); + + /* Called whenever the maximum rewindable size of the sink + * changes. Called from UI context. */ + void (*set_max_rewind) (pa_sink_input *i, size_t nbytes); /* may be NULL */ /* If non-NULL this function is called when the input is first * connected to a sink or when the rtpoll/asyncmsgq fields @@ -128,29 +138,28 @@ struct pa_sink_input { instead. */ pa_usec_t (*get_latency) (pa_sink_input *i); /* may be NULL */ - pa_resample_method_t resample_method; - struct { pa_sink_input_state_t state; - pa_atomic_t drained; + pa_atomic_t drained, render_memblockq_is_empty; pa_bool_t attached; /* True only between ->attach() and ->detach() calls */ pa_sample_spec sample_spec; - pa_memchunk resampled_chunk; pa_resampler *resampler; /* may be NULL */ - /* Some silence to play before the actual data. 
This is used to - * compensate for latency differences when moving a sink input - * "hot" between sinks. */ - size_t move_silence; - pa_memblock *silence_memblock; /* may be NULL */ + /* We maintain a history of resampled audio data here. */ + pa_memblockq *render_memblockq; + size_t rewrite_nbytes; + pa_bool_t ignore_rewind; pa_sink_input *sync_prev, *sync_next; pa_cvolume volume; pa_bool_t muted; + + /* The requested latency for the sink */ + pa_usec_t requested_sink_latency; } thread_info; void *userdata; @@ -165,11 +174,14 @@ enum { PA_SINK_INPUT_MESSAGE_GET_LATENCY, PA_SINK_INPUT_MESSAGE_SET_RATE, PA_SINK_INPUT_MESSAGE_SET_STATE, + PA_SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, PA_SINK_INPUT_MESSAGE_MAX }; typedef struct pa_sink_input_new_data { - const char *name, *driver; + pa_proplist *proplist; + + const char *driver; pa_module *module; pa_client *client; @@ -190,16 +202,17 @@ typedef struct pa_sink_input_new_data { pa_sink_input *sync_base; } pa_sink_input_new_data; -typedef struct pa_sink_input_move_hook_data { - pa_sink_input *sink_input; - pa_sink *destination; -} pa_sink_input_move_hook_data; - pa_sink_input_new_data* pa_sink_input_new_data_init(pa_sink_input_new_data *data); void pa_sink_input_new_data_set_sample_spec(pa_sink_input_new_data *data, const pa_sample_spec *spec); void pa_sink_input_new_data_set_channel_map(pa_sink_input_new_data *data, const pa_channel_map *map); void pa_sink_input_new_data_set_volume(pa_sink_input_new_data *data, const pa_cvolume *volume); void pa_sink_input_new_data_set_muted(pa_sink_input_new_data *data, pa_bool_t mute); +void pa_sink_input_new_data_done(pa_sink_input_new_data *data); + +typedef struct pa_sink_input_move_hook_data { + pa_sink_input *sink_input; + pa_sink *destination; +} pa_sink_input_move_hook_data; /* To be called by the implementing module only */ @@ -213,7 +226,19 @@ void pa_sink_input_unlink(pa_sink_input* i); void pa_sink_input_set_name(pa_sink_input *i, const char *name); -/* Callable by everyone 
*/ +void pa_sink_input_set_requested_latency(pa_sink_input *i, pa_usec_t usec); + +/* Request that the specified number of bytes already written out to +the hw device is rewritten, if possible. If this function is used you +need to supply the ->rewind() function pointer. Please note that this +is only a kind request. The sink driver may not be able to fulfill it +fully -- or at all. If the request for a rewrite was successful, the +sink driver will call ->rewind() and pass the number of bytes that +could be rewound in the HW device. This functionality is required for +implementing the "zero latency" write-through functionality. */ +void pa_sink_input_request_rewrite(pa_sink_input *i, size_t nbytes); + +/* Callable by everyone from main thread*/ /* External code may request disconnection with this function */ void pa_sink_input_kill(pa_sink_input*i); @@ -235,10 +260,14 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately); pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i); -/* To be used exclusively by the sink driver thread */ +pa_bool_t pa_sink_input_safe_to_remove(pa_sink_input *i); +/* To be used exclusively by the sink driver IO thread */ int pa_sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk, pa_cvolume *volume); void pa_sink_input_drop(pa_sink_input *i, size_t length); +void pa_sink_input_rewind(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */); +void pa_sink_input_set_max_rewind(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */); + int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk); typedef struct pa_sink_input_move_info { @@ -248,4 +277,5 @@ typedef struct pa_sink_input_move_info { size_t buffer_bytes; } pa_sink_input_move_info; + #endif diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 9adb6097..cded6ba7 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -47,42 +47,105 @@ #define 
MAX_MIX_CHANNELS 32 #define MIX_BUFFER_LENGTH (PA_PAGE_SIZE) -#define SILENCE_BUFFER_LENGTH (PA_PAGE_SIZE*12) static PA_DEFINE_CHECK_TYPE(pa_sink, pa_msgobject); static void sink_free(pa_object *s); +pa_sink_new_data* pa_sink_new_data_init(pa_sink_new_data *data) { + pa_assert(data); + + memset(data, 0, sizeof(*data)); + data->proplist = pa_proplist_new(); + + return data; +} + +void pa_sink_new_data_set_name(pa_sink_new_data *data, const char *name) { + pa_assert(data); + + pa_xfree(data->name); + data->name = pa_xstrdup(name); +} + +void pa_sink_new_data_set_sample_spec(pa_sink_new_data *data, const pa_sample_spec *spec) { + pa_assert(data); + + if ((data->sample_spec_is_set = !!spec)) + data->sample_spec = *spec; +} + +void pa_sink_new_data_set_channel_map(pa_sink_new_data *data, const pa_channel_map *map) { + pa_assert(data); + + if ((data->channel_map_is_set = !!map)) + data->channel_map = *map; +} + +void pa_sink_new_data_set_volume(pa_sink_new_data *data, const pa_cvolume *volume) { + pa_assert(data); + + if ((data->volume_is_set = !!volume)) + data->volume = *volume; +} + +void pa_sink_new_data_set_muted(pa_sink_new_data *data, pa_bool_t mute) { + pa_assert(data); + + data->muted_is_set = TRUE; + data->muted = !!mute; +} + +void pa_sink_new_data_done(pa_sink_new_data *data) { + pa_assert(data); + + pa_xfree(data->name); + pa_proplist_free(data->proplist); +} + pa_sink* pa_sink_new( pa_core *core, - const char *driver, - const char *name, - int fail, - const pa_sample_spec *spec, - const pa_channel_map *map) { + pa_sink_new_data *data, + pa_sink_flags_t flags) { pa_sink *s; - char *n = NULL; - char st[256]; - pa_channel_map tmap; + char *d; + const char *name; + char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX]; + pa_source_new_data source_data; pa_assert(core); - pa_assert(name); - pa_assert(spec); + pa_assert(data); + + if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SINK_NEW], data) < 0) + return NULL; + + pa_return_null_if_fail(!data->driver 
|| pa_utf8_valid(data->driver)); + pa_return_null_if_fail(data->name && pa_utf8_valid(data->name) && data->name[0]); + + pa_return_null_if_fail(data->sample_spec_is_set && pa_sample_spec_valid(&data->sample_spec)); + + if (!data->channel_map_is_set) + pa_return_null_if_fail(pa_channel_map_init_auto(&data->channel_map, data->sample_spec.channels, PA_CHANNEL_MAP_DEFAULT)); + + pa_return_null_if_fail(pa_channel_map_valid(&data->channel_map)); + pa_return_null_if_fail(data->channel_map.channels == data->sample_spec.channels); - pa_return_null_if_fail(pa_sample_spec_valid(spec)); + if (!data->volume_is_set) + pa_cvolume_reset(&data->volume, data->sample_spec.channels); - if (!map) - pa_return_null_if_fail((map = pa_channel_map_init_auto(&tmap, spec->channels, PA_CHANNEL_MAP_DEFAULT))); + pa_return_null_if_fail(pa_cvolume_valid(&data->volume)); + pa_return_null_if_fail(data->volume.channels == data->sample_spec.channels); - pa_return_null_if_fail(map && pa_channel_map_valid(map)); - pa_return_null_if_fail(map->channels == spec->channels); - pa_return_null_if_fail(!driver || pa_utf8_valid(driver)); - pa_return_null_if_fail(name && pa_utf8_valid(name) && *name); + if (!data->muted_is_set) + data->muted = FALSE; + + if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SINK_FIXATE], data) < 0) + return NULL; s = pa_msgobject_new(pa_sink); - if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SINK, s, fail))) { + if (!(name = pa_namereg_register(core, data->name, PA_NAMEREG_SINK, s, data->namereg_fail))) { pa_xfree(s); return NULL; } @@ -92,20 +155,20 @@ pa_sink* pa_sink_new( s->core = core; s->state = PA_SINK_INIT; - s->flags = 0; + s->flags = flags; s->name = pa_xstrdup(name); - s->description = NULL; - s->driver = pa_xstrdup(driver); - s->module = NULL; + s->proplist = pa_proplist_copy(data->proplist); + s->driver = pa_xstrdup(data->driver); + s->module = data->module; - s->sample_spec = *spec; - s->channel_map = *map; + s->sample_spec = data->sample_spec; + s->channel_map = 
data->channel_map; s->inputs = pa_idxset_new(NULL, NULL); s->n_corked = 0; - pa_cvolume_reset(&s->volume, spec->channels); - s->muted = FALSE; + s->volume = data->volume; + s->muted = data->muted; s->refresh_volume = s->refresh_mute = FALSE; s->get_latency = NULL; @@ -114,35 +177,53 @@ pa_sink* pa_sink_new( s->set_mute = NULL; s->get_mute = NULL; s->set_state = NULL; + s->request_rewind = NULL; + s->update_requested_latency = NULL; s->userdata = NULL; s->asyncmsgq = NULL; s->rtpoll = NULL; - s->silence = NULL; + s->silence = pa_silence_memblock_new(core->mempool, &s->sample_spec, 0); + + s->thread_info.inputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func); + s->thread_info.soft_volume = s->volume; + s->thread_info.soft_muted = s->muted; + s->thread_info.state = s->state; + s->thread_info.rewind_nbytes = 0; + s->thread_info.max_rewind = 0; + s->thread_info.requested_latency_valid = TRUE; + s->thread_info.requested_latency = 0; pa_assert_se(pa_idxset_put(core->sinks, s, &s->index) >= 0); - pa_sample_spec_snprint(st, sizeof(st), spec); - pa_log_info("Created sink %u \"%s\" with sample spec \"%s\"", s->index, s->name, st); + pa_log_info("Created sink %u \"%s\" with sample spec %s and channel map %s", + s->index, + s->name, + pa_sample_spec_snprint(st, sizeof(st), &s->sample_spec), + pa_channel_map_snprint(cm, sizeof(cm), &s->channel_map)); - n = pa_sprintf_malloc("%s.monitor", name); + pa_source_new_data_init(&source_data); + pa_source_new_data_set_sample_spec(&source_data, &s->sample_spec); + pa_source_new_data_set_channel_map(&source_data, &s->channel_map); + source_data.name = pa_sprintf_malloc("%s.monitor", name); + source_data.driver = data->driver; - if (!(s->monitor_source = pa_source_new(core, driver, n, 0, spec, map))) - pa_log_warn("Failed to create monitor source."); - else { - char *d; - s->monitor_source->monitor_of = s; - d = pa_sprintf_malloc("Monitor Source of %s", s->name); - pa_source_set_description(s->monitor_source, 
d); - pa_xfree(d); - } + d = pa_sprintf_malloc("Monitor Source of %s", s->name); + pa_proplist_sets(data->proplist, PA_PROP_DEVICE_DESCRIPTION, d); + pa_xfree(d); + pa_proplist_sets(data->proplist, PA_PROP_DEVICE_CLASS, "monitor"); - pa_xfree(n); + s->monitor_source = pa_source_new(core, &source_data, 0); - s->thread_info.inputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func); - s->thread_info.soft_volume = s->volume; - s->thread_info.soft_muted = s->muted; - s->thread_info.state = s->state; + pa_source_new_data_done(&source_data); + + if (!s->monitor_source) { + pa_sink_unlink(s); + pa_sink_unref(s); + return NULL; + } + + s->monitor_source->monitor_of = s; return s; } @@ -193,12 +274,22 @@ void pa_sink_put(pa_sink* s) { pa_assert(s->asyncmsgq); pa_assert(s->rtpoll); + if (s->get_volume && s->set_volume) + s->flags |= PA_SINK_HW_VOLUME_CTRL; + else + s->flags = (s->flags & ~PA_SINK_HW_VOLUME_CTRL) | PA_SINK_DECIBEL_VOLUME; + + if (s->get_mute && s->set_mute) + s->flags |= PA_SINK_HW_MUTE_CTRL; + else + s->flags &= ~PA_SINK_HW_MUTE_CTRL; + pa_assert_se(sink_set_state(s, PA_SINK_IDLE) == 0); pa_source_put(s->monitor_source); pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_NEW, s->index); - pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_NEW_POST], s); + pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_PUT], s); } void pa_sink_unlink(pa_sink* s) { @@ -241,6 +332,8 @@ void pa_sink_unlink(pa_sink* s) { s->set_mute = NULL; s->get_mute = NULL; s->set_state = NULL; + s->request_rewind = NULL; + s->update_requested_latency = NULL; if (s->monitor_source) pa_source_unlink(s->monitor_source); @@ -279,8 +372,11 @@ static void sink_free(pa_object *o) { pa_memblock_unref(s->silence); pa_xfree(s->name); - pa_xfree(s->description); pa_xfree(s->driver); + + if (s->proplist) + pa_proplist_free(s->proplist); + pa_xfree(s); } @@ -330,6 +426,26 @@ void pa_sink_ping(pa_sink *s) { pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), 
PA_SINK_MESSAGE_PING, NULL, 0, NULL, NULL); } +void pa_sink_process_rewind(pa_sink *s) { + pa_sink_input *i; + void *state = NULL; + pa_sink_assert_ref(s); + pa_assert(PA_SINK_LINKED(s->state)); + + if (s->thread_info.rewind_nbytes <= 0) + return; + + pa_log_debug("Processing rewind..."); + + while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) { + pa_sink_input_assert_ref(i); + + pa_sink_input_rewind(i, s->thread_info.rewind_nbytes); + } + + s->thread_info.rewind_nbytes = 0; +} + static unsigned fill_mix_info(pa_sink *s, size_t length, pa_mix_info *info, unsigned maxinfo) { pa_sink_input *i; unsigned n = 0; @@ -344,6 +460,11 @@ static unsigned fill_mix_info(pa_sink *s, size_t length, pa_mix_info *info, unsi if (pa_sink_input_peek(i, length, &info->chunk, &info->volume) < 0) continue; + if (pa_memblock_is_silence(info->chunk.memblock)) { + pa_memblock_unref(info->chunk.memblock); + continue; + } + info->userdata = pa_sink_input_ref(i); pa_assert(info->chunk.memblock); @@ -427,6 +548,8 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) { pa_sink_ref(s); + s->thread_info.rewind_nbytes = 0; + if (length <= 0) length = pa_frame_align(MIX_BUFFER_LENGTH, &s->sample_spec); @@ -440,19 +563,8 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) { if (n == 0) { - if (length > SILENCE_BUFFER_LENGTH) - length = pa_frame_align(SILENCE_BUFFER_LENGTH, &s->sample_spec); - - pa_assert(length > 0); - - if (!s->silence || pa_memblock_get_length(s->silence) < length) { - if (s->silence) - pa_memblock_unref(s->silence); - s->silence = pa_silence_memblock_new(s->core->mempool, &s->sample_spec, length); - } - result->memblock = pa_memblock_ref(s->silence); - result->length = length; + result->length = PA_MIN(pa_memblock_get_length(s->silence), length); result->index = 0; } else if (n == 1) { @@ -506,11 +618,13 @@ void pa_sink_render_into(pa_sink*s, pa_memchunk *target) { pa_sink_ref(s); + s->thread_info.rewind_nbytes = 0; + n = 
s->thread_info.state == PA_SINK_RUNNING ? fill_mix_info(s, target->length, info, MAX_MIX_CHANNELS) : 0; - if (n == 0) { + if (n == 0) pa_silence_memchunk(target, &s->sample_spec); - } else if (n == 1) { + else if (n == 1) { if (target->length > info[0].chunk.length) target->length = info[0].chunk.length; @@ -573,6 +687,8 @@ void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target) { pa_sink_ref(s); + s->thread_info.rewind_nbytes = 0; + l = target->length; d = 0; while (l > 0) { @@ -596,6 +712,8 @@ void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result) { pa_assert(pa_frame_aligned(length, &s->sample_spec)); pa_assert(result); + s->thread_info.rewind_nbytes = 0; + /*** This needs optimization ***/ result->index = 0; @@ -614,6 +732,8 @@ void pa_sink_skip(pa_sink *s, size_t length) { pa_assert(length > 0); pa_assert(pa_frame_aligned(length, &s->sample_spec)); + s->thread_info.rewind_nbytes = 0; + if (pa_source_used_by(s->monitor_source)) { pa_memchunk chunk; @@ -644,6 +764,8 @@ pa_usec_t pa_sink_get_latency(pa_sink *s) { pa_sink_assert_ref(s); pa_assert(PA_SINK_LINKED(s->state)); + /* The returned value is supposed to be in the time domain of the sound card! 
*/ + if (!PA_SINK_OPENED(s->state)) return 0; @@ -735,43 +857,34 @@ pa_bool_t pa_sink_get_mute(pa_sink *s) { return s->muted; } -void pa_sink_set_module(pa_sink *s, pa_module *m) { - pa_sink_assert_ref(s); - - if (s->module == m) - return; - - s->module = m; - - if (s->monitor_source) - pa_source_set_module(s->monitor_source, m); - - pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index); -} - void pa_sink_set_description(pa_sink *s, const char *description) { + const char *old; pa_sink_assert_ref(s); - if (!description && !s->description) + if (!description && !pa_proplist_contains(s->proplist, PA_PROP_DEVICE_DESCRIPTION)) return; - if (description && s->description && !strcmp(description, s->description)) + old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION); + + if (old && description && !strcmp(old, description)) return; - pa_xfree(s->description); - s->description = pa_xstrdup(description); + if (description) + pa_proplist_sets(s->proplist, PA_PROP_DEVICE_DESCRIPTION, description); + else + pa_proplist_unset(s->proplist, PA_PROP_DEVICE_DESCRIPTION); if (s->monitor_source) { char *n; - n = pa_sprintf_malloc("Monitor Source of %s", s->description? s->description : s->name); + n = pa_sprintf_malloc("Monitor Source of %s", description ? 
description : s->name); pa_source_set_description(s->monitor_source, n); pa_xfree(n); } if (PA_SINK_LINKED(s->state)) { pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, s->index); - pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_DESCRIPTION_CHANGED], s); + pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_PROPLIST_CHANGED], s); } } @@ -817,6 +930,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse case PA_SINK_MESSAGE_ADD_INPUT: { pa_sink_input *i = PA_SINK_INPUT(userdata); + pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index), pa_sink_input_ref(i)); /* Since the caller sleeps in pa_sink_input_put(), we can @@ -835,6 +949,8 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse i->thread_info.sync_next->thread_info.sync_prev = i; } + pa_sink_input_set_max_rewind(i, s->thread_info.max_rewind); + pa_assert(!i->thread_info.attached); i->thread_info.attached = TRUE; @@ -845,6 +961,11 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse * ghost sink input handling a few lines down at * PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER, too. 
*/ + pa_sink_invalidate_requested_latency(s); + +/* i->thread_info.ignore_rewind = TRUE; */ +/* pa_sink_request_rewind(s, 0); */ + return 0; } @@ -881,6 +1002,10 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse if (pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index))) pa_sink_input_unref(i); + pa_sink_invalidate_requested_latency(s); + +/* pa_sink_request_rewind(s, 0); */ + return 0; } @@ -899,6 +1024,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse pa_assert(info->sink_input->thread_info.attached); info->sink_input->thread_info.attached = FALSE; + pa_sink_invalidate_requested_latency(info->sink_input->sink); if (info->ghost_sink_input) { pa_assert(info->buffer_bytes > 0); @@ -934,9 +1060,8 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse info->buffer_bytes -= n; } - /* Add the remaining already resampled chunk to the buffer */ - if (info->sink_input->thread_info.resampled_chunk.memblock) - pa_memblockq_push(info->buffer, &info->sink_input->thread_info.resampled_chunk); + /* Add the remaining already resampled chunks to the buffer */ + pa_memblockq_splice(info->buffer, info->sink_input->thread_info.render_memblockq); pa_memblockq_sink_input_set_queue(info->ghost_sink_input, info->buffer); @@ -952,22 +1077,33 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(info->ghost_sink_input->index), pa_sink_input_ref(info->ghost_sink_input)); info->ghost_sink_input->thread_info.sync_prev = info->ghost_sink_input->thread_info.sync_next = NULL; + pa_sink_input_set_max_rewind(info->ghost_sink_input, s->thread_info.max_rewind); + pa_assert(!info->ghost_sink_input->thread_info.attached); info->ghost_sink_input->thread_info.attached = TRUE; if (info->ghost_sink_input->attach) info->ghost_sink_input->attach(info->ghost_sink_input); + } + 
pa_sink_invalidate_requested_latency(s); + + pa_sink_request_rewind(s, 0); + return 0; } case PA_SINK_MESSAGE_SET_VOLUME: s->thread_info.soft_volume = *((pa_cvolume*) userdata); + + pa_sink_request_rewind(s, 0); return 0; case PA_SINK_MESSAGE_SET_MUTE: s->thread_info.soft_muted = PA_PTR_TO_UINT(userdata); + + pa_sink_request_rewind(s, 0); return 0; case PA_SINK_MESSAGE_GET_VOLUME: @@ -1064,3 +1200,73 @@ void pa_sink_attach_within_thread(pa_sink *s) { if (s->monitor_source) pa_source_attach_within_thread(s->monitor_source); } + +void pa_sink_request_rewind(pa_sink*s, size_t nbytes) { + pa_sink_assert_ref(s); + pa_assert(PA_SINK_LINKED(s->thread_info.state)); + + if (nbytes <= 0) + nbytes = s->thread_info.max_rewind; + + nbytes = PA_MIN(nbytes, s->thread_info.max_rewind); + + if (nbytes <= s->thread_info.rewind_nbytes) + return; + + s->thread_info.rewind_nbytes = nbytes; + + if (s->request_rewind) + s->request_rewind(s); +} + +pa_usec_t pa_sink_get_requested_latency(pa_sink *s) { + pa_usec_t result = 0; + pa_sink_input *i; + void *state = NULL; + + pa_sink_assert_ref(s); + + if (s->thread_info.requested_latency_valid) + return s->thread_info.requested_latency; + + while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) + + if (i->thread_info.requested_sink_latency > 0 && + (!result || result > i->thread_info.requested_sink_latency)) + result = i->thread_info.requested_sink_latency; + + s->thread_info.requested_latency = result; + s->thread_info.requested_latency_valid = TRUE; + + return result; +} + +void pa_sink_set_max_rewind(pa_sink *s, size_t max_rewind) { + pa_sink_input *i; + void *state = NULL; + + pa_sink_assert_ref(s); + pa_assert(PA_SINK_LINKED(s->thread_info.state)); + + if (max_rewind == s->thread_info.max_rewind) + return; + + s->thread_info.max_rewind = max_rewind; + + while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) + pa_sink_input_set_max_rewind(i, s->thread_info.max_rewind); +} + +void 
pa_sink_invalidate_requested_latency(pa_sink *s) { + + pa_sink_assert_ref(s); + pa_assert(PA_SINK_LINKED(s->thread_info.state)); + + if (!s->thread_info.requested_latency_valid) + return; + + s->thread_info.requested_latency_valid = FALSE; + + if (s->update_requested_latency) + s->update_requested_latency(s); +} diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h index e9969309..3f169952 100644 --- a/src/pulsecore/sink.h +++ b/src/pulsecore/sink.h @@ -69,7 +69,8 @@ struct pa_sink { pa_sink_flags_t flags; char *name; - char *description, *driver; /* may be NULL */ + char *driver; /* may be NULL */ + pa_proplist *proplist; pa_module *module; /* may be NULL */ @@ -85,16 +86,20 @@ struct pa_sink { pa_bool_t refresh_volume; pa_bool_t refresh_mute; - int (*set_state)(pa_sink *s, pa_sink_state_t state); /* may be NULL */ - int (*set_volume)(pa_sink *s); /* dito */ - int (*get_volume)(pa_sink *s); /* dito */ - int (*get_mute)(pa_sink *s); /* dito */ - int (*set_mute)(pa_sink *s); /* dito */ - pa_usec_t (*get_latency)(pa_sink *s); /* dito */ - pa_asyncmsgq *asyncmsgq; pa_rtpoll *rtpoll; + pa_memblock *silence; + + int (*set_state)(pa_sink *s, pa_sink_state_t state); /* may be NULL */ + int (*set_volume)(pa_sink *s); /* dito */ + int (*get_volume)(pa_sink *s); /* dito */ + int (*get_mute)(pa_sink *s); /* dito */ + int (*set_mute)(pa_sink *s); /* dito */ + pa_usec_t (*get_latency)(pa_sink *s); /* dito */ + void (*request_rewind)(pa_sink *s); /* dito */ + void (*update_requested_latency)(pa_sink *s); /* dito */ + /* Contains copies of the above data so that the real-time worker * thread can work without access locking */ struct { @@ -102,9 +107,17 @@ struct pa_sink { pa_hashmap *inputs; pa_cvolume soft_volume; pa_bool_t soft_muted; - } thread_info; - pa_memblock *silence; + pa_bool_t requested_latency_valid; + size_t requested_latency; + + /* The number of bytes we need keep around to be able to satisfy + * every DMA buffer rewrite */ + size_t max_rewind; + + /* Maximum 
of what clients requested to rewind in this cycle */ + size_t rewind_nbytes; + } thread_info; void *userdata; }; @@ -128,20 +141,43 @@ typedef enum pa_sink_message { PA_SINK_MESSAGE_MAX } pa_sink_message_t; +typedef struct pa_sink_new_data { + char *name; + pa_bool_t namereg_fail; + pa_proplist *proplist; + + const char *driver; + pa_module *module; + + pa_sample_spec sample_spec; + pa_bool_t sample_spec_is_set; + pa_channel_map channel_map; + pa_bool_t channel_map_is_set; + + pa_cvolume volume; + pa_bool_t volume_is_set; + pa_bool_t muted; + pa_bool_t muted_is_set; +} pa_sink_new_data; + +pa_sink_new_data* pa_sink_new_data_init(pa_sink_new_data *data); +void pa_sink_new_data_set_name(pa_sink_new_data *data, const char *name); +void pa_sink_new_data_set_sample_spec(pa_sink_new_data *data, const pa_sample_spec *spec); +void pa_sink_new_data_set_channel_map(pa_sink_new_data *data, const pa_channel_map *map); +void pa_sink_new_data_set_volume(pa_sink_new_data *data, const pa_cvolume *volume); +void pa_sink_new_data_set_muted(pa_sink_new_data *data, pa_bool_t mute); +void pa_sink_new_data_done(pa_sink_new_data *data); + /* To be called exclusively by the sink driver, from main context */ pa_sink* pa_sink_new( pa_core *core, - const char *driver, - const char *name, - int namereg_fail, - const pa_sample_spec *spec, - const pa_channel_map *map); + pa_sink_new_data *data, + pa_sink_flags_t flags); void pa_sink_put(pa_sink *s); void pa_sink_unlink(pa_sink* s); -void pa_sink_set_module(pa_sink *sink, pa_module *m); void pa_sink_set_description(pa_sink *s, const char *description); void pa_sink_set_asyncmsgq(pa_sink *s, pa_asyncmsgq *q); void pa_sink_set_rtpoll(pa_sink *s, pa_rtpoll *p); @@ -151,12 +187,15 @@ void pa_sink_attach(pa_sink *s); /* May be called by everyone, from main context */ +/* The returned value is supposed to be in the time domain of the sound card! 
*/ pa_usec_t pa_sink_get_latency(pa_sink *s); int pa_sink_update_status(pa_sink*s); int pa_sink_suspend(pa_sink *s, pa_bool_t suspend); int pa_sink_suspend_all(pa_core *c, pa_bool_t suspend); +void pa_sink_rewind(pa_sink *s, size_t length); + /* Sends a ping message to the sink thread, to make it wake up and * check for data to process even if there is no real message is * sent */ @@ -173,11 +212,12 @@ unsigned pa_sink_used_by(pa_sink *s); /* Number of connected streams which are n /* To be called exclusively by the sink driver, from IO context */ +void pa_sink_process_rewind(pa_sink *s); + void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result); void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result); void pa_sink_render_into(pa_sink*s, pa_memchunk *target); void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target); - void pa_sink_skip(pa_sink *s, size_t length); int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk); @@ -185,4 +225,14 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse void pa_sink_attach_within_thread(pa_sink *s); void pa_sink_detach_within_thread(pa_sink *s); +pa_usec_t pa_sink_get_requested_latency(pa_sink *s); + +void pa_sink_set_max_rewind(pa_sink *s, size_t max_rewind); + +/* To be called exclusively by sink input drivers, from IO context */ + +void pa_sink_request_rewind(pa_sink*s, size_t nbytes); + +void pa_sink_invalidate_requested_latency(pa_sink *s); + #endif diff --git a/src/pulsecore/sound-file-stream.c b/src/pulsecore/sound-file-stream.c index bb1f3e9a..60c2560e 100644 --- a/src/pulsecore/sound-file-stream.c +++ b/src/pulsecore/sound-file-stream.c @@ -41,17 +41,21 @@ #include #include #include +#include #include "sound-file-stream.h" +#define MEMBLOCKQ_MAXLENGTH (16*1024*1024) + typedef struct file_stream { pa_msgobject parent; pa_core *core; - SNDFILE *sndfile; pa_sink_input *sink_input; - pa_memchunk memchunk; + + 
SNDFILE *sndfile; sf_count_t (*readf_function)(SNDFILE *sndfile, void *ptr, sf_count_t frames); - size_t drop; + + pa_memblockq *memblockq; } file_stream; enum { @@ -69,7 +73,6 @@ static void file_stream_unlink(file_stream *u) { return; pa_sink_input_unlink(u->sink_input); - pa_sink_input_unref(u->sink_input); u->sink_input = NULL; @@ -81,10 +84,8 @@ static void file_stream_free(pa_object *o) { file_stream *u = FILE_STREAM(o); pa_assert(u); - file_stream_unlink(u); - - if (u->memchunk.memblock) - pa_memblock_unref(u->memchunk.memblock); + if (u->memblockq) + pa_memblockq_free(u->memblockq); if (u->sndfile) sf_close(u->sndfile); @@ -106,116 +107,122 @@ static int file_stream_process_msg(pa_msgobject *o, int code, void*userdata, int } static void sink_input_kill_cb(pa_sink_input *i) { + file_stream *u; + pa_sink_input_assert_ref(i); + u = FILE_STREAM(i->userdata); + file_stream_assert_ref(u); - file_stream_unlink(FILE_STREAM(i->userdata)); + file_stream_unlink(u); } -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { file_stream *u; - pa_assert(i); + pa_sink_input_assert_ref(i); pa_assert(chunk); u = FILE_STREAM(i->userdata); file_stream_assert_ref(u); - if (!u->sndfile) + if (!u->memblockq) return -1; - for (;;) { - - if (!u->memchunk.memblock) { + pa_log_debug("pop: %lu", (unsigned long) length); - u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, length); - u->memchunk.index = 0; - - if (u->readf_function) { - sf_count_t n; - void *p; - size_t fs = pa_frame_size(&i->sample_spec); + for (;;) { + pa_memchunk tchunk; - p = pa_memblock_acquire(u->memchunk.memblock); - n = u->readf_function(u->sndfile, p, length/fs); - pa_memblock_release(u->memchunk.memblock); + if (pa_memblockq_peek(u->memblockq, chunk) >= 0) { + pa_memblockq_drop(u->memblockq, chunk->length); + return 0; + } - if (n <= 0) - n = 0; + if (!u->sndfile) + break; - 
u->memchunk.length = n * fs; - } else { - sf_count_t n; - void *p; + tchunk.memblock = pa_memblock_new(i->sink->core->mempool, length); + tchunk.index = 0; - p = pa_memblock_acquire(u->memchunk.memblock); - n = sf_read_raw(u->sndfile, p, length); - pa_memblock_release(u->memchunk.memblock); + if (u->readf_function) { + sf_count_t n; + void *p; + size_t fs = pa_frame_size(&i->sample_spec); - if (n <= 0) - n = 0; + p = pa_memblock_acquire(tchunk.memblock); + n = u->readf_function(u->sndfile, p, length/fs); + pa_memblock_release(tchunk.memblock); - u->memchunk.length = n; - } + if (n <= 0) + n = 0; - if (u->memchunk.length <= 0) { + tchunk.length = n * fs; - pa_memblock_unref(u->memchunk.memblock); - pa_memchunk_reset(&u->memchunk); + } else { + sf_count_t n; + void *p; - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u), FILE_STREAM_MESSAGE_UNLINK, NULL, 0, NULL, NULL); + p = pa_memblock_acquire(tchunk.memblock); + n = sf_read_raw(u->sndfile, p, length); + pa_memblock_release(tchunk.memblock); - sf_close(u->sndfile); - u->sndfile = NULL; + if (n <= 0) + n = 0; - return -1; - } + tchunk.length = n; } - pa_assert(u->memchunk.memblock); - pa_assert(u->memchunk.length > 0); + if (tchunk.length <= 0) { + + pa_memblock_unref(tchunk.memblock); - if (u->drop < u->memchunk.length) { - u->memchunk.index += u->drop; - u->memchunk.length -= u->drop; - u->drop = 0; + sf_close(u->sndfile); + u->sndfile = NULL; break; } - u->drop -= u->memchunk.length; - pa_memblock_unref(u->memchunk.memblock); - pa_memchunk_reset(&u->memchunk); + pa_memblockq_push(u->memblockq, &tchunk); + pa_memblock_unref(tchunk.memblock); } - *chunk = u->memchunk; - pa_memblock_ref(chunk->memblock); + pa_log_debug("peek fail"); - pa_assert(chunk->length > 0); - pa_assert(u->drop <= 0); + if (pa_sink_input_safe_to_remove(i)) { + pa_log_debug("completed to play"); - return 0; + pa_memblockq_free(u->memblockq); + u->memblockq = NULL; + + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u), 
FILE_STREAM_MESSAGE_UNLINK, NULL, 0, NULL, NULL); + } + + return -1; } -static void sink_input_drop_cb(pa_sink_input *i, size_t length) { +static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes) { file_stream *u; - pa_assert(i); - pa_assert(length > 0); + pa_sink_input_assert_ref(i); + pa_assert(nbytes > 0); u = FILE_STREAM(i->userdata); file_stream_assert_ref(u); - if (u->memchunk.memblock) { + if (!u->memblockq) + return; - if (length < u->memchunk.length) { - u->memchunk.index += length; - u->memchunk.length -= length; - return; - } + pa_memblockq_rewind(u->memblockq, nbytes); +} - length -= u->memchunk.length; - pa_memblock_unref(u->memchunk.memblock); - pa_memchunk_reset(&u->memchunk); - } +static void sink_input_set_max_rewind(pa_sink_input *i, size_t nbytes) { + file_stream *u; - u->drop += length; + pa_sink_input_assert_ref(i); + u = FILE_STREAM(i->userdata); + file_stream_assert_ref(u); + + if (!u->memblockq) + return; + + pa_memblockq_set_maxrewind(u->memblockq, nbytes); } int pa_play_file( @@ -228,6 +235,7 @@ int pa_play_file( pa_sample_spec ss; pa_sink_input_new_data data; int fd; + pa_memblock *silence; pa_assert(sink); pa_assert(fname); @@ -237,10 +245,8 @@ int pa_play_file( u->parent.process_msg = file_stream_process_msg; u->core = sink->core; u->sink_input = NULL; - pa_memchunk_reset(&u->memchunk); u->sndfile = NULL; u->readf_function = NULL; - u->drop = 0; memset(&sfinfo, 0, sizeof(sfinfo)); @@ -312,18 +318,31 @@ int pa_play_file( pa_sink_input_new_data_init(&data); data.sink = sink; data.driver = __FILE__; - data.name = fname; pa_sink_input_new_data_set_sample_spec(&data, &ss); pa_sink_input_new_data_set_volume(&data, volume); + pa_proplist_sets(data.proplist, PA_PROP_MEDIA_NAME, fname); + pa_proplist_sets(data.proplist, PA_PROP_MEDIA_FILENAME, fname); + + u->sink_input = pa_sink_input_new(sink->core, &data, 0); + pa_sink_input_new_data_done(&data); - if (!(u->sink_input = pa_sink_input_new(sink->core, &data, 0))) + if (!u->sink_input) 
goto fail; - u->sink_input->peek = sink_input_peek_cb; - u->sink_input->drop = sink_input_drop_cb; + u->sink_input->pop = sink_input_pop_cb; + u->sink_input->rewind = sink_input_rewind_cb; + u->sink_input->set_max_rewind = sink_input_set_max_rewind; u->sink_input->kill = sink_input_kill_cb; u->sink_input->userdata = u; + silence = pa_silence_memblock_new( + u->core->mempool, + &u->sink_input->sample_spec, + u->sink_input->thread_info.resampler ? pa_resampler_max_block_size(u->sink_input->thread_info.resampler) : 0); + + u->memblockq = pa_memblockq_new(0, MEMBLOCKQ_MAXLENGTH, 0, pa_frame_size(&u->sink_input->sample_spec), 1, 1, 0, silence); + pa_memblock_unref(silence); + pa_sink_input_put(u->sink_input); /* The reference to u is dangling here, because we want to keep diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index 88c11469..45a8d74e 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -32,6 +32,7 @@ #include #include +#include #include #include #include @@ -47,9 +48,18 @@ pa_source_output_new_data* pa_source_output_new_data_init(pa_source_output_new_d memset(data, 0, sizeof(*data)); data->resample_method = PA_RESAMPLER_INVALID; + data->proplist = pa_proplist_new(); + return data; } +void pa_source_output_new_data_set_sample_spec(pa_source_output_new_data *data, const pa_sample_spec *spec) { + pa_assert(data); + + if ((data->sample_spec_is_set = !!spec)) + data->sample_spec = *spec; +} + void pa_source_output_new_data_set_channel_map(pa_source_output_new_data *data, const pa_channel_map *map) { pa_assert(data); @@ -57,11 +67,10 @@ void pa_source_output_new_data_set_channel_map(pa_source_output_new_data *data, data->channel_map = *map; } -void pa_source_output_new_data_set_sample_spec(pa_source_output_new_data *data, const pa_sample_spec *spec) { +void pa_source_output_new_data_done(pa_source_output_new_data *data) { pa_assert(data); - if ((data->sample_spec_is_set = !!spec)) - data->sample_spec = *spec; + 
pa_proplist_free(data->proplist); } pa_source_output* pa_source_output_new( @@ -80,7 +89,6 @@ pa_source_output* pa_source_output_new( return NULL; pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver)); - pa_return_null_if_fail(!data->name || pa_utf8_valid(data->name)); if (!data->source) data->source = pa_namereg_get(core, NULL, PA_NAMEREG_SOURCE, 1); @@ -156,7 +164,7 @@ pa_source_output* pa_source_output_new( o->core = core; o->state = PA_SOURCE_OUTPUT_INIT; o->flags = flags; - o->name = pa_xstrdup(data->name); + o->proplist = pa_proplist_copy(data->proplist); o->driver = pa_xstrdup(data->driver); o->module = data->module; o->source = data->source; @@ -179,13 +187,14 @@ pa_source_output* pa_source_output_new( o->thread_info.attached = FALSE; o->thread_info.sample_spec = o->sample_spec; o->thread_info.resampler = resampler; + o->thread_info.requested_source_latency = 0; pa_assert_se(pa_idxset_put(core->source_outputs, o, &o->index) == 0); pa_assert_se(pa_idxset_put(o->source->outputs, pa_source_output_ref(o), NULL) == 0); pa_log_info("Created output %u \"%s\" on %s with sample spec %s and channel map %s", o->index, - o->name, + pa_strnull(pa_proplist_gets(o->proplist, PA_PROP_MEDIA_NAME)), o->source->name, pa_sample_spec_snprint(st, sizeof(st), &o->sample_spec), pa_channel_map_snprint(cm, sizeof(cm), &o->channel_map)); @@ -210,7 +219,6 @@ static int source_output_set_state(pa_source_output *o, pa_source_output_state_t o->source->n_corked++; pa_source_update_status(o->source); - o->state = state; if (state != PA_SOURCE_OUTPUT_UNLINKED) @@ -269,14 +277,16 @@ static void source_output_free(pa_object* mo) { if (PA_SOURCE_OUTPUT_LINKED(o->state)) pa_source_output_unlink(o); - pa_log_info("Freeing output %u \"%s\"", o->index, o->name); + pa_log_info("Freeing output %u \"%s\"", o->index, pa_strnull(pa_proplist_gets(o->proplist, PA_PROP_MEDIA_NAME))); pa_assert(!o->thread_info.attached); if (o->thread_info.resampler) 
pa_resampler_free(o->thread_info.resampler); - pa_xfree(o->name); + if (o->proplist) + pa_proplist_free(o->proplist); + pa_xfree(o->driver); pa_xfree(o); } @@ -292,11 +302,10 @@ void pa_source_output_put(pa_source_output *o) { if (o->state == PA_SOURCE_OUTPUT_CORKED) o->source->n_corked++; - pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, o, 0, NULL); pa_source_update_status(o->source); + pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, o, 0, NULL); pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index); - pa_hook_fire(&o->source->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_PUT], o); } @@ -330,7 +339,7 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { pa_source_output_assert_ref(o); pa_assert(PA_SOURCE_OUTPUT_LINKED(o->thread_info.state)); pa_assert(chunk); - pa_assert(chunk->length); + pa_assert(pa_frame_aligned(chunk->length, &o->source->sample_spec)); if (!o->push || o->state == PA_SOURCE_OUTPUT_CORKED) return; @@ -351,6 +360,14 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { pa_memblock_unref(rchunk.memblock); } +void pa_source_output_set_requested_latency(pa_source_output *o, pa_usec_t usec) { + pa_source_output_assert_ref(o); + pa_assert(PA_SOURCE_OUTPUT_LINKED(o->state)); + + pa_asyncmsgq_post(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, (int64_t) usec, NULL, NULL); +} + + void pa_source_output_cork(pa_source_output *o, pa_bool_t b) { pa_source_output_assert_ref(o); pa_assert(PA_SOURCE_OUTPUT_LINKED(o->state)); @@ -375,19 +392,24 @@ int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) { } void pa_source_output_set_name(pa_source_output *o, const char *name) { + const char *old; pa_source_output_assert_ref(o); - if (!o->name && !name) + old = pa_proplist_gets(o->proplist, PA_PROP_MEDIA_NAME); + + if 
(!old && !name) return; - if (o->name && name && !strcmp(o->name, name)) + if (old && name && !strcmp(old, name)) return; - pa_xfree(o->name); - o->name = pa_xstrdup(name); + if (name) + pa_proplist_sets(o->proplist, PA_PROP_MEDIA_NAME, name); + else + pa_proplist_unset(o->proplist, PA_PROP_MEDIA_NAME); if (PA_SOURCE_OUTPUT_LINKED(o->state)) { - pa_hook_fire(&o->source->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_NAME_CHANGED], o); + pa_hook_fire(&o->source->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_PROPLIST_CHANGED], o); pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, o->index); } } @@ -509,6 +531,13 @@ int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int return 0; } + + case PA_SOURCE_OUTPUT_MESSAGE_SET_REQUESTED_LATENCY: + + o->thread_info.requested_source_latency = (pa_usec_t) offset; + pa_source_invalidate_requested_latency(o->source); + + return 0; } return -1; diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h index d6da8d00..dc95217b 100644 --- a/src/pulsecore/source-output.h +++ b/src/pulsecore/source-output.h @@ -62,10 +62,12 @@ struct pa_source_output { uint32_t index; pa_core *core; + pa_source_output_state_t state; pa_source_output_flags_t flags; - char *name, *driver; /* may be NULL */ + pa_proplist *proplist; + char *driver; /* may be NULL */ pa_module *module; /* may be NULL */ pa_client *client; /* may be NULL */ @@ -74,6 +76,8 @@ struct pa_source_output { pa_sample_spec sample_spec; pa_channel_map channel_map; + pa_resample_method_t resample_method; + /* Pushes a new memchunk into the output. Called from IO thread * context. */ void (*push)(pa_source_output *o, const pa_memchunk *chunk); @@ -86,14 +90,14 @@ struct pa_source_output { * disconnected from its source. Called from IO thread context */ void (*detach) (pa_source_output *o); /* may be NULL */ - /* If non-NULL called whenever the the source this output is attached - * to changes. 
Called from main context */ - void (*moved) (pa_source_output *o); /* may be NULL */ - /* If non-NULL called whenever the the source this output is attached * to suspends or resumes. Called from main context */ void (*suspend) (pa_source_output *o, pa_bool_t b); /* may be NULL */ + /* If non-NULL called whenever the the source this output is attached + * to changes. Called from main context */ + void (*moved) (pa_source_output *o); /* may be NULL */ + /* Supposed to unlink and destroy this stream. Called from main * context. */ void (*kill)(pa_source_output* o); /* may be NULL */ @@ -104,8 +108,6 @@ struct pa_source_output { thread instead. */ pa_usec_t (*get_latency) (pa_source_output *o); /* may be NULL */ - pa_resample_method_t resample_method; - struct { pa_source_output_state_t state; @@ -114,6 +116,9 @@ struct pa_source_output { pa_sample_spec sample_spec; pa_resampler* resampler; /* may be NULL */ + + /* The requested latency for the source */ + pa_usec_t requested_source_latency; } thread_info; void *userdata; @@ -126,11 +131,14 @@ enum { PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, + PA_SOURCE_OUTPUT_MESSAGE_SET_REQUESTED_LATENCY, PA_SOURCE_OUTPUT_MESSAGE_MAX }; typedef struct pa_source_output_new_data { - const char *name, *driver; + pa_proplist *proplist; + + const char *driver; pa_module *module; pa_client *client; @@ -152,7 +160,7 @@ typedef struct pa_source_output_move_hook_data { pa_source_output_new_data* pa_source_output_new_data_init(pa_source_output_new_data *data); void pa_source_output_new_data_set_sample_spec(pa_source_output_new_data *data, const pa_sample_spec *spec); void pa_source_output_new_data_set_channel_map(pa_source_output_new_data *data, const pa_channel_map *map); -void pa_source_output_new_data_set_volume(pa_source_output_new_data *data, const pa_cvolume *volume); +void pa_source_output_new_data_done(pa_source_output_new_data *data); /* To be called by the 
implementing module only */ @@ -166,6 +174,8 @@ void pa_source_output_unlink(pa_source_output*o); void pa_source_output_set_name(pa_source_output *i, const char *name); +void pa_source_output_set_requested_latency(pa_source_output *i, pa_usec_t usec); + /* Callable by everyone */ /* External code may request disconnection with this funcion */ @@ -186,6 +196,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest); /* To be used exclusively by the source driver thread */ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk); + int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int64_t offset, pa_memchunk *chunk); #endif diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index d707ad86..cc1c531d 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -45,35 +45,97 @@ static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject); static void source_free(pa_object *o); +pa_source_new_data* pa_source_new_data_init(pa_source_new_data *data) { + pa_assert(data); + + memset(data, 0, sizeof(*data)); + data->proplist = pa_proplist_new(); + + return data; +} + +void pa_source_new_data_set_name(pa_source_new_data *data, const char *name) { + pa_assert(data); + + pa_xfree(data->name); + data->name = pa_xstrdup(name); +} + +void pa_source_new_data_set_sample_spec(pa_source_new_data *data, const pa_sample_spec *spec) { + pa_assert(data); + + if ((data->sample_spec_is_set = !!spec)) + data->sample_spec = *spec; +} + +void pa_source_new_data_set_channel_map(pa_source_new_data *data, const pa_channel_map *map) { + pa_assert(data); + + if ((data->channel_map_is_set = !!map)) + data->channel_map = *map; +} + +void pa_source_new_data_set_volume(pa_source_new_data *data, const pa_cvolume *volume) { + pa_assert(data); + + if ((data->volume_is_set = !!volume)) + data->volume = *volume; +} + +void pa_source_new_data_set_muted(pa_source_new_data *data, pa_bool_t mute) { + pa_assert(data); + + data->muted_is_set = 
TRUE; + data->muted = !!mute; +} + +void pa_source_new_data_done(pa_source_new_data *data) { + pa_assert(data); + + pa_xfree(data->name); + pa_proplist_free(data->proplist); +} + pa_source* pa_source_new( pa_core *core, - const char *driver, - const char *name, - int fail, - const pa_sample_spec *spec, - const pa_channel_map *map) { + pa_source_new_data *data, + pa_source_flags_t flags) { pa_source *s; - char st[256]; - pa_channel_map tmap; + const char *name; + char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX]; pa_assert(core); - pa_assert(name); - pa_assert(spec); - pa_return_null_if_fail(pa_sample_spec_valid(spec)); + if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_NEW], data) < 0) + return NULL; + + pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver)); + pa_return_null_if_fail(data->name && pa_utf8_valid(data->name) && data->name[0]); + + pa_return_null_if_fail(data->sample_spec_is_set && pa_sample_spec_valid(&data->sample_spec)); - if (!map) - pa_return_null_if_fail(map = pa_channel_map_init_auto(&tmap, spec->channels, PA_CHANNEL_MAP_DEFAULT)); + if (!data->channel_map_is_set) + pa_return_null_if_fail(pa_channel_map_init_auto(&data->channel_map, data->sample_spec.channels, PA_CHANNEL_MAP_DEFAULT)); - pa_return_null_if_fail(map && pa_channel_map_valid(map)); - pa_return_null_if_fail(map->channels == spec->channels); - pa_return_null_if_fail(!driver || pa_utf8_valid(driver)); - pa_return_null_if_fail(pa_utf8_valid(name) && *name); + pa_return_null_if_fail(pa_channel_map_valid(&data->channel_map)); + pa_return_null_if_fail(data->channel_map.channels == data->sample_spec.channels); + + if (!data->volume_is_set) + pa_cvolume_reset(&data->volume, data->sample_spec.channels); + + pa_return_null_if_fail(pa_cvolume_valid(&data->volume)); + pa_return_null_if_fail(data->volume.channels == data->sample_spec.channels); + + if (!data->muted_is_set) + data->muted = FALSE; + + if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_FIXATE], data) 
< 0) + return NULL; s = pa_msgobject_new(pa_source); - if (!(name = pa_namereg_register(core, name, PA_NAMEREG_SOURCE, s, fail))) { + if (!(name = pa_namereg_register(core, data->name, PA_NAMEREG_SOURCE, s, data->namereg_fail))) { pa_xfree(s); return NULL; } @@ -83,21 +145,21 @@ pa_source* pa_source_new( s->core = core; s->state = PA_SOURCE_INIT; - s->flags = 0; + s->flags = flags; s->name = pa_xstrdup(name); - s->description = NULL; - s->driver = pa_xstrdup(driver); - s->module = NULL; + s->proplist = pa_proplist_copy(data->proplist); + s->driver = pa_xstrdup(data->driver); + s->module = data->module; - s->sample_spec = *spec; - s->channel_map = *map; + s->sample_spec = data->sample_spec; + s->channel_map = data->channel_map; s->outputs = pa_idxset_new(NULL, NULL); s->n_corked = 0; s->monitor_of = NULL; - pa_cvolume_reset(&s->volume, spec->channels); - s->muted = FALSE; + s->volume = data->volume; + s->muted = data->muted; s->refresh_volume = s->refresh_muted = FALSE; s->get_latency = NULL; @@ -106,20 +168,26 @@ pa_source* pa_source_new( s->set_mute = NULL; s->get_mute = NULL; s->set_state = NULL; + s->update_requested_latency = NULL; s->userdata = NULL; s->asyncmsgq = NULL; s->rtpoll = NULL; - pa_assert_se(pa_idxset_put(core->sources, s, &s->index) >= 0); - - pa_sample_spec_snprint(st, sizeof(st), spec); - pa_log_info("Created source %u \"%s\" with sample spec \"%s\"", s->index, s->name, st); - s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func); s->thread_info.soft_volume = s->volume; s->thread_info.soft_muted = s->muted; s->thread_info.state = s->state; + s->thread_info.requested_latency_valid = TRUE; + s->thread_info.requested_latency = 0; + + pa_assert_se(pa_idxset_put(core->sources, s, &s->index) >= 0); + + pa_log_info("Created source %u \"%s\" with sample spec %s and channel map %s", + s->index, + s->name, + pa_sample_spec_snprint(st, sizeof(st), &s->sample_spec), + pa_channel_map_snprint(cm, sizeof(cm), 
&s->channel_map)); return s; } @@ -170,10 +238,26 @@ void pa_source_put(pa_source *s) { pa_assert(s->rtpoll); pa_assert(s->asyncmsgq); + if (s->get_volume && s->set_volume) + s->flags |= PA_SOURCE_HW_VOLUME_CTRL; + else { + s->get_volume = NULL; + s->set_volume = NULL; + s->flags = (s->flags & ~PA_SOURCE_HW_VOLUME_CTRL) | PA_SOURCE_DECIBEL_VOLUME; + } + + if (s->get_mute && s->set_mute) + s->flags |= PA_SOURCE_HW_MUTE_CTRL; + else { + s->get_mute = NULL; + s->set_mute = NULL; + s->flags &= ~PA_SOURCE_HW_MUTE_CTRL; + } + pa_assert_se(source_set_state(s, PA_SOURCE_IDLE) == 0); pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index); - pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_NEW_POST], s); + pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PUT], s); } void pa_source_unlink(pa_source *s) { @@ -211,6 +295,7 @@ void pa_source_unlink(pa_source *s) { s->set_mute = NULL; s->get_mute = NULL; s->set_state = NULL; + s->update_requested_latency = NULL; if (linked) { pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index); @@ -238,8 +323,11 @@ static void source_free(pa_object *o) { pa_hashmap_free(s->thread_info.outputs, NULL, NULL); pa_xfree(s->name); - pa_xfree(s->description); pa_xfree(s->driver); + + if (s->proplist) + pa_proplist_free(s->proplist); + pa_xfree(s); } @@ -400,32 +488,25 @@ pa_bool_t pa_source_get_mute(pa_source *s) { return s->muted; } -void pa_source_set_module(pa_source *s, pa_module *m) { - pa_source_assert_ref(s); - - if (m == s->module) - return; - - s->module = m; - - pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index); -} - void pa_source_set_description(pa_source *s, const char *description) { + const char *old; pa_source_assert_ref(s); - if (!description && !s->description) + if (!description && !pa_proplist_contains(s->proplist, PA_PROP_DEVICE_DESCRIPTION)) return; - if (description && s->description && 
!strcmp(description, s->description)) + old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION); + if (old && description && !strcmp(old, description)) return; - pa_xfree(s->description); - s->description = pa_xstrdup(description); + if (description) + pa_proplist_sets(s->proplist, PA_PROP_DEVICE_DESCRIPTION, description); + else + pa_proplist_unset(s->proplist, PA_PROP_DEVICE_DESCRIPTION); if (PA_SOURCE_LINKED(s->state)) { - pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_DESCRIPTION_CHANGED], s); pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index); + pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s); } } @@ -470,6 +551,7 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_ switch ((pa_source_message_t) code) { case PA_SOURCE_MESSAGE_ADD_OUTPUT: { pa_source_output *o = PA_SOURCE_OUTPUT(userdata); + pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o)); pa_assert(!o->thread_info.attached); @@ -478,6 +560,8 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_ if (o->attach) o->attach(o); + pa_source_invalidate_requested_latency(s); + return 0; } @@ -493,6 +577,8 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_ if (pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index))) pa_source_output_unref(o); + pa_source_invalidate_requested_latency(s); + return 0; } @@ -590,5 +676,40 @@ void pa_source_attach_within_thread(pa_source *s) { while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) if (o->attach) o->attach(o); +} + +pa_usec_t pa_source_get_requested_latency(pa_source *s) { + pa_usec_t result = 0; + pa_source_output *o; + void *state = NULL; + + pa_source_assert_ref(s); + + if (s->thread_info.requested_latency_valid) + return s->thread_info.requested_latency; + + while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, 
NULL))) + + if (o->thread_info.requested_source_latency > 0 && + (!result || result > o->thread_info.requested_source_latency)) + result = o->thread_info.requested_source_latency; + + s->thread_info.requested_latency = result; + s->thread_info.requested_latency_valid = TRUE; + + return result; +} + +void pa_source_invalidate_requested_latency(pa_source *s) { + + pa_source_assert_ref(s); + pa_assert(PA_SOURCE_LINKED(s->thread_info.state)); + + if (!s->thread_info.requested_latency_valid) + return; + + s->thread_info.requested_latency_valid = FALSE; + if (s->update_requested_latency) + s->update_requested_latency(s); } diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h index bd0a9122..c880d3c5 100644 --- a/src/pulsecore/source.h +++ b/src/pulsecore/source.h @@ -71,7 +71,8 @@ struct pa_source { pa_source_flags_t flags; char *name; - char *description, *driver; /* may be NULL */ + char *driver; /* may be NULL */ + pa_proplist *proplist; pa_module *module; /* may be NULL */ @@ -87,15 +88,16 @@ struct pa_source { pa_bool_t refresh_volume; pa_bool_t refresh_muted; + pa_asyncmsgq *asyncmsgq; + pa_rtpoll *rtpoll; + int (*set_state)(pa_source*source, pa_source_state_t state); /* may be NULL */ int (*set_volume)(pa_source *s); /* dito */ int (*get_volume)(pa_source *s); /* dito */ int (*set_mute)(pa_source *s); /* dito */ int (*get_mute)(pa_source *s); /* dito */ pa_usec_t (*get_latency)(pa_source *s); /* dito */ - - pa_asyncmsgq *asyncmsgq; - pa_rtpoll *rtpoll; + void (*update_requested_latency)(pa_source *s); /* dito */ /* Contains copies of the above data so that the real-time worker * thread can work without access locking */ @@ -104,6 +106,9 @@ struct pa_source { pa_hashmap *outputs; pa_cvolume soft_volume; pa_bool_t soft_muted; + + pa_bool_t requested_latency_valid; + size_t requested_latency; } thread_info; void *userdata; @@ -127,20 +132,43 @@ typedef enum pa_source_message { PA_SOURCE_MESSAGE_MAX } pa_source_message_t; +typedef struct pa_source_new_data { + 
char *name; + pa_bool_t namereg_fail; + pa_proplist *proplist; + + const char *driver; + pa_module *module; + + pa_sample_spec sample_spec; + pa_bool_t sample_spec_is_set; + pa_channel_map channel_map; + pa_bool_t channel_map_is_set; + + pa_cvolume volume; + pa_bool_t volume_is_set; + pa_bool_t muted; + pa_bool_t muted_is_set; +} pa_source_new_data; + +pa_source_new_data* pa_source_new_data_init(pa_source_new_data *data); +void pa_source_new_data_set_name(pa_source_new_data *data, const char *name); +void pa_source_new_data_set_sample_spec(pa_source_new_data *data, const pa_sample_spec *spec); +void pa_source_new_data_set_channel_map(pa_source_new_data *data, const pa_channel_map *map); +void pa_source_new_data_set_volume(pa_source_new_data *data, const pa_cvolume *volume); +void pa_source_new_data_set_muted(pa_source_new_data *data, pa_bool_t mute); +void pa_source_new_data_done(pa_source_new_data *data); + /* To be called exclusively by the source driver, from main context */ pa_source* pa_source_new( pa_core *core, - const char *driver, - const char *name, - int namereg_fail, - const pa_sample_spec *spec, - const pa_channel_map *map); + pa_source_new_data *data, + pa_source_flags_t flags); void pa_source_put(pa_source *s); void pa_source_unlink(pa_source *s); -void pa_source_set_module(pa_source *s, pa_module *m); void pa_source_set_description(pa_source *s, const char *description); void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q); void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p); @@ -176,4 +204,10 @@ int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, int64_t, pa void pa_source_attach_within_thread(pa_source *s); void pa_source_detach_within_thread(pa_source *s); +pa_usec_t pa_source_get_requested_latency(pa_source *s); + +/* To be called exclusively by source output drivers, from IO context */ + +void pa_source_invalidate_requested_latency(pa_source *s); + #endif diff --git a/src/pulsecore/tagstruct.c 
b/src/pulsecore/tagstruct.c index 556fe806..0017c388 100644 --- a/src/pulsecore/tagstruct.c +++ b/src/pulsecore/tagstruct.c @@ -42,12 +42,14 @@ #include "tagstruct.h" +#define MAX_TAG_SIZE (64*1024) + struct pa_tagstruct { uint8_t *data; size_t length, allocated; size_t rindex; - int dynamic; + pa_bool_t dynamic; }; pa_tagstruct *pa_tagstruct_new(const uint8_t* data, size_t length) { @@ -254,6 +256,32 @@ void pa_tagstruct_put_cvolume(pa_tagstruct *t, const pa_cvolume *cvolume) { } } +void pa_tagstruct_put_proplist(pa_tagstruct *t, pa_proplist *p) { + void *state = NULL; + pa_assert(t); + pa_assert(p); + + extend(t, 1); + + t->data[t->length++] = PA_TAG_PROPLIST; + + for (;;) { + const char *k; + const void *d; + size_t l; + + if (!(k = pa_proplist_iterate(p, &state))) + break; + + pa_tagstruct_puts(t, k); + pa_assert_se(pa_proplist_get(p, k, &d, &l) >= 0); + pa_tagstruct_putu32(t, (uint32_t) l); + pa_tagstruct_put_arbitrary(t, d, l); + } + + pa_tagstruct_puts(t, NULL); +} + int pa_tagstruct_gets(pa_tagstruct*t, const char **s) { int error = 0; size_t n; @@ -529,6 +557,57 @@ int pa_tagstruct_get_cvolume(pa_tagstruct *t, pa_cvolume *cvolume) { return 0; } +int pa_tagstruct_get_proplist(pa_tagstruct *t, pa_proplist *p) { + size_t saved_rindex; + + pa_assert(t); + pa_assert(p); + + if (t->rindex+1 > t->length) + return -1; + + if (t->data[t->rindex] != PA_TAG_PROPLIST) + return -1; + + saved_rindex = t->rindex; + + for (;;) { + const char *k; + void *d; + uint32_t length; + + if (pa_tagstruct_gets(t, &k) < 0) + goto fail; + + if (!k) + break; + + if (pa_tagstruct_getu32(t, &length) < 0) + goto fail; + + if (length > MAX_TAG_SIZE) + goto fail; + + d = pa_xmalloc(length); + + if (pa_tagstruct_get_arbitrary(t, d, length) < 0) + goto fail; + + if (pa_proplist_set(p, k, d, length) < 0) { + pa_xfree(d); + goto fail; + } + + pa_xfree(d); + } + + return 0; + +fail: + t->rindex = saved_rindex; + return -1; +} + void pa_tagstruct_put(pa_tagstruct *t, ...) 
{ va_list va; pa_assert(t); @@ -591,6 +670,9 @@ void pa_tagstruct_put(pa_tagstruct *t, ...) { pa_tagstruct_put_cvolume(t, va_arg(va, pa_cvolume *)); break; + case PA_TAG_PROPLIST: + pa_tagstruct_put_proplist(t, va_arg(va, pa_proplist *)); + default: pa_assert_not_reached(); } @@ -662,6 +744,9 @@ int pa_tagstruct_get(pa_tagstruct *t, ...) { ret = pa_tagstruct_get_cvolume(t, va_arg(va, pa_cvolume *)); break; + case PA_TAG_PROPLIST: + ret = pa_tagstruct_get_proplist(t, va_arg(va, pa_proplist *)); + default: pa_assert_not_reached(); } diff --git a/src/pulsecore/tagstruct.h b/src/pulsecore/tagstruct.h index e9bb9ac8..3b2ce7b9 100644 --- a/src/pulsecore/tagstruct.h +++ b/src/pulsecore/tagstruct.h @@ -32,6 +32,7 @@ #include #include #include +#include typedef struct pa_tagstruct pa_tagstruct; @@ -51,7 +52,8 @@ enum { PA_TAG_TIMEVAL = 'T', PA_TAG_USEC = 'U' /* 64bit unsigned */, PA_TAG_CHANNEL_MAP = 'm', - PA_TAG_CVOLUME = 'v' + PA_TAG_CVOLUME = 'v', + PA_TAG_PROPLIST = 'P' }; pa_tagstruct *pa_tagstruct_new(const uint8_t* data, size_t length); @@ -75,6 +77,7 @@ void pa_tagstruct_put_timeval(pa_tagstruct*t, const struct timeval *tv); void pa_tagstruct_put_usec(pa_tagstruct*t, pa_usec_t u); void pa_tagstruct_put_channel_map(pa_tagstruct *t, const pa_channel_map *map); void pa_tagstruct_put_cvolume(pa_tagstruct *t, const pa_cvolume *cvolume); +void pa_tagstruct_put_proplist(pa_tagstruct *t, pa_proplist *p); int pa_tagstruct_get(pa_tagstruct *t, ...); @@ -90,6 +93,7 @@ int pa_tagstruct_get_timeval(pa_tagstruct*t, struct timeval *tv); int pa_tagstruct_get_usec(pa_tagstruct*t, pa_usec_t *u); int pa_tagstruct_get_channel_map(pa_tagstruct *t, pa_channel_map *map); int pa_tagstruct_get_cvolume(pa_tagstruct *t, pa_cvolume *v); +int pa_tagstruct_get_proplist(pa_tagstruct *t, pa_proplist *p); #endif diff --git a/src/pulsecore/time-smoother.c b/src/pulsecore/time-smoother.c index 4cebded4..b44aaa50 100644 --- a/src/pulsecore/time-smoother.c +++ b/src/pulsecore/time-smoother.c @@ -332,6 
+332,8 @@ void pa_smoother_put(pa_smoother *s, pa_usec_t x, pa_usec_t y) { s->py = y + s->dp *s->adjust_time; s->abc_valid = FALSE; + + pa_log_debug("put(%llu | %llu) = %llu", x + s->time_offset, x, y); } pa_usec_t pa_smoother_get(pa_smoother *s, pa_usec_t x) { @@ -350,6 +352,9 @@ pa_usec_t pa_smoother_get(pa_smoother *s, pa_usec_t x) { pa_assert(x >= s->ex); estimate(s, x, &y, NULL); + + pa_log_debug("get(%llu | %llu) = %llu", x + s->time_offset, x, y); + return y; } @@ -357,6 +362,8 @@ void pa_smoother_set_time_offset(pa_smoother *s, pa_usec_t offset) { pa_assert(s); s->time_offset = offset; + + pa_log_debug("offset(%llu)", offset); } void pa_smoother_pause(pa_smoother *s, pa_usec_t x) { @@ -365,6 +372,8 @@ void pa_smoother_pause(pa_smoother *s, pa_usec_t x) { if (s->paused) return; + pa_log_debug("pause(%llu)", x); + s->paused = TRUE; s->pause_time = x; } @@ -377,6 +386,31 @@ void pa_smoother_resume(pa_smoother *s, pa_usec_t x) { pa_assert(x >= s->pause_time); + pa_log_debug("resume(%llu)", x); + s->paused = FALSE; s->time_offset += x - s->pause_time; } + +pa_usec_t pa_smoother_translate(pa_smoother *s, pa_usec_t x, pa_usec_t y_delay) { + pa_usec_t ney; + double nde; + + pa_assert(s); + pa_assert(x >= s->time_offset); + + /* Fix up x value */ + if (s->paused) + x = s->pause_time; + + pa_assert(x >= s->time_offset); + x -= s->time_offset; + + pa_assert(x >= s->ex); + + estimate(s, x, &ney, &nde); + + pa_log_debug("translate(%llu) = %llu (%0.2f)", (unsigned long long) y_delay, (unsigned long long) ((double) y_delay / s->dp), s->dp); + + return (pa_usec_t) ((double) y_delay / s->dp); +} diff --git a/src/pulsecore/time-smoother.h b/src/pulsecore/time-smoother.h index 8b8512e2..85d9f0fd 100644 --- a/src/pulsecore/time-smoother.h +++ b/src/pulsecore/time-smoother.h @@ -29,13 +29,19 @@ typedef struct pa_smoother pa_smoother; -pa_smoother* pa_smoother_new(pa_usec_t adjust_time, pa_usec_t history_time, pa_bool_t monotonic); +pa_smoother* pa_smoother_new(pa_usec_t 
x_adjust_time, pa_usec_t x_history_time, pa_bool_t monotonic); void pa_smoother_free(pa_smoother* s); +/* Adds a new value to our dataset. x = local/system time, y = remote time */ void pa_smoother_put(pa_smoother *s, pa_usec_t x, pa_usec_t y); + +/* Returns an interpolated value based on the dataset. x = local/system time, return value = remote time */ pa_usec_t pa_smoother_get(pa_smoother *s, pa_usec_t x); -void pa_smoother_set_time_offset(pa_smoother *s, pa_usec_t offset); +/* Translates a time span from the remote time domain to the local one. x = local/system time when to estimate, y_delay = remote time span */ +pa_usec_t pa_smoother_translate(pa_smoother *s, pa_usec_t x, pa_usec_t y_delay); + +void pa_smoother_set_time_offset(pa_smoother *s, pa_usec_t x_offset); void pa_smoother_pause(pa_smoother *s, pa_usec_t x); void pa_smoother_resume(pa_smoother *s, pa_usec_t x);