]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
fix a memory leak
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
13
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22 USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36 #ifdef HAVE_SYS_UN_H
37 #include <sys/un.h>
38 #endif
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
41 #endif
42
43 #include "winsock.h"
44
45 #include <pulse/xmalloc.h>
46
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
52
53 #include "pstream.h"
54
/* We piggyback information if audio data blocks are stored in SHM on the seek mode.
 * The flags word of a frame descriptor therefore carries two things at once:
 * the seek mode (selected with PA_FLAG_SEEKMASK) and the SHM bits (selected
 * with PA_FLAG_SHMMASK). */
#define PA_FLAG_SHMDATA 0x80000000LU /* memblock frame whose payload is an SHM block reference */
#define PA_FLAG_SHMRELEASE 0x40000000LU /* payload-less frame releasing an SHM block */
#define PA_FLAG_SHMREVOKE 0xC0000000LU /* payload-less frame revoking an SHM block */
#define PA_FLAG_SHMMASK 0xFF000000LU /* bits of the flags word reserved for SHM signalling */
#define PA_FLAG_SEEKMASK 0x000000FFLU /* bits of the flags word holding the seek mode */
61
/* The sequence descriptor header consists of 5 32bit integers: */
/* All fields are converted with htonl()/ntohl() when written/read. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH, /* payload length in bytes; 0 for release/revoke frames */
    PA_PSTREAM_DESCRIPTOR_CHANNEL, /* channel of a memblock frame; (uint32_t) -1 marks a packet frame */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI, /* upper 32 bits of the offset; carries the block id in release/revoke frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO, /* lower 32 bits of the offset */
    PA_PSTREAM_DESCRIPTOR_FLAGS, /* seek mode combined with the PA_FLAG_SHM* bits */
    PA_PSTREAM_DESCRIPTOR_MAX /* number of descriptor fields */
};
71
/* If we have an SHM block, this info follows the descriptor */
/* Each field is a 32bit integer, converted with htonl()/ntohl() on the wire. */
enum {
    PA_PSTREAM_SHM_BLOCKID, /* block id obtained from pa_memexport_put() */
    PA_PSTREAM_SHM_SHMID, /* SHM segment id obtained from pa_memexport_put() */
    PA_PSTREAM_SHM_INDEX, /* export offset plus the chunk index */
    PA_PSTREAM_SHM_LENGTH, /* length of the chunk */
    PA_PSTREAM_SHM_MAX /* number of SHM info fields */
};
80
/* Storage for one frame descriptor, indexed by the PA_PSTREAM_DESCRIPTOR_* constants */
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

/* Size of a frame descriptor in bytes */
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
/* Largest payload length accepted from the peer; larger frames make do_read() fail */
#define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
/* Largest payload we generate ourselves: pa_pstream_send_memblock() splits chunks into pieces of at most this size */
#define FRAME_SIZE_MAX_USE (1024*64)
86
/* One entry of the send queue. Which of the members below are meaningful
 * depends on the value of type. */
struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,
        PA_PSTREAM_ITEM_MEMBLOCK,
        PA_PSTREAM_ITEM_SHMRELEASE,
        PA_PSTREAM_ITEM_SHMREVOKE
    } type;


    /* packet info */
    pa_packet *packet; /* referenced while queued, unreferenced by item_free() */
#ifdef HAVE_CREDS
    int with_creds; /* non-zero if creds shall be sent along with this item */
    pa_creds creds;
