Roll src/third_party/WebKit f36d5e0:68b67cd (svn 193299:193303)
[chromium-blink-merge.git] / remoting / host / linux / audio_pipe_reader.cc
blobcd293f57f00bf415bb6ccec3e94b787130cecb23
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/host/linux/audio_pipe_reader.h"
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
12 #include "base/logging.h"
13 #include "base/posix/eintr_wrapper.h"
14 #include "base/stl_util.h"
16 namespace remoting {
18 namespace {
20 const int kSampleBytesPerSecond = AudioPipeReader::kSamplingRate *
21 AudioPipeReader::kChannels *
22 AudioPipeReader::kBytesPerSample;
24 #if !defined(F_SETPIPE_SZ)
25 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
26 // to compile this code on machines with older kernel.
27 #define F_SETPIPE_SZ 1031
28 #endif // defined(F_SETPIPE_SZ)
30 } // namespace
32 // static
33 scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
34 scoped_refptr<base::SingleThreadTaskRunner> task_runner,
35 const base::FilePath& pipe_path) {
36 // Create a reference to the new AudioPipeReader before posting the
37 // StartOnAudioThread task, otherwise it may be deleted on the audio
38 // thread before we return.
39 scoped_refptr<AudioPipeReader> pipe_reader =
40 new AudioPipeReader(task_runner, pipe_path);
41 task_runner->PostTask(
42 FROM_HERE, base::Bind(&AudioPipeReader::StartOnAudioThread, pipe_reader));
43 return pipe_reader;
46 AudioPipeReader::AudioPipeReader(
47 scoped_refptr<base::SingleThreadTaskRunner> task_runner,
48 const base::FilePath& pipe_path)
49 : task_runner_(task_runner),
50 pipe_path_(pipe_path),
51 observers_(new ObserverListThreadSafe<StreamObserver>()) {
54 AudioPipeReader::~AudioPipeReader() {}
56 void AudioPipeReader::AddObserver(StreamObserver* observer) {
57 observers_->AddObserver(observer);
59 void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
60 observers_->RemoveObserver(observer);
63 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
64 DCHECK_EQ(fd, pipe_.GetPlatformFile());
65 StartTimer();
68 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
69 NOTREACHED();
72 void AudioPipeReader::StartOnAudioThread() {
73 DCHECK(task_runner_->BelongsToCurrentThread());
75 if (!file_watcher_.Watch(pipe_path_.DirName(), true,
76 base::Bind(&AudioPipeReader::OnDirectoryChanged,
77 base::Unretained(this)))) {
78 LOG(ERROR) << "Failed to watch pulseaudio directory "
79 << pipe_path_.DirName().value();
82 TryOpenPipe();
85 void AudioPipeReader::OnDirectoryChanged(const base::FilePath& path,
86 bool error) {
87 DCHECK(task_runner_->BelongsToCurrentThread());
89 if (error) {
90 LOG(ERROR) << "File watcher returned an error.";
91 return;
94 TryOpenPipe();
97 void AudioPipeReader::TryOpenPipe() {
98 DCHECK(task_runner_->BelongsToCurrentThread());
100 base::File new_pipe(
101 HANDLE_EINTR(open(pipe_path_.value().c_str(), O_RDONLY | O_NONBLOCK)));
103 // If both |pipe_| and |new_pipe| are valid then compare inodes for the two
104 // file descriptors. Don't need to do anything if inode hasn't changed.
105 if (new_pipe.IsValid() && pipe_.IsValid()) {
106 struct stat old_stat;
107 struct stat new_stat;
108 if (fstat(pipe_.GetPlatformFile(), &old_stat) == 0 &&
109 fstat(new_pipe.GetPlatformFile(), &new_stat) == 0 &&
110 old_stat.st_ino == new_stat.st_ino) {
111 return;
115 file_descriptor_watcher_.StopWatchingFileDescriptor();
116 timer_.Stop();
118 pipe_ = new_pipe.Pass();
120 if (pipe_.IsValid()) {
121 // Get buffer size for the pipe.
122 pipe_buffer_size_ = fpathconf(pipe_.GetPlatformFile(), _PC_PIPE_BUF);
123 if (pipe_buffer_size_ < 0) {
124 PLOG(ERROR) << "fpathconf(_PC_PIPE_BUF)";
125 pipe_buffer_size_ = 4096;
128 // Read from the pipe twice per buffer length, to avoid starving the stream.
129 capture_period_ = base::TimeDelta::FromSeconds(1) * pipe_buffer_size_ /
130 kSampleBytesPerSecond / 2;
132 WaitForPipeReadable();
136 void AudioPipeReader::StartTimer() {
137 DCHECK(task_runner_->BelongsToCurrentThread());
138 started_time_ = base::TimeTicks::Now();
139 last_capture_position_ = 0;
140 timer_.Start(FROM_HERE, capture_period_, this, &AudioPipeReader::DoCapture);
143 void AudioPipeReader::DoCapture() {
144 DCHECK(task_runner_->BelongsToCurrentThread());
145 DCHECK(pipe_.IsValid());
147 // Calculate how much we need read from the pipe. Pulseaudio doesn't control
148 // how much data it writes to the pipe, so we need to pace the stream.
149 base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
150 int64 stream_position_bytes = stream_position.InMilliseconds() *
151 kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
152 int64 bytes_to_read = stream_position_bytes - last_capture_position_;
154 std::string data = left_over_bytes_;
155 size_t pos = data.size();
156 left_over_bytes_.clear();
157 data.resize(pos + bytes_to_read);
159 while (pos < data.size()) {
160 int read_result =
161 pipe_.ReadAtCurrentPos(string_as_array(&data) + pos, data.size() - pos);
162 if (read_result > 0) {
163 pos += read_result;
164 } else {
165 if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
166 PLOG(ERROR) << "read";
167 break;
171 // Stop reading from the pipe if PulseAudio isn't writing anything.
172 if (pos == 0) {
173 WaitForPipeReadable();
174 return;
177 // Save any incomplete samples we've read for later. Each packet should
178 // contain integer number of samples.
179 int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
180 left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
181 incomplete_samples_bytes);
182 data.resize(pos - incomplete_samples_bytes);
184 last_capture_position_ += data.size();
185 // Normally PulseAudio will keep pipe buffer full, so we should always be able
186 // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
187 // sure that |stream_position_bytes| doesn't go out of sync with the current
188 // stream position.
189 if (stream_position_bytes - last_capture_position_ > pipe_buffer_size_)
190 last_capture_position_ = stream_position_bytes - pipe_buffer_size_;
191 DCHECK_LE(last_capture_position_, stream_position_bytes);
193 // Dispatch asynchronous notification to the stream observers.
194 scoped_refptr<base::RefCountedString> data_ref =
195 base::RefCountedString::TakeString(&data);
196 observers_->Notify(FROM_HERE, &StreamObserver::OnDataRead, data_ref);
199 void AudioPipeReader::WaitForPipeReadable() {
200 timer_.Stop();
201 base::MessageLoopForIO::current()->WatchFileDescriptor(
202 pipe_.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ,
203 &file_descriptor_watcher_, this);
206 // static
207 void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
208 audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
211 } // namespace remoting