// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
5 #include "remoting/host/linux/audio_pipe_reader.h"
12 #include "base/files/file_path.h"
13 #include "base/logging.h"
14 #include "base/posix/eintr_wrapper.h"
15 #include "base/stl_util.h"
// PulseAudio's module-pipe-sink must be configured to use the following
// parameters for the sink we read from.
const int kSamplesPerSecond = 48000;
const int kChannels = 2;
const int kBytesPerSample = 2;

// Number of bytes of audio data the stream carries per second.
const int kSampleBytesPerSecond =
    kSamplesPerSecond * kChannels * kBytesPerSample;

// Read data from the pipe every 40ms.
const int kCapturingPeriodMs = 40;

// Size of the pipe buffer in milliseconds (two capture periods).
const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;
35 // Size of the pipe buffer in bytes.
36 const int kPipeBufferSizeBytes
= kPipeBufferSizeMs
* kSampleBytesPerSecond
/
37 base::Time::kMillisecondsPerSecond
;
#if !defined(F_SETPIPE_SZ)
// F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
// to compile this code on machines with older kernel.
#define F_SETPIPE_SZ 1031
#endif  // !defined(F_SETPIPE_SZ)
48 scoped_refptr
<AudioPipeReader
> AudioPipeReader::Create(
49 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
,
50 const base::FilePath
& pipe_name
) {
51 // Create a reference to the new AudioPipeReader before posting the
52 // StartOnAudioThread task, otherwise it may be deleted on the audio
53 // thread before we return.
54 scoped_refptr
<AudioPipeReader
> pipe_reader
=
55 new AudioPipeReader(task_runner
);
56 task_runner
->PostTask(FROM_HERE
, base::Bind(
57 &AudioPipeReader::StartOnAudioThread
, pipe_reader
, pipe_name
));
61 void AudioPipeReader::StartOnAudioThread(const base::FilePath
& pipe_name
) {
62 DCHECK(task_runner_
->BelongsToCurrentThread());
64 pipe_fd_
= HANDLE_EINTR(open(
65 pipe_name
.value().c_str(), O_RDONLY
| O_NONBLOCK
));
67 LOG(ERROR
) << "Failed to open " << pipe_name
.value();
71 // Set buffer size for the pipe.
72 int result
= HANDLE_EINTR(
73 fcntl(pipe_fd_
, F_SETPIPE_SZ
, kPipeBufferSizeBytes
));
75 PLOG(ERROR
) << "fcntl";
78 WaitForPipeReadable();
81 AudioPipeReader::AudioPipeReader(
82 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
)
83 : task_runner_(task_runner
),
84 observers_(new ObserverListThreadSafe
<StreamObserver
>()) {
87 AudioPipeReader::~AudioPipeReader() {
90 void AudioPipeReader::AddObserver(StreamObserver
* observer
) {
91 observers_
->AddObserver(observer
);
93 void AudioPipeReader::RemoveObserver(StreamObserver
* observer
) {
94 observers_
->RemoveObserver(observer
);
97 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd
) {
98 DCHECK_EQ(fd
, pipe_fd_
);
102 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd
) {
106 void AudioPipeReader::StartTimer() {
107 DCHECK(task_runner_
->BelongsToCurrentThread());
108 started_time_
= base::TimeTicks::Now();
109 last_capture_position_
= 0;
110 timer_
.Start(FROM_HERE
, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs
),
111 this, &AudioPipeReader::DoCapture
);
114 void AudioPipeReader::DoCapture() {
115 DCHECK(task_runner_
->BelongsToCurrentThread());
116 DCHECK_GT(pipe_fd_
, 0);
118 // Calculate how much we need read from the pipe. Pulseaudio doesn't control
119 // how much data it writes to the pipe, so we need to pace the stream, so
120 // that we read the exact number of the samples per second we need.
121 base::TimeDelta stream_position
= base::TimeTicks::Now() - started_time_
;
122 int64 stream_position_bytes
= stream_position
.InMilliseconds() *
123 kSampleBytesPerSecond
/ base::Time::kMillisecondsPerSecond
;
124 int64 bytes_to_read
= stream_position_bytes
- last_capture_position_
;
126 std::string data
= left_over_bytes_
;
127 size_t pos
= data
.size();
128 left_over_bytes_
.clear();
129 data
.resize(pos
+ bytes_to_read
);
131 while (pos
< data
.size()) {
132 int read_result
= HANDLE_EINTR(
133 read(pipe_fd_
, string_as_array(&data
) + pos
, data
.size() - pos
));
134 if (read_result
> 0) {
137 if (read_result
< 0 && errno
!= EWOULDBLOCK
&& errno
!= EAGAIN
)
138 PLOG(ERROR
) << "read";
143 // Stop reading from the pipe if PulseAudio isn't writing anything.
145 WaitForPipeReadable();
149 // Save any incomplete samples we've read for later. Each packet should
150 // contain integer number of samples.
151 int incomplete_samples_bytes
= pos
% (kChannels
* kBytesPerSample
);
152 left_over_bytes_
.assign(data
, pos
- incomplete_samples_bytes
,
153 incomplete_samples_bytes
);
154 data
.resize(pos
- incomplete_samples_bytes
);
156 last_capture_position_
+= data
.size();
157 // Normally PulseAudio will keep pipe buffer full, so we should always be able
158 // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
159 // sure that |stream_position_bytes| doesn't go out of sync with the current
161 if (stream_position_bytes
- last_capture_position_
> kPipeBufferSizeBytes
)
162 last_capture_position_
= stream_position_bytes
- kPipeBufferSizeBytes
;
163 DCHECK_LE(last_capture_position_
, stream_position_bytes
);
165 // Dispatch asynchronous notification to the stream observers.
166 scoped_refptr
<base::RefCountedString
> data_ref
=
167 base::RefCountedString::TakeString(&data
);
168 observers_
->Notify(&StreamObserver::OnDataRead
, data_ref
);
171 void AudioPipeReader::WaitForPipeReadable() {
173 base::MessageLoopForIO::current()->WatchFileDescriptor(
176 base::MessageLoopForIO::WATCH_READ
,
177 &file_descriptor_watcher_
,
182 void AudioPipeReaderTraits::Destruct(const AudioPipeReader
* audio_pipe_reader
) {
183 audio_pipe_reader
->task_runner_
->DeleteSoon(FROM_HERE
, audio_pipe_reader
);
186 } // namespace remoting