Sin descripción

worker.go 176KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
7527852795280528152825283528452855286528752885289
  1. package main
  2. import (
  3. "github.com/shuffle/shuffle-shared"
  4. singul "github.com/shuffle/singul/pkg"
  5. "bytes"
  6. "context"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "log"
  13. "net"
  14. "net/http"
  15. "net/http/pprof"
  16. "net/url"
  17. "os"
  18. "strconv"
  19. "strings"
  20. "time"
  21. "github.com/docker/docker/api/types"
  22. "github.com/docker/docker/api/types/container"
  23. "github.com/docker/docker/api/types/filters"
  24. dockerimage "github.com/docker/docker/api/types/image"
  25. "github.com/docker/docker/api/types/mount"
  26. "github.com/docker/docker/api/types/network"
  27. dockerclient "github.com/docker/docker/client"
  28. // This is for automatic removal of certain code :)
  29. /*** STARTREMOVE ***/
  30. "math/rand"
  31. "github.com/docker/docker/api/types/swarm"
  32. uuid "github.com/satori/go.uuid"
  33. /*** ENDREMOVE ***/
  34. "github.com/gorilla/mux"
  35. //k8s deps
  36. appsv1 "k8s.io/api/apps/v1"
  37. corev1 "k8s.io/api/core/v1"
  38. "k8s.io/apimachinery/pkg/api/resource"
  39. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  40. "k8s.io/apimachinery/pkg/util/intstr"
  41. "k8s.io/client-go/kubernetes"
  42. )
// This is getting out of hand :)
//
// Runtime configuration for the worker, read once at process start from the
// environment. These are package-level on purpose: they are referenced all
// over the worker and never change after startup.
var timezone = os.Getenv("TZ")
var baseUrl = os.Getenv("BASE_URL")
var appCallbackUrl = os.Getenv("BASE_URL") // Same source as baseUrl: apps call back to the same backend URL.
var isKubernetes = os.Getenv("IS_KUBERNETES")
var environment = os.Getenv("ENVIRONMENT_NAME")
var logsDisabled = os.Getenv("SHUFFLE_LOGS_DISABLED")
var cleanupEnv = strings.ToLower(os.Getenv("CLEANUP"))
var swarmNetworkName = os.Getenv("SHUFFLE_SWARM_NETWORK_NAME")
var dockerApiVersion = strings.ToLower(os.Getenv("DOCKER_API_VERSION"))
var shutdownDisabled = strings.ToLower(os.Getenv("SHUFFLE_WORKER_SHUTDOWN_DISABLED"))

// Kubernetes settings
var appServiceAccountName = os.Getenv("SHUFFLE_APP_SERVICE_ACCOUNT_NAME")
var appPodSecurityContext = os.Getenv("SHUFFLE_APP_POD_SECURITY_CONTEXT")
var appContainerSecurityContext = os.Getenv("SHUFFLE_APP_CONTAINER_SECURITY_CONTEXT")
var kubernetesNamespace = os.Getenv("KUBERNETES_NAMESPACE")

// executionCount tracks how many executions this worker has handled.
// NOTE(review): mutated elsewhere in the file; confirm it is updated atomically.
var executionCount int64
var baseimagename = os.Getenv("SHUFFLE_BASE_IMAGE_NAME")

// var baseimagename = "registry.hub.docker.com/frikky/shuffle"
var registryName = "registry.hub.docker.com"
var sleepTime = 2
var topClient *http.Client
var data string
var requestsSent = 0 // Counts result pushes to the backend; used by shutdown() to avoid duplicate sends.
var appsInitialized = false
var hostname string
var maxReplicas = uint64(12)
var debug bool

/*
var environments []string
var parents map[string][]string
var children map[string][]string
var visited []string
var executed []string
var nextActions []string
var extra int
var startAction string
*/
//var results []shuffle.ActionResult
//var allLogs map[string]string
//var containerIds []string
var downloadedImages []string

// ImageDownloadBody is the JSON payload for requesting an image download.
type ImageDownloadBody struct {
	Image string `json:"image"`
}

// ImageRequest is the JSON payload naming a single container image.
type ImageRequest struct {
	Image string `json:"image"`
}

var finishedExecutions []string
var imagesDistributed []string
var imagedownloadTimeout = time.Second * 300
var window = shuffle.NewTimeWindow(10 * time.Second)

// Images to be autodeployed in the latest version of Shuffle.
// Key: "appname:version" as referenced by workflows; value: registry image tag.
var autoDeploy = map[string]string{
	"http:1.4.0":            "frikky/shuffle:http_1.4.0",
	"shuffle-tools:1.2.0":   "frikky/shuffle:shuffle-tools_1.2.0",
	"shuffle-subflow:1.1.0": "frikky/shuffle:shuffle-subflow_1.1.0",
	"shuffle-ai:1.1.0":      "frikky/shuffle:shuffle-ai_1.1.0",
	// "shuffle-tools-fork:1.0.0": "frikky/shuffle:shuffle-tools-fork_1.0.0",
}

// New Worker mappings
// visited, appendActions, nextActions, notFound, queueNodes, toRemove, executed, env
var portMappings map[string]int
var baseport = 33333

// UserInputSubflow carries the argument and the continue/cancel callback URLs
// for a "User Input" trigger handled via a subflow.
type UserInputSubflow struct {
	Argument    string `json:"execution_argument"`
	ContinueUrl string `json:"continue_url"`
	CancelUrl   string `json:"cancel_url"`
}
// Not using shuffle.SetWorkflowExecution as we only want to use cache in reality
//
// setWorkflowExecution writes the execution state to the shared cache and then
// reacts to that state:
//   - if shuffle.ValidateFinished says the execution is done, the final result
//     is pushed to the backend via sendResult and the function returns;
//   - if dbSave is set and the execution source is "default", the worker shuts
//     itself down;
//   - if dbSave is set for any other source, the result is sent and the worker
//     polls the backend for up to 120 seconds for unfinished subflow results.
//
// Returns an error when the execution ID is empty or marshalling/caching
// fails; the dbSave paths always return nil.
func setWorkflowExecution(ctx context.Context, workflowExecution shuffle.WorkflowExecution, dbSave bool) error {
	if len(workflowExecution.ExecutionId) == 0 {
		log.Printf("[DEBUG] Workflowexecution executionId can't be empty.")
		return errors.New("ExecutionId can't be empty.")
	}

	//log.Printf("[DEBUG][%s] Setting with %d results (pre)", workflowExecution.ExecutionId, len(workflowExecution.Results))
	// Normalize the execution first. The error is deliberately discarded and
	// the returned (possibly fixed-up) execution is used from here on.
	workflowExecution, _ = shuffle.Fixexecution(ctx, workflowExecution)

	cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
	execData, err := json.Marshal(workflowExecution)
	if err != nil {
		log.Printf("[ERROR] Failed marshalling execution during set: %s", err)
		return err
	}

	// 30 is the cache expiry passed to shuffle.SetCache — presumably minutes;
	// TODO(review): confirm the unit against shuffle-shared.
	err = shuffle.SetCache(ctx, cacheKey, execData, 30)
	if err != nil {
		log.Printf("[ERROR][%s] Failed adding to cache during setexecution", workflowExecution.ExecutionId)
		return err
	}

	handleExecutionResult(workflowExecution)

	// Finished? Then ship the final state to the backend and stop here.
	validated := shuffle.ValidateFinished(ctx, -1, workflowExecution)
	if validated {
		shutdownData, err := json.Marshal(workflowExecution)
		if err != nil {
			// Non-fatal: sendResult is still attempted with the (nil) data.
			log.Printf("[ERROR] Failed marshalling shutdowndata during set: %s", err)
		}

		log.Printf("[DEBUG][%s] Sending result (set). Status: %s, Actions: %d, Results: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Workflow.Actions), len(workflowExecution.Results))
		sendResult(workflowExecution, shutdownData)
		return nil
	}

	// FIXME: Should this shutdown OR send the result?
	// The worker may not be running the backend hmm
	if dbSave {
		if workflowExecution.ExecutionSource == "default" {
			log.Printf("[DEBUG][%s] Shutting down (25)", workflowExecution.ExecutionId)
			shutdown(workflowExecution, "", "", true)
			//return
		} else {
			log.Printf("[DEBUG][%s] NOT shutting down with dbSave (%s). Instead sending result to backend and start polling until subflow is updated", workflowExecution.ExecutionId, workflowExecution.ExecutionSource)
			shutdownData, err := json.Marshal(workflowExecution)
			if err != nil {
				log.Printf("[ERROR] Failed marshalling shutdowndata during dbSave handler: %s", err)
			}

			sendResult(workflowExecution, shutdownData)

			// Poll for 1 minute max if there is a "wait for results" subflow
			// Step 1: look for a result that is already WAITING. The last
			// matching result wins (loop does not break).
			subflowId := ""
			for _, result := range workflowExecution.Results {
				if result.Status == "WAITING" {
					//log.Printf("[DEBUG][%s] Found waiting result", workflowExecution.ExecutionId)
					subflowId = result.Action.ID
				}
			}

			if len(subflowId) == 0 {
				log.Printf("[DEBUG][%s] No waiting result found. Not polling", workflowExecution.ExecutionId)

				// Step 2: subflow-style actions are mirrored into Triggers so
				// the trigger loop below can inspect actions and triggers the
				// same way.
				for _, action := range workflowExecution.Workflow.Actions {
					if action.AppName == "User Input" || action.AppName == "Shuffle Workflow" || action.AppName == "shuffle-subflow" {
						workflowExecution.Workflow.Triggers = append(workflowExecution.Workflow.Triggers, shuffle.Trigger{
							AppName:    action.AppName,
							Parameters: action.Parameters,
							ID:         action.ID,
						})
					}
				}

				for _, trigger := range workflowExecution.Workflow.Triggers {
					//log.Printf("[DEBUG] Found trigger %s", trigger.AppName)
					if trigger.AppName != "User Input" && trigger.AppName != "Shuffle Workflow" && trigger.AppName != "shuffle-subflow" {
						continue
					}

					// check if it has wait for results in params
					wait := false
					for _, param := range trigger.Parameters {
						//log.Printf("[DEBUG] Found param %s with value %s", param.Name, param.Value)
						if param.Name == "check_result" && strings.ToLower(param.Value) == "true" {
							//log.Printf("[DEBUG][%s] Found check result param!", workflowExecution.ExecutionId)
							wait = true
							break
						}
					}

					if wait {
						// Check if it has a result or not
						// A trigger with no terminal (SUCCESS/FAILURE) result
						// — or no result at all — still needs polling.
						found := false
						for _, result := range workflowExecution.Results {
							//log.Printf("[DEBUG][%s] Found result %s", workflowExecution.ExecutionId, result.Action.ID)
							if result.Action.ID == trigger.ID && result.Status != "SUCCESS" && result.Status != "FAILURE" {
								//log.Printf("[DEBUG][%s] Found subflow result that is not handled. Waiting for results", workflowExecution.ExecutionId)
								subflowId = result.Action.ID
								found = true
								break
							}
						}

						if !found {
							log.Printf("[DEBUG][%s] No result found for subflow. Setting subflowId to %s", workflowExecution.ExecutionId, trigger.ID)
							subflowId = trigger.ID
						}
					}

					// First candidate wins; stop scanning triggers.
					if len(subflowId) > 0 {
						break
					}
				}
			}

			if len(subflowId) > 0 {
				// Under rerun period timeout
				timeComparison := 120 // Max poll duration in seconds.
				log.Printf("[DEBUG][%s] Starting polling for %d seconds to see if new subflow updates are found on the backend that are not handled. Subflow ID: %s", workflowExecution.ExecutionId, timeComparison, subflowId)

				timestart := time.Now()
				streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
				for {
					// handleSubflowPoller returns nil once the subflow has a
					// handled result; non-nil means "keep polling".
					err = handleSubflowPoller(ctx, workflowExecution, streamResultUrl, subflowId)
					if err == nil {
						log.Printf("[DEBUG] Subflow is finished and we are breaking the thingy")

						// In non-swarm optimized mode the whole process exits
						// here and relies on reruns to finish the workflow.
						if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" && workflowExecution.ExecutionSource != "default" {
							log.Printf("[DEBUG] Force shutdown of worker due to optimized run with webserver. Expecting reruns to take care of this")
							os.Exit(0)
						}

						break
					}

					timepassed := time.Since(timestart)
					if timepassed.Seconds() > float64(timeComparison) {
						log.Printf("[DEBUG][%s] Max poll time reached to look for updates. Stopping poll. This poll is here to send personal results back to itself to be handled, then to stop this thread.", workflowExecution.ExecutionId)
						break
					}

					// Sleep for 1 second
					time.Sleep(1 * time.Second)
				}
			} else {
				log.Printf("[DEBUG][%s] No need to poll for results. Not polling", workflowExecution.ExecutionId)
			}
		}
	}

	return nil
}
// removes every container except itself (worker)
//
// shutdown is the worker's exit path: it optionally pushes the final
// execution result to the backend, (historically) cleaned up app containers,
// and — when both a reason and a node ID are given — builds an abort request
// against the backend.
//
// NOTE(review): this function continues beyond the end of this chunk; only
// the visible portion is documented here.
func shutdown(workflowExecution shuffle.WorkflowExecution, nodeId string, reason string, handleResultSend bool) {
	log.Printf("[DEBUG][%s] Shutdown (%s) started with reason %#v. Result amount: %d. ResultsSent: %d, Send result: %#v, Parent: %#v", workflowExecution.ExecutionId, workflowExecution.Status, reason, len(workflowExecution.Results), requestsSent, handleResultSend, workflowExecution.ExecutionParent)

	// This is an escape hatch for development only
	// Typically meant to be used when you aren't sure how to make a workflow run in bad scenarios, and want to rapidly debug it.
	if shutdownDisabled == "true" {
		log.Printf("[ERROR] Shutdown disabled: NOT shutting down. This should ONLY be used for development & debugging.")
		os.Exit(3)
	}

	sleepDuration := 1
	// Only push the result if asked to AND fewer than two result pushes have
	// already happened (requestsSent is the package-level counter), so the
	// backend is not spammed with duplicates.
	if handleResultSend && requestsSent < 2 {
		shutdownData, err := json.Marshal(workflowExecution)
		if err == nil {
			sendResult(workflowExecution, shutdownData)
			//log.Printf("[WARNING][%s] Sent shutdown update with %d results and result value %s", workflowExecution.ExecutionId, len(workflowExecution.Results), reason)
		} else {
			log.Printf("[WARNING][%s] Failed to send update: %s", workflowExecution.ExecutionId, err)
		}

		// Give the send a moment to complete before tearing down.
		time.Sleep(time.Duration(sleepDuration) * time.Second)
	}

	// Might not be necessary because of cleanupEnv hostconfig autoremoval
	if strings.ToLower(cleanupEnv) == "true" && (os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm") {
		// Dead code retained for reference: the old per-container cleanup.
		/*
			ctx := context.Background()
			dockercli, err := dockerclient.NewEnvClient()
			if err == nil {
				log.Printf("[INFO] Cleaning up %d containers", len(containerIds))
				removeOptions := types.ContainerRemoveOptions{
					RemoveVolumes: true,
					Force:         true,
				}
				for _, containername := range containerIds {
					log.Printf("[INFO] Should stop and and remove container %s (deprecated)", containername)
					//dockercli.ContainerStop(ctx, containername, nil)
					//dockercli.ContainerRemove(ctx, containername, removeOptions)
					//removeContainers = append(removeContainers, containername)
				}
			}
		*/
	} else {
		/*** STARTREMOVE ***/
		if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
			log.Printf("[DEBUG][%s] NOT cleaning up containers. IDS: %d, CLEANUP env: %s", workflowExecution.ExecutionId, 0, cleanupEnv)
		}
		/*** ENDREMOVE ***/
	}

	// Build the backend abort URL, propagating reason/node/environment as
	// query-escaped parameters.
	if len(reason) > 0 && len(nodeId) > 0 {
		//log.Printf("[INFO] Running abort of workflow because it should be finished")
		abortUrl := fmt.Sprintf("%s/api/v1/workflows/%s/executions/%s/abort", baseUrl, workflowExecution.Workflow.ID, workflowExecution.ExecutionId)
		path := fmt.Sprintf("?reason=%s", url.QueryEscape(reason))
		if len(nodeId) > 0 {
			path += fmt.Sprintf("&node=%s", url.QueryEscape(nodeId))
		}
		if len(environment) > 0 {
			path += fmt.Sprintf("&env=%s", url.QueryEscape(environment))
		}
		abortUrl += path
  300. log.Printf("[DEBUG][%s] Abort URL: %s", workflowExecution.ExecutionId, abortUrl)
  301. req, err := http.NewRequest(
  302. "GET",
  303. abortUrl,
  304. nil,
  305. )
  306. if err != nil {
  307. log.Printf("[WARNING][%s] Failed building request: %s", workflowExecution.ExecutionId, err)
  308. }
  309. // FIXME: Add an API call to the backend
  310. if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
  311. authorization := os.Getenv("AUTHORIZATION")
  312. if len(authorization) > 0 {
  313. req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", authorization))
  314. } else {
  315. log.Printf("[ERROR][%s] No authorization specified for abort", workflowExecution.ExecutionId)
  316. }
  317. } else {
  318. req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", workflowExecution.Authorization))
  319. }
  320. req.Header.Add("Content-Type", "application/json")
  321. //log.Printf("[DEBUG][%s] All App Logs: %#v", workflowExecution.ExecutionId, allLogs)
  322. client := shuffle.GetExternalClient(abortUrl)
  323. newresp, err := client.Do(req)
  324. if err != nil {
  325. log.Printf("[WARNING][%s] Failed abort request: %s", workflowExecution.ExecutionId, err)
  326. } else {
  327. defer newresp.Body.Close()
  328. }
  329. } else {
  330. //log.Printf("[INFO][%s] NOT running abort during shutdown.", workflowExecution.ExecutionId)
  331. }
  332. log.Printf("[DEBUG][%s] Finished shutdown (after %d seconds). ", workflowExecution.ExecutionId, sleepDuration)
  333. //Finished shutdown (after %d seconds). ", sleepDuration)
  334. // Allows everything to finish in subprocesses (apps)
  335. if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" && isKubernetes != "true" {
  336. time.Sleep(time.Duration(sleepDuration) * time.Second)
  337. os.Exit(3)
  338. } else {
  339. log.Printf("[DEBUG][%s] Sending result and resetting values (K8s & Swarm).", workflowExecution.ExecutionId)
  340. }
  341. }
  342. // ** STARTREMOVE ***/
  343. func deployk8sApp(image string, identifier string, env []string) error {
  344. if len(os.Getenv("KUBERNETES_NAMESPACE")) > 0 {
  345. kubernetesNamespace = os.Getenv("KUBERNETES_NAMESPACE")
  346. } else {
  347. kubernetesNamespace = "default"
  348. }
  349. ctx := context.Background()
  350. log.Printf("[DEBUG] Deploying k8s app with identifier %s to namespace %s", identifier, kubernetesNamespace)
  351. deployport, err := strconv.Atoi(os.Getenv("SHUFFLE_APP_EXPOSED_PORT"))
  352. if err != nil {
  353. deployport = 80
  354. }
  355. envMap := make(map[string]string)
  356. for _, envStr := range env {
  357. parts := strings.SplitN(envStr, "=", 2)
  358. if len(parts) == 2 {
  359. envMap[parts[0]] = parts[1]
  360. }
  361. }
  362. // add to env
  363. envMap["SHUFFLE_APP_EXPOSED_PORT"] = strconv.Itoa(deployport)
  364. envMap["SHUFFLE_SWARM_CONFIG"] = os.Getenv("SHUFFLE_SWARM_CONFIG")
  365. envMap["BASE_URL"] = "http://shuffle-workers:33333"
  366. if len(os.Getenv("SHUFFLE_LOGS_DISABLED")) > 0 {
  367. envMap["SHUFFLE_LOGS_DISABLED"] = os.Getenv("SHUFFLE_LOGS_DISABLED")
  368. }
  369. clientset, _, err := shuffle.GetKubernetesClient()
  370. if err != nil {
  371. log.Printf("[ERROR] Failed getting kubernetes: %s", err)
  372. return err
  373. }
  374. // str := strings.ToLower(identifier)
  375. // strSplit := strings.Split(str, "_")
  376. // value := strSplit[0]
  377. // value = strings.ReplaceAll(value, "_", "-")
  378. value := identifier
  379. baseDeployMode := false
  380. // check if autoDeploy contains a value
  381. // that is equal to the image being deployed.
  382. for _, value := range autoDeploy {
  383. if value == image {
  384. baseDeployMode = true
  385. }
  386. }
  387. autoDeployOverride := os.Getenv("SHUFFLE_USE_GHCR_OVERRIDE_FOR_AUTODEPLOY") == "true"
  388. localRegistry := ""
  389. // Checking if app is generated or not
  390. if !(baseDeployMode && autoDeployOverride) {
  391. localRegistry = os.Getenv("REGISTRY_URL")
  392. } else {
  393. log.Printf("[DEBUG] Detected baseDeploy image (%s) and ghcr override. Resorting to using ghcr instead of registry", image)
  394. }
  395. /*
  396. appDetails := strings.Split(image, ":")[1]
  397. appDetailsSplit := strings.Split(appDetails, "_")
  398. appName := strings.Join(appDetailsSplit[:len(appDetailsSplit)-1], "_")
  399. appVersion := appDetailsSplit[len(appDetailsSplit)-1]
  400. for _, app := range workflowExecution.Workflow.Actions {
  401. // log.Printf("[DEBUG] App: %s, Version: %s", appName, appVersion)
  402. // log.Printf("[DEBUG] Checking app %s with version %s", app.AppName, app.AppVersion)
  403. if app.AppName == appName && app.AppVersion == appVersion {
  404. if app.Generated == true {
  405. log.Printf("[DEBUG] Generated app, setting local registry")
  406. image = fmt.Sprintf("%s/%s", localRegistry, image)
  407. break
  408. } else {
  409. log.Printf("[DEBUG] Not generated app, setting shuffle registry")
  410. }
  411. }
  412. }
  413. */
  414. if (len(localRegistry) == 0 && len(os.Getenv("SHUFFLE_BASE_IMAGE_REGISTRY")) > 0) && !(baseDeployMode && autoDeployOverride) {
  415. localRegistry = os.Getenv("SHUFFLE_BASE_IMAGE_REGISTRY")
  416. }
  417. if (len(localRegistry) > 0 && strings.Count(image, "/") <= 2) && !(baseDeployMode && autoDeployOverride) {
  418. log.Printf("[DEBUG] Using REGISTRY_URL %s", localRegistry)
  419. image = fmt.Sprintf("%s/%s", localRegistry, image)
  420. } else {
  421. if strings.Count(image, "/") <= 2 && !strings.HasPrefix(image, "frikky/shuffle:") {
  422. image = fmt.Sprintf("frikky/shuffle:%s", image)
  423. }
  424. }
  425. log.Printf("[DEBUG] Got kubernetes with namespace %#v to run image '%s'", kubernetesNamespace, image)
  426. //fix naming convention
  427. // podUuid := uuid.NewV4().String()
  428. // name := fmt.Sprintf("%s-%s", value, podUuid)
  429. // replace identifier "_" with "-"
  430. name := strings.ReplaceAll(identifier, "_", "-")
  431. labels := map[string]string{
  432. // Well-known Kubernetes labels
  433. "app.kubernetes.io/name": "shuffle-app",
  434. "app.kubernetes.io/instance": name,
  435. "app.kubernetes.io/part-of": "shuffle",
  436. "app.kubernetes.io/managed-by": "shuffle-worker",
  437. // Keep legacy labels for backward compatibility
  438. "app": name,
  439. // TODO: Add Shuffle specific labels
  440. // "app.shuffler.io/name": "APP_NAME",
  441. // "app.shuffler.io/version": "APP_VERSION",
  442. }
  443. matchLabels := map[string]string{
  444. "app.kubernetes.io/name": "shuffle-app",
  445. "app.kubernetes.io/instance": name,
  446. }
  447. // Parse security contexts from env
  448. var podSecurityContext *corev1.PodSecurityContext
  449. var containerSecurityContext *corev1.SecurityContext
  450. if len(appPodSecurityContext) > 0 {
  451. podSecurityContext = &corev1.PodSecurityContext{}
  452. err = json.Unmarshal([]byte(appPodSecurityContext), podSecurityContext)
  453. if err != nil {
  454. log.Printf("[ERROR] Failed to unmarshal app pod security context: %v", err)
  455. return fmt.Errorf("failed to unmarshal app pod security context: %v", err)
  456. }
  457. }
  458. if len(appContainerSecurityContext) > 0 {
  459. containerSecurityContext = &corev1.SecurityContext{}
  460. err = json.Unmarshal([]byte(appContainerSecurityContext), containerSecurityContext)
  461. if err != nil {
  462. log.Printf("[ERROR] Failed to unmarshal app container security context: %v", err)
  463. return fmt.Errorf("failed to unmarshal app container security context: %v", err)
  464. }
  465. }
  466. // pod := &corev1.Pod{
  467. // ObjectMeta: metav1.ObjectMeta{
  468. // Name: podName,
  469. // Labels: map[string]string{
  470. // "app": podName,
  471. // // "executionId": workflowExecution.ExecutionId,
  472. // },
  473. // },
  474. // Spec: corev1.PodSpec{
  475. // RestartPolicy: "Never", // As a crash is not useful in this context
  476. // // DNSPolicy: "Default",
  477. // DNSPolicy: corev1.DNSClusterFirst,
  478. // // NodeName: "worker1"
  479. // Containers: []corev1.Container{
  480. // {
  481. // Name: value,
  482. // Image: image,
  483. // Env: buildEnvVars(envMap),
  484. // // Pull if not available
  485. // ImagePullPolicy: corev1.PullIfNotPresent,
  486. // },
  487. // },
  488. // },
  489. // }
  490. // createdPod, err := clientset.CoreV1().Pods(kubernetesNamespace).Create(context.Background(), pod, metav1.CreateOptions{})
  491. // if err != nil {
  492. // log.Printf("[ERROR] Failed creating pod: %v", err)
  493. // // os.Exit(1)
  494. // } else {
  495. // log.Printf("[DEBUG] Created pod %#v in namespace %#v", createdPod.Name, kubernetesNamespace)
  496. // }
  497. // service := &corev1.Service{
  498. // ObjectMeta: metav1.ObjectMeta{
  499. // Name: identifier,
  500. // },
  501. // Spec: corev1.ServiceSpec{
  502. // Selector: map[string]string{
  503. // "app": podName,
  504. // },
  505. // Ports: []corev1.ServicePort{
  506. // {
  507. // Protocol: "TCP",
  508. // Port: 80,
  509. // TargetPort: intstr.FromInt(80),
  510. // },
  511. // },
  512. // Type: corev1.ServiceTypeNodePort,
  513. // },
  514. // }
  515. // _, err = clientset.CoreV1().Services(kubernetesNamespace).Create(context.TODO(), service, metav1.CreateOptions{})
  516. // if err != nil {
  517. // log.Printf("[ERROR] Failed creating service: %v", err)
  518. // return err
  519. // }
  520. // use deployment instead of pod
  521. // then expose a service similarly.
  522. // number of replicas can be set to os.Getenv("SHUFFLE_APP_REPLICAS")
  523. replicaNumber := 1
  524. replicaNumberStr := os.Getenv("SHUFFLE_APP_REPLICAS")
  525. if len(replicaNumberStr) > 0 {
  526. tmpInt, err := strconv.Atoi(replicaNumberStr)
  527. if err != nil {
  528. log.Printf("[ERROR] %s is not a valid number for replication", replicaNumberStr)
  529. } else {
  530. replicaNumber = tmpInt
  531. }
  532. }
  533. existing, err := clientset.AppsV1().Deployments(kubernetesNamespace).List(
  534. ctx,
  535. metav1.ListOptions{
  536. LabelSelector: fmt.Sprintf("app: %s", name),
  537. },
  538. )
  539. if err != nil {
  540. log.Printf("[ERROR] Failed listing existing deployments: %v", err)
  541. }
  542. if len(existing.Items) > 0 {
  543. log.Printf("[INFO] Found existing deployments, skipping creation")
  544. return nil
  545. }
  546. replicaNumberInt32 := int32(replicaNumber)
  547. // apps do not need access the k8s api.
  548. automountServiceAccountToken := false
  549. deployment := &appsv1.Deployment{
  550. ObjectMeta: metav1.ObjectMeta{
  551. Name: name,
  552. Labels: labels,
  553. },
  554. Spec: appsv1.DeploymentSpec{
  555. Replicas: &replicaNumberInt32,
  556. Selector: &metav1.LabelSelector{
  557. MatchLabels: matchLabels,
  558. },
  559. Template: corev1.PodTemplateSpec{
  560. ObjectMeta: metav1.ObjectMeta{
  561. Labels: labels,
  562. },
  563. Spec: corev1.PodSpec{
  564. Containers: []corev1.Container{
  565. {
  566. Name: value,
  567. Image: image,
  568. Env: buildEnvVars(envMap),
  569. Ports: []corev1.ContainerPort{
  570. {
  571. Protocol: "TCP",
  572. ContainerPort: int32(deployport),
  573. },
  574. },
  575. SecurityContext: containerSecurityContext,
  576. Resources: buildResourcesFromEnv(),
  577. },
  578. },
  579. DNSPolicy: corev1.DNSClusterFirst,
  580. ServiceAccountName: appServiceAccountName,
  581. AutomountServiceAccountToken: &automountServiceAccountToken,
  582. SecurityContext: podSecurityContext,
  583. },
  584. },
  585. },
  586. }
  587. if os.Getenv("SHUFFLE_APP_MOUNT_TMP_VOLUME") == "true" {
  588. deployment.Spec.Template.Spec.Volumes = append(
  589. deployment.Spec.Template.Spec.Volumes,
  590. corev1.Volume{
  591. Name: "tmp",
  592. VolumeSource: corev1.VolumeSource{
  593. EmptyDir: &corev1.EmptyDirVolumeSource{},
  594. },
  595. },
  596. )
  597. deployment.Spec.Template.Spec.Containers[0].VolumeMounts = append(
  598. deployment.Spec.Template.Spec.Containers[0].VolumeMounts,
  599. corev1.VolumeMount{
  600. Name: "tmp",
  601. ReadOnly: false,
  602. MountPath: "/tmp",
  603. },
  604. )
  605. }
  606. if len(os.Getenv("REGISTRY_URL")) > 0 && len(os.Getenv("SHUFFLE_BASE_IMAGE_NAME")) > 0 {
  607. log.Printf("[INFO] Setting image pull policy to Always as private registry is used.")
  608. //containerAttachment.ImagePullPolicy = corev1.PullAlways
  609. deployment.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullAlways
  610. } else {
  611. deployment.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullIfNotPresent
  612. }
  613. _, err = clientset.AppsV1().Deployments(kubernetesNamespace).Create(context.Background(), deployment, metav1.CreateOptions{})
  614. if err != nil {
  615. log.Printf("[ERROR] Failed creating deployment: %v", err)
  616. return err
  617. }
  618. svcAppProtocol := "http"
  619. service := &corev1.Service{
  620. ObjectMeta: metav1.ObjectMeta{
  621. Name: name,
  622. Labels: labels,
  623. },
  624. Spec: corev1.ServiceSpec{
  625. Selector: matchLabels,
  626. Ports: []corev1.ServicePort{
  627. {
  628. Protocol: "TCP",
  629. AppProtocol: &svcAppProtocol,
  630. Port: 80,
  631. TargetPort: intstr.FromInt(deployport),
  632. },
  633. },
  634. Type: corev1.ServiceTypeClusterIP,
  635. },
  636. }
  637. _, err = clientset.CoreV1().Services(kubernetesNamespace).Create(context.TODO(), service, metav1.CreateOptions{})
  638. if err != nil {
  639. log.Printf("[ERROR] Failed creating service: %v", err)
  640. return err
  641. }
  642. // Giving the service time to start before we contineu anything
  643. log.Printf("[DEBUG] Waiting 20 seconds before moving on to let app '%s' start properly. Service: %s (k8s)", name, image)
  644. time.Sleep(20 * time.Second)
  645. return nil
  646. }
  647. //** ENDREMOVE ***/
  648. // Deploys the internal worker whenever something happens
  649. func deployApp(cli *dockerclient.Client, image string, identifier string, env []string, workflowExecution shuffle.WorkflowExecution, action shuffle.Action) error {
  650. // if isKubernetes == "true" {
  651. // if len(os.Getenv("KUBERNETES_NAMESPACE")) > 0 {
  652. // kubernetesNamespace = os.Getenv("KUBERNETES_NAMESPACE")
  653. // } else {
  654. // kubernetesNamespace = "default"
  655. // }
  656. // envMap := make(map[string]string)
  657. // for _, envStr := range env {
  658. // parts := strings.SplitN(envStr, "=", 2)
  659. // if len(parts) == 2 {
  660. // envMap[parts[0]] = parts[1]
  661. // }
  662. // }
  663. // clientset, _, err := shuffle.GetKubernetesClient()
  664. // if err != nil {
  665. // log.Printf("[ERROR] Failed getting kubernetes: %s", err)
  666. // return err
  667. // }
  668. // str := strings.ToLower(identifier)
  669. // strSplit := strings.Split(str, "_")
  670. // value := strSplit[0]
  671. // value = strings.ReplaceAll(value, "_", "-")
  672. // // Checking if app is generated or not
  673. // localRegistry := os.Getenv("REGISTRY_URL")
  674. // /*
  675. // appDetails := strings.Split(image, ":")[1]
  676. // appDetailsSplit := strings.Split(appDetails, "_")
  677. // appName := strings.Join(appDetailsSplit[:len(appDetailsSplit)-1], "_")
  678. // appVersion := appDetailsSplit[len(appDetailsSplit)-1]
  679. // for _, app := range workflowExecution.Workflow.Actions {
  680. // // log.Printf("[DEBUG] App: %s, Version: %s", appName, appVersion)
  681. // // log.Printf("[DEBUG] Checking app %s with version %s", app.AppName, app.AppVersion)
  682. // if app.AppName == appName && app.AppVersion == appVersion {
  683. // if app.Generated == true {
  684. // log.Printf("[DEBUG] Generated app, setting local registry")
  685. // image = fmt.Sprintf("%s/%s", localRegistry, image)
  686. // break
  687. // } else {
  688. // log.Printf("[DEBUG] Not generated app, setting shuffle registry")
  689. // }
  690. // }
  691. // }
  692. // */
  693. // if len(localRegistry) == 0 && len(os.Getenv("SHUFFLE_BASE_IMAGE_REGISTRY")) > 0 {
  694. // localRegistry = os.Getenv("SHUFFLE_BASE_IMAGE_REGISTRY")
  695. // }
  696. // if len(localRegistry) > 0 && strings.Count(image, "/") <= 2 {
  697. // log.Printf("[DEBUG] Using REGISTRY_URL %s", localRegistry)
  698. // image = fmt.Sprintf("%s/%s", localRegistry, image)
  699. // } else {
  700. // if strings.Count(image, "/") <= 2 {
  701. // image = fmt.Sprintf("frikky/shuffle:%s", image)
  702. // }
  703. // }
  704. // log.Printf("[DEBUG] Got kubernetes with namespace %#v to run image '%s'", kubernetesNamespace, image)
  705. // //fix naming convention
  706. // podUuid := uuid.NewV4().String()
  707. // podName := fmt.Sprintf("%s-%s", value, podUuid)
  708. // pod := &corev1.Pod{
  709. // ObjectMeta: metav1.ObjectMeta{
  710. // Name: podName,
  711. // Labels: map[string]string{
  712. // "app": "shuffle-app",
  713. // "executionId": workflowExecution.ExecutionId,
  714. // },
  715. // },
  716. // Spec: corev1.PodSpec{
  717. // RestartPolicy: "Never", // As a crash is not useful in this context
  718. // // DNSPolicy: "Default",
  719. // DNSPolicy: corev1.DNSClusterFirst,
  720. // // NodeName: "worker1"
  721. // Containers: []corev1.Container{
  722. // {
  723. // Name: value,
  724. // Image: image,
  725. // Env: buildEnvVars(envMap),
  726. // // Pull if not available
  727. // ImagePullPolicy: corev1.PullIfNotPresent,
  728. // },
  729. // },
  730. // },
  731. // }
  732. // createdPod, err := clientset.CoreV1().Pods(kubernetesNamespace).Create(context.Background(), pod, metav1.CreateOptions{})
  733. // if err != nil {
  734. // log.Printf("[ERROR] Failed creating pod: %v", err)
  735. // // os.Exit(1)
  736. // } else {
  737. // log.Printf("[DEBUG] Created pod %#v in namespace %#v", createdPod.Name, kubernetesNamespace)
  738. // }
  739. // return nil
  740. // }
  741. // form basic hostConfig
  742. ctx := context.Background()
  743. // Check action if subflow
  744. // Check if url is default (shuffle-backend)
  745. // If it doesn't exist, add it
  746. // FIXME: This does NOT replace it in all cases as the data
  747. // is not saved in the database as the correct param.
  748. if action.AppName == "shuffle-subflow" {
  749. // Automatic replacement of URL
  750. for paramIndex, param := range action.Parameters {
  751. if param.Name != "backend_url" {
  752. continue
  753. }
  754. if !strings.Contains(param.Value, "shuffle-backend") {
  755. continue
  756. }
  757. // Automatic replacement as this is default
  758. if len(os.Getenv("BASE_URL")) > 0 {
  759. action.Parameters[paramIndex].Value = os.Getenv("BASE_URL")
  760. log.Printf("[DEBUG][%s] Replaced backend_url with base_url %s", workflowExecution.ExecutionId, os.Getenv("BASE_URL"))
  761. }
  762. if len(os.Getenv("SHUFFLE_CLOUDRUN_URL")) > 0 {
  763. action.Parameters[paramIndex].Value = os.Getenv("SHUFFLE_CLOUDRUN_URL")
  764. log.Printf("[DEBUG][%s] Replaced backend_url with cloudrun %s", workflowExecution.ExecutionId, os.Getenv("SHUFFLE_CLOUDRUN_URL"))
  765. }
  766. }
  767. }
  768. /*** STARTREMOVE ***/
  769. if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
  770. appName := strings.Replace(identifier, fmt.Sprintf("_%s", action.ID), "", -1)
  771. appName = strings.Replace(appName, fmt.Sprintf("_%s", workflowExecution.ExecutionId), "", -1)
  772. appName = strings.ToLower(appName)
  773. //log.Printf("[INFO][%s] New appname: %s, image: %s", workflowExecution.ExecutionId, appName, image)
  774. if !shuffle.ArrayContains(downloadedImages, image) && isKubernetes != "true" {
  775. log.Printf("[DEBUG] Downloading image %s from backend as it's first iteration for this image on the worker. Timeout: 60", image)
  776. // FIXME: Not caring if it's ok or not. Just continuing
  777. // This is working as intended, just designed to download an updated
  778. // image on every Orborus/new worker restart.
  779. // Running as coroutine for eventual completeness
  780. // FIXME: With goroutines it got too much trouble of deploying with an older version
  781. // Allowing slow startups, as long as it's eventually fast, and uses the same registry as on host.
  782. err := shuffle.DownloadDockerImageBackend(&http.Client{Timeout: imagedownloadTimeout}, image)
  783. if err == nil {
  784. downloadedImages = append(downloadedImages, image)
  785. }
  786. }
  787. var exposedPort int
  788. var err error
  789. if isKubernetes != "true" {
  790. exposedPort, err = findAppInfo(image, appName, false)
  791. if err != nil {
  792. log.Printf("[ERROR] Failed finding and creating port for %s: %s", appName, err)
  793. return err
  794. }
  795. } else {
  796. // ** STARTREMOVE ***/
  797. exposedPort = 80
  798. //deployport, err := strconv.Atoi(os.Getenv("SHUFFLE_APP_EXPOSED_PORT"))
  799. //if err == nil {
  800. // exposedPort = deployport
  801. //}
  802. err = findAppInfoKubernetes(image, appName, env)
  803. if err != nil {
  804. log.Printf("[ERROR] Failed finding and creating port for %s: %s", appName, err)
  805. return err
  806. }
  807. // ** ENDREMOVE ***/
  808. }
  809. /*
  810. // Makes it not run at all.
  811. cacheData := []byte("1")
  812. newExecId := fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
  813. err = shuffle.SetCache(ctx, newExecId, cacheData, 30)
  814. if err != nil {
  815. log.Printf("[WARNING] (1) Failed setting cache for action %s: %s", newExecId, err)
  816. } else {
  817. log.Printf("[DEBUG][%s] (1) Adding %s to cache (%#v)", workflowExecution.ExecutionId, newExecId, action.Name)
  818. }
  819. */
  820. log.Printf("[DEBUG][%s] Should run towards port %d for app %s. DELAY: %d", workflowExecution.ExecutionId, exposedPort, appName, action.ExecutionDelay)
  821. ctx := context.Background()
  822. if action.ExecutionDelay > 0 {
  823. //log.Printf("[DEBUG] Running app %s with delay of %d", action.Name, action.ExecutionDelay)
  824. waitTime := time.Duration(action.ExecutionDelay) * time.Second
  825. time.AfterFunc(waitTime, func() {
  826. err = sendAppRequest(ctx, baseUrl, appName, exposedPort, &action, &workflowExecution, image, 0)
  827. if err != nil {
  828. log.Printf("[ERROR] Failed sending SCHEDULED request to app %s on port %d: %s", appName, exposedPort, err)
  829. }
  830. })
  831. } else {
  832. rand.Seed(time.Now().UnixNano())
  833. waitTime := time.Duration(rand.Intn(500)) * time.Millisecond
  834. // Added a random delay + context timeout to ensure that the function returns, and only once
  835. time.AfterFunc(waitTime, func() {
  836. ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
  837. defer cancel() // Cancel the context to release resources even if not used
  838. go sendAppRequest(ctx, baseUrl, appName, exposedPort, &action, &workflowExecution, image, 0)
  839. })
  840. }
  841. return nil
  842. }
  843. /*** ENDREMOVE ***/
  844. // Max 10% CPU every second
  845. //CPUShares: 128,
  846. //CPUQuota: 10000,
  847. //CPUPeriod: 100000,
  848. hostConfig := &container.HostConfig{
  849. LogConfig: container.LogConfig{
  850. Type: "json-file",
  851. Config: map[string]string{
  852. "max-size": "10m",
  853. },
  854. },
  855. Resources: container.Resources{},
  856. }
  857. if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
  858. hostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:worker-%s", workflowExecution.ExecutionId))
  859. //log.Printf("Environments: %#v", env)
  860. }
  861. // Removing because log extraction should happen first
  862. if strings.ToLower(cleanupEnv) == "true" {
  863. hostConfig.AutoRemove = true
  864. }
  865. // Get environment for certificates
  866. volumeBinds := []string{}
  867. volumeBindString := os.Getenv("SHUFFLE_VOLUME_BINDS")
  868. if len(volumeBindString) > 0 {
  869. volumeBindSplit := strings.Split(volumeBindString, ",")
  870. for _, volumeBind := range volumeBindSplit {
  871. if volumeBind == "srcfolder=dstfolder" || volumeBind == "srcfolder:dstfolder" || volumeBind == "/srcfolder:/dstfolder" {
  872. log.Printf("[DEBUG] Volume bind '%s' is invalid.", volumeBind)
  873. continue
  874. }
  875. if !strings.HasPrefix(volumeBind, "/") {
  876. log.Printf("[ERROR] Volume bind '%s' is invalid. Use absolute paths.", volumeBind)
  877. continue
  878. }
  879. if !strings.Contains(volumeBind, ":") {
  880. log.Printf("[ERROR] Volume bind '%s' is invalid. Use absolute paths with colon inbetween them (/srcpath:dstpath/", volumeBind)
  881. continue
  882. }
  883. volumeBinds = append(volumeBinds, volumeBind)
  884. }
  885. }
  886. // Add more volume binds if possible
  887. if len(volumeBinds) > 0 {
  888. // Only use mounts, not direct binds
  889. hostConfig.Binds = []string{}
  890. hostConfig.Mounts = []mount.Mount{}
  891. for _, bind := range volumeBinds {
  892. if !strings.Contains(bind, ":") || strings.Contains(bind, "..") || strings.HasPrefix(bind, "~") {
  893. log.Printf("[ERROR] Volume bind '%s' is invalid. Use absolute paths.", bind)
  894. continue
  895. }
  896. log.Printf("[DEBUG] Appending bind %s to App container", bind)
  897. bindSplit := strings.Split(bind, ":")
  898. sourceFolder := bindSplit[0]
  899. destinationFolder := bindSplit[1]
  900. readOnly := false
  901. if len(bindSplit) > 2 {
  902. mode := bindSplit[2]
  903. if mode == "ro" {
  904. readOnly = true
  905. }
  906. }
  907. builtMount := mount.Mount{
  908. Type: mount.TypeBind,
  909. Source: sourceFolder,
  910. Target: destinationFolder,
  911. ReadOnly: readOnly,
  912. }
  913. hostConfig.Mounts = append(hostConfig.Mounts, builtMount)
  914. }
  915. }
  916. config := &container.Config{
  917. Image: image,
  918. Env: env,
  919. }
  920. //log.Printf("[DEBUG] Deploying image with env: %#v", env)
  921. // Checking as late as possible, just in case.
  922. newExecId := fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
  923. _, err := shuffle.GetCache(ctx, newExecId)
  924. if err == nil {
  925. log.Printf("[DEBUG][%s] Result for action %s already found - returning", newExecId, action.ID)
  926. return nil
  927. }
  928. cacheData := []byte("1")
  929. err = shuffle.SetCache(ctx, newExecId, cacheData, 30)
  930. if err != nil {
  931. //log.Printf("[WARNING][%s] Failed setting cache for action: %s", newExecId, err)
  932. } else {
  933. //log.Printf("[DEBUG][%s] Adding to cache. Name: %s", workflowExecution.ExecutionId, action.Name)
  934. }
  935. if action.ExecutionDelay > 0 {
  936. log.Printf("[DEBUG][%s] Running app '%s' with label '%s' in docker with delay of %d", workflowExecution.ExecutionId, action.AppName, action.Label, action.ExecutionDelay)
  937. waitTime := time.Duration(action.ExecutionDelay) * time.Second
  938. time.AfterFunc(waitTime, func() {
  939. DeployContainer(ctx, cli, config, hostConfig, identifier, workflowExecution, newExecId)
  940. })
  941. } else {
  942. log.Printf("[DEBUG][%s] Running app %s in docker NORMALLY as there is no delay set with identifier %s", workflowExecution.ExecutionId, action.Name, identifier)
  943. returnvalue := DeployContainer(ctx, cli, config, hostConfig, identifier, workflowExecution, newExecId)
  944. //log.Printf("[DEBUG][%s] Normal deploy ret: %s", workflowExecution.ExecutionId, returnvalue)
  945. return returnvalue
  946. }
  947. return nil
  948. }
  949. func cleanupKubernetesExecution(clientset *kubernetes.Clientset, workflowExecution shuffle.WorkflowExecution, namespace string) error {
  950. // workerName := fmt.Sprintf("worker-%s", workflowExecution.ExecutionId)
  951. // FIXME: The executionId label is currently not set
  952. labelSelector := fmt.Sprintf("app.kubernetes.io/name=shuffle-app,executionId=%s", workflowExecution.ExecutionId)
  953. podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
  954. LabelSelector: labelSelector,
  955. })
  956. if err != nil {
  957. return fmt.Errorf("[ERROR] Failed to list apps with label selector %s: %#vv", labelSelector, err)
  958. }
  959. for _, pod := range podList.Items {
  960. err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
  961. if err != nil {
  962. return fmt.Errorf("failed to delete app %s: %v", pod.Name, err)
  963. }
  964. log.Printf("App %s in namespace %s deleted.", pod.Name, namespace)
  965. }
  966. // podErr := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), workerName, metav1.DeleteOptions{})
  967. // if podErr != nil {
  968. // return fmt.Errorf("[ERROR] failed to delete the worker %s in namespace %s: %v", workerName, namespace, podErr)
  969. // }
  970. // log.Printf("[DEBUG] %s in namespace %s deleted.", workerName, namespace)
  971. return nil
  972. }
