Merge remote branch 'origin/bug_348_dvdburn_residenttask' into experimental
[enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 //FILE *f = fopen("/log.ts", "wb");
10
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
13 {
14         m_stop = 0;
15         m_sg = 0;
16         m_send_pvr_commit = 0;
17         m_stream_mode = 0;
18         m_blocksize = blocksize;
19         flush();
20         enablePVRCommit(0);
21         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
22 }
23
24 static void signal_handler(int x)
25 {
26 }
27
28 void eFilePushThread::thread()
29 {
30         setIoPrio(prio_class, prio);
31
32         off_t dest_pos = 0, source_pos = 0;
33         size_t bytes_read = 0;
34         
35         off_t current_span_offset = 0;
36         size_t current_span_remaining = 0;
37         
38         size_t written_since_last_sync = 0;
39
40         eDebug("FILEPUSH THREAD START");
41         
42                 /* we set the signal to not restart syscalls, so we can detect our signal. */
43         struct sigaction act;
44         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
45         act.sa_flags = 0;
46         sigaction(SIGUSR1, &act, 0);
47         
48         hasStarted();
49         
50         source_pos = m_raw_source.lseek(0, SEEK_CUR);
51         
52                 /* m_stop must be evaluated after each syscall. */
53         while (!m_stop)
54         {
55                         /* first try flushing the bufptr */
56                 if (m_buf_start != m_buf_end)
57                 {
58                                 /* filterRecordData wants to work on multiples of blocksize.
59                                    if it returns a negative result, it means that this many bytes should be skipped
60                                    *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
61                                    if filterRecordData wants to skip more data then currently available, it must do that internally.
62                                    Skipped bytes will also not be output.
63
64                                    if it returns a positive result, that means that only these many bytes should be used
65                                    in the buffer. 
66                                    
67                                    In either case, current_span_remaining is given as a reference and can be modified. (Of course it 
68                                    doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
69                                    skip some data). This is probably a very special application for fast-forward, where the current
70                                    span is to be cancelled after a complete iframe has been output.
71
72                                    we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
73                                    
74                                    we filter data only once, of course, but it might not get immediately written.
75                                    that's what m_filter_end is for - it points to the start of the unfiltered data.
76                                 */
77                         
78                         int filter_res;
79                         
80                         do
81                         {
82                                 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
83
84                                 if (filter_res < 0)
85                                 {
86                                         eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
87                                         m_buf_start = m_filter_end + -filter_res;  /* this will also drop unwritten data */
88                                         ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
89                                         continue; /* try again */
90                                 }
91                                 
92                                         /* adjust end of buffer to strip dropped tail bytes */
93                                 m_buf_end = m_filter_end + filter_res;
94                                         /* mark data as filtered. */
95                                 m_filter_end = m_buf_end;
96                         } while (0);
97                         
98                         ASSERT(m_filter_end == m_buf_end);
99                         
100                         if (m_buf_start == m_buf_end)
101                                 continue;
102
103                                 /* now write out data. it will be 'aligned' (according to filterRecordData). 
104                                    absolutely forbidden is to return EINTR and consume a non-aligned number of bytes. 
105                                 */
106                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
107 //                      fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
108 //                      eDebug("wrote %d bytes", w);
109                         if (w <= 0)
110                         {
111                                 if (errno == EINTR || errno == EAGAIN || errno == EBUSY)
112                                         continue;
113                                 eDebug("eFilePushThread WRITE ERROR");
114                                 sendEvent(evtWriteError);
115                                 break;
116                                 // ... we would stop the thread
117                         }
118
119                         written_since_last_sync += w;
120
121                         if (written_since_last_sync >= 512*1024)
122                         {
123                                 int toflush = written_since_last_sync > 2*1024*1024 ?
124                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
125                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
126                                 dest_pos -= toflush;
127                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
128                                 written_since_last_sync -= toflush;
129                         }
130
131 //                      printf("FILEPUSH: wrote %d bytes\n", w);
132                         m_buf_start += w;
133                         continue;
134                 }
135
136                         /* now fill our buffer. */
137                         
138                 if (m_sg && !current_span_remaining)
139                 {
140                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
141                         ASSERT(!(current_span_remaining % m_blocksize));
142
143                         if (source_pos != current_span_offset)
144                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
145                         bytes_read = 0;
146                 }
147                 
148                 size_t maxread = sizeof(m_buffer);
149                 
150                         /* if we have a source span, don't read past the end */
151                 if (m_sg && maxread > current_span_remaining)
152                         maxread = current_span_remaining;
153
154                         /* align to blocksize */
155                 maxread -= maxread % m_blocksize;
156
157                 m_buf_start = 0;
158                 m_filter_end = 0;
159                 m_buf_end = 0;
160                 
161                 if (maxread)
162                         m_buf_end = m_raw_source.read(m_buffer, maxread);
163
164                 if (m_buf_end < 0)
165                 {
166                         m_buf_end = 0;
167                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
168                                 continue;
169                         if (errno == EOVERFLOW)
170                         {
171                                 eWarning("OVERFLOW while recording");
172                                 continue;
173                         }
174                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
175                 }
176
177                         /* a read might be mis-aligned in case of a short read. */
178                 int d = m_buf_end % m_blocksize;
179                 if (d)
180                 {
181                         m_raw_source.lseek(-d, SEEK_CUR);
182                         m_buf_end -= d;
183                 }
184
185                 if (m_buf_end == 0)
186                 {
187                                 /* on EOF, try COMMITting once. */
188                         if (m_send_pvr_commit)
189                         {
190                                 struct pollfd pfd;
191                                 pfd.fd = m_fd_dest;
192                                 pfd.events = POLLIN;
193                                 switch (poll(&pfd, 1, 250)) // wait for 250ms
194                                 {
195                                         case 0:
196                                                 eDebug("wait for driver eof timeout");
197                                                 continue;
198                                         case 1:
199                                                 eDebug("wait for driver eof ok");
200                                                 break;
201                                         default:
202                                                 eDebug("wait for driver eof aborted by signal");
203                                                 continue;
204                                 }
205                         }
206                         
207                                 /* in stream_mode, we are sending EOF events 
208                                    over and over until somebody responds.
209                                    
210                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
211                         sendEvent(evtEOF);
212
213                         if (m_stream_mode)
214                         {
215                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
216                                 sleep(1);
217                                 continue;
218                         }
219 #if 0
220                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
221                         if (!m_raw_source.lseek(0, SEEK_SET))
222                         {
223                                 eDebug("(looping)");
224                                 continue;
225                         }
226 #endif
227                         break;
228                 } else
229                 {
230                         source_pos += m_buf_end;
231                         bytes_read += m_buf_end;
232                         if (m_sg)
233                                 current_span_remaining -= m_buf_end;
234                 }
235 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
236         }
237         fdatasync(m_fd_dest);
238
239         eDebug("FILEPUSH THREAD STOP");
240 }
241
242 void eFilePushThread::start(int fd_source, int fd_dest)
243 {
244         m_raw_source.setfd(fd_source);
245         m_fd_dest = fd_dest;
246         resume();
247 }
248
249 int eFilePushThread::start(const char *filename, int fd_dest)
250 {
251         if (m_raw_source.open(filename) < 0)
252                 return -1;
253         m_fd_dest = fd_dest;
254         resume();
255         return 0;
256 }
257
258 void eFilePushThread::stop()
259 {
260                 /* if we aren't running, don't bother stopping. */
261         if (!sync())
262                 return;
263
264         m_stop = 1;
265
266         eDebug("stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
267         sendSignal(SIGUSR1);
268         kill(0);
269 }
270
271 void eFilePushThread::pause()
272 {
273         stop();
274 }
275
276 void eFilePushThread::seek(int whence, off_t where)
277 {
278         m_raw_source.lseek(where, whence);
279 }
280
281 void eFilePushThread::resume()
282 {
283         m_stop = 0;
284         run();
285 }
286
287 void eFilePushThread::flush()
288 {
289         m_buf_start = m_buf_end = m_filter_end = 0;
290 }
291
292 void eFilePushThread::enablePVRCommit(int s)
293 {
294         m_send_pvr_commit = s;
295 }
296
297 void eFilePushThread::setStreamMode(int s)
298 {
299         m_stream_mode = s;
300 }
301
302 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
303 {
304         m_sg = sg;
305 }
306
307 void eFilePushThread::sendEvent(int evt)
308 {
309         m_messagepump.send(evt);
310 }
311
312 void eFilePushThread::recvEvent(const int &evt)
313 {
314         m_event(evt);
315 }
316
317 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)
318 {
319         return len;
320 }