code.delx.au - pulseaudio/commitdiff
Rework memory management to allow shared memory data transfer. The central idea
author: Lennart Poettering <lennart@poettering.net>
Fri, 18 Aug 2006 19:55:18 +0000 (19:55 +0000)
committer: Lennart Poettering <lennart@poettering.net>
Fri, 18 Aug 2006 19:55:18 +0000 (19:55 +0000)
is to allocate all audio memory blocks from a per-process memory pool which is
available as read-only SHM segment to other local processes. Then, instead of
writing the actual audio data to the socket just write references to this
shared memory pool.

To work optimally all memory blocks should now be of type PA_MEMBLOCK_POOL or
PA_MEMBLOCK_POOL_EXTERNAL. The function pa_memblock_new() now generates memory
blocks of this type by default.

git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1266 fefdeb5f-60dc-0310-8127-8f9354f1896f

45 files changed:
src/daemon/main.c
src/modules/module-alsa-sink.c
src/modules/module-alsa-source.c
src/modules/module-combine.c
src/modules/module-jack-source.c
src/modules/module-oss-mmap.c
src/modules/module-oss.c
src/modules/module-pipe-source.c
src/modules/module-sine.c
src/modules/module-tunnel.c
src/modules/rtp/module-rtp-recv.c
src/modules/rtp/module-rtp-send.c
src/modules/rtp/rtp.c
src/modules/rtp/rtp.h
src/pulse/context.c
src/pulse/internal.h
src/pulse/stream.c
src/pulsecore/cli-command.c
src/pulsecore/core-scache.c
src/pulsecore/core.c
src/pulsecore/core.h
src/pulsecore/mcalign.c
src/pulsecore/mcalign.h
src/pulsecore/memblock.c
src/pulsecore/memblock.h
src/pulsecore/memblockq.c
src/pulsecore/memblockq.h
src/pulsecore/memchunk.c
src/pulsecore/memchunk.h
src/pulsecore/protocol-esound.c
src/pulsecore/protocol-native.c
src/pulsecore/protocol-simple.c
src/pulsecore/pstream.c
src/pulsecore/pstream.h
src/pulsecore/resampler.c
src/pulsecore/resampler.h
src/pulsecore/sample-util.c
src/pulsecore/sample-util.h
src/pulsecore/sink-input.c
src/pulsecore/sink.c
src/pulsecore/sound-file-stream.c
src/pulsecore/sound-file.c
src/pulsecore/sound-file.h
src/pulsecore/source-output.c
src/pulsecore/source.c

index 38d465f80f16008650380f63bd8704187905713b..aada0ad7977c04fa6d92c595d03b2d8c93bfdae8 100644 (file)
@@ -559,7 +559,7 @@ int main(int argc, char *argv[]) {
     mainloop = pa_mainloop_new();
     assert(mainloop);
 
-    c = pa_core_new(pa_mainloop_get_api(mainloop));
+    c = pa_core_new(pa_mainloop_get_api(mainloop), 1);
     assert(c);
     c->is_system_instance = !!conf->system_instance;
 
index 8da3d23609ed5054d51408ced1543f9fbc2c13a4..0cebd50f11b5d4a92c06ca9b531baf9bba6f9d11 100644 (file)
@@ -492,7 +492,7 @@ int pa__init(pa_core *c, pa_module*m) {
 
     pa_log_info(__FILE__": using %u fragments of size %lu bytes.", periods, (long unsigned)u->fragment_size);
 
-    u->silence.memblock = pa_memblock_new(u->silence.length = u->fragment_size, c->memblock_stat);
+    u->silence.memblock = pa_memblock_new(c->mempool, u->silence.length = u->fragment_size);
     assert(u->silence.memblock);
     pa_silence_memblock(u->silence.memblock, &ss);
     u->silence.index = 0;
index 4a8678c9c50c22f7ab8677e97302b29ebc9c9496..c3979df177623a7b3b29387a7057d5cfb27de006 100644 (file)
@@ -151,7 +151,7 @@ static void do_read(struct userdata *u) {
         size_t l;
         
         if (!u->memchunk.memblock) {
-            u->memchunk.memblock = pa_memblock_new(u->memchunk.length = u->fragment_size, u->source->core->memblock_stat);
+            u->memchunk.memblock = pa_memblock_new(u->source->core->mempool, u->memchunk.length = u->fragment_size);
             u->memchunk.index = 0;
         }
             
index 008fe6e74ec4c7bd7369846582d1fb75622a7f00..5243975b5c950e70bf79b3f030600037d1b024a7 100644 (file)
@@ -235,8 +235,7 @@ static struct output *output_new(struct userdata *u, pa_sink *sink, int resample
             pa_frame_size(&u->sink->sample_spec),
             1,
             0,
-            NULL,
-            sink->core->memblock_stat);
+            NULL);
 
     snprintf(t, sizeof(t), "%s: output #%u", u->sink->name, u->n_outputs+1);
 
index 583f3b8e9cd6a0ef04c3e328d7000a926adbcd24..8e659198dabc525493d0fea01cb738a0ef191f0d 100644 (file)
@@ -137,7 +137,7 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_
         
         fs = pa_frame_size(&u->source->sample_spec);
 
-        chunk.memblock = pa_memblock_new(chunk.length = u->frames_posted * fs, u->core->memblock_stat);
+        chunk.memblock = pa_memblock_new(u->core->mempool, chunk.length = u->frames_posted * fs);
         chunk.index = 0;
         
         for (frame_idx = 0; frame_idx < u->frames_posted; frame_idx ++) {
index c783a2f1f207c78c8a7e384e5ea8c7bca3e7aa85..75ab9a9e97f3c2352faf40be9fd2299f8f8264d2 100644 (file)
@@ -162,10 +162,10 @@ static void out_fill_memblocks(struct userdata *u, unsigned n) {
             
         chunk.memblock = u->out_memblocks[u->out_current] =
             pa_memblock_new_fixed(
+                    u->core->mempool,
                     (uint8_t*) u->out_mmap+u->out_fragment_size*u->out_current,
                     u->out_fragment_size,
-                    1,
-                    u->core->memblock_stat);
+                    1);
         assert(chunk.memblock);
         chunk.length = chunk.memblock->length;
         chunk.index = 0;
@@ -210,7 +210,7 @@ static void in_post_memblocks(struct userdata *u, unsigned n) {
         pa_memchunk chunk;
         
         if (!u->in_memblocks[u->in_current]) {
-            chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed((uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1, u->core->memblock_stat);
+            chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed(u->core->mempool, (uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1);
             chunk.length = chunk.memblock->length;
             chunk.index = 0;
             
index ce11ee025c8c14bd282f2e8e052b5abd58f3d2af..b9b80e72536929011f9ebeb2682d51462ad6e662 100644 (file)
@@ -217,7 +217,7 @@ static void do_read(struct userdata *u) {
     }
     
     do {
-        memchunk.memblock = pa_memblock_new(l, u->core->memblock_stat);
+        memchunk.memblock = pa_memblock_new(u->core->mempool, l);
         assert(memchunk.memblock);
         if ((r = pa_iochannel_read(u->io, memchunk.memblock->data, memchunk.memblock->length)) < 0) {
             pa_memblock_unref(memchunk.memblock);
@@ -503,7 +503,7 @@ int pa__init(pa_core *c, pa_module*m) {
 
     u->out_fragment_size = out_frag_size;
     u->in_fragment_size = in_frag_size;
-    u->silence.memblock = pa_memblock_new(u->silence.length = u->out_fragment_size, u->core->memblock_stat);
+    u->silence.memblock = pa_memblock_new(u->core->mempool, u->silence.length = u->out_fragment_size);
     assert(u->silence.memblock);
     pa_silence_memblock(u->silence.memblock, &ss);
     u->silence.index = 0;
index 5caa60a3e22bd1caf5b4e88c8f3b55394ea2a31a..43a8dab557d1cdd2f1785f71cd2665afd5036727 100644 (file)
@@ -91,7 +91,7 @@ static void do_read(struct userdata *u) {
     pa_module_set_used(u->module, pa_idxset_size(u->source->outputs));
 
     if (!u->chunk.memblock) {
-        u->chunk.memblock = pa_memblock_new(1024, u->core->memblock_stat);
+        u->chunk.memblock = pa_memblock_new(u->core->mempool, PIPE_BUF);
         u->chunk.index = chunk.length = 0;
     }
 
index 5ceddce084987d384fbaa84607e0ced4086a7452..89c3c6093250daea132a8ac6c223e6058426f9e0 100644 (file)
@@ -139,7 +139,7 @@ int pa__init(pa_core *c, pa_module*m) {
         goto fail;
     }
     
-    u->memblock = pa_memblock_new(pa_bytes_per_second(&ss), c->memblock_stat);
+    u->memblock = pa_memblock_new(c->mempool, pa_bytes_per_second(&ss));
     calc_sine(u->memblock->data, u->memblock->length, frequency);
 
     snprintf(t, sizeof(t), "Sine Generator at %u Hz", frequency);
index 9bb11c094debe8de289235039ff004d7dfcd227a..53bffd3b1cc052f7644be398a879b0862032f0c1 100644 (file)
@@ -651,7 +651,7 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
         return;
     }
 
-    u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->memblock_stat);
+    u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
     u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
 
     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
index df6f8c118d50b2a0b1ba390e57a0edc1778eb313..5d3f3e279852d1a5b00c14746d522e41fba41bf5 100644 (file)
@@ -150,7 +150,7 @@ static void rtp_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event
     assert(fd == s->rtp_context.fd);
     assert(flags == PA_IO_EVENT_INPUT);
 
-    if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->memblock_stat) < 0)
+    if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->mempool) < 0)
         return;
 
     if (s->sdp_info.payload != s->rtp_context.payload) {
@@ -312,10 +312,10 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
     s->sink_input->kill = sink_input_kill;
     s->sink_input->get_latency = sink_input_get_latency;
 
-    silence = pa_silence_memblock_new(&s->sink_input->sample_spec,
+    silence = pa_silence_memblock_new(s->userdata->core->mempool,
+                                      &s->sink_input->sample_spec,
                                       (pa_bytes_per_second(&s->sink_input->sample_spec)/128/pa_frame_size(&s->sink_input->sample_spec))*
-                                      pa_frame_size(&s->sink_input->sample_spec),
-                                      s->userdata->core->memblock_stat);
+                                      pa_frame_size(&s->sink_input->sample_spec));
     
     s->memblockq = pa_memblockq_new(
             0,
@@ -324,8 +324,7 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
             pa_frame_size(&s->sink_input->sample_spec),
             pa_bytes_per_second(&s->sink_input->sample_spec)/10+1,
             0,
-            silence,
-            u->core->memblock_stat);
+            silence);
 
     pa_memblock_unref(silence);
 
index 759aa8197a268016b4c4d6ee3c214b57460aa00b..1b85c840b8af4b3441ac53f3e0201bcbe6020823 100644 (file)
@@ -297,8 +297,7 @@ int pa__init(pa_core *c, pa_module*m) {
             pa_frame_size(&ss),
             1,
             0,
-            NULL,
-            c->memblock_stat);
+            NULL);
 
     u->mtu = mtu;
     
index ee037d421f6465ef5601e8b92d0f82c4d5bf5b25..8e77c60a6b3c40154dab28d1e26522d1fe7cb8d9 100644 (file)
@@ -149,7 +149,7 @@ pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame
     return c;
 }
 
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) {
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
     int size;
     struct msghdr m;
     struct iovec iov;
@@ -170,7 +170,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) {
     if (!size)
         return 0;
 
-    chunk->memblock = pa_memblock_new(size, st);
+    chunk->memblock = pa_memblock_new(pool, size);
 
     iov.iov_base = chunk->memblock->data;
     iov.iov_len = size;
index 35fbbd357c4f835966c56f6f6db988860550bec2..123602b230395961aecc3b9a8aa5ede06398c19e 100644 (file)
@@ -41,7 +41,7 @@ pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssr
 int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q);
 
 pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size);
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st);
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool);
 
 void pa_rtp_context_destroy(pa_rtp_context *c);
 
