Changeset ccf1d99
- Timestamp:
- Feb 2, 2023, 11:08:48 AM (22 months ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- 2d028039, 98a2b1dc
- Parents:
- a64137f
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
ra64137f rccf1d99 24 24 // Define if executor is created in a separate cluster 25 25 #define __DEFAULT_EXECUTOR_SEPCLUS__ false 26 27 // when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp 28 #define __ALLOC 0 26 29 27 30 // forward decls … … 58 61 struct copy_queue { 59 62 dlist( request ) list; 63 #if ! __ALLOC 60 64 request * buffer; 61 65 size_t count, buffer_size, index; 66 #endif 62 67 }; 63 68 static inline void ?{}( copy_queue & this ) {} 64 69 static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) { 65 70 list{}; 71 #if ! __ALLOC 66 72 buffer_size = buf_size; 67 73 buffer = aalloc( buffer_size ); 68 74 count = 0; 69 75 index = 0; 70 } 71 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); } 76 #endif 77 } 78 static inline void ^?{}( copy_queue & this ) with(this) { 79 #if ! __ALLOC 80 adelete(buffer); 81 #endif 82 } 72 83 73 84 static inline void insert( copy_queue & this, request & elem ) with(this) { 85 #if ! __ALLOC 74 86 if ( count < buffer_size ) { // fast path ( no alloc ) 75 87 buffer[count]{ elem }; … … 80 92 (*new_elem){ elem }; 81 93 insert_last( list, *new_elem ); 94 #else 95 insert_last( list, elem ); 96 #endif 82 97 } 83 98 … … 86 101 // should_delete is an output param 87 102 static inline request & remove( copy_queue & this, bool & should_delete ) with(this) { 103 #if ! __ALLOC 88 104 if ( count > 0 ) { 89 105 count--; … … 93 109 return buffer[old_idx]; 94 110 } 111 #endif 95 112 should_delete = true; 96 113 return try_pop_front( list ); 97 114 } 98 115 99 static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0 && list`isEmpty; } 116 static inline bool isEmpty( copy_queue & this ) with(this) { 117 #if ! __ALLOC 118 return count == 0 && list`isEmpty; 119 #else 120 return list`isEmpty; 121 #endif 122 } 100 123 101 124 static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global) 102 125 struct work_queue { 103 futex_mutex mutex_lock; 126 __spinlock_t mutex_lock; 127 copy_queue owned_queue; 104 128 copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling 129 105 130 }; // work_queue 106 131 static inline void ?{}( work_queue & this ) with(this) { 107 c_queue = alloc(); 108 (*c_queue){ __buffer_size }; // C_TODO: support passing copy buff size as arg to executor 109 } 110 static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); } 132 // c_queue = alloc(); 133 // (*c_queue){ __buffer_size }; 134 owned_queue{ __buffer_size }; 135 c_queue = &owned_queue; 136 } 137 // static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); } 111 138 112 139 static inline void insert( work_queue & this, request & elem ) with(this) { 113 lock( mutex_lock );140 lock( mutex_lock __cfaabi_dbg_ctx2 ); 114 141 insert( *c_queue, elem ); 115 142 unlock( mutex_lock ); … … 117 144 118 145 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) { 119 lock( mutex_lock );146 lock( mutex_lock __cfaabi_dbg_ctx2 ); 120 147 // swap copy queue ptrs 121 148 copy_queue * temp = *transfer_to; … … 126 153 127 154 thread worker { 155 copy_queue owned_queue; 128 156 work_queue * request_queues; 129 157 copy_queue * current_queue; … … 135 163 ((thread &)this){ clu }; 136 164 this.request_queues = request_queues; 137 this.current_queue = alloc(); 138 (*this.current_queue){ __buffer_size }; 165 // this.current_queue = alloc(); 166 // (*this.current_queue){ __buffer_size }; 167 this.owned_queue{ __buffer_size }; 168 this.current_queue = &this.owned_queue; 139 169 this.start = start; 140 170 this.range = range; 141 171 } 142 static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }172 // static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); } 143 173 144 174 struct executor { -
src/Concurrency/Actors.cpp
ra64137f rccf1d99 180 180 }; 181 181 182 #define __ALLOC 0 182 #define __ALLOC 0 // C_TODO: complete swap to no-alloc version 183 183 184 184 struct GenReceiveDecls : public ast::WithDeclsToAdd<> { … … 225 225 return receiver; 226 226 } 227 */ 227 */ // C_TODO: update this with new no alloc version 228 228 CompoundStmt * sendBody = new CompoundStmt( decl->location ); 229 229 -
tests/concurrent/actors/executor.cfa
ra64137f rccf1d99 86 86 87 87 88 executor e{ Processors, Processors, Processors == 1 ? 1 : Processors * 512, true , BufSize};88 executor e{ Processors, Processors, Processors == 1 ? 1 : Processors * 512, true }; 89 89 90 90 printf("starting\n");
Note: See TracChangeset
for help on using the changeset viewer.