give more priority to socketnotifiers(fd's) in mainloop
[enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 eFilePushThread::eFilePushThread(): m_messagepump(eApp, 0)
10 {
11         m_stop = 0;
12         m_sg = 0;
13         flush();
14         enablePVRCommit(0);
15         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
16 }
17
18 static void signal_handler(int x)
19 {
20 }
21
22 void eFilePushThread::thread()
23 {
24         off_t dest_pos = 0, source_pos = 0;
25         size_t bytes_read = 0;
26         
27         off_t current_span_offset = 0;
28         size_t current_span_remaining = 0;
29         
30         size_t written_since_last_sync = 0;
31         
32         int already_empty = 0;
33         eDebug("FILEPUSH THREAD START");
34         
35                 /* we set the signal to not restart syscalls, so we can detect our signal. */
36         struct sigaction act;
37         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
38         act.sa_flags = 0;
39         sigaction(SIGUSR1, &act, 0);
40         
41         hasStarted();
42         
43         dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
44         source_pos = m_raw_source.lseek(0, SEEK_CUR);
45         
46                 /* m_stop must be evaluated after each syscall. */
47         while (!m_stop)
48         {
49                         /* first try flushing the bufptr */
50                 if (m_buf_start != m_buf_end)
51                 {
52                                 // TODO: take care of boundaries.
53                         filterRecordData(m_buffer + m_buf_start, m_buf_end - m_buf_start);
54                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
55 //                      eDebug("wrote %d bytes", w);
56                         if (w <= 0)
57                         {
58                                 if (errno == EINTR)
59                                         continue;
60                                 break;
61                                 // ... we would stop the thread
62                         }
63
64 //                      posix_fadvise(m_fd_dest, dest_pos, w, POSIX_FADV_DONTNEED);
65
66                         dest_pos += w;
67                         written_since_last_sync += w;
68                         
69                         if (written_since_last_sync >= 2048*1024)
70                         {
71                                 fdatasync(m_fd_dest);
72                                 written_since_last_sync = 0;
73                         }
74
75 //                      printf("FILEPUSH: wrote %d bytes\n", w);
76                         m_buf_start += w;
77                         continue;
78                 }
79
80                         /* now fill our buffer. */
81                         
82                 if (m_sg && !current_span_remaining)
83                 {
84                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
85
86                         if (source_pos != current_span_offset)
87                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
88                         bytes_read = 0;
89                 }
90                 
91                 size_t maxread = sizeof(m_buffer);
92                 
93                         /* if we have a source span, don't read past the end */
94                 if (m_sg && maxread > current_span_remaining)
95                         maxread = current_span_remaining;
96
97                 m_buf_start = 0;
98                 m_buf_end = 0;
99                 
100                 if (maxread)
101                         m_buf_end = m_raw_source.read(m_buffer, maxread);
102
103                 if (m_buf_end < 0)
104                 {
105                         m_buf_end = 0;
106                         if (errno == EINTR)
107                                 continue;
108                         if (errno == EOVERFLOW)
109                         {
110                                 eWarning("OVERFLOW while recording");
111                                 continue;
112                         }
113                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
114                 }
115                 if (m_buf_end == 0)
116                 {
117                                 /* on EOF, try COMMITting once. */
118                         if (m_send_pvr_commit && !already_empty)
119                         {
120                                 eDebug("sending PVR commit");
121                                 already_empty = 1;
122                                 if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
123                                         continue;
124                                 eDebug("commit done");
125                                                 /* well check again */
126                                 continue;
127                         }
128                         sendEvent(evtEOF);
129 #if 0
130                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
131                         if (!m_raw_source.lseek(0, SEEK_SET))
132                         {
133                                 eDebug("(looping)");
134                                 continue;
135                         }
136 #endif
137                         break;
138                 } else
139                 {
140                         source_pos += m_buf_end;
141                         bytes_read += m_buf_end;
142                         if (m_sg)
143                                 current_span_remaining -= m_buf_end;
144                         already_empty = 0;
145                 }
146 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
147         }
148         
149         eDebug("FILEPUSH THREAD STOP");
150 }
151
152 void eFilePushThread::start(int fd_source, int fd_dest)
153 {
154         m_raw_source.setfd(fd_source);
155         m_fd_dest = fd_dest;
156         resume();
157 }
158
159 int eFilePushThread::start(const char *filename, int fd_dest)
160 {
161         if (m_raw_source.open(filename) < 0)
162                 return -1;
163         m_fd_dest = fd_dest;
164         resume();
165         return 0;
166 }
167
168 void eFilePushThread::stop()
169 {
170                 /* if we aren't running, don't bother stopping. */
171         if (!sync())
172                 return;
173
174         m_stop = 1;
175         sendSignal(SIGUSR1);
176         kill();
177 }
178
179 void eFilePushThread::pause()
180 {
181         stop();
182 }
183
184 void eFilePushThread::seek(int whence, off_t where)
185 {
186         m_raw_source.lseek(where, whence);
187 }
188
189 void eFilePushThread::resume()
190 {
191         m_stop = 0;
192         run();
193 }
194
195 void eFilePushThread::flush()
196 {
197         m_buf_start = m_buf_end = 0;
198 }
199
200 void eFilePushThread::enablePVRCommit(int s)
201 {
202         m_send_pvr_commit = s;
203 }
204
205 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
206 {
207         m_sg = sg;
208 }
209
210 void eFilePushThread::sendEvent(int evt)
211 {
212         m_messagepump.send(evt);
213 }
214
215 void eFilePushThread::recvEvent(const int &evt)
216 {
217         m_event(evt);
218 }
219
220 void eFilePushThread::filterRecordData(const unsigned char *data, int len)
221 {
222         /* do nothing */
223 }
224