#endif

    /* memblock info */
    pa_memchunk chunk; /* chunk.memblock is referenced while queued */
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};
112
/* State of one reference counted packet stream on top of a pa_iochannel. */
struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    pa_defer_event *defer_event; /* enabled whenever something got queued for sending */
    pa_iochannel *io; /* freed and set to NULL by pa_pstream_unlink() */

    pa_queue *send_queue; /* queue of struct item_info waiting to be written */

    int dead; /* set by pa_pstream_unlink(); a dead stream ignores send requests */

    /* State of the frame that is currently being written */
    struct {
        pa_pstream_descriptor descriptor;
        struct item_info* current; /* item being written, NULL if none */
        uint32_t shm_info[PA_PSTREAM_SHM_MAX];
        void *data; /* payload pointer, NULL if the payload comes from memchunk */
        size_t index; /* bytes of the frame (descriptor plus payload) written so far */
        pa_memchunk memchunk;
    } write;

    /* State of the frame that is currently being read */
    struct {
        pa_pstream_descriptor descriptor;
        pa_memblock *memblock; /* target of a plain memblock frame */
        pa_packet *packet; /* target of a packet frame */
        uint32_t shm_info[PA_PSTREAM_SHM_MAX];
        void *data; /* payload destination, NULL if the payload goes into memblock */
        size_t index; /* bytes of the frame (descriptor plus payload) read so far */
    } read;

    int use_shm; /* set by pa_pstream_use_shm() */
    pa_memimport *import; /* created unconditionally in pa_pstream_new() */
    pa_memexport *export; /* only exists while SHM is enabled */

    pa_pstream_packet_cb_t recieve_packet_callback;
    void *recieve_packet_callback_userdata;

    pa_pstream_memblock_cb_t recieve_memblock_callback;
    void *recieve_memblock_callback_userdata;

    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    pa_creds read_creds, write_creds;
    int read_creds_valid, send_creds_now;
#endif
};
165
/* Forward declarations: both are called by do_something() before they are defined.
 * Each returns a negative value on failure. */
