5 #include "protocol-native.h"
6 #include "protocol-native-spec.h"
9 #include "sourceoutput.h"
10 #include "sinkinput.h"
12 #include "tagstruct.h"
13 #include "pdispatch.h"
14 #include "pstream-util.h"
/* Forward declaration of the protocol instance type; its members are
 * declared further down in this file. */
17 struct protocol_native
;
/* State of one record stream. Instances are tracked in their connection's
 * record_streams set and torn down by record_stream_free(). */
19 struct record_stream
{
/* Owning connection. */
20 struct connection
*connection
;
/* Source output belonging to this stream; released with source_output_free(). */
22 struct source_output
*source_output
;
/* Block queue belonging to this stream; released with memblockq_free(). */
23 struct memblockq
*memblockq
;
/* State of one playback stream. Instances are created by
 * playback_stream_new(), tracked in their connection's playback_streams set
 * and torn down by playback_stream_free(). Other code in this file also
 * reads the members `index` and `qlength`. */
26 struct playback_stream
{
/* Owning connection; its pstream is used to send requests to the client. */
27 struct connection
*connection
;
/* Sink input whose callbacks pull data out of memblockq. */
30 struct sink_input
*sink_input
;
/* Queue of audio blocks pushed by the client and consumed by the sink input. */
31 struct memblockq
*memblockq
;
/* Members of the connection state that on_connection() fills in and
 * connection_free() releases. Other code in this file also reads the
 * member `authorized`. */
/* Protocol instance this connection is registered with. */
36 struct protocol_native
*protocol
;
/* Client object created with client_new() when the connection is accepted. */
37 struct client
*client
;
/* Packet stream wrapping the connection's I/O channel. */
38 struct pstream
*pstream
;
/* Dispatcher that routes received packets through command_table. */
39 struct pdispatch
*pdispatch
;
/* Sets of the record and playback streams that belong to this connection. */
40 struct idxset
*record_streams
, *playback_streams
;
/* Protocol instance. Other code in this file also reads the members
 * `core` and `public`. */
43 struct protocol_native
{
/* Listening socket server; on_connection() is installed as its callback. */
46 struct socket_server
*server
;
/* Set of all live connections accepted through this instance. */
47 struct idxset
*connections
;
/* Sink input callbacks; playback_stream_new() installs them on every sink
 * input it creates. */
50 static int sink_input_peek_cb(struct sink_input
*i
, struct memchunk
*chunk
);
51 static void sink_input_drop_cb(struct sink_input
*i
, size_t length
);
52 static void sink_input_kill_cb(struct sink_input
*i
);
53 static uint32_t sink_input_get_latency_cb(struct sink_input
*i
);
/* Sends a PA_COMMAND_REQUEST for stream s to its client. */
55 static void request_bytes(struct playback_stream
*s
);
/* Command handlers referenced from command_table. */
57 static int command_exit(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
);
58 static int command_create_playback_stream(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
);
59 static int command_delete_playback_stream(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
);
/* Handler table indexed by PA_COMMAND_* value and handed to pdispatch_new()
 * for every connection. ERROR, REPLY and the two record stream commands
 * carry a NULL handler here; only playback stream creation/deletion and
 * EXIT are wired up. */
61 static const struct pdispatch_command command_table
[PA_COMMAND_MAX
] = {
62 [PA_COMMAND_ERROR
] = { NULL
},
63 [PA_COMMAND_REPLY
] = { NULL
},
64 [PA_COMMAND_CREATE_PLAYBACK_STREAM
] = { command_create_playback_stream
},
65 [PA_COMMAND_DELETE_PLAYBACK_STREAM
] = { command_delete_playback_stream
},
66 [PA_COMMAND_CREATE_RECORD_STREAM
] = { NULL
},
67 [PA_COMMAND_DELETE_RECORD_STREAM
] = { NULL
},
68 [PA_COMMAND_EXIT
] = { command_exit
},
71 /* structure management */
/* Tears down record stream r: removes it from its connection's
 * record_streams set, then frees its source output and its memblockq.
 * r and r->connection must be non-NULL. */
73 static void record_stream_free(struct record_stream
* r
) {
74 assert(r
&& r
->connection
);
/* Unlink first so the connection no longer hands out this stream. */
76 idxset_remove_by_data(r
->connection
->record_streams
, r
, NULL
);
77 source_output_free(r
->source_output
);
78 memblockq_free(r
->memblockq
);
/*
 * Creates a playback stream on connection c that plays to `sink`.
 *
 * Allocates the stream, creates a sink input called `name` with sample
 * spec `ss`, installs the peek/drop/kill/get_latency callbacks with the
 * stream as their userdata, creates a memblockq from maxlength,
 * pa_sample_size(ss) and prebuf, and finally registers the stream in
 * c->playback_streams, storing the assigned index in s->index.
 *
 * Every argument must be non-NULL / non-zero; this is enforced by assert().
 * The caller in this file treats a NULL result as failure.
 */
82 static struct playback_stream
* playback_stream_new(struct connection
*c
, struct sink
*sink
, struct pa_sample_spec
*ss
, const char *name
, size_t qlen
, size_t maxlength
, size_t prebuf
) {
83 struct playback_stream
*s
;
/* Validate the arguments. The sample spec `ss` is what must be checked
 * here: the local `s` has not been assigned yet, so testing it would read
 * an uninitialized pointer and leave `ss` unchecked. */
84 assert(c
&& sink
&& ss
&& name
&& qlen
&& maxlength
&& prebuf
);
86 s
= malloc(sizeof(struct playback_stream
));
91 s
->sink_input
= sink_input_new(sink
, ss
, name
);
92 assert(s
->sink_input
);
/* Route the sink input's callbacks back to this stream. */
93 s
->sink_input
->peek
= sink_input_peek_cb
;
94 s
->sink_input
->drop
= sink_input_drop_cb
;
95 s
->sink_input
->kill
= sink_input_kill_cb
;
96 s
->sink_input
->get_latency
= sink_input_get_latency_cb
;
97 s
->sink_input
->userdata
= s
;
99 s
->memblockq
= memblockq_new(maxlength
, pa_sample_size(ss
), prebuf
);
100 assert(s
->memblockq
);
/* Register with the connection; the set hands back the stream's index. */
102 idxset_put(c
->playback_streams
, s
, &s
->index
);
/* Tears down playback stream p: removes it from its connection's
 * playback_streams set, then frees its sink input and its memblockq.
 * p and p->connection must be non-NULL. */
107 static void playback_stream_free(struct playback_stream
* p
) {
108 assert(p
&& p
->connection
);
/* Unlink first so lookups by index can no longer return this stream. */
110 idxset_remove_by_data(p
->connection
->playback_streams
, p
, NULL
);
111 sink_input_free(p
->sink_input
);
112 memblockq_free(p
->memblockq
);
/* Tears down connection c: unregisters it from its protocol instance,
 * frees every record and playback stream it still owns together with the
 * two stream sets, and then frees the dispatcher, the packet stream and
 * the client object. c and c->protocol must be non-NULL. */
116 static void connection_free(struct connection
*c
) {
117 struct record_stream
*r
;
118 struct playback_stream
*p
;
119 assert(c
&& c
->protocol
);
121 idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
/* Each *_stream_free() removes the stream from its set, so repeatedly
 * taking the first element drains the set. */
122 while ((r
= idxset_first(c
->record_streams
, NULL
)))
123 record_stream_free(r
);
124 idxset_free(c
->record_streams
, NULL
, NULL
);
126 while ((p
= idxset_first(c
->playback_streams
, NULL
)))
127 playback_stream_free(p
);
128 idxset_free(c
->playback_streams
, NULL
, NULL
);
130 pdispatch_free(c
->pdispatch
);
131 pstream_free(c
->pstream
);
132 client_free(c
->client
);
/* Asks the client for more data on playback stream s. The amount l is what
 * memblockq_missing_to() reports for the queue against s->qlength; the
 * request is a tagstruct holding PA_COMMAND_REQUEST, the stream index and
 * l, sent over the connection's pstream. */
136 static void request_bytes(struct playback_stream
*s
) {
/* NOTE(review): the statement guarded by this zero check is not visible
 * here - presumably an early return when nothing is missing; confirm. */
141 if (!(l
= memblockq_missing_to(s
->memblockq
, s
->qlength
)))
144 t
= tagstruct_new(NULL
, 0);
146 tagstruct_putu32(t
, PA_COMMAND_REQUEST
);
147 tagstruct_putu32(t
, s
->index
);
148 tagstruct_putu32(t
, l
);
149 pstream_send_tagstruct(s
->connection
->pstream
, t
);
152 /*** sinkinput callbacks ***/
/* Sink input "peek" callback: looks at the next chunk queued in the
 * stream's memblockq via memblockq_peek(), which fills in *chunk.
 * i, i->userdata and chunk must be non-NULL. */
154 static int sink_input_peek_cb(struct sink_input
*i
, struct memchunk
*chunk
) {
155 struct playback_stream
*s
;
156 assert(i
&& i
->userdata
&& chunk
);
/* NOTE(review): the assignment of s is not visible here - presumably
 * s = i->userdata, as set up in playback_stream_new(); confirm. */
159 if (memblockq_peek(s
->memblockq
, chunk
) < 0)
/* Sink input "drop" callback: discards `length` bytes from the stream's
 * memblockq. i and i->userdata must be non-NULL and length non-zero. */
165 static void sink_input_drop_cb(struct sink_input
*i
, size_t length
) {
166 struct playback_stream
*s
;
167 assert(i
&& i
->userdata
&& length
);
/* NOTE(review): the assignment of s is not visible here - presumably
 * s = i->userdata; confirm. */
170 memblockq_drop(s
->memblockq
, length
);
/* Sink input "kill" callback: frees the playback stream attached to the
 * sink input. i and i->userdata must be non-NULL. */
174 static void sink_input_kill_cb(struct sink_input
*i
) {
175 struct playback_stream
*s
;
176 assert(i
&& i
->userdata
);
/* NOTE(review): the assignment of s is not visible here - presumably
 * s = i->userdata; confirm. */
179 playback_stream_free(s
);
/* Sink input "get_latency" callback: returns pa_samples_usec() applied to
 * the current memblockq length and the sink input's sample spec.
 * i and i->userdata must be non-NULL. */
182 static uint32_t sink_input_get_latency_cb(struct sink_input
*i
) {
183 struct playback_stream
*s
;
184 assert(i
&& i
->userdata
);
/* NOTE(review): the assignment of s is not visible here - presumably
 * s = i->userdata; confirm. */
187 return pa_samples_usec(memblockq_get_length(s
->memblockq
), &s
->sink_input
->sample_spec
);
190 /*** pdispatch callbacks ***/
/*
 * Handler for PA_COMMAND_CREATE_PLAYBACK_STREAM.
 *
 * Reads, in order, the stream name, the sample spec, the sink index and the
 * qlength, maxlength and prebuf values from tagstruct t. An unauthorized
 * connection is answered with PA_ERROR_ACCESS. A sink index of
 * (uint32_t) -1 selects the core's default sink, any other value is looked
 * up in the core's sink set; PA_ERROR_EXIST is the error sent on the sink
 * lookup path. If playback_stream_new() returns NULL, PA_ERROR_INVALID is
 * sent. On success the reply carries PA_COMMAND_REPLY, the request tag, the
 * stream index and the sink input index.
 *
 * userdata is the connection; pd and command are not used by the visible
 * code.
 */
192 static int command_create_playback_stream(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
193 struct connection
*c
= userdata
;
194 struct playback_stream
*s
;
/* These are filled in through tagstruct_getu32(), so they must be exactly
 * 32 bits wide. A size_t here has an incompatible pointer type and, where
 * size_t is 64 bits, would be only partly written. They widen implicitly
 * when passed on to playback_stream_new().
 * NOTE(review): assumes tagstruct_getu32() stores through a uint32_t
 * pointer - its prototype is not visible here; confirm. */
195 uint32_t maxlength
, prebuf
, qlength
;
198 struct pa_sample_spec ss
;
199 struct tagstruct
*reply
;
201 assert(c
&& t
&& c
->protocol
&& c
->protocol
->core
);
/* Parse the request; any getter reporting < 0 marks it as malformed. */
203 if (tagstruct_gets(t
, &name
) < 0 ||
204 tagstruct_get_sample_spec(t
, &ss
) < 0 ||
205 tagstruct_getu32(t
, &sink_index
) < 0 ||
206 tagstruct_getu32(t
, &qlength
) < 0 ||
207 tagstruct_getu32(t
, &maxlength
) < 0 ||
208 tagstruct_getu32(t
, &prebuf
) < 0 ||
212 if (!c
->authorized
) {
213 pstream_send_error(c
->pstream
, tag
, PA_ERROR_ACCESS
);
/* (uint32_t) -1 is the "use the default sink" marker. */
217 if (sink_index
== (uint32_t) -1)
218 sink
= sink_get_default(c
->protocol
->core
);
220 sink
= idxset_get_by_index(c
->protocol
->core
->sinks
, sink_index
);
223 pstream_send_error(c
->pstream
, tag
, PA_ERROR_EXIST
);
227 if (!(s
= playback_stream_new(c
, sink
, &ss
, name
, qlength
, maxlength
, prebuf
))) {
228 pstream_send_error(c
->pstream
, tag
, PA_ERROR_INVALID
);
/* Success: tell the client which stream and sink input were created. */
232 reply
= tagstruct_new(NULL
, 0);
234 tagstruct_putu32(reply
, PA_COMMAND_REPLY
);
235 tagstruct_putu32(reply
, tag
);
236 tagstruct_putu32(reply
, s
->index
);
237 assert(s
->sink_input
);
238 tagstruct_putu32(reply
, s
->sink_input
->index
);
239 pstream_send_tagstruct(c
->pstream
, reply
);
/* Handler for PA_COMMAND_DELETE_PLAYBACK_STREAM. Reads the stream's channel
 * number from t, answers PA_ERROR_ACCESS to unauthorized connections and
 * PA_ERROR_EXIST when no playback stream has that index, and otherwise
 * sends a simple acknowledgement. userdata is the connection. */
243 static int command_delete_playback_stream(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
244 struct connection
*c
= userdata
;
246 struct playback_stream
*s
;
249 if (tagstruct_getu32(t
, &channel
) < 0 ||
253 if (!c
->authorized
) {
254 pstream_send_error(c
->pstream
, tag
, PA_ERROR_ACCESS
);
258 if (!(s
= idxset_get_by_index(c
->playback_streams
, channel
))) {
259 pstream_send_error(c
->pstream
, tag
, PA_ERROR_EXIST
);
/* NOTE(review): the call that releases s is not visible here - presumably
 * playback_stream_free(s) ahead of the acknowledgement; confirm. */
263 pstream_send_simple_ack(c
->pstream
, tag
);
/* Handler for PA_COMMAND_EXIT. The request carries no payload, so t is
 * checked with tagstruct_eof(). Unauthorized connections are answered with
 * PA_ERROR_ACCESS. Otherwise the core main loop's quit() is called with
 * value 0 and a simple acknowledgement is queued afterwards. userdata is
 * the connection. */
267 static int command_exit(struct pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct tagstruct
*t
, void *userdata
) {
268 struct connection
*c
= userdata
;
271 if (!tagstruct_eof(t
))
274 if (!c
->authorized
) {
275 pstream_send_error(c
->pstream
, tag
, PA_ERROR_ACCESS
);
279 assert(c
->protocol
&& c
->protocol
->core
&& c
->protocol
->core
->mainloop
);
280 c
->protocol
->core
->mainloop
->quit(c
->protocol
->core
->mainloop
, 0);
281 pstream_send_simple_ack(c
->pstream
, tag
); /* nonsense */
285 /*** pstream callbacks ***/
/* pstream callback for received packets: runs the packet through the
 * connection's dispatcher with the connection as userdata. A negative
 * result from pdispatch_run() is reported on stderr as an invalid packet.
 * userdata is the connection. */
288 static int packet_callback(struct pstream
*p
, struct packet
*packet
, void *userdata
) {
289 struct connection
*c
= userdata
;
290 assert(p
&& packet
&& packet
->data
&& c
);
292 if (pdispatch_run(c
->pdispatch
, packet
, c
) < 0) {
293 fprintf(stderr
, "protocol-native: invalid packet.\n");
/* pstream callback for received memory blocks: `channel` is used as the
 * index of a playback stream on this connection. An unknown index is
 * reported on stderr. Otherwise the chunk is pushed into the stream's
 * memblockq together with `delta`, and the sink behind the stream's sink
 * input is notified. userdata is the connection. */
300 static int memblock_callback(struct pstream
*p
, uint32_t channel
, int32_t delta
, struct memchunk
*chunk
, void *userdata
) {
301 struct connection
*c
= userdata
;
302 struct playback_stream
*stream
;
303 assert(p
&& chunk
&& userdata
);
305 if (!(stream
= idxset_get_by_index(c
->playback_streams
, channel
))) {
306 fprintf(stderr
, "protocol-native: client sent block for invalid stream.\n");
310 memblockq_push(stream
->memblockq
, chunk
, delta
);
311 assert(stream
->sink_input
);
/* Let the sink know that new data is available for this input. */
312 sink_notify(stream
->sink_input
->sink
);
/* pstream callback invoked when the stream dies; logs the event on stderr.
 * userdata is the connection.
 * NOTE(review): no cleanup call is visible here - presumably the
 * connection is released with connection_free(c); confirm. */
317 static void die_callback(struct pstream
*p
, void *userdata
) {
318 struct connection
*c
= userdata
;
322 fprintf(stderr
, "protocol-native: connection died.\n");
325 /*** socket server callbacks ***/
/* Socket server callback for a newly accepted I/O channel. Allocates a
 * connection, creates its client object, wraps `io` in a pstream and hooks
 * up the packet, memblock and die callbacks, creates a dispatcher over
 * command_table, creates the two empty stream sets and finally registers
 * the connection with protocol instance p (passed as userdata). */
327 static void on_connection(struct socket_server
*s
, struct iochannel
*io
, void *userdata
) {
328 struct protocol_native
*p
= userdata
;
329 struct connection
*c
;
330 assert(s
&& io
&& p
);
332 c
= malloc(sizeof(struct connection
));
/* A connection starts out authorized exactly when the instance is public. */
334 c
->authorized
= p
->public;
337 c
->client
= client_new(p
->core
, "NATIVE", "Client");
339 c
->pstream
= pstream_new(p
->core
->mainloop
, io
);
/* All three pstream callbacks receive the connection as userdata. */
342 pstream_set_recieve_packet_callback(c
->pstream
, packet_callback
, c
);
343 pstream_set_recieve_memblock_callback(c
->pstream
, memblock_callback
, c
);
344 pstream_set_die_callback(c
->pstream
, die_callback
, c
);
346 c
->pdispatch
= pdispatch_new(p
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
347 assert(c
->pdispatch
);
349 c
->record_streams
= idxset_new(NULL
, NULL
);
350 c
->playback_streams
= idxset_new(NULL
, NULL
);
351 assert(c
->record_streams
&& c
->playback_streams
);
353 idxset_put(p
->connections
, c
, NULL
);
356 /*** module entry points ***/
/* Creates a protocol instance. Allocates it, creates an empty connection
 * set and installs on_connection() as the socket server callback with the
 * new instance as userdata. core and server must be non-NULL. */
358 struct protocol_native
* protocol_native_new(struct core
*core
, struct socket_server
*server
) {
359 struct protocol_native
*p
;
360 assert(core
&& server
);
362 p
= malloc(sizeof(struct protocol_native
));
368 p
->connections
= idxset_new(NULL
, NULL
);
/* NOTE(review): the assignment of p->server is not visible here -
 * presumably p->server = server before this call; confirm. */
370 socket_server_set_callback(p
->server
, on_connection
, p
);
/* Destroys protocol instance p: loops for as long as the connection set
 * still has a first element, then frees the set and the socket server. */
375 void protocol_native_free(struct protocol_native
*p
) {
376 struct connection
*c
;
/* NOTE(review): the loop body is not visible here - presumably
 * connection_free(c), which also removes c from the set; confirm. */
379 while ((c
= idxset_first(p
->connections
, NULL
)))
381 idxset_free(p
->connections
, NULL
, NULL
);
382 socket_server_free(p
->server
);