index 34f517f0b7554ff9a6bb9a3eaa16e54ba4a381f1..b353054249e7cbc454e36ada2cbd241074393f71 100644 (file)
@@ -128,7 +128,7 @@ pa_context *pa_context_new(pa_mainloop_api *mainloop, const char *name) {
     c->subscribe_callback = NULL;
     c->subscribe_userdata = NULL;
 
-    c->memblock_stat = pa_memblock_stat_new();
+    c->mempool = pa_mempool_new(1);
     c->local = -1;
     c->server_list = NULL;
     c->server = NULL;
@@ -177,7 +177,7 @@ static void context_free(pa_context *c) {
     if (c->playback_streams)
         pa_dynarray_free(c->playback_streams, NULL, NULL);
 
-    pa_memblock_stat_unref(c->memblock_stat);
+    pa_mempool_free(c->mempool);
 
     if (c->conf)
         pa_client_conf_free(c->conf);
@@ -407,7 +407,9 @@ static void setup_context(pa_context *c, pa_iochannel *io) {
     pa_context_ref(c);
     
     assert(!c->pstream);
-    c->pstream = pa_pstream_new(c->mainloop, io, c->memblock_stat);
+    c->pstream = pa_pstream_new(c->mainloop, io, c->mempool);
+
+    pa_pstream_use_shm(c->pstream, 1);
     
     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
index 96028d832c88c9453132d87be9766f0702031c33..afcfaeff5a0ab9cf33d6959ba227642e2ca15218 100644 (file)
@@ -69,7 +69,7 @@ struct pa_context {
     pa_context_subscribe_cb_t subscribe_callback;
     void *subscribe_userdata;
 
-    pa_memblock_stat *memblock_stat;
+    pa_mempool *mempool;
 
     int local;
     int do_autospawn;
index 677df009d0a39e0070e8cc8aec2a48966d922741..180cd096d5a05aee97406d02ac051dee11d7d249 100644 (file)
@@ -437,8 +437,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
                 pa_frame_size(&s->sample_spec),
                 1,
                 0,
-                NULL,
-                s->context->memblock_stat);
+                NULL);
     }
 
     s->channel_valid = 1;
@@ -604,9 +603,9 @@ int pa_stream_write(
         return 0;
 
     if (free_cb) 
-        chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
+        chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) data, length, free_cb, 1);
     else {
-        chunk.memblock = pa_memblock_new(length, s->context->memblock_stat);
+        chunk.memblock = pa_memblock_new(s->context->mempool, length);
         memcpy(chunk.memblock->data, data, length);
     }
         
index f74258d3c70d4c3331b692b5718962bdd5cf64de..811b96d2aa10c5bd084416bff9577567427a6e94 100644 (file)
@@ -100,6 +100,7 @@ static int pa_cli_command_dump(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int
 static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
 static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
 static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
+static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
 
 /* A method table for all available commands */
 
@@ -144,6 +145,7 @@ static const struct command commands[] = {
     { "list-props",              pa_cli_command_list_props,         NULL, 1},
     { "move-sink-input",         pa_cli_command_move_sink_input,    "Move sink input to another sink (args: index, sink)", 3},
     { "move-source-output",      pa_cli_command_move_source_output, "Move source output to another source (args: index, source)", 3},
+    { "vacuum",                  pa_cli_command_vacuum,             NULL, 1},
     { NULL, NULL, NULL, 0 }
 };
 
@@ -239,23 +241,32 @@ static int pa_cli_command_source_outputs(pa_core *c, pa_tokenizer *t, pa_strbuf
 
 static int pa_cli_command_stat(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_GCC_UNUSED int *fail) {
     char s[256];
+    const pa_mempool_stat *stat;
     assert(c && t);
 
-    pa_bytes_snprint(s, sizeof(s), c->memblock_stat->total_size);
+    stat = pa_mempool_get_stat(c->mempool);
+    
     pa_strbuf_printf(buf, "Memory blocks currently allocated: %u, size: %s.\n",
-                     c->memblock_stat->total,
-                     s);
+                     stat->n_allocated,
+                     pa_bytes_snprint(s, sizeof(s), stat->allocated_size));
 
-    pa_bytes_snprint(s, sizeof(s), c->memblock_stat->allocated_size);
     pa_strbuf_printf(buf, "Memory blocks allocated during the whole lifetime: %u, size: %s.\n",
-                     c->memblock_stat->allocated,
-                     s);
+                     stat->n_accumulated,
+                     pa_bytes_snprint(s, sizeof(s), stat->accumulated_size));
+
+    pa_strbuf_printf(buf, "Memory blocks imported from other processes: %u, size: %s.\n",
+                     stat->n_imported,
+                     pa_bytes_snprint(s, sizeof(s), stat->imported_size));
 
-    pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c));
-    pa_strbuf_printf(buf, "Total sample cache size: %s.\n", s);
+    pa_strbuf_printf(buf, "Memory blocks exported to other processes: %u, size: %s.\n",
+                     stat->n_exported,
+                     pa_bytes_snprint(s, sizeof(s), stat->exported_size));
 
-    pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec);
-    pa_strbuf_printf(buf, "Default sample spec: %s\n", s);
+    pa_strbuf_printf(buf, "Total sample cache size: %s.\n",
+                     pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c)));
+
+    pa_strbuf_printf(buf, "Default sample spec: %s\n",
+                     pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec));
 
     pa_strbuf_printf(buf, "Default sink name: %s\n"
                      "Default source name: %s\n",
@@ -731,6 +742,15 @@ static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf
     return 0;
 }
 
+static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
+    assert(c);
+    assert(t);
+
+    pa_mempool_vacuum(c->mempool);
+    
+    return 0;
+}
+
 static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
     const char *n, *k;
     pa_sink_input *si;
index 377dd569f4289f329c35123bc9bed17fdf477a68..ca2408fe7abfdaedb16846a53ff0c53816659445 100644 (file)
@@ -176,7 +176,7 @@ int pa_scache_add_file(pa_core *c, const char *name, const char *filename, uint3
         filename = buf;
 #endif
 
-    if (pa_sound_file_load(filename, &ss, &map, &chunk, c->memblock_stat) < 0)
+    if (pa_sound_file_load(c->mempool, filename, &ss, &map, &chunk) < 0)
         return -1;
         
     r = pa_scache_add_item(c, name, &ss, &map, &chunk, idx);
@@ -261,7 +261,7 @@ int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t
         return -1;
 
     if (e->lazy && !e->memchunk.memblock) {
-        if (pa_sound_file_load(e->filename, &e->sample_spec, &e->channel_map, &e->memchunk, c->memblock_stat) < 0)
+        if (pa_sound_file_load(c->mempool, e->filename, &e->sample_spec, &e->channel_map, &e->memchunk) < 0)
             return -1;
 
         pa_subscription_post(c, PA_SUBSCRIPTION_EVENT_SAMPLE_CACHE|PA_SUBSCRIPTION_EVENT_CHANGE, e->index);
index 7f2f0f60d19782c074af83e192b7e8349d91a07b..5fdeab569e913a90209e04cbe37ee5a53e4cd3c3 100644 (file)
@@ -44,7 +44,7 @@
 
 #include "core.h"
 
-pa_core* pa_core_new(pa_mainloop_api *m) {
+pa_core* pa_core_new(pa_mainloop_api *m, int shared) {
     pa_core* c;
     
     c = pa_xnew(pa_core, 1);
@@ -78,7 +78,7 @@ pa_core* pa_core_new(pa_mainloop_api *m) {
     PA_LLIST_HEAD_INIT(pa_subscription_event, c->subscription_event_queue);
     c->subscription_event_last = NULL;
 
-    c->memblock_stat = pa_memblock_stat_new();
+    c->mempool = pa_mempool_new(shared);
 
     c->disallow_module_loading = 0;
 
@@ -139,7 +139,7 @@ void pa_core_free(pa_core *c) {
     pa_xfree(c->default_source_name);
     pa_xfree(c->default_sink_name);
 
-    pa_memblock_stat_unref(c->memblock_stat);
+    pa_mempool_free(c->mempool);
 
     pa_property_cleanup(c);
 
index f9fa386e8b520137b92ef402c37087955ca5a394..3a34d297acc8ee8bd55a5026b6f942ae55b24a6e 100644 (file)
@@ -67,7 +67,7 @@ struct pa_core {
     PA_LLIST_HEAD(pa_subscription_event, subscription_event_queue);
     pa_subscription_event *subscription_event_last;
 
-    pa_memblock_stat *memblock_stat;
+    pa_mempool *mempool;
 
     int disallow_module_loading, running_as_daemon;
     int exit_idle_time, module_idle_time, scache_idle_time;
@@ -88,7 +88,7 @@ struct pa_core {
         hook_source_disconnect;
 };
 
-pa_core* pa_core_new(pa_mainloop_api *m);
+pa_core* pa_core_new(pa_mainloop_api *m, int shared);
 void pa_core_free(pa_core*c);
 
 /* Check whether noone is connected to this core */
index 8283a7a0b737520bb36b0bcabd4305afc241c345..9ede610d7b2d1f5035dbb680339e3c917c612df9 100644 (file)
 struct pa_mcalign {
     size_t base;
     pa_memchunk leftover, current;
-    pa_memblock_stat *memblock_stat;
 };
 
-pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) {
+pa_mcalign *pa_mcalign_new(size_t base) {
     pa_mcalign *m;
     assert(base);
 
@@ -47,7 +46,6 @@ pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) {
     m->base = base;
     pa_memchunk_reset(&m->leftover);
     pa_memchunk_reset(&m->current);
-    m->memblock_stat = s;
     
     return m;
 }
@@ -100,7 +98,7 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {
                 l = c->length;
 
             /* Can we use the current block? */
-            pa_memchunk_make_writable(&m->leftover, m->memblock_stat, m->base);
+            pa_memchunk_make_writable(&m->leftover, m->base);
 
             memcpy((uint8_t*) m->leftover.memblock->data + m->leftover.index + m->leftover.length, (uint8_t*) c->memblock->data + c->index, l);
             m->leftover.length += l;
index 80e3749966e79a69d11a4477234ff24188977879..94e99e21253cddb523be60ecc732d7c5c6c40f98 100644 (file)
@@ -63,7 +63,7 @@
 
 typedef struct pa_mcalign pa_mcalign;
 
-pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s);
+pa_mcalign *pa_mcalign_new(size_t base);
 void pa_mcalign_free(pa_mcalign *m);
 
 /* Push a new memchunk into the aligner. The caller of this routine
index 36de17fb35aa48dafee50e857fa21339a11f79df..4ce1b7c1a11f99316f4b3fce4ba2cb7a25fe84b3 100644 (file)
 #include <stdlib.h>
 #include <assert.h>
 #include <string.h>
+#include <unistd.h>
 
 #include <pulse/xmalloc.h>
 
+#include <pulsecore/shm.h>
+#include <pulsecore/log.h>
+#include <pulsecore/hashmap.h>
+
 #include "memblock.h"
 
-static void stat_add(pa_memblock*m, pa_memblock_stat *s) {
-    assert(m);
+#define PA_MEMPOOL_SLOTS_MAX 128
+#define PA_MEMPOOL_SLOT_SIZE (16*1024)
 
-    if (!s) {
-        m->stat = NULL;
-        return;
-    }
+#define PA_MEMEXPORT_SLOTS_MAX 128
+
+#define PA_MEMIMPORT_SLOTS_MAX 128
+#define PA_MEMIMPORT_SEGMENTS_MAX 16
+
+struct pa_memimport_segment {
+    pa_memimport *import;
+    pa_shm memory;
+    unsigned n_blocks;
+};
+
+struct pa_memimport {
+    pa_mempool *pool;
+    pa_hashmap *segments;
+    pa_hashmap *blocks;
+
+    /* Called whenever an imported memory block is no longer
+     * needed. */
+    pa_memimport_release_cb_t release_cb;
+    void *userdata;
+
+    PA_LLIST_FIELDS(pa_memimport);
+};
+
+struct memexport_slot {
+    PA_LLIST_FIELDS(struct memexport_slot);
+    pa_memblock *block;
+};
+
+struct pa_memexport {
+    pa_mempool *pool;
+    
+    struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];
+    PA_LLIST_HEAD(struct memexport_slot, free_slots);
+    PA_LLIST_HEAD(struct memexport_slot, used_slots);
+    unsigned n_init;
+
+    /* Called whenever a client from which we imported a memory block
+       which we in turn exported to another client dies and we need to
+       revoke the memory block accordingly */
+    pa_memexport_revoke_cb_t revoke_cb;
+    void *userdata;
+
+    PA_LLIST_FIELDS(pa_memexport);
+};
+
+struct mempool_slot {
+    PA_LLIST_FIELDS(struct mempool_slot);
+    /* the actual data follows immediately hereafter */
+};
 
-    m->stat = pa_memblock_stat_ref(s);
-    s->total++;
-    s->allocated++;
-    s->total_size += m->length;
-    s->allocated_size += m->length;
+struct pa_mempool {
+    pa_shm memory;
+    size_t block_size;
+    unsigned n_blocks, n_init;
+
+    PA_LLIST_HEAD(pa_memimport, imports);
+    PA_LLIST_HEAD(pa_memexport, exports);
+
+    /* A list of free slots that may be reused */
+    PA_LLIST_HEAD(struct mempool_slot, free_slots);
+    PA_LLIST_HEAD(struct mempool_slot, used_slots);
+    
+    pa_mempool_stat stat;
+};
+
+static void segment_detach(pa_memimport_segment *seg);
+
+static void stat_add(pa_memblock*b) {
+    assert(b);
+    assert(b->pool);
+
+    b->pool->stat.n_allocated ++;
+    b->pool->stat.n_accumulated ++;
+    b->pool->stat.allocated_size += b->length;
+    b->pool->stat.accumulated_size += b->length;
+
+    if (b->type == PA_MEMBLOCK_IMPORTED) {
+        b->pool->stat.n_imported++;
+        b->pool->stat.imported_size += b->length;
+    }
 }
 
-static void stat_remove(pa_memblock *m) {
-    assert(m);
+static void stat_remove(pa_memblock *b) {
+    assert(b);
+    assert(b->pool);
 
-    if (!m->stat)
-        return;
+    assert(b->pool->stat.n_allocated > 0);
+    assert(b->pool->stat.allocated_size >= b->length);
+           
+    b->pool->stat.n_allocated --;
+    b->pool->stat.allocated_size -= b->length;
+
+    if (b->type == PA_MEMBLOCK_IMPORTED) {
+        assert(b->pool->stat.n_imported > 0);
+        assert(b->pool->stat.imported_size >= b->length);
+        
+        b->pool->stat.n_imported --;
+        b->pool->stat.imported_size -= b->length;
+    }
+}
 
-    m->stat->total--;
-    m->stat->total_size -= m->length;
+static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);
+
+pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
+    pa_memblock *b;
     
-    pa_memblock_stat_unref(m->stat);
-    m->stat = NULL;
+    assert(p);
+    assert(length > 0);
+    
+    if (!(b = pa_memblock_new_pool(p, length)))
+        b = memblock_new_appended(p, length);
+
+    return b;
 }
 
-pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s) {
-    pa_memblock *b = pa_xmalloc(sizeof(pa_memblock)+length);
+static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
+    pa_memblock *b;
+
+    assert(p);
+    assert(length > 0);
+
+    b = pa_xmalloc(sizeof(pa_memblock) + length);
     b->type = PA_MEMBLOCK_APPENDED;
+    b->read_only = 0;
     b->ref = 1;
     b->length = length;
-    b->data = b+1;
-    b->free_cb = NULL;
-    b->read_only = 0;
-    stat_add(b, s);
+    b->data = (uint8_t*) b + sizeof(pa_memblock);
+    b->pool = p;
+
+    stat_add(b);
     return b;
 }
 
-pa_memblock *pa_memblock_new_dynamic(void *d, size_t length, pa_memblock_stat*s) {
-    pa_memblock *b = pa_xmalloc(sizeof(pa_memblock));
-    b->type = PA_MEMBLOCK_DYNAMIC;
-    b->ref = 1;
+static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
+    struct mempool_slot *slot;
+    assert(p);
+
+    if (p->free_slots) {
+        slot = p->free_slots;
+        PA_LLIST_REMOVE(struct mempool_slot, p->free_slots, slot);
+    } else if (p->n_init < p->n_blocks)
+        slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * p->n_init++));
+    else {
+        pa_log_debug(__FILE__": Pool full");
+        p->stat.n_pool_full++;
+        return NULL;
+    }
+
+    PA_LLIST_PREPEND(struct mempool_slot, p->used_slots, slot);
+    return slot;
+}
+
+static void* mempool_slot_data(struct mempool_slot *slot) {
+    assert(slot);
+
+    return (uint8_t*) slot + sizeof(struct mempool_slot);
+}
+
+static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
+    assert(p);
+    assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
+    assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);
+
+    return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
+}
+
+static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
+    unsigned idx;
+
+    if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
+        return NULL;
+
+    return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
+}
+
+pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
+    pa_memblock *b = NULL;
+    struct mempool_slot *slot;
+
+    assert(p);
+    assert(length > 0);
+
+    if (p->block_size - sizeof(struct mempool_slot) >= sizeof(pa_memblock) + length) {
+
+        if (!(slot = mempool_allocate_slot(p)))
+            return NULL;
+        
+        b = mempool_slot_data(slot);
+        b->type = PA_MEMBLOCK_POOL;
+        b->data = (uint8_t*) b + sizeof(pa_memblock);
+        
+    } else if (p->block_size - sizeof(struct mempool_slot) >= length) {
+
+        if (!(slot = mempool_allocate_slot(p)))
+            return NULL;
+        
+        b = pa_xnew(pa_memblock, 1);
+        b->type = PA_MEMBLOCK_POOL_EXTERNAL;
+        b->data = mempool_slot_data(slot);
+    } else {
+        pa_log_debug(__FILE__": Memory block to large for pool: %u > %u", length, p->block_size - sizeof(struct mempool_slot));
+        p->stat.n_too_large_for_pool++;
+        return NULL;
+    }
+
     b->length = length;
