645d36851cbaadefd0c7ce244c378d7663118ae0
[enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 //FILE *f = fopen("/log.ts", "wb");
10
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
13 {
14         m_stop = 0;
15         m_sg = 0;
16         m_send_pvr_commit = 0;
17         m_stream_mode = 0;
18         m_blocksize = blocksize;
19         flush();
20         enablePVRCommit(0);
21         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
22 }
23
24 static void signal_handler(int x)
25 {
26 }
27
28 void eFilePushThread::thread()
29 {
30         setIoPrio(prio_class, prio);
31
32         off_t dest_pos = 0, source_pos = 0;
33         size_t bytes_read = 0;
34         
35         off_t current_span_offset = 0;
36         size_t current_span_remaining = 0;
37         
38         size_t written_since_last_sync = 0;
39
40         int already_empty = 0;
41         eDebug("FILEPUSH THREAD START");
42         
43                 /* we set the signal to not restart syscalls, so we can detect our signal. */
44         struct sigaction act;
45         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
46         act.sa_flags = 0;
47         sigaction(SIGUSR1, &act, 0);
48         
49         hasStarted();
50         
51         source_pos = m_raw_source.lseek(0, SEEK_CUR);
52         
53                 /* m_stop must be evaluated after each syscall. */
54         while (!m_stop)
55         {
56                         /* first try flushing the bufptr */
57                 if (m_buf_start != m_buf_end)
58                 {
59                                 /* filterRecordData wants to work on multiples of blocksize.
60                                    if it returns a negative result, it means that this many bytes should be skipped
61                                    *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
62                                    if filterRecordData wants to skip more data then currently available, it must do that internally.
63                                    Skipped bytes will also not be output.
64
65                                    if it returns a positive result, that means that only these many bytes should be used
66                                    in the buffer. 
67                                    
68                                    In either case, current_span_remaining is given as a reference and can be modified. (Of course it 
69                                    doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
70                                    skip some data). This is probably a very special application for fast-forward, where the current
71                                    span is to be cancelled after a complete iframe has been output.
72
73                                    we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
74                                    
75                                    we filter data only once, of course, but it might not get immediately written.
76                                    that's what m_filter_end is for - it points to the start of the unfiltered data.
77                                 */
78                         
79                         int filter_res;
80                         
81                         do
82                         {
83                                 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
84
85                                 if (filter_res < 0)
86                                 {
87                                         eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
88                                         m_buf_start = m_filter_end + -filter_res;  /* this will also drop unwritten data */
89                                         ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
90                                         continue; /* try again */
91                                 }
92                                 
93                                         /* adjust end of buffer to strip dropped tail bytes */
94                                 m_buf_end = m_filter_end + filter_res;
95                                         /* mark data as filtered. */
96                                 m_filter_end = m_buf_end;
97                         } while (0);
98                         
99                         ASSERT(m_filter_end == m_buf_end);
100                         
101                         if (m_buf_start == m_buf_end)
102                                 continue;
103
104                                 /* now write out data. it will be 'aligned' (according to filterRecordData). 
105                                    absolutely forbidden is to return EINTR and consume a non-aligned number of bytes. 
106                                 */
107                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
108 //                      fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
109 //                      eDebug("wrote %d bytes", w);
110                         if (w <= 0)
111                         {
112                                 if (errno == EINTR || errno == EAGAIN || errno == EBUSY)
113                                         continue;
114                                 eDebug("eFilePushThread WRITE ERROR");
115                                 sendEvent(evtWriteError);
116                                 break;
117                                 // ... we would stop the thread
118                         }
119
120                         written_since_last_sync += w;
121
122                         if (written_since_last_sync >= 512*1024)
123                         {
124                                 int toflush = written_since_last_sync > 2*1024*1024 ?
125                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
126                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
127                                 dest_pos -= toflush;
128                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
129                                 written_since_last_sync -= toflush;
130                         }
131
132 //                      printf("FILEPUSH: wrote %d bytes\n", w);
133                         m_buf_start += w;
134                         continue;
135                 }
136
137                         /* now fill our buffer. */
138                         
139                 if (m_sg && !current_span_remaining)
140                 {
141                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
142                         ASSERT(!(current_span_remaining % m_blocksize));
143
144                         if (source_pos != current_span_offset)
145                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
146                         bytes_read = 0;
147                 }
148                 
149                 size_t maxread = sizeof(m_buffer);
150                 
151                         /* if we have a source span, don't read past the end */
152                 if (m_sg && maxread > current_span_remaining)
153                         maxread = current_span_remaining;
154
155                         /* align to blocksize */
156                 maxread -= maxread % m_blocksize;
157
158                 m_buf_start = 0;
159                 m_filter_end = 0;
160                 m_buf_end = 0;
161                 
162                 if (maxread)
163                         m_buf_end = m_raw_source.read(m_buffer, maxread);
164
165                 if (m_buf_end < 0)
166                 {
167                         m_buf_end = 0;
168                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
169                                 continue;
170                         if (errno == EOVERFLOW)
171                         {
172                                 eWarning("OVERFLOW while recording");
173                                 continue;
174                         }
175                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
176                 }
177
178                         /* a read might be mis-aligned in case of a short read. */
179                 int d = m_buf_end % m_blocksize;
180                 if (d)
181                 {
182                         m_raw_source.lseek(-d, SEEK_CUR);
183                         m_buf_end -= d;
184                 }
185
186                 if (m_buf_end == 0)
187                 {
188                                 /* on EOF, try COMMITting once. */
189                         if (m_send_pvr_commit && !already_empty)
190                         {
191                                 eDebug("sending PVR commit");
192                                 
193                                 struct pollfd pfd[1] = {m_fd_dest, POLLHUP};
194                                 poll(pfd, 1, 10000);
195                                 sleep(5); /* HACK to allow ES buffer to drain */
196                                 already_empty = 1;
197 //                              if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
198 //                                      continue;
199                                 eDebug("commit done");
200                                                 /* well check again */
201                                 continue;
202                         }
203                         
204                                 /* in stream_mode, we are sending EOF events 
205                                    over and over until somebody responds.
206                                    
207                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
208                         sendEvent(evtEOF);
209
210                         if (m_stream_mode)
211                         {
212                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
213                                 sleep(1);
214                                 continue;
215                         }
216 #if 0
217                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
218                         if (!m_raw_source.lseek(0, SEEK_SET))
219                         {
220                                 eDebug("(looping)");
221                                 continue;
222                         }
223 #endif
224                         break;
225                 } else
226                 {
227                         source_pos += m_buf_end;
228                         bytes_read += m_buf_end;
229                         if (m_sg)
230                                 current_span_remaining -= m_buf_end;
231                         already_empty = 0;
232                 }
233 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
234         }
235         fdatasync(m_fd_dest);
236
237         eDebug("FILEPUSH THREAD STOP");
238 }
239
240 void eFilePushThread::start(int fd_source, int fd_dest)
241 {
242         m_raw_source.setfd(fd_source);
243         m_fd_dest = fd_dest;
244         resume();
245 }
246
247 int eFilePushThread::start(const char *filename, int fd_dest)
248 {
249         if (m_raw_source.open(filename) < 0)
250                 return -1;
251         m_fd_dest = fd_dest;
252         resume();
253         return 0;
254 }
255
256 void eFilePushThread::stop()
257 {
258                 /* if we aren't running, don't bother stopping. */
259         if (!sync())
260                 return;
261
262         m_stop = 1;
263
264         eDebug("stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
265         sendSignal(SIGUSR1);
266         kill(0);
267 }
268
269 void eFilePushThread::pause()
270 {
271         stop();
272 }
273
274 void eFilePushThread::seek(int whence, off_t where)
275 {
276         m_raw_source.lseek(where, whence);
277 }
278
279 void eFilePushThread::resume()
280 {
281         m_stop = 0;
282         run();
283 }
284
285 void eFilePushThread::flush()
286 {
287         m_buf_start = m_buf_end = m_filter_end = 0;
288 }
289
290 void eFilePushThread::enablePVRCommit(int s)
291 {
292         m_send_pvr_commit = s;
293 }
294
295 void eFilePushThread::setStreamMode(int s)
296 {
297         m_stream_mode = s;
298 }
299
300 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
301 {
302         m_sg = sg;
303 }
304
305 void eFilePushThread::sendEvent(int evt)
306 {
307         m_messagepump.send(evt);
308 }
309
310 void eFilePushThread::recvEvent(const int &evt)
311 {
312         m_event(evt);
313 }
314
315 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)
316 {
317         return len;
318 }