]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
369e22ca82eb1943c9ea3da50785dbfcb43478de
[pulseaudio] / src / pulsecore / pstream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_SYS_UN_H
32 #include <sys/un.h>
33 #endif
34 #ifdef HAVE_NETINET_IN_H
35 #include <netinet/in.h>
36 #endif
37
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/socket.h>
41 #include <pulsecore/queue.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-scache.h>
44 #include <pulsecore/creds.h>
45 #include <pulsecore/refcnt.h>
46 #include <pulsecore/flist.h>
47 #include <pulsecore/macro.h>
48
49 #include "pstream.h"
50
/* The flags word of a frame descriptor is shared: the low byte holds the
 * seek mode of a memblock frame, the high byte tells whether the frame
 * carries/refers to SHM data or is an SHM release/revoke notification. */
#define PA_FLAG_SHMDATA    0x80000000UL
#define PA_FLAG_SHMRELEASE 0x40000000UL
#define PA_FLAG_SHMREVOKE  0xC0000000UL
#define PA_FLAG_SHMMASK    0xFF000000UL
#define PA_FLAG_SEEKMASK   0x000000FFUL
57
/* Indices of the five 32 bit words that make up a frame descriptor.
 * PA_PSTREAM_DESCRIPTOR_MAX is the word count, not a field. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH    = 0,
    PA_PSTREAM_DESCRIPTOR_CHANNEL   = 1,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI = 2,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO = 3,
    PA_PSTREAM_DESCRIPTOR_FLAGS     = 4,
    PA_PSTREAM_DESCRIPTOR_MAX       = 5
};
67
/* Indices of the 32 bit words sent as payload of an SHM memblock frame,
 * i.e. directly after the descriptor. PA_PSTREAM_SHM_MAX is the word count. */