-    b->data = d;
-    b->free_cb = NULL;
     b->read_only = 0;
-    stat_add(b, s);
+    b->ref = 1;
+    b->pool = p;
+
+    stat_add(b);
     return b;
 }
 
-pa_memblock *pa_memblock_new_fixed(void *d, size_t length, int read_only, pa_memblock_stat*s) {
-    pa_memblock *b = pa_xmalloc(sizeof(pa_memblock));
+pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
+    pa_memblock *b;
+
+    assert(p);
+    assert(d);
+    assert(length > 0);
+
+    b = pa_xnew(pa_memblock, 1);
     b->type = PA_MEMBLOCK_FIXED;
+    b->read_only = read_only;
     b->ref = 1;
     b->length = length;
     b->data = d;
-    b->free_cb = NULL;
-    b->read_only = read_only;
-    stat_add(b, s);
+    b->pool = p;
+
+    stat_add(b);
     return b;
 }
 
-pa_memblock *pa_memblock_new_user(void *d, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s) {
+pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
     pa_memblock *b;
-    assert(d && length && free_cb);
-    b = pa_xmalloc(sizeof(pa_memblock));
+
+    assert(p);
+    assert(d);
+    assert(length > 0);
+    assert(free_cb);
+    
+    b = pa_xnew(pa_memblock, 1);
     b->type = PA_MEMBLOCK_USER;
+    b->read_only = read_only;
     b->ref = 1;
     b->length = length;
     b->data = d;
-    b->free_cb = free_cb;
-    b->read_only = read_only;
-    stat_add(b, s);
+    b->per_type.user.free_cb = free_cb;
+    b->pool = p;
+
+    stat_add(b);
     return b;
 }
 
