/* $Id$ */

/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>

#include <pulse/xmalloc.h>
#include <pulse/def.h>

#include <pulsecore/shm.h>
#include <pulsecore/log.h>
#include <pulsecore/hashmap.h>
#include <pulsecore/semaphore.h>
#include <pulsecore/macro.h>
#include <pulsecore/flist.h>
#include <pulsecore/core-util.h>

#include "memblock.h"

#define PA_MEMPOOL_SLOTS_MAX 128
#define PA_MEMPOOL_SLOT_SIZE (16*1024)

#define PA_MEMEXPORT_SLOTS_MAX 128

#define PA_MEMIMPORT_SLOTS_MAX 128
#define PA_MEMIMPORT_SEGMENTS_MAX 16

struct pa_memblock {
    PA_REFCNT_DECLARE; /* the reference counter */
    pa_mempool *pool;

    pa_memblock_type_t type;
    int read_only; /* boolean */

    pa_atomic_ptr_t data;
    size_t length;

    pa_atomic_t n_acquired;
    pa_atomic_t please_signal;

    union {
        struct {
            /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
            pa_free_cb_t free_cb;
        } user;

        struct {
            uint32_t id;
            pa_memimport_segment *segment;
        } imported;
    } per_type;
};

struct pa_memimport_segment {
    pa_memimport *import;
    pa_shm memory;
    unsigned n_blocks;
};

struct pa_memimport {
    pa_mutex *mutex;

    pa_mempool *pool;
    pa_hashmap *segments;
    pa_hashmap *blocks;

    /* Called whenever an imported memory block is no longer
     * needed. */
    pa_memimport_release_cb_t release_cb;
    void *userdata;

    PA_LLIST_FIELDS(pa_memimport);
};

struct memexport_slot {
    PA_LLIST_FIELDS(struct memexport_slot);
    pa_memblock *block;
};

struct pa_memexport {
    pa_mutex *mutex;
    pa_mempool *pool;

    struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];

    PA_LLIST_HEAD(struct memexport_slot, free_slots);
    PA_LLIST_HEAD(struct memexport_slot, used_slots);
    unsigned n_init;

    /* Called whenever a client from which we imported a memory block
       which we in turn exported to another client dies and we need to
       revoke the memory block accordingly */
    pa_memexport_revoke_cb_t revoke_cb;
    void *userdata;

    PA_LLIST_FIELDS(pa_memexport);
};

struct mempool_slot {
    PA_LLIST_FIELDS(struct mempool_slot);
    /* the actual data follows immediately hereafter */
};

struct pa_mempool {
    pa_semaphore *semaphore;
    pa_mutex *mutex;

    pa_shm memory;
    size_t block_size;
    unsigned n_blocks;

    pa_atomic_t n_init;

    PA_LLIST_HEAD(pa_memimport, imports);
    PA_LLIST_HEAD(pa_memexport, exports);

    /* A list of free slots that may be reused */
    pa_flist *free_slots;

    pa_mempool_stat stat;
};
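
/* Illustrative note, not part of the original sources: each pool slot is
 * block_size bytes and, for a PA_MEMBLOCK_POOL block, is laid out roughly as
 *
 *     [struct mempool_slot][pa_memblock header][payload...]
 *
 * so the largest payload one slot can carry is
 *
 *     block_size - PA_ALIGN(sizeof(struct mempool_slot)) - PA_ALIGN(sizeof(pa_memblock))
 *
 * which is exactly the length substituted below when a caller passes
 * (size_t) -1. PA_MEMBLOCK_POOL_EXTERNAL blocks keep the pa_memblock header
 * outside the slot and use the whole payload area after the slot header. */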

static void segment_detach(pa_memimport_segment *seg);

PA_STATIC_FLIST_DECLARE(unused_memblocks, 0, pa_xfree);

/* No lock necessary */
static void stat_add(pa_memblock*b) {
    pa_assert(b);
    pa_assert(b->pool);

    pa_atomic_inc(&b->pool->stat.n_allocated);
    pa_atomic_add(&b->pool->stat.allocated_size, b->length);

    pa_atomic_inc(&b->pool->stat.n_accumulated);
    pa_atomic_add(&b->pool->stat.accumulated_size, b->length);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_atomic_inc(&b->pool->stat.n_imported);
        pa_atomic_add(&b->pool->stat.imported_size, b->length);
    }

    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
}

/* No lock necessary */
static void stat_remove(pa_memblock *b) {
    pa_assert(b);
    pa_assert(b->pool);

    pa_assert(pa_atomic_load(&b->pool->stat.n_allocated) > 0);
    pa_assert(pa_atomic_load(&b->pool->stat.allocated_size) >= (int) b->length);

    pa_atomic_dec(&b->pool->stat.n_allocated);
    pa_atomic_sub(&b->pool->stat.allocated_size, b->length);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
        pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);

        pa_atomic_dec(&b->pool->stat.n_imported);
        pa_atomic_sub(&b->pool->stat.imported_size, b->length);
    }

    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
}

static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);

/* No lock necessary */
pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(length > 0);

    if (!(b = pa_memblock_new_pool(p, length)))
        b = memblock_new_appended(p, length);

    return b;
}
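
/* Usage sketch, not part of the original sources; a typical allocate/access
 * cycle using only functions defined in this file:
 *
 *     pa_mempool *pool = pa_mempool_new(0);           // non-shared pool
 *     pa_memblock *b = pa_memblock_new(pool, 1024);   // pool slot, or appended fallback
 *     void *d = pa_memblock_acquire(b);               // pin the data pointer
 *     memset(d, 0, pa_memblock_get_length(b));
 *     pa_memblock_release(b);                         // balance the acquire
 *     pa_memblock_unref(b);                           // final unref frees the block
 *     pa_mempool_free(pool);
 */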

/* No lock necessary */
static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(length > 0);

    /* If -1 is passed as length we choose the size for the caller. */

    if (length == (size_t) -1)
        length = p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) - PA_ALIGN(sizeof(pa_memblock));

    b = pa_xmalloc(PA_ALIGN(sizeof(pa_memblock)) + length);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_APPENDED;
    b->read_only = 0;
    pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
    struct mempool_slot *slot;
    pa_assert(p);

    if (!(slot = pa_flist_pop(p->free_slots))) {
        int idx;

        /* The free list was empty, we have to allocate a new entry */

        if ((unsigned) (idx = pa_atomic_inc(&p->n_init)) >= p->n_blocks)
            pa_atomic_dec(&p->n_init);
        else
            slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * idx));

        if (!slot) {
            pa_log_debug("Pool full");
            pa_atomic_inc(&p->stat.n_pool_full);
            return NULL;
        }
    }

    return slot;
}

/* No lock necessary */
static void* mempool_slot_data(struct mempool_slot *slot) {
    pa_assert(slot);

    return (uint8_t*) slot + PA_ALIGN(sizeof(struct mempool_slot));
}

/* No lock necessary */
static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
    pa_assert(p);

    pa_assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
    pa_assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);

    return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
}

/* No lock necessary */
static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
    unsigned idx;

    if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
        return NULL;

    return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
}

/* No lock necessary */
pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
    pa_memblock *b = NULL;
    struct mempool_slot *slot;

    pa_assert(p);
    pa_assert(length > 0);

    /* If -1 is passed as length we choose the size for the caller: we
     * take the largest size that fits in one of our slots. */

    if (length == (size_t) -1)
        length = p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) - PA_ALIGN(sizeof(pa_memblock));

    if (p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) >= PA_ALIGN(sizeof(pa_memblock)) + length) {

        if (!(slot = mempool_allocate_slot(p)))
            return NULL;

        b = mempool_slot_data(slot);
        b->type = PA_MEMBLOCK_POOL;
        pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));

    } else if (p->block_size - PA_ALIGN(sizeof(struct mempool_slot)) >= length) {

        if (!(slot = mempool_allocate_slot(p)))
            return NULL;

        if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
            b = pa_xnew(pa_memblock, 1);

        b->type = PA_MEMBLOCK_POOL_EXTERNAL;
        pa_atomic_ptr_store(&b->data, mempool_slot_data(slot));

    } else {
        pa_log_debug("Memory block too large for pool: %lu > %lu", (unsigned long) length, (unsigned long) (p->block_size - PA_ALIGN(sizeof(struct mempool_slot))));
        pa_atomic_inc(&p->stat.n_too_large_for_pool);
        return NULL;
    }

    PA_REFCNT_INIT(b);
    b->pool = p;
    b->read_only = 0;
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(d);
    pa_assert(length != (size_t) -1);
    pa_assert(length > 0);

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_FIXED;
    b->read_only = read_only;
    pa_atomic_ptr_store(&b->data, d);
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(d);
    pa_assert(length > 0);
    pa_assert(length != (size_t) -1);
    pa_assert(free_cb);

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_USER;
    b->read_only = read_only;
    pa_atomic_ptr_store(&b->data, d);
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    b->per_type.user.free_cb = free_cb;

    stat_add(b);
    return b;
}
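
/* Usage sketch, not part of the original sources: wrapping caller-allocated
 * memory as a PA_MEMBLOCK_USER block ("pool" stands for any existing
 * pa_mempool*). The free_cb runs from memblock_free() once the last
 * reference is gone:
 *
 *     void *buf = pa_xmalloc(4096);
 *     pa_memblock *b = pa_memblock_new_user(pool, buf, 4096, pa_xfree, 0);
 *     ...
 *     pa_memblock_unref(b);    // eventually calls pa_xfree(buf)
 */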

/* No lock necessary */
int pa_memblock_is_read_only(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->read_only && PA_REFCNT_VALUE(b) == 1;
}

/* No lock necessary */
int pa_memblock_ref_is_one(pa_memblock *b) {
    int r;

    pa_assert(b);

    r = PA_REFCNT_VALUE(b);
    pa_assert(r > 0);

    return r == 1;
}

/* No lock necessary */
void* pa_memblock_acquire(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    pa_atomic_inc(&b->n_acquired);

    return pa_atomic_ptr_load(&b->data);
}

/* No lock necessary, in corner cases locks on its own */
void pa_memblock_release(pa_memblock *b) {
    int r;
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    r = pa_atomic_dec(&b->n_acquired);
    pa_assert(r >= 1);

    /* Signal a waiting thread that this memblock is no longer used */
    if (r == 1 && pa_atomic_load(&b->please_signal))
        pa_semaphore_post(b->pool->semaphore);
}
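
/* Illustrative note, not part of the original sources: acquire/release only
 * bracket access to the data pointer, they do not take a reference, and the
 * n_acquired counter may nest. memblock_wait() further down uses the
 * please_signal flag plus the pool semaphore to block until every
 * outstanding acquire has been released:
 *
 *     void *d = pa_memblock_acquire(b);
 *     ... touch at most pa_memblock_get_length(b) bytes at d ...
 *     pa_memblock_release(b);
 */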

size_t pa_memblock_get_length(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->length;
}

pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->pool;
}

/* No lock necessary */
pa_memblock* pa_memblock_ref(pa_memblock*b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    PA_REFCNT_INC(b);
    return b;
}

static void memblock_free(pa_memblock *b) {
    pa_assert(b);

    pa_assert(pa_atomic_load(&b->n_acquired) == 0);

    stat_remove(b);

    switch (b->type) {
        case PA_MEMBLOCK_USER:
            pa_assert(b->per_type.user.free_cb);
            b->per_type.user.free_cb(pa_atomic_ptr_load(&b->data));

            /* Fall through */

        case PA_MEMBLOCK_FIXED:
        case PA_MEMBLOCK_APPENDED:
            if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                pa_xfree(b);

            break;

        case PA_MEMBLOCK_IMPORTED: {
            pa_memimport_segment *segment;
            pa_memimport *import;

            /* FIXME! This should be implemented lock-free */

            segment = b->per_type.imported.segment;
            pa_assert(segment);
            import = segment->import;
            pa_assert(import);

            pa_mutex_lock(import->mutex);
            pa_hashmap_remove(import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
            if (-- segment->n_blocks <= 0)
                segment_detach(segment);

            pa_mutex_unlock(import->mutex);

            import->release_cb(import, b->per_type.imported.id, import->userdata);

            if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                pa_xfree(b);
            break;
        }

        case PA_MEMBLOCK_POOL_EXTERNAL:
        case PA_MEMBLOCK_POOL: {
            struct mempool_slot *slot;
            int call_free;

            slot = mempool_slot_by_ptr(b->pool, pa_atomic_ptr_load(&b->data));
            pa_assert(slot);

            call_free = b->type == PA_MEMBLOCK_POOL_EXTERNAL;

            /* The free list dimensions should easily allow all slots
             * to fit in, hence try harder if pushing this slot into
             * the free list fails */
            while (pa_flist_push(b->pool->free_slots, slot) < 0)
                ;

            if (call_free)
                if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                    pa_xfree(b);

            break;
        }

        case PA_MEMBLOCK_TYPE_MAX:
        default:
            pa_assert_not_reached();
    }
}

/* No lock necessary */
void pa_memblock_unref(pa_memblock*b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    if (PA_REFCNT_DEC(b) > 0)
        return;

    memblock_free(b);
}

/* Self locked */
static void memblock_wait(pa_memblock *b) {
    pa_assert(b);

    if (pa_atomic_load(&b->n_acquired) > 0) {
        /* We need to wait until all threads gave up access to the
         * memory block before we can go on. Unfortunately this means
         * that we have to lock and wait here. Sniff! */

        pa_atomic_inc(&b->please_signal);

        while (pa_atomic_load(&b->n_acquired) > 0)
            pa_semaphore_wait(b->pool->semaphore);

        pa_atomic_dec(&b->please_signal);
    }
}

/* No lock necessary. This function is not multiple caller safe! */
static void memblock_make_local(pa_memblock *b) {
    pa_assert(b);

    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);

    if (b->length <= b->pool->block_size - PA_ALIGN(sizeof(struct mempool_slot))) {
        struct mempool_slot *slot;

        if ((slot = mempool_allocate_slot(b->pool))) {
            void *new_data;
            /* We can move it into a local pool, perfect! */

            new_data = mempool_slot_data(slot);
            memcpy(new_data, pa_atomic_ptr_load(&b->data), b->length);
            pa_atomic_ptr_store(&b->data, new_data);

            b->type = PA_MEMBLOCK_POOL_EXTERNAL;
            b->read_only = 0;

            goto finish;
        }
    }

    /* Hmm, not enough space in the pool, so let's allocate the memory with malloc() */
    b->per_type.user.free_cb = pa_xfree;
    pa_atomic_ptr_store(&b->data, pa_xmemdup(pa_atomic_ptr_load(&b->data), b->length));

    b->type = PA_MEMBLOCK_USER;
    b->read_only = 0;

finish:
    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
    memblock_wait(b);
}

/* No lock necessary. This function is not multiple caller safe */
void pa_memblock_unref_fixed(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);
    pa_assert(b->type == PA_MEMBLOCK_FIXED);

    if (PA_REFCNT_VALUE(b) > 1)
        memblock_make_local(b);

    pa_memblock_unref(b);
}
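
/* Usage sketch, not part of the original sources: a PA_MEMBLOCK_FIXED block
 * borrows memory the caller still owns (again, "pool" is any existing
 * pa_mempool*), so it has to be dropped with pa_memblock_unref_fixed()
 * before that memory disappears; if other references are still around,
 * memblock_make_local() first copies the data into the pool or onto the heap:
 *
 *     uint8_t scratch[256];
 *     pa_memblock *b = pa_memblock_new_fixed(pool, scratch, sizeof(scratch), 0);
 *     ... pass b around; other code may keep references ...
 *     pa_memblock_unref_fixed(b);    // safe even if references remain
 */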

/* No lock necessary. */
pa_memblock *pa_memblock_will_need(pa_memblock *b) {
    void *p;

    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    p = pa_memblock_acquire(b);
    pa_will_need(p, b->length);
    pa_memblock_release(b);

    return b;
}

/* Self-locked. This function is not multiple-caller safe */
static void memblock_replace_import(pa_memblock *b) {
    pa_memimport_segment *seg;

    pa_assert(b);
    pa_assert(b->type == PA_MEMBLOCK_IMPORTED);

    pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
    pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
    pa_atomic_dec(&b->pool->stat.n_imported);
    pa_atomic_sub(&b->pool->stat.imported_size, b->length);

    seg = b->per_type.imported.segment;
    pa_assert(seg);
    pa_assert(seg->import);

    pa_mutex_lock(seg->import->mutex);

    pa_hashmap_remove(
            seg->import->blocks,
            PA_UINT32_TO_PTR(b->per_type.imported.id));

    memblock_make_local(b);

    if (-- seg->n_blocks <= 0) {
        pa_mutex_unlock(seg->import->mutex);
        segment_detach(seg);
    } else
        pa_mutex_unlock(seg->import->mutex);
}

pa_mempool* pa_mempool_new(int shared) {
    pa_mempool *p;

    p = pa_xnew(pa_mempool, 1);

    p->mutex = pa_mutex_new(1);
    p->semaphore = pa_semaphore_new(0);

    p->block_size = PA_PAGE_ALIGN(PA_MEMPOOL_SLOT_SIZE);
    if (p->block_size < PA_PAGE_SIZE)
        p->block_size = PA_PAGE_SIZE;

    p->n_blocks = PA_MEMPOOL_SLOTS_MAX;

    pa_assert(p->block_size > PA_ALIGN(sizeof(struct mempool_slot)));

    if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
        pa_xfree(p);
        return NULL;
    }

    memset(&p->stat, 0, sizeof(p->stat));
    pa_atomic_store(&p->n_init, 0);

    PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
    PA_LLIST_HEAD_INIT(pa_memexport, p->exports);

    p->free_slots = pa_flist_new(p->n_blocks*2);

    return p;
}
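
/* Illustrative arithmetic, not part of the original sources: with the
 * defaults above block_size is PA_MEMPOOL_SLOT_SIZE (16*1024) rounded up to
 * the page size, so with 4 KiB pages the pool reserves
 *
 *     PA_MEMPOOL_SLOTS_MAX * block_size = 128 * 16 KiB = 2 MiB
 *
 * of (optionally SHM-backed) memory; slots are handed out lazily via the
 * n_init counter in mempool_allocate_slot(). */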

void pa_mempool_free(pa_mempool *p) {
    pa_assert(p);

    pa_mutex_lock(p->mutex);

    while (p->imports)
        pa_memimport_free(p->imports);

    while (p->exports)
        pa_memexport_free(p->exports);

    pa_mutex_unlock(p->mutex);

    pa_flist_free(p->free_slots, NULL);

    if (pa_atomic_load(&p->stat.n_allocated) > 0) {
        /* raise(SIGTRAP); */
        pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed! %u remain.", pa_atomic_load(&p->stat.n_allocated));
    }

    pa_shm_free(&p->memory);

    pa_mutex_free(p->mutex);
    pa_semaphore_free(p->semaphore);

    pa_xfree(p);
}

/* No lock necessary */
const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
    pa_assert(p);

    return &p->stat;
}

/* No lock necessary */
void pa_mempool_vacuum(pa_mempool *p) {
    struct mempool_slot *slot;
    pa_flist *list;

    pa_assert(p);

    list = pa_flist_new(p->n_blocks*2);

    while ((slot = pa_flist_pop(p->free_slots)))
        while (pa_flist_push(list, slot) < 0)
            ;

    while ((slot = pa_flist_pop(list))) {
        pa_shm_punch(&p->memory,
                     (uint8_t*) slot - (uint8_t*) p->memory.ptr + PA_ALIGN(sizeof(struct mempool_slot)),
                     p->block_size - PA_ALIGN(sizeof(struct mempool_slot)));

        while (pa_flist_push(p->free_slots, slot) < 0)
            ;
    }

    pa_flist_free(list, NULL);
}

/* No lock necessary */
int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
    pa_assert(p);

    if (!p->memory.shared)
        return -1;

    *id = p->memory.id;

    return 0;
}

/* No lock necessary */
int pa_mempool_is_shared(pa_mempool *p) {
    pa_assert(p);

    return !!p->memory.shared;
}

/* For receiving blocks from other nodes */
pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
    pa_memimport *i;

    pa_assert(p);
    pa_assert(cb);

    i = pa_xnew(pa_memimport, 1);
    i->mutex = pa_mutex_new(1);
    i->pool = p;
    i->segments = pa_hashmap_new(NULL, NULL);
    i->blocks = pa_hashmap_new(NULL, NULL);
    i->release_cb = cb;
    i->userdata = userdata;

    pa_mutex_lock(p->mutex);
    PA_LLIST_PREPEND(pa_memimport, p->imports, i);
    pa_mutex_unlock(p->mutex);

    return i;
}

static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);

/* Should be called locked */
static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
    pa_memimport_segment* seg;

    if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
        return NULL;

    seg = pa_xnew(pa_memimport_segment, 1);

    if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
        pa_xfree(seg);
        return NULL;
    }

    seg->import = i;
    seg->n_blocks = 0;

    pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
    return seg;
}

/* Should be called locked */
static void segment_detach(pa_memimport_segment *seg) {
    pa_assert(seg);

    pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
    pa_shm_free(&seg->memory);
    pa_xfree(seg);
}

/* Self-locked. Not multiple-caller safe */
void pa_memimport_free(pa_memimport *i) {
    pa_memexport *e;
    pa_memblock *b;

    pa_assert(i);

    pa_mutex_lock(i->mutex);

    while ((b = pa_hashmap_get_first(i->blocks)))
        memblock_replace_import(b);

    pa_assert(pa_hashmap_size(i->segments) == 0);

    pa_mutex_unlock(i->mutex);

    pa_mutex_lock(i->pool->mutex);

    /* If we've exported this block further we need to revoke that export */
    for (e = i->pool->exports; e; e = e->next)
        memexport_revoke_blocks(e, i);

    PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);

    pa_mutex_unlock(i->pool->mutex);

    pa_hashmap_free(i->blocks, NULL, NULL);
    pa_hashmap_free(i->segments, NULL, NULL);

    pa_mutex_free(i->mutex);

    pa_xfree(i);
}

/* Self-locked */
pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
    pa_memblock *b = NULL;
    pa_memimport_segment *seg;

    pa_assert(i);

    pa_mutex_lock(i->mutex);

    if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
        goto finish;

    if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
        if (!(seg = segment_attach(i, shm_id)))
            goto finish;

    if (offset+size > seg->memory.size)
        goto finish;

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);

    PA_REFCNT_INIT(b);
    b->pool = i->pool;
    b->type = PA_MEMBLOCK_IMPORTED;
    b->read_only = 1;
    pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
    b->length = size;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);
    b->per_type.imported.id = block_id;
    b->per_type.imported.segment = seg;

    pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);

    seg->n_blocks++;

finish:
    pa_mutex_unlock(i->mutex);

    if (b)
        stat_add(b);

    return b;
}

int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
    pa_memblock *b;
    int ret = 0;
    pa_assert(i);

    pa_mutex_lock(i->mutex);

    if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id)))) {
        ret = -1;
        goto finish;
    }

    memblock_replace_import(b);

finish:
    pa_mutex_unlock(i->mutex);

    return ret;
}

/* For sending blocks to other nodes */
pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
    pa_memexport *e;

    pa_assert(p);
    pa_assert(cb);

    if (!p->memory.shared)
        return NULL;

    e = pa_xnew(pa_memexport, 1);
    e->mutex = pa_mutex_new(1);
    e->pool = p;
    PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
    PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
    e->n_init = 0;
    e->revoke_cb = cb;
    e->userdata = userdata;

    pa_mutex_lock(p->mutex);
    PA_LLIST_PREPEND(pa_memexport, p->exports, e);
    pa_mutex_unlock(p->mutex);
    return e;
}

void pa_memexport_free(pa_memexport *e) {
    pa_assert(e);

    pa_mutex_lock(e->mutex);
    while (e->used_slots)
        pa_memexport_process_release(e, e->used_slots - e->slots);
    pa_mutex_unlock(e->mutex);

    pa_mutex_lock(e->pool->mutex);
    PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
    pa_mutex_unlock(e->pool->mutex);

    pa_mutex_free(e->mutex);
    pa_xfree(e);
}

/* Self-locked */
int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
    pa_memblock *b;

    pa_assert(e);

    pa_mutex_lock(e->mutex);

    if (id >= e->n_init)
        goto fail;

    if (!e->slots[id].block)
        goto fail;

    b = e->slots[id].block;
    e->slots[id].block = NULL;

    PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
    PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);

    pa_mutex_unlock(e->mutex);

    /* pa_log("Processing release for %u", id); */

    pa_assert(pa_atomic_load(&e->pool->stat.n_exported) > 0);
    pa_assert(pa_atomic_load(&e->pool->stat.exported_size) >= (int) b->length);

    pa_atomic_dec(&e->pool->stat.n_exported);
    pa_atomic_sub(&e->pool->stat.exported_size, b->length);

    pa_memblock_unref(b);

    return 0;

fail:
    pa_mutex_unlock(e->mutex);

    return -1;
}

/* Self-locked */
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
    struct memexport_slot *slot, *next;
    pa_assert(e);
    pa_assert(i);

    pa_mutex_lock(e->mutex);

    for (slot = e->used_slots; slot; slot = next) {
        uint32_t idx;
        next = slot->next;

        if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
            slot->block->per_type.imported.segment->import != i)
            continue;

        idx = slot - e->slots;
        e->revoke_cb(e, idx, e->userdata);
        pa_memexport_process_release(e, idx);
    }

    pa_mutex_unlock(e->mutex);
}

/* No lock necessary */
static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
    pa_memblock *n;

    pa_assert(p);
    pa_assert(b);

    if (b->type == PA_MEMBLOCK_IMPORTED ||
        b->type == PA_MEMBLOCK_POOL ||
        b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
        pa_assert(b->pool == p);
        return pa_memblock_ref(b);
    }

    if (!(n = pa_memblock_new_pool(p, b->length)))
        return NULL;

    memcpy(pa_atomic_ptr_load(&n->data), pa_atomic_ptr_load(&b->data), b->length);
    return n;
}

/* Self-locked */
int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size) {
    pa_shm *memory;
    struct memexport_slot *slot;
    void *data;

    pa_assert(e);
    pa_assert(b);
    pa_assert(block_id);
    pa_assert(shm_id);
    pa_assert(offset);
    pa_assert(size);
    pa_assert(b->pool == e->pool);

    if (!(b = memblock_shared_copy(e->pool, b)))
        return -1;

    pa_mutex_lock(e->mutex);

    if (e->free_slots) {
        slot = e->free_slots;
        PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
    } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX)
        slot = &e->slots[e->n_init++];
    else {
        pa_mutex_unlock(e->mutex);
        pa_memblock_unref(b);
        return -1;
    }

    PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
    slot->block = b;
    *block_id = slot - e->slots;

    pa_mutex_unlock(e->mutex);
    /* pa_log("Got block id %u", *block_id); */

    data = pa_memblock_acquire(b);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_assert(b->per_type.imported.segment);
        memory = &b->per_type.imported.segment->memory;
    } else {
        pa_assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
        pa_assert(b->pool);
        memory = &b->pool->memory;
    }

    pa_assert(data >= memory->ptr);
    pa_assert((uint8_t*) data + b->length <= (uint8_t*) memory->ptr + memory->size);

    *shm_id = memory->id;
    *offset = (uint8_t*) data - (uint8_t*) memory->ptr;
    *size = b->length;

    pa_memblock_release(b);

    pa_atomic_inc(&e->pool->stat.n_exported);
    pa_atomic_add(&e->pool->stat.exported_size, b->length);

    return 0;
}
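
/* End-to-end sketch, not part of the original sources: how the export and
 * import halves are meant to be wired together by some transport. The names
 * sender_export, receiver_import and send_to_peer() are placeholders, not
 * real API:
 *
 *   sender side:
 *     uint32_t block_id, shm_id;
 *     size_t offset, size;
 *     if (pa_memexport_put(sender_export, b, &block_id, &shm_id, &offset, &size) >= 0)
 *         send_to_peer(block_id, shm_id, offset, size);
 *
 *   receiver side:
 *     pa_memblock *b = pa_memimport_get(receiver_import, block_id, shm_id, offset, size);
 *
 * When the receiver unrefs the imported block, its pa_memimport release_cb
 * fires and the transport is expected to forward that as a release message
 * so the sender calls pa_memexport_process_release(). In the other
 * direction, the sender's pa_memexport revoke_cb signals that a block must
 * be taken back, and the receiver reacts by calling
 * pa_memimport_process_revoke(). */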