]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
fdb1a66a341e234d658d3e1b09811cb91a165fc8
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <assert.h>
32 #include <unistd.h>
33
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_SYS_UN_H
38 #include <sys/un.h>
39 #endif
40 #ifdef HAVE_NETINET_IN_H
41 #include <netinet/in.h>
42 #endif
43
44 #include "winsock.h"
45
46 #include <pulse/xmalloc.h>
47
48 #include <pulsecore/queue.h>
49 #include <pulsecore/log.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/creds.h>
52 #include <pulsecore/mutex.h>
53 #include <pulsecore/refcnt.h>
54
55 #include "pstream.h"
56
57 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
/* These bits live in the FLAGS word of the frame descriptor, next to the
 * seek mode:
 *   PA_FLAG_SHMDATA    - memblock frame whose payload is an shm_info record
 *                        instead of the audio data itself
 *   PA_FLAG_SHMRELEASE - payload-less frame; block id travels in OFFSET_HI
 *   PA_FLAG_SHMREVOKE  - payload-less frame; block id travels in OFFSET_HI
 *   PA_FLAG_SHMMASK    - mask selecting the SHM-related bits
 *   PA_FLAG_SEEKMASK   - mask selecting the seek mode */
58 #define PA_FLAG_SHMDATA 0x80000000LU
59 #define PA_FLAG_SHMRELEASE 0x40000000LU
60 #define PA_FLAG_SHMREVOKE 0xC0000000LU
61 #define PA_FLAG_SHMMASK 0xFF000000LU
62 #define PA_FLAG_SEEKMASK 0x000000FFLU
63
64 /* The frame descriptor header consists of five 32 bit integers, each
 * converted with htonl()/ntohl() when written/read. The enumerators are the
 * indices into a pa_pstream_descriptor: */
65 enum {
66 PA_PSTREAM_DESCRIPTOR_LENGTH,
67 PA_PSTREAM_DESCRIPTOR_CHANNEL,
68 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
69 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
70 PA_PSTREAM_DESCRIPTOR_FLAGS,
71 PA_PSTREAM_DESCRIPTOR_MAX
72 };
73
74 /* If we have an SHM block, this info follows the descriptor (indices into
 * the shm_info arrays of the read/write state) */
75 enum {
76 PA_PSTREAM_SHM_BLOCKID,
77 PA_PSTREAM_SHM_SHMID,
78 PA_PSTREAM_SHM_INDEX,
79 PA_PSTREAM_SHM_LENGTH,
80 PA_PSTREAM_SHM_MAX
81 };
82
/* One complete frame descriptor */
83 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
84
/* Size of the descriptor in bytes */
85 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
/* Largest payload length accepted for an incoming frame */
86 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
/* Largest payload we put into a single outgoing memblock frame; bigger
 * chunks are split by pa_pstream_send_memblock() */
87 #define FRAME_SIZE_MAX_USE (1024*64)
88
/* One entry of the send queue. Which of the members below are meaningful
 * depends on 'type'. */
89 struct item_info {
90 enum {
91 PA_PSTREAM_ITEM_PACKET,
92 PA_PSTREAM_ITEM_MEMBLOCK,
93 PA_PSTREAM_ITEM_SHMRELEASE,
94 PA_PSTREAM_ITEM_SHMREVOKE
95 } type;
96
97
98 /* packet info (PA_PSTREAM_ITEM_PACKET); the item holds a reference on the packet */
99 pa_packet *packet;
100 #ifdef HAVE_CREDS
/* with_creds is non-zero when 'creds' shall be sent along with this item */
101 int with_creds;
102 pa_creds creds;
103 #endif
104
105 /* memblock info (PA_PSTREAM_ITEM_MEMBLOCK); the item holds a reference on chunk.memblock */
106 pa_memchunk chunk;
107 uint32_t channel;
108 int64_t offset;
109 pa_seek_mode_t seek_mode;
110
111 /* release/revoke info (PA_PSTREAM_ITEM_SHMRELEASE / PA_PSTREAM_ITEM_SHMREVOKE) */
112 uint32_t block_id;
113 };
114
/* State of one packet stream: a reference counted object that frames packets
 * and memory blocks over a pa_iochannel. */
