api.pl 3.17 KB
Newer Older
priyank's avatar
priyank committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
use strict;
use warnings;
use Data::Dumper;
use Graph::Directed;
use JSON;
use List::Util qw(reduce);
use Mojolicious::Lite;
use Mojo::Redis2;
use lib qw(lib API/lib);
use ILMT::HIN::PAN::DefaultFeatures;

# Name prefix of this service; the request's modid is appended to it
# (as "<prefix>_<modid>") to find this module's vertex in the pipeline DAG.
my $modulename = 'ilmt.hin.pan.defaultfeatures';

# In-memory, per-process job state. Two kinds of keys are used by the
# /pipeline route: "<jobid>" (inputs collected so far for that job) and
# "data_<jobid>" (all data received so far for that job).
my %database;

# Controller helper returning a Mojo::Redis2 client. The client lives in a
# `state` variable, so it is constructed on the first call and reused after.
helper(redis => sub {
    state $r = Mojo::Redis2->new(url => 'redis://redis:6379');
    return $r;
});

# Adapt the collected inputs of one job to the argument list expected by
# ILMT::HIN::PAN::DefaultFeatures::process and return that call's result.
#
#   $_[0]  hash ref mapping input names to their values.
#
# With exactly one input, its value is passed under the key "data".
# Otherwise every value is passed under its own name with the last
# "_suffix" component (if any) removed.
sub process {
    my ($inputs) = @_;
    my @names = keys %{$inputs};
    my %args;

    if (@names == 1) {
        $args{data} = $inputs->{ $names[0] };
    }
    else {
        for my $name (@names) {
            (my $stripped = $name) =~ s/_[^_]*$//;
            $args{$stripped} = $inputs->{$name};
        }
    }

    return ILMT::HIN::PAN::DefaultFeatures::process(%args);
}

# Render a 400 response whose JSON body is {"Error": <message>}.
#
#   $c      controller object; only its render() method is used.
#   $error  human-readable error message.
#
# The hash is handed to the renderer directly: serialising it first and
# then rendering that string as JSON would double-encode it, so the client
# would receive a JSON string instead of an object.
#
# Always returns a true value, so the `genError(...) && return` idiom used
# by callers does not depend on what render() returns.
sub genError {
    my ($c, $error) = @_;
    $c->render(json => { Error => $error }, status => 400);
    return 1;
}

# Build a Graph::Directed from an adjacency description.
#
#   $_[0]  hash ref mapping a source vertex name to an array ref of
#          target vertex names.
#
# Returns the graph object, or nothing (undef in scalar context) when the
# argument is not a hash ref, so callers can report missing edges instead
# of dying on an invalid dereference.
sub genDAGGraph {
    my ($edges) = @_;
    return unless ref($edges) eq 'HASH';

    my $g = Graph::Directed->new();
    foreach my $from (keys %{$edges}) {
        # `// []` keeps a null target list from being autovivified inside
        # the caller's structure.
        foreach my $to (@{ $edges->{$from} // [] }) {
            $g->add_edge($from, $to);
        }
    }
    return $g;
}

# POST /pipeline
#
# Expects a JSON object with the keys:
#   modid  id of this module instance within the pipeline
#   jobid  id of the job being processed
#   data   hash of values, keyed by the vertex that produced them
#   edges  adjacency description of the pipeline DAG (source => [targets])
#
# Inputs for a job are collected in %database until one has arrived for
# every incoming edge of this module. Then the module is run, its output is
# added to the data, and the request is forwarded to every successor
# vertex, or published on Redis under the job id when there is none.
#
# Responds 400 with an {"Error": ...} body on invalid requests and 202
# otherwise.
post '/pipeline' => sub {
    my $c = shift;

    # A malformed or non-object body is a client error, not a server crash.
    my $ilmt_json = eval { decode_json($c->req->body) };
    unless (ref($ilmt_json) eq 'HASH') {
        genError($c, "Invalid JSON body!");
        return;
    }

    my $ilmt_modid = $ilmt_json->{modid};
    unless ($ilmt_modid) {
        genError($c, "No ModuleID Specified!");
        return;
    }
    my $ilmt_jobid = $ilmt_json->{jobid};
    unless ($ilmt_jobid) {
        genError($c, "No JobID Specified!");
        return;
    }
    my $ilmt_data = $ilmt_json->{data};
    unless ($ilmt_data) {
        genError($c, "No Data Specified!");
        return;
    }

    # Validate the edges here so a missing "edges" key yields a 400 rather
    # than an exception from dereferencing undef.
    my $ilmt_edges = $ilmt_json->{edges};
    my $ilmt_dag = ref($ilmt_edges) eq 'HASH' ? genDAGGraph($ilmt_edges) : undef;
    unless ($ilmt_dag) {
        genError($c, "Edges not specified!");
        return;
    }

    my $ilmt_module = $modulename . '_' . $ilmt_modid;
    my $data_key    = "data_$ilmt_jobid";

    # Source vertices of all edges pointing at this module.
    my @ilmt_inputs = map { $_->[0] } $ilmt_dag->edges_to($ilmt_module);

    if (!$database{$ilmt_jobid}) {
        $database{$ilmt_jobid} = {};
        $database{$data_key}   = {};
    }

    # Remember every expected input present in this request, keyed by the
    # producer's name without its "_<modid>" suffix.
    # NOTE(review): false values ("", 0) are treated as "not yet received".
    foreach my $input (@ilmt_inputs) {
        my $input_module = $input =~ s/_[^_]*$//r;
        $database{$ilmt_jobid}{$input_module} = $ilmt_data->{$input}
            if $ilmt_data->{$input};
    }

    # Accumulate everything received so far for this job.
    %{ $database{$data_key} } = (%{ $database{$data_key} }, %{$ilmt_data});

    if (@ilmt_inputs == keys %{ $database{$ilmt_jobid} }) {
        # Acknowledge first; the work below runs after the response is set.
        $c->render(json => { Response => 'Processing...' }, status => 202);

        my $ilmt_output = process($database{$ilmt_jobid});
        $ilmt_data->{$ilmt_module} = $ilmt_output;
        %{$ilmt_data} = (%{$ilmt_data}, %{ $database{$data_key} });

        # Target vertices of all edges leaving this module.
        my @ilmt_next = map { $_->[1] } $ilmt_dag->edges_from($ilmt_module);
        if (@ilmt_next) {
            foreach my $next (@ilmt_next) {
                # "<host>_<modid>" -> host name to call and modid to send.
                my ($next_module, $next_modid) = split(/_([^_]+)$/, $next);
                $ilmt_json->{modid} = $next_modid;
                $c->ua->post(
                    "http://$next_module/pipeline",
                    json => from_json(encode_json($ilmt_json), {utf8 => 1}),
                    sub {
                        my ($ua, $tx) = @_;
                        my $msg = $tx->error ? $tx->error->{message} : $tx->res->body;
                        $c->app->log->debug("[$ilmt_jobid]: $msg\n");
                    }
                );
            }
        } else {
            $c->redis->publish($ilmt_jobid => encode_json($ilmt_json));
        }

        # Drop both per-job entries; leaving "data_<jobid>" behind would
        # leak the merged data of every finished job.
        delete @database{ $ilmt_jobid, $data_key };
    } else {
        $c->render(json => { Response => 'Waiting for more inputs...' }, status => 202);
    }
};

# Start the Mojolicious::Lite application defined above.
app->start;