[pulseaudio] / src / pulsecore / memblock.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <signal.h>
34 #include <errno.h>
35
36 #include <pulse/xmalloc.h>
37 #include <pulse/def.h>
38
39 #include <pulsecore/shm.h>
40 #include <pulsecore/log.h>
41 #include <pulsecore/hashmap.h>
42 #include <pulsecore/semaphore.h>
43 #include <pulsecore/macro.h>
44 #include <pulsecore/flist.h>
45 #include <pulsecore/core-util.h>
46
47 #include "memblock.h"
48
49 #define PA_MEMPOOL_SLOTS_MAX 128
50 #define PA_MEMPOOL_SLOT_SIZE (16*1024)
51
52 #define PA_MEMEXPORT_SLOTS_MAX 128
53
54 #define PA_MEMIMPORT_SLOTS_MAX 128
55 #define PA_MEMIMPORT_SEGMENTS_MAX 16
56
57 struct pa_memblock {
58 PA_REFCNT_DECLARE; /* the reference counter */
59 pa_mempool *pool;
60
61 pa_memblock_type_t type;
62 int read_only; /* boolean */
63
64 pa_atomic_ptr_t data;
65 size_t length;
66
67 pa_atomic_t n_acquired;
68 pa_atomic_t please_signal;
69
70 union {
71 struct {
72 /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
73 pa_free_cb_t free_cb;
74 } user;
75
76 struct {
77 uint32_t id;
78 pa_memimport_segment *segment;
79 } imported;
80 } per_type;
81 };
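/* How a block's payload is stored depends on its type:
 *   PA_MEMBLOCK_POOL          - header and payload share a single pool slot
 *   PA_MEMBLOCK_POOL_EXTERNAL - payload lives in a pool slot, the header is
 *                               allocated separately (static free list or heap)
 *   PA_MEMBLOCK_APPENDED      - header and payload are one heap allocation,
 *                               the payload follows the header
 *   PA_MEMBLOCK_FIXED         - payload is caller-owned and never freed by us
 *   PA_MEMBLOCK_USER          - payload is caller-owned and released through
 *                               per_type.user.free_cb
 *   PA_MEMBLOCK_IMPORTED      - payload points into a shared memory segment
 *                               attached from another process */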
82
83 struct pa_memimport_segment {
84 pa_memimport *import;
85 pa_shm memory;
86 unsigned n_blocks;
87 };
88
89 struct pa_memimport {
90 pa_mutex *mutex;
91
92 pa_mempool *pool;
93 pa_hashmap *segments;
94 pa_hashmap *blocks;
95
96 /* Called whenever an imported memory block is no longer
97 * needed. */
98 pa_memimport_release_cb_t release_cb;
99 void *userdata;
100
101 PA_LLIST_FIELDS(pa_memimport);
102 };
103
104 struct memexport_slot {
105 PA_LLIST_FIELDS(struct memexport_slot);
106 pa_memblock *block;
107 };
108
109 struct pa_memexport {
110 pa_mutex *mutex;
111 pa_mempool *pool;
112
113 struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];
114
115 PA_LLIST_HEAD(struct memexport_slot, free_slots);
116 PA_LLIST_HEAD(struct memexport_slot, used_slots);
117 unsigned n_init;
118
119 /* Called whenever a client from which we imported a memory block
120 that we in turn exported to another client dies, so that we can
121 revoke the memory block accordingly */
122 pa_memexport_revoke_cb_t revoke_cb;
123 void *userdata;
124
125 PA_LLIST_FIELDS(pa_memexport);
126 };
127
128 struct mempool_slot {
129 PA_LLIST_FIELDS(struct mempool_slot);
130 /* the actual data follows immediately hereafter */
131 };
132
133 struct pa_mempool {
134 pa_semaphore *semaphore;
135 pa_mutex *mutex;
136
137 pa_shm memory;
138 size_t block_size;
139 unsigned n_blocks;
140
141 pa_atomic_t n_init;
142
143 PA_LLIST_HEAD(pa_memimport, imports);
144 PA_LLIST_HEAD(pa_memexport, exports);
145
146 /* A list of free slots that may be reused */
147 pa_flist *free_slots;
148
149 pa_mempool_stat stat;
150 };
151
152 static void segment_detach(pa_memimport_segment *seg);
153
154 PA_STATIC_FLIST_DECLARE(unused_memblocks, 0, pa_xfree);
155
156 /* No lock necessary */
157 static void stat_add(pa_memblock*b) {
158 pa_assert(b);
159 pa_assert(b->pool);
160
161 pa_atomic_inc(&b->pool->stat.n_allocated);
162 pa_atomic_add(&b->pool->stat.allocated_size, b->length);
163
164 pa_atomic_inc(&b->pool->stat.n_accumulated);
165 pa_atomic_add(&b->pool->stat.accumulated_size, b->length);
166
167 if (b->type == PA_MEMBLOCK_IMPORTED) {
168 pa_atomic_inc(&b->pool->stat.n_imported);
169 pa_atomic_add(&b->pool->stat.imported_size, b->length);
170 }
171
172 pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
173 pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
174 }
175
176 /* No lock necessary */
177 static void stat_remove(pa_memblock *b) {
178 pa_assert(b);
179 pa_assert(b->pool);
180
181 pa_assert(pa_atomic_load(&b->pool->stat.n_allocated) > 0);
182 pa_assert(pa_atomic_load(&b->pool->stat.allocated_size) >= (int) b->length);
183
184 pa_atomic_dec(&b->pool->stat.n_allocated);
185 pa_atomic_sub(&b->pool->stat.allocated_size, b->length);
186
187 if (b->type == PA_MEMBLOCK_IMPORTED) {
188 pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
189 pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
190
191 pa_atomic_dec(&b->pool->stat.n_imported);
192 pa_atomic_sub(&b->pool->stat.imported_size, b->length);
193 }
194
195 pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
196 }
197
198 static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);
199
200 /* No lock necessary */
201 pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
202 pa_memblock *b;
203
204 pa_assert(p);
205 pa_assert(length > 0);
206
207 if (!(b = pa_memblock_new_pool(p, length)))
208 b = memblock_new_appended(p, length);
209
210 return b;
211 }
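/* Example: a minimal sketch of the usual allocation pattern; the helper
 * name is purely illustrative. The payload may only be dereferenced
 * between pa_memblock_acquire() and pa_memblock_release(). */
static pa_memblock *example_new_silence_block(pa_mempool *p, size_t length) {
    pa_memblock *b;
    void *d;

    /* Prefer a pool slot; pa_memblock_new() transparently falls back to
     * an appended heap allocation if the request doesn't fit a slot. */
    b = pa_memblock_new(p, length);

    /* Pin the payload while we touch it */
    d = pa_memblock_acquire(b);
    memset(d, 0, length);
    pa_memblock_release(b);

    return b;
}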
212
213 /* No lock necessary */
214 static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
215 pa_memblock *b;
216
217 pa_assert(p);
218 pa_assert(length > 0);
219
220 /* If -1 is passed as length we choose the size for the caller. */
221
222 if (length == (size_t) -1)
223 length = p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) - PA_ALIGN(sizeof(pa_memblock));
224
225 b = pa_xmalloc(PA_ALIGN(sizeof(pa_memblock)) + length);
226 PA_REFCNT_INIT(b);
227 b->pool = p;
228 b->type = PA_MEMBLOCK_APPENDED;
229 b->read_only = 0;
230 pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));
231 b->length = length;
232 pa_atomic_store(&b->n_acquired, 0);
233 pa_atomic_store(&b->please_signal, 0);
234
235 stat_add(b);
236 return b;
237 }
238
239 /* No lock necessary */
240 static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
241 struct mempool_slot *slot;
242 pa_assert(p);
243
244 if (!(slot = pa_flist_pop(p->free_slots))) {
245 int idx;
246
247 /* The free list was empty, we have to allocate a new entry */
248
249 if ((unsigned) (idx = pa_atomic_inc(&p->n_init)) >= p->n_blocks)
250 pa_atomic_dec(&p->n_init);
251 else
252 slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * idx));
253
254 if (!slot) {
255 pa_log_debug("Pool full");
256 pa_atomic_inc(&p->stat.n_pool_full);
257 return NULL;
258 }
259 }
260
261 return slot;
262 }
263
264 /* No lock necessary */
265 static void* mempool_slot_data(struct mempool_slot *slot) {
266 pa_assert(slot);
267
268 return (uint8_t*) slot + PA_ALIGN(sizeof(struct mempool_slot));
269 }
270
271 /* No lock necessary */
272 static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
273 pa_assert(p);
274
275 pa_assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
276 pa_assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);
277
278 return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
279 }
280
281 /* No lock necessary */
282 static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
283 unsigned idx;
284
285 if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
286 return NULL;
287
288 return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
289 }
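/* Pool layout: p->memory is one contiguous (optionally SHM-backed) area
 * holding n_blocks slots of block_size bytes each. Every slot starts with
 * a struct mempool_slot header, the usable payload begins at
 * PA_ALIGN(sizeof(struct mempool_slot)), and the slot index of any pointer
 * into the pool is simply its byte offset divided by block_size. */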
290
291 /* No lock necessary */
292 pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
293 pa_memblock *b = NULL;
294 struct mempool_slot *slot;
295
296 pa_assert(p);
297 pa_assert(length > 0);
298
299 /* If -1 is passed as length we choose the size for the caller: we
300 * take the largest size that fits in one of our slots. */
301
302 if (length == (size_t) -1)
303 length = pa_mempool_block_size_max(p);
304
305 if (p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) >= PA_ALIGN(sizeof(pa_memblock)) + length) {
306
307 if (!(slot = mempool_allocate_slot(p)))
308 return NULL;
309
310 b = mempool_slot_data(slot);
311 b->type = PA_MEMBLOCK_POOL;
312 pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));
313
314 } else if (p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) >= length) {
315
316 if (!(slot = mempool_allocate_slot(p)))
317 return NULL;
318
319 if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
320 b = pa_xnew(pa_memblock, 1);
321
322 b->type = PA_MEMBLOCK_POOL_EXTERNAL;
323 pa_atomic_ptr_store(&b->data, mempool_slot_data(slot));
324
325 } else {
326 pa_log_debug("Memory block too large for pool: %lu > %lu", (unsigned long) length, (unsigned long) (p->block_size - PA_ALIGN(sizeof(struct mempool_slot))));
327 pa_atomic_inc(&p->stat.n_too_large_for_pool);
328 return NULL;
329 }
330
331 PA_REFCNT_INIT(b);
332 b->pool = p;
333 b->read_only = 0;
334 b->length = length;
335 pa_atomic_store(&b->n_acquired, 0);
336 pa_atomic_store(&b->please_signal, 0);
337
338 stat_add(b);
339 return b;
340 }
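/* The function above picks between two layouts: if header plus payload fit
 * into one slot, the pa_memblock itself lives at the start of the slot
 * (PA_MEMBLOCK_POOL); if only the payload fits, the header is taken from
 * the static free list or freshly allocated and merely points into the
 * slot (PA_MEMBLOCK_POOL_EXTERNAL). Larger requests are rejected, and
 * pa_memblock_new() falls back to memblock_new_appended() for them. */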
341
342 /* No lock necessary */
343 pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
344 pa_memblock *b;
345
346 pa_assert(p);
347 pa_assert(d);
348 pa_assert(length != (size_t) -1);
349 pa_assert(length > 0);
350
351 if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
352 b = pa_xnew(pa_memblock, 1);
353 PA_REFCNT_INIT(b);
354 b->pool = p;
355 b->type = PA_MEMBLOCK_FIXED;
356 b->read_only = read_only;
357 pa_atomic_ptr_store(&b->data, d);
358 b->length = length;
359 pa_atomic_store(&b->n_acquired, 0);
360 pa_atomic_store(&b->please_signal, 0);
361
362 stat_add(b);
363 return b;
364 }
365
366 /* No lock necessary */
367 pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
368 pa_memblock *b;
369
370 pa_assert(p);
371 pa_assert(d);
372 pa_assert(length > 0);
373 pa_assert(length != (size_t) -1);
374 pa_assert(free_cb);
375
376 if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
377 b = pa_xnew(pa_memblock, 1);
378 PA_REFCNT_INIT(b);
379 b->pool = p;
380 b->type = PA_MEMBLOCK_USER;
381 b->read_only = read_only;
382 pa_atomic_ptr_store(&b->data, d);
383 b->length = length;
384 pa_atomic_store(&b->n_acquired, 0);
385 pa_atomic_store(&b->please_signal, 0);
386
387 b->per_type.user.free_cb = free_cb;
388
389 stat_add(b);
390 return b;
391 }
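/* Example: a minimal sketch of wrapping caller-owned memory in a user
 * block; the helper name is invented for illustration. Here the buffer is
 * allocated with pa_xmalloc() and handed over together with pa_xfree as
 * free_cb, so it is released when the last reference goes away. */
static pa_memblock *example_wrap_heap_buffer(pa_mempool *p, size_t length) {
    void *d;

    d = pa_xmalloc(length);
    memset(d, 0, length);

    /* read_only=0: the block may still be written through acquire() */
    return pa_memblock_new_user(p, d, length, pa_xfree, 0);
}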
392
393 /* No lock necessary */
394 int pa_memblock_is_read_only(pa_memblock *b) {
395 pa_assert(b);
396 pa_assert(PA_REFCNT_VALUE(b) > 0);
397
398 return b->read_only && PA_REFCNT_VALUE(b) == 1;
399 }
400
401 /* No lock necessary */
402 int pa_memblock_ref_is_one(pa_memblock *b) {
403 int r;
404
405 pa_assert(b);
406
407 r = PA_REFCNT_VALUE(b);
408 pa_assert(r > 0);
409
410 return r == 1;
411 }
412
413 /* No lock necessary */
414 void* pa_memblock_acquire(pa_memblock *b) {
415 pa_assert(b);
416 pa_assert(PA_REFCNT_VALUE(b) > 0);
417
418 pa_atomic_inc(&b->n_acquired);
419
420 return pa_atomic_ptr_load(&b->data);
421 }
422
423 /* No lock necessary; in corner cases it locks on its own */
424 void pa_memblock_release(pa_memblock *b) {
425 int r;
426 pa_assert(b);
427 pa_assert(PA_REFCNT_VALUE(b) > 0);
428
429 r = pa_atomic_dec(&b->n_acquired);
430 pa_assert(r >= 1);
431
432 /* Signal a waiting thread that this memblock is no longer used */
433 if (r == 1 && pa_atomic_load(&b->please_signal))
434 pa_semaphore_post(b->pool->semaphore);
435 }
436
437 size_t pa_memblock_get_length(pa_memblock *b) {
438 pa_assert(b);
439 pa_assert(PA_REFCNT_VALUE(b) > 0);
440
441 return b->length;
442 }
443
444 pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
445 pa_assert(b);
446 pa_assert(PA_REFCNT_VALUE(b) > 0);
447
448 return b->pool;
449 }
450
451 /* No lock necessary */
452 pa_memblock* pa_memblock_ref(pa_memblock*b) {
453 pa_assert(b);
454 pa_assert(PA_REFCNT_VALUE(b) > 0);
455
456 PA_REFCNT_INC(b);
457 return b;
458 }
459
460 static void memblock_free(pa_memblock *b) {
461 pa_assert(b);
462
463 pa_assert(pa_atomic_load(&b->n_acquired) == 0);
464
465 stat_remove(b);
466
467 switch (b->type) {
468 case PA_MEMBLOCK_USER :
469 pa_assert(b->per_type.user.free_cb);
470 b->per_type.user.free_cb(pa_atomic_ptr_load(&b->data));
471
472 /* Fall through */
473
474 case PA_MEMBLOCK_FIXED:
475 case PA_MEMBLOCK_APPENDED :
476 if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
477 pa_xfree(b);
478
479 break;
480
481 case PA_MEMBLOCK_IMPORTED : {
482 pa_memimport_segment *segment;
483 pa_memimport *import;
484
485 /* FIXME! This should be implemented lock-free */
486
487 segment = b->per_type.imported.segment;
488 pa_assert(segment);
489 import = segment->import;
490 pa_assert(import);
491
492 pa_mutex_lock(import->mutex);
493 pa_hashmap_remove(import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
494 if (-- segment->n_blocks <= 0)
495 segment_detach(segment);
496
497 pa_mutex_unlock(import->mutex);
498
499 import->release_cb(import, b->per_type.imported.id, import->userdata);
500
501 if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
502 pa_xfree(b);
503 break;
504 }
505
506 case PA_MEMBLOCK_POOL_EXTERNAL:
507 case PA_MEMBLOCK_POOL: {
508 struct mempool_slot *slot;
509 int call_free;
510
511 slot = mempool_slot_by_ptr(b->pool, pa_atomic_ptr_load(&b->data));
512 pa_assert(slot);
513
514 call_free = b->type == PA_MEMBLOCK_POOL_EXTERNAL;
515
516 /* The free list dimensions should easily allow all slots
517 * to fit in, hence try harder if pushing this slot into
518 * the free list fails */
519 while (pa_flist_push(b->pool->free_slots, slot) < 0)
520 ;
521
522 if (call_free)
523 if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
524 pa_xfree(b);
525
526 break;
527 }
528
529 case PA_MEMBLOCK_TYPE_MAX:
530 default:
531 pa_assert_not_reached();
532 }
533 }
534
535 /* No lock necessary */
536 void pa_memblock_unref(pa_memblock*b) {
537 pa_assert(b);
538 pa_assert(PA_REFCNT_VALUE(b) > 0);
539
540 if (PA_REFCNT_DEC(b) > 0)
541 return;
542
543 memblock_free(b);
544 }
545
546 /* Self locked */
547 static void memblock_wait(pa_memblock *b) {
548 pa_assert(b);
549
550 if (pa_atomic_load(&b->n_acquired) > 0) {
551 /* We need to wait until all threads gave up access to the
552 * memory block before we can go on. Unfortunately this means
553 * that we have to lock and wait here. Sniff! */
554
555 pa_atomic_inc(&b->please_signal);
556
557 while (pa_atomic_load(&b->n_acquired) > 0)
558 pa_semaphore_wait(b->pool->semaphore);
559
560 pa_atomic_dec(&b->please_signal);
561 }
562 }
563
564 /* No lock necessary. This function is not multiple-caller safe! */
565 static void memblock_make_local(pa_memblock *b) {
566 pa_assert(b);
567
568 pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
569
570 if (b->length <= b->pool->block_size - PA_ALIGN(sizeof(struct mempool_slot))) {
571 struct mempool_slot *slot;
572
573 if ((slot = mempool_allocate_slot(b->pool))) {
574 void *new_data;
575 /* We can move it into a local pool, perfect! */
576
577 new_data = mempool_slot_data(slot);
578 memcpy(new_data, pa_atomic_ptr_load(&b->data), b->length);
579 pa_atomic_ptr_store(&b->data, new_data);
580
581 b->type = PA_MEMBLOCK_POOL_EXTERNAL;
582 b->read_only = 0;
583
584 goto finish;
585 }
586 }
587
588 /* Hmm, not enough space in the pool, so let's allocate the memory on the heap with pa_xmemdup() */
589 b->per_type.user.free_cb = pa_xfree;
590 pa_atomic_ptr_store(&b->data, pa_xmemdup(pa_atomic_ptr_load(&b->data), b->length));
591
592 b->type = PA_MEMBLOCK_USER;
593 b->read_only = 0;
594
595 finish:
596 pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
597 pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
598 memblock_wait(b);
599 }
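/* memblock_make_local() converts a block whose payload we do not own
 * (imported or caller-provided memory) into one we do: the data is copied
 * either into a fresh pool slot (PA_MEMBLOCK_POOL_EXTERNAL) or, if it
 * doesn't fit, into a private heap copy (PA_MEMBLOCK_USER with pa_xfree
 * as free_cb). The trailing memblock_wait() makes sure no other thread
 * still has the old payload acquired before the caller invalidates it. */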
600
601 /* No lock necessary. This function is not multiple-caller safe */
602 void pa_memblock_unref_fixed(pa_memblock *b) {
603 pa_assert(b);
604 pa_assert(PA_REFCNT_VALUE(b) > 0);
605 pa_assert(b->type == PA_MEMBLOCK_FIXED);
606
607 if (PA_REFCNT_VALUE(b) > 1)
608 memblock_make_local(b);
609
610 pa_memblock_unref(b);
611 }
612
613 /* No lock necessary. */
614 pa_memblock *pa_memblock_will_need(pa_memblock *b) {
615 void *p;
616
617 pa_assert(b);
618 pa_assert(PA_REFCNT_VALUE(b) > 0);
619
620 p = pa_memblock_acquire(b);
621 pa_will_need(p, b->length);
622 pa_memblock_release(b);
623
624 return b;
625 }
626
627 /* Self-locked. This function is not multiple-caller safe */
628 static void memblock_replace_import(pa_memblock *b) {
629 pa_memimport_segment *seg;
630
631 pa_assert(b);
632 pa_assert(b->type == PA_MEMBLOCK_IMPORTED);
633
634 pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
635 pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
636 pa_atomic_dec(&b->pool->stat.n_imported);
637 pa_atomic_sub(&b->pool->stat.imported_size, b->length);
638
639 seg = b->per_type.imported.segment;
640 pa_assert(seg);
641 pa_assert(seg->import);
642
643 pa_mutex_lock(seg->import->mutex);
644
645 pa_hashmap_remove(
646 seg->import->blocks,
647 PA_UINT32_TO_PTR(b->per_type.imported.id));
648
649 memblock_make_local(b);
650
651 if (-- seg->n_blocks <= 0) {
652 pa_mutex_unlock(seg->import->mutex);
653 segment_detach(seg);
654 } else
655 pa_mutex_unlock(seg->import->mutex);
656 }
657
658 pa_mempool* pa_mempool_new(int shared) {
659 pa_mempool *p;
660
661 p = pa_xnew(pa_mempool, 1);
662
663 p->mutex = pa_mutex_new(TRUE, TRUE);
664 p->semaphore = pa_semaphore_new(0);
665
666 p->block_size = PA_PAGE_ALIGN(PA_MEMPOOL_SLOT_SIZE);
667 if (p->block_size < PA_PAGE_SIZE)
668 p->block_size = PA_PAGE_SIZE;
669
670 p->n_blocks = PA_MEMPOOL_SLOTS_MAX;
671
672 pa_assert(p->block_size > PA_ALIGN(sizeof(struct mempool_slot)));
673
674 if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
675 pa_xfree(p);
676 return NULL;
677 }
678
679 memset(&p->stat, 0, sizeof(p->stat));
680 pa_atomic_store(&p->n_init, 0);
681
682 PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
683 PA_LLIST_HEAD_INIT(pa_memexport, p->exports);
684
685 p->free_slots = pa_flist_new(p->n_blocks*2);
686
687 return p;
688 }
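/* Example: a minimal sketch of a pool's life cycle; the helper name is
 * invented for illustration. TRUE requests an SHM-backed pool, which is
 * also what pa_memexport_new() requires later on. */
static void example_pool_lifecycle(void) {
    pa_mempool *p;
    pa_memblock *b;

    if (!(p = pa_mempool_new(TRUE)))
        return;

    /* Allocate the largest block that still fits into a single slot */
    b = pa_memblock_new(p, pa_mempool_block_size_max(p));
    pa_memblock_unref(b);

    /* All blocks must be unreferenced before the pool is destroyed,
     * otherwise pa_mempool_free() logs a warning about leaked blocks. */
    pa_mempool_free(p);
}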
689
690 void pa_mempool_free(pa_mempool *p) {
691 pa_assert(p);
692
693 pa_mutex_lock(p->mutex);
694
695 while (p->imports)
696 pa_memimport_free(p->imports);
697
698 while (p->exports)
699 pa_memexport_free(p->exports);
700
701 pa_mutex_unlock(p->mutex);
702
703 pa_flist_free(p->free_slots, NULL);
704
705 if (pa_atomic_load(&p->stat.n_allocated) > 0) {
706 /* raise(SIGTRAP); */
707 pa_log_warn("Memory pool destroyed but not all memory blocks freed! %u remain.", pa_atomic_load(&p->stat.n_allocated));
708 }
709
710 pa_shm_free(&p->memory);
711
712 pa_mutex_free(p->mutex);
713 pa_semaphore_free(p->semaphore);
714
715 pa_xfree(p);
716 }
717
718 /* No lock necessary */
719 const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
720 pa_assert(p);
721
722 return &p->stat;
723 }
724
725 /* No lock necessary */
726 size_t pa_mempool_block_size_max(pa_mempool *p) {
727 pa_assert(p);
728
729 return p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) - PA_ALIGN(sizeof(pa_memblock));
730 }
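/* With the default 16*1024 byte slot size this comes to 16384 bytes minus
 * the two aligned headers (struct mempool_slot and pa_memblock); the exact
 * figure therefore depends on the platform's structure sizes and on
 * PA_ALIGN. */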
731
732 /* No lock necessary */
733 void pa_mempool_vacuum(pa_mempool *p) {
734 struct mempool_slot *slot;
735 pa_flist *list;
736
737 pa_assert(p);
738
739 list = pa_flist_new(p->n_blocks*2);
740
741 while ((slot = pa_flist_pop(p->free_slots)))
742 while (pa_flist_push(list, slot) < 0)
743 ;
744
745 while ((slot = pa_flist_pop(list))) {
746 pa_shm_punch(&p->memory,
747 (uint8_t*) slot - (uint8_t*) p->memory.ptr + PA_ALIGN(sizeof(struct mempool_slot)),
748 p->block_size - PA_ALIGN(sizeof(struct mempool_slot)));
749
750 while (pa_flist_push(p->free_slots, slot))
751 ;
752 }
753
754 pa_flist_free(list, NULL);
755 }
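/* pa_mempool_vacuum() drains the free list, punches a hole over each free
 * slot's payload with pa_shm_punch() so the kernel may reclaim the backing
 * pages, and then puts the slots back; the slots stay allocatable and are
 * simply backed by fresh pages on their next use. */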
756
757 /* No lock necessary */
758 int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
759 pa_assert(p);
760
761 if (!p->memory.shared)
762 return -1;
763
764 *id = p->memory.id;
765
766 return 0;
767 }
768
769 /* No lock necessary */
770 int pa_mempool_is_shared(pa_mempool *p) {
771 pa_assert(p);
772
773 return !!p->memory.shared;
774 }
775
776 /* For receiving blocks from other nodes */
777 pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
778 pa_memimport *i;
779
780 pa_assert(p);
781 pa_assert(cb);
782
783 i = pa_xnew(pa_memimport, 1);
784 i->mutex = pa_mutex_new(TRUE, TRUE);
785 i->pool = p;
786 i->segments = pa_hashmap_new(NULL, NULL);
787 i->blocks = pa_hashmap_new(NULL, NULL);
788 i->release_cb = cb;
789 i->userdata = userdata;
790
791 pa_mutex_lock(p->mutex);
792 PA_LLIST_PREPEND(pa_memimport, p->imports, i);
793 pa_mutex_unlock(p->mutex);
794
795 return i;
796 }
797
798 static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
799
800 /* Should be called locked */
801 static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
802 pa_memimport_segment* seg;
803
804 if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
805 return NULL;
806
807 seg = pa_xnew(pa_memimport_segment, 1);
808
809 if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
810 pa_xfree(seg);
811 return NULL;
812 }
813
814 seg->import = i;
815 seg->n_blocks = 0;
816
817 pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
818 return seg;
819 }
820
821 /* Should be called locked */
822 static void segment_detach(pa_memimport_segment *seg) {
823 pa_assert(seg);
824
825 pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
826 pa_shm_free(&seg->memory);
827 pa_xfree(seg);
828 }
829
830 /* Self-locked. Not multiple-caller safe */
831 void pa_memimport_free(pa_memimport *i) {
832 pa_memexport *e;
833 pa_memblock *b;
834
835 pa_assert(i);
836
837 pa_mutex_lock(i->mutex);
838
839 while ((b = pa_hashmap_get_first(i->blocks)))
840 memblock_replace_import(b);
841
842 pa_assert(pa_hashmap_size(i->segments) == 0);
843
844 pa_mutex_unlock(i->mutex);
845
846 pa_mutex_lock(i->pool->mutex);
847
848 /* If we've exported any of these blocks further we need to revoke those exports */
849 for (e = i->pool->exports; e; e = e->next)
850 memexport_revoke_blocks(e, i);
851
852 PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);
853
854 pa_mutex_unlock(i->pool->mutex);
855
856 pa_hashmap_free(i->blocks, NULL, NULL);
857 pa_hashmap_free(i->segments, NULL, NULL);
858
859 pa_mutex_free(i->mutex);
860
861 pa_xfree(i);
862 }
863
864 /* Self-locked */
865 pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
866 pa_memblock *b = NULL;
867 pa_memimport_segment *seg;
868
869 pa_assert(i);
870
871 pa_mutex_lock(i->mutex);
872
873 if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
874 goto finish;
875
876 if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
877 if (!(seg = segment_attach(i, shm_id)))
878 goto finish;
879
880 if (offset+size > seg->memory.size)
881 goto finish;
882
883 if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
884 b = pa_xnew(pa_memblock, 1);
885
886 PA_REFCNT_INIT(b);
887 b->pool = i->pool;
888 b->type = PA_MEMBLOCK_IMPORTED;
889 b->read_only = 1;
890 pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
891 b->length = size;
892 pa_atomic_store(&b->n_acquired, 0);
893 pa_atomic_store(&b->please_signal, 0);
894 b->per_type.imported.id = block_id;
895 b->per_type.imported.segment = seg;
896
897 pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);
898
899 seg->n_blocks++;
900
901 finish:
902 pa_mutex_unlock(i->mutex);
903
904 if (b)
905 stat_add(b);
906
907 return b;
908 }
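/* Example: a minimal sketch of the import path, with names invented for
 * illustration. The release callback signature mirrors the way release_cb
 * is invoked from memblock_free() above; a real client would use it to
 * tell the exporting peer that the block id may be recycled. */
static void example_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
    pa_log_debug("peer may now recycle block %u", (unsigned) block_id);
}

static pa_memblock* example_import_block(pa_mempool *p, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
    pa_memimport *i;

    /* Normally the import object lives as long as the connection to the
     * peer; it is created here only to keep the sketch self-contained. */
    i = pa_memimport_new(p, example_release_cb, NULL);

    /* Attaches the peer's SHM segment on first use and returns a
     * read-only PA_MEMBLOCK_IMPORTED block pointing into it */
    return pa_memimport_get(i, block_id, shm_id, offset, size);
}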
909
910 int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
911 pa_memblock *b;
912 int ret = 0;
913 pa_assert(i);
914
915 pa_mutex_lock(i->mutex);
916
917 if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id)))) {
918 ret = -1;
919 goto finish;
920 }
921
922 memblock_replace_import(b);
923
924 finish:
925 pa_mutex_unlock(i->mutex);
926
927 return ret;
928 }
929
930 /* For sending blocks to other nodes */
931 pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
932 pa_memexport *e;
933
934 pa_assert(p);
935 pa_assert(cb);
936
937 if (!p->memory.shared)
938 return NULL;
939
940 e = pa_xnew(pa_memexport, 1);
941 e->mutex = pa_mutex_new(TRUE, TRUE);
942 e->pool = p;
943 PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
944 PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
945 e->n_init = 0;
946 e->revoke_cb = cb;
947 e->userdata = userdata;
948
949 pa_mutex_lock(p->mutex);
950 PA_LLIST_PREPEND(pa_memexport, p->exports, e);
951 pa_mutex_unlock(p->mutex);
952 return e;
953 }
954
955 void pa_memexport_free(pa_memexport *e) {
956 pa_assert(e);
957
958 pa_mutex_lock(e->mutex);
959 while (e->used_slots)
960 pa_memexport_process_release(e, e->used_slots - e->slots);
961 pa_mutex_unlock(e->mutex);
962
963 pa_mutex_lock(e->pool->mutex);
964 PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
965 pa_mutex_unlock(e->pool->mutex);
966
967 pa_mutex_free(e->mutex);
968 pa_xfree(e);
969 }
970
971 /* Self-locked */
972 int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
973 pa_memblock *b;
974
975 pa_assert(e);
976
977 pa_mutex_lock(e->mutex);
978
979 if (id >= e->n_init)
980 goto fail;
981
982 if (!e->slots[id].block)
983 goto fail;
984
985 b = e->slots[id].block;
986 e->slots[id].block = NULL;
987
988 PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
989 PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);
990
991 pa_mutex_unlock(e->mutex);
992
993 /* pa_log("Processing release for %u", id); */
994
995 pa_assert(pa_atomic_load(&e->pool->stat.n_exported) > 0);
996 pa_assert(pa_atomic_load(&e->pool->stat.exported_size) >= (int) b->length);
997
998 pa_atomic_dec(&e->pool->stat.n_exported);
999 pa_atomic_sub(&e->pool->stat.exported_size, b->length);
1000
1001 pa_memblock_unref(b);
1002
1003 return 0;
1004
1005 fail:
1006 pa_mutex_unlock(e->mutex);
1007
1008 return -1;
1009 }
1010
1011 /* Self-locked */
1012 static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
1013 struct memexport_slot *slot, *next;
1014 pa_assert(e);
1015 pa_assert(i);
1016
1017 pa_mutex_lock(e->mutex);
1018
1019 for (slot = e->used_slots; slot; slot = next) {
1020 uint32_t idx;
1021 next = slot->next;
1022
1023 if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
1024 slot->block->per_type.imported.segment->import != i)
1025 continue;
1026
1027 idx = slot - e->slots;
1028 e->revoke_cb(e, idx, e->userdata);
1029 pa_memexport_process_release(e, idx);
1030 }
1031
1032 pa_mutex_unlock(e->mutex);
1033 }
1034
1035 /* No lock necessary */
1036 static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
1037 pa_memblock *n;
1038
1039 pa_assert(p);
1040 pa_assert(b);
1041
1042 if (b->type == PA_MEMBLOCK_IMPORTED ||
1043 b->type == PA_MEMBLOCK_POOL ||
1044 b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
1045 pa_assert(b->pool == p);
1046 return pa_memblock_ref(b);
1047 }
1048
1049 if (!(n = pa_memblock_new_pool(p, b->length)))
1050 return NULL;
1051
1052 memcpy(pa_atomic_ptr_load(&n->data), pa_atomic_ptr_load(&b->data), b->length);
1053 return n;
1054 }
1055
1056 /* Self-locked */
1057 int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
1058 pa_shm *memory;
1059 struct memexport_slot *slot;
1060 void *data;
1061
1062 pa_assert(e);
1063 pa_assert(b);
1064 pa_assert(block_id);
1065 pa_assert(shm_id);
1066 pa_assert(offset);
1067 pa_assert(size);
1068 pa_assert(b->pool == e->pool);
1069
1070 if (!(b = memblock_shared_copy(e->pool, b)))
1071 return -1;
1072
1073 pa_mutex_lock(e->mutex);
1074
1075 if (e->free_slots) {
1076 slot = e->free_slots;
1077 PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
1078 } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX)
1079 slot = &e->slots[e->n_init++];
1080 else {
1081 pa_mutex_unlock(e->mutex);
1082 pa_memblock_unref(b);
1083 return -1;
1084 }
1085
1086 PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
1087 slot->block = b;
1088 *block_id = slot - e->slots;
1089
1090 pa_mutex_unlock(e->mutex);
1091 /* pa_log("Got block id %u", *block_id); */
1092
1093 data = pa_memblock_acquire(b);
1094
1095 if (b->type == PA_MEMBLOCK_IMPORTED) {
1096 pa_assert(b->per_type.imported.segment);
1097 memory = &b->per_type.imported.segment->memory;
1098 } else {
1099 pa_assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
1100 pa_assert(b->pool);
1101 memory = &b->pool->memory;
1102 }
1103
1104 pa_assert(data >= memory->ptr);
1105 pa_assert((uint8_t*) data + b->length <= (uint8_t*) memory->ptr + memory->size);
1106
1107 *shm_id = memory->id;
1108 *offset = (uint8_t*) data - (uint8_t*) memory->ptr;
1109 *size = b->length;
1110
1111 pa_memblock_release(b);
1112
1113 pa_atomic_inc(&e->pool->stat.n_exported);
1114 pa_atomic_add(&e->pool->stat.exported_size, b->length);
1115
1116 return 0;
1117 }
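/* Example: a minimal sketch of the export path, with names invented for
 * illustration. The revoke callback signature mirrors the call in
 * memexport_revoke_blocks() above; a real server would forward the
 * revocation to whichever client received the block. */
static void example_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
    pa_log_debug("revoking exported block %u", (unsigned) block_id);
}

static int example_export_block(pa_mempool *p, pa_memblock *b) {
    pa_memexport *e;
    uint32_t block_id, shm_id;
    size_t offset, size;

    /* Only possible for SHM-backed pools; pa_memexport_new() refuses
     * otherwise */
    if (!(e = pa_memexport_new(p, example_revoke_cb, NULL)))
        return -1;

    /* Copies the block into the shared pool if needed and hands back the
     * tuple the peer feeds into pa_memimport_get() on its side */
    if (pa_memexport_put(e, b, &block_id, &shm_id, &offset, &size) < 0) {
        pa_memexport_free(e);
        return -1;
    }

    /* ... transmit block_id, shm_id, offset and size to the peer ... */

    /* Once the peer reports it is done with the block: */
    pa_memexport_process_release(e, block_id);

    pa_memexport_free(e);
    return 0;
}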