]> code.delx.au - pulseaudio/blob - polyp/pstream.c
5fe2b4e75791e3f712390d6e5584501faf3e629f
[pulseaudio] / polyp / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of polypaudio.
5
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 2 of the License,
9 or (at your option) any later version.
10
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <assert.h>
29 #include <unistd.h>
30 #include <netinet/in.h>
31
32 #include "pstream.h"
33 #include "queue.h"
34 #include "xmalloc.h"
35 #include "log.h"
36
/* Slots of the frame descriptor that precedes every frame on the wire.
 * PA_PSTREAM_DESCRIPTOR_MAX is not a slot itself; it is the slot count. */
enum pa_pstream_descriptor_index {
    PA_PSTREAM_DESCRIPTOR_LENGTH = 0,   /* payload length in bytes */
    PA_PSTREAM_DESCRIPTOR_CHANNEL = 1,  /* channel number; (uint32_t) -1 marks a packet frame */
    PA_PSTREAM_DESCRIPTOR_DELTA = 2,    /* delta value passed along with memblock frames */
    PA_PSTREAM_DESCRIPTOR_MAX = 3
};
43
44 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
45
46 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
47 #define FRAME_SIZE_MAX (1024*500) /* half a megabyte */
48
49 struct item_info {
50 enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type;
51
52 /* memblock info */
53 struct pa_memchunk chunk;
54 uint32_t channel;
55 uint32_t delta;
56
57 /* packet info */
58 struct pa_packet *packet;
59 };
60
61 struct pa_pstream {
62 int ref;
63
64 struct pa_mainloop_api *mainloop;
65 struct pa_defer_event *defer_event;
66 struct pa_iochannel *io;
67 struct pa_queue *send_queue;
68
69 int dead;
70 void (*die_callback) (struct pa_pstream *p, void *userdata);
71 void *die_callback_userdata;
72
73 struct {
74 struct item_info* current;
75 pa_pstream_descriptor descriptor;
76 void *data;
77 size_t index;
78 } write;
79
80 struct {
81 struct pa_memblock *memblock;
82 struct pa_packet *packet;
83 pa_pstream_descriptor descriptor;
84 void *data;
85 size_t index;
86 } read;
87
88 void (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata);
89 void *recieve_packet_callback_userdata;
90
91 void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata);
92 void *recieve_memblock_callback_userdata;
93
94 void (*drain_callback)(struct pa_pstream *p, void *userdata);
95 void *drain_userdata;
96
97 struct pa_memblock_stat *memblock_stat;
98 };
99
/* Both are defined further down but are already needed by do_something() */
static void do_read(struct pa_pstream *p);
static void do_write(struct pa_pstream *p);
102
103 static void do_something(struct pa_pstream *p) {
104 assert(p);
105
106 if (p->dead)
107 return;
108
109 p->mainloop->defer_enable(p->defer_event, 0);
110
111 pa_pstream_ref(p);
112
113 if (!p->dead && pa_iochannel_is_hungup(p->io)) {
114 p->dead = 1;
115 if (p->die_callback)
116 p->die_callback(p, p->die_callback_userdata);
117 }
118
119 if (!p->dead && pa_iochannel_is_writable(p->io))
120 do_write(p);
121
122 if (!p->dead && pa_iochannel_is_readable(p->io))
123 do_read(p);
124
125 pa_pstream_unref(p);
126 }
127
128 static void io_callback(struct pa_iochannel*io, void *userdata) {
129 struct pa_pstream *p = userdata;
130 assert(p && p->io == io);
131 do_something(p);
132 }
133
134 static void defer_callback(struct pa_mainloop_api *m, struct pa_defer_event *e, void*userdata) {
135 struct pa_pstream *p = userdata;
136 assert(p && p->defer_event == e && p->mainloop == m);
137 do_something(p);
138 }
139
140 struct pa_pstream *pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io, struct pa_memblock_stat *s) {
141 struct pa_pstream *p;
142 assert(io);
143
144 p = pa_xmalloc(sizeof(struct pa_pstream));
145 p->ref = 1;
146 p->io = io;
147 pa_iochannel_set_callback(io, io_callback, p);
148
149 p->dead = 0;
150 p->die_callback = NULL;
151 p->die_callback_userdata = NULL;
152
153 p->mainloop = m;
154 p->defer_event = m->defer_new(m, defer_callback, p);
155 m->defer_enable(p->defer_event, 0);
156
157 p->send_queue = pa_queue_new();
158 assert(p->send_queue);
159
160 p->write.current = NULL;
161 p->write.index = 0;
162
163 p->read.memblock = NULL;
164 p->read.packet = NULL;
165 p->read.index = 0;
166
167 p->recieve_packet_callback = NULL;
168 p->recieve_packet_callback_userdata = NULL;
169
170 p->recieve_memblock_callback = NULL;
171 p->recieve_memblock_callback_userdata = NULL;
172
173 p->drain_callback = NULL;
174 p->drain_userdata = NULL;
175
176 p->memblock_stat = s;
177
178 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
179 pa_iochannel_socket_set_sndbuf(io, 1024*8);
180
181 return p;
182 }
183
184 static void item_free(void *item, void *p) {
185 struct item_info *i = item;
186 assert(i);
187
188 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
189 assert(i->chunk.memblock);
190 pa_memblock_unref(i->chunk.memblock);
191 } else {
192 assert(i->type == PA_PSTREAM_ITEM_PACKET);
193 assert(i->packet);
194 pa_packet_unref(i->packet);
195 }
196
197 pa_xfree(i);
198 }
199
200 static void pstream_free(struct pa_pstream *p) {
201 assert(p);
202
203 pa_pstream_close(p);
204
205 pa_queue_free(p->send_queue, item_free, NULL);
206
207 if (p->write.current)
208 item_free(p->write.current, NULL);
209
210 if (p->read.memblock)
211 pa_memblock_unref(p->read.memblock);
212
213 if (p->read.packet)
214 pa_packet_unref(p->read.packet);
215
216 pa_xfree(p);
217 }
218
219 void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) {
220 struct item_info *i;
221 assert(p && packet && p->ref >= 1);
222
223 if (p->dead)
224 return;
225
226 /* pa_log(__FILE__": push-packet %p\n", packet); */
227
228 i = pa_xmalloc(sizeof(struct item_info));
229 i->type = PA_PSTREAM_ITEM_PACKET;
230 i->packet = pa_packet_ref(packet);
231
232 pa_queue_push(p->send_queue, i);
233 p->mainloop->defer_enable(p->defer_event, 1);
234 }
235
236 void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk) {
237 struct item_info *i;
238 assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
239
240 if (p->dead)
241 return;
242
243 /* pa_log(__FILE__": push-memblock %p\n", chunk); */
244
245 i = pa_xmalloc(sizeof(struct item_info));
246 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
247 i->chunk = *chunk;
248 i->channel = channel;
249 i->delta = delta;
250
251 pa_memblock_ref(i->chunk.memblock);
252
253 pa_queue_push(p->send_queue, i);
254 p->mainloop->defer_enable(p->defer_event, 1);
255 }
256
257 void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata) {
258 assert(p && callback);
259
260 p->recieve_packet_callback = callback;
261 p->recieve_packet_callback_userdata = userdata;
262 }
263
264 void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) {
265 assert(p && callback);
266
267 p->recieve_memblock_callback = callback;
268 p->recieve_memblock_callback_userdata = userdata;
269 }
270
271 static void prepare_next_write_item(struct pa_pstream *p) {
272 assert(p);
273
274 if (!(p->write.current = pa_queue_pop(p->send_queue)))
275 return;
276
277 p->write.index = 0;
278
279 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
280 /*pa_log(__FILE__": pop-packet %p\n", p->write.current->packet);*/
281
282 assert(p->write.current->packet);
283 p->write.data = p->write.current->packet->data;
284 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
285 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
286 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA] = 0;
287 } else {
288 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
289 p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
290 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
291 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
292 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA] = htonl(p->write.current->delta);
293 }
294 }
295
296 static void do_write(struct pa_pstream *p) {
297 void *d;
298 size_t l;
299 ssize_t r;
300 assert(p);
301
302 if (!p->write.current)
303 prepare_next_write_item(p);
304
305 if (!p->write.current)
306 return;
307
308 assert(p->write.data);
309
310 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
311 d = (uint8_t*) p->write.descriptor + p->write.index;
312 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
313 } else {
314 d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
315 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
316 }
317
318 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
319 goto die;
320
321 p->write.index += r;
322
323 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
324 assert(p->write.current);
325 item_free(p->write.current, (void *) 1);
326 p->write.current = NULL;
327
328 if (p->drain_callback && !pa_pstream_is_pending(p))
329 p->drain_callback(p, p->drain_userdata);
330 }
331
332 return;
333
334 die:
335 p->dead = 1;
336 if (p->die_callback)
337 p->die_callback(p, p->die_callback_userdata);
338 }
339
340 static void do_read(struct pa_pstream *p) {
341 void *d;
342 size_t l;
343 ssize_t r;
344 assert(p);
345
346 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
347 d = (uint8_t*) p->read.descriptor + p->read.index;
348 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
349 } else {
350 assert(p->read.data);
351 d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
352 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
353 }
354
355 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
356 goto die;
357
358 p->read.index += r;
359
360 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
361 /* Reading of frame descriptor complete */
362
363 /* Frame size too large */
364 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
365 pa_log(__FILE__": Frame size too large\n");
366 goto die;
367 }
368
369 assert(!p->read.packet && !p->read.memblock);
370
371 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
372 /* Frame is a packet frame */
373 p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
374 assert(p->read.packet);
375 p->read.data = p->read.packet->data;
376 } else {
377 /* Frame is a memblock frame */
378 p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
379 assert(p->read.memblock);
380 p->read.data = p->read.memblock->data;
381 }
382
383 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
384 /* Frame payload available */
385
386 if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
387 size_t l;
388
389 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
390
391 if (l > 0) {
392 struct pa_memchunk chunk;
393
394 chunk.memblock = p->read.memblock;
395 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
396 chunk.length = l;
397
398 if (p->recieve_memblock_callback)
399 p->recieve_memblock_callback(
400 p,
401 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
402 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
403 &chunk,
404 p->recieve_memblock_callback_userdata);
405 }
406 }
407
408 /* Frame complete */
409 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
410 if (p->read.memblock) {
411 assert(!p->read.packet);
412
413 pa_memblock_unref(p->read.memblock);
414 p->read.memblock = NULL;
415 } else {
416 assert(p->read.packet);
417
418 if (p->recieve_packet_callback)
419 p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata);
420
421 pa_packet_unref(p->read.packet);
422 p->read.packet = NULL;
423 }
424
425 p->read.index = 0;
426 }
427 }
428
429 return;
430
431 die:
432 p->dead = 1;
433 if (p->die_callback)
434 p->die_callback(p, p->die_callback_userdata);
435
436 }
437
438 void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata) {
439 assert(p && callback);
440 p->die_callback = callback;
441 p->die_callback_userdata = userdata;
442 }
443
444 int pa_pstream_is_pending(struct pa_pstream *p) {
445 assert(p);
446
447 if (p->dead)
448 return 0;
449
450 return p->write.current || !pa_queue_is_empty(p->send_queue);
451 }
452
453 void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata) {
454 assert(p);
455
456 p->drain_callback = cb;
457 p->drain_userdata = userdata;
458 }
459
460 void pa_pstream_unref(struct pa_pstream*p) {
461 assert(p && p->ref >= 1);
462
463 if (!(--(p->ref)))
464 pstream_free(p);
465 }
466
467 struct pa_pstream* pa_pstream_ref(struct pa_pstream*p) {
468 assert(p && p->ref >= 1);
469 p->ref++;
470 return p;
471 }
472
473 void pa_pstream_close(struct pa_pstream *p) {
474 assert(p);
475
476 p->dead = 1;
477
478 if (p->io) {
479 pa_iochannel_free(p->io);
480 p->io = NULL;
481 }
482
483 if (p->defer_event) {
484 p->mainloop->defer_free(p->defer_event);
485 p->defer_event = NULL;
486 }
487
488 p->die_callback = NULL;
489 p->drain_callback = NULL;
490 p->recieve_packet_callback = NULL;
491 p->recieve_memblock_callback = NULL;
492 }