]> code.delx.au - pulseaudio/blob - polyp/pstream.c
improve sync clock change
[pulseaudio] / polyp / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of polypaudio.
5
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
10
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <assert.h>
29 #include <unistd.h>
30 #include <netinet/in.h>
31
32 #include "pstream.h"
33 #include "queue.h"
34 #include "xmalloc.h"
35 #include "log.h"
36
/* Positions of the 32-bit words that make up a frame descriptor. */
enum pa_pstream_descriptor_index {
    PA_PSTREAM_DESCRIPTOR_LENGTH = 0,   /* payload length of the frame */
    PA_PSTREAM_DESCRIPTOR_CHANNEL = 1,  /* channel; (uint32_t) -1 marks a packet frame */
    PA_PSTREAM_DESCRIPTOR_DELTA = 2,    /* delta value; written as 0 for packet frames */
    PA_PSTREAM_DESCRIPTOR_MAX = 3       /* number of words in a descriptor */
};
43
44 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
45
46 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
47 #define FRAME_SIZE_MAX (1024*500) /* half a megabyte */
48
49 struct item_info {
50 enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type;
51
52 /* memblock info */
53 struct pa_memchunk chunk;
54 uint32_t channel;
55 uint32_t delta;
56
57 /* packet info */
58 struct pa_packet *packet;
59 };
60
61 struct pa_pstream {
62 int ref;
63
64 struct pa_mainloop_api *mainloop;
65 struct pa_defer_event *defer_event;
66 struct pa_iochannel *io;
67 struct pa_queue *send_queue;
68
69 int dead;
70 void (*die_callback) (struct pa_pstream *p, void *userdata);
71 void *die_callback_userdata;
72
73 struct {
74 struct item_info* current;
75 pa_pstream_descriptor descriptor;
76 void *data;
77 size_t index;
78 } write;
79
80 struct {
81 struct pa_memblock *memblock;
82 struct pa_packet *packet;
83 pa_pstream_descriptor descriptor;
84 void *data;
85 size_t index;
86 } read;
87
88 void (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata);
89 void *recieve_packet_callback_userdata;
90
91 void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata);
92 void *recieve_memblock_callback_userdata;
93
94 void (*drain_callback)(struct pa_pstream *p, void *userdata);
95 void *drain_userdata;
96
97 struct pa_memblock_stat *memblock_stat;
98 };
99
/* Forward declarations: do_something() uses both before they are defined. */
static void do_read(struct pa_pstream *p);
static void do_write(struct pa_pstream *p);
102
103 static void do_something(struct pa_pstream *p) {
104 assert(p);
105
106 p->mainloop->defer_enable(p->defer_event, 0);
107
108 pa_pstream_ref(p);
109
110 if (!p->dead && pa_iochannel_is_readable(p->io))
111 do_read(p);
112
113 if (!p->dead && pa_iochannel_is_writable(p->io))
114 do_write(p);
115
116 /* In case the line was hungup, make sure to rerun this function
117 as soon as possible, until all data has been read. */
118
119 if (!p->dead && pa_iochannel_is_hungup(p->io))
120 p->mainloop->defer_enable(p->defer_event, 1);
121
122 pa_pstream_unref(p);
123 }
124
125 static void io_callback(struct pa_iochannel*io, void *userdata) {
126 struct pa_pstream *p = userdata;
127 assert(p && p->io == io);
128 do_something(p);
129 }
130
131 static void defer_callback(struct pa_mainloop_api *m, struct pa_defer_event *e, void*userdata) {
132 struct pa_pstream *p = userdata;
133 assert(p && p->defer_event == e && p->mainloop == m);
134 do_something(p);
135 }
136
137 struct pa_pstream *pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io, struct pa_memblock_stat *s) {
138 struct pa_pstream *p;
139 assert(io);
140
141 p = pa_xmalloc(sizeof(struct pa_pstream));
142 p->ref = 1;
143 p->io = io;
144 pa_iochannel_set_callback(io, io_callback, p);
145
146 p->dead = 0;
147 p->die_callback = NULL;
148 p->die_callback_userdata = NULL;
149
150 p->mainloop = m;
151 p->defer_event = m->defer_new(m, defer_callback, p);
152 m->defer_enable(p->defer_event, 0);
153
154 p->send_queue = pa_queue_new();
155 assert(p->send_queue);
156
157 p->write.current = NULL;
158 p->write.index = 0;
159
160 p->read.memblock = NULL;
161 p->read.packet = NULL;
162 p->read.index = 0;
163
164 p->recieve_packet_callback = NULL;
165 p->recieve_packet_callback_userdata = NULL;
166
167 p->recieve_memblock_callback = NULL;
168 p->recieve_memblock_callback_userdata = NULL;
169
170 p->drain_callback = NULL;
171 p->drain_userdata = NULL;
172
173 p->memblock_stat = s;
174
175 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
176 pa_iochannel_socket_set_sndbuf(io, 1024*8);
177
178 return p;
179 }
180
181 static void item_free(void *item, void *p) {
182 struct item_info *i = item;
183 assert(i);
184
185 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
186 assert(i->chunk.memblock);
187 pa_memblock_unref(i->chunk.memblock);
188 } else {
189 assert(i->type == PA_PSTREAM_ITEM_PACKET);
190 assert(i->packet);
191 pa_packet_unref(i->packet);
192 }
193
194 pa_xfree(i);
195 }
196
197 static void pstream_free(struct pa_pstream *p) {
198 assert(p);
199
200 pa_pstream_close(p);
201
202 pa_queue_free(p->send_queue, item_free, NULL);
203
204 if (p->write.current)
205 item_free(p->write.current, NULL);
206
207 if (p->read.memblock)
208 pa_memblock_unref(p->read.memblock);
209
210 if (p->read.packet)
211 pa_packet_unref(p->read.packet);
212
213 pa_xfree(p);
214 }
215
216 void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) {
217 struct item_info *i;
218 assert(p && packet && p->ref >= 1);
219
220 if (p->dead)
221 return;
222
223 /* pa_log(__FILE__": push-packet %p\n", packet); */
224
225 i = pa_xmalloc(sizeof(struct item_info));
226 i->type = PA_PSTREAM_ITEM_PACKET;
227 i->packet = pa_packet_ref(packet);
228
229 pa_queue_push(p->send_queue, i);
230 p->mainloop->defer_enable(p->defer_event, 1);
231 }
232
233 void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk) {
234 struct item_info *i;
235 assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
236
237 if (p->dead)
238 return;
239
240 /* pa_log(__FILE__": push-memblock %p\n", chunk); */
241
242 i = pa_xmalloc(sizeof(struct item_info));
243 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
244 i->chunk = *chunk;
245 i->channel = channel;
246 i->delta = delta;
247
248 pa_memblock_ref(i->chunk.memblock);
249
250 pa_queue_push(p->send_queue, i);
251 p->mainloop->defer_enable(p->defer_event, 1);
252 }
253
254 void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata) {
255 assert(p && callback);
256
257 p->recieve_packet_callback = callback;
258 p->recieve_packet_callback_userdata = userdata;
259 }
260
261 void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) {
262 assert(p && callback);
263
264 p->recieve_memblock_callback = callback;
265 p->recieve_memblock_callback_userdata = userdata;
266 }
267
268 static void prepare_next_write_item(struct pa_pstream *p) {
269 assert(p);
270
271 if (!(p->write.current = pa_queue_pop(p->send_queue)))
272 return;
273
274 p->write.index = 0;
275
276 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
277 /*pa_log(__FILE__": pop-packet %p\n", p->write.current->packet);*/
278
279 assert(p->write.current->packet);
280 p->write.data = p->write.current->packet->data;
281 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
282 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
283 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA] = 0;
284 } else {
285 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
286 p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
287 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
288 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
289 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA] = htonl(p->write.current->delta);
290 }
291 }
292
293 static void do_write(struct pa_pstream *p) {
294 void *d;
295 size_t l;
296 ssize_t r;
297 assert(p);
298
299 if (!p->write.current)
300 prepare_next_write_item(p);
301
302 if (!p->write.current)
303 return;
304
305 assert(p->write.data);
306
307 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
308 d = (uint8_t*) p->write.descriptor + p->write.index;
309 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
310 } else {
311 d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
312 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
313 }
314
315 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
316 goto die;
317
318 p->write.index += r;
319
320 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
321 assert(p->write.current);
322 item_free(p->write.current, (void *) 1);
323 p->write.current = NULL;
324
325 if (p->drain_callback && !pa_pstream_is_pending(p))
326 p->drain_callback(p, p->drain_userdata);
327 }
328
329 return;
330
331 die:
332 p->dead = 1;
333 if (p->die_callback)
334 p->die_callback(p, p->die_callback_userdata);
335 }
336
337 static void do_read(struct pa_pstream *p) {
338 void *d;
339 size_t l;
340 ssize_t r;
341 assert(p);
342
343 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
344 d = (uint8_t*) p->read.descriptor + p->read.index;
345 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
346 } else {
347 assert(p->read.data);
348 d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
349 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
350 }
351
352 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
353 goto die;
354
355 p->read.index += r;
356
357 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
358 /* Reading of frame descriptor complete */
359
360 /* Frame size too large */
361 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
362 pa_log(__FILE__": Frame size too large\n");
363 goto die;
364 }
365
366 assert(!p->read.packet && !p->read.memblock);
367
368 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
369 /* Frame is a packet frame */
370 p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
371 assert(p->read.packet);
372 p->read.data = p->read.packet->data;
373 } else {
374 /* Frame is a memblock frame */
375 p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
376 assert(p->read.memblock);
377 p->read.data = p->read.memblock->data;
378 }
379
380 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
381 /* Frame payload available */
382
383 if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
384 size_t l;
385
386 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
387
388 if (l > 0) {
389 struct pa_memchunk chunk;
390
391 chunk.memblock = p->read.memblock;
392 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
393 chunk.length = l;
394
395 if (p->recieve_memblock_callback)
396 p->recieve_memblock_callback(
397 p,
398 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
399 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
400 &chunk,
401 p->recieve_memblock_callback_userdata);
402 }
403 }
404
405 /* Frame complete */
406 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
407 if (p->read.memblock) {
408 assert(!p->read.packet);
409
410 pa_memblock_unref(p->read.memblock);
411 p->read.memblock = NULL;
412 } else {
413 assert(p->read.packet);
414
415 if (p->recieve_packet_callback)
416 p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata);
417
418 pa_packet_unref(p->read.packet);
419 p->read.packet = NULL;
420 }
421
422 p->read.index = 0;
423 }
424 }
425
426 return;
427
428 die:
429 p->dead = 1;
430 if (p->die_callback)
431 p->die_callback(p, p->die_callback_userdata);
432
433 }
434
435 void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata) {
436 assert(p && callback);
437 p->die_callback = callback;
438 p->die_callback_userdata = userdata;
439 }
440
441 int pa_pstream_is_pending(struct pa_pstream *p) {
442 assert(p);
443
444 if (p->dead)
445 return 0;
446
447 return p->write.current || !pa_queue_is_empty(p->send_queue);
448 }
449
450 void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata) {
451 assert(p);
452
453 p->drain_callback = cb;
454 p->drain_userdata = userdata;
455 }
456
457 void pa_pstream_unref(struct pa_pstream*p) {
458 assert(p && p->ref >= 1);
459
460 if (!(--(p->ref)))
461 pstream_free(p);
462 }
463
464 struct pa_pstream* pa_pstream_ref(struct pa_pstream*p) {
465 assert(p && p->ref >= 1);
466 p->ref++;
467 return p;
468 }
469
470 void pa_pstream_close(struct pa_pstream *p) {
471 assert(p);
472
473 p->dead = 1;
474
475 if (p->io) {
476 pa_iochannel_free(p->io);
477 p->io = NULL;
478 }
479
480 if (p->defer_event) {
481 p->mainloop->defer_free(p->defer_event);
482 p->defer_event = NULL;
483 }
484
485 p->die_callback = NULL;
486 p->drain_callback = NULL;
487 p->recieve_packet_callback = NULL;
488 p->recieve_memblock_callback = NULL;
489 }