code.delx.au - pulseaudio/blob - src/pstream.c
partial implementation of native protocol
[pulseaudio] / src / pstream.c
1 #include <stdlib.h>
2 #include <assert.h>
3 #include <netinet/in.h>
4
5 #include "pstream.h"
6 #include "queue.h"
7
/* Positions of the 32 bit words that make up a frame descriptor. */
enum pstream_descriptor_index {
    PSTREAM_DESCRIPTOR_LENGTH = 0,   /* payload length in bytes */
    PSTREAM_DESCRIPTOR_CHANNEL = 1,  /* 0 for packet frames, else the memblock channel */
    PSTREAM_DESCRIPTOR_DELTA = 2,    /* delta value of memblock frames */
    PSTREAM_DESCRIPTOR_MAX = 3       /* number of descriptor words */
};

/* A complete frame descriptor; the words are kept in network byte order. */
typedef uint32_t pstream_descriptor[PSTREAM_DESCRIPTOR_MAX];

/* Size of a frame descriptor in bytes */
#define PSTREAM_DESCRIPTOR_SIZE (sizeof(pstream_descriptor))

/* Largest payload length accepted for an incoming frame */
#define FRAME_SIZE_MAX (64*1024)
19
20 struct item_info {
21 enum { PSTREAM_ITEM_PACKET, PSTREAM_ITEM_MEMBLOCK } type;
22
23 /* memblock info */
24 struct memchunk chunk;
25 uint32_t channel;
26 int32_t delta;
27
28 /* packet info */
29 struct packet *packet;
30 };
31
32 struct pstream {
33 struct mainloop *mainloop;
34 struct mainloop_source *mainloop_source;
35 struct iochannel *io;
36 struct queue *send_queue;
37
38 int dead;
39 void (*die_callback) (struct pstream *p, void *userdad);
40 void *die_callback_userdata;
41
42 struct {
43 struct item_info* current;
44 pstream_descriptor descriptor;
45 void *data;
46 size_t index;
47 } write;
48
49 void (*send_callback) (struct pstream *p, void *userdata);
50 void *send_callback_userdata;
51
52 struct {
53 struct memblock *memblock;
54 struct packet *packet;
55 pstream_descriptor descriptor;
56 void *data;
57 size_t index;
58 } read;
59
60 int (*recieve_packet_callback) (struct pstream *p, struct packet *packet, void *userdata);
61 void *recieve_packet_callback_userdata;
62
63 int (*recieve_memblock_callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata);
64 void *recieve_memblock_callback_userdata;
65 };
66
67 static void do_write(struct pstream *p);
68 static void do_read(struct pstream *p);
69
70 static void io_callback(struct iochannel*io, void *userdata) {
71 struct pstream *p = userdata;
72 assert(p && p->io == io);
73 do_write(p);
74 do_read(p);
75 }
76
77 static void prepare_callback(struct mainloop_source *s, void*userdata) {
78 struct pstream *p = userdata;
79 assert(p && p->mainloop_source == s);
80 do_write(p);
81 do_read(p);
82 }
83
84 struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {
85 struct pstream *p;
86 assert(io);
87
88 p = malloc(sizeof(struct pstream));
89 assert(p);
90
91 p->io = io;
92 iochannel_set_callback(io, io_callback, p);
93
94 p->dead = 0;
95 p->die_callback = NULL;
96 p->die_callback_userdata = NULL;
97
98 p->mainloop = m;
99 p->mainloop_source = mainloop_source_new_fixed(m, prepare_callback, p);
100 mainloop_source_enable(p->mainloop_source, 0);
101
102 p->send_queue = queue_new();
103 assert(p->send_queue);
104
105 p->write.current = NULL;
106 p->write.index = 0;
107
108 p->read.memblock = NULL;
109 p->read.packet = NULL;
110 p->read.index = 0;
111
112 p->send_callback = NULL;
113 p->send_callback_userdata = NULL;
114
115 p->recieve_packet_callback = NULL;
116 p->recieve_packet_callback_userdata = NULL;
117
118 p->recieve_memblock_callback = NULL;
119 p->recieve_memblock_callback_userdata = NULL;
120
121 return p;
122 }
123
124 static void item_free(void *item, void *p) {
125 struct item_info *i = item;
126 assert(i);
127
128 if (i->type == PSTREAM_ITEM_PACKET) {
129 assert(i->chunk.memblock);
130 memblock_unref(i->chunk.memblock);
131 } else {
132 assert(i->type == PSTREAM_ITEM_MEMBLOCK);
133 assert(i->packet);
134 packet_unref(i->packet);
135 }
136
137 free(i);
138 }
139
140 void pstream_free(struct pstream *p) {
141 assert(p);
142
143 iochannel_free(p->io);
144 queue_free(p->send_queue, item_free, NULL);
145
146 if (p->write.current)
147 item_free(p->write.current, NULL);
148
149 if (p->read.memblock)
150 memblock_unref(p->read.memblock);
151
152 if (p->read.packet)
153 packet_unref(p->read.packet);
154
155 mainloop_source_free(p->mainloop_source);
156 free(p);
157 }
158
159 void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata) {
160 assert(p && callback);
161
162 p->send_callback = callback;
163 p->send_callback_userdata = userdata;
164 }
165
166 void pstream_send_packet(struct pstream*p, struct packet *packet) {
167 struct item_info *i;
168 assert(p && packet);
169
170 i = malloc(sizeof(struct item_info));
171 assert(i);
172 i->type = PSTREAM_ITEM_PACKET;
173 i->packet = packet_ref(packet);
174
175 queue_push(p->send_queue, i);
176 mainloop_source_enable(p->mainloop_source, 1);
177 }
178
179 void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) {
180 struct item_info *i;
181 assert(p && channel && chunk);
182
183 i = malloc(sizeof(struct item_info));
184 assert(i);
185 i->type = PSTREAM_ITEM_MEMBLOCK;
186 i->chunk = *chunk;
187 i->channel = channel;
188 i->delta = delta;
189
190 memblock_ref(i->chunk.memblock);
191
192 queue_push(p->send_queue, i);
193 mainloop_source_enable(p->mainloop_source, 1);
194 }
195
196 void pstream_set_recieve_packet_callback(struct pstream *p, int (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata) {
197 assert(p && callback);
198
199 p->recieve_packet_callback = callback;
200 p->recieve_packet_callback_userdata = userdata;
201 }
202
203 void pstream_set_recieve_memblock_callback(struct pstream *p, int (*callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata), void *userdata) {
204 assert(p && callback);
205
206 p->recieve_memblock_callback = callback;
207 p->recieve_memblock_callback_userdata = userdata;
208 }
209
210 static void prepare_next_write_item(struct pstream *p) {
211 assert(p);
212
213 if (!(p->write.current = queue_pop(p->send_queue)))
214 return;
215
216 p->write.index = 0;
217
218 if (p->write.current->type == PSTREAM_ITEM_PACKET) {
219 assert(p->write.current->packet);
220 p->write.data = p->write.current->packet->data;
221 p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
222 p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = 0;
223 p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = 0;
224 } else {
225 assert(p->write.current->type == PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
226 p->write.data = p->write.current->chunk.memblock->data + p->write.current->chunk.index;
227 p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
228 p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
229 p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = htonl(p->write.current->delta);
230 }
231 }
232
233 static void do_write(struct pstream *p) {
234 void *d;
235 size_t l;
236 ssize_t r;
237 assert(p);
238
239 mainloop_source_enable(p->mainloop_source, 0);
240
241 if (p->dead || !iochannel_is_writable(p->io))
242 return;
243
244 if (!p->write.current)
245 prepare_next_write_item(p);
246
247 if (!p->write.current)
248 return;
249
250 assert(p->write.data);
251
252 if (p->write.index < PSTREAM_DESCRIPTOR_SIZE) {
253 d = (void*) p->write.descriptor + p->write.index;
254 l = PSTREAM_DESCRIPTOR_SIZE - p->write.index;
255 } else {
256 d = (void*) p->write.data + p->write.index - PSTREAM_DESCRIPTOR_SIZE;
257 l = ntohl(p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - p->write.index - PSTREAM_DESCRIPTOR_SIZE;
258 }
259
260 if ((r = iochannel_write(p->io, d, l)) < 0)
261 goto die;
262
263 p->write.index += r;
264
265 if (p->write.index >= PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH])) {
266 assert(p->write.current);
267 item_free(p->write.current, (void *) 1);
268 p->write.current = NULL;
269
270 if (p->send_callback && queue_is_empty(p->send_queue))
271 p->send_callback(p, p->send_callback_userdata);
272 }
273
274 return;
275
276 die:
277 p->dead = 1;
278 if (p->die_callback)
279 p->die_callback(p, p->die_callback_userdata);
280 }
281
282 static void do_read(struct pstream *p) {
283 void *d;
284 size_t l;
285 ssize_t r;
286 assert(p);
287
288 mainloop_source_enable(p->mainloop_source, 0);
289
290 if (p->dead || !iochannel_is_readable(p->io))
291 return;
292
293 if (p->read.index < PSTREAM_DESCRIPTOR_SIZE) {
294 d = (void*) p->read.descriptor + p->read.index;
295 l = PSTREAM_DESCRIPTOR_SIZE - p->read.index;
296 } else {
297 assert(p->read.data);
298 d = (void*) p->read.data + p->read.index - PSTREAM_DESCRIPTOR_SIZE;
299 l = ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - p->read.index - PSTREAM_DESCRIPTOR_SIZE;
300 }
301
302 if ((r = iochannel_read(p->io, d, l)) <= 0)
303 goto die;
304
305 p->read.index += r;
306
307 if (p->read.index == PSTREAM_DESCRIPTOR_SIZE) {
308 /* Reading of frame descriptor complete */
309
310 /* Frame size too large */
311 if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX)
312 goto die;
313
314 assert(!p->read.packet && !p->read.memblock);
315
316 if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == 0) {
317 /* Frame is a packet frame */
318 p->read.packet = packet_new(ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]));
319 assert(p->read.packet);
320 p->read.data = p->read.packet->data;
321 } else {
322 /* Frame is a memblock frame */
323 p->read.memblock = memblock_new(ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]));
324 assert(p->read.memblock);
325 p->read.data = p->read.memblock->data;
326 }
327
328 } else if (p->read.index > PSTREAM_DESCRIPTOR_SIZE) {
329 /* Frame payload available */
330
331 if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
332 size_t l;
333
334 l = p->read.index - r < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : r;
335
336 if (l > 0) {
337 struct memchunk chunk;
338
339 chunk.memblock = p->read.memblock;
340 chunk.index = p->read.index - PSTREAM_DESCRIPTOR_SIZE - l;
341 chunk.length = l;
342
343 if (p->recieve_memblock_callback(p,
344 ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]),
345 (int32_t) ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_DELTA]),
346 &chunk,
347 p->recieve_memblock_callback_userdata) < 0)
348 goto die;
349 }
350 }
351
352 /* Frame complete */
353 if (p->read.index >= ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) + PSTREAM_DESCRIPTOR_SIZE) {
354 if (p->read.memblock) {
355 assert(!p->read.packet);
356
357 memblock_unref(p->read.memblock);
358 p->read.memblock = NULL;
359 } else {
360 int r = 0;
361 assert(p->read.packet);
362
363 if (p->recieve_packet_callback)
364 r = p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata);
365
366 packet_unref(p->read.packet);
367 p->read.packet = NULL;
368
369 if (r < 0)
370 goto die;
371 }
372
373 p->read.index = 0;
374 }
375 }
376
377 return;
378
379 die:
380 p->dead = 1;
381 if (p->die_callback)
382 p->die_callback(p, p->die_callback_userdata);
383
384 }
385
386 void pstream_set_die_callback(struct pstream *p, void (*callback)(struct pstream *p, void *userdata), void *userdata) {
387 assert(p && callback);
388 p->die_callback = callback;
389 p->die_callback_userdata = userdata;
390 }