a955e10b20e952f239d00690f6dcb3dded9208fc
[enigma2.git] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 eFilePushThread::eFilePushThread(): m_messagepump(eApp, 0)
10 {
11         m_stop = 0;
12         m_sg = 0;
13         flush();
14         enablePVRCommit(0);
15         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
16 }
17
18 static void signal_handler(int x)
19 {
20 }
21
22 void eFilePushThread::thread()
23 {
24         off_t dest_pos = 0, source_pos = 0;
25         size_t bytes_read = 0;
26         
27         off_t current_span_offset = 0;
28         size_t current_span_remaining = 0;
29         
30         int already_empty = 0;
31         eDebug("FILEPUSH THREAD START");
32                 // this is a race. FIXME.
33         
34                 /* we set the signal to not restart syscalls, so we can detect our signal. */
35         struct sigaction act;
36         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
37         act.sa_flags = 0;
38         sigaction(SIGUSR1, &act, 0);
39         
40         dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
41         source_pos = lseek(m_fd_source, 0, SEEK_CUR);
42         
43                 /* m_stop must be evaluated after each syscall. */
44         while (!m_stop)
45         {
46                         /* first try flushing the bufptr */
47                 if (m_buf_start != m_buf_end)
48                 {
49                                 // TODO: take care of boundaries.
50                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
51 //                      eDebug("wrote %d bytes", w);
52                         if (w <= 0)
53                         {
54                                 if (errno == -EINTR)
55                                         continue;
56                                 break;
57                                 // ... we would stop the thread
58                         }
59
60                                 /* this should flush all written pages to disk. */
61                         posix_fadvise(m_fd_dest, dest_pos, w, POSIX_FADV_DONTNEED);
62
63                         dest_pos += w;
64 //                      printf("FILEPUSH: wrote %d bytes\n", w);
65                         m_buf_start += w;
66                         continue;
67                 }
68
69                         /* now fill our buffer. */
70                         
71                 if (m_sg && !current_span_remaining)
72                 {
73                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
74
75                         if (source_pos != current_span_offset)
76                                 source_pos = lseek(m_fd_source, current_span_offset, SEEK_SET);
77                         bytes_read = 0;
78                 }
79                 
80                 size_t maxread = sizeof(m_buffer);
81                 
82                         /* if we have a source span, don't read past the end */
83                 if (m_sg && maxread > current_span_remaining)
84                         maxread = current_span_remaining;
85
86                 m_buf_start = 0;
87                 m_buf_end = 0;
88                 
89                 if (maxread)
90                         m_buf_end = read(m_fd_source, m_buffer, maxread);
91
92                 if (m_buf_end < 0)
93                 {
94                         m_buf_end = 0;
95                         if (errno == EINTR)
96                                 continue;
97                         eDebug("eFilePushThread *read error* - not yet handled");
98                 }
99                 if (m_buf_end == 0)
100                 {
101                                 /* on EOF, try COMMITting once. */
102                         if (m_send_pvr_commit && !already_empty)
103                         {
104                                 eDebug("sending PVR commit");
105                                 already_empty = 1;
106                                 if (::ioctl(m_fd_dest, PVR_COMMIT) == EINTR)
107                                         continue;
108                                 eDebug("commit done");
109                                                 /* well check again */
110                                 continue;
111                         }
112                         sendEvent(evtEOF);
113 #if 0
114                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
115                         if (!lseek(m_fd_source, 0, SEEK_SET))
116                         {
117                                 eDebug("(looping)");
118                                 continue;
119                         }
120 #endif
121                         break;
122                 } else
123                 {
124                         source_pos += m_buf_end;
125                         bytes_read += m_buf_end;
126                         if (m_sg)
127                                 current_span_remaining -= m_buf_end;
128                         already_empty = 0;
129                 }
130 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
131         }
132         
133         eDebug("FILEPUSH THREAD STOP");
134 }
135
136 void eFilePushThread::start(int fd_source, int fd_dest)
137 {
138         m_fd_source = fd_source;
139         m_fd_dest = fd_dest;
140         resume();
141 }
142
143 void eFilePushThread::stop()
144 {
145         if (!thread_running()) /* FIXME: races */
146                 return;
147         m_stop = 1;
148         sendSignal(SIGUSR1);
149         kill();
150 }
151
152 void eFilePushThread::pause()
153 {
154         stop();
155 }
156
157 void eFilePushThread::seek(int whence, off_t where)
158 {
159         ::lseek(m_fd_source, where, whence);
160 }
161
162 void eFilePushThread::resume()
163 {
164         m_stop = 0;
165         run();
166 }
167
168 void eFilePushThread::flush()
169 {
170         m_buf_start = m_buf_end = 0;
171 }
172
173 void eFilePushThread::enablePVRCommit(int s)
174 {
175         m_send_pvr_commit = s;
176 }
177
178 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
179 {
180         m_sg = sg;
181 }
182
183 void eFilePushThread::sendEvent(int evt)
184 {
185         m_messagepump.send(evt);
186 }
187
188 void eFilePushThread::recvEvent(const int &evt)
189 {
190         m_event(evt);
191 }