@@ -122,52 +307,458 @@ void pa_memblock_unref(pa_memblock*b) {
     assert(b);
     assert(b->ref >= 1);
 
-    if ((--(b->ref)) == 0) {
-        stat_remove(b);
+    if ((--(b->ref)) > 0)
+        return;
+    
+    stat_remove(b);
+
+    switch (b->type) {
+        case PA_MEMBLOCK_USER :
+            assert(b->per_type.user.free_cb);
+            b->per_type.user.free_cb(b->data);
+
+            /* Fall through */
+
+        case PA_MEMBLOCK_FIXED:
+        case PA_MEMBLOCK_APPENDED :
+            pa_xfree(b);
+            break;
+
+        case PA_MEMBLOCK_IMPORTED : {
+            pa_memimport_segment *segment;
+
+            segment = b->per_type.imported.segment;
+            assert(segment);
+            assert(segment->import);
+            
+            pa_hashmap_remove(segment->import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
+            segment->import->release_cb(segment->import, b->per_type.imported.id, segment->import->userdata);
+
+            if (-- segment->n_blocks <= 0)
+                segment_detach(segment);
+            
+            pa_xfree(b);
+            break;
+        }
 
-        if (b->type == PA_MEMBLOCK_USER) {
-            assert(b->free_cb);
-            b->free_cb(b->data);
-        } else if (b->type == PA_MEMBLOCK_DYNAMIC)
-            pa_xfree(b->data);
+        case PA_MEMBLOCK_POOL_EXTERNAL:
+        case PA_MEMBLOCK_POOL: {
+            struct mempool_slot *slot;
 
-        pa_xfree(b);
+            slot = mempool_slot_by_ptr(b->pool, b->data);
+            assert(slot);
+            
+            PA_LLIST_REMOVE(struct mempool_slot, b->pool->used_slots, slot);
+            PA_LLIST_PREPEND(struct mempool_slot, b->pool->free_slots, slot);
+            
+            if (b->type == PA_MEMBLOCK_POOL_EXTERNAL)
+                pa_xfree(b);
+        }
     }
 }
 
+/* Re-home the payload of b in memory owned by this process. If the payload
+ * fits into a pool slot and a slot can be allocated, the data is copied
+ * there and b becomes PA_MEMBLOCK_POOL_EXTERNAL; otherwise the data is
+ * duplicated on the heap and b becomes PA_MEMBLOCK_USER with pa_xfree as
+ * its free callback. Either way the block is marked writable afterwards. */
+static void memblock_make_local(pa_memblock *b) {
+    struct mempool_slot *slot = NULL;
+    void *copy;
+
+    assert(b);
+
+    /* Only ask the pool for a slot when the payload can fit into one */
+    if (b->length <= b->pool->block_size - sizeof(struct mempool_slot))
+        slot = mempool_allocate_slot(b->pool);
+
+    if (!slot) {
+        /* Too large for a slot, or no slot available: use a heap copy */
+        b->type = PA_MEMBLOCK_USER;
+        b->per_type.user.free_cb = pa_xfree;
+        b->read_only = 0;
+        b->data = pa_xmemdup(b->data, b->length);
+        return;
+    }
+
+    /* Move the payload into the pool slot we just obtained */
+    b->type = PA_MEMBLOCK_POOL_EXTERNAL;
+    b->read_only = 0;
+
+    copy = mempool_slot_data(slot);
+    memcpy(copy, b->data, b->length);
+    b->data = copy;
+}
+
+/* Drop one reference to a PA_MEMBLOCK_FIXED block. If other references
+ * remain, memblock_make_local() is called first so that the block's data is
+ * copied into locally owned memory before this reference goes away. */
 void pa_memblock_unref_fixed(pa_memblock *b) {
-    assert(b && b->ref >= 1 && b->type == PA_MEMBLOCK_FIXED);
+    assert(b);
+    assert(b->ref >= 1);
+    assert(b->type == PA_MEMBLOCK_FIXED);
 
-    if (b->ref == 1)
-        pa_memblock_unref(b);
-    else {
-        b->data = pa_xmemdup(b->data, b->length);
-        b->type = PA_MEMBLOCK_DYNAMIC;
-        b->ref--;
+    if (b->ref > 1)
+        memblock_make_local(b);
+
+    pa_memblock_unref(b);
+}
+
+/* Turn an imported block into a locally backed one: take it out of the
+ * import statistics and out of the import's block table, copy its data with
+ * memblock_make_local(), and detach the segment if no block uses it any
+ * more. */
+static void memblock_replace_import(pa_memblock *b) {
+    pa_memimport_segment *seg;
+    
+    assert(b);
+    assert(b->type == PA_MEMBLOCK_IMPORTED);
+
+    assert(b->pool->stat.n_imported > 0);
+    assert(b->pool->stat.imported_size >= b->length);
+    b->pool->stat.n_imported --;
+    b->pool->stat.imported_size -= b->length;
+
+    /* Fetch the segment now: memblock_make_local() may overwrite the
+     * per_type union when it falls back to a PA_MEMBLOCK_USER block */
+    seg = b->per_type.imported.segment;
+    assert(seg);
+    assert(seg->import);
+
+    pa_hashmap_remove(
+            seg->import->blocks,
+            PA_UINT32_TO_PTR(b->per_type.imported.id));
+
+    memblock_make_local(b);
+
+    /* The data has been copied by now, so the segment may be released once
+     * its block count drops to zero */
+    if (-- seg->n_blocks <= 0)
+        segment_detach(seg);
+}
+
+/* Create a new memory pool. The block size is PA_MEMPOOL_SLOT_SIZE rounded
+ * down to a multiple of the page size (but at least one page), and the pool
+ * memory for PA_MEMPOOL_SLOTS_MAX blocks is obtained with pa_shm_create_rw().
+ * The `shared` flag is passed through to pa_shm_create_rw(). Returns NULL if
+ * that call fails. */
+pa_mempool* pa_mempool_new(int shared) {
+    pa_mempool *pool;
+    size_t page_size;
+
+    pool = pa_xnew(pa_mempool, 1);
+
+    page_size = (size_t) sysconf(_SC_PAGESIZE);
+
+    /* Round down to whole pages, never going below a single page */
+    pool->block_size = (PA_MEMPOOL_SLOT_SIZE/page_size)*page_size;
+    if (pool->block_size < page_size)
+        pool->block_size = page_size;
+
+    pool->n_blocks = PA_MEMPOOL_SLOTS_MAX;
+
+    /* Every block starts with a struct mempool_slot header, so there must
+     * be room left for data */
+    assert(pool->block_size > sizeof(struct mempool_slot));
+
+    if (pa_shm_create_rw(&pool->memory, pool->n_blocks * pool->block_size, shared, 0700) < 0) {
+        pa_xfree(pool);
+        return NULL;
+    }
+
+    pool->n_init = 0;
+    memset(&pool->stat, 0, sizeof(pool->stat));
+
+    PA_LLIST_HEAD_INIT(pa_memimport, pool->imports);
+    PA_LLIST_HEAD_INIT(pa_memexport, pool->exports);
+    PA_LLIST_HEAD_INIT(struct mempool_slot, pool->free_slots);
+    PA_LLIST_HEAD_INIT(struct mempool_slot, pool->used_slots);
+
+    return pool;
+}
+
+/* Destroy a pool: free every import and export still linked to it, warn if
+ * the statistics still count allocated blocks, then release the pool memory
+ * and the pool structure itself. */
+void pa_mempool_free(pa_mempool *p) {
+    assert(p);
+
+    /* pa_memimport_free() unlinks its argument from p->imports, so the
+     * loop ends once the list is empty */
+    while (p->imports)
+        pa_memimport_free(p->imports);
+
+    /* Likewise, pa_memexport_free() unlinks its argument from p->exports */
+    while (p->exports)
+        pa_memexport_free(p->exports);
+
+    if (p->stat.n_allocated > 0)
+        pa_log_warn(__FILE__": WARNING! Memory pool destroyed but not all memory blocks freed!");
+    
+    pa_shm_free(&p->memory);
+    pa_xfree(p);
+}
+
+/* Return a read-only pointer to the pool's statistics. The pointer refers
+ * to storage inside p; nothing is copied. */
+const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
+    assert(p);
+
+    return &p->stat;
+}
+
+/* Walk the pool's free-slot list and call pa_shm_punch() on the data area
+ * of every free slot, i.e. the part of the block that follows the struct
+ * mempool_slot header.
+ * NOTE(review): presumably pa_shm_punch() hands the underlying pages back
+ * to the OS -- confirm against the pa_shm implementation. */
+void pa_mempool_vacuum(pa_mempool *p) {
+    struct mempool_slot *slot;
+    
+    assert(p);
+
+    for (slot = p->free_slots; slot; slot = slot->next) {
+        /* Second argument: offset of the slot's data area from the start of
+         * the pool memory; third: size of that data area */
+        pa_shm_punch(&p->memory, (uint8_t*) slot + sizeof(struct mempool_slot) - (uint8_t*) p->memory.ptr, p->block_size - sizeof(struct mempool_slot));
     }
 }
 
-pa_memblock_stat* pa_memblock_stat_new(void) {
-    pa_memblock_stat *s;
+/* Store the id of the pool's memory region in *id. Returns 0 on success,
+ * or -1 (leaving *id untouched) if the pool memory is not shared. */
+int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
+    assert(p);
 
-    s = pa_xmalloc(sizeof(pa_memblock_stat));
-    s->ref = 1;
-    s->total = s->total_size = s->allocated = s->allocated_size = 0;
+    if (!p->memory.shared)
+        return -1;
 
-    return s;
+    *id = p->memory.id;
+    
+    return 0;
+}
+
+/* For receiving blocks from other nodes */
+/* Create an import object attached to pool p, with empty segment and block
+ * tables. cb and userdata are stored as the release callback and its
+ * argument; pa_memblock_unref() invokes the callback when an imported block
+ * is freed. The new object is linked into p->imports. */
+pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
+    pa_memimport *i;
+
+    assert(p);
+    assert(cb);
+    
+    i = pa_xnew(pa_memimport, 1);
+    i->pool = p;
+    i->segments = pa_hashmap_new(NULL, NULL);
+    i->blocks = pa_hashmap_new(NULL, NULL);
+    i->release_cb = cb;
+    i->userdata = userdata;
+    
+    PA_LLIST_PREPEND(pa_memimport, p->imports, i);
+    return i;
 }
 
-void pa_memblock_stat_unref(pa_memblock_stat *s) {
-    assert(s && s->ref >= 1);
+static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
+
+/* Attach the shared memory region shm_id read-only and register it in the
+ * import's segment table with a block count of zero. Returns NULL if the
+ * import already tracks PA_MEMIMPORT_SEGMENTS_MAX segments or if attaching
+ * fails. */
+static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
+    pa_memimport_segment* seg;
 
-    if (!(--(s->ref))) {
-        assert(!s->total);
-        pa_xfree(s);
+    if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
+        return NULL;
+
+    seg = pa_xnew(pa_memimport_segment, 1);
+    
+    if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
+        pa_xfree(seg);
+        return NULL;
     }
+
+    seg->import = i;
+    seg->n_blocks = 0;
+    
+    pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
+    return seg;
+}
+
+/* Remove a segment from its import's segment table, release its memory
+ * with pa_shm_free() and free the segment structure. */
+static void segment_detach(pa_memimport_segment *seg) {
+    assert(seg);
+
+    /* The table key is the memory id, so look it up before freeing */
+    pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
+    pa_shm_free(&seg->memory);
+    pa_xfree(seg);
+}
+
+/* Destroy an import object. Exports of blocks that came in through this
+ * import are revoked first; every block still registered is then converted
+ * to local memory with memblock_replace_import(). After that no segment may
+ * be left, and the tables and the object itself are freed. */
+void pa_memimport_free(pa_memimport *i) {
+    pa_memexport *e;
+    pa_memblock *b;
+    
+    assert(i);
+
+    /* If we've exported this block further we need to revoke that export */
+    for (e = i->pool->exports; e; e = e->next)
+        memexport_revoke_blocks(e, i);
+
+    /* memblock_replace_import() removes b from i->blocks, so this loop
+     * drains the table */
+    while ((b = pa_hashmap_get_first(i->blocks)))
+        memblock_replace_import(b);
+
+    assert(pa_hashmap_size(i->segments) == 0);
+
+    pa_hashmap_free(i->blocks, NULL, NULL);
+    pa_hashmap_free(i->segments, NULL, NULL);
+    
+    PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);
+    pa_xfree(i);
 }
 
-pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s) {
-    assert(s);
-    s->ref++;
-    return s;
+/* Create a read-only PA_MEMBLOCK_IMPORTED block that refers to `size` bytes
+ * at `offset` inside the shared memory region shm_id. The region is attached
+ * on first use. The block is registered under block_id in the import's block
+ * table and starts with one reference.
+ *
+ * Returns NULL if the import already tracks PA_MEMIMPORT_SLOTS_MAX blocks,
+ * if the region cannot be attached, or if the requested range does not lie
+ * completely inside the region. */
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
+    pa_memblock *b;
+    pa_memimport_segment *seg;
+
+    assert(i);
+
+    if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
+        return NULL;
+
+    if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
+        if (!(seg = segment_attach(i, shm_id)))
+            return NULL;
+
+    /* Bounds check written so that it cannot wrap around: offset+size may
+     * overflow size_t for large values (presumably both come from the
+     * remote side) and would then slip past a naive comparison. */
+    if (offset > seg->memory.size || size > seg->memory.size - offset) {
+
+        /* Do not leave a segment registered that no block refers to;
+         * pa_memimport_free() asserts that the segment table is empty. */
+        if (seg->n_blocks == 0)
+            segment_detach(seg);
+
+        return NULL;
+    }
+
+    b = pa_xnew(pa_memblock, 1);
+    b->type = PA_MEMBLOCK_IMPORTED;
+    b->read_only = 1;
+    b->ref = 1;
+    b->length = size;
+    b->data = (uint8_t*) seg->memory.ptr + offset;
+    b->pool = i->pool;
+    b->per_type.imported.id = block_id;
+    b->per_type.imported.segment = seg;
+
+    pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);
+
+    seg->n_blocks++;
+
+    stat_add(b);
+
+    return b;
+}
+
+/* Handle a revocation of imported block `id`: look the block up in the
+ * import's block table and convert it to local memory with
+ * memblock_replace_import(). Returns 0 on success, -1 if no block with that
+ * id is registered. */
+int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
+    pa_memblock *block;
+
+    assert(i);
+
+    block = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id));
+    if (!block)
+        return -1;
+
+    memblock_replace_import(block);
+    return 0;
+}
+
+/* For sending blocks to other nodes */
+/* Create an export object for pool p with empty slot lists. cb and userdata
+ * are stored as the revoke callback and its argument. The new object is
+ * linked into p->exports. Returns NULL if the pool memory is not shared. */
+pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
+    pa_memexport *e;
+    
+    assert(p);
+    assert(cb);
+
+    if (!p->memory.shared)
+        return NULL;
+    
+    e = pa_xnew(pa_memexport, 1);
+    e->pool = p;
+    PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
+    PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
+    e->n_init = 0;
+    e->revoke_cb = cb;
+    e->userdata = userdata;
+    
+    PA_LLIST_PREPEND(pa_memexport, p->exports, e);
+    return e;
+}
+
+/* Destroy an export object: release every slot still in use, unlink the
+ * object from its pool's export list and free it. */
+void pa_memexport_free(pa_memexport *e) {
+    assert(e);
+
+    /* The slot index is its position in e->slots; each release moves the
+     * slot from the used list to the free list, so the loop terminates */
+    while (e->used_slots)
+        pa_memexport_process_release(e, e->used_slots - e->slots);
+
+    PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
+    pa_xfree(e);
+}
+
+/* Release export slot `id`: update the export statistics, drop the
+ * reference the slot holds on its block and move the slot from the used
+ * list to the free list. Returns 0 on success, -1 if `id` is not an
+ * initialised slot or the slot holds no block. */
+int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
+    struct memexport_slot *slot;
+    pa_memblock *block;
+
+    assert(e);
+
+    if (id >= e->n_init)
+        return -1;
+
+    slot = &e->slots[id];
+    block = slot->block;
+
+    if (!block)
+        return -1;
+
+    assert(e->pool->stat.n_exported > 0);
+    assert(e->pool->stat.exported_size >= block->length);
+
+    e->pool->stat.n_exported --;
+    e->pool->stat.exported_size -= block->length;
+
+    pa_memblock_unref(block);
+    slot->block = NULL;
+
+    PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, slot);
+    PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, slot);
+
+    return 0;
+}
+
+/* Revoke every block exported through e that was itself imported via i:
+ * for each such used slot, invoke the revoke callback with the slot index
+ * and then release the slot. */
+static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
+    struct memexport_slot *slot, *next;
+    assert(e);
+    assert(i);
+
+    for (slot = e->used_slots; slot; slot = next) {
+        uint32_t idx;
+        /* Remember the successor first: releasing the slot below moves it
+         * to the free list */
+        next = slot->next;
+        
+        if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
+            slot->block->per_type.imported.segment->import != i)
+            continue;
+
+        idx = slot - e->slots;
+        e->revoke_cb(e, idx, e->userdata);
+        pa_memexport_process_release(e, idx);
+    }
+}
+
+/* Return a block with the contents of b whose type is IMPORTED, POOL or
+ * POOL_EXTERNAL. If b already is of such a type, a new reference to b
+ * itself is returned. Otherwise a new block is allocated with
+ * pa_memblock_new_pool() and the data is copied into it; NULL is returned
+ * if that allocation fails. */
+static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
+    pa_memblock *copy;
+
+    assert(p);
+    assert(b);
+
+    switch (b->type) {
+        case PA_MEMBLOCK_IMPORTED:
+        case PA_MEMBLOCK_POOL:
+        case PA_MEMBLOCK_POOL_EXTERNAL:
+            /* Nothing to copy, just take another reference */
+            assert(b->pool == p);
+            return pa_memblock_ref(b);
+
+        default:
+            break;
+    }
+
+    copy = pa_memblock_new_pool(p, b->length);
+    if (!copy)
+        return NULL;
+
+    memcpy(copy->data, b->data, b->length);
+    return copy;
+}
+
+/* Export block b through e. A reference to b, or a pool copy of it (see
+ * memblock_shared_copy()), is stored in an export slot. On success the slot
+ * index is written to *block_id, the id of the memory region that holds the
+ * data to *shm_id, the data's offset inside that region to *offset and its
+ * length to *size, and 0 is returned. Returns -1 if no shareable copy could
+ * be obtained or if all PA_MEMEXPORT_SLOTS_MAX slots are in use. */
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
+    pa_shm *memory;
+    struct memexport_slot *slot;
+    
+    assert(e);
+    assert(b);
+    assert(block_id);
+    assert(shm_id);
+    assert(offset);
+    assert(size);
+    assert(b->pool == e->pool);
+
+    /* From here on b is our own reference (or a fresh copy) */
+    if (!(b = memblock_shared_copy(e->pool, b)))
+        return -1;
+
+    /* Reuse a released slot if there is one, else take a fresh one */
+    if (e->free_slots) {
+        slot = e->free_slots;
+        PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
+    } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX) {
+        slot = &e->slots[e->n_init++];
+    } else {
+        pa_memblock_unref(b);
+        return -1;
+    }
+
+    PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
+    slot->block = b;
+    *block_id = slot - e->slots;
+
+/*     pa_log("Got block id %u", *block_id); */
+
+    /* Pick the memory region the data lives in: the imported segment for
+     * imported blocks, our own pool memory otherwise */
+    if (b->type == PA_MEMBLOCK_IMPORTED) {
+        assert(b->per_type.imported.segment);
+        memory = &b->per_type.imported.segment->memory;
+    } else {
+        assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
+        assert(b->pool);
+        memory = &b->pool->memory;
+    }
+        
+    assert(b->data >= memory->ptr);
+    assert((uint8_t*) b->data + b->length <= (uint8_t*) memory->ptr + memory->size);
+    
+    *shm_id = memory->id;
+    *offset = (uint8_t*) b->data - (uint8_t*) memory->ptr;
+    *size = b->length;
+
+    e->pool->stat.n_exported ++;
+    e->pool->stat.exported_size += b->length;
+
+    return 0;
 }
index 04a0b55b15de76abd2ed0677a2a393b803a50114..e63e1e0feef8c83a1a1cf08b53bcf466f880d7d0 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef foomemblockhfoo
-#define foomemblockhfoo
+#ifndef foopulsememblockhfoo
+#define foopulsememblockhfoo
 
 /* $Id$ */
 
@@ -25,6 +25,8 @@
 #include <sys/types.h>
 #include <inttypes.h>
 
+#include <pulsecore/llist.h>
+
 /* A pa_memblock is a reference counted memory block. PulseAudio
  * passed references to pa_memblocks around instead of copying
  * data. See pa_memchunk for a structure that describes parts of
 
 /* The type of memory this block points to */
 typedef enum pa_memblock_type {
-    PA_MEMBLOCK_FIXED,     /* data is a pointer to fixed memory that needs not to be freed */
-    PA_MEMBLOCK_APPENDED,  /* The most common kind: the data is appended to the memory block */ 
-    PA_MEMBLOCK_DYNAMIC,   /* data is a pointer to some memory allocated with pa_xmalloc() */
-    PA_MEMBLOCK_USER       /* User supplied memory, to be freed with free_cb */
+    PA_MEMBLOCK_POOL,             /* Memory is part of the memory pool */
+    PA_MEMBLOCK_POOL_EXTERNAL,    /* Data memory is part of the memory pool but the pa_memblock structure itself is not */
+    PA_MEMBLOCK_APPENDED,         /* the data is appended to the memory block */ 
+    PA_MEMBLOCK_USER,             /* User supplied memory, to be freed with free_cb */
+    PA_MEMBLOCK_FIXED,            /* data is a pointer to fixed memory that needs not to be freed */
+    PA_MEMBLOCK_IMPORTED,         /* Memory is imported from another process via shm */
 } pa_memblock_type_t;
 
-/* A structure of keeping memory block statistics */
-/* Maintains statistics about memory blocks */
-typedef struct pa_memblock_stat {
-    int ref;
-    unsigned total;
-    unsigned total_size;
-    unsigned allocated;
-    unsigned allocated_size;
-} pa_memblock_stat;
-
-typedef struct pa_memblock {
+typedef struct pa_memblock pa_memblock;
+typedef struct pa_mempool pa_mempool;
+typedef struct pa_mempool_stat pa_mempool_stat;
+typedef struct pa_memimport_segment pa_memimport_segment;
+typedef struct pa_memimport pa_memimport;
+typedef struct pa_memexport pa_memexport;
+
+typedef void (*pa_memimport_release_cb_t)(pa_memimport *i, uint32_t block_id, void *userdata);
+typedef void (*pa_memexport_revoke_cb_t)(pa_memexport *e, uint32_t block_id, void *userdata);
+
+struct pa_memblock {
     pa_memblock_type_t type;
-    unsigned ref;  /* the reference counter */
     int read_only; /* boolean */
+    unsigned ref;  /* the reference counter */
     size_t length;
     void *data;
-    void (*free_cb)(void *p);  /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
-    pa_memblock_stat *stat;
-} pa_memblock;
+    pa_mempool *pool;
 
-/* Allocate a new memory block of type PA_MEMBLOCK_APPENDED */
-pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s);
+    union {
+        struct {
+            void (*free_cb)(void *p);  /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
+        } user;
+            
+        struct  {
+            uint32_t id;
+            pa_memimport_segment *segment;
+        } imported;
+    } per_type;
+};
 
-/* Allocate a new memory block of type PA_MEMBLOCK_DYNAMIC. The pointer data is to be maintained be the memory block */
-pa_memblock *pa_memblock_new_dynamic(void *data, size_t length, pa_memblock_stat*s);
+struct pa_mempool_stat {
+    unsigned n_allocated;
+    unsigned n_accumulated;
+    unsigned n_imported;
+    unsigned n_exported;
+    size_t allocated_size;
+    size_t accumulated_size;
+    size_t imported_size;
+    size_t exported_size;
 
-/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */
-pa_memblock *pa_memblock_new_fixed(void *data, size_t length, int read_only, pa_memblock_stat*s);
+    unsigned n_too_large_for_pool;
+    unsigned n_pool_full;
+};
+
+/* Allocate a new memory block of type PA_MEMBLOCK_POOL or PA_MEMBLOCK_APPENDED, depending on the size */
+pa_memblock *pa_memblock_new(pa_mempool *, size_t length);
+
+/* Allocate a new memory block of type PA_MEMBLOCK_POOL or PA_MEMBLOCK_POOL_EXTERNAL. If the requested size is too large, return NULL */
+pa_memblock *pa_memblock_new_pool(pa_mempool *, size_t length);
 
 /* Allocate a new memory block of type PA_MEMBLOCK_USER */
-pa_memblock *pa_memblock_new_user(void *data, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s);
+pa_memblock *pa_memblock_new_user(pa_mempool *, void *data, size_t length, void (*free_cb)(void *p), int read_only);
+
+/* A special case of pa_memblock_new_user: take a memory buffer previously allocated with pa_xmalloc()  */
+#define pa_memblock_new_malloced(p,data,length) pa_memblock_new_user(p, data, length, pa_xfree, 0)
+
+/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */
+pa_memblock *pa_memblock_new_fixed(pa_mempool *, void *data, size_t length, int read_only);
 
 void pa_memblock_unref(pa_memblock*b);
 pa_memblock* pa_memblock_ref(pa_memblock*b);
@@ -79,8 +110,23 @@ references to the memory. This causes the memory to be copied and
 converted into a PA_MEMBLOCK_DYNAMIC type memory block */
 void pa_memblock_unref_fixed(pa_memblock*b);
 
-pa_memblock_stat* pa_memblock_stat_new(void);
-void pa_memblock_stat_unref(pa_memblock_stat *s);
-pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s);
+/* The memory block manager */
+pa_mempool* pa_mempool_new(int shared);
+void pa_mempool_free(pa_mempool *p);
+const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
+void pa_mempool_vacuum(pa_mempool *p);
+int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
+
+/* For receiving blocks from other nodes */
+pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
+void pa_memimport_free(pa_memimport *i);
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size);
+int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
+
+/* For sending blocks to other nodes */
+pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata);
+void pa_memexport_free(pa_memexport *e);
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size);
+int pa_memexport_process_release(pa_memexport *e, uint32_t id);
 
 #endif
index 822bd66cd138db5b81d448754b096ecec76ff589..2fd38850e6ae404c6ab967972097063aed4a009d 100644 (file)
@@ -49,7 +49,6 @@ struct pa_memblockq {
     size_t maxlength, tlength, base, prebuf, minreq;
     int64_t read_index, write_index;
     enum { PREBUF, RUNNING } state;
-    pa_memblock_stat *memblock_stat;
     pa_memblock *silence;
     pa_mcalign *mcalign;
 };
