8aed99fa3b611eeb14d1d43f3372c85b9d430dc6
[enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 eFilePushThread::eFilePushThread(): m_messagepump(eApp, 0)
10 {
11         m_stop = 0;
12         m_sg = 0;
13         flush();
14         enablePVRCommit(0);
15         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
16 }
17
/*
 * Deliberately empty handler. It only exists so that a signal installed with
 * it interrupts blocking syscalls (EINTR) instead of being ignored.
 */
static void signal_handler(int /* signo */)
{
	/* nothing to do */
}
21
22 void eFilePushThread::thread()
23 {
24         off_t dest_pos = 0, source_pos = 0;
25         size_t bytes_read = 0;
26         
27         off_t current_span_offset = 0;
28         size_t current_span_remaining = 0;
29         
30         size_t written_since_last_sync = 0;
31         
32         int already_empty = 0;
33         eDebug("FILEPUSH THREAD START");
34                 // this is a race. FIXME.
35         
36                 /* we set the signal to not restart syscalls, so we can detect our signal. */
37         struct sigaction act;
38         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
39         act.sa_flags = 0;
40         sigaction(SIGUSR1, &act, 0);
41         
42         dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
43         source_pos = m_raw_source.lseek(0, SEEK_CUR);
44         
45                 /* m_stop must be evaluated after each syscall. */
46         while (!m_stop)
47         {
48                         /* first try flushing the bufptr */
49                 if (m_buf_start != m_buf_end)
50                 {
51                                 // TODO: take care of boundaries.
52                         filterRecordData(m_buffer + m_buf_start, m_buf_end - m_buf_start);
53                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
54 //                      eDebug("wrote %d bytes", w);
55                         if (w <= 0)
56                         {
57                                 if (errno == EINTR)
58                                         continue;
59                                 break;
60                                 // ... we would stop the thread
61                         }
62
63 //                      posix_fadvise(m_fd_dest, dest_pos, w, POSIX_FADV_DONTNEED);
64
65                         dest_pos += w;
66                         written_since_last_sync += w;
67                         
68                         if (written_since_last_sync >= 2048*1024)
69                         {
70                                 fdatasync(m_fd_dest);
71                                 written_since_last_sync = 0;
72                         }
73
74 //                      printf("FILEPUSH: wrote %d bytes\n", w);
75                         m_buf_start += w;
76                         continue;
77                 }
78
79                         /* now fill our buffer. */
80                         
81                 if (m_sg && !current_span_remaining)
82                 {
83                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
84
85                         if (source_pos != current_span_offset)
86                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
87                         bytes_read = 0;
88                 }
89                 
90                 size_t maxread = sizeof(m_buffer);
91                 
92                         /* if we have a source span, don't read past the end */
93                 if (m_sg && maxread > current_span_remaining)
94                         maxread = current_span_remaining;
95
96                 m_buf_start = 0;
97                 m_buf_end = 0;
98                 
99                 if (maxread)
100                         m_buf_end = m_raw_source.read(m_buffer, maxread);
101
102                 if (m_buf_end < 0)
103                 {
104                         m_buf_end = 0;
105                         if (errno == EINTR)
106                                 continue;
107                         if (errno == EOVERFLOW)
108                         {
109                                 eWarning("OVERFLOW while recording");
110                                 continue;
111                         }
112                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
113                 }
114                 if (m_buf_end == 0)
115                 {
116                                 /* on EOF, try COMMITting once. */
117                         if (m_send_pvr_commit && !already_empty)
118                         {
119                                 eDebug("sending PVR commit");
120                                 already_empty = 1;
121                                 if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
122                                         continue;
123                                 eDebug("commit done");
124                                                 /* well check again */
125                                 continue;
126                         }
127                         sendEvent(evtEOF);
128 #if 0
129                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
130                         if (!m_raw_source.lseek(0, SEEK_SET))
131                         {
132                                 eDebug("(looping)");
133                                 continue;
134                         }
135 #endif
136                         break;
137                 } else
138                 {
139                         source_pos += m_buf_end;
140                         bytes_read += m_buf_end;
141                         if (m_sg)
142                                 current_span_remaining -= m_buf_end;
143                         already_empty = 0;
144                 }
145 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
146         }
147         
148         eDebug("FILEPUSH THREAD STOP");
149 }
150
151 void eFilePushThread::start(int fd_source, int fd_dest)
152 {
153         m_raw_source.setfd(fd_source);
154         m_fd_dest = fd_dest;
155         resume();
156 }
157
158 int eFilePushThread::start(const char *filename, int fd_dest)
159 {
160         if (m_raw_source.open(filename) < 0)
161                 return -1;
162         m_fd_dest = fd_dest;
163         resume();
164         return 0;
165 }
166
167 void eFilePushThread::stop()
168 {
169         if (!thread_running()) /* FIXME: races */
170                 return;
171         m_stop = 1;
172         sendSignal(SIGUSR1);
173         kill();
174 }
175
176 void eFilePushThread::pause()
177 {
178         stop();
179 }
180
181 void eFilePushThread::seek(int whence, off_t where)
182 {
183         m_raw_source.lseek(where, whence);
184 }
185
186 void eFilePushThread::resume()
187 {
188         m_stop = 0;
189         run();
190 }
191
192 void eFilePushThread::flush()
193 {
194         m_buf_start = m_buf_end = 0;
195 }
196
197 void eFilePushThread::enablePVRCommit(int s)
198 {
199         m_send_pvr_commit = s;
200 }
201
202 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
203 {
204         m_sg = sg;
205 }
206
207 void eFilePushThread::sendEvent(int evt)
208 {
209         m_messagepump.send(evt);
210 }
211
212 void eFilePushThread::recvEvent(const int &evt)
213 {
214         m_event(evt);
215 }
216
217 void eFilePushThread::filterRecordData(const unsigned char *data, int len)
218 {
219         /* do nothing */
220 }
221