// DeployContainer creates and starts a single app container for one action of
// the given workflow execution.
//
// Recovery ladder (state is shared via hostConfig, so order matters):
//  1. Create under `identifier`. On a name conflict, retry once with a
//     UUID-suffixed name; any other create error deletes the action's cache
//     entry (so it can be re-run) and is returned.
//  2. Start the container. If the start fails with a network-join / missing
//     container error, the CREATED container is removed in the background and
//     one more create+start is attempted with NetworkMode, LogConfig and
//     Resources stripped from hostConfig.
//
// Returns nil once a start succeeded; on every failure path the cache for
// actionExecId is deleted before the error is returned.
func DeployContainer(ctx context.Context, cli *dockerclient.Client, config *container.Config, hostConfig *container.HostConfig, identifier string, workflowExecution shuffle.WorkflowExecution, actionExecId string) error {
	// Attempt 1: create under the requested name.
	cont, err := cli.ContainerCreate(
		ctx,
		config,
		hostConfig,
		nil,
		nil,
		identifier,
	)

	//log.Printf("[DEBUG] config set: %#v", config)
	if err != nil {
		//log.Printf("[ERROR] Failed creating container: %s", err)
		if !strings.Contains(err.Error(), "Conflict. The container name") {
			// Hard create failure: clear the action's cache entry so the
			// execution can be retried later, then bubble the error up.
			log.Printf("[ERROR] Container CREATE error (1): %s", err)

			cacheErr := shuffle.DeleteCache(ctx, actionExecId)
			if cacheErr != nil {
				log.Printf("[ERROR] FAILURE Deleting cache for %s: %s", actionExecId, cacheErr)
			}

			return err
		} else {
			// Name conflict: retry once with a unique, UUID-suffixed name.
			parsedUuid := uuid.NewV4()
			identifier = fmt.Sprintf("%s-%s", identifier, parsedUuid)
			//hostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:worker-%s", workflowExecution.ExecutionId))
			log.Printf("[DEBUG] 2 - Identifier: %s", identifier)
			cont, err = cli.ContainerCreate(
				context.Background(),
				config,
				hostConfig,
				nil,
				nil,
				identifier,
			)
			if err != nil {
				log.Printf("[ERROR] Container create error (2): %s", err)
				cacheErr := shuffle.DeleteCache(ctx, actionExecId)
				if cacheErr != nil {
					log.Printf("[ERROR] FAILURE Deleting cache for %s: %s", actionExecId, cacheErr)
				}
				return err
			}
			//log.Printf("[DEBUG] Made new container ID
		}
	}

	err = cli.ContainerStart(ctx, cont.ID, container.StartOptions{})
	if err != nil {
		// Network-related start failures get one more attempt without the
		// custom network / log / resource configuration on hostConfig.
		if strings.Contains(fmt.Sprintf("%s", err), "cannot join network") || strings.Contains(fmt.Sprintf("%s", err), "No such container") {
			// Remove the "CREATED" one from the previous if possible.
			// Best-effort in the background; its error is intentionally dropped.
			go cli.ContainerRemove(ctx, cont.ID, container.RemoveOptions{})

			log.Printf("[WARNING] Failed deploying App on first attempt: %s. Removing some HostConfig configs.", err)
			parsedUuid := uuid.NewV4()
			identifier = fmt.Sprintf("%s-%s-nonetwork", identifier, parsedUuid)
			hostConfig.NetworkMode = container.NetworkMode("")
			hostConfig.LogConfig = container.LogConfig{
				Type: "json-file",
				Config: map[string]string{
					"max-size": "10m",
				},
			}
			hostConfig.Resources = container.Resources{}
			cont, err = cli.ContainerCreate(
				context.Background(),
				config,
				hostConfig,
				nil,
				nil,
				identifier,
			)
			if err != nil {
				log.Printf("[ERROR] Container create error (3): %s", err)
				cacheErr := shuffle.DeleteCache(ctx, actionExecId)
				if cacheErr != nil {
					log.Printf("[ERROR] FAILURE Deleting cache for %s: %s", actionExecId, cacheErr)
				}
				return err
			}

			//log.Printf("[DEBUG] Running secondary check without network with worker")
			err = cli.ContainerStart(ctx, cont.ID, container.StartOptions{})
		}

		if err != nil {
			// Either a non-network start error, or the no-network retry also failed.
			log.Printf("[ERROR] Failed to start container (2) in runtime location %s: %s", environment, err)
			cacheErr := shuffle.DeleteCache(ctx, actionExecId)
			if cacheErr != nil {
				log.Printf("[ERROR] FAILURE Deleting cache for %s: %s", actionExecId, cacheErr)
			}
			//shutdown(workflowExecution, workflowExecution.Workflow.ID, true)
			return err
		}
	}

	log.Printf("[DEBUG][%s] Container %s was created for %s", workflowExecution.ExecutionId, cont.ID, identifier)

	// Waiting to see if it exits.. Stupid, but stable(r)
	// NOTE: both branches only log — no waiting/validation happens for any
	// execution source at the moment.
	if workflowExecution.ExecutionSource != "default" {
		log.Printf("[INFO][%s] Handling NON-default execution source %s - NOT waiting or validating!", workflowExecution.ExecutionId, workflowExecution.ExecutionSource)
	} else if workflowExecution.ExecutionSource == "default" {
		log.Printf("[INFO][%s] Handling DEFAULT execution source %s - SKIPPING wait anyway due to exited issues!", workflowExecution.ExecutionId, workflowExecution.ExecutionSource)
	}

	//log.Printf("[DEBUG] Deployed container ID %s", cont.ID)
	//containerIds = append(containerIds, cont.ID)
	return nil
}
  1072. func removeContainer(containername string) error {
  1073. ctx := context.Background()
  1074. // cli, err := dockerclient.NewEnvClient()
  1075. cli, _, err := shuffle.GetDockerClient()
  1076. if err != nil {
  1077. log.Printf("[DEBUG] Unable to create docker client: %s", err)
  1078. return err
  1079. }
  1080. defer cli.Close()
  1081. // FIXME - ucnomment
  1082. // containers, err := cli.ContainerList(ctx, types.ContainerListOptions{
  1083. // All: true,
  1084. // })
  1085. _ = ctx
  1086. _ = cli
  1087. //if err := cli.ContainerStop(ctx, containername, nil); err != nil {
  1088. // log.Printf("Unable to stop container %s - running removal anyway, just in case: %s", containername, err)
  1089. //}
  1090. removeOptions := container.RemoveOptions{
  1091. RemoveVolumes: true,
  1092. Force: true,
  1093. }
  1094. // FIXME - remove comments etc
  1095. _ = removeOptions
  1096. //if err := cli.ContainerRemove(ctx, containername, removeOptions); err != nil {
  1097. // log.Printf("Unable to remove container: %s", err)
  1098. //}
  1099. return nil
  1100. }
  1101. func runFilter(workflowExecution shuffle.WorkflowExecution, action shuffle.Action) {
  1102. // 1. Get the parameter $.#.id
  1103. if action.Label == "filter_cases" && len(action.Parameters) > 0 {
  1104. if action.Parameters[0].Variant == "ACTION_RESULT" {
  1105. param := action.Parameters[0]
  1106. value := param.Value
  1107. _ = value
  1108. // Loop cases.. Hmm, that's tricky
  1109. }
  1110. } else {
  1111. log.Printf("No handler for filter %s with %d params", action.Label, len(action.Parameters))
  1112. }
  1113. }
  1114. func removeIndex(s []string, i int) []string {
  1115. s[len(s)-1], s[i] = s[i], s[len(s)-1]
  1116. return s[:len(s)-1]
  1117. }
  1118. func getWorkerURLs() ([]string, error) {
  1119. workerUrls := []string{}
  1120. if isKubernetes == "true" {
  1121. workerUrls = append(workerUrls, "http://shuffle-workers:33333")
  1122. // workerUrls = append(workerUrls, "http://192.168.29.16:33333")
  1123. // get service "shuffle-workers" "Endpoints"
  1124. // serviceName := "shuffle-workers"
  1125. // clientset, _, err := shuffle.GetKubernetesClient()
  1126. // if err != nil {
  1127. // log.Println("[ERROR] Failed to get Kubernetes client:", err)
  1128. // return workerUrls, err
  1129. // }
  1130. // services, err := clientset.CoreV1().Services("default").List(context.Background(), metav1.ListOptions{})
  1131. // if err != nil {
  1132. // log.Println("[ERROR] Failed to list services:", err)
  1133. // return workerUrls, err
  1134. // }
  1135. // for _, service := range services.Items {
  1136. // if service.Name == serviceName {
  1137. // endpoints, err := clientset.CoreV1().Endpoints("default").Get(context.Background(), serviceName, metav1.GetOptions{})
  1138. // if err != nil {
  1139. // log.Println("[ERROR] Failed to get endpoints for service:", err)
  1140. // return workerUrls, err
  1141. // }
  1142. // for _, subset := range endpoints.Subsets {
  1143. // for _, address := range subset.Addresses {
  1144. // for _, port := range subset.Ports {
  1145. // url := fmt.Sprintf("http://%s:%d", address.IP, port.Port)
  1146. // workerUrls = append(workerUrls, url)
  1147. // }
  1148. // }
  1149. // }
  1150. // }
  1151. // }
  1152. //log.Printf("[DEBUG] Worker URLs for k8s: %#v", workerUrls)
  1153. return workerUrls, nil
  1154. }
  1155. // Create a new Docker client
  1156. // cli, err := dockerclient.NewEnvClient()
  1157. cli, _, err := shuffle.GetDockerClient()
  1158. if err != nil {
  1159. log.Println("[ERROR] Failed to create Docker client:", err)
  1160. return workerUrls, err
  1161. }
  1162. defer cli.Close()
  1163. // Specify the name of the service for which you want to list tasks
  1164. serviceName := "shuffle-workers"
  1165. // Get the list of tasks for the service
  1166. tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
  1167. Filters: filters.NewArgs(filters.Arg("service", serviceName)),
  1168. })
  1169. if err != nil {
  1170. log.Println("[ERROR] Failed to list tasks for service:", err)
  1171. return workerUrls, err
  1172. }
  1173. // Print task information
  1174. for _, task := range tasks {
  1175. url := fmt.Sprintf("http://%s.%d.%s:33333", serviceName, task.Slot, task.ID)
  1176. workerUrls = append(workerUrls, url)
  1177. }
  1178. return workerUrls, nil
  1179. }
  1180. func askOtherWorkersToDownloadImage(image string) {
  1181. // Why wouldn't it happen on swarm? Hmm
  1182. if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
  1183. return
  1184. }
  1185. // Check environment SHUFFLE_AUTO_IMAGE_DOWNLOAD
  1186. if os.Getenv("SHUFFLE_AUTO_IMAGE_DOWNLOAD") == "false" {
  1187. log.Printf("[DEBUG] SHUFFLE_AUTO_IMAGE_DOWNLOAD is false. NOT distributing images %s", image)
  1188. return
  1189. }
  1190. if shuffle.ArrayContains(imagesDistributed, image) {
  1191. return
  1192. }
  1193. urls, err := getWorkerURLs()
  1194. if err != nil {
  1195. log.Printf("[ERROR] Error in listing worker urls: %s", err)
  1196. return
  1197. }
  1198. if len(urls) < 2 {
  1199. return
  1200. }
  1201. httpClient := &http.Client{}
  1202. distributed := false
  1203. for _, url := range urls {
  1204. //log.Printf("[DEBUG] Trying to speak to: %s", url)
  1205. imagesRequest := ImageRequest{
  1206. Image: image,
  1207. }
  1208. url = fmt.Sprintf("%s/api/v1/download", url)
  1209. //log.Printf("[INFO] Making a request to %s to download images", url)
  1210. imageJSON, err := json.Marshal(imagesRequest)
  1211. req, err := http.NewRequest(
  1212. "POST",
  1213. url,
  1214. bytes.NewBuffer(imageJSON),
  1215. )
  1216. if err != nil {
  1217. log.Printf("[ERROR] Error in making request to %s : %s", url, err)
  1218. continue
  1219. }
  1220. resp, err := httpClient.Do(req)
  1221. if err != nil {
  1222. log.Printf("[ERROR] Error in making request to %s : %s", url, err)
  1223. continue
  1224. }
  1225. defer resp.Body.Close()
  1226. respBody, err := ioutil.ReadAll(resp.Body)
  1227. if err != nil {
  1228. log.Printf("[ERROR] Error in reading response body : %s", err)
  1229. continue
  1230. }
  1231. log.Printf("[INFO] Response body when tried sending images for nodes to download: %s", respBody)
  1232. distributed = true
  1233. }
  1234. if distributed {
  1235. imagesDistributed = append(imagesDistributed, image)
  1236. }
  1237. }
  1238. func handleExecutionResult(workflowExecution shuffle.WorkflowExecution) {
  1239. ctx := context.Background()
  1240. workflowExecution, relevantActions := shuffle.DecideExecution(ctx, workflowExecution, environment)
  1241. if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "FAILURE" || workflowExecution.Status == "ABORTED" {
  1242. log.Printf("[DEBUG][%s] Shutting down because status is %s", workflowExecution.ExecutionId, workflowExecution.Status)
  1243. shutdown(workflowExecution, "", "Workflow run is already finished", true)
  1244. return
  1245. }
  1246. startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
  1247. var dockercli *dockerclient.Client
  1248. var err error
  1249. if isKubernetes != "true" {
  1250. // dockercli, err = dockerclient.NewEnvClient()
  1251. dockercli, _, err = shuffle.GetDockerClient()
  1252. if err != nil {
  1253. log.Printf("[ERROR] Unable to create docker client (3): %s", err)
  1254. return
  1255. }
  1256. defer dockercli.Close()
  1257. }
  1258. for _, action := range relevantActions {
  1259. appname := action.AppName
  1260. appversion := action.AppVersion
  1261. appname = strings.Replace(appname, ".", "-", -1)
  1262. appversion = strings.Replace(appversion, ".", "-", -1)
  1263. action, _ = singul.HandleSingulStartnode(workflowExecution, action, []string{})
  1264. parsedAppname := strings.Replace(strings.ToLower(action.AppName), " ", "-", -1)
  1265. // if strings.ToLower(parsedAppname) == "singul" {
  1266. // parsedAppname = "shuffle-ai"
  1267. // appversion = "1.1.0"
  1268. // appname = "shuffle-ai"
  1269. // }
  1270. if parsedAppname == "ai-agent" {
  1271. parsedAppname = "shuffle-ai"
  1272. appversion = "1.1.0"
  1273. appname = "shuffle-ai"
  1274. action.AppVersion = "1.1.0"
  1275. action.AppName = "shuffle-ai"
  1276. action.Name = "run_agent"
  1277. inputParamValue := ""
  1278. allowedActions := ""
  1279. for _, param := range action.Parameters {
  1280. if strings.ToLower(param.Name) == "input" {
  1281. inputParamValue = param.Value
  1282. } else if strings.ToLower(param.Name) == "action" {
  1283. param.Value = strings.ReplaceAll("Nothing,", " ", "")
  1284. param.Value = strings.ReplaceAll("Nothing", " ", "")
  1285. allowedActions = param.Value
  1286. }
  1287. }
  1288. // Rewriting them
  1289. action.Parameters = []shuffle.WorkflowAppActionParameter{
  1290. shuffle.WorkflowAppActionParameter{
  1291. Name: "input_data",
  1292. Value: inputParamValue,
  1293. },
  1294. shuffle.WorkflowAppActionParameter{
  1295. Name: "actions",
  1296. Value: allowedActions,
  1297. },
  1298. }
  1299. }
  1300. imageName := fmt.Sprintf("%s:%s_%s", baseimagename, parsedAppname, action.AppVersion)
  1301. if strings.Contains(imageName, " ") {
  1302. imageName = strings.ReplaceAll(imageName, " ", "-")
  1303. }
  1304. // Kubernetes specific.
  1305. // Should it be though?
  1306. if isKubernetes == "true" {
  1307. // Map it to:
  1308. // <registry>/baseimagename/<appname>:<appversion>
  1309. localRegistry := os.Getenv("REGISTRY_URL")
  1310. if len(localRegistry) > 0 && len(baseimagename) > 0 {
  1311. newImageName := fmt.Sprintf("%s/%s/%s:%s", localRegistry, baseimagename, parsedAppname, action.AppVersion)
  1312. log.Printf("[INFO] Remapping image name %s to %s due to registry+image name existing on k8s", imageName, newImageName)
  1313. imageName = newImageName
  1314. }
  1315. }
  1316. askOtherWorkersToDownloadImage(imageName)
  1317. // Added UUID to identifier just in case
  1318. //identifier := fmt.Sprintf("%s_%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId, uuid.NewV4())
  1319. identifier := fmt.Sprintf("%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId)
  1320. if strings.Contains(identifier, " ") {
  1321. identifier = strings.ReplaceAll(identifier, " ", "-")
  1322. }
  1323. //if arrayContains(executed, action.ID) || arrayContains(visited, action.ID) {
  1324. // log.Printf("[WARNING] Action %s is already executed")
  1325. // continue
  1326. //}
  1327. //visited = append(visited, action.ID)
  1328. //executed = append(executed, action.ID)
  1329. // FIXME - check whether it's running locally yet too
  1330. // take care of auto clean up later on for k8s
  1331. if isKubernetes != "true" {
  1332. stats, err := dockercli.ContainerInspect(context.Background(), identifier)
  1333. if err != nil || stats.ContainerJSONBase.State.Status != "running" {
  1334. // REMOVE
  1335. if err == nil {
  1336. log.Printf("[DEBUG][%s] Docker Container Status: %s, should kill: %s", workflowExecution.ExecutionId, stats.ContainerJSONBase.State.Status, identifier)
  1337. err = removeContainer(identifier)
  1338. if err != nil {
  1339. log.Printf("[ERROR] Error killing container: %s", err)
  1340. }
  1341. } else {
  1342. //log.Printf("WHAT TO DO HERE?: %s", err)
  1343. }
  1344. } else if stats.ContainerJSONBase.State.Status == "running" {
  1345. //log.Printf("
  1346. continue
  1347. }
  1348. }
  1349. if len(action.Parameters) == 0 {
  1350. action.Parameters = []shuffle.WorkflowAppActionParameter{}
  1351. }
  1352. if len(action.Errors) == 0 {
  1353. action.Errors = []string{}
  1354. }
  1355. // marshal action and put it in there rofl
  1356. //log.Printf("[INFO][%s] Time to execute %s (%s) with app %s:%s, function %s, env %s with %d parameters.", workflowExecution.ExecutionId, action.ID, action.Label, action.AppName, action.AppVersion, action.Name, action.Environment, len(action.Parameters))
  1357. log.Printf("[DEBUG][%s] Action: Send, Label: '%s', Action: '%s', Run status: %s, Extra=", workflowExecution.ExecutionId, action.Label, action.AppName, workflowExecution.Status)
  1358. actionData, err := json.Marshal(action)
  1359. if err != nil {
  1360. log.Printf("[WARNING] Failed unmarshalling action: %s", err)
  1361. continue
  1362. }
  1363. if action.AppID == "0ca8887e-b4af-4e3e-887c-87e9d3bc3d3e" {
  1364. log.Printf("[DEBUG] Should run filter: %#v", action)
  1365. runFilter(workflowExecution, action)
  1366. continue
  1367. }
  1368. executionData, err := json.Marshal(workflowExecution)
  1369. if err != nil {
  1370. log.Printf("[ERROR] Failed marshalling executiondata: %s", err)
  1371. executionData = []byte("")
  1372. }
  1373. // Sending full execution so that it won't have to load in every app
  1374. // This might be an issue if they can read environments, but that's alright
  1375. // if everything is generated during execution
  1376. //log.Printf("[DEBUG][%s] Deployed with CALLBACK_URL %s and BASE_URL %s", workflowExecution.ExecutionId, appCallbackUrl, baseUrl)
  1377. env := []string{
  1378. fmt.Sprintf("EXECUTIONID=%s", workflowExecution.ExecutionId),
  1379. fmt.Sprintf("AUTHORIZATION=%s", workflowExecution.Authorization),
  1380. fmt.Sprintf("CALLBACK_URL=%s", baseUrl),
  1381. fmt.Sprintf("BASE_URL=%s", appCallbackUrl),
  1382. fmt.Sprintf("TZ=%s", timezone),
  1383. fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", logsDisabled),
  1384. }
  1385. if len(actionData) >= 100000 {
  1386. log.Printf("[WARNING] Omitting some data from action execution. Length: %d. Fix in SDK!", len(actionData))
  1387. newParams := []shuffle.WorkflowAppActionParameter{}
  1388. for _, param := range action.Parameters {
  1389. paramData, err := json.Marshal(param)
  1390. if err != nil {
  1391. log.Printf("[WARNING] Failed to marshal param %s: %s", param.Name, err)
  1392. newParams = append(newParams, param)
  1393. continue
  1394. }
  1395. if len(paramData) >= 50000 {
  1396. log.Printf("[WARNING] Removing a lot of data from param %s with length %d", param.Name, len(paramData))
  1397. param.Value = "SHUFFLE_AUTO_REMOVED"
  1398. }
  1399. newParams = append(newParams, param)
  1400. }
  1401. action.Parameters = newParams
  1402. actionData, err = json.Marshal(action)
  1403. if err == nil {
  1404. log.Printf("[DEBUG] Ran data replace on action %s. new length: %d", action.Name, len(actionData))
  1405. } else {
  1406. log.Printf("[WARNING] Failed to marshal new actionData: %s", err)
  1407. }
  1408. } else {
  1409. //log.Printf("[DEBUG] Actiondata is NOT 100000 in length. Adding as normal.")
  1410. }
  1411. actionEnv := fmt.Sprintf("ACTION=%s", string(actionData))
  1412. env = append(env, actionEnv)
  1413. if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
  1414. //log.Printf("APPENDING PROXY TO THE APP!")
  1415. env = append(env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY")))
  1416. env = append(env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY")))
  1417. env = append(env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY")))
  1418. env = append(env, fmt.Sprintf("no_proxy=%s", os.Getenv("no_proxy")))
  1419. }
  1420. overrideHttpProxy := os.Getenv("SHUFFLE_INTERNAL_HTTP_PROXY")
  1421. overrideHttpsProxy := os.Getenv("SHUFFLE_INTERNAL_HTTPS_PROXY")
  1422. if overrideHttpProxy != "" {
  1423. env = append(env, fmt.Sprintf("SHUFFLE_INTERNAL_HTTP_PROXY=%s", overrideHttpProxy))
  1424. }
  1425. if overrideHttpsProxy != "" {
  1426. env = append(env, fmt.Sprintf("SHUFFLE_INTERNAL_HTTPS_PROXY=%s", overrideHttpsProxy))
  1427. }
  1428. if len(os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")) > 0 {
  1429. env = append(env, fmt.Sprintf("SHUFFLE_APP_SDK_TIMEOUT=%s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")))
  1430. }
  1431. // FIXME: Ensure to NEVER do this anymore
  1432. // This potentially breaks too much stuff. Better to have the app poll the data.
  1433. _ = executionData
  1434. /*
  1435. maxSize := 32700 - len(string(actionData)) - 2000
  1436. if len(executionData) < maxSize {
  1437. log.Printf("[INFO] ADDING FULL_EXECUTION because size is smaller than %d", maxSize)
  1438. env = append(env, fmt.Sprintf("FULL_EXECUTION=%s", string(executionData)))
  1439. } else {
  1440. log.Printf("[WARNING] Skipping FULL_EXECUTION because size is larger than %d", maxSize)
  1441. }
  1442. */
  1443. // Uses a few ways of getting / checking if an app is available
  1444. // 1. Try original with lowercase
  1445. // 2. Go to original (no spaces)
  1446. // 3. Add remote repo location
  1447. images := []string{
  1448. imageName,
  1449. fmt.Sprintf("%s/%s:%s_%s", registryName, baseimagename, parsedAppname, action.AppVersion),
  1450. fmt.Sprintf("%s:%s_%s", baseimagename, parsedAppname, action.AppVersion),
  1451. }
  1452. // This is the weirdest shit ever looking back at
  1453. // Needs optimization lol
  1454. pullOptions := dockerimage.PullOptions{}
  1455. if strings.ToLower(cleanupEnv) == "true" {
  1456. err = deployApp(dockercli, images[0], identifier, env, workflowExecution, action)
  1457. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1458. if strings.Contains(err.Error(), "exited prematurely") {
  1459. log.Printf("[DEBUG] Shutting down (2)")
  1460. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1461. return
  1462. }
  1463. err := shuffle.DownloadDockerImageBackend(&http.Client{Timeout: imagedownloadTimeout}, imageName)
  1464. executed := false
  1465. if err == nil {
  1466. log.Printf("[DEBUG] Downloaded image %s from backend (CLEANUP)", imageName)
  1467. downloadedImages = append(downloadedImages, imageName)
  1468. //err = deployApp(dockercli, image, identifier, env, workflow, action)
  1469. err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
  1470. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1471. if strings.Contains(err.Error(), "exited prematurely") {
  1472. log.Printf("[DEBUG] Shutting down (41)")
  1473. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1474. return
  1475. }
  1476. } else {
  1477. executed = true
  1478. }
  1479. }
  1480. if !executed {
  1481. imageName = images[2]
  1482. err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
  1483. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1484. if strings.Contains(err.Error(), "exited prematurely") {
  1485. log.Printf("[DEBUG] Shutting down (3)")
  1486. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1487. return
  1488. }
  1489. //log.Printf("[WARNING] Failed CLEANUP execution. Downloading image %s remotely.", image)
  1490. log.Printf("[WARNING] Failed to download image %s (CLEANUP): %s", imageName, err)
  1491. reader, err := dockercli.ImagePull(context.Background(), imageName, pullOptions)
  1492. if err != nil {
  1493. log.Printf("[ERROR] Failed getting %s. Couldn't be find locally, AND is missing.", imageName)
  1494. log.Printf("[DEBUG] Shutting down (4)")
  1495. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1496. return
  1497. } else {
  1498. defer reader.Close()
  1499. baseTag := strings.Split(imageName, ":")
  1500. if len(baseTag) > 1 {
  1501. tag := baseTag[1]
  1502. log.Printf("[DEBUG] Creating tag copies of registry downloaded containers from tag %s", tag)
  1503. // Remapping
  1504. ctx := context.Background()
  1505. dockercli.ImageTag(ctx, imageName, fmt.Sprintf("frikky/shuffle:%s", tag))
  1506. dockercli.ImageTag(ctx, imageName, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
  1507. }
  1508. }
  1509. buildBuf := new(strings.Builder)
  1510. _, err = io.Copy(buildBuf, reader)
  1511. if err != nil && !strings.Contains(fmt.Sprintf("%s", err.Error()), "Conflict. The container name") {
  1512. log.Printf("[ERROR] Error in IO copy: %s", err)
  1513. log.Printf("[DEBUG] Shutting down (5)")
  1514. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1515. return
  1516. } else {
  1517. if strings.Contains(buildBuf.String(), "errorDetail") {
  1518. log.Printf("[ERROR] Docker build:%sERROR ABOVE: Trying to pull tags from: %s", buildBuf.String(), imageName)
  1519. log.Printf("[DEBUG] Shutting down (6)")
  1520. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1521. return
  1522. }
  1523. log.Printf("[INFO] Successfully downloaded %s", imageName)
  1524. }
  1525. err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
  1526. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1527. log.Printf("[ERROR] Failed deploying image for the FOURTH time. Aborting if the image doesn't exist")
  1528. if strings.Contains(err.Error(), "exited prematurely") {
  1529. log.Printf("[DEBUG] Shutting down (7)")
  1530. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1531. return
  1532. }
  1533. if strings.Contains(err.Error(), "No such image") {
  1534. //log.Printf("[WARNING] Failed deploying %s from image %s: %s", identifier, image, err)
  1535. log.Printf("[ERROR] Image doesn't exist. Shutting down")
  1536. log.Printf("[DEBUG] Shutting down (8)")
  1537. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1538. return
  1539. }
  1540. }
  1541. }
  1542. }
  1543. }
  1544. } else {
  1545. err = deployApp(dockercli, images[0], identifier, env, workflowExecution, action)
  1546. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1547. log.Printf("[DEBUG] Failed deploying app? %s", err)
  1548. if strings.Contains(err.Error(), "exited prematurely") {
  1549. log.Printf("[DEBUG] Shutting down (9)")
  1550. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1551. return
  1552. }
  1553. // Trying to replace with lowercase to deploy again. This seems to work with Dockerhub well.
  1554. // FIXME: Should try to remotely download directly if this persists.
  1555. imageName = images[1]
  1556. err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
  1557. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1558. if strings.Contains(err.Error(), "exited prematurely") {
  1559. log.Printf("[DEBUG] Shutting down (10)")
  1560. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1561. return
  1562. }
  1563. log.Printf("[DEBUG][%s] Failed deploy. Downloading image %s: %s", workflowExecution.ExecutionId, imageName, err)
  1564. err := shuffle.DownloadDockerImageBackend(&http.Client{Timeout: imagedownloadTimeout}, imageName)
  1565. executed := false
  1566. if err == nil {
  1567. log.Printf("[DEBUG] Downloaded image %s from backend (CLEANUP)", imageName)
  1568. downloadedImages = append(downloadedImages, imageName)
  1569. //err = deployApp(dockercli, image, identifier, env, workflow, action)
  1570. err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
  1571. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1572. log.Printf("[ERROR] Err: %s", err)
  1573. if strings.Contains(err.Error(), "exited prematurely") {
  1574. log.Printf("[DEBUG] Shutting down (40)")
  1575. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1576. return
  1577. }
  1578. } else {
  1579. executed = true
  1580. }
  1581. }
  1582. if !executed {
  1583. imageName = images[2]
  1584. err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
  1585. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1586. log.Printf("[ERROR] Err: %s", err)
  1587. if strings.Contains(err.Error(), "exited prematurely") {
  1588. log.Printf("[DEBUG] Shutting down (11)")
  1589. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1590. return
  1591. }
  1592. log.Printf("[WARNING] Failed deploying image THREE TIMES. Attempting to download %s as last resort from backend and dockerhub: %s", imageName, err)
  1593. if isKubernetes == "true" {
  1594. log.Printf("[ERROR] Image %s doesn't exist. Returning error for now")
  1595. return
  1596. }
  1597. reader, err := dockercli.ImagePull(context.Background(), imageName, pullOptions)
  1598. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1599. log.Printf("[ERROR] Failed getting %s. The couldn't be find locally, AND is missing.", imageName)
  1600. log.Printf("[DEBUG] Shutting down (12)")
  1601. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1602. return
  1603. } else {
  1604. defer reader.Close()
  1605. baseTag := strings.Split(imageName, ":")
  1606. if len(baseTag) > 1 {
  1607. tag := baseTag[1]
  1608. log.Printf("[DEBUG] Creating tag copies of registry downloaded containers from tag %s", tag)
  1609. // Remapping
  1610. ctx := context.Background()
  1611. dockercli.ImageTag(ctx, imageName, fmt.Sprintf("frikky/shuffle:%s", tag))
  1612. dockercli.ImageTag(ctx, imageName, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
  1613. }
  1614. }
  1615. buildBuf := new(strings.Builder)
  1616. _, err = io.Copy(buildBuf, reader)
  1617. if err != nil {
  1618. log.Printf("[ERROR] Error in IO copy: %s", err)
  1619. log.Printf("[DEBUG] Shutting down (13)")
  1620. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1621. return
  1622. } else {
  1623. if strings.Contains(buildBuf.String(), "errorDetail") {
  1624. log.Printf("[ERROR] Docker build:%sERROR ABOVE: Trying to pull tags from: %s", buildBuf.String(), imageName)
  1625. log.Printf("[DEBUG] Shutting down (14)")
  1626. shutdown(workflowExecution, action.ID, fmt.Sprintf("Error deploying container: %s", buildBuf.String()), true)
  1627. return
  1628. }
  1629. log.Printf("[INFO] Successfully downloaded %s", imageName)
  1630. }
  1631. }
  1632. err = deployApp(dockercli, imageName, identifier, env, workflowExecution, action)
  1633. if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
  1634. log.Printf("[ERROR] Failed deploying image for the FOURTH time. Aborting if the image doesn't exist")
  1635. if strings.Contains(err.Error(), "exited prematurely") {
  1636. log.Printf("[DEBUG] Shutting down (15)")
  1637. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1638. return
  1639. }
  1640. if strings.Contains(err.Error(), "No such image") {
  1641. //log.Printf("[WARNING] Failed deploying %s from image %s: %s", identifier, image, err)
  1642. log.Printf("[ERROR] Image doesn't exist. Shutting down")
  1643. log.Printf("[DEBUG] Shutting down (16)")
  1644. shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
  1645. return
  1646. }
  1647. }
  1648. }
  1649. }
  1650. }
  1651. }
  1652. //log.Printf("[INFO][%s] Adding visited (3): %s (%s). Actions: %d, Results: %d", workflowExecution.ExecutionId, action.Label, action.ID, len(workflowExecution.Workflow.Actions), len(workflowExecution.Results))
  1653. visited = append(visited, action.ID)
  1654. executed = append(executed, action.ID)
  1655. // If children of action.ID are NOT in executed:
  1656. // Remove them from visited.
  1657. //log.Printf("EXECUTED: %#v", executed)
  1658. }
  1659. //log.Printf(nextAction)
  1660. //log.Printf(startAction, children[startAction])
  1661. // FIXME - new request here
  1662. // FIXME - clean up stopped (remove) containers with this execution id
  1663. err = shuffle.UpdateExecutionVariables(ctx, workflowExecution.ExecutionId, startAction, children, parents, visited, executed, nextActions, environments, extra)
  1664. if err != nil {
  1665. log.Printf("[ERROR] Failed to update exec variables for execution %s: %s (2)", workflowExecution.ExecutionId, err)
  1666. }
  1667. if len(workflowExecution.Results) == len(workflowExecution.Workflow.Actions)+extra {
  1668. shutdownCheck := true
  1669. for _, result := range workflowExecution.Results {
  1670. if result.Status == "EXECUTING" || result.Status == "WAITING" {
  1671. // Cleaning up executing stuff
  1672. shutdownCheck = false
  1673. // USED TO BE CONTAINER REMOVAL
  1674. // FIXME - send POST request to kill the container
  1675. //log.Printf("Should remove (POST request) stopped containers")
  1676. //ret = requests.post("%s%s" % (self.url, stream_path), headers=headers, json=action_result)
  1677. }
  1678. }
  1679. if shutdownCheck {
  1680. log.Printf("[INFO][%s] BREAKING BECAUSE RESULTS IS SAME LENGTH AS ACTIONS. SHOULD CHECK ALL RESULTS FOR WHETHER THEY'RE DONE", workflowExecution.ExecutionId)
  1681. validated := shuffle.ValidateFinished(ctx, -1, workflowExecution)
  1682. if validated {
  1683. shutdownData, err := json.Marshal(workflowExecution)
  1684. if err != nil {
  1685. log.Printf("[ERROR] Failed marshalling shutdowndata during set: %s", err)
  1686. }
  1687. sendResult(workflowExecution, shutdownData)
  1688. }
  1689. log.Printf("[DEBUG][%s] Shutting down (17)", workflowExecution.ExecutionId)
  1690. if isKubernetes == "true" {
  1691. // log.Printf("workflow execution: %#v", workflowExecution)
  1692. clientset, _, err := shuffle.GetKubernetesClient()
  1693. if err != nil {
  1694. log.Println("[ERROR] Error getting kubernetes client (1):", err)
  1695. os.Exit(1)
  1696. }
  1697. cleanupKubernetesExecution(clientset, workflowExecution, kubernetesNamespace)
  1698. } else {
  1699. shutdown(workflowExecution, "", "", true)
  1700. }
  1701. return
  1702. }
  1703. }
  1704. time.Sleep(time.Duration(sleepTime) * time.Second)
  1705. return
  1706. }
  1707. func executionInit(workflowExecution shuffle.WorkflowExecution) error {
  1708. ctx := context.Background()
  1709. parents := map[string][]string{}
  1710. children := map[string][]string{}
  1711. nextActions := []string{}
  1712. extra := 0
  1713. startAction := workflowExecution.Start
  1714. //log.Printf("[INFO][%s] STARTACTION: %s", workflowExecution.ExecutionId, startAction)
  1715. if len(startAction) == 0 {
  1716. log.Printf("[INFO][%s] Didn't find execution start action. Setting it to workflow start action.", workflowExecution.ExecutionId)
  1717. startAction = workflowExecution.Workflow.Start
  1718. }
  1719. // Setting up extra counter
  1720. for _, trigger := range workflowExecution.Workflow.Triggers {
  1721. //log.Printf("[DEBUG] Appname trigger (0): %s", trigger.AppName)
  1722. if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
  1723. extra += 1
  1724. }
  1725. }
  1726. // Validates RERUN of single actions
  1727. // Identified by:
  1728. // 1. Predefined result from previous exec
  1729. // 2. Only ONE action
  1730. // 3. Every predefined result having result.Action.Category == "rerun"
  1731. /*
  1732. if len(workflowExecution.Workflow.Actions) == 1 && len(workflowExecution.Results) > 0 {
  1733. finished := shuffle.ValidateFinished(ctx, extra, workflowExecution)
  1734. if finished {
  1735. return nil
  1736. }
  1737. }
  1738. */
  1739. nextActions = append(nextActions, startAction)
  1740. for _, branch := range workflowExecution.Workflow.Branches {
  1741. // Check what the parent is first. If it's trigger - skip
  1742. sourceFound := false
  1743. destinationFound := false
  1744. for _, action := range workflowExecution.Workflow.Actions {
  1745. if action.ID == branch.SourceID {
  1746. sourceFound = true
  1747. }
  1748. if action.ID == branch.DestinationID {
  1749. destinationFound = true
  1750. }
  1751. }
  1752. for _, trigger := range workflowExecution.Workflow.Triggers {
  1753. //log.Printf("Appname trigger (0): %s (%s)", trigger.AppName, trigger.ID)
  1754. if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
  1755. if trigger.ID == branch.SourceID {
  1756. sourceFound = true
  1757. } else if trigger.ID == branch.DestinationID {
  1758. destinationFound = true
  1759. }
  1760. }
  1761. }
  1762. if sourceFound {
  1763. parents[branch.DestinationID] = append(parents[branch.DestinationID], branch.SourceID)
  1764. } else {
  1765. log.Printf("[DEBUG] Parent ID %s was not found in actions! Skipping parent. (TRIGGER?)", branch.SourceID)
  1766. }
  1767. if destinationFound {
  1768. children[branch.SourceID] = append(children[branch.SourceID], branch.DestinationID)
  1769. } else {
  1770. log.Printf("[DEBUG] Child ID %s was not found in actions! Skipping child. (TRIGGER?)", branch.SourceID)
  1771. }
  1772. }
  1773. log.Printf("[INFO][%s] shuffle.Actions: %d + Special shuffle.Triggers: %d", workflowExecution.ExecutionId, len(workflowExecution.Workflow.Actions), extra)
  1774. onpremApps := []string{}
  1775. toExecuteOnprem := []string{}
  1776. for _, action := range workflowExecution.Workflow.Actions {
  1777. if strings.ToLower(action.Environment) != strings.ToLower(environment) {
  1778. continue
  1779. }
  1780. toExecuteOnprem = append(toExecuteOnprem, action.ID)
  1781. actionName := fmt.Sprintf("%s:%s_%s", baseimagename, action.AppName, action.AppVersion)
  1782. found := false
  1783. for _, app := range onpremApps {
  1784. if actionName == app {
  1785. found = true
  1786. }
  1787. }
  1788. if !found {
  1789. onpremApps = append(onpremApps, actionName)
  1790. }
  1791. }
  1792. if len(onpremApps) == 0 {
  1793. //return errors.New(fmt.Sprintf("No apps to handle onprem (%s)", environment))
  1794. log.Printf("[INFO][%s] No apps to handle onprem (%s). Returning 200 OK anyway", workflowExecution.ExecutionId, environment)
  1795. return nil
  1796. }
  1797. pullOptions := dockerimage.PullOptions{}
  1798. _ = pullOptions
  1799. for _, image := range onpremApps {
  1800. //log.Printf("[INFO] Image: %s", image)
  1801. // Kind of gambling that the image exists.
  1802. if strings.Contains(image, " ") {
  1803. image = strings.ReplaceAll(image, " ", "-")
  1804. }
  1805. // FIXME: Reimplement for speed later
  1806. // Skip to make it faster
  1807. //reader, err := dockercli.ImagePull(context.Background(), image, pullOptions)
  1808. //if err != nil {
  1809. // log.Printf("Failed getting %s. The app is missing or some other issue", image)
  1810. // shutdown(workflowExecution)
  1811. //}
  1812. ////io.Copy(os.Stdout, reader)
  1813. //_ = reader
  1814. //log.Printf("Successfully downloaded and built %s", image)
  1815. }
  1816. visited := []string{}
  1817. executed := []string{}
  1818. environments := []string{}
  1819. for _, action := range workflowExecution.Workflow.Actions {
  1820. found := false
  1821. for _, environment := range environments {
  1822. if action.Environment == environment {
  1823. found = true
  1824. break
  1825. }
  1826. }
  1827. if !found {
  1828. environments = append(environments, action.Environment)
  1829. }
  1830. }
  1831. err := shuffle.UpdateExecutionVariables(ctx, workflowExecution.ExecutionId, startAction, children, parents, visited, executed, nextActions, environments, extra)
  1832. if err != nil {
  1833. log.Printf("[ERROR] Failed to update exec variables for execution %s: %s", workflowExecution.ExecutionId, err)
  1834. }
  1835. return nil
  1836. }
  1837. func handleSubflowPoller(ctx context.Context, workflowExecution shuffle.WorkflowExecution, streamResultUrl, subflowId string) error {
  1838. // FIXME: If MEMCACHE is enabled, check in this order:
  1839. extra := 0
  1840. for _, trigger := range workflowExecution.Workflow.Triggers {
  1841. if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
  1842. extra += 1
  1843. }
  1844. }
  1845. if len(data) == 0 {
  1846. log.Printf("[WARNING] Stream result missing execution ID and authorization; injecting them from workflow execution")
  1847. data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, workflowExecution.ExecutionId, workflowExecution.Authorization)
  1848. }
  1849. req, err := http.NewRequest(
  1850. "POST",
  1851. streamResultUrl,
  1852. bytes.NewBuffer([]byte(data)),
  1853. )
  1854. client := shuffle.GetExternalClient(streamResultUrl)
  1855. newresp, err := client.Do(req)
  1856. if err != nil {
  1857. log.Printf("[ERROR] Failed making request (1): %s", err)
  1858. time.Sleep(time.Duration(sleepTime) * time.Second)
  1859. return err
  1860. }
  1861. defer newresp.Body.Close()
  1862. body, err := ioutil.ReadAll(newresp.Body)
  1863. if err != nil {
  1864. log.Printf("[ERROR] Failed reading body (1): %s", err)
  1865. time.Sleep(time.Duration(sleepTime) * time.Second)
  1866. return err
  1867. }
  1868. if newresp.StatusCode != 200 {
  1869. log.Printf("[ERROR] Bad statuscode: %d, %s", newresp.StatusCode, string(body))
  1870. if strings.Contains(string(body), "Workflowexecution is already finished") {
  1871. log.Printf("[DEBUG] Shutting down (19)")
  1872. shutdown(workflowExecution, "", "", true)
  1873. }
  1874. time.Sleep(time.Duration(sleepTime) * time.Second)
  1875. return errors.New(fmt.Sprintf("Bad statuscode: %d", newresp.StatusCode))
  1876. }
  1877. err = json.Unmarshal(body, &workflowExecution)
  1878. if err != nil {
  1879. log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
  1880. time.Sleep(time.Duration(sleepTime) * time.Second)
  1881. return err
  1882. }
  1883. if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
  1884. log.Printf("[INFO][%s] Workflow execution is finished. Exiting worker.", workflowExecution.ExecutionId)
  1885. log.Printf("[DEBUG] Shutting down (20)")
  1886. if isKubernetes == "true" {
  1887. // log.Printf("workflow execution: %#v", workflowExecution)
  1888. clientset, _, err := shuffle.GetKubernetesClient()
  1889. if err != nil {
  1890. log.Println("[ERROR] Error getting kubernetes client (2):", err)
  1891. os.Exit(1)
  1892. }
  1893. cleanupKubernetesExecution(clientset, workflowExecution, kubernetesNamespace)
  1894. } else {
  1895. shutdown(workflowExecution, "", "", true)
  1896. }
  1897. }
  1898. hasUserinput := false
  1899. for _, result := range workflowExecution.Results {
  1900. if result.Action.ID != subflowId {
  1901. continue
  1902. }
  1903. if result.Action.AppName == "User Input" {
  1904. hasUserinput = true
  1905. }
  1906. log.Printf("[DEBUG][%s] Found subflow to handle: %s (%s)", workflowExecution.ExecutionId, result.Action.AppName, result.Status)
  1907. if result.Status == "SUCCESS" || result.Status == "FINISHED" || result.Status == "FAILURE" || result.Status == "ABORTED" {
  1908. // Check for results
  1909. setWorkflowExecution(ctx, workflowExecution, false)
  1910. return nil
  1911. }
  1912. }
  1913. if workflowExecution.Status == "WAITING" && workflowExecution.ExecutionSource != "default" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
  1914. log.Printf("[INFO][%s] Workflow execution is waiting. Exiting worker, as backend will restart it.", workflowExecution.ExecutionId)
  1915. shutdown(workflowExecution, "", "", true)
  1916. }
  1917. log.Printf("[INFO][%s] (2) Status: %s, Results: %d, actions: %d. Userinput: %#v", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra, hasUserinput)
  1918. return errors.New("Subflow status not found yet")
  1919. }
  1920. func handleDefaultExecutionWrapper(ctx context.Context, workflowExecution shuffle.WorkflowExecution, streamResultUrl string, extra int) error {
  1921. if extra == -1 {
  1922. extra = 0
  1923. for _, trigger := range workflowExecution.Workflow.Triggers {
  1924. if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
  1925. extra += 1
  1926. }
  1927. }
  1928. }
  1929. req, err := http.NewRequest(
  1930. "POST",
  1931. streamResultUrl,
  1932. bytes.NewBuffer([]byte(data)),
  1933. )
  1934. newresp, err := topClient.Do(req)
  1935. if err != nil {
  1936. log.Printf("[ERROR] Failed making request (1): %s", err)
  1937. time.Sleep(time.Duration(sleepTime) * time.Second)
  1938. return err
  1939. }
  1940. defer newresp.Body.Close()
  1941. body, err := ioutil.ReadAll(newresp.Body)
  1942. if err != nil {
  1943. log.Printf("[ERROR] Failed reading body (1): %s", err)
  1944. time.Sleep(time.Duration(sleepTime) * time.Second)
  1945. return err
  1946. }
  1947. if newresp.StatusCode != 200 {
  1948. log.Printf("[ERROR] Bad statuscode: %d, %s", newresp.StatusCode, string(body))
  1949. if strings.Contains(string(body), "Workflowexecution is already finished") {
  1950. log.Printf("[DEBUG] Shutting down (19)")
  1951. shutdown(workflowExecution, "", "", true)
  1952. }
  1953. time.Sleep(time.Duration(sleepTime) * time.Second)
  1954. return errors.New(fmt.Sprintf("Bad statuscode: %d", newresp.StatusCode))
  1955. }
  1956. err = json.Unmarshal(body, &workflowExecution)
  1957. if err != nil {
  1958. log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
  1959. time.Sleep(time.Duration(sleepTime) * time.Second)
  1960. return err
  1961. }
  1962. if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
  1963. log.Printf("[INFO][%s] Workflow execution is finished. Exiting worker.", workflowExecution.ExecutionId)
  1964. log.Printf("[DEBUG] Shutting down (20)")
  1965. if isKubernetes == "true" {
  1966. // log.Printf("workflow execution: %#v", workflowExecution)
  1967. clientset, _, err := shuffle.GetKubernetesClient()
  1968. if err != nil {
  1969. log.Println("[ERROR] Error getting kubernetes client (2):", err)
  1970. os.Exit(1)
  1971. }
  1972. cleanupKubernetesExecution(clientset, workflowExecution, kubernetesNamespace)
  1973. } else {
  1974. shutdown(workflowExecution, "", "", true)
  1975. }
  1976. }
  1977. log.Printf("[INFO][%s] (3) Status: %s, Results: %d, actions: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra)
  1978. if workflowExecution.Status != "EXECUTING" {
  1979. log.Printf("[WARNING][%s] Exiting as worker execution has status %s!", workflowExecution.ExecutionId, workflowExecution.Status)
  1980. log.Printf("[DEBUG] Shutting down (21)")
  1981. if isKubernetes == "true" {
  1982. // log.Printf("workflow execution: %#v", workflowExecution)
  1983. clientset, _, err := shuffle.GetKubernetesClient()
  1984. if err != nil {
  1985. log.Println("[ERROR] Error getting kubernetes client (3):", err)
  1986. os.Exit(1)
  1987. }
  1988. cleanupKubernetesExecution(clientset, workflowExecution, kubernetesNamespace)
  1989. } else {
  1990. shutdown(workflowExecution, "", "", true)
  1991. }
  1992. }
  1993. setWorkflowExecution(ctx, workflowExecution, false)
  1994. return nil
  1995. }
  1996. func handleDefaultExecution(client *http.Client, req *http.Request, workflowExecution shuffle.WorkflowExecution) error {
  1997. // if no onprem runs (shouldn't happen, but extra check), exit
  1998. // if there are some, load the images ASAP for the app
  1999. ctx := context.Background()
  2000. //startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
  2001. startAction, extra, _, _, _, _, _, _ := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
  2002. err := executionInit(workflowExecution)
  2003. if err != nil {
  2004. log.Printf("[INFO] Workflow setup failed for %s: %s", workflowExecution.ExecutionId, err)
  2005. log.Printf("[DEBUG] Shutting down (18)")
  2006. shutdown(workflowExecution, "", "", true)
  2007. }
  2008. log.Printf("[DEBUG] DEFAULT EXECUTION Startaction: %s", startAction)
  2009. setWorkflowExecution(ctx, workflowExecution, false)
  2010. streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
  2011. for {
  2012. err = handleDefaultExecutionWrapper(ctx, workflowExecution, streamResultUrl, extra)
  2013. if err != nil {
  2014. log.Printf("[ERROR] Failed handling default execution: %s", err)
  2015. }
  2016. }
  2017. return nil
  2018. }
  2019. func arrayContains(visited []string, id string) bool {
  2020. found := false
  2021. for _, item := range visited {
  2022. if item == id {
  2023. found = true
  2024. break
  2025. }
  2026. }
  2027. return found
  2028. }
  2029. func getResult(workflowExecution shuffle.WorkflowExecution, id string) shuffle.ActionResult {
  2030. for _, actionResult := range workflowExecution.Results {
  2031. if actionResult.Action.ID == id {
  2032. return actionResult
  2033. }
  2034. }
  2035. return shuffle.ActionResult{}
  2036. }
  2037. func getAction(workflowExecution shuffle.WorkflowExecution, id, environment string) shuffle.Action {
  2038. for _, action := range workflowExecution.Workflow.Actions {
  2039. if action.ID == id {
  2040. return action
  2041. }
  2042. }
  2043. for _, trigger := range workflowExecution.Workflow.Triggers {
  2044. if trigger.ID == id {
  2045. return shuffle.Action{
  2046. ID: trigger.ID,
  2047. AppName: trigger.AppName,
  2048. Name: trigger.AppName,
  2049. Environment: environment,
  2050. Label: trigger.Label,
  2051. }
  2052. log.Printf("FOUND TRIGGER: %#v!", trigger)
  2053. }
  2054. }
  2055. return shuffle.Action{}
  2056. }
  2057. func runSkipAction(client *http.Client, action shuffle.Action, workflowId, workflowExecutionId, authorization string, configuration string) error {
  2058. timeNow := time.Now().Unix()
  2059. result := shuffle.ActionResult{
  2060. Action: action,
  2061. ExecutionId: workflowExecutionId,
  2062. Authorization: authorization,
  2063. Result: configuration,
  2064. StartedAt: timeNow,
  2065. CompletedAt: 0,
  2066. Status: "SUCCESS",
  2067. }
  2068. resultData, err := json.Marshal(result)
  2069. if err != nil {
  2070. return err
  2071. }
  2072. streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
  2073. req, err := http.NewRequest(
  2074. "POST",
  2075. streamUrl,
  2076. bytes.NewBuffer([]byte(resultData)),
  2077. )
  2078. if err != nil {
  2079. log.Printf("[WARNING] Error building skip request (0): %s", err)
  2080. return err
  2081. }
  2082. newresp, err := topClient.Do(req)
  2083. if err != nil {
  2084. log.Printf("[WARNING] Error running skip request (0): %s", err)
  2085. return err
  2086. }
  2087. defer newresp.Body.Close()
  2088. body, err := ioutil.ReadAll(newresp.Body)
  2089. if err != nil {
  2090. log.Printf("[WARNING] Failed reading body when skipping (0): %s", err)
  2091. return err
  2092. }
  2093. log.Printf("[INFO] Skip Action Body: %s", string(body))
  2094. return nil
  2095. }
  2096. func runTestExecution(client *http.Client, workflowId, apikey string) (string, string) {
  2097. executeUrl := fmt.Sprintf("%s/api/v1/workflows/%s/execute", baseUrl, workflowId)
  2098. req, err := http.NewRequest(
  2099. "GET",
  2100. executeUrl,
  2101. nil,
  2102. )
  2103. if err != nil {
  2104. log.Printf("Error building test request: %s", err)
  2105. return "", ""
  2106. }
  2107. req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", apikey))
  2108. newresp, err := topClient.Do(req)
  2109. if err != nil {
  2110. log.Printf("[WARNING] Error running test request (3): %s", err)
  2111. return "", ""
  2112. }
  2113. defer newresp.Body.Close()
  2114. body, err := ioutil.ReadAll(newresp.Body)
  2115. if err != nil {
  2116. log.Printf("[WARNING] Failed reading body: %s", err)
  2117. return "", ""
  2118. }
  2119. log.Printf("[INFO] Test Body: %s", string(body))
  2120. var workflowExecution shuffle.WorkflowExecution
  2121. err = json.Unmarshal(body, &workflowExecution)
  2122. if err != nil {
  2123. log.Printf("Failed workflowExecution unmarshal: %s", err)
  2124. return "", ""
  2125. }
  2126. return workflowExecution.Authorization, workflowExecution.ExecutionId
  2127. }
  2128. func isRunningInCluster() bool {
  2129. _, existsHost := os.LookupEnv("KUBERNETES_SERVICE_HOST")
  2130. _, existsPort := os.LookupEnv("KUBERNETES_SERVICE_PORT")
  2131. return existsHost && existsPort
  2132. }
  2133. func buildEnvVars(envMap map[string]string) []corev1.EnvVar {
  2134. var envVars []corev1.EnvVar
  2135. for key, value := range envMap {
  2136. envVars = append(envVars, corev1.EnvVar{Name: key, Value: value})
  2137. }
  2138. return envVars
  2139. }
  2140. func buildResourcesFromEnv() corev1.ResourceRequirements {
  2141. requests := corev1.ResourceList{}
  2142. limits := corev1.ResourceList{}
  2143. type item struct {
  2144. env string
  2145. resourceName corev1.ResourceName
  2146. resourceList corev1.ResourceList
  2147. }
  2148. items := []item{
  2149. // kubernetes requests
  2150. {env: "SHUFFLE_APP_CPU_REQUEST", resourceName: corev1.ResourceCPU, resourceList: requests},
  2151. {env: "SHUFFLE_APP_MEMORY_REQUEST", resourceName: corev1.ResourceMemory, resourceList: requests},
  2152. {env: "SHUFFLE_APP_EPHEMERAL_STORAGE_REQUEST", resourceName: corev1.ResourceEphemeralStorage, resourceList: requests},
  2153. // kubernetes limits
  2154. {env: "SHUFFLE_APP_CPU_LIMIT", resourceName: corev1.ResourceCPU, resourceList: limits},
  2155. {env: "SHUFFLE_APP_MEMORY_LIMIT", resourceName: corev1.ResourceMemory, resourceList: limits},
  2156. {env: "SHUFFLE_APP_EPHEMERAL_STORAGE_LIMIT", resourceName: corev1.ResourceEphemeralStorage, resourceList: limits},
  2157. }
  2158. for _, it := range items {
  2159. if value := strings.TrimSpace(os.Getenv(it.env)); value != "" {
  2160. if quantity, err := resource.ParseQuantity(value); err == nil {
  2161. it.resourceList[it.resourceName] = quantity
  2162. } else {
  2163. log.Printf("[WARNING] Cannot parse %s=%q as resource quantity: %v", it.env, value, err)
  2164. }
  2165. }
  2166. }
  2167. rr := corev1.ResourceRequirements{}
  2168. if len(requests) > 0 {
  2169. rr.Requests = requests
  2170. }
  2171. if len(limits) > 0 {
  2172. rr.Limits = limits
  2173. }
  2174. return rr
  2175. }
  2176. func getWorkerBackendExecution(auth string, executionId string) (*shuffle.WorkflowExecution, error) {
  2177. backendUrl := os.Getenv("BASE_URL")
  2178. if len(backendUrl) == 0 {
  2179. backendUrl = "http://shuffle-backend:5001"
  2180. }
  2181. var workflowExecution *shuffle.WorkflowExecution
  2182. streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", backendUrl)
  2183. topClient := shuffle.GetExternalClient(backendUrl)
  2184. requestData := shuffle.ActionResult{
  2185. Authorization: auth,
  2186. ExecutionId: executionId,
  2187. }
  2188. data, err := json.Marshal(requestData)
  2189. if err != nil {
  2190. return workflowExecution, err
  2191. }
  2192. req, err := http.NewRequest(
  2193. "POST",
  2194. streamResultUrl,
  2195. bytes.NewBuffer([]byte(data)),
  2196. )
  2197. newresp, err := topClient.Do(req)
  2198. if err != nil {
  2199. return workflowExecution, err
  2200. }
  2201. defer newresp.Body.Close()
  2202. if newresp.StatusCode != 200 {
  2203. return workflowExecution, errors.New(fmt.Sprintf("Got bad status code from backend %d", newresp.StatusCode))
  2204. }
  2205. body, err := ioutil.ReadAll(newresp.Body)
  2206. if err != nil {
  2207. return workflowExecution, err
  2208. }
  2209. err = json.Unmarshal(body, &workflowExecution)
  2210. if err != nil {
  2211. return workflowExecution, err
  2212. }
  2213. if debug {
  2214. log.Printf("[INFO] Here is the result we got back from backend: %s", workflowExecution.Results)
  2215. }
  2216. //setWorkflowExecution(context.Background(), *workflowExecution, false)
  2217. return workflowExecution, nil
  2218. }
  2219. func handleWorkflowQueue(resp http.ResponseWriter, request *http.Request) {
  2220. if request.Body == nil {
  2221. resp.WriteHeader(http.StatusBadRequest)
  2222. return
  2223. }
  2224. defer request.Body.Close()
  2225. body, err := ioutil.ReadAll(request.Body)
  2226. if err != nil {
  2227. log.Printf("[WARNING] (3) Failed reading body for workflowqueue")
  2228. resp.WriteHeader(401)
  2229. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  2230. return
  2231. }
  2232. var actionResult shuffle.ActionResult
  2233. err = json.Unmarshal(body, &actionResult)
  2234. if err != nil {
  2235. log.Printf("[ERROR] Failed shuffle.ActionResult unmarshaling (2): %s", err)
  2236. //resp.WriteHeader(401)
  2237. //resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  2238. //return
  2239. }
  2240. if len(actionResult.ExecutionId) == 0 {
  2241. log.Printf("[ERROR] No workflow execution id in action result. Data: %s", string(body))
  2242. resp.WriteHeader(400)
  2243. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "No workflow execution id in action result"}`)))
  2244. return
  2245. }
  2246. // 1. Get the shuffle.WorkflowExecution(ExecutionId) from the database
  2247. // 2. if shuffle.ActionResult.Authentication != shuffle.WorkflowExecution.Authentication -> exit
  2248. // 3. Add to and update actionResult in workflowExecution
  2249. // 4. Push to db
  2250. // IF FAIL: Set executionstatus: abort or cancel
  2251. ctx := context.Background()
  2252. if actionResult.ExecutionId == "TBD" {
  2253. return
  2254. }
  2255. workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId)
  2256. if err != nil {
  2257. log.Printf("[WARNING][%s] Failed to find execution in cache requesting backend (1): %s", actionResult.ExecutionId, err)
  2258. workflowExecution, err = getWorkerBackendExecution(actionResult.Authorization, actionResult.ExecutionId)
  2259. if err != nil {
  2260. log.Printf("[ERROR][%s] Failed getting execution (workflowqueue) %s: %s", actionResult.ExecutionId, actionResult.ExecutionId, err)
  2261. resp.WriteHeader(500)
  2262. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution ID %s because it doesn't exist locally."}`, actionResult.ExecutionId)))
  2263. return
  2264. }
  2265. }
  2266. if workflowExecution.Authorization != actionResult.Authorization {
  2267. log.Printf("[ERROR][%s] Bad authorization key when updating node (workflowQueue). Want: %s, Have: %s", actionResult.ExecutionId, workflowExecution.Authorization, actionResult.Authorization)
  2268. resp.WriteHeader(403)
  2269. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key"}`)))
  2270. return
  2271. }
  2272. if workflowExecution.Status == "FINISHED" {
  2273. log.Printf("[DEBUG][%s] Workflowexecution is already FINISHED. No further action can be taken", workflowExecution.ExecutionId)
  2274. resp.WriteHeader(200)
  2275. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is already finished because it has status %s. Lastnode: %s"}`, workflowExecution.Status, workflowExecution.LastNode)))
  2276. return
  2277. }
  2278. if workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" {
  2279. log.Printf("[WARNING][%s] Workflowexecution already has status %s. No further action can be taken", workflowExecution.ExecutionId, workflowExecution.Status)
  2280. resp.WriteHeader(200)
  2281. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is aborted because of %s with result %s and status %s"}`, workflowExecution.LastNode, workflowExecution.Result, workflowExecution.Status)))
  2282. return
  2283. }
  2284. retries := 0
  2285. retry, retriesok := request.URL.Query()["retries"]
  2286. if retriesok && len(retry) > 0 {
  2287. val, err := strconv.Atoi(retry[0])
  2288. if err == nil {
  2289. retries = val
  2290. }
  2291. }
  2292. // Not doing environment as we don't want to hook a worker to specific env. It should just not handle cloud actions
  2293. // limiting a worker to an env will not allow us to run multiple orborus in the same server?
  2294. if strings.EqualFold(actionResult.Action.Environment, "cloud") {
  2295. log.Printf("[WARNING] Got an action for %s environment forwarding it to the backend", actionResult.Action.Environment)
  2296. streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
  2297. req, err := http.NewRequest(
  2298. "POST",
  2299. streamUrl,
  2300. bytes.NewBuffer([]byte(body)),
  2301. )
  2302. if err != nil {
  2303. log.Printf("[ERROR] Error building subflow (%s) request: %s", workflowExecution.ExecutionId, err)
  2304. return
  2305. }
  2306. newresp, err := topClient.Do(req)
  2307. if err != nil {
  2308. log.Printf("[ERROR] Error running subflow (%s) request: %s", workflowExecution.ExecutionId, err)
  2309. return
  2310. }
  2311. defer newresp.Body.Close()
  2312. if newresp.StatusCode != 200 {
  2313. body, err := ioutil.ReadAll(newresp.Body)
  2314. if err != nil {
  2315. log.Printf("[INFO][%s] Failed reading body after subflow request: %s", workflowExecution.ExecutionId, err)
  2316. return
  2317. } else {
  2318. log.Printf("[ERROR][%s] Failed forwarding subflow request of length %d\n: %s", workflowExecution.ExecutionId, len(actionResult.Result), string(body))
  2319. }
  2320. }
  2321. return
  2322. }
  2323. log.Printf("[DEBUG][%s] Action: Received, Label: '%s', Action: '%s', Status: %s, Run status: %s, Extra=Retry:%d", workflowExecution.ExecutionId, actionResult.Action.Label, actionResult.Action.AppName, actionResult.Status, workflowExecution.Status, retries)
  2324. // results = append(results, actionResult)
  2325. // log.Printf("[INFO][%s] Time to execute %s (%s) with app %s:%s, function %s, env %s with %d parameters.", workflowExecution.ExecutionId, action.ID, action.Label, action.AppName, action.AppVersion, action.Name, action.Environment, len(action.Parameters))
  2326. // log.Printf("[DEBUG][%s] In workflowQueue with transaction", workflowExecution.ExecutionId)
  2327. runWorkflowExecutionTransaction(ctx, 0, workflowExecution.ExecutionId, actionResult, resp)
  2328. }
  2329. // Will make sure transactions are always ran for an execution. This is recursive if it fails. Allowed to fail up to 5 times
  2330. func runWorkflowExecutionTransaction(ctx context.Context, attempts int64, workflowExecutionId string, actionResult shuffle.ActionResult, resp http.ResponseWriter) {
  2331. //log.Printf("[DEBUG][%s] IN WORKFLOWEXECUTION SUB!", actionResult.ExecutionId)
  2332. workflowExecution, err := shuffle.GetWorkflowExecution(ctx, workflowExecutionId)
  2333. if err != nil {
  2334. log.Printf("[WARNING][%s] Failed to find execution in cache requesting backend (2): %s", actionResult.ExecutionId, err)
  2335. workflowExecution, err = getWorkerBackendExecution(actionResult.Authorization, actionResult.ExecutionId)
  2336. if err != nil {
  2337. log.Printf("[ERROR] Failed getting execution cache: %s", err)
  2338. resp.WriteHeader(400)
  2339. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution"}`)))
  2340. return
  2341. }
  2342. }
  2343. resultLength := len(workflowExecution.Results)
  2344. setExecution := true
  2345. workflowExecution, dbSave, err := shuffle.ParsedExecutionResult(ctx, *workflowExecution, actionResult, true, 0)
  2346. if err == nil {
  2347. if workflowExecution.Status != "EXECUTING" && workflowExecution.Status != "WAITING" {
  2348. log.Printf("[WARNING][%s] Execution is not executing, but %s. Stopping Transaction update.", workflowExecution.ExecutionId, workflowExecution.Status)
  2349. if resp != nil {
  2350. resp.WriteHeader(200)
  2351. resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Execution is not executing, but %s"}`, workflowExecution.Status)))
  2352. }
  2353. log.Printf("[DEBUG][%s] Shutting down (35)", workflowExecution.ExecutionId)
  2354. // Force sending result
  2355. shutdownData, err := json.Marshal(workflowExecution)
  2356. if err != nil {
  2357. log.Printf("[ERROR][%s] Failed marshalling execution (35): %s", workflowExecution.ExecutionId, err)
  2358. }
  2359. sendResult(*workflowExecution, shutdownData)
  2360. shutdown(*workflowExecution, "", "", false)
  2361. return
  2362. }
  2363. /*** STARTREMOVE ***/
  2364. if workflowExecution.Status == "WAITING" && (os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm") {
  2365. log.Printf("[INFO][%s] Workflow execution is waiting while in swarm. Sending info to backend to ensure execution stops.", workflowExecution.ExecutionId)
  2366. shutdownData, err := json.Marshal(workflowExecution)
  2367. if err != nil {
  2368. log.Printf("[ERROR][%s] Failed marshalling execution (36) - not sending backend WAITING: %s", workflowExecution.ExecutionId, err)
  2369. } else {
  2370. sendResult(*workflowExecution, shutdownData)
  2371. shutdown(*workflowExecution, "", "", false)
  2372. }
  2373. }
  2374. /*** ENDREMOVE ***/
  2375. } else {
  2376. if strings.Contains(strings.ToLower(fmt.Sprintf("%s", err)), "already been ran") || strings.Contains(strings.ToLower(fmt.Sprintf("%s", err)), "already finished") {
  2377. log.Printf("[ERROR][%s] Skipping rerun of action result as it's already been ran: %s", workflowExecution.ExecutionId)
  2378. return
  2379. }
  2380. log.Printf("[DEBUG] Rerunning transaction? %s", err)
  2381. if strings.Contains(fmt.Sprintf("%s", err), "Rerun this transaction") {
  2382. workflowExecution, err := shuffle.GetWorkflowExecution(ctx, workflowExecutionId)
  2383. if err != nil {
  2384. log.Printf("[WARNING][%s] Failed to find execution in cache requesting backend (3): %s", actionResult.ExecutionId, err)
  2385. workflowExecution, err = getWorkerBackendExecution(actionResult.Authorization, actionResult.ExecutionId)
  2386. if err != nil {
  2387. log.Printf("[ERROR][%s] Failed getting execution cache (2): %s", workflowExecution.ExecutionId, err)
  2388. resp.WriteHeader(400)
  2389. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution (2)"}`)))
  2390. return
  2391. }
  2392. }
  2393. resultLength = len(workflowExecution.Results)
  2394. setExecution = true
  2395. workflowExecution, dbSave, err = shuffle.ParsedExecutionResult(ctx, *workflowExecution, actionResult, false, 0)
  2396. if err != nil {
  2397. log.Printf("[ERROR][%s] Failed execution of parsedexecution (2): %s", workflowExecution.ExecutionId, err)
  2398. resp.WriteHeader(401)
  2399. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution (2)"}`)))
  2400. return
  2401. } else {
  2402. log.Printf("[DEBUG][%s] Successfully got ParsedExecution with %d results!", workflowExecution.ExecutionId, len(workflowExecution.Results))
  2403. }
  2404. } else {
  2405. log.Printf("[ERROR][%s] Failed execution of parsedexecution: %s", workflowExecution.ExecutionId, err)
  2406. resp.WriteHeader(401)
  2407. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution"}`)))
  2408. return
  2409. }
  2410. }
  2411. //log.Printf(`[DEBUG][%s] Got result %s from %s. Execution status: %s. Save: %#v. Parent: %#v`, actionResult.ExecutionId, actionResult.Status, actionResult.Action.ID, workflowExecution.Status, dbSave, workflowExecution.ExecutionParent)
  2412. //dbSave := false
  2413. //if len(results) != len(workflowExecution.Results) {
  2414. // log.Printf("[DEBUG][%s] There may have been an issue in transaction queue. Result lengths: %d vs %d. Should check which exists the base results, but not in entire execution, then append.", workflowExecution.ExecutionId, len(results), len(workflowExecution.Results))
  2415. //}
  2416. // Validating that action results hasn't changed
  2417. // Handled using cachhing, so actually pretty fast
  2418. cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
  2419. cache, err := shuffle.GetCache(ctx, cacheKey)
  2420. if err == nil {
  2421. //parsedValue := value.(*shuffle.WorkflowExecution)
  2422. parsedValue := &shuffle.WorkflowExecution{}
  2423. cacheData := []byte(cache.([]uint8))
  2424. err = json.Unmarshal(cacheData, &workflowExecution)
  2425. if err != nil {
  2426. log.Printf("[ERROR][%s] Failed unmarshalling workflowexecution: %s", workflowExecution.ExecutionId, err)
  2427. }
  2428. if len(parsedValue.Results) > 0 && len(parsedValue.Results) != resultLength {
  2429. setExecution = false
  2430. if attempts > 5 {
  2431. }
  2432. attempts += 1
  2433. log.Printf("[DEBUG][%s] Rerunning transaction as results has changed. %d vs %d", workflowExecution.ExecutionId, len(parsedValue.Results), resultLength)
  2434. /*
  2435. if len(workflowExecution.Results) <= len(workflowExecution.Workflow.Actions) {
  2436. log.Printf("[DEBUG][%s] Rerunning transaction as results has changed. %d vs %d", workflowExecution.ExecutionId, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions))
  2437. runWorkflowExecutionTransaction(ctx, attempts, workflowExecutionId, actionResult, resp)
  2438. return
  2439. }
  2440. */
  2441. }
  2442. }
  2443. if setExecution || workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" {
  2444. if debug {
  2445. log.Printf("[DEBUG][%s] Running setexec with status %s and %d/%d results", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions))
  2446. }
  2447. //result(s)", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results))
  2448. err = setWorkflowExecution(ctx, *workflowExecution, dbSave)
  2449. if err != nil {
  2450. log.Printf("[ERROR][%s] Failed setting execution: %s", workflowExecution.ExecutionId, err)
  2451. resp.WriteHeader(400)
  2452. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed setting workflowexecution actionresult: %s"}`, err)))
  2453. return
  2454. }
  2455. /*** STARTREMOVE ***/
  2456. if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
  2457. finished := shuffle.ValidateFinished(ctx, -1, *workflowExecution)
  2458. if !finished {
  2459. if debug {
  2460. log.Printf("[DEBUG][%s] Handling next node since it's not finished!", workflowExecution.ExecutionId)
  2461. }
  2462. handleExecutionResult(*workflowExecution)
  2463. } else {
  2464. shutdownData, err := json.Marshal(workflowExecution)
  2465. if err != nil {
  2466. log.Printf("[ERROR] Failed marshalling shutdowndata during set: %s", err)
  2467. }
  2468. sendResult(*workflowExecution, shutdownData)
  2469. }
  2470. }
  2471. /*** ENDREMOVE ***/
  2472. } else {
  2473. log.Printf("[INFO][%s] Skipping setexec with status %s", workflowExecution.ExecutionId, workflowExecution.Status)
  2474. // Just in case. Should MAYBE validate finishing another time as well.
  2475. // This fixes issues with e.g. shuffle.Action -> shuffle.Trigger -> shuffle.Action.
  2476. handleExecutionResult(*workflowExecution)
  2477. }
  2478. //if newExecutions && len(nextActions) > 0 {
  2479. // log.Printf("[DEBUG][%s] New execution: %#v. NextActions: %#v", newExecutions, nextActions)
  2480. // //handleExecutionResult(*workflowExecution)
  2481. //}
  2482. resp.WriteHeader(200)
  2483. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  2484. }
  2485. func sendSelfRequest(actionResult shuffle.ActionResult) {
  2486. /*** STARTREMOVE ***/
  2487. if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
  2488. log.Printf("[INFO][%s] Not sending self request info since source is default (not swarm)", actionResult.ExecutionId)
  2489. return
  2490. }
  2491. /*** ENDREMOVE ***/
  2492. data, err := json.Marshal(actionResult)
  2493. if err != nil {
  2494. log.Printf("[ERROR][%s] Shutting down (24): Failed to unmarshal data for backend: %s", actionResult.ExecutionId, err)
  2495. return
  2496. }
  2497. if actionResult.ExecutionId == "TBD" {
  2498. return
  2499. }
  2500. log.Printf("[DEBUG][%s] Sending FAILURE to self to stop the workflow execution. Action: %s (%s), app %s:%s", actionResult.ExecutionId, actionResult.Action.Label, actionResult.Action.ID, actionResult.Action.AppName, actionResult.Action.AppVersion)
  2501. // Literally sending to same worker to run it as a new request
  2502. streamUrl := fmt.Sprintf("http://localhost:33333/api/v1/streams")
  2503. hostenv := os.Getenv("WORKER_HOSTNAME")
  2504. if len(hostenv) > 0 {
  2505. streamUrl = fmt.Sprintf("http://%s:33333/api/v1/streams", hostenv)
  2506. }
  2507. req, err := http.NewRequest(
  2508. "POST",
  2509. streamUrl,
  2510. bytes.NewBuffer([]byte(data)),
  2511. )
  2512. if err != nil {
  2513. log.Printf("[ERROR][%s] Failed creating self request (1): %s", actionResult.ExecutionId, err)
  2514. return
  2515. }
  2516. client := shuffle.GetExternalClient(streamUrl)
  2517. newresp, err := client.Do(req)
  2518. if err != nil {
  2519. log.Printf("[ERROR][%s] Error running finishing request (2): %s", actionResult.ExecutionId, err)
  2520. return
  2521. }
  2522. defer newresp.Body.Close()
  2523. if newresp.Body != nil {
  2524. body, err := ioutil.ReadAll(newresp.Body)
  2525. //log.Printf("[INFO] BACKEND STATUS: %d", newresp.StatusCode)
  2526. if err != nil {
  2527. log.Printf("[ERROR][%s] Failed reading body: %s", actionResult.ExecutionId, err)
  2528. } else {
  2529. log.Printf("[DEBUG][%s] Sent update to backend - 2: %s", actionResult.ExecutionId, string(body))
  2530. }
  2531. }
  2532. }
  2533. func sendResult(workflowExecution shuffle.WorkflowExecution, data []byte) {
  2534. if workflowExecution.ExecutionSource == "default" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
  2535. //log.Printf("[INFO][%s] Not sending backend info since source is default (not swarm)", workflowExecution.ExecutionId)
  2536. //return
  2537. } else {
  2538. }
  2539. // Basically to reduce backend strain
  2540. /*
  2541. if shuffle.ArrayContains(finishedExecutions, workflowExecution.ExecutionId) {
  2542. log.Printf("[INFO][%s] NOT sending backend info since it's already been sent before.", workflowExecution.ExecutionId)
  2543. return
  2544. }
  2545. */
  2546. // Take it down again
  2547. /*
  2548. if len(finishedExecutions) > 100 {
  2549. log.Printf("[DEBUG][%s] Removing old execution from finishedExecutions: %s", workflowExecution.ExecutionId, finishedExecutions[0])
  2550. finishedExecutions = finishedExecutions[99:]
  2551. }
  2552. finishedExecutions = append(finishedExecutions, workflowExecution.ExecutionId)
  2553. */
  2554. streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
  2555. req, err := http.NewRequest(
  2556. "POST",
  2557. streamUrl,
  2558. bytes.NewBuffer([]byte(data)),
  2559. )
  2560. if err != nil {
  2561. log.Printf("[ERROR][%s] Failed creating finishing request: %s", workflowExecution.ExecutionId, err)
  2562. log.Printf("[DEBUG][%s] Shutting down (22)", workflowExecution.ExecutionId)
  2563. shutdown(workflowExecution, "", "", false)
  2564. return
  2565. }
  2566. client := shuffle.GetExternalClient(streamUrl)
  2567. newresp, err := client.Do(req)
  2568. if err != nil {
  2569. log.Printf("[ERROR][%s] Error running finishing request (1): %s", workflowExecution.ExecutionId, err)
  2570. log.Printf("[DEBUG][%s] Shutting down (23)", workflowExecution.ExecutionId)
  2571. shutdown(workflowExecution, "", "", false)
  2572. return
  2573. }
  2574. defer newresp.Body.Close()
  2575. if newresp.Body != nil {
  2576. body, err := ioutil.ReadAll(newresp.Body)
  2577. //log.Printf("[INFO] BACKEND STATUS: %d", newresp.StatusCode)
  2578. if err != nil {
  2579. log.Printf("[ERROR][%s] Failed reading body: %s", workflowExecution.ExecutionId, err)
  2580. } else {
  2581. log.Printf("[DEBUG][%s] Sent request to backend: %s", workflowExecution.ExecutionId, string(body))
  2582. }
  2583. }
  2584. }
  2585. func validateFinished(workflowExecution shuffle.WorkflowExecution) bool {
  2586. ctx := context.Background()
  2587. newexec, err := shuffle.GetWorkflowExecution(ctx, workflowExecution.ExecutionId)
  2588. if err != nil {
  2589. log.Printf("[ERROR][%s] Failed getting workflow execution: %s", workflowExecution.ExecutionId, err)
  2590. return false
  2591. } else {
  2592. workflowExecution = *newexec
  2593. }
  2594. //startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
  2595. workflowExecution, _ = shuffle.Fixexecution(ctx, workflowExecution)
  2596. _, extra, _, _, _, _, _, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
  2597. log.Printf("[INFO][%s] VALIDATION. Status: %s, shuffle.Actions: %d, Extra: %d, Results: %d. Parent: %#v", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Workflow.Actions), extra, len(workflowExecution.Results), workflowExecution.ExecutionParent)
  2598. if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" || (len(environments) == 1 && requestsSent == 0 && len(workflowExecution.Results) >= 1 && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm") || (len(workflowExecution.Results) >= len(workflowExecution.Workflow.Actions)+extra && len(workflowExecution.Workflow.Actions) > 0) {
  2599. if workflowExecution.Status == "FINISHED" {
  2600. for _, result := range workflowExecution.Results {
  2601. if result.Status == "EXECUTING" || result.Status == "WAITING" {
  2602. log.Printf("[WARNING] NOT returning full result, as a result may be unfinished: %s (%s) - %s", result.Action.Label, result.Action.ID, result.Status)
  2603. return false
  2604. }
  2605. }
  2606. }
  2607. /*** STARTREMOVE ***/
  2608. if os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
  2609. requestsSent += 1
  2610. }
  2611. /*** ENDREMOVE ***/
  2612. log.Printf("[DEBUG][%s] Should send full result to %s", workflowExecution.ExecutionId, baseUrl)
  2613. //data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
  2614. shutdownData, err := json.Marshal(workflowExecution)
  2615. if err != nil {
  2616. log.Printf("[ERROR][%s] Shutting down (32): Failed to unmarshal data for backend: %s", workflowExecution.ExecutionId, err)
  2617. shutdown(workflowExecution, "", "", true)
  2618. }
  2619. cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
  2620. if len(workflowExecution.Authorization) > 0 {
  2621. err = shuffle.SetCache(ctx, cacheKey, shutdownData, 31)
  2622. if err != nil {
  2623. log.Printf("[ERROR][%s] Failed adding to cache during ValidateFinished", workflowExecution)
  2624. }
  2625. }
  2626. shuffle.RunCacheCleanup(ctx, workflowExecution)
  2627. sendResult(workflowExecution, shutdownData)
  2628. return true
  2629. }
  2630. return false
  2631. }
  2632. func handleGetStreamResults(resp http.ResponseWriter, request *http.Request) {
  2633. defer request.Body.Close()
  2634. body, err := ioutil.ReadAll(request.Body)
  2635. if err != nil {
  2636. log.Printf("[WARNING] Failed reading body for stream result queue")
  2637. resp.WriteHeader(500)
  2638. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  2639. return
  2640. }
  2641. var actionResult shuffle.ActionResult
  2642. err = json.Unmarshal(body, &actionResult)
  2643. if err != nil {
  2644. log.Printf("[WARNING] Failed shuffle.ActionResult unmarshaling: %s", err)
  2645. //resp.WriteHeader(400)
  2646. //resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  2647. //return
  2648. }
  2649. if len(actionResult.ExecutionId) == 0 {
  2650. log.Printf("[WARNING] No workflow execution id in action result (2). Data: %s", string(body))
  2651. resp.WriteHeader(400)
  2652. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "No workflow execution id in action result"}`)))
  2653. return
  2654. }
  2655. ctx := context.Background()
  2656. workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId)
  2657. if err != nil {
  2658. log.Printf("[WARNING][%s] Failed to find execution in cache requesting backend (4): %s", actionResult.ExecutionId, err)
  2659. workflowExecution, err = getWorkerBackendExecution(actionResult.Authorization, actionResult.ExecutionId)
  2660. if err != nil {
  2661. log.Printf("[ERROR] Failed getting execution (streamresult) %s: %s", actionResult.ExecutionId, err)
  2662. resp.WriteHeader(400)
  2663. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
  2664. return
  2665. }
  2666. }
  2667. // Authorization is done here
  2668. if workflowExecution.Authorization != actionResult.Authorization {
  2669. log.Printf("[ERROR] Bad authorization key when getting stream results from cache %s.", actionResult.ExecutionId)
  2670. resp.WriteHeader(401)
  2671. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
  2672. return
  2673. }
  2674. newjson, err := json.Marshal(workflowExecution)
  2675. if err != nil {
  2676. resp.WriteHeader(500)
  2677. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflow execution"}`)))
  2678. return
  2679. }
  2680. resp.WriteHeader(200)
  2681. resp.Write(newjson)
  2682. }
  2683. // GetLocalIP returns the non loopback local IP of the host
  2684. func getLocalIP() string {
  2685. /*** STARTREMOVE ***/
  2686. if os.Getenv("IS_KUBERNETES") == "true" {
  2687. return "shuffle-workers"
  2688. }
  2689. if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
  2690. name, err := os.Hostname()
  2691. if err != nil {
  2692. log.Printf("[ERROR] Couldn't find hostname of worker: %s", err)
  2693. os.Exit(3)
  2694. }
  2695. log.Printf("[DEBUG] Found hostname %s since worker is running with \"run\" command", name)
  2696. return name
  2697. /**
  2698. Everything below was a test to see if we needed to match directly to a network interface. May require docker network API.
  2699. **/
  2700. log.Printf("[DEBUG] Looking for IP for the external docker-network %s", swarmNetworkName)
  2701. // Different process to ensure we find the right IP.
  2702. // Necessary due to Ingress being added to docker ser
  2703. ifaces, err := net.Interfaces()
  2704. if err != nil {
  2705. log.Printf("[ERROR] FATAL: networks the container is listening in %s: %s", swarmNetworkName, err)
  2706. os.Exit(3)
  2707. }
  2708. foundIP := ""
  2709. for _, i := range ifaces {
  2710. log.Printf("NETWORK: %s", i.Name)
  2711. //If i.Name != swarmNetworkName {
  2712. // continue
  2713. //}
  2714. addrs, err := i.Addrs()
  2715. if err != nil {
  2716. log.Printf("[ERROR] FATAL: Failed getting address for listener in network %s: %s", swarmNetworkName, err)
  2717. continue
  2718. }
  2719. for _, addr := range addrs {
  2720. var ip net.IP
  2721. switch v := addr.(type) {
  2722. case *net.IPNet:
  2723. ip = v.IP
  2724. case *net.IPAddr:
  2725. ip = v.IP
  2726. }
  2727. log.Printf("%s: IP: %#v", i.Name, ip)
  2728. // FIXME: Allow for IPv6 too!
  2729. //if strings.Count(ip.String(), ".") == 3 {
  2730. // foundIP = ip.String()
  2731. // break
  2732. //}
  2733. // process IP address
  2734. }
  2735. }
  2736. if len(foundIP) == 0 {
  2737. log.Printf("[ERROR] FATAL: No valid IP found for network %s. Defaulting to base IP", swarmNetworkName)
  2738. } else {
  2739. return foundIP
  2740. }
  2741. }
  2742. /*** ENDREMOVE ***/
  2743. addrs, err := net.InterfaceAddrs()
  2744. if err != nil {
  2745. return ""
  2746. }
  2747. for _, address := range addrs {
  2748. // check the address type and if it is not a loopback the display it
  2749. if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
  2750. if ipnet.IP.To4() != nil {
  2751. return ipnet.IP.String()
  2752. }
  2753. }
  2754. }
  2755. return ""
  2756. }
  2757. func getAvailablePort() (net.Listener, error) {
  2758. listener, err := net.Listen("tcp", ":0")
  2759. if err != nil {
  2760. log.Printf("[WARNING] Failed to assign port by default. Defaulting to 5001")
  2761. return nil, err
  2762. }
  2763. return listener, nil
  2764. //return fmt.Sprintf(":%d", port)
  2765. }
  2766. func webserverSetup(workflowExecution shuffle.WorkflowExecution) net.Listener {
  2767. hostname = getLocalIP()
  2768. if isKubernetes == "true" {
  2769. os.Setenv("WORKER_HOSTNAME", "shuffle-workers")
  2770. } else {
  2771. os.Setenv("WORKER_HOSTNAME", hostname)
  2772. }
  2773. // FIXME: This MAY not work because of speed between first
  2774. // container being launched and port being assigned to webserver
  2775. listener, err := getAvailablePort()
  2776. if err != nil {
  2777. log.Printf("[ERROR] Failed to create init listener: %s", err)
  2778. return listener
  2779. }
  2780. log.Printf("[DEBUG] OLD HOSTNAME: %s", appCallbackUrl)
  2781. /*** STARTREMOVE ***/
  2782. if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
  2783. log.Printf("[DEBUG] Starting webserver (1) on port %d with hostname: %s", baseport, hostname)
  2784. os.Setenv("WORKER_PORT", fmt.Sprintf("%d", baseport))
  2785. appCallbackUrl = fmt.Sprintf("http://%s:%d", hostname, baseport)
  2786. if os.Getenv("IS_KUBERNETES") == "true" {
  2787. appCallbackUrl = fmt.Sprintf("http://%s:%d", "shuffle-workers", baseport)
  2788. log.Printf("[DEBUG] NEW WORKER APP: %s", appCallbackUrl)
  2789. hostname = "shuffle-workers"
  2790. }
  2791. listener, err = net.Listen("tcp", fmt.Sprintf(":%d", baseport))
  2792. if err != nil {
  2793. log.Printf("[ERROR] Failed to assign port to %d: %s", baseport, err)
  2794. return nil
  2795. }
  2796. return listener
  2797. }
  2798. /*** ENDREMOVE ***/
  2799. port := listener.Addr().(*net.TCPAddr).Port
  2800. // Set the port environment variable
  2801. os.Setenv("WORKER_PORT", fmt.Sprintf("%d", port))
  2802. log.Printf("[DEBUG] Starting webserver (2) on port %d with hostname: %s", port, hostname)
  2803. appCallbackUrl = fmt.Sprintf("http://%s:%d", hostname, port)
  2804. log.Printf("[INFO] NEW WORKER HOSTNAME: %s", appCallbackUrl)
  2805. return listener
  2806. }
  2807. func findActiveSwarmNodes(dockercli *dockerclient.Client) (int64, error) {
  2808. ctx := context.Background()
  2809. nodes, err := dockercli.NodeList(ctx, types.NodeListOptions{})
  2810. if err != nil {
  2811. return 1, err
  2812. }
  2813. nodeCount := int64(0)
  2814. for _, node := range nodes {
  2815. //log.Printf("ID: %s - %#v", node.ID, node.Status.State)
  2816. if node.Status.State == "ready" {
  2817. nodeCount += 1
  2818. }
  2819. }
  2820. // Check for SHUFFLE_MAX_NODES
  2821. maxNodesString := os.Getenv("SHUFFLE_MAX_SWARM_NODES")
  2822. // Make it into a number and check if it's lower than nodeCount
  2823. if len(maxNodesString) > 0 {
  2824. maxNodes, err := strconv.ParseInt(maxNodesString, 10, 64)
  2825. if err != nil {
  2826. return nodeCount, err
  2827. }
  2828. if nodeCount > maxNodes {
  2829. nodeCount = maxNodes
  2830. }
  2831. }
  2832. return nodeCount, nil
  2833. /*
  2834. containers, err := dockercli.ContainerList(ctx, types.ContainerListOptions{
  2835. All: true,
  2836. })
  2837. */
  2838. }
  2839. /*** STARTREMOVE ***/
  2840. func deploySwarmService(dockercli *dockerclient.Client, name, image string, deployport int, inputReplicas int64, retry bool) error {
  2841. log.Printf("[DEBUG] Deploying service for %s to swarm on port %d", name, deployport)
  2842. //containerName := fmt.Sprintf("shuffle-worker-%s", parsedUuid)
  2843. // Check if the image exists or not - just in case
  2844. _, _, err := dockercli.ImageInspectWithRaw(context.Background(), image)
  2845. if err != nil {
  2846. log.Printf("[INFO] Image %s not found locally. Pulling from registry...", image)
  2847. localRegistry := os.Getenv("REGISTRY_URL")
  2848. if !strings.HasPrefix(image, localRegistry) && len(localRegistry) > 0 {
  2849. image = fmt.Sprintf("%s/%s", localRegistry, image)
  2850. log.Printf("[DEBUG] Changed image to %s", image)
  2851. }
  2852. _, err := dockercli.ImagePull(
  2853. context.Background(),
  2854. image,
  2855. dockerimage.PullOptions{},
  2856. )
  2857. if err != nil {
  2858. log.Printf("[ERROR] Failed pulling image %s: %s", image, err)
  2859. return err
  2860. }
  2861. }
  2862. if len(baseimagename) == 0 || baseimagename == "/" {
  2863. baseimagename = "frikky/shuffle"
  2864. //var baseimagename = "frikky/shuffle"
  2865. //var registryName = "registry.hub.docker.com"
  2866. }
  2867. //image := fmt.Sprintf("%s:%s", baseimagename, name)
  2868. networkName := "shuffle-executions"
  2869. if len(swarmNetworkName) > 0 {
  2870. networkName = swarmNetworkName
  2871. }
  2872. // Apps used a lot should have 2 replicas (default)
  2873. // New default to 3 (as the chance of queues piling up is lower)
  2874. replicas := uint64(3)
  2875. // Sent from Orborus
  2876. // Should be equal to
  2877. scaleReplicas := os.Getenv("SHUFFLE_APP_REPLICAS")
  2878. if len(scaleReplicas) > 0 {
  2879. tmpInt, err := strconv.Atoi(scaleReplicas)
  2880. if err != nil {
  2881. log.Printf("[ERROR] %s is not a valid number for replication", scaleReplicas)
  2882. } else {
  2883. replicas = uint64(tmpInt)
  2884. }
  2885. log.Printf("[DEBUG] SHUFFLE_APP_REPLICAS set to value %#v. Trying to overwrite default (%d/node)", scaleReplicas, replicas)
  2886. }
  2887. // Max scale as well
  2888. nodeCount := uint64(1)
  2889. if inputReplicas > 0 && inputReplicas < 100 {
  2890. if replicas != uint64(inputReplicas) {
  2891. log.Printf("[DEBUG] Overwriting replicas to %d/node as inputReplicas is set to %d", inputReplicas, inputReplicas)
  2892. }
  2893. replicas = uint64(inputReplicas)
  2894. } else {
  2895. cnt, err := findActiveSwarmNodes(dockercli)
  2896. if err != nil {
  2897. log.Printf("[ERROR] Unable to find active swarm nodes: %s", err)
  2898. }
  2899. if cnt > 0 {
  2900. nodeCount = uint64(cnt)
  2901. }
  2902. // FIXME: From September 2025 - This is set back to 1, as this doesn't really reflect how scale works at all. It is just confusing, and makes number larger/smaller "arbitrarily" instead of using default docker scale
  2903. nodeCount = 1
  2904. }
  2905. replicatedJobs := uint64(replicas * nodeCount)
  2906. log.Printf("[DEBUG] Deploying app with name %s with image %s", name, image)
  2907. containerName := fmt.Sprintf(strings.Replace(name, ".", "-", -1))
  2908. serviceSpec := swarm.ServiceSpec{
  2909. Annotations: swarm.Annotations{
  2910. Name: containerName,
  2911. Labels: map[string]string{},
  2912. },
  2913. Mode: swarm.ServiceMode{
  2914. Replicated: &swarm.ReplicatedService{
  2915. // Max replicas total (?)
  2916. Replicas: &replicatedJobs,
  2917. },
  2918. },
  2919. Networks: []swarm.NetworkAttachmentConfig{
  2920. swarm.NetworkAttachmentConfig{
  2921. Target: networkName,
  2922. },
  2923. },
  2924. EndpointSpec: &swarm.EndpointSpec{
  2925. Ports: []swarm.PortConfig{
  2926. swarm.PortConfig{
  2927. Protocol: swarm.PortConfigProtocolTCP,
  2928. PublishMode: swarm.PortConfigPublishModeIngress,
  2929. Name: "app-port",
  2930. PublishedPort: uint32(deployport),
  2931. TargetPort: uint32(deployport),
  2932. },
  2933. },
  2934. },
  2935. TaskTemplate: swarm.TaskSpec{
  2936. Resources: &swarm.ResourceRequirements{
  2937. Reservations: &swarm.Resources{},
  2938. },
  2939. LogDriver: &swarm.Driver{
  2940. Name: "json-file",
  2941. Options: map[string]string{
  2942. "max-size": "10m",
  2943. },
  2944. },
  2945. ContainerSpec: &swarm.ContainerSpec{
  2946. Image: image,
  2947. Env: []string{
  2948. fmt.Sprintf("SHUFFLE_APP_EXPOSED_PORT=%d", deployport),
  2949. fmt.Sprintf("SHUFFLE_SWARM_CONFIG=%s", os.Getenv("SHUFFLE_SWARM_CONFIG")),
  2950. fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", logsDisabled),
  2951. },
  2952. Hosts: []string{
  2953. containerName,
  2954. },
  2955. },
  2956. RestartPolicy: &swarm.RestartPolicy{
  2957. Condition: swarm.RestartPolicyConditionAny,
  2958. },
  2959. Placement: &swarm.Placement{
  2960. Constraints: []string{},
  2961. },
  2962. },
  2963. }
  2964. if len(os.Getenv("SHUFFLE_SWARM_OTHER_NETWORK")) > 0 {
  2965. serviceSpec.Networks = append(serviceSpec.Networks, swarm.NetworkAttachmentConfig{
  2966. Target: "shuffle_shuffle",
  2967. })
  2968. }
  2969. if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
  2970. serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY")))
  2971. serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY")))
  2972. serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY")))
  2973. serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("no_proxy=%s", os.Getenv("no_proxy")))
  2974. }
  2975. overrideHttpProxy := os.Getenv("SHUFFLE_INTERNAL_HTTP_PROXY")
  2976. overrideHttpsProxy := os.Getenv("SHUFFLE_INTERNAL_HTTPS_PROXY")
  2977. if overrideHttpProxy != "" {
  2978. serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("SHUFFLE_INTERNAL_HTTP_PROXY=%s", overrideHttpProxy))
  2979. }
  2980. if overrideHttpsProxy != "" {
  2981. serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("SHUFFLE_INTERNAL_HTTPS_PROXY=%s", overrideHttpsProxy))
  2982. }
  2983. /*
  2984. Mounts: []mount.Mount{
  2985. mount.Mount{
  2986. Source: "/var/run/docker.sock",
  2987. Target: "/var/run/docker.sock",
  2988. Type: mount.TypeBind,
  2989. },
  2990. },
  2991. */
  2992. if dockerApiVersion != "" {
  2993. serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("DOCKER_API_VERSION=%s", dockerApiVersion))
  2994. }
  2995. if len(os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")) > 0 {
  2996. serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("SHUFFLE_APP_SDK_TIMEOUT=%s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")))
  2997. }
  2998. // Required for certain apps
  2999. if timezone == "" {
  3000. timezone = "Europe/Amsterdam"
  3001. }
  3002. serviceSpec.TaskTemplate.ContainerSpec.Env = append(serviceSpec.TaskTemplate.ContainerSpec.Env, fmt.Sprintf("TZ=%s", timezone))
  3003. serviceOptions := types.ServiceCreateOptions{}
  3004. service, err := dockercli.ServiceCreate(
  3005. context.Background(),
  3006. serviceSpec,
  3007. serviceOptions,
  3008. )
  3009. _ = service
  3010. if err != nil {
  3011. if strings.Contains(fmt.Sprintf("%s", err), "network") && strings.Contains(fmt.Sprintf("%s", err), "not found") {
  3012. log.Printf("[DEBUG] Network %s not found. Trying to initialize it.", networkName)
  3013. networkErr := initSwarmNetwork()
  3014. if networkErr != nil {
  3015. log.Printf("[ERROR] Failed initializing swarm network: %s", err)
  3016. //return err
  3017. }
  3018. // Retry deploying the service (once)
  3019. if !retry {
  3020. return deploySwarmService(dockercli, name, image, deployport, -1, true)
  3021. }
  3022. }
  3023. // For port mapping.
  3024. if strings.Contains(fmt.Sprintf("%s", err), "InvalidArgument") && strings.Contains(fmt.Sprintf("%s", err), "is already in use") {
  3025. //log.Printf("\n\n[WARNING] Port %d is already allocated. Trying to deploy on next port.\n\n", deployport)
  3026. // Random sleep 1-4 seconds
  3027. time.Sleep(time.Duration(rand.Intn(4)+1) * time.Second)
  3028. return deploySwarmService(dockercli, name, image, deployport+1, -1, retry)
  3029. }
  3030. log.Printf("[DEBUG] Failed deploying %s with image %s: %s", name, image, err)
  3031. return err
  3032. } else {
  3033. // wait for service to be ready
  3034. time.Sleep(time.Duration(rand.Intn(4)+1) * time.Second)
  3035. //log.Printf("[DEBUG] Servicecreate request: %#v %#v", service, err)
  3036. // patch service network
  3037. // this is an edgecase that we noticed on docker version 29
  3038. // and API version 1.44
  3039. // get networkID of swarmNetworkName
  3040. networkID := ""
  3041. ctx := context.Background()
  3042. // find network ID
  3043. networks, err := dockercli.NetworkList(ctx, network.ListOptions{})
  3044. if err == nil {
  3045. for _, net := range networks {
  3046. if net.Name == networkName {
  3047. if net.Scope == "swarm" {
  3048. log.Printf("[DEBUG] Found swarm-scoped network: %s (%s)", networkName, net.ID)
  3049. networkID = net.ID
  3050. } else {
  3051. log.Printf("[WARNING] Network %s exists but is not swarm scoped (scope=%s)", networkName, net.Scope)
  3052. }
  3053. break
  3054. }
  3055. }
  3056. }
  3057. if networkID == "" {
  3058. log.Printf("[ERROR] Network %s not found", networkName)
  3059. networkID = networkName
  3060. }
  3061. services, serr := dockercli.ServiceList(ctx, types.ServiceListOptions{})
  3062. if serr == nil {
  3063. for _, svc := range services {
  3064. if svc.ID == service.ID {
  3065. log.Printf("[DEBUG] Found service %s (%s) — patching network attach", service.ID, svc.ID)
  3066. spec := svc.Spec
  3067. spec.TaskTemplate.Networks = append(spec.TaskTemplate.Networks, swarm.NetworkAttachmentConfig{
  3068. Target: networkID,
  3069. })
  3070. _, uerr := dockercli.ServiceUpdate(ctx, svc.ID, svc.Version, spec, types.ServiceUpdateOptions{})
  3071. if uerr != nil {
  3072. log.Printf("[WARNING] Failed to patch service %s with network %s: %v", service.ID, networkID, uerr)
  3073. } else {
  3074. log.Printf("[INFO] Successfully attached network %s to service %s", networkID, service.ID)
  3075. }
  3076. break
  3077. }
  3078. }
  3079. } else {
  3080. log.Printf("[WARNING] Failed to list services for patching network attach: %v", serr)
  3081. }
  3082. }
  3083. log.Printf("[DEBUG] Successfully deployed service %s with image %s on port %d", name, image, deployport)
  3084. return nil
  3085. }
  3086. /*** ENDREMOVE ***/
  3087. func findAppInfo(image, name string, redeploy bool) (int, error) {
  3088. // Sleep between 0 and 1.5 second - ensures deployments have a higher
  3089. // chance of being successful
  3090. time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
  3091. highest := baseport
  3092. exposedPort := -1
  3093. // Exists as a "cache" layer
  3094. if portMappings != nil {
  3095. for key, value := range portMappings {
  3096. if value > highest {
  3097. highest = value
  3098. }
  3099. if key == name {
  3100. exposedPort = value
  3101. break
  3102. }
  3103. }
  3104. } else {
  3105. portMappings = make(map[string]int)
  3106. }
  3107. //Filters:
  3108. if exposedPort == -1 || redeploy {
  3109. // dockercli, err := dockerclient.NewEnvClient()
  3110. dockercli, _, err := shuffle.GetDockerClient()
  3111. if err != nil {
  3112. log.Printf("[ERROR] Unable to create docker client (2): %s", err)
  3113. return -1, err
  3114. }
  3115. serviceListOptions := types.ServiceListOptions{}
  3116. services, err := dockercli.ServiceList(
  3117. context.Background(),
  3118. serviceListOptions,
  3119. )
  3120. // Basic self-correction
  3121. if err != nil {
  3122. log.Printf("[ERROR] Unable to list services: %s (may continue anyway?)", err)
  3123. if strings.Contains(fmt.Sprintf("%s", err), "is too new") {
  3124. // Static for some reason
  3125. defaultVersion := "1.40"
  3126. dockerApiVersion = defaultVersion
  3127. os.Setenv("DOCKER_API_VERSION", defaultVersion)
  3128. log.Printf("[DEBUG] Setting Docker API to %s default and retrying listing requests", defaultVersion)
  3129. } else {
  3130. return -1, err
  3131. }
  3132. services, err = dockercli.ServiceList(
  3133. context.Background(),
  3134. serviceListOptions,
  3135. )
  3136. if err != nil {
  3137. log.Printf("[ERROR] Unable to list services (2): %s", err)
  3138. return -1, err
  3139. }
  3140. }
  3141. for _, service := range services {
  3142. //log.Printf("[INFO] Service: %#v. Ports: %#v", service.Spec.Annotations.Name, service.Spec.EndpointSpec)
  3143. for _, endpoint := range service.Spec.EndpointSpec.Ports {
  3144. if !strings.Contains(endpoint.Name, "port") {
  3145. continue
  3146. }
  3147. // This seems to have concurrency issues
  3148. portMappings[service.Spec.Annotations.Name] = int(endpoint.PublishedPort)
  3149. if int(endpoint.PublishedPort) > highest {
  3150. highest = int(endpoint.PublishedPort)
  3151. }
  3152. if service.Spec.Annotations.Name == name || service.Spec.Annotations.Name == strings.Replace(name, ".", "-", -1) {
  3153. exposedPort = int(endpoint.PublishedPort)
  3154. //break
  3155. }
  3156. }
  3157. if service.Spec.Annotations.Name != name && service.Spec.Annotations.Name != strings.Replace(name, ".", "-", -1) {
  3158. continue
  3159. }
  3160. if redeploy {
  3161. log.Printf("[INFO] Found to redeploy! Service: %s with image %s on port %d", name, image, exposedPort)
  3162. // Remove the service and redeploy it.
  3163. // There are cases where the service doesn't update properly
  3164. // Check when the last update happened. If it was within the last few minutes, skip
  3165. if int(time.Since(service.UpdatedAt).Seconds()) > 60 {
  3166. log.Printf("[INFO] Attempting redeploy of app %s with image %s since it is more than 10 minutes since last attempt with failure.", name, image)
  3167. err = dockercli.ServiceRemove(
  3168. context.Background(),
  3169. service.ID,
  3170. )
  3171. if err != nil {
  3172. log.Printf("[ERROR] Failed auto-removing service %s: %s", name, err)
  3173. } else {
  3174. log.Printf("[INFO] Auto-removed service %s successfully (rebuild due to redeploy).", name)
  3175. // Sleep between 8 and 12 seconds
  3176. time.Sleep(time.Duration(rand.Intn(4)+8) * time.Second)
  3177. replicas := service.Spec.Mode.Replicated.Replicas
  3178. err = deploySwarmService(
  3179. dockercli,
  3180. name,
  3181. image,
  3182. exposedPort,
  3183. int64(*replicas),
  3184. false,
  3185. )
  3186. if err != nil {
  3187. log.Printf("[ERROR] Failed re-deploying service %s: %s", name, err)
  3188. } else {
  3189. time.Sleep(10 * time.Second)
  3190. }
  3191. }
  3192. } else {
  3193. //log.Printf("[INFO] NOT redeploying service %s since it was updated less than 3 minutes ago.", name)
  3194. }
  3195. }
  3196. // Break if it's the correct port, as it's the right service
  3197. if exposedPort >= 0 {
  3198. break
  3199. }
  3200. }
  3201. }
  3202. //log.Printf("[DEBUG] Portmappings: %#v", portMappings)
  3203. if exposedPort >= 0 {
  3204. //log.Printf("[INFO] Found service %s on port %d - no need to deploy another", name, exposedPort)
  3205. } else {
  3206. // dockercli, err := dockerclient.NewEnvClient()
  3207. dockercli, _, err := shuffle.GetDockerClient()
  3208. if err != nil {
  3209. log.Printf("[ERROR] Unable to create docker client (2): %s", err)
  3210. return -1, err
  3211. }
  3212. // Increment by 1 for highest port
  3213. if highest <= baseport {
  3214. highest = baseport
  3215. }
  3216. highest += 1
  3217. err = deploySwarmService(dockercli, name, image, highest, -1, false)
  3218. if err != nil {
  3219. log.Printf("[WARNING] NOT Found service: %s. error: %s", name, err)
  3220. return highest, err
  3221. } else {
  3222. log.Printf("[DEBUG] Waiting 20 seconds before moving on to let app '%s' start properly. Service: %s (swarm)", name, image)
  3223. time.Sleep(time.Duration(20) * time.Second)
  3224. }
  3225. exposedPort = highest
  3226. //return exposedPort, errors.New("Deployed app %s")
  3227. }
  3228. return exposedPort, nil
  3229. }
  3230. // Runs data discovery
  3231. /*** STARTREMOVE ***/
  3232. func findAppInfoKubernetes(image, name string, env []string) error {
  3233. clientset, _, err := shuffle.GetKubernetesClient()
  3234. if err != nil {
  3235. log.Printf("[ERROR] Failed getting kubernetes: %s", err)
  3236. return err
  3237. }
  3238. // Check if it exists as a pod
  3239. namespace := "default"
  3240. if len(kubernetesNamespace) > 0 {
  3241. namespace = kubernetesNamespace
  3242. }
  3243. // check deployments
  3244. deployments, err := clientset.AppsV1().Deployments(namespace).List(context.Background(), metav1.ListOptions{})
  3245. if err != nil {
  3246. log.Printf("[ERROR] Failed listing deployments: %s", err)
  3247. return err
  3248. }
  3249. name = strings.Replace(name, "_", "-", -1)
  3250. // check if it exists as a pod
  3251. // for _, pod := range pods.Items {
  3252. // if pod.Name == name {
  3253. // log.Printf("[INFO] Found pod %s - no need to deploy another", name)
  3254. // return nil
  3255. // }
  3256. // }
  3257. for _, deployment := range deployments.Items {
  3258. if deployment.Name == name {
  3259. if debug {
  3260. log.Printf("[DEBUG] Found deployment %s - no need to deploy another", name)
  3261. }
  3262. return nil
  3263. }
  3264. }
  3265. err = deployk8sApp(image, name, env)
  3266. return err
  3267. }
  3268. // Backups in case networks are removed
  3269. func initSwarmNetwork() error {
  3270. ctx := context.Background()
  3271. // dockercli, err := dockerclient.NewEnvClient()
  3272. dockercli, _, err := shuffle.GetDockerClient()
  3273. if err != nil {
  3274. log.Printf("[ERROR] Unable to create docker client (2): %s", err)
  3275. return err
  3276. }
  3277. // Create the network options with the specified MTU
  3278. options := make(map[string]string)
  3279. mtu := 1500
  3280. options["com.docker.network.driver.mtu"] = fmt.Sprintf("%d", mtu)
  3281. ingressOptions := network.CreateOptions{
  3282. Driver: "overlay",
  3283. Attachable: false,
  3284. Ingress: true,
  3285. IPAM: &network.IPAM{
  3286. Driver: "default",
  3287. Config: []network.IPAMConfig{
  3288. network.IPAMConfig{
  3289. Subnet: "10.225.225.0/24",
  3290. Gateway: "10.225.225.1",
  3291. },
  3292. },
  3293. },
  3294. }
  3295. _, err = dockercli.NetworkCreate(
  3296. ctx,
  3297. "ingress",
  3298. ingressOptions,
  3299. )
  3300. if err != nil {
  3301. log.Printf("[WARNING] Ingress network may already exist: %s", err)
  3302. }
  3303. //docker network create --driver=overlay workers
  3304. // Specific subnet?
  3305. networkName := "shuffle_swarm_executions"
  3306. if len(swarmNetworkName) > 0 {
  3307. networkName = swarmNetworkName
  3308. }
  3309. networkCreateOptions := network.CreateOptions{
  3310. Driver: "overlay",
  3311. Options: options,
  3312. Attachable: true,
  3313. Ingress: false,
  3314. IPAM: &network.IPAM{
  3315. Driver: "default",
  3316. Config: []network.IPAMConfig{
  3317. network.IPAMConfig{
  3318. Subnet: "10.224.224.0/24",
  3319. Gateway: "10.224.224.1",
  3320. },
  3321. },
  3322. },
  3323. }
  3324. _, err = dockercli.NetworkCreate(
  3325. ctx,
  3326. networkName,
  3327. networkCreateOptions,
  3328. )
  3329. if err != nil {
  3330. log.Printf("[WARNING] Swarm Executions network may already exist: %s", err)
  3331. }
  3332. networkName = "shuffle-executions"
  3333. networkCreateOptions = network.CreateOptions{
  3334. Driver: "overlay",
  3335. Options: options,
  3336. Attachable: true,
  3337. Ingress: false,
  3338. IPAM: &network.IPAM{
  3339. Driver: "default",
  3340. Config: []network.IPAMConfig{
  3341. network.IPAMConfig{
  3342. Subnet: "10.223.223.0/24",
  3343. Gateway: "10.223.223.1",
  3344. },
  3345. },
  3346. },
  3347. }
  3348. _, err = dockercli.NetworkCreate(
  3349. ctx,
  3350. networkName,
  3351. networkCreateOptions,
  3352. )
  3353. if err != nil {
  3354. log.Printf("[WARNING] Swarm Executions network may already exist: %s", err)
  3355. }
  3356. return nil
  3357. }
  3358. /*** ENDREMOVE ***/
  3359. func sendAppRequest(ctx context.Context, incomingUrl, appName string, port int, action *shuffle.Action, workflowExecution *shuffle.WorkflowExecution, image string, attempts int64) error {
  3360. parsedRequest := shuffle.OrborusExecutionRequest{
  3361. Cleanup: cleanupEnv,
  3362. ExecutionId: workflowExecution.ExecutionId,
  3363. Authorization: workflowExecution.Authorization,
  3364. EnvironmentName: os.Getenv("ENVIRONMENT_NAME"),
  3365. Timezone: os.Getenv("TZ"),
  3366. HTTPProxy: os.Getenv("HTTP_PROXY"),
  3367. HTTPSProxy: os.Getenv("HTTPS_PROXY"),
  3368. ShufflePassProxyToApp: os.Getenv("SHUFFLE_PASS_APP_PROXY"),
  3369. Url: baseUrl,
  3370. BaseUrl: baseUrl,
  3371. Action: *action,
  3372. FullExecution: *workflowExecution,
  3373. }
  3374. // Sometimes makes it have the wrong data due to timing
  3375. // Specific for subflow to ensure worker matches the backend correctly
  3376. parsedBaseurl := incomingUrl
  3377. if strings.Count(baseUrl, ":") >= 2 {
  3378. baseUrlSplit := strings.Split(baseUrl, ":")
  3379. if len(baseUrlSplit) >= 3 {
  3380. parsedBaseurl = strings.Join(baseUrlSplit[0:2], ":")
  3381. //parsedRequest.BaseUrl = fmt.Sprintf("%s:33333", parsedBaseurl)
  3382. }
  3383. }
  3384. if len(parsedRequest.Url) == 0 {
  3385. // Fixed callback url to the worker itself
  3386. if strings.Count(parsedBaseurl, ":") >= 2 {
  3387. parsedRequest.Url = parsedBaseurl
  3388. } else {
  3389. // Callback to worker
  3390. parsedRequest.Url = fmt.Sprintf("%s:%d", parsedBaseurl, baseport)
  3391. //parsedRequest.Url
  3392. }
  3393. //log.Printf("[DEBUG][%s] Should add a baseurl for the app to get back to: %s", workflowExecution.ExecutionId, parsedRequest.Url)
  3394. }
  3395. // Swapping because this was confusing during dev
  3396. // No real reason, just variable names
  3397. tmp := parsedRequest.Url
  3398. parsedRequest.Url = parsedRequest.BaseUrl
  3399. parsedRequest.BaseUrl = tmp
  3400. // Run with proper hostname, but set to shuffle-worker to avoid specific host target.
  3401. // This means running with VIP instead.
  3402. if len(hostname) > 0 {
  3403. parsedRequest.BaseUrl = fmt.Sprintf("http://%s:%d", hostname, baseport)
  3404. //parsedRequest.BaseUrl = fmt.Sprintf("http://shuffle-workers:%d", baseport)
  3405. //log.Printf("[DEBUG][%s] Changing hostname to local hostname in Docker network for WORKER URL: %s", workflowExecution.ExecutionId, parsedRequest.BaseUrl)
  3406. if parsedRequest.Action.AppName == "shuffle-subflow" || parsedRequest.Action.AppName == "shuffle-subflow-v2" || parsedRequest.Action.AppName == "User Input" {
  3407. parsedRequest.BaseUrl = fmt.Sprintf("http://%s:%d", hostname, baseport)
  3408. //parsedRequest.Url = parsedRequest.BaseUrl
  3409. }
  3410. }
  3411. // Making sure to get the LATEST execution data
  3412. // This is due to cache timing issues
  3413. exec, err := shuffle.GetWorkflowExecution(ctx, workflowExecution.ExecutionId)
  3414. if err == nil && len(exec.ExecutionId) > 0 {
  3415. parsedRequest.FullExecution = *exec
  3416. }
  3417. data, err := json.Marshal(parsedRequest)
  3418. if err != nil {
  3419. log.Printf("[ERROR] Failed marshalling worker request: %s", err)
  3420. return err
  3421. }
  3422. if isKubernetes == "true" {
  3423. appName = strings.Replace(appName, "_", "-", -1)
  3424. }
  3425. // Shitty hardcoded fix for now
  3426. if strings.Contains(appName, "1.0.0") {
  3427. appName = strings.Replace(appName, "1.0.0", "1-0-0", 1)
  3428. } else if strings.Contains(appName, "1.1.0") {
  3429. appName = strings.Replace(appName, "1.1.0", "1-1-0", 1)
  3430. } else if strings.Contains(appName, "1.2.0") {
  3431. appName = strings.Replace(appName, "1.2.0", "1-2-0", 1)
  3432. } else if strings.Contains(appName, "1.4.0") {
  3433. appName = strings.Replace(appName, "1.4.0", "1-4-0", 1)
  3434. } else if strings.Contains(appName, "2.0.0") {
  3435. appName = strings.Replace(appName, "2.0.0", "2-0-0", 1)
  3436. }
  3437. streamUrl := fmt.Sprintf("http://%s:%d/api/v1/run", appName, port)
  3438. // log.Printf("[DEBUG][%s] Worker URL: %s, Backend URL: %s, Target App: %s", workflowExecution.ExecutionId, parsedRequest.BaseUrl, parsedRequest.Url, streamUrl)
  3439. req, err := http.NewRequest(
  3440. "POST",
  3441. streamUrl,
  3442. bytes.NewBuffer([]byte(data)),
  3443. )
  3444. // Checking as LATE as possible, ensuring we don't rerun what's already ran
  3445. // ctx = context.Background()
  3446. // Sleep between 0 and 250 ms for randomness so no same worker check at same time (same as cloud)
  3447. rand.Seed(time.Now().UnixNano())
  3448. randMs := rand.Intn(250)
  3449. time.Sleep(time.Duration(randMs) * time.Millisecond)
  3450. newExecId := fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
  3451. _, err = shuffle.GetCache(ctx, newExecId)
  3452. if err == nil {
  3453. log.Printf("[DEBUG] Result for %s already found (PRE REQUEST) - returning", newExecId)
  3454. return nil
  3455. }
  3456. cacheData := []byte("1")
  3457. err = shuffle.SetCache(ctx, newExecId, cacheData, 30)
  3458. if err != nil {
  3459. log.Printf("[WARNING] Failed setting cache for action %s: %s", newExecId, err)
  3460. } else {
  3461. //log.Printf("[DEBUG][%s] Adding %s to cache (%#v)", workflowExecution.ExecutionId, newExecId, action.Name)
  3462. }
  3463. client := shuffle.GetExternalClient(streamUrl)
  3464. customTimeout := os.Getenv("SHUFFLE_APP_REQUEST_TIMEOUT")
  3465. if len(customTimeout) > 0 {
  3466. // convert to int
  3467. timeoutInt, err := strconv.Atoi(customTimeout)
  3468. if err != nil {
  3469. log.Printf("[ERROR] Failed converting SHUFFLE_APP_REQUEST_TIMEOUT to int: %s", err)
  3470. } else {
  3471. log.Printf("[DEBUG] Setting client timeout to %d seconds for app request", timeoutInt)
  3472. client.Timeout = time.Duration(timeoutInt) * time.Second
  3473. }
  3474. }
  3475. // Content type required
  3476. req.Header.Set("Content-Type", "application/json")
  3477. newresp, err := client.Do(req)
  3478. if err != nil {
  3479. // Another timeout issue here somewhere
  3480. // context deadline
  3481. if strings.Contains(fmt.Sprintf("%s", err), "context deadline exceeded") || strings.Contains(fmt.Sprintf("%s", err), "Client.Timeout exceeded") {
  3482. return nil
  3483. }
  3484. if strings.Contains(fmt.Sprintf("%s", err), "timeout awaiting response") {
  3485. return nil
  3486. }
  3487. newerr := fmt.Sprintf("%s", err)
  3488. if strings.Contains(newerr, "connection refused") || strings.Contains(newerr, "no such host") {
  3489. newerr = fmt.Sprintf("Failed connecting to app %s. Is the Docker image available?", appName)
  3490. } else {
  3491. // escape quotes and newlines
  3492. newerr = strings.ReplaceAll(strings.ReplaceAll(newerr, "\"", "\\\""), "\n", "\\n")
  3493. }
  3494. if strings.Contains(fmt.Sprintf("%s", err), "no such host") {
  3495. log.Printf("[ERROR] Should be removing references to location for app '%s' as to be rediscovered. URL: %s. Error: %s", action.AppName, streamUrl, err)
  3496. //for k, v := range portMappings {
  3497. // if strings.Contains(strings.ToLower(strings.ReplaceAll(action.AppName, " ", "_"))) {
  3498. // }
  3499. //}
  3500. //var portMappings map[string]int
  3501. }
  3502. // Try redeployment
  3503. attempts += 1
  3504. if attempts < 2 {
  3505. // Check the service and fix it.
  3506. if isKubernetes == "true" {
  3507. log.Printf("[WARNING] App Redeployment in K8s isn't fully supported yet, but should be done for app %s with image %s.", appName, image)
  3508. } else {
  3509. _, err = findAppInfo(image, appName, true)
  3510. if err != nil {
  3511. log.Printf("[ERROR][%s] Error re-deploying app %s: %s", workflowExecution.ExecutionId, appName, err)
  3512. }
  3513. return sendAppRequest(ctx, incomingUrl, appName, port, action, workflowExecution, image, attempts)
  3514. }
  3515. }
  3516. log.Printf("[ERROR][%s] Error running app run request: %s", workflowExecution.ExecutionId, err)
  3517. actionResult := shuffle.ActionResult{
  3518. Action: *action,
  3519. ExecutionId: workflowExecution.ExecutionId,
  3520. Authorization: workflowExecution.Authorization,
  3521. Result: fmt.Sprintf(`{"success": false, "attempts": %d, "reason": "Failed to connect to app %s in swarm. Try the action again, restart Orborus if this is recurring, or contact support@shuffler.io.", "details": "%s"}`, attempts, streamUrl, newerr),
  3522. StartedAt: int64(time.Now().Unix()),
  3523. CompletedAt: int64(time.Now().Unix()),
  3524. Status: "FAILURE",
  3525. }
  3526. // If this happens - send failure signal to stop the workflow?
  3527. sendSelfRequest(actionResult)
  3528. return err
  3529. }
  3530. defer newresp.Body.Close()
  3531. body, err := ioutil.ReadAll(newresp.Body)
  3532. if err != nil {
  3533. log.Printf("[ERROR] Failed reading app request body body: %s", err)
  3534. return err
  3535. } else {
  3536. if debug {
  3537. log.Printf("[DEBUG][%s] NEWRESP (from app): %s", workflowExecution.ExecutionId, string(body))
  3538. }
  3539. }
  3540. return nil
  3541. }
  3542. // Function to auto-deploy certain apps if "run" is set
  3543. // Has some issues with loading when running multiple workers and such.
  3544. func baseDeploy() {
  3545. var cli *dockerclient.Client
  3546. //var err error
  3547. if isKubernetes != "true" {
  3548. // cli, err := dockerclient.NewEnvClient()
  3549. cli, _, err := shuffle.GetDockerClient()
  3550. if err != nil {
  3551. log.Printf("[ERROR] Unable to create docker client (3): %s", err)
  3552. return
  3553. }
  3554. defer cli.Close()
  3555. }
  3556. for key, value := range autoDeploy {
  3557. newNameSplit := strings.Split(key, ":")
  3558. action := shuffle.Action{
  3559. AppName: newNameSplit[0],
  3560. AppVersion: newNameSplit[1],
  3561. ID: "TBD",
  3562. }
  3563. workflowExecution := shuffle.WorkflowExecution{
  3564. ExecutionId: "TBD",
  3565. }
  3566. appname := action.AppName
  3567. appversion := action.AppVersion
  3568. appname = strings.Replace(appname, ".", "-", -1)
  3569. appversion = strings.Replace(appversion, ".", "-", -1)
  3570. env := []string{
  3571. fmt.Sprintf("EXECUTIONID=%s", workflowExecution.ExecutionId),
  3572. fmt.Sprintf("AUTHORIZATION=%s", workflowExecution.Authorization),
  3573. fmt.Sprintf("CALLBACK_URL=%s", baseUrl),
  3574. fmt.Sprintf("BASE_URL=%s", appCallbackUrl),
  3575. fmt.Sprintf("TZ=%s", timezone),
  3576. fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", logsDisabled),
  3577. }
  3578. if key == "shuffle-tools-fork:1.0.0" {
  3579. env = append(env, fmt.Sprintf("SHUFFLE_ALLOW_PACKAGE_INSTALL=%s", "true"))
  3580. }
  3581. if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
  3582. //log.Printf("APPENDING PROXY TO THE APP!")
  3583. env = append(env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY")))
  3584. env = append(env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY")))
  3585. env = append(env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY")))
  3586. env = append(env, fmt.Sprintf("no_proxy=%s", os.Getenv("no_proxy")))
  3587. }
  3588. if len(os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")) > 0 {
  3589. log.Printf("[DEBUG] Setting SHUFFLE_APP_SDK_TIMEOUT to %s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT"))
  3590. env = append(env, fmt.Sprintf("SHUFFLE_APP_SDK_TIMEOUT=%s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")))
  3591. }
  3592. identifier := fmt.Sprintf("%s_%s", appname, appversion)
  3593. //identifier := fmt.Sprintf("%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId)
  3594. //if strings.Contains(identifier, " ") {
  3595. // identifier = strings.ReplaceAll(identifier, " ", "-")
  3596. //}
  3597. //deployApp(cli, value, identifier, env, workflowExecution, action)
  3598. log.Printf("[DEBUG] Deploying app with identifier %s to ensure basic apps are available from the get-go", identifier)
  3599. //findAppInfo("frikky/shuffle:http_1.4.0", "http_1-4-0", true)
  3600. // go findAppInfo(value, identifier, false)
  3601. // findAppInfo leads to the following error:
  3602. // Unable to list services: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running? (may continue anyway?)
  3603. // Replaced with deployApp again. See https://github.com/Shuffle/Shuffle/issues/1817.
  3604. go deployApp(cli, value, identifier, env, workflowExecution, action)
  3605. //err := deployApp(cli, value, identifier, env, workflowExecution, action)
  3606. //if err != nil {
  3607. // log.Printf("[DEBUG] Failed deploying app %s: %s", value, err)
  3608. //}
  3609. }
  3610. appsInitialized = true
  3611. }
  3612. func getStreamResultsWrapper(client *http.Client, req *http.Request, workflowExecution shuffle.WorkflowExecution, firstRequest bool, environments []string) ([]string, error) {
  3613. // Because of this, it always has updated data.
  3614. // Removed request requirement from app_sdk
  3615. newresp, err := topClient.Do(req)
  3616. if err != nil {
  3617. log.Printf("[ERROR] Failed request: %s", err)
  3618. time.Sleep(time.Duration(sleepTime) * time.Second)
  3619. return environments, err
  3620. }
  3621. defer newresp.Body.Close()
  3622. body, err := ioutil.ReadAll(newresp.Body)
  3623. if err != nil {
  3624. log.Printf("[ERROR] Failed reading body: %s", err)
  3625. time.Sleep(time.Duration(sleepTime) * time.Second)
  3626. return environments, err
  3627. }
  3628. if newresp.StatusCode != 200 {
  3629. log.Printf("[ERROR] StatusCode (1): %d - %s", newresp.StatusCode, string(body))
  3630. time.Sleep(time.Duration(sleepTime) * time.Second)
  3631. return environments, errors.New(fmt.Sprintf("Bad status code from backend: %d", newresp.StatusCode))
  3632. }
  3633. err = json.Unmarshal(body, &workflowExecution)
  3634. if err != nil {
  3635. log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
  3636. time.Sleep(time.Duration(sleepTime) * time.Second)
  3637. return environments, err
  3638. }
  3639. if firstRequest {
  3640. firstRequest = false
  3641. ctx := context.Background()
  3642. cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
  3643. execData, err := json.Marshal(workflowExecution)
  3644. if err != nil {
  3645. log.Printf("[ERROR][%s] Failed marshalling execution during set (3): %s", workflowExecution.ExecutionId, err)
  3646. } else {
  3647. err = shuffle.SetCache(ctx, cacheKey, execData, 30)
  3648. if err != nil {
  3649. log.Printf("[ERROR][%s] Failed adding to cache during setexecution (3): %s", workflowExecution.ExecutionId, err)
  3650. }
  3651. }
  3652. for _, action := range workflowExecution.Workflow.Actions {
  3653. found := false
  3654. for _, environment := range environments {
  3655. if action.Environment == environment {
  3656. found = true
  3657. break
  3658. }
  3659. }
  3660. if !found {
  3661. environments = append(environments, action.Environment)
  3662. }
  3663. }
  3664. // Checks if a subflow is child of the startnode, as sub-subflows aren't working properly yet
  3665. childNodes := shuffle.FindChildNodes(workflowExecution.Workflow, workflowExecution.Start, []string{}, []string{})
  3666. //log.Printf("[DEBUG] Looking for subflow in %#v to check execution pattern as child of %s", childNodes, workflowExecution.Start)
  3667. subflowFound := false
  3668. for _, childNode := range childNodes {
  3669. for _, trigger := range workflowExecution.Workflow.Triggers {
  3670. if trigger.ID != childNode {
  3671. continue
  3672. }
  3673. if trigger.AppName == "Shuffle Workflow" {
  3674. subflowFound = true
  3675. break
  3676. }
  3677. }
  3678. if subflowFound {
  3679. break
  3680. }
  3681. }
  3682. log.Printf("[DEBUG] Environments: %s. Source: %s. 1 env = webserver, 0 or >1 = default. Subflow exists: %#v", environments, workflowExecution.ExecutionSource, subflowFound)
  3683. if len(environments) == 1 && workflowExecution.ExecutionSource != "default" && !subflowFound {
  3684. log.Printf("[DEBUG] Running OPTIMIZED execution (not manual)")
  3685. os.Setenv("SHUFFLE_OPTIMIZED", "true")
  3686. listener := webserverSetup(workflowExecution)
  3687. err := executionInit(workflowExecution)
  3688. if err != nil {
  3689. log.Printf("[DEBUG] Workflow setup failed: %s", workflowExecution.ExecutionId, err)
  3690. log.Printf("[DEBUG] Shutting down (30)")
  3691. shutdown(workflowExecution, "", "", true)
  3692. }
  3693. go func() {
  3694. time.Sleep(time.Duration(1))
  3695. handleExecutionResult(workflowExecution)
  3696. }()
  3697. log.Printf("[DEBUG] Running with port %#v", os.Getenv("WORKER_PORT"))
  3698. runWebserver(listener)
  3699. // Set environment variable
  3700. //log.Printf("Before wait")
  3701. //wg := sync.WaitGroup{}
  3702. //wg.Add(1)
  3703. //wg.Wait()
  3704. } else {
  3705. log.Printf("[DEBUG] Running NON-OPTIMIZED execution for type %s with %d environment(s). This only happens when ran manually OR when running with subflows. Status: %s", workflowExecution.ExecutionSource, len(environments), workflowExecution.Status)
  3706. err := executionInit(workflowExecution)
  3707. if err != nil {
  3708. log.Printf("[DEBUG] Workflow setup failed: %s", workflowExecution.ExecutionId, err)
  3709. shutdown(workflowExecution, "", "", true)
  3710. }
  3711. // Trying to make worker into microservice~ :)
  3712. }
  3713. }
  3714. if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
  3715. log.Printf("[DEBUG] Workflow %s is finished. Exiting worker.", workflowExecution.ExecutionId)
  3716. log.Printf("[DEBUG] Shutting down (31)")
  3717. shutdown(workflowExecution, "", "", true)
  3718. }
  3719. if workflowExecution.Status == "EXECUTING" || workflowExecution.Status == "RUNNING" {
  3720. //log.Printf("Status: %s", workflowExecution.Status)
  3721. err = handleDefaultExecution(client, req, workflowExecution)
  3722. if err != nil {
  3723. log.Printf("[DEBUG] Workflow %s is finished: %s", workflowExecution.ExecutionId, err)
  3724. log.Printf("[DEBUG] Shutting down (32)")
  3725. shutdown(workflowExecution, "", "", true)
  3726. }
  3727. } else {
  3728. log.Printf("[DEBUG] Workflow %s has status %s. Exiting worker (if WAITING, rerun will happen).", workflowExecution.ExecutionId, workflowExecution.Status)
  3729. log.Printf("[DEBUG] Shutting down (33)")
  3730. shutdown(workflowExecution, workflowExecution.Workflow.ID, "", true)
  3731. }
  3732. time.Sleep(time.Duration(sleepTime) * time.Second)
  3733. return environments, nil
  3734. }