@@ -61,8 +60,7 @@ pa_memblockq* pa_memblockq_new(
         size_t base,
         size_t prebuf,
         size_t minreq,
-        pa_memblock *silence,
-        pa_memblock_stat *s) {
+        pa_memblock *silence) {
     
     pa_memblockq* bq;
     
@@ -75,7 +73,6 @@ pa_memblockq* pa_memblockq_new(
 
     bq->base = base;
     bq->read_index = bq->write_index = idx;
-    bq->memblock_stat = s;
 
     pa_log_debug(__FILE__": memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu",
         (unsigned long)maxlength, (unsigned long)tlength, (unsigned long)base, (unsigned long)prebuf, (unsigned long)minreq);
@@ -586,7 +583,7 @@ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) {
         return pa_memblockq_push(bq, chunk);
        
     if (!bq->mcalign)
-        bq->mcalign = pa_mcalign_new(bq->base, bq->memblock_stat);
+        bq->mcalign = pa_mcalign_new(bq->base);
 
     if (!can_push(bq, pa_mcalign_csize(bq->mcalign, chunk->length)))
         return -1;
index c35b62ddd6dcedc9d2d766584ddde5a57d93801e..4d701a80ef01aea4cba12278c6abb5ff7edf1a4a 100644 (file)
@@ -69,8 +69,7 @@ pa_memblockq* pa_memblockq_new(
         size_t base,
         size_t prebuf, 
         size_t minreq,
-        pa_memblock *silence,
-        pa_memblock_stat *s);
+        pa_memblock *silence);
 
 void pa_memblockq_free(pa_memblockq*bq);
 
index abfc2cab7f84dbc3fff7670a25e8550c52e5a7cb..bcf0ce043b2af49286c66ccbb006d2514af70e0c 100644 (file)
 
 #include "memchunk.h"
 
-void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min) {
+void pa_memchunk_make_writable(pa_memchunk *c, size_t min) {
     pa_memblock *n;
     size_t l;
-    assert(c && c->memblock && c->memblock->ref >= 1);
+    
+    assert(c);
+    assert(c->memblock);
+    assert(c->memblock->ref >= 1);
 
     if (c->memblock->ref == 1 && !c->memblock->read_only && c->memblock->length >= c->index+min)
         return;
@@ -44,7 +47,7 @@ void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min)
     if (l < min)
         l = min;
     
-    n = pa_memblock_new(l, s);
+    n = pa_memblock_new(c->memblock->pool, l);
     memcpy(n->data, (uint8_t*) c->memblock->data + c->index, c->length);
     pa_memblock_unref(c->memblock);
     c->memblock = n;
index 1b26c2e641ee86c62acae3c8b3a51449600298d3..b8ce62495034a29726772ce10e5f29f6f71df7f2 100644 (file)
@@ -36,7 +36,7 @@ typedef struct pa_memchunk {
 /* Make a memchunk writable, i.e. make sure that the caller may have
  * exclusive access to the memblock and it is not read_only. If needed
  * the memblock in the structure is replaced by a copy. */
-void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min);
+void pa_memchunk_make_writable(pa_memchunk *c, size_t min);
 
 /* Invalidate a memchunk. This does not free the cotaining memblock,
  * but sets all members to zero. */
index f1a827bceb2dad6dc8a2a06d4a37c1349a937a70..2fadeca303e58c17d3775559ae4baefacb7b577a 100644 (file)
@@ -377,8 +377,7 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t
             pa_frame_size(&ss),
             (size_t) -1,
             l/PLAYBACK_BUFFER_FRAGMENTS,
-            NULL,
-            c->protocol->core->memblock_stat);
+            NULL);
     pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2);
     c->playback.fragment_size = l/10;
 
@@ -469,8 +468,7 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co
             pa_frame_size(&ss),
             1,
             0,
-            NULL,
-            c->protocol->core->memblock_stat);
+            NULL);
     pa_iochannel_socket_set_sndbuf(c->io, l/RECORD_BUFFER_FRAGMENTS*2);
     
     c->source_output->push = source_output_push_cb;
@@ -722,7 +720,7 @@ static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_
     CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
     
     assert(!c->scache.memchunk.memblock);
-    c->scache.memchunk.memblock = pa_memblock_new(sc_length, c->protocol->core->memblock_stat);
+    c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, sc_length);
     c->scache.memchunk.index = 0;
     c->scache.memchunk.length = sc_length;
     c->scache.sample_spec = ss;
@@ -941,7 +939,7 @@ static int do_read(struct connection *c) {
             }
         
         if (!c->playback.current_memblock) {
-            c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
+            c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
             assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
             c->playback.memblock_index = 0;
         }
index 0b79892c58ebb7839c8a568cebe76b73e004d29f..2c9b3566ccbc3af961c2b73850b225b305b22917 100644 (file)
@@ -348,8 +348,7 @@ static struct record_stream* record_stream_new(
             base = pa_frame_size(ss),
             1,
             0,
-            NULL,
-            c->protocol->core->memblock_stat);
+            NULL);
     assert(s->memblockq);
 
     s->fragment_size = (fragment_size/base)*base;
@@ -448,7 +447,7 @@ static struct playback_stream* playback_stream_new(
         start_index = 0;
     }
     
-    silence = pa_silence_memblock_new(ss, 0, c->protocol->core->memblock_stat);
+    silence = pa_silence_memblock_new(c->protocol->core->mempool, ss, 0);
     
     s->memblockq = pa_memblockq_new(
             start_index,
@@ -457,8 +456,7 @@ static struct playback_stream* playback_stream_new(
             pa_frame_size(ss),
             prebuf,
             minreq,
-            silence,
-            c->protocol->core->memblock_stat);
+            silence);
 
     pa_memblock_unref(silence);
     
@@ -1076,6 +1074,7 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
 static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct connection *c = userdata;
     pa_tagstruct *reply;
+    const pa_mempool_stat *stat;
     assert(c && t);
 
     if (!pa_tagstruct_eof(t)) {
@@ -1085,11 +1084,13 @@ static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
 
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
 
+    stat = pa_mempool_get_stat(c->protocol->core->mempool);
+    
     reply = reply_new(tag);
-    pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total);
-    pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total_size);
-    pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated);
-    pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated_size);
+    pa_tagstruct_putu32(reply, stat->n_allocated);
+    pa_tagstruct_putu32(reply, stat->allocated_size);
+    pa_tagstruct_putu32(reply, stat->n_accumulated);
+    pa_tagstruct_putu32(reply, stat->accumulated_size);
     pa_tagstruct_putu32(reply, pa_scache_total_size(c->protocol->core));
     pa_pstream_send_tagstruct(c->pstream, reply);
 }
@@ -2256,7 +2257,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
                 pa_memblock_ref(u->memchunk.memblock);
                 u->length = 0;
             } else {
-                u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
+                u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
                 u->memchunk.index = u->memchunk.length = 0;
             }
         }
@@ -2349,9 +2350,11 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
     c->client->userdata = c;
     c->client->owner = p->module;
     
-    c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->memblock_stat);
+    c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
     assert(c->pstream);
 
+    pa_pstream_use_shm(c->pstream, 1);
+
     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
index 3705986d591db7bc2de58b44cb3dde7dea6437ed..924ee29ec4b82c73ca455b4290230447fabc65a7 100644 (file)
@@ -128,7 +128,7 @@ static int do_read(struct connection *c) {
         }
 
     if (!c->playback.current_memblock) {
-        c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
+        c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
         assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
         c->playback.memblock_index = 0;
     }
@@ -369,8 +369,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
                 pa_frame_size(&p->sample_spec),
                 (size_t) -1,
                 l/PLAYBACK_BUFFER_FRAGMENTS,
-                NULL,
-                p->core->memblock_stat);
+                NULL);
         assert(c->input_memblockq);
         pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
         c->playback.fragment_size = l/10;
@@ -406,8 +405,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
                 pa_frame_size(&p->sample_spec),
                 1,
                 0,
-                NULL,
-                p->core->memblock_stat);
+                NULL);
         pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
         pa_source_notify(c->source_output->source);
     }
index 7096d65aff79641d7287469b01816a2ba70c0497..421f5de9caa44a7541a4a3bae04c1094375b33e2 100644 (file)
 
 #include "pstream.h"
 
+/* We piggyback information if audio data blocks are stored in SHM on the seek mode */
+#define PA_FLAG_SHMDATA    0x80000000LU
+#define PA_FLAG_SHMRELEASE 0x40000000LU
+#define PA_FLAG_SHMREVOKE  0xC0000000LU
+#define PA_FLAG_SHMMASK    0xFF000000LU
+#define PA_FLAG_SEEKMASK   0x000000FFLU
+
+/* The sequence descriptor header consists of 5 32bit integers: */
 enum {
     PA_PSTREAM_DESCRIPTOR_LENGTH,
     PA_PSTREAM_DESCRIPTOR_CHANNEL,
     PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
     PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
-    PA_PSTREAM_DESCRIPTOR_SEEK,
+    PA_PSTREAM_DESCRIPTOR_FLAGS,
     PA_PSTREAM_DESCRIPTOR_MAX
 };
 
+/* If we have an SHM block, this info follows the descriptor */
+enum {
+    PA_PSTREAM_SHM_BLOCKID,
+    PA_PSTREAM_SHM_SHMID,
+    PA_PSTREAM_SHM_INDEX,
+    PA_PSTREAM_SHM_LENGTH,
+    PA_PSTREAM_SHM_MAX
+};
+
 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
 
 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
 #define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
 
 struct item_info {
-    enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type;
+    enum {
+        PA_PSTREAM_ITEM_PACKET,
+        PA_PSTREAM_ITEM_MEMBLOCK,
+        PA_PSTREAM_ITEM_SHMRELEASE,
+        PA_PSTREAM_ITEM_SHMREVOKE
+    } type;
 
-    /* memblock info */
-    pa_memchunk chunk;
-    uint32_t channel;
-    int64_t offset;
-    pa_seek_mode_t seek_mode;
 
     /* packet info */
     pa_packet *packet;
@@ -78,6 +95,15 @@ struct item_info {
     int with_creds;
     pa_creds creds;
 #endif
+
+    /* memblock info */
+    pa_memchunk chunk;
+    uint32_t channel;
+    int64_t offset;
+    pa_seek_mode_t seek_mode;
+
+    /* release/revoke info */
+    uint32_t block_id;
 };
 
 struct pa_pstream {
@@ -91,20 +117,26 @@ struct pa_pstream {
     int dead;
 
     struct {
-        struct item_info* current;
         pa_pstream_descriptor descriptor;
+        struct item_info* current;
+        uint32_t shm_info[PA_PSTREAM_SHM_MAX];
         void *data;
         size_t index;
     } write;
 
     struct {
+        pa_pstream_descriptor descriptor;
         pa_memblock *memblock;
         pa_packet *packet;
-        pa_pstream_descriptor descriptor;
+        uint32_t shm_info[PA_PSTREAM_SHM_MAX];
         void *data;
         size_t index;
     } read;
 
+    int use_shm;
+    pa_memimport *import;
+    pa_memexport *export;
+
     pa_pstream_packet_cb_t recieve_packet_callback;
     void *recieve_packet_callback_userdata;
 
@@ -117,7 +149,7 @@ struct pa_pstream {
     pa_pstream_notify_cb_t die_callback;
     void *die_callback_userdata;
 
-    pa_memblock_stat *memblock_stat;
+    pa_mempool *mempool;
 
 #ifdef HAVE_CREDS
     pa_creds read_creds, write_creds;
@@ -178,16 +210,19 @@ static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata)
     do_something(p);
 }
 
-pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s) {
+static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
+
+pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
     pa_pstream *p;
+    
+    assert(m);
     assert(io);
+    assert(pool);
 
     p = pa_xnew(pa_pstream, 1);
-    
     p->ref = 1;
     p->io = io;
     pa_iochannel_set_callback(io, io_callback, p);
-
     p->dead = 0;
 
     p->mainloop = m;
@@ -199,24 +234,24 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_sta
 
     p->write.current = NULL;
     p->write.index = 0;
-
     p->read.memblock = NULL;
     p->read.packet = NULL;
     p->read.index = 0;
 
     p->recieve_packet_callback = NULL;
     p->recieve_packet_callback_userdata = NULL;
-    
     p->recieve_memblock_callback = NULL;
     p->recieve_memblock_callback_userdata = NULL;
-
     p->drain_callback = NULL;
     p->drain_callback_userdata = NULL;
-
     p->die_callback = NULL;
     p->die_callback_userdata = NULL;
 
-    p->memblock_stat = s;
+    p->mempool = pool;
+
+    p->use_shm = 0;
+    p->export = NULL;
+    p->import = NULL; 
 
     pa_iochannel_socket_set_rcvbuf(io, 1024*8); 
     pa_iochannel_socket_set_sndbuf(io, 1024*8);
@@ -235,8 +270,7 @@ static void item_free(void *item, PA_GCC_UNUSED void *p) {
     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
         assert(i->chunk.memblock);
         pa_memblock_unref(i->chunk.memblock);
-    } else {
-        assert(i->type == PA_PSTREAM_ITEM_PACKET);
+    } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
         assert(i->packet);
         pa_packet_unref(i->packet);
     }
@@ -265,16 +299,18 @@ static void pstream_free(pa_pstream *p) {
 
 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
     struct item_info *i;
-    assert(p && packet && p->ref >= 1);
+
+    assert(p);
+    assert(p->ref >= 1);
+    assert(packet);
 
     if (p->dead)
         return;
     
-/*     pa_log(__FILE__": push-packet %p", packet); */
-    
     i = pa_xnew(struct item_info, 1);
     i->type = PA_PSTREAM_ITEM_PACKET;
     i->packet = pa_packet_ref(packet);
+    
 #ifdef HAVE_CREDS
     if ((i->with_creds = !!creds))
         i->creds = *creds;
@@ -286,13 +322,15 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre
 
 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
     struct item_info *i;
-    assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
+    
+    assert(p);
+    assert(p->ref >= 1);
+    assert(channel != (uint32_t) -1);
+    assert(chunk);
 
     if (p->dead)
         return;
-    
-/*     pa_log(__FILE__": push-memblock %p", chunk); */
-    
+
     i = pa_xnew(struct item_info, 1);
     i->type = PA_PSTREAM_ITEM_MEMBLOCK;
     i->chunk = *chunk;
@@ -309,6 +347,52 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
     p->mainloop->defer_enable(p->defer_event, 1);
 }
 
+static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
+    struct item_info *item;
+    pa_pstream *p = userdata;
+    /* Release callback handed to pa_memimport_new() in pa_pstream_use_shm(); userdata is the owning pstream. */
+    assert(p);
+    assert(p->ref >= 1);
+    
+    if (p->dead)
+        return;
+
+    /* Queue a payload-less SHMRELEASE item; prepare_next_write_item() puts block_id into the descriptor's OFFSET_HI. */
+
+    item = pa_xnew(struct item_info, 1);
+    item->type = PA_PSTREAM_ITEM_SHMRELEASE;
+    item->block_id = block_id;
+#ifdef HAVE_CREDS
+    item->with_creds = 0; /* no credentials are attached to this item */
+#endif
+
+    pa_queue_push(p->send_queue, item);
+    p->mainloop->defer_enable(p->defer_event, 1); /* NOTE(review): presumably triggers defer_callback() to process the queue - confirm */
+}
+
+static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
+    struct item_info *item;
+    pa_pstream *p = userdata;
+    /* Revoke callback handed to pa_memexport_new() in pa_pstream_use_shm(); userdata is the owning pstream. */
+    assert(p);
+    assert(p->ref >= 1);
+    
+    if (p->dead)
+        return;
+
+    /* Queue a payload-less SHMREVOKE item; prepare_next_write_item() puts block_id into the descriptor's OFFSET_HI. */
+    
+    item = pa_xnew(struct item_info, 1);
+    item->type = PA_PSTREAM_ITEM_SHMREVOKE;
+    item->block_id = block_id;
+#ifdef HAVE_CREDS
+    item->with_creds = 0; /* no credentials are attached to this item */
+#endif
+
+    pa_queue_push(p->send_queue, item);
+    p->mainloop->defer_enable(p->defer_event, 1); /* NOTE(review): presumably triggers defer_callback() to process the queue - confirm */
+}
+
 static void prepare_next_write_item(pa_pstream *p) {
     assert(p);
 
@@ -316,27 +400,77 @@ static void prepare_next_write_item(pa_pstream *p) {
         return;
     
     p->write.index = 0;
+    p->write.data = NULL;
+
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
     
     if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
-        /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/
         
         assert(p->write.current->packet);
         p->write.data = p->write.current->packet->data;
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0;
 
+    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
+
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
+
+    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
+
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
         
     } else {
-        assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
-        p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
+        uint32_t flags;
+        int send_payload = 1;
+        
+        assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
+        assert(p->write.current->chunk.memblock);
+        
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode);
+
+        flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
+
+        if (p->use_shm) {
+            uint32_t block_id, shm_id;
+            size_t offset, length;
+
+            assert(p->export);
+
+            if (pa_memexport_put(p->export,
+                                 p->write.current->chunk.memblock,
+                                 &block_id,
+                                 &shm_id,
+                                 &offset,
+                                 &length) >= 0) {
+                
+                flags |= PA_FLAG_SHMDATA;
+                send_payload = 0;
+                
+                p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
+                p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
+                p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
+                p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+                
+                p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
+                p->write.data = p->write.shm_info;
+            }
+/*             else */
+/*                 pa_log_warn(__FILE__": Failed to export memory block."); */
+        }
+
+        if (send_payload) {
+            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
+            p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
+        }
+        
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
     }
 
 #ifdef HAVE_CREDS
