4 This file is part of polypaudio.
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 2 of the License,
9 or (at your option) any later version.
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
29 #include <netinet/in.h>
/* Indices of the 32-bit words that make up a frame descriptor. Every word is
 * converted with htonl()/ntohl() when it is written to or read from the
 * descriptor by the code in this file. */
enum pa_pstream_descriptor_index {
    PA_PSTREAM_DESCRIPTOR_LENGTH,   /* payload length in bytes */
    PA_PSTREAM_DESCRIPTOR_CHANNEL,  /* channel; (uint32_t) -1 marks a packet frame */
    PA_PSTREAM_DESCRIPTOR_DELTA,    /* delta value passed along with memblock frames */
    PA_PSTREAM_DESCRIPTOR_MAX       /* number of descriptor words, not an index */
};

/* A complete frame descriptor: one uint32_t per index above. */
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

/* Size of a frame descriptor in bytes. */
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
/* Largest payload length accepted by do_read(); longer frames are logged as
 * "Frame size too large". */
#define FRAME_SIZE_MAX (1024*500) /* 500 KiB, i.e. roughly half a megabyte */
/* Fields of a queued send item. They appear to belong to `struct item_info`
 * (the type allocated in pa_pstream_send_packet() and
 * pa_pstream_send_memblock()). NOTE(review): the struct's opening and closing
 * lines and some members used elsewhere (e.g. `channel`, `delta`) are not
 * visible in this listing -- confirm against the full file. */
/* Discriminator telling which of the payload fields below is in use. */
49 enum { PA_PSTREAM_ITEM_PACKET
, PA_PSTREAM_ITEM_MEMBLOCK
} type
;
/* Payload of a PA_PSTREAM_ITEM_MEMBLOCK item; item_free() unrefs chunk.memblock. */
52 struct pa_memchunk chunk
;
/* Payload of a PA_PSTREAM_ITEM_PACKET item; item_free() unrefs it. */
57 struct pa_packet
*packet
;
/* Members of the stream object (`struct pa_pstream`, dereferenced as `p->...`
 * throughout this file). NOTE(review): the struct's opening and closing lines
 * and several members that the code uses (e.g. `ref`, `dead`, the `index` and
 * `data` members of the write/read state, `drain_userdata`) are not visible in
 * this listing -- confirm against the full file. */
/* Main loop vtable; this file calls its defer_new/defer_enable/defer_free hooks. */
63 struct pa_mainloop_api
*mainloop
;
/* Deferred event created in pa_pstream_new() with defer_callback() as handler. */
64 struct pa_defer_event
*defer_event
;
/* I/O channel the frames are written to and read from. */
65 struct pa_iochannel
*io
;
/* Queue of item_info entries waiting to be written. */
66 struct pa_queue
*send_queue
;
/* Callback invoked from do_something(), do_write() and do_read(), plus its
 * opaque argument. */
69 void (*die_callback
) (struct pa_pstream
*p
, void *userdata
);
70 void *die_callback_userdata
;
/* Write state (accessed as p->write.*): the item currently being sent and
 * the descriptor that precedes its payload on the wire. */
73 struct item_info
* current
;
74 pa_pstream_descriptor descriptor
;
/* Read state (accessed as p->read.*): exactly one of memblock/packet is set
 * while a frame payload is being received, plus the descriptor read so far. */
80 struct pa_memblock
*memblock
;
81 struct pa_packet
*packet
;
82 pa_pstream_descriptor descriptor
;
/* Called by do_read() with a fully received packet frame. */
87 void (*recieve_packet_callback
) (struct pa_pstream
*p
, struct pa_packet
*packet
, void *userdata
);
88 void *recieve_packet_callback_userdata
;
/* Called by do_read() with newly received memblock payload, together with the
 * channel and delta words decoded from the frame descriptor. */
90 void (*recieve_memblock_callback
) (struct pa_pstream
*p
, uint32_t channel
, uint32_t delta
, const struct pa_memchunk
*chunk
, void *userdata
);
91 void *recieve_memblock_callback_userdata
;
/* Called by do_write() once pa_pstream_is_pending() reports nothing left to send. */
93 void (*drain_callback
)(struct pa_pstream
*p
, void *userdata
);
/* Statistics object handed to pa_memblock_new() for received memblocks. */
96 struct pa_memblock_stat
*memblock_stat
;
/* Forward declarations of the write and read helpers that are defined
 * further down in this file. */
