1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
#include <config.h>
#include <lib/base/filepush.h>
#include <lib/base/eerror.h>
#include <errno.h>
#include <fcntl.h>
eFilePushThread::eFilePushThread()
{
m_stop = 0;
flush();
}
static void signal_handler(int x)
{
}
void eFilePushThread::thread()
{
off_t dest_pos = 0;
eDebug("FILEPUSH THREAD START");
// this is a race. FIXME.
/* we set the signal to not restart syscalls, so we can detect our signal. */
struct sigaction act;
act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
act.sa_flags = 0;
sigaction(SIGUSR1, &act, 0);
dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
/* m_stop must be evaluated after each syscall. */
while (!m_stop)
{
/* first try flushing the bufptr */
if (m_buf_start != m_buf_end)
{
// TODO: take care of boundaries.
int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
// eDebug("wrote %d bytes", w);
if (w <= 0)
{
if (errno == -EINTR)
continue;
eDebug("eFilePushThread *write error* (%m) - not yet handled");
// ... we would stop the thread
}
/* this should flush all written pages to disk. */
posix_fadvise(m_fd_dest, dest_pos, w, POSIX_FADV_DONTNEED);
dest_pos += w;
// printf("FILEPUSH: wrote %d bytes\n", w);
m_buf_start += w;
continue;
}
/* now fill our buffer. */
m_buf_start = 0;
m_buf_end = read(m_fd_source, m_buffer, sizeof(m_buffer));
if (m_buf_end < 0)
{
m_buf_end = 0;
if (errno == EINTR)
continue;
eDebug("eFilePushThread *read error* - not yet handled");
}
if (m_buf_end == 0)
{
eDebug("FILEPUSH: end-of-file! (currently unhandled)");
if (!lseek(m_fd_source, 0, SEEK_SET))
{
eDebug("(looping)");
continue;
}
break;
}
// printf("FILEPUSH: read %d bytes\n", m_buf_end);
}
eDebug("FILEPUSH THREAD STOP");
}
void eFilePushThread::start(int fd_source, int fd_dest)
{
m_fd_source = fd_source;
m_fd_dest = fd_dest;
resume();
}
void eFilePushThread::stop()
{
m_stop = 1;
sendSignal(SIGUSR1);
kill();
}
void eFilePushThread::pause()
{
stop();
}
void eFilePushThread::seek(int whence, off_t where)
{
::lseek(m_fd_source, where, whence);
}
void eFilePushThread::resume()
{
m_stop = 0;
run();
}
void eFilePushThread::flush()
{
m_buf_start = m_buf_end = 0;
}
|