enum {
    PA_PSTREAM_SHM_BLOCKID = 0,
    PA_PSTREAM_SHM_SHMID   = 1,
    PA_PSTREAM_SHM_INDEX   = 2,
    PA_PSTREAM_SHM_LENGTH  = 3,
    PA_PSTREAM_SHM_MAX     = 4
};
76
77 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
78
79 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
80 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
81
82 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
83
84 struct item_info {
85 enum {
86 PA_PSTREAM_ITEM_PACKET,
87 PA_PSTREAM_ITEM_MEMBLOCK,
88 PA_PSTREAM_ITEM_SHMRELEASE,
89 PA_PSTREAM_ITEM_SHMREVOKE
90 } type;
91
92 /* packet info */
93 pa_packet *packet;
94 #ifdef HAVE_CREDS
95 pa_bool_t with_creds;
96 pa_creds creds;
97 #endif
98
99 /* memblock info */
100 pa_memchunk chunk;
101 uint32_t channel;
102 int64_t offset;
103 pa_seek_mode_t seek_mode;
104
105 /* release/revoke info */
106 uint32_t block_id;
107 };
108
109 struct pa_pstream {
110 PA_REFCNT_DECLARE;
111
112 pa_mainloop_api *mainloop;
113 pa_defer_event *defer_event;
114 pa_iochannel *io;
115
116 pa_queue *send_queue;
117
118 pa_bool_t dead;
119
120 struct {
121 pa_pstream_descriptor descriptor;
122 struct item_info* current;
123 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
124 void *data;
125 size_t index;
126 pa_memchunk memchunk;
127 } write;
128
129 struct {
130 pa_pstream_descriptor descriptor;
131 pa_memblock *memblock;
132 pa_packet *packet;
133 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
134 void *data;
135 size_t index;
136 } read;
137
138 pa_bool_t use_shm;
139 pa_memimport *import;
140 pa_memexport *export;
141
142 pa_pstream_packet_cb_t recieve_packet_callback;
143 void *recieve_packet_callback_userdata;
144
145 pa_pstream_memblock_cb_t recieve_memblock_callback;
146 void *recieve_memblock_callback_userdata;
147
148 pa_pstream_notify_cb_t drain_callback;
149 void *drain_callback_userdata;
150
151 pa_pstream_notify_cb_t die_callback;
152 void *die_callback_userdata;
153
154 pa_pstream_block_id_cb_t revoke_callback;
155 void *revoke_callback_userdata;
156
157 pa_pstream_block_id_cb_t release_callback;
158 void *release_callback_userdata;
159
160 pa_mempool *mempool;
161
162 #ifdef HAVE_CREDS
163 pa_creds read_creds, write_creds;
164 pa_bool_t read_creds_valid, send_creds_now;
165 #endif
166 };
167
168 static int do_write(pa_pstream *p);
169 static int do_read(pa_pstream *p);
170
171 static void do_something(pa_pstream *p) {
172 pa_assert(p);
173 pa_assert(PA_REFCNT_VALUE(p) > 0);
174
175 pa_pstream_ref(p);
176
177 p->mainloop->defer_enable(p->defer_event, 0);
178
179 if (!p->dead && pa_iochannel_is_readable(p->io)) {
180 if (do_read(p) < 0)
181 goto fail;
182 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
183 goto fail;
184
185 if (!p->dead && pa_iochannel_is_writable(p->io)) {
186 if (do_write(p) < 0)
187 goto fail;
188 }
189
190 pa_pstream_unref(p);
191 return;
192
193 fail:
194
195 if (p->die_callback)
196 p->die_callback(p, p->die_callback_userdata);
197
198 pa_pstream_unlink(p);
199 pa_pstream_unref(p);
200 }
201
202 static void io_callback(pa_iochannel*io, void *userdata) {
203 pa_pstream *p = userdata;
204
205 pa_assert(p);
206 pa_assert(PA_REFCNT_VALUE(p) > 0);
207 pa_assert(p->io == io);
208
209 do_something(p);
210 }
211
212 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
213 pa_pstream *p = userdata;
214
215 pa_assert(p);
216 pa_assert(PA_REFCNT_VALUE(p) > 0);
217 pa_assert(p->defer_event == e);
218 pa_assert(p->mainloop == m);
219
220 do_something(p);
221 }
222
223 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
224
225 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
226 pa_pstream *p;
227
228 pa_assert(m);
229 pa_assert(io);
230 pa_assert(pool);
231
232 p = pa_xnew(pa_pstream, 1);
233 PA_REFCNT_INIT(p);
234 p->io = io;
235 pa_iochannel_set_callback(io, io_callback, p);
236 p->dead = FALSE;
237
238 p->mainloop = m;
239 p->defer_event = m->defer_new(m, defer_callback, p);
240 m->defer_enable(p->defer_event, 0);
241
242 p->send_queue = pa_queue_new();
243
244 p->write.current = NULL;
245 p->write.index = 0;
246 pa_memchunk_reset(&p->write.memchunk);
247 p->read.memblock = NULL;
248 p->read.packet = NULL;
249 p->read.index = 0;
250
251 p->recieve_packet_callback = NULL;
252 p->recieve_packet_callback_userdata = NULL;
253 p->recieve_memblock_callback = NULL;
254 p->recieve_memblock_callback_userdata = NULL;
255 p->drain_callback = NULL;
256 p->drain_callback_userdata = NULL;
257 p->die_callback = NULL;
258 p->die_callback_userdata = NULL;
259 p->revoke_callback = NULL;
260 p->revoke_callback_userdata = NULL;
261 p->release_callback = NULL;
262 p->release_callback_userdata = NULL;
263
264 p->mempool = pool;
265
266 p->use_shm = FALSE;
267 p->export = NULL;
268
269 /* We do importing unconditionally */
270 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
271
272 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
273 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
274
275 #ifdef HAVE_CREDS
276 p->send_creds_now = FALSE;
277 p->read_creds_valid = FALSE;
278 #endif
279 return p;
280 }
281
282 static void item_free(void *item, void *q) {
283 struct item_info *i = item;
284 pa_assert(i);
285
286 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
287 pa_assert(i->chunk.memblock);
288 pa_memblock_unref(i->chunk.memblock);
289 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
290 pa_assert(i->packet);
291 pa_packet_unref(i->packet);
292 }
293
294 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
295 pa_xfree(i);
296 }
297
298 static void pstream_free(pa_pstream *p) {
299 pa_assert(p);
300
301 pa_pstream_unlink(p);
302
303 pa_queue_free(p->send_queue, item_free, NULL);
304
305 if (p->write.current)
306 item_free(p->write.current, NULL);
307
308 if (p->write.memchunk.memblock)
309 pa_memblock_unref(p->write.memchunk.memblock);
310
311 if (p->read.memblock)
312 pa_memblock_unref(p->read.memblock);
313
314 if (p->read.packet)
315 pa_packet_unref(p->read.packet);
316
317 pa_xfree(p);
318 }
319
320 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
321 struct item_info *i;
322
323 pa_assert(p);
324 pa_assert(PA_REFCNT_VALUE(p) > 0);
325 pa_assert(packet);
326
327 if (p->dead)
328 return;
329
330 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
331 i = pa_xnew(struct item_info, 1);
332
333 i->type = PA_PSTREAM_ITEM_PACKET;
334 i->packet = pa_packet_ref(packet);
335
336 #ifdef HAVE_CREDS
337 if ((i->with_creds = !!creds))
338 i->creds = *creds;
339 #endif
340
341 pa_queue_push(p->send_queue, i);
342
343 p->mainloop->defer_enable(p->defer_event, 1);
344 }
345
346 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
347 size_t length, idx;
348 size_t bsm;
349
350 pa_assert(p);
351 pa_assert(PA_REFCNT_VALUE(p) > 0);
352 pa_assert(channel != (uint32_t) -1);
353 pa_assert(chunk);
354
355 if (p->dead)
356 return;
357
358 idx = 0;
359 length = chunk->length;
360
361 bsm = pa_mempool_block_size_max(p->mempool);
362
363 while (length > 0) {
364 struct item_info *i;
365 size_t n;
366
367 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
368 i = pa_xnew(struct item_info, 1);
369 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
370
371 n = PA_MIN(length, bsm);
372 i->chunk.index = chunk->index + idx;
373 i->chunk.length = n;
374 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
375
376 i->channel = channel;
377 i->offset = offset;
378 i->seek_mode = seek_mode;
379 #ifdef HAVE_CREDS
380 i->with_creds = FALSE;
381 #endif
382
383 pa_queue_push(p->send_queue, i);
384
385 idx += n;
386 length -= n;
387 }
388
389 p->mainloop->defer_enable(p->defer_event, 1);
390 }
391
392 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
393 struct item_info *item;
394 pa_assert(p);
395 pa_assert(PA_REFCNT_VALUE(p) > 0);
396
397 if (p->dead)
398 return;
399
400 /* pa_log("Releasing block %u", block_id); */
401
402 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
403 item = pa_xnew(struct item_info, 1);
404 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
405 item->block_id = block_id;
406 #ifdef HAVE_CREDS
407 item->with_creds = FALSE;
408 #endif
409
410 pa_queue_push(p->send_queue, item);
411 p->mainloop->defer_enable(p->defer_event, 1);
412 }
413
414 /* might be called from thread context */
415 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
416 pa_pstream *p = userdata;
417
418 pa_assert(p);
419 pa_assert(PA_REFCNT_VALUE(p) > 0);
420
421 if (p->dead)
422 return;
423
424 if (p->release_callback)
425 p->release_callback(p, block_id, p->release_callback_userdata);
426 else
427 pa_pstream_send_release(p, block_id);
428 }
429
430 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
431 struct item_info *item;
432 pa_assert(p);
433 pa_assert(PA_REFCNT_VALUE(p) > 0);
434
435 if (p->dead)
436 return;
437 /* pa_log("Revoking block %u", block_id); */
438
439 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
440 item = pa_xnew(struct item_info, 1);
441 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
442 item->block_id = block_id;
443 #ifdef HAVE_CREDS
444 item->with_creds = FALSE;
445 #endif
446
447 pa_queue_push(p->send_queue, item);
448 p->mainloop->defer_enable(p->defer_event, 1);
449 }
450
451 /* might be called from thread context */
452 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
453 pa_pstream *p = userdata;
454
455 pa_assert(p);
456 pa_assert(PA_REFCNT_VALUE(p) > 0);
457
458 if (p->revoke_callback)
459 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
460 else
461 pa_pstream_send_revoke(p, block_id);
462 }
463
464 static void prepare_next_write_item(pa_pstream *p) {
465 pa_assert(p);
466 pa_assert(PA_REFCNT_VALUE(p) > 0);
467
468 p->write.current = pa_queue_pop(p->send_queue);
469
470 if (!p->write.current)
471 return;
472
473 p->write.index = 0;
474 p->write.data = NULL;
475 pa_memchunk_reset(&p->write.memchunk);
476
477 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
478 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
479 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
480 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
481 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
482
483 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
484
485 pa_assert(p->write.current->packet);
486 p->write.data = p->write.current->packet->data;
487 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
488
489 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
490
491 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
492 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
493
494 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
495
496 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
497 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
498
499 } else {
500 uint32_t flags;
501 pa_bool_t send_payload = TRUE;
502
503 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
504 pa_assert(p->write.current->chunk.memblock);
505
506 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
507 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
508 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
509
510 flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
511
512 if (p->use_shm) {
513 uint32_t block_id, shm_id;
514 size_t offset, length;
515
516 pa_assert(p->export);
517
518 if (pa_memexport_put(p->export,
519 p->write.current->chunk.memblock,
520 &block_id,
521 &shm_id,
522 &offset,
523 &length) >= 0) {
524
525 flags |= PA_FLAG_SHMDATA;
526 send_payload = FALSE;
527
528 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
529 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
530 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
531 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
532
533 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
534 p->write.data = p->write.shm_info;
535 }
536 /* else */
537 /* pa_log_warn("Failed to export memory block."); */
538 }
539
540 if (send_payload) {
541 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
542 p->write.memchunk = p->write.current->chunk;
543 pa_memblock_ref(p->write.memchunk.memblock);
544 p->write.data = NULL;
545 }
546
547 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
548 }
549
550 #ifdef HAVE_CREDS
551 if ((p->send_creds_now = p->write.current->with_creds))
552 p->write_creds = p->write.current->creds;
553 #endif
554 }
555
556 static int do_write(pa_pstream *p) {
557 void *d;
558 size_t l;
559 ssize_t r;
560 pa_memblock *release_memblock = NULL;
561
562 pa_assert(p);
563 pa_assert(PA_REFCNT_VALUE(p) > 0);
564
565 if (!p->write.current)
566 prepare_next_write_item(p);
567
568 if (!p->write.current)
569 return 0;
570
571 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
572 d = (uint8_t*) p->write.descriptor + p->write.index;
573 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
574 } else {
575 pa_assert(p->write.data || p->write.memchunk.memblock);
576
577 if (p->write.data)
578 d = p->write.data;
579 else {
580 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
581 release_memblock = p->write.memchunk.memblock;
582 }
583
584 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
585 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
586 }
587
588 pa_assert(l > 0);
589
590 #ifdef HAVE_CREDS
591 if (p->send_creds_now) {
592
593 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
594 goto fail;
595
596 p->send_creds_now = FALSE;
597 } else
598 #endif
599
600 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
601 goto fail;
602
603 if (release_memblock)
604 pa_memblock_release(release_memblock);
605
606 p->write.index += (size_t) r;
607
608 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
609 pa_assert(p->write.current);
610 item_free(p->write.current, NULL);
611 p->write.current = NULL;
612
613 if (p->write.memchunk.memblock)
614 pa_memblock_unref(p->write.memchunk.memblock);
615
616 pa_memchunk_reset(&p->write.memchunk);
617
618 if (p->drain_callback && !pa_pstream_is_pending(p))
619 p->drain_callback(p, p->drain_callback_userdata);
620 }
621
622 return 0;
623
624 fail:
625
626 if (release_memblock)
627 pa_memblock_release(release_memblock);
628
629 return -1;
630 }
631
632 static int do_read(pa_pstream *p) {
633 void *d;
634 size_t l;
635 ssize_t r;
636 pa_memblock *release_memblock = NULL;
637 pa_assert(p);
638 pa_assert(PA_REFCNT_VALUE(p) > 0);
639
640 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
641 d = (uint8_t*) p->read.descriptor + p->read.index;
642 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
643 } else {
644 pa_assert(p->read.data || p->read.memblock);
645
646 if (p->read.data)
647 d = p->read.data;
648 else {
649 d = pa_memblock_acquire(p->read.memblock);
650 release_memblock = p->read.memblock;
651 }
652
653 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
654 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
655 }
656
657 #ifdef HAVE_CREDS
658 {
659 pa_bool_t b = 0;
660
661 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
662 goto fail;
663
664 p->read_creds_valid = p->read_creds_valid || b;
665 }
666 #else
667 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
668 goto fail;
669 #endif
670
671 if (release_memblock)
672 pa_memblock_release(release_memblock);
673
674 p->read.index += (size_t) r;
675
676 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
677 uint32_t flags, length, channel;
678 /* Reading of frame descriptor complete */
679
680 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
681
682 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
683 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
684 return -1;
685 }
686
687 if (flags == PA_FLAG_SHMRELEASE) {
688
689 /* This is a SHM memblock release frame with no payload */
690
691 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
692
693 pa_assert(p->export);
694 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
695
696 goto frame_done;
697
698 } else if (flags == PA_FLAG_SHMREVOKE) {
699
700 /* This is a SHM memblock revoke frame with no payload */
701
702 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
703
704 pa_assert(p->import);
705 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
706
707 goto frame_done;
708 }
709
710 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
711
712 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
713 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
714 return -1;
715 }
716
717 pa_assert(!p->read.packet && !p->read.memblock);
718
719 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
720
721 if (channel == (uint32_t) -1) {
722
723 if (flags != 0) {
724 pa_log_warn("Received packet frame with invalid flags value.");
725 return -1;
726 }
727
728 /* Frame is a packet frame */
729 p->read.packet = pa_packet_new(length);
730 p->read.data = p->read.packet->data;
731
732 } else {
733
734 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
735 pa_log_warn("Received memblock frame with invalid seek mode.");
736 return -1;
737 }
738
739 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
740
741 if (length != sizeof(p->read.shm_info)) {
742 pa_log_warn("Received SHM memblock frame with Invalid frame length.");
743 return -1;
744 }
745
746 /* Frame is a memblock frame referencing an SHM memblock */
747 p->read.data = p->read.shm_info;
748
749 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
750
751 /* Frame is a memblock frame */
752
753 p->read.memblock = pa_memblock_new(p->mempool, length);
754 p->read.data = NULL;
755 } else {
756
757 pa_log_warn("Received memblock frame with invalid flags value.");
758 return -1;
759 }
760 }
761
762 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
763 /* Frame payload available */
764
765 if (p->read.memblock && p->recieve_memblock_callback) {
766
767 /* Is this memblock data? Than pass it to the user */
768 l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
769
770 if (l > 0) {
771 pa_memchunk chunk;
772
773 chunk.memblock = p->read.memblock;
774 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
775 chunk.length = l;
776
777 if (p->recieve_memblock_callback) {
778 int64_t offset;
779
780 offset = (int64_t) (
781 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
782 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
783
784 p->recieve_memblock_callback(
785 p,
786 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
787 offset,
788 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
789 &chunk,
790 p->recieve_memblock_callback_userdata);
791 }
792
793 /* Drop seek info for following callbacks */
794 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
795 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
796 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
797 }
798 }
799
800 /* Frame complete */
801 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
802
803 if (p->read.memblock) {
804
805 /* This was a memblock frame. We can unref the memblock now */
806 pa_memblock_unref(p->read.memblock);
807
808 } else if (p->read.packet) {
809
810 if (p->recieve_packet_callback)
811 #ifdef HAVE_CREDS
812 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
813 #else
814 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
815 #endif
816
817 pa_packet_unref(p->read.packet);
818 } else {
819 pa_memblock *b;
820
821 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
822
823 pa_assert(p->import);
824
825 if (!(b = pa_memimport_get(p->import,
826 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
827 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
828 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
829 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
830
831 if (pa_log_ratelimit(PA_LOG_DEBUG))
832 pa_log_debug("Failed to import memory block.");
833 }
834
835 if (p->recieve_memblock_callback) {
836 int64_t offset;
837 pa_memchunk chunk;
838
839 chunk.memblock = b;
840 chunk.index = 0;
841 chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
842
843 offset = (int64_t) (
844 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
845 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
846
847 p->recieve_memblock_callback(
848 p,
849 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
850 offset,
851 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
852 &chunk,
853 p->recieve_memblock_callback_userdata);
854 }
855
856 if (b)
857 pa_memblock_unref(b);
858 }
859
860 goto frame_done;
861 }
862 }
863
864 return 0;
865
866 frame_done:
867 p->read.memblock = NULL;
868 p->read.packet = NULL;
869 p->read.index = 0;
870 p->read.data = NULL;
871
872 #ifdef HAVE_CREDS
873 p->read_creds_valid = FALSE;
874 #endif
875
876 return 0;
877
878 fail:
879 if (release_memblock)
880 pa_memblock_release(release_memblock);
881
882 return -1;
883 }
884
885 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
886 pa_assert(p);
887 pa_assert(PA_REFCNT_VALUE(p) > 0);
888
889 p->die_callback = cb;
890 p->die_callback_userdata = userdata;
891 }
892
893 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
894 pa_assert(p);
895 pa_assert(PA_REFCNT_VALUE(p) > 0);
896
897 p->drain_callback = cb;
898 p->drain_callback_userdata = userdata;
899 }
900
901 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
902 pa_assert(p);
903 pa_assert(PA_REFCNT_VALUE(p) > 0);
904
905 p->recieve_packet_callback = cb;
906 p->recieve_packet_callback_userdata = userdata;
907 }
908
909 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
910 pa_assert(p);
911 pa_assert(PA_REFCNT_VALUE(p) > 0);
912
913 p->recieve_memblock_callback = cb;
914 p->recieve_memblock_callback_userdata = userdata;
915 }
916
917 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
918 pa_assert(p);
919 pa_assert(PA_REFCNT_VALUE(p) > 0);
920
921 p->release_callback = cb;
922 p->release_callback_userdata = userdata;
923 }
924
925 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
926 pa_assert(p);
927 pa_assert(PA_REFCNT_VALUE(p) > 0);
928
929 p->release_callback = cb;
930 p->release_callback_userdata = userdata;
931 }
932
933 pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
934 pa_bool_t b;
935
936 pa_assert(p);
937 pa_assert(PA_REFCNT_VALUE(p) > 0);
938
939 if (p->dead)
940 b = FALSE;
941 else
942 b = p->write.current || !pa_queue_isempty(p->send_queue);
943
944 return b;
945 }
946
947 void pa_pstream_unref(pa_pstream*p) {
948 pa_assert(p);
949 pa_assert(PA_REFCNT_VALUE(p) > 0);
950
951 if (PA_REFCNT_DEC(p) <= 0)
952 pstream_free(p);
953 }
954
955 pa_pstream* pa_pstream_ref(pa_pstream*p) {
956 pa_assert(p);
957 pa_assert(PA_REFCNT_VALUE(p) > 0);
958
959 PA_REFCNT_INC(p);
960 return p;
961 }
962
963 void pa_pstream_unlink(pa_pstream *p) {
964 pa_assert(p);
965
966 if (p->dead)
967 return;
968
969 p->dead = TRUE;
970
971 if (p->import) {
972 pa_memimport_free(p->import);
973 p->import = NULL;
974 }
975
976 if (p->export) {
977 pa_memexport_free(p->export);
978 p->export = NULL;
979 }
980
981 if (p->io) {
982 pa_iochannel_free(p->io);
983 p->io = NULL;
984 }
985
986 if (p->defer_event) {
987 p->mainloop->defer_free(p->defer_event);
988 p->defer_event = NULL;
989 }
990
991 p->die_callback = NULL;
992 p->drain_callback = NULL;
993 p->recieve_packet_callback = NULL;
994 p->recieve_memblock_callback = NULL;
995 }
996
997 void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
998 pa_assert(p);
999 pa_assert(PA_REFCNT_VALUE(p) > 0);
1000
1001 p->use_shm = enable;
1002
1003 if (enable) {
1004
1005 if (!p->export)
1006 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1007
1008 } else {
1009
1010 if (p->export) {
1011 pa_memexport_free(p->export);
1012 p->export = NULL;
1013 }
1014 }
1015 }
1016
1017 pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1018 pa_assert(p);
1019 pa_assert(PA_REFCNT_VALUE(p) > 0);
1020
1021 return p->use_shm;
1022 }