-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathpy_worker.erl
More file actions
352 lines (319 loc) · 13.5 KB
/
py_worker.erl
File metadata and controls
352 lines (319 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
%% Copyright 2026 Benoit Chesneau
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%% @doc Python worker process.
%%%
%%% Each worker maintains its own Python execution context. Workers
%%% receive requests from the pool and execute Python code, sending
%%% results back to callers.
%%%
%%% The NIF functions use ERL_NIF_DIRTY_JOB_IO_BOUND to run on dirty
%%% I/O schedulers, so the worker process itself runs on a normal scheduler.
%%%
%%% @private
-module(py_worker).
-export([
start_link/0,
init/1
]).
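%% Receive timeout (ms) intended for periodic shutdown checks.
%% NOTE: currently unused; loop/3 waits in receive without an `after` clause.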
-define(RECV_TIMEOUT, 1000).
%%% ============================================================================
%%% API
%%% ============================================================================
%% @doc Spawn and link a new worker, waiting for it to finish initialising.
%%
%% Returns `{ok, Pid}` once the worker reports `ready`, `{error, Reason}` if
%% its initialisation failed, or `{error, timeout}` if it does not answer
%% within 10 seconds. On timeout the worker is unlinked before it is killed,
%% so the `killed` exit signal does not propagate back to the caller.
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    Pid = spawn_link(?MODULE, init, [self()]),
    receive
        {Pid, ready} ->
            {ok, Pid};
        {Pid, {error, Reason}} ->
            {error, Reason}
    after 10000 ->
        unlink(Pid),
        exit(Pid, kill),
        %% Drop a reply that may have raced with the timeout.
        receive
            {Pid, _} -> ok
        after 0 -> ok
        end,
        {error, timeout}
    end.
%%% ============================================================================
%%% Worker Process
%%% ============================================================================
%% Entry point of the worker process spawned by start_link/0.
%%
%% Creates the NIF worker context and a linked helper process that serves
%% `{erlang_callback, ...}` messages over the fd returned by
%% py_nif:set_callback_handler/2. Reports `{self(), ready}` or
%% `{self(), {error, Reason}}` to Parent, then runs loop/3 until `shutdown`.
init(Parent) ->
    case py_nif:worker_new() of
        {ok, WorkerRef} ->
            CallbackHandler = spawn_link(fun callback_handler_loop/0),
            case py_nif:set_callback_handler(WorkerRef, CallbackHandler) of
                {ok, CallbackFd} ->
                    CallbackHandler ! {set_fd, CallbackFd},
                    Parent ! {self(), ready},
                    loop(WorkerRef, Parent, CallbackFd),
                    %% loop/3 only returns after a clean shutdown. The worker
                    %% then exits with reason `normal`, which does not kill a
                    %% linked process, so stop the helper explicitly.
                    CallbackHandler ! shutdown,
                    ok;
                {error, Reason} ->
                    %% Unlink first: otherwise the `killed` exit signal comes
                    %% back over the link, kills this process and, through
                    %% its link to Parent, the caller of start_link/0.
                    unlink(CallbackHandler),
                    exit(CallbackHandler, kill),
                    %% Release the context, mirroring loop/3's shutdown path.
                    py_nif:worker_destroy(WorkerRef),
                    Parent ! {self(), {error, Reason}}
            end;
        {error, Reason} ->
            Parent ! {self(), {error, Reason}}
    end.
%% Body of the callback helper process spawned by init/1.
%% Blocks (no timeout) until the worker hands over the callback fd via
%% `{set_fd, Fd}`, then continues in callback_handler_loop/1. Any other
%% message stays queued until then.
callback_handler_loop() ->
    receive {set_fd, Fd} -> callback_handler_loop(Fd) end.
%% Serve callback requests on CallbackFd until told to stop.
%% `{erlang_callback, Id, Name, Args}` is handled via handle_callback/3;
%% `shutdown` ends the loop with `ok`; anything else is discarded.
callback_handler_loop(CallbackFd) ->
    receive
        shutdown ->
            ok;
        {erlang_callback, _Id, Name, CallArgs} ->
            handle_callback(CallbackFd, Name, CallArgs),
            callback_handler_loop(CallbackFd);
        _Ignored ->
            callback_handler_loop(CallbackFd)
    end.
%% Main receive loop of the worker process.
%% `{py_request, Request}` is executed via handle_request/2; `shutdown`
%% destroys the NIF worker context and returns `ok`; any other message is
%% dropped. Parent and CallbackFd are carried along unchanged.
loop(WorkerRef, Parent, CallbackFd) ->
    receive
        shutdown ->
            py_nif:worker_destroy(WorkerRef),
            ok;
        {py_request, Request} ->
            handle_request(WorkerRef, Request),
            loop(WorkerRef, Parent, CallbackFd);
        _Unexpected ->
            loop(WorkerRef, Parent, CallbackFd)
    end.
%%% ============================================================================
%%% Request Handling
%%% ============================================================================
%% Execute one pool request in this worker's Python context and reply to
%% Caller, tagging every reply with Ref.
%%
%% Accepted shapes:
%%   {call, Ref, Caller, Module, Func, Args, Kwargs, TimeoutMs}
%%   {call, Ref, Caller, Module, Func, Args, Kwargs}        (no timeout)
%%   {eval, Ref, Caller, Code, Locals, TimeoutMs}
%%   {eval, Ref, Caller, Code, Locals}                     (no timeout)
%%   {exec, Ref, Caller, Code}
%%   {stream, Ref, Caller, Module, Func, Args, Kwargs}
%%   {stream_eval, Ref, Caller, Code, Locals}
%% Module, Func and Code pass through to_binary/1 before reaching the NIF.
%% call/eval results go through handle_call_result/3, exec replies via
%% send_response/3, and the stream variants reply with py_chunk / py_end /
%% py_error messages. Any other shape raises function_clause.
handle_request(WorkerRef, {call, Ref, Caller, Module, Func, Args, Kwargs, TimeoutMs}) ->
    ModName = to_binary(Module),
    FunName = to_binary(Func),
    handle_call_result(
        py_nif:worker_call(WorkerRef, ModName, FunName, Args, Kwargs, TimeoutMs),
        Ref, Caller);
handle_request(WorkerRef, {call, Ref, Caller, Module, Func, Args, Kwargs}) ->
    ModName = to_binary(Module),
    FunName = to_binary(Func),
    handle_call_result(
        py_nif:worker_call(WorkerRef, ModName, FunName, Args, Kwargs),
        Ref, Caller);
handle_request(WorkerRef, {eval, Ref, Caller, Code, Locals, TimeoutMs}) ->
    Source = to_binary(Code),
    handle_call_result(py_nif:worker_eval(WorkerRef, Source, Locals, TimeoutMs), Ref, Caller);
handle_request(WorkerRef, {eval, Ref, Caller, Code, Locals}) ->
    Source = to_binary(Code),
    handle_call_result(py_nif:worker_eval(WorkerRef, Source, Locals), Ref, Caller);
handle_request(WorkerRef, {exec, Ref, Caller, Code}) ->
    Source = to_binary(Code),
    send_response(Caller, Ref, py_nif:worker_exec(WorkerRef, Source));
handle_request(WorkerRef, {stream, Ref, Caller, Module, Func, Args, Kwargs}) ->
    ModName = to_binary(Module),
    FunName = to_binary(Func),
    reply_stream(WorkerRef, Ref, Caller,
                 py_nif:worker_call(WorkerRef, ModName, FunName, Args, Kwargs));
handle_request(WorkerRef, {stream_eval, Ref, Caller, Code, Locals}) ->
    Source = to_binary(Code),
    reply_stream(WorkerRef, Ref, Caller, py_nif:worker_eval(WorkerRef, Source, Locals)).

%% Reply logic shared by the two streaming request kinds.
%% A generator is drained chunk by chunk via stream_chunks/4; any other value
%% is sent as a single chunk followed by py_end; an error is forwarded.
%% NOTE(review): the error is sent here as the whole {error, _} tuple, while
%% stream_chunks/4 sends only the inner reason. Confirm which form stream
%% consumers expect.
%% NOTE(review): a {suspended, ...} result (which worker_call/worker_eval can
%% return, see handle_call_result/3) matches no branch and raises case_clause.
reply_stream(WorkerRef, Ref, Caller, InitialResult) ->
    case InitialResult of
        {ok, {generator, GenRef}} ->
            stream_chunks(WorkerRef, GenRef, Ref, Caller);
        {ok, Value} ->
            Caller ! {py_chunk, Ref, Value},
            Caller ! {py_end, Ref};
        {error, _} = Error ->
            Caller ! {py_error, Ref, Error}
    end.
%% Stream every item produced by generator GenRef to Caller.
%%
%% Each item is sent as {py_chunk, Ref, Item}. An item that is itself a
%% generator is streamed inline, depth first. Once it is exhausted, the
%% enclosing generator is resumed, so nested generators are flattened instead
%% of truncating the stream. When the outermost generator stops,
%% {py_end, Ref} is sent. Any other error sends {py_error, Ref, Reason} and
%% stops streaming.
stream_chunks(WorkerRef, GenRef, Ref, Caller) ->
    stream_generators(WorkerRef, [GenRef], Ref, Caller).

%% Drain a stack of generators, innermost (head) first.
stream_generators(_WorkerRef, [], Ref, Caller) ->
    Caller ! {py_end, Ref};
stream_generators(WorkerRef, [GenRef | Outer] = Stack, Ref, Caller) ->
    case py_nif:worker_next(WorkerRef, GenRef) of
        {ok, {generator, NestedGen}} ->
            stream_generators(WorkerRef, [NestedGen | Stack], Ref, Caller);
        {ok, Chunk} ->
            Caller ! {py_chunk, Ref, Chunk},
            stream_generators(WorkerRef, Stack, Ref, Caller);
        {error, stop_iteration} ->
            %% This generator is exhausted; resume the one that yielded it.
            stream_generators(WorkerRef, Outer, Ref, Caller);
        {error, Error} ->
            Caller ! {py_error, Ref, Error}
    end.
%%% ============================================================================
%%% Suspended Callback Handling
%%%
%%% When Python code calls erlang.call(), it may return a suspension marker
%%% instead of blocking. This allows the dirty scheduler to be freed while
%%% the Erlang callback is executed.
%%% ============================================================================
%% Dispatch the result of a worker_call/worker_eval NIF invocation.
%%
%% `{suspended, CallbackId, StateRef, {FuncName, CallArgs}}` means the Python
%% code invoked erlang.call(). The callback then runs in a freshly spawned,
%% linked process (handle_suspended_callback/5), which resumes Python and
%% replies to Caller. This function returns `ok` at once so the worker stays
%% free. Running the callback elsewhere avoids a deadlock when it calls
%% py:eval and is routed back to this same worker.
%% NOTE(review): because of the link, a crash in that process also
%% terminates this worker.
%% Every other result is sent to Caller via send_response/3.
handle_call_result({suspended, _CallbackId, StateRef, {FuncName, CallArgs}}, Ref, Caller) ->
    Resume = fun() ->
                     handle_suspended_callback(StateRef, FuncName, CallArgs, Ref, Caller)
             end,
    _Helper = spawn_link(Resume),
    ok;
handle_call_result(Other, Ref, Caller) ->
    send_response(Caller, Ref, Other).
%% Execute a suspended callback and resume Python execution.
%% Runs in the helper process spawned by handle_call_result/3, so the worker
%% can serve other requests while the Erlang function runs. The callback may
%% itself call py:eval / py:call without deadlocking the worker.
%%
%% StateRef          - suspension state returned by the NIF.
%% FuncName/CallArgs - registered function name and its arguments from Python.
%% Ref/Caller        - the original request; the final result goes to Caller.
handle_suspended_callback(StateRef, FuncName, CallArgs, Ref, Caller) ->
    CallbackResult = py_callback:execute(FuncName, callback_args_to_list(CallArgs)),
    %% The NIF parses the payload with Python's ast.literal_eval and returns
    %% the final result, or another suspension if Python calls back again.
    FinalResult = py_nif:resume_callback(StateRef, encode_callback_result(CallbackResult)),
    forward_final_result(FinalResult, Ref, Caller).

%% Forward the outcome of py_nif:resume_callback/2 to the original caller.
%% A nested suspension (another erlang.call() from Python) is handled by
%% recursing in this same process; no new process is spawned.
forward_final_result({suspended, _CallbackId, StateRef, {FuncName, CallArgs}}, Ref, Caller) ->
    handle_suspended_callback(StateRef, FuncName, CallArgs, Ref, Caller);
forward_final_result(Result, Ref, Caller) ->
    send_response(Caller, Ref, Result).

%%% ============================================================================
%%% Callback Handling
%%% ============================================================================

%% Handle one {erlang_callback, ...} message in the callback helper process:
%% run the registered function and write the encoded result to CallbackFd.
handle_callback(CallbackFd, FuncName, Args) ->
    CallbackResult = py_callback:execute(FuncName, callback_args_to_list(Args)),
    py_nif:send_callback_response(CallbackFd, encode_callback_result(CallbackResult)).

%% Normalise callback arguments to a list. Tuples are expanded, lists are
%% kept as-is, and any other term becomes a one-element list.
callback_args_to_list(Args) when is_tuple(Args) -> tuple_to_list(Args);
callback_args_to_list(Args) when is_list(Args) -> Args;
callback_args_to_list(Arg) -> [Arg].

%% Encode a py_callback:execute/2 result in the format both callback paths
%% send to Python. On success: status byte 0 followed by a Python literal.
%% On failure: status byte 1 followed by an error message.
encode_callback_result({ok, Value}) ->
    ValueRepr = term_to_python_repr(Value),
    <<0, ValueRepr/binary>>;
encode_callback_result({error, {not_found, Name}}) ->
    ErrMsg = iolist_to_binary(io_lib:format("Function '~s' not registered", [Name])),
    <<1, ErrMsg/binary>>;
encode_callback_result({error, {Class, Reason, _Stack}}) ->
    ErrMsg = iolist_to_binary(io_lib:format("~p: ~p", [Class, Reason])),
    <<1, ErrMsg/binary>>.
%% Convert an Erlang term to a Python literal that ast.literal_eval can parse.
%%
%% Mapping:
%%   integer                -> int
%%   float                  -> float (shortest text that round-trips)
%%   true / false           -> True / False
%%   none, nil, undefined   -> None
%%   other atom             -> str (the atom's UTF-8 text, escaped)
%%   binary                 -> str (bytes passed through, assumed UTF-8)
%%   printable list         -> str (encoded as UTF-8)
%%   other list             -> list
%%   tuple                  -> tuple ("(x,)" for a single element)
%%   map                    -> dict
%%   anything else          -> None
%% NOTE(review): [] is a printable list and therefore becomes "" rather than
%% [], so an empty result list reaches Python as an empty string. Confirm
%% this is what callers expect.
term_to_python_repr(Term) when is_integer(Term) ->
    integer_to_binary(Term);
term_to_python_repr(Term) when is_float(Term) ->
    %% ~w prints the shortest string that reads back as the same float
    %% (e.g. "1.0e-20"). Fixed-point formatting with 17 decimals would round
    %% small magnitudes such as 1.0e-20 down to 0.0.
    iolist_to_binary(io_lib:format("~w", [Term]));
term_to_python_repr(true) ->
    <<"True">>;
term_to_python_repr(false) ->
    <<"False">>;
term_to_python_repr(none) ->
    <<"None">>;
term_to_python_repr(nil) ->
    <<"None">>;
term_to_python_repr(undefined) ->
    <<"None">>;
term_to_python_repr(Term) when is_atom(Term) ->
    %% Quoted atoms may contain quotes/backslashes, so escape like a string.
    Escaped = escape_string(atom_to_binary(Term, utf8)),
    <<"\"", Escaped/binary, "\"">>;
term_to_python_repr(Term) when is_binary(Term) ->
    Escaped = escape_string(Term),
    <<"\"", Escaped/binary, "\"">>;
term_to_python_repr(Term) when is_list(Term) ->
    case io_lib:printable_list(Term) of
        true ->
            %% characters_to_binary produces UTF-8. list_to_binary would emit
            %% raw Latin-1 bytes for chars 128..255 and fail above 255.
            Escaped = escape_string(unicode:characters_to_binary(Term)),
            <<"\"", Escaped/binary, "\"">>;
        false ->
            Joined = join_binaries([term_to_python_repr(E) || E <- Term], <<", ">>),
            <<"[", Joined/binary, "]">>
    end;
term_to_python_repr(Term) when is_tuple(Term) ->
    Items = [term_to_python_repr(E) || E <- tuple_to_list(Term)],
    Joined = join_binaries(Items, <<", ">>),
    case Items of
        [_] -> <<"(", Joined/binary, ",)">>;
        _ -> <<"(", Joined/binary, ")">>
    end;
term_to_python_repr(Term) when is_map(Term) ->
    Items = maps:fold(fun(K, V, Acc) ->
                              KeyRepr = term_to_python_repr(K),
                              ValRepr = term_to_python_repr(V),
                              [<<KeyRepr/binary, ": ", ValRepr/binary>> | Acc]
                      end, [], Term),
    Joined = join_binaries(Items, <<", ">>),
    <<"{", Joined/binary, "}">>;
term_to_python_repr(_Term) ->
    %% Unsupported types (pids, refs, funs, ...) become None.
    <<"None">>.

%% Escape a binary for use inside a double-quoted Python string literal.
%% Backslash, double quote, \n, \r and \t use their short escapes. Other
%% control bytes (< 32) and DEL become \xNN; a raw NUL would make
%% ast.literal_eval reject the source. Bytes >= 128 pass through unchanged,
%% so UTF-8 sequences survive.
escape_string(Bin) ->
    << <<(escape_byte(Byte))/binary>> || <<Byte>> <= Bin >>.

%% Escape a single byte for escape_string/1.
escape_byte($\\) -> <<"\\\\">>;
escape_byte($\") -> <<"\\\"">>;
escape_byte($\n) -> <<"\\n">>;
escape_byte($\r) -> <<"\\r">>;
escape_byte($\t) -> <<"\\t">>;
escape_byte(Byte) when Byte < 32; Byte =:= 127 ->
    iolist_to_binary(io_lib:format("\\x~2.16.0b", [Byte]));
escape_byte(Byte) ->
    <<Byte>>.

%% Join a list of binaries with Sep between consecutive elements.
join_binaries(Items, Sep) ->
    iolist_to_binary(lists:join(Sep, Items)).
%%% ============================================================================
%%% Internal Functions
%%% ============================================================================
%% Reply to a request; delegates to py_util:send_response/3.
send_response(To, RequestRef, Reply) ->
    py_util:send_response(To, RequestRef, Reply).

%% Coerce a module/function name or code snippet to a binary; delegates to
%% py_util:to_binary/1.
to_binary(Value) ->
    py_util:to_binary(Value).