1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/host/linux/audio_pipe_reader.h"
12 #include "base/logging.h"
13 #include "base/posix/eintr_wrapper.h"
14 #include "base/stl_util.h"
20 const int kSampleBytesPerSecond
= AudioPipeReader::kSamplingRate
*
21 AudioPipeReader::kChannels
*
22 AudioPipeReader::kBytesPerSample
;
24 #if !defined(F_SETPIPE_SZ)
25 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
26 // to compile this code on machines with older kernel.
27 #define F_SETPIPE_SZ 1031
28 #endif // defined(F_SETPIPE_SZ)
33 scoped_refptr
<AudioPipeReader
> AudioPipeReader::Create(
34 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
,
35 const base::FilePath
& pipe_path
) {
36 // Create a reference to the new AudioPipeReader before posting the
37 // StartOnAudioThread task, otherwise it may be deleted on the audio
38 // thread before we return.
39 scoped_refptr
<AudioPipeReader
> pipe_reader
=
40 new AudioPipeReader(task_runner
, pipe_path
);
41 task_runner
->PostTask(
42 FROM_HERE
, base::Bind(&AudioPipeReader::StartOnAudioThread
, pipe_reader
));
46 AudioPipeReader::AudioPipeReader(
47 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
,
48 const base::FilePath
& pipe_path
)
49 : task_runner_(task_runner
),
50 pipe_path_(pipe_path
),
51 observers_(new base::ObserverListThreadSafe
<StreamObserver
>()) {
54 AudioPipeReader::~AudioPipeReader() {}
56 void AudioPipeReader::AddObserver(StreamObserver
* observer
) {
57 observers_
->AddObserver(observer
);
59 void AudioPipeReader::RemoveObserver(StreamObserver
* observer
) {
60 observers_
->RemoveObserver(observer
);
63 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd
) {
64 DCHECK_EQ(fd
, pipe_
.GetPlatformFile());
68 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd
) {
72 void AudioPipeReader::StartOnAudioThread() {
73 DCHECK(task_runner_
->BelongsToCurrentThread());
75 if (!file_watcher_
.Watch(pipe_path_
.DirName(), true,
76 base::Bind(&AudioPipeReader::OnDirectoryChanged
,
77 base::Unretained(this)))) {
78 LOG(ERROR
) << "Failed to watch pulseaudio directory "
79 << pipe_path_
.DirName().value();
85 void AudioPipeReader::OnDirectoryChanged(const base::FilePath
& path
,
87 DCHECK(task_runner_
->BelongsToCurrentThread());
90 LOG(ERROR
) << "File watcher returned an error.";
97 void AudioPipeReader::TryOpenPipe() {
98 DCHECK(task_runner_
->BelongsToCurrentThread());
101 HANDLE_EINTR(open(pipe_path_
.value().c_str(), O_RDONLY
| O_NONBLOCK
)));
103 // If both |pipe_| and |new_pipe| are valid then compare inodes for the two
104 // file descriptors. Don't need to do anything if inode hasn't changed.
105 if (new_pipe
.IsValid() && pipe_
.IsValid()) {
106 struct stat old_stat
;
107 struct stat new_stat
;
108 if (fstat(pipe_
.GetPlatformFile(), &old_stat
) == 0 &&
109 fstat(new_pipe
.GetPlatformFile(), &new_stat
) == 0 &&
110 old_stat
.st_ino
== new_stat
.st_ino
) {
115 file_descriptor_watcher_
.StopWatchingFileDescriptor();
118 pipe_
= new_pipe
.Pass();
120 if (pipe_
.IsValid()) {
121 // Get buffer size for the pipe.
122 pipe_buffer_size_
= fpathconf(pipe_
.GetPlatformFile(), _PC_PIPE_BUF
);
123 if (pipe_buffer_size_
< 0) {
124 PLOG(ERROR
) << "fpathconf(_PC_PIPE_BUF)";
125 pipe_buffer_size_
= 4096;
128 // Read from the pipe twice per buffer length, to avoid starving the stream.
129 capture_period_
= base::TimeDelta::FromSeconds(1) * pipe_buffer_size_
/
130 kSampleBytesPerSecond
/ 2;
132 WaitForPipeReadable();
136 void AudioPipeReader::StartTimer() {
137 DCHECK(task_runner_
->BelongsToCurrentThread());
138 started_time_
= base::TimeTicks::Now();
139 last_capture_position_
= 0;
140 timer_
.Start(FROM_HERE
, capture_period_
, this, &AudioPipeReader::DoCapture
);
143 void AudioPipeReader::DoCapture() {
144 DCHECK(task_runner_
->BelongsToCurrentThread());
145 DCHECK(pipe_
.IsValid());
147 // Calculate how much we need read from the pipe. Pulseaudio doesn't control
148 // how much data it writes to the pipe, so we need to pace the stream.
149 base::TimeDelta stream_position
= base::TimeTicks::Now() - started_time_
;
150 int64 stream_position_bytes
= stream_position
.InMilliseconds() *
151 kSampleBytesPerSecond
/ base::Time::kMillisecondsPerSecond
;
152 int64 bytes_to_read
= stream_position_bytes
- last_capture_position_
;
154 std::string data
= left_over_bytes_
;
155 size_t pos
= data
.size();
156 left_over_bytes_
.clear();
157 data
.resize(pos
+ bytes_to_read
);
159 while (pos
< data
.size()) {
161 pipe_
.ReadAtCurrentPos(string_as_array(&data
) + pos
, data
.size() - pos
);
162 if (read_result
> 0) {
165 if (read_result
< 0 && errno
!= EWOULDBLOCK
&& errno
!= EAGAIN
)
166 PLOG(ERROR
) << "read";
171 // Stop reading from the pipe if PulseAudio isn't writing anything.
173 WaitForPipeReadable();
177 // Save any incomplete samples we've read for later. Each packet should
178 // contain integer number of samples.
179 int incomplete_samples_bytes
= pos
% (kChannels
* kBytesPerSample
);
180 left_over_bytes_
.assign(data
, pos
- incomplete_samples_bytes
,
181 incomplete_samples_bytes
);
182 data
.resize(pos
- incomplete_samples_bytes
);
184 last_capture_position_
+= data
.size();
185 // Normally PulseAudio will keep pipe buffer full, so we should always be able
186 // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
187 // sure that |stream_position_bytes| doesn't go out of sync with the current
189 if (stream_position_bytes
- last_capture_position_
> pipe_buffer_size_
)
190 last_capture_position_
= stream_position_bytes
- pipe_buffer_size_
;
191 DCHECK_LE(last_capture_position_
, stream_position_bytes
);
193 // Dispatch asynchronous notification to the stream observers.
194 scoped_refptr
<base::RefCountedString
> data_ref
=
195 base::RefCountedString::TakeString(&data
);
196 observers_
->Notify(FROM_HERE
, &StreamObserver::OnDataRead
, data_ref
);
199 void AudioPipeReader::WaitForPipeReadable() {
201 base::MessageLoopForIO::current()->WatchFileDescriptor(
202 pipe_
.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ
,
203 &file_descriptor_watcher_
, this);
207 void AudioPipeReaderTraits::Destruct(const AudioPipeReader
* audio_pipe_reader
) {
208 audio_pipe_reader
->task_runner_
->DeleteSoon(FROM_HERE
, audio_pipe_reader
);
211 } // namespace remoting