1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
9 //FILE *f = fopen("/log.ts", "wb");
// Constructs the push thread object.
//   io_prio_class / io_prio_level: stored in prio_class / prio and later passed to
//                                  setIoPrio() at the start of thread().
//   blocksize: stored in m_blocksize; thread() aligns its reads to multiples of it.
// The message pump is constructed with (eApp, 0) and its recv_msg signal is connected
// to recvEvent().
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12 :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
// PVR commit on EOF is off until enablePVRCommit() turns it on
16 m_send_pvr_commit = 0;
18 m_blocksize = blocksize;
21 CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
// File-local signal handler; thread() installs it for SIGUSR1 via sigaction() so that
// blocking syscalls are interrupted and return EINTR. The body is not visible in this listing.
24 static void signal_handler(int x)
// Worker routine of the push thread: writes buffered data from m_buffer to m_fd_dest
// and refills the buffer from m_raw_source.
// NOTE(review): the embedded line numbers jump, so braces, loop headers and several
// conditions/declarations are not visible in this listing; the added comments describe
// only the statements that are shown.
28 void eFilePushThread::thread()
// apply the I/O priority stored by the constructor
30 setIoPrio(prio_class, prio);
// positions in destination/source and total number of bytes read so far
32 off_t dest_pos = 0, source_pos = 0;
33 size_t bytes_read = 0;
// current scatter-gather span (only meaningful when m_sg is set)
35 off_t current_span_offset = 0;
36 size_t current_span_remaining = 0;
// bytes written since the page cache was last advised away (see posix_fadvise below)
38 size_t written_since_last_sync = 0;
// consulted together with m_send_pvr_commit when EOF is reached (see below)
40 int already_empty = 0;
41 eDebug("FILEPUSH THREAD START");
43 /* we set the signal to not restart syscalls, so we can detect our signal. */
45 act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
47 sigaction(SIGUSR1, &act, 0);
// start from wherever the source currently is
51 source_pos = m_raw_source.lseek(0, SEEK_CUR);
53 /* m_stop must be evaluated after each syscall. */
56 /* first try flushing the bufptr */
57 if (m_buf_start != m_buf_end)
59 /* filterRecordData wants to work on multiples of blocksize.
60 if it returns a negative result, it means that this many bytes should be skipped
61 *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
62 if filterRecordData wants to skip more data than currently available, it must do that internally.
63 Skipped bytes will also not be output.
65 if it returns a positive result, that means that only these many bytes should be used
68 In either case, current_span_remaining is given as a reference and can be modified. (Of course it
69 doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
70 skip some data). This is probably a very special application for fast-forward, where the current
71 span is to be cancelled after a complete iframe has been output.
73 we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
75 we filter data only once, of course, but it might not get immediately written.
76 that's what m_filter_end is for - it points to the start of the unfiltered data.
// filter only the part of the buffer that has not been filtered yet
83 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
// re-sync path: drop -filter_res bytes in front of the unfiltered data and filter again
87 eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
88 m_buf_start = m_filter_end + -filter_res; /* this will also drop unwritten data */
89 ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
90 continue; /* try again */
93 /* adjust end of buffer to strip dropped tail bytes */
94 m_buf_end = m_filter_end + filter_res;
95 /* mark data as filtered. */
96 m_filter_end = m_buf_end;
99 ASSERT(m_filter_end == m_buf_end);
101 if (m_buf_start == m_buf_end)
104 /* now write out data. it will be 'aligned' (according to filterRecordData).
105 absolutely forbidden is to return EINTR and consume a non-aligned number of bytes.
// write everything between m_buf_start and m_buf_end; w receives the write() result
107 int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
108 // fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
109 // eDebug("wrote %d bytes", w);
// EINTR/EAGAIN/EBUSY are tested separately from the write-error report that follows
112 if (errno == EINTR || errno == EAGAIN || errno == EBUSY)
114 eDebug("eFilePushThread WRITE ERROR");
115 sendEvent(evtWriteError);
117 // ... we would stop the thread
120 written_since_last_sync += w;
// once at least 512 KiB were written, advise the kernel that the written range is no longer needed
122 if (written_since_last_sync >= 512*1024)
// at most 2 MiB per call, otherwise the amount rounded down to a multiple of 4096
124 int toflush = written_since_last_sync > 2*1024*1024 ?
125 2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
126 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
128 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
129 written_since_last_sync -= toflush;
132 // printf("FILEPUSH: wrote %d bytes\n", w);
137 /* now fill our buffer. */
// with scatter-gather enabled and the current span used up, ask for the next span
139 if (m_sg && !current_span_remaining)
141 m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
142 ASSERT(!(current_span_remaining % m_blocksize));
// seek only if the next span does not start at the current source position
144 if (source_pos != current_span_offset)
145 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
// start from the full buffer size, then clamp to the span and align to the block size
149 size_t maxread = sizeof(m_buffer);
151 /* if we have a source span, don't read past the end */
152 if (m_sg && maxread > current_span_remaining)
153 maxread = current_span_remaining;
155 /* align to blocksize */
156 maxread -= maxread % m_blocksize;
// the read result is stored directly in m_buf_end
163 m_buf_end = m_raw_source.read(m_buffer, maxread);
// errno classification after the read: EINTR/EBUSY/EAGAIN, EOVERFLOW, everything else
168 if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
170 if (errno == EOVERFLOW)
172 eWarning("OVERFLOW while recording");
175 eDebug("eFilePushThread *read error* (%m) - not yet handled");
178 /* a read might be mis-aligned in case of a short read. */
// d is the number of bytes beyond the last full block; the source is moved back by d
179 int d = m_buf_end % m_blocksize;
182 m_raw_source.lseek(-d, SEEK_CUR);
188 /* on EOF, try COMMITting once. */
189 if (m_send_pvr_commit && !already_empty)
191 eDebug("sending PVR commit");
193 struct pollfd pfd[1] = {m_fd_dest, POLLHUP};
195 sleep(5); /* HACK to allow ES buffer to drain */
197 // if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
199 eDebug("commit done");
200 /* well check again */
204 /* in stream_mode, we are sending EOF events
205 over and over until somebody responds.
207 in stream_mode, think of evtEOF as "buffer underrun occurred". */
212 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
217 eDebug("FILEPUSH: end-of-file! (currently unhandled)");
// rewinds the source to its start; the branch taken when lseek returns 0 is not shown
218 if (!m_raw_source.lseek(0, SEEK_SET))
// account for the bytes just read
227 source_pos += m_buf_end;
228 bytes_read += m_buf_end;
230 current_span_remaining -= m_buf_end;
233 // printf("FILEPUSH: read %d bytes\n", m_buf_end);
// flush written data to the destination before reporting the stop
235 fdatasync(m_fd_dest);
237 eDebug("FILEPUSH THREAD STOP");
// Starts pushing from an already-open source descriptor: fd_source is handed to
// m_raw_source. The handling of fd_dest is not visible in this listing.
240 void eFilePushThread::start(int fd_source, int fd_dest)
242 m_raw_source.setfd(fd_source);
// Starts pushing from a file: opens `filename` through m_raw_source. A negative result
// from open() is checked; what is returned in that case is not visible in this listing.
247 int eFilePushThread::start(const char *filename, int fd_dest)
249 if (m_raw_source.open(filename) < 0)
// Stops the push thread by signalling it with SIGUSR1, which thread() installs a
// handler for so that blocking syscalls are interrupted.
256 void eFilePushThread::stop()
258 /* if we aren't running, don't bother stopping. */
264 // FIXME: we need a better solution here to ensure
265 // that the thread context takes notice of the signal
266 // even when no syscall is in progress
// keep signalling for as long as sendSignal() returns 0
267 while(!sendSignal(SIGUSR1))
269 eDebug("send SIGUSR1 to thread context");
270 usleep(5000); // wait 5 msec before the next attempt
// Body is not visible in this listing.
// NOTE(review): presumably suspends the push thread until resume() - confirm against the full file.
275 void eFilePushThread::pause()
// Repositions the source by forwarding to m_raw_source.lseek().
// Note the argument order: this method takes (whence, where), the lseek call takes (where, whence).
280 void eFilePushThread::seek(int whence, off_t where)
282 m_raw_source.lseek(where, whence);
// Body is not visible in this listing.
// NOTE(review): presumably the counterpart of pause() - confirm against the full file.
285 void eFilePushThread::resume()
// Discards all buffered data by resetting the start, end and filter markers of
// m_buffer to 0; data that was read but not yet written is dropped.
291 void eFilePushThread::flush()
293 m_buf_start = m_buf_end = m_filter_end = 0;
// Stores `s` in m_send_pvr_commit; thread() checks this flag on EOF to decide whether
// to go through its "sending PVR commit" path.
296 void eFilePushThread::enablePVRCommit(int s)
298 m_send_pvr_commit = s;
// Body is not visible in this listing.
// NOTE(review): thread() refers to a stream mode when handling EOF; presumably this
// setter stores that flag - confirm against the full file.
301 void eFilePushThread::setStreamMode(int s)
// Body is not visible in this listing.
// NOTE(review): thread() queries m_sg for source spans; presumably this setter stores
// `sg` in m_sg - confirm against the full file.
306 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
// Posts the event code `evt` to the message pump; thread() uses this to report
// evtWriteError. The pump's recv_msg signal is connected to recvEvent() in the constructor.
311 void eFilePushThread::sendEvent(int evt)
313 m_messagepump.send(evt);
// Slot connected to m_messagepump.recv_msg in the constructor. The body is not visible
// in this listing.
// NOTE(review): presumably receives the codes posted through sendEvent() - confirm.
316 void eFilePushThread::recvEvent(const int &evt)
// Filter hook that thread() calls on the not-yet-filtered part of the buffer before writing.
//   data / len: start and length of the unfiltered bytes.
//   current_span_remaining: passed by reference so the filter may modify the remaining
//                           length of the current source span.
// Per the contract described in thread(): a negative result means that many bytes in
// front of the buffer are skipped and the filter is called again; a positive result
// means only that many bytes are to be used. The body is not visible in this listing.
// The last parameter had been corrupted to a currency sign followed by "t_span_remaining"
// (the "&curren" prefix was decoded as an HTML entity); restored to a size_t reference.
321 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)