Commit 84544d52 authored by Philipp Huebner's avatar Philipp Huebner

Merge tag 'upstream/1.0.8'

Upstream version 1.0.8
parents ca1bc8e6 5ea2be39
# Version 1.0.7
# Version 1.0.8
* Add p1_queue
* Only perform destructive operations in p1_file_queue:in/2
* Add garbage collector for file queues
* Add ram_to_file/1 and file_to_ram/1
* Improve exception names
* Implement limited queues
* Add ownership protection
* Add get_limit/1 and set_limit/2
# Version 1.0.7
* Fix coverall invocation (Paweł Chmielowski)
* Fix p1_server timeout handling, R18 compatibility (Alexey Shchepin)
......
......@@ -6,7 +6,7 @@ src:
clean:
rebar clean
test:
rebar skip_deps=true eunit
test: all
rebar -v skip_deps=true eunit
.PHONY: clean src
.PHONY: clean src test
-record(file_q,
{tail = 0 :: non_neg_integer(),
head = 0 :: non_neg_integer(),
limit :: non_neg_integer() | unlimited,
fd :: file:fd(),
path :: binary(),
owner = self() :: pid(),
start = 0 :: non_neg_integer(),
stop = 0 :: non_neg_integer()}).
-define(qlen(Q), element(2, Q)).
This diff is collapsed.
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% @copyright (C) 2017, Evgeny Khramtsov
%%% @doc
%%%
%%% @end
%%% Created : 8 Mar 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%-------------------------------------------------------------------
-module(p1_queue).
%% API
-export([new/0, new/1, new/2, is_queue/1, len/1, is_empty/1, in/2, out/1,
peek/1, drop/1, from_list/1, from_list/2, from_list/3,
to_list/1, clear/1, foreach/2, foldl/3, dropwhile/2, type/1,
format_error/1, ram_to_file/1, file_to_ram/1, get_limit/1,
set_limit/2]).
-export([start/1, stop/0]).
-type limit() :: non_neg_integer() | unlimited.
-type rqueue() :: {queue:queue(), non_neg_integer(), limit()}.
-type fqueue() :: p1_file_queue:queue().
-type queue() :: rqueue() | fqueue().
-type queue_type() :: ram | file.
-type error_reason() :: p1_file_queue:error_reason().
-export_type([queue/0, queue_type/0, error_reason/0]).
%%%===================================================================
%%% API
%%%===================================================================
-spec start(file:filename()) -> ok | {error, any()}.
start(Dir) ->
application:ensure_all_started(p1_utils),
case p1_file_queue:start(Dir) of
{ok, _} -> ok;
{error, {already_started, _}} -> ok;
Err -> Err
end.
-spec stop() -> ok | {error, any()}.
stop() ->
p1_file_queue:stop().
-spec new() -> rqueue().
new() ->
new(ram).
-spec new(ram) -> rqueue();
(file) -> fqueue().
new(Type) ->
new(Type, unlimited).
-spec new(ram, limit()) -> rqueue();
(file, limit()) -> fqueue().
new(ram, Limit) ->
{queue:new(), 0, Limit};
new(file, Limit) ->
p1_file_queue:new(Limit).
-spec type(queue()) -> ram | {file, file:filename()}.
type({_, _, _}) ->
ram;
type(Q) ->
{file, p1_file_queue:path(Q)}.
-spec is_queue(any()) -> boolean().
is_queue({Q, Len, _}) when is_integer(Len), Len >= 0 ->
queue:is_queue(Q);
is_queue(Q) ->
p1_file_queue:is_queue(Q).
-spec len(queue()) -> non_neg_integer().
len({_, Len, _}) ->
Len;
len(Q) ->
p1_file_queue:len(Q).
-spec is_empty(queue()) -> boolean().
is_empty({_, Len, _}) ->
Len == 0;
is_empty(Q) ->
p1_file_queue:is_empty(Q).
-spec get_limit(queue()) -> limit().
get_limit({_, _, Limit}) ->
Limit;
get_limit(Q) ->
p1_file_queue:get_limit(Q).
-spec set_limit(rqueue(), limit()) -> rqueue();
(fqueue(), limit()) -> fqueue().
set_limit({Q, Len, _}, Limit) ->
{Q, Len, Limit};
set_limit(Q, Limit) ->
p1_file_queue:set_limit(Q, Limit).
-spec in(term(), rqueue()) -> rqueue();
(term(), fqueue()) -> fqueue().
in(Item, {Q, Len, Limit}) ->
if Len < Limit ->
{queue:in(Item, Q), Len+1, Limit};
true ->
erlang:error(full)
end;
in(Item, Q) ->
p1_file_queue:in(Item, Q).
-spec out(rqueue()) -> {{value, term()}, rqueue()} | {empty, rqueue()};
(fqueue()) -> {{value, term()}, fqueue()} | {empty, fqueue()}.
out({Q, 0, Limit}) ->
{empty, {Q, 0, Limit}};
out({Q, Len, Limit}) ->
{{value, Item}, Q1} = queue:out(Q),
{{value, Item}, {Q1, Len-1, Limit}};
out(Q) ->
p1_file_queue:out(Q).
-spec peek(queue()) -> empty | {value, term()}.
peek({Q, _, _}) ->
queue:peek(Q);
peek(Q) ->
p1_file_queue:peek(Q).
-spec drop(rqueue()) -> rqueue();
(fqueue()) -> fqueue().
drop({Q, Len, Limit}) ->
{queue:drop(Q), Len-1, Limit};
drop(Q) ->
p1_file_queue:drop(Q).
-spec from_list(list()) -> rqueue().
from_list(L) ->
from_list(L, ram, unlimited).
-spec from_list(list(), ram) -> rqueue();
(list(), file) -> fqueue().
from_list(L, Type) ->
from_list(L, Type, unlimited).
-spec from_list(list(), ram, limit()) -> rqueue();
(list(), file, limit()) -> fqueue().
from_list(L, ram, Limit) ->
Len = length(L),
if Len =< Limit ->
{queue:from_list(L), Len, Limit};
true ->
erlang:error(full)
end;
from_list(L, file, Limit) ->
p1_file_queue:from_list(L, Limit).
-spec to_list(queue()) -> list().
to_list({Q, _, _}) ->
queue:to_list(Q);
to_list(Q) ->
p1_file_queue:to_list(Q).
-spec foreach(fun((term()) -> term()), fqueue()) -> ok.
foreach(F, {Q, Len, Limit}) ->
case queue:out(Q) of
{{value, Item}, Q1} ->
F(Item),
foreach(F, {Q1, Len-1, Limit});
{empty, _} ->
ok
end;
foreach(F, Q) ->
p1_file_queue:foreach(F, Q).
-spec foldl(fun((term(), T) -> T), T, queue()) -> T.
foldl(F, Acc, {Q, Len, Limit}) ->
case queue:out(Q) of
{{value, Item}, Q1} ->
Acc1 = F(Item, Acc),
foldl(F, Acc1, {Q1, Len-1, Limit});
{empty, _} ->
Acc
end;
foldl(F, Acc, Q) ->
p1_file_queue:foldl(F, Acc, Q).
-spec dropwhile(fun((term()) -> boolean()), rqueue()) -> rqueue();
(fun((term()) -> boolean()), fqueue()) -> fqueue().
dropwhile(_, {_, 0, _} = Q) ->
Q;
dropwhile(F, {Q, Len, Limit}) ->
{value, Item} = queue:peek(Q),
case F(Item) of
true ->
dropwhile(F, {queue:drop(Q), Len-1, Limit});
_ ->
{Q, Len, Limit}
end;
dropwhile(F, Q) ->
p1_file_queue:dropwhile(F, Q).
-spec clear(rqueue()) -> rqueue();
(fqueue()) -> fqueue().
clear({_, _, Limit}) ->
{queue:new(), 0, Limit};
clear(Q) ->
p1_file_queue:clear(Q).
-spec ram_to_file(queue()) -> fqueue().
ram_to_file({_, _, Limit} = Q) ->
foldl(fun p1_file_queue:in/2, new(file, Limit), Q);
ram_to_file(Q) ->
Q.
-spec file_to_ram(queue()) -> rqueue().
file_to_ram({_, _, _} = Q) ->
Q;
file_to_ram(Q) ->
Limit = p1_file_queue:get_limit(Q),
p1_file_queue:foldl(fun in/2, new(ram, Limit), Q).
-spec format_error(error_reason()) -> string().
format_error(Reason) ->
p1_file_queue:format_error(Reason).
%%%===================================================================
%%% Internal functions
%%%===================================================================
{application, p1_utils,
[
{description, "Erlang utility modules from ProcessOne"},
{vsn, "1.0.7"},
{vsn, "1.0.8"},
{modules, []},
{registered, []},
{applications, [
......@@ -9,6 +9,7 @@
stdlib
]},
{env, []},
{mod, {p1_utils, []}},
%% hex.pm packaging:
{licenses, ["Apache 2.0"]},
......
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% @copyright (C) 2017, Evgeny Khramtsov
%%% @doc
%%%
%%% @end
%%% Created : 8 Mar 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%-------------------------------------------------------------------
-module(p1_utils).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
-export([start/0, stop/0]).
%%%===================================================================
%%% Application callbacks
%%%===================================================================
start(_StartType, _StartArgs) ->
case p1_utils_sup:start_link() of
{ok, Pid} ->
{ok, Pid};
Error ->
Error
end.
stop(_State) ->
ok.
%%%===================================================================
%%% API
%%%===================================================================
start() ->
application:start(p1_utils).
stop() ->
application:stop(p1_utils).
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% @copyright (C) 2017, Evgeny Khramtsov
%%% @doc
%%%
%%% @end
%%% Created : 8 Mar 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%-------------------------------------------------------------------
-module(p1_utils_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(SERVER, ?MODULE).
%%%===================================================================
%%% API functions
%%%===================================================================
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
init([]) ->
{ok, {{one_for_one, 10, 1}, []}}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment