4 This file is part of polypaudio.
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 2 of the License,
9 or (at your option) any later version.
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <netinet/in.h>
33 enum pa_pstream_descriptor_index
{
34 PA_PSTREAM_DESCRIPTOR_LENGTH
,
35 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
36 PA_PSTREAM_DESCRIPTOR_DELTA
,
37 PA_PSTREAM_DESCRIPTOR_MAX
40 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
42 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
43 #define FRAME_SIZE_MAX (1024*64)
46 enum { PA_PSTREAM_ITEM_PACKET
, PA_PSTREAM_ITEM_MEMBLOCK
} type
;
49 struct pa_memchunk chunk
;
54 struct pa_packet
*packet
;
58 struct pa_mainloop_api
*mainloop
;
59 struct mainloop_source
*mainloop_source
;
60 struct pa_iochannel
*io
;
61 struct pa_queue
*send_queue
;
63 int in_use
, shall_free
;
66 void (*die_callback
) (struct pa_pstream
*p
, void *userdata
);
67 void *die_callback_userdata
;
70 struct item_info
* current
;
71 pa_pstream_descriptor descriptor
;
77 struct pa_memblock
*memblock
;
78 struct pa_packet
*packet
;
79 pa_pstream_descriptor descriptor
;
84 void (*recieve_packet_callback
) (struct pa_pstream
*p
, struct pa_packet
*packet
, void *userdata
);
85 void *recieve_packet_callback_userdata
;
87 void (*recieve_memblock_callback
) (struct pa_pstream
*p
, uint32_t channel
, int32_t delta
, const struct pa_memchunk
*chunk
, void *userdata
);
88 void *recieve_memblock_callback_userdata
;
90 void (*drain_callback
)(struct pa_pstream
*p
, void *userdata
);
94 static void do_write(struct pa_pstream
*p
);
95 static void do_read(struct pa_pstream
*p
);
97 static void do_something(struct pa_pstream
*p
) {
98 assert(p
&& !p
->shall_free
);
99 p
->mainloop
->enable_fixed(p
->mainloop
, p
->mainloop_source
, 0);
104 if (pa_iochannel_is_hungup(p
->io
)) {
107 p
->die_callback(p
, p
->die_callback_userdata
);
112 if (pa_iochannel_is_writable(p
->io
)) {
123 if (pa_iochannel_is_readable(p
->io
)) {
134 static void io_callback(struct pa_iochannel
*io
, void *userdata
) {
135 struct pa_pstream
*p
= userdata
;
136 assert(p
&& p
->io
== io
);
140 static void fixed_callback(struct pa_mainloop_api
*m
, void *id
, void*userdata
) {
141 struct pa_pstream
*p
= userdata
;
142 assert(p
&& p
->mainloop_source
== id
&& p
->mainloop
== m
);
146 struct pa_pstream
*pa_pstream_new(struct pa_mainloop_api
*m
, struct pa_iochannel
*io
) {
147 struct pa_pstream
*p
;
150 p
= malloc(sizeof(struct pa_pstream
));
154 pa_iochannel_set_callback(io
, io_callback
, p
);
157 p
->die_callback
= NULL
;
158 p
->die_callback_userdata
= NULL
;
161 p
->mainloop_source
= m
->source_fixed(m
, fixed_callback
, p
);
162 m
->enable_fixed(m
, p
->mainloop_source
, 0);
164 p
->send_queue
= pa_queue_new();
165 assert(p
->send_queue
);
167 p
->write
.current
= NULL
;
170 p
->read
.memblock
= NULL
;
171 p
->read
.packet
= NULL
;
174 p
->recieve_packet_callback
= NULL
;
175 p
->recieve_packet_callback_userdata
= NULL
;
177 p
->recieve_memblock_callback
= NULL
;
178 p
->recieve_memblock_callback_userdata
= NULL
;
180 p
->drain_callback
= NULL
;
181 p
->drain_userdata
= NULL
;
183 p
->in_use
= p
->shall_free
= 0;
188 static void item_free(void *item
, void *p
) {
189 struct item_info
*i
= item
;
192 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
193 assert(i
->chunk
.memblock
);
194 pa_memblock_unref(i
->chunk
.memblock
);
196 assert(i
->type
== PA_PSTREAM_ITEM_PACKET
);
198 pa_packet_unref(i
->packet
);
204 void pa_pstream_free(struct pa_pstream
*p
) {
208 /* If this pstream object is used by someone else on the call stack, we have to postpone the freeing */
209 p
->dead
= p
->shall_free
= 1;
213 pa_iochannel_free(p
->io
);
214 pa_queue_free(p
->send_queue
, item_free
, NULL
);
216 if (p
->write
.current
)
217 item_free(p
->write
.current
, NULL
);
219 if (p
->read
.memblock
)
220 pa_memblock_unref(p
->read
.memblock
);
223 pa_packet_unref(p
->read
.packet
);
225 p
->mainloop
->cancel_fixed(p
->mainloop
, p
->mainloop_source
);
229 void pa_pstream_send_packet(struct pa_pstream
*p
, struct pa_packet
*packet
) {
233 i
= malloc(sizeof(struct item_info
));
235 i
->type
= PA_PSTREAM_ITEM_PACKET
;
236 i
->packet
= pa_packet_ref(packet
);
238 pa_queue_push(p
->send_queue
, i
);
239 p
->mainloop
->enable_fixed(p
->mainloop
, p
->mainloop_source
, 1);
242 void pa_pstream_send_memblock(struct pa_pstream
*p
, uint32_t channel
, int32_t delta
, const struct pa_memchunk
*chunk
) {
244 assert(p
&& channel
!= (uint32_t) -1 && chunk
);
246 i
= malloc(sizeof(struct item_info
));
248 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
250 i
->channel
= channel
;
253 pa_memblock_ref(i
->chunk
.memblock
);
255 pa_queue_push(p
->send_queue
, i
);
256 p
->mainloop
->enable_fixed(p
->mainloop
, p
->mainloop_source
, 1);
259 void pa_pstream_set_recieve_packet_callback(struct pa_pstream
*p
, void (*callback
) (struct pa_pstream
*p
, struct pa_packet
*packet
, void *userdata
), void *userdata
) {
260 assert(p
&& callback
);
262 p
->recieve_packet_callback
= callback
;
263 p
->recieve_packet_callback_userdata
= userdata
;
266 void pa_pstream_set_recieve_memblock_callback(struct pa_pstream
*p
, void (*callback
) (struct pa_pstream
*p
, uint32_t channel
, int32_t delta
, const struct pa_memchunk
*chunk
, void *userdata
), void *userdata
) {
267 assert(p
&& callback
);
269 p
->recieve_memblock_callback
= callback
;
270 p
->recieve_memblock_callback_userdata
= userdata
;
273 static void prepare_next_write_item(struct pa_pstream
*p
) {
276 if (!(p
->write
.current
= pa_queue_pop(p
->send_queue
)))
281 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
282 assert(p
->write
.current
->packet
);
283 p
->write
.data
= p
->write
.current
->packet
->data
;
284 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
285 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
286 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_DELTA
] = 0;
288 assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
&& p
->write
.current
->chunk
.memblock
);
289 p
->write
.data
= p
->write
.current
->chunk
.memblock
->data
+ p
->write
.current
->chunk
.index
;
290 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
291 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
292 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_DELTA
] = htonl(p
->write
.current
->delta
);
296 static void do_write(struct pa_pstream
*p
) {
302 if (!p
->write
.current
)
303 prepare_next_write_item(p
);
305 if (!p
->write
.current
)
308 assert(p
->write
.data
);
310 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
311 d
= (void*) p
->write
.descriptor
+ p
->write
.index
;
312 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
314 d
= (void*) p
->write
.data
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
315 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
318 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
323 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
324 assert(p
->write
.current
);
325 item_free(p
->write
.current
, (void *) 1);
326 p
->write
.current
= NULL
;
328 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
329 p
->drain_callback(p
, p
->drain_userdata
);
337 p
->die_callback(p
, p
->die_callback_userdata
);
340 static void do_read(struct pa_pstream
*p
) {
346 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
347 d
= (void*) p
->read
.descriptor
+ p
->read
.index
;
348 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
350 assert(p
->read
.data
);
351 d
= (void*) p
->read
.data
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
352 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
355 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
360 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
361 /* Reading of frame descriptor complete */
363 /* Frame size too large */
364 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) > FRAME_SIZE_MAX
)
367 assert(!p
->read
.packet
&& !p
->read
.memblock
);
369 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]) == (uint32_t) -1) {
370 /* Frame is a packet frame */
371 p
->read
.packet
= pa_packet_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]));
372 assert(p
->read
.packet
);
373 p
->read
.data
= p
->read
.packet
->data
;
375 /* Frame is a memblock frame */
376 p
->read
.memblock
= pa_memblock_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]));
377 assert(p
->read
.memblock
);
378 p
->read
.data
= p
->read
.memblock
->data
;
381 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
382 /* Frame payload available */
384 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) { /* Is this memblock data? Than pass it to the user */
387 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
390 struct pa_memchunk chunk
;
392 chunk
.memblock
= p
->read
.memblock
;
393 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
396 if (p
->recieve_memblock_callback
)
397 p
->recieve_memblock_callback(
399 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
400 (int32_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_DELTA
]),
402 p
->recieve_memblock_callback_userdata
);
407 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
408 if (p
->read
.memblock
) {
409 assert(!p
->read
.packet
);
411 pa_memblock_unref(p
->read
.memblock
);
412 p
->read
.memblock
= NULL
;
414 assert(p
->read
.packet
);
416 if (p
->recieve_packet_callback
)
417 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->recieve_packet_callback_userdata
);
419 pa_packet_unref(p
->read
.packet
);
420 p
->read
.packet
= NULL
;
432 p
->die_callback(p
, p
->die_callback_userdata
);
436 void pa_pstream_set_die_callback(struct pa_pstream
*p
, void (*callback
)(struct pa_pstream
*p
, void *userdata
), void *userdata
) {
437 assert(p
&& callback
);
438 p
->die_callback
= callback
;
439 p
->die_callback_userdata
= userdata
;
442 int pa_pstream_is_pending(struct pa_pstream
*p
) {
448 return p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
451 void pa_pstream_set_drain_callback(struct pa_pstream
*p
, void (*cb
)(struct pa_pstream
*p
, void *userdata
), void *userdata
) {
454 p
->drain_callback
= cb
;
455 p
->drain_userdata
= userdata
;