115 struct pa_pstream {
116 PA_REFCNT_DECLARE;
117
118 pa_mainloop_api *mainloop;
/* Enabled whenever something is queued for sending; triggers do_something() */
119 pa_defer_event *defer_event;
120 pa_iochannel *io;
/* Queue of struct item_info waiting to be written */
121 pa_queue *send_queue;
/* Created with pa_mutex_new(1); taken by every entry point that touches the state */
122 pa_mutex *mutex;
123
/* Set to 1 on I/O failure or pa_pstream_close(); once set nothing more is queued or transferred */
124 int dead;
125
/* State of the frame currently being written */
126 struct {
127 pa_pstream_descriptor descriptor;
/* Item being written, or NULL if none is in progress */
128 struct item_info* current;
129 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
/* Start of the payload that follows the descriptor (NULL for payload-less frames) */
130 void *data;
/* Number of bytes of descriptor + payload already written */
131 size_t index;
132 } write;
133
/* State of the frame currently being read */
134 struct {
135 pa_pstream_descriptor descriptor;
/* At most one of memblock/packet is set; both are NULL for SHM memblock frames */
136 pa_memblock *memblock;
137 pa_packet *packet;
138 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
/* Where the payload of the current frame is stored */
139 void *data;
/* Number of bytes of descriptor + payload already read */
140 size_t index;
141 } read;
142
/* Non-zero if memblocks may be passed via SHM, see pa_pstream_use_shm() */
143 int use_shm;
144 pa_memimport *import;
145 pa_memexport *export;
146
147 pa_pstream_packet_cb_t recieve_packet_callback;
148 void *recieve_packet_callback_userdata;
149
150 pa_pstream_memblock_cb_t recieve_memblock_callback;
151 void *recieve_memblock_callback_userdata;
152
/* Called from do_write() when an item was written completely and nothing else is pending */
153 pa_pstream_notify_cb_t drain_callback;
154 void *drain_callback_userdata;
155
/* Called from do_something() when reading or writing failed or the channel hung up */
156 pa_pstream_notify_cb_t die_callback;
157 void *die_callback_userdata;
158
159 pa_mempool *mempool;
160
161 #ifdef HAVE_CREDS
162 pa_creds read_creds, write_creds;
163 int read_creds_valid, send_creds_now;
164 #endif
165 };
166
167 static int do_write(pa_pstream *p);
168 static int do_read(pa_pstream *p);
169
170 static void do_something(pa_pstream *p) {
171 assert(p);
172 assert(PA_REFCNT_VALUE(p) > 0);
173
174 pa_pstream_ref(p);
175
176 pa_mutex_lock(p->mutex);
177
178 p->mainloop->defer_enable(p->defer_event, 0);
179
180 if (!p->dead && pa_iochannel_is_readable(p->io)) {
181 if (do_read(p) < 0)
182 goto fail;
183 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
184 goto fail;
185
186 if (!p->dead && pa_iochannel_is_writable(p->io)) {
187 if (do_write(p) < 0)
188 goto fail;
189 }
190
191 pa_mutex_unlock(p->mutex);
192
193 pa_pstream_unref(p);
194 return;
195
196 fail:
197
198 p->dead = 1;
199
200 if (p->die_callback)
201 p->die_callback(p, p->die_callback_userdata);
202
203 pa_mutex_unlock(p->mutex);
204
205 pa_pstream_unref(p);
206 }
207
208 static void io_callback(pa_iochannel*io, void *userdata) {
209 pa_pstream *p = userdata;
210
211 assert(p);
212 assert(p->io == io);
213
214 do_something(p);
215 }
216
217 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
218 pa_pstream *p = userdata;
219
220 assert(p);
221 assert(p->defer_event == e);
222 assert(p->mainloop == m);
223
224 do_something(p);
225 }
226
227 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
228
229 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
230 pa_pstream *p;
231
232 assert(m);
233 assert(io);
234 assert(pool);
235
236 p = pa_xnew(pa_pstream, 1);
237 PA_REFCNT_INIT(p);
238 p->io = io;
239 pa_iochannel_set_callback(io, io_callback, p);
240 p->dead = 0;
241
242 p->mutex = pa_mutex_new(1);
243
244 p->mainloop = m;
245 p->defer_event = m->defer_new(m, defer_callback, p);
246 m->defer_enable(p->defer_event, 0);
247
248 p->send_queue = pa_queue_new();
249 assert(p->send_queue);
250
251 p->write.current = NULL;
252 p->write.index = 0;
253 p->read.memblock = NULL;
254 p->read.packet = NULL;
255 p->read.index = 0;
256
257 p->recieve_packet_callback = NULL;
258 p->recieve_packet_callback_userdata = NULL;
259 p->recieve_memblock_callback = NULL;
260 p->recieve_memblock_callback_userdata = NULL;
261 p->drain_callback = NULL;
262 p->drain_callback_userdata = NULL;
263 p->die_callback = NULL;
264 p->die_callback_userdata = NULL;
265
266 p->mempool = pool;
267
268 p->use_shm = 0;
269 p->export = NULL;
270
271 /* We do importing unconditionally */
272 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
273
274 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
275 pa_iochannel_socket_set_sndbuf(io, 1024*8);
276
277 #ifdef HAVE_CREDS
278 p->send_creds_now = 0;
279 p->read_creds_valid = 0;
280 #endif
281 return p;
282 }
283
284 static void item_free(void *item, PA_GCC_UNUSED void *p) {
285 struct item_info *i = item;
286 assert(i);
287
288 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
289 assert(i->chunk.memblock);
290 pa_memblock_unref(i->chunk.memblock);
291 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
292 assert(i->packet);
293 pa_packet_unref(i->packet);
294 }
295
296 pa_xfree(i);
297 }
298
299 static void pstream_free(pa_pstream *p) {
300 assert(p);
301
302 pa_pstream_close(p);
303
304 pa_queue_free(p->send_queue, item_free, NULL);
305
306 if (p->write.current)
307 item_free(p->write.current, NULL);
308
309 if (p->read.memblock)
310 pa_memblock_unref(p->read.memblock);
311
312 if (p->read.packet)
313 pa_packet_unref(p->read.packet);
314
315 if (p->mutex)
316 pa_mutex_free(p->mutex);
317
318 pa_xfree(p);
319 }
320
321 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
322 struct item_info *i;
323
324 assert(p);
325 assert(PA_REFCNT_VALUE(p) > 0);
326 assert(packet);
327
328 pa_mutex_lock(p->mutex);
329
330 if (p->dead)
331 goto finish;
332
333 i = pa_xnew(struct item_info, 1);
334 i->type = PA_PSTREAM_ITEM_PACKET;
335 i->packet = pa_packet_ref(packet);
336
337 #ifdef HAVE_CREDS
338 if ((i->with_creds = !!creds))
339 i->creds = *creds;
340 #endif
341
342 pa_queue_push(p->send_queue, i);
343 p->mainloop->defer_enable(p->defer_event, 1);
344
345 finish:
346
347 pa_mutex_unlock(p->mutex);
348 }
349
350 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
351 size_t length, idx;
352
353 assert(p);
354 assert(PA_REFCNT_VALUE(p) > 0);
355 assert(channel != (uint32_t) -1);
356 assert(chunk);
357
358 pa_mutex_lock(p->mutex);
359
360 if (p->dead)
361 goto finish;
362
363 length = chunk->length;
364 idx = 0;
365
366 while (length > 0) {
367 struct item_info *i;
368 size_t n;
369
370 i = pa_xnew(struct item_info, 1);
371 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
372
373 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
374 i->chunk.index = chunk->index + idx;
375 i->chunk.length = n;
376 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
377
378 i->channel = channel;
379 i->offset = offset;
380 i->seek_mode = seek_mode;
381 #ifdef HAVE_CREDS
382 i->with_creds = 0;
383 #endif
384
385 pa_queue_push(p->send_queue, i);
386
387 idx += n;
388 length -= n;
389 }
390
391 p->mainloop->defer_enable(p->defer_event, 1);
392
393 finish:
394
395 pa_mutex_unlock(p->mutex);
396 }
397
398 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
399 struct item_info *item;
400 pa_pstream *p = userdata;
401
402 assert(p);
403 assert(PA_REFCNT_VALUE(p) > 0);
404
405 pa_mutex_lock(p->mutex);
406
407 if (p->dead)
408 goto finish;
409
410 /* pa_log("Releasing block %u", block_id); */
411
412 item = pa_xnew(struct item_info, 1);
413 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
414 item->block_id = block_id;
415 #ifdef HAVE_CREDS
416 item->with_creds = 0;
417 #endif
418
419 pa_queue_push(p->send_queue, item);
420 p->mainloop->defer_enable(p->defer_event, 1);
421
422 finish:
423
424 pa_mutex_unlock(p->mutex);
425 }
426
427 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
428 struct item_info *item;
429 pa_pstream *p = userdata;
430
431 assert(p);
432 assert(PA_REFCNT_VALUE(p) > 0);
433
434 pa_mutex_lock(p->mutex);
435
436 if (p->dead)
437 goto finish;
438
439 /* pa_log("Revoking block %u", block_id); */
440
441 item = pa_xnew(struct item_info, 1);
442 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
443 item->block_id = block_id;
444 #ifdef HAVE_CREDS
445 item->with_creds = 0;
446 #endif
447
448 pa_queue_push(p->send_queue, item);
449 p->mainloop->defer_enable(p->defer_event, 1);
450
451 finish:
452
453 pa_mutex_unlock(p->mutex);
454 }
455
456 static void prepare_next_write_item(pa_pstream *p) {
457 assert(p);
458 assert(PA_REFCNT_VALUE(p) > 0);
459
460 if (!(p->write.current = pa_queue_pop(p->send_queue)))
461 return;
462
463 p->write.index = 0;
464 p->write.data = NULL;
465
466 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
467 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
468 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
469 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
470 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
471
472 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
473
474 assert(p->write.current->packet);
475 p->write.data = p->write.current->packet->data;
476 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
477
478 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
479
480 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
481 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
482
483 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
484
485 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
486 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
487
488 } else {
489 uint32_t flags;
490 int send_payload = 1;
491
492 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
493 assert(p->write.current->chunk.memblock);
494
495 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
496 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
497 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
498
499 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
500
501 if (p->use_shm) {
502 uint32_t block_id, shm_id;
503 size_t offset, length;
504
505 assert(p->export);
506
507 if (pa_memexport_put(p->export,
508 p->write.current->chunk.memblock,
509 &block_id,
510 &shm_id,
511 &offset,
512 &length) >= 0) {
513
514 flags |= PA_FLAG_SHMDATA;
515 send_payload = 0;
516
517 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
518 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
519 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
520 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
521
522 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
523 p->write.data = p->write.shm_info;
524 }
525 /* else */
526 /* pa_log_warn("Failed to export memory block."); */
527 }
528
529 if (send_payload) {
530 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
531 p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
532 }
533
534 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
535 }
536
537 #ifdef HAVE_CREDS
538 if ((p->send_creds_now = p->write.current->with_creds))
539 p->write_creds = p->write.current->creds;
540 #endif
541 }
542
543 static int do_write(pa_pstream *p) {
544 void *d;
545 size_t l;
546 ssize_t r;
547
548 assert(p);
549 assert(PA_REFCNT_VALUE(p) > 0);
550
551 if (!p->write.current)
552 prepare_next_write_item(p);
553
554 if (!p->write.current)
555 return 0;
556
557 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
558 d = (uint8_t*) p->write.descriptor + p->write.index;
559 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
560 } else {
561 assert(p->write.data);
562
563 d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
564 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
565 }
566
567 assert(l > 0);
568
569 #ifdef HAVE_CREDS
570 if (p->send_creds_now) {
571
572 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
573 return -1;
574
575 p->send_creds_now = 0;
576 } else
577 #endif
578
579 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
580 return -1;
581
582 p->write.index += r;
583
584 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
585 assert(p->write.current);
586 item_free(p->write.current, (void *) 1);
587 p->write.current = NULL;
588
589 if (p->drain_callback && !pa_pstream_is_pending(p))
590 p->drain_callback(p, p->drain_callback_userdata);
591 }
592
593 return 0;
594 }
595
/* Issue a single read for the frame currently being received and process
 * whatever became available: first the descriptor, then the payload.
 *
 * Once the descriptor is complete it is validated and the destination for
 * the payload (packet, memblock or shm_info) is set up; release/revoke
 * frames are handled right away. Payload of plain memblock frames is handed
 * to the memblock callback piecewise as it arrives; packets and SHM
 * memblocks are delivered once the frame is complete.
 *
 * Returns 0 on success, -1 if the read failed or returned no data, if the
 * frame is invalid, or if an SHM block could not be imported. */
