| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
77737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764
27742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627462846294630463146324633463446354636463746384639464046414642464346444645464646474648464946504651465246534654465546564657465846594660466146624663466446654666466746684669467046714672467346744675467646774678467946804681468246834684468546864687468846894690469146924693469446954696469746984699470047014702470347044705470647074708470947104711471247134714471547164717471847194720472147224723472447254726472747284729473047314732473347344735473647374738473947404741474247434744474547464747474847494750475147524753475447554756475747584759476047614762476347644765476647674768476947704771477247734774477547764
77747784779478047814782478347844785478647874788478947904791479247934794479547964797479847994800480148024803480448054806480748084809481048114812481348144815481648174818481948204821482248234824482548264827482848294830483148324833483448354836483748384839484048414842484348444845484648474848484948504851485248534854485548564857485848594860486148624863486448654866486748684869487048714872487348744875487648774878487948804881488248834884488548864887488848894890489148924893489448954896489748984899490049014902490349044905490649074908490949104911491249134914491549164917491849194920492149224923492449254926492749284929493049314932493349344935493649374938493949404941494249434944494549464947494849494950495149524953495449554956495749584959496049614962496349644965496649674968496949704971497249734974497549764977497849794980498149824983498449854986498749884989499049914992499349944995499649974998499950005001500250035004500550065007500850095010501150125013501450155016501750185019502050215022502350245025502650275028502950305031503250335034503550365037503850395040504150425043504450455046504750485049505050515052505350545055505650575058505950605061506250635064506550665067506850695070507150725073507450755076507750785079508050815082508350845085508650875088508950905091509250935094509550965097509850995100510151025103510451055106510751085109511051115112511351145115511651175118511951205121512251235124512551265127512851295130513151325133513451355136513751385139514051415142514351445145514651475148514951505151515251535154515551565157515851595160516151625163516451655166516751685169517051715172517351745175517651775178517951805181518251835184518551865187518851895190519151925193519451955196519751985199520052015202520352045205520652075208520952105211521252135214521552165217521852195220522152225223522452255226522752285229523052315232523352345235523652375238523952405241524252435244524552465247524852495250525152525253525452555256525752585259526052615262526352645265526652675268526952705271527252735274527552765
277527852795280528152825283528452855286528752885289 |
- package main
- import (
- "github.com/shuffle/shuffle-shared"
- singul "github.com/shuffle/singul/pkg"
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "net"
- "net/http"
- "net/http/pprof"
- "net/url"
- "os"
- "strconv"
- "strings"
- "time"
- "github.com/docker/docker/api/types"
- "github.com/docker/docker/api/types/container"
- "github.com/docker/docker/api/types/filters"
- dockerimage "github.com/docker/docker/api/types/image"
- "github.com/docker/docker/api/types/mount"
- "github.com/docker/docker/api/types/network"
- dockerclient "github.com/docker/docker/client"
- // This is for automatic removal of certain code :)
- /*** STARTREMOVE ***/
- "math/rand"
- "github.com/docker/docker/api/types/swarm"
- uuid "github.com/satori/go.uuid"
- /*** ENDREMOVE ***/
- "github.com/gorilla/mux"
- //k8s deps
- appsv1 "k8s.io/api/apps/v1"
- corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/client-go/kubernetes"
- )
- // This is getting out of hand :)
- var timezone = os.Getenv("TZ")
- var baseUrl = os.Getenv("BASE_URL")
- var appCallbackUrl = os.Getenv("BASE_URL")
- var isKubernetes = os.Getenv("IS_KUBERNETES")
- var environment = os.Getenv("ENVIRONMENT_NAME")
- var logsDisabled = os.Getenv("SHUFFLE_LOGS_DISABLED")
- var cleanupEnv = strings.ToLower(os.Getenv("CLEANUP"))
- var swarmNetworkName = os.Getenv("SHUFFLE_SWARM_NETWORK_NAME")
- var dockerApiVersion = strings.ToLower(os.Getenv("DOCKER_API_VERSION"))
- var shutdownDisabled = strings.ToLower(os.Getenv("SHUFFLE_WORKER_SHUTDOWN_DISABLED"))
- // Kubernetes settings
- var appServiceAccountName = os.Getenv("SHUFFLE_APP_SERVICE_ACCOUNT_NAME")
- var appPodSecurityContext = os.Getenv("SHUFFLE_APP_POD_SECURITY_CONTEXT")
- var appContainerSecurityContext = os.Getenv("SHUFFLE_APP_CONTAINER_SECURITY_CONTEXT")
- var kubernetesNamespace = os.Getenv("KUBERNETES_NAMESPACE")
- var executionCount int64
- var baseimagename = os.Getenv("SHUFFLE_BASE_IMAGE_NAME")
- // var baseimagename = "registry.hub.docker.com/frikky/shuffle"
- var registryName = "registry.hub.docker.com"
- var sleepTime = 2
- var topClient *http.Client
- var data string
- var requestsSent = 0
- var appsInitialized = false
- var hostname string
- var maxReplicas = uint64(12)
- var debug bool
- /*
- var environments []string
- var parents map[string][]string
- var children map[string][]string
- var visited []string
- var executed []string
- var nextActions []string
- var extra int
- var startAction string
- */
- //var results []shuffle.ActionResult
- //var allLogs map[string]string
- //var containerIds []string
- var downloadedImages []string
- type ImageDownloadBody struct {
- Image string `json:"image"`
- }
- type ImageRequest struct {
- Image string `json:"image"`
- }
- var finishedExecutions []string
- var imagesDistributed []string
- var imagedownloadTimeout = time.Second * 300
- var window = shuffle.NewTimeWindow(10 * time.Second)
- // Images to be autodeployed in the latest version of Shuffle.
- var autoDeploy = map[string]string{
- "http:1.4.0": "frikky/shuffle:http_1.4.0",
- "shuffle-tools:1.2.0": "frikky/shuffle:shuffle-tools_1.2.0",
- "shuffle-subflow:1.1.0": "frikky/shuffle:shuffle-subflow_1.1.0",
- "shuffle-ai:1.1.0": "frikky/shuffle:shuffle-ai_1.1.0",
- // "shuffle-tools-fork:1.0.0": "frikky/shuffle:shuffle-tools-fork_1.0.0",
- }
- // New Worker mappings
- // visited, appendActions, nextActions, notFound, queueNodes, toRemove, executed, env
- var portMappings map[string]int
- var baseport = 33333
- type UserInputSubflow struct {
- Argument string `json:"execution_argument"`
- ContinueUrl string `json:"continue_url"`
- CancelUrl string `json:"cancel_url"`
- }
// setWorkflowExecution stores a workflow execution in the shared cache and
// pushes the current results to the backend. If the execution is fully
// finished it sends the final result and returns. Otherwise, when dbSave is
// set and the execution did not originate from the backend ("default"
// source), it looks for an unfinished subflow/user-input trigger and polls
// the backend for up to 120 seconds until that subflow's result arrives.
//
// Not using shuffle.SetWorkflowExecution as we only want to use cache in reality
func setWorkflowExecution(ctx context.Context, workflowExecution shuffle.WorkflowExecution, dbSave bool) error {
	if len(workflowExecution.ExecutionId) == 0 {
		log.Printf("[DEBUG] Workflowexecution executionId can't be empty.")
		return errors.New("ExecutionId can't be empty.")
	}

	//log.Printf("[DEBUG][%s] Setting with %d results (pre)", workflowExecution.ExecutionId, len(workflowExecution.Results))
	// NOTE(review): Fixexecution's error is deliberately discarded here —
	// confirm that normalization failures are safe to ignore at this point.
	workflowExecution, _ = shuffle.Fixexecution(ctx, workflowExecution)

	cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
	execData, err := json.Marshal(workflowExecution)
	if err != nil {
		log.Printf("[ERROR] Failed marshalling execution during set: %s", err)
		return err
	}

	// The trailing 30 is a TTL passed through to shuffle.SetCache —
	// presumably minutes; verify against shuffle.SetCache's contract.
	err = shuffle.SetCache(ctx, cacheKey, execData, 30)
	if err != nil {
		log.Printf("[ERROR][%s] Failed adding to cache during setexecution", workflowExecution.ExecutionId)
		return err
	}

	handleExecutionResult(workflowExecution)

	// If the execution is complete, send the final payload and stop here.
	validated := shuffle.ValidateFinished(ctx, -1, workflowExecution)
	if validated {
		shutdownData, err := json.Marshal(workflowExecution)
		if err != nil {
			// Marshal failure is only logged; sendResult still runs with nil data.
			log.Printf("[ERROR] Failed marshalling shutdowndata during set: %s", err)
		}

		log.Printf("[DEBUG][%s] Sending result (set). Status: %s, Actions: %d, Results: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Workflow.Actions), len(workflowExecution.Results))
		sendResult(workflowExecution, shutdownData)
		return nil
	}

	// FIXME: Should this shutdown OR send the result?
	// The worker may not be running the backend hmm
	if dbSave {
		if workflowExecution.ExecutionSource == "default" {
			// Backend-sourced executions: hand everything off and shut down.
			log.Printf("[DEBUG][%s] Shutting down (25)", workflowExecution.ExecutionId)
			shutdown(workflowExecution, "", "", true)
			//return
		} else {
			log.Printf("[DEBUG][%s] NOT shutting down with dbSave (%s). Instead sending result to backend and start polling until subflow is updated", workflowExecution.ExecutionId, workflowExecution.ExecutionSource)
			shutdownData, err := json.Marshal(workflowExecution)
			if err != nil {
				log.Printf("[ERROR] Failed marshalling shutdowndata during dbSave handler: %s", err)
			}

			sendResult(workflowExecution, shutdownData)

			// Poll (up to timeComparison seconds, currently 120) if there is a
			// "wait for results" subflow.
			// First choice for the subflow to wait on: any result still in WAITING.
			subflowId := ""
			for _, result := range workflowExecution.Results {
				if result.Status == "WAITING" {
					//log.Printf("[DEBUG][%s] Found waiting result", workflowExecution.ExecutionId)
					subflowId = result.Action.ID
				}
			}

			if len(subflowId) == 0 {
				log.Printf("[DEBUG][%s] No waiting result found. Not polling", workflowExecution.ExecutionId)

				// Subflow/user-input apps can appear as Actions instead of
				// Triggers; mirror them into Triggers so the scan below sees them.
				for _, action := range workflowExecution.Workflow.Actions {
					if action.AppName == "User Input" || action.AppName == "Shuffle Workflow" || action.AppName == "shuffle-subflow" {
						workflowExecution.Workflow.Triggers = append(workflowExecution.Workflow.Triggers, shuffle.Trigger{
							AppName:    action.AppName,
							Parameters: action.Parameters,
							ID:         action.ID,
						})
					}
				}

				for _, trigger := range workflowExecution.Workflow.Triggers {
					//log.Printf("[DEBUG] Found trigger %s", trigger.AppName)
					if trigger.AppName != "User Input" && trigger.AppName != "Shuffle Workflow" && trigger.AppName != "shuffle-subflow" {
						continue
					}

					// check if it has wait for results in params
					wait := false
					for _, param := range trigger.Parameters {
						//log.Printf("[DEBUG] Found param %s with value %s", param.Name, param.Value)
						if param.Name == "check_result" && strings.ToLower(param.Value) == "true" {
							//log.Printf("[DEBUG][%s] Found check result param!", workflowExecution.ExecutionId)
							wait = true
							break
						}
					}

					if wait {
						// Wait on this trigger if its result is absent or not yet
						// in a terminal (SUCCESS/FAILURE) state.
						found := false
						for _, result := range workflowExecution.Results {
							//log.Printf("[DEBUG][%s] Found result %s", workflowExecution.ExecutionId, result.Action.ID)
							if result.Action.ID == trigger.ID && result.Status != "SUCCESS" && result.Status != "FAILURE" {
								//log.Printf("[DEBUG][%s] Found subflow result that is not handled. Waiting for results", workflowExecution.ExecutionId)
								subflowId = result.Action.ID
								found = true
								break
							}
						}

						if !found {
							log.Printf("[DEBUG][%s] No result found for subflow. Setting subflowId to %s", workflowExecution.ExecutionId, trigger.ID)
							subflowId = trigger.ID
						}
					}

					// First matching trigger wins.
					if len(subflowId) > 0 {
						break
					}
				}
			}

			if len(subflowId) > 0 {
				// Under rerun period timeout
				timeComparison := 120
				log.Printf("[DEBUG][%s] Starting polling for %d seconds to see if new subflow updates are found on the backend that are not handled. Subflow ID: %s", workflowExecution.ExecutionId, timeComparison, subflowId)

				timestart := time.Now()
				streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
				for {
					// handleSubflowPoller returning nil means the subflow finished.
					err = handleSubflowPoller(ctx, workflowExecution, streamResultUrl, subflowId)
					if err == nil {
						log.Printf("[DEBUG] Subflow is finished and we are breaking the thingy")

						// In non-swarm optimized mode the worker exits outright and
						// relies on backend reruns to continue the workflow.
						if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" && workflowExecution.ExecutionSource != "default" {
							log.Printf("[DEBUG] Force shutdown of worker due to optimized run with webserver. Expecting reruns to take care of this")
							os.Exit(0)
						}

						break
					}

					timepassed := time.Since(timestart)
					if timepassed.Seconds() > float64(timeComparison) {
						log.Printf("[DEBUG][%s] Max poll time reached to look for updates. Stopping poll. This poll is here to send personal results back to itself to be handled, then to stop this thread.", workflowExecution.ExecutionId)
						break
					}

					// Sleep for 1 second between poll attempts.
					time.Sleep(1 * time.Second)
				}
			} else {
				log.Printf("[DEBUG][%s] No need to poll for results. Not polling", workflowExecution.ExecutionId)
			}
		}
	}

	return nil
}
- // removes every container except itself (worker)
- func shutdown(workflowExecution shuffle.WorkflowExecution, nodeId string, reason string, handleResultSend bool) {
- log.Printf("[DEBUG][%s] Shutdown (%s) started with reason %#v. Result amount: %d. ResultsSent: %d, Send result: %#v, Parent: %#v", workflowExecution.ExecutionId, workflowExecution.Status, reason, len(workflowExecution.Results), requestsSent, handleResultSend, workflowExecution.ExecutionParent)
- // This is an escape hatch for development only
- // Typically meant to be used when you aren't sure how to make a workflow run in bad scenarios, and want to rapidly debug it.
- if shutdownDisabled == "true" {
- log.Printf("[ERROR] Shutdown disabled: NOT shutting down. This should ONLY be used for development & debugging.")
- os.Exit(3)
- }
- sleepDuration := 1
- if handleResultSend && requestsSent < 2 {
- shutdownData, err := json.Marshal(workflowExecution)
- if err == nil {
- sendResult(workflowExecution, shutdownData)
- //log.Printf("[WARNING][%s] Sent shutdown update with %d results and result value %s", workflowExecution.ExecutionId, len(workflowExecution.Results), reason)
- } else {
- log.Printf("[WARNING][%s] Failed to send update: %s", workflowExecution.ExecutionId, err)
- }
- time.Sleep(time.Duration(sleepDuration) * time.Second)
- }
- // Might not be necessary because of cleanupEnv hostconfig autoremoval
- if strings.ToLower(cleanupEnv) == "true" && (os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm") {
- /*
- ctx := context.Background()
- dockercli, err := dockerclient.NewEnvClient()
- if err == nil {
- log.Printf("[INFO] Cleaning up %d containers", len(containerIds))
- removeOptions := types.ContainerRemoveOptions{
- RemoveVolumes: true,
- Force: true,
- }
- for _, containername := range containerIds {
- log.Printf("[INFO] Should stop and and remove container %s (deprecated)", containername)
- //dockercli.ContainerStop(ctx, containername, nil)
- //dockercli.ContainerRemove(ctx, containername, removeOptions)
- //removeContainers = append(removeContainers, containername)
- }
- }
- */
- } else {
- /*** STARTREMOVE ***/
- if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
- log.Printf("[DEBUG][%s] NOT cleaning up containers. IDS: %d, CLEANUP env: %s", workflowExecution.ExecutionId, 0, cleanupEnv)
- }
- /*** ENDREMOVE ***/
- }
- if len(reason) > 0 && len(nodeId) > 0 {
- //log.Printf("[INFO] Running abort of workflow because it should be finished")
- abortUrl := fmt.Sprintf("%s/api/v1/workflows/%s/executions/%s/abort", baseUrl, workflowExecution.Workflow.ID, workflowExecution.ExecutionId)
- path := fmt.Sprintf("?reason=%s", url.QueryEscape(reason))
- if len(nodeId) > 0 {
- path += fmt.Sprintf("&node=%s", url.QueryEscape(nodeId))
- }
- if len(environment) > 0 {
- path += fmt.Sprintf("&env=%s", url.QueryEscape(environment))
- }
- abortUrl += path
- log.Printf("[DEBUG][%s] Abort URL: %s", workflowExecution.ExecutionId, abortUrl)
- req, err := http.NewRequest(
- "GET",
- abortUrl,
- nil,
- )
- if err != nil {
- log.Printf("[WARNING][%s] Failed building request: %s", workflowExecution.ExecutionId, err)
- }
- // FIXME: Add an API call to the backend
- if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
- authorization := os.Getenv("AUTHORIZATION")
- if len(authorization) > 0 {
- req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", authorization))
- } else {
- log.Printf("[ERROR][%s] No authorization specified for abort", workflowExecution.ExecutionId)
- }
- } else {
- req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", workflowExecution.Authorization))
- }
- req.Header.Add("Content-Type", "application/json")
- //log.Printf("[DEBUG][%s] All App Logs: %#v", workflowExecution.ExecutionId, allLogs)
- client := shuffle.GetExternalClient(abortUrl)
- newresp, err := client.Do(req)
- if err != nil {
- log.Printf("[WARNING][%s] Failed abort request: %s", workflowExecution.ExecutionId, err)
- } else {
- defer newresp.Body.Close()
- }
- } else {
- //log.Printf("[INFO][%s] NOT running abort during shutdown.", workflowExecution.ExecutionId)
- }
- log.Printf("[DEBUG][%s] Finished shutdown (after %d seconds). ", workflowExecution.ExecutionId, sleepDuration)
- //Finished shutdown (after %d seconds). ", sleepDuration)
- // Allows everything to finish in subprocesses (apps)
- if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" && isKubernetes != "true" {
- time.Sleep(time.Duration(sleepDuration) * time.Second)
- os.Exit(3)
- } else {
- log.Printf("[DEBUG][%s] Sending result and resetting values (K8s & Swarm).", workflowExecution.ExecutionId)
- }
- }
- // ** STARTREMOVE ***/
- func deployk8sApp(image string, identifier string, env []string) error {
- if len(os.Getenv("KUBERNETES_NAMESPACE")) > 0 {
- kubernetesNamespace = os.Getenv("KUBERNETES_NAMESPACE")
- } else {
- kubernetesNamespace = "default"
- }
- ctx := context.Background()
- log.Printf("[DEBUG] Deploying k8s app with identifier %s to namespace %s", identifier, kubernetesNamespace)
- deployport, err := strconv.Atoi(os.Getenv("SHUFFLE_APP_EXPOSED_PORT"))
- if err != nil {
- deployport = 80
- }
- envMap := make(map[string]string)
- for _, envStr := range env {
- parts := strings.SplitN(envStr, "=", 2)
- if len(parts) == 2 {
- envMap[parts[0]] = parts[1]
- }
- }
- // add to env
- envMap["SHUFFLE_APP_EXPOSED_PORT"] = strconv.Itoa(deployport)
- envMap["SHUFFLE_SWARM_CONFIG"] = os.Getenv("SHUFFLE_SWARM_CONFIG")
- envMap["BASE_URL"] = "http://shuffle-workers:33333"
- if len(os.Getenv("SHUFFLE_LOGS_DISABLED")) > 0 {
- envMap["SHUFFLE_LOGS_DISABLED"] = os.Getenv("SHUFFLE_LOGS_DISABLED")
- }
- clientset, _, err := shuffle.GetKubernetesClient()
- if err != nil {
- log.Printf("[ERROR] Failed getting kubernetes: %s", err)
- return err
- }
- // str := strings.ToLower(identifier)
- // strSplit := strings.Split(str, "_")
- // value := strSplit[0]
- // value = strings.ReplaceAll(value, "_", "-")
- value := identifier
- baseDeployMode := false
- // check if autoDeploy contains a value
- // that is equal to the image being deployed.
- for _, value := range autoDeploy {
- if value == image {
- baseDeployMode = true
- }
- }
- autoDeployOverride := os.Getenv("SHUFFLE_USE_GHCR_OVERRIDE_FOR_AUTODEPLOY") == "true"
- localRegistry := ""
- // Checking if app is generated or not
- if !(baseDeployMode && autoDeployOverride) {
- localRegistry = os.Getenv("REGISTRY_URL")
- } else {
- log.Printf("[DEBUG] Detected baseDeploy image (%s) and ghcr override. Resorting to using ghcr instead of registry", image)
- }
- /*
- appDetails := strings.Split(image, ":")[1]
- appDetailsSplit := strings.Split(appDetails, "_")
- appName := strings.Join(appDetailsSplit[:len(appDetailsSplit)-1], "_")
- appVersion := appDetailsSplit[len(appDetailsSplit)-1]
- for _, app := range workflowExecution.Workflow.Actions {
- // log.Printf("[DEBUG] App: %s, Version: %s", appName, appVersion)
- // log.Printf("[DEBUG] Checking app %s with version %s", app.AppName, app.AppVersion)
- if app.AppName == appName && app.AppVersion == appVersion {
- if app.Generated == true {
- log.Printf("[DEBUG] Generated app, setting local registry")
- image = fmt.Sprintf("%s/%s", localRegistry, image)
- break
- } else {
- log.Printf("[DEBUG] Not generated app, setting shuffle registry")
- }
- }
- }
- */
- if (len(localRegistry) == 0 && len(os.Getenv("SHUFFLE_BASE_IMAGE_REGISTRY")) > 0) && !(baseDeployMode && autoDeployOverride) {
- localRegistry = os.Getenv("SHUFFLE_BASE_IMAGE_REGISTRY")
- }
- if (len(localRegistry) > 0 && strings.Count(image, "/") <= 2) && !(baseDeployMode && autoDeployOverride) {
- log.Printf("[DEBUG] Using REGISTRY_URL %s", localRegistry)
- image = fmt.Sprintf("%s/%s", localRegistry, image)
- } else {
- if strings.Count(image, "/") <= 2 && !strings.HasPrefix(image, "frikky/shuffle:") {
- image = fmt.Sprintf("frikky/shuffle:%s", image)
- }
- }
- log.Printf("[DEBUG] Got kubernetes with namespace %#v to run image '%s'", kubernetesNamespace, image)
- //fix naming convention
- // podUuid := uuid.NewV4().String()
- // name := fmt.Sprintf("%s-%s", value, podUuid)
- // replace identifier "_" with "-"
- name := strings.ReplaceAll(identifier, "_", "-")
- labels := map[string]string{
- // Well-known Kubernetes labels
- "app.kubernetes.io/name": "shuffle-app",
- "app.kubernetes.io/instance": name,
- "app.kubernetes.io/part-of": "shuffle",
- "app.kubernetes.io/managed-by": "shuffle-worker",
- // Keep legacy labels for backward compatibility
- "app": name,
- // TODO: Add Shuffle specific labels
- // "app.shuffler.io/name": "APP_NAME",
- // "app.shuffler.io/version": "APP_VERSION",
- }
- matchLabels := map[string]string{
- "app.kubernetes.io/name": "shuffle-app",
- "app.kubernetes.io/instance": name,
- }
- // Parse security contexts from env
- var podSecurityContext *corev1.PodSecurityContext
- var containerSecurityContext *corev1.SecurityContext
- if len(appPodSecurityContext) > 0 {
- podSecurityContext = &corev1.PodSecurityContext{}
- err = json.Unmarshal([]byte(appPodSecurityContext), podSecurityContext)
- if err != nil {
- log.Printf("[ERROR] Failed to unmarshal app pod security context: %v", err)
- return fmt.Errorf("failed to unmarshal app pod security context: %v", err)
- }
- }
- if len(appContainerSecurityContext) > 0 {
- containerSecurityContext = &corev1.SecurityContext{}
- err = json.Unmarshal([]byte(appContainerSecurityContext), containerSecurityContext)
- if err != nil {
- log.Printf("[ERROR] Failed to unmarshal app container security context: %v", err)
- return fmt.Errorf("failed to unmarshal app container security context: %v", err)
- }
- }
- // pod := &corev1.Pod{
- // ObjectMeta: metav1.ObjectMeta{
- // Name: podName,
- // Labels: map[string]string{
- // "app": podName,
- // // "executionId": workflowExecution.ExecutionId,
- // },
- // },
- // Spec: corev1.PodSpec{
- // RestartPolicy: "Never", // As a crash is not useful in this context
- // // DNSPolicy: "Default",
- // DNSPolicy: corev1.DNSClusterFirst,
- // // NodeName: "worker1"
- // Containers: []corev1.Container{
- // {
- // Name: value,
- // Image: image,
- // Env: buildEnvVars(envMap),
- // // Pull if not available
- // ImagePullPolicy: corev1.PullIfNotPresent,
- // },
- // },
- // },
- // }
- // createdPod, err := clientset.CoreV1().Pods(kubernetesNamespace).Create(context.Background(), pod, metav1.CreateOptions{})
- // if err != nil {
- // log.Printf("[ERROR] Failed creating pod: %v", err)
- // // os.Exit(1)
- // } else {
- // log.Printf("[DEBUG] Created pod %#v in namespace %#v", createdPod.Name, kubernetesNamespace)
- // }
- // service := &corev1.Service{
- // ObjectMeta: metav1.ObjectMeta{
- // Name: identifier,
- // },
- // Spec: corev1.ServiceSpec{
- // Selector: map[string]string{
- // "app": podName,
- // },
- // Ports: []corev1.ServicePort{
- // {
- // Protocol: "TCP",
- // Port: 80,
- // TargetPort: intstr.FromInt(80),
- // },
- // },
- // Type: corev1.ServiceTypeNodePort,
- // },
- // }
- // _, err = clientset.CoreV1().Services(kubernetesNamespace).Create(context.TODO(), service, metav1.CreateOptions{})
- // if err != nil {
- // log.Printf("[ERROR] Failed creating service: %v", err)
- // return err
- // }
- // use deployment instead of pod
- // then expose a service similarly.
- // number of replicas can be set to os.Getenv("SHUFFLE_APP_REPLICAS")
- replicaNumber := 1
- replicaNumberStr := os.Getenv("SHUFFLE_APP_REPLICAS")
- if len(replicaNumberStr) > 0 {
- tmpInt, err := strconv.Atoi(replicaNumberStr)
- if err != nil {
- log.Printf("[ERROR] %s is not a valid number for replication", replicaNumberStr)
- } else {
- replicaNumber = tmpInt
- }
- }
- existing, err := clientset.AppsV1().Deployments(kubernetesNamespace).List(
- ctx,
- metav1.ListOptions{
- LabelSelector: fmt.Sprintf("app: %s", name),
- },
- )
- if err != nil {
- log.Printf("[ERROR] Failed listing existing deployments: %v", err)
- }
- if len(existing.Items) > 0 {
- log.Printf("[INFO] Found existing deployments, skipping creation")
- return nil
- }
- replicaNumberInt32 := int32(replicaNumber)
- // apps do not need access the k8s api.
- automountServiceAccountToken := false
- deployment := &appsv1.Deployment{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Labels: labels,
- },
- Spec: appsv1.DeploymentSpec{
- Replicas: &replicaNumberInt32,
- Selector: &metav1.LabelSelector{
- MatchLabels: matchLabels,
- },
- Template: corev1.PodTemplateSpec{
- ObjectMeta: metav1.ObjectMeta{
- Labels: labels,
- },
- Spec: corev1.PodSpec{
- Containers: []corev1.Container{
- {
- Name: value,
- Image: image,
- Env: buildEnvVars(envMap),
- Ports: []corev1.ContainerPort{
- {
- Protocol: "TCP",
- ContainerPort: int32(deployport),
- },
- },
- SecurityContext: containerSecurityContext,
- Resources: buildResourcesFromEnv(),
- },
- },
- DNSPolicy: corev1.DNSClusterFirst,
- ServiceAccountName: appServiceAccountName,
- AutomountServiceAccountToken: &automountServiceAccountToken,
- SecurityContext: podSecurityContext,
- },
- },
- },
- }
- if os.Getenv("SHUFFLE_APP_MOUNT_TMP_VOLUME") == "true" {
- deployment.Spec.Template.Spec.Volumes = append(
- deployment.Spec.Template.Spec.Volumes,
- corev1.Volume{
- Name: "tmp",
- VolumeSource: corev1.VolumeSource{
- EmptyDir: &corev1.EmptyDirVolumeSource{},
- },
- },
- )
- deployment.Spec.Template.Spec.Containers[0].VolumeMounts = append(
- deployment.Spec.Template.Spec.Containers[0].VolumeMounts,
- corev1.VolumeMount{
- Name: "tmp",
- ReadOnly: false,
- MountPath: "/tmp",
- },
- )
- }
- if len(os.Getenv("REGISTRY_URL")) > 0 && len(os.Getenv("SHUFFLE_BASE_IMAGE_NAME")) > 0 {
- log.Printf("[INFO] Setting image pull policy to Always as private registry is used.")
- //containerAttachment.ImagePullPolicy = corev1.PullAlways
- deployment.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullAlways
- } else {
- deployment.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullIfNotPresent
- }
- _, err = clientset.AppsV1().Deployments(kubernetesNamespace).Create(context.Background(), deployment, metav1.CreateOptions{})
- if err != nil {
- log.Printf("[ERROR] Failed creating deployment: %v", err)
- return err
- }
- svcAppProtocol := "http"
- service := &corev1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Labels: labels,
- },
- Spec: corev1.ServiceSpec{
- Selector: matchLabels,
- Ports: []corev1.ServicePort{
- {
- Protocol: "TCP",
- AppProtocol: &svcAppProtocol,
- Port: 80,
- TargetPort: intstr.FromInt(deployport),
- },
- },
- Type: corev1.ServiceTypeClusterIP,
- },
- }
- _, err = clientset.CoreV1().Services(kubernetesNamespace).Create(context.TODO(), service, metav1.CreateOptions{})
- if err != nil {
- log.Printf("[ERROR] Failed creating service: %v", err)
- return err
- }
- // Giving the service time to start before we contineu anything
- log.Printf("[DEBUG] Waiting 20 seconds before moving on to let app '%s' start properly. Service: %s (k8s)", name, image)
- time.Sleep(20 * time.Second)
- return nil
- }
- //** ENDREMOVE ***/
- // Deploys the internal worker whenever something happens
- func deployApp(cli *dockerclient.Client, image string, identifier string, env []string, workflowExecution shuffle.WorkflowExecution, action shuffle.Action) error {
- // if isKubernetes == "true" {
- // if len(os.Getenv("KUBERNETES_NAMESPACE")) > 0 {
- // kubernetesNamespace = os.Getenv("KUBERNETES_NAMESPACE")
- // } else {
- // kubernetesNamespace = "default"
- // }
- // envMap := make(map[string]string)
- // for _, envStr := range env {
- // parts := strings.SplitN(envStr, "=", 2)
- // if len(parts) == 2 {
- // envMap[parts[0]] = parts[1]
- // }
- // }
- // clientset, _, err := shuffle.GetKubernetesClient()
- // if err != nil {
- // log.Printf("[ERROR] Failed getting kubernetes: %s", err)
- // return err
- // }
- // str := strings.ToLower(identifier)
- // strSplit := strings.Split(str, "_")
- // value := strSplit[0]
- // value = strings.ReplaceAll(value, "_", "-")
- // // Checking if app is generated or not
- // localRegistry := os.Getenv("REGISTRY_URL")
- // /*
- // appDetails := strings.Split(image, ":")[1]
- // appDetailsSplit := strings.Split(appDetails, "_")
- // appName := strings.Join(appDetailsSplit[:len(appDetailsSplit)-1], "_")
- // appVersion := appDetailsSplit[len(appDetailsSplit)-1]
- // for _, app := range workflowExecution.Workflow.Actions {
- // // log.Printf("[DEBUG] App: %s, Version: %s", appName, appVersion)
- // // log.Printf("[DEBUG] Checking app %s with version %s", app.AppName, app.AppVersion)
- // if app.AppName == appName && app.AppVersion == appVersion {
- // if app.Generated == true {
- // log.Printf("[DEBUG] Generated app, setting local registry")
- // image = fmt.Sprintf("%s/%s", localRegistry, image)
- // break
- // } else {
- // log.Printf("[DEBUG] Not generated app, setting shuffle registry")
- // }
- // }
- // }
- // */
- // if len(localRegistry) == 0 && len(os.Getenv("SHUFFLE_BASE_IMAGE_REGISTRY")) > 0 {
- // localRegistry = os.Getenv("SHUFFLE_BASE_IMAGE_REGISTRY")
- // }
- // if len(localRegistry) > 0 && strings.Count(image, "/") <= 2 {
- // log.Printf("[DEBUG] Using REGISTRY_URL %s", localRegistry)
- // image = fmt.Sprintf("%s/%s", localRegistry, image)
- // } else {
- // if strings.Count(image, "/") <= 2 {
- // image = fmt.Sprintf("frikky/shuffle:%s", image)
- // }
- // }
- // log.Printf("[DEBUG] Got kubernetes with namespace %#v to run image '%s'", kubernetesNamespace, image)
- // //fix naming convention
- // podUuid := uuid.NewV4().String()
- // podName := fmt.Sprintf("%s-%s", value, podUuid)
- // pod := &corev1.Pod{
- // ObjectMeta: metav1.ObjectMeta{
- // Name: podName,
- // Labels: map[string]string{
- // "app": "shuffle-app",
- // "executionId": workflowExecution.ExecutionId,
- // },
- // },
- // Spec: corev1.PodSpec{
- // RestartPolicy: "Never", // As a crash is not useful in this context
- // // DNSPolicy: "Default",
- // DNSPolicy: corev1.DNSClusterFirst,
- // // NodeName: "worker1"
- // Containers: []corev1.Container{
- // {
- // Name: value,
- // Image: image,
- // Env: buildEnvVars(envMap),
- // // Pull if not available
- // ImagePullPolicy: corev1.PullIfNotPresent,
- // },
- // },
- // },
- // }
- // createdPod, err := clientset.CoreV1().Pods(kubernetesNamespace).Create(context.Background(), pod, metav1.CreateOptions{})
- // if err != nil {
- // log.Printf("[ERROR] Failed creating pod: %v", err)
- // // os.Exit(1)
- // } else {
- // log.Printf("[DEBUG] Created pod %#v in namespace %#v", createdPod.Name, kubernetesNamespace)
- // }
- // return nil
- // }
- // form basic hostConfig
- ctx := context.Background()
- // Check action if subflow
- // Check if url is default (shuffle-backend)
- // If it doesn't exist, add it
- // FIXME: This does NOT replace it in all cases as the data
- // is not saved in the database as the correct param.
- if action.AppName == "shuffle-subflow" {
- // Automatic replacement of URL
- for paramIndex, param := range action.Parameters {
- if param.Name != "backend_url" {
- continue
- }
- if !strings.Contains(param.Value, "shuffle-backend") {
- continue
- }
- // Automatic replacement as this is default
- if len(os.Getenv("BASE_URL")) > 0 {
- action.Parameters[paramIndex].Value = os.Getenv("BASE_URL")
- log.Printf("[DEBUG][%s] Replaced backend_url with base_url %s", workflowExecution.ExecutionId, os.Getenv("BASE_URL"))
- }
- if len(os.Getenv("SHUFFLE_CLOUDRUN_URL")) > 0 {
- action.Parameters[paramIndex].Value = os.Getenv("SHUFFLE_CLOUDRUN_URL")
- log.Printf("[DEBUG][%s] Replaced backend_url with cloudrun %s", workflowExecution.ExecutionId, os.Getenv("SHUFFLE_CLOUDRUN_URL"))
- }
- }
- }
- /*** STARTREMOVE ***/
- if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
- appName := strings.Replace(identifier, fmt.Sprintf("_%s", action.ID), "", -1)
- appName = strings.Replace(appName, fmt.Sprintf("_%s", workflowExecution.ExecutionId), "", -1)
- appName = strings.ToLower(appName)
- //log.Printf("[INFO][%s] New appname: %s, image: %s", workflowExecution.ExecutionId, appName, image)
- if !shuffle.ArrayContains(downloadedImages, image) && isKubernetes != "true" {
- log.Printf("[DEBUG] Downloading image %s from backend as it's first iteration for this image on the worker. Timeout: 60", image)
- // FIXME: Not caring if it's ok or not. Just continuing
- // This is working as intended, just designed to download an updated
- // image on every Orborus/new worker restart.
- // Running as coroutine for eventual completeness
- // FIXME: With goroutines it got too much trouble of deploying with an older version
- // Allowing slow startups, as long as it's eventually fast, and uses the same registry as on host.
- err := shuffle.DownloadDockerImageBackend(&http.Client{Timeout: imagedownloadTimeout}, image)
- if err == nil {
- downloadedImages = append(downloadedImages, image)
- }
- }
- var exposedPort int
- var err error
- if isKubernetes != "true" {
- exposedPort, err = findAppInfo(image, appName, false)
- if err != nil {
- log.Printf("[ERROR] Failed finding and creating port for %s: %s", appName, err)
- return err
- }
- } else {
- // ** STARTREMOVE ***/
- exposedPort = 80
- //deployport, err := strconv.Atoi(os.Getenv("SHUFFLE_APP_EXPOSED_PORT"))
- //if err == nil {
- // exposedPort = deployport
- //}
- err = findAppInfoKubernetes(image, appName, env)
- if err != nil {
- log.Printf("[ERROR] Failed finding and creating port for %s: %s", appName, err)
- return err
- }
- // ** ENDREMOVE ***/
- }
- /*
- // Makes it not run at all.
- cacheData := []byte("1")
- newExecId := fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
- err = shuffle.SetCache(ctx, newExecId, cacheData, 30)
- if err != nil {
- log.Printf("[WARNING] (1) Failed setting cache for action %s: %s", newExecId, err)
- } else {
- log.Printf("[DEBUG][%s] (1) Adding %s to cache (%#v)", workflowExecution.ExecutionId, newExecId, action.Name)
- }
- */
- log.Printf("[DEBUG][%s] Should run towards port %d for app %s. DELAY: %d", workflowExecution.ExecutionId, exposedPort, appName, action.ExecutionDelay)
- ctx := context.Background()
- if action.ExecutionDelay > 0 {
- //log.Printf("[DEBUG] Running app %s with delay of %d", action.Name, action.ExecutionDelay)
- waitTime := time.Duration(action.ExecutionDelay) * time.Second
- time.AfterFunc(waitTime, func() {
- err = sendAppRequest(ctx, baseUrl, appName, exposedPort, &action, &workflowExecution, image, 0)
- if err != nil {
- log.Printf("[ERROR] Failed sending SCHEDULED request to app %s on port %d: %s", appName, exposedPort, err)
- }
- })
- } else {
- rand.Seed(time.Now().UnixNano())
- waitTime := time.Duration(rand.Intn(500)) * time.Millisecond
- // Added a random delay + context timeout to ensure that the function returns, and only once
- time.AfterFunc(waitTime, func() {
- ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
- defer cancel() // Cancel the context to release resources even if not used
- go sendAppRequest(ctx, baseUrl, appName, exposedPort, &action, &workflowExecution, image, 0)
- })
- }
- return nil
- }
- /*** ENDREMOVE ***/
- // Max 10% CPU every second
- //CPUShares: 128,
- //CPUQuota: 10000,
- //CPUPeriod: 100000,
- hostConfig := &container.HostConfig{
- LogConfig: container.LogConfig{
- Type: "json-file",
- Config: map[string]string{
- "max-size": "10m",
- },
- },
- Resources: container.Resources{},
- }
- if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
- hostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:worker-%s", workflowExecution.ExecutionId))
- //log.Printf("Environments: %#v", env)
- }
- // Removing because log extraction should happen first
- if strings.ToLower(cleanupEnv) == "true" {
- hostConfig.AutoRemove = true
- }
- // Get environment for certificates
- volumeBinds := []string{}
- volumeBindString := os.Getenv("SHUFFLE_VOLUME_BINDS")
- if len(volumeBindString) > 0 {
- volumeBindSplit := strings.Split(volumeBindString, ",")
- for _, volumeBind := range volumeBindSplit {
- if volumeBind == "srcfolder=dstfolder" || volumeBind == "srcfolder:dstfolder" || volumeBind == "/srcfolder:/dstfolder" {
- log.Printf("[DEBUG] Volume bind '%s' is invalid.", volumeBind)
- continue
- }
- if !strings.HasPrefix(volumeBind, "/") {
- log.Printf("[ERROR] Volume bind '%s' is invalid. Use absolute paths.", volumeBind)
- continue
- }
- if !strings.Contains(volumeBind, ":") {
- log.Printf("[ERROR] Volume bind '%s' is invalid. Use absolute paths with colon inbetween them (/srcpath:dstpath/", volumeBind)
- continue
- }
- volumeBinds = append(volumeBinds, volumeBind)
- }
- }
- // Add more volume binds if possible
- if len(volumeBinds) > 0 {
- // Only use mounts, not direct binds
- hostConfig.Binds = []string{}
- hostConfig.Mounts = []mount.Mount{}
- for _, bind := range volumeBinds {
- if !strings.Contains(bind, ":") || strings.Contains(bind, "..") || strings.HasPrefix(bind, "~") {
- log.Printf("[ERROR] Volume bind '%s' is invalid. Use absolute paths.", bind)
- continue
- }
- log.Printf("[DEBUG] Appending bind %s to App container", bind)
- bindSplit := strings.Split(bind, ":")
- sourceFolder := bindSplit[0]
- destinationFolder := bindSplit[1]
- readOnly := false
- if len(bindSplit) > 2 {
- mode := bindSplit[2]
- if mode == "ro" {
- readOnly = true
- }
- }
- builtMount := mount.Mount{
- Type: mount.TypeBind,
- Source: sourceFolder,
- Target: destinationFolder,
- ReadOnly: readOnly,
- }
- hostConfig.Mounts = append(hostConfig.Mounts, builtMount)
- }
- }
- config := &container.Config{
- Image: image,
- Env: env,
- }
- //log.Printf("[DEBUG] Deploying image with env: %#v", env)
- // Checking as late as possible, just in case.
- newExecId := fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
- _, err := shuffle.GetCache(ctx, newExecId)
- if err == nil {
- log.Printf("[DEBUG][%s] Result for action %s already found - returning", newExecId, action.ID)
- return nil
- }
- cacheData := []byte("1")
- err = shuffle.SetCache(ctx, newExecId, cacheData, 30)
- if err != nil {
- //log.Printf("[WARNING][%s] Failed setting cache for action: %s", newExecId, err)
- } else {
- //log.Printf("[DEBUG][%s] Adding to cache. Name: %s", workflowExecution.ExecutionId, action.Name)
- }
- if action.ExecutionDelay > 0 {
- log.Printf("[DEBUG][%s] Running app '%s' with label '%s' in docker with delay of %d", workflowExecution.ExecutionId, action.AppName, action.Label, action.ExecutionDelay)
- waitTime := time.Duration(action.ExecutionDelay) * time.Second
- time.AfterFunc(waitTime, func() {
- DeployContainer(ctx, cli, config, hostConfig, identifier, workflowExecution, newExecId)
- })
- } else {
- log.Printf("[DEBUG][%s] Running app %s in docker NORMALLY as there is no delay set with identifier %s", workflowExecution.ExecutionId, action.Name, identifier)
- returnvalue := DeployContainer(ctx, cli, config, hostConfig, identifier, workflowExecution, newExecId)
- //log.Printf("[DEBUG][%s] Normal deploy ret: %s", workflowExecution.ExecutionId, returnvalue)
- return returnvalue
- }
- return nil
- }
- func cleanupKubernetesExecution(clientset *kubernetes.Clientset, workflowExecution shuffle.WorkflowExecution, namespace string) error {
- // workerName := fmt.Sprintf("worker-%s", workflowExecution.ExecutionId)
- // FIXME: The executionId label is currently not set
- labelSelector := fmt.Sprintf("app.kubernetes.io/name=shuffle-app,executionId=%s", workflowExecution.ExecutionId)
- podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
- LabelSelector: labelSelector,
- })
- if err != nil {
- return fmt.Errorf("[ERROR] Failed to list apps with label selector %s: %#vv", labelSelector, err)
- }
- for _, pod := range podList.Items {
- err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
- if err != nil {
- return fmt.Errorf("failed to delete app %s: %v", pod.Name, err)
- }
- log.Printf("App %s in namespace %s deleted.", pod.Name, namespace)
- }
- // podErr := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), workerName, metav1.DeleteOptions{})
- // if podErr != nil {
- // return fmt.Errorf("[ERROR] failed to delete the worker %s in namespace %s: %v", workerName, namespace, podErr)
- // }
- // log.Printf("[DEBUG] %s in namespace %s deleted.", workerName, namespace)
- return nil
- }
// DeployContainer creates and starts a Docker container for a single app
// action. identifier is used as the container name; actionExecId is the cache
// key marking this action as "in progress" and is deleted again on any
// failure so the action can be retried.
//
// Recovery behavior, as implemented below:
//   - If creation fails with a name conflict, retry once with a
//     uuid-suffixed name.
//   - If starting fails because the worker network can't be joined (or the
//     worker container is gone), recreate with network mode and resource
//     limits reset, then try starting once more.
func DeployContainer(ctx context.Context, cli *dockerclient.Client, config *container.Config, hostConfig *container.HostConfig, identifier string, workflowExecution shuffle.WorkflowExecution, actionExecId string) error {
	cont, err := cli.ContainerCreate(
		ctx,
		config,
		hostConfig,
		nil,
		nil,
		identifier,
	)

	//log.Printf("[DEBUG] config set: %#v", config)
	if err != nil {
		//log.Printf("[ERROR] Failed creating container: %s", err)
		if !strings.Contains(err.Error(), "Conflict. The container name") {
			log.Printf("[ERROR] Container CREATE error (1): %s", err)

			// Clear the "in progress" marker so the action can run again.
			cacheErr := shuffle.DeleteCache(ctx, actionExecId)
			if cacheErr != nil {
				log.Printf("[ERROR] FAILURE Deleting cache for %s: %s", actionExecId, cacheErr)
			}

			return err
		} else {
			// Name conflict: a container with this identifier already exists.
			// Retry once with a unique, uuid-suffixed name.
			parsedUuid := uuid.NewV4()
			identifier = fmt.Sprintf("%s-%s", identifier, parsedUuid)
			//hostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:worker-%s", workflowExecution.ExecutionId))
			log.Printf("[DEBUG] 2 - Identifier: %s", identifier)
			cont, err = cli.ContainerCreate(
				context.Background(),
				config,
				hostConfig,
				nil,
				nil,
				identifier,
			)

			if err != nil {
				log.Printf("[ERROR] Container create error (2): %s", err)

				cacheErr := shuffle.DeleteCache(ctx, actionExecId)
				if cacheErr != nil {
					log.Printf("[ERROR] FAILURE Deleting cache for %s: %s", actionExecId, cacheErr)
				}

				return err
			}

			//log.Printf("[DEBUG] Made new container ID
		}
	}

	err = cli.ContainerStart(ctx, cont.ID, container.StartOptions{})
	if err != nil {
		if strings.Contains(fmt.Sprintf("%s", err), "cannot join network") || strings.Contains(fmt.Sprintf("%s", err), "No such container") {
			// Remove the "CREATED" one from the previous if possible
			go cli.ContainerRemove(ctx, cont.ID, container.RemoveOptions{})

			log.Printf("[WARNING] Failed deploying App on first attempt: %s. Removing some HostConfig configs.", err)
			parsedUuid := uuid.NewV4()
			identifier = fmt.Sprintf("%s-%s-nonetwork", identifier, parsedUuid)

			// Drop the worker network attachment and resource limits so the
			// container can start standalone, then recreate and restart.
			hostConfig.NetworkMode = container.NetworkMode("")
			hostConfig.LogConfig = container.LogConfig{
				Type: "json-file",
				Config: map[string]string{
					"max-size": "10m",
				},
			}

			hostConfig.Resources = container.Resources{}
			cont, err = cli.ContainerCreate(
				context.Background(),
				config,
				hostConfig,
				nil,
				nil,
				identifier,
			)

			if err != nil {
				log.Printf("[ERROR] Container create error (3): %s", err)
				cacheErr := shuffle.DeleteCache(ctx, actionExecId)
				if cacheErr != nil {
					log.Printf("[ERROR] FAILURE Deleting cache for %s: %s", actionExecId, cacheErr)
				}

				return err
			}

			//log.Printf("[DEBUG] Running secondary check without network with worker")
			err = cli.ContainerStart(ctx, cont.ID, container.StartOptions{})
		}

		// err here reflects the (possibly retried) start attempt above.
		if err != nil {
			log.Printf("[ERROR] Failed to start container (2) in runtime location %s: %s", environment, err)
			cacheErr := shuffle.DeleteCache(ctx, actionExecId)
			if cacheErr != nil {
				log.Printf("[ERROR] FAILURE Deleting cache for %s: %s", actionExecId, cacheErr)
			}

			//shutdown(workflowExecution, workflowExecution.Workflow.ID, true)
			return err
		}
	}

	log.Printf("[DEBUG][%s] Container %s was created for %s", workflowExecution.ExecutionId, cont.ID, identifier)

	// Waiting to see if it exits.. Stupid, but stable(r)
	// NOTE(review): both branches below only log; no actual waiting or
	// validation happens here.
	if workflowExecution.ExecutionSource != "default" {
		log.Printf("[INFO][%s] Handling NON-default execution source %s - NOT waiting or validating!", workflowExecution.ExecutionId, workflowExecution.ExecutionSource)
	} else if workflowExecution.ExecutionSource == "default" {
		log.Printf("[INFO][%s] Handling DEFAULT execution source %s - SKIPPING wait anyway due to exited issues!", workflowExecution.ExecutionId, workflowExecution.ExecutionSource)
	}

	//log.Printf("[DEBUG] Deployed container ID %s", cont.ID)
	//containerIds = append(containerIds, cont.ID)

	return nil
}
- func removeContainer(containername string) error {
- ctx := context.Background()
- // cli, err := dockerclient.NewEnvClient()
- cli, _, err := shuffle.GetDockerClient()
- if err != nil {
- log.Printf("[DEBUG] Unable to create docker client: %s", err)
- return err
- }
- defer cli.Close()
- // FIXME - ucnomment
- // containers, err := cli.ContainerList(ctx, types.ContainerListOptions{
- // All: true,
- // })
- _ = ctx
- _ = cli
- //if err := cli.ContainerStop(ctx, containername, nil); err != nil {
- // log.Printf("Unable to stop container %s - running removal anyway, just in case: %s", containername, err)
- //}
- removeOptions := container.RemoveOptions{
- RemoveVolumes: true,
- Force: true,
- }
- // FIXME - remove comments etc
- _ = removeOptions
- //if err := cli.ContainerRemove(ctx, containername, removeOptions); err != nil {
- // log.Printf("Unable to remove container: %s", err)
- //}
- return nil
- }
- func runFilter(workflowExecution shuffle.WorkflowExecution, action shuffle.Action) {
- // 1. Get the parameter $.#.id
- if action.Label == "filter_cases" && len(action.Parameters) > 0 {
- if action.Parameters[0].Variant == "ACTION_RESULT" {
- param := action.Parameters[0]
- value := param.Value
- _ = value
- // Loop cases.. Hmm, that's tricky
- }
- } else {
- log.Printf("No handler for filter %s with %d params", action.Label, len(action.Parameters))
- }
- }
// removeIndex removes the element at index i from s in O(1) by swapping it
// with the final element and shrinking the slice by one. Element order is NOT
// preserved, and the backing array of s is modified in place.
func removeIndex(s []string, i int) []string {
	last := len(s) - 1
	s[i], s[last] = s[last], s[i]
	return s[:last]
}
// getWorkerURLs returns the base URLs of all reachable worker instances.
//
// On Kubernetes only the stable service URL is returned (the service
// load-balances across worker pods). On Docker Swarm the tasks of the
// "shuffle-workers" service are enumerated and a per-task DNS name
// (<service>.<slot>.<taskID>) is built for each.
func getWorkerURLs() ([]string, error) {
	workerUrls := []string{}
	if isKubernetes == "true" {
		workerUrls = append(workerUrls, "http://shuffle-workers:33333")
		// workerUrls = append(workerUrls, "http://192.168.29.16:33333")
		// get service "shuffle-workers" "Endpoints"
		// serviceName := "shuffle-workers"
		// clientset, _, err := shuffle.GetKubernetesClient()
		// if err != nil {
		// 	log.Println("[ERROR] Failed to get Kubernetes client:", err)
		// 	return workerUrls, err
		// }

		// services, err := clientset.CoreV1().Services("default").List(context.Background(), metav1.ListOptions{})
		// if err != nil {
		// 	log.Println("[ERROR] Failed to list services:", err)
		// 	return workerUrls, err
		// }

		// for _, service := range services.Items {
		// 	if service.Name == serviceName {
		// 		endpoints, err := clientset.CoreV1().Endpoints("default").Get(context.Background(), serviceName, metav1.GetOptions{})
		// 		if err != nil {
		// 			log.Println("[ERROR] Failed to get endpoints for service:", err)
		// 			return workerUrls, err
		// 		}

		// 		for _, subset := range endpoints.Subsets {
		// 			for _, address := range subset.Addresses {
		// 				for _, port := range subset.Ports {
		// 					url := fmt.Sprintf("http://%s:%d", address.IP, port.Port)
		// 					workerUrls = append(workerUrls, url)
		// 				}
		// 			}
		// 		}
		// 	}
		// }

		//log.Printf("[DEBUG] Worker URLs for k8s: %#v", workerUrls)
		return workerUrls, nil
	}

	// Create a new Docker client
	// cli, err := dockerclient.NewEnvClient()
	cli, _, err := shuffle.GetDockerClient()
	if err != nil {
		log.Println("[ERROR] Failed to create Docker client:", err)
		return workerUrls, err
	}
	defer cli.Close()

	// Specify the name of the service for which you want to list tasks
	serviceName := "shuffle-workers"

	// Get the list of tasks for the service
	tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
		Filters: filters.NewArgs(filters.Arg("service", serviceName)),
	})
	if err != nil {
		log.Println("[ERROR] Failed to list tasks for service:", err)
		return workerUrls, err
	}

	// Build one URL per swarm task using its slot/task-ID DNS name.
	for _, task := range tasks {
		url := fmt.Sprintf("http://%s.%d.%s:33333", serviceName, task.Slot, task.ID)
		workerUrls = append(workerUrls, url)
	}

	return workerUrls, nil
}
- func askOtherWorkersToDownloadImage(image string) {
- // Why wouldn't it happen on swarm? Hmm
- if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
- return
- }
- // Check environment SHUFFLE_AUTO_IMAGE_DOWNLOAD
- if os.Getenv("SHUFFLE_AUTO_IMAGE_DOWNLOAD") == "false" {
- log.Printf("[DEBUG] SHUFFLE_AUTO_IMAGE_DOWNLOAD is false. NOT distributing images %s", image)
- return
- }
- if shuffle.ArrayContains(imagesDistributed, image) {
- return
- }
- urls, err := getWorkerURLs()
- if err != nil {
- log.Printf("[ERROR] Error in listing worker urls: %s", err)
- return
- }
- if len(urls) < 2 {
- return
- }
- httpClient := &http.Client{}
- distributed := false
- for _, url := range urls {
- //log.Printf("[DEBUG] Trying to speak to: %s", url)
- imagesRequest := ImageRequest{
- Image: image,
- }
- url = fmt.Sprintf("%s/api/v1/download", url)
- //log.Printf("[INFO] Making a request to %s to download images", url)
- imageJSON, err := json.Marshal(imagesRequest)
- req, err := http.NewRequest(
- "POST",
- url,
- bytes.NewBuffer(imageJSON),
- )
- if err != nil {
- log.Printf("[ERROR] Error in making request to %s : %s", url, err)
- continue
- }
- resp, err := httpClient.Do(req)
- if err != nil {
- log.Printf("[ERROR] Error in making request to %s : %s", url, err)
- continue
- }
- defer resp.Body.Close()
- respBody, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- log.Printf("[ERROR] Error in reading response body : %s", err)
- continue
- }
- log.Printf("[INFO] Response body when tried sending images for nodes to download: %s", respBody)
- distributed = true
- }
- if distributed {
- imagesDistributed = append(imagesDistributed, image)
- }
- }
- func handleExecutionResult(workflowExecution shuffle.WorkflowExecution) {
- ctx := context.Background()
- workflowExecution, relevantActions := shuffle.DecideExecution(ctx, workflowExecution, environment)
- if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "FAILURE" || workflowExecution.Status == "ABORTED" {
- log.Printf("[DEBUG][%s] Shutting down because status is %s", workflowExecution.ExecutionId, workflowExecution.Status)
- shutdown(workflowExecution, "", "Workflow run is already finished", true)
- return
- }
- startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
- var dockercli *dockerclient.Client
- var err error
- if isKubernetes != "true" {
- // dockercli, err = dockerclient.NewEnvClient()
- dockercli, _, err = shuffle.GetDockerClient()
- if err != nil {
- log.Printf("[ERROR] Unable to create docker client (3): %s", err)
- return
- }
- defer dockercli.Close()
- }
- for _, action := range relevantActions {
- appname := action.AppName
- appversion := action.AppVersion
- appname = strings.Replace(appname, ".", "-", -1)
- appversion = strings.Replace(appversion, ".", "-", -1)
- action, _ = singul.HandleSingulStartnode(workflowExecution, action, []string{})
- parsedAppname := strings.Replace(strings.ToLower(action.AppName), " ", "-", -1)
- // if strings.ToLower(parsedAppname) == "singul" {
- // parsedAppname = "shuffle-ai"
- // appversion = "1.1.0"
- // appname = "shuffle-ai"
- // }
- if parsedAppname == "ai-agent" {
- parsedAppname = "shuffle-ai"
- appversion = "1.1.0"
- appname = "shuffle-ai"
- action.AppVersion = "1.1.0"
- action.AppName = "shuffle-ai"
- action.Name = "run_agent"
- inputParamValue := ""
- allowedActions := ""
- for _, param := range action.Parameters {
- if strings.ToLower(param.Name) == "input" {
- inputParamValue = param.Value
- } else if strings.ToLower(param.Name) == "action" {
- param.Value = strings.ReplaceAll("Nothing,", " ", "")
- param.Value = strings.ReplaceAll("Nothing", " ", "")
- allowedActions = param.Value
- }
- }
- // Rewriting them
- action.Parameters = []shuffle.WorkflowAppActionParameter{
- shuffle.WorkflowAppActionParameter{
- Name: "input_data",
- Value: inputParamValue,
- },
- shuffle.WorkflowAppActionParameter{
- Name: "actions",
- Value: allowedActions,
- },
- }
- }
- imageName := fmt.Sprintf("%s:%s_%s", baseimagename, parsedAppname, action.AppVersion)
- if strings.Contains(imageName, " ") {
- imageName = strings.ReplaceAll(imageName, " ", "-")
- }
- // Kubernetes specific.
- // Should it be though?
- if isKubernetes == "true" {
- // Map it to:
- // <registry>/baseimagename/<appname>:<appversion>
- localRegistry := os.Getenv("REGISTRY_URL")
- if len(localRegistry) > 0 && len(baseimagename) > 0 {
- newImageName := fmt.Sprintf("%s/%s/%s:%s", localRegistry, baseimagename, parsedAppname, action.AppVersion)
- log.Printf("[INFO] Remapping image name %s to %s due to registry+image name existing on k8s", imageName, newImageName)
- imageName = newImageName
- }
- }
- askOtherWorkersToDownloadImage(imageName)
- // Added UUID to identifier just in case
- //identifier := fmt.Sprintf("%s_%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId, uuid.NewV4())
- identifier := fmt.Sprintf("%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId)
- if strings.Contains(identifier, " ") {
- identifier = strings.ReplaceAll(identifier, " ", "-")
- }
- //if arrayContains(executed, action.ID) || arrayContains(visited, action.ID) {
- // log.Printf("[WARNING] Action %s is already executed")
- // continue
- //}
- //visited = append(visited, action.ID)
- //executed = append(executed, action.ID)
- // FIXME - check whether it's running locally yet too
- // take care of auto clean up later on for k8s
- if isKubernetes != "true" {
- stats, err := dockercli.ContainerInspect(context.Background(), identifier)
- if err != nil || stats.ContainerJSONBase.State.Status != "running" {
- // REMOVE
- if err == nil {
- log.Printf("[DEBUG][%s] Docker Container Status: %s, should kill: %s", workflowExecution.ExecutionId, stats.ContainerJSONBase.State.Status, identifier)
- err = removeContainer(identifier)
- if err != nil {
- log.Printf("[ERROR] Error killing container: %s", err)
- }
- } else {
- //log.Printf("WHAT TO DO HERE?: %s", err)
- }
- } else if stats.ContainerJSONBase.State.Status == "running" {
- //log.Printf("
- continue
- }
- }
- if len(action.Parameters) == 0 {
- action.Parameters = []shuffle.WorkflowAppActionParameter{}
- }
- if len(action.Errors) == 0 {
- action.Errors = []string{}
- }
- // marshal action and put it in there rofl
- //log.Printf("[INFO][%s] Time to execute %s (%s) with app %s:%s, function %s, env %s with %d parameters.", workflowExecution.ExecutionId, action.ID, action.Label, action.AppName, action.AppVersion, action.Name, action.Environment, len(action.Parameters))
- log.Printf("[DEBUG][%s] Action: Send, Label: '%s', Action: '%s', Run status: %s, Extra=", workflowExecution.ExecutionId, action.Label, action.AppName, workflowExecution.Status)
- actionData, err := json.Marshal(action)
- if err != nil {
- log.Printf("[WARNING] Failed unmarshalling action: %s", err)
- continue
- }
- if action.AppID == "0ca8887e-b4af-4e3e-887c-87e9d3bc3d3e" {
- log.Printf("[DEBUG] Should run filter: %#v", action)
- runFilter(workflowExecution, action)
- continue
- }
- executionData, err := json.Marshal(workflowExecution)
- if err != nil {
- log.Printf("[ERROR] Failed marshalling executiondata: %s", err)
- executionData = []byte("")
- }
- // Sending full execution so that it won't have to load in every app
- // This might be an issue if they can read environments, but that's alright
- // if everything is generated during execution
- //log.Printf("[DEBUG][%s] Deployed with CALLBACK_URL %s and BASE_URL %s", workflowExecution.ExecutionId, appCallbackUrl, baseUrl)
- env := []string{
- fmt.Sprintf("EXECUTIONID=%s", workflowExecution.ExecutionId),
- fmt.Sprintf("AUTHORIZATION=%s", workflowExecution.Authorization),
- fmt.Sprintf("CALLBACK_URL=%s", baseUrl),
- fmt.Sprintf("BASE_URL=%s", appCallbackUrl),
- fmt.Sprintf("TZ=%s", timezone),
- fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", logsDisabled),
- }
- if len(actionData) >= 100000 {
- log.Printf("[WARNING] Omitting some data from action execution. Length: %d. Fix in SDK!", len(actionData))
- newParams := []shuffle.WorkflowAppActionParameter{}
- for _, param := range action.Parameters {
- paramData, err := json.Marshal(param)
- if err != nil {
- log.Printf("[WARNING] Failed to marshal param %s: %s", param.Name, err)
- newParams = append(newParams, param)
- continue
- }
- if len(paramData) >= 50000 {
- log.Printf("[WARNING] Removing a lot of data from param %s with length %d", param.Name, len(paramData))
- param.Value = "SHUFFLE_AUTO_REMOVED"
- }
- newParams = append(newParams, param)
- }
- action.Parameters = newParams
- actionData, err = json.Marshal(action)
- if err == nil {
- log.Printf("[DEBUG] Ran data replace on action %s. new length: %d", action.Name, len(actionData))
- } else {
- log.Printf("[WARNING] Failed to marshal new actionData: %s", err)
- }
- } else {
- //log.Printf("[DEBUG] Actiondata is NOT 100000 in length. Adding as normal.")
- }
- actionEnv := fmt.Sprintf("ACTION=%s", string(actionData))
- env = append(env, actionEnv)
- if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
- //log.Printf("APPENDING PROXY TO THE APP!")
- env = append(env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY")))
- env = append(env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY")))
- env = append(env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY")))
- env = append(env, fmt.Sprintf("no_proxy=%s", os.Getenv("no_proxy")))
- }
- overrideHttpProxy := os.Getenv("SHUFFLE_INTERNAL_HTTP_PROXY")
- overrideHttpsProxy := os.Getenv("SHUFFLE_INTERNAL_HTTPS_PROXY")
- if overrideHttpProxy != "" {
- env = append(env, fmt.Sprintf("SHUFFLE_INTERNAL_HTTP_PROXY=%s", overrideHttpProxy))
- }
- if overrideHttpsProxy != "" {
- env = append(env, fmt.Sprintf("SHUFFLE_INTERNAL_HTTPS_PROXY=%s", overrideHttpsProxy))
- }
- if len(os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")) > 0 {
- env = append(env, fmt.Sprintf("SHUFFLE_APP_SDK_TIMEOUT=%s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")))
- }
- // FIXME: Ensure to NEVER do this anymore
- // This potentially breaks too much stuff. Better to have the app poll the data.
- _ = executionData
- /*
- maxSize := 32700 - len(string(actionData)) - 2000
- if len(executionData) < maxSize {
- log.Printf("[INFO] ADDING FULL_EXECUTION because size is smaller than %d", maxSize)
- env = append(env, fmt.Sprintf("FULL_EXECUTION=%s", string(executionData)))
- } else {
- log.Printf("[WARNING] Skipping FULL_EXECUTION because size is larger than %d", maxSize)
- }
- */
- // Uses a few ways of getting / checking if an app is available
- // 1. Try original with lowercase
- // 2. Go to original (no spaces)
- // 3. Add remote repo location
- images := []string{
- imageName,
- fmt.Sprintf("%s/%s:%s_%s", registryName, baseimagename, parsedAppname, action.AppVersion),
- fmt.Sprintf("%s:%s_%s", baseimagename, parsedAppname, action.AppVersion),
- }
- // This is the weirdest shit ever looking back at
- // Needs optimization lol
- pullOptions := dockerimage.PullOptions{}
- if strings.ToLower(cleanupEnv) == "true" {
- err = deployApp(dockercli, images[0], identifier, env, workflowExecution, action)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- if strings.Contains(err.Error(), "exited prematurely") {
- log.Printf("[DEBUG] Shutting down (2)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- err := shuffle.DownloadDockerImageBackend(&http.Client{Timeout: imagedownloadTimeout}, imageName)
- executed := false
- if err == nil {
- log.Printf("[DEBUG] Downloaded image %s from backend (CLEANUP)", imageName)
- downloadedImages = append(downloadedImages, imageName)
- //err = deployApp(dockercli, image, identifier, env, workflow, action)
- err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- if strings.Contains(err.Error(), "exited prematurely") {
- log.Printf("[DEBUG] Shutting down (41)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- } else {
- executed = true
- }
- }
- if !executed {
- imageName = images[2]
- err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- if strings.Contains(err.Error(), "exited prematurely") {
- log.Printf("[DEBUG] Shutting down (3)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- //log.Printf("[WARNING] Failed CLEANUP execution. Downloading image %s remotely.", image)
- log.Printf("[WARNING] Failed to download image %s (CLEANUP): %s", imageName, err)
- reader, err := dockercli.ImagePull(context.Background(), imageName, pullOptions)
- if err != nil {
- log.Printf("[ERROR] Failed getting %s. Couldn't be find locally, AND is missing.", imageName)
- log.Printf("[DEBUG] Shutting down (4)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- } else {
- defer reader.Close()
- baseTag := strings.Split(imageName, ":")
- if len(baseTag) > 1 {
- tag := baseTag[1]
- log.Printf("[DEBUG] Creating tag copies of registry downloaded containers from tag %s", tag)
- // Remapping
- ctx := context.Background()
- dockercli.ImageTag(ctx, imageName, fmt.Sprintf("frikky/shuffle:%s", tag))
- dockercli.ImageTag(ctx, imageName, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
- }
- }
- buildBuf := new(strings.Builder)
- _, err = io.Copy(buildBuf, reader)
- if err != nil && !strings.Contains(fmt.Sprintf("%s", err.Error()), "Conflict. The container name") {
- log.Printf("[ERROR] Error in IO copy: %s", err)
- log.Printf("[DEBUG] Shutting down (5)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- } else {
- if strings.Contains(buildBuf.String(), "errorDetail") {
- log.Printf("[ERROR] Docker build:%sERROR ABOVE: Trying to pull tags from: %s", buildBuf.String(), imageName)
- log.Printf("[DEBUG] Shutting down (6)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- log.Printf("[INFO] Successfully downloaded %s", imageName)
- }
- err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- log.Printf("[ERROR] Failed deploying image for the FOURTH time. Aborting if the image doesn't exist")
- if strings.Contains(err.Error(), "exited prematurely") {
- log.Printf("[DEBUG] Shutting down (7)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- if strings.Contains(err.Error(), "No such image") {
- //log.Printf("[WARNING] Failed deploying %s from image %s: %s", identifier, image, err)
- log.Printf("[ERROR] Image doesn't exist. Shutting down")
- log.Printf("[DEBUG] Shutting down (8)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- }
- }
- }
- }
- } else {
- err = deployApp(dockercli, images[0], identifier, env, workflowExecution, action)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- log.Printf("[DEBUG] Failed deploying app? %s", err)
- if strings.Contains(err.Error(), "exited prematurely") {
- log.Printf("[DEBUG] Shutting down (9)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- // Trying to replace with lowercase to deploy again. This seems to work with Dockerhub well.
- // FIXME: Should try to remotely download directly if this persists.
- imageName = images[1]
- err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- if strings.Contains(err.Error(), "exited prematurely") {
- log.Printf("[DEBUG] Shutting down (10)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- log.Printf("[DEBUG][%s] Failed deploy. Downloading image %s: %s", workflowExecution.ExecutionId, imageName, err)
- err := shuffle.DownloadDockerImageBackend(&http.Client{Timeout: imagedownloadTimeout}, imageName)
- executed := false
- if err == nil {
- log.Printf("[DEBUG] Downloaded image %s from backend (CLEANUP)", imageName)
- downloadedImages = append(downloadedImages, imageName)
- //err = deployApp(dockercli, image, identifier, env, workflow, action)
- err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- log.Printf("[ERROR] Err: %s", err)
- if strings.Contains(err.Error(), "exited prematurely") {
- log.Printf("[DEBUG] Shutting down (40)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- } else {
- executed = true
- }
- }
- if !executed {
- imageName = images[2]
- err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- log.Printf("[ERROR] Err: %s", err)
- if strings.Contains(err.Error(), "exited prematurely") {
- log.Printf("[DEBUG] Shutting down (11)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- log.Printf("[WARNING] Failed deploying image THREE TIMES. Attempting to download %s as last resort from backend and dockerhub: %s", imageName, err)
- if isKubernetes == "true" {
- log.Printf("[ERROR] Image %s doesn't exist. Returning error for now")
- return
- }
- reader, err := dockercli.ImagePull(context.Background(), imageName, pullOptions)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- log.Printf("[ERROR] Failed getting %s. The couldn't be find locally, AND is missing.", imageName)
- log.Printf("[DEBUG] Shutting down (12)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- } else {
- defer reader.Close()
- baseTag := strings.Split(imageName, ":")
- if len(baseTag) > 1 {
- tag := baseTag[1]
- log.Printf("[DEBUG] Creating tag copies of registry downloaded containers from tag %s", tag)
- // Remapping
- ctx := context.Background()
- dockercli.ImageTag(ctx, imageName, fmt.Sprintf("frikky/shuffle:%s", tag))
- dockercli.ImageTag(ctx, imageName, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
- }
- }
- buildBuf := new(strings.Builder)
- _, err = io.Copy(buildBuf, reader)
- if err != nil {
- log.Printf("[ERROR] Error in IO copy: %s", err)
- log.Printf("[DEBUG] Shutting down (13)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- } else {
- if strings.Contains(buildBuf.String(), "errorDetail") {
- log.Printf("[ERROR] Docker build:%sERROR ABOVE: Trying to pull tags from: %s", buildBuf.String(), imageName)
- log.Printf("[DEBUG] Shutting down (14)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("Error deploying container: %s", buildBuf.String()), true)
- return
- }
- log.Printf("[INFO] Successfully downloaded %s", imageName)
- }
- }
- err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
- if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
- log.Printf("[ERROR] Failed deploying image for the FOURTH time. Aborting if the image doesn't exist")
- if strings.Contains(err.Error(), "exited prematurely") {
- log.Printf("[DEBUG] Shutting down (15)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- if strings.Contains(err.Error(), "No such image") {
- //log.Printf("[WARNING] Failed deploying %s from image %s: %s", identifier, image, err)
- log.Printf("[ERROR] Image doesn't exist. Shutting down")
- log.Printf("[DEBUG] Shutting down (16)")
- shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
- return
- }
- }
- }
- }
- }
- }
- //log.Printf("[INFO][%s] Adding visited (3): %s (%s). Actions: %d, Results: %d", workflowExecution.ExecutionId, action.Label, action.ID, len(workflowExecution.Workflow.Actions), len(workflowExecution.Results))
- visited = append(visited, action.ID)
- executed = append(executed, action.ID)
- // If children of action.ID are NOT in executed:
- // Remove them from visited.
- //log.Printf("EXECUTED: %#v", executed)
- }
- //log.Printf(nextAction)
- //log.Printf(startAction, children[startAction])
- // FIXME - new request here
- // FIXME - clean up stopped (remove) containers with this execution id
- err = shuffle.UpdateExecutionVariables(ctx, workflowExecution.ExecutionId, startAction, children, parents, visited, executed, nextActions, environments, extra)
- if err != nil {
- log.Printf("[ERROR] Failed to update exec variables for execution %s: %s (2)", workflowExecution.ExecutionId, err)
- }
- if len(workflowExecution.Results) == len(workflowExecution.Workflow.Actions)+extra {
- shutdownCheck := true
- for _, result := range workflowExecution.Results {
- if result.Status == "EXECUTING" || result.Status == "WAITING" {
- // Cleaning up executing stuff
- shutdownCheck = false
- // USED TO BE CONTAINER REMOVAL
- // FIXME - send POST request to kill the container
- //log.Printf("Should remove (POST request) stopped containers")
- //ret = requests.post("%s%s" % (self.url, stream_path), headers=headers, json=action_result)
- }
- }
- if shutdownCheck {
- log.Printf("[INFO][%s] BREAKING BECAUSE RESULTS IS SAME LENGTH AS ACTIONS. SHOULD CHECK ALL RESULTS FOR WHETHER THEY'RE DONE", workflowExecution.ExecutionId)
- validated := shuffle.ValidateFinished(ctx, -1, workflowExecution)
- if validated {
- shutdownData, err := json.Marshal(workflowExecution)
- if err != nil {
- log.Printf("[ERROR] Failed marshalling shutdowndata during set: %s", err)
- }
- sendResult(workflowExecution, shutdownData)
- }
- log.Printf("[DEBUG][%s] Shutting down (17)", workflowExecution.ExecutionId)
- if isKubernetes == "true" {
- // log.Printf("workflow execution: %#v", workflowExecution)
- clientset, _, err := shuffle.GetKubernetesClient()
- if err != nil {
- log.Println("[ERROR] Error getting kubernetes client (1):", err)
- os.Exit(1)
- }
- cleanupKubernetesExecution(clientset, workflowExecution, kubernetesNamespace)
- } else {
- shutdown(workflowExecution, "", "", true)
- }
- return
- }
- }
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return
- }
- func executionInit(workflowExecution shuffle.WorkflowExecution) error {
- ctx := context.Background()
- parents := map[string][]string{}
- children := map[string][]string{}
- nextActions := []string{}
- extra := 0
- startAction := workflowExecution.Start
- //log.Printf("[INFO][%s] STARTACTION: %s", workflowExecution.ExecutionId, startAction)
- if len(startAction) == 0 {
- log.Printf("[INFO][%s] Didn't find execution start action. Setting it to workflow start action.", workflowExecution.ExecutionId)
- startAction = workflowExecution.Workflow.Start
- }
- // Setting up extra counter
- for _, trigger := range workflowExecution.Workflow.Triggers {
- //log.Printf("[DEBUG] Appname trigger (0): %s", trigger.AppName)
- if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
- extra += 1
- }
- }
- // Validates RERUN of single actions
- // Identified by:
- // 1. Predefined result from previous exec
- // 2. Only ONE action
- // 3. Every predefined result having result.Action.Category == "rerun"
- /*
- if len(workflowExecution.Workflow.Actions) == 1 && len(workflowExecution.Results) > 0 {
- finished := shuffle.ValidateFinished(ctx, extra, workflowExecution)
- if finished {
- return nil
- }
- }
- */
- nextActions = append(nextActions, startAction)
- for _, branch := range workflowExecution.Workflow.Branches {
- // Check what the parent is first. If it's trigger - skip
- sourceFound := false
- destinationFound := false
- for _, action := range workflowExecution.Workflow.Actions {
- if action.ID == branch.SourceID {
- sourceFound = true
- }
- if action.ID == branch.DestinationID {
- destinationFound = true
- }
- }
- for _, trigger := range workflowExecution.Workflow.Triggers {
- //log.Printf("Appname trigger (0): %s (%s)", trigger.AppName, trigger.ID)
- if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
- if trigger.ID == branch.SourceID {
- sourceFound = true
- } else if trigger.ID == branch.DestinationID {
- destinationFound = true
- }
- }
- }
- if sourceFound {
- parents[branch.DestinationID] = append(parents[branch.DestinationID], branch.SourceID)
- } else {
- log.Printf("[DEBUG] Parent ID %s was not found in actions! Skipping parent. (TRIGGER?)", branch.SourceID)
- }
- if destinationFound {
- children[branch.SourceID] = append(children[branch.SourceID], branch.DestinationID)
- } else {
- log.Printf("[DEBUG] Child ID %s was not found in actions! Skipping child. (TRIGGER?)", branch.SourceID)
- }
- }
- log.Printf("[INFO][%s] shuffle.Actions: %d + Special shuffle.Triggers: %d", workflowExecution.ExecutionId, len(workflowExecution.Workflow.Actions), extra)
- onpremApps := []string{}
- toExecuteOnprem := []string{}
- for _, action := range workflowExecution.Workflow.Actions {
- if strings.ToLower(action.Environment) != strings.ToLower(environment) {
- continue
- }
- toExecuteOnprem = append(toExecuteOnprem, action.ID)
- actionName := fmt.Sprintf("%s:%s_%s", baseimagename, action.AppName, action.AppVersion)
- found := false
- for _, app := range onpremApps {
- if actionName == app {
- found = true
- }
- }
- if !found {
- onpremApps = append(onpremApps, actionName)
- }
- }
- if len(onpremApps) == 0 {
- //return errors.New(fmt.Sprintf("No apps to handle onprem (%s)", environment))
- log.Printf("[INFO][%s] No apps to handle onprem (%s). Returning 200 OK anyway", workflowExecution.ExecutionId, environment)
- return nil
- }
- pullOptions := dockerimage.PullOptions{}
- _ = pullOptions
- for _, image := range onpremApps {
- //log.Printf("[INFO] Image: %s", image)
- // Kind of gambling that the image exists.
- if strings.Contains(image, " ") {
- image = strings.ReplaceAll(image, " ", "-")
- }
- // FIXME: Reimplement for speed later
- // Skip to make it faster
- //reader, err := dockercli.ImagePull(context.Background(), image, pullOptions)
- //if err != nil {
- // log.Printf("Failed getting %s. The app is missing or some other issue", image)
- // shutdown(workflowExecution)
- //}
- ////io.Copy(os.Stdout, reader)
- //_ = reader
- //log.Printf("Successfully downloaded and built %s", image)
- }
- visited := []string{}
- executed := []string{}
- environments := []string{}
- for _, action := range workflowExecution.Workflow.Actions {
- found := false
- for _, environment := range environments {
- if action.Environment == environment {
- found = true
- break
- }
- }
- if !found {
- environments = append(environments, action.Environment)
- }
- }
- err := shuffle.UpdateExecutionVariables(ctx, workflowExecution.ExecutionId, startAction, children, parents, visited, executed, nextActions, environments, extra)
- if err != nil {
- log.Printf("[ERROR] Failed to update exec variables for execution %s: %s", workflowExecution.ExecutionId, err)
- }
- return nil
- }
- func handleSubflowPoller(ctx context.Context, workflowExecution shuffle.WorkflowExecution, streamResultUrl, subflowId string) error {
- // FIXME: If MEMCACHE is enabled, check in this order:
- extra := 0
- for _, trigger := range workflowExecution.Workflow.Triggers {
- if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
- extra += 1
- }
- }
- if len(data) == 0 {
- log.Printf("[WARNING] Stream result missing execution ID and authorization; injecting them from workflow execution")
- data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, workflowExecution.ExecutionId, workflowExecution.Authorization)
- }
- req, err := http.NewRequest(
- "POST",
- streamResultUrl,
- bytes.NewBuffer([]byte(data)),
- )
- client := shuffle.GetExternalClient(streamResultUrl)
- newresp, err := client.Do(req)
- if err != nil {
- log.Printf("[ERROR] Failed making request (1): %s", err)
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return err
- }
- defer newresp.Body.Close()
- body, err := ioutil.ReadAll(newresp.Body)
- if err != nil {
- log.Printf("[ERROR] Failed reading body (1): %s", err)
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return err
- }
- if newresp.StatusCode != 200 {
- log.Printf("[ERROR] Bad statuscode: %d, %s", newresp.StatusCode, string(body))
- if strings.Contains(string(body), "Workflowexecution is already finished") {
- log.Printf("[DEBUG] Shutting down (19)")
- shutdown(workflowExecution, "", "", true)
- }
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return errors.New(fmt.Sprintf("Bad statuscode: %d", newresp.StatusCode))
- }
- err = json.Unmarshal(body, &workflowExecution)
- if err != nil {
- log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return err
- }
- if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
- log.Printf("[INFO][%s] Workflow execution is finished. Exiting worker.", workflowExecution.ExecutionId)
- log.Printf("[DEBUG] Shutting down (20)")
- if isKubernetes == "true" {
- // log.Printf("workflow execution: %#v", workflowExecution)
- clientset, _, err := shuffle.GetKubernetesClient()
- if err != nil {
- log.Println("[ERROR] Error getting kubernetes client (2):", err)
- os.Exit(1)
- }
- cleanupKubernetesExecution(clientset, workflowExecution, kubernetesNamespace)
- } else {
- shutdown(workflowExecution, "", "", true)
- }
- }
- hasUserinput := false
- for _, result := range workflowExecution.Results {
- if result.Action.ID != subflowId {
- continue
- }
- if result.Action.AppName == "User Input" {
- hasUserinput = true
- }
- log.Printf("[DEBUG][%s] Found subflow to handle: %s (%s)", workflowExecution.ExecutionId, result.Action.AppName, result.Status)
- if result.Status == "SUCCESS" || result.Status == "FINISHED" || result.Status == "FAILURE" || result.Status == "ABORTED" {
- // Check for results
- setWorkflowExecution(ctx, workflowExecution, false)
- return nil
- }
- }
- if workflowExecution.Status == "WAITING" && workflowExecution.ExecutionSource != "default" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
- log.Printf("[INFO][%s] Workflow execution is waiting. Exiting worker, as backend will restart it.", workflowExecution.ExecutionId)
- shutdown(workflowExecution, "", "", true)
- }
- log.Printf("[INFO][%s] (2) Status: %s, Results: %d, actions: %d. Userinput: %#v", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra, hasUserinput)
- return errors.New("Subflow status not found yet")
- }
- func handleDefaultExecutionWrapper(ctx context.Context, workflowExecution shuffle.WorkflowExecution, streamResultUrl string, extra int) error {
- if extra == -1 {
- extra = 0
- for _, trigger := range workflowExecution.Workflow.Triggers {
- if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
- extra += 1
- }
- }
- }
- req, err := http.NewRequest(
- "POST",
- streamResultUrl,
- bytes.NewBuffer([]byte(data)),
- )
- newresp, err := topClient.Do(req)
- if err != nil {
- log.Printf("[ERROR] Failed making request (1): %s", err)
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return err
- }
- defer newresp.Body.Close()
- body, err := ioutil.ReadAll(newresp.Body)
- if err != nil {
- log.Printf("[ERROR] Failed reading body (1): %s", err)
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return err
- }
- if newresp.StatusCode != 200 {
- log.Printf("[ERROR] Bad statuscode: %d, %s", newresp.StatusCode, string(body))
- if strings.Contains(string(body), "Workflowexecution is already finished") {
- log.Printf("[DEBUG] Shutting down (19)")
- shutdown(workflowExecution, "", "", true)
- }
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return errors.New(fmt.Sprintf("Bad statuscode: %d", newresp.StatusCode))
- }
- err = json.Unmarshal(body, &workflowExecution)
- if err != nil {
- log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return err
- }
- if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
- log.Printf("[INFO][%s] Workflow execution is finished. Exiting worker.", workflowExecution.ExecutionId)
- log.Printf("[DEBUG] Shutting down (20)")
- if isKubernetes == "true" {
- // log.Printf("workflow execution: %#v", workflowExecution)
- clientset, _, err := shuffle.GetKubernetesClient()
- if err != nil {
- log.Println("[ERROR] Error getting kubernetes client (2):", err)
- os.Exit(1)
- }
- cleanupKubernetesExecution(clientset, workflowExecution, kubernetesNamespace)
- } else {
- shutdown(workflowExecution, "", "", true)
- }
- }
- log.Printf("[INFO][%s] (3) Status: %s, Results: %d, actions: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra)
- if workflowExecution.Status != "EXECUTING" {
- log.Printf("[WARNING][%s] Exiting as worker execution has status %s!", workflowExecution.ExecutionId, workflowExecution.Status)
- log.Printf("[DEBUG] Shutting down (21)")
- if isKubernetes == "true" {
- // log.Printf("workflow execution: %#v", workflowExecution)
- clientset, _, err := shuffle.GetKubernetesClient()
- if err != nil {
- log.Println("[ERROR] Error getting kubernetes client (3):", err)
- os.Exit(1)
- }
- cleanupKubernetesExecution(clientset, workflowExecution, kubernetesNamespace)
- } else {
- shutdown(workflowExecution, "", "", true)
- }
- }
- setWorkflowExecution(ctx, workflowExecution, false)
- return nil
- }
- func handleDefaultExecution(client *http.Client, req *http.Request, workflowExecution shuffle.WorkflowExecution) error {
- // if no onprem runs (shouldn't happen, but extra check), exit
- // if there are some, load the images ASAP for the app
- ctx := context.Background()
- //startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
- startAction, extra, _, _, _, _, _, _ := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
- err := executionInit(workflowExecution)
- if err != nil {
- log.Printf("[INFO] Workflow setup failed for %s: %s", workflowExecution.ExecutionId, err)
- log.Printf("[DEBUG] Shutting down (18)")
- shutdown(workflowExecution, "", "", true)
- }
- log.Printf("[DEBUG] DEFAULT EXECUTION Startaction: %s", startAction)
- setWorkflowExecution(ctx, workflowExecution, false)
- streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
- for {
- err = handleDefaultExecutionWrapper(ctx, workflowExecution, streamResultUrl, extra)
- if err != nil {
- log.Printf("[ERROR] Failed handling default execution: %s", err)
- }
- }
- return nil
- }
// arrayContains reports whether id is present in the visited slice.
func arrayContains(visited []string, id string) bool {
	for _, entry := range visited {
		if entry == id {
			return true
		}
	}

	return false
}
- func getResult(workflowExecution shuffle.WorkflowExecution, id string) shuffle.ActionResult {
- for _, actionResult := range workflowExecution.Results {
- if actionResult.Action.ID == id {
- return actionResult
- }
- }
- return shuffle.ActionResult{}
- }
- func getAction(workflowExecution shuffle.WorkflowExecution, id, environment string) shuffle.Action {
- for _, action := range workflowExecution.Workflow.Actions {
- if action.ID == id {
- return action
- }
- }
- for _, trigger := range workflowExecution.Workflow.Triggers {
- if trigger.ID == id {
- return shuffle.Action{
- ID: trigger.ID,
- AppName: trigger.AppName,
- Name: trigger.AppName,
- Environment: environment,
- Label: trigger.Label,
- }
- log.Printf("FOUND TRIGGER: %#v!", trigger)
- }
- }
- return shuffle.Action{}
- }
- func runSkipAction(client *http.Client, action shuffle.Action, workflowId, workflowExecutionId, authorization string, configuration string) error {
- timeNow := time.Now().Unix()
- result := shuffle.ActionResult{
- Action: action,
- ExecutionId: workflowExecutionId,
- Authorization: authorization,
- Result: configuration,
- StartedAt: timeNow,
- CompletedAt: 0,
- Status: "SUCCESS",
- }
- resultData, err := json.Marshal(result)
- if err != nil {
- return err
- }
- streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
- req, err := http.NewRequest(
- "POST",
- streamUrl,
- bytes.NewBuffer([]byte(resultData)),
- )
- if err != nil {
- log.Printf("[WARNING] Error building skip request (0): %s", err)
- return err
- }
- newresp, err := topClient.Do(req)
- if err != nil {
- log.Printf("[WARNING] Error running skip request (0): %s", err)
- return err
- }
- defer newresp.Body.Close()
- body, err := ioutil.ReadAll(newresp.Body)
- if err != nil {
- log.Printf("[WARNING] Failed reading body when skipping (0): %s", err)
- return err
- }
- log.Printf("[INFO] Skip Action Body: %s", string(body))
- return nil
- }
- func runTestExecution(client *http.Client, workflowId, apikey string) (string, string) {
- executeUrl := fmt.Sprintf("%s/api/v1/workflows/%s/execute", baseUrl, workflowId)
- req, err := http.NewRequest(
- "GET",
- executeUrl,
- nil,
- )
- if err != nil {
- log.Printf("Error building test request: %s", err)
- return "", ""
- }
- req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", apikey))
- newresp, err := topClient.Do(req)
- if err != nil {
- log.Printf("[WARNING] Error running test request (3): %s", err)
- return "", ""
- }
- defer newresp.Body.Close()
- body, err := ioutil.ReadAll(newresp.Body)
- if err != nil {
- log.Printf("[WARNING] Failed reading body: %s", err)
- return "", ""
- }
- log.Printf("[INFO] Test Body: %s", string(body))
- var workflowExecution shuffle.WorkflowExecution
- err = json.Unmarshal(body, &workflowExecution)
- if err != nil {
- log.Printf("Failed workflowExecution unmarshal: %s", err)
- return "", ""
- }
- return workflowExecution.Authorization, workflowExecution.ExecutionId
- }
- func isRunningInCluster() bool {
- _, existsHost := os.LookupEnv("KUBERNETES_SERVICE_HOST")
- _, existsPort := os.LookupEnv("KUBERNETES_SERVICE_PORT")
- return existsHost && existsPort
- }
- func buildEnvVars(envMap map[string]string) []corev1.EnvVar {
- var envVars []corev1.EnvVar
- for key, value := range envMap {
- envVars = append(envVars, corev1.EnvVar{Name: key, Value: value})
- }
- return envVars
- }
- func buildResourcesFromEnv() corev1.ResourceRequirements {
- requests := corev1.ResourceList{}
- limits := corev1.ResourceList{}
- type item struct {
- env string
- resourceName corev1.ResourceName
- resourceList corev1.ResourceList
- }
- items := []item{
- // kubernetes requests
- {env: "SHUFFLE_APP_CPU_REQUEST", resourceName: corev1.ResourceCPU, resourceList: requests},
- {env: "SHUFFLE_APP_MEMORY_REQUEST", resourceName: corev1.ResourceMemory, resourceList: requests},
- {env: "SHUFFLE_APP_EPHEMERAL_STORAGE_REQUEST", resourceName: corev1.ResourceEphemeralStorage, resourceList: requests},
- // kubernetes limits
- {env: "SHUFFLE_APP_CPU_LIMIT", resourceName: corev1.ResourceCPU, resourceList: limits},
- {env: "SHUFFLE_APP_MEMORY_LIMIT", resourceName: corev1.ResourceMemory, resourceList: limits},
- {env: "SHUFFLE_APP_EPHEMERAL_STORAGE_LIMIT", resourceName: corev1.ResourceEphemeralStorage, resourceList: limits},
- }
- for _, it := range items {
- if value := strings.TrimSpace(os.Getenv(it.env)); value != "" {
- if quantity, err := resource.ParseQuantity(value); err == nil {
- it.resourceList[it.resourceName] = quantity
- } else {
- log.Printf("[WARNING] Cannot parse %s=%q as resource quantity: %v", it.env, value, err)
- }
- }
- }
- rr := corev1.ResourceRequirements{}
- if len(requests) > 0 {
- rr.Requests = requests
- }
- if len(limits) > 0 {
- rr.Limits = limits
- }
- return rr
- }
- func getWorkerBackendExecution(auth string, executionId string) (*shuffle.WorkflowExecution, error) {
- backendUrl := os.Getenv("BASE_URL")
- if len(backendUrl) == 0 {
- backendUrl = "http://shuffle-backend:5001"
- }
- var workflowExecution *shuffle.WorkflowExecution
- streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", backendUrl)
- topClient := shuffle.GetExternalClient(backendUrl)
- requestData := shuffle.ActionResult{
- Authorization: auth,
- ExecutionId: executionId,
- }
- data, err := json.Marshal(requestData)
- if err != nil {
- return workflowExecution, err
- }
- req, err := http.NewRequest(
- "POST",
- streamResultUrl,
- bytes.NewBuffer([]byte(data)),
- )
- newresp, err := topClient.Do(req)
- if err != nil {
- return workflowExecution, err
- }
- defer newresp.Body.Close()
- if newresp.StatusCode != 200 {
- return workflowExecution, errors.New(fmt.Sprintf("Got bad status code from backend %d", newresp.StatusCode))
- }
- body, err := ioutil.ReadAll(newresp.Body)
- if err != nil {
- return workflowExecution, err
- }
- err = json.Unmarshal(body, &workflowExecution)
- if err != nil {
- return workflowExecution, err
- }
- if debug {
- log.Printf("[INFO] Here is the result we got back from backend: %s", workflowExecution.Results)
- }
- //setWorkflowExecution(context.Background(), *workflowExecution, false)
- return workflowExecution, nil
- }
// handleWorkflowQueue receives an ActionResult from an app/worker over HTTP,
// validates it against the stored WorkflowExecution (authorization + status),
// forwards "cloud" environment actions to the backend, and otherwise hands the
// result to runWorkflowExecutionTransaction for state updates.
func handleWorkflowQueue(resp http.ResponseWriter, request *http.Request) {
	if request.Body == nil {
		resp.WriteHeader(http.StatusBadRequest)
		return
	}

	defer request.Body.Close()
	body, err := ioutil.ReadAll(request.Body)
	if err != nil {
		log.Printf("[WARNING] (3) Failed reading body for workflowqueue")
		resp.WriteHeader(401)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
		return
	}

	var actionResult shuffle.ActionResult
	err = json.Unmarshal(body, &actionResult)
	if err != nil {
		// Deliberately non-fatal: the execution-id check below rejects
		// unusable payloads.
		log.Printf("[ERROR] Failed shuffle.ActionResult unmarshaling (2): %s", err)
		//resp.WriteHeader(401)
		//resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
		//return
	}

	if len(actionResult.ExecutionId) == 0 {
		log.Printf("[ERROR] No workflow execution id in action result. Data: %s", string(body))
		resp.WriteHeader(400)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "No workflow execution id in action result"}`)))
		return
	}

	// 1. Get the shuffle.WorkflowExecution(ExecutionId) from the database
	// 2. if shuffle.ActionResult.Authentication != shuffle.WorkflowExecution.Authentication -> exit
	// 3. Add to and update actionResult in workflowExecution
	// 4. Push to db
	// IF FAIL: Set executionstatus: abort or cancel
	ctx := context.Background()
	// "TBD" is a placeholder execution id; silently ignore such results.
	if actionResult.ExecutionId == "TBD" {
		return
	}

	workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId)
	if err != nil {
		// Cache miss: fall back to fetching the execution from the backend.
		log.Printf("[WARNING][%s] Failed to find execution in cache requesting backend (1): %s", actionResult.ExecutionId, err)
		workflowExecution, err = getWorkerBackendExecution(actionResult.Authorization, actionResult.ExecutionId)
		if err != nil {
			log.Printf("[ERROR][%s] Failed getting execution (workflowqueue) %s: %s", actionResult.ExecutionId, actionResult.ExecutionId, err)
			resp.WriteHeader(500)
			resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution ID %s because it doesn't exist locally."}`, actionResult.ExecutionId)))
			return
		}
	}

	if workflowExecution.Authorization != actionResult.Authorization {
		log.Printf("[ERROR][%s] Bad authorization key when updating node (workflowQueue). Want: %s, Have: %s", actionResult.ExecutionId, workflowExecution.Authorization, actionResult.Authorization)
		resp.WriteHeader(403)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key"}`)))
		return
	}

	// Terminal states: respond 200 (so the sender stops retrying) but refuse
	// any further updates.
	if workflowExecution.Status == "FINISHED" {
		log.Printf("[DEBUG][%s] Workflowexecution is already FINISHED. No further action can be taken", workflowExecution.ExecutionId)
		resp.WriteHeader(200)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is already finished because it has status %s. Lastnode: %s"}`, workflowExecution.Status, workflowExecution.LastNode)))
		return
	}

	if workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" {
		log.Printf("[WARNING][%s] Workflowexecution already has status %s. No further action can be taken", workflowExecution.ExecutionId, workflowExecution.Status)
		resp.WriteHeader(200)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is aborted because of %s with result %s and status %s"}`, workflowExecution.LastNode, workflowExecution.Result, workflowExecution.Status)))
		return
	}

	// Optional ?retries=N query parameter, used for logging below.
	retries := 0
	retry, retriesok := request.URL.Query()["retries"]
	if retriesok && len(retry) > 0 {
		val, err := strconv.Atoi(retry[0])
		if err == nil {
			retries = val
		}
	}

	// Not doing environment as we don't want to hook a worker to specific env. It should just not handle cloud actions
	// limiting a worker to an env will not allow us to run multiple orborus in the same server?
	if strings.EqualFold(actionResult.Action.Environment, "cloud") {
		// Cloud actions are not handled here; forward the raw body upstream.
		log.Printf("[WARNING] Got an action for %s environment forwarding it to the backend", actionResult.Action.Environment)
		streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
		req, err := http.NewRequest(
			"POST",
			streamUrl,
			bytes.NewBuffer([]byte(body)),
		)

		if err != nil {
			log.Printf("[ERROR] Error building subflow (%s) request: %s", workflowExecution.ExecutionId, err)
			return
		}

		newresp, err := topClient.Do(req)
		if err != nil {
			log.Printf("[ERROR] Error running subflow (%s) request: %s", workflowExecution.ExecutionId, err)
			return
		}

		defer newresp.Body.Close()
		if newresp.StatusCode != 200 {
			body, err := ioutil.ReadAll(newresp.Body)
			if err != nil {
				log.Printf("[INFO][%s] Failed reading body after subflow request: %s", workflowExecution.ExecutionId, err)
				return
			} else {
				log.Printf("[ERROR][%s] Failed forwarding subflow request of length %d\n: %s", workflowExecution.ExecutionId, len(actionResult.Result), string(body))
			}
		}

		return
	}

	log.Printf("[DEBUG][%s] Action: Received, Label: '%s', Action: '%s', Status: %s, Run status: %s, Extra=Retry:%d", workflowExecution.ExecutionId, actionResult.Action.Label, actionResult.Action.AppName, actionResult.Status, workflowExecution.Status, retries)

	// results = append(results, actionResult)
	// log.Printf("[INFO][%s] Time to execute %s (%s) with app %s:%s, function %s, env %s with %d parameters.", workflowExecution.ExecutionId, action.ID, action.Label, action.AppName, action.AppVersion, action.Name, action.Environment, len(action.Parameters))
	// log.Printf("[DEBUG][%s] In workflowQueue with transaction", workflowExecution.ExecutionId)
	runWorkflowExecutionTransaction(ctx, 0, workflowExecution.ExecutionId, actionResult, resp)
}
- // Will make sure transactions are always ran for an execution. This is recursive if it fails. Allowed to fail up to 5 times
- func runWorkflowExecutionTransaction(ctx context.Context, attempts int64, workflowExecutionId string, actionResult shuffle.ActionResult, resp http.ResponseWriter) {
- //log.Printf("[DEBUG][%s] IN WORKFLOWEXECUTION SUB!", actionResult.ExecutionId)
- workflowExecution, err := shuffle.GetWorkflowExecution(ctx, workflowExecutionId)
- if err != nil {
- log.Printf("[WARNING][%s] Failed to find execution in cache requesting backend (2): %s", actionResult.ExecutionId, err)
- workflowExecution, err = getWorkerBackendExecution(actionResult.Authorization, actionResult.ExecutionId)
- if err != nil {
- log.Printf("[ERROR] Failed getting execution cache: %s", err)
- resp.WriteHeader(400)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution"}`)))
- return
- }
- }
- resultLength := len(workflowExecution.Results)
- setExecution := true
- workflowExecution, dbSave, err := shuffle.ParsedExecutionResult(ctx, *workflowExecution, actionResult, true, 0)
- if err == nil {
- if workflowExecution.Status != "EXECUTING" && workflowExecution.Status != "WAITING" {
- log.Printf("[WARNING][%s] Execution is not executing, but %s. Stopping Transaction update.", workflowExecution.ExecutionId, workflowExecution.Status)
- if resp != nil {
- resp.WriteHeader(200)
- resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Execution is not executing, but %s"}`, workflowExecution.Status)))
- }
- log.Printf("[DEBUG][%s] Shutting down (35)", workflowExecution.ExecutionId)
- // Force sending result
- shutdownData, err := json.Marshal(workflowExecution)
- if err != nil {
- log.Printf("[ERROR][%s] Failed marshalling execution (35): %s", workflowExecution.ExecutionId, err)
- }
- sendResult(*workflowExecution, shutdownData)
- shutdown(*workflowExecution, "", "", false)
- return
- }
- /*** STARTREMOVE ***/
- if workflowExecution.Status == "WAITING" && (os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm") {
- log.Printf("[INFO][%s] Workflow execution is waiting while in swarm. Sending info to backend to ensure execution stops.", workflowExecution.ExecutionId)
- shutdownData, err := json.Marshal(workflowExecution)
- if err != nil {
- log.Printf("[ERROR][%s] Failed marshalling execution (36) - not sending backend WAITING: %s", workflowExecution.ExecutionId, err)
- } else {
- sendResult(*workflowExecution, shutdownData)
- shutdown(*workflowExecution, "", "", false)
- }
- }
- /*** ENDREMOVE ***/
- } else {
- if strings.Contains(strings.ToLower(fmt.Sprintf("%s", err)), "already been ran") || strings.Contains(strings.ToLower(fmt.Sprintf("%s", err)), "already finished") {
- log.Printf("[ERROR][%s] Skipping rerun of action result as it's already been ran: %s", workflowExecution.ExecutionId)
- return
- }
- log.Printf("[DEBUG] Rerunning transaction? %s", err)
- if strings.Contains(fmt.Sprintf("%s", err), "Rerun this transaction") {
- workflowExecution, err := shuffle.GetWorkflowExecution(ctx, workflowExecutionId)
- if err != nil {
- log.Printf("[WARNING][%s] Failed to find execution in cache requesting backend (3): %s", actionResult.ExecutionId, err)
- workflowExecution, err = getWorkerBackendExecution(actionResult.Authorization, actionResult.ExecutionId)
- if err != nil {
- log.Printf("[ERROR][%s] Failed getting execution cache (2): %s", workflowExecution.ExecutionId, err)
- resp.WriteHeader(400)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution (2)"}`)))
- return
- }
- }
- resultLength = len(workflowExecution.Results)
- setExecution = true
- workflowExecution, dbSave, err = shuffle.ParsedExecutionResult(ctx, *workflowExecution, actionResult, false, 0)
- if err != nil {
- log.Printf("[ERROR][%s] Failed execution of parsedexecution (2): %s", workflowExecution.ExecutionId, err)
- resp.WriteHeader(401)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution (2)"}`)))
- return
- } else {
- log.Printf("[DEBUG][%s] Successfully got ParsedExecution with %d results!", workflowExecution.ExecutionId, len(workflowExecution.Results))
- }
- } else {
- log.Printf("[ERROR][%s] Failed execution of parsedexecution: %s", workflowExecution.ExecutionId, err)
- resp.WriteHeader(401)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution"}`)))
- return
- }
- }
- //log.Printf(`[DEBUG][%s] Got result %s from %s. Execution status: %s. Save: %#v. Parent: %#v`, actionResult.ExecutionId, actionResult.Status, actionResult.Action.ID, workflowExecution.Status, dbSave, workflowExecution.ExecutionParent)
- //dbSave := false
- //if len(results) != len(workflowExecution.Results) {
- // log.Printf("[DEBUG][%s] There may have been an issue in transaction queue. Result lengths: %d vs %d. Should check which exists the base results, but not in entire execution, then append.", workflowExecution.ExecutionId, len(results), len(workflowExecution.Results))
- //}
- // Validating that action results hasn't changed
- // Handled using cachhing, so actually pretty fast
- cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
- cache, err := shuffle.GetCache(ctx, cacheKey)
- if err == nil {
- //parsedValue := value.(*shuffle.WorkflowExecution)
- parsedValue := &shuffle.WorkflowExecution{}
- cacheData := []byte(cache.([]uint8))
- err = json.Unmarshal(cacheData, &workflowExecution)
- if err != nil {
- log.Printf("[ERROR][%s] Failed unmarshalling workflowexecution: %s", workflowExecution.ExecutionId, err)
- }
- if len(parsedValue.Results) > 0 && len(parsedValue.Results) != resultLength {
- setExecution = false
- if attempts > 5 {
- }
- attempts += 1
- log.Printf("[DEBUG][%s] Rerunning transaction as results has changed. %d vs %d", workflowExecution.ExecutionId, len(parsedValue.Results), resultLength)
- /*
- if len(workflowExecution.Results) <= len(workflowExecution.Workflow.Actions) {
- log.Printf("[DEBUG][%s] Rerunning transaction as results has changed. %d vs %d", workflowExecution.ExecutionId, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions))
- runWorkflowExecutionTransaction(ctx, attempts, workflowExecutionId, actionResult, resp)
- return
- }
- */
- }
- }
- if setExecution || workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" {
- if debug {
- log.Printf("[DEBUG][%s] Running setexec with status %s and %d/%d results", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions))
- }
- //result(s)", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results))
- err = setWorkflowExecution(ctx, *workflowExecution, dbSave)
- if err != nil {
- log.Printf("[ERROR][%s] Failed setting execution: %s", workflowExecution.ExecutionId, err)
- resp.WriteHeader(400)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed setting workflowexecution actionresult: %s"}`, err)))
- return
- }
- /*** STARTREMOVE ***/
- if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
- finished := shuffle.ValidateFinished(ctx, -1, *workflowExecution)
- if !finished {
- if debug {
- log.Printf("[DEBUG][%s] Handling next node since it's not finished!", workflowExecution.ExecutionId)
- }
- handleExecutionResult(*workflowExecution)
- } else {
- shutdownData, err := json.Marshal(workflowExecution)
- if err != nil {
- log.Printf("[ERROR] Failed marshalling shutdowndata during set: %s", err)
- }
- sendResult(*workflowExecution, shutdownData)
- }
- }
- /*** ENDREMOVE ***/
- } else {
- log.Printf("[INFO][%s] Skipping setexec with status %s", workflowExecution.ExecutionId, workflowExecution.Status)
- // Just in case. Should MAYBE validate finishing another time as well.
- // This fixes issues with e.g. shuffle.Action -> shuffle.Trigger -> shuffle.Action.
- handleExecutionResult(*workflowExecution)
- }
- //if newExecutions && len(nextActions) > 0 {
- // log.Printf("[DEBUG][%s] New execution: %#v. NextActions: %#v", newExecutions, nextActions)
- // //handleExecutionResult(*workflowExecution)
- //}
- resp.WriteHeader(200)
- resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
- }
- func sendSelfRequest(actionResult shuffle.ActionResult) {
- /*** STARTREMOVE ***/
- if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
- log.Printf("[INFO][%s] Not sending self request info since source is default (not swarm)", actionResult.ExecutionId)
- return
- }
- /*** ENDREMOVE ***/
- data, err := json.Marshal(actionResult)
- if err != nil {
- log.Printf("[ERROR][%s] Shutting down (24): Failed to unmarshal data for backend: %s", actionResult.ExecutionId, err)
- return
- }
- if actionResult.ExecutionId == "TBD" {
- return
- }
- log.Printf("[DEBUG][%s] Sending FAILURE to self to stop the workflow execution. Action: %s (%s), app %s:%s", actionResult.ExecutionId, actionResult.Action.Label, actionResult.Action.ID, actionResult.Action.AppName, actionResult.Action.AppVersion)
- // Literally sending to same worker to run it as a new request
- streamUrl := fmt.Sprintf("http://localhost:33333/api/v1/streams")
- hostenv := os.Getenv("WORKER_HOSTNAME")
- if len(hostenv) > 0 {
- streamUrl = fmt.Sprintf("http://%s:33333/api/v1/streams", hostenv)
- }
- req, err := http.NewRequest(
- "POST",
- streamUrl,
- bytes.NewBuffer([]byte(data)),
- )
- if err != nil {
- log.Printf("[ERROR][%s] Failed creating self request (1): %s", actionResult.ExecutionId, err)
- return
- }
- client := shuffle.GetExternalClient(streamUrl)
- newresp, err := client.Do(req)
- if err != nil {
- log.Printf("[ERROR][%s] Error running finishing request (2): %s", actionResult.ExecutionId, err)
- return
- }
- defer newresp.Body.Close()
- if newresp.Body != nil {
- body, err := ioutil.ReadAll(newresp.Body)
- //log.Printf("[INFO] BACKEND STATUS: %d", newresp.StatusCode)
- if err != nil {
- log.Printf("[ERROR][%s] Failed reading body: %s", actionResult.ExecutionId, err)
- } else {
- log.Printf("[DEBUG][%s] Sent update to backend - 2: %s", actionResult.ExecutionId, string(body))
- }
- }
- }
- func sendResult(workflowExecution shuffle.WorkflowExecution, data []byte) {
- if workflowExecution.ExecutionSource == "default" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
- //log.Printf("[INFO][%s] Not sending backend info since source is default (not swarm)", workflowExecution.ExecutionId)
- //return
- } else {
- }
- // Basically to reduce backend strain
- /*
- if shuffle.ArrayContains(finishedExecutions, workflowExecution.ExecutionId) {
- log.Printf("[INFO][%s] NOT sending backend info since it's already been sent before.", workflowExecution.ExecutionId)
- return
- }
- */
- // Take it down again
- /*
- if len(finishedExecutions) > 100 {
- log.Printf("[DEBUG][%s] Removing old execution from finishedExecutions: %s", workflowExecution.ExecutionId, finishedExecutions[0])
- finishedExecutions = finishedExecutions[99:]
- }
- finishedExecutions = append(finishedExecutions, workflowExecution.ExecutionId)
- */
- streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
- req, err := http.NewRequest(
- "POST",
- streamUrl,
- bytes.NewBuffer([]byte(data)),
- )
- if err != nil {
- log.Printf("[ERROR][%s] Failed creating finishing request: %s", workflowExecution.ExecutionId, err)
- log.Printf("[DEBUG][%s] Shutting down (22)", workflowExecution.ExecutionId)
- shutdown(workflowExecution, "", "", false)
- return
- }
- client := shuffle.GetExternalClient(streamUrl)
- newresp, err := client.Do(req)
- if err != nil {
- log.Printf("[ERROR][%s] Error running finishing request (1): %s", workflowExecution.ExecutionId, err)
- log.Printf("[DEBUG][%s] Shutting down (23)", workflowExecution.ExecutionId)
- shutdown(workflowExecution, "", "", false)
- return
- }
- defer newresp.Body.Close()
- if newresp.Body != nil {
- body, err := ioutil.ReadAll(newresp.Body)
- //log.Printf("[INFO] BACKEND STATUS: %d", newresp.StatusCode)
- if err != nil {
- log.Printf("[ERROR][%s] Failed reading body: %s", workflowExecution.ExecutionId, err)
- } else {
- log.Printf("[DEBUG][%s] Sent request to backend: %s", workflowExecution.ExecutionId, string(body))
- }
- }
- }
- func validateFinished(workflowExecution shuffle.WorkflowExecution) bool {
- ctx := context.Background()
- newexec, err := shuffle.GetWorkflowExecution(ctx, workflowExecution.ExecutionId)
- if err != nil {
- log.Printf("[ERROR][%s] Failed getting workflow execution: %s", workflowExecution.ExecutionId, err)
- return false
- } else {
- workflowExecution = *newexec
- }
- //startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
- workflowExecution, _ = shuffle.Fixexecution(ctx, workflowExecution)
- _, extra, _, _, _, _, _, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
- log.Printf("[INFO][%s] VALIDATION. Status: %s, shuffle.Actions: %d, Extra: %d, Results: %d. Parent: %#v", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Workflow.Actions), extra, len(workflowExecution.Results), workflowExecution.ExecutionParent)
- if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" || (len(environments) == 1 && requestsSent == 0 && len(workflowExecution.Results) >= 1 && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm") || (len(workflowExecution.Results) >= len(workflowExecution.Workflow.Actions)+extra && len(workflowExecution.Workflow.Actions) > 0) {
- if workflowExecution.Status == "FINISHED" {
- for _, result := range workflowExecution.Results {
- if result.Status == "EXECUTING" || result.Status == "WAITING" {
- log.Printf("[WARNING] NOT returning full result, as a result may be unfinished: %s (%s) - %s", result.Action.Label, result.Action.ID, result.Status)
- return false
- }
- }
- }
- /*** STARTREMOVE ***/
- if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
- requestsSent += 1
- }
- /*** ENDREMOVE ***/
- log.Printf("[DEBUG][%s] Should send full result to %s", workflowExecution.ExecutionId, baseUrl)
- //data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
- shutdownData, err := json.Marshal(workflowExecution)
- if err != nil {
- log.Printf("[ERROR][%s] Shutting down (32): Failed to unmarshal data for backend: %s", workflowExecution.ExecutionId, err)
- shutdown(workflowExecution, "", "", true)
- }
- cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
- if len(workflowExecution.Authorization) > 0 {
- err = shuffle.SetCache(ctx, cacheKey, shutdownData, 31)
- if err != nil {
- log.Printf("[ERROR][%s] Failed adding to cache during ValidateFinished", workflowExecution)
- }
- }
- shuffle.RunCacheCleanup(ctx, workflowExecution)
- sendResult(workflowExecution, shutdownData)
- return true
- }
- return false
- }
- func handleGetStreamResults(resp http.ResponseWriter, request *http.Request) {
- defer request.Body.Close()
- body, err := ioutil.ReadAll(request.Body)
- if err != nil {
- log.Printf("[WARNING] Failed reading body for stream result queue")
- resp.WriteHeader(500)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
- return
- }
- var actionResult shuffle.ActionResult
- err = json.Unmarshal(body, &actionResult)
- if err != nil {
- log.Printf("[WARNING] Failed shuffle.ActionResult unmarshaling: %s", err)
- //resp.WriteHeader(400)
- //resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
- //return
- }
- if len(actionResult.ExecutionId) == 0 {
- log.Printf("[WARNING] No workflow execution id in action result (2). Data: %s", string(body))
- resp.WriteHeader(400)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "No workflow execution id in action result"}`)))
- return
- }
- ctx := context.Background()
- workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId)
- if err != nil {
- log.Printf("[WARNING][%s] Failed to find execution in cache requesting backend (4): %s", actionResult.ExecutionId, err)
- workflowExecution, err = getWorkerBackendExecution(actionResult.Authorization, actionResult.ExecutionId)
- if err != nil {
- log.Printf("[ERROR] Failed getting execution (streamresult) %s: %s", actionResult.ExecutionId, err)
- resp.WriteHeader(400)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
- return
- }
- }
- // Authorization is done here
- if workflowExecution.Authorization != actionResult.Authorization {
- log.Printf("[ERROR] Bad authorization key when getting stream results from cache %s.", actionResult.ExecutionId)
- resp.WriteHeader(401)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
- return
- }
- newjson, err := json.Marshal(workflowExecution)
- if err != nil {
- resp.WriteHeader(500)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflow execution"}`)))
- return
- }
- resp.WriteHeader(200)
- resp.Write(newjson)
- }
// GetLocalIP returns the non loopback local IP of the host.
//
// Special cases (removed in cloud builds, see STARTREMOVE markers):
//   - Kubernetes workers always answer on the shared "shuffle-workers"
//     service name instead of a node-local IP.
//   - Swarm ("run"/"swarm") workers return their container hostname so the
//     overlay network VIP routing resolves them.
//
// Otherwise the first non-loopback IPv4 interface address is returned, or
// "" when none is found.
func getLocalIP() string {
	/*** STARTREMOVE ***/
	if os.Getenv("IS_KUBERNETES") == "true" {
		return "shuffle-workers"
	}

	if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
		name, err := os.Hostname()
		if err != nil {
			log.Printf("[ERROR] Couldn't find hostname of worker: %s", err)
			os.Exit(3)
		}

		log.Printf("[DEBUG] Found hostname %s since worker is running with \"run\" command", name)
		return name

		// NOTE: a disabled experiment that scanned docker network interfaces
		// for a matching IP used to live here, unreachable after the return
		// above. It has been removed; see VCS history if it is ever needed.
	}
	/*** ENDREMOVE ***/

	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}

	for _, address := range addrs {
		// check the address type and if it is not a loopback the display it
		if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
			// IPv4 only for now; IPv6 addresses are skipped.
			if ipnet.IP.To4() != nil {
				return ipnet.IP.String()
			}
		}
	}

	return ""
}
- func getAvailablePort() (net.Listener, error) {
- listener, err := net.Listen("tcp", ":0")
- if err != nil {
- log.Printf("[WARNING] Failed to assign port by default. Defaulting to 5001")
- return nil, err
- }
- return listener, nil
- //return fmt.Sprintf(":%d", port)
- }
- func webserverSetup(workflowExecution shuffle.WorkflowExecution) net.Listener {
- hostname = getLocalIP()
- if isKubernetes == "true" {
- os.Setenv("WORKER_HOSTNAME", "shuffle-workers")
- } else {
- os.Setenv("WORKER_HOSTNAME", hostname)
- }
- // FIXME: This MAY not work because of speed between first
- // container being launched and port being assigned to webserver
- listener, err := getAvailablePort()
- if err != nil {
- log.Printf("[ERROR] Failed to create init listener: %s", err)
- return listener
- }
- log.Printf("[DEBUG] OLD HOSTNAME: %s", appCallbackUrl)
- /*** STARTREMOVE ***/
- if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
- log.Printf("[DEBUG] Starting webserver (1) on port %d with hostname: %s", baseport, hostname)
- os.Setenv("WORKER_PORT", fmt.Sprintf("%d", baseport))
- appCallbackUrl = fmt.Sprintf("http://%s:%d", hostname, baseport)
- if os.Getenv("IS_KUBERNETES") == "true" {
- appCallbackUrl = fmt.Sprintf("http://%s:%d", "shuffle-workers", baseport)
- log.Printf("[DEBUG] NEW WORKER APP: %s", appCallbackUrl)
- hostname = "shuffle-workers"
- }
- listener, err = net.Listen("tcp", fmt.Sprintf(":%d", baseport))
- if err != nil {
- log.Printf("[ERROR] Failed to assign port to %d: %s", baseport, err)
- return nil
- }
- return listener
- }
- /*** ENDREMOVE ***/
- port := listener.Addr().(*net.TCPAddr).Port
- // Set the port environment variable
- os.Setenv("WORKER_PORT", fmt.Sprintf("%d", port))
- log.Printf("[DEBUG] Starting webserver (2) on port %d with hostname: %s", port, hostname)
- appCallbackUrl = fmt.Sprintf("http://%s:%d", hostname, port)
- log.Printf("[INFO] NEW WORKER HOSTNAME: %s", appCallbackUrl)
- return listener
- }
- func findActiveSwarmNodes(dockercli *dockerclient.Client) (int64, error) {
- ctx := context.Background()
- nodes, err := dockercli.NodeList(ctx, types.NodeListOptions{})
- if err != nil {
- return 1, err
- }
- nodeCount := int64(0)
- for _, node := range nodes {
- //log.Printf("ID: %s - %#v", node.ID, node.Status.State)
- if node.Status.State == "ready" {
- nodeCount += 1
- }
- }
- // Check for SHUFFLE_MAX_NODES
- maxNodesString := os.Getenv("SHUFFLE_MAX_SWARM_NODES")
- // Make it into a number and check if it's lower than nodeCount
- if len(maxNodesString) > 0 {
- maxNodes, err := strconv.ParseInt(maxNodesString, 10, 64)
- if err != nil {
- return nodeCount, err
- }
- if nodeCount > maxNodes {
- nodeCount = maxNodes
- }
- }
- return nodeCount, nil
- /*
- containers, err := dockercli.ContainerList(ctx, types.ContainerListOptions{
- All: true,
- })
- */
- }
- /*** STARTREMOVE ***/
- func deploySwarmService(dockercli *dockerclient.Client, name, image string, deployport int, inputReplicas int64, retry bool) error {
- log.Printf("[DEBUG] Deploying service for %s to swarm on port %d", name, deployport)
- //containerName := fmt.Sprintf("shuffle-worker-%s", parsedUuid)
- // Check if the image exists or not - just in case
- _, _, err := dockercli.ImageInspectWithRaw(context.Background(), image)
- if err != nil {
- log.Printf("[INFO] Image %s not found locally. Pulling from registry...", image)
- localRegistry := os.Getenv("REGISTRY_URL")
- if !strings.HasPrefix(image, localRegistry) && len(localRegistry) > 0 {
- image = fmt.Sprintf("%s/%s", localRegistry, image)
- log.Printf("[DEBUG] Changed image to %s", image)
- }
- _, err := dockercli.ImagePull(
- context.Background(),
- image,
- dockerimage.PullOptions{},
- )
- if err != nil {
- log.Printf("[ERROR] Failed pulling image %s: %s", image, err)
- return err
- }
- }
- if len(baseimagename) == 0 || baseimagename == "/" {
- baseimagename = "frikky/shuffle"
- //var baseimagename = "frikky/shuffle"
- //var registryName = "registry.hub.docker.com"
- }
- //image := fmt.Sprintf("%s:%s", baseimagename, name)
- networkName := "shuffle-executions"
- if len(swarmNetworkName) > 0 {
- networkName = swarmNetworkName
- }
- // Apps used a lot should have 2 replicas (default)
- // New default to 3 (as the chance of queues piling up is lower)
- replicas := uint64(3)
- // Sent from Orborus
- // Should be equal to
- scaleReplicas := os.Getenv("SHUFFLE_APP_REPLICAS")
- if len(scaleReplicas) > 0 {
- tmpInt, err := strconv.Atoi(scaleReplicas)
- if err != nil {
- log.Printf("[ERROR] %s is not a valid number for replication", scaleReplicas)
- } else {
- replicas = uint64(tmpInt)
- }
- log.Printf("[DEBUG] SHUFFLE_APP_REPLICAS set to value %#v. Trying to overwrite default (%d/node)", scaleReplicas, replicas)
- }
- // Max scale as well
- nodeCount := uint64(1)
- if inputReplicas > 0 && inputReplicas < 100 {
- if replicas != uint64(inputReplicas) {
- log.Printf("[DEBUG] Overwriting replicas to %d/node as inputReplicas is set to %d", inputReplicas, inputReplicas)
- }
- replicas = uint64(inputReplicas)
- } else {
- cnt, err := findActiveSwarmNodes(dockercli)
- if err != nil {
- log.Printf("[ERROR] Unable to find active swarm nodes: %s", err)
- }
- if cnt > 0 {
- nodeCount = uint64(cnt)
- }
- // FIXME: From September 2025 - This is set back to 1, as this doesn't really reflect how scale works at all. It is just confusing, and makes number larger/smaller "arbitrarily" instead of using default docker scale
- nodeCount = 1
- }
- replicatedJobs := uint64(replicas * nodeCount)
- log.Printf("[DEBUG] Deploying app with name %s with image %s", name, image)
- containerName := fmt.Sprintf(strings.Replace(name, ".", "-", -1))
- serviceSpec := swarm.ServiceSpec{
- Annotations: swarm.Annotations{
- Name: containerName,
- Labels: map[string]string{},
- },
- Mode: swarm.ServiceMode{
- Replicated: &swarm.ReplicatedService{
- // Max replicas total (?)
- Replicas: &replicatedJobs,
- },
- },
- Networks: []swarm.NetworkAttachmentConfig{
- swarm.NetworkAttachmentConfig{
- Target: networkName,
- },
- },
- EndpointSpec: &swarm.EndpointSpec{
- Ports: []swarm.PortConfig{
- swarm.PortConfig{
- Protocol: swarm.PortConfigProtocolTCP,
- PublishMode: swarm.PortConfigPublishModeIngress,
- Name: "app-port",
- PublishedPort: uint32(deployport),
- TargetPort: uint32(deployport),
- },
- },
- },
- TaskTemplate: swarm.TaskSpec{
- Resources: &swarm.ResourceRequirements{
- Reservations: &swarm.Resources{},
- },
- LogDriver: &swarm.Driver{
- Name: "json-file",
- Options: map[string]string{
- "max-size": "10m",
- },
- },
- ContainerSpec: &swarm.ContainerSpec{
- Image: image,
- Env: []string{
- fmt.Sprintf("SHUFFLE_APP_EXPOSED_PORT=%d", deployport),
- fmt.Sprintf("SHUFFLE_SWARM_CONFIG=%s", os.Getenv("SHUFFLE_SWARM_CONFIG")),
- fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", logsDisabled),
- },
- Hosts: []string{
- containerName,
- },
- },
- RestartPolicy: &swarm.RestartPolicy{
- Condition: swarm.RestartPolicyConditionAny,
- },
- Placement: &swarm.Placement{
- Constraints: []string{},
- },
- },
- }
- if len(os.Getenv("SHUFFLE_SWARM_OTHER_NETWORK")) > 0 {
- serviceSpec.Networks = append(serviceSpec.Networks, swarm.NetworkAttachmentConfig{
- Target: "shuffle_shuffle",
- })
- }
- if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
- serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY")))
- serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY")))
- serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY")))
- serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("no_proxy=%s", os.Getenv("no_proxy")))
- }
- overrideHttpProxy := os.Getenv("SHUFFLE_INTERNAL_HTTP_PROXY")
- overrideHttpsProxy := os.Getenv("SHUFFLE_INTERNAL_HTTPS_PROXY")
- if overrideHttpProxy != "" {
- serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("SHUFFLE_INTERNAL_HTTP_PROXY=%s", overrideHttpProxy))
- }
- if overrideHttpsProxy != "" {
- serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("SHUFFLE_INTERNAL_HTTPS_PROXY=%s", overrideHttpsProxy))
- }
- /*
- Mounts: []mount.Mount{
- mount.Mount{
- Source: "/var/run/docker.sock",
- Target: "/var/run/docker.sock",
- Type: mount.TypeBind,
- },
- },
- */
- if dockerApiVersion != "" {
- serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("DOCKER_API_VERSION=%s", dockerApiVersion))
- }
- if len(os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")) > 0 {
- serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("SHUFFLE_APP_SDK_TIMEOUT=%s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")))
- }
- // Required for certain apps
- if timezone == "" {
- timezone = "Europe/Amsterdam"
- }
- serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("TZ=%s", timezone))
- serviceOptions := types.ServiceCreateOptions{}
- service, err := dockercli.ServiceCreate(
- context.Background(),
- serviceSpec,
- serviceOptions,
- )
- _ = service
- if err != nil {
- if strings.Contains(fmt.Sprintf("%s", err), "network") && strings.Contains(fmt.Sprintf("%s", err), "not found") {
- log.Printf("[DEBUG] Network %s not found. Trying to initialize it.", networkName)
- networkErr := initSwarmNetwork()
- if networkErr != nil {
- log.Printf("[ERROR] Failed initializing swarm network: %s", err)
- //return err
- }
- // Retry deploying the service (once)
- if !retry {
- return deploySwarmService(dockercli, name, image, deployport, -1, true)
- }
- }
- // For port mapping.
- if strings.Contains(fmt.Sprintf("%s", err), "InvalidArgument") && strings.Contains(fmt.Sprintf("%s", err), "is already in use") {
- //log.Printf("\n\n[WARNING] Port %d is already allocated. Trying to deploy on next port.\n\n", deployport)
- // Random sleep 1-4 seconds
- time.Sleep(time.Duration(rand.Intn(4)+1) * time.Second)
- return deploySwarmService(dockercli, name, image, deployport+1, -1, retry)
- }
- log.Printf("[DEBUG] Failed deploying %s with image %s: %s", name, image, err)
- return err
- } else {
- // wait for service to be ready
- time.Sleep(time.Duration(rand.Intn(4)+1) * time.Second)
- //log.Printf("[DEBUG] Servicecreate request: %#v %#v", service, err)
- // patch service network
- // this is an edgecase that we noticed on docker version 29
- // and API version 1.44
- // get networkID of swarmNetworkName
- networkID := ""
- ctx := context.Background()
- // find network ID
- networks, err := dockercli.NetworkList(ctx, network.ListOptions{})
- if err == nil {
- for _, net := range networks {
- if net.Name == networkName {
- if net.Scope == "swarm" {
- log.Printf("[DEBUG] Found swarm-scoped network: %s (%s)", networkName, net.ID)
- networkID = net.ID
- } else {
- log.Printf("[WARNING] Network %s exists but is not swarm scoped (scope=%s)", networkName, net.Scope)
- }
- break
- }
- }
- }
- if networkID == "" {
- log.Printf("[ERROR] Network %s not found", networkName)
- networkID = networkName
- }
- services, serr := dockercli.ServiceList(ctx, types.ServiceListOptions{})
- if serr == nil {
- for _, svc := range services {
- if svc.ID == service.ID {
- log.Printf("[DEBUG] Found service %s (%s) — patching network attach", service.ID, svc.ID)
- spec := svc.Spec
- spec.TaskTemplate.Networks = append(spec.TaskTemplate.Networks, swarm.NetworkAttachmentConfig{
- Target: networkID,
- })
- _, uerr := dockercli.ServiceUpdate(ctx, svc.ID, svc.Version, spec, types.ServiceUpdateOptions{})
- if uerr != nil {
- log.Printf("[WARNING] Failed to patch service %s with network %s: %v", service.ID, networkID, uerr)
- } else {
- log.Printf("[INFO] Successfully attached network %s to service %s", networkID, service.ID)
- }
- break
- }
- }
- } else {
- log.Printf("[WARNING] Failed to list services for patching network attach: %v", serr)
- }
- }
- log.Printf("[DEBUG] Successfully deployed service %s with image %s on port %d", name, image, deployport)
- return nil
- }
- /*** ENDREMOVE ***/
// findAppInfo resolves the published port for the swarm service running the
// given app, deploying (or redeploying) the service when necessary.
//
// Resolution order:
//  1. the in-process portMappings cache,
//  2. the live docker service list (which also refreshes the cache),
//  3. a fresh deploy on the next free port above baseport.
//
// Returns the exposed port, or -1 and an error when docker is unreachable.
func findAppInfo(image, name string, redeploy bool) (int, error) {
	// Sleep between 0 and 1.5 second - ensures deployments have a higher
	// chance of being successful
	time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)

	highest := baseport
	exposedPort := -1

	// Exists as a "cache" layer
	// NOTE(review): portMappings is a package-level map read/written here and
	// below without a lock — concurrent calls are a data race; confirm callers
	// serialize access or add a mutex at the package level.
	if portMappings != nil {
		for key, value := range portMappings {
			// Track the highest port seen so a new deploy can go above it.
			if value > highest {
				highest = value
			}

			if key == name {
				exposedPort = value
				break
			}
		}
	} else {
		portMappings = make(map[string]int)
	}

	//Filters:
	// Consult docker directly on a cache miss, or always when redeploying.
	if exposedPort == -1 || redeploy {
		// dockercli, err := dockerclient.NewEnvClient()
		dockercli, _, err := shuffle.GetDockerClient()
		if err != nil {
			log.Printf("[ERROR] Unable to create docker client (2): %s", err)
			return -1, err
		}

		serviceListOptions := types.ServiceListOptions{}
		services, err := dockercli.ServiceList(
			context.Background(),
			serviceListOptions,
		)

		// Basic self-correction
		if err != nil {
			log.Printf("[ERROR] Unable to list services: %s (may continue anyway?)", err)

			// API-version mismatch: pin a known-good version and retry once.
			if strings.Contains(fmt.Sprintf("%s", err), "is too new") {
				// Static for some reason
				defaultVersion := "1.40"
				dockerApiVersion = defaultVersion
				os.Setenv("DOCKER_API_VERSION", defaultVersion)

				log.Printf("[DEBUG] Setting Docker API to %s default and retrying listing requests", defaultVersion)
			} else {
				return -1, err
			}

			services, err = dockercli.ServiceList(
				context.Background(),
				serviceListOptions,
			)

			if err != nil {
				log.Printf("[ERROR] Unable to list services (2): %s", err)
				return -1, err
			}
		}

		for _, service := range services {
			// Refresh the cache from every service's published "port" entry,
			// not just the one we're looking for.
			for _, endpoint := range service.Spec.EndpointSpec.Ports {
				if !strings.Contains(endpoint.Name, "port") {
					continue
				}

				// This seems to have concurrency issues
				portMappings[service.Spec.Annotations.Name] = int(endpoint.PublishedPort)
				if int(endpoint.PublishedPort) > highest {
					highest = int(endpoint.PublishedPort)
				}

				// Service names may have dots replaced with dashes at deploy time.
				if service.Spec.Annotations.Name == name || service.Spec.Annotations.Name == strings.Replace(name, ".", "-", -1) {
					exposedPort = int(endpoint.PublishedPort)
					//break
				}
			}

			if service.Spec.Annotations.Name != name && service.Spec.Annotations.Name != strings.Replace(name, ".", "-", -1) {
				continue
			}

			if redeploy {
				log.Printf("[INFO] Found to redeploy! Service: %s with image %s on port %d", name, image, exposedPort)

				// Remove the service and redeploy it.
				// There are cases where the service doesn't update properly
				// Check when the last update happened. If it was within the last few minutes, skip
				// NOTE(review): the code gates on 60 seconds while the log line
				// below says "10 minutes" — confirm which is intended.
				if int(time.Since(service.UpdatedAt).Seconds()) > 60 {
					log.Printf("[INFO] Attempting redeploy of app %s with image %s since it is more than 10 minutes since last attempt with failure.", name, image)

					err = dockercli.ServiceRemove(
						context.Background(),
						service.ID,
					)

					if err != nil {
						log.Printf("[ERROR] Failed auto-removing service %s: %s", name, err)
					} else {
						log.Printf("[INFO] Auto-removed service %s successfully (rebuild due to redeploy).", name)

						// Sleep between 8 and 12 seconds so swarm finishes the removal.
						time.Sleep(time.Duration(rand.Intn(4)+8) * time.Second)

						// Keep the previous replica count on redeploy.
						replicas := service.Spec.Mode.Replicated.Replicas
						err = deploySwarmService(
							dockercli,
							name,
							image,
							exposedPort,
							int64(*replicas),
							false,
						)

						if err != nil {
							log.Printf("[ERROR] Failed re-deploying service %s: %s", name, err)
						} else {
							time.Sleep(10 * time.Second)
						}
					}
				} else {
					//log.Printf("[INFO] NOT redeploying service %s since it was updated less than 3 minutes ago.", name)
				}
			}

			// Break if it's the correct port, as it's the right service
			if exposedPort >= 0 {
				break
			}
		}
	}

	//log.Printf("[DEBUG] Portmappings: %#v", portMappings)
	if exposedPort >= 0 {
		//log.Printf("[INFO] Found service %s on port %d - no need to deploy another", name, exposedPort)
	} else {
		// Not deployed anywhere yet: deploy fresh on the next free port.
		// dockercli, err := dockerclient.NewEnvClient()
		dockercli, _, err := shuffle.GetDockerClient()
		if err != nil {
			log.Printf("[ERROR] Unable to create docker client (2): %s", err)
			return -1, err
		}

		// Increment by 1 for highest port
		if highest <= baseport {
			highest = baseport
		}

		highest += 1

		err = deploySwarmService(dockercli, name, image, highest, -1, false)
		if err != nil {
			log.Printf("[WARNING] NOT Found service: %s. error: %s", name, err)
			return highest, err
		} else {
			log.Printf("[DEBUG] Waiting 20 seconds before moving on to let app '%s' start properly. Service: %s (swarm)", name, image)
			time.Sleep(time.Duration(20) * time.Second)
		}

		exposedPort = highest
		//return exposedPort, errors.New("Deployed app %s")
	}

	return exposedPort, nil
}
- // Runs data discovery
- /*** STARTREMOVE ***/
- func findAppInfoKubernetes(image, name string, env []string) error {
- clientset, _, err := shuffle.GetKubernetesClient()
- if err != nil {
- log.Printf("[ERROR] Failed getting kubernetes: %s", err)
- return err
- }
- // Check if it exists as a pod
- namespace := "default"
- if len(kubernetesNamespace) > 0 {
- namespace = kubernetesNamespace
- }
- // check deployments
- deployments, err := clientset.AppsV1().Deployments(namespace).List(context.Background(), metav1.ListOptions{})
- if err != nil {
- log.Printf("[ERROR] Failed listing deployments: %s", err)
- return err
- }
- name = strings.Replace(name, "_", "-", -1)
- // check if it exists as a pod
- // for _, pod := range pods.Items {
- // if pod.Name == name {
- // log.Printf("[INFO] Found pod %s - no need to deploy another", name)
- // return nil
- // }
- // }
- for _, deployment := range deployments.Items {
- if deployment.Name == name {
- if debug {
- log.Printf("[DEBUG] Found deployment %s - no need to deploy another", name)
- }
- return nil
- }
- }
- err = deployk8sApp(image, name, env)
- return err
- }
- // Backups in case networks are removed
- func initSwarmNetwork() error {
- ctx := context.Background()
- // dockercli, err := dockerclient.NewEnvClient()
- dockercli, _, err := shuffle.GetDockerClient()
- if err != nil {
- log.Printf("[ERROR] Unable to create docker client (2): %s", err)
- return err
- }
- // Create the network options with the specified MTU
- options := make(map[string]string)
- mtu := 1500
- options["com.docker.network.driver.mtu"] = fmt.Sprintf("%d", mtu)
- ingressOptions := network.CreateOptions{
- Driver: "overlay",
- Attachable: false,
- Ingress: true,
- IPAM: &network.IPAM{
- Driver: "default",
- Config: []network.IPAMConfig{
- network.IPAMConfig{
- Subnet: "10.225.225.0/24",
- Gateway: "10.225.225.1",
- },
- },
- },
- }
- _, err = dockercli.NetworkCreate(
- ctx,
- "ingress",
- ingressOptions,
- )
- if err != nil {
- log.Printf("[WARNING] Ingress network may already exist: %s", err)
- }
- //docker network create --driver=overlay workers
- // Specific subnet?
- networkName := "shuffle_swarm_executions"
- if len(swarmNetworkName) > 0 {
- networkName = swarmNetworkName
- }
- networkCreateOptions := network.CreateOptions{
- Driver: "overlay",
- Options: options,
- Attachable: true,
- Ingress: false,
- IPAM: &network.IPAM{
- Driver: "default",
- Config: []network.IPAMConfig{
- network.IPAMConfig{
- Subnet: "10.224.224.0/24",
- Gateway: "10.224.224.1",
- },
- },
- },
- }
- _, err = dockercli.NetworkCreate(
- ctx,
- networkName,
- networkCreateOptions,
- )
- if err != nil {
- log.Printf("[WARNING] Swarm Executions network may already exist: %s", err)
- }
- networkName = "shuffle-executions"
- networkCreateOptions = network.CreateOptions{
- Driver: "overlay",
- Options: options,
- Attachable: true,
- Ingress: false,
- IPAM: &network.IPAM{
- Driver: "default",
- Config: []network.IPAMConfig{
- network.IPAMConfig{
- Subnet: "10.223.223.0/24",
- Gateway: "10.223.223.1",
- },
- },
- },
- }
- _, err = dockercli.NetworkCreate(
- ctx,
- networkName,
- networkCreateOptions,
- )
- if err != nil {
- log.Printf("[WARNING] Swarm Executions network may already exist: %s", err)
- }
- return nil
- }
- /*** ENDREMOVE ***/
- func sendAppRequest(ctx context.Context, incomingUrl, appName string, port int, action *shuffle.Action, workflowExecution *shuffle.WorkflowExecution, image string, attempts int64) error {
- parsedRequest := shuffle.OrborusExecutionRequest{
- Cleanup: cleanupEnv,
- ExecutionId: workflowExecution.ExecutionId,
- Authorization: workflowExecution.Authorization,
- EnvironmentName: os.Getenv("ENVIRONMENT_NAME"),
- Timezone: os.Getenv("TZ"),
- HTTPProxy: os.Getenv("HTTP_PROXY"),
- HTTPSProxy: os.Getenv("HTTPS_PROXY"),
- ShufflePassProxyToApp: os.Getenv("SHUFFLE_PASS_APP_PROXY"),
- Url: baseUrl,
- BaseUrl: baseUrl,
- Action: *action,
- FullExecution: *workflowExecution,
- }
- // Sometimes makes it have the wrong data due to timing
- // Specific for subflow to ensure worker matches the backend correctly
- parsedBaseurl := incomingUrl
- if strings.Count(baseUrl, ":") >= 2 {
- baseUrlSplit := strings.Split(baseUrl, ":")
- if len(baseUrlSplit) >= 3 {
- parsedBaseurl = strings.Join(baseUrlSplit[0:2], ":")
- //parsedRequest.BaseUrl = fmt.Sprintf("%s:33333", parsedBaseurl)
- }
- }
- if len(parsedRequest.Url) == 0 {
- // Fixed callback url to the worker itself
- if strings.Count(parsedBaseurl, ":") >= 2 {
- parsedRequest.Url = parsedBaseurl
- } else {
- // Callback to worker
- parsedRequest.Url = fmt.Sprintf("%s:%d", parsedBaseurl, baseport)
- //parsedRequest.Url
- }
- //log.Printf("[DEBUG][%s] Should add a baseurl for the app to get back to: %s", workflowExecution.ExecutionId, parsedRequest.Url)
- }
- // Swapping because this was confusing during dev
- // No real reason, just variable names
- tmp := parsedRequest.Url
- parsedRequest.Url = parsedRequest.BaseUrl
- parsedRequest.BaseUrl = tmp
- // Run with proper hostname, but set to shuffle-worker to avoid specific host target.
- // This means running with VIP instead.
- if len(hostname) > 0 {
- parsedRequest.BaseUrl = fmt.Sprintf("http://%s:%d", hostname, baseport)
- //parsedRequest.BaseUrl = fmt.Sprintf("http://shuffle-workers:%d", baseport)
- //log.Printf("[DEBUG][%s] Changing hostname to local hostname in Docker network for WORKER URL: %s", workflowExecution.ExecutionId, parsedRequest.BaseUrl)
- if parsedRequest.Action.AppName == "shuffle-subflow" || parsedRequest.Action.AppName == "shuffle-subflow-v2" || parsedRequest.Action.AppName == "User Input" {
- parsedRequest.BaseUrl = fmt.Sprintf("http://%s:%d", hostname, baseport)
- //parsedRequest.Url = parsedRequest.BaseUrl
- }
- }
- // Making sure to get the LATEST execution data
- // This is due to cache timing issues
- exec, err := shuffle.GetWorkflowExecution(ctx, workflowExecution.ExecutionId)
- if err == nil && len(exec.ExecutionId) > 0 {
- parsedRequest.FullExecution = *exec
- }
- data, err := json.Marshal(parsedRequest)
- if err != nil {
- log.Printf("[ERROR] Failed marshalling worker request: %s", err)
- return err
- }
- if isKubernetes == "true" {
- appName = strings.Replace(appName, "_", "-", -1)
- }
- // Shitty hardcoded fix for now
- if strings.Contains(appName, "1.0.0") {
- appName = strings.Replace(appName, "1.0.0", "1-0-0", 1)
- } else if strings.Contains(appName, "1.1.0") {
- appName = strings.Replace(appName, "1.1.0", "1-1-0", 1)
- } else if strings.Contains(appName, "1.2.0") {
- appName = strings.Replace(appName, "1.2.0", "1-2-0", 1)
- } else if strings.Contains(appName, "1.4.0") {
- appName = strings.Replace(appName, "1.4.0", "1-4-0", 1)
- } else if strings.Contains(appName, "2.0.0") {
- appName = strings.Replace(appName, "2.0.0", "2-0-0", 1)
- }
- streamUrl := fmt.Sprintf("http://%s:%d/api/v1/run", appName, port)
- // log.Printf("[DEBUG][%s] Worker URL: %s, Backend URL: %s, Target App: %s", workflowExecution.ExecutionId, parsedRequest.BaseUrl, parsedRequest.Url, streamUrl)
- req, err := http.NewRequest(
- "POST",
- streamUrl,
- bytes.NewBuffer([]byte(data)),
- )
- // Checking as LATE as possible, ensuring we don't rerun what's already ran
- // ctx = context.Background()
- // Sleep between 0 and 250 ms for randomness so no same worker check at same time (same as cloud)
- rand.Seed(time.Now().UnixNano())
- randMs := rand.Intn(250)
- time.Sleep(time.Duration(randMs) * time.Millisecond)
- newExecId := fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
- _, err = shuffle.GetCache(ctx, newExecId)
- if err == nil {
- log.Printf("[DEBUG] Result for %s already found (PRE REQUEST) - returning", newExecId)
- return nil
- }
- cacheData := []byte("1")
- err = shuffle.SetCache(ctx, newExecId, cacheData, 30)
- if err != nil {
- log.Printf("[WARNING] Failed setting cache for action %s: %s", newExecId, err)
- } else {
- //log.Printf("[DEBUG][%s] Adding %s to cache (%#v)", workflowExecution.ExecutionId, newExecId, action.Name)
- }
- client := shuffle.GetExternalClient(streamUrl)
- customTimeout := os.Getenv("SHUFFLE_APP_REQUEST_TIMEOUT")
- if len(customTimeout) > 0 {
- // convert to int
- timeoutInt, err := strconv.Atoi(customTimeout)
- if err != nil {
- log.Printf("[ERROR] Failed converting SHUFFLE_APP_REQUEST_TIMEOUT to int: %s", err)
- } else {
- log.Printf("[DEBUG] Setting client timeout to %d seconds for app request", timeoutInt)
- client.Timeout = time.Duration(timeoutInt) * time.Second
- }
- }
- // Content type required
- req.Header.Set("Content-Type", "application/json")
- newresp, err := client.Do(req)
- if err != nil {
- // Another timeout issue here somewhere
- // context deadline
- if strings.Contains(fmt.Sprintf("%s", err), "context deadline exceeded") || strings.Contains(fmt.Sprintf("%s", err), "Client.Timeout exceeded") {
- return nil
- }
- if strings.Contains(fmt.Sprintf("%s", err), "timeout awaiting response") {
- return nil
- }
- newerr := fmt.Sprintf("%s", err)
- if strings.Contains(newerr, "connection refused") || strings.Contains(newerr, "no such host") {
- newerr = fmt.Sprintf("Failed connecting to app %s. Is the Docker image available?", appName)
- } else {
- // escape quotes and newlines
- newerr = strings.ReplaceAll(strings.ReplaceAll(newerr, "\"", "\\\""), "\n", "\\n")
- }
- if strings.Contains(fmt.Sprintf("%s", err), "no such host") {
- log.Printf("[ERROR] Should be removing references to location for app '%s' as to be rediscovered. URL: %s. Error: %s", action.AppName, streamUrl, err)
- //for k, v := range portMappings {
- // if strings.Contains(strings.ToLower(strings.ReplaceAll(action.AppName, " ", "_"))) {
- // }
- //}
- //var portMappings map[string]int
- }
- // Try redeployment
- attempts += 1
- if attempts < 2 {
- // Check the service and fix it.
- if isKubernetes == "true" {
- log.Printf("[WARNING] App Redeployment in K8s isn't fully supported yet, but should be done for app %s with image %s.", appName, image)
- } else {
- _, err = findAppInfo(image, appName, true)
- if err != nil {
- log.Printf("[ERROR][%s] Error re-deploying app %s: %s", workflowExecution.ExecutionId, appName, err)
- }
- return sendAppRequest(ctx, incomingUrl, appName, port, action, workflowExecution, image, attempts)
- }
- }
- log.Printf("[ERROR][%s] Error running app run request: %s", workflowExecution.ExecutionId, err)
- actionResult := shuffle.ActionResult{
- Action: *action,
- ExecutionId: workflowExecution.ExecutionId,
- Authorization: workflowExecution.Authorization,
- Result: fmt.Sprintf(`{"success": false, "attempts": %d, "reason": "Failed to connect to app %s in swarm. Try the action again, restart Orborus if this is recurring, or contact support@shuffler.io.", "details": "%s"}`, attempts, streamUrl, newerr),
- StartedAt: int64(time.Now().Unix()),
- CompletedAt: int64(time.Now().Unix()),
- Status: "FAILURE",
- }
- // If this happens - send failure signal to stop the workflow?
- sendSelfRequest(actionResult)
- return err
- }
- defer newresp.Body.Close()
- body, err := ioutil.ReadAll(newresp.Body)
- if err != nil {
- log.Printf("[ERROR] Failed reading app request body body: %s", err)
- return err
- } else {
- if debug {
- log.Printf("[DEBUG][%s] NEWRESP (from app): %s", workflowExecution.ExecutionId, string(body))
- }
- }
- return nil
- }
- // Function to auto-deploy certain apps if "run" is set
- // Has some issues with loading when running multiple workers and such.
- func baseDeploy() {
- var cli *dockerclient.Client
- //var err error
- if isKubernetes != "true" {
- // cli, err := dockerclient.NewEnvClient()
- cli, _, err := shuffle.GetDockerClient()
- if err != nil {
- log.Printf("[ERROR] Unable to create docker client (3): %s", err)
- return
- }
- defer cli.Close()
- }
- for key, value := range autoDeploy {
- newNameSplit := strings.Split(key, ":")
- action := shuffle.Action{
- AppName: newNameSplit[0],
- AppVersion: newNameSplit[1],
- ID: "TBD",
- }
- workflowExecution := shuffle.WorkflowExecution{
- ExecutionId: "TBD",
- }
- appname := action.AppName
- appversion := action.AppVersion
- appname = strings.Replace(appname, ".", "-", -1)
- appversion = strings.Replace(appversion, ".", "-", -1)
- env := []string{
- fmt.Sprintf("EXECUTIONID=%s", workflowExecution.ExecutionId),
- fmt.Sprintf("AUTHORIZATION=%s", workflowExecution.Authorization),
- fmt.Sprintf("CALLBACK_URL=%s", baseUrl),
- fmt.Sprintf("BASE_URL=%s", appCallbackUrl),
- fmt.Sprintf("TZ=%s", timezone),
- fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", logsDisabled),
- }
- if key == "shuffle-tools-fork:1.0.0" {
- env = append(env, fmt.Sprintf("SHUFFLE_ALLOW_PACKAGE_INSTALL=%s", "true"))
- }
- if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
- //log.Printf("APPENDING PROXY TO THE APP!")
- env = append(env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY")))
- env = append(env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY")))
- env = append(env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY")))
- env = append(env, fmt.Sprintf("no_proxy=%s", os.Getenv("no_proxy")))
- }
- if len(os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")) > 0 {
- log.Printf("[DEBUG] Setting SHUFFLE_APP_SDK_TIMEOUT to %s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT"))
- env = append(env, fmt.Sprintf("SHUFFLE_APP_SDK_TIMEOUT=%s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")))
- }
- identifier := fmt.Sprintf("%s_%s", appname, appversion)
- //identifier := fmt.Sprintf("%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId)
- //if strings.Contains(identifier, " ") {
- // identifier = strings.ReplaceAll(identifier, " ", "-")
- //}
- //deployApp(cli, value, identifier, env, workflowExecution, action)
- log.Printf("[DEBUG] Deploying app with identifier %s to ensure basic apps are available from the get-go", identifier)
- //findAppInfo("frikky/shuffle:http_1.4.0", "http_1-4-0", true)
- // go findAppInfo(value, identifier, false)
- // findAppInfo leads to the following error:
- // Unable to list services: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running? (may continue anyway?)
- // Replaced with deployApp again. See https://github.com/Shuffle/Shuffle/issues/1817.
- go deployApp(cli, value, identifier, env, workflowExecution, action)
- //err := deployApp(cli, value, identifier, env, workflowExecution, action)
- //if err != nil {
- // log.Printf("[DEBUG] Failed deploying app %s: %s", value, err)
- //}
- }
- appsInitialized = true
- }
- func getStreamResultsWrapper(client *http.Client, req *http.Request, workflowExecution shuffle.WorkflowExecution, firstRequest bool, environments []string) ([]string, error) {
- // Because of this, it always has updated data.
- // Removed request requirement from app_sdk
- newresp, err := topClient.Do(req)
- if err != nil {
- log.Printf("[ERROR] Failed request: %s", err)
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return environments, err
- }
- defer newresp.Body.Close()
- body, err := ioutil.ReadAll(newresp.Body)
- if err != nil {
- log.Printf("[ERROR] Failed reading body: %s", err)
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return environments, err
- }
- if newresp.StatusCode != 200 {
- log.Printf("[ERROR] StatusCode (1): %d - %s", newresp.StatusCode, string(body))
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return environments, errors.New(fmt.Sprintf("Bad status code from backend: %d", newresp.StatusCode))
- }
- err = json.Unmarshal(body, &workflowExecution)
- if err != nil {
- log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return environments, err
- }
- if firstRequest {
- firstRequest = false
- ctx := context.Background()
- cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
- execData, err := json.Marshal(workflowExecution)
- if err != nil {
- log.Printf("[ERROR][%s] Failed marshalling execution during set (3): %s", workflowExecution.ExecutionId, err)
- } else {
- err = shuffle.SetCache(ctx, cacheKey, execData, 30)
- if err != nil {
- log.Printf("[ERROR][%s] Failed adding to cache during setexecution (3): %s", workflowExecution.ExecutionId, err)
- }
- }
- for _, action := range workflowExecution.Workflow.Actions {
- found := false
- for _, environment := range environments {
- if action.Environment == environment {
- found = true
- break
- }
- }
- if !found {
- environments = append(environments, action.Environment)
- }
- }
- // Checks if a subflow is child of the startnode, as sub-subflows aren't working properly yet
- childNodes := shuffle.FindChildNodes(workflowExecution.Workflow, workflowExecution.Start, []string{}, []string{})
- //log.Printf("[DEBUG] Looking for subflow in %#v to check execution pattern as child of %s", childNodes, workflowExecution.Start)
- subflowFound := false
- for _, childNode := range childNodes {
- for _, trigger := range workflowExecution.Workflow.Triggers {
- if trigger.ID != childNode {
- continue
- }
- if trigger.AppName == "Shuffle Workflow" {
- subflowFound = true
- break
- }
- }
- if subflowFound {
- break
- }
- }
- log.Printf("[DEBUG] Environments: %s. Source: %s. 1 env = webserver, 0 or >1 = default. Subflow exists: %#v", environments, workflowExecution.ExecutionSource, subflowFound)
- if len(environments) == 1 && workflowExecution.ExecutionSource != "default" && !subflowFound {
- log.Printf("[DEBUG] Running OPTIMIZED execution (not manual)")
- os.Setenv("SHUFFLE_OPTIMIZED", "true")
- listener := webserverSetup(workflowExecution)
- err := executionInit(workflowExecution)
- if err != nil {
- log.Printf("[DEBUG] Workflow setup failed: %s", workflowExecution.ExecutionId, err)
- log.Printf("[DEBUG] Shutting down (30)")
- shutdown(workflowExecution, "", "", true)
- }
- go func() {
- time.Sleep(time.Duration(1))
- handleExecutionResult(workflowExecution)
- }()
- log.Printf("[DEBUG] Running with port %#v", os.Getenv("WORKER_PORT"))
- runWebserver(listener)
- // Set environment variable
- //log.Printf("Before wait")
- //wg := sync.WaitGroup{}
- //wg.Add(1)
- //wg.Wait()
- } else {
- log.Printf("[DEBUG] Running NON-OPTIMIZED execution for type %s with %d environment(s). This only happens when ran manually OR when running with subflows. Status: %s", workflowExecution.ExecutionSource, len(environments), workflowExecution.Status)
- err := executionInit(workflowExecution)
- if err != nil {
- log.Printf("[DEBUG] Workflow setup failed: %s", workflowExecution.ExecutionId, err)
- shutdown(workflowExecution, "", "", true)
- }
- // Trying to make worker into microservice~ :)
- }
- }
- if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
- log.Printf("[DEBUG] Workflow %s is finished. Exiting worker.", workflowExecution.ExecutionId)
- log.Printf("[DEBUG] Shutting down (31)")
- shutdown(workflowExecution, "", "", true)
- }
- if workflowExecution.Status == "EXECUTING" || workflowExecution.Status == "RUNNING" {
- //log.Printf("Status: %s", workflowExecution.Status)
- err = handleDefaultExecution(client, req, workflowExecution)
- if err != nil {
- log.Printf("[DEBUG] Workflow %s is finished: %s", workflowExecution.ExecutionId, err)
- log.Printf("[DEBUG] Shutting down (32)")
- shutdown(workflowExecution, "", "", true)
- }
- } else {
- log.Printf("[DEBUG] Workflow %s has status %s. Exiting worker (if WAITING, rerun will happen).", workflowExecution.ExecutionId, workflowExecution.Status)
- log.Printf("[DEBUG] Shutting down (33)")
- shutdown(workflowExecution, workflowExecution.Workflow.ID, "", true)
- }
- time.Sleep(time.Duration(sleepTime) * time.Second)
- return environments, nil
- }
// checkStandaloneRun enables "standalone" worker runs driven by CLI args:
//
//	go run worker.go standalone <executionid> <authorization> <optional:url>
//
// When the args are present and valid it (1) fetches the execution from the
// backend, (2) strips all non-SKIPPED / non-rerun results, and (3) POSTs the
// reset execution back with ?reset=true, so main() afterwards re-runs it from
// scratch. Exported state: sets EXECUTIONID, AUTHORIZATION, BASE_URL and
// STANDALONE_EXECUTION env vars plus the package-level `data` payload.
// Any failure after arg validation is fatal (os.Exit(1)).
func checkStandaloneRun() {
	// Check if the required argc/argv is set
	if len(os.Args) < 4 {
		if debug {
			log.Printf("[DEBUG] You can run the worker in standalone mode with: go run worker.go standalone <executionid> <authorization> <optional:url>")
		}

		return
	}

	if os.Args[1] != "standalone" {
		log.Printf("[ERROR] First argument should be 'standalone' to run worker standalone")
		return
	}

	// Execution IDs are expected to be exactly 36 chars (UUID format).
	if os.Args[2] == "" || len(os.Args[2]) != 36 {
		log.Printf("[ERROR] Second argument should be the execution ID, with next being authorization")
		return
	}

	if os.Args[3] == "" || len(os.Args[3]) < 10 {
		log.Printf("[ERROR] Third argument should be the authorization key")
		return
	}

	// Backend defaults to the public SaaS unless a 4th arg overrides it.
	backendUrl := "https://shuffler.io"
	if len(os.Args) > 4 {
		backendUrl = os.Args[4]
	}

	if !strings.Contains(backendUrl, "http") {
		log.Printf("[ERROR] Backend URL should start with http:// or https://")
		return
	}

	// Format:
	// go run worker.go standalone <executionid> <authorization> <optional:url>
	executionId := os.Args[2]
	authorization := os.Args[3]

	// Persist via env so the normal startup path in main() picks these up.
	os.Setenv("EXECUTIONID", executionId)
	os.Setenv("AUTHORIZATION", authorization)
	os.Setenv("BASE_URL", backendUrl)
	os.Setenv("STANDALONE_EXECUTION", "true")

	log.Printf("\n\n\n[DEBUG] Running worker in standalone mode with execution ID %s and authorization %s. Backend URL (default): %s\nThis means we will first RESET the execution results, then rerun it\n\n", executionId, authorization, backendUrl)

	// 1. Reset the execution after getting it
	// NOTE: assigns the package-level `data` variable (also used by main()).
	data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
	streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", backendUrl)
	client := shuffle.GetExternalClient(streamResultUrl)
	req, err := http.NewRequest(
		"POST",
		streamResultUrl,
		bytes.NewBuffer([]byte(data)),
	)

	if err != nil {
		log.Printf("[ERROR] Failed making request builder for backend: %s", err)
		os.Exit(1)
	}

	// Read the data and unmarshal it
	newresp, err := client.Do(req)
	if err != nil {
		log.Printf("[ERROR] Failed standalone request in setup: %s", err)
		os.Exit(1)
	}

	defer newresp.Body.Close()
	body, err := ioutil.ReadAll(newresp.Body)
	if err != nil {
		log.Printf("[ERROR] Failed reading body: %s", err)
		os.Exit(1)
	}

	if newresp.StatusCode != 200 {
		log.Printf("[ERROR] Failed resetting execution: %s. Body: %s", newresp.Status, string(body))
		os.Exit(1)
	}

	// Map to shuffle.Workflowexecution struct
	workflowExecution := shuffle.WorkflowExecution{}
	err = json.Unmarshal(body, &workflowExecution)
	if err != nil {
		log.Printf("[ERROR] Failed unmarshalling body: %s", err)
		os.Exit(1)
	}

	log.Printf("[DEBUG][%s] Got %d results with status %s. Running full reset IF status is not executing.", workflowExecution.ExecutionId, len(workflowExecution.Results), workflowExecution.Status)

	// Just continue as per usual?
	workflowExecution.Status = "EXECUTING"

	// Keep only results that should survive a rerun: SKIPPED actions and
	// single-action reruns (Category == "rerun"); everything else is dropped
	// so the actions execute again.
	newResults := []shuffle.ActionResult{}
	for _, result := range workflowExecution.Results {
		if result.Status == "SKIPPED" {
			newResults = append(newResults, result)
			continue
		}

		// This is to handle reruns of SINGLE actions
		if result.Action.Category == "rerun" {
			newResults = append(newResults, result)
			continue
		}

		// Anything else here.
	}

	workflowExecution.Results = newResults
	workflowExecution.Status = "EXECUTING"
	workflowExecution.CompletedAt = 0

	marshalledResult, err := json.Marshal(workflowExecution)
	if err != nil {
		log.Printf("[ERROR] Failed marshalling body: %s", err)
		os.Exit(1)
	}

	// Send a /api/v1/streams result back
	// 1. Reset the execution after getting it
	streamUrl := fmt.Sprintf("%s/api/v1/streams?reset=true", backendUrl)
	req, err = http.NewRequest(
		"POST",
		streamUrl,
		bytes.NewBuffer([]byte(marshalledResult)),
	)

	if err != nil {
		log.Printf("[ERROR] Failed making request builder (2) for backend: %s", err)
		os.Exit(1)
	}

	// Read the data and unmarshal it
	newresp, err = client.Do(req)
	if err != nil {
		log.Printf("[ERROR] Failed standalone request in setup: %s", err)
		os.Exit(1)
	}

	defer newresp.Body.Close()
	body, err = ioutil.ReadAll(newresp.Body)
	if err != nil {
		log.Printf("[ERROR] Failed reading body (2): %s", err)
		os.Exit(1)
	}

	if newresp.StatusCode != 200 {
		log.Printf("[ERROR] Failed resetting execution (2): %s. Body: %s", newresp.Status, string(body))
		os.Exit(1)
	}

	log.Printf("\n\n\n[DEBUG] Finished resetting execution %s. Body: %s. Starting execution.\n\n\n", newresp.Status, string(body))
}
// Initial loop etc
// main wires up the worker: standalone-mode detection, debug/cleanup flags,
// cache init, and then either (a) long-lived swarm webserver mode, or
// (b) a single-execution polling loop against the backend stream endpoint.
func main() {
	// Testing swarm auto-replacements. This also tests ports
	// in rapid succession
	checkStandaloneRun()

	if os.Getenv("DEBUG") == "true" {
		debug = true

		log.Printf("[INFO] Disabled cleanup due to debug mode (DEBUG=true)")
		cleanupEnv = "false"
	}

	/*** STARTREMOVE ***/
	// Swarm mode disables per-app logging to cut noise/volume.
	if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
		logsDisabled = "true"
		os.Setenv("SHUFFLE_LOGS_DISABLED", "true")
	}
	/*** ENDREMOVE ***/

	// Elasticsearch necessary to ensure we'ren ot running with Datastore configurations for minimal/maximal data sizes
	// Recursive import kind of :)
	_, err := shuffle.RunInit(*shuffle.GetDatastore(), *shuffle.GetStorage(), "", "worker", true, "elasticsearch", false, 0)
	if err != nil {
		// "no such host" is expected in offline/isolated deployments, so
		// only other init errors are logged.
		if !strings.Contains(fmt.Sprintf("%s", err), "no such host") {
			log.Printf("[ERROR] Failed to run worker init: %s", err)
		}
	} else {
		if isKubernetes != "true" {
			log.Printf("[DEBUG] Ran init for worker to set up cache system. Docker version: %s", dockerApiVersion)
		} else {
			log.Printf("[DEBUG] Ran init for worker to set up cache system on Kubernetes")
		}
	}

	sleepTime = 5
	client := shuffle.GetExternalClient(baseUrl)

	if timezone == "" {
		timezone = "Europe/Amsterdam"
	}

	if baseimagename == "" {
		log.Printf("[DEBUG] Setting baseimagename to frikky/shuffle as it's empty (docker.io)")
		baseimagename = "frikky/shuffle" // Dockerhub
	}

	topClient = client
	swarmConfig := os.Getenv("SHUFFLE_SWARM_CONFIG")
	log.Printf("[INFO] Running with timezone %s and swarm config %#v", timezone, swarmConfig)

	/*** STARTREMOVE ***/
	// Swarm mode: deploy base apps and serve requests forever; never falls
	// through to the single-execution loop below.
	if swarmConfig == "run" || swarmConfig == "swarm" {
		// Forcing download just in case on the first iteration.
		log.Printf("[INFO] Running in swarm mode - forcing download of apps")

		workflowExecution := shuffle.WorkflowExecution{}
		go baseDeploy()

		listener := webserverSetup(workflowExecution)
		runWebserver(listener)

		// Should never get down here
		log.Printf("[ERROR] Stopped listener %#v - exiting.", listener)
		os.Exit(3)
	}
	/*** ENDREMOVE ***/

	authorization := ""
	executionId := ""

	// INFO: Allows you to run a test execution
	testing := os.Getenv("WORKER_TESTING_WORKFLOW")
	shuffle_apikey := os.Getenv("WORKER_TESTING_APIKEY")
	if len(testing) > 0 && len(shuffle_apikey) > 0 {
		// Execute a workflow and use that info
		log.Printf("[WARNING] Running test environment for worker by executing workflow %s. PS: This may NOT reach the worker in real time, but rather be deployed as a docker container (bad). Instead use AUTHORIZATION and EXECUTIONID for direct testing", testing)
		authorization, executionId = runTestExecution(client, testing, shuffle_apikey)
	} else {
		authorization = os.Getenv("AUTHORIZATION")
		executionId = os.Getenv("EXECUTIONID")
		log.Printf("[INFO] Running normal execution with auth %s and ID %s", authorization, executionId)
	}

	workflowExecution := shuffle.WorkflowExecution{
		ExecutionId: executionId,
	}
	if len(authorization) == 0 {
		log.Printf("[INFO] No AUTHORIZATION key set in env")
		log.Printf("[DEBUG] Shutting down (27)")
		shutdown(workflowExecution, "", "", false)
	}

	if len(executionId) == 0 {
		log.Printf("[INFO] No EXECUTIONID key set in env")
		log.Printf("[DEBUG] Shutting down (28)")
		shutdown(workflowExecution, "", "", false)
	}

	// Package-level `data` is the POST payload reused on every poll.
	data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
	streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
	req, err := http.NewRequest(
		"POST",
		streamResultUrl,
		bytes.NewBuffer([]byte(data)),
	)

	if err != nil {
		log.Printf("[ERROR] Failed making request builder for backend")
		log.Printf("[DEBUG] Shutting down (29)")
		shutdown(workflowExecution, "", "", true)
	}

	// Poll until shutdown() exits the process.
	// NOTE(review): firstRequest is passed by value and never set to false
	// here, so every iteration runs the wrapper's first-request branch -
	// confirm whether that is intended.
	firstRequest := true
	environments := []string{}
	for {
		environments, err = getStreamResultsWrapper(client, req, workflowExecution, firstRequest, environments)
		if err != nil {
			log.Printf("[ERROR] Failed getting stream results: %s", err)
		}
	}
}
- func checkUnfinished(resp http.ResponseWriter, request *http.Request, execRequest shuffle.OrborusExecutionRequest) {
- // Meant as a function that periodically checks whether previous executions have finished or not.
- // Should probably be based on executedIds and finishedIds
- // Schedule a check in the future instead?
- ctx := context.Background()
- exec, err := shuffle.GetWorkflowExecution(ctx, execRequest.ExecutionId)
- log.Printf("[DEBUG][%s] Rechecking execution and it's status to send to backend IF the status is EXECUTING (%s - %d/%d finished)", execRequest.ExecutionId, exec.Status, len(exec.Results), len(exec.Workflow.Actions))
- // FIXMe: Does this create issue with infinite loops?
- // Usually caused by issue during startup
- if exec.Status == "" {
- //handleRunExecution(resp, request)
- return
- }
- if exec.Status != "EXECUTING" {
- return
- }
- log.Printf("[DEBUG][%s] Should send full result for execution to backend as it has %d results. Status: %s", execRequest.ExecutionId, len(exec.Results), exec.Status)
- data, err := json.Marshal(exec)
- if err != nil {
- return
- }
- sendResult(*exec, data)
- }
- func handleRunExecution(resp http.ResponseWriter, request *http.Request) {
- defer request.Body.Close()
- body, err := ioutil.ReadAll(request.Body)
- if err != nil {
- log.Printf("[WARNING] Failed reading body for stream result queue")
- resp.WriteHeader(400)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
- return
- }
- //log.Printf("[DEBUG] In run execution with body length %d", len(body))
- var execRequest shuffle.OrborusExecutionRequest
- err = json.Unmarshal(body, &execRequest)
- if err != nil {
- log.Printf("[WARNING] Failed shuffle.WorkflowExecution unmarshaling: %s", err)
- resp.WriteHeader(400)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
- return
- }
- // Checks if a workflow is done 30 seconds later, and sends info to backend no matter what
- go func() {
- time.Sleep(time.Duration(30) * time.Second)
- checkUnfinished(resp, request, execRequest)
- }()
- window.AddEvent(time.Now())
- ctx := context.Background()
- // FIXME: This should be PER EXECUTION
- //if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
- // Is it ok if these are standard? Should they be update-able after launch? Hmm
- if len(execRequest.HTTPProxy) > 0 {
- log.Printf("[DEBUG] Sending proxy info to child process")
- os.Setenv("SHUFFLE_PASS_APP_PROXY", execRequest.ShufflePassProxyToApp)
- }
- if len(execRequest.HTTPProxy) > 0 {
- log.Printf("[DEBUG] Running with default HTTP proxy %s", execRequest.HTTPProxy)
- os.Setenv("HTTP_PROXY", execRequest.HTTPProxy)
- }
- if len(execRequest.HTTPSProxy) > 0 {
- log.Printf("[DEBUG] Running with default HTTPS proxy %s", execRequest.HTTPSProxy)
- os.Setenv("HTTPS_PROXY", execRequest.HTTPSProxy)
- }
- if len(execRequest.EnvironmentName) > 0 {
- os.Setenv("ENVIRONMENT_NAME", execRequest.EnvironmentName)
- environment = execRequest.EnvironmentName
- }
- if len(execRequest.Timezone) > 0 {
- os.Setenv("TZ", execRequest.Timezone)
- timezone = execRequest.Timezone
- }
- if len(execRequest.Cleanup) > 0 {
- os.Setenv("CLEANUP", execRequest.Cleanup)
- cleanupEnv = execRequest.Cleanup
- }
- if len(execRequest.BaseUrl) > 0 {
- os.Setenv("BASE_URL", execRequest.BaseUrl)
- baseUrl = execRequest.BaseUrl
- }
- // Setting to just have an auth available.
- if len(execRequest.Authorization) > 0 && len(os.Getenv("AUTHORIZATION")) == 0 {
- //log.Printf("[DEBUG] Sending proxy info to child process")
- os.Setenv("AUTHORIZATION", execRequest.Authorization)
- }
- var workflowExecution shuffle.WorkflowExecution
- streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
- req, err := http.NewRequest(
- "POST",
- streamResultUrl,
- bytes.NewBuffer([]byte(fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, execRequest.ExecutionId, execRequest.Authorization))),
- )
- if err != nil {
- log.Printf("[ERROR][%s] Failed to create a new request", execRequest.ExecutionId)
- resp.WriteHeader(500)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
- return
- }
- client := shuffle.GetExternalClient(streamResultUrl)
- newresp, err := client.Do(req)
- if err != nil {
- log.Printf("[ERROR] Failed making request (2): %s", err)
- resp.WriteHeader(500)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
- return
- }
- defer newresp.Body.Close()
- body, err = ioutil.ReadAll(newresp.Body)
- if err != nil {
- log.Printf("[ERROR][%s] Failed reading body (2): %s", execRequest.ExecutionId, err)
- resp.WriteHeader(500)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
- return
- }
- if newresp.StatusCode != 200 {
- log.Printf("[ERROR][%s] Bad statuscode: %d, %s", execRequest.ExecutionId, newresp.StatusCode, string(body))
- if strings.Contains(string(body), "Workflowexecution is already finished") {
- log.Printf("[DEBUG] Shutting down (19)")
- //shutdown(workflowExecution, "", "", true)
- }
- resp.WriteHeader(500)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad statuscode: %d"}`, newresp.StatusCode)))
- return
- }
- err = json.Unmarshal(body, &workflowExecution)
- if err != nil {
- log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
- resp.WriteHeader(500)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
- return
- }
- //err = shuffle.SetWorkflowExecution(ctx, workflowExecution, true)
- err = setWorkflowExecution(ctx, workflowExecution, true)
- if err != nil {
- log.Printf("[ERROR] Failed initializing execution saving for %s: %s", workflowExecution.ExecutionId, err)
- }
- if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
- log.Printf("[DEBUG] Workflow %s is finished. Exiting worker.", workflowExecution.ExecutionId)
- log.Printf("[DEBUG] Shutting down (20)")
- resp.WriteHeader(200)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad status for execution - already %s. Returning with 200 OK"}`, workflowExecution.Status)))
- return
- }
- //startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
- extra := 0
- for _, trigger := range workflowExecution.Workflow.Triggers {
- //log.Printf("Appname trigger (0): %s", trigger.AppName)
- if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
- extra += 1
- }
- }
- log.Printf("[INFO][%s] (1) Status: %s, Results: %d, actions: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra)
- if workflowExecution.Status != "EXECUTING" {
- log.Printf("[WARNING] Exiting as worker execution has status %s!", workflowExecution.Status)
- log.Printf("[DEBUG] Shutting down (38)")
- resp.WriteHeader(400)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad status %s for the workflow execution %s"}`, workflowExecution.Status, workflowExecution.ExecutionId)))
- return
- }
- //log.Printf("[DEBUG] Starting execution :O")
- cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
- execData, err := json.Marshal(workflowExecution)
- if err != nil {
- log.Printf("[ERROR][%s] Failed marshalling execution during set (3): %s", workflowExecution.ExecutionId, err)
- } else {
- err = shuffle.SetCache(ctx, cacheKey, execData, 31)
- if err != nil {
- log.Printf("[ERROR][%s] Failed adding to cache during setexecution (3): %s", workflowExecution.ExecutionId, err)
- }
- }
- err = executionInit(workflowExecution)
- if err != nil {
- log.Printf("[DEBUG][%s] Shutting down (30) - Workflow setup failed: %s", workflowExecution.ExecutionId, err)
- resp.WriteHeader(500)
- resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Error in execution init: %s"}`, err)))
- return
- //shutdown(workflowExecution, "", "", true)
- }
- handleExecutionResult(workflowExecution)
- resp.WriteHeader(200)
- resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
- }
// handleDownloadImage handles POST /api/v1/download: it parses the requested
// image name, checks whether a matching frikky/shuffle image is already
// present locally, and otherwise downloads it via the Shuffle backend.
// NOTE(review): error responses use status 401 although the failures are
// parse/infra errors, and the "already present" response says success:false
// with a 200 - both look intentional legacy behavior, confirm before changing.
func handleDownloadImage(resp http.ResponseWriter, request *http.Request) {
	// Read the request body
	defer request.Body.Close()
	bodyBytes, err := ioutil.ReadAll(request.Body)
	if err != nil {
		log.Printf("[ERROR] Failed reading body for stream result queue. Error: %s", err)
		resp.WriteHeader(401)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
		return
	}

	// get images from request
	imageBody := &ImageDownloadBody{}
	err = json.Unmarshal(bodyBytes, imageBody)
	if err != nil {
		log.Printf("[ERROR] Error in unmarshalling body: %s", err)
		resp.WriteHeader(401)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
		return
	}

	client, _, err := shuffle.GetDockerClient()
	if err != nil {
		log.Printf("[ERROR] Unable to create docker client (4): %s", err)
		resp.WriteHeader(401)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
		return
	}

	defer client.Close()

	// check if images are already downloaded
	// Retrieve a list of Docker images
	listOptions := dockerimage.ListOptions{}
	images, err := client.ImageList(context.Background(), listOptions)
	if err != nil {
		log.Printf("[ERROR] listing images: %s", err)
		resp.WriteHeader(401)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
		return
	}

	// Compare the requested image against every local tag, normalized to the
	// two registry prefixes Shuffle pulls from.
	for _, img := range images {
		for _, tag := range img.RepoTags {
			splitTag := strings.Split(tag, ":")
			baseTag := tag
			if len(splitTag) > 1 {
				baseTag = splitTag[1]
			}

			var possibleNames []string
			possibleNames = append(possibleNames, fmt.Sprintf("frikky/shuffle:%s", baseTag))
			possibleNames = append(possibleNames, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", baseTag))

			if arrayContains(possibleNames, imageBody.Image) {
				log.Printf("[DEBUG] Image %s already downloaded that has been requested to download", imageBody.Image)
				resp.WriteHeader(200)
				resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "image already present"}`)))
				return
			}
		}
	}

	log.Printf("[INFO] Downloading image %s", imageBody.Image)
	err = shuffle.DownloadDockerImageBackend(&http.Client{Timeout: imagedownloadTimeout}, imageBody.Image)
	if err == nil {
		// Track successful downloads; failures are intentionally best-effort.
		downloadedImages = append(downloadedImages, imageBody.Image)
	}

	// return success
	resp.WriteHeader(200)
	resp.Write([]byte(fmt.Sprintf(`{"success": true, "status": "starting download"}`)))
}
// runWebserver wires up the worker's HTTP routes (queue streaming, image
// download, direct execution, optional pprof) and serves them on the given
// listener until Serve returns. Blocks for the lifetime of the server.
func runWebserver(listener net.Listener) {
	r := mux.NewRouter()
	r.HandleFunc("/api/v1/streams", handleWorkflowQueue).Methods("POST", "OPTIONS")
	r.HandleFunc("/api/v1/streams/results", handleGetStreamResults).Methods("POST", "OPTIONS")
	r.HandleFunc("/api/v1/download", handleDownloadImage).Methods("POST", "OPTIONS")

	// Synonyms. Require an execution ID + auth + shuffle backend
	r.HandleFunc("/api/v1/execute", handleRunExecution).Methods("POST", "OPTIONS")
	r.HandleFunc("/api/v1/run", handleRunExecution).Methods("POST", "OPTIONS")

	// What would be require to run a workflow otherwise?
	// Maybe directly /workflow/run
	/*** STARTREMOVE ***/
	if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
		log.Printf("[DEBUG] Running webserver config for SWARM and K8s")
	}
	/*** ENDREMOVE ***/

	// Optional override of the per-node app replica count.
	scaleReplicas := os.Getenv("SHUFFLE_APP_REPLICAS")
	if len(scaleReplicas) > 0 {
		tmpInt, err := strconv.Atoi(scaleReplicas)
		if err != nil {
			log.Printf("[ERROR] %s is not a valid number for replication", scaleReplicas)
		} else {
			maxReplicas = uint64(tmpInt)
			_ = tmpInt
		}

		log.Printf("[DEBUG] SHUFFLE_APP_REPLICAS set to value %#v. Trying to overwrite default (%d/node)", scaleReplicas, maxReplicas)
	}

	// Optional override of the autoscaling threshold.
	// NOTE(review): currently only read here; the AutoScaleApps call below is
	// commented out, so this value has no runtime effect.
	maxExecutionsPerMinute := 10
	if os.Getenv("SHUFFLE_APP_EXECUTIONS_PER_MINUTE") != "" {
		tmpInt, err := strconv.Atoi(os.Getenv("SHUFFLE_APP_EXECUTIONS_PER_MINUTE"))
		if err != nil {
			log.Printf("[ERROR] %s is not a valid number for executions per minute", os.Getenv("SHUFFLE_APP_EXECUTIONS_PER_MINUTE"))
		} else {
			maxExecutionsPerMinute = tmpInt
		}

		log.Printf("[DEBUG] SHUFFLE_APP_EXECUTIONS_PER_MINUTE set to value %s. Trying to overwrite default (%d)", os.Getenv("SHUFFLE_APP_EXECUTIONS_PER_MINUTE"), maxExecutionsPerMinute)
	}

	if strings.ToLower(os.Getenv("SHUFFLE_SWARM_CONFIG")) == "run" || strings.ToLower(os.Getenv("SHUFFLE_APP_REPLICAS")) == "" {
		// go AutoScaleApps(ctx, dockercli, maxExecutionsPerMinute)
	}

	// pprof endpoints are only exposed when memory debugging is requested.
	if strings.ToLower(os.Getenv("SHUFFLE_DEBUG_MEMORY")) == "true" || strings.ToLower(os.Getenv("DEBUG_MEMORY")) == "true" {
		r.HandleFunc("/debug/pprof/", pprof.Index)
		r.HandleFunc("/debug/pprof/heap", pprof.Handler("heap").ServeHTTP)
		r.HandleFunc("/debug/pprof/profile", pprof.Profile)
		r.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		r.HandleFunc("/debug/pprof/trace", pprof.Trace)
	}

	log.Printf("[DEBUG] NEW webserver setup. Port: %s", listener.Addr().String())
	http.Handle("/", r)

	// Explicit timeouts so slow/idle clients can't pin connections forever.
	srv := http.Server{
		Handler:           r,
		ReadTimeout:       60 * time.Second,
		ReadHeaderTimeout: 60 * time.Second,
		IdleTimeout:       60 * time.Second,
		WriteTimeout:      60 * time.Second,
	}

	err := srv.Serve(listener)
	if err != nil {
		log.Printf("[ERROR] Serve issue in worker: %#v", err)
	}
}
- // 0x0elliot:
- // IF we had to rewrite this, we will focus on ONLY auto scale for apps.
- // i recommend we target executions/minute (?) as a metric.
- // edge-case: subflows are helped with when worker replicas are higher.
- // i kind of never want to scale down. at least, not now.
- // also, algorithm is very broken. executions/worker
- func AutoScaleApps(ctx context.Context, client *dockerclient.Client, maxExecutionsPerMinute int) {
- ticker := time.NewTicker(1 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-ticker.C:
- count := window.CountEvents(time.Now())
- j := numberOfApps(ctx, client)
- workers := numberOfWorkers(ctx, client)
- execPerMin := maxExecutionsPerMinute / workers
- if count >= execPerMin {
- log.Printf("[DEBUG] Too many executions per minute (%d). Scaling down to %d", count, execPerMin)
- scaleApps(ctx, client, uint64(j+1))
- }
- }
- }
- }
- func scaleApps(ctx context.Context, client *dockerclient.Client, replicas uint64) error {
- // client, err := dockerclient.NewEnvClient()
- client, _, err := shuffle.GetDockerClient()
- if err != nil {
- log.Printf("[ERROR] Unable to create docker client (scaleApps): %s", err)
- return err
- }
- services, err := client.ServiceList(ctx, types.ServiceListOptions{})
- if err != nil {
- log.Printf("[ERROR] Failed to find services in the swarm: %s", err)
- }
- networkId, err := getNetworkId(ctx, client)
- if err != nil {
- log.Printf("[ERROR] Failed to get network Id in the swarm service: %s", err)
- }
- workers := numberOfWorkers(ctx, client)
- if replicas > uint64(workers) {
- return nil
- }
- for _, service := range services {
- if service.Spec.Name == "shuffle-workers" {
- continue
- }
- inNetwork := false
- for _, vip := range service.Endpoint.VirtualIPs {
- if vip.NetworkID == networkId {
- inNetwork = true
- break
- }
- }
- if !inNetwork {
- continue // skip services not in the target network
- }
- if service.Spec.Mode.Replicated == nil {
- return errors.New("Service is not replicated")
- }
- if *service.Spec.Mode.Replicated.Replicas >= replicas {
- continue
- }
- service.Spec.Mode.Replicated.Replicas = &replicas
- _, err = client.ServiceUpdate(ctx, service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
- if err != nil {
- return err
- }
- }
- log.Printf("[DEBUG] Scaled all services to %d replicas", replicas)
- return nil
- }
- func getNetworkId(ctx context.Context, dockercli *dockerclient.Client) (string, error) {
- networkFilter := filters.NewArgs()
- networkFilter.Add("name", swarmNetworkName)
- listOptions := network.ListOptions{
- Filters: networkFilter,
- }
- networks, err := dockercli.NetworkList(ctx, listOptions)
- if err != nil || len(networks) == 0 {
- return "", err
- }
- networkId := networks[0].ID
- return networkId, nil
- }
- func numberOfApps(ctx context.Context, dockercli *dockerclient.Client) int {
- // swarmNetworkName
- var err error
- if swarmNetworkName == "" {
- swarmNetworkName = "shuffle_swarm_executions"
- }
- if dockercli == nil {
- // dockercli, err = dockerclient.NewEnvClient()
- dockercli, _, err = shuffle.GetDockerClient()
- if err != nil {
- log.Printf("[ERROR] Unable to create docker client (5): %s", err)
- return 0
- }
- }
- networkFilter := filters.NewArgs()
- networkFilter.Add("name", swarmNetworkName)
- listOptions := network.ListOptions{
- Filters: networkFilter,
- }
- networks, err := dockercli.NetworkList(ctx, listOptions)
- if err != nil || len(networks) == 0 {
- return 0
- }
- networkId, err := getNetworkId(ctx, dockercli)
- if err != nil {
- log.Printf("[WARNING] Failed to get networkID is worker running in swarm: %s", err)
- return 0
- }
- services, err := dockercli.ServiceList(ctx, types.ServiceListOptions{})
- if err != nil {
- log.Printf("[WARNING] Can't found any services. %s", err)
- return 0
- }
- runningReplicas := 0
- for _, service := range services {
- if service.Spec.Name == "shuffle-workers" {
- continue
- }
- inNetwork := false
- for _, vip := range service.Endpoint.VirtualIPs {
- if vip.NetworkID == networkId {
- inNetwork = true
- break
- }
- }
- if !inNetwork {
- continue // skip services not in the target network
- }
- filterArgs := filters.NewArgs()
- filterArgs.Add("service", service.Spec.Name)
- filterArgs.Add("desired-state", "running")
- task, err := dockercli.TaskList(ctx, types.TaskListOptions{
- Filters: filterArgs,
- })
- if err != nil {
- log.Printf("[WARNING] Failed to get the list of running services %s: %s", service.Spec.Name, err)
- continue
- }
- runningReplicas = len(task)
- break
- }
- return runningReplicas
- }
- func IsServiceRunning(ctx context.Context, cli *dockerclient.Client) bool {
- serviceName := "shuffle-tools_1-2-0"
- filterArgs := filters.NewArgs()
- filterArgs.Add("name", serviceName)
- services, err := cli.ServiceList(ctx, types.ServiceListOptions{Filters: filterArgs})
- if err != nil {
- log.Printf("[ERROR] Couldn't find %s service running got error: %s", serviceName, err)
- return false
- }
- if len(services) > 0 {
- return true
- }
- return false
- }
- func numberOfWorkers(ctx context.Context, cli *dockerclient.Client) int {
- // cli, err := dockerclient.NewEnvClient()
- cli, _, err := shuffle.GetDockerClient()
- if err != nil {
- log.Printf("[ERROR] Unable to create docker client (5): %s", err)
- return 0
- }
- service, _, err := cli.ServiceInspectWithRaw(ctx, "shuffle-workers", types.ServiceInspectOptions{})
- if err != nil {
- return 0
- }
- if service.Spec.Mode.Replicated == nil {
- return 0
- }
- replics := *service.Spec.Mode.Replicated.Replicas
- return int(replics)
- }
|