1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
#include <lib/base/thread.h>
#include <stdio.h>
#include <unistd.h>
#include <lib/base/eerror.h>
void eThread::thread_completed(void *ptr)
{
eThread *p = (eThread*) ptr;
p->m_alive = 0;
/* recover state in case thread was cancelled before calling hasStarted */
if (!p->m_started)
p->hasStarted();
p->thread_finished();
}
void *eThread::wrapper(void *ptr)
{
eThread *p = (eThread*)ptr;
pthread_cleanup_push(thread_completed, (void*)p);
p->thread();
pthread_exit(0);
pthread_cleanup_pop(1);
return 0;
}
/*
 * Construct an eThread with no thread attached: no pthread handle, not
 * alive, not started.
 */
eThread::eThread()
	: the_thread(0), m_alive(0)
{
	/* Set in the body rather than the initializer list because the member
	   declaration order is not visible from this file. Previously the flag
	   stayed indeterminate until the first runAsync(). */
	m_started = 0;
}
/*
 * Start the thread without waiting for it to call hasStarted().
 *
 * prio / policy: when either is non-zero they are stored in the thread
 * attributes used for pthread_create().
 * NOTE(review): the attributes are not switched to PTHREAD_EXPLICIT_SCHED
 * here, so the platform may ignore them and inherit the creator's
 * scheduling instead - confirm whether that is intended.
 *
 * Returns 0 when the thread was created, -1 when sync() reports a thread
 * that is still alive or when pthread_create() fails.
 */
int eThread::runAsync(int prio, int policy)
{
	eDebug("before: %d", m_state.value());
	/* the thread might already run. */
	if (sync())
		return -1;
	eDebug("after: %d", m_state.value());
	ASSERT(m_state.value() == 1); /* sync postconditions */
	ASSERT(!m_alive);

	/* Take the semaphore; it is given back by hasStarted(). */
	m_state.down();
	ASSERT(m_state.value() == 0);
	m_alive = 1;
	m_started = 0;

	/* start thread. */
	pthread_attr_t attr;
	pthread_attr_init(&attr);
	if (prio || policy)
	{
		struct sched_param p;
		/* POSIX field name; the double-underscore spelling is a libc
		   internal and does not exist everywhere. */
		p.sched_priority = prio;
		pthread_attr_setschedpolicy(&attr, policy);
		pthread_attr_setschedparam(&attr, &p);
	}

	if (the_thread)
	{
		/* Join outside of the logging call so it always happens, no
		   matter how eDebug treats its arguments. */
		int join_res = pthread_join(the_thread, 0);
		eDebug("old thread joined %d", join_res);
		the_thread = 0;
	}

	int create_res = pthread_create(&the_thread, &attr, wrapper, this);
	pthread_attr_destroy(&attr);
	if (create_res)
	{
		/* No thread exists that could call hasStarted(): undo the state
		   changes above, otherwise the next sync() would block forever.
		   The handle is undefined after a failed create, so reset it to
		   keep kill() from joining garbage. */
		the_thread = 0;
		m_alive = 0;
		m_state.up();
		eDebug("couldn't create new thread");
		return -1;
	}
	return 0;
}
/*
 * Start the thread via runAsync() and then wait in sync().
 *
 * Returns 0 on success, -1 when runAsync() failed (in which case sync()
 * is not called).
 */
int eThread::run(int prio, int policy)
{
	const int failed = runAsync(prio, policy);
	if (!failed)
	{
		sync();
		return 0;
	}
	return -1;
}
/*
 * Destructor: hands over to kill() with its default argument, which
 * joins the thread if one is still attached.
 */
eThread::~eThread()
{
	this->kill();
}
int eThread::sync(void)
{
int res;
int debug_val_before = m_state.value();
m_state.down(); /* this might block */
res = m_alive;
if (m_state.value() != 0)
eFatal("eThread::sync: m_state.value() == %d - was %d before", m_state.value(), debug_val_before);
ASSERT(m_state.value() == 0);
m_state.up();
return res; /* 0: thread is guaranteed not to run. 1: state unknown. */
}
/*
 * Deliver signal sig to the thread.
 *
 * Returns the result of pthread_kill() when the thread is marked alive,
 * otherwise logs a debug message and returns -1.
 */
int eThread::sendSignal(int sig)
{
	if (!m_alive)
	{
		eDebug("send signal to non running thread");
		return -1;
	}
	return pthread_kill(the_thread, sig);
}
/*
 * Join the thread, optionally cancelling it first.
 *
 * sendcancel: when true and sync() reports the thread as possibly still
 * running, pthread_cancel() is issued before joining.
 *
 * Does nothing when no thread handle is stored (already joined).
 */
void eThread::kill(bool sendcancel)
{
	if (!the_thread) /* already joined */
		return;

	/* sync() is always called, even when sendcancel is false. */
	if (sync() && sendcancel)
	{
		eDebug("send cancel to thread");
		pthread_cancel(the_thread);
	}

	/* Join outside of the logging call so it always happens, no matter
	   how eDebug treats its arguments. */
	int join_res = pthread_join(the_thread, 0);
	eDebug("thread joined %d", join_res);
	the_thread = 0;
}
/*
 * Mark the thread as started and release m_state.
 *
 * runAsync() takes m_state before creating the thread; the up() here gives
 * it back, which lets a sync() that is blocked in down() continue.
 * thread_completed() calls this itself if the thread ended without ever
 * calling it.
 */
void eThread::hasStarted()
{
/* m_state must still be held (value 0) at this point */
ASSERT(!m_state.value());
/* set the flag before releasing the semaphore */
m_started = 1;
m_state.up();
}
|