some small fixes
[enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 //FILE *f = fopen("/log.ts", "wb");
10
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
13 {
14         m_stop = 0;
15         m_sg = 0;
16         m_send_pvr_commit = 0;
17         m_stream_mode = 0;
18         m_blocksize = blocksize;
19         flush();
20         enablePVRCommit(0);
21         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
22 }
23
/*
 * Deliberately empty SIGUSR1 handler. It is installed (rather than SIG_IGN)
 * so that a blocking syscall in the worker thread is interrupted and fails
 * with EINTR when the signal arrives.
 */
static void signal_handler(int /* signum */)
{
		/* nothing to do - being called is the whole point */
}
27
28 void eFilePushThread::thread()
29 {
30         setIoPrio(prio_class, prio);
31
32         off_t dest_pos = 0, source_pos = 0;
33         size_t bytes_read = 0;
34         
35         off_t current_span_offset = 0;
36         size_t current_span_remaining = 0;
37         
38         size_t written_since_last_sync = 0;
39
40         int already_empty = 0;
41         eDebug("FILEPUSH THREAD START");
42         
43                 /* we set the signal to not restart syscalls, so we can detect our signal. */
44         struct sigaction act;
45         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
46         act.sa_flags = 0;
47         sigaction(SIGUSR1, &act, 0);
48         
49         hasStarted();
50         
51         source_pos = m_raw_source.lseek(0, SEEK_CUR);
52         
53                 /* m_stop must be evaluated after each syscall. */
54         while (!m_stop)
55         {
56                         /* first try flushing the bufptr */
57                 if (m_buf_start != m_buf_end)
58                 {
59                                 /* filterRecordData wants to work on multiples of blocksize.
60                                    if it returns a negative result, it means that this many bytes should be skipped
61                                    *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
62                                    if filterRecordData wants to skip more data then currently available, it must do that internally.
63                                    Skipped bytes will also not be output.
64
65                                    if it returns a positive result, that means that only these many bytes should be used
66                                    in the buffer. 
67                                    
68                                    In either case, current_span_remaining is given as a reference and can be modified. (Of course it 
69                                    doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
70                                    skip some data). This is probably a very special application for fast-forward, where the current
71                                    span is to be cancelled after a complete iframe has been output.
72
73                                    we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
74                                    
75                                    we filter data only once, of course, but it might not get immediately written.
76                                    that's what m_filter_end is for - it points to the start of the unfiltered data.
77                                 */
78                         
79                         int filter_res;
80                         
81                         do
82                         {
83                                 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
84
85                                 if (filter_res < 0)
86                                 {
87                                         eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
88                                         m_buf_start = m_filter_end + -filter_res;  /* this will also drop unwritten data */
89                                         ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
90                                         continue; /* try again */
91                                 }
92                                 
93                                         /* adjust end of buffer to strip dropped tail bytes */
94                                 m_buf_end = m_filter_end + filter_res;
95                                         /* mark data as filtered. */
96                                 m_filter_end = m_buf_end;
97                         } while (0);
98                         
99                         ASSERT(m_filter_end == m_buf_end);
100                         
101                         if (m_buf_start == m_buf_end)
102                                 continue;
103
104                                 /* now write out data. it will be 'aligned' (according to filterRecordData). 
105                                    absolutely forbidden is to return EINTR and consume a non-aligned number of bytes. 
106                                 */
107                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
108 //                      fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
109 //                      eDebug("wrote %d bytes", w);
110                         if (w <= 0)
111                         {
112                                 if (errno == EINTR || errno == EAGAIN || errno == EBUSY)
113                                         continue;
114                                 eDebug("eFilePushThread WRITE ERROR");
115                                 sendEvent(evtWriteError);
116                                 break;
117                                 // ... we would stop the thread
118                         }
119
120                         written_since_last_sync += w;
121
122                         if (written_since_last_sync >= 512*1024)
123                         {
124                                 int toflush = written_since_last_sync > 2*1024*1024 ?
125                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
126                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
127                                 dest_pos -= toflush;
128                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
129                                 written_since_last_sync -= toflush;
130                         }
131
132 //                      printf("FILEPUSH: wrote %d bytes\n", w);
133                         m_buf_start += w;
134                         continue;
135                 }
136
137                         /* now fill our buffer. */
138                         
139                 if (m_sg && !current_span_remaining)
140                 {
141                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
142                         ASSERT(!(current_span_remaining % m_blocksize));
143
144                         if (source_pos != current_span_offset)
145                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
146                         bytes_read = 0;
147                 }
148                 
149                 size_t maxread = sizeof(m_buffer);
150                 
151                         /* if we have a source span, don't read past the end */
152                 if (m_sg && maxread > current_span_remaining)
153                         maxread = current_span_remaining;
154
155                         /* align to blocksize */
156                 maxread -= maxread % m_blocksize;
157
158                 m_buf_start = 0;
159                 m_filter_end = 0;
160                 m_buf_end = 0;
161                 
162                 if (maxread)
163                         m_buf_end = m_raw_source.read(m_buffer, maxread);
164
165                 if (m_buf_end < 0)
166                 {
167                         m_buf_end = 0;
168                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
169                                 continue;
170                         if (errno == EOVERFLOW)
171                         {
172                                 eWarning("OVERFLOW while recording");
173                                 continue;
174                         }
175                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
176                 }
177
178                         /* a read might be mis-aligned in case of a short read. */
179                 int d = m_buf_end % m_blocksize;
180                 if (d)
181                 {
182                         m_raw_source.lseek(-d, SEEK_CUR);
183                         m_buf_end -= d;
184                 }
185
186                 if (m_buf_end == 0)
187                 {
188                                 /* on EOF, try COMMITting once. */
189                         if (m_send_pvr_commit && !already_empty)
190                         {
191                                 eDebug("sending PVR commit");
192                                 already_empty = 1;
193                                 if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
194                                         continue;
195                                 eDebug("commit done");
196                                                 /* well check again */
197                                 continue;
198                         }
199                         
200                                 /* in stream_mode, we are sending EOF events 
201                                    over and over until somebody responds.
202                                    
203                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
204                         sendEvent(evtEOF);
205
206                         if (m_stream_mode)
207                         {
208                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
209                                 sleep(1);
210                                 continue;
211                         }
212 #if 0
213                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
214                         if (!m_raw_source.lseek(0, SEEK_SET))
215                         {
216                                 eDebug("(looping)");
217                                 continue;
218                         }
219 #endif
220                         break;
221                 } else
222                 {
223                         source_pos += m_buf_end;
224                         bytes_read += m_buf_end;
225                         if (m_sg)
226                                 current_span_remaining -= m_buf_end;
227                         already_empty = 0;
228                 }
229 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
230         }
231         fdatasync(m_fd_dest);
232
233         eDebug("FILEPUSH THREAD STOP");
234 }
235
236 void eFilePushThread::start(int fd_source, int fd_dest)
237 {
238         m_raw_source.setfd(fd_source);
239         m_fd_dest = fd_dest;
240         resume();
241 }
242
243 int eFilePushThread::start(const char *filename, int fd_dest)
244 {
245         if (m_raw_source.open(filename) < 0)
246                 return -1;
247         m_fd_dest = fd_dest;
248         resume();
249         return 0;
250 }
251
252 void eFilePushThread::stop()
253 {
254                 /* if we aren't running, don't bother stopping. */
255         if (!sync())
256                 return;
257
258         m_stop = 1;
259
260         // fixmee.. here we need a better solution to ensure
261         // that the thread context take notice of the signal
262         // even when no syscall is in progress
263         while(!sendSignal(SIGUSR1))
264         {
265                 eDebug("send SIGUSR1 to thread context");
266                 usleep(5000); // wait msek
267         }
268         kill();
269 }
270
271 void eFilePushThread::pause()
272 {
273         stop();
274 }
275
276 void eFilePushThread::seek(int whence, off_t where)
277 {
278         m_raw_source.lseek(where, whence);
279 }
280
281 void eFilePushThread::resume()
282 {
283         m_stop = 0;
284         run();
285 }
286
287 void eFilePushThread::flush()
288 {
289         m_buf_start = m_buf_end = m_filter_end = 0;
290 }
291
292 void eFilePushThread::enablePVRCommit(int s)
293 {
294         m_send_pvr_commit = s;
295 }
296
297 void eFilePushThread::setStreamMode(int s)
298 {
299         m_stream_mode = s;
300 }
301
302 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
303 {
304         m_sg = sg;
305 }
306
307 void eFilePushThread::sendEvent(int evt)
308 {
309         m_messagepump.send(evt);
310 }
311
312 void eFilePushThread::recvEvent(const int &evt)
313 {
314         m_event(evt);
315 }
316
317 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)
318 {
319         return len;
320 }