int64_t frame_index;
snd_pcm_sframes_t hwbuf_unused_frames;
- snd_pcm_sframes_t avail_min_frames;
};
static void fix_tsched_watermark(struct userdata *u) {
max_use = u->hwbuf_size - u->hwbuf_unused_frames * u->frame_size;
- if (u->tsched_watermark >= max_use/2)
- u->tsched_watermark = max_use/2;
+ if (u->tsched_watermark >= max_use-u->frame_size)
+ u->tsched_watermark = max_use-u->frame_size;
+}
+
+static int try_recover(struct userdata *u, const char *call, int err) { /* Shared error path for failed ALSA calls: returns 1 on -EAGAIN, 0 after a successful snd_pcm_recover(), -1 otherwise */
+ pa_assert(u);
+ pa_assert(call);
+ pa_assert(err < 0); /* callers must hand in the negative error code, never a frame count */
+
+ pa_log_debug("%s: %s", call, snd_strerror(err));
+
+ if (err == -EAGAIN) {
+ pa_log_debug("%s: EAGAIN", call);
+ return 1; /* positive: callers in this file return their work_done */
+ }
+
+ if (err == -EPIPE)
+ pa_log_debug("%s: Buffer underrun!", call);
+
+ if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
+ u->first = TRUE; /* check_left_to_play() skips its underrun check while this flag is set */
+ return 0; /* zero: recovered, callers in this file retry their loop */
+ }
+
+ pa_log("%s: %s", call, snd_strerror(err)); /* err now holds the snd_pcm_recover() result */
+ return -1;
+}
+
+static void check_left_to_play(struct userdata *u, snd_pcm_sframes_t n) { /* n: frame count from snd_pcm_avail_update(); logs how much is left to play and doubles the wakeup watermark on underrun */
+ size_t left_to_play;
+
+ if (u->first) /* no check while u->first is set (try_recover() sets it after a successful recovery) */
+ return;
+
+ if (n*u->frame_size < u->hwbuf_size)
+ left_to_play = u->hwbuf_size - (n*u->frame_size); /* bytes of the hardware buffer not reported as available */
+ else
+ left_to_play = 0;
+
+ if (left_to_play > 0)
+ pa_log_debug("%0.2f ms left to play", (double) pa_bytes_to_usec(left_to_play, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
+ else {
+ pa_log_info("Underrun!");
+
+ if (u->use_tsched) {
+ size_t old_watermark = u->tsched_watermark;
+
+ u->tsched_watermark *= 2;
+ fix_tsched_watermark(u); /* clamp the doubled value */
+
+ if (old_watermark != u->tsched_watermark) /* only announce an actual change */
+ pa_log_notice("Increasing wakeup watermark to %0.2f ms",
+ (double) pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
+ }
+ }
}
static int mmap_write(struct userdata *u) {
pa_memchunk chunk;
void *p;
snd_pcm_sframes_t n;
- int err;
+ int err, r;
const snd_pcm_channel_area_t *areas;
snd_pcm_uframes_t offset, frames;
- size_t left_to_play;
snd_pcm_hwsync(u->pcm_handle);
if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) {
- pa_log_debug("snd_pcm_avail_update: %s", snd_strerror(n));
-
- if (err == -EAGAIN) {
- pa_log_debug("EAGAIN");
- return work_done;
- }
-
- if (n == -EPIPE)
- pa_log_debug("snd_pcm_avail_update: Buffer underrun!");
-
- if ((err = snd_pcm_recover(u->pcm_handle, n, 1)) == 0) {
- u->first = TRUE;
+ if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0)
continue;
- }
+ else if (r > 0)
+ return work_done;
- pa_log("snd_pcm_recover: %s", snd_strerror(err));
- return -1;
+ return r;
}
+ check_left_to_play(u, n);
+
/* We only use part of the buffer that matches our
* dynamically requested latency */
if (PA_UNLIKELY(n <= u->hwbuf_unused_frames))
return work_done;
- if (n*u->frame_size < u->hwbuf_size)
- left_to_play = u->hwbuf_size - (n*u->frame_size);
- else
- left_to_play = 0;
-
- pa_log_debug("%0.2f ms left to play", (double) pa_bytes_to_usec(left_to_play, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
-
- if (left_to_play <= 0 && !u->first) {
- u->tsched_watermark *= 2;
- fix_tsched_watermark(u);
- pa_log_notice("Underrun! Increasing wakeup watermark to %0.2f ms",
- (double) pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
- }
-
frames = n = n - u->hwbuf_unused_frames;
- pa_log_debug("%llu frames to write", (unsigned long long) frames);
+ pa_log_debug("%lu frames to write", (unsigned long) frames);
if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) {
- pa_log_debug("snd_pcm_mmap_begin: %s", snd_strerror(err));
-
- if (err == -EAGAIN) {
- pa_log_debug("EAGAIN");
- return work_done;
- }
-
- if (err == -EPIPE)
- pa_log_debug("snd_pcm_mmap_begin: Buffer underrun!");
-
- if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
- u->first = TRUE;
+ if ((r = try_recover(u, "snd_pcm_mmap_begin", err)) == 0)
continue;
- }
+ else if (r > 0)
+ return work_done;
- pa_log("Failed to write data to DSP: %s", snd_strerror(err));
- return -1;
+ return r;
}
+ /* Make sure that if these memblocks need to be copied they will fit into one slot */
+ if (frames > pa_mempool_block_size_max(u->sink->core->mempool)/u->frame_size)
+ frames = pa_mempool_block_size_max(u->sink->core->mempool)/u->frame_size;
+
/* Check these are multiples of 8 bit */
pa_assert((areas[0].first & 7) == 0);
pa_assert((areas[0].step & 7)== 0);
p = (uint8_t*) areas[0].addr + (offset * u->frame_size);
- chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, 1);
+ chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE);
chunk.length = pa_memblock_get_length(chunk.memblock);
chunk.index = 0;
if (PA_UNLIKELY((err = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) {
- pa_log_debug("snd_pcm_mmap_commit: %s", snd_strerror(err));
-
- if (err == -EAGAIN) {
- pa_log_debug("EAGAIN");
- return work_done;
- }
-
- if (err == -EPIPE)
- pa_log_debug("snd_pcm_mmap_commit: Buffer underrun!");
-
- if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
- u->first = TRUE;
+ if ((r = try_recover(u, "snd_pcm_mmap_commit", err)) == 0)
continue;
- }
+ else if (r > 0)
+ return work_done;
- pa_log("Failed to write data to DSP: %s", snd_strerror(err));
- return -1;
+ return r;
}
work_done = 1;
u->frame_index += frames;
- pa_log_debug("wrote %llu frames", (unsigned long long) frames);
+ pa_log_debug("wrote %lu frames", (unsigned long) frames);
if (PA_LIKELY(frames >= (snd_pcm_uframes_t) n))
return work_done;
}
static int unix_write(struct userdata *u) {
- snd_pcm_status_t *status;
int work_done = 0;
- snd_pcm_status_alloca(&status);
-
pa_assert(u);
pa_sink_assert_ref(u->sink);
for (;;) {
void *p;
snd_pcm_sframes_t n, frames;
- int err;
+ int r;
snd_pcm_hwsync(u->pcm_handle);
- snd_pcm_avail_update(u->pcm_handle);
- if (PA_UNLIKELY((err = snd_pcm_status(u->pcm_handle, status)) < 0)) {
- pa_log("Failed to query DSP status data: %s", snd_strerror(err));
- return -1;
- }
+ if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) {
- if (PA_UNLIKELY(snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size))
- pa_log_debug("Buffer underrun!");
+ if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0) /* 0: recovered, retry; >0: EAGAIN; <0: fatal */
+ continue;
+ else if (r > 0)
+ return work_done;
- n = snd_pcm_status_get_avail(status);
+ return r;
+ }
- /* We only use part of the buffer that matches our
- * dynamically requested latency */
+ check_left_to_play(u, n);
if (PA_UNLIKELY(n <= u->hwbuf_unused_frames))
return work_done;
n -= u->hwbuf_unused_frames;
+ pa_log_debug("%lu frames to write", (unsigned long) n); /* log n: frames has not been set by a write yet */
+
if (u->memchunk.length <= 0)
pa_sink_render(u->sink, n * u->frame_size, &u->memchunk);
if (PA_UNLIKELY(frames < 0)) {
- if (frames == -EAGAIN) {
- pa_log_debug("EAGAIN");
- return work_done;
- }
-
- if (frames == -EPIPE)
- pa_log_debug("snd_pcm_avail_update: Buffer underrun!");
-
- if ((frames = snd_pcm_recover(u->pcm_handle, frames, 1)) == 0) {
- u->first = TRUE;
+ if ((r = try_recover(u, "snd_pcm_writei", frames)) == 0) /* the error code is in frames; n is a positive count here */
continue;
- }
+ else if (r > 0)
+ return work_done;
- pa_log("Failed to write data to DSP: %s", snd_strerror(frames));
- return -1;
+ return r;
}
u->memchunk.index += frames * u->frame_size;
u->frame_index += frames;
+ pa_log_debug("wrote %lu frames", (unsigned long) frames);
+
if (PA_LIKELY(frames >= n))
return work_done;
}
return;
}
-
frames = u->frame_index - delay;
+
/* pa_log_debug("frame_index = %llu, delay = %llu, p = %llu", (unsigned long long) u->frame_index, (unsigned long long) delay, (unsigned long long) frames); */
/* snd_pcm_status_get_tstamp(status, &timestamp); */
now1 = pa_rtclock_usec();
now2 = pa_smoother_get(u->smoother, now1);
- delay = (int64_t) pa_bytes_to_usec(u->frame_index * u->frame_size, &u->sink->sample_spec) - now2;
+ delay = (int64_t) pa_bytes_to_usec(u->frame_index * u->frame_size, &u->sink->sample_spec) - (int64_t) now2;
if (delay > 0)
r = (pa_usec_t) delay;
}
static int build_pollfd(struct userdata *u) {
- int err;
- struct pollfd *pollfd;
- int n;
-
pa_assert(u);
pa_assert(u->pcm_handle);
- if ((n = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) {
- pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(n));
- return -1;
- }
-
if (u->alsa_rtpoll_item)
pa_rtpoll_item_free(u->alsa_rtpoll_item);
- u->alsa_rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, n);
- pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, NULL);
-
- if ((err = snd_pcm_poll_descriptors(u->pcm_handle, pollfd, n)) < 0) {
- pa_log("snd_pcm_poll_descriptors() failed: %s", snd_strerror(err));
+ if (!(u->alsa_rtpoll_item = pa_alsa_build_pollfd(u->pcm_handle, u->rtpoll))) /* helper call replaces the removed inline descriptor setup; a NULL result is reported as -1 */
return -1;
- }
return 0;
}
if (usec == (pa_usec_t) -1)
usec = pa_bytes_to_usec(u->hwbuf_size, &u->sink->sample_spec);
-/* pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
+ pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC));
wm = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec);
usec /= 2;
}
-/* pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
+ pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC));
return usec;
}
static int update_sw_params(struct userdata *u) {
+ snd_pcm_uframes_t avail_min; /* counted in frames, like hwbuf_unused_frames */
int err;
- pa_usec_t latency;
pa_assert(u);
/* Use the full buffer if noone asked us for anything specific */
u->hwbuf_unused_frames = 0;
- if (u->use_tsched)
+ if (u->use_tsched) {
+ pa_usec_t latency;
+
if ((latency = pa_sink_get_requested_latency_within_thread(u->sink)) != (pa_usec_t) -1) {
size_t b;
- pa_log_debug("latency set to %llu", (unsigned long long) latency);
+ pa_log_debug("latency set to %0.2f", (double) latency / PA_USEC_PER_MSEC);
b = pa_usec_to_bytes(latency, &u->sink->sample_spec);
fix_tsched_watermark(u);
}
+ }
pa_log_debug("hwbuf_unused_frames=%lu", (unsigned long) u->hwbuf_unused_frames);
/* We need at last one frame in the used part of the buffer */
- u->avail_min_frames = u->hwbuf_unused_frames + 1;
+ avail_min = u->hwbuf_unused_frames + 1;
if (u->use_tsched) {
pa_usec_t usec;
usec = hw_sleep_time(u);
-
- u->avail_min_frames += (pa_usec_to_bytes(usec, &u->sink->sample_spec) / u->frame_size);
+ avail_min += pa_usec_to_bytes(usec, &u->sink->sample_spec) / u->frame_size; /* bytes -> frames, as the replaced line did */
}
- pa_log_debug("setting avail_min=%lu", (unsigned long) u->avail_min_frames);
+ pa_log_debug("setting avail_min=%lu", (unsigned long) avail_min);
- if ((err = pa_alsa_set_sw_params(u->pcm_handle, u->avail_min_frames)) < 0) {
+ if ((err = pa_alsa_set_sw_params(u->pcm_handle, avail_min)) < 0) {
pa_log("Failed to set software parameters: %s", snd_strerror(err));
return err;
}
}
break;
-
-/* case PA_SINK_MESSAGE_ADD_INPUT: */
-/* case PA_SINK_MESSAGE_REMOVE_INPUT: */
-/* case PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER: { */
-/* int r = pa_sink_process_msg(o, code, data, offset, chunk); */
-/* update_hwbuf_unused_frames(u); */
-/* return r; */
-/* } */
}
return pa_sink_process_msg(o, code, data, offset, chunk);
u->use_mmap = use_mmap;
u->use_tsched = use_tsched;
u->first = TRUE;
- pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
u->rtpoll = pa_rtpoll_new();
+ pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
u->alsa_rtpoll_item = NULL;
- pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
u->smoother = pa_smoother_new(DEFAULT_TSCHED_BUFFER_USEC*2, DEFAULT_TSCHED_BUFFER_USEC*2, TRUE);
usec = pa_rtclock_usec();
u->nfragments = nfrags;
u->hwbuf_size = u->fragment_size * nfrags;
u->hwbuf_unused_frames = 0;
- u->avail_min_frames = 0;
u->tsched_watermark = tsched_watermark;
u->frame_index = 0;
u->hw_dB_supported = FALSE;
u->hw_volume_min = u->hw_volume_max = 0;
if (use_tsched)
- if (u->tsched_watermark >= u->hwbuf_size/2)
- u->tsched_watermark = pa_frame_align(u->hwbuf_size/2, &ss);
+ fix_tsched_watermark(u);
u->sink->thread_info.max_rewind = use_tsched ? u->hwbuf_size : 0;
u->sink->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss);
-
if (!use_tsched)
u->sink->min_latency = u->sink->max_latency;
u->sink->get_volume = sink_get_volume_cb;
u->sink->set_volume = sink_set_volume_cb;
u->sink->flags |= PA_SINK_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SINK_DECIBEL_VOLUME : 0);
- pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using");
+ pa_log_info("Using hardware volume control. Hardware dB scale %s.", u->hw_dB_supported ? "supported" : "not supported");
} else if (mixer_reset) {
pa_log_info("Using software volume control. Trying to reset sound card to 0 dB.");
pa_smoother *smoother;
int64_t frame_index;
+
+ snd_pcm_sframes_t hwbuf_unused_frames;
};
+static void fix_tsched_watermark(struct userdata *u) { /* Clamp u->tsched_watermark to at most one frame less than the usable part of the hardware buffer */
+ size_t max_use;
+ pa_assert(u);
+
+ max_use = u->hwbuf_size - u->hwbuf_unused_frames * u->frame_size; /* usable bytes: whole buffer minus the unused frames */
+
+ if (u->tsched_watermark >= max_use-u->frame_size) /* NOTE(review): size_t arithmetic, assumes max_use >= frame_size -- confirm */
+ u->tsched_watermark = max_use-u->frame_size;
+}
+
+static int try_recover(struct userdata *u, const char *call, int err) { /* Shared error path for failed ALSA calls: returns 1 on -EAGAIN, 0 after a successful snd_pcm_recover(), -1 otherwise */
+ pa_assert(u);
+ pa_assert(call);
+ pa_assert(err < 0); /* callers must hand in the negative error code, never a frame count */
+
+ pa_log_debug("%s: %s", call, snd_strerror(err));
+
+ if (err == -EAGAIN) {
+ pa_log_debug("%s: EAGAIN", call);
+ return 1; /* positive: callers in this file return their work_done */
+ }
+
+ if (err == -EPIPE)
+ pa_log_debug("%s: Buffer overrun!", call);
+
+ if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
+ snd_pcm_start(u->pcm_handle); /* restart the PCM after recovery, as the replaced inline code did; its return value is not checked */
+ return 0; /* zero: recovered, callers in this file retry their loop */
+ }
+
+ pa_log("%s: %s", call, snd_strerror(err)); /* err now holds the snd_pcm_recover() result */
+ return -1;
+}
+
+static void check_left_to_record(struct userdata *u, snd_pcm_sframes_t n) { /* n: frame count from snd_pcm_avail_update(); logs the remaining room and doubles the wakeup watermark on overrun */
+ size_t left_to_record;
+
+ if (n*u->frame_size < u->hwbuf_size)
+ left_to_record = u->hwbuf_size - (n*u->frame_size); /* bytes of the hardware buffer not yet reported as available */
+ else
+ left_to_record = 0;
+
+ if (left_to_record > 0)
+ pa_log_debug("%0.2f ms left to record", (double) pa_bytes_to_usec(left_to_record, &u->source->sample_spec) / PA_USEC_PER_MSEC);
+ else {
+ pa_log_info("Overrun!");
+
+ if (u->use_tsched) {
+ size_t old_watermark = u->tsched_watermark;
+
+ u->tsched_watermark *= 2;
+ fix_tsched_watermark(u); /* clamp the doubled value */
+
+ if (old_watermark != u->tsched_watermark) /* only announce an actual change */
+ pa_log_notice("Increasing wakeup watermark to %0.2f ms",
+ (double) pa_bytes_to_usec(u->tsched_watermark, &u->source->sample_spec) / PA_USEC_PER_MSEC);
+ }
+ }
+}
+
static int mmap_read(struct userdata *u) {
int work_done = 0;
for (;;) {
snd_pcm_sframes_t n;
- int err;
+ int err, r;
const snd_pcm_channel_area_t *areas;
snd_pcm_uframes_t offset, frames;
pa_memchunk chunk;
if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) {
- pa_log_debug("snd_pcm_avail_update: %s", snd_strerror(n));
-
- if (err == -EAGAIN) {
- pa_log_debug("EAGAIN");
- return work_done;
- }
-
- if (n == -EPIPE)
- pa_log_debug("snd_pcm_avail_update: Buffer underrun!");
-
- if ((err = snd_pcm_recover(u->pcm_handle, n, 1)) == 0) {
- snd_pcm_start(u->pcm_handle);
+ if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0) /* the error code is in n; err has not been assigned yet */
continue;
- }
+ else if (r > 0)
+ return work_done;
- pa_log("snd_pcm_recover: %s", snd_strerror(err));
- return -1;
+ return r;
}
+ check_left_to_record(u, n);
+
if (PA_UNLIKELY(n <= 0))
return work_done;
frames = n;
- if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) {
+ pa_log_debug("%lu frames to read", (unsigned long) frames);
- pa_log_debug("snd_pcm_mmap_begin: %s", snd_strerror(err));
-
- if (err == -EAGAIN) {
- pa_log_debug("EAGAIN");
- return work_done;
- }
-
- if (err == -EPIPE)
- pa_log_debug("snd_pcm_mmap_begin: Buffer underrun!");
+ if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) {
- if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
- snd_pcm_start(u->pcm_handle);
+ if ((r = try_recover(u, "snd_pcm_mmap_begin", err)) == 0)
continue;
- }
+ else if (r > 0)
+ return work_done;
- pa_log("Failed to write data to DSP: %s", snd_strerror(err));
- return -1;
+ return r;
}
+ /* Make sure that if these memblocks need to be copied they will fit into one slot */
+ if (frames > pa_mempool_block_size_max(u->source->core->mempool)/u->frame_size)
+ frames = pa_mempool_block_size_max(u->source->core->mempool)/u->frame_size;
+
/* Check these are multiples of 8 bit */
pa_assert((areas[0].first & 7) == 0);
pa_assert((areas[0].step & 7)== 0);
p = (uint8_t*) areas[0].addr + (offset * u->frame_size);
- chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, 1);
+ chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE);
chunk.length = pa_memblock_get_length(chunk.memblock);
chunk.index = 0;
pa_source_post(u->source, &chunk);
-
- /* FIXME: Maybe we can do something to keep this memory block
- * a little bit longer around? */
pa_memblock_unref_fixed(chunk.memblock);
if (PA_UNLIKELY((err = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) {
- pa_log_debug("snd_pcm_mmap_commit: %s", snd_strerror(err));
-
- if (err == -EAGAIN) {
- pa_log_debug("EAGAIN");
- return work_done;
- }
-
- if (err == -EPIPE)
- pa_log_debug("snd_pcm_mmap_commit: Buffer underrun!");
-
- if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) {
- snd_pcm_start(u->pcm_handle);
+ if ((r = try_recover(u, "snd_pcm_mmap_commit", err)) == 0)
continue;
- }
+ else if (r > 0)
+ return work_done;
- pa_log("Failed to write data to DSP: %s", snd_strerror(err));
- return -1;
+ return r;
}
work_done = 1;
u->frame_index += frames;
- pa_log_debug("read %llu frames", (unsigned long long) frames);
+ pa_log_debug("read %lu frames", (unsigned long) frames);
if (PA_LIKELY(frames >= (snd_pcm_uframes_t) n))
return work_done;
}
static int unix_read(struct userdata *u) {
- snd_pcm_status_t *status;
int work_done = 0;
- snd_pcm_status_alloca(&status);
-
pa_assert(u);
pa_source_assert_ref(u->source);
for (;;) {
void *p;
snd_pcm_sframes_t n, frames;
- int err;
+ int r;
pa_memchunk chunk;
snd_pcm_hwsync(u->pcm_handle);
- if (PA_UNLIKELY((err = snd_pcm_status(u->pcm_handle, status)) < 0)) {
- pa_log("Failed to query DSP status data: %s", snd_strerror(err));
- return -1;
- }
+ if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) {
+
+ if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0)
+ continue;
+ else if (r > 0)
+ return work_done;
- if (PA_UNLIKELY(snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size))
- pa_log_debug("Buffer overrun!");
+ return r;
+ }
- n = snd_pcm_status_get_avail(status);
+ check_left_to_record(u, n);
if (PA_UNLIKELY(n <= 0))
return work_done;
if (frames > n)
frames = n;
+ pa_log_debug("%lu frames to read", (unsigned long) n);
+
p = pa_memblock_acquire(chunk.memblock);
frames = snd_pcm_readi(u->pcm_handle, (uint8_t*) p, frames);
pa_memblock_release(chunk.memblock);
if (PA_UNLIKELY(frames < 0)) {
pa_memblock_unref(chunk.memblock);
- if (frames == -EAGAIN) {
- pa_log_debug("EAGAIN");
- return work_done;
- }
-
- if (frames == -EPIPE)
- pa_log_debug("snd_pcm_avail_update: Buffer overrun!");
-
- if ((frames = snd_pcm_recover(u->pcm_handle, frames, 1)) == 0)
- snd_pcm_start(u->pcm_handle);
+ if ((r = try_recover(u, "snd_pcm_readi", n)) == 0)
continue;
+ else if (r > 0)
+ return work_done;
- pa_log("Failed to read data from DSP: %s", snd_strerror(frames));
- return -1;
+ return r;
}
chunk.index = 0;
u->frame_index += frames;
+ pa_log_debug("read %lu frames", (unsigned long) frames);
+
if (PA_LIKELY(frames >= n))
return work_done;
}
now1 = pa_rtclock_usec();
now2 = pa_bytes_to_usec(frames * u->frame_size, &u->source->sample_spec);
+
pa_smoother_put(u->smoother, now1, now2);
}
static pa_usec_t source_get_latency(struct userdata *u) {
pa_usec_t r = 0;
int64_t delay;
+ pa_usec_t now1, now2;
pa_assert(u);
- delay = pa_smoother_get(u->smoother, pa_rtclock_usec()) - u->frame_index;
+ now1 = pa_rtclock_usec();
+ now2 = pa_smoother_get(u->smoother, now1); /* smoother output for the current time, used below as a time value */
+
+ delay = (int64_t) now2 - (int64_t) pa_bytes_to_usec(u->frame_index * u->frame_size, &u->source->sample_spec); /* both operands signed, so a negative difference stays negative */
if (delay > 0)
- r = pa_bytes_to_usec(delay * u->frame_size, &u->source->sample_spec);
+ r = (pa_usec_t) delay; /* negative or zero differences leave r at 0 */
return r;
}
static int build_pollfd(struct userdata *u) {
- int err;
- struct pollfd *pollfd;
- int n;
-
pa_assert(u);
pa_assert(u->pcm_handle);
- if ((n = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) {
- pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(n));
- return -1;
- }
-
if (u->alsa_rtpoll_item)
pa_rtpoll_item_free(u->alsa_rtpoll_item);
- u->alsa_rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, n);
- pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, NULL);
-
- if ((err = snd_pcm_poll_descriptors(u->pcm_handle, pollfd, n)) < 0) {
- pa_log("snd_pcm_poll_descriptors() failed: %s", snd_strerror(err));
+ if (!(u->alsa_rtpoll_item = pa_alsa_build_pollfd(u->pcm_handle, u->rtpoll))) /* helper call replaces the removed inline descriptor setup; a NULL result is reported as -1 */
return -1;
- }
return 0;
}
if (usec == (pa_usec_t) -1)
usec = pa_bytes_to_usec(u->hwbuf_size, &u->source->sample_spec);
- pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC));
+/* pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
wm = pa_bytes_to_usec(u->tsched_watermark, &u->source->sample_spec);
else
usec /= 2;
- pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC));
+/* pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
return usec;
}
static int update_sw_params(struct userdata *u) {
- size_t avail_min;
+ snd_pcm_uframes_t avail_min; /* counted in frames */
int err;
pa_assert(u);
+ /* Use the full buffer if no one asked us for anything specific */
+ u->hwbuf_unused_frames = 0;
+
if (u->use_tsched) {
- pa_usec_t usec;
+ pa_usec_t latency;
- usec = hw_sleep_time(u);
+ if ((latency = pa_source_get_requested_latency_within_thread(u->source)) != (pa_usec_t) -1) {
+ size_t b;
- avail_min = pa_usec_to_bytes(usec, &u->source->sample_spec);
+ pa_log_debug("latency set to %0.2f", (double) latency / PA_USEC_PER_MSEC);
- if (avail_min <= 0)
- avail_min = 1;
+ b = pa_usec_to_bytes(latency, &u->source->sample_spec);
- } else
- avail_min = 1;
+ /* We need at least one sample in our buffer */
+
+ if (PA_UNLIKELY(b < u->frame_size))
+ b = u->frame_size;
+
+ u->hwbuf_unused_frames =
+ PA_LIKELY(b < u->hwbuf_size) ?
+ ((u->hwbuf_size - b) / u->frame_size) : 0;
+
+ fix_tsched_watermark(u);
+ }
+ }
+
+ pa_log_debug("hwbuf_unused_frames=%lu", (unsigned long) u->hwbuf_unused_frames);
+
+ avail_min = 1;
+
+ if (u->use_tsched) {
+ pa_usec_t usec;
+
+ usec = hw_sleep_time(u);
+ avail_min += pa_usec_to_bytes(usec, &u->source->sample_spec) / u->frame_size; /* bytes -> frames: avail_min is a frame count */
+ }
+
+ pa_log_debug("setting avail_min=%lu", (unsigned long) avail_min);
if ((err = pa_alsa_set_sw_params(u->pcm_handle, avail_min)) < 0) {
pa_log("Failed to set software parameters: %s", snd_strerror(err));
long alsa_vol;
pa_volume_t vol;
-
pa_assert(snd_mixer_selem_has_capture_channel(u->mixer_elem, u->mixer_map[i]));
vol = PA_MIN(s->volume.values[i], PA_VOLUME_NORM);
u->hw_dB_supported = FALSE;
}
-
alsa_vol = (long) roundf(((float) vol * (u->hw_volume_max - u->hw_volume_min)) / PA_VOLUME_NORM) + u->hw_volume_min;
alsa_vol = PA_CLAMP_UNLIKELY(alsa_vol, u->hw_volume_min, u->hw_volume_max);
for (;;) {
int ret;
+ pa_log_debug("loop");
+
/* Read some data and pass it to the sources */
if (PA_SOURCE_OPENED(u->source->thread_info.state)) {
int work_done = 0;
if (work_done < 0)
goto fail;
+ pa_log_debug("work_done = %i", work_done);
+
if (work_done)
update_smoother(u);
}
if (revents & (POLLERR|POLLNVAL|POLLHUP)) {
-
- if (pa_alsa_recover_from_poll(u->pcm_handle, revents))
+ if (pa_alsa_recover_from_poll(u->pcm_handle, revents) < 0)
goto fail;
snd_pcm_start(u->pcm_handle);
m->userdata = u;
u->use_mmap = use_mmap;
u->use_tsched = use_tsched;
- pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
u->rtpoll = pa_rtpoll_new();
+ pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
u->alsa_rtpoll_item = NULL;
- pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
u->smoother = pa_smoother_new(DEFAULT_TSCHED_WATERMARK_USEC, DEFAULT_TSCHED_WATERMARK_USEC, TRUE);
pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
if (use_mmap && !b) {
pa_log_info("Device doesn't support mmap(), falling back to UNIX read/write mode.");
- u->use_mmap = use_mmap = b;
+ u->use_mmap = use_mmap = FALSE;
}
if (use_tsched && (!b || !d)) {
pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, u->device_name);
pa_proplist_setf(data.proplist, PA_PROP_DEVICE_BUFFERING_BUFFER_SIZE, "%lu", (unsigned long) (period_frames * frame_size * nfrags));
pa_proplist_setf(data.proplist, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE, "%lu", (unsigned long) (period_frames * frame_size));
- pa_proplist_sets(data.proplist, PA_PROP_DEVICE_ACCESS_MODE, u->use_tsched ? "mmap_rewrite" : (u->use_mmap ? "mmap" : "serial"));
+ pa_proplist_sets(data.proplist, PA_PROP_DEVICE_ACCESS_MODE, u->use_tsched ? "mmap+timer" : (u->use_mmap ? "mmap" : "serial"));
u->source = pa_source_new(m->core, &data, PA_SOURCE_HARDWARE|PA_SOURCE_LATENCY);
pa_source_new_data_done(&data);
u->fragment_size = frag_size = period_frames * frame_size;
u->nfragments = nfrags;
u->hwbuf_size = u->fragment_size * nfrags;
+ u->hwbuf_unused_frames = 0;
u->tsched_watermark = tsched_watermark;
u->frame_index = 0;
u->hw_dB_supported = FALSE;
u->hw_dB_min = u->hw_dB_max = 0;
u->hw_volume_min = u->hw_volume_max = 0;
- u->source->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss);
+ if (use_tsched)
+ fix_tsched_watermark(u);
+ u->source->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss);
if (!use_tsched)
u->source->min_latency = u->source->max_latency;
pa_log_info("Device has less than 4 volume levels. Falling back to software volume control.");
suitable = FALSE;
- } else if (snd_mixer_selem_get_playback_dB_range(u->mixer_elem, &u->hw_dB_min, &u->hw_dB_max) >= 0) {
+ } else if (snd_mixer_selem_get_capture_dB_range(u->mixer_elem, &u->hw_dB_min, &u->hw_dB_max) >= 0) {
pa_log_info("Volume ranges from %0.2f dB to %0.2f dB.", u->hw_dB_min/100.0, u->hw_dB_max/100.0);
u->source->get_volume = source_get_volume_cb;
u->source->set_volume = source_set_volume_cb;
u->source->flags |= PA_SOURCE_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SOURCE_DECIBEL_VOLUME : 0);
- pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using");
+ pa_log_info("Using hardware volume control. Hardware dB scale %s.", u->hw_dB_supported ? "supported" : "not supported");
} else if (mixer_reset) {
pa_log_info("Using software volume control. Trying to reset sound card to 0 dB.");
u->core = m->core;
u->module = m;
m->userdata = u;
- pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
u->rtpoll = pa_rtpoll_new();
- pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
+ pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
pa_sink_new_data_init(&data);
data.driver = __FILE__;
u->module = m;
m->userdata = u;
pa_memchunk_reset(&u->memchunk);
- pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
u->rtpoll = pa_rtpoll_new();
- pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
+ pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
u->module = m;
m->userdata = u;
pa_memchunk_reset(&u->memchunk);
- pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
u->rtpoll = pa_rtpoll_new();
- pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq);
+ pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
/* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
pa_mutex_lock(a->mutex);
- pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
+ pa_asyncq_post(a->asyncq, i);
pa_mutex_unlock(a->mutex);
}
/* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
pa_mutex_lock(a->mutex);
- pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
+ pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
pa_mutex_unlock(a->mutex);
pa_semaphore_wait(i.semaphore);
return i.ret;
}
-int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
+int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
pa_assert(!a->current);
return 1;
}
-int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
+int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) { /* Reading side: forwards to pa_asyncq_read_fd() on the underlying queue */
pa_assert(PA_REFCNT_VALUE(a) > 0);
- return pa_asyncq_get_fd(a->asyncq);
+ return pa_asyncq_read_fd(a->asyncq);
}
-int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
+int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) { /* Reading side: forwards to pa_asyncq_read_before_poll() and returns its result */
pa_assert(PA_REFCNT_VALUE(a) > 0);
- return pa_asyncq_before_poll(a->asyncq);
+ return pa_asyncq_read_before_poll(a->asyncq);
}
-void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
+void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) { /* Reading side: forwards to pa_asyncq_read_after_poll() */
pa_assert(PA_REFCNT_VALUE(a) > 0);
- pa_asyncq_after_poll(a->asyncq);
+ pa_asyncq_read_after_poll(a->asyncq);
+}
+
+int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) { /* Writing side: forwards to pa_asyncq_write_fd() on the underlying queue */
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ return pa_asyncq_write_fd(a->asyncq);
+}
+
+void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) { /* Writing side: forwards to pa_asyncq_write_before_poll() */
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ pa_asyncq_write_before_poll(a->asyncq);
+}
+
+void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) { /* Writing side: forwards to pa_asyncq_write_after_poll() */
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ pa_asyncq_write_after_poll(a->asyncq);
}
int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb);
int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk);
-int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, int wait);
+int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, pa_bool_t wait);
int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk);
void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret);
int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code);
int pa_asyncmsgq_process_one(pa_asyncmsgq *a);
-/* Just for the reading side */
-int pa_asyncmsgq_get_fd(pa_asyncmsgq *q);
-int pa_asyncmsgq_before_poll(pa_asyncmsgq *a);
-void pa_asyncmsgq_after_poll(pa_asyncmsgq *a);
+/* For the reading side */
+int pa_asyncmsgq_read_fd(pa_asyncmsgq *q);
+int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a);
+void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a);
+
+/* For the write side */
+int pa_asyncmsgq_write_fd(pa_asyncmsgq *q);
+void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a);
+void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a);
#endif
#include <pulsecore/thread.h>
#include <pulsecore/macro.h>
#include <pulsecore/core-util.h>
+#include <pulsecore/llist.h>
+#include <pulsecore/flist.h>
#include <pulse/xmalloc.h>
#include "asyncq.h"
#define _Y do { } while(0)
#endif
+struct localq { /* One entry of the writer-local overflow list that pa_asyncq_post() fills when the queue rejects a push */
+ void *data; /* the pointer that was handed to pa_asyncq_post() */
+ PA_LLIST_FIELDS(struct localq); /* list linkage; flush_postq() follows the prev link */
+};
+
struct pa_asyncq {
unsigned size;
unsigned read_idx;
unsigned write_idx;
pa_fdsem *read_fdsem, *write_fdsem;
+
+ PA_LLIST_HEAD(struct localq, localq);
+ struct localq *last_localq;
+ pa_bool_t waiting_for_post;
};
+PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree);
+
#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
static int is_power_of_two(unsigned size) {
l->size = size;
+ PA_LLIST_HEAD_INIT(struct localq, l->localq);
+ l->last_localq = NULL;
+ l->waiting_for_post = FALSE;
+
if (!(l->read_fdsem = pa_fdsem_new())) {
pa_xfree(l);
return NULL;
}
void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
+ struct localq *q;
pa_assert(l);
if (free_cb) {
free_cb(p);
}
+ while ((q = l->localq)) {
+ if (free_cb)
+ free_cb(q->data);
+
+ PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+ if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+ pa_xfree(q);
+ }
+
pa_fdsem_free(l->read_fdsem);
pa_fdsem_free(l->write_fdsem);
pa_xfree(l);
}
-int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
+static int push(pa_asyncq*l, void *p, pa_bool_t wait) {
int idx;
pa_atomic_ptr_t *cells;
return 0;
}
-void* pa_asyncq_pop(pa_asyncq*l, int wait) {
+/* Move locally postponed items into the queue, oldest first, using
+ * non-blocking pushes only. Returns TRUE once the local list is empty
+ * and FALSE as soon as one push() fails; the item that failed stays on
+ * the local list. */
+static pa_bool_t flush_postq(pa_asyncq *l) {
+    pa_assert(l);
+
+    for (;;) {
+        struct localq *oldest = l->last_localq;
+
+        if (!oldest)
+            return TRUE;
+
+        /* Push before unlinking so nothing is lost on failure */
+        if (push(l, oldest->data, FALSE) < 0)
+            return FALSE;
+
+        l->last_localq = oldest->prev;
+        PA_LLIST_REMOVE(struct localq, l->localq, oldest);
+
+        /* Recycle the node; free it if the free list does not take it */
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), oldest) < 0)
+            pa_xfree(oldest);
+    }
+}
+
+/* Push p into the queue. Items postponed earlier by pa_asyncq_post()
+ * are pushed first, so that p does not overtake them.
+ *
+ * wait == FALSE: returns -1 if the postponed items cannot be flushed
+ *                right now; otherwise returns what push() returns for p.
+ * wait == TRUE:  postponed items are pushed with wait set as well
+ *                (this relies on push() honouring its wait argument),
+ *                so pending posted items alone no longer cause -1.
+ */
+int pa_asyncq_push(pa_asyncq*l, void *p, pa_bool_t wait) {
+    pa_assert(l);
+
+    /* flush_postq() only ever does non-blocking pushes. When the caller
+     * asked to wait, drain the remaining postponed items one by one
+     * with a waiting push instead of failing. */
+    while (!flush_postq(l)) {
+        struct localq *q;
+
+        if (!wait)
+            return -1;
+
+        /* flush_postq() returned FALSE, so a tail entry is pending */
+        q = l->last_localq;
+        pa_assert(q);
+
+        if (push(l, q->data, TRUE) < 0)
+            return -1;
+
+        l->last_localq = q->prev;
+        PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+            pa_xfree(q);
+    }
+
+    return push(l, p, wait);
+}
+
+/* Hand p to the queue with a non-blocking push. If that push fails,
+ * park p on the local list so that a later flush_postq() can retry. */
+void pa_asyncq_post(pa_asyncq*l, void *p) {
+    struct localq *entry;
+
+    pa_assert(l);
+    pa_assert(p);
+
+    if (pa_asyncq_push(l, p, FALSE) < 0) {
+
+        /* The queue did not accept the item, so keep it locally and
+         * push it later */
+        pa_log("q overrun, queuing locally");
+
+        /* Prefer a recycled node over a fresh allocation */
+        entry = pa_flist_pop(PA_STATIC_FLIST_GET(localq));
+        if (!entry)
+            entry = pa_xnew(struct localq, 1);
+
+        entry->data = p;
+        PA_LLIST_PREPEND(struct localq, l->localq, entry);
+
+        /* The first parked item is also the tail of the list */
+        if (!l->last_localq)
+            l->last_localq = entry;
+    }
+}
+
+void* pa_asyncq_pop(pa_asyncq*l, pa_bool_t wait) {
int idx;
void *ret;
pa_atomic_ptr_t *cells;
return ret;
}
-int pa_asyncq_get_fd(pa_asyncq *q) {
+int pa_asyncq_read_fd(pa_asyncq *q) {
pa_assert(q);
return pa_fdsem_get(q->write_fdsem);
}
-int pa_asyncq_before_poll(pa_asyncq *l) {
+int pa_asyncq_read_before_poll(pa_asyncq *l) {
int idx;
pa_atomic_ptr_t *cells;
return 0;
}
-void pa_asyncq_after_poll(pa_asyncq *l) {
+void pa_asyncq_read_after_poll(pa_asyncq *l) {
pa_assert(l);
pa_fdsem_after_poll(l->write_fdsem);
}
+
+/* File descriptor for the writing side to poll on; it is obtained
+ * from the queue's read_fdsem. */
+int pa_asyncq_write_fd(pa_asyncq *q) {
+    int fd;
+
+    pa_assert(q);
+
+    fd = pa_fdsem_get(q->read_fdsem);
+    return fd;
+}
+
+/* Prepare the writing side for poll(): keep trying to flush the
+ * postponed items; if they do not all fit, arm read_fdsem for polling
+ * and record that in waiting_for_post. */
+void pa_asyncq_write_before_poll(pa_asyncq *l) {
+    pa_assert(l);
+
+    while (!flush_postq(l)) {
+
+        /* Items are still pending. Retry the flush if the fdsem could
+         * not be armed, otherwise we are ready to poll. */
+        if (pa_fdsem_before_poll(l->read_fdsem) >= 0) {
+            l->waiting_for_post = TRUE;
+            return;
+        }
+    }
+}
+
+/* Undo pa_asyncq_write_before_poll(): only touches read_fdsem if the
+ * before-poll call actually armed it. */
+void pa_asyncq_write_after_poll(pa_asyncq *l) {
+    pa_assert(l);
+
+    if (!l->waiting_for_post)
+        return;
+
+    pa_fdsem_after_poll(l->read_fdsem);
+    l->waiting_for_post = FALSE;
+}
#include <sys/types.h>
#include <pulse/def.h>
+#include <pulsecore/macro.h>
/* A simple, asynchronous, lock-free (if requested also wait-free)
* queue. Not multiple-reader/multiple-writer safe. If that is
pa_asyncq* pa_asyncq_new(unsigned size);
void pa_asyncq_free(pa_asyncq* q, pa_free_cb_t free_cb);
-void* pa_asyncq_pop(pa_asyncq *q, int wait);
-int pa_asyncq_push(pa_asyncq *q, void *p, int wait);
+void* pa_asyncq_pop(pa_asyncq *q, pa_bool_t wait);
+int pa_asyncq_push(pa_asyncq *q, void *p, pa_bool_t wait);
-int pa_asyncq_get_fd(pa_asyncq *q);
-int pa_asyncq_before_poll(pa_asyncq *a);
-void pa_asyncq_after_poll(pa_asyncq *a);
+/* Similar to pa_asyncq_push(), but if the queue is full, postpone it
+ * locally and delay until pa_asyncq_write_before_poll() */
+void pa_asyncq_post(pa_asyncq*l, void *p);
+
+/* For the reading side */
+int pa_asyncq_read_fd(pa_asyncq *q);
+int pa_asyncq_read_before_poll(pa_asyncq *a);
+void pa_asyncq_read_after_poll(pa_asyncq *a);
+
+/* For the writing side */
+int pa_asyncq_write_fd(pa_asyncq *q);
+void pa_asyncq_write_before_poll(pa_asyncq *a);
+void pa_asyncq_write_after_poll(pa_asyncq *a);
#endif
return i;
}
-static int asyncmsgq_before(pa_rtpoll_item *i) {
+static int asyncmsgq_read_before(pa_rtpoll_item *i) {
pa_assert(i);
- if (pa_asyncmsgq_before_poll(i->userdata) < 0)
+ if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
return 1; /* 1 means immediate restart of the loop */
return 0;
}
-static void asyncmsgq_after(pa_rtpoll_item *i) {
+static void asyncmsgq_read_after(pa_rtpoll_item *i) {
pa_assert(i);
pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
- pa_asyncmsgq_after_poll(i->userdata);
+ pa_asyncmsgq_read_after_poll(i->userdata);
}
-static int asyncmsgq_work(pa_rtpoll_item *i) {
+static int asyncmsgq_read_work(pa_rtpoll_item *i) {
pa_msgobject *object;
int code;
void *data;
return 0;
}
-pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
pa_rtpoll_item *i;
struct pollfd *pollfd;
i = pa_rtpoll_item_new(p, prio, 1);
pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
- pollfd->fd = pa_asyncmsgq_get_fd(q);
+ pollfd->fd = pa_asyncmsgq_read_fd(q);
pollfd->events = POLLIN;
- i->before_cb = asyncmsgq_before;
- i->after_cb = asyncmsgq_after;
- i->work_cb = asyncmsgq_work;
+ i->before_cb = asyncmsgq_read_before;
+ i->after_cb = asyncmsgq_read_after;
+ i->work_cb = asyncmsgq_read_work;
+ i->userdata = q;
+
+ return i;
+}
+
+/* rtpoll before-callback for the writing side of an asyncmsgq; the
+ * queue is taken from the item's userdata. Always returns 0. */
+static int asyncmsgq_write_before(pa_rtpoll_item *i) {
+    pa_asyncmsgq *q;
+
+    pa_assert(i);
+
+    q = i->userdata;
+    pa_asyncmsgq_write_before_poll(q);
+
+    return 0;
+}
+
+/* rtpoll after-callback for the writing side of an asyncmsgq. Asserts
+ * that no event other than POLLIN was reported on the item's fd. */
+static void asyncmsgq_write_after(pa_rtpoll_item *i) {
+    pa_asyncmsgq *q;
+
+    pa_assert(i);
+    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
+
+    q = i->userdata;
+    pa_asyncmsgq_write_after_poll(q);
+}
+
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
+ pa_rtpoll_item *i;
+ struct pollfd *pollfd;
+
+ pa_assert(p);
+ pa_assert(q);
+
+ i = pa_rtpoll_item_new(p, prio, 1);
+
+ pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
+ pollfd->fd = pa_asyncmsgq_write_fd(q);
+ pollfd->events = POLLIN;
+
+ i->before_cb = asyncmsgq_write_before;
+ i->after_cb = asyncmsgq_write_after;
+ i->work_cb = NULL;
i->userdata = q;
return i;
void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i);
pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *s);
-pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
/* Requests the loop to exit. Will cause the next iteration of
* pa_rtpoll_run() to return 0 */
PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq);
-static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
+static void asyncmsgq_read_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
pa_thread_mq *q = userdata;
pa_asyncmsgq *aq;
- pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd);
+ pa_assert(pa_asyncmsgq_read_fd(q->outq) == fd);
pa_assert(events == PA_IO_EVENT_INPUT);
pa_asyncmsgq_ref(aq = q->outq);
- pa_asyncmsgq_after_poll(aq);
+ pa_asyncmsgq_write_after_poll(aq);
for (;;) {
pa_msgobject *object;
pa_asyncmsgq_done(aq, ret);
}
- if (pa_asyncmsgq_before_poll(aq) == 0)
+ if (pa_asyncmsgq_read_before_poll(aq) == 0)
break;
}
pa_asyncmsgq_unref(aq);
}
-void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) {
+/* Main loop IO callback for the write fd of q->inq: finish the current
+ * poll cycle and immediately prepare the next one. */
+static void asyncmsgq_write_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
+    pa_thread_mq *q = userdata;
+    pa_asyncmsgq *inq;
+
+    pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd);
+    pa_assert(events == PA_IO_EVENT_INPUT);
+
+    inq = q->inq;
+
+    pa_asyncmsgq_write_after_poll(inq);
+    pa_asyncmsgq_write_before_poll(inq);
+}
+
+void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) {
pa_assert(q);
pa_assert(mainloop);
pa_assert_se(q->inq = pa_asyncmsgq_new(0));
pa_assert_se(q->outq = pa_asyncmsgq_new(0));
- pa_assert_se(pa_asyncmsgq_before_poll(q->outq) == 0);
- pa_assert_se(q->io_event = mainloop->io_new(mainloop, pa_asyncmsgq_get_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_cb, q));
+ pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0);
+ pa_assert_se(q->read_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q));
+
+ pa_asyncmsgq_write_before_poll(q->inq);
+ pa_assert_se(q->write_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_cb, q));
+
+ pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq);
+ pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq);
}
void pa_thread_mq_done(pa_thread_mq *q) {
pa_assert(q);
- q->mainloop->io_free(q->io_event);
- q->io_event = NULL;
+ q->mainloop->io_free(q->read_event);
+ q->mainloop->io_free(q->write_event);
+ q->read_event = q->write_event = NULL;
pa_asyncmsgq_unref(q->inq);
pa_asyncmsgq_unref(q->outq);
#include <pulse/mainloop-api.h>
#include <pulsecore/asyncmsgq.h>
+#include <pulsecore/rtpoll.h>
/* Two way communication between a thread and a mainloop. Before the
* thread is started a pa_pthread_mq should be initialized and than
typedef struct pa_thread_mq {
pa_mainloop_api *mainloop;
pa_asyncmsgq *inq, *outq;
- pa_io_event *io_event;
+ pa_io_event *read_event, *write_event;
} pa_thread_mq;
-void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop);
+void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll);
void pa_thread_mq_done(pa_thread_mq *q);
/* Install the specified pa_thread_mq object for the current thread */