596 static int do_read(pa_pstream *p) {
597 void *d;
598 size_t l;
599 ssize_t r;
600
601 assert(p);
602 assert(PA_REFCNT_VALUE(p) > 0);
603
/* Work out where the next bytes go: into the descriptor as long as it is
 * incomplete, into the payload buffer afterwards */
604 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
605 d = (uint8_t*) p->read.descriptor + p->read.index;
606 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
607 } else {
608 assert(p->read.data);
609 d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
610 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
611 }
612
613 #ifdef HAVE_CREDS
614 {
615 int b = 0;
616
617 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
618 return -1;
619
/* Credentials seen at any point of the frame stay valid until the frame is done */
620 p->read_creds_valid = p->read_creds_valid || b;
621 }
622 #else
623 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
624 return -1;
625 #endif
626
627 p->read.index += r;
628
629 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
630 uint32_t flags, length, channel;
631 /* Reading of frame descriptor complete */
632
633 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
634
635 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
636 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
637 return -1;
638 }
639
640 if (flags == PA_FLAG_SHMRELEASE) {
641
642 /* This is a SHM memblock release frame with no payload */
643
644 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
645
646 assert(p->export);
647 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
648
649 goto frame_done;
650
651 } else if (flags == PA_FLAG_SHMREVOKE) {
652
653 /* This is a SHM memblock revoke frame with no payload */
654
655 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
656
657 assert(p->import);
658 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
659
660 goto frame_done;
661 }
662
663 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
664
/* Every remaining frame type carries a payload of bounded size */
665 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
666 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
667 return -1;
668 }
669
670 assert(!p->read.packet && !p->read.memblock);
671
672 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
673
/* Channel (uint32_t) -1 marks a packet frame, anything else a memblock frame */
674 if (channel == (uint32_t) -1) {
675
676 if (flags != 0) {
677 pa_log_warn("Received packet frame with invalid flags value.");
678 return -1;
679 }
680
681 /* Frame is a packet frame */
682 p->read.packet = pa_packet_new(length);
683 p->read.data = p->read.packet->data;
684
685 } else {
686
687 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
688 pa_log_warn("Received memblock frame with invalid seek mode.");
689 return -1;
690 }
691
692 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
693
694 if (length != sizeof(p->read.shm_info)) {
695 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
696 return -1;
697 }
698
699 /* Frame is a memblock frame referencing an SHM memblock */
700 p->read.data = p->read.shm_info;
701
702 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
703
704 /* Frame is a memblock frame */
705
706 p->read.memblock = pa_memblock_new(p->mempool, length);
707 p->read.data = p->read.memblock->data;
708 } else {
709
710 pa_log_warn("Recieved memblock frame with invalid flags value.");
711 return -1;
712 }
713 }
714
715 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
716 /* Frame payload available */
717
718 if (p->read.memblock && p->recieve_memblock_callback) {
719
720 /* Is this memblock data? Then pass it to the user */
/* l is the number of payload bytes that arrived with this read */
721 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
722
723 if (l > 0) {
724 pa_memchunk chunk;
725
726 chunk.memblock = p->read.memblock;
727 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
728 chunk.length = l;
729
730 if (p->recieve_memblock_callback) {
731 int64_t offset;
732
/* Reassemble the 64 bit offset from its two 32 bit halves */
733 offset = (int64_t) (
734 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
735 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
736
737 p->recieve_memblock_callback(
738 p,
739 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
740 offset,
741 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
742 &chunk,
743 p->recieve_memblock_callback_userdata);
744 }
745
746 /* Drop seek info for following callbacks */
747 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
748 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
749 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
750 }
751 }
752
753 /* Frame complete */
754 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
755
756 if (p->read.memblock) {
757
758 /* This was a memblock frame. We can unref the memblock now */
759 pa_memblock_unref(p->read.memblock);
760
761 } else if (p->read.packet) {
762
/* Packets are delivered in one piece, with the credentials if any were received */
763 if (p->recieve_packet_callback)
764 #ifdef HAVE_CREDS
765 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
766 #else
767 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
768 #endif
769
770 pa_packet_unref(p->read.packet);
771 } else {
772 pa_memblock *b;
773
/* Neither memblock nor packet: the payload was an shm_info record */
774 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
775
776 assert(p->import);
777
778 if (!(b = pa_memimport_get(p->import,
779 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
780 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
781 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
782 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
783
784 pa_log_warn("Failed to import memory block.");
785 return -1;
786 }
787
788 if (p->recieve_memblock_callback) {
789 int64_t offset;
790 pa_memchunk chunk;
791
792 chunk.memblock = b;
793 chunk.index = 0;
794 chunk.length = b->length;
795
796 offset = (int64_t) (
797 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
798 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
799
800 p->recieve_memblock_callback(
801 p,
802 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
803 offset,
804 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
805 &chunk,
806 p->recieve_memblock_callback_userdata);
807 }
808
809 pa_memblock_unref(b);
810 }
811
812 goto frame_done;
813 }
814 }
815
816 return 0;
817
/* Reset the read state so the next call starts with a fresh descriptor */
818 frame_done:
819 p->read.memblock = NULL;
820 p->read.packet = NULL;
821 p->read.index = 0;
822
823 #ifdef HAVE_CREDS
824 p->read_creds_valid = 0;
825 #endif
826
827 return 0;
828 }
829
830 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
831 assert(p);
832 assert(PA_REFCNT_VALUE(p) > 0);
833
834 pa_mutex_lock(p->mutex);
835 p->die_callback = cb;
836 p->die_callback_userdata = userdata;
837 pa_mutex_unlock(p->mutex);
838 }
839
840 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
841 assert(p);
842 assert(PA_REFCNT_VALUE(p) > 0);
843
844 pa_mutex_lock(p->mutex);
845 p->drain_callback = cb;
846 p->drain_callback_userdata = userdata;
847 pa_mutex_unlock(p->mutex);
848 }
849
850 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
851 assert(p);
852 assert(PA_REFCNT_VALUE(p) > 0);
853
854 pa_mutex_lock(p->mutex);
855 p->recieve_packet_callback = cb;
856 p->recieve_packet_callback_userdata = userdata;
857 pa_mutex_unlock(p->mutex);
858 }
859
860 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
861 assert(p);
862 assert(PA_REFCNT_VALUE(p) > 0);
863
864 pa_mutex_lock(p->mutex);
865 p->recieve_memblock_callback = cb;
866 p->recieve_memblock_callback_userdata = userdata;
867 pa_mutex_unlock(p->mutex);
868 }
869
870 int pa_pstream_is_pending(pa_pstream *p) {
871 int b;
872
873 assert(p);
874 assert(PA_REFCNT_VALUE(p) > 0);
875
876 pa_mutex_lock(p->mutex);
877
878 if (p->dead)
879 b = 0;
880 else
881 b = p->write.current || !pa_queue_is_empty(p->send_queue);
882
883 pa_mutex_unlock(p->mutex);
884
885 return b;
886 }
887
888 void pa_pstream_unref(pa_pstream*p) {
889 assert(p);
890 assert(PA_REFCNT_VALUE(p) > 0);
891
892 if (PA_REFCNT_DEC(p) <= 0)
893 pstream_free(p);
894 }
895
896 pa_pstream* pa_pstream_ref(pa_pstream*p) {
897 assert(p);
898 assert(PA_REFCNT_VALUE(p) > 0);
899
900 PA_REFCNT_INC(p);
901 return p;
902 }
903
904 void pa_pstream_close(pa_pstream *p) {
905 assert(p);
906
907 pa_mutex_lock(p->mutex);
908
909 p->dead = 1;
910
911 if (p->import) {
912 pa_memimport_free(p->import);
913 p->import = NULL;
914 }
915
916 if (p->export) {
917 pa_memexport_free(p->export);
918 p->export = NULL;
919 }
920
921 if (p->io) {
922 pa_iochannel_free(p->io);
923 p->io = NULL;
924 }
925
926 if (p->defer_event) {
927 p->mainloop->defer_free(p->defer_event);
928 p->defer_event = NULL;
929 }
930
931 p->die_callback = NULL;
932 p->drain_callback = NULL;
933 p->recieve_packet_callback = NULL;
934 p->recieve_memblock_callback = NULL;
935
936 pa_mutex_unlock(p->mutex);
937 }
938
939 void pa_pstream_use_shm(pa_pstream *p, int enable) {
940 assert(p);
941 assert(PA_REFCNT_VALUE(p) > 0);
942
943 pa_mutex_lock(p->mutex);
944
945 p->use_shm = enable;
946
947 if (enable) {
948
949 if (!p->export)
950 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
951
952 } else {
953
954 if (p->export) {
955 pa_memexport_free(p->export);
956 p->export = NULL;
957 }
958 }
959
960 pa_mutex_unlock(p->mutex);
961 }