I/O priority support with cfq scheduler (needs new kernel patch)
[enigma2.git] / lib / base / filepush.cpp
#include <lib/base/filepush.h>
#include <lib/base/eerror.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/ioctl.h>
#include <unistd.h>
6
	/* ioctl request number issued on the destination fd by
	   eFilePushThread::thread() when the source runs dry and PVR commit
	   has been enabled via enablePVRCommit(). */
#define PVR_COMMIT 1
8
9 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level)
10         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
11 {
12         m_stop = 0;
13         m_sg = 0;
14         flush();
15         enablePVRCommit(0);
16         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
17 }
18
/* Intentionally empty SIGUSR1 handler: it is installed by
   eFilePushThread::thread() so that the signal interrupts blocking
   syscalls (EINTR) instead of being ignored. */
static void signal_handler(int /* signum */)
{
	/* nothing to do */
}
22
23 void eFilePushThread::thread()
24 {
25         setIoPrio(prio_class, prio);
26
27         off_t dest_pos = 0, source_pos = 0;
28         size_t bytes_read = 0;
29         
30         off_t current_span_offset = 0;
31         size_t current_span_remaining = 0;
32         
33         size_t written_since_last_sync = 0;
34
35         int already_empty = 0;
36         eDebug("FILEPUSH THREAD START");
37         
38                 /* we set the signal to not restart syscalls, so we can detect our signal. */
39         struct sigaction act;
40         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
41         act.sa_flags = 0;
42         sigaction(SIGUSR1, &act, 0);
43         
44         hasStarted();
45         
46         source_pos = m_raw_source.lseek(0, SEEK_CUR);
47         
48                 /* m_stop must be evaluated after each syscall. */
49         while (!m_stop)
50         {
51                         /* first try flushing the bufptr */
52                 if (m_buf_start != m_buf_end)
53                 {
54                                 // TODO: take care of boundaries.
55                         filterRecordData(m_buffer + m_buf_start, m_buf_end - m_buf_start);
56                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
57 //                      eDebug("wrote %d bytes", w);
58                         if (w <= 0)
59                         {
60                                 if (errno == EINTR)
61                                         continue;
62                                 break;
63                                 // ... we would stop the thread
64                         }
65
66                         written_since_last_sync += w;
67
68                         if (written_since_last_sync >= 512*1024)
69                         {
70                                 int toflush = written_since_last_sync > 2*1024*1024 ?
71                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
72                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
73                                 dest_pos -= toflush;
74                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
75                                 written_since_last_sync -= toflush;
76                         }
77
78 //                      printf("FILEPUSH: wrote %d bytes\n", w);
79                         m_buf_start += w;
80                         continue;
81                 }
82
83                         /* now fill our buffer. */
84                         
85                 if (m_sg && !current_span_remaining)
86                 {
87                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
88
89                         if (source_pos != current_span_offset)
90                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
91                         bytes_read = 0;
92                 }
93                 
94                 size_t maxread = sizeof(m_buffer);
95                 
96                         /* if we have a source span, don't read past the end */
97                 if (m_sg && maxread > current_span_remaining)
98                         maxread = current_span_remaining;
99
100                 m_buf_start = 0;
101                 m_buf_end = 0;
102                 
103                 if (maxread)
104                         m_buf_end = m_raw_source.read(m_buffer, maxread);
105
106                 if (m_buf_end < 0)
107                 {
108                         m_buf_end = 0;
109                         if (errno == EINTR)
110                                 continue;
111                         if (errno == EOVERFLOW)
112                         {
113                                 eWarning("OVERFLOW while recording");
114                                 continue;
115                         }
116                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
117                 }
118                 if (m_buf_end == 0)
119                 {
120                                 /* on EOF, try COMMITting once. */
121                         if (m_send_pvr_commit && !already_empty)
122                         {
123                                 eDebug("sending PVR commit");
124                                 already_empty = 1;
125                                 if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
126                                         continue;
127                                 eDebug("commit done");
128                                                 /* well check again */
129                                 continue;
130                         }
131                         sendEvent(evtEOF);
132 #if 0
133                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
134                         if (!m_raw_source.lseek(0, SEEK_SET))
135                         {
136                                 eDebug("(looping)");
137                                 continue;
138                         }
139 #endif
140                         break;
141                 } else
142                 {
143                         source_pos += m_buf_end;
144                         bytes_read += m_buf_end;
145                         if (m_sg)
146                                 current_span_remaining -= m_buf_end;
147                         already_empty = 0;
148                 }
149 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
150         }
151         fdatasync(m_fd_dest);
152
153         eDebug("FILEPUSH THREAD STOP");
154 }
155
156 void eFilePushThread::start(int fd_source, int fd_dest)
157 {
158         m_raw_source.setfd(fd_source);
159         m_fd_dest = fd_dest;
160         resume();
161 }
162
163 int eFilePushThread::start(const char *filename, int fd_dest)
164 {
165         if (m_raw_source.open(filename) < 0)
166                 return -1;
167         m_fd_dest = fd_dest;
168         resume();
169         return 0;
170 }
171
172 void eFilePushThread::stop()
173 {
174                 /* if we aren't running, don't bother stopping. */
175         if (!sync())
176                 return;
177
178         m_stop = 1;
179         sendSignal(SIGUSR1);
180         kill();
181 }
182
183 void eFilePushThread::pause()
184 {
185         stop();
186 }
187
188 void eFilePushThread::seek(int whence, off_t where)
189 {
190         m_raw_source.lseek(where, whence);
191 }
192
193 void eFilePushThread::resume()
194 {
195         m_stop = 0;
196         run();
197 }
198
199 void eFilePushThread::flush()
200 {
201         m_buf_start = m_buf_end = 0;
202 }
203
204 void eFilePushThread::enablePVRCommit(int s)
205 {
206         m_send_pvr_commit = s;
207 }
208
209 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
210 {
211         m_sg = sg;
212 }
213
214 void eFilePushThread::sendEvent(int evt)
215 {
216         m_messagepump.send(evt);
217 }
218
219 void eFilePushThread::recvEvent(const int &evt)
220 {
221         m_event(evt);
222 }
223
224 void eFilePushThread::filterRecordData(const unsigned char *data, int len)
225 {
226         /* do nothing */
227 }
228