1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
9 //FILE *f = fopen("/log.ts", "wb");
// Constructor.
//   io_prio_class / io_prio_level: kept in prio_class / prio and later handed
//                                  to setIoPrio() at the start of thread().
//   blocksize: kept in m_blocksize; thread() rounds read sizes down to a
//              multiple of it.
// m_messagepump is constructed with (eApp, 0), the PVR-commit flag starts
// cleared, and messages received by the pump are routed to recvEvent().
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12 :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
16 m_send_pvr_commit = 0;
18 m_blocksize = blocksize;
// Deliver pump messages to this object via recvEvent().
21 CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
// File-local signal handler. thread() installs it for SIGUSR1 through
// sigaction(); the comment at the install site states the aim is to have
// interrupted syscalls report EINTR rather than be restarted.
// NOTE(review): the handler body is not visible here - confirm it is a no-op.
24 static void signal_handler(int x)
// Thread body: moves data from m_raw_source to m_fd_dest through m_buffer.
// Visible steps, in order: apply the I/O priority, install signal_handler for
// SIGUSR1, flush already-buffered data (running it through filterRecordData
// first) with write(), then refill the buffer with m_raw_source.read() and
// handle read errors, short reads and EOF. fdatasync() is called on the
// destination before the stop message is logged.
// NOTE(review): the enclosing loop and the brace structure are not visible
// here; the description follows the order of the visible statements only.
28 void eFilePushThread::thread()
30 setIoPrio(prio_class, prio);
// dest_pos is refreshed from lseek() on the destination before each
// posix_fadvise() call; source_pos tracks the position in the source.
32 off_t dest_pos = 0, source_pos = 0;
// Running total of bytes read; passed to getNextSourceSpan() below.
33 size_t bytes_read = 0;
// Current source span as filled in by m_sg->getNextSourceSpan().
35 off_t current_span_offset = 0;
36 size_t current_span_remaining = 0;
// Bytes written since the last posix_fadvise() pass.
38 size_t written_since_last_sync = 0;
40 eDebug("FILEPUSH THREAD START");
42 /* we set the signal to not restart syscalls, so we can detect our signal. */
44 act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
46 sigaction(SIGUSR1, &act, 0);
// Start from wherever the source currently points.
50 source_pos = m_raw_source.lseek(0, SEEK_CUR);
52 /* m_stop must be evaluated after each syscall. */
55 /* first try flushing the bufptr */
56 if (m_buf_start != m_buf_end)
58 /* filterRecordData wants to work on multiples of blocksize.
59 if it returns a negative result, it means that this many bytes should be skipped
60 *in front* of the buffer. Then, it will be called again with the newer, shorter buffer.
61 if filterRecordData wants to skip more data than currently available, it must do that internally.
62 Skipped bytes will also not be output.
64 if it returns a positive result, that means that only these many bytes should be used
67 In either case, current_span_remaining is given as a reference and can be modified. (Of course it
68 doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
69 skip some data). This is probably a very special application for fast-forward, where the current
70 span is to be cancelled after a complete iframe has been output.
72 we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
74 we filter data only once, of course, but it might not get immediately written.
75 that's what m_filter_end is for - it points to the start of the unfiltered data.
// The call below passes the range from m_filter_end up to m_buf_end.
82 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
86 eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
87 m_buf_start = m_filter_end + -filter_res; /* this will also drop unwritten data */
88 ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
89 continue; /* try again */
92 /* adjust end of buffer to strip dropped tail bytes */
93 m_buf_end = m_filter_end + filter_res;
94 /* mark data as filtered. */
95 m_filter_end = m_buf_end;
98 ASSERT(m_filter_end == m_buf_end);
100 if (m_buf_start == m_buf_end)
103 /* now write out data. it will be 'aligned' (according to filterRecordData).
104 absolutely forbidden is to return EINTR and consume a non-aligned number of bytes.
// Write everything between m_buf_start and m_buf_end in one call.
106 int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
107 // fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
108 // eDebug("wrote %d bytes", w);
// EINTR, EAGAIN and EBUSY are tested separately from the hard-error path
// that logs and emits evtWriteError.
111 if (errno == EINTR || errno == EAGAIN || errno == EBUSY)
113 eDebug("eFilePushThread WRITE ERROR");
114 sendEvent(evtWriteError);
116 // ... we would stop the thread
119 written_since_last_sync += w;
// Once at least 512 KiB has accumulated, issue POSIX_FADV_DONTNEED advice.
// toflush is capped at 2 MiB, otherwise rounded down to a multiple of 4096.
121 if (written_since_last_sync >= 512*1024)
123 int toflush = written_since_last_sync > 2*1024*1024 ?
124 2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
125 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
127 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
128 written_since_last_sync -= toflush;
131 // printf("FILEPUSH: wrote %d bytes\n", w);
136 /* now fill our buffer. */
// With a scatter/gather object and no span left, fetch the next span and
// seek the source to its offset if it is not already there.
138 if (m_sg && !current_span_remaining)
140 m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
141 ASSERT(!(current_span_remaining % m_blocksize));
143 if (source_pos != current_span_offset)
144 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
148 size_t maxread = sizeof(m_buffer);
150 /* if we have a source span, don't read past the end */
151 if (m_sg && maxread > current_span_remaining)
152 maxread = current_span_remaining;
154 /* align to blocksize */
155 maxread -= maxread % m_blocksize;
// The read() result is stored straight into m_buf_end.
162 m_buf_end = m_raw_source.read(m_buffer, maxread);
167 if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
169 if (errno == EOVERFLOW)
171 eWarning("OVERFLOW while recording");
174 eDebug("eFilePushThread *read error* (%m) - not yet handled");
177 /* a read might be mis-aligned in case of a short read. */
// d is the remainder beyond a whole number of blocks; the source is moved
// back by d bytes.
178 int d = m_buf_end % m_blocksize;
181 m_raw_source.lseek(-d, SEEK_CUR);
187 /* on EOF, try COMMITting once. */
188 if (m_send_pvr_commit)
193 switch (poll(&pfd, 1, 250)) // wait for 250ms
196 eDebug("wait for driver eof timeout");
199 eDebug("wait for driver eof ok");
202 eDebug("wait for driver eof aborted by signal");
207 /* in stream_mode, we are sending EOF events
208 over and over until somebody responds.
210 in stream_mode, think of evtEOF as "buffer underrun occurred". */
215 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
220 eDebug("FILEPUSH: end-of-file! (currently unhandled)");
// Seeks the source back to offset 0; the branch is taken when lseek()
// returns 0.
221 if (!m_raw_source.lseek(0, SEEK_SET))
// Account for the bytes just read.
230 source_pos += m_buf_end;
231 bytes_read += m_buf_end;
233 current_span_remaining -= m_buf_end;
235 // printf("FILEPUSH: read %d bytes\n", m_buf_end);
// Sync written data on the destination before announcing the stop.
237 fdatasync(m_fd_dest);
239 eDebug("FILEPUSH THREAD STOP");
// Start variant for an already-open source descriptor: fd_source is handed
// to m_raw_source.
// NOTE(review): how fd_dest is stored is not visible here; thread() writes to
// m_fd_dest, so presumably it is assigned from fd_dest - confirm.
242 void eFilePushThread::start(int fd_source, int fd_dest)
244 m_raw_source.setfd(fd_source);
// Start variant that opens the source by name through m_raw_source.open().
// A negative result from open() is tested as the failure case.
// NOTE(review): the value returned on failure or success is not visible
// here - confirm against the full definition.
249 int eFilePushThread::start(const char *filename, int fd_dest)
251 if (m_raw_source.open(filename) < 0)
// Requests the push thread to stop; logs once when it does so.
// NOTE(review): the actual stop mechanism is not visible here. thread()
// installs a SIGUSR1 handler so that blocked syscalls are interrupted, so
// presumably this sends that signal - confirm.
258 void eFilePushThread::stop()
260 /* if we aren't running, don't bother stopping. */
266 eDebug("stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
// NOTE(review): only the signature is visible here. By name this is the
// counterpart of resume(); confirm the actual behavior in the full body.
271 void eFilePushThread::pause()
// Repositions the source by forwarding to m_raw_source.lseek().
// Note the argument order: this method takes (whence, where), while the
// underlying call takes (where, whence).
276 void eFilePushThread::seek(int whence, off_t where)
278 m_raw_source.lseek(where, whence);
// NOTE(review): only the signature is visible here. By name this is the
// counterpart of pause(); confirm the actual behavior in the full body.
281 void eFilePushThread::resume()
// Discards whatever is held in m_buffer by resetting the start, end and
// filter-end indices to zero, so thread() sees an empty buffer
// (m_buf_start == m_buf_end).
287 void eFilePushThread::flush()
289 m_buf_start = m_buf_end = m_filter_end = 0;
// Stores s in m_send_pvr_commit. thread() tests this flag in its EOF
// handling, where it waits on poll() for up to 250 ms.
292 void eFilePushThread::enablePVRCommit(int s)
294 m_send_pvr_commit = s;
// NOTE(review): only the signature is visible here. thread() has a
// stream-mode path on EOF that logs a one-second delay; presumably s
// switches that mode on or off - confirm.
297 void eFilePushThread::setStreamMode(int s)
// NOTE(review): only the signature is visible here. thread() queries m_sg
// through getNextSourceSpan() when it is non-null; presumably this setter
// stores sg in m_sg - confirm.
302 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
// Posts evt to m_messagepump. thread() uses this to report evtWriteError;
// the pump's recv_msg is connected to recvEvent() by the constructor.
307 void eFilePushThread::sendEvent(int evt)
309 m_messagepump.send(evt);
// Receiver for messages arriving on m_messagepump.recv_msg (connected in
// the constructor); evt is the value passed to sendEvent().
// NOTE(review): the body is not visible here - confirm how evt is forwarded.
312 void eFilePushThread::recvEvent(const int &evt)
// Filter hook that thread() runs on buffered data before writing it out.
//   data, len: start and length of the bytes not yet filtered.
//   current_span_remaining: taken by reference so the filter may modify the
//                           remaining length of the current source span.
// Return value, as described at the call site in thread():
//   negative - that many bytes are skipped in front of the buffer and the
//              filter is called again on the shorter buffer;
//   positive - only that many bytes are to be used.
// Fix: the last parameter had been mangled into a stray currency sign by an
// HTML-entity mis-decode of the ampersand plus the start of the name; the
// reference declarator and the full parameter name are restored.
317 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)