Skip to content

Commit 77b7ffb

Browse files
committed
move some useful code to universal_fiber.hpp
1 parent c25349f commit 77b7ffb

File tree

3 files changed

+85
-151
lines changed

3 files changed

+85
-151
lines changed

example/unbufcpy/unbufcp4/unbufcp4.cpp

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -88,81 +88,6 @@ struct IOBuffer
8888
}
8989
};
9090

91-
template<typename T>
92-
class FiberChannel
93-
{
94-
void wake_up_one_pusher()
95-
{
96-
if (m_push_awaiting.empty())
97-
return;
98-
auto top_waiter = m_push_awaiting.front();
99-
// wake up .
100-
m_push_awaiting.pop_front();
101-
PostQueuedCompletionStatus(m_iocp, 0,
102-
(ULONG_PTR)(iocp::overlapped_proc_func) & process_stack_full_overlapped_event
103-
, top_waiter);
104-
}
105-
void wake_up_one_poper()
106-
{
107-
if (m_pop_awaiting.empty())
108-
return;
109-
auto top_waiter = m_pop_awaiting.front();
110-
// wake up .
111-
m_pop_awaiting.pop_front();
112-
PostQueuedCompletionStatus(m_iocp, 0,
113-
(ULONG_PTR)(iocp::overlapped_proc_func)&process_stack_full_overlapped_event
114-
, top_waiter);
115-
}
116-
117-
public:
118-
T pop()
119-
{
120-
if (m_queue.empty())
121-
{
122-
FiberOVERLAPPED ov;
123-
// yield
124-
m_pop_awaiting.push_back(&ov);
125-
get_overlapped_result(ov);
126-
}
127-
T r = m_queue.front();
128-
m_queue.pop_front();
129-
130-
if (m_queue.size() < m_max_pending)
131-
{
132-
wake_up_one_pusher();
133-
}
134-
135-
return r;
136-
}
137-
138-
void push(T t)
139-
{
140-
m_queue.push_back(t);
141-
wake_up_one_poper();
142-
if (m_queue.size() > m_max_pending)
143-
{
144-
// sleep until wakeup.
145-
FiberOVERLAPPED ov;
146-
// yield
147-
m_push_awaiting.push_back(&ov);
148-
get_overlapped_result(ov);
149-
}
150-
}
151-
152-
FiberChannel(HANDLE iocp, long max_pending)
153-
: m_max_pending(max_pending)
154-
, m_iocp(iocp)
155-
{
156-
}
157-
158-
long m_max_pending = 1;
159-
std::deque<T> m_queue;
160-
161-
HANDLE m_iocp;
162-
std::deque<FiberOVERLAPPED*> m_pop_awaiting;
163-
std::deque<FiberOVERLAPPED*> m_push_awaiting;
164-
};
165-
16691
static void write_loop(FiberChannel<WSABUF>& channel, FiberChannel<uint64_t>& channel2, HANDLE destFile)
16792
{
16893
DWORD written;

example/unbufcpy/unbufcp5/unbufcp5.cpp

Lines changed: 1 addition & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*++
1+
/*++
22
THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF
33
ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED
44
TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
@@ -100,81 +100,6 @@ struct IOBuffer
100100
}
101101
};
102102

103-
template<typename T>
104-
class FiberChannel
105-
{
106-
void wake_up_one_pusher()
107-
{
108-
if (m_push_awaiting.empty())
109-
return;
110-
auto top_waiter = m_push_awaiting.front();
111-
// wake up .
112-
m_push_awaiting.pop_front();
113-
PostQueuedCompletionStatus(m_iocp, 0,
114-
(ULONG_PTR)(iocp::overlapped_proc_func) & process_stack_full_overlapped_event
115-
, top_waiter);
116-
}
117-
void wake_up_one_poper()
118-
{
119-
if (m_pop_awaiting.empty())
120-
return;
121-
auto top_waiter = m_pop_awaiting.front();
122-
// wake up .
123-
m_pop_awaiting.pop_front();
124-
PostQueuedCompletionStatus(m_iocp, 0,
125-
(ULONG_PTR)(iocp::overlapped_proc_func)&process_stack_full_overlapped_event
126-
, top_waiter);
127-
}
128-
129-
public:
130-
T pop()
131-
{
132-
if (m_queue.empty())
133-
{
134-
FiberOVERLAPPED ov;
135-
// yield
136-
m_pop_awaiting.push_back(&ov);
137-
get_overlapped_result(ov);
138-
}
139-
T r = m_queue.front();
140-
m_queue.pop_front();
141-
142-
if (m_queue.size() < m_max_pending)
143-
{
144-
wake_up_one_pusher();
145-
}
146-
147-
return r;
148-
}
149-
150-
void push(T t)
151-
{
152-
m_queue.push_back(t);
153-
wake_up_one_poper();
154-
if (m_queue.size() > m_max_pending)
155-
{
156-
// sleep until wakeup.
157-
FiberOVERLAPPED ov;
158-
// yield
159-
m_push_awaiting.push_back(&ov);
160-
get_overlapped_result(ov);
161-
}
162-
}
163-
164-
FiberChannel(HANDLE iocp, long max_pending)
165-
: m_max_pending(max_pending)
166-
, m_iocp(iocp)
167-
{
168-
}
169-
170-
long m_max_pending = 1;
171-
std::deque<T> m_queue;
172-
173-
HANDLE m_iocp;
174-
std::deque<FiberOVERLAPPED*> m_pop_awaiting;
175-
std::deque<FiberOVERLAPPED*> m_push_awaiting;
176-
};
177-
178103
struct FileCopyer
179104
{
180105
uint64_t cur_pos = 0;

uasync/include/universal_fiber.hpp

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ using iocp::init_winsock_api_pointer;
1919
#include <cstddef>
2020
#include <thread>
2121
#include <utility>
22+
#include <deque>
2223

2324
#if defined (USE_BOOST_CONTEXT)
2425
#define USE_FCONTEXT
@@ -702,4 +703,87 @@ inline void create_detached_coroutine(void (Class::*mem_func_ptr)(Args...), Clas
702703
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
703704

704705

706+
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
707+
// some useful utilities
708+
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
709+
710+
// FiberChannel is a message channel for use with fibers.
711+
template<typename T>
712+
class FiberChannel
713+
{
714+
void wake_up_one_pusher()
715+
{
716+
if (m_push_awaiting.empty())
717+
return;
718+
auto top_waiter = m_push_awaiting.front();
719+
// wake up .
720+
m_push_awaiting.pop_front();
721+
PostQueuedCompletionStatus(m_iocp, 0,
722+
(ULONG_PTR)(iocp::overlapped_proc_func)&process_stack_full_overlapped_event
723+
, top_waiter);
724+
}
725+
void wake_up_one_poper()
726+
{
727+
if (m_pop_awaiting.empty())
728+
return;
729+
auto top_waiter = m_pop_awaiting.front();
730+
// wake up .
731+
m_pop_awaiting.pop_front();
732+
PostQueuedCompletionStatus(m_iocp, 0,
733+
(ULONG_PTR)(iocp::overlapped_proc_func)&process_stack_full_overlapped_event
734+
, top_waiter);
735+
}
736+
737+
public:
738+
FiberChannel(const FiberChannel&) = delete;
739+
FiberChannel(FiberChannel&&) = delete;
740+
741+
FiberChannel(HANDLE iocp, long max_pending)
742+
: m_max_pending(max_pending)
743+
, m_iocp(iocp)
744+
{
745+
}
746+
747+
T pop()
748+
{
749+
if (m_queue.empty())
750+
{
751+
FiberOVERLAPPED ov;
752+
// yield
753+
m_pop_awaiting.push_back(&ov);
754+
get_overlapped_result(ov);
755+
}
756+
T r = m_queue.front();
757+
m_queue.pop_front();
758+
759+
if (m_queue.size() < m_max_pending)
760+
{
761+
wake_up_one_pusher();
762+
}
763+
764+
return r;
765+
}
766+
767+
void push(T t)
768+
{
769+
if (m_queue.size() > m_max_pending)
770+
{
771+
// sleep until wakeup.
772+
FiberOVERLAPPED ov;
773+
// yield
774+
m_push_awaiting.push_back(&ov);
775+
get_overlapped_result(ov);
776+
}
777+
m_queue.push_back(t);
778+
wake_up_one_poper();
779+
}
780+
781+
long m_max_pending = 1;
782+
std::deque<T> m_queue;
783+
784+
HANDLE m_iocp;
785+
std::deque<FiberOVERLAPPED*> m_pop_awaiting;
786+
std::deque<FiberOVERLAPPED*> m_push_awaiting;
787+
};
788+
705789
#endif // ___UNIVERSAL_FIBER__H___

0 commit comments

Comments
 (0)