use __deref__() until we have something better
[enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level)
10         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
11 {
12         m_stop = 0;
13         m_sg = 0;
14         m_send_pvr_commit = 0;
15         m_stream_mode = 0;
16         flush();
17         enablePVRCommit(0);
18         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
19 }
20
/*
 * Deliberately empty signal handler. It is installed (rather than SIG_IGN)
 * so that a delivered signal interrupts a blocking syscall instead of being
 * discarded; the signal number itself is not used.
 */
static void signal_handler(int)
{
		/* nothing to do */
}
24
25 void eFilePushThread::thread()
26 {
27         setIoPrio(prio_class, prio);
28
29         off_t dest_pos = 0, source_pos = 0;
30         size_t bytes_read = 0;
31         
32         off_t current_span_offset = 0;
33         size_t current_span_remaining = 0;
34         
35         size_t written_since_last_sync = 0;
36
37         int already_empty = 0;
38         eDebug("FILEPUSH THREAD START");
39         
40                 /* we set the signal to not restart syscalls, so we can detect our signal. */
41         struct sigaction act;
42         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
43         act.sa_flags = 0;
44         sigaction(SIGUSR1, &act, 0);
45         
46         hasStarted();
47         
48         source_pos = m_raw_source.lseek(0, SEEK_CUR);
49         
50                 /* m_stop must be evaluated after each syscall. */
51         while (!m_stop)
52         {
53                         /* first try flushing the bufptr */
54                 if (m_buf_start != m_buf_end)
55                 {
56                                 // TODO: take care of boundaries.
57                         filterRecordData(m_buffer + m_buf_start, m_buf_end - m_buf_start);
58                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
59 //                      eDebug("wrote %d bytes", w);
60                         if (w <= 0)
61                         {
62                                 if (errno == EINTR)
63                                         continue;
64                                 eDebug("eFilePushThread WRITE ERROR");
65                                 sendEvent(evtWriteError);
66                                 break;
67                                 // ... we would stop the thread
68                         }
69
70                         written_since_last_sync += w;
71
72                         if (written_since_last_sync >= 512*1024)
73                         {
74                                 int toflush = written_since_last_sync > 2*1024*1024 ?
75                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
76                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
77                                 dest_pos -= toflush;
78                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
79                                 written_since_last_sync -= toflush;
80                         }
81
82 //                      printf("FILEPUSH: wrote %d bytes\n", w);
83                         m_buf_start += w;
84                         continue;
85                 }
86
87                         /* now fill our buffer. */
88                         
89                 if (m_sg && !current_span_remaining)
90                 {
91                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
92
93                         if (source_pos != current_span_offset)
94                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
95                         bytes_read = 0;
96                 }
97                 
98                 size_t maxread = sizeof(m_buffer);
99                 
100                         /* if we have a source span, don't read past the end */
101                 if (m_sg && maxread > current_span_remaining)
102                         maxread = current_span_remaining;
103
104                 m_buf_start = 0;
105                 m_buf_end = 0;
106                 
107                 if (maxread)
108                         m_buf_end = m_raw_source.read(m_buffer, maxread);
109
110                 if (m_buf_end < 0)
111                 {
112                         m_buf_end = 0;
113                         if (errno == EINTR)
114                                 continue;
115                         if (errno == EOVERFLOW)
116                         {
117                                 eWarning("OVERFLOW while recording");
118                                 continue;
119                         }
120                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
121                 }
122                 if (m_buf_end == 0)
123                 {
124                                 /* on EOF, try COMMITting once. */
125                         if (m_send_pvr_commit && !already_empty)
126                         {
127                                 eDebug("sending PVR commit");
128                                 already_empty = 1;
129                                 if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
130                                         continue;
131                                 eDebug("commit done");
132                                                 /* well check again */
133                                 continue;
134                         }
135                         
136                                 /* in stream_mode, we are sending EOF events 
137                                    over and over until somebody responds.
138                                    
139                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
140                         sendEvent(evtEOF);
141
142                         if (m_stream_mode)
143                         {
144                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
145                                 sleep(1);
146                                 continue;
147                         }
148 #if 0
149                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
150                         if (!m_raw_source.lseek(0, SEEK_SET))
151                         {
152                                 eDebug("(looping)");
153                                 continue;
154                         }
155 #endif
156                         break;
157                 } else
158                 {
159                         source_pos += m_buf_end;
160                         bytes_read += m_buf_end;
161                         if (m_sg)
162                                 current_span_remaining -= m_buf_end;
163                         already_empty = 0;
164                 }
165 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
166         }
167         fdatasync(m_fd_dest);
168
169         eDebug("FILEPUSH THREAD STOP");
170 }
171
172 void eFilePushThread::start(int fd_source, int fd_dest)
173 {
174         m_raw_source.setfd(fd_source);
175         m_fd_dest = fd_dest;
176         resume();
177 }
178
179 int eFilePushThread::start(const char *filename, int fd_dest)
180 {
181         if (m_raw_source.open(filename) < 0)
182                 return -1;
183         m_fd_dest = fd_dest;
184         resume();
185         return 0;
186 }
187
188 void eFilePushThread::stop()
189 {
190                 /* if we aren't running, don't bother stopping. */
191         if (!sync())
192                 return;
193
194         m_stop = 1;
195
196         // fixmee.. here we need a better solution to ensure
197         // that the thread context take notice of the signal
198         // even when no syscall is in progress
199         while(!sendSignal(SIGUSR1))
200         {
201                 eDebug("send SIGUSR1 to thread context");
202                 usleep(5000); // wait msek
203         }
204         kill();
205 }
206
207 void eFilePushThread::pause()
208 {
209         stop();
210 }
211
212 void eFilePushThread::seek(int whence, off_t where)
213 {
214         m_raw_source.lseek(where, whence);
215 }
216
217 void eFilePushThread::resume()
218 {
219         m_stop = 0;
220         run();
221 }
222
223 void eFilePushThread::flush()
224 {
225         m_buf_start = m_buf_end = 0;
226 }
227
228 void eFilePushThread::enablePVRCommit(int s)
229 {
230         m_send_pvr_commit = s;
231 }
232
233 void eFilePushThread::setStreamMode(int s)
234 {
235         m_stream_mode = s;
236 }
237
238 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
239 {
240         m_sg = sg;
241 }
242
243 void eFilePushThread::sendEvent(int evt)
244 {
245         m_messagepump.send(evt);
246 }
247
248 void eFilePushThread::recvEvent(const int &evt)
249 {
250         m_event(evt);
251 }
252
253 void eFilePushThread::filterRecordData(const unsigned char *data, int len)
254 {
255         /* do nothing */
256 }
257