4 This file is part of PulseAudio.
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/socket.h>
33 #ifdef HAVE_NETINET_IN_H
34 #include <netinet/in.h>
39 #include <pulse/xmalloc.h>
41 #include <pulsecore/queue.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-scache.h>
/* Indices of the 32-bit words that make up a frame descriptor; this file uses
 * them to subscript pa_pstream_descriptor arrays.
 * NOTE(review): the lines that open and close the enclosing declaration are
 * not present in this excerpt. */
48 PA_PSTREAM_DESCRIPTOR_LENGTH
,
49 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
50 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
51 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
52 PA_PSTREAM_DESCRIPTOR_SEEK
,
53 PA_PSTREAM_DESCRIPTOR_MAX
/* A frame descriptor: PA_PSTREAM_DESCRIPTOR_MAX words of type uint32_t. */
56 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
/* Size of one frame descriptor in bytes. */
58 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
59 #define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
/* Fragment of struct item_info, the record pushed onto a pstream's send queue.
 * Only the type tag and the seek mode are visible here; the other members this
 * file accesses (packet, chunk, channel, offset, with_creds, creds) are
 * declared on lines not present in this excerpt. */
62 enum { PA_PSTREAM_ITEM_PACKET
, PA_PSTREAM_ITEM_MEMBLOCK
} type
;
68 pa_seek_mode_t seek_mode
;
72 #ifdef SCM_CREDENTIALS
/* Fragment of the pa_pstream state structure.
 * NOTE(review): the struct header, the nested sub-struct braces and several
 * members used elsewhere in this file (ref, dead, io, send_queue, the write and
 * read index/data fields, read.packet) are on lines not present in this
 * excerpt. */
/* Main loop API and the defer event registered on it. */
81 pa_mainloop_api
*mainloop
;
82 pa_defer_event
*defer_event
;
/* Presumably members of the write state (accessed as p->write.current and
 * p->write.descriptor in this file) -- TODO confirm against the full struct. */
89 struct item_info
* current
;
90 pa_pstream_descriptor descriptor
;
/* Presumably members of the read state (accessed as p->read.memblock and
 * p->read.descriptor in this file) -- TODO confirm against the full struct. */
96 pa_memblock
*memblock
;
98 pa_pstream_descriptor descriptor
;
/* User callbacks, each paired with the opaque userdata pointer passed back
 * to it. */
103 pa_pstream_packet_cb_t recieve_packet_callback
;
104 void *recieve_packet_callback_userdata
;
106 pa_pstream_memblock_cb_t recieve_memblock_callback
;
107 void *recieve_memblock_callback_userdata
;
109 pa_pstream_notify_cb_t drain_callback
;
110 void *drain_callback_userdata
;
112 pa_pstream_notify_cb_t die_callback
;
113 void *die_callback_userdata
;
/* Passed to pa_memblock_new() when a memblock frame is received. */
115 pa_memblock_stat
*memblock_stat
;
/* Credential state, only compiled when SCM_CREDENTIALS is defined. */
117 #ifdef SCM_CREDENTIALS
118 struct ucred read_creds
, write_creds
;
119 int read_creds_valid
, send_creds_now
;
/* Forward declarations of the two I/O helpers defined later in this file. */
123 static int do_write(pa_pstream
*p
);
124 static int do_read(pa_pstream
*p
);
/* Central I/O driver for a pstream.
 * Visible steps: disable the defer event, test the I/O channel for the
 * readable / hung-up / writable states (each only while the stream is not
 * marked dead), and invoke the die callback.
 * NOTE(review): the statements inside each branch and the condition guarding
 * the die callback are on lines not present in this excerpt; presumably the
 * branches call do_read() and do_write() -- TODO confirm. */
126 static void do_something(pa_pstream
*p
) {
/* Switch the defer event off (second argument 0). */
129 p
->mainloop
->defer_enable(p
->defer_event
, 0);
133 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
136 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
139 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
/* Notify the owner through the die callback, handing back its userdata. */
152 p
->die_callback(p
, p
->die_callback_userdata
);
/* I/O channel callback; pa_pstream_new() registers it with
 * pa_iochannel_set_callback() and passes the pstream as userdata.
 * NOTE(review): everything after the userdata conversion is on lines not
 * present in this excerpt. */
157 static void io_callback(pa_iochannel
*io
, void *userdata
) {
158 pa_pstream
*p
= userdata
;
/* Defer-event callback; pa_pstream_new() registers it through m->defer_new()
 * with the pstream as userdata.
 * The visible body checks that the event and main loop handed in are the ones
 * stored on the pstream.
 * NOTE(review): the statements after the assertions are not present in this
 * excerpt. */
166 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
167 pa_pstream
*p
= userdata
;
170 assert(p
->defer_event
== e
);
171 assert(p
->mainloop
== m
);
/* Create a pstream on top of an I/O channel.
 *
 * m:  main loop API used to create the defer event.
 * io: I/O channel; io_callback is installed on it and its socket buffers are
 *     resized.
 * s:  stored as p->memblock_stat.
 *
 * NOTE(review): the local declaration of p, several field initialisations
 * (e.g. ref, io, dead, mainloop, the read/write indices) and the return
 * statement are on lines not present in this excerpt. */
176 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_memblock_stat
*s
) {
180 p
= pa_xnew(pa_pstream
, 1);
184 pa_iochannel_set_callback(io
, io_callback
, p
);
/* Create the defer event and leave it disabled until something is queued. */
189 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
190 m
->defer_enable(p
->defer_event
, 0);
192 p
->send_queue
= pa_queue_new();
193 assert(p
->send_queue
);
/* No item is being written and no frame is being received yet. */
195 p
->write
.current
= NULL
;
198 p
->read
.memblock
= NULL
;
199 p
->read
.packet
= NULL
;
/* All user callbacks start out unset. */
202 p
->recieve_packet_callback
= NULL
;
203 p
->recieve_packet_callback_userdata
= NULL
;
205 p
->recieve_memblock_callback
= NULL
;
206 p
->recieve_memblock_callback_userdata
= NULL
;
208 p
->drain_callback
= NULL
;
209 p
->drain_callback_userdata
= NULL
;
211 p
->die_callback
= NULL
;
212 p
->die_callback_userdata
= NULL
;
214 p
->memblock_stat
= s
;
/* Request 8 KiB (1024*8) receive and send socket buffers. */
216 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
217 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
219 #ifdef SCM_CREDENTIALS
220 p
->send_creds_now
= 0;
221 p
->read_creds_valid
= 0;
/* Release what a queued item holds: the memblock reference for a memblock
 * item, the packet reference for a packet item.
 * The second parameter is marked PA_GCC_UNUSED; the signature matches the
 * callback that pstream_free() hands to pa_queue_free().
 * NOTE(review): the else line between the two branches and whatever frees the
 * item_info record itself are on lines not present in this excerpt. */
226 static void item_free(void *item
, PA_GCC_UNUSED
void *p
) {
227 struct item_info
*i
= item
;
230 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
231 assert(i
->chunk
.memblock
);
232 pa_memblock_unref(i
->chunk
.memblock
);
/* Otherwise the item must be a packet. */
234 assert(i
->type
== PA_PSTREAM_ITEM_PACKET
);
236 pa_packet_unref(i
->packet
);
/* Tear down a pstream's owned resources: every item still on the send queue,
 * the item currently being written, and any partially received memblock or
 * packet.
 * NOTE(review): the guard in front of the final pa_packet_unref() and the
 * release of the pstream structure itself are on lines not present in this
 * excerpt. */
242 static void pstream_free(pa_pstream
*p
) {
/* Free the queue, running item_free() on each remaining entry. */
247 pa_queue_free(p
->send_queue
, item_free
, NULL
);
249 if (p
->write
.current
)
250 item_free(p
->write
.current
, NULL
);
252 if (p
->read
.memblock
)
253 pa_memblock_unref(p
->read
.memblock
);
256 pa_packet_unref(p
->read
.packet
);
/* Queue a packet for sending.
 *
 * p:      the pstream; must be non-NULL with ref >= 1 (asserted).
 * packet: packet to send; must be non-NULL. The queued item takes its own
 *         reference through pa_packet_ref().
 * creds:  optional credentials. When SCM_CREDENTIALS is defined, the item's
 *         with_creds flag is set to whether this pointer is non-NULL.
 *
 * After queueing, the defer event is enabled.
 * NOTE(review): the declaration of i and the statement controlled by the
 * with_creds test are on lines not present in this excerpt. */
261 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const struct ucred
*creds
) {
263 assert(p
&& packet
&& p
->ref
>= 1);
268 /* pa_log(__FILE__": push-packet %p", packet); */
270 i
= pa_xnew(struct item_info
, 1);
271 i
->type
= PA_PSTREAM_ITEM_PACKET
;
272 i
->packet
= pa_packet_ref(packet
);
273 #ifdef SCM_CREDENTIALS
274 if ((i
->with_creds
= !!creds
))
278 pa_queue_push(p
->send_queue
, i
);
/* Enable the defer event (second argument 1). */
279 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Queue a memory chunk for sending on a channel.
 *
 * p:         the pstream; must be non-NULL with ref >= 1 (asserted).
 * channel:   target channel; must not be (uint32_t) -1 (asserted). That value
 *            is what prepare_next_write_item() writes for packet frames.
 * offset:    64-bit offset carried in the frame descriptor.
 * seek_mode: seek mode carried in the frame descriptor.
 * chunk:     chunk to send; must be non-NULL. A reference on the item's
 *            memblock is taken through pa_memblock_ref().
 *
 * After queueing, the defer event is enabled.
 * NOTE(review): the declaration of i, the assignments of i->chunk and
 * i->offset, and the body of the SCM_CREDENTIALS section are on lines not
 * present in this excerpt. */
282 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
284 assert(p
&& channel
!= (uint32_t) -1 && chunk
&& p
->ref
>= 1);
289 /* pa_log(__FILE__": push-memblock %p", chunk); */
291 i
= pa_xnew(struct item_info
, 1);
292 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
294 i
->channel
= channel
;
296 i
->seek_mode
= seek_mode
;
297 #ifdef SCM_CREDENTIALS
301 pa_memblock_ref(i
->chunk
.memblock
);
303 pa_queue_push(p
->send_queue
, i
);
/* Enable the defer event (second argument 1). */
304 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Pop the next item off the send queue into p->write.current and set up the
 * write state for it: the payload pointer (p->write.data) and the frame
 * descriptor, with every populated field converted by htonl().
 *
 * Packet items use channel (uint32_t) -1 and zero offset/seek fields.
 * Memblock items carry their channel, the 64-bit offset split into high and
 * low 32-bit words, and the seek mode.
 *
 * NOTE(review): the statement taken when the queue is empty, the else line
 * between the two item kinds and any reset of p->write.index are on lines not
 * present in this excerpt. */
307 static void prepare_next_write_item(pa_pstream
*p
) {
310 if (!(p
->write
.current
= pa_queue_pop(p
->send_queue
)))
315 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
316 /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/
318 assert(p
->write
.current
->packet
);
319 p
->write
.data
= p
->write
.current
->packet
->data
;
320 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
/* (uint32_t) -1 in the channel field marks a packet frame; do_read() tests
 * for the same value. */
321 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
322 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
323 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
324 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
] = 0;
/* Memblock item: the payload starts at the chunk's index inside the block. */
328 assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
&& p
->write
.current
->chunk
.memblock
);
329 p
->write
.data
= (uint8_t*) p
->write
.current
->chunk
.memblock
->data
+ p
->write
.current
->chunk
.index
;
330 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
331 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
/* Upper 32 bits of the offset, then the lower 32 bits. */
332 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
333 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
334 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
] = htonl(p
->write
.current
->seek_mode
);
/* Arm credential sending when the item carries credentials, and copy them. */
337 #ifdef SCM_CREDENTIALS
338 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
339 p
->write_creds
= p
->write
.current
->creds
;
/* Write the next piece of the current outgoing frame to the I/O channel.
 *
 * If no item is in progress, prepare_next_write_item() is asked for one. The
 * bytes to write are taken from the descriptor while p->write.index is below
 * PA_PSTREAM_DESCRIPTOR_SIZE, and from the payload afterwards. Once the index
 * has reached descriptor size plus payload length, the item is freed and the
 * drain callback is invoked if one is set and nothing else is pending.
 *
 * NOTE(review): the local declarations, the return statements (including the
 * ones taken when nothing is queued or when a write call returns < 0), the
 * else lines and the statement advancing p->write.index are on lines not
 * present in this excerpt. */
345 static int do_write(pa_pstream
*p
) {
351 if (!p
->write
.current
)
352 prepare_next_write_item(p
);
354 if (!p
->write
.current
)
357 assert(p
->write
.data
);
/* Descriptor phase: d points at the unsent rest of the descriptor. */
359 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
360 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
361 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
/* Payload phase (presumably the else branch; that line is not shown): d
 * points at the unsent rest of the payload and l is the remaining length. */
363 d
= (uint8_t*) p
->write
.data
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
364 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
/* When credentials are armed, write them with the data and then clear the
 * flag. */
367 #ifdef SCM_CREDENTIALS
368 if (p
->send_creds_now
) {
370 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
373 p
->send_creds_now
= 0;
/* Plain write path. */
377 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
/* Frame fully written: release the item. */
382 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
383 assert(p
->write
.current
);
/* The second argument is unused by item_free() (PA_GCC_UNUSED). */
384 item_free(p
->write
.current
, (void *) 1);
385 p
->write
.current
= NULL
;
/* Tell the owner the stream has drained once nothing is left to send. */
387 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
388 p
->drain_callback(p
, p
->drain_callback_userdata
);
/* Read the next piece of the incoming frame from the I/O channel.
 *
 * Bytes go into the descriptor while p->read.index is below
 * PA_PSTREAM_DESCRIPTOR_SIZE and into the payload buffer afterwards.
 * When the descriptor has just been completed, the length is checked against
 * FRAME_SIZE_MAX and a packet (channel == (uint32_t) -1) or a memblock is
 * allocated for the payload. For memblock frames the seek mode is validated.
 * While memblock payload arrives, each newly read piece is handed to the
 * memblock callback; a completed packet frame is handed to the packet
 * callback. When SCM_CREDENTIALS is defined, received credentials are passed
 * along with the packet and then marked invalid again.
 *
 * NOTE(review): the local declarations, the return statements (including the
 * ones taken when a read call returns <= 0 or validation fails), the else
 * lines, the statement advancing p->read.index, the remaining chunk/offset
 * set-up and the reset after a complete frame are on lines not present in
 * this excerpt. */
394 static int do_read(pa_pstream
*p
) {
/* Descriptor phase: d points at the unread rest of the descriptor. */
400 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
401 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
402 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
/* Payload phase (presumably the else branch; that line is not shown): d
 * points at the unread rest of the payload and l is the remaining length. */
404 assert(p
->read
.data
);
405 d
= (uint8_t*) p
->read
.data
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
406 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
409 #ifdef SCM_CREDENTIALS
413 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
/* Once set, the flag stays set; it is only cleared after a complete frame. */
416 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
/* Plain read path. */
419 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
425 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
426 /* Reading of frame descriptor complete */
428 /* Frame size too large */
429 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) > FRAME_SIZE_MAX
) {
430 pa_log_warn(__FILE__
": Frame size too large: %lu > %lu", (unsigned long) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]), (unsigned long) FRAME_SIZE_MAX
);
/* No payload buffer may be left over from a previous frame. */
434 assert(!p
->read
.packet
&& !p
->read
.memblock
);
436 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]) == (uint32_t) -1) {
437 /* Frame is a packet frame */
438 p
->read
.packet
= pa_packet_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]));
439 p
->read
.data
= p
->read
.packet
->data
;
441 /* Frame is a memblock frame */
442 p
->read
.memblock
= pa_memblock_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]), p
->memblock_stat
);
443 p
->read
.data
= p
->read
.memblock
->data
;
/* Seek modes above PA_SEEK_RELATIVE_END are rejected with a warning. */
445 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
]) > PA_SEEK_RELATIVE_END
) {
446 pa_log_warn(__FILE__
": Invalid seek mode");
451 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
452 /* Frame payload available */
454 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) { /* Is this memblock data? Then pass it to the user */
/* l = number of payload bytes delivered by this read. If the read began
 * inside the descriptor, only the bytes past the descriptor count;
 * otherwise all r bytes are payload. */
455 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
460 chunk
.memblock
= p
->read
.memblock
;
/* Position of the newly read bytes within the memblock. */
461 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
464 if (p
->recieve_memblock_callback
) {
/* Reassemble a 64-bit value from the descriptor's high and low offset
 * words. */
468 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
469 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
471 p
->recieve_memblock_callback(
473 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
475 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
]),
477 p
->recieve_memblock_callback_userdata
);
480 /* Drop seek info for following callbacks */
481 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
] =
482 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
483 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
/* Whole frame (descriptor plus payload) received. */
488 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
489 if (p
->read
.memblock
) {
490 assert(!p
->read
.packet
);
/* Drop this function's reference on the memblock. */
492 pa_memblock_unref(p
->read
.memblock
);
493 p
->read
.memblock
= NULL
;
/* Otherwise the completed frame must be a packet. */
495 assert(p
->read
.packet
);
497 if (p
->recieve_packet_callback
)
/* With SCM_CREDENTIALS, pass the received credentials if valid, else NULL. */
498 #ifdef SCM_CREDENTIALS
499 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
/* Variant without credentials: always pass NULL. */
501 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
504 pa_packet_unref(p
->read
.packet
);
505 p
->read
.packet
= NULL
;
/* Credentials are only valid for the frame they arrived with. */
509 #ifdef SCM_CREDENTIALS
510 p
->read_creds_valid
= 0;
/* Install the die callback and the userdata pointer handed back to it.
 * do_something() is the visible caller of this callback.
 * NOTE(review): any argument checks are on lines not present in this
 * excerpt. */
518 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
522 p
->die_callback
= cb
;
523 p
->die_callback_userdata
= userdata
;
/* Install the drain callback and the userdata pointer handed back to it.
 * do_write() invokes this callback after finishing an item when nothing else
 * is pending.
 * NOTE(review): any argument checks are on lines not present in this
 * excerpt. */
527 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
531 p
->drain_callback
= cb
;
532 p
->drain_callback_userdata
= userdata
;
/* Install the callback that do_read() invokes for each completely received
 * packet frame, together with the userdata pointer handed back to it.
 * NOTE(review): any argument checks are on lines not present in this
 * excerpt. */
535 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
539 p
->recieve_packet_callback
= cb
;
540 p
->recieve_packet_callback_userdata
= userdata
;
/* Install the callback that do_read() invokes as memblock payload arrives,
 * together with the userdata pointer handed back to it.
 * NOTE(review): any argument checks are on lines not present in this
 * excerpt. */
543 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
547 p
->recieve_memblock_callback
= cb
;
548 p
->recieve_memblock_callback_userdata
= userdata
;
/* Report whether outgoing data is still waiting to be sent.
 * Returns non-zero when an item is currently being written
 * (p->write.current is set) or the send queue is not empty.
 * NOTE(review): any statements ahead of the return are on lines not present
 * in this excerpt. */
551 int pa_pstream_is_pending(pa_pstream
*p
) {
557 return p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
/* NOTE(review): only the signature is present in this excerpt. Presumably this
 * drops one reference on the pstream (other functions assert p->ref >= 1) --
 * TODO confirm against the full body. */
560 void pa_pstream_unref(pa_pstream
*p
) {
/* NOTE(review): only the signature is present in this excerpt. Presumably this
 * takes one reference on the pstream and returns it (other functions assert
 * p->ref >= 1) -- TODO confirm against the full body. */
568 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
/* Shut the pstream down: free the I/O channel, free and clear the defer
 * event if one exists, and unset all four user callbacks.
 * NOTE(review): the guard around pa_iochannel_free(), any update of p->io or
 * p->dead, and the closing lines of the function are not present in this
 * excerpt. */
576 void pa_pstream_close(pa_pstream
*p
) {
582 pa_iochannel_free(p
->io
);
586 if (p
->defer_event
) {
587 p
->mainloop
->defer_free(p
->defer_event
);
588 p
->defer_event
= NULL
;
/* Unset every user callback. */
591 p
->die_callback
= NULL
;
592 p
->drain_callback
= NULL
;
593 p
->recieve_packet_callback
= NULL
;
594 p
->recieve_memblock_callback
= NULL
;