1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
#include <lib/base/thread.h>
#include <stdio.h>
#include <unistd.h>
#include <lib/base/eerror.h>
void eThread::thread_completed(void *ptr)
{
eThread *p = (eThread*) ptr;
p->m_alive = 0;
/* recover state */
if (!p->m_state.value())
{
p->m_state.up();
assert(p->m_state.value() == 1);
}
}
void *eThread::wrapper(void *ptr)
{
eThread *p = (eThread*)ptr;
pthread_cleanup_push(thread_completed, (void*)p);
p->thread();
pthread_exit(0);
pthread_cleanup_pop(1);
return 0;
}
eThread::eThread()
: the_thread(0), m_alive(0)
{
}
int eThread::runAsync(int prio, int policy)
{
eDebug("before: %d", m_state.value());
/* the thread might already run. */
if (sync())
return -1;
eDebug("after: %d", m_state.value());
assert(m_state.value() == 1); /* sync postconditions */
assert(!m_alive);
m_state.down();
m_alive = 1;
/* start thread. */
pthread_attr_t attr;
pthread_attr_init(&attr);
if (prio || policy)
{
struct sched_param p;
p.__sched_priority=prio;
pthread_attr_setschedpolicy(&attr, policy);
pthread_attr_setschedparam(&attr, &p);
}
if (pthread_create(&the_thread, &attr, wrapper, this))
{
pthread_attr_destroy(&attr);
m_alive = 0;
eDebug("couldn't create new thread");
return -1;
}
pthread_attr_destroy(&attr);
return 0;
}
int eThread::run(int prio, int policy)
{
if (runAsync(prio, policy))
return -1;
sync();
return 0;
}
eThread::~eThread()
{
kill();
}
int eThread::sync(void)
{
int res;
m_state.down(); /* this might block */
res = m_alive;
assert(m_state.value() == 0);
m_state.up();
return res; /* 0: thread is guaranteed not to run. 1: state unknown. */
}
void eThread::sendSignal(int sig)
{
if (m_alive)
pthread_kill(the_thread, sig);
else
eDebug("send signal to non running thread");
}
void eThread::kill(bool sendcancel)
{
if (!the_thread) /* already joined */
return;
if (sync() && sendcancel)
{
eDebug("send cancel to thread");
pthread_cancel(the_thread);
}
eDebug("thread joined %d", pthread_join(the_thread, 0));
the_thread = 0;
}
void eThread::hasStarted()
{
assert(!m_state.value());
m_state.up();
}
|