return 0;
}
-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
+static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
struct userdata *u;
size_t l;
pa_assert(i);
u = i->userdata;
pa_assert(u);
- pa_assert(chunk);
pa_assert(length > 0);
u->peek_index += length;
static void sink_input_kill_cb(pa_sink_input *i) {
struct userdata *u;
- pa_assert(i && i->userdata);
+
+ pa_assert(i);
u = i->userdata;
+ pa_assert(u);
pa_sink_input_disconnect(u->sink_input);
pa_sink_input_unref(u->sink_input);
}
skip += k;
- pa_memblockq_drop(q, &chunk, k);
+ pa_memblockq_drop(q, k);
}
if (r < 0 || !chunk.memblock || n >= size || iov_idx >= MAX_IOVECS) {
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
- pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
+ pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
/* Fix the simulated local read index */
if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
static void segment_detach(pa_memimport_segment *seg);
-PA_STATIC_FLIST_DECLARE(unused_memblocks, 0);
+PA_STATIC_FLIST_DECLARE(unused_memblocks, 0, pa_xfree);
/* No lock necessary */
static void stat_add(pa_memblock*b) {
pa_mutex_unlock(p->mutex);
if (pa_atomic_load(&p->stat.n_allocated) > 0) {
- raise(SIGTRAP);
- pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed!");
+/* raise(SIGTRAP); */
+ pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed! %u remain.", pa_atomic_load(&p->stat.n_allocated));
}
pa_flist_free(p->free_slots, NULL);
#include <pulsecore/log.h>
#include <pulsecore/mcalign.h>
#include <pulsecore/macro.h>
+#include <pulsecore/flist.h>
#include "memblockq.h"
-struct memblock_list {
- struct memblock_list *next, *prev;
+struct list_item {
+ struct list_item *next, *prev;
int64_t index;
pa_memchunk chunk;
};
+PA_STATIC_FLIST_DECLARE(list_items, 0, pa_xfree);
+
struct pa_memblockq {
- struct memblock_list *blocks, *blocks_tail;
+ struct list_item *blocks, *blocks_tail;
unsigned n_blocks;
size_t maxlength, tlength, base, prebuf, minreq;
int64_t read_index, write_index;
- enum { PREBUF, RUNNING } state;
+ int in_prebuf;
pa_memblock *silence;
pa_mcalign *mcalign;
};
bq->read_index = bq->write_index = idx;
pa_log_debug("memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu",
- (unsigned long)maxlength, (unsigned long)tlength, (unsigned long)base, (unsigned long)prebuf, (unsigned long)minreq);
+ (unsigned long) maxlength, (unsigned long) tlength, (unsigned long) base, (unsigned long) prebuf, (unsigned long) minreq);
bq->maxlength = ((maxlength+base-1)/base)*base;
pa_assert(bq->maxlength >= base);
bq->tlength = ((tlength+base-1)/base)*base;
- if (!bq->tlength || bq->tlength >= bq->maxlength)
+ if (bq->tlength <= 0 || bq->tlength > bq->maxlength)
bq->tlength = bq->maxlength;
bq->prebuf = (prebuf == (size_t) -1) ? bq->tlength/2 : prebuf;
pa_log_debug("memblockq sanitized: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu",
(unsigned long)bq->maxlength, (unsigned long)bq->tlength, (unsigned long)bq->base, (unsigned long)bq->prebuf, (unsigned long)bq->minreq);
- bq->state = bq->prebuf ? PREBUF : RUNNING;
+ bq->in_prebuf = bq->prebuf > 0;
bq->silence = silence ? pa_memblock_ref(silence) : NULL;
bq->mcalign = NULL;
pa_assert(bq);
pa_memblockq_flush(bq);
-
+
if (bq->silence)
pa_memblock_unref(bq->silence);
pa_xfree(bq);
}
-static void drop_block(pa_memblockq *bq, struct memblock_list *q) {
+static void drop_block(pa_memblockq *bq, struct list_item *q) {
pa_assert(bq);
pa_assert(q);
bq->blocks_tail = q->prev;
pa_memblock_unref(q->chunk.memblock);
- pa_xfree(q);
+
+ if (pa_flist_push(PA_STATIC_FLIST_GET(list_items), q) < 0)
+ pa_xfree(q);
bq->n_blocks--;
}
int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {
- struct memblock_list *q, *n;
+ struct list_item *q, *n;
pa_memchunk chunk;
pa_assert(bq);
if (chunk.length > d) {
chunk.index += d;
chunk.length -= d;
- bq->write_index = bq->read_index;
+ bq->write_index += d;
} else {
/* We drop the incoming data completely */
bq->write_index += chunk.length;
q = bq->blocks_tail;
while (q) {
- if (bq->write_index >= q->index + (int64_t)q->chunk.length)
+ if (bq->write_index >= q->index + (int64_t) q->chunk.length)
/* We found the entry where we need to place the new entry immediately after */
break;
- else if (bq->write_index + (int64_t)chunk.length <= q->index) {
+ else if (bq->write_index + (int64_t) chunk.length <= q->index) {
/* This entry isn't touched at all, let's skip it */
q = q->prev;
} else if (bq->write_index <= q->index &&
/* This entry is fully replaced by the new entry, so let's drop it */
- struct memblock_list *p;
+ struct list_item *p;
p = q;
q = q->prev;
drop_block(bq, p);
if (bq->write_index + chunk.length < q->index + q->chunk.length) {
/* We need to save the end of this memchunk */
- struct memblock_list *p;
+ struct list_item *p;
size_t d;
/* Create a new list entry for the end of thie memchunk */
- p = pa_xnew(struct memblock_list, 1);
+ if (!(p = pa_flist_pop(PA_STATIC_FLIST_GET(list_items))))
+ p = pa_xnew(struct list_item, 1);
+
p->chunk = q->chunk;
pa_memblock_ref(p->chunk.memblock);
/* Truncate the chunk */
if (!(q->chunk.length = bq->write_index - q->index)) {
- struct memblock_list *p;
+ struct list_item *p;
p = q;
q = q->prev;
drop_block(bq, p);
q = q->prev;
}
-
}
if (q) {
pa_assert(!bq->blocks || (bq->write_index + (int64_t)chunk.length <= bq->blocks->index));
- n = pa_xnew(struct memblock_list, 1);
+ if (!(n = pa_flist_pop(PA_STATIC_FLIST_GET(list_items))))
+ n = pa_xnew(struct list_item, 1);
+
n->chunk = chunk;
pa_memblock_ref(n->chunk.memblock);
n->index = bq->write_index;
return 0;
}
-int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {
+static int memblockq_check_prebuf(pa_memblockq *bq) {
pa_assert(bq);
- pa_assert(chunk);
+
+ if (bq->in_prebuf) {
+
+ if (pa_memblockq_get_length(bq) < bq->prebuf)
+ return 1;
- if (bq->state == PREBUF) {
+ bq->in_prebuf = 0;
+ return 0;
+ } else {
- /* We need to pre-buffer */
- if (pa_memblockq_get_length(bq) < bq->prebuf)
- return -1;
+ if (bq->prebuf > 0 && bq->read_index >= bq->write_index) {
+ bq->in_prebuf = 1;
+ return 1;
+ }
- bq->state = RUNNING;
+ return 0;
+ }
+}
- } else if (bq->prebuf > 0 && bq->read_index >= bq->write_index) {
+int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {
+ pa_assert(bq);
+ pa_assert(chunk);
- /* Buffer underflow protection */
- bq->state = PREBUF;
+ /* We need to pre-buffer */
+ if (memblockq_check_prebuf(bq))
return -1;
- }
/* Do we need to spit out silence? */
if (!bq->blocks || bq->blocks->index > bq->read_index) {
return 0;
}
-void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length) {
+void pa_memblockq_drop(pa_memblockq *bq, size_t length) {
pa_assert(bq);
pa_assert(length % bq->base == 0);
- pa_assert(!chunk || length <= chunk->length);
-
- if (chunk) {
-
- if (bq->blocks && bq->blocks->index == bq->read_index) {
- /* The first item in queue is valid */
-
- /* Does the chunk match with what the user supplied us? */
- if (memcmp(chunk, &bq->blocks->chunk, sizeof(pa_memchunk)) != 0)
- return;
-
- } else {
- size_t l;
-
- /* The first item in the queue is not yet relevant */
-
- pa_assert(!bq->blocks || bq->blocks->index > bq->read_index);
- l = bq->blocks ? bq->blocks->index - bq->read_index : 0;
-
- if (bq->silence) {
-
- if (!l || l > pa_memblock_get_length(bq->silence))
- l = pa_memblock_get_length(bq->silence);
-
- }
-
- /* Do the entries still match? */
- if (chunk->index != 0 || chunk->length != l || chunk->memblock != bq->silence)
- return;
- }
- }
-
+
while (length > 0) {
+ /* Do not drop any data when we are in prebuffering mode */
+ if (memblockq_check_prebuf(bq))
+ break;
+
if (bq->blocks) {
size_t d;
int pa_memblockq_is_readable(pa_memblockq *bq) {
pa_assert(bq);
- if (bq->prebuf > 0) {
- size_t l = pa_memblockq_get_length(bq);
-
- if (bq->state == PREBUF && l < bq->prebuf)
- return 0;
+ if (memblockq_check_prebuf(bq))
+ return 0;
- if (l <= 0)
- return 0;
- }
+ if (pa_memblockq_get_length(bq) <= 0)
+ return 0;
return 1;
}
return 0;
l = bq->tlength - l;
- return (l >= bq->minreq) ? l : 0;
+ return l >= bq->minreq ? l : 0;
}
size_t pa_memblockq_get_minreq(pa_memblockq *bq) {
bq->write_index = bq->read_index + offset;
return;
case PA_SEEK_RELATIVE_END:
- bq->write_index = (bq->blocks_tail ? bq->blocks_tail->index + (int64_t)bq->blocks_tail->chunk.length : bq->read_index) + offset;
+ bq->write_index = (bq->blocks_tail ? bq->blocks_tail->index + (int64_t) bq->blocks_tail->chunk.length : bq->read_index) + offset;
return;
}
pa_memchunk rchunk;
pa_assert(bq);
- pa_assert(chunk && bq->base);
+ pa_assert(chunk);
if (bq->base == 1)
return pa_memblockq_push(bq, chunk);
l = pa_memblockq_get_length(bq);
if (l > length)
- pa_memblockq_drop(bq, NULL, l - length);
+ pa_memblockq_drop(bq, l - length);
}
void pa_memblockq_prebuf_disable(pa_memblockq *bq) {
pa_assert(bq);
- if (bq->state == PREBUF)
- bq->state = RUNNING;
+ bq->in_prebuf = 0;
}
void pa_memblockq_prebuf_force(pa_memblockq *bq) {
pa_assert(bq);
- if (bq->state == RUNNING && bq->prebuf > 0)
- bq->state = PREBUF;
+ if (!bq->in_prebuf && bq->prebuf > 0)
+ bq->in_prebuf = 1;
}
size_t pa_memblockq_get_maxlength(pa_memblockq *bq) {
* you know what you do. */
int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk);
-/* Return a copy of the next memory chunk in the queue. It is not removed from the queue */
+/* Return a copy of the next memory chunk in the queue. It is not
+ * removed from the queue. There are two reasons this function might
+ * fail: 1. prebuffering is active, 2. queue is empty and no silence
+ * memblock was passed at initialization. If the queue is not empty,
+ * but we're currently at a hole in the queue and no silence memblock
+ * was passed we return the length of the hole in chunk->length. */
int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk);
-/* Drop the specified bytes from the queue, but only if the first
- * chunk in the queue matches the one passed here. If NULL is passed,
- * this check isn't done. */
-void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length);
+/* Drop the specified bytes from the queue. */
+void pa_memblockq_drop(pa_memblockq *bq, size_t length);
/* Test if the pa_memblockq is currently readable, that is, more data than base */
int pa_memblockq_is_readable(pa_memblockq *bq);
#include "play-memblockq.h"
-static void sink_input_kill(pa_sink_input *i) {
+static void sink_input_kill_cb(pa_sink_input *i) {
pa_memblockq *q;
assert(i);
assert(i->userdata);
pa_memblockq_free(q);
}
-static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
+static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
pa_memblockq *q;
assert(i);
assert(chunk);
return pa_memblockq_peek(q, chunk);
}
-static void si_kill(PA_GCC_UNUSED pa_mainloop_api *m, void *i) {
- sink_input_kill(i);
+static void si_kill_cb(PA_GCC_UNUSED pa_mainloop_api *m, void *i) {
+ sink_input_kill_cb(i);
}
-static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t length) {
+static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
pa_memblockq *q;
assert(i);
q = i->userdata;
- pa_memblockq_drop(q, chunk, length);
+ pa_memblockq_drop(q, length);
if (pa_memblockq_get_length(q) <= 0)
- pa_mainloop_api_once(i->sink->core->mainloop, si_kill, i);
+ pa_mainloop_api_once(i->sink->core->mainloop, si_kill_cb, i);
}
int pa_play_memblockq(
if (!(si = pa_sink_input_new(sink->core, &data, 0)))
return -1;
- si->peek = sink_input_peek;
- si->drop = sink_input_drop;
- si->kill = sink_input_kill;
+ si->peek = sink_input_peek_cb;
+ si->drop = sink_input_drop_cb;
+ si->kill = sink_input_kill_cb;
si->userdata = q;
#include "play-memchunk.h"
-static void sink_input_kill(pa_sink_input *i) {
+static void sink_input_kill_cb(pa_sink_input *i) {
pa_memchunk *c;
assert(i && i->userdata);
c = i->userdata;
pa_xfree(c);
}
-static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
+static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
pa_memchunk *c;
assert(i && chunk && i->userdata);
c = i->userdata;
return 0;
}
-static void si_kill(PA_GCC_UNUSED pa_mainloop_api *m, void *i) {
- sink_input_kill(i);
+static void si_kill_cb(PA_GCC_UNUSED pa_mainloop_api *m, void *i) {
+ sink_input_kill_cb(i);
}
-static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t length) {
+static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
pa_memchunk *c;
assert(i && length && i->userdata);
c = i->userdata;
- assert(!memcmp(chunk, c, sizeof(chunk)));
- assert(length <= c->length);
+ if (length >= c->length) {
+ c->length -= length;
+ c->index += length;
+ } else {
- c->length -= length;
- c->index += length;
+ c->length = 0;
- if (c->length <= 0)
- pa_mainloop_api_once(i->sink->core->mainloop, si_kill, i);
+ pa_mainloop_api_once(i->sink->core->mainloop, si_kill_cb, i);
+ }
}
int pa_play_memchunk(
if (!(si = pa_sink_input_new(sink->core, &data, 0)))
return -1;
- si->peek = sink_input_peek;
- si->drop = sink_input_drop;
- si->kill = sink_input_kill;
+ si->peek = sink_input_peek_cb;
+ si->drop = sink_input_drop_cb;
+ si->kill = sink_input_kill_cb;
si->userdata = nchunk = pa_xnew(pa_memchunk, 1);
*nchunk = *chunk;
PA_DECLARE_CLASS(connection);
#define CONNECTION(o) (connection_cast(o))
-
static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type);
struct pa_protocol_simple {
return -1;
}
- pa_memblockq_drop(c->output_memblockq, &chunk, r);
+ pa_memblockq_drop(c->output_memblockq, r);
return 0;
}
static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
connection *c = CONNECTION(o);
-
connection_assert_ref(c);
switch (code) {
/* pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */
if (c->dead && r < 0)
- pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, c, NULL, NULL);
+ pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, NULL, NULL, NULL);
return r;
}
/* Called from thread context */
-static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
+static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
connection*c = i->userdata;
size_t old, new;
pa_assert(length);
old = pa_memblockq_missing(c->input_memblockq);
- pa_memblockq_drop(c->input_memblockq, chunk, length);
+ pa_memblockq_drop(c->input_memblockq, length);
new = pa_memblockq_missing(c->input_memblockq);
if (new > old) {
/* Called from main context */
static void sink_input_kill_cb(pa_sink_input *i) {
pa_assert(i);
- pa_assert(i->userdata);
- connection_drop((connection *) i->userdata);
+ connection_drop(CONNECTION(i->userdata));
}
/*** source_output callbacks ***/
/* } */
if (!i->thread_info.resampler) {
- do_volume_adj_here = 0;
+ do_volume_adj_here = 0; /* FIXME??? */
ret = i->peek(i, chunk);
goto finish;
}
if ((ret = i->peek(i, &tchunk)) < 0)
goto finish;
- pa_assert(tchunk.length);
+ pa_assert(tchunk.length > 0);
l = pa_resampler_request(i->thread_info.resampler, CONVERT_BUFFER_LENGTH);
- if (l > tchunk.length)
- l = tchunk.length;
+ if (tchunk.length > l)
+ tchunk.length = l;
- i->drop(i, &tchunk, l);
- tchunk.length = l;
+ i->drop(i, tchunk.length);
/* It might be necessary to adjust the volume here */
if (do_volume_adj_here && !volume_is_norm) {
}
pa_assert(i->thread_info.resampled_chunk.memblock);
- pa_assert(i->thread_info.resampled_chunk.length);
+ pa_assert(i->thread_info.resampled_chunk.length > 0);
*chunk = i->thread_info.resampled_chunk;
pa_memblock_ref(i->thread_info.resampled_chunk.memblock);
return ret;
}
-void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
+void pa_sink_input_drop(pa_sink_input *i, size_t length) {
pa_sink_input_assert_ref(i);
pa_assert(length > 0);
/* return; */
/* } */
- if (!i->thread_info.resampler) {
- if (i->drop)
- i->drop(i, chunk, length);
- return;
- }
-
- pa_assert(i->thread_info.resampled_chunk.memblock);
- pa_assert(i->thread_info.resampled_chunk.length >= length);
+ pa_log("dropping %u", length);
+
+ if (i->thread_info.resampled_chunk.memblock) {
+ size_t l = length;
+
+ if (l > i->thread_info.resampled_chunk.length)
+ l = i->thread_info.resampled_chunk.length;
+
+ pa_log("really dropping %u", l);
+
+ i->thread_info.resampled_chunk.index += l;
+ i->thread_info.resampled_chunk.length -= l;
+
+ if (i->thread_info.resampled_chunk.length <= 0) {
+ pa_memblock_unref(i->thread_info.resampled_chunk.memblock);
+ pa_memchunk_reset(&i->thread_info.resampled_chunk);
+ }
- i->thread_info.resampled_chunk.index += length;
- i->thread_info.resampled_chunk.length -= length;
+ length -= l;
+ }
- if (i->thread_info.resampled_chunk.length <= 0) {
- pa_memblock_unref(i->thread_info.resampled_chunk.memblock);
- i->thread_info.resampled_chunk.memblock = NULL;
- i->thread_info.resampled_chunk.index = i->thread_info.resampled_chunk.length = 0;
+ pa_log("really remaining %u", length);
+
+ if (length > 0) {
+
+ if (i->thread_info.resampler) {
+ /* So, we have a resampler. To avoid discontinuities we
+ * have to actually read all data that could be read and
+ * pass it through the resampler. */
+
+ while (length > 0) {
+ pa_memchunk chunk;
+ pa_cvolume volume;
+
+ if (pa_sink_input_peek(i, &chunk, &volume) >= 0) {
+ size_t l = chunk.length;
+
+ if (l > length)
+ l = length;
+
+ pa_sink_input_drop(i, l);
+ length -= l;
+
+ } else {
+ /* Hmmm, peeking failed, so let's at least drop
+ * the right amount of data */
+
+ if (i->drop)
+ i->drop(i, pa_resampler_request(i->thread_info.resampler, length));
+
+ break;
+ }
+ }
+
+ } else {
+
+ /* We have no resampler, hence let's just drop the data */
+
+ if (i->drop)
+ i->drop(i, length);
+ }
}
}
int muted;
int (*peek) (pa_sink_input *i, pa_memchunk *chunk);
- void (*drop) (pa_sink_input *i, const pa_memchunk *chunk, size_t length);
+ void (*drop) (pa_sink_input *i, size_t length);
void (*kill) (pa_sink_input *i); /* may be NULL */
pa_usec_t (*get_latency) (pa_sink_input *i); /* may be NULL */
void (*underrun) (pa_sink_input *i); /* may be NULL */
/* To be used exclusively by the sink driver thread */
int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume);
-void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t length);
+void pa_sink_input_drop(pa_sink_input *i, size_t length);
int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);
#endif
}
/* Drop read data */
- pa_sink_input_drop(i, m ? &m->chunk : NULL, length);
+ pa_sink_input_drop(i, length);
if (m) {
pa_sink_input_unref(m->userdata);
#endif
#include <stdlib.h>
-#include <assert.h>
#include <stdio.h>
#include <string.h>
#define BUF_SIZE (1024*10)
-struct userdata {
+typedef struct file_stream {
+ pa_msgobject parent;
+ pa_core *core;
SNDFILE *sndfile;
pa_sink_input *sink_input;
pa_memchunk memchunk;
sf_count_t (*readf_function)(SNDFILE *sndfile, void *ptr, sf_count_t frames);
+ size_t drop;
+} file_stream;
+
+enum {
+ MESSAGE_DROP_FILE_STREAM
};
-static void free_userdata(struct userdata *u) {
- assert(u);
- if (u->sink_input) {
- pa_sink_input_disconnect(u->sink_input);
- pa_sink_input_unref(u->sink_input);
- }
+PA_DECLARE_CLASS(file_stream);
+#define FILE_STREAM(o) (file_stream_cast(o))
+static PA_DEFINE_CHECK_TYPE(file_stream, file_stream_check_type, pa_msgobject_check_type);
+
+static void file_stream_free(pa_object *o) {
+ file_stream *u = FILE_STREAM(o);
+ pa_assert(u);
+ pa_log("xxxx ffreee");
+
if (u->memchunk.memblock)
pa_memblock_unref(u->memchunk.memblock);
+
if (u->sndfile)
sf_close(u->sndfile);
pa_xfree(u);
}
-static void sink_input_kill(pa_sink_input *i) {
- assert(i && i->userdata);
- free_userdata(i->userdata);
-}
-
-static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
- struct userdata *u;
- assert(i && chunk && i->userdata);
- u = i->userdata;
+static void file_stream_drop(file_stream *u) {
+ file_stream_assert_ref(u);
- if (!u->memchunk.memblock) {
- uint32_t fs = pa_frame_size(&i->sample_spec);
- sf_count_t n;
- void *p;
+ pa_log("xxxx drop");
+
+
+ if (u->sink_input) {
+ pa_sink_input_disconnect(u->sink_input);
+ pa_sink_input_unref(u->sink_input);
+ u->sink_input = NULL;
- u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE);
- u->memchunk.index = 0;
+ /* Make sure we don't decrease the ref count twice. */
+ file_stream_unref(u);
+ }
+}
- p = pa_memblock_acquire(u->memchunk.memblock);
+static int file_stream_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
+ file_stream *u = FILE_STREAM(o);
+ file_stream_assert_ref(u);
+
+ switch (code) {
+ case MESSAGE_DROP_FILE_STREAM:
+ file_stream_drop(u);
+ break;
+ }
- if (u->readf_function) {
- if ((n = u->readf_function(u->sndfile, p, BUF_SIZE/fs)) <= 0)
- n = 0;
+ return 0;
+}
- u->memchunk.length = n * fs;
- } else {
- if ((n = sf_read_raw(u->sndfile, p, BUF_SIZE)) <= 0)
- n = 0;
+static void sink_input_kill_cb(pa_sink_input *i) {
+ pa_assert(i);
+
+ file_stream_drop(FILE_STREAM(i->userdata));
+}
- u->memchunk.length = n;
+static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
+ file_stream *u;
+
+ pa_assert(i);
+ pa_assert(chunk);
+ u = FILE_STREAM(i->userdata);
+ file_stream_assert_ref(u);
+
+ for (;;) {
+
+ if (!u->memchunk.memblock) {
+
+ u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE);
+ u->memchunk.index = 0;
+
+ if (u->readf_function) {
+ sf_count_t n;
+ void *p;
+ size_t fs = pa_frame_size(&i->sample_spec);
+
+ p = pa_memblock_acquire(u->memchunk.memblock);
+ n = u->readf_function(u->sndfile, p, BUF_SIZE/fs);
+ pa_memblock_release(u->memchunk.memblock);
+
+ pa_log("%u/%u = data: %02x %02x %02x %02x %02x %02x %02x %02x",
+ (unsigned int) n, BUF_SIZE/fs,
+ ((uint8_t*)p)[0], ((uint8_t*)p)[1], ((uint8_t*)p)[2], ((uint8_t*)p)[3],
+ ((uint8_t*)p)[4], ((uint8_t*)p)[5], ((uint8_t*)p)[6], ((uint8_t*)p)[7]);
+
+ if (n <= 0)
+ n = 0;
+
+ u->memchunk.length = n * fs;
+ } else {
+ sf_count_t n;
+ void *p;
+
+ p = pa_memblock_acquire(u->memchunk.memblock);
+ n = sf_read_raw(u->sndfile, p, BUF_SIZE);
+ pa_memblock_release(u->memchunk.memblock);
+
+ if (n <= 0)
+ n = 0;
+
+ u->memchunk.length = n;
+ }
+
+ if (u->memchunk.length <= 0) {
+
+ pa_memblock_unref(u->memchunk.memblock);
+ pa_memchunk_reset(&u->memchunk);
+
+ pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u), MESSAGE_DROP_FILE_STREAM, NULL, NULL, NULL);
+ return -1;
+ }
}
- pa_memblock_release(u->memchunk.memblock);
- if (!u->memchunk.length) {
- free_userdata(u);
- return -1;
+ pa_assert(u->memchunk.memblock);
+ pa_assert(u->memchunk.length > 0);
+
+ if (u->drop < u->memchunk.length) {
+ u->memchunk.index += u->drop;
+ u->memchunk.length -= u->drop;
+ u->drop = 0;
+ break;
}
+
+ u->drop -= u->memchunk.length;
+ pa_memblock_unref(u->memchunk.memblock);
+ pa_memchunk_reset(&u->memchunk);
}
*chunk = u->memchunk;
pa_memblock_ref(chunk->memblock);
- assert(chunk->length);
+
+ pa_assert(chunk->length > 0);
+ pa_assert(u->drop <= 0);
+
return 0;
}
-static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t length) {
- struct userdata *u;
- assert(i && chunk && length && i->userdata);
- u = i->userdata;
+static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
+ file_stream *u;
- assert(!memcmp(chunk, &u->memchunk, sizeof(chunk)));
- assert(length <= u->memchunk.length);
+ pa_assert(i);
+ pa_assert(length > 0);
+ u = FILE_STREAM(i->userdata);
+ file_stream_assert_ref(u);
+
+ if (u->memchunk.memblock) {
- u->memchunk.index += length;
- u->memchunk.length -= length;
+ if (length < u->memchunk.length) {
+ u->memchunk.index += length;
+ u->memchunk.length -= length;
+ return;
+ }
- if (u->memchunk.length <= 0) {
+ length -= u->memchunk.length;
pa_memblock_unref(u->memchunk.memblock);
- u->memchunk.memblock = NULL;
- u->memchunk.index = u->memchunk.length = 0;
+ pa_memchunk_reset(&u->memchunk);
}
+
+ u->drop += length;
}
int pa_play_file(
const char *fname,
const pa_cvolume *volume) {
- struct userdata *u = NULL;
+ file_stream *u = NULL;
SF_INFO sfinfo;
pa_sample_spec ss;
pa_sink_input_new_data data;
- assert(sink);
- assert(fname);
+ pa_assert(sink);
+ pa_assert(fname);
- u = pa_xnew(struct userdata, 1);
+ u = pa_msgobject_new(file_stream, file_stream_check_type);
+ u->parent.parent.free = file_stream_free;
+ u->parent.process_msg = file_stream_process_msg;
+ u->core = sink->core;
u->sink_input = NULL;
- u->memchunk.memblock = NULL;
- u->memchunk.index = u->memchunk.length = 0;
+ pa_memchunk_reset(&u->memchunk);
u->sndfile = NULL;
+ u->readf_function = NULL;
+ u->drop = 0;
memset(&sfinfo, 0, sizeof(sfinfo));
goto fail;
}
- u->readf_function = NULL;
-
switch (sfinfo.format & 0xFF) {
case SF_FORMAT_PCM_16:
case SF_FORMAT_PCM_U8:
if (!(u->sink_input = pa_sink_input_new(sink->core, &data, 0)))
goto fail;
- u->sink_input->peek = sink_input_peek;
- u->sink_input->drop = sink_input_drop;
- u->sink_input->kill = sink_input_kill;
+ u->sink_input->peek = sink_input_peek_cb;
+ u->sink_input->drop = sink_input_drop_cb;
+ u->sink_input->kill = sink_input_kill_cb;
u->sink_input->userdata = u;
-/* pa_sink_notify(u->sink_input->sink); */
+ pa_sink_input_put(u->sink_input);
+
+ /* The reference to u is dangling here, because we want to keep
+ * this stream around until it is fully played. */
return 0;
fail:
if (u)
- free_userdata(u);
+ file_stream_unref(u);
return -1;
}
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
+#include <signal.h>
#include <pulsecore/memblockq.h>
#include <pulsecore/log.h>
bq = pa_memblockq_new(0, 40, 10, 2, 4, 4, silence);
assert(bq);
- chunk1.memblock = pa_memblock_new_fixed(p, (char*) "AA", 2, 1);
+ chunk1.memblock = pa_memblock_new_fixed(p, (char*) "11", 2, 1);
chunk1.index = 0;
chunk1.length = 2;
assert(chunk1.memblock);
- chunk2.memblock = pa_memblock_new_fixed(p, (char*) "TTBB", 4, 1);
+ chunk2.memblock = pa_memblock_new_fixed(p, (char*) "XX22", 4, 1);
chunk2.index = 2;
chunk2.length = 2;
assert(chunk2.memblock);
- chunk3.memblock = pa_memblock_new_fixed(p, (char*) "ZZZZ", 4, 1);
+ chunk3.memblock = pa_memblock_new_fixed(p, (char*) "3333", 4, 1);
chunk3.index = 0;
chunk3.length = 4;
assert(chunk3.memblock);
- chunk4.memblock = pa_memblock_new_fixed(p, (char*) "KKKKKKKK", 8, 1);
+ chunk4.memblock = pa_memblock_new_fixed(p, (char*) "44444444", 8, 1);
chunk4.index = 0;
chunk4.length = 8;
assert(chunk4.memblock);
chunk3.index += 2;
chunk3.length -= 2;
-
ret = pa_memblockq_push(bq, &chunk3);
assert(ret == 0);
- printf(">");
+ pa_memblockq_shorten(bq, pa_memblockq_get_length(bq)-2);
- pa_memblockq_shorten(bq, 6);
+ printf(">");
for (;;) {
pa_memchunk out;
pa_memblock_release(out.memblock);
pa_memblock_unref(out.memblock);
- pa_memblockq_drop(bq, &out, out.length);
+ pa_memblockq_drop(bq, out.length);
}
printf("<\n");