9 #include "sourceoutput.h"
10 #include "protocol-simple.h"
14 struct protocol_simple
*protocol
;
16 struct sink_input
*sink_input
;
17 struct source_output
*source_output
;
18 struct client
*client
;
19 struct memblockq
*input_memblockq
, *output_memblockq
;
22 struct protocol_simple
{
24 struct socket_server
*server
;
25 struct idxset
*connections
;
26 enum protocol_simple_mode mode
;
29 #define BUFSIZE PIPE_BUF
31 static void free_connection(void *data
, void *userdata
) {
32 struct connection
*c
= data
;
36 sink_input_free(c
->sink_input
);
38 source_output_free(c
->source_output
);
40 client_free(c
->client
);
42 iochannel_free(c
->io
);
43 if (c
->input_memblockq
)
44 memblockq_free(c
->input_memblockq
);
45 if (c
->output_memblockq
)
46 memblockq_free(c
->output_memblockq
);
50 static void destroy_connection(struct connection
*c
) {
51 assert(c
&& c
->protocol
);
52 idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
53 free_connection(c
, NULL
);
56 static int do_read(struct connection
*c
) {
57 struct memchunk chunk
;
60 if (!iochannel_is_readable(c
->io
))
63 if (!c
->sink_input
|| !memblockq_is_writable(c
->input_memblockq
, BUFSIZE
))
66 chunk
.memblock
= memblock_new(BUFSIZE
);
67 assert(chunk
.memblock
);
69 if ((r
= iochannel_read(c
->io
, chunk
.memblock
->data
, BUFSIZE
)) <= 0) {
70 fprintf(stderr
, "read(): %s\n", r
== 0 ? "EOF" : strerror(errno
));
71 memblock_unref(chunk
.memblock
);
75 chunk
.memblock
->length
= r
;
79 assert(c
->input_memblockq
);
80 memblockq_push(c
->input_memblockq
, &chunk
, 0);
81 memblock_unref(chunk
.memblock
);
82 assert(c
->sink_input
);
83 sink_notify(c
->sink_input
->sink
);
88 static int do_write(struct connection
*c
) {
89 struct memchunk chunk
;
92 if (!iochannel_is_writable(c
->io
))
95 if (!c
->source_output
)
98 assert(c
->output_memblockq
);
99 if (memblockq_peek(c
->output_memblockq
, &chunk
) < 0)
102 assert(chunk
.memblock
&& chunk
.length
);
104 if ((r
= iochannel_write(c
->io
, chunk
.memblock
->data
+chunk
.index
, chunk
.length
)) < 0) {
105 fprintf(stderr
, "write(): %s\n", strerror(errno
));
106 memblock_unref(chunk
.memblock
);
110 memblockq_drop(c
->output_memblockq
, r
);
111 memblock_unref(chunk
.memblock
);
115 /*** sink_input callbacks ***/
117 static int sink_input_peek_cb(struct sink_input
*i
, struct memchunk
*chunk
) {
118 struct connection
*c
= i
->userdata
;
119 assert(i
&& c
&& chunk
);
121 if (memblockq_peek(c
->input_memblockq
, chunk
) < 0)
127 static void sink_input_drop_cb(struct sink_input
*i
, size_t length
) {
128 struct connection
*c
= i
->userdata
;
129 assert(i
&& c
&& length
);
131 memblockq_drop(c
->input_memblockq
, length
);
134 destroy_connection(c
);
137 static void sink_input_kill_cb(struct sink_input
*i
) {
138 assert(i
&& i
->userdata
);
139 destroy_connection((struct connection
*) i
->userdata
);
143 static uint32_t sink_input_get_latency_cb(struct sink_input
*i
) {
144 struct connection
*c
= i
->userdata
;
146 return samples_usec(memblockq_get_length(c
->input_memblockq
), &DEFAULT_SAMPLE_SPEC
);
149 /*** source_output callbacks ***/
151 static void source_output_push_cb(struct source_output
*o
, struct memchunk
*chunk
) {
152 struct connection
*c
= o
->userdata
;
153 assert(o
&& c
&& chunk
);
155 memblockq_push(c
->output_memblockq
, chunk
, 0);
158 destroy_connection(c
);
161 static void source_output_kill_cb(struct source_output
*o
) {
162 assert(o
&& o
->userdata
);
163 destroy_connection((struct connection
*) o
->userdata
);
166 /*** client callbacks ***/
168 static void client_kill_cb(struct client
*c
) {
169 assert(c
&& c
->userdata
);
170 destroy_connection((struct connection
*) c
->userdata
);
173 /*** iochannel callbacks ***/
175 static void io_callback(struct iochannel
*io
, void *userdata
) {
176 struct connection
*c
= userdata
;
177 assert(io
&& c
&& c
->io
== io
);
179 if (do_read(c
) < 0 || do_write(c
) < 0)
180 destroy_connection(c
);
183 /*** socket_server callbacks */
185 static void on_connection(struct socket_server
*s
, struct iochannel
*io
, void *userdata
) {
186 struct protocol_simple
*p
= userdata
;
187 struct connection
*c
= NULL
;
188 assert(s
&& io
&& p
);
190 c
= malloc(sizeof(struct connection
));
193 c
->sink_input
= NULL
;
194 c
->source_output
= NULL
;
195 c
->input_memblockq
= c
->output_memblockq
= NULL
;
198 c
->client
= client_new(p
->core
, "SIMPLE", "Client");
200 c
->client
->kill
= client_kill_cb
;
201 c
->client
->userdata
= c
;
203 if (p
->mode
& PROTOCOL_SIMPLE_RECORD
) {
204 struct source
*source
;
207 if (!(source
= source_get_default(p
->core
))) {
208 fprintf(stderr
, "Failed to get default source.\n");
212 c
->source_output
= source_output_new(source
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
213 assert(c
->source_output
);
214 c
->source_output
->push
= source_output_push_cb
;
215 c
->source_output
->kill
= source_output_kill_cb
;
216 c
->source_output
->userdata
= c
;
218 l
= 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC
); /* 5s */
219 c
->output_memblockq
= memblockq_new(l
, sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
222 if (p
->mode
& PROTOCOL_SIMPLE_PLAYBACK
) {
226 if (!(sink
= sink_get_default(p
->core
))) {
227 fprintf(stderr
, "Failed to get default sink.\n");
231 c
->sink_input
= sink_input_new(sink
, &DEFAULT_SAMPLE_SPEC
, c
->client
->name
);
232 assert(c
->sink_input
);
233 c
->sink_input
->peek
= sink_input_peek_cb
;
234 c
->sink_input
->drop
= sink_input_drop_cb
;
235 c
->sink_input
->kill
= sink_input_kill_cb
;
236 c
->sink_input
->get_latency
= sink_input_get_latency_cb
;
237 c
->sink_input
->userdata
= c
;
239 l
= bytes_per_second(&DEFAULT_SAMPLE_SPEC
)/2; /* half a second */
240 c
->input_memblockq
= memblockq_new(l
, sample_size(&DEFAULT_SAMPLE_SPEC
), l
/2);
244 iochannel_set_callback(c
->io
, io_callback
, c
);
245 idxset_put(p
->connections
, c
, NULL
);
250 free_connection(c
, NULL
);
251 iochannel_free(c
->io
);
256 struct protocol_simple
* protocol_simple_new(struct core
*core
, struct socket_server
*server
, enum protocol_simple_mode mode
) {
257 struct protocol_simple
* p
;
258 assert(core
&& server
&& mode
<= PROTOCOL_SIMPLE_DUPLEX
&& mode
> 0);
260 p
= malloc(sizeof(struct protocol_simple
));
264 p
->connections
= idxset_new(NULL
, NULL
);
267 socket_server_set_callback(p
->server
, on_connection
, p
);
273 void protocol_simple_free(struct protocol_simple
*p
) {
276 idxset_free(p
->connections
, free_connection
, NULL
);
277 socket_server_free(p
->server
);