improve filterRecordData
[enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
/* request code handed to ::ioctl(m_fd_dest, PVR_COMMIT) by eFilePushThread::thread()
   when the source runs empty and PVR commit is enabled. */
#define PVR_COMMIT 1

/* Opened once during static initialisation; eFilePushThread::thread() fwrite()s a copy
   of every buffer it tries to write into this stream.
   NOTE(review): this looks like a leftover debug dump (fixed path, result of fopen()
   is not checked here, generic global name) - confirm whether it should ship. */
FILE *f = fopen("/log.ts", "wb");
10
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
13 {
14         m_stop = 0;
15         m_sg = 0;
16         m_send_pvr_commit = 0;
17         m_stream_mode = 0;
18         m_blocksize = blocksize;
19         flush();
20         enablePVRCommit(0);
21         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
22 }
23
/*
 * Deliberately empty handler installed for SIGUSR1 by eFilePushThread::thread().
 * It only has to exist: thread() wants the signal delivered (so a blocking call
 * comes back with EINTR) rather than ignored.
 */
static void signal_handler(int x)
{
	(void)x; /* the signal number is of no interest */
}
27
28 void eFilePushThread::thread()
29 {
30         setIoPrio(prio_class, prio);
31
32         off_t dest_pos = 0, source_pos = 0;
33         size_t bytes_read = 0;
34         
35         off_t current_span_offset = 0;
36         size_t current_span_remaining = 0;
37         
38         size_t written_since_last_sync = 0;
39
40         int already_empty = 0;
41         eDebug("FILEPUSH THREAD START");
42         
43                 /* we set the signal to not restart syscalls, so we can detect our signal. */
44         struct sigaction act;
45         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
46         act.sa_flags = 0;
47         sigaction(SIGUSR1, &act, 0);
48         
49         hasStarted();
50         
51         source_pos = m_raw_source.lseek(0, SEEK_CUR);
52         
53                 /* m_stop must be evaluated after each syscall. */
54         while (!m_stop)
55         {
56                         /* first try flushing the bufptr */
57                 if (m_buf_start != m_buf_end)
58                 {
59                                 /* filterRecordData wants to work on multiples of blocksize.
60                                    if it returns a negative result, it means that this many bytes should be skipped
61                                    *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
62                                    if filterRecordData wants to skip more data then currently available, it must do that internally.
63                                    Skipped bytes will also not be output.
64
65                                    if it returns a positive result, that means that only these many bytes should be used
66                                    in the buffer. 
67                                    
68                                    In either case, current_span_remaining is given as a reference and can be modified. (Of course it 
69                                    doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
70                                    skip some data). This is probably a very special application for fast-forward, where the current
71                                    span is to be cancelled after a complete iframe has been output.
72
73                                    we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
74                                    
75                                    we filter data only once, of course, but it might not get immediately written.
76                                    that's what m_filter_end is for - it points to the start of the unfiltered data.
77                                 */
78                         
79                         int filter_res;
80                         
81                         do
82                         {
83                                 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
84
85                                 if (filter_res < 0)
86                                 {
87                                         eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
88                                         m_buf_start = m_filter_end + -filter_res;  /* this will also drop unwritten data */
89                                         ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
90                                         continue; /* try again */
91                                 }
92                                 
93                                         /* adjust end of buffer to strip dropped tail bytes */
94                                 m_buf_end = m_filter_end + filter_res;
95                                         /* mark data as filtered. */
96                                 m_filter_end = m_buf_end;
97                         } while (0);
98                         
99                         ASSERT(m_filter_end == m_buf_end);
100                         
101                         if (m_buf_start == m_buf_end)
102                                 continue;
103
104                                 /* now write out data. it will be 'aligned' (according to filterRecordData). 
105                                    absolutely forbidden is to return EINTR and consume a non-aligned number of bytes. 
106                                 */
107                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
108                         fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
109 //                      eDebug("wrote %d bytes", w);
110                         if (w <= 0)
111                         {
112                                 if (errno == EINTR)
113                                         continue;
114                                 eDebug("eFilePushThread WRITE ERROR");
115                                 sendEvent(evtWriteError);
116                                 break;
117                                 // ... we would stop the thread
118                         }
119
120                         written_since_last_sync += w;
121
122                         if (written_since_last_sync >= 512*1024)
123                         {
124                                 int toflush = written_since_last_sync > 2*1024*1024 ?
125                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
126                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
127                                 dest_pos -= toflush;
128                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
129                                 written_since_last_sync -= toflush;
130                         }
131
132 //                      printf("FILEPUSH: wrote %d bytes\n", w);
133                         m_buf_start += w;
134                         continue;
135                 }
136
137                         /* now fill our buffer. */
138                         
139                 if (m_sg && !current_span_remaining)
140                 {
141                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
142                         ASSERT(!(current_span_remaining % m_blocksize));
143
144                         if (source_pos != current_span_offset)
145                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
146                         bytes_read = 0;
147                 }
148                 
149                 size_t maxread = sizeof(m_buffer);
150                 
151                         /* if we have a source span, don't read past the end */
152                 if (m_sg && maxread > current_span_remaining)
153                         maxread = current_span_remaining;
154
155                         /* align to blocksize */
156                 maxread -= maxread % m_blocksize;
157
158                 m_buf_start = 0;
159                 m_filter_end = 0;
160                 m_buf_end = 0;
161                 
162                 if (maxread)
163                         m_buf_end = m_raw_source.read(m_buffer, maxread);
164
165                 if (m_buf_end < 0)
166                 {
167                         m_buf_end = 0;
168                         if (errno == EINTR)
169                                 continue;
170                         if (errno == EOVERFLOW)
171                         {
172                                 eWarning("OVERFLOW while recording");
173                                 continue;
174                         }
175                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
176                 }
177                 if (m_buf_end == 0)
178                 {
179                                 /* on EOF, try COMMITting once. */
180                         if (m_send_pvr_commit && !already_empty)
181                         {
182                                 eDebug("sending PVR commit");
183                                 already_empty = 1;
184                                 if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
185                                         continue;
186                                 eDebug("commit done");
187                                                 /* well check again */
188                                 continue;
189                         }
190                         
191                                 /* in stream_mode, we are sending EOF events 
192                                    over and over until somebody responds.
193                                    
194                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
195                         sendEvent(evtEOF);
196
197                         if (m_stream_mode)
198                         {
199                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
200                                 sleep(1);
201                                 continue;
202                         }
203 #if 0
204                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
205                         if (!m_raw_source.lseek(0, SEEK_SET))
206                         {
207                                 eDebug("(looping)");
208                                 continue;
209                         }
210 #endif
211                         break;
212                 } else
213                 {
214                         source_pos += m_buf_end;
215                         bytes_read += m_buf_end;
216                         if (m_sg)
217                                 current_span_remaining -= m_buf_end;
218                         already_empty = 0;
219                 }
220 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
221         }
222         fdatasync(m_fd_dest);
223
224         eDebug("FILEPUSH THREAD STOP");
225 }
226
227 void eFilePushThread::start(int fd_source, int fd_dest)
228 {
229         m_raw_source.setfd(fd_source);
230         m_fd_dest = fd_dest;
231         resume();
232 }
233
234 int eFilePushThread::start(const char *filename, int fd_dest)
235 {
236         if (m_raw_source.open(filename) < 0)
237                 return -1;
238         m_fd_dest = fd_dest;
239         resume();
240         return 0;
241 }
242
243 void eFilePushThread::stop()
244 {
245                 /* if we aren't running, don't bother stopping. */
246         if (!sync())
247                 return;
248
249         m_stop = 1;
250
251         // fixmee.. here we need a better solution to ensure
252         // that the thread context take notice of the signal
253         // even when no syscall is in progress
254         while(!sendSignal(SIGUSR1))
255         {
256                 eDebug("send SIGUSR1 to thread context");
257                 usleep(5000); // wait msek
258         }
259         kill();
260 }
261
262 void eFilePushThread::pause()
263 {
264         stop();
265 }
266
267 void eFilePushThread::seek(int whence, off_t where)
268 {
269         m_raw_source.lseek(where, whence);
270 }
271
272 void eFilePushThread::resume()
273 {
274         m_stop = 0;
275         run();
276 }
277
278 void eFilePushThread::flush()
279 {
280         m_buf_start = m_buf_end = m_filter_end = 0;
281 }
282
283 void eFilePushThread::enablePVRCommit(int s)
284 {
285         m_send_pvr_commit = s;
286 }
287
288 void eFilePushThread::setStreamMode(int s)
289 {
290         m_stream_mode = s;
291 }
292
293 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
294 {
295         m_sg = sg;
296 }
297
298 void eFilePushThread::sendEvent(int evt)
299 {
300         m_messagepump.send(evt);
301 }
302
303 void eFilePushThread::recvEvent(const int &evt)
304 {
305         m_event(evt);
306 }
307
308 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)
309 {
310         return len;
311 }