]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
Merge HUGE set of changes temporarily into a branch, to allow me to move them from...
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <assert.h>
32 #include <unistd.h>
33
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_SYS_UN_H
38 #include <sys/un.h>
39 #endif
40 #ifdef HAVE_NETINET_IN_H
41 #include <netinet/in.h>
42 #endif
43
44 #include "winsock.h"
45
46 #include <pulse/xmalloc.h>
47
48 #include <pulsecore/queue.h>
49 #include <pulsecore/log.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/creds.h>
52 #include <pulsecore/refcnt.h>
53
54 #include "pstream.h"
55
/*
 * The FLAGS word of a frame descriptor carries the seek mode in its low
 * byte. The high byte is borrowed to tell the receiver whether (and how)
 * shared memory (SHM) blocks are involved in the frame.
 */
#define PA_FLAG_SHMDATA 0x80000000LU    /* payload is a reference to an SHM block */
#define PA_FLAG_SHMRELEASE 0x40000000LU /* payload-less frame: release an SHM block */
#define PA_FLAG_SHMREVOKE 0xC0000000LU  /* payload-less frame: revoke an SHM block */
#define PA_FLAG_SHMMASK 0xFF000000LU    /* all bits reserved for the SHM markers */
#define PA_FLAG_SEEKMASK 0x000000FFLU   /* bits holding the seek mode */

/* Word indices inside the frame descriptor. Every frame on the wire starts
 * with these five 32 bit integers (stored in network byte order). */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,
    PA_PSTREAM_DESCRIPTOR_CHANNEL,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
    PA_PSTREAM_DESCRIPTOR_FLAGS,
    PA_PSTREAM_DESCRIPTOR_MAX
};

/* Word indices of the SHM reference that follows the descriptor when the
 * frame is flagged PA_FLAG_SHMDATA. */
enum {
    PA_PSTREAM_SHM_BLOCKID,
    PA_PSTREAM_SHM_SHMID,
    PA_PSTREAM_SHM_INDEX,
    PA_PSTREAM_SHM_LENGTH,
    PA_PSTREAM_SHM_MAX
};

typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

/* Size in bytes of the descriptor as transmitted */
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))

/* Largest frame accepted from the peer: allow uploading a single sample in one frame at max */
#define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX

