4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
45 #include <pulse/xmalloc.h>
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
55 /* We piggyback the information whether audio data blocks are stored in SHM onto the descriptor word that carries the seek mode */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
62 /* The sequence descriptor header consists of 5 32bit integers: */
64 PA_PSTREAM_DESCRIPTOR_LENGTH
,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
68 PA_PSTREAM_DESCRIPTOR_FLAGS
,
69 PA_PSTREAM_DESCRIPTOR_MAX
72 /* If we have an SHM block, this info follows the descriptor */
74 PA_PSTREAM_SHM_BLOCKID
,
77 PA_PSTREAM_SHM_LENGTH
,
81 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85 #define FRAME_SIZE_MAX_USE (1024*64)
89 PA_PSTREAM_ITEM_PACKET
,
90 PA_PSTREAM_ITEM_MEMBLOCK
,
91 PA_PSTREAM_ITEM_SHMRELEASE
,
92 PA_PSTREAM_ITEM_SHMREVOKE
107 pa_seek_mode_t seek_mode
;
109 /* release/revoke info */
116 pa_mainloop_api
*mainloop
;
117 pa_defer_event
*defer_event
;
120 pa_queue
*send_queue
;
125 pa_pstream_descriptor descriptor
;
126 struct item_info
* current
;
127 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
130 pa_memchunk memchunk
;
134 pa_pstream_descriptor descriptor
;
135 pa_memblock
*memblock
;
137 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
143 pa_memimport
*import
;
144 pa_memexport
*export
;
146 pa_pstream_packet_cb_t recieve_packet_callback
;
147 void *recieve_packet_callback_userdata
;
149 pa_pstream_memblock_cb_t recieve_memblock_callback
;
150 void *recieve_memblock_callback_userdata
;
152 pa_pstream_notify_cb_t drain_callback
;
153 void *drain_callback_userdata
;
155 pa_pstream_notify_cb_t die_callback
;
156 void *die_callback_userdata
;
161 pa_creds read_creds
, write_creds
;
162 int read_creds_valid
, send_creds_now
;
166 static int do_write(pa_pstream
*p
);
167 static int do_read(pa_pstream
*p
);
169 static void do_something(pa_pstream
*p
) {
171 pa_assert(PA_REFCNT_VALUE(p
) > 0);
175 p
->mainloop
->defer_enable(p
->defer_event
, 0);
177 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
180 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
183 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
194 p
->die_callback(p
, p
->die_callback_userdata
);
196 pa_pstream_unlink(p
);
200 static void io_callback(pa_iochannel
*io
, void *userdata
) {
201 pa_pstream
*p
= userdata
;
204 pa_assert(PA_REFCNT_VALUE(p
) > 0);
205 pa_assert(p
->io
== io
);
210 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
211 pa_pstream
*p
= userdata
;
214 pa_assert(PA_REFCNT_VALUE(p
) > 0);
215 pa_assert(p
->defer_event
== e
);
216 pa_assert(p
->mainloop
== m
);
221 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
223 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
230 p
= pa_xnew(pa_pstream
, 1);
233 pa_iochannel_set_callback(io
, io_callback
, p
);
237 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
238 m
->defer_enable(p
->defer_event
, 0);
240 p
->send_queue
= pa_queue_new();
241 pa_assert(p
->send_queue
);
243 p
->write
.current
= NULL
;
245 pa_memchunk_reset(&p
->write
.memchunk
);
246 p
->read
.memblock
= NULL
;
247 p
->read
.packet
= NULL
;
250 p
->recieve_packet_callback
= NULL
;
251 p
->recieve_packet_callback_userdata
= NULL
;
252 p
->recieve_memblock_callback
= NULL
;
253 p
->recieve_memblock_callback_userdata
= NULL
;
254 p
->drain_callback
= NULL
;
255 p
->drain_callback_userdata
= NULL
;
256 p
->die_callback
= NULL
;
257 p
->die_callback_userdata
= NULL
;
264 /* We do importing unconditionally */
265 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
267 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
268 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
271 p
->send_creds_now
= 0;
272 p
->read_creds_valid
= 0;
/* Drops the payload reference held by a send-queue item: a memblock item
 * unrefs its chunk's memblock, a packet item unrefs its packet. The second
 * argument is unused; pstream_free() passes this function to pa_queue_free()
 * and also calls it directly on write.current.
 * NOTE(review): the statements following the packet branch are not visible in
 * this excerpt; presumably the item structure itself is released there --
 * confirm. */
277 static void item_free(void *item
, PA_GCC_UNUSED
void *q
) {
278 struct item_info
*i
= item
;
/* Memblock items own one reference on the chunk's memblock */
281 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
282 pa_assert(i
->chunk
.memblock
);
283 pa_memblock_unref(i
->chunk
.memblock
);
/* Packet items own one reference on the packet */
284 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
285 pa_assert(i
->packet
);
286 pa_packet_unref(i
->packet
);
292 static void pstream_free(pa_pstream
*p
) {
295 pa_pstream_unlink(p
);
297 pa_queue_free(p
->send_queue
, item_free
, NULL
);
299 if (p
->write
.current
)
300 item_free(p
->write
.current
, NULL
);
302 if (p
->write
.memchunk
.memblock
)
303 pa_memblock_unref(p
->write
.memchunk
.memblock
);
305 if (p
->read
.memblock
)
306 pa_memblock_unref(p
->read
.memblock
);
309 pa_packet_unref(p
->read
.packet
);
/* Queues a packet for sending. A new PA_PSTREAM_ITEM_PACKET item takes its own
 * reference on the packet, is appended to the send queue, and the defer event
 * is enabled. A non-NULL creds pointer marks the item as carrying
 * credentials (with_creds). */
314 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
318 pa_assert(PA_REFCNT_VALUE(p
) > 0);
324 i
= pa_xnew(struct item_info
, 1);
325 i
->type
= PA_PSTREAM_ITEM_PACKET
;
/* The item keeps its own reference; item_free() drops it again */
326 i
->packet
= pa_packet_ref(packet
);
/* NOTE(review): the statement guarded by this condition is not visible in
 * this excerpt; presumably it copies *creds into the item, since
 * prepare_next_write_item() later reads current->creds -- confirm. */
329 if ((i
->with_creds
= !!creds
))
333 pa_queue_push(p
->send_queue
, i
);
/* Enable the defer event now that the queue has a new item */
335 p
->mainloop
->defer_enable(p
->defer_event
, 1);
338 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
342 pa_assert(PA_REFCNT_VALUE(p
) > 0);
343 pa_assert(channel
!= (uint32_t) -1);
350 length
= chunk
->length
;
356 i
= pa_xnew(struct item_info
, 1);
357 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
359 n
= length
< FRAME_SIZE_MAX_USE
? length
: FRAME_SIZE_MAX_USE
;
360 i
->chunk
.index
= chunk
->index
+ idx
;
362 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
364 i
->channel
= channel
;
366 i
->seek_mode
= seek_mode
;
371 pa_queue_push(p
->send_queue
, i
);
377 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Release callback handed to pa_memimport_new() in pa_pstream_new(). It queues
 * a PA_PSTREAM_ITEM_SHMRELEASE item carrying block_id, without credentials,
 * and enables the defer event. prepare_next_write_item() turns such an item
 * into a descriptor flagged PA_FLAG_SHMRELEASE with the block id stored in the
 * OFFSET_HI word. userdata is the owning pa_pstream. */
380 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
381 struct item_info
*item
;
382 pa_pstream
*p
= userdata
;
385 pa_assert(PA_REFCNT_VALUE(p
) > 0);
390 /* pa_log("Releasing block %u", block_id); */
392 item
= pa_xnew(struct item_info
, 1);
393 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
394 item
->block_id
= block_id
;
/* Release frames never carry credentials */
396 item
->with_creds
= 0;
399 pa_queue_push(p
->send_queue
, item
);
400 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Revoke callback handed to pa_memexport_new() in pa_pstream_use_shm(). It
 * queues a PA_PSTREAM_ITEM_SHMREVOKE item carrying block_id, without
 * credentials, and enables the defer event. prepare_next_write_item() turns
 * such an item into a descriptor flagged PA_FLAG_SHMREVOKE with the block id
 * stored in the OFFSET_HI word. userdata is the owning pa_pstream. */
403 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
404 struct item_info
*item
;
405 pa_pstream
*p
= userdata
;
408 pa_assert(PA_REFCNT_VALUE(p
) > 0);
412 /* pa_log("Revoking block %u", block_id); */
414 item
= pa_xnew(struct item_info
, 1);
415 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
416 item
->block_id
= block_id
;
/* Revoke frames never carry credentials */
418 item
->with_creds
= 0;
421 pa_queue_push(p
->send_queue
, item
);
422 p
->mainloop
->defer_enable(p
->defer_event
, 1);
425 static void prepare_next_write_item(pa_pstream
*p
) {
427 pa_assert(PA_REFCNT_VALUE(p
) > 0);
429 p
->write
.current
= pa_queue_pop(p
->send_queue
);
431 if (!p
->write
.current
)
435 p
->write
.data
= NULL
;
436 pa_memchunk_reset(&p
->write
.memchunk
);
438 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
439 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
440 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
441 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
442 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
444 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
446 pa_assert(p
->write
.current
->packet
);
447 p
->write
.data
= p
->write
.current
->packet
->data
;
448 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
450 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
452 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
453 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
455 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
457 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
458 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
462 int send_payload
= 1;
464 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
465 pa_assert(p
->write
.current
->chunk
.memblock
);
467 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
468 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
469 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
471 flags
= p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
;
474 uint32_t block_id
, shm_id
;
475 size_t offset
, length
;
477 pa_assert(p
->export
);
479 if (pa_memexport_put(p
->export
,
480 p
->write
.current
->chunk
.memblock
,
486 flags
|= PA_FLAG_SHMDATA
;
489 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
490 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
491 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
492 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
494 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
495 p
->write
.data
= p
->write
.shm_info
;
498 /* pa_log_warn("Failed to export memory block."); */
502 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
503 p
->write
.memchunk
= p
->write
.current
->chunk
;
504 pa_memblock_ref(p
->write
.memchunk
.memblock
);
505 p
->write
.data
= NULL
;
508 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
512 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
513 p
->write_creds
= p
->write
.current
->creds
;
517 static int do_write(pa_pstream
*p
) {
521 pa_memblock
*release_memblock
= NULL
;
524 pa_assert(PA_REFCNT_VALUE(p
) > 0);
526 if (!p
->write
.current
)
527 prepare_next_write_item(p
);
529 if (!p
->write
.current
)
532 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
533 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
534 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
536 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
541 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
542 release_memblock
= p
->write
.memchunk
.memblock
;
545 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
546 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
552 if (p
->send_creds_now
) {
554 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
557 p
->send_creds_now
= 0;
561 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
564 if (release_memblock
)
565 pa_memblock_release(release_memblock
);
569 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
570 pa_assert(p
->write
.current
);
571 item_free(p
->write
.current
, NULL
);
572 p
->write
.current
= NULL
;
574 if (p
->write
.memchunk
.memblock
)
575 pa_memblock_unref(p
->write
.memchunk
.memblock
);
577 pa_memchunk_reset(&p
->write
.memchunk
);
579 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
580 p
->drain_callback(p
, p
->drain_callback_userdata
);
587 if (release_memblock
)
588 pa_memblock_release(release_memblock
);
593 static int do_read(pa_pstream
*p
) {
597 pa_memblock
*release_memblock
= NULL
;
599 pa_assert(PA_REFCNT_VALUE(p
) > 0);
601 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
602 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
603 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
605 pa_assert(p
->read
.data
|| p
->read
.memblock
);
610 d
= pa_memblock_acquire(p
->read
.memblock
);
611 release_memblock
= p
->read
.memblock
;
614 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
615 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
622 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
625 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
628 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
632 if (release_memblock
)
633 pa_memblock_release(release_memblock
);
637 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
638 uint32_t flags
, length
, channel
;
639 /* Reading of frame descriptor complete */
641 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
643 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
644 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
648 if (flags
== PA_FLAG_SHMRELEASE
) {
650 /* This is a SHM memblock release frame with no payload */
652 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
654 pa_assert(p
->export
);
655 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
659 } else if (flags
== PA_FLAG_SHMREVOKE
) {
661 /* This is a SHM memblock revoke frame with no payload */
663 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
665 pa_assert(p
->import
);
666 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
671 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
673 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
674 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length
);
678 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
680 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
682 if (channel
== (uint32_t) -1) {
685 pa_log_warn("Received packet frame with invalid flags value.");
689 /* Frame is a packet frame */
690 p
->read
.packet
= pa_packet_new(length
);
691 p
->read
.data
= p
->read
.packet
->data
;
695 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
696 pa_log_warn("Received memblock frame with invalid seek mode.");
700 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
702 if (length
!= sizeof(p
->read
.shm_info
)) {
703 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
707 /* Frame is a memblock frame referencing an SHM memblock */
708 p
->read
.data
= p
->read
.shm_info
;
710 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
712 /* Frame is a memblock frame */
714 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
718 pa_log_warn("Recieved memblock frame with invalid flags value.");
723 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
724 /* Frame payload available */
726 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
728 /* Is this memblock data? Then pass it to the user */
729 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
734 chunk
.memblock
= p
->read
.memblock
;
735 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
738 if (p
->recieve_memblock_callback
) {
742 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
743 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
745 p
->recieve_memblock_callback(
747 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
749 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
751 p
->recieve_memblock_callback_userdata
);
754 /* Drop seek info for following callbacks */
755 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
756 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
757 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
762 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
764 if (p
->read
.memblock
) {
766 /* This was a memblock frame. We can unref the memblock now */
767 pa_memblock_unref(p
->read
.memblock
);
769 } else if (p
->read
.packet
) {
771 if (p
->recieve_packet_callback
)
773 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
775 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
778 pa_packet_unref(p
->read
.packet
);
782 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
784 pa_assert(p
->import
);
786 if (!(b
= pa_memimport_get(p
->import
,
787 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
788 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
789 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
790 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
792 pa_log_warn("Failed to import memory block.");
796 if (p
->recieve_memblock_callback
) {
802 chunk
.length
= pa_memblock_get_length(b
);
805 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
806 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
808 p
->recieve_memblock_callback(
810 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
812 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
814 p
->recieve_memblock_callback_userdata
);
817 pa_memblock_unref(b
);
827 p
->read
.memblock
= NULL
;
828 p
->read
.packet
= NULL
;
833 p
->read_creds_valid
= 0;
839 if (release_memblock
)
840 pa_memblock_release(release_memblock
);
/* Stores cb and userdata as the stream's die callback. do_something() invokes
 * it as die_callback(p, die_callback_userdata); pa_pstream_unlink() resets the
 * callback pointer to NULL. */
845 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
847 pa_assert(PA_REFCNT_VALUE(p
) > 0);
849 p
->die_callback
= cb
;
850 p
->die_callback_userdata
= userdata
;
/* Stores cb and userdata as the stream's drain callback. do_write() invokes it
 * as drain_callback(p, drain_callback_userdata) when it is set and
 * pa_pstream_is_pending() reports nothing pending; pa_pstream_unlink() resets
 * the callback pointer to NULL. */
853 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
855 pa_assert(PA_REFCNT_VALUE(p
) > 0);
857 p
->drain_callback
= cb
;
858 p
->drain_callback_userdata
= userdata
;
/* Stores cb and userdata as the stream's packet-received callback. do_read()
 * invokes it with the stream, the completed packet, a credentials pointer (or
 * NULL) and the stored userdata; pa_pstream_unlink() resets the callback
 * pointer to NULL. */
861 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
863 pa_assert(PA_REFCNT_VALUE(p
) > 0);
865 p
->recieve_packet_callback
= cb
;
866 p
->recieve_packet_callback_userdata
= userdata
;
/* Stores cb and userdata as the stream's memblock-received callback. do_read()
 * invokes it for memblock frame data and for imported SHM blocks;
 * pa_pstream_unlink() resets the callback pointer to NULL. */
869 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
871 pa_assert(PA_REFCNT_VALUE(p
) > 0);
873 p
->recieve_memblock_callback
= cb
;
874 p
->recieve_memblock_callback_userdata
= userdata
;
/* Determines whether outgoing data is still pending on the stream.
 * NOTE(review): the declaration of b and the return statement are not visible
 * in this excerpt; presumably b is the return value -- confirm. */
877 int pa_pstream_is_pending(pa_pstream
*p
) {
881 pa_assert(PA_REFCNT_VALUE(p
) > 0);
/* Pending means: an item is currently being written, or the send queue is
 * not empty */
886 b
= p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
891 void pa_pstream_unref(pa_pstream
*p
) {
893 pa_assert(PA_REFCNT_VALUE(p
) > 0);
895 if (PA_REFCNT_DEC(p
) <= 0)
899 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
901 pa_assert(PA_REFCNT_VALUE(p
) > 0);
907 void pa_pstream_unlink(pa_pstream
*p
) {
916 pa_memimport_free(p
->import
);
921 pa_memexport_free(p
->export
);
926 pa_iochannel_free(p
->io
);
930 if (p
->defer_event
) {
931 p
->mainloop
->defer_free(p
->defer_event
);
932 p
->defer_event
= NULL
;
935 p
->die_callback
= NULL
;
936 p
->drain_callback
= NULL
;
937 p
->recieve_packet_callback
= NULL
;
938 p
->recieve_memblock_callback
= NULL
;
941 void pa_pstream_use_shm(pa_pstream
*p
, int enable
) {
943 pa_assert(PA_REFCNT_VALUE(p
) > 0);
950 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
955 pa_memexport_free(p
->export
);