@@ -344,7 +478,6 @@ static void prepare_next_write_item(pa_pstream *p) {
         p->write_creds = p->write.current->creds;
     
 #endif
-
 }
 
 static int do_write(pa_pstream *p) {
@@ -359,16 +492,18 @@ static int do_write(pa_pstream *p) {
     if (!p->write.current)
         return 0;
 
-    assert(p->write.data);
-
     if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
         d = (uint8_t*) p->write.descriptor + p->write.index;
         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
     } else {
+        assert(p->write.data);
+    
         d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
     }
 
+    assert(l > 0);
+        
 #ifdef HAVE_CREDS
     if (p->send_creds_now) {
 
@@ -384,7 +519,7 @@ static int do_write(pa_pstream *p) {
 
     p->write.index += r;
 
-    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
+    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
         assert(p->write.current);
         item_free(p->write.current, (void *) 1);
         p->write.current = NULL;
@@ -428,27 +563,87 @@ static int do_read(pa_pstream *p) {
     p->read.index += r;
 
     if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
+        uint32_t flags, length, channel;
         /* Reading of frame descriptor complete */
 
-        /* Frame size too large */
-        if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
-            pa_log_warn(__FILE__": Frame size too large: %lu > %lu", (unsigned long) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), (unsigned long) FRAME_SIZE_MAX);
+        flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+
+        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
+            pa_log_warn(__FILE__": Recieved SHM frame on a socket where SHM is disabled.");
+            return -1;
+        }
+        
+        if (flags == PA_FLAG_SHMRELEASE) {
+
+            /* This is a SHM memblock release frame with no payload */
+
+/*             pa_log(__FILE__": Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+            
+            assert(p->export);
+            pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+            goto frame_done;
+            
+        } else if (flags == PA_FLAG_SHMREVOKE) {
+
+            /* This is a SHM memblock revoke frame with no payload */
+
+/*             pa_log(__FILE__": Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+
+            assert(p->import);
+            pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+            goto frame_done;
+        }
+
+        length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
+        
+        if (length > FRAME_SIZE_MAX) {
+            pa_log_warn(__FILE__": Recieved invalid frame size : %lu", (unsigned long) length);
             return -1;
         }
         
         assert(!p->read.packet && !p->read.memblock);
 
-        if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
+        channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
+        
+        if (channel == (uint32_t) -1) {
+
+            if (flags != 0) {
+                pa_log_warn(__FILE__": Received packet frame with invalid flags value.");
+                return -1;
+            }
+            
             /* Frame is a packet frame */
-            p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
+            p->read.packet = pa_packet_new(length);
             p->read.data = p->read.packet->data;
+            
         } else {
-            /* Frame is a memblock frame */
-            p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
-            p->read.data = p->read.memblock->data;
 
-            if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) {
-                pa_log_warn(__FILE__": Invalid seek mode");
+            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
+                pa_log_warn(__FILE__": Received memblock frame with invalid seek mode.");
+                return -1;
+            }
+            
+            if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
+
+                if (length != sizeof(p->read.shm_info)) {
+                    pa_log_warn(__FILE__": Recieved SHM memblock frame with Invalid frame length.");
+                    return -1;
+                }
+            
+                /* Frame is a memblock frame referencing an SHM memblock */
+                p->read.data = p->read.shm_info;
+
+            } else if ((flags & PA_FLAG_SHMMASK) == 0) {
+
+                /* Frame is a memblock frame */
+                
+                p->read.memblock = pa_memblock_new(p->mempool, length);
+                p->read.data = p->read.memblock->data;
+            } else {
+                
+                pa_log_warn(__FILE__": Recieved memblock frame with invalid flags value.");
                 return -1;
             }
         }
@@ -456,7 +651,9 @@ static int do_read(pa_pstream *p) {
     } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
         /* Frame payload available */
         
-        if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
+        if (p->read.memblock && p->recieve_memblock_callback) {
+
+            /* Is this memblock data? Then pass it to the user */
             l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
                 
             if (l > 0) {
@@ -477,13 +674,13 @@ static int do_read(pa_pstream *p) {
                         p,
                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                         offset,
-                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]),
+                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                         &chunk,
                         p->recieve_memblock_callback_userdata);
                 }
 
                 /* Drop seek info for following callbacks */
-                p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] =
+                p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
             }