static int do_write(pa_pstream *p);
static int do_read(pa_pstream *p);
168
169 static void do_something(pa_pstream *p) {
170 pa_assert(p);
171 pa_assert(PA_REFCNT_VALUE(p) > 0);
172
173 pa_pstream_ref(p);
174
175 p->mainloop->defer_enable(p->defer_event, 0);
176
177 if (!p->dead && pa_iochannel_is_readable(p->io)) {
178 if (do_read(p) < 0)
179 goto fail;
180 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
181 goto fail;
182
183 if (!p->dead && pa_iochannel_is_writable(p->io)) {
184 if (do_write(p) < 0)
185 goto fail;
186 }
187
188 pa_pstream_unref(p);
189 return;
190
191 fail:
192
193 if (p->die_callback)
194 p->die_callback(p, p->die_callback_userdata);
195
196 pa_pstream_unlink(p);
197 pa_pstream_unref(p);
198 }
199
200 static void io_callback(pa_iochannel*io, void *userdata) {
201 pa_pstream *p = userdata;
202
203 pa_assert(p);
204 pa_assert(PA_REFCNT_VALUE(p) > 0);
205 pa_assert(p->io == io);
206
207 do_something(p);
208 }
209
210 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
211 pa_pstream *p = userdata;
212
213 pa_assert(p);
214 pa_assert(PA_REFCNT_VALUE(p) > 0);
215 pa_assert(p->defer_event == e);
216 pa_assert(p->mainloop == m);
217
218 do_something(p);
219 }
220
/* Forward declaration: pa_pstream_new() passes this callback to pa_memimport_new(). */
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
222
223 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
224 pa_pstream *p;
225
226 pa_assert(m);
227 pa_assert(io);
228 pa_assert(pool);
229
230 p = pa_xnew(pa_pstream, 1);
231 PA_REFCNT_INIT(p);
232 p->io = io;
233 pa_iochannel_set_callback(io, io_callback, p);
234 p->dead = 0;
235
236 p->mainloop = m;
237 p->defer_event = m->defer_new(m, defer_callback, p);
238 m->defer_enable(p->defer_event, 0);
239
240 p->send_queue = pa_queue_new();
241 pa_assert(p->send_queue);
242
243 p->write.current = NULL;
244 p->write.index = 0;
245 pa_memchunk_reset(&p->write.memchunk);
246 p->read.memblock = NULL;
247 p->read.packet = NULL;
248 p->read.index = 0;
249
250 p->recieve_packet_callback = NULL;
251 p->recieve_packet_callback_userdata = NULL;
252 p->recieve_memblock_callback = NULL;
253 p->recieve_memblock_callback_userdata = NULL;
254 p->drain_callback = NULL;
255 p->drain_callback_userdata = NULL;
256 p->die_callback = NULL;
257 p->die_callback_userdata = NULL;
258
259 p->mempool = pool;
260
261 p->use_shm = 0;
262 p->export = NULL;
263
264 /* We do importing unconditionally */
265 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
266
267 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
268 pa_iochannel_socket_set_sndbuf(io, 1024*8);
269
270 #ifdef HAVE_CREDS
271 p->send_creds_now = 0;
272 p->read_creds_valid = 0;
273 #endif
274 return p;
275 }
276
277 static void item_free(void *item, PA_GCC_UNUSED void *q) {
278 struct item_info *i = item;
279 pa_assert(i);
280
281 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
282 pa_assert(i->chunk.memblock);
283 pa_memblock_unref(i->chunk.memblock);
284 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
285 pa_assert(i->packet);
286 pa_packet_unref(i->packet);
287 }
288
289 pa_xfree(i);
290 }
291
292 static void pstream_free(pa_pstream *p) {
293 pa_assert(p);
294
295 pa_pstream_unlink(p);
296
297 pa_queue_free(p->send_queue, item_free, NULL);
298
299 if (p->write.current)
300 item_free(p->write.current, NULL);
301
302 if (p->write.memchunk.memblock)
303 pa_memblock_unref(p->write.memchunk.memblock);
304
305 if (p->read.memblock)
306 pa_memblock_unref(p->read.memblock);
307
308 if (p->read.packet)
309 pa_packet_unref(p->read.packet);
310
311 pa_xfree(p);
312 }
313
314 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
315 struct item_info *i;
316
317 pa_assert(p);
318 pa_assert(PA_REFCNT_VALUE(p) > 0);
319 pa_assert(packet);
320
321 if (p->dead)
322 return;
323
324 i = pa_xnew(struct item_info, 1);
325 i->type = PA_PSTREAM_ITEM_PACKET;
326 i->packet = pa_packet_ref(packet);
327
328 #ifdef HAVE_CREDS
329 if ((i->with_creds = !!creds))
330 i->creds = *creds;
331 #endif
332
333 pa_queue_push(p->send_queue, i);
334
335 p->mainloop->defer_enable(p->defer_event, 1);
336 }
337
338 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
339 size_t length, idx;
340
341 pa_assert(p);
342 pa_assert(PA_REFCNT_VALUE(p) > 0);
343 pa_assert(channel != (uint32_t) -1);
344 pa_assert(chunk);
345
346 if (p->dead)
347 return;
348
349 idx = 0;
350 length = chunk->length;
351
352 while (length > 0) {
353 struct item_info *i;
354 size_t n;
355
356 i = pa_xnew(struct item_info, 1);
357 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
358
359 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
360 i->chunk.index = chunk->index + idx;
361 i->chunk.length = n;
362 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
363
364 i->channel = channel;
365 i->offset = offset;
366 i->seek_mode = seek_mode;
367 #ifdef HAVE_CREDS
368 i->with_creds = 0;
369 #endif
370
371 pa_queue_push(p->send_queue, i);
372
373 idx += n;
374 length -= n;
375 }
376
377 p->mainloop->defer_enable(p->defer_event, 1);
378 }
379
380 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
381 struct item_info *item;
382 pa_pstream *p = userdata;
383
384 pa_assert(p);
385 pa_assert(PA_REFCNT_VALUE(p) > 0);
386
387 if (p->dead)
388 return;
389
390 /* pa_log("Releasing block %u", block_id); */
391
392 item = pa_xnew(struct item_info, 1);
393 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
394 item->block_id = block_id;
395 #ifdef HAVE_CREDS
396 item->with_creds = 0;
397 #endif
398
399 pa_queue_push(p->send_queue, item);
400 p->mainloop->defer_enable(p->defer_event, 1);
401 }
402
403 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
404 struct item_info *item;
405 pa_pstream *p = userdata;
406
407 pa_assert(p);
408 pa_assert(PA_REFCNT_VALUE(p) > 0);
409
410 if (p->dead)
411 return;
412 /* pa_log("Revoking block %u", block_id); */
413
414 item = pa_xnew(struct item_info, 1);
415 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
416 item->block_id = block_id;
417 #ifdef HAVE_CREDS
418 item->with_creds = 0;
419 #endif
420
421 pa_queue_push(p->send_queue, item);
422 p->mainloop->defer_enable(p->defer_event, 1);
423 }
424
425 static void prepare_next_write_item(pa_pstream *p) {
426 pa_assert(p);
427 pa_assert(PA_REFCNT_VALUE(p) > 0);
428
429 p->write.current = pa_queue_pop(p->send_queue);
430
431 if (!p->write.current)
432 return;
433
434 p->write.index = 0;
435 p->write.data = NULL;
436 pa_memchunk_reset(&p->write.memchunk);
437
438 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
439 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
440 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
441 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
442 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
443
444 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
445
446 pa_assert(p->write.current->packet);
447 p->write.data = p->write.current->packet->data;
448 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
449
450 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
451
452 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
453 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
454
455 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
456
457 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
458 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
459
460 } else {
461 uint32_t flags;
462 int send_payload = 1;
463
464 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
465 pa_assert(p->write.current->chunk.memblock);
466
467 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
468 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
469 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
470
471 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
472
473 if (p->use_shm) {
474 uint32_t block_id, shm_id;
475 size_t offset, length;
476
477 pa_assert(p->export);
478
479 if (pa_memexport_put(p->export,
480 p->write.current->chunk.memblock,
481 &block_id,
482 &shm_id,
483 &offset,
484 &length) >= 0) {
485
486 flags |= PA_FLAG_SHMDATA;
487 send_payload = 0;
488
489 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
490 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
491 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
492 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
493
494 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
495 p->write.data = p->write.shm_info;
496 }
497 /* else */
498 /* pa_log_warn("Failed to export memory block."); */
499 }
500
501 if (send_payload) {
502 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
503 p->write.memchunk = p->write.current->chunk;
504 pa_memblock_ref(p->write.memchunk.memblock);
505 p->write.data = NULL;
506 }
507
508 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
509 }
510
511 #ifdef HAVE_CREDS
512 if ((p->send_creds_now = p->write.current->with_creds))
513 p->write_creds = p->write.current->creds;
514 #endif
515 }
516
517 static int do_write(pa_pstream *p) {
518 void *d;
519 size_t l;
520 ssize_t r;
521 pa_memblock *release_memblock = NULL;
522
523 pa_assert(p);
524 pa_assert(PA_REFCNT_VALUE(p) > 0);
525
526 if (!p->write.current)
527 prepare_next_write_item(p);
528
529 if (!p->write.current)
530 return 0;
531
532 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
533 d = (uint8_t*) p->write.descriptor + p->write.index;
534 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
535 } else {
536 pa_assert(p->write.data || p->write.memchunk.memblock);
537
538 if (p->write.data)
539 d = p->write.data;
540 else {
541 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
542 release_memblock = p->write.memchunk.memblock;
543 }
544
545 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
546 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
547 }
548
549 pa_assert(l > 0);
550
551 #ifdef HAVE_CREDS
552 if (p->send_creds_now) {
553
554 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
555 goto fail;
556
557 p->send_creds_now = 0;
558 } else
559 #endif
560
561 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
562 goto fail;
563
564 if (release_memblock)
565 pa_memblock_release(release_memblock);
566
567 p->write.index += r;
568
569 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
570 pa_assert(p->write.current);
571 item_free(p->write.current, NULL);
572 p->write.current = NULL;
573
574 if (p->write.memchunk.memblock)
575 pa_memblock_unref(p->write.memchunk.memblock);
576
577 pa_memchunk_reset(&p->write.memchunk);
578
579 if (p->drain_callback && !pa_pstream_is_pending(p))
580 p->drain_callback(p, p->drain_callback_userdata);
581 }
582
583 return 0;
584
585 fail:
586
587 if (release_memblock)
588 pa_memblock_release(release_memblock);
589
590 return -1;
591 }
592
593 static int do_read(pa_pstream *p) {
594 void *d;
595 size_t l;
596 ssize_t r;
597 pa_memblock *release_memblock = NULL;
598 pa_assert(p);
599 pa_assert(PA_REFCNT_VALUE(p) > 0);
600
601 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
602 d = (uint8_t*) p->read.descriptor + p->read.index;
603 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
604 } else {
605 pa_assert(p->read.data || p->read.memblock);
606
607 if (p->read.data)
608 d = p->read.data;
609 else {
610 d = pa_memblock_acquire(p->read.memblock);
611 release_memblock = p->read.memblock;
612 }
613
614 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
615 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
616 }
617
618 #ifdef HAVE_CREDS
619 {
620 int b = 0;
621
622 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
623 goto fail;
624
625 p->read_creds_valid = p->read_creds_valid || b;
626 }
627 #else
628 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
629 goto fail;
630 #endif
631
632 if (release_memblock)
633 pa_memblock_release(release_memblock);
634
635 p->read.index += r;
636
637 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
638 uint32_t flags, length, channel;
639 /* Reading of frame descriptor complete */
640
641 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
642
643 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
644 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
645 return -1;
646 }
647
648 if (flags == PA_FLAG_SHMRELEASE) {
649
650 /* This is a SHM memblock release frame with no payload */
651
652 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
653
654 pa_assert(p->export);
655 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
656
657 goto frame_done;
658
659 } else if (flags == PA_FLAG_SHMREVOKE) {
660
661 /* This is a SHM memblock revoke frame with no payload */
662
663 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
664
665 pa_assert(p->import);
666 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
667
668 goto frame_done;
669 }
670
671 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
672
673 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
674 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
675 return -1;
676 }
677
678 pa_assert(!p->read.packet && !p->read.memblock);
679
680 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
681
682 if (channel == (uint32_t) -1) {
683
684 if (flags != 0) {
685 pa_log_warn("Received packet frame with invalid flags value.");
686 return -1;
687 }
688
689 /* Frame is a packet frame */
690 p->read.packet = pa_packet_new(length);
691 p->read.data = p->read.packet->data;
692
693 } else {
694
695 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
696 pa_log_warn("Received memblock frame with invalid seek mode.");
697 return -1;
698 }
699
700 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
701
702 if (length != sizeof(p->read.shm_info)) {
703 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
704 return -1;
705 }
706
707 /* Frame is a memblock frame referencing an SHM memblock */
708 p->read.data = p->read.shm_info;
709
710 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
711
712 /* Frame is a memblock frame */
713
714 p->read.memblock = pa_memblock_new(p->mempool, length);
715 p->read.data = NULL;
716 } else {
717
718 pa_log_warn("Recieved memblock frame with invalid flags value.");
719 return -1;
720 }
721 }
722
723 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
724 /* Frame payload available */
725
726 if (p->read.memblock && p->recieve_memblock_callback) {
727
728 /* Is this memblock data? Than pass it to the user */
729 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
730
731 if (l > 0) {
732 pa_memchunk chunk;
733
734 chunk.memblock = p->read.memblock;
735 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
736 chunk.length = l;
737
738 if (p->recieve_memblock_callback) {
739 int64_t offset;
740
741 offset = (int64_t) (
742 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
743 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
744
745 p->recieve_memblock_callback(
746 p,
747 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
748 offset,
749 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
750 &chunk,
751 p->recieve_memblock_callback_userdata);
752 }
753
754 /* Drop seek info for following callbacks */
755 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
756 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
757 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
758 }
759 }
760
761 /* Frame complete */
762 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
763
764 if (p->read.memblock) {
765
766 /* This was a memblock frame. We can unref the memblock now */
767 pa_memblock_unref(p->read.memblock);
768
769 } else if (p->read.packet) {
770
771 if (p->recieve_packet_callback)
772 #ifdef HAVE_CREDS
773 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
774 #else
775 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
776 #endif
777
778 pa_packet_unref(p->read.packet);
779 } else {
780 pa_memblock *b;
781
782 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
783
784 pa_assert(p->import);
785
786 if (!(b = pa_memimport_get(p->import,
787 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
788 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
789 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
790 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
791
792 pa_log_warn("Failed to import memory block.");
793 return -1;
794 }
795
796 if (p->recieve_memblock_callback) {
797 int64_t offset;
798 pa_memchunk chunk;
799
800 chunk.memblock = b;
801 chunk.index = 0;
802 chunk.length = pa_memblock_get_length(b);
803
804 offset = (int64_t) (
805 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
806 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
807
808 p->recieve_memblock_callback(
809 p,
810 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
811 offset,
812 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
813 &chunk,
814 p->recieve_memblock_callback_userdata);
815 }
816
817 pa_memblock_unref(b);
818 }
819
820 goto frame_done;
821 }
822 }
823
824 return 0;
825
826 frame_done:
827 p->read.memblock = NULL;
828 p->read.packet = NULL;
829 p->read.index = 0;
830 p->read.data = NULL;
831
832 #ifdef HAVE_CREDS
833 p->read_creds_valid = 0;
834 #endif
835
836 return 0;
837
838 fail:
839 if (release_memblock)
840 pa_memblock_release(release_memblock);
841
842 return -1;
843 }
844
845 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
846 pa_assert(p);
847 pa_assert(PA_REFCNT_VALUE(p) > 0);
848
849 p->die_callback = cb;
850 p->die_callback_userdata = userdata;
851 }
852
853 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
854 pa_assert(p);
855 pa_assert(PA_REFCNT_VALUE(p) > 0);
856
857 p->drain_callback = cb;
858 p->drain_callback_userdata = userdata;
859 }
860
861 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
862 pa_assert(p);
863 pa_assert(PA_REFCNT_VALUE(p) > 0);
864
865 p->recieve_packet_callback = cb;
866 p->recieve_packet_callback_userdata = userdata;
867 }
868
869 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
870 pa_assert(p);
871 pa_assert(PA_REFCNT_VALUE(p) > 0);
872
873 p->recieve_memblock_callback = cb;
874 p->recieve_memblock_callback_userdata = userdata;
875 }
876
877 int pa_pstream_is_pending(pa_pstream *p) {
878 int b;
879
880 pa_assert(p);
881 pa_assert(PA_REFCNT_VALUE(p) > 0);
882
883 if (p->dead)
884 b = 0;
885 else
886 b = p->write.current || !pa_queue_is_empty(p->send_queue);
887
888 return b;
889 }
890
891 void pa_pstream_unref(pa_pstream*p) {
892 pa_assert(p);
893 pa_assert(PA_REFCNT_VALUE(p) > 0);
894
895 if (PA_REFCNT_DEC(p) <= 0)
896 pstream_free(p);
897 }
898
899 pa_pstream* pa_pstream_ref(pa_pstream*p) {
900 pa_assert(p);
901 pa_assert(PA_REFCNT_VALUE(p) > 0);
902
903 PA_REFCNT_INC(p);
904 return p;
905 }
906
907 void pa_pstream_unlink(pa_pstream *p) {
908 pa_assert(p);
909
910 if (p->dead)
911 return;
912
913 p->dead = 1;
914
915 if (p->import) {
916 pa_memimport_free(p->import);
917 p->import = NULL;
918 }
919
920 if (p->export) {
921 pa_memexport_free(p->export);
922 p->export = NULL;
923 }
924
925 if (p->io) {
926 pa_iochannel_free(p->io);
927 p->io = NULL;
928 }
929
930 if (p->defer_event) {
931 p->mainloop->defer_free(p->defer_event);
932 p->defer_event = NULL;
933 }
934
935 p->die_callback = NULL;
936 p->drain_callback = NULL;
937 p->recieve_packet_callback = NULL;
938 p->recieve_memblock_callback = NULL;
939 }
940
941 void pa_pstream_use_shm(pa_pstream *p, int enable) {
942 pa_assert(p);
943 pa_assert(PA_REFCNT_VALUE(p) > 0);
944
945 p->use_shm = enable;
946
947 if (enable) {
948
949 if (!p->export)
950 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
951
952 } else {
953
954 if (p->export) {
955 pa_memexport_free(p->export);
956 p->export = NULL;
957 }
958 }
959 }