static void do_write(struct pa_pstream *p);
static void do_read(struct pa_pstream *p);
/* Central dispatcher for a stream: calls defer_enable(..., 0) on the deferred
 * event (presumably switching it off -- pa_pstream_send_*() pass 1 after
 * queueing data), then checks the I/O channel for hangup, writability and
 * readability. Each check is skipped once p->dead is set.
 * NOTE(review): listing lines 103, 105-107, 109-110, 112-113, 115-116 and
 * 118-122 are not visible here, so the bodies of the writable/readable
 * branches and the function's tail cannot be described from this excerpt. */
102 static void do_something(struct pa_pstream
*p
) {
104 p
->mainloop
->defer_enable(p
->defer_event
, 0);
/* Peer hung up: the die callback is invoked somewhere after this check. */
108 if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
)) {
111 p
->die_callback(p
, p
->die_callback_userdata
);
114 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
))
117 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
))
/* Callback registered on the I/O channel by pa_pstream_new(); `userdata` is
 * the owning pstream. Asserts that the notifying channel is the stream's own.
 * NOTE(review): the rest of the body (listing lines 126-127) is not visible. */
123 static void io_callback(struct pa_iochannel
*io
, void *userdata
) {
124 struct pa_pstream
*p
= userdata
;
125 assert(p
&& p
->io
== io
);
/* Handler of the deferred event created in pa_pstream_new(); `userdata` is the
 * owning pstream. Asserts that both the event and the main loop belong to it.
 * NOTE(review): the rest of the body (listing lines 132-133) is not visible. */
129 static void defer_callback(struct pa_mainloop_api
*m
, struct pa_defer_event
*e
, void*userdata
) {
130 struct pa_pstream
*p
= userdata
;
131 assert(p
&& p
->defer_event
== e
&& p
->mainloop
== m
);
/* Create a new packet stream on top of an I/O channel.
 *
 *   m  - main loop API used to create the stream's deferred event
 *   io - I/O channel to send and receive frames on; io_callback() is
 *        installed on it
 *   s  - memblock statistics object stored for later pa_memblock_new() calls
 *
 * All user callbacks start out as NULL and the socket send/receive buffers
 * are set to 8 KiB each.
 * NOTE(review): several statements are not visible in this listing (e.g.
 * listing lines 137-138, 140-141, 143-144, 147-148, 156-157, 160, 175-177),
 * among them the remaining field initialisations and the return statement. */
135 struct pa_pstream
*pa_pstream_new(struct pa_mainloop_api
*m
, struct pa_iochannel
*io
, struct pa_memblock_stat
*s
) {
136 struct pa_pstream
*p
;
139 p
= pa_xmalloc(sizeof(struct pa_pstream
));
142 pa_iochannel_set_callback(io
, io_callback
, p
);
145 p
->die_callback
= NULL
;
146 p
->die_callback_userdata
= NULL
;
/* Deferred event starts with defer_enable(..., 0); pa_pstream_send_*() call
 * defer_enable(..., 1) once something is queued. */
149 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
150 m
->defer_enable(p
->defer_event
, 0);
152 p
->send_queue
= pa_queue_new();
153 assert(p
->send_queue
);
/* Nothing is being written or read yet. */
155 p
->write
.current
= NULL
;
158 p
->read
.memblock
= NULL
;
159 p
->read
.packet
= NULL
;
162 p
->recieve_packet_callback
= NULL
;
163 p
->recieve_packet_callback_userdata
= NULL
;
165 p
->recieve_memblock_callback
= NULL
;
166 p
->recieve_memblock_callback_userdata
= NULL
;
168 p
->drain_callback
= NULL
;
169 p
->drain_userdata
= NULL
;
171 p
->memblock_stat
= s
;
173 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
174 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
/* Release the payload reference held by a send item: the memblock of a
 * PA_PSTREAM_ITEM_MEMBLOCK item, or the packet of a PA_PSTREAM_ITEM_PACKET
 * item. The signature matches what pa_queue_free() is given in
 * pstream_free(); the second argument `p` is not used in the visible code.
 * NOTE(review): listing lines 181-182, 186, 188 and 190-193 are not visible;
 * whether and where the item itself is released cannot be seen here. */
179 static void item_free(void *item
, void *p
) {
180 struct item_info
*i
= item
;
183 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
184 assert(i
->chunk
.memblock
);
185 pa_memblock_unref(i
->chunk
.memblock
);
/* Packet case (the line introducing this branch is not visible). */
187 assert(i
->type
== PA_PSTREAM_ITEM_PACKET
);
189 pa_packet_unref(i
->packet
);
/* Tear down a stream's owned resources: every queued send item (via
 * item_free()), the item currently being written, and any partially received
 * memblock or packet.
 * NOTE(review): listing lines 196-199, 201, 204, 207-208 and 210-212 are not
 * visible, so any guard around the final pa_packet_unref() and the release of
 * `p` itself cannot be confirmed from this excerpt. */
195 static void pstream_free(struct pa_pstream
*p
) {
200 pa_queue_free(p
->send_queue
, item_free
, NULL
);
202 if (p
->write
.current
)
203 item_free(p
->write
.current
, NULL
);
205 if (p
->read
.memblock
)
206 pa_memblock_unref(p
->read
.memblock
);
209 pa_packet_unref(p
->read
.packet
);
/* Queue a packet for sending. A new item_info of type PA_PSTREAM_ITEM_PACKET
 * is allocated, takes its own reference on `packet` (pa_packet_ref), and is
 * pushed onto the send queue; defer_enable(..., 1) is then called on the
 * stream's deferred event.
 * NOTE(review): listing lines 215-217, 219 and 223 are not visible (the
 * declaration of `i` and any argument checks are among them). */
214 void pa_pstream_send_packet(struct pa_pstream
*p
, struct pa_packet
*packet
) {
218 /*pa_log(__FILE__": push-packet %p\n", packet);*/
220 i
= pa_xmalloc(sizeof(struct item_info
));
221 i
->type
= PA_PSTREAM_ITEM_PACKET
;
222 i
->packet
= pa_packet_ref(packet
);
224 pa_queue_push(p
->send_queue
, i
);
225 p
->mainloop
->defer_enable(p
->defer_event
, 1);
/* Queue a memory chunk for sending on `channel`. A new item_info of type
 * PA_PSTREAM_ITEM_MEMBLOCK is allocated, a reference is taken on the chunk's
 * memblock, and the item is pushed onto the send queue; defer_enable(..., 1)
 * is then called on the stream's deferred event.
 *
 * `channel` must not be (uint32_t) -1: that value is written into the
 * descriptor of packet frames (see prepare_next_write_item()).
 * NOTE(review): listing lines 229, 231, 234, 236-237 and 239 are not visible;
 * the statements that copy `chunk` and `delta` into the item are presumably
 * among them -- confirm against the full file. */
228 void pa_pstream_send_memblock(struct pa_pstream
*p
, uint32_t channel
, uint32_t delta
, const struct pa_memchunk
*chunk
) {
230 assert(p
&& channel
!= (uint32_t) -1 && chunk
);
232 i
= pa_xmalloc(sizeof(struct item_info
));
233 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
235 i
->channel
= channel
;
238 pa_memblock_ref(i
->chunk
.memblock
);
240 pa_queue_push(p
->send_queue
, i
);
241 p
->mainloop
->defer_enable(p
->defer_event
, 1);
244 void pa_pstream_set_recieve_packet_callback(struct pa_pstream
*p
, void (*callback
) (struct pa_pstream
*p
, struct pa_packet
*packet
, void *userdata
), void *userdata
) {
245 assert(p
&& callback
);
247 p
->recieve_packet_callback
= callback
;
248 p
->recieve_packet_callback_userdata
= userdata
;
251 void pa_pstream_set_recieve_memblock_callback(struct pa_pstream
*p
, void (*callback
) (struct pa_pstream
*p
, uint32_t channel
, uint32_t delta
, const struct pa_memchunk
*chunk
, void *userdata
), void *userdata
) {
252 assert(p
&& callback
);
254 p
->recieve_memblock_callback
= callback
;
255 p
->recieve_memblock_callback_userdata
= userdata
;
/* Pop the next item from the send queue into p->write.current and prepare
 * p->write.data and p->write.descriptor for it. All descriptor words are
 * stored with htonl().
 *
 *   packet item:   data = packet data, LENGTH = packet length,
 *                  CHANNEL = (uint32_t) -1, DELTA = 0
 *   memblock item: data = memblock data + chunk.index, LENGTH = chunk.length,
 *                  CHANNEL = item channel, DELTA = item delta
 *
 * NOTE(review): listing lines 259-260, 262-265, 268, 274 and 280-281 are not
 * visible; what happens when the queue is empty, the line separating the two
 * branches, and any reset of the write index are among them. */
258 static void prepare_next_write_item(struct pa_pstream
*p
) {
/* Nothing queued: pa_queue_pop() yields NULL (handling not visible here). */
261 if (!(p
->write
.current
= pa_queue_pop(p
->send_queue
)))
266 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
267 /*pa_log(__FILE__": pop-packet %p\n", p->write.current->packet);*/
269 assert(p
->write
.current
->packet
);
270 p
->write
.data
= p
->write
.current
->packet
->data
;
271 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
/* Channel (uint32_t) -1 is what do_read() recognises as a packet frame. */
272 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
273 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_DELTA
] = 0;
/* Memblock case (the line introducing this branch is not visible). */
275 assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
&& p
->write
.current
->chunk
.memblock
);
/* Payload starts chunk.index bytes into the memblock's data. */
276 p
->write
.data
= (uint8_t*) p
->write
.current
->chunk
.memblock
->data
+ p
->write
.current
->chunk
.index
;
277 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
278 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
279 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_DELTA
] = htonl(p
->write
.current
->delta
);
/* Write as much as possible of the current send item to the I/O channel.
 *
 * p->write.index counts the bytes of the current frame already sent,
 * descriptor included: while it is below PA_PSTREAM_DESCRIPTOR_SIZE the
 * remaining descriptor bytes are written, afterwards the remaining payload
 * bytes. When the whole frame has gone out the item is released and, if a
 * drain callback is set and nothing is pending any more, that callback runs.
 *
 * NOTE(review): listing lines 284-288, 291, 293-294, 296, 300, 303-304,
 * 306-309, 314 and 317-323 are not visible. The local declarations, the
 * handling of an empty queue, the update of p->write.index after the write
 * and the error path leading to the die callback are among them. */
283 static void do_write(struct pa_pstream
*p
) {
/* No item in flight: fetch the next one from the send queue. */
289 if (!p
->write
.current
)
290 prepare_next_write_item(p
);
/* Still none (the statement guarded by this check is not visible). */
292 if (!p
->write
.current
)
295 assert(p
->write
.data
);
297 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
/* Descriptor not fully sent yet: continue inside the descriptor. */
298 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
299 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
/* Otherwise continue inside the payload; the payload offset is the frame
 * offset minus the descriptor size (the branch line itself is not visible). */
301 d
= (uint8_t*) p
->write
.data
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
302 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
/* A negative result from pa_iochannel_write() is treated as failure. */
305 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
/* Descriptor and payload completely written: release the item. */
310 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
311 assert(p
->write
.current
);
312 item_free(p
->write
.current
, (void *) 1);
313 p
->write
.current
= NULL
;
315 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
316 p
->drain_callback(p
, p
->drain_userdata
);
/* Die callback invocation; presumably the failure path -- the surrounding
 * lines are not visible, confirm against the full file. */
324 p
->die_callback(p
, p
->die_callback_userdata
);
/* Read the next piece of the incoming frame from the I/O channel.
 *
 * p->read.index counts the bytes of the current frame already received,
 * descriptor included. While it is below PA_PSTREAM_DESCRIPTOR_SIZE the read
 * targets the descriptor, afterwards the payload buffer p->read.data. Once the
 * descriptor is complete its LENGTH word is checked against FRAME_SIZE_MAX and
 * a packet or a memblock is allocated for the payload, depending on whether
 * the CHANNEL word is (uint32_t) -1. Memblock payload is handed to the
 * memblock callback; a packet is handed to the packet callback once the frame
 * is complete.
 *
 * NOTE(review): listing lines 328-332, 336, 340-341, 343-346, 349, 353-355,
 * 357, 363, 368-369, 372, 374-375, 377-378, 380, 383-384, 387, 390, 392-395,
 * 399, 402, 404, 407 and 410-420 are not visible. The local declarations, the
 * update of p->read.index after the read, parts of the callback argument
 * lists, the reset for the next frame and the error path leading to the die
 * callback are among them. */
327 static void do_read(struct pa_pstream
*p
) {
333 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
/* Descriptor incomplete: read the rest of it. */
334 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
335 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
/* Otherwise read the rest of the payload (the branch line is not visible). */
337 assert(p
->read
.data
);
338 d
= (uint8_t*) p
->read
.data
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
339 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
/* Zero or a negative result from pa_iochannel_read() is treated as failure. */
342 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
347 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
348 /* Reading of frame descriptor complete */
350 /* Frame size too large */
351 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) > FRAME_SIZE_MAX
) {
352 pa_log(__FILE__
": Frame size too large\n");
/* No payload buffer may be left over from a previous frame. */
356 assert(!p
->read
.packet
&& !p
->read
.memblock
);
358 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]) == (uint32_t) -1) {
359 /* Frame is a packet frame */
360 p
->read
.packet
= pa_packet_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]));
361 assert(p
->read
.packet
);
362 p
->read
.data
= p
->read
.packet
->data
;
364 /* Frame is a memblock frame */
365 p
->read
.memblock
= pa_memblock_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]), p
->memblock_stat
);
366 assert(p
->read
.memblock
);
367 p
->read
.data
= p
->read
.memblock
->data
;
370 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
371 /* Frame payload available */
373 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) { /* Is this memblock data? Then pass it to the user */
/* l = payload bytes delivered by this read: if the read began inside the
 * descriptor only the part beyond the descriptor counts, otherwise all r
 * bytes. Assumes read.index was already advanced by r (that statement is
 * not visible here) -- TODO confirm. */
376 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
379 struct pa_memchunk chunk
;
381 chunk
.memblock
= p
->read
.memblock
;
/* Offset inside the memblock where the newly received l bytes start. */
382 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
385 if (p
->recieve_memblock_callback
)
386 p
->recieve_memblock_callback(
388 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
389 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_DELTA
]),
391 p
->recieve_memblock_callback_userdata
);
/* Frame complete: descriptor plus LENGTH payload bytes have been received. */
396 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
397 if (p
->read
.memblock
) {
398 assert(!p
->read
.packet
);
/* Memblock frame: drop the stream's reference to the block. */
400 pa_memblock_unref(p
->read
.memblock
);
401 p
->read
.memblock
= NULL
;
/* Packet frame (the branch line is not visible): deliver the packet, then
 * drop the stream's reference to it. */
403 assert(p
->read
.packet
);
405 if (p
->recieve_packet_callback
)
406 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->recieve_packet_callback_userdata
);
408 pa_packet_unref(p
->read
.packet
);
409 p
->read
.packet
= NULL
;
/* Die callback invocation; presumably the failure path -- the surrounding
 * lines are not visible, confirm against the full file. */