@@ -491,13 +688,13 @@ static int do_read(pa_pstream *p) {
 
         /* Frame complete */
         if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
+            
             if (p->read.memblock) {
-                assert(!p->read.packet);
-                
+
+                /* This was a memblock frame. We can unref the memblock now */
                 pa_memblock_unref(p->read.memblock);
-                p->read.memblock = NULL;
-            } else {
-                assert(p->read.packet);
+
+            } else if (p->read.packet) {
                 
                 if (p->recieve_packet_callback)
 #ifdef HAVE_CREDS
@@ -507,17 +704,63 @@ static int do_read(pa_pstream *p) {
 #endif
 
                 pa_packet_unref(p->read.packet);
-                p->read.packet = NULL;
+            } else {
+                pa_memblock *b;
+                
+                assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
+
+                assert(p->import);
+
+                if (!(b = pa_memimport_get(p->import,
+                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
+                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
+                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
+                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
+
+                    pa_log_warn(__FILE__": Failed to import memory block.");
+                    return -1;
+                }
+
+                if (p->recieve_memblock_callback) {
+                    int64_t offset;
+                    pa_memchunk chunk;
+                    
+                    chunk.memblock = b;
+                    chunk.index = 0;
+                    chunk.length = b->length;
+
+                    offset = (int64_t) (
+                            (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+                            (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+                    
+                    p->recieve_memblock_callback(
+                            p,
+                            ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+                            offset,
+                            ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+                            &chunk,
+                            p->recieve_memblock_callback_userdata);
+                }
+
+                pa_memblock_unref(b);
             }
 
-            p->read.index = 0;
-#ifdef HAVE_CREDS
-            p->read_creds_valid = 0;
-#endif
+            goto frame_done;
         }
     }
 
-    return 0;   
+    return 0;
+
+frame_done:
+    p->read.memblock = NULL;
+    p->read.packet = NULL;
+    p->read.index = 0;
+
+#ifdef HAVE_CREDS
+    p->read_creds_valid = 0;
+#endif
+
+    return 0;
 }
 
 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
@@ -583,6 +826,16 @@ void pa_pstream_close(pa_pstream *p) {
 
     p->dead = 1;
 
+    if (p->import) {
+        pa_memimport_free(p->import);
+        p->import = NULL;
+    }
+
+    if (p->export) {
+        pa_memexport_free(p->export);
+        p->export = NULL;
+    }
+
     if (p->io) {
         pa_iochannel_free(p->io);
         p->io = NULL;
@@ -597,4 +850,19 @@ void pa_pstream_close(pa_pstream *p) {
     p->drain_callback = NULL;
     p->recieve_packet_callback = NULL;
     p->recieve_memblock_callback = NULL;
+
+
+}
+
+void pa_pstream_use_shm(pa_pstream *p, int enable) {
+    assert(p);
+    assert(p->ref >= 1);
+
+    p->use_shm = enable; /* checked by prepare_next_write_item() before exporting and by do_read() before accepting SHM frames */
+    /* NOTE(review): import and export are created on the first call regardless of the value of enable - confirm intended. */
+    if (!p->import)
+        p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
+
+    if (!p->export)
+        p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
 }
index 789e40bc29e24b142d859ef663f7258e690c1855..26bb7699cd24ac11052c7f9e182617887aa7aed1 100644 (file)
@@ -39,7 +39,7 @@ typedef void (*pa_pstream_packet_cb_t)(pa_pstream *p, pa_packet *packet, const p
 typedef void (*pa_pstream_memblock_cb_t)(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata);
 typedef void (*pa_pstream_notify_cb_t)(pa_pstream *p, void *userdata);
 
-pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s);
+pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *p);
 void pa_pstream_unref(pa_pstream*p);
 pa_pstream* pa_pstream_ref(pa_pstream*p);
 
@@ -54,6 +54,8 @@ void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void
 
 int pa_pstream_is_pending(pa_pstream *p);
 
+void pa_pstream_use_shm(pa_pstream *p, int enable);
+
 void pa_pstream_close(pa_pstream *p);
 
 #endif
index 23cdf38117e9d507a9d7236483b0685a1b40c10c..7422671468c90e043d8b19d0c0b0f4ab3ab0adc1 100644 (file)
@@ -42,7 +42,7 @@ struct pa_resampler {
     pa_sample_spec i_ss, o_ss;
     pa_channel_map i_cm, o_cm;
     size_t i_fz, o_fz;
-    pa_memblock_stat *memblock_stat;
+    pa_mempool *mempool;
 
     void (*impl_free)(pa_resampler *r);
     void (*impl_update_input_rate)(pa_resampler *r, uint32_t rate);
@@ -71,15 +71,16 @@ static int libsamplerate_init(pa_resampler*r);
 static int trivial_init(pa_resampler*r);
 
 pa_resampler* pa_resampler_new(
-    const pa_sample_spec *a,
-    const pa_channel_map *am,
-    const pa_sample_spec *b,
-    const pa_channel_map *bm,
-    pa_memblock_stat *s,
-    pa_resample_method_t resample_method) {
+        pa_mempool *pool,
+        const pa_sample_spec *a,
+        const pa_channel_map *am,
+        const pa_sample_spec *b,
+        const pa_channel_map *bm,
+        pa_resample_method_t resample_method) {
     
     pa_resampler *r = NULL;
 
+    assert(pool);
     assert(a);
     assert(b);
     assert(pa_sample_spec_valid(a));
@@ -88,7 +89,7 @@ pa_resampler* pa_resampler_new(
 
     r = pa_xnew(pa_resampler, 1);
     r->impl_data = NULL;
-    r->memblock_stat = s;
+    r->mempool = pool;
     r->resample_method = resample_method;
 
     r->impl_free = NULL;
@@ -450,7 +451,7 @@ static void libsamplerate_run(pa_resampler *r, const pa_memchunk *in, pa_memchun
             assert(p);
 
             /* Take the existing buffer and make it a memblock */
-            out->memblock = pa_memblock_new_dynamic(*p, out->length, r->memblock_stat);
+            out->memblock = pa_memblock_new_malloced(r->mempool, *p, out->length);
             *p = NULL;
         }
     } else {
@@ -549,7 +550,7 @@ static void trivial_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out
         l = ((((n_frames+1) * r->o_ss.rate) / r->i_ss.rate) + 1) * fz;
         
         out->index = 0;
-        out->memblock = pa_memblock_new(l, r->memblock_stat);
+        out->memblock = pa_memblock_new(r->mempool, l);
         
         for (o_index = 0;; o_index++, u->o_counter++) {
             unsigned j;
index c1199e5ce3e85c6d8c3be6231b5b98555fdad9cc..327e24a29841b3d5168a16e9a0b89f9848958476 100644 (file)
@@ -43,12 +43,12 @@ typedef enum pa_resample_method {
 } pa_resample_method_t;
 
 pa_resampler* pa_resampler_new(
-    const pa_sample_spec *a,
-    const pa_channel_map *am,
-    const pa_sample_spec *b,
-    const pa_channel_map *bm,
-    pa_memblock_stat *s,
-    pa_resample_method_t resample_method);
+        pa_mempool *pool,
+        const pa_sample_spec *a,
+        const pa_channel_map *am,
+        const pa_sample_spec *b,
+        const pa_channel_map *bm,
+        pa_resample_method_t resample_method);
 
 void pa_resampler_free(pa_resampler *r);
 
index 638f8067090128792ed86b68b4afecddfde16181..7f5d8a0206fba08fba2ab039f2ce88849d856b2c 100644 (file)
 #include "sample-util.h"
 #include "endianmacros.h"
 
-pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s) {
+pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length) {
+    assert(pool);
     assert(spec);
 
     if (length == 0)
-        length = pa_bytes_per_second(spec)/10; /* 100 ms */
+        length = pa_bytes_per_second(spec)/20; /* a length of 0 selects a default of 50 ms worth of audio */
 
-    return pa_silence_memblock(pa_memblock_new(length, s), spec);
+    return pa_silence_memblock(pa_memblock_new(pool, length), spec); /* block is allocated from pool, then handed to pa_silence_memblock() */
 }
 
 pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) {
index 3ebb7e2eaa44d9c0725ef99fc55e32e1a4461037..6b7707925b37ee22d1038d73e56b1bc4d9d6351c 100644 (file)
@@ -28,7 +28,7 @@
 #include <pulsecore/memchunk.h>
 
 pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec);
-pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s);
+pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length);
 void pa_silence_memchunk(pa_memchunk *c, const pa_sample_spec *spec);
 void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec);
 
index b3fabad3adbe1fab34915ac474f54cc74d6122c7..b5ba9df1974af9a45e0b979934b47614813b56d2 100644 (file)
@@ -136,9 +136,9 @@ pa_sink_input* pa_sink_input_new(
         !pa_channel_map_equal(&data->channel_map, &data->sink->channel_map))
         
         if (!(resampler = pa_resampler_new(
+                      core->mempool, 
                       &data->sample_spec, &data->channel_map,
                       &data->sink->sample_spec, &data->sink->channel_map,
-                      core->memblock_stat,
                       data->resample_method))) {
             pa_log_warn(__FILE__": Unsupported resampling operation.");
             return NULL;
@@ -299,7 +299,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
          * while until the old sink has drained its playback buffer */
         
         if (!i->silence_memblock)
-            i->silence_memblock = pa_silence_memblock_new(&i->sink->sample_spec, SILENCE_BUFFER_LENGTH, i->sink->core->memblock_stat);
+            i->silence_memblock = pa_silence_memblock_new(i->sink->core->mempool, &i->sink->sample_spec, SILENCE_BUFFER_LENGTH);
 
         chunk->memblock = pa_memblock_ref(i->silence_memblock);
         chunk->index = 0;
@@ -338,7 +338,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
 
         /* It might be necessary to adjust the volume here */
         if (do_volume_adj_here && !volume_is_norm) {
-            pa_memchunk_make_writable(&tchunk, i->sink->core->memblock_stat, 0);
+            pa_memchunk_make_writable(&tchunk, 0);
             pa_volume_memchunk(&tchunk, &i->sample_spec, &i->volume);
         }
 
@@ -540,9 +540,9 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
         /* Okey, we need a new resampler for the new sink */
         
         if (!(new_resampler = pa_resampler_new(
+                      dest->core->mempool,
                       &i->sample_spec, &i->channel_map,
                       &dest->sample_spec, &dest->channel_map,
-                      dest->core->memblock_stat,
                       i->resample_method))) {
             pa_log_warn(__FILE__": Unsupported resampling operation.");
             return -1;
@@ -553,7 +553,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
         pa_usec_t old_latency, new_latency;
         pa_usec_t silence_usec = 0;
 
-        buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL, NULL);
+        buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL);
         
         /* Let's do a little bit of Voodoo for compensating latency
          * differences */
@@ -599,7 +599,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
                 chunk.length = n;
 
                 if (!volume_is_norm) {
-                    pa_memchunk_make_writable(&chunk, origin->core->memblock_stat, 0);
+                    pa_memchunk_make_writable(&chunk, 0);
                     pa_volume_memchunk(&chunk, &origin->sample_spec, &volume);
                 }
 
index 557d5efcb395d8bf8523cfd372a15b3a0a1cd96e..aacb89fd5c22c25f752467779c019e13f5855fac 100644 (file)
@@ -298,14 +298,14 @@ int pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
         pa_sw_cvolume_multiply(&volume, &s->sw_volume, &info[0].volume);
         
         if (s->sw_muted || !pa_cvolume_is_norm(&volume)) {
-            pa_memchunk_make_writable(result, s->core->memblock_stat, 0);
+            pa_memchunk_make_writable(result, 0);
             if (s->sw_muted)
                 pa_silence_memchunk(result, &s->sample_spec);
             else
                 pa_volume_memchunk(result, &s->sample_spec, &volume);
         }
     } else {
-        result->memblock = pa_memblock_new(length, s->core->memblock_stat);
+        result->memblock = pa_memblock_new(s->core->mempool, length);
         assert(result->memblock);
 
 /*          pa_log("mixing %i", n);  */
@@ -429,7 +429,7 @@ void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result) {
 
     /*** This needs optimization ***/
     
-    result->memblock = pa_memblock_new(result->length = length, s->core->memblock_stat);
+    result->memblock = pa_memblock_new(s->core->mempool, result->length = length);
     result->index = 0;
 
     pa_sink_render_into_full(s, result);
index 6063b93e3115795751fbe9250cf076e0c9881635..6782f50eb93abcaaeaaabd64c0f6164d9012b819 100644 (file)
@@ -75,7 +75,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
         uint32_t fs = pa_frame_size(&i->sample_spec);
         sf_count_t n;
 
-        u->memchunk.memblock = pa_memblock_new(BUF_SIZE, i->sink->core->memblock_stat);
+        u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE);
         u->memchunk.index = 0;
 
         if (u->readf_function) {
index d11d4b9d9cbe87fc3c2416af861baafc806b1ca6..256cce4347d219e67adeeda1492dfabe9b11674f 100644 (file)
@@ -34,7 +34,7 @@
 #include "sound-file.h"
 #include "core-scache.h"
 
-int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s) {
+int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk) {
     SNDFILE*sf = NULL;
     SF_INFO sfinfo;
     int ret = -1;
@@ -92,7 +92,7 @@ int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *ma
         goto finish;
     }
 
-    chunk->memblock = pa_memblock_new(l, s);
+    chunk->memblock = pa_memblock_new(pool, l);
     assert(chunk->memblock);
     chunk->index = 0;
     chunk->length = l;
index 0b81d97e1d5749950fdbacc647ec834a1ab486b4..7e3c82eab8d9687ffbcbb3c592bc9f783acbeaa9 100644 (file)
@@ -26,7 +26,7 @@
 #include <pulse/channelmap.h>
 #include <pulsecore/memchunk.h>
 
-int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s);
+int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk);
 
 int pa_sound_file_too_big_to_cache(const char *fname);
 
index 7371474f15b6fa2079a3a6e2a0c29b1f32e97682..f9d66f6d169812113195c63dea835c54e8e44790 100644 (file)
@@ -115,9 +115,9 @@ pa_source_output* pa_source_output_new(
     if (!pa_sample_spec_equal(&data->sample_spec, &data->source->sample_spec) ||
         !pa_channel_map_equal(&data->channel_map, &data->source->channel_map))
         if (!(resampler = pa_resampler_new(
+                      core->mempool,
                       &data->source->sample_spec, &data->source->channel_map,
                       &data->sample_spec, &data->channel_map,
-                      core->memblock_stat,
                       data->resample_method))) {
             pa_log_warn(__FILE__": Unsupported resampling operation.");
             return NULL;
@@ -330,9 +330,9 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
         /* Okey, we need a new resampler for the new sink */
         
         if (!(new_resampler = pa_resampler_new(
+                      dest->core->mempool,
                       &dest->sample_spec, &dest->channel_map,
                       &o->sample_spec, &o->channel_map,
-                      dest->core->memblock_stat,
                       o->resample_method))) {
             pa_log_warn(__FILE__": Unsupported resampling operation.");
             return -1;
index 0d55da448d83a63d61a0d1fbb0d6280ed6ab4982..cb5b10302a7ee2d602b827b4e36032f40044edfb 100644 (file)
@@ -211,7 +211,7 @@ void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
         pa_memchunk vchunk = *chunk;
         
         pa_memblock_ref(vchunk.memblock);
-        pa_memchunk_make_writable(&vchunk, s->core->memblock_stat, 0);
+        pa_memchunk_make_writable(&vchunk, 0);
         if (s->sw_muted)
             pa_silence_memchunk(&vchunk, &s->sample_spec);
         else