/* Largest memblock frame we generate ourselves; bigger chunks are split */
#define FRAME_SIZE_MAX_USE (1024*64)
87
88 struct item_info {
89 enum {
90 PA_PSTREAM_ITEM_PACKET,
91 PA_PSTREAM_ITEM_MEMBLOCK,
92 PA_PSTREAM_ITEM_SHMRELEASE,
93 PA_PSTREAM_ITEM_SHMREVOKE
94 } type;
95
96
97 /* packet info */
98 pa_packet *packet;
99 #ifdef HAVE_CREDS
100 int with_creds;
101 pa_creds creds;
102 #endif
103
104 /* memblock info */
105 pa_memchunk chunk;
106 uint32_t channel;
107 int64_t offset;
108 pa_seek_mode_t seek_mode;
109
110 /* release/revoke info */
111 uint32_t block_id;
112 };
113
114 struct pa_pstream {
115 PA_REFCNT_DECLARE;
116
117 pa_mainloop_api *mainloop;
118 pa_defer_event *defer_event;
119 pa_iochannel *io;
120
121 pa_queue *send_queue;
122
123 int dead;
124
125 struct {
126 pa_pstream_descriptor descriptor;
127 struct item_info* current;
128 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
129 void *data;
130 size_t index;
131 pa_memchunk memchunk;
132 } write;
133
134 struct {
135 pa_pstream_descriptor descriptor;
136 pa_memblock *memblock;
137 pa_packet *packet;
138 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
139 void *data;
140 size_t index;
141 } read;
142
143 int use_shm;
144 pa_memimport *import;
145 pa_memexport *export;
146
147 pa_pstream_packet_cb_t recieve_packet_callback;
148 void *recieve_packet_callback_userdata;
149
150 pa_pstream_memblock_cb_t recieve_memblock_callback;
151 void *recieve_memblock_callback_userdata;
152
153 pa_pstream_notify_cb_t drain_callback;
154 void *drain_callback_userdata;
155
156 pa_pstream_notify_cb_t die_callback;
157 void *die_callback_userdata;
158
159 pa_mempool *mempool;
160
161 #ifdef HAVE_CREDS
162 pa_creds read_creds, write_creds;
163 int read_creds_valid, send_creds_now;
164 #endif
165 };
166
167 static int do_write(pa_pstream *p);
168 static int do_read(pa_pstream *p);
169
170 static void do_something(pa_pstream *p) {
171 assert(p);
172 assert(PA_REFCNT_VALUE(p) > 0);
173
174 pa_pstream_ref(p);
175
176 p->mainloop->defer_enable(p->defer_event, 0);
177
178 if (!p->dead && pa_iochannel_is_readable(p->io)) {
179 if (do_read(p) < 0)
180 goto fail;
181 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
182 goto fail;
183
184 if (!p->dead && pa_iochannel_is_writable(p->io)) {
185 if (do_write(p) < 0)
186 goto fail;
187 }
188
189 pa_pstream_unref(p);
190 return;
191
192 fail:
193
194 p->dead = 1;
195
196 if (p->die_callback)
197 p->die_callback(p, p->die_callback_userdata);
198
199 pa_pstream_unref(p);
200 }
201
202 static void io_callback(pa_iochannel*io, void *userdata) {
203 pa_pstream *p = userdata;
204
205 assert(p);
206 assert(p->io == io);
207
208 do_something(p);
209 }
210
211 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
212
213 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
214 pa_pstream *p;
215
216 assert(m);
217 assert(io);
218 assert(pool);
219
220 p = pa_xnew(pa_pstream, 1);
221 PA_REFCNT_INIT(p);
222 p->io = io;
223 pa_iochannel_set_callback(io, io_callback, p);
224 p->dead = 0;
225
226 p->mainloop = m;
227
228 p->send_queue = pa_queue_new();
229 assert(p->send_queue);
230
231 p->write.current = NULL;
232 p->write.index = 0;
233 pa_memchunk_reset(&p->write.memchunk);
234 p->read.memblock = NULL;
235 p->read.packet = NULL;
236 p->read.index = 0;
237
238 p->recieve_packet_callback = NULL;
239 p->recieve_packet_callback_userdata = NULL;
240 p->recieve_memblock_callback = NULL;
241 p->recieve_memblock_callback_userdata = NULL;
242 p->drain_callback = NULL;
243 p->drain_callback_userdata = NULL;
244 p->die_callback = NULL;
245 p->die_callback_userdata = NULL;
246
247 p->mempool = pool;
248
249 p->use_shm = 0;
250 p->export = NULL;
251
252 /* We do importing unconditionally */
253 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
254
255 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
256 pa_iochannel_socket_set_sndbuf(io, 1024*8);
257
258 #ifdef HAVE_CREDS
259 p->send_creds_now = 0;
260 p->read_creds_valid = 0;
261 #endif
262 return p;
263 }
264
265 static void item_free(void *item, PA_GCC_UNUSED void *p) {
266 struct item_info *i = item;
267 assert(i);
268
269 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
270 assert(i->chunk.memblock);
271 pa_memblock_unref(i->chunk.memblock);
272 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
273 assert(i->packet);
274 pa_packet_unref(i->packet);
275 }
276
277 pa_xfree(i);
278 }
279
280 static void pstream_free(pa_pstream *p) {
281 assert(p);
282
283 pa_pstream_close(p);
284
285 pa_queue_free(p->send_queue, item_free, NULL);
286
287 if (p->write.current)
288 item_free(p->write.current, NULL);
289
290 if (p->read.memblock)
291 pa_memblock_unref(p->read.memblock);
292
293 if (p->read.packet)
294 pa_packet_unref(p->read.packet);
295
296 if (p->write.memchunk.memblock)
297 pa_memblock_unref(p->write.memchunk.memblock);
298
299 pa_xfree(p);
300 }
301
302 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
303 struct item_info *i;
304
305 assert(p);
306 assert(PA_REFCNT_VALUE(p) > 0);
307 assert(packet);
308
309 if (p->dead)
310 return;
311
312 i = pa_xnew(struct item_info, 1);
313 i->type = PA_PSTREAM_ITEM_PACKET;
314 i->packet = pa_packet_ref(packet);
315
316 #ifdef HAVE_CREDS
317 if ((i->with_creds = !!creds))
318 i->creds = *creds;
319 #endif
320
321 pa_queue_push(p->send_queue, i);
322 }
323
324 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
325 size_t length, idx;
326
327 assert(p);
328 assert(PA_REFCNT_VALUE(p) > 0);
329 assert(channel != (uint32_t) -1);
330 assert(chunk);
331
332 if (p->dead)
333 return;
334
335 idx = 0;
336
337 while (length > 0) {
338 struct item_info *i;
339 size_t n;
340
341 i = pa_xnew(struct item_info, 1);
342 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
343
344 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
345 i->chunk.index = chunk->index + idx;
346 i->chunk.length = n;
347 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
348
349 i->channel = channel;
350 i->offset = offset;
351 i->seek_mode = seek_mode;
352 #ifdef HAVE_CREDS
353 i->with_creds = 0;
354 #endif
355
356 pa_queue_push(p->send_queue, i);
357
358 idx += n;
359 length -= n;
360 }
361
362 p->mainloop->defer_enable(p->defer_event, 1);
363 }
364
365 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
366 struct item_info *item;
367 pa_pstream *p = userdata;
368
369 assert(p);
370 assert(PA_REFCNT_VALUE(p) > 0);
371
372 if (p->dead)
373 return;
374
375 /* pa_log("Releasing block %u", block_id); */
376
377 item = pa_xnew(struct item_info, 1);
378 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
379 item->block_id = block_id;
380 #ifdef HAVE_CREDS
381 item->with_creds = 0;
382 #endif
383
384 pa_queue_push(p->send_queue, item);
385 }
386
387 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
388 struct item_info *item;
389 pa_pstream *p = userdata;
390
391 assert(p);
392 assert(PA_REFCNT_VALUE(p) > 0);
393
394 if (p->dead)
395 return;
396 /* pa_log("Revoking block %u", block_id); */
397
398 item = pa_xnew(struct item_info, 1);
399 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
400 item->block_id = block_id;
401 #ifdef HAVE_CREDS
402 item->with_creds = 0;
403 #endif
404
405 pa_queue_push(p->send_queue, item);
406 }
407
408 static void prepare_next_write_item(pa_pstream *p) {
409 assert(p);
410 assert(PA_REFCNT_VALUE(p) > 0);
411
412 p->write.current = pa_queue_pop(p->send_queue);
413
414 if (!p->write.current)
415 return;
416
417 p->write.index = 0;
418 p->write.data = NULL;
419 pa_memchunk_reset(&p->write.memchunk);
420
421 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
422 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
423 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
424 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
425 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
426
427 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
428
429 assert(p->write.current->packet);
430 p->write.data = p->write.current->packet->data;
431 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
432
433 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
434
435 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
436 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
437
438 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
439
440 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
441 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
442
443 } else {
444 uint32_t flags;
445 int send_payload = 1;
446
447 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
448 assert(p->write.current->chunk.memblock);
449
450 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
451 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
452 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
453
454 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
455
456 if (p->use_shm) {
457 uint32_t block_id, shm_id;
458 size_t offset, length;
459
460 assert(p->export);
461
462 if (pa_memexport_put(p->export,
463 p->write.current->chunk.memblock,
464 &block_id,
465 &shm_id,
466 &offset,
467 &length) >= 0) {
468
469 flags |= PA_FLAG_SHMDATA;
470 send_payload = 0;
471
472 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
473 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
474 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
475 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
476
477 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
478 p->write.data = p->write.shm_info;
479 }
480 /* else */
481 /* pa_log_warn("Failed to export memory block."); */
482 }
483
484 if (send_payload) {
485 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
486 p->write.memchunk = p->write.current->chunk;
487 pa_memblock_ref(p->write.memchunk.memblock);
488 p->write.data = NULL;
489 }
490
491 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
492 }
493
494 #ifdef HAVE_CREDS
495 if ((p->send_creds_now = p->write.current->with_creds))
496 p->write_creds = p->write.current->creds;
497 #endif
498 }
499
500 static int do_write(pa_pstream *p) {
501 void *d;
502 size_t l;
503 ssize_t r;
504 pa_memblock *release_memblock = NULL;
505
506 assert(p);
507 assert(PA_REFCNT_VALUE(p) > 0);
508
509 if (!p->write.current)
510 prepare_next_write_item(p);
511
512 if (!p->write.current)
513 return 0;
514
515 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
516 d = (uint8_t*) p->write.descriptor + p->write.index;
517 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
518 } else {
519 assert(p->write.data || p->write.memchunk.memblock);
520
521 if (p->write.data)
522 d = p->write.data;
523 else {
524 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
525 release_memblock = p->write.memchunk.memblock;
526 }
527
528 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
529 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
530 }
531
532 assert(l > 0);
533
534 #ifdef HAVE_CREDS
535 if (p->send_creds_now) {
536
537 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
538 goto fail;
539
540 p->send_creds_now = 0;
541 } else
542 #endif
543
544 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
545 goto fail;
546
547 if (release_memblock)
548 pa_memblock_release(release_memblock);
549
550 p->write.index += r;
551
552 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
553 assert(p->write.current);
554 item_free(p->write.current, (void *) 1);
555 p->write.current = NULL;
556
557 if (p->drain_callback && !pa_pstream_is_pending(p))
558 p->drain_callback(p, p->drain_callback_userdata);
559 }
560
561 return 0;
562
563 fail:
564
565 if (release_memblock)
566 pa_memblock_release(release_memblock);
567
568 return -1;
569 }
570
571 static int do_read(pa_pstream *p) {
572 void *d;
573 size_t l;
574 ssize_t r;
575 pa_memblock *release_memblock = NULL;
576 assert(p);
577 assert(PA_REFCNT_VALUE(p) > 0);
578
579 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
580 d = (uint8_t*) p->read.descriptor + p->read.index;
581 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
582 } else {
583 assert(p->read.data || p->read.memblock);
584
585 if (p->read.data)
586 d = p->read.data;
587 else {
588 d = pa_memblock_acquire(p->read.memblock);
589 release_memblock = p->read.memblock;
590 }
591
592 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
593 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
594 }
595
596 #ifdef HAVE_CREDS
597 {
598 int b = 0;
599
600 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
601 goto fail;
602
603 p->read_creds_valid = p->read_creds_valid || b;
604 }
605 #else
606 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
607 goto fail;
608 #endif
609
610 if (release_memblock)
611 pa_memblock_release(release_memblock);
612
613 p->read.index += r;
614
615 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
616 uint32_t flags, length, channel;
617 /* Reading of frame descriptor complete */
618
619 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
620
621 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
622 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
623 return -1;
624 }
625
626 if (flags == PA_FLAG_SHMRELEASE) {
627
628 /* This is a SHM memblock release frame with no payload */
629
630 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
631
632 assert(p->export);
633 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
634
635 goto frame_done;
636
637 } else if (flags == PA_FLAG_SHMREVOKE) {
638
639 /* This is a SHM memblock revoke frame with no payload */
640
641 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
642
643 assert(p->import);
644 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
645
646 goto frame_done;
647 }
648
649 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
650
651 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
652 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
653 return -1;
654 }
655
656 assert(!p->read.packet && !p->read.memblock);
657
658 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
659
660 if (channel == (uint32_t) -1) {
661
662 if (flags != 0) {
663 pa_log_warn("Received packet frame with invalid flags value.");
664 return -1;
665 }
666
667 /* Frame is a packet frame */
668 p->read.packet = pa_packet_new(length);
669 p->read.data = p->read.packet->data;
670
671 } else {
672
673 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
674 pa_log_warn("Received memblock frame with invalid seek mode.");
675 return -1;
676 }
677
678 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
679
680 if (length != sizeof(p->read.shm_info)) {
681 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
682 return -1;
683 }
684
685 /* Frame is a memblock frame referencing an SHM memblock */
686 p->read.data = p->read.shm_info;
687
688 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
689
690 /* Frame is a memblock frame */
691
692 p->read.memblock = pa_memblock_new(p->mempool, length);
693 p->read.data = NULL;
694 } else {
695
696 pa_log_warn("Recieved memblock frame with invalid flags value.");
697 return -1;
698 }
699 }
700
701 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
702 /* Frame payload available */
703
704 if (p->read.memblock && p->recieve_memblock_callback) {
705
706 /* Is this memblock data? Than pass it to the user */
707 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
708
709 if (l > 0) {
710 pa_memchunk chunk;
711
712 chunk.memblock = p->read.memblock;
713 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
714 chunk.length = l;
715
716 if (p->recieve_memblock_callback) {
717 int64_t offset;
718
719 offset = (int64_t) (
720 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
721 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
722
723 p->recieve_memblock_callback(
724 p,
725 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
726 offset,
727 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
728 &chunk,
729 p->recieve_memblock_callback_userdata);
730 }
731
732 /* Drop seek info for following callbacks */
733 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
734 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
735 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
736 }
737 }
738
739 /* Frame complete */
740 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
741
742 if (p->read.memblock) {
743
744 /* This was a memblock frame. We can unref the memblock now */
745 pa_memblock_unref(p->read.memblock);
746
747 } else if (p->read.packet) {
748
749 if (p->recieve_packet_callback)
750 #ifdef HAVE_CREDS
751 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
752 #else
753 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
754 #endif
755
756 pa_packet_unref(p->read.packet);
757 } else {
758 pa_memblock *b;
759
760 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
761
762 assert(p->import);
763
764 if (!(b = pa_memimport_get(p->import,
765 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
766 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
767 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
768 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
769
770 pa_log_warn("Failed to import memory block.");
771 return -1;
772 }
773
774 if (p->recieve_memblock_callback) {
775 int64_t offset;
776 pa_memchunk chunk;
777
778 chunk.memblock = b;
779 chunk.index = 0;
780 chunk.length = pa_memblock_get_length(b);
781
782 offset = (int64_t) (
783 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
784 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
785
786 p->recieve_memblock_callback(
787 p,
788 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
789 offset,
790 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
791 &chunk,
792 p->recieve_memblock_callback_userdata);
793 }
794
795 pa_memblock_unref(b);
796 }
797
798 goto frame_done;
799 }
800 }
801
802 return 0;
803
804 frame_done:
805 p->read.memblock = NULL;
806 p->read.packet = NULL;
807 p->read.index = 0;
808 p->read.data = NULL;
809
810 #ifdef HAVE_CREDS
811 p->read_creds_valid = 0;
812 #endif
813
814 return 0;
815
816 fail:
817 if (release_memblock)
818 pa_memblock_release(release_memblock);
819
820 return -1;
821 }
822
823 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
824 assert(p);
825 assert(PA_REFCNT_VALUE(p) > 0);
826
827 p->die_callback = cb;
828 p->die_callback_userdata = userdata;
829 }
830
831 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
832 assert(p);
833 assert(PA_REFCNT_VALUE(p) > 0);
834
835 p->drain_callback = cb;
836 p->drain_callback_userdata = userdata;
837 }
838
839 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
840 assert(p);
841 assert(PA_REFCNT_VALUE(p) > 0);
842
843 p->recieve_packet_callback = cb;
844 p->recieve_packet_callback_userdata = userdata;
845 }
846
847 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
848 assert(p);
849 assert(PA_REFCNT_VALUE(p) > 0);
850
851 p->recieve_memblock_callback = cb;
852 p->recieve_memblock_callback_userdata = userdata;
853 }
854
855 int pa_pstream_is_pending(pa_pstream *p) {
856 int b;
857
858 assert(p);
859 assert(PA_REFCNT_VALUE(p) > 0);
860
861 if (p->dead)
862 b = 0;
863 else
864 b = p->write.current || !pa_queue_is_empty(p->send_queue);
865
866 return b;
867 }
868
869 void pa_pstream_unref(pa_pstream*p) {
870 assert(p);
871 assert(PA_REFCNT_VALUE(p) > 0);
872
873 if (PA_REFCNT_DEC(p) <= 0)
874 pstream_free(p);
875 }
876
877 pa_pstream* pa_pstream_ref(pa_pstream*p) {
878 assert(p);
879 assert(PA_REFCNT_VALUE(p) > 0);
880
881 PA_REFCNT_INC(p);
882 return p;
883 }
884
885 void pa_pstream_close(pa_pstream *p) {
886 assert(p);
887
888 p->dead = 1;
889
890 if (p->import) {
891 pa_memimport_free(p->import);
892 p->import = NULL;
893 }
894
895 if (p->export) {
896 pa_memexport_free(p->export);
897 p->export = NULL;
898 }
899
900 if (p->io) {
901 pa_iochannel_free(p->io);
902 p->io = NULL;
903 }
904
905 p->die_callback = NULL;
906 p->drain_callback = NULL;
907 p->recieve_packet_callback = NULL;
908 p->recieve_memblock_callback = NULL;
909 }
910
911 void pa_pstream_use_shm(pa_pstream *p, int enable) {
912 assert(p);
913 assert(PA_REFCNT_VALUE(p) > 0);
914
915 p->use_shm = enable;
916
917 if (enable) {
918
919 if (!p->export)
920 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
921
922 } else {
923
924 if (p->export) {
925 pa_memexport_free(p->export);
926 p->export = NULL;
927 }
928 }
929 }