pulseaudio: src/pulsecore/memblock.c
Fix concurrency bug when turning memblock into a local memblock
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <assert.h>
32 #include <string.h>
33 #include <unistd.h>
34
35 #include <pulse/xmalloc.h>
36 #include <pulse/def.h>
37
38 #include <pulsecore/shm.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/hashmap.h>
41 #include <pulsecore/semaphore.h>
42 #include <pulsecore/macro.h>
43 #include <pulsecore/flist.h>
44
45 #include "memblock.h"
46
47 #define PA_MEMPOOL_SLOTS_MAX 128
48 #define PA_MEMPOOL_SLOT_SIZE (16*1024)
49
50 #define PA_MEMEXPORT_SLOTS_MAX 128
51
52 #define PA_MEMIMPORT_SLOTS_MAX 128
53 #define PA_MEMIMPORT_SEGMENTS_MAX 16
54
55 struct pa_memblock {
56 PA_REFCNT_DECLARE; /* the reference counter */
57 pa_mempool *pool;
58
59 pa_memblock_type_t type;
60 int read_only; /* boolean */
61
62 pa_atomic_ptr_t data;
63 size_t length;
64
65 pa_atomic_t n_acquired;
66 pa_atomic_t please_signal;
67
68 union {
69 struct {
70 /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
71 pa_free_cb_t free_cb;
72 } user;
73
74 struct {
75 uint32_t id;
76 pa_memimport_segment *segment;
77 } imported;
78 } per_type;
79 };
80
81 struct pa_memimport_segment {
82 pa_memimport *import;
83 pa_shm memory;
84 unsigned n_blocks;
85 };
86
87 struct pa_memimport {
88 pa_mutex *mutex;
89
90 pa_mempool *pool;
91 pa_hashmap *segments;
92 pa_hashmap *blocks;
93
94 /* Called whenever an imported memory block is no longer
95 * needed. */
96 pa_memimport_release_cb_t release_cb;
97 void *userdata;
98
99 PA_LLIST_FIELDS(pa_memimport);
100 };
101
102 struct memexport_slot {
103 PA_LLIST_FIELDS(struct memexport_slot);
104 pa_memblock *block;
105 };
106
107 struct pa_memexport {
108 pa_mutex *mutex;
109 pa_mempool *pool;
110
111 struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];
112
113 PA_LLIST_HEAD(struct memexport_slot, free_slots);
114 PA_LLIST_HEAD(struct memexport_slot, used_slots);
115 unsigned n_init;
116
117 /* Called whenever a client from which we imported a memory block
118 dies while we have in turn exported that block to another client,
119 so that we need to revoke the block from the latter accordingly */
120 pa_memexport_revoke_cb_t revoke_cb;
121 void *userdata;
122
123 PA_LLIST_FIELDS(pa_memexport);
124 };
125
126 struct mempool_slot {
127 PA_LLIST_FIELDS(struct mempool_slot);
128 /* the actual data follows immediately hereafter */
129 };
130
131 struct pa_mempool {
132 pa_semaphore *semaphore;
133 pa_mutex *mutex;
134
135 pa_shm memory;
136 size_t block_size;
137 unsigned n_blocks;
138
139 pa_atomic_t n_init;
140
141 PA_LLIST_HEAD(pa_memimport, imports);
142 PA_LLIST_HEAD(pa_memexport, exports);
143
144 /* A list of free slots that may be reused */
145 pa_flist *free_slots;
146
147 pa_mempool_stat stat;
148 };
149
150 static void segment_detach(pa_memimport_segment *seg);
151
152 /* No lock necessary */
153 static void stat_add(pa_memblock*b) {
154 assert(b);
155 assert(b->pool);
156
157 pa_atomic_inc(&b->pool->stat.n_allocated);
158 pa_atomic_add(&b->pool->stat.allocated_size, b->length);
159
160 pa_atomic_inc(&b->pool->stat.n_accumulated);
161 pa_atomic_add(&b->pool->stat.accumulated_size, b->length);
162
163 if (b->type == PA_MEMBLOCK_IMPORTED) {
164 pa_atomic_inc(&b->pool->stat.n_imported);
165 pa_atomic_add(&b->pool->stat.imported_size, b->length);
166 }
167
168 pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
169 pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
170 }
171
172 /* No lock necessary */
173 static void stat_remove(pa_memblock *b) {
174 assert(b);
175 assert(b->pool);
176
177 assert(pa_atomic_load(&b->pool->stat.n_allocated) > 0);
178 assert(pa_atomic_load(&b->pool->stat.allocated_size) >= (int) b->length);
179
180 pa_atomic_dec(&b->pool->stat.n_allocated);
181 pa_atomic_sub(&b->pool->stat.allocated_size, b->length);
182
183 if (b->type == PA_MEMBLOCK_IMPORTED) {
184 assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
185 assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
186
187 pa_atomic_dec(&b->pool->stat.n_imported);
188 pa_atomic_sub(&b->pool->stat.imported_size, b->length);
189 }
190
191 pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
192 }
193
194 static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);
195
196 /* No lock necessary */
197 pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
198 pa_memblock *b;
199
200 assert(p);
201 assert(length > 0);
202
203 if (!(b = pa_memblock_new_pool(p, length)))
204 b = memblock_new_appended(p, length);
205
206 return b;
207 }
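
/* Illustrative usage sketch (not part of the original file, guarded by
 * #if 0): how a caller might allocate a block from a pool and copy data
 * into it. Every access to the data pointer is bracketed by acquire()
 * and release() so that memblock_wait() below can detect concurrent
 * users. "pool", "src" and "len" are assumed to come from the caller. */
#if 0
static pa_memblock *example_copy_into_block(pa_mempool *pool, const void *src, size_t len) {
    pa_memblock *b = pa_memblock_new(pool, len);  /* pool slot, or appended fallback */
    void *d = pa_memblock_acquire(b);             /* pin the data pointer */
    memcpy(d, src, len);
    pa_memblock_release(b);                       /* unpin; may wake a waiter */
    return b;                                     /* caller owns one reference */
}
#endif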
208
209 /* No lock necessary */
210 static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
211 pa_memblock *b;
212
213 assert(p);
214 assert(length > 0);
215
216 b = pa_xmalloc(PA_ALIGN(sizeof(pa_memblock)) + length);
217 PA_REFCNT_INIT(b);
218 b->pool = p;
219 b->type = PA_MEMBLOCK_APPENDED;
220 b->read_only = 0;
221 pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));
222 b->length = length;
223 pa_atomic_store(&b->n_acquired, 0);
224 pa_atomic_store(&b->please_signal, 0);
225
226 stat_add(b);
227 return b;
228 }
229
230 /* No lock necessary */
231 static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
232 struct mempool_slot *slot;
233 assert(p);
234
235 if (!(slot = pa_flist_pop(p->free_slots))) {
236 int idx;
237
238 /* The free list was empty, we have to allocate a new entry */
239
240 if ((unsigned) (idx = pa_atomic_inc(&p->n_init)) >= p->n_blocks)
241 pa_atomic_dec(&p->n_init);
242 else
243 slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * idx));
244
245 if (!slot) {
246 pa_log_debug("Pool full");
247 pa_atomic_inc(&p->stat.n_pool_full);
248 return NULL;
249 }
250 }
251
252 return slot;
253 }
254
255 /* No lock necessary */
256 static void* mempool_slot_data(struct mempool_slot *slot) {
257 assert(slot);
258
259 return (uint8_t*) slot + sizeof(struct mempool_slot);
260 }
261
262 /* No lock necessary */
263 static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
264 assert(p);
265
266 assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
267 assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);
268
269 return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
270 }
271
272 /* No lock necessary */
273 static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
274 unsigned idx;
275
276 if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
277 return NULL;
278
279 return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
280 }
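
/* Worked example of the slot arithmetic above (illustrative, not part of
 * the original file): with the default 16 kB slot size (PA_MEMPOOL_SLOT_SIZE
 * rounded to the page size), a pointer 40000 bytes into p->memory.ptr lies
 * in slot idx = 40000 / 16384 = 2, and mempool_slot_by_ptr() maps it back
 * to the slot header at p->memory.ptr + 2 * 16384 = p->memory.ptr + 32768. */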
281
282 /* No lock necessary */
283 pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
284 pa_memblock *b = NULL;
285 struct mempool_slot *slot;
286
287 assert(p);
288 assert(length > 0);
289
290 if (p->block_size - sizeof(struct mempool_slot) >= sizeof(pa_memblock) + length) {
291
292 if (!(slot = mempool_allocate_slot(p)))
293 return NULL;
294
295 b = mempool_slot_data(slot);
296 b->type = PA_MEMBLOCK_POOL;
297 pa_atomic_ptr_store(&b->data, (uint8_t*) b + sizeof(pa_memblock));
298
299 } else if (p->block_size - sizeof(struct mempool_slot) >= length) {
300
301 if (!(slot = mempool_allocate_slot(p)))
302 return NULL;
303
304 b = pa_xnew(pa_memblock, 1);
305 b->type = PA_MEMBLOCK_POOL_EXTERNAL;
306 pa_atomic_ptr_store(&b->data, mempool_slot_data(slot));
307
308 } else {
309 pa_log_debug("Memory block too large for pool: %u > %u", length, p->block_size - sizeof(struct mempool_slot));
310 pa_atomic_inc(&p->stat.n_too_large_for_pool);
311 return NULL;
312 }
313
314 PA_REFCNT_INIT(b);
315 b->pool = p;
316 b->read_only = 0;
317 b->length = length;
318 pa_atomic_store(&b->n_acquired, 0);
319 pa_atomic_store(&b->please_signal, 0);
320
321 stat_add(b);
322 return b;
323 }
324
325 /* No lock necessary */
326 pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
327 pa_memblock *b;
328
329 assert(p);
330 assert(d);
331 assert(length > 0);
332
333 b = pa_xnew(pa_memblock, 1);
334 PA_REFCNT_INIT(b);
335 b->pool = p;
336 b->type = PA_MEMBLOCK_FIXED;
337 b->read_only = read_only;
338 pa_atomic_ptr_store(&b->data, d);
339 b->length = length;
340 pa_atomic_store(&b->n_acquired, 0);
341 pa_atomic_store(&b->please_signal, 0);
342
343 stat_add(b);
344 return b;
345 }
346
347 /* No lock necessary */
348 pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
349 pa_memblock *b;
350
351 assert(p);
352 assert(d);
353 assert(length > 0);
354 assert(free_cb);
355
356 b = pa_xnew(pa_memblock, 1);
357 PA_REFCNT_INIT(b);
358 b->pool = p;
359 b->type = PA_MEMBLOCK_USER;
360 b->read_only = read_only;
361 pa_atomic_ptr_store(&b->data, d);
362 b->length = length;
363 pa_atomic_store(&b->n_acquired, 0);
364 pa_atomic_store(&b->please_signal, 0);
365
366 b->per_type.user.free_cb = free_cb;
367
368 stat_add(b);
369 return b;
370 }
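
/* Illustrative sketch (not part of the original file, guarded by #if 0):
 * wrapping caller-owned memory in a user memblock. The buffer is handed
 * back through free_cb (here pa_xfree) once the last reference is gone.
 * "pool" and "len" are assumed to be supplied by the caller. */
#if 0
static pa_memblock *example_wrap_buffer(pa_mempool *pool, size_t len) {
    void *buf = pa_xmalloc(len);

    /* read_only = 0: consumers may write through the acquired pointer */
    return pa_memblock_new_user(pool, buf, len, pa_xfree, 0);
}
#endif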
371
372 /* No lock necessary */
373 int pa_memblock_is_read_only(pa_memblock *b) {
374 assert(b);
375 assert(PA_REFCNT_VALUE(b) > 0);
376
377 return b->read_only || PA_REFCNT_VALUE(b) > 1; /* shared blocks must not be written to either */
378 }
379
380 /* No lock necessary */
381 void* pa_memblock_acquire(pa_memblock *b) {
382 assert(b);
383 assert(PA_REFCNT_VALUE(b) > 0);
384
385 pa_atomic_inc(&b->n_acquired);
386
387 return pa_atomic_ptr_load(&b->data);
388 }
389
390 /* No lock necessary; in corner cases it posts the pool semaphore on its own */
391 void pa_memblock_release(pa_memblock *b) {
392 int r;
393 assert(b);
394 assert(PA_REFCNT_VALUE(b) > 0);
395
396 r = pa_atomic_dec(&b->n_acquired);
397 assert(r >= 1);
398
399 /* Signal a waiting thread that this memblock is no longer used */
400 if (r == 1 && pa_atomic_load(&b->please_signal))
401 pa_semaphore_post(b->pool->semaphore);
402 }
403
404 size_t pa_memblock_get_length(pa_memblock *b) {
405 assert(b);
406 assert(PA_REFCNT_VALUE(b) > 0);
407
408 return b->length;
409 }
410
411 pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
412 assert(b);
413 assert(PA_REFCNT_VALUE(b) > 0);
414
415 return b->pool;
416 }
417
418 /* No lock necessary */
419 pa_memblock* pa_memblock_ref(pa_memblock*b) {
420 assert(b);
421 assert(PA_REFCNT_VALUE(b) > 0);
422
423 PA_REFCNT_INC(b);
424 return b;
425 }
426
427 static void memblock_free(pa_memblock *b) {
428 assert(b);
429
430 assert(pa_atomic_load(&b->n_acquired) == 0);
431
432 stat_remove(b);
433
434 switch (b->type) {
435 case PA_MEMBLOCK_USER :
436 assert(b->per_type.user.free_cb);
437 b->per_type.user.free_cb(pa_atomic_ptr_load(&b->data));
438
439 /* Fall through */
440
441 case PA_MEMBLOCK_FIXED:
442 case PA_MEMBLOCK_APPENDED :
443 pa_xfree(b);
444 break;
445
446 case PA_MEMBLOCK_IMPORTED : {
447 pa_memimport_segment *segment;
448 pa_memimport *import;
449
450 /* FIXME! This should be implemented lock-free */
451
452 segment = b->per_type.imported.segment;
453 assert(segment);
454 import = segment->import;
455 assert(import);
456
457 pa_mutex_lock(import->mutex);
458 pa_hashmap_remove(import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
459 if (-- segment->n_blocks <= 0)
460 segment_detach(segment);
461
462 pa_mutex_unlock(import->mutex);
463
464 import->release_cb(import, b->per_type.imported.id, import->userdata);
465
466 pa_xfree(b);
467 break;
468 }
469
470 case PA_MEMBLOCK_POOL_EXTERNAL:
471 case PA_MEMBLOCK_POOL: {
472 struct mempool_slot *slot;
473 int call_free;
474
475 slot = mempool_slot_by_ptr(b->pool, pa_atomic_ptr_load(&b->data));
476 assert(slot);
477
478 call_free = b->type == PA_MEMBLOCK_POOL_EXTERNAL;
479
480 /* The free list dimensions should easily allow all slots
481 * to fit in, hence try harder if pushing this slot into
482 * the free list fails */
483 while (pa_flist_push(b->pool->free_slots, slot) < 0)
484 ;
485
486 if (call_free)
487 pa_xfree(b);
488
489 break;
490 }
491
492 case PA_MEMBLOCK_TYPE_MAX:
493 default:
494 abort();
495 }
496 }
497
498 /* No lock necessary */
499 void pa_memblock_unref(pa_memblock*b) {
500 assert(b);
501 assert(PA_REFCNT_VALUE(b) > 0);
502
503 if (PA_REFCNT_DEC(b) > 0)
504 return;
505
506 memblock_free(b);
507 }
508
509 /* Self locked */
510 static void memblock_wait(pa_memblock *b) {
511 assert(b);
512
513 if (pa_atomic_load(&b->n_acquired) > 0) {
514 /* We need to wait until all threads gave up access to the
515 * memory block before we can go on. Unfortunately this means
516 * that we have to lock and wait here. Sniff! */
517
518 pa_atomic_inc(&b->please_signal);
519
520 while (pa_atomic_load(&b->n_acquired) > 0)
521 pa_semaphore_wait(b->pool->semaphore);
522
523 pa_atomic_dec(&b->please_signal);
524 }
525 }
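
/* Illustrative timeline of the wait/signal handshake (not part of the
 * original file): thread A holds the block acquired, thread B needs
 * exclusive access before touching the data pointer.
 *
 *   A: pa_memblock_acquire()   n_acquired 0 -> 1
 *   B: memblock_wait()         sees n_acquired > 0, bumps please_signal,
 *                              blocks on b->pool->semaphore
 *   A: pa_memblock_release()   n_acquired 1 -> 0, please_signal is set,
 *                              posts the pool semaphore
 *   B:                         wakes up, sees n_acquired == 0, drops
 *                              please_signal and proceeds
 */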
526
527 /* No lock necessary. This function is not multiple caller safe! */
528 static void memblock_make_local(pa_memblock *b) {
529 assert(b);
530
531 pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
532
533 if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) {
534 struct mempool_slot *slot;
535
536 if ((slot = mempool_allocate_slot(b->pool))) {
537 void *new_data;
538 /* We can move it into a local pool, perfect! */
539
540 new_data = mempool_slot_data(slot);
541 memcpy(new_data, pa_atomic_ptr_load(&b->data), b->length);
542 pa_atomic_ptr_store(&b->data, new_data);
543
544 b->type = PA_MEMBLOCK_POOL_EXTERNAL;
545 b->read_only = 0;
546
547 goto finish;
548 }
549 }
550
551 /* Hmm, not enough space in the pool, so let's allocate the memory with malloc() */
552 b->per_type.user.free_cb = pa_xfree;
553 pa_atomic_ptr_store(&b->data, pa_xmemdup(pa_atomic_ptr_load(&b->data), b->length));
554
555 b->type = PA_MEMBLOCK_USER;
556 b->read_only = 0;
557
558 finish:
559 pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
560 pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
561 memblock_wait(b);
562 }
563
564 /* No lock necessary. This function is not multiple-caller safe */
565 void pa_memblock_unref_fixed(pa_memblock *b) {
566 assert(b);
567 assert(PA_REFCNT_VALUE(b) > 0);
568 assert(b->type == PA_MEMBLOCK_FIXED);
569
570 if (PA_REFCNT_VALUE(b) > 1)
571 memblock_make_local(b);
572
573 pa_memblock_unref(b);
574 }
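
/* Illustrative sketch (not part of the original file, guarded by #if 0):
 * the fixed-block life cycle. A fixed block borrows caller-owned memory;
 * before the caller reuses or frees that memory it calls
 * pa_memblock_unref_fixed(), which copies the data away (into a pool slot
 * or a malloc'd buffer) if someone else still holds a reference.
 * "pool", "buf" and "len" are assumed to come from the caller. */
#if 0
static void example_fixed_block(pa_mempool *pool, void *buf, size_t len) {
    pa_memblock *b = pa_memblock_new_fixed(pool, buf, len, 1 /* read_only */);

    /* ... hand b to consumers, which may take their own references ... */

    pa_memblock_unref_fixed(b);   /* afterwards buf may be reused or freed */
}
#endif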
575
576 /* Self-locked. This function is not multiple-caller safe */
577 static void memblock_replace_import(pa_memblock *b) {
578 pa_memimport_segment *seg;
579
580 assert(b);
581 assert(b->type == PA_MEMBLOCK_IMPORTED);
582
583 assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
584 assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
585 pa_atomic_dec(&b->pool->stat.n_imported);
586 pa_atomic_sub(&b->pool->stat.imported_size, b->length);
587
588 seg = b->per_type.imported.segment;
589 assert(seg);
590 assert(seg->import);
591
592 pa_mutex_lock(seg->import->mutex);
593
594 pa_hashmap_remove(
595 seg->import->blocks,
596 PA_UINT32_TO_PTR(b->per_type.imported.id));
597
598 memblock_make_local(b);
599
600 if (-- seg->n_blocks <= 0)
601 segment_detach(seg);
602
603 pa_mutex_unlock(seg->import->mutex);
604 }
605
606 pa_mempool* pa_mempool_new(int shared) {
607 size_t ps;
608 pa_mempool *p;
609
610 p = pa_xnew(pa_mempool, 1);
611
612 p->mutex = pa_mutex_new(1);
613 p->semaphore = pa_semaphore_new(0);
614
615 #ifdef HAVE_SYSCONF
616 ps = (size_t) sysconf(_SC_PAGESIZE);
617 #elif defined(PAGE_SIZE)
618 ps = (size_t) PAGE_SIZE;
619 #else
620 ps = 4096; /* Let's hope it's like x86. */
621 #endif
622
623 p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps;
624
625 if (p->block_size < ps)
626 p->block_size = ps;
627
628 p->n_blocks = PA_MEMPOOL_SLOTS_MAX;
629
630 assert(p->block_size > sizeof(struct mempool_slot));
631
632 if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
633 pa_xfree(p);
634 return NULL;
635 }
636
637 memset(&p->stat, 0, sizeof(p->stat));
638 pa_atomic_store(&p->n_init, 0);
639
640 PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
641 PA_LLIST_HEAD_INIT(pa_memexport, p->exports);
642
643 p->free_slots = pa_flist_new(p->n_blocks*2);
644
645 return p;
646 }
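
/* Illustrative sketch (not part of the original file, guarded by #if 0):
 * creating a shared pool and allocating a slot-backed block from it.
 * pa_mempool_new(1) returns NULL if the shared memory segment cannot be
 * created; pa_memblock_new_pool() returns NULL if the request does not
 * fit into a slot or the pool is full. */
#if 0
static pa_memblock *example_pool_block(void) {
    pa_mempool *pool;

    if (!(pool = pa_mempool_new(1))) /* 1 = back the pool with shared memory */
        return NULL;

    return pa_memblock_new_pool(pool, 4096);
}
#endif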
647
648 void pa_mempool_free(pa_mempool *p) {
649 assert(p);
650
651 pa_mutex_lock(p->mutex);
652
653 while (p->imports)
654 pa_memimport_free(p->imports);
655
656 while (p->exports)
657 pa_memexport_free(p->exports);
658
659 pa_mutex_unlock(p->mutex);
660
661 if (pa_atomic_load(&p->stat.n_allocated) > 0)
662 pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed!");
663
664 pa_flist_free(p->free_slots, NULL);
665 pa_shm_free(&p->memory);
666
667 pa_mutex_free(p->mutex);
668 pa_semaphore_free(p->semaphore);
669
670 pa_xfree(p);
671 }
672
673 /* No lock necessary */
674 const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
675 assert(p);
676
677 return &p->stat;
678 }
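
/* Illustrative sketch (not part of the original file, guarded by #if 0):
 * the statistics are plain atomic counters, so a monitoring thread can
 * poll them without taking any lock. The cast only drops the const
 * qualifier from the returned pointer. */
#if 0
static void example_log_pool_usage(pa_mempool *pool) {
    const pa_mempool_stat *s = pa_mempool_get_stat(pool);

    pa_log_debug("blocks allocated: %i, bytes allocated: %i",
                 pa_atomic_load((pa_atomic_t*) &s->n_allocated),
                 pa_atomic_load((pa_atomic_t*) &s->allocated_size));
}
#endif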
679
680 /* No lock necessary */
681 void pa_mempool_vacuum(pa_mempool *p) {
682 struct mempool_slot *slot;
683 pa_flist *list;
684
685 assert(p);
686
687 list = pa_flist_new(p->n_blocks*2);
688
689 while ((slot = pa_flist_pop(p->free_slots)))
690 while (pa_flist_push(list, slot) < 0)
691 ;
692
693 while ((slot = pa_flist_pop(list))) {
694 pa_shm_punch(&p->memory,
695 (uint8_t*) slot - (uint8_t*) p->memory.ptr + sizeof(struct mempool_slot),
696 p->block_size - sizeof(struct mempool_slot));
697
698 while (pa_flist_push(p->free_slots, slot) < 0)
699 ;
700 }
701
702 pa_flist_free(list, NULL);
703 }
704
705 /* No lock necessary */
706 int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
707 assert(p);
708
709 if (!p->memory.shared)
710 return -1;
711
712 *id = p->memory.id;
713
714 return 0;
715 }
716
717 /* No lock necessary */
718 int pa_mempool_is_shared(pa_mempool *p) {
719 assert(p);
720
721 return !!p->memory.shared;
722 }
723
724 /* For receiving blocks from other nodes */
725 pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
726 pa_memimport *i;
727
728 assert(p);
729 assert(cb);
730
731 i = pa_xnew(pa_memimport, 1);
732 i->mutex = pa_mutex_new(1); /* recursive: memblock_replace_import() re-locks this mutex when called from pa_memimport_free() or pa_memimport_process_revoke() */
733 i->pool = p;
734 i->segments = pa_hashmap_new(NULL, NULL);
735 i->blocks = pa_hashmap_new(NULL, NULL);
736 i->release_cb = cb;
737 i->userdata = userdata;
738
739 pa_mutex_lock(p->mutex);
740 PA_LLIST_PREPEND(pa_memimport, p->imports, i);
741 pa_mutex_unlock(p->mutex);
742
743 return i;
744 }
745
746 static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
747
748 /* Should be called locked */
749 static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
750 pa_memimport_segment* seg;
751
752 if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
753 return NULL;
754
755 seg = pa_xnew(pa_memimport_segment, 1);
756
757 if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
758 pa_xfree(seg);
759 return NULL;
760 }
761
762 seg->import = i;
763 seg->n_blocks = 0;
764
765 pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
766 return seg;
767 }
768
769 /* Should be called locked */
770 static void segment_detach(pa_memimport_segment *seg) {
771 assert(seg);
772
773 pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
774 pa_shm_free(&seg->memory);
775 pa_xfree(seg);
776 }
777
778 /* Self-locked. Not multiple-caller safe */
779 void pa_memimport_free(pa_memimport *i) {
780 pa_memexport *e;
781 pa_memblock *b;
782
783 assert(i);
784
785 pa_mutex_lock(i->mutex);
786
787 while ((b = pa_hashmap_get_first(i->blocks)))
788 memblock_replace_import(b);
789
790 assert(pa_hashmap_size(i->segments) == 0);
791
792 pa_mutex_unlock(i->mutex);
793
794 pa_mutex_lock(i->pool->mutex);
795
796 /* If we've further exported any of this import's blocks we need to revoke those exports */
797 for (e = i->pool->exports; e; e = e->next)
798 memexport_revoke_blocks(e, i);
799
800 PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);
801
802 pa_mutex_unlock(i->pool->mutex);
803
804 pa_hashmap_free(i->blocks, NULL, NULL);
805 pa_hashmap_free(i->segments, NULL, NULL);
806
807 pa_mutex_free(i->mutex);
808
809 pa_xfree(i);
810 }
811
812 /* Self-locked */
813 pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
814 pa_memblock *b = NULL;
815 pa_memimport_segment *seg;
816
817 assert(i);
818
819 pa_mutex_lock(i->mutex);
820
821 if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
822 goto finish;
823
824 if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
825 if (!(seg = segment_attach(i, shm_id)))
826 goto finish;
827
828 if (offset+size > seg->memory.size)
829 goto finish;
830
831 b = pa_xnew(pa_memblock, 1);
832 PA_REFCNT_INIT(b);
833 b->pool = i->pool;
834 b->type = PA_MEMBLOCK_IMPORTED;
835 b->read_only = 1;
836 pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
837 b->length = size;
838 pa_atomic_store(&b->n_acquired, 0);
839 pa_atomic_store(&b->please_signal, 0);
840 b->per_type.imported.id = block_id;
841 b->per_type.imported.segment = seg;
842
843 pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);
844
845 seg->n_blocks++;
846
847 finish:
848 pa_mutex_unlock(i->mutex);
849
850 if (b)
851 stat_add(b);
852
853 return b;
854 }
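
/* Illustrative sketch (not part of the original file, guarded by #if 0):
 * the receiving side of the SHM transport. An import object is created
 * once per connection; whenever the peer announces a block, the
 * identifiers it sent (the names below are assumptions about the wire
 * protocol) are turned back into a local pa_memblock reference. */
#if 0
static void example_release_cb(pa_memimport *imp, uint32_t block_id, void *userdata) {
    /* tell the peer that block_id is no longer referenced on this side */
}

static pa_memimport *example_setup_import(pa_mempool *pool) {
    return pa_memimport_new(pool, example_release_cb, NULL);
}

static pa_memblock *example_receive_block(pa_memimport *imp, uint32_t block_id,
                                          uint32_t shm_id, size_t offset, size_t size) {
    /* NULL if the segment cannot be attached, offset/size do not fit into
     * it, or the per-import slot/segment limits are exceeded */
    return pa_memimport_get(imp, block_id, shm_id, offset, size);
}
#endif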
855
856 int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
857 pa_memblock *b;
858 assert(i);
859
860 pa_mutex_lock(i->mutex);
861
862 if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id)))) {
863 pa_mutex_unlock(i->mutex);
864 return -1;
865 }
864
865 memblock_replace_import(b);
866
867 pa_mutex_unlock(i->mutex);
868
869 return 0;
870 }
871
872 /* For sending blocks to other nodes */
873 pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
874 pa_memexport *e;
875
876 assert(p);
877 assert(cb);
878
879 if (!p->memory.shared)
880 return NULL;
881
882 e = pa_xnew(pa_memexport, 1);
883 e->mutex = pa_mutex_new(1);
884 e->pool = p;
885 PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
886 PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
887 e->n_init = 0;
888 e->revoke_cb = cb;
889 e->userdata = userdata;
890
891 pa_mutex_lock(p->mutex);
892 PA_LLIST_PREPEND(pa_memexport, p->exports, e);
893 pa_mutex_unlock(p->mutex);
894 return e;
895 }
896
897 void pa_memexport_free(pa_memexport *e) {
898 assert(e);
899
900 pa_mutex_lock(e->mutex);
901 while (e->used_slots)
902 pa_memexport_process_release(e, e->used_slots - e->slots);
903 pa_mutex_unlock(e->mutex);
904
905 pa_mutex_lock(e->pool->mutex);
906 PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
907 pa_mutex_unlock(e->pool->mutex);
908
909 pa_xfree(e);
910 }
911
912 /* Self-locked */
913 int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
914 pa_memblock *b;
915
916 assert(e);
917
918 pa_mutex_lock(e->mutex);
919
920 if (id >= e->n_init)
921 goto fail;
922
923 if (!e->slots[id].block)
924 goto fail;
925
926 b = e->slots[id].block;
927 e->slots[id].block = NULL;
928
929 PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
930 PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);
931
932 pa_mutex_unlock(e->mutex);
933
934 /* pa_log("Processing release for %u", id); */
935
936 assert(pa_atomic_load(&e->pool->stat.n_exported) > 0);
937 assert(pa_atomic_load(&e->pool->stat.exported_size) >= (int) b->length);
938
939 pa_atomic_dec(&e->pool->stat.n_exported);
940 pa_atomic_sub(&e->pool->stat.exported_size, b->length);
941
942 pa_memblock_unref(b);
943
944 return 0;
945
946 fail:
947 pa_mutex_unlock(e->mutex);
948
949 return -1;
950 }
951
952 /* Self-locked */
953 static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
954 struct memexport_slot *slot, *next;
955 assert(e);
956 assert(i);
957
958 pa_mutex_lock(e->mutex);
959
960 for (slot = e->used_slots; slot; slot = next) {
961 uint32_t idx;
962 next = slot->next;
963
964 if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
965 slot->block->per_type.imported.segment->import != i)
966 continue;
967
968 idx = slot - e->slots;
969 e->revoke_cb(e, idx, e->userdata);
970 pa_memexport_process_release(e, idx);
971 }
972
973 pa_mutex_unlock(e->mutex);
974 }
975
976 /* No lock necessary */
977 static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
978 pa_memblock *n;
979
980 assert(p);
981 assert(b);
982
983 if (b->type == PA_MEMBLOCK_IMPORTED ||
984 b->type == PA_MEMBLOCK_POOL ||
985 b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
986 assert(b->pool == p);
987 return pa_memblock_ref(b);
988 }
989
990 if (!(n = pa_memblock_new_pool(p, b->length)))
991 return NULL;
992
993 memcpy(pa_atomic_ptr_load(&n->data), pa_atomic_ptr_load(&b->data), b->length);
994 return n;
995 }
996
997 /* Self-locked */
998 int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
999 pa_shm *memory;
1000 struct memexport_slot *slot;
1001 void *data;
1002 size_t length;
1003
1004 assert(e);
1005 assert(b);
1006 assert(block_id);
1007 assert(shm_id);
1008 assert(offset);
1009 assert(size);
1010 assert(b->pool == e->pool);
1011
1012 if (!(b = memblock_shared_copy(e->pool, b)))
1013 return -1;
1014
1015 pa_mutex_lock(e->mutex);
1016
1017 if (e->free_slots) {
1018 slot = e->free_slots;
1019 PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
1020 } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX)
1021 slot = &e->slots[e->n_init++];
1022 else {
1023 pa_mutex_unlock(e->mutex);
1024 pa_memblock_unref(b);
1025 return -1;
1026 }
1027
1028 PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
1029 slot->block = b;
1030 *block_id = slot - e->slots;
1031
1032 pa_mutex_unlock(e->mutex);
1033 /* pa_log("Got block id %u", *block_id); */
1034
1035 data = pa_memblock_acquire(b);
1036 length = b->length;
1036
1037 if (b->type == PA_MEMBLOCK_IMPORTED) {
1038 assert(b->per_type.imported.segment);
1039 memory = &b->per_type.imported.segment->memory;
1040 } else {
1041 assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
1042 assert(b->pool);
1043 memory = &b->pool->memory;
1044 }
1045
1046 assert(data >= memory->ptr);
1047 assert((uint8_t*) data + length <= (uint8_t*) memory->ptr + memory->size);
1048
1049 *shm_id = memory->id;
1050 *offset = (uint8_t*) data - (uint8_t*) memory->ptr;
1051 *size = length;
1052
1053 pa_memblock_release(b);
1054
1055 pa_atomic_inc(&e->pool->stat.n_exported);
1056 pa_atomic_add(&e->pool->stat.exported_size, length);
1057
1058 return 0;
1059 }
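
/* Illustrative sketch (not part of the original file, guarded by #if 0):
 * the sending side of the SHM transport. pa_memexport_put() pins a
 * pool-backed copy of the block in an export slot and yields the
 * identifiers the peer needs to map it; how these four values travel
 * over the wire is up to the caller. */
#if 0
static int example_send_block(pa_memexport *e, pa_memblock *b) {
    uint32_t block_id, shm_id;
    size_t offset, size;

    if (pa_memexport_put(e, b, &block_id, &shm_id, &offset, &size) < 0)
        return -1; /* pool not shared, copy failed, or all export slots in use */

    /* ... transmit block_id, shm_id, offset and size to the peer ... */
    return 0;
}
#endif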