// checkStandaloneRun enables running the worker directly from the CLI:
//
//	go run worker.go standalone <executionid> <authorization> <optional:url>
//
// When the arguments match that format it (1) fetches the referenced
// execution from the backend, (2) strips all previous results except
// SKIPPED and single-action "rerun" ones and forces the status back to
// EXECUTING via /api/v1/streams?reset=true, and (3) exports EXECUTIONID,
// AUTHORIZATION, BASE_URL and STANDALONE_EXECUTION so the normal main()
// flow picks the execution up again. Any backend failure terminates the
// process with os.Exit(1). With non-matching arguments it is a no-op.
func checkStandaloneRun() {
	// Check if the required argc/argv is set
	//log.Printf("ARGS: %#v", os.Args)
	if len(os.Args) < 4 {
		if debug {
			log.Printf("[DEBUG] You can run the worker in standalone mode with: go run worker.go standalone <executionid> <authorization> <optional:url>")
		}

		return
	}

	if os.Args[1] != "standalone" {
		log.Printf("[ERROR] First argument should be 'standalone' to run worker standalone")
		return
	}

	// Execution IDs are expected to be UUIDs (exactly 36 characters).
	if os.Args[2] == "" || len(os.Args[2]) != 36 {
		log.Printf("[ERROR] Second argument should be the execution ID, with next being authorization")
		return
	}

	if os.Args[3] == "" || len(os.Args[3]) < 10 {
		log.Printf("[ERROR] Third argument should be the authorization key")
		return
	}

	// Default to the public cloud backend unless a URL is supplied.
	backendUrl := "https://shuffler.io"
	if len(os.Args) > 4 {
		backendUrl = os.Args[4]
	}

	if !strings.Contains(backendUrl, "http") {
		log.Printf("[ERROR] Backend URL should start with http:// or https://")
		return
	}

	// Format:
	// go run worker.go standalone <executionid> <authorization> <optional:url>
	executionId := os.Args[2]
	authorization := os.Args[3]

	// Exported so the rest of the worker (main and children) uses the same
	// execution/authorization pair as the CLI arguments.
	os.Setenv("EXECUTIONID", executionId)
	os.Setenv("AUTHORIZATION", authorization)
	os.Setenv("BASE_URL", backendUrl)
	os.Setenv("STANDALONE_EXECUTION", "true")

	log.Printf("\n\n\n[DEBUG] Running worker in standalone mode with execution ID %s and authorization %s. Backend URL (default): %s\nThis means we will first RESET the execution results, then rerun it\n\n", executionId, authorization, backendUrl)

	// 1. Reset the execution after getting it
	// NOTE: "data" is a package-level variable (assigned with =, not :=);
	// main() assigns the same variable for its own request body.
	data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
	streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", backendUrl)
	client := shuffle.GetExternalClient(streamResultUrl)
	req, err := http.NewRequest(
		"POST",
		streamResultUrl,
		bytes.NewBuffer([]byte(data)),
	)
	if err != nil {
		log.Printf("[ERROR] Failed making request builder for backend: %s", err)
		os.Exit(1)
	}

	// Read the data and unmarshal it
	newresp, err := client.Do(req)
	if err != nil {
		log.Printf("[ERROR] Failed standalone request in setup: %s", err)
		os.Exit(1)
	}

	defer newresp.Body.Close()
	body, err := ioutil.ReadAll(newresp.Body)
	if err != nil {
		log.Printf("[ERROR] Failed reading body: %s", err)
		os.Exit(1)
	}

	if newresp.StatusCode != 200 {
		log.Printf("[ERROR] Failed resetting execution: %s. Body: %s", newresp.Status, string(body))
		os.Exit(1)
	}

	// Map to shuffle.Workflowexecution struct
	workflowExecution := shuffle.WorkflowExecution{}
	err = json.Unmarshal(body, &workflowExecution)
	if err != nil {
		log.Printf("[ERROR] Failed unmarshalling body: %s", err)
		os.Exit(1)
	}

	log.Printf("[DEBUG][%s] Got %d results with status %s. Running full reset IF status is not executing.", workflowExecution.ExecutionId, len(workflowExecution.Results), workflowExecution.Status)

	// Just continue as per usual?
	//if workflowExecution.Status == "EXECUTING" {
	//	return
	//}

	workflowExecution.Status = "EXECUTING"

	// Keep only results that should survive a rerun: SKIPPED actions and
	// "rerun"-category results. Everything else is dropped so those
	// actions execute again.
	newResults := []shuffle.ActionResult{}
	for _, result := range workflowExecution.Results {
		if result.Status == "SKIPPED" {
			newResults = append(newResults, result)
			continue
		}

		// This is to handle reruns of SINGLE actions
		if result.Action.Category == "rerun" {
			newResults = append(newResults, result)
			continue
		}

		// Anything else here.
	}

	workflowExecution.Results = newResults
	workflowExecution.Status = "EXECUTING"
	workflowExecution.CompletedAt = 0

	marshalledResult, err := json.Marshal(workflowExecution)
	if err != nil {
		log.Printf("[ERROR] Failed marshalling body: %s", err)
		os.Exit(1)
	}

	// Send a /api/v1/streams result back
	// 1. Reset the execution after getting it
	streamUrl := fmt.Sprintf("%s/api/v1/streams?reset=true", backendUrl)
	req, err = http.NewRequest(
		"POST",
		streamUrl,
		bytes.NewBuffer([]byte(marshalledResult)),
	)
	if err != nil {
		log.Printf("[ERROR] Failed making request builder (2) for backend: %s", err)
		os.Exit(1)
	}

	// Read the data and unmarshal it
	newresp, err = client.Do(req)
	if err != nil {
		log.Printf("[ERROR] Failed standalone request in setup: %s", err)
		os.Exit(1)
	}

	defer newresp.Body.Close()
	body, err = ioutil.ReadAll(newresp.Body)
	if err != nil {
		log.Printf("[ERROR] Failed reading body (2): %s", err)
		os.Exit(1)
	}

	if newresp.StatusCode != 200 {
		log.Printf("[ERROR] Failed resetting execution (2): %s. Body: %s", newresp.Status, string(body))
		os.Exit(1)
	}

	log.Printf("\n\n\n[DEBUG] Finished resetting execution %s. Body: %s. Starting execution.\n\n\n", newresp.Status, string(body))
}
// Initial loop etc
//
// main boots the worker:
//  1. checkStandaloneRun() handles the optional CLI "standalone" mode.
//  2. DEBUG=true enables debug logging and disables cleanup.
//  3. shuffle.RunInit sets up the cache system (elasticsearch flavor).
//  4. SHUFFLE_SWARM_CONFIG=run|swarm: start the long-lived webserver
//     and never fall through.
//  5. Otherwise: resolve AUTHORIZATION/EXECUTIONID (or run a test
//     execution) and poll the backend stream-results endpoint forever.
func main() {
	// Testing swarm auto-replacements. This also tests ports
	// in rapid succession
	checkStandaloneRun()

	if os.Getenv("DEBUG") == "true" {
		debug = true
		log.Printf("[INFO] Disabled cleanup due to debug mode (DEBUG=true)")
		cleanupEnv = "false"
	}

	/*** STARTREMOVE ***/
	if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
		logsDisabled = "true"
		os.Setenv("SHUFFLE_LOGS_DISABLED", "true")
	}
	/*** ENDREMOVE ***/

	// Elasticsearch necessary to ensure we're not running with Datastore configurations for minimal/maximal data sizes
	// Recursive import kind of :)
	_, err := shuffle.RunInit(*shuffle.GetDatastore(), *shuffle.GetStorage(), "", "worker", true, "elasticsearch", false, 0)
	if err != nil {
		// "no such host" just means the cache backend isn't reachable; treated as non-fatal.
		if !strings.Contains(fmt.Sprintf("%s", err), "no such host") {
			log.Printf("[ERROR] Failed to run worker init: %s", err)
		}
	} else {
		if isKubernetes != "true" {
			log.Printf("[DEBUG] Ran init for worker to set up cache system. Docker version: %s", dockerApiVersion)
		} else {
			log.Printf("[DEBUG] Ran init for worker to set up cache system on Kubernetes")
		}
	}

	//log.Printf("[INFO] Setting up worker environment")
	sleepTime = 5
	client := shuffle.GetExternalClient(baseUrl)

	if timezone == "" {
		timezone = "Europe/Amsterdam"
	}

	if baseimagename == "" {
		log.Printf("[DEBUG] Setting baseimagename to frikky/shuffle as it's empty (docker.io)")
		baseimagename = "frikky/shuffle" // Dockerhub
		//baseimagename = "shuffle" // Github (ghcr.io)
	}

	topClient = client
	swarmConfig := os.Getenv("SHUFFLE_SWARM_CONFIG")
	log.Printf("[INFO] Running with timezone %s and swarm config %#v", timezone, swarmConfig)

	/*** STARTREMOVE ***/
	if swarmConfig == "run" || swarmConfig == "swarm" {
		// Forcing download just in case on the first iteration.
		log.Printf("[INFO] Running in swarm mode - forcing download of apps")

		workflowExecution := shuffle.WorkflowExecution{}
		go baseDeploy()

		// runWebserver blocks for the life of the process in swarm mode.
		listener := webserverSetup(workflowExecution)
		runWebserver(listener)

		// Should never get down here
		log.Printf("[ERROR] Stopped listener %#v - exiting.", listener)
		os.Exit(3)
	}
	/*** ENDREMOVE ***/

	authorization := ""
	executionId := ""

	// INFO: Allows you to run a test execution
	testing := os.Getenv("WORKER_TESTING_WORKFLOW")
	shuffle_apikey := os.Getenv("WORKER_TESTING_APIKEY")
	if len(testing) > 0 && len(shuffle_apikey) > 0 {
		// Execute a workflow and use that info
		log.Printf("[WARNING] Running test environment for worker by executing workflow %s. PS: This may NOT reach the worker in real time, but rather be deployed as a docker container (bad). Instead use AUTHORIZATION and EXECUTIONID for direct testing", testing)
		authorization, executionId = runTestExecution(client, testing, shuffle_apikey)
	} else {
		authorization = os.Getenv("AUTHORIZATION")
		executionId = os.Getenv("EXECUTIONID")
		log.Printf("[INFO] Running normal execution with auth %s and ID %s", authorization, executionId)
	}

	workflowExecution := shuffle.WorkflowExecution{
		ExecutionId: executionId,
	}

	// Both missing-credential paths terminate the worker via shutdown().
	if len(authorization) == 0 {
		log.Printf("[INFO] No AUTHORIZATION key set in env")
		log.Printf("[DEBUG] Shutting down (27)")
		shutdown(workflowExecution, "", "", false)
	}

	if len(executionId) == 0 {
		log.Printf("[INFO] No EXECUTIONID key set in env")
		log.Printf("[DEBUG] Shutting down (28)")
		shutdown(workflowExecution, "", "", false)
	}

	// "data" is package-level; checkStandaloneRun assigns it the same way.
	data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
	streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
	req, err := http.NewRequest(
		"POST",
		streamResultUrl,
		bytes.NewBuffer([]byte(data)),
	)
	if err != nil {
		log.Printf("[ERROR] Failed making request builder for backend")
		log.Printf("[DEBUG] Shutting down (29)")
		shutdown(workflowExecution, "", "", true)
	}

	// Poll the backend forever; getStreamResultsWrapper sleeps between
	// iterations. NOTE(review): firstRequest is never flipped to false in
	// this loop — confirm the wrapper expects to receive true every time.
	firstRequest := true
	environments := []string{}
	for {
		environments, err = getStreamResultsWrapper(client, req, workflowExecution, firstRequest, environments)
		if err != nil {
			log.Printf("[ERROR] Failed getting stream results: %s", err)
		}
	}
}
  3971. func checkUnfinished(resp http.ResponseWriter, request *http.Request, execRequest shuffle.OrborusExecutionRequest) {
  3972. // Meant as a function that periodically checks whether previous executions have finished or not.
  3973. // Should probably be based on executedIds and finishedIds
  3974. // Schedule a check in the future instead?
  3975. ctx := context.Background()
  3976. exec, err := shuffle.GetWorkflowExecution(ctx, execRequest.ExecutionId)
  3977. log.Printf("[DEBUG][%s] Rechecking execution and it's status to send to backend IF the status is EXECUTING (%s - %d/%d finished)", execRequest.ExecutionId, exec.Status, len(exec.Results), len(exec.Workflow.Actions))
  3978. // FIXMe: Does this create issue with infinite loops?
  3979. // Usually caused by issue during startup
  3980. if exec.Status == "" {
  3981. //handleRunExecution(resp, request)
  3982. return
  3983. }
  3984. if exec.Status != "EXECUTING" {
  3985. return
  3986. }
  3987. log.Printf("[DEBUG][%s] Should send full result for execution to backend as it has %d results. Status: %s", execRequest.ExecutionId, len(exec.Results), exec.Status)
  3988. data, err := json.Marshal(exec)
  3989. if err != nil {
  3990. return
  3991. }
  3992. sendResult(*exec, data)
  3993. }
  3994. func handleRunExecution(resp http.ResponseWriter, request *http.Request) {
  3995. defer request.Body.Close()
  3996. body, err := ioutil.ReadAll(request.Body)
  3997. if err != nil {
  3998. log.Printf("[WARNING] Failed reading body for stream result queue")
  3999. resp.WriteHeader(400)
  4000. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4001. return
  4002. }
  4003. //log.Printf("[DEBUG] In run execution with body length %d", len(body))
  4004. var execRequest shuffle.OrborusExecutionRequest
  4005. err = json.Unmarshal(body, &execRequest)
  4006. if err != nil {
  4007. log.Printf("[WARNING] Failed shuffle.WorkflowExecution unmarshaling: %s", err)
  4008. resp.WriteHeader(400)
  4009. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4010. return
  4011. }
  4012. // Checks if a workflow is done 30 seconds later, and sends info to backend no matter what
  4013. go func() {
  4014. time.Sleep(time.Duration(30) * time.Second)
  4015. checkUnfinished(resp, request, execRequest)
  4016. }()
  4017. window.AddEvent(time.Now())
  4018. ctx := context.Background()
  4019. // FIXME: This should be PER EXECUTION
  4020. //if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
  4021. // Is it ok if these are standard? Should they be update-able after launch? Hmm
  4022. if len(execRequest.HTTPProxy) > 0 {
  4023. log.Printf("[DEBUG] Sending proxy info to child process")
  4024. os.Setenv("SHUFFLE_PASS_APP_PROXY", execRequest.ShufflePassProxyToApp)
  4025. }
  4026. if len(execRequest.HTTPProxy) > 0 {
  4027. log.Printf("[DEBUG] Running with default HTTP proxy %s", execRequest.HTTPProxy)
  4028. os.Setenv("HTTP_PROXY", execRequest.HTTPProxy)
  4029. }
  4030. if len(execRequest.HTTPSProxy) > 0 {
  4031. log.Printf("[DEBUG] Running with default HTTPS proxy %s", execRequest.HTTPSProxy)
  4032. os.Setenv("HTTPS_PROXY", execRequest.HTTPSProxy)
  4033. }
  4034. if len(execRequest.EnvironmentName) > 0 {
  4035. os.Setenv("ENVIRONMENT_NAME", execRequest.EnvironmentName)
  4036. environment = execRequest.EnvironmentName
  4037. }
  4038. if len(execRequest.Timezone) > 0 {
  4039. os.Setenv("TZ", execRequest.Timezone)
  4040. timezone = execRequest.Timezone
  4041. }
  4042. if len(execRequest.Cleanup) > 0 {
  4043. os.Setenv("CLEANUP", execRequest.Cleanup)
  4044. cleanupEnv = execRequest.Cleanup
  4045. }
  4046. if len(execRequest.BaseUrl) > 0 {
  4047. os.Setenv("BASE_URL", execRequest.BaseUrl)
  4048. baseUrl = execRequest.BaseUrl
  4049. }
  4050. // Setting to just have an auth available.
  4051. if len(execRequest.Authorization) > 0 && len(os.Getenv("AUTHORIZATION")) == 0 {
  4052. //log.Printf("[DEBUG] Sending proxy info to child process")
  4053. os.Setenv("AUTHORIZATION", execRequest.Authorization)
  4054. }
  4055. var workflowExecution shuffle.WorkflowExecution
  4056. streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
  4057. req, err := http.NewRequest(
  4058. "POST",
  4059. streamResultUrl,
  4060. bytes.NewBuffer([]byte(fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, execRequest.ExecutionId, execRequest.Authorization))),
  4061. )
  4062. if err != nil {
  4063. log.Printf("[ERROR][%s] Failed to create a new request", execRequest.ExecutionId)
  4064. resp.WriteHeader(500)
  4065. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4066. return
  4067. }
  4068. client := shuffle.GetExternalClient(streamResultUrl)
  4069. newresp, err := client.Do(req)
  4070. if err != nil {
  4071. log.Printf("[ERROR] Failed making request (2): %s", err)
  4072. resp.WriteHeader(500)
  4073. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4074. return
  4075. }
  4076. defer newresp.Body.Close()
  4077. body, err = ioutil.ReadAll(newresp.Body)
  4078. if err != nil {
  4079. log.Printf("[ERROR][%s] Failed reading body (2): %s", execRequest.ExecutionId, err)
  4080. resp.WriteHeader(500)
  4081. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4082. return
  4083. }
  4084. if newresp.StatusCode != 200 {
  4085. log.Printf("[ERROR][%s] Bad statuscode: %d, %s", execRequest.ExecutionId, newresp.StatusCode, string(body))
  4086. if strings.Contains(string(body), "Workflowexecution is already finished") {
  4087. log.Printf("[DEBUG] Shutting down (19)")
  4088. //shutdown(workflowExecution, "", "", true)
  4089. }
  4090. resp.WriteHeader(500)
  4091. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad statuscode: %d"}`, newresp.StatusCode)))
  4092. return
  4093. }
  4094. err = json.Unmarshal(body, &workflowExecution)
  4095. if err != nil {
  4096. log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
  4097. resp.WriteHeader(500)
  4098. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4099. return
  4100. }
  4101. //err = shuffle.SetWorkflowExecution(ctx, workflowExecution, true)
  4102. err = setWorkflowExecution(ctx, workflowExecution, true)
  4103. if err != nil {
  4104. log.Printf("[ERROR] Failed initializing execution saving for %s: %s", workflowExecution.ExecutionId, err)
  4105. }
  4106. if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
  4107. log.Printf("[DEBUG] Workflow %s is finished. Exiting worker.", workflowExecution.ExecutionId)
  4108. log.Printf("[DEBUG] Shutting down (20)")
  4109. resp.WriteHeader(200)
  4110. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad status for execution - already %s. Returning with 200 OK"}`, workflowExecution.Status)))
  4111. return
  4112. }
  4113. //startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
  4114. extra := 0
  4115. for _, trigger := range workflowExecution.Workflow.Triggers {
  4116. //log.Printf("Appname trigger (0): %s", trigger.AppName)
  4117. if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
  4118. extra += 1
  4119. }
  4120. }
  4121. log.Printf("[INFO][%s] (1) Status: %s, Results: %d, actions: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra)
  4122. if workflowExecution.Status != "EXECUTING" {
  4123. log.Printf("[WARNING] Exiting as worker execution has status %s!", workflowExecution.Status)
  4124. log.Printf("[DEBUG] Shutting down (38)")
  4125. resp.WriteHeader(400)
  4126. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad status %s for the workflow execution %s"}`, workflowExecution.Status, workflowExecution.ExecutionId)))
  4127. return
  4128. }
  4129. //log.Printf("[DEBUG] Starting execution :O")
  4130. cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
  4131. execData, err := json.Marshal(workflowExecution)
  4132. if err != nil {
  4133. log.Printf("[ERROR][%s] Failed marshalling execution during set (3): %s", workflowExecution.ExecutionId, err)
  4134. } else {
  4135. err = shuffle.SetCache(ctx, cacheKey, execData, 31)
  4136. if err != nil {
  4137. log.Printf("[ERROR][%s] Failed adding to cache during setexecution (3): %s", workflowExecution.ExecutionId, err)
  4138. }
  4139. }
  4140. err = executionInit(workflowExecution)
  4141. if err != nil {
  4142. log.Printf("[DEBUG][%s] Shutting down (30) - Workflow setup failed: %s", workflowExecution.ExecutionId, err)
  4143. resp.WriteHeader(500)
  4144. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Error in execution init: %s"}`, err)))
  4145. return
  4146. //shutdown(workflowExecution, "", "", true)
  4147. }
  4148. handleExecutionResult(workflowExecution)
  4149. resp.WriteHeader(200)
  4150. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  4151. }
  4152. func handleDownloadImage(resp http.ResponseWriter, request *http.Request) {
  4153. // Read the request body
  4154. defer request.Body.Close()
  4155. bodyBytes, err := ioutil.ReadAll(request.Body)
  4156. if err != nil {
  4157. log.Printf("[ERROR] Failed reading body for stream result queue. Error: %s", err)
  4158. resp.WriteHeader(401)
  4159. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4160. return
  4161. }
  4162. // get images from request
  4163. imageBody := &ImageDownloadBody{}
  4164. err = json.Unmarshal(bodyBytes, imageBody)
  4165. if err != nil {
  4166. log.Printf("[ERROR] Error in unmarshalling body: %s", err)
  4167. resp.WriteHeader(401)
  4168. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4169. return
  4170. }
  4171. // client, err := dockerclient.NewEnvClient()
  4172. client, _, err := shuffle.GetDockerClient()
  4173. if err != nil {
  4174. log.Printf("[ERROR] Unable to create docker client (4): %s", err)
  4175. resp.WriteHeader(401)
  4176. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4177. return
  4178. }
  4179. defer client.Close()
  4180. // check if images are already downloaded
  4181. // Retrieve a list of Docker images
  4182. listOptions := dockerimage.ListOptions{}
  4183. images, err := client.ImageList(context.Background(), listOptions)
  4184. if err != nil {
  4185. log.Printf("[ERROR] listing images: %s", err)
  4186. resp.WriteHeader(401)
  4187. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  4188. return
  4189. }
  4190. for _, img := range images {
  4191. for _, tag := range img.RepoTags {
  4192. splitTag := strings.Split(tag, ":")
  4193. baseTag := tag
  4194. if len(splitTag) > 1 {
  4195. baseTag = splitTag[1]
  4196. }
  4197. var possibleNames []string
  4198. possibleNames = append(possibleNames, fmt.Sprintf("frikky/shuffle:%s", baseTag))
  4199. possibleNames = append(possibleNames, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", baseTag))
  4200. if arrayContains(possibleNames, imageBody.Image) {
  4201. log.Printf("[DEBUG] Image %s already downloaded that has been requested to download", imageBody.Image)
  4202. resp.WriteHeader(200)
  4203. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "image already present"}`)))
  4204. return
  4205. }
  4206. }
  4207. }
  4208. log.Printf("[INFO] Downloading image %s", imageBody.Image)
  4209. err = shuffle.DownloadDockerImageBackend(&http.Client{Timeout: imagedownloadTimeout}, imageBody.Image)
  4210. if err == nil {
  4211. downloadedImages = append(downloadedImages, imageBody.Image)
  4212. }
  4213. // return success
  4214. resp.WriteHeader(200)
  4215. resp.Write([]byte(fmt.Sprintf(`{"success": true, "status": "starting download"}`)))
  4216. }
  4217. func runWebserver(listener net.Listener) {
  4218. r := mux.NewRouter()
  4219. r.HandleFunc("/api/v1/streams", handleWorkflowQueue).Methods("POST", "OPTIONS")
  4220. r.HandleFunc("/api/v1/streams/results", handleGetStreamResults).Methods("POST", "OPTIONS")
  4221. r.HandleFunc("/api/v1/download", handleDownloadImage).Methods("POST", "OPTIONS")
  4222. // Synonyms. Require an execution ID + auth + shuffle backend
  4223. r.HandleFunc("/api/v1/execute", handleRunExecution).Methods("POST", "OPTIONS")
  4224. r.HandleFunc("/api/v1/run", handleRunExecution).Methods("POST", "OPTIONS")
  4225. // What would be require to run a workflow otherwise?
  4226. // Maybe directly /workflow/run
  4227. /*** STARTREMOVE ***/
  4228. if os.Getenv("SHUFFLE_SWARM_CONFIG") == "run" || os.Getenv("SHUFFLE_SWARM_CONFIG") == "swarm" {
  4229. log.Printf("[DEBUG] Running webserver config for SWARM and K8s")
  4230. }
  4231. /*** ENDREMOVE ***/
  4232. // var dockercli *dockerclient.Client
  4233. // ctx := context.Background()
  4234. scaleReplicas := os.Getenv("SHUFFLE_APP_REPLICAS")
  4235. if len(scaleReplicas) > 0 {
  4236. tmpInt, err := strconv.Atoi(scaleReplicas)
  4237. if err != nil {
  4238. log.Printf("[ERROR] %s is not a valid number for replication", scaleReplicas)
  4239. } else {
  4240. maxReplicas = uint64(tmpInt)
  4241. _ = tmpInt
  4242. }
  4243. log.Printf("[DEBUG] SHUFFLE_APP_REPLICAS set to value %#v. Trying to overwrite default (%d/node)", scaleReplicas, maxReplicas)
  4244. }
  4245. maxExecutionsPerMinute := 10
  4246. if os.Getenv("SHUFFLE_APP_EXECUTIONS_PER_MINUTE") != "" {
  4247. tmpInt, err := strconv.Atoi(os.Getenv("SHUFFLE_APP_EXECUTIONS_PER_MINUTE"))
  4248. if err != nil {
  4249. log.Printf("[ERROR] %s is not a valid number for executions per minute", os.Getenv("SHUFFLE_APP_EXECUTIONS_PER_MINUTE"))
  4250. } else {
  4251. maxExecutionsPerMinute = tmpInt
  4252. }
  4253. log.Printf("[DEBUG] SHUFFLE_APP_EXECUTIONS_PER_MINUTE set to value %s. Trying to overwrite default (%d)", os.Getenv("SHUFFLE_APP_EXECUTIONS_PER_MINUTE"), maxExecutionsPerMinute)
  4254. }
  4255. if strings.ToLower(os.Getenv("SHUFFLE_SWARM_CONFIG")) == "run" || strings.ToLower(os.Getenv("SHUFFLE_APP_REPLICAS")) == "" {
  4256. // go AutoScaleApps(ctx, dockercli, maxExecutionsPerMinute)
  4257. }
  4258. if strings.ToLower(os.Getenv("SHUFFLE_DEBUG_MEMORY")) == "true" || strings.ToLower(os.Getenv("DEBUG_MEMORY")) == "true" {
  4259. r.HandleFunc("/debug/pprof/", pprof.Index)
  4260. r.HandleFunc("/debug/pprof/heap", pprof.Handler("heap").ServeHTTP)
  4261. r.HandleFunc("/debug/pprof/profile", pprof.Profile)
  4262. r.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  4263. r.HandleFunc("/debug/pprof/trace", pprof.Trace)
  4264. }
  4265. //log.Fatal(http.ListenAndServe(port, nil))
  4266. //srv := http.Server{
  4267. // Addr: ":8888",
  4268. // WriteTimeout: 1 * time.Second,
  4269. // Handler: http.HandlerFunc(slowHandler),
  4270. //}
  4271. //log.Fatal(http.Serve(listener, nil))
  4272. log.Printf("[DEBUG] NEW webserver setup. Port: %s", listener.Addr().String())
  4273. http.Handle("/", r)
  4274. srv := http.Server{
  4275. Handler: r,
  4276. ReadTimeout: 60 * time.Second,
  4277. ReadHeaderTimeout: 60 * time.Second,
  4278. IdleTimeout: 60 * time.Second,
  4279. WriteTimeout: 60 * time.Second,
  4280. }
  4281. err := srv.Serve(listener)
  4282. if err != nil {
  4283. log.Printf("[ERROR] Serve issue in worker: %#v", err)
  4284. }
  4285. }
  4286. // 0x0elliot:
  4287. // IF we had to rewrite this, we will focus on ONLY auto scale for apps.
  4288. // i recommend we target executions/minute (?) as a metric.
  4289. // edge-case: subflows are helped with when worker replicas are higher.
  4290. // i kind of never want to scale down. at least, not now.
  4291. // also, algorithm is very broken. executions/worker
  4292. func AutoScaleApps(ctx context.Context, client *dockerclient.Client, maxExecutionsPerMinute int) {
  4293. ticker := time.NewTicker(1 * time.Second)
  4294. defer ticker.Stop()
  4295. for {
  4296. select {
  4297. case <-ctx.Done():
  4298. return
  4299. case <-ticker.C:
  4300. count := window.CountEvents(time.Now())
  4301. j := numberOfApps(ctx, client)
  4302. workers := numberOfWorkers(ctx, client)
  4303. execPerMin := maxExecutionsPerMinute / workers
  4304. if count >= execPerMin {
  4305. log.Printf("[DEBUG] Too many executions per minute (%d). Scaling down to %d", count, execPerMin)
  4306. scaleApps(ctx, client, uint64(j+1))
  4307. }
  4308. }
  4309. }
  4310. }
  4311. func scaleApps(ctx context.Context, client *dockerclient.Client, replicas uint64) error {
  4312. // client, err := dockerclient.NewEnvClient()
  4313. client, _, err := shuffle.GetDockerClient()
  4314. if err != nil {
  4315. log.Printf("[ERROR] Unable to create docker client (scaleApps): %s", err)
  4316. return err
  4317. }
  4318. services, err := client.ServiceList(ctx, types.ServiceListOptions{})
  4319. if err != nil {
  4320. log.Printf("[ERROR] Failed to find services in the swarm: %s", err)
  4321. }
  4322. networkId, err := getNetworkId(ctx, client)
  4323. if err != nil {
  4324. log.Printf("[ERROR] Failed to get network Id in the swarm service: %s", err)
  4325. }
  4326. workers := numberOfWorkers(ctx, client)
  4327. if replicas > uint64(workers) {
  4328. return nil
  4329. }
  4330. for _, service := range services {
  4331. if service.Spec.Name == "shuffle-workers" {
  4332. continue
  4333. }
  4334. inNetwork := false
  4335. for _, vip := range service.Endpoint.VirtualIPs {
  4336. if vip.NetworkID == networkId {
  4337. inNetwork = true
  4338. break
  4339. }
  4340. }
  4341. if !inNetwork {
  4342. continue // skip services not in the target network
  4343. }
  4344. if service.Spec.Mode.Replicated == nil {
  4345. return errors.New("Service is not replicated")
  4346. }
  4347. if *service.Spec.Mode.Replicated.Replicas >= replicas {
  4348. continue
  4349. }
  4350. service.Spec.Mode.Replicated.Replicas = &replicas
  4351. _, err = client.ServiceUpdate(ctx, service.ID, service.Version, service.Spec, types.ServiceUpdateOptions{})
  4352. if err != nil {
  4353. return err
  4354. }
  4355. }
  4356. log.Printf("[DEBUG] Scaled all services to %d replicas", replicas)
  4357. return nil
  4358. }
  4359. func getNetworkId(ctx context.Context, dockercli *dockerclient.Client) (string, error) {
  4360. networkFilter := filters.NewArgs()
  4361. networkFilter.Add("name", swarmNetworkName)
  4362. listOptions := network.ListOptions{
  4363. Filters: networkFilter,
  4364. }
  4365. networks, err := dockercli.NetworkList(ctx, listOptions)
  4366. if err != nil || len(networks) == 0 {
  4367. return "", err
  4368. }
  4369. networkId := networks[0].ID
  4370. return networkId, nil
  4371. }
  4372. func numberOfApps(ctx context.Context, dockercli *dockerclient.Client) int {
  4373. // swarmNetworkName
  4374. var err error
  4375. if swarmNetworkName == "" {
  4376. swarmNetworkName = "shuffle_swarm_executions"
  4377. }
  4378. if dockercli == nil {
  4379. // dockercli, err = dockerclient.NewEnvClient()
  4380. dockercli, _, err = shuffle.GetDockerClient()
  4381. if err != nil {
  4382. log.Printf("[ERROR] Unable to create docker client (5): %s", err)
  4383. return 0
  4384. }
  4385. }
  4386. networkFilter := filters.NewArgs()
  4387. networkFilter.Add("name", swarmNetworkName)
  4388. listOptions := network.ListOptions{
  4389. Filters: networkFilter,
  4390. }
  4391. networks, err := dockercli.NetworkList(ctx, listOptions)
  4392. if err != nil || len(networks) == 0 {
  4393. return 0
  4394. }
  4395. networkId, err := getNetworkId(ctx, dockercli)
  4396. if err != nil {
  4397. log.Printf("[WARNING] Failed to get networkID is worker running in swarm: %s", err)
  4398. return 0
  4399. }
  4400. services, err := dockercli.ServiceList(ctx, types.ServiceListOptions{})
  4401. if err != nil {
  4402. log.Printf("[WARNING] Can't found any services. %s", err)
  4403. return 0
  4404. }
  4405. runningReplicas := 0
  4406. for _, service := range services {
  4407. if service.Spec.Name == "shuffle-workers" {
  4408. continue
  4409. }
  4410. inNetwork := false
  4411. for _, vip := range service.Endpoint.VirtualIPs {
  4412. if vip.NetworkID == networkId {
  4413. inNetwork = true
  4414. break
  4415. }
  4416. }
  4417. if !inNetwork {
  4418. continue // skip services not in the target network
  4419. }
  4420. filterArgs := filters.NewArgs()
  4421. filterArgs.Add("service", service.Spec.Name)
  4422. filterArgs.Add("desired-state", "running")
  4423. task, err := dockercli.TaskList(ctx, types.TaskListOptions{
  4424. Filters: filterArgs,
  4425. })
  4426. if err != nil {
  4427. log.Printf("[WARNING] Failed to get the list of running services %s: %s", service.Spec.Name, err)
  4428. continue
  4429. }
  4430. runningReplicas = len(task)
  4431. break
  4432. }
  4433. return runningReplicas
  4434. }
  4435. func IsServiceRunning(ctx context.Context, cli *dockerclient.Client) bool {
  4436. serviceName := "shuffle-tools_1-2-0"
  4437. filterArgs := filters.NewArgs()
  4438. filterArgs.Add("name", serviceName)
  4439. services, err := cli.ServiceList(ctx, types.ServiceListOptions{Filters: filterArgs})
  4440. if err != nil {
  4441. log.Printf("[ERROR] Couldn't find %s service running got error: %s", serviceName, err)
  4442. return false
  4443. }
  4444. if len(services) > 0 {
  4445. return true
  4446. }
  4447. return false
  4448. }
  4449. func numberOfWorkers(ctx context.Context, cli *dockerclient.Client) int {
  4450. // cli, err := dockerclient.NewEnvClient()
  4451. cli, _, err := shuffle.GetDockerClient()
  4452. if err != nil {
  4453. log.Printf("[ERROR] Unable to create docker client (5): %s", err)
  4454. return 0
  4455. }
  4456. service, _, err := cli.ServiceInspectWithRaw(ctx, "shuffle-workers", types.ServiceInspectOptions{})
  4457. if err != nil {
  4458. return 0
  4459. }
  4460. if service.Spec.Mode.Replicated == nil {
  4461. return 0
  4462. }
  4463. replics := *service.Spec.Mode.Replicated.Replicas
  4464. return int(replics)
  4465. }