421 p
->die_callback(p
, p
->die_callback_userdata
);
425 void pa_pstream_set_die_callback(struct pa_pstream
*p
, void (*callback
)(struct pa_pstream
*p
, void *userdata
), void *userdata
) {
426 assert(p
&& callback
);
427 p
->die_callback
= callback
;
428 p
->die_callback_userdata
= userdata
;
/* Report whether the stream still has outgoing data: non-zero when an item is
 * currently being written or the send queue is not empty.
 * NOTE(review): listing lines 432-436 are not visible; any earlier checks or
 * early returns cannot be described from this excerpt. */
431 int pa_pstream_is_pending(struct pa_pstream
*p
) {
437 return p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
/* Install the handler that do_write() calls once a frame has been written
 * completely and pa_pstream_is_pending() reports nothing left to send.
 * `userdata` is stored and handed back to `cb` unchanged.
 * NOTE(review): listing lines 441-442 are not visible; whether `cb` may be
 * NULL here cannot be confirmed from this excerpt. */
440 void pa_pstream_set_drain_callback(struct pa_pstream
*p
, void (*cb
)(struct pa_pstream
*p
, void *userdata
), void *userdata
) {
443 p
->drain_callback
= cb
;
444 p
->drain_userdata
= userdata
;
/* Reference-count release entry point. Requires a non-NULL stream whose
 * reference count is at least 1.
 * NOTE(review): the rest of the body (listing lines 449-452) is not visible,
 * so the decrement and any resulting teardown cannot be described here. */
447 void pa_pstream_unref(struct pa_pstream
*p
) {
448 assert(p
&& p
->ref
>= 1);
/* Reference-count acquire entry point; returns a `struct pa_pstream *`.
 * Requires a non-NULL stream whose reference count is at least 1.
 * NOTE(review): the rest of the body (listing lines 456-458) is not visible,
 * so the increment and the returned value cannot be confirmed here. */
454 struct pa_pstream
* pa_pstream_ref(struct pa_pstream
*p
) {
455 assert(p
&& p
->ref
>= 1);
/* Shut the stream down: frees the I/O channel, frees the deferred event (if
 * one exists) and clears all user callbacks so none of them can be invoked
 * afterwards.
 * NOTE(review): listing lines 461-465, 467-469, 473-474 and anything after
 * 478 are not visible; any guard around pa_iochannel_free(), the resetting of
 * p->io and the marking of the stream as dead cannot be confirmed here. */
460 void pa_pstream_close(struct pa_pstream
*p
) {
466 pa_iochannel_free(p
->io
);
470 if (p
->defer_event
) {
471 p
->mainloop
->defer_free(p
->defer_event
);
472 p
->defer_event
= NULL
;
475 p
->die_callback
= NULL
;
476 p
->drain_callback
= NULL
;
477 p
->recieve_packet_callback
= NULL
;
478 p
->recieve_memblock_callback
= NULL
;