1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/host/linux/audio_pipe_reader.h"
12 #include "base/logging.h"
13 #include "base/posix/eintr_wrapper.h"
14 #include "base/stl_util.h"
20 const int kSampleBytesPerSecond
= AudioPipeReader::kSamplingRate
*
21 AudioPipeReader::kChannels
*
22 AudioPipeReader::kBytesPerSample
;
24 // Read data from the pipe every 40ms.
25 const int kCapturingPeriodMs
= 40;
27 // Size of the pipe buffer in milliseconds.
28 const int kPipeBufferSizeMs
= kCapturingPeriodMs
* 2;
30 // Size of the pipe buffer in bytes.
31 const int kPipeBufferSizeBytes
= kPipeBufferSizeMs
* kSampleBytesPerSecond
/
32 base::Time::kMillisecondsPerSecond
;
34 #if !defined(F_SETPIPE_SZ)
35 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
36 // to compile this code on machines with older kernel.
37 #define F_SETPIPE_SZ 1031
38 #endif // defined(F_SETPIPE_SZ)
43 scoped_refptr
<AudioPipeReader
> AudioPipeReader::Create(
44 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
,
45 const base::FilePath
& pipe_path
) {
46 // Create a reference to the new AudioPipeReader before posting the
47 // StartOnAudioThread task, otherwise it may be deleted on the audio
48 // thread before we return.
49 scoped_refptr
<AudioPipeReader
> pipe_reader
=
50 new AudioPipeReader(task_runner
, pipe_path
);
51 task_runner
->PostTask(
52 FROM_HERE
, base::Bind(&AudioPipeReader::StartOnAudioThread
, pipe_reader
));
56 AudioPipeReader::AudioPipeReader(
57 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
,
58 const base::FilePath
& pipe_path
)
59 : task_runner_(task_runner
),
60 pipe_path_(pipe_path
),
61 observers_(new ObserverListThreadSafe
<StreamObserver
>()) {
64 AudioPipeReader::~AudioPipeReader() {}
66 void AudioPipeReader::AddObserver(StreamObserver
* observer
) {
67 observers_
->AddObserver(observer
);
69 void AudioPipeReader::RemoveObserver(StreamObserver
* observer
) {
70 observers_
->RemoveObserver(observer
);
73 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd
) {
74 DCHECK_EQ(fd
, pipe_
.GetPlatformFile());
78 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd
) {
82 void AudioPipeReader::StartOnAudioThread() {
83 DCHECK(task_runner_
->BelongsToCurrentThread());
85 if (!file_watcher_
.Watch(pipe_path_
.DirName(), true,
86 base::Bind(&AudioPipeReader::OnDirectoryChanged
,
87 base::Unretained(this)))) {
88 LOG(ERROR
) << "Failed to watch pulseaudio directory "
89 << pipe_path_
.DirName().value();
95 void AudioPipeReader::OnDirectoryChanged(const base::FilePath
& path
,
97 DCHECK(task_runner_
->BelongsToCurrentThread());
100 LOG(ERROR
) << "File watcher returned an error.";
107 void AudioPipeReader::TryOpenPipe() {
108 DCHECK(task_runner_
->BelongsToCurrentThread());
113 base::File::FLAG_OPEN
| base::File::FLAG_READ
| base::File::FLAG_ASYNC
);
115 // If both |pipe_| and |new_pipe| are valid then compare inodes for the two
116 // file descriptors. Don't need to do anything if inode hasn't changed.
117 if (new_pipe
.IsValid() && pipe_
.IsValid()) {
118 struct stat old_stat
;
119 struct stat new_stat
;
120 if (fstat(pipe_
.GetPlatformFile(), &old_stat
) == 0 &&
121 fstat(new_pipe
.GetPlatformFile(), &new_stat
) == 0 &&
122 old_stat
.st_ino
== new_stat
.st_ino
) {
127 file_descriptor_watcher_
.StopWatchingFileDescriptor();
130 pipe_
= new_pipe
.Pass();
132 if (pipe_
.IsValid()) {
133 // Set O_NONBLOCK flag.
134 if (HANDLE_EINTR(fcntl(pipe_
.GetPlatformFile(), F_SETFL
, O_NONBLOCK
)) < 0) {
135 PLOG(ERROR
) << "fcntl";
140 // Set buffer size for the pipe.
141 if (HANDLE_EINTR(fcntl(
142 pipe_
.GetPlatformFile(), F_SETPIPE_SZ
, kPipeBufferSizeBytes
)) < 0) {
143 PLOG(ERROR
) << "fcntl";
146 WaitForPipeReadable();
150 void AudioPipeReader::StartTimer() {
151 DCHECK(task_runner_
->BelongsToCurrentThread());
152 started_time_
= base::TimeTicks::Now();
153 last_capture_position_
= 0;
154 timer_
.Start(FROM_HERE
, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs
),
155 this, &AudioPipeReader::DoCapture
);
158 void AudioPipeReader::DoCapture() {
159 DCHECK(task_runner_
->BelongsToCurrentThread());
160 DCHECK(pipe_
.IsValid());
162 // Calculate how much we need read from the pipe. Pulseaudio doesn't control
163 // how much data it writes to the pipe, so we need to pace the stream.
164 base::TimeDelta stream_position
= base::TimeTicks::Now() - started_time_
;
165 int64 stream_position_bytes
= stream_position
.InMilliseconds() *
166 kSampleBytesPerSecond
/ base::Time::kMillisecondsPerSecond
;
167 int64 bytes_to_read
= stream_position_bytes
- last_capture_position_
;
169 std::string data
= left_over_bytes_
;
170 size_t pos
= data
.size();
171 left_over_bytes_
.clear();
172 data
.resize(pos
+ bytes_to_read
);
174 while (pos
< data
.size()) {
176 pipe_
.ReadAtCurrentPos(string_as_array(&data
) + pos
, data
.size() - pos
);
177 if (read_result
> 0) {
180 if (read_result
< 0 && errno
!= EWOULDBLOCK
&& errno
!= EAGAIN
)
181 PLOG(ERROR
) << "read";
186 // Stop reading from the pipe if PulseAudio isn't writing anything.
188 WaitForPipeReadable();
192 // Save any incomplete samples we've read for later. Each packet should
193 // contain integer number of samples.
194 int incomplete_samples_bytes
= pos
% (kChannels
* kBytesPerSample
);
195 left_over_bytes_
.assign(data
, pos
- incomplete_samples_bytes
,
196 incomplete_samples_bytes
);
197 data
.resize(pos
- incomplete_samples_bytes
);
199 last_capture_position_
+= data
.size();
200 // Normally PulseAudio will keep pipe buffer full, so we should always be able
201 // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
202 // sure that |stream_position_bytes| doesn't go out of sync with the current
204 if (stream_position_bytes
- last_capture_position_
> kPipeBufferSizeBytes
)
205 last_capture_position_
= stream_position_bytes
- kPipeBufferSizeBytes
;
206 DCHECK_LE(last_capture_position_
, stream_position_bytes
);
208 // Dispatch asynchronous notification to the stream observers.
209 scoped_refptr
<base::RefCountedString
> data_ref
=
210 base::RefCountedString::TakeString(&data
);
211 observers_
->Notify(&StreamObserver::OnDataRead
, data_ref
);
214 void AudioPipeReader::WaitForPipeReadable() {
216 base::MessageLoopForIO::current()->WatchFileDescriptor(
217 pipe_
.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ
,
218 &file_descriptor_watcher_
, this);
222 void AudioPipeReaderTraits::Destruct(const AudioPipeReader
* audio_pipe_reader
) {
223 audio_pipe_reader
->task_runner_
->DeleteSoon(FROM_HERE
, audio_pipe_reader
);
226 } // namespace remoting