/* pulseaudio: src/pulsecore/memblock.c (fix a bad memory access when destructing pa_memimports) */
/* $Id$ */

/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>

#include <pulse/xmalloc.h>
#include <pulse/def.h>

#include <pulsecore/shm.h>
#include <pulsecore/log.h>
#include <pulsecore/hashmap.h>
#include <pulsecore/semaphore.h>
#include <pulsecore/macro.h>
#include <pulsecore/flist.h>

#include "memblock.h"

#define PA_MEMPOOL_SLOTS_MAX 128
#define PA_MEMPOOL_SLOT_SIZE (16*1024)

#define PA_MEMEXPORT_SLOTS_MAX 128

#define PA_MEMIMPORT_SLOTS_MAX 128
#define PA_MEMIMPORT_SEGMENTS_MAX 16

struct pa_memblock {
    PA_REFCNT_DECLARE; /* the reference counter */
    pa_mempool *pool;

    pa_memblock_type_t type;
    int read_only; /* boolean */

    pa_atomic_ptr_t data;
    size_t length;

    pa_atomic_t n_acquired;
    pa_atomic_t please_signal;

    union {
        struct {
            /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
            pa_free_cb_t free_cb;
        } user;

        struct {
            uint32_t id;
            pa_memimport_segment *segment;
        } imported;
    } per_type;
};

struct pa_memimport_segment {
    pa_memimport *import;
    pa_shm memory;
    unsigned n_blocks;
};

struct pa_memimport {
    pa_mutex *mutex;

    pa_mempool *pool;
    pa_hashmap *segments;
    pa_hashmap *blocks;

    /* Called whenever an imported memory block is no longer
     * needed. */
    pa_memimport_release_cb_t release_cb;
    void *userdata;

    PA_LLIST_FIELDS(pa_memimport);
};

struct memexport_slot {
    PA_LLIST_FIELDS(struct memexport_slot);
    pa_memblock *block;
};

struct pa_memexport {
    pa_mutex *mutex;
    pa_mempool *pool;

    struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];

    PA_LLIST_HEAD(struct memexport_slot, free_slots);
    PA_LLIST_HEAD(struct memexport_slot, used_slots);
    unsigned n_init;

    /* Called whenever a client from which we imported a memory block
       which we in turn exported to another client dies and we need to
       revoke the memory block accordingly */
    pa_memexport_revoke_cb_t revoke_cb;
    void *userdata;

    PA_LLIST_FIELDS(pa_memexport);
};

struct mempool_slot {
    PA_LLIST_FIELDS(struct mempool_slot);
    /* the actual data follows immediately hereafter */
};

struct pa_mempool {
    pa_semaphore *semaphore;
    pa_mutex *mutex;

    pa_shm memory;
    size_t block_size;
    unsigned n_blocks;

    pa_atomic_t n_init;

    PA_LLIST_HEAD(pa_memimport, imports);
    PA_LLIST_HEAD(pa_memexport, exports);

    /* A list of free slots that may be reused */
    pa_flist *free_slots;

    pa_mempool_stat stat;
};

static void segment_detach(pa_memimport_segment *seg);

PA_STATIC_FLIST_DECLARE(unused_memblocks, 0, pa_xfree);

/* No lock necessary */
static void stat_add(pa_memblock*b) {
    pa_assert(b);
    pa_assert(b->pool);

    pa_atomic_inc(&b->pool->stat.n_allocated);
    pa_atomic_add(&b->pool->stat.allocated_size, b->length);

    pa_atomic_inc(&b->pool->stat.n_accumulated);
    pa_atomic_add(&b->pool->stat.accumulated_size, b->length);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_atomic_inc(&b->pool->stat.n_imported);
        pa_atomic_add(&b->pool->stat.imported_size, b->length);
    }

    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
}

/* No lock necessary */
static void stat_remove(pa_memblock *b) {
    pa_assert(b);
    pa_assert(b->pool);

    pa_assert(pa_atomic_load(&b->pool->stat.n_allocated) > 0);
    pa_assert(pa_atomic_load(&b->pool->stat.allocated_size) >= (int) b->length);

    pa_atomic_dec(&b->pool->stat.n_allocated);
    pa_atomic_sub(&b->pool->stat.allocated_size, b->length);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
        pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);

        pa_atomic_dec(&b->pool->stat.n_imported);
        pa_atomic_sub(&b->pool->stat.imported_size, b->length);
    }

    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
}

static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);

/* No lock necessary */
pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(length > 0);

    if (!(b = pa_memblock_new_pool(p, length)))
        b = memblock_new_appended(p, length);

    return b;
}

/* No lock necessary */
static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(length > 0);

    b = pa_xmalloc(PA_ALIGN(sizeof(pa_memblock)) + length);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_APPENDED;
    b->read_only = 0;
    pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
    struct mempool_slot *slot;
    pa_assert(p);

    if (!(slot = pa_flist_pop(p->free_slots))) {
        int idx;

        /* The free list was empty, we have to allocate a new entry */

        if ((unsigned) (idx = pa_atomic_inc(&p->n_init)) >= p->n_blocks)
            pa_atomic_dec(&p->n_init);
        else
            slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * idx));

        if (!slot) {
            pa_log_debug("Pool full");
            pa_atomic_inc(&p->stat.n_pool_full);
            return NULL;
        }
    }

    return slot;
}

/* No lock necessary */
static void* mempool_slot_data(struct mempool_slot *slot) {
    pa_assert(slot);

    return (uint8_t*) slot + sizeof(struct mempool_slot);
}

/* No lock necessary */
static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
    pa_assert(p);

    pa_assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
    pa_assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);

    return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
}

/* No lock necessary */
static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
    unsigned idx;

    if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
        return NULL;

    return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
}

/* No lock necessary */
pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
    pa_memblock *b = NULL;
    struct mempool_slot *slot;

    pa_assert(p);
    pa_assert(length > 0);

    if (p->block_size - sizeof(struct mempool_slot) >= sizeof(pa_memblock) + length) {

        if (!(slot = mempool_allocate_slot(p)))
            return NULL;

        b = mempool_slot_data(slot);
        b->type = PA_MEMBLOCK_POOL;
        pa_atomic_ptr_store(&b->data, (uint8_t*) b + sizeof(pa_memblock));

    } else if (p->block_size - sizeof(struct mempool_slot) >= length) {

        if (!(slot = mempool_allocate_slot(p)))
            return NULL;

        if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
            b = pa_xnew(pa_memblock, 1);

        b->type = PA_MEMBLOCK_POOL_EXTERNAL;
        pa_atomic_ptr_store(&b->data, mempool_slot_data(slot));

    } else {
        pa_log_debug("Memory block too large for pool: %lu > %lu", (unsigned long) length, (unsigned long) (p->block_size - sizeof(struct mempool_slot)));
        pa_atomic_inc(&p->stat.n_too_large_for_pool);
        return NULL;
    }

    PA_REFCNT_INIT(b);
    b->pool = p;
    b->read_only = 0;
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(d);
    pa_assert(length > 0);

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_FIXED;
    b->read_only = read_only;
    pa_atomic_ptr_store(&b->data, d);
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    stat_add(b);
    return b;
}

/* No lock necessary */
pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
    pa_memblock *b;

    pa_assert(p);
    pa_assert(d);
    pa_assert(length > 0);
    pa_assert(free_cb);

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);
    PA_REFCNT_INIT(b);
    b->pool = p;
    b->type = PA_MEMBLOCK_USER;
    b->read_only = read_only;
    pa_atomic_ptr_store(&b->data, d);
    b->length = length;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);

    b->per_type.user.free_cb = free_cb;

    stat_add(b);
    return b;
}
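
/* A minimal usage sketch (kept out of the build): wrapping a caller-owned
 * heap buffer in a PA_MEMBLOCK_USER block. The example_* helper name is
 * illustrative only; error handling is omitted. Once the last reference is
 * dropped, memblock_free() invokes the free callback on the data pointer,
 * so the caller must not free the buffer itself. */
#if 0
static void example_user_block(pa_mempool *pool) {
    size_t n = 4096;
    void *buf = pa_xmalloc(n);

    /* The block takes over the buffer; pa_xfree() is called on it when
     * the block is freed. */
    pa_memblock *b = pa_memblock_new_user(pool, buf, n, pa_xfree, 0);

    /* ... hand the block to a memchunk, a sink input, etc. ... */

    pa_memblock_unref(b); /* last unref frees buf via pa_xfree */
}
#endif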

/* No lock necessary */
int pa_memblock_is_read_only(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->read_only && PA_REFCNT_VALUE(b) == 1;
}

/* No lock necessary */
void* pa_memblock_acquire(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    pa_atomic_inc(&b->n_acquired);

    return pa_atomic_ptr_load(&b->data);
}

/* No lock necessary; in corner cases it locks on its own */
void pa_memblock_release(pa_memblock *b) {
    int r;
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    r = pa_atomic_dec(&b->n_acquired);
    pa_assert(r >= 1);

    /* Signal a waiting thread that this memblock is no longer used */
    if (r == 1 && pa_atomic_load(&b->please_signal))
        pa_semaphore_post(b->pool->semaphore);
}
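
/* A minimal sketch (kept out of the build) of the access protocol defined by
 * the two functions above: every read or write of the payload is bracketed by
 * pa_memblock_acquire()/pa_memblock_release(), so that memblock_wait() can
 * detect concurrent users before a block is migrated or freed. The example_*
 * helper name is illustrative only and assumes a writable block. */
#if 0
static void example_fill_with_silence(pa_memblock *b) {
    void *p = pa_memblock_acquire(b);
    memset(p, 0, pa_memblock_get_length(b));
    pa_memblock_release(b);
}
#endif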

size_t pa_memblock_get_length(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->length;
}

pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    return b->pool;
}

/* No lock necessary */
pa_memblock* pa_memblock_ref(pa_memblock*b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    PA_REFCNT_INC(b);
    return b;
}

static void memblock_free(pa_memblock *b) {
    pa_assert(b);

    pa_assert(pa_atomic_load(&b->n_acquired) == 0);

    stat_remove(b);

    switch (b->type) {
        case PA_MEMBLOCK_USER:
            pa_assert(b->per_type.user.free_cb);
            b->per_type.user.free_cb(pa_atomic_ptr_load(&b->data));

            /* Fall through */

        case PA_MEMBLOCK_FIXED:
        case PA_MEMBLOCK_APPENDED:
            if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                pa_xfree(b);

            break;

        case PA_MEMBLOCK_IMPORTED: {
            pa_memimport_segment *segment;
            pa_memimport *import;

            /* FIXME! This should be implemented lock-free */

            segment = b->per_type.imported.segment;
            pa_assert(segment);
            import = segment->import;
            pa_assert(import);

            pa_mutex_lock(import->mutex);
            pa_hashmap_remove(import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
            if (-- segment->n_blocks <= 0)
                segment_detach(segment);

            pa_mutex_unlock(import->mutex);

            import->release_cb(import, b->per_type.imported.id, import->userdata);

            if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                pa_xfree(b);
            break;
        }

        case PA_MEMBLOCK_POOL_EXTERNAL:
        case PA_MEMBLOCK_POOL: {
            struct mempool_slot *slot;
            int call_free;

            slot = mempool_slot_by_ptr(b->pool, pa_atomic_ptr_load(&b->data));
            pa_assert(slot);

            call_free = b->type == PA_MEMBLOCK_POOL_EXTERNAL;

            /* The free list dimensions should easily allow all slots
             * to fit in, hence try harder if pushing this slot into
             * the free list fails */
            while (pa_flist_push(b->pool->free_slots, slot) < 0)
                ;

            if (call_free)
                if (pa_flist_push(PA_STATIC_FLIST_GET(unused_memblocks), b) < 0)
                    pa_xfree(b);

            break;
        }

        case PA_MEMBLOCK_TYPE_MAX:
        default:
            pa_assert_not_reached();
    }
}

/* No lock necessary */
void pa_memblock_unref(pa_memblock*b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);

    if (PA_REFCNT_DEC(b) > 0)
        return;

    memblock_free(b);
}

/* Self-locked */
static void memblock_wait(pa_memblock *b) {
    pa_assert(b);

    if (pa_atomic_load(&b->n_acquired) > 0) {
        /* We need to wait until all threads gave up access to the
         * memory block before we can go on. Unfortunately this means
         * that we have to lock and wait here. Sniff! */

        pa_atomic_inc(&b->please_signal);

        while (pa_atomic_load(&b->n_acquired) > 0)
            pa_semaphore_wait(b->pool->semaphore);

        pa_atomic_dec(&b->please_signal);
    }
}

/* No lock necessary. This function is not multiple-caller safe! */
static void memblock_make_local(pa_memblock *b) {
    pa_assert(b);

    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);

    if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) {
        struct mempool_slot *slot;

        if ((slot = mempool_allocate_slot(b->pool))) {
            void *new_data;
            /* We can move it into a local pool, perfect! */

            new_data = mempool_slot_data(slot);
            memcpy(new_data, pa_atomic_ptr_load(&b->data), b->length);
            pa_atomic_ptr_store(&b->data, new_data);

            b->type = PA_MEMBLOCK_POOL_EXTERNAL;
            b->read_only = 0;

            goto finish;
        }
    }

    /* Hmm, not enough space in the pool, so let's allocate the memory with malloc() */
    b->per_type.user.free_cb = pa_xfree;
    pa_atomic_ptr_store(&b->data, pa_xmemdup(pa_atomic_ptr_load(&b->data), b->length));

    b->type = PA_MEMBLOCK_USER;
    b->read_only = 0;

finish:
    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
    memblock_wait(b);
}

/* No lock necessary. This function is not multiple-caller safe */
void pa_memblock_unref_fixed(pa_memblock *b) {
    pa_assert(b);
    pa_assert(PA_REFCNT_VALUE(b) > 0);
    pa_assert(b->type == PA_MEMBLOCK_FIXED);

    if (PA_REFCNT_VALUE(b) > 1)
        memblock_make_local(b);

    pa_memblock_unref(b);
}
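
/* A minimal sketch (kept out of the build) of the "fixed" block contract: the
 * caller keeps ownership of the underlying memory and must therefore drop its
 * own reference with pa_memblock_unref_fixed(), which copies the data away
 * (into the pool or onto the heap) if other references are still alive. The
 * example_* helper name is illustrative only. */
#if 0
static void example_fixed_block(pa_mempool *pool) {
    static uint8_t scratch[1024];

    pa_memblock *b = pa_memblock_new_fixed(pool, scratch, sizeof(scratch), 1);

    /* ... pass b around; other parties may take additional references ... */

    /* scratch may be reused after this returns, even if b is still
     * referenced elsewhere, because the data is made local first. */
    pa_memblock_unref_fixed(b);
}
#endif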

/* Self-locked. This function is not multiple-caller safe */
static void memblock_replace_import(pa_memblock *b) {
    pa_memimport_segment *seg;

    pa_assert(b);
    pa_assert(b->type == PA_MEMBLOCK_IMPORTED);

    pa_assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
    pa_assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
    pa_atomic_dec(&b->pool->stat.n_imported);
    pa_atomic_sub(&b->pool->stat.imported_size, b->length);

    seg = b->per_type.imported.segment;
    pa_assert(seg);
    pa_assert(seg->import);

    pa_mutex_lock(seg->import->mutex);

    pa_hashmap_remove(
            seg->import->blocks,
            PA_UINT32_TO_PTR(b->per_type.imported.id));

    memblock_make_local(b);

    if (-- seg->n_blocks <= 0) {
        pa_mutex_unlock(seg->import->mutex);
        segment_detach(seg);
    } else
        pa_mutex_unlock(seg->import->mutex);
}

pa_mempool* pa_mempool_new(int shared) {
    size_t ps;
    pa_mempool *p;

    p = pa_xnew(pa_mempool, 1);

    p->mutex = pa_mutex_new(1);
    p->semaphore = pa_semaphore_new(0);

#ifdef HAVE_SYSCONF
    ps = (size_t) sysconf(_SC_PAGESIZE);
#elif defined(PAGE_SIZE)
    ps = (size_t) PAGE_SIZE;
#else
    ps = 4096; /* Let's hope it's like x86. */
#endif

    p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps;

    if (p->block_size < ps)
        p->block_size = ps;

    p->n_blocks = PA_MEMPOOL_SLOTS_MAX;

    pa_assert(p->block_size > sizeof(struct mempool_slot));

    if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
        pa_xfree(p);
        return NULL;
    }

    memset(&p->stat, 0, sizeof(p->stat));
    pa_atomic_store(&p->n_init, 0);

    PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
    PA_LLIST_HEAD_INIT(pa_memexport, p->exports);

    p->free_slots = pa_flist_new(p->n_blocks*2);

    return p;
}

void pa_mempool_free(pa_mempool *p) {
    pa_assert(p);

    pa_mutex_lock(p->mutex);

    while (p->imports)
        pa_memimport_free(p->imports);

    while (p->exports)
        pa_memexport_free(p->exports);

    pa_mutex_unlock(p->mutex);

    pa_flist_free(p->free_slots, NULL);

    if (pa_atomic_load(&p->stat.n_allocated) > 0) {
        /* raise(SIGTRAP); */
        pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed! %u remain.", pa_atomic_load(&p->stat.n_allocated));
    }

    pa_shm_free(&p->memory);

    pa_mutex_free(p->mutex);
    pa_semaphore_free(p->semaphore);

    pa_xfree(p);
}

/* No lock necessary */
const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
    pa_assert(p);

    return &p->stat;
}

/* No lock necessary */
void pa_mempool_vacuum(pa_mempool *p) {
    struct mempool_slot *slot;
    pa_flist *list;

    pa_assert(p);

    list = pa_flist_new(p->n_blocks*2);

    while ((slot = pa_flist_pop(p->free_slots)))
        while (pa_flist_push(list, slot) < 0)
            ;

    while ((slot = pa_flist_pop(list))) {
        pa_shm_punch(&p->memory,
                     (uint8_t*) slot - (uint8_t*) p->memory.ptr + sizeof(struct mempool_slot),
                     p->block_size - sizeof(struct mempool_slot));

        while (pa_flist_push(p->free_slots, slot))
            ;
    }

    pa_flist_free(list, NULL);
}

/* No lock necessary */
int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
    pa_assert(p);

    if (!p->memory.shared)
        return -1;

    *id = p->memory.id;

    return 0;
}

/* No lock necessary */
int pa_mempool_is_shared(pa_mempool *p) {
    pa_assert(p);

    return !!p->memory.shared;
}
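
/* A minimal sketch (kept out of the build): creating a shared pool and
 * querying the SHM segment id that a server would announce to clients so they
 * can attach the segment read-only on their side. The example_* helper name
 * is illustrative only. */
#if 0
static void example_shared_pool(void) {
    uint32_t shm_id;
    pa_mempool *pool = pa_mempool_new(1); /* 1 = back the pool with shared memory */

    if (pool && pa_mempool_is_shared(pool) && pa_mempool_get_shm_id(pool, &shm_id) == 0) {
        /* shm_id identifies the segment for pa_memimport_get() on the peer */
    }

    if (pool)
        pa_mempool_free(pool);
}
#endif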

/* For receiving blocks from other nodes */
pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
    pa_memimport *i;

    pa_assert(p);
    pa_assert(cb);

    i = pa_xnew(pa_memimport, 1);
    i->mutex = pa_mutex_new(1);
    i->pool = p;
    i->segments = pa_hashmap_new(NULL, NULL);
    i->blocks = pa_hashmap_new(NULL, NULL);
    i->release_cb = cb;
    i->userdata = userdata;

    pa_mutex_lock(p->mutex);
    PA_LLIST_PREPEND(pa_memimport, p->imports, i);
    pa_mutex_unlock(p->mutex);

    return i;
}

static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);

/* Should be called locked */
static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
    pa_memimport_segment* seg;

    if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
        return NULL;

    seg = pa_xnew(pa_memimport_segment, 1);

    if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
        pa_xfree(seg);
        return NULL;
    }

    seg->import = i;
    seg->n_blocks = 0;

    pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
    return seg;
}

/* Should be called locked */
static void segment_detach(pa_memimport_segment *seg) {
    pa_assert(seg);

    pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
    pa_shm_free(&seg->memory);
    pa_xfree(seg);
}

/* Self-locked. Not multiple-caller safe */
void pa_memimport_free(pa_memimport *i) {
    pa_memexport *e;
    pa_memblock *b;

    pa_assert(i);

    pa_mutex_lock(i->mutex);

    while ((b = pa_hashmap_get_first(i->blocks)))
        memblock_replace_import(b);

    pa_assert(pa_hashmap_size(i->segments) == 0);

    pa_mutex_unlock(i->mutex);

    pa_mutex_lock(i->pool->mutex);

    /* If we've exported this block further we need to revoke that export */
    for (e = i->pool->exports; e; e = e->next)
        memexport_revoke_blocks(e, i);

    PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);

    pa_mutex_unlock(i->pool->mutex);

    pa_hashmap_free(i->blocks, NULL, NULL);
    pa_hashmap_free(i->segments, NULL, NULL);

    pa_mutex_free(i->mutex);

    pa_xfree(i);
}

/* Self-locked */
pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
    pa_memblock *b = NULL;
    pa_memimport_segment *seg;

    pa_assert(i);

    pa_mutex_lock(i->mutex);

    if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
        goto finish;

    if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
        if (!(seg = segment_attach(i, shm_id)))
            goto finish;

    if (offset+size > seg->memory.size)
        goto finish;

    if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks))))
        b = pa_xnew(pa_memblock, 1);

    PA_REFCNT_INIT(b);
    b->pool = i->pool;
    b->type = PA_MEMBLOCK_IMPORTED;
    b->read_only = 1;
    pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
    b->length = size;
    pa_atomic_store(&b->n_acquired, 0);
    pa_atomic_store(&b->please_signal, 0);
    b->per_type.imported.id = block_id;
    b->per_type.imported.segment = seg;

    pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);

    seg->n_blocks++;

finish:
    pa_mutex_unlock(i->mutex);

    if (b)
        stat_add(b);

    return b;
}
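
/* A minimal sketch (kept out of the build) of the receiving side: when a peer
 * announces a block as (block_id, shm_id, offset, size), the import object
 * attaches the SHM segment on demand and wraps the region in a read-only
 * PA_MEMBLOCK_IMPORTED block. The example_* names are illustrative only, and
 * the release callback prototype is inferred from the call made in
 * memblock_free() above; the authoritative typedef lives in memblock.h. */
#if 0
static void example_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
    /* Tell the peer that block_id is no longer referenced locally, e.g.
     * by sending a "release" message over the protocol connection. */
}

static pa_memblock *example_import(pa_mempool *pool,
                                   uint32_t block_id, uint32_t shm_id,
                                   size_t offset, size_t size) {
    pa_memimport *imp = pa_memimport_new(pool, example_release_cb, NULL);

    /* Returns NULL if the segment cannot be attached, the slot limit is
     * reached, or the described region does not fit into the segment. */
    return pa_memimport_get(imp, block_id, shm_id, offset, size);
}
#endif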

/* Self-locked */
int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
    pa_memblock *b;
    int ret = 0;
    pa_assert(i);

    pa_mutex_lock(i->mutex);

    if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id)))) {
        /* Unknown block id: fail, but make sure not to return with
         * i->mutex still held */
        ret = -1;
        goto finish;
    }

    memblock_replace_import(b);

finish:
    pa_mutex_unlock(i->mutex);

    return ret;
}

/* For sending blocks to other nodes */
pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
    pa_memexport *e;

    pa_assert(p);
    pa_assert(cb);

    if (!p->memory.shared)
        return NULL;

    e = pa_xnew(pa_memexport, 1);
    e->mutex = pa_mutex_new(1);
    e->pool = p;
    PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
    PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
    e->n_init = 0;
    e->revoke_cb = cb;
    e->userdata = userdata;

    pa_mutex_lock(p->mutex);
    PA_LLIST_PREPEND(pa_memexport, p->exports, e);
    pa_mutex_unlock(p->mutex);
    return e;
}

void pa_memexport_free(pa_memexport *e) {
    pa_assert(e);

    pa_mutex_lock(e->mutex);
    while (e->used_slots)
        pa_memexport_process_release(e, e->used_slots - e->slots);
    pa_mutex_unlock(e->mutex);

    pa_mutex_lock(e->pool->mutex);
    PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
    pa_mutex_unlock(e->pool->mutex);

    pa_mutex_free(e->mutex);
    pa_xfree(e);
}

/* Self-locked */
int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
    pa_memblock *b;

    pa_assert(e);

    pa_mutex_lock(e->mutex);

    if (id >= e->n_init)
        goto fail;

    if (!e->slots[id].block)
        goto fail;

    b = e->slots[id].block;
    e->slots[id].block = NULL;

    PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
    PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);

    pa_mutex_unlock(e->mutex);

    /* pa_log("Processing release for %u", id); */

    pa_assert(pa_atomic_load(&e->pool->stat.n_exported) > 0);
    pa_assert(pa_atomic_load(&e->pool->stat.exported_size) >= (int) b->length);

    pa_atomic_dec(&e->pool->stat.n_exported);
    pa_atomic_sub(&e->pool->stat.exported_size, b->length);

    pa_memblock_unref(b);

    return 0;

fail:
    pa_mutex_unlock(e->mutex);

    return -1;
}

/* Self-locked */
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
    struct memexport_slot *slot, *next;
    pa_assert(e);
    pa_assert(i);

    pa_mutex_lock(e->mutex);

    for (slot = e->used_slots; slot; slot = next) {
        uint32_t idx;
        next = slot->next;

        if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
            slot->block->per_type.imported.segment->import != i)
            continue;

        idx = slot - e->slots;
        e->revoke_cb(e, idx, e->userdata);
        pa_memexport_process_release(e, idx);
    }

    pa_mutex_unlock(e->mutex);
}

/* No lock necessary */
static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
    pa_memblock *n;

    pa_assert(p);
    pa_assert(b);

    if (b->type == PA_MEMBLOCK_IMPORTED ||
        b->type == PA_MEMBLOCK_POOL ||
        b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
        pa_assert(b->pool == p);
        return pa_memblock_ref(b);
    }

    if (!(n = pa_memblock_new_pool(p, b->length)))
        return NULL;

    memcpy(pa_atomic_ptr_load(&n->data), pa_atomic_ptr_load(&b->data), b->length);
    return n;
}

/* Self-locked */
int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size) {
    pa_shm *memory;
    struct memexport_slot *slot;
    void *data;

    pa_assert(e);
    pa_assert(b);
    pa_assert(block_id);
    pa_assert(shm_id);
    pa_assert(offset);
    pa_assert(size);
    pa_assert(b->pool == e->pool);

    if (!(b = memblock_shared_copy(e->pool, b)))
        return -1;

    pa_mutex_lock(e->mutex);

    if (e->free_slots) {
        slot = e->free_slots;
        PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
    } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX)
        slot = &e->slots[e->n_init++];
    else {
        pa_mutex_unlock(e->mutex);
        pa_memblock_unref(b);
        return -1;
    }

    PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
    slot->block = b;
    *block_id = slot - e->slots;

    pa_mutex_unlock(e->mutex);
    /* pa_log("Got block id %u", *block_id); */

    data = pa_memblock_acquire(b);

    if (b->type == PA_MEMBLOCK_IMPORTED) {
        pa_assert(b->per_type.imported.segment);
        memory = &b->per_type.imported.segment->memory;
    } else {
        pa_assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
        pa_assert(b->pool);
        memory = &b->pool->memory;
    }

    pa_assert(data >= memory->ptr);
    pa_assert((uint8_t*) data + b->length <= (uint8_t*) memory->ptr + memory->size);

    *shm_id = memory->id;
    *offset = (uint8_t*) data - (uint8_t*) memory->ptr;
    *size = b->length;

    pa_memblock_release(b);

    pa_atomic_inc(&e->pool->stat.n_exported);
    pa_atomic_add(&e->pool->stat.exported_size, b->length);

    return 0;
}
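
/* A minimal sketch (kept out of the build) of the sending side: exporting a
 * block yields the (block_id, shm_id, offset, size) quadruple that is
 * transmitted to the peer, which reconstructs the block with
 * pa_memimport_get(). The example_* names are illustrative only, and the
 * revoke callback prototype is inferred from the call made in
 * memexport_revoke_blocks() above; the authoritative typedef lives in
 * memblock.h. */
#if 0
static void example_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
    /* Tell the peer that block_id has become invalid and must no longer
     * be accessed. */
}

static int example_export(pa_mempool *pool, pa_memblock *b) {
    uint32_t block_id, shm_id;
    size_t offset, size;
    pa_memexport *exp = pa_memexport_new(pool, example_revoke_cb, NULL);

    if (!exp) /* pool is not backed by shared memory */
        return -1;

    if (pa_memexport_put(exp, b, &block_id, &shm_id, &offset, &size) < 0)
        return -1;

    /* ... send (block_id, shm_id, offset, size) to the peer ... */
    return 0;
}
#endif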