add possibility to do things after thread start.. but before setting the thread to...
[enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 eFilePushThread::eFilePushThread(): m_messagepump(eApp, 0)
10 {
11         m_stop = 0;
12         m_sg = 0;
13         flush();
14         enablePVRCommit(0);
15         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
16 }
17
18 static void signal_handler(int x)
19 {
20 }
21
22 void eFilePushThread::thread()
23 {
24         off_t dest_pos = 0, source_pos = 0;
25         size_t bytes_read = 0;
26         
27         off_t current_span_offset = 0;
28         size_t current_span_remaining = 0;
29         
30         size_t written_since_last_sync = 0;
31         
32         int already_empty = 0;
33         eDebug("FILEPUSH THREAD START");
34                 // this is a race. FIXME.
35         
36         dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
37         source_pos = m_raw_source.lseek(0, SEEK_CUR);
38         
39                 /* m_stop must be evaluated after each syscall. */
40         while (!m_stop)
41         {
42                         /* first try flushing the bufptr */
43                 if (m_buf_start != m_buf_end)
44                 {
45                                 // TODO: take care of boundaries.
46                         filterRecordData(m_buffer + m_buf_start, m_buf_end - m_buf_start);
47                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
48 //                      eDebug("wrote %d bytes", w);
49                         if (w <= 0)
50                         {
51                                 if (errno == EINTR)
52                                         continue;
53                                 break;
54                                 // ... we would stop the thread
55                         }
56
57 //                      posix_fadvise(m_fd_dest, dest_pos, w, POSIX_FADV_DONTNEED);
58
59                         dest_pos += w;
60                         written_since_last_sync += w;
61                         
62                         if (written_since_last_sync >= 2048*1024)
63                         {
64                                 fdatasync(m_fd_dest);
65                                 written_since_last_sync = 0;
66                         }
67
68 //                      printf("FILEPUSH: wrote %d bytes\n", w);
69                         m_buf_start += w;
70                         continue;
71                 }
72
73                         /* now fill our buffer. */
74                         
75                 if (m_sg && !current_span_remaining)
76                 {
77                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
78
79                         if (source_pos != current_span_offset)
80                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
81                         bytes_read = 0;
82                 }
83                 
84                 size_t maxread = sizeof(m_buffer);
85                 
86                         /* if we have a source span, don't read past the end */
87                 if (m_sg && maxread > current_span_remaining)
88                         maxread = current_span_remaining;
89
90                 m_buf_start = 0;
91                 m_buf_end = 0;
92                 
93                 if (maxread)
94                         m_buf_end = m_raw_source.read(m_buffer, maxread);
95
96                 if (m_buf_end < 0)
97                 {
98                         m_buf_end = 0;
99                         if (errno == EINTR)
100                                 continue;
101                         if (errno == EOVERFLOW)
102                         {
103                                 eWarning("OVERFLOW while recording");
104                                 continue;
105                         }
106                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
107                 }
108                 if (m_buf_end == 0)
109                 {
110                                 /* on EOF, try COMMITting once. */
111                         if (m_send_pvr_commit && !already_empty)
112                         {
113                                 eDebug("sending PVR commit");
114                                 already_empty = 1;
115                                 if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
116                                         continue;
117                                 eDebug("commit done");
118                                                 /* well check again */
119                                 continue;
120                         }
121                         sendEvent(evtEOF);
122 #if 0
123                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
124                         if (!m_raw_source.lseek(0, SEEK_SET))
125                         {
126                                 eDebug("(looping)");
127                                 continue;
128                         }
129 #endif
130                         break;
131                 } else
132                 {
133                         source_pos += m_buf_end;
134                         bytes_read += m_buf_end;
135                         if (m_sg)
136                                 current_span_remaining -= m_buf_end;
137                         already_empty = 0;
138                 }
139 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
140         }
141         
142         eDebug("FILEPUSH THREAD STOP");
143 }
144
145 void eFilePushThread::start(int fd_source, int fd_dest)
146 {
147         m_raw_source.setfd(fd_source);
148         m_fd_dest = fd_dest;
149         resume();
150 }
151
152 int eFilePushThread::start(const char *filename, int fd_dest)
153 {
154         if (m_raw_source.open(filename) < 0)
155                 return -1;
156         m_fd_dest = fd_dest;
157         resume();
158         return 0;
159 }
160
161 void eFilePushThread::installSigUSR1Handler()
162 {
163                 /* we set the signal to not restart syscalls, so we can detect our signal. */
164         struct sigaction act;
165         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
166         act.sa_flags = 0;
167         sigaction(SIGUSR1, &act, 0);
168 }
169
170 // called from thread before alive is set to 1
171 void eFilePushThread::before_set_thread_alive()
172 {
173         installSigUSR1Handler();
174 }
175
176 void eFilePushThread::stop()
177 {
178         if (!thread_running()) /* FIXME: races */
179                 return;
180         m_stop = 1;
181         sendSignal(SIGUSR1);
182         kill();
183 }
184
185 void eFilePushThread::pause()
186 {
187         stop();
188 }
189
190 void eFilePushThread::seek(int whence, off_t where)
191 {
192         m_raw_source.lseek(where, whence);
193 }
194
195 void eFilePushThread::resume()
196 {
197         m_stop = 0;
198         run();
199 }
200
201 void eFilePushThread::flush()
202 {
203         m_buf_start = m_buf_end = 0;
204 }
205
206 void eFilePushThread::enablePVRCommit(int s)
207 {
208         m_send_pvr_commit = s;
209 }
210
211 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
212 {
213         m_sg = sg;
214 }
215
216 void eFilePushThread::sendEvent(int evt)
217 {
218         m_messagepump.send(evt);
219 }
220
221 void eFilePushThread::recvEvent(const int &evt)
222 {
223         m_event(evt);
224 }
225
226 void eFilePushThread::filterRecordData(const unsigned char *data, int len)
227 {
228         /* do nothing */
229 }
230