/* $Id$ */

/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>

#include <pulse/xmalloc.h>
#include <pulse/def.h>

#include <pulsecore/shm.h>
#include <pulsecore/log.h>
#include <pulsecore/hashmap.h>
#include <pulsecore/semaphore.h>
#include <pulsecore/macro.h>
#include <pulsecore/flist.h>

#include "memblock.h"

#define PA_MEMPOOL_SLOTS_MAX 128
#define PA_MEMPOOL_SLOT_SIZE (16*1024)

#define PA_MEMEXPORT_SLOTS_MAX 128

#define PA_MEMIMPORT_SLOTS_MAX 128
#define PA_MEMIMPORT_SEGMENTS_MAX 16

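/* A pa_memblock is a reference-counted chunk of memory. Depending on its
 * type the payload either follows the header directly (APPENDED), lives in
 * a slot of the shared memory pool (POOL, POOL_EXTERNAL), is supplied by
 * the caller (FIXED, USER), or points into a segment imported from another
 * node (IMPORTED). */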
struct pa_memblock {
    PA_REFCNT_DECLARE; /* the reference counter */
    pa_mempool *pool;

    pa_memblock_type_t type;
    int read_only; /* boolean */

    pa_atomic_ptr_t data;
    size_t length;

    pa_atomic_t n_acquired;
    pa_atomic_t please_signal;

    union {
        struct {
            /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
            pa_free_cb_t free_cb;
        } user;

        struct {
            uint32_t id;
            pa_memimport_segment *segment;
        } imported;
    } per_type;
};

struct pa_memimport_segment {
    pa_memimport *import;
    pa_shm memory;
    unsigned n_blocks;
};

struct pa_memimport {
    pa_mutex *mutex;

    pa_mempool *pool;
    pa_hashmap *segments;
    pa_hashmap *blocks;

    /* Called whenever an imported memory block is no longer
     * needed. */
    pa_memimport_release_cb_t release_cb;
    void *userdata;

    PA_LLIST_FIELDS(pa_memimport);
};

struct memexport_slot {
    PA_LLIST_FIELDS(struct memexport_slot);
    pa_memblock *block;
};

struct pa_memexport {
    pa_mutex *mutex;
    pa_mempool *pool;

    struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];

    PA_LLIST_HEAD(struct memexport_slot, free_slots);
    PA_LLIST_HEAD(struct memexport_slot, used_slots);
    unsigned n_init;

    /* Called whenever a client from which we imported a memory block
       which we in turn exported to another client dies and we need to
       revoke the memory block accordingly */
    pa_memexport_revoke_cb_t revoke_cb;
    void *userdata;

    PA_LLIST_FIELDS(pa_memexport);
};

struct mempool_slot {
    PA_LLIST_FIELDS(struct mempool_slot);
    /* the actual data follows immediately hereafter */
};

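/* A mempool is one contiguous (optionally shared) memory area, carved up
 * into n_blocks slots of block_size bytes each. Slots that have been used
 * and released are kept on the lock-free free_slots list; slots that have
 * never been handed out yet are tracked via the n_init counter. */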
struct pa_mempool {
    pa_semaphore *semaphore;
    pa_mutex *mutex;

    pa_shm memory;
    size_t block_size;
    unsigned n_blocks;

    pa_atomic_t n_init;

    PA_LLIST_HEAD(pa_memimport, imports);
    PA_LLIST_HEAD(pa_memexport, exports);

    /* A list of free slots that may be reused */
    pa_flist *free_slots;

    pa_mempool_stat stat;
};

static void segment_detach(pa_memimport_segment *seg);

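/* A process-global lock-free list used to recycle freed pa_memblock
 * structures, so that blocks which are released and reallocated frequently
 * do not always have to go through pa_xmalloc()/pa_xfree(). */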
PA_STATIC_FLIST_DECLARE(unused_memblocks, 0, pa_xfree);

/* No lock necessary */
static void stat_add(pa_memblock*b) {
    pa_assert(b);
    pa_assert(b->pool);

    pa_atomic_inc(&b->pool->stat.n_allocated);
    pa_atomic_add(&b->pool->stat.allocated_size, b->length);

    pa_atomic_inc(&b->pool->stat.n_accumulated);
    pa_atomic_add(&b->pool->stat.accumulated_size, b->length);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_atomic_inc(&b->pool->stat.n_imported);
        pa_atomic_add(&b->pool->stat.imported_size, b->length);
    }

    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
}

/* No lock necessary */
static void stat_remove(pa_memblock *b) {
    pa_assert(b);
    pa_assert(b->pool);

    pa_assert(pa_atomic_load(&b->pool->stat.n_allocated) > 0);
    pa_assert(pa_atomic_load(&b->pool->stat.allocated_size) >= (int) b->length);

    pa_atomic_dec(&b->pool->stat.n_allocated);
    pa_atomic_sub(&b->pool->stat.allocated_size, b->length);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
        pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);

        pa_atomic_dec(&b->pool->stat.n_imported);
        pa_atomic_sub(&b->pool->stat.imported_size, b->length);
    }

    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
}

static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);

/* No lock necessary */
pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(length > 0);

    if (!(b = pa_memblock_new_pool(p, length)))
        b = memblock_new_appended(p, length);

    return b;
}

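/* An APPENDED block is allocated as a single pa_xmalloc() chunk: the
 * pa_memblock header comes first and the payload starts at the next
 * PA_ALIGN() boundary after it. */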
/* No lock necessary */
static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(length > 0);

    b = pa_xmalloc(PA_ALIGN(sizeof(pa_memblock)) + length);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_APPENDED;
    b->read_only = 0;
    pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

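/* Pool slots are handed out in two ways: recycled slots are popped off the
 * lock-free free_slots list; slots that were never used before are carved
 * out of the shared memory area by atomically bumping n_init until all
 * n_blocks slots are in use. */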
/* No lock necessary */
static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
    struct mempool_slot *slot;
    pa_assert(p);

    if (!(slot = pa_flist_pop(p->free_slots))) {
        int idx;

        /* The free list was empty, we have to allocate a new entry */

        if ((unsigned) (idx = pa_atomic_inc(&p->n_init)) >= p->n_blocks)
            pa_atomic_dec(&p->n_init);
        else
            slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * idx));

        if (!slot) {
            pa_log_debug("Pool full");
            pa_atomic_inc(&p->stat.n_pool_full);
            return NULL;
        }
    }

    return slot;
}

/* No lock necessary */
static void* mempool_slot_data(struct mempool_slot *slot) {
    pa_assert(slot);

    return (uint8_t*) slot + sizeof(struct mempool_slot);
}

/* No lock necessary */
static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
    pa_assert(p);

    pa_assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
    pa_assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);

    return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
}

/* No lock necessary */
static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
    unsigned idx;

    if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
        return NULL;

    return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
}

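/* A block allocated from the pool comes in one of two layouts: if both the
 * pa_memblock header and the payload fit into a single slot, the header is
 * placed at the start of the slot (PA_MEMBLOCK_POOL); if only the payload
 * fits, the header is allocated separately and merely points into the slot
 * (PA_MEMBLOCK_POOL_EXTERNAL). */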
/* No lock necessary */
pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
    pa_memblock *b = NULL;
    struct mempool_slot *slot;

    pa_assert(p);
    pa_assert(length > 0);

    if (p->block_size - sizeof(struct mempool_slot) >= sizeof(pa_memblock) + length) {

        if (!(slot = mempool_allocate_slot(p)))
            return NULL;

        b = mempool_slot_data(slot);
        b->type = PA_MEMBLOCK_POOL;
        pa_atomic_ptr_store(&b->data, (uint8_t*) b + sizeof(pa_memblock));

    } else if (p->block_size - sizeof(struct mempool_slot) >= length) {

        if (!(slot = mempool_allocate_slot(p)))
            return NULL;

        if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
            b = pa_xnew(pa_memblock, 1);

        b->type = PA_MEMBLOCK_POOL_EXTERNAL;
        pa_atomic_ptr_store(&b->data, mempool_slot_data(slot));

    } else {
        pa_log_debug("Memory block too large for pool: %lu > %lu",
                     (unsigned long) length,
                     (unsigned long) (p->block_size - sizeof(struct mempool_slot)));
        pa_atomic_inc(&p->stat.n_too_large_for_pool);
        return NULL;
    }

    PA_REFCNT_INIT(b);
    b->pool = p;
    b->read_only = 0;
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(d);
    pa_assert(length > 0);

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_FIXED;
    b->read_only = read_only;
    pa_atomic_ptr_store(&b->data, d);
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(d);
    pa_assert(length > 0);
    pa_assert(free_cb);

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_USER;
    b->read_only = read_only;
    pa_atomic_ptr_store(&b->data, d);
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    b->per_type.user.free_cb = free_cb;

    stat_add(b);
    return b;
}

/* No lock necessary. A block must be treated as read-only if it carries the
 * read-only flag or if it is shared by more than one reference. */
int pa_memblock_is_read_only(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->read_only || PA_REFCNT_VALUE(b) > 1;
}

/* No lock necessary */
void* pa_memblock_acquire(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    pa_atomic_inc(&b->n_acquired);

    return pa_atomic_ptr_load(&b->data);
}

/* No lock necessary, in corner cases locks on its own */
void pa_memblock_release(pa_memblock *b) {
    int r;
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    r = pa_atomic_dec(&b->n_acquired);
    pa_assert(r >= 1);

    /* Signal a waiting thread that this memblock is no longer used */
    if (r == 1 && pa_atomic_load(&b->please_signal))
        pa_semaphore_post(b->pool->semaphore);
}

size_t pa_memblock_get_length(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->length;
}

pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->pool;
}

/* No lock necessary */
pa_memblock* pa_memblock_ref(pa_memblock*b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    PA_REFCNT_INC(b);
    return b;
}

static void memblock_free(pa_memblock *b) {
    pa_assert(b);

    pa_assert(pa_atomic_load(&b->n_acquired) == 0);

    stat_remove(b);

    switch (b->type) {
        case PA_MEMBLOCK_USER:
            pa_assert(b->per_type.user.free_cb);
            b->per_type.user.free_cb(pa_atomic_ptr_load(&b->data));

            /* Fall through */

        case PA_MEMBLOCK_FIXED:
        case PA_MEMBLOCK_APPENDED:
            if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                pa_xfree(b);

            break;

        case PA_MEMBLOCK_IMPORTED: {
            pa_memimport_segment *segment;
            pa_memimport *import;

            /* FIXME! This should be implemented lock-free */

            segment = b->per_type.imported.segment;
            pa_assert(segment);
            import = segment->import;
            pa_assert(import);

            pa_mutex_lock(import->mutex);
            pa_hashmap_remove(import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
            if (-- segment->n_blocks <= 0)
                segment_detach(segment);

            pa_mutex_unlock(import->mutex);

            import->release_cb(import, b->per_type.imported.id, import->userdata);

            if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                pa_xfree(b);
            break;
        }

        case PA_MEMBLOCK_POOL_EXTERNAL:
        case PA_MEMBLOCK_POOL: {
            struct mempool_slot *slot;
            int call_free;

            slot = mempool_slot_by_ptr(b->pool, pa_atomic_ptr_load(&b->data));
            pa_assert(slot);

            call_free = b->type == PA_MEMBLOCK_POOL_EXTERNAL;

            /* The free list dimensions should easily allow all slots
             * to fit in, hence try harder if pushing this slot into
             * the free list fails */
            while (pa_flist_push(b->pool->free_slots, slot) < 0)
                ;

            if (call_free)
                if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                    pa_xfree(b);

            break;
        }

        case PA_MEMBLOCK_TYPE_MAX:
        default:
            pa_assert_not_reached();
    }
}

/* No lock necessary */
void pa_memblock_unref(pa_memblock*b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    if (PA_REFCNT_DEC(b) > 0)
        return;

    memblock_free(b);
}

/* Self locked */
static void memblock_wait(pa_memblock *b) {
    pa_assert(b);

    if (pa_atomic_load(&b->n_acquired) > 0) {
        /* We need to wait until all threads gave up access to the
         * memory block before we can go on. Unfortunately this means
         * that we have to lock and wait here. Sniff! */

        pa_atomic_inc(&b->please_signal);

        while (pa_atomic_load(&b->n_acquired) > 0)
            pa_semaphore_wait(b->pool->semaphore);

        pa_atomic_dec(&b->please_signal);
    }
}

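/* Turn a block that references memory we do not own (fixed or imported
 * data) into one backed by memory we do own, by copying the payload either
 * into a pool slot or, if the pool is full or the block too large, into a
 * freshly allocated buffer. */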
/* No lock necessary. This function is not multiple caller safe! */
static void memblock_make_local(pa_memblock *b) {
    pa_assert(b);

    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);

    if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) {
        struct mempool_slot *slot;

        if ((slot = mempool_allocate_slot(b->pool))) {
            void *new_data;
            /* We can move it into a local pool, perfect! */

            new_data = mempool_slot_data(slot);
            memcpy(new_data, pa_atomic_ptr_load(&b->data), b->length);
            pa_atomic_ptr_store(&b->data, new_data);

            b->type = PA_MEMBLOCK_POOL_EXTERNAL;
            b->read_only = 0;

            goto finish;
        }
    }

    /* Humm, not enough space in the pool, so let's allocate the memory with malloc() */
    b->per_type.user.free_cb = pa_xfree;
    pa_atomic_ptr_store(&b->data, pa_xmemdup(pa_atomic_ptr_load(&b->data), b->length));

    b->type = PA_MEMBLOCK_USER;
    b->read_only = 0;

finish:
    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
    memblock_wait(b);
}

/* No lock necessary. This function is not multiple caller safe */
void pa_memblock_unref_fixed(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);
    pa_assert(b->type == PA_MEMBLOCK_FIXED);

    if (PA_REFCNT_VALUE(b) > 1)
        memblock_make_local(b);

    pa_memblock_unref(b);
}

/* Self-locked. This function is not multiple-caller safe */
static void memblock_replace_import(pa_memblock *b) {
    pa_memimport_segment *seg;

    pa_assert(b);
    pa_assert(b->type == PA_MEMBLOCK_IMPORTED);

    pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
    pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
    pa_atomic_dec(&b->pool->stat.n_imported);
    pa_atomic_sub(&b->pool->stat.imported_size, b->length);

    seg = b->per_type.imported.segment;
    pa_assert(seg);
    pa_assert(seg->import);

    pa_mutex_lock(seg->import->mutex);

    pa_hashmap_remove(
            seg->import->blocks,
            PA_UINT32_TO_PTR(b->per_type.imported.id));

    memblock_make_local(b);

    if (-- seg->n_blocks <= 0)
        segment_detach(seg);

    pa_mutex_unlock(seg->import->mutex);
}

pa_mempool* pa_mempool_new(int shared) {
    size_t ps;
    pa_mempool *p;

    p = pa_xnew(pa_mempool, 1);

    p->mutex = pa_mutex_new(1);
    p->semaphore = pa_semaphore_new(0);

#ifdef HAVE_SYSCONF
    ps = (size_t) sysconf(_SC_PAGESIZE);
#elif defined(PAGE_SIZE)
    ps = (size_t) PAGE_SIZE;
#else
    ps = 4096; /* Let's hope it's like x86. */
#endif

    p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps;

    if (p->block_size < ps)
        p->block_size = ps;

    p->n_blocks = PA_MEMPOOL_SLOTS_MAX;

    pa_assert(p->block_size > sizeof(struct mempool_slot));

    if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
        pa_mutex_free(p->mutex);
        pa_semaphore_free(p->semaphore);
        pa_xfree(p);
        return NULL;
    }

    memset(&p->stat, 0, sizeof(p->stat));
    pa_atomic_store(&p->n_init, 0);

    PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
    PA_LLIST_HEAD_INIT(pa_memexport, p->exports);

    p->free_slots = pa_flist_new(p->n_blocks*2);

    return p;
}

void pa_mempool_free(pa_mempool *p) {
    pa_assert(p);

    pa_mutex_lock(p->mutex);

    while (p->imports)
        pa_memimport_free(p->imports);

    while (p->exports)
        pa_memexport_free(p->exports);

    pa_mutex_unlock(p->mutex);

    pa_flist_free(p->free_slots, NULL);

    if (pa_atomic_load(&p->stat.n_allocated) > 0) {
        /* raise(SIGTRAP); */
        pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed! %u remain.", pa_atomic_load(&p->stat.n_allocated));
    }

    pa_shm_free(&p->memory);

    pa_mutex_free(p->mutex);
    pa_semaphore_free(p->semaphore);

    pa_xfree(p);
}

/* No lock necessary */
const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
    pa_assert(p);

    return &p->stat;
}

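/* Walk all currently unused slots and punch their data areas out of the
 * shared memory segment, so the backing pages can be reclaimed; the slots
 * themselves are put back on the free list afterwards. */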
/* No lock necessary */
void pa_mempool_vacuum(pa_mempool *p) {
    struct mempool_slot *slot;
    pa_flist *list;

    pa_assert(p);

    list = pa_flist_new(p->n_blocks*2);

    while ((slot = pa_flist_pop(p->free_slots)))
        while (pa_flist_push(list, slot) < 0)
            ;

    while ((slot = pa_flist_pop(list))) {
        pa_shm_punch(&p->memory,
                     (uint8_t*) slot - (uint8_t*) p->memory.ptr + sizeof(struct mempool_slot),
                     p->block_size - sizeof(struct mempool_slot));

        while (pa_flist_push(p->free_slots, slot))
            ;
    }

    pa_flist_free(list, NULL);
}

/* No lock necessary */
int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
    pa_assert(p);

    if (!p->memory.shared)
        return -1;

    *id = p->memory.id;

    return 0;
}

/* No lock necessary */
int pa_mempool_is_shared(pa_mempool *p) {
    pa_assert(p);

    return !!p->memory.shared;
}

/* For receiving blocks from other nodes */
pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
    pa_memimport *i;

    pa_assert(p);
    pa_assert(cb);

    i = pa_xnew(pa_memimport, 1);
    i->mutex = pa_mutex_new(0);
    i->pool = p;
    i->segments = pa_hashmap_new(NULL, NULL);
    i->blocks = pa_hashmap_new(NULL, NULL);
    i->release_cb = cb;
    i->userdata = userdata;

    pa_mutex_lock(p->mutex);
    PA_LLIST_PREPEND(pa_memimport, p->imports, i);
    pa_mutex_unlock(p->mutex);

    return i;
}

static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);

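/* A segment wraps one read-only attachment of a remote shared memory area.
 * It is shared by all blocks imported from that area and detached again
 * once the last such block is gone. */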
/* Should be called locked */
static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
    pa_memimport_segment* seg;

    if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
        return NULL;

    seg = pa_xnew(pa_memimport_segment, 1);

    if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
        pa_xfree(seg);
        return NULL;
    }

    seg->import = i;
    seg->n_blocks = 0;

    pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
    return seg;
}

/* Should be called locked */
static void segment_detach(pa_memimport_segment *seg) {
    pa_assert(seg);

    pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
    pa_shm_free(&seg->memory);
    pa_xfree(seg);
}

/* Self-locked. Not multiple-caller safe */
void pa_memimport_free(pa_memimport *i) {
    pa_memexport *e;
    pa_memblock *b;

    pa_assert(i);

    pa_mutex_lock(i->mutex);

    while ((b = pa_hashmap_get_first(i->blocks)))
        memblock_replace_import(b);

    pa_assert(pa_hashmap_size(i->segments) == 0);

    pa_mutex_unlock(i->mutex);

    pa_mutex_lock(i->pool->mutex);

    /* If we've exported this block further we need to revoke that export */
    for (e = i->pool->exports; e; e = e->next)
        memexport_revoke_blocks(e, i);

    PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);

    pa_mutex_unlock(i->pool->mutex);

    pa_hashmap_free(i->blocks, NULL, NULL);
    pa_hashmap_free(i->segments, NULL, NULL);

    pa_mutex_free(i->mutex);

    pa_xfree(i);
}

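/* Create a local memblock for the block 'block_id' that another node
 * exported to us: attach the shared memory segment 'shm_id' if we haven't
 * yet, and point the new block at the given offset inside it. */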
/* Self-locked */
pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
    pa_memblock *b = NULL;
    pa_memimport_segment *seg;

    pa_assert(i);

    pa_mutex_lock(i->mutex);

    if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
        goto finish;

    if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
        if (!(seg = segment_attach(i, shm_id)))
            goto finish;

    if (offset+size > seg->memory.size)
        goto finish;

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);

    PA_REFCNT_INIT(b);
    b->pool = i->pool;
    b->type = PA_MEMBLOCK_IMPORTED;
    b->read_only = 1;
    pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
    b->length = size;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);
    b->per_type.imported.id = block_id;
    b->per_type.imported.segment = seg;

    pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);

    seg->n_blocks++;

finish:
    pa_mutex_unlock(i->mutex);

    if (b)
        stat_add(b);

    return b;
}

/* Self-locked */
int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
    pa_memblock *b;
    int ret = 0;
    pa_assert(i);

    pa_mutex_lock(i->mutex);

    if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id)))) {
        ret = -1;
        goto finish;
    }

    memblock_replace_import(b);

finish:
    pa_mutex_unlock(i->mutex);

    return ret;
}

/* For sending blocks to other nodes */
pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
    pa_memexport *e;

    pa_assert(p);
    pa_assert(cb);

    if (!p->memory.shared)
        return NULL;

    e = pa_xnew(pa_memexport, 1);
    e->mutex = pa_mutex_new(1);
    e->pool = p;
    PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
    PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
    e->n_init = 0;
    e->revoke_cb = cb;
    e->userdata = userdata;

    pa_mutex_lock(p->mutex);
    PA_LLIST_PREPEND(pa_memexport, p->exports, e);
    pa_mutex_unlock(p->mutex);
    return e;
}

void pa_memexport_free(pa_memexport *e) {
    pa_assert(e);

    pa_mutex_lock(e->mutex);
    while (e->used_slots)
        pa_memexport_process_release(e, e->used_slots - e->slots);
    pa_mutex_unlock(e->mutex);

    pa_mutex_lock(e->pool->mutex);
    PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
    pa_mutex_unlock(e->pool->mutex);

    pa_mutex_free(e->mutex);
    pa_xfree(e);
}

/* Self-locked */
int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
    pa_memblock *b;

    pa_assert(e);

    pa_mutex_lock(e->mutex);

    if (id >= e->n_init)
        goto fail;

    if (!e->slots[id].block)
        goto fail;

    b = e->slots[id].block;
    e->slots[id].block = NULL;

    PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
    PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);

    pa_mutex_unlock(e->mutex);

    /* pa_log("Processing release for %u", id); */

    pa_assert(pa_atomic_load(&e->pool->stat.n_exported) > 0);
    pa_assert(pa_atomic_load(&e->pool->stat.exported_size) >= (int) b->length);

    pa_atomic_dec(&e->pool->stat.n_exported);
    pa_atomic_sub(&e->pool->stat.exported_size, b->length);

    pa_memblock_unref(b);

    return 0;

fail:
    pa_mutex_unlock(e->mutex);

    return -1;
}

/* Self-locked */
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
    struct memexport_slot *slot, *next;
    pa_assert(e);
    pa_assert(i);

    pa_mutex_lock(e->mutex);

    for (slot = e->used_slots; slot; slot = next) {
        uint32_t idx;
        next = slot->next;

        if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
            slot->block->per_type.imported.segment->import != i)
            continue;

        idx = slot - e->slots;
        e->revoke_cb(e, idx, e->userdata);
        pa_memexport_process_release(e, idx);
    }

    pa_mutex_unlock(e->mutex);
}

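/* For exporting, a block has to live inside shared memory that the peer can
 * map: blocks that already sit in this pool or were themselves imported are
 * just referenced, anything else is copied into a fresh pool block. */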
/* No lock necessary */
static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
    pa_memblock *n;

    pa_assert(p);
    pa_assert(b);

    if (b->type == PA_MEMBLOCK_IMPORTED ||
        b->type == PA_MEMBLOCK_POOL ||
        b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
        pa_assert(b->pool == p);
        return pa_memblock_ref(b);
    }

    if (!(n = pa_memblock_new_pool(p, b->length)))
        return NULL;

    memcpy(pa_atomic_ptr_load(&n->data), pa_atomic_ptr_load(&b->data), b->length);
    return n;
}

/* Self-locked */
int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size) {
    pa_shm *memory;
    struct memexport_slot *slot;
    void *data;

    pa_assert(e);
    pa_assert(b);
    pa_assert(block_id);
    pa_assert(shm_id);
    pa_assert(offset);
    pa_assert(size);
    pa_assert(b->pool == e->pool);

    if (!(b = memblock_shared_copy(e->pool, b)))
        return -1;

    pa_mutex_lock(e->mutex);

    if (e->free_slots) {
        slot = e->free_slots;
        PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
    } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX)
        slot = &e->slots[e->n_init++];
    else {
        pa_mutex_unlock(e->mutex);
        pa_memblock_unref(b);
        return -1;
    }

    PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
    slot->block = b;
    *block_id = slot - e->slots;

    pa_mutex_unlock(e->mutex);
    /* pa_log("Got block id %u", *block_id); */

    data = pa_memblock_acquire(b);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_assert(b->per_type.imported.segment);
        memory = &b->per_type.imported.segment->memory;
    } else {
        pa_assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
        pa_assert(b->pool);
        memory = &b->pool->memory;
    }

    pa_assert(data >= memory->ptr);
    pa_assert((uint8_t*) data + b->length <= (uint8_t*) memory->ptr + memory->size);

    *shm_id = memory->id;
    *offset = (uint8_t*) data - (uint8_t*) memory->ptr;
    *size = b->length;

    pa_memblock_release(b);

    pa_atomic_inc(&e->pool->stat.n_exported);
    pa_atomic_add(&e->pool->stat.exported_size, b->length);

    return 0;
}