暂无描述

walkoff.go 108KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537
  1. package main
  2. import (
  3. "github.com/shuffle/shuffle-shared"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "log"
  12. //"math/rand"
  13. "net/http"
  14. "os"
  15. "strconv"
  16. "strings"
  17. "time"
  18. "net/url"
  19. http2 "github.com/go-git/go-git/v5/plumbing/transport/http"
  20. "github.com/docker/docker/api/types/image"
  21. "github.com/h2non/filetype"
  22. uuid "github.com/satori/go.uuid"
  23. newscheduler "github.com/carlescere/scheduler"
  24. "github.com/frikky/kin-openapi/openapi3"
  25. gyaml "github.com/ghodss/yaml"
  26. "github.com/go-co-op/gocron"
  27. "github.com/go-git/go-billy/v5"
  28. "github.com/go-git/go-billy/v5/memfs"
  29. "github.com/go-git/go-git/v5"
  30. "github.com/go-git/go-git/v5/plumbing"
  31. "github.com/go-git/go-git/v5/storage/memory"
  32. )
  33. var localBase = "http://localhost:5001"
  34. var baseEnvironment = "onprem"
  35. var cloudname = "cloud"
  36. var scheduledJobs = map[string]*newscheduler.Job{}
  37. var cronJobs = map[string]*gocron.Job{}
  38. var scheduledOrgs = map[string]*newscheduler.Job{}
  39. var CronScheduler = gocron.NewScheduler(time.UTC)
  40. // Frequency = cronjob OR minutes between execution
  41. func createSchedule(ctx context.Context, scheduleId, workflowId, name, startNode, frequency, orgId string, body []byte) error {
  42. var err error
  43. testSplit := strings.Split(frequency, "*")
  44. cronJob := ""
  45. isCron := false
  46. newfrequency := 0
  47. if len(testSplit) > 5 {
  48. cronJob = frequency
  49. isCron = true
  50. } else {
  51. newfrequency, err = strconv.Atoi(frequency)
  52. if err != nil {
  53. log.Printf("Failed to parse time: %s", err)
  54. return err
  55. }
  56. //if int(newfrequency) < 60 {
  57. // cronJob = fmt.Sprintf("*/%s * * * *")
  58. //} else if int(newfrequency) <
  59. }
  60. if newfrequency < 1 && !isCron {
  61. return errors.New("Frequency has to be more than 0")
  62. }
  63. //log.Printf("CRON: %s, body: %s", cronJob, string(body))
  64. // FIXME:
  65. // This may run multiple places if multiple servers,
  66. // but that's a future problem
  67. //log.Printf("BODY: %s", string(body))
  68. parsedArgument := strings.Replace(string(body), "\"", "\\\"", -1)
  69. bodyWrapper := fmt.Sprintf(`{"start": "%s", "execution_source": "schedule", "execution_argument": "%s"}`, startNode, parsedArgument)
  70. log.Printf("[INFO] Body for schedule %s in workflow %s: \n%s", scheduleId, workflowId, bodyWrapper)
  71. job := func() {
  72. request := &http.Request{
  73. URL: &url.URL{},
  74. Method: "POST",
  75. Body: ioutil.NopCloser(strings.NewReader(bodyWrapper)),
  76. }
  77. _, _, err := handleExecution(workflowId, shuffle.Workflow{ExecutingOrg: shuffle.OrgMini{Id: orgId}}, request, orgId)
  78. if err != nil {
  79. log.Printf("Failed to execute %s: %s", workflowId, err)
  80. }
  81. }
  82. log.Printf("[INFO] Starting frequency for execution: %s", frequency)
  83. if isCron {
  84. cronJob, err := CronScheduler.Cron(cronJob).Do(job)
  85. if err != nil {
  86. log.Printf("[ERROR] Failed to start schedule with cron(%s): %s", cronJob, err)
  87. }
  88. cronJobs[scheduleId] = cronJob
  89. } else {
  90. //jobret, err := newscheduler.Every(newfrequency).Seconds().NotImmediately().Run(job)
  91. jobret, err := newscheduler.Every(newfrequency).Seconds().Run(job)
  92. if err != nil {
  93. log.Printf("Failed to schedule workflow: %s", err)
  94. return err
  95. }
  96. scheduledJobs[scheduleId] = jobret
  97. }
  98. //scheduledJobs = append(scheduledJobs, jobret)
  99. // Doesn't need running/not running. If stopped, we just delete it.
  100. timeNow := int64(time.Now().Unix())
  101. schedule := shuffle.ScheduleOld{
  102. Id: scheduleId,
  103. WorkflowId: workflowId,
  104. StartNode: startNode,
  105. Argument: string(body),
  106. WrappedArgument: bodyWrapper,
  107. Seconds: newfrequency,
  108. Frequency: frequency,
  109. CreationTime: timeNow,
  110. LastModificationtime: timeNow,
  111. LastRuntime: timeNow,
  112. Org: orgId,
  113. Environment: "onprem",
  114. }
  115. err = shuffle.SetSchedule(ctx, schedule)
  116. if err != nil {
  117. log.Printf("Failed to set schedule: %s", err)
  118. return err
  119. }
  120. // FIXME - Create a real schedule based on cron:
  121. // 1. Parse the cron in a function to match this schedule
  122. // 2. Make main init check for schedules that aren't running
  123. return nil
  124. }
  125. func handleGetWorkflowqueueConfirm(resp http.ResponseWriter, request *http.Request) {
  126. cors := shuffle.HandleCors(resp, request)
  127. if cors {
  128. return
  129. }
  130. // FIXME: Add authentication?
  131. // Cloud has auth.
  132. id := request.Header.Get("Org-Id")
  133. if len(id) == 0 {
  134. log.Printf("[ERROR] No Org-Id header set - confirm")
  135. resp.WriteHeader(401)
  136. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Specify the org-id header."}`)))
  137. return
  138. }
  139. //setWorkflowqueuetest(id)
  140. ctx := context.Background()
  141. executionRequests, err := shuffle.GetWorkflowQueue(ctx, id, 100)
  142. if err != nil {
  143. log.Printf("[WARNING] (1) Failed reading body for workflowqueue: %s", err)
  144. resp.WriteHeader(500)
  145. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Entity parsing error - confirm"}`)))
  146. return
  147. }
  148. if len(executionRequests.Data) == 0 {
  149. log.Printf("[INFO] No requests to handle from queue")
  150. resp.WriteHeader(200)
  151. resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Nothing in queue"}`)))
  152. return
  153. }
  154. body, err := ioutil.ReadAll(request.Body)
  155. if err != nil {
  156. log.Println("[WARNING] Failed reading body for stream result queue")
  157. resp.WriteHeader(500)
  158. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  159. return
  160. }
  161. // Getting from the request
  162. //log.Println(string(body))
  163. var removeExecutionRequests shuffle.ExecutionRequestWrapper
  164. err = json.Unmarshal(body, &removeExecutionRequests)
  165. if err != nil {
  166. log.Printf("[WARNING] Failed executionrequest in queue unmarshaling: %s", err)
  167. resp.WriteHeader(400)
  168. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  169. return
  170. }
  171. if len(removeExecutionRequests.Data) == 0 {
  172. log.Printf("[WARNING] No requests to fix remove from DB")
  173. resp.WriteHeader(200)
  174. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Queue removal error"}`)))
  175. return
  176. }
  177. // remove items from DB
  178. parsedId := strings.ReplaceAll(fmt.Sprintf("workflowqueue-%s", id), " ", "-")
  179. ids := []string{}
  180. for _, execution := range removeExecutionRequests.Data {
  181. ids = append(ids, execution.ExecutionId)
  182. }
  183. err = shuffle.DeleteKeys(ctx, parsedId, ids)
  184. if err != nil {
  185. log.Printf("[ERROR] Failed deleting %d execution keys for org %s: %s", len(ids), id, err)
  186. } else {
  187. //log.Printf("[INFO] Deleted %d keys from org %s", len(ids), parsedId)
  188. }
  189. //var newExecutionRequests ExecutionRequestWrapper
  190. //for _, execution := range executionRequests.Data {
  191. // found := false
  192. // for _, removeExecution := range removeExecutionRequests.Data {
  193. // if removeExecution.ExecutionId == execution.ExecutionId && removeExecution.WorkflowId == execution.WorkflowId {
  194. // found = true
  195. // break
  196. // }
  197. // }
  198. // if !found {
  199. // newExecutionRequests.Data = append(newExecutionRequests.Data, execution)
  200. // }
  201. //}
  202. // Push only the remaining to the DB (remove)
  203. //if len(executionRequests.Data) != len(newExecutionRequests.Data) {
  204. // err := shuffle.SetWorkflowQueue(ctx, newExecutionRequests, id)
  205. // if err != nil {
  206. // log.Printf("Fail: %s", err)
  207. // }
  208. //}
  209. resp.WriteHeader(200)
  210. resp.Write([]byte(`{"success": true}`))
  211. }
  212. // FIXME: Authenticate this one. Can org ID be auth enough?
  213. // (especially since we have a default: shuffle)
  214. func handleGetWorkflowqueue(resp http.ResponseWriter, request *http.Request) {
  215. cors := shuffle.HandleCors(resp, request)
  216. if cors {
  217. return
  218. }
  219. // This is really the environment's name - NOT OrgId
  220. environment := request.Header.Get("Org-Id")
  221. if len(environment) == 0 {
  222. log.Printf("[AUDIT] No org-id header set")
  223. resp.WriteHeader(401)
  224. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Specify the org-id header."}`)))
  225. return
  226. }
  227. // Org => Org ID here
  228. orgId := request.Header.Get("Org")
  229. if len(orgId) == 0 {
  230. //log.Printf("[AUDIT] No 'org' header set (get workflow queue). ")
  231. /*
  232. resp.WriteHeader(403)
  233. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Specify the org header. This can be done by setting the 'ORG' environment variable for Orborus to your Org ID in Shuffle"}`)))
  234. return
  235. */
  236. }
  237. // This section is cloud custom for now
  238. auth := request.Header.Get("Authorization")
  239. if len(auth) == 0 {
  240. //log.Printf("[AUDIT] No Authorization header set. Env: %s, org: %s", orgId, environment)
  241. /*
  242. resp.WriteHeader(401)
  243. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Specify the auth header (only applicable for cloud for now)."}`)))
  244. return
  245. */
  246. }
  247. ctx := shuffle.GetContext(request)
  248. envs, err := shuffle.GetEnvironments(ctx, orgId)
  249. if err != nil || len(envs) == 0 {
  250. //log.Printf("[WARNING] No env found for orgId %s during queue loading", orgId)
  251. }
  252. var env *shuffle.Environment
  253. found := false
  254. for i := range envs {
  255. if envs[i].Name == environment {
  256. env = &envs[i]
  257. found = true
  258. break
  259. }
  260. }
  261. // Only works onprem - shared queues across tenants
  262. // without tenancy
  263. if !found {
  264. env, err = shuffle.GetEnvironment(ctx, environment, "")
  265. if err != nil {
  266. log.Printf("[WARNING] Failed to find the environment(%s) in org(%s). Could cause with Failover test", environment, orgId)
  267. }
  268. }
  269. // Handles failover control between Orborus'
  270. // Further tracks checkin time to ensure this works properly
  271. // across instances
  272. err = shuffle.HandleOrborusFailover(ctx, request, resp, env)
  273. if err != nil {
  274. log.Printf("[WARNING] Failed handling Orborus failover: %s", err)
  275. }
  276. if len(env.OrgId) > 0 {
  277. orgId = env.OrgId
  278. }
  279. executionRequests, err := shuffle.GetWorkflowQueue(ctx, environment, 100, *env)
  280. if err != nil {
  281. resp.WriteHeader(500)
  282. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  283. return
  284. }
  285. // Checking and updating the environment related to the first execution
  286. if len(executionRequests.Data) == 0 {
  287. executionRequests.Data = []shuffle.ExecutionRequest{}
  288. } else {
  289. // Try again? I don't think this is necessary, and shouldn't really ever occur.
  290. if len(executionRequests.Data) > 50 {
  291. executionRequests.Data = executionRequests.Data[0:49]
  292. }
  293. }
  294. newjson, err := json.Marshal(executionRequests)
  295. if err != nil {
  296. resp.WriteHeader(401)
  297. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflow execution"}`)))
  298. return
  299. }
  300. resp.WriteHeader(200)
  301. resp.Write(newjson)
  302. }
  303. func handleGetWorkflowExecutionResult(resp http.ResponseWriter, request *http.Request) {
  304. cors := shuffle.HandleCors(resp, request)
  305. if cors {
  306. return
  307. }
  308. if request.Body == nil {
  309. resp.WriteHeader(http.StatusBadRequest)
  310. return
  311. }
  312. body, err := ioutil.ReadAll(request.Body)
  313. if err != nil {
  314. log.Println("Failed reading body for stream result queue")
  315. resp.WriteHeader(401)
  316. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  317. return
  318. }
  319. //log.Printf("Data: %s", string(body))
  320. var actionResult shuffle.ActionResult
  321. err = json.Unmarshal(body, &actionResult)
  322. if err != nil {
  323. log.Printf("[WARNING] Failed ActionResult unmarshaling (stream result): %s", err)
  324. //resp.WriteHeader(401)
  325. //resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  326. //return
  327. }
  328. if len(actionResult.ExecutionId) == 0 {
  329. resp.WriteHeader(400)
  330. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Provide execution_id and authorization"}`)))
  331. return
  332. }
  333. ctx := context.Background()
  334. workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId)
  335. if err != nil || workflowExecution.ExecutionId != actionResult.ExecutionId {
  336. if len(actionResult.ExecutionId) > 0 {
  337. log.Printf("[WARNING][%s] Failed getting execution (streamresult): %s", actionResult.ExecutionId, err)
  338. }
  339. resp.WriteHeader(400)
  340. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
  341. return
  342. }
  343. // Authorization is done here
  344. if workflowExecution.Authorization != actionResult.Authorization {
  345. user, err := shuffle.HandleApiAuthentication(resp, request)
  346. if err != nil {
  347. log.Printf("[WARNING] Api authentication failed in exec grabbing workflow: %s", err)
  348. resp.WriteHeader(401)
  349. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
  350. return
  351. }
  352. if len(workflowExecution.ExecutionOrg) > 0 && user.ActiveOrg.Id == workflowExecution.ExecutionOrg {
  353. //log.Printf("[DEBUG] User %s is in correct org. Allowing org continuation for execution!", user.Username)
  354. } else {
  355. log.Printf("[WARNING] Bad authorization key when getting stream results %s.", actionResult.ExecutionId)
  356. resp.WriteHeader(401)
  357. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
  358. return
  359. }
  360. }
  361. for _, action := range workflowExecution.Workflow.Actions {
  362. found := false
  363. for _, result := range workflowExecution.Results {
  364. if result.Action.ID == action.ID {
  365. found = true
  366. break
  367. }
  368. }
  369. if found {
  370. continue
  371. }
  372. //log.Printf("[DEBUG] Maybe not handled yet: %s", action.ID)
  373. cacheId := fmt.Sprintf("%s_%s_result", workflowExecution.ExecutionId, action.ID)
  374. cache, err := shuffle.GetCache(ctx, cacheId)
  375. if err != nil {
  376. //log.Printf("[WARNING] Couldn't find in fix exec %s (2): %s", cacheId, err)
  377. continue
  378. }
  379. actionResult := shuffle.ActionResult{}
  380. cacheData := []byte(cache.([]uint8))
  381. // Just ensuring the data is good
  382. err = json.Unmarshal(cacheData, &actionResult)
  383. if err != nil {
  384. continue
  385. } else {
  386. log.Printf("[DEBUG] APPENDING %s result to send to app or something\n\n\n\n", action.ID)
  387. workflowExecution.Results = append(workflowExecution.Results, actionResult)
  388. }
  389. }
  390. if workflowExecution.Workflow.Sharing == "form" {
  391. newWorkflow := shuffle.Workflow{
  392. Name: workflowExecution.Workflow.Name,
  393. ID: workflowExecution.Workflow.ID,
  394. Owner: workflowExecution.Workflow.Owner,
  395. OrgId: workflowExecution.Workflow.OrgId,
  396. Sharing: workflowExecution.Workflow.Sharing,
  397. Description: workflowExecution.Workflow.Description,
  398. InputQuestions: workflowExecution.Workflow.InputQuestions,
  399. FormControl: workflowExecution.Workflow.FormControl,
  400. }
  401. workflowExecution.Results = []shuffle.ActionResult{}
  402. workflowExecution.Workflow = newWorkflow
  403. }
  404. newjson, err := json.Marshal(workflowExecution)
  405. if err != nil {
  406. resp.WriteHeader(500)
  407. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflow execution"}`)))
  408. return
  409. }
  410. resp.WriteHeader(200)
  411. resp.Write(newjson)
  412. }
  413. func handleSetWorkflowExecution(resp http.ResponseWriter, request *http.Request) {
  414. cors := shuffle.HandleCors(resp, request)
  415. if cors {
  416. return
  417. }
  418. if request.Body == nil {
  419. resp.WriteHeader(http.StatusBadRequest)
  420. return
  421. }
  422. body, err := ioutil.ReadAll(request.Body)
  423. if err != nil {
  424. log.Println("[WARNING] (3) Failed reading body for workflowqueue")
  425. resp.WriteHeader(401)
  426. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  427. return
  428. }
  429. // Allows override of existing executions.
  430. // This is a way to set them back to 0 results and rerun the
  431. // exact same. Primarily in use for Worker testing of specific workflows.
  432. shouldReset := false
  433. resetString, ok := request.URL.Query()["reset"]
  434. if ok && len(resetString) > 0 {
  435. if resetString[0] == "true" {
  436. shouldReset = true
  437. }
  438. }
  439. ctx := context.Background()
  440. err = shuffle.ValidateNewWorkerExecution(ctx, body, shouldReset)
  441. if err == nil {
  442. resp.WriteHeader(200)
  443. resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Successfully updated the execution"}`)))
  444. return
  445. } else {
  446. //log.Printf("[DEBUG] Handling other execution variant (subflow?): %s", err)
  447. }
  448. var actionResult shuffle.ActionResult
  449. err = json.Unmarshal(body, &actionResult)
  450. if err != nil {
  451. log.Printf("[WARNING] Failed ActionResult unmarshaling (queue): %s", err)
  452. //resp.WriteHeader(401)
  453. //resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  454. //return
  455. }
  456. // 1. Get the WorkflowExecution(ExecutionId) from the database
  457. // 2. if ActionResult.Authentication != WorkflowExecution.Authentication -> exit
  458. // 3. Add to and update actionResult in workflowExecution
  459. // 4. Push to db
  460. // IF FAIL: Set executionstatus: abort or cancel
  461. workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId)
  462. if err != nil {
  463. log.Printf("[ERROR] Failed getting execution (workflowqueue) %s: %s", actionResult.ExecutionId, err)
  464. resp.WriteHeader(401)
  465. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution ID %s because it doesn't exist."}`, actionResult.ExecutionId)))
  466. return
  467. }
  468. if workflowExecution.Authorization != actionResult.Authorization {
  469. log.Printf("[INFO] Bad authorization key when updating node (workflowQueue) %s. Want: %s, Have: %s", actionResult.ExecutionId, workflowExecution.Authorization, actionResult.Authorization)
  470. resp.WriteHeader(401)
  471. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key"}`)))
  472. return
  473. }
  474. //if workflowExecution.Status == "FINISHED" {
  475. // log.Printf("[INFO] Workflowexecution is already FINISHED. No further action can be taken.")
  476. // resp.WriteHeader(401)
  477. // resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is already finished because of %s with status %s"}`, workflowExecution.LastNode, workflowExecution.Status)))
  478. // return
  479. //}
  480. if workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" {
  481. if workflowExecution.Workflow.Configuration.ExitOnError {
  482. log.Printf("Workflowexecution already has status %s. No further action can be taken", workflowExecution.Status)
  483. resp.WriteHeader(401)
  484. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is aborted because of %s with result %s and status %s"}`, workflowExecution.LastNode, workflowExecution.Result, workflowExecution.Status)))
  485. return
  486. } else {
  487. log.Printf("[WARNING] Continuing workflow even though it's aborted (ExitOnError config)")
  488. }
  489. }
  490. runWorkflowExecutionTransaction(ctx, 0, workflowExecution.ExecutionId, actionResult, resp)
  491. }
  492. // Will make sure transactions are always ran for an execution. This is recursive if it fails. Allowed to fail up to 5 times
  493. func runWorkflowExecutionTransaction(ctx context.Context, attempts int64, workflowExecutionId string, actionResult shuffle.ActionResult, resp http.ResponseWriter) {
  494. log.Printf("[DEBUG][%s] Running workflow execution update with result from %s (%s) of status %s", workflowExecutionId, actionResult.Action.Label, actionResult.Action.ID, actionResult.Status)
  495. // Should start a tx for the execution here
  496. workflowExecution, err := shuffle.GetWorkflowExecution(ctx, workflowExecutionId)
  497. if err != nil {
  498. log.Printf("[ERROR] Failed getting execution cache: %s", err)
  499. resp.WriteHeader(401)
  500. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution"}`)))
  501. return
  502. }
  503. workflowExecution, dbSave, err := shuffle.ParsedExecutionResult(ctx, *workflowExecution, actionResult, false, 0)
  504. if err != nil {
  505. b, suberr := json.Marshal(actionResult)
  506. if suberr != nil {
  507. log.Printf("[ERROR] Failed running of parsedexecution: %s", err)
  508. } else {
  509. log.Printf("[ERROR] Failed running of parsedexecution: %s. Data: %s", err, string(b))
  510. }
  511. resp.WriteHeader(400)
  512. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed updating execution"}`)))
  513. return
  514. }
  515. _ = dbSave
  516. setExecution := true
  517. if setExecution || workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" {
  518. err = shuffle.SetWorkflowExecution(ctx, *workflowExecution, true)
  519. if err != nil {
  520. resp.WriteHeader(401)
  521. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed setting workflowexecution actionresult: %s"}`, err)))
  522. return
  523. }
  524. //handleExecutionResult(ctx, *workflowExecution)
  525. } else {
  526. log.Printf("Skipping setexec with status %s", workflowExecution.Status)
  527. }
  528. if resp != nil {
  529. resp.WriteHeader(200)
  530. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  531. }
  532. }
  533. func JSONCheck(str string) bool {
  534. var jsonStr interface{}
  535. return json.Unmarshal([]byte(str), &jsonStr) == nil
  536. }
  537. func handleExecutionStatistics(execution shuffle.WorkflowExecution) {
  538. // FIXME: CLEAN UP THE JSON THAT'S SAVED.
  539. // https://github.com/shuffle/Shuffle/issues/172
  540. appResults := []shuffle.AppExecutionExample{}
  541. for _, result := range execution.Results {
  542. resultCheck := JSONCheck(result.Result)
  543. if !resultCheck {
  544. //log.Printf("Result is NOT JSON!")
  545. continue
  546. } else {
  547. //log.Printf("Result IS JSON!")
  548. }
  549. appFound := false
  550. executionIndex := 0
  551. for index, appExample := range appResults {
  552. if appExample.AppId == result.Action.ID {
  553. appFound = true
  554. executionIndex = index
  555. break
  556. }
  557. }
  558. if appFound {
  559. // Append to SuccessExamples or FailureExamples
  560. if result.Status == "ABORTED" || result.Status == "FAILURE" {
  561. appResults[executionIndex].FailureExamples = append(appResults[executionIndex].FailureExamples, result.Result)
  562. } else if result.Status == "FINISHED" || result.Status == "SUCCESS" {
  563. appResults[executionIndex].SuccessExamples = append(appResults[executionIndex].SuccessExamples, result.Result)
  564. } else {
  565. log.Printf("[ERROR] Can't handle status %s", result.Status)
  566. }
  567. // appResults = append(appResults, executionExample)
  568. } else {
  569. // CREATE SuccessExamples or FailureExamples
  570. executionExample := shuffle.AppExecutionExample{
  571. AppName: result.Action.AppName,
  572. AppVersion: result.Action.AppVersion,
  573. AppAction: result.Action.Name,
  574. AppId: result.Action.AppID,
  575. ExampleId: fmt.Sprintf("%s_%s", execution.ExecutionId, result.Action.AppID),
  576. }
  577. if result.Status == "ABORTED" || result.Status == "FAILURE" {
  578. executionExample.FailureExamples = append(executionExample.FailureExamples, result.Result)
  579. } else if result.Status == "FINISHED" || result.Status == "SUCCESS" {
  580. executionExample.SuccessExamples = append(executionExample.SuccessExamples, result.Result)
  581. } else {
  582. log.Printf("[ERROR] Can't handle status %s", result.Status)
  583. }
  584. appResults = append(appResults, executionExample)
  585. }
  586. }
  587. // ExampleId string `json:"example_id"`
  588. // func setExampleresult(ctx context.Context, result exampleResult) error {
  589. // log.Printf("Execution length: %d", len(appResults))
  590. if len(appResults) > 0 {
  591. ctx := context.Background()
  592. successful := 0
  593. for _, exampleresult := range appResults {
  594. err := setExampleresult(ctx, exampleresult)
  595. if err != nil {
  596. log.Printf("[ERROR] Failed setting examplresult %s: %s", exampleresult.ExampleId, err)
  597. } else {
  598. successful += 1
  599. }
  600. }
  601. log.Printf("[INFO] Added %d exampleresults to backend", successful)
  602. } else {
  603. //log.Printf("[INFO] No example results necessary to be added for execution %s", execution.ExecutionId)
  604. }
  605. }
  606. func deleteWorkflow(resp http.ResponseWriter, request *http.Request) {
  607. cors := shuffle.HandleCors(resp, request)
  608. if cors {
  609. return
  610. }
  611. user, err := shuffle.HandleApiAuthentication(resp, request)
  612. if err != nil {
  613. log.Printf("[WARNING] Api authentication failed in delete workflow: %s", err)
  614. resp.WriteHeader(401)
  615. resp.Write([]byte(`{"success": false}`))
  616. return
  617. }
  618. if user.Role == "org-reader" {
  619. log.Printf("[WARNING] Org-reader doesn't have access to stop schedule: %s (%s)", user.Username, user.Id)
  620. resp.WriteHeader(401)
  621. resp.Write([]byte(`{"success": false, "reason": "Read only user"}`))
  622. return
  623. }
  624. location := strings.Split(request.URL.String(), "/")
  625. var fileId string
  626. if location[1] == "api" {
  627. if len(location) <= 4 {
  628. resp.WriteHeader(401)
  629. resp.Write([]byte(`{"success": false}`))
  630. return
  631. }
  632. fileId = location[4]
  633. }
  634. if len(fileId) != 36 {
  635. resp.WriteHeader(401)
  636. resp.Write([]byte(`{"success": false, "reason": "Workflow ID to delete is not valid"}`))
  637. return
  638. }
  639. ctx := context.Background()
  640. workflow, err := shuffle.GetWorkflow(ctx, fileId)
  641. if err != nil {
  642. log.Printf("[WARNING] Failed getting workflow %s locally (delete workflow): %s", fileId, err)
  643. resp.WriteHeader(401)
  644. resp.Write([]byte(`{"success": false}`))
  645. return
  646. }
  647. if len(workflow.ParentWorkflowId) > 0 {
  648. resp.WriteHeader(403)
  649. resp.Write([]byte(`{"success": false, "reason": "Can't delete a workflow distributed from your parent org"}`))
  650. return
  651. }
  652. if user.Id != workflow.Owner || len(user.Id) == 0 {
  653. if workflow.OrgId == user.ActiveOrg.Id {
  654. log.Printf("[INFO] User %s is deleting workflow %s as admin. Owner: %s", user.Username, workflow.ID, workflow.Owner)
  655. } else {
  656. log.Printf("[WARNING] Wrong user (%s) for workflow %s (delete workflow)", user.Username, workflow.ID)
  657. resp.WriteHeader(401)
  658. resp.Write([]byte(`{"success": false}`))
  659. return
  660. }
  661. }
  662. // Look for Child workflows and delete them
  663. if workflow.ParentWorkflowId == "" {
  664. log.Printf("[DEBUG] Looking for child workflows for workflow %s to delete. User %s (%s) in org %s (%s)", workflow.ID, user.Username, user.Id, user.ActiveOrg.Name, user.ActiveOrg.Id)
  665. childWorkflows, err := shuffle.ListChildWorkflows(ctx, workflow.ID)
  666. if err != nil {
  667. log.Printf("[ERROR] Failed to list child workflows: %s", err)
  668. } else {
  669. //log.Printf("\n\n[DEBUG] Found %d child workflows for workflow %s\n\n", len(childWorkflows), workflow.ID)
  670. // Find cookies and append them to request.Header to replicate current request as closely as possible
  671. for _, childWorkflow := range childWorkflows {
  672. if childWorkflow.ID == workflow.ID {
  673. continue
  674. }
  675. go shuffle.SendDeleteWorkflowRequest(childWorkflow, request)
  676. }
  677. }
  678. }
  679. // Clean up triggers and executions
  680. for _, item := range workflow.Triggers {
  681. if item.TriggerType == "SCHEDULE" && item.Status != "uninitialized" {
  682. err = deleteSchedule(ctx, item.ID)
  683. if err != nil {
  684. log.Printf("[DEBUG] Failed to delete schedule: %s - is it started?", err)
  685. }
  686. } else if item.TriggerType == "WEBHOOK" {
  687. //err = removeWebhookFunction(ctx, item.ID)
  688. //if err != nil {
  689. // log.Printf("Failed to delete webhook: %s", err)
  690. //}
  691. } else if item.TriggerType == "EMAIL" {
  692. err = shuffle.HandleOutlookSubRemoval(ctx, user, workflow.ID, item.ID)
  693. if err != nil {
  694. log.Printf("[DEBUG] Failed to delete OUTLOOK email sub (checking gmail after): %s", err)
  695. }
  696. err = shuffle.HandleGmailSubRemoval(ctx, user, workflow.ID, item.ID)
  697. if err != nil {
  698. log.Printf("Failed to delete gmail email sub: %s", err)
  699. }
  700. }
  701. }
  702. //log.Printf("[DEBUG] Attempting to delete the workflow %s from the database...", fileId)
  703. err = shuffle.DeleteKey(ctx, "workflow", fileId)
  704. if err != nil {
  705. log.Printf("[DEBUG] Failed deleting workflow key %s", fileId)
  706. resp.WriteHeader(400)
  707. resp.Write([]byte(`{"success": false, "reason": "Failed deleting key"}`))
  708. return
  709. }
  710. log.Printf("[INFO] Should have deleted workflow %s (%s)", workflow.Name, fileId)
  711. shuffle.DeleteCache(ctx, fmt.Sprintf("%s_workflows", user.Id))
  712. shuffle.DeleteCache(ctx, fmt.Sprintf("%s_workflows", user.ActiveOrg.Id))
  713. shuffle.DeleteCache(ctx, fmt.Sprintf("%s_%s", user.Username, fileId))
  714. shuffle.DeleteCache(ctx, fmt.Sprintf("workflow_%s_childworkflows", workflow.ID))
  715. if len(workflow.ParentWorkflowId) > 0 {
  716. shuffle.DeleteCache(ctx, fmt.Sprintf("workflow_%s_childworkflows", workflow.ParentWorkflowId))
  717. }
  718. resp.WriteHeader(200)
  719. resp.Write([]byte(`{"success": true}`))
  720. }
  721. func handleExecution(id string, workflow shuffle.Workflow, request *http.Request, orgId string) (shuffle.WorkflowExecution, string, error) {
  722. //go func() {
  723. // log.Printf("\n\nPRE TIME: %s\n\n", time.Now().Format("2006-01-02 15:04:05"))
  724. // _ = <-time.After(time.Second * 60)
  725. // log.Printf("\n\nPOST TIME: %s\n\n", time.Now().Format("2006-01-02 15:04:05"))
  726. //}()
  727. ctx := context.Background()
  728. if workflow.ID == "" || workflow.ID != id {
  729. tmpworkflow, err := shuffle.GetWorkflow(ctx, id, true)
  730. if err != nil {
  731. //log.Printf("[WARNING] Failed getting the workflow locally (execution setup): %s", err)
  732. return shuffle.WorkflowExecution{}, "Failed getting workflow", err
  733. }
  734. workflow = *tmpworkflow
  735. }
  736. if len(workflow.Actions) == 0 {
  737. workflow.Actions = []shuffle.Action{}
  738. } else {
  739. newactions := []shuffle.Action{}
  740. for _, action := range workflow.Actions {
  741. action.LargeImage = ""
  742. action.SmallImage = ""
  743. newactions = append(newactions, action)
  744. //log.Printf("ACTION: %#v", action)
  745. }
  746. workflow.Actions = newactions
  747. }
  748. if len(workflow.Branches) == 0 {
  749. workflow.Branches = []shuffle.Branch{}
  750. }
  751. if len(workflow.Triggers) == 0 {
  752. workflow.Triggers = []shuffle.Trigger{}
  753. } else {
  754. newtriggers := []shuffle.Trigger{}
  755. for _, trigger := range workflow.Triggers {
  756. trigger.LargeImage = ""
  757. trigger.SmallImage = ""
  758. newtriggers = append(newtriggers, trigger)
  759. //log.Printf("ACTION: %#v", trigger)
  760. }
  761. workflow.Triggers = newtriggers
  762. }
  763. if len(workflow.Errors) == 0 {
  764. workflow.Errors = []string{}
  765. }
  766. /*
  767. if !workflow.IsValid {
  768. log.Printf("[ERROR] Stopped execution as workflow %s is not valid.", workflow.ID)
  769. return shuffle.WorkflowExecution{}, fmt.Sprintf(`workflow %s is invalid`, workflow.ID), errors.New("Failed getting workflow")
  770. }
  771. */
  772. maxExecutionDepth := 10
  773. if os.Getenv("SHUFFLE_MAX_EXECUTION_DEPTH") != "" {
  774. maxExecutionDepthNew, err := strconv.Atoi(os.Getenv("SHUFFLE_MAX_EXECUTION_DEPTH"))
  775. if err == nil && maxExecutionDepthNew > 1 && maxExecutionDepthNew < 1000 {
  776. maxExecutionDepth = maxExecutionDepthNew
  777. }
  778. }
  779. workflowExecution, execInfo, _, workflowExecErr := shuffle.PrepareWorkflowExecution(ctx, workflow, request, int64(maxExecutionDepth))
  780. if workflowExecErr != nil {
  781. if len(workflowExecution.Workflow.Actions) > 0 && len(workflowExecution.Results) > 0 && len(workflowExecution.ExecutionId) > 0 {
  782. err := shuffle.SetWorkflowExecution(ctx, workflowExecution, true)
  783. if err != nil {
  784. log.Printf("[ERROR] Failed setting workflow execution during init (2): %s", err)
  785. }
  786. }
  787. if strings.Contains(fmt.Sprintf("%s", workflowExecErr), "User Input") {
  788. // Special for user input callbacks
  789. // return workflowExecution, fmt.Sprintf("%s", err), nil
  790. //log.Printf("[INFO] User input callback: %s", workflowExecErr)
  791. return shuffle.WorkflowExecution{}, "", nil
  792. } else {
  793. log.Printf("[ERROR] Failed in prepareExecution: '%s'", workflowExecErr)
  794. return shuffle.WorkflowExecution{}, fmt.Sprintf("Failed running: %s", workflowExecErr), workflowExecErr
  795. }
  796. }
  797. err := imageCheckBuilder(execInfo.ImageNames)
  798. if err != nil {
  799. log.Printf("[ERROR] Failed building the required images from %#v: %s", execInfo.ImageNames, err)
  800. return shuffle.WorkflowExecution{}, "Failed unmarshal during execution", err
  801. }
  802. err = shuffle.SetWorkflowExecution(ctx, workflowExecution, true)
  803. if err != nil {
  804. log.Printf("[ERROR] Failed setting workflow execution during init (2): %s", err)
  805. }
  806. onpremExecution := execInfo.OnpremExecution
  807. _ = onpremExecution
  808. environments := execInfo.Environments
  809. var allEnvs []shuffle.Environment
  810. if len(workflowExecution.ExecutionOrg) > 0 {
  811. allEnvironments, err := shuffle.GetEnvironments(ctx, workflowExecution.ExecutionOrg)
  812. if err != nil {
  813. log.Printf("[WARNING] Failed finding environments: %s", err)
  814. return shuffle.WorkflowExecution{}, fmt.Sprintf("Workflow environments not found for this org"), errors.New(fmt.Sprintf("Workflow environments not found for this org"))
  815. }
  816. for _, curenv := range allEnvironments {
  817. if curenv.Archived {
  818. continue
  819. }
  820. allEnvs = append(allEnvs, curenv)
  821. }
  822. } else {
  823. log.Printf("[ERROR] No org identified for execution of %s. Returning", workflowExecution.Workflow.ID)
  824. return shuffle.WorkflowExecution{}, "No org identified for execution", errors.New("No org identified for execution")
  825. }
  826. if len(allEnvs) == 0 {
  827. log.Printf("[ERROR] No active environments found for org: %s", workflowExecution.ExecutionOrg)
  828. return shuffle.WorkflowExecution{}, "No active environments found", errors.New(fmt.Sprintf("No active env found for org %s", workflowExecution.ExecutionOrg))
  829. }
  830. // Check if the actions are children of the startnode?
  831. imageNames := []string{}
  832. cloudExec := false
  833. _ = cloudExec
  834. for _, action := range workflowExecution.Workflow.Actions {
  835. // Verify if the action environment exists and append
  836. found := false
  837. for _, env := range allEnvs {
  838. if env.Name == action.Environment {
  839. found = true
  840. if env.Type == "cloud" {
  841. cloudExec = true
  842. } else if env.Type == "onprem" {
  843. onpremExecution = true
  844. } else {
  845. log.Printf("[ERROR] No handler for environment type %s", env.Type)
  846. return shuffle.WorkflowExecution{}, "No active environments found", errors.New(fmt.Sprintf("No handler for environment type %s", env.Type))
  847. }
  848. break
  849. }
  850. }
  851. if !found {
  852. log.Printf("[ERROR] Couldn't find environment %s. Maybe it's inactive?", action.Environment)
  853. return shuffle.WorkflowExecution{}, "Couldn't find the environment", errors.New(fmt.Sprintf("Couldn't find env %s in org %s", action.Environment, workflowExecution.ExecutionOrg))
  854. }
  855. found = false
  856. for _, env := range environments {
  857. if env == action.Environment {
  858. found = true
  859. break
  860. }
  861. }
  862. // Check if the app exists?
  863. newName := action.AppName
  864. newName = strings.ReplaceAll(newName, " ", "-")
  865. imageNames = append(imageNames, fmt.Sprintf("%s:%s_%s", baseDockerName, newName, action.AppVersion))
  866. if !found {
  867. environments = append(environments, action.Environment)
  868. }
  869. }
  870. err = imageCheckBuilder(imageNames)
  871. if err != nil {
  872. log.Printf("[ERROR] Failed building the required images from %#v: %s", imageNames, err)
  873. return shuffle.WorkflowExecution{}, "Failed building missing Docker images", err
  874. }
  875. err = shuffle.SetWorkflowExecution(ctx, workflowExecution, true)
  876. if err != nil {
  877. log.Printf("[WARNING] Error saving workflow execution for updates %s", err)
  878. return shuffle.WorkflowExecution{}, fmt.Sprintf("Failed setting workflowexecution: %s", err), err
  879. }
  880. // Adds queue for onprem execution
  881. // FIXME - add specifics to executionRequest, e.g. specific environment (can run multi onprem)
  882. if execInfo.OnpremExecution {
  883. // FIXME - tmp name based on future companyname-companyId
  884. // This leads to issues with overlaps. Should set limits and such instead
  885. for _, environment := range execInfo.Environments {
  886. log.Printf("[INFO][%s] Execution: should execute onprem with execution environment \"%s\". Workflow: %s", workflowExecution.ExecutionId, environment, workflowExecution.Workflow.ID)
  887. executionRequest := shuffle.ExecutionRequest{
  888. ExecutionId: workflowExecution.ExecutionId,
  889. WorkflowId: workflowExecution.Workflow.ID,
  890. Authorization: workflowExecution.Authorization,
  891. Environments: execInfo.Environments,
  892. }
  893. //log.Printf("Execution request: %#v", executionRequest)
  894. executionRequest.Priority = workflowExecution.Priority
  895. err = shuffle.SetWorkflowQueue(ctx, executionRequest, environment)
  896. if err != nil {
  897. log.Printf("[ERROR] Failed adding execution to db: %s", err)
  898. }
  899. }
  900. }
  901. // Verifies and runs cloud executions
  902. if execInfo.CloudExec {
  903. featuresList, err := handleVerifyCloudsync(workflowExecution.ExecutionOrg)
  904. if !featuresList.Workflows.Active || err != nil {
  905. log.Printf("Error: %s", err)
  906. log.Printf("[ERROR] Cloud not implemented yet. May need to work on app checking and such")
  907. return shuffle.WorkflowExecution{}, "Cloud not implemented yet", errors.New("Cloud not implemented yet")
  908. }
  909. shuffle.IncrementCache(ctx, workflowExecution.OrgId, "workflow_executions_cloud")
  910. // What it needs to know:
  911. // 1. Parameters
  912. if len(workflowExecution.Workflow.Actions) == 1 {
  913. log.Printf("Should execute directly with cloud instead of worker because only one action")
  914. //cloudExecuteAction(workflowExecution.ExecutionId, workflowExecution.Workflow.Actions[0], workflowExecution.ExecutionOrg, workflowExecution.Workflow.ID)
  915. cloudExecuteAction(workflowExecution)
  916. return shuffle.WorkflowExecution{}, "Cloud not implemented yet (1)", errors.New("Cloud not implemented yet")
  917. } else {
  918. // If it's here, it should be controlled by Worker.
  919. // If worker, should this backend be a proxy? I think so.
  920. return shuffle.WorkflowExecution{}, "Cloud not implemented yet (2)", errors.New("Cloud not implemented yet")
  921. }
  922. } else {
  923. shuffle.IncrementCache(ctx, workflowExecution.OrgId, "workflow_executions_onprem")
  924. }
  925. shuffle.IncrementCache(ctx, workflowExecution.OrgId, "workflow_executions")
  926. return workflowExecution, "", nil
  927. }
  928. // This updates stuff locally from remote executions
  929. func cloudExecuteAction(execution shuffle.WorkflowExecution) error {
  930. ctx := context.Background()
  931. org, err := shuffle.GetOrg(ctx, execution.ExecutionOrg)
  932. if err != nil {
  933. return err
  934. }
  935. type ExecutionStruct struct {
  936. ExecutionId string `json:"execution_id" datastore:"execution_id"`
  937. Action shuffle.Action `json:"action" datastore:"action"`
  938. Authorization string `json:"authorization" datastore:"authorization"`
  939. Results []shuffle.ActionResult `json:"results" datastore:"results,noindex"`
  940. ExecutionArgument string `json:"execution_argument" datastore:"execution_argument,noindex"`
  941. WorkflowId string `json:"workflow_id" datastore:"workflow_id"`
  942. ExecutionSource string `json:"execution_source" datastore:"execution_source"`
  943. }
  944. data := ExecutionStruct{
  945. ExecutionId: execution.ExecutionId,
  946. WorkflowId: execution.Workflow.ID,
  947. Action: execution.Workflow.Actions[0],
  948. Authorization: execution.Authorization,
  949. }
  950. log.Printf("Executing action: %#v in execution ID %s", data.Action, data.ExecutionId)
  951. b, err := json.Marshal(data)
  952. if err != nil {
  953. log.Printf("Failed marshaling api key data: %s", err)
  954. return err
  955. }
  956. syncURL := fmt.Sprintf("%s/api/v1/cloud/sync/execute_node", syncUrl)
  957. client := shuffle.GetExternalClient(syncURL)
  958. req, err := http.NewRequest(
  959. "POST",
  960. syncURL,
  961. bytes.NewBuffer(b),
  962. )
  963. req.Header.Add("Authorization", fmt.Sprintf(`Bearer %s`, org.SyncConfig.Apikey))
  964. newresp, err := client.Do(req)
  965. if err != nil {
  966. return err
  967. }
  968. respBody, err := ioutil.ReadAll(newresp.Body)
  969. if err != nil {
  970. return err
  971. }
  972. log.Printf("Finished request. Data: %s", string(respBody))
  973. log.Printf("Status code: %d", newresp.StatusCode)
  974. responseData := retStruct{}
  975. err = json.Unmarshal(respBody, &responseData)
  976. if err != nil {
  977. return err
  978. }
  979. if newresp.StatusCode != 200 {
  980. return errors.New(fmt.Sprintf("Got status code %d when executing remotely. Expected 200. Contact support.", newresp.StatusCode))
  981. }
  982. if !responseData.Success {
  983. return errors.New(responseData.Reason)
  984. }
  985. return nil
  986. }
  987. // 1. Check CORS
  988. // 2. Check authentication
  989. // 3. Check authorization
  990. // 4. Run the actual function
  991. func executeWorkflow(resp http.ResponseWriter, request *http.Request) {
  992. cors := shuffle.HandleCors(resp, request)
  993. if cors {
  994. return
  995. }
  996. user, userErr := shuffle.HandleApiAuthentication(resp, request)
  997. if user.Role == "org-reader" {
  998. log.Printf("[WARNING] Org-reader doesn't have access to run workflow: %s (%s)", user.Username, user.Id)
  999. resp.WriteHeader(401)
  1000. resp.Write([]byte(`{"success": false, "reason": "Read only user"}`))
  1001. return
  1002. }
  1003. location := strings.Split(request.URL.String(), "/")
  1004. var fileId string
  1005. if location[1] == "api" {
  1006. if len(location) <= 4 {
  1007. resp.WriteHeader(401)
  1008. resp.Write([]byte(`{"success": false}`))
  1009. return
  1010. }
  1011. fileId = location[4]
  1012. if strings.Contains(fileId, "?") {
  1013. fileId = strings.Split(fileId, "?")[0]
  1014. }
  1015. }
  1016. if len(fileId) != 36 {
  1017. resp.WriteHeader(401)
  1018. resp.Write([]byte(`{"success": false, "reason": "Workflow ID to execute is not valid"}`))
  1019. return
  1020. }
  1021. log.Printf("[INFO] Inside execute workflow for ID %s", fileId)
  1022. ctx := context.Background()
  1023. workflow, err := shuffle.GetWorkflow(ctx, fileId, true)
  1024. if err != nil && workflow.ID == "" {
  1025. log.Printf("[WARNING] Failed getting the workflow locally (execute workflow): %s", err)
  1026. resp.WriteHeader(401)
  1027. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflow with ID %s doesn't exist."}`, fileId)))
  1028. return
  1029. }
  1030. executionAuthValid := false
  1031. newOrgId := ""
  1032. if userErr != nil {
  1033. // Check if the execution data has correct info in it! Happens based on subflows.
  1034. // 1. Parent workflow contains this workflow ID in the source trigger?
  1035. // 2. Parent workflow's owner is same org?
  1036. // 3. Parent execution auth is correct
  1037. log.Printf("[INFO] Inside execute workflow access validation!")
  1038. executionAuthValid, newOrgId = shuffle.RunExecuteAccessValidation(request, workflow)
  1039. if !executionAuthValid {
  1040. log.Printf("[INFO] Api authorization failed in execute workflow: %s", userErr)
  1041. resp.WriteHeader(403)
  1042. resp.Write([]byte(`{"success": false}`))
  1043. return
  1044. } else {
  1045. log.Printf("[DEBUG] Execution of %s successfully validated and started based on subflow or user input execution", workflow.ID)
  1046. user.ActiveOrg = shuffle.OrgMini{
  1047. Id: newOrgId,
  1048. }
  1049. }
  1050. }
  1051. if !executionAuthValid {
  1052. if user.Id != workflow.Owner && user.Role != "scheduler" && user.Role != fmt.Sprintf("workflow_%s", fileId) {
  1053. if workflow.OrgId == user.ActiveOrg.Id {
  1054. log.Printf("[AUDIT] Letting user %s execute %s because they're admin of the same org", user.Username, workflow.ID)
  1055. } else {
  1056. log.Printf("[AUDIT] Wrong user (%s) for workflow %s (execute)", user.Username, workflow.ID)
  1057. resp.WriteHeader(403)
  1058. resp.Write([]byte(`{"success": false}`))
  1059. return
  1060. }
  1061. }
  1062. }
  1063. log.Printf("[AUDIT] Starting execution of workflow '%s' by user '%s' (%s). If this is empty, it's most likely a subflow!", fileId, user.Username, user.Id)
  1064. user.ActiveOrg.Users = []shuffle.UserMini{}
  1065. workflow.ExecutingOrg = user.ActiveOrg
  1066. workflowExecution, executionResp, err := handleExecution(fileId, *workflow, request, user.ActiveOrg.Id)
  1067. if err == nil {
  1068. if strings.Contains(executionResp, "User Input:") {
  1069. resp.WriteHeader(400)
  1070. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, executionResp)))
  1071. return
  1072. }
  1073. resp.WriteHeader(200)
  1074. // Check for "wait" query if it's true
  1075. wait, waitok := request.URL.Query()["wait"]
  1076. if waitok && wait[0] == "true" {
  1077. returnBody := shuffle.HandleRetValidation(ctx, workflowExecution, 1, 15)
  1078. returnBytes, err := json.Marshal(returnBody)
  1079. if err != nil {
  1080. log.Printf("[ERROR] Failed to marshal retStruct in single execution: %s", err)
  1081. }
  1082. resp.Write(returnBytes)
  1083. return
  1084. }
  1085. resp.Write([]byte(fmt.Sprintf(`{"success": true, "execution_id": "%s", "authorization": "%s"}`, workflowExecution.ExecutionId, workflowExecution.Authorization)))
  1086. return
  1087. }
  1088. resp.WriteHeader(500)
  1089. resp.Write([]byte(fmt.Sprintf(`{"success": false, "execution_id": "%s", "authorization": "%s", "reason": "%s"}`, workflowExecution.ExecutionId, workflowExecution.Authorization, executionResp)))
  1090. }
  1091. func stopSchedule(resp http.ResponseWriter, request *http.Request) {
  1092. cors := shuffle.HandleCors(resp, request)
  1093. if cors {
  1094. return
  1095. }
  1096. user, err := shuffle.HandleApiAuthentication(resp, request)
  1097. if err != nil {
  1098. log.Printf("Api authentication failed in schedule workflow: %s", err)
  1099. resp.WriteHeader(401)
  1100. resp.Write([]byte(`{"success": false}`))
  1101. return
  1102. }
  1103. if user.Role == "org-reader" {
  1104. log.Printf("[WARNING] Org-reader doesn't have access to stop schedule: %s (%s)", user.Username, user.Id)
  1105. resp.WriteHeader(401)
  1106. resp.Write([]byte(`{"success": false, "reason": "Read only user"}`))
  1107. return
  1108. }
  1109. location := strings.Split(request.URL.String(), "/")
  1110. var fileId string
  1111. var scheduleId string
  1112. if location[1] == "api" {
  1113. if len(location) <= 6 {
  1114. resp.WriteHeader(401)
  1115. resp.Write([]byte(`{"success": false}`))
  1116. return
  1117. }
  1118. fileId = location[4]
  1119. scheduleId = location[6]
  1120. }
  1121. if len(fileId) != 36 {
  1122. resp.WriteHeader(401)
  1123. resp.Write([]byte(`{"success": false, "reason": "Workflow ID to stop schedule is not valid"}`))
  1124. return
  1125. }
  1126. if len(scheduleId) != 36 {
  1127. resp.WriteHeader(401)
  1128. resp.Write([]byte(`{"success": false, "reason": "Schedule ID not valid"}`))
  1129. return
  1130. }
  1131. ctx := context.Background()
  1132. workflow, err := shuffle.GetWorkflow(ctx, fileId)
  1133. if err != nil {
  1134. log.Printf("[WARNING] Failed getting the workflow locally (stop schedule): %s", err)
  1135. resp.WriteHeader(401)
  1136. resp.Write([]byte(`{"success": false}`))
  1137. return
  1138. }
  1139. if user.Id != workflow.Owner || len(user.Id) == 0 {
  1140. if workflow.OrgId == user.ActiveOrg.Id {
  1141. log.Printf("[AUDIT] User %s is accessing workflow %s as admin (stop schedule)", user.Username, workflow.ID)
  1142. } else {
  1143. log.Printf("[WARNING] Wrong user (%s) for workflow %s (stop schedule)", user.Username, workflow.ID)
  1144. resp.WriteHeader(401)
  1145. resp.Write([]byte(`{"success": false}`))
  1146. return
  1147. }
  1148. }
  1149. schedule, err := shuffle.GetSchedule(ctx, scheduleId)
  1150. if err != nil {
  1151. log.Printf("[WARNING] Failed finding schedule %s", scheduleId)
  1152. resp.WriteHeader(401)
  1153. resp.Write([]byte(`{"success": false}`))
  1154. return
  1155. }
  1156. //log.Printf("Schedule: %#v", schedule)
  1157. if schedule.Environment == "cloud" {
  1158. log.Printf("[INFO] Should STOP a cloud schedule for workflow %s with schedule ID %s", fileId, scheduleId)
  1159. org, err := shuffle.GetOrg(ctx, user.ActiveOrg.Id)
  1160. if err != nil {
  1161. log.Printf("Failed finding org %s: %s", org.Id, err)
  1162. return
  1163. }
  1164. // 1. Send request to cloud
  1165. // 2. Remove schedule if success
  1166. action := shuffle.CloudSyncJob{
  1167. Type: "schedule",
  1168. Action: "stop",
  1169. OrgId: org.Id,
  1170. PrimaryItemId: scheduleId,
  1171. SecondaryItem: schedule.Frequency,
  1172. ThirdItem: workflow.ID,
  1173. }
  1174. err = executeCloudAction(action, org.SyncConfig.Apikey)
  1175. if err != nil {
  1176. log.Printf("[WARNING] Failed cloud action STOP schedule: %s", err)
  1177. resp.WriteHeader(401)
  1178. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  1179. return
  1180. } else {
  1181. log.Printf("[INFO] Successfully ran cloud action STOP schedule")
  1182. err = shuffle.DeleteKey(ctx, "schedules", scheduleId)
  1183. if err != nil {
  1184. log.Printf("[WARNING] Failed deleting cloud schedule onprem..")
  1185. resp.WriteHeader(401)
  1186. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed deleting cloud schedule"}`)))
  1187. return
  1188. }
  1189. resp.WriteHeader(200)
  1190. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  1191. return
  1192. }
  1193. }
  1194. err = deleteSchedule(ctx, scheduleId)
  1195. if err != nil {
  1196. log.Printf("[WARNING] Failed deleting schedule: %s", err)
  1197. if strings.Contains(err.Error(), "Job not found") {
  1198. resp.WriteHeader(200)
  1199. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  1200. } else {
  1201. resp.WriteHeader(401)
  1202. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed stopping schedule"}`)))
  1203. }
  1204. return
  1205. }
  1206. resp.WriteHeader(200)
  1207. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  1208. return
  1209. }
  1210. func stopScheduleGCP(resp http.ResponseWriter, request *http.Request) {
  1211. cors := shuffle.HandleCors(resp, request)
  1212. if cors {
  1213. return
  1214. }
  1215. user, err := shuffle.HandleApiAuthentication(resp, request)
  1216. if err != nil {
  1217. log.Printf("Api authentication failed in schedule workflow: %s", err)
  1218. resp.WriteHeader(401)
  1219. resp.Write([]byte(`{"success": false}`))
  1220. return
  1221. }
  1222. location := strings.Split(request.URL.String(), "/")
  1223. var fileId string
  1224. var scheduleId string
  1225. if location[1] == "api" {
  1226. if len(location) <= 6 {
  1227. resp.WriteHeader(401)
  1228. resp.Write([]byte(`{"success": false}`))
  1229. return
  1230. }
  1231. fileId = location[4]
  1232. scheduleId = location[6]
  1233. }
  1234. if len(fileId) != 36 {
  1235. resp.WriteHeader(401)
  1236. resp.Write([]byte(`{"success": false, "reason": "Workflow ID to stop schedule is not valid"}`))
  1237. return
  1238. }
  1239. if len(scheduleId) != 36 {
  1240. resp.WriteHeader(401)
  1241. resp.Write([]byte(`{"success": false, "reason": "Schedule ID not valid"}`))
  1242. return
  1243. }
  1244. ctx := context.Background()
  1245. workflow, err := shuffle.GetWorkflow(ctx, fileId)
  1246. if err != nil {
  1247. log.Printf("Failed getting the workflow locally (stop schedule GCP): %s", err)
  1248. resp.WriteHeader(401)
  1249. resp.Write([]byte(`{"success": false}`))
  1250. return
  1251. }
  1252. // FIXME - have a check for org etc too..
  1253. // FIXME - admin check like this? idk
  1254. if user.Id != workflow.Owner && user.Role != "scheduler" {
  1255. log.Printf("[WARNING] Wrong user (%s) for workflow %s (stop schedule)", user.Username, workflow.ID)
  1256. resp.WriteHeader(401)
  1257. resp.Write([]byte(`{"success": false}`))
  1258. return
  1259. }
  1260. if len(workflow.Actions) == 0 {
  1261. workflow.Actions = []shuffle.Action{}
  1262. }
  1263. if len(workflow.Branches) == 0 {
  1264. workflow.Branches = []shuffle.Branch{}
  1265. }
  1266. if len(workflow.Triggers) == 0 {
  1267. workflow.Triggers = []shuffle.Trigger{}
  1268. }
  1269. if len(workflow.Errors) == 0 {
  1270. workflow.Errors = []string{}
  1271. }
  1272. err = deleteSchedule(ctx, scheduleId)
  1273. if err != nil {
  1274. if strings.Contains(err.Error(), "Job not found") {
  1275. resp.WriteHeader(200)
  1276. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  1277. } else {
  1278. resp.WriteHeader(401)
  1279. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed stopping schedule"}`)))
  1280. }
  1281. return
  1282. }
  1283. resp.WriteHeader(200)
  1284. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  1285. return
  1286. }
  1287. func deleteKeySchedule(ctx context.Context, id string) error {
  1288. err := shuffle.DeleteKey(ctx, "schedules", id)
  1289. if err != nil {
  1290. return err
  1291. }
  1292. return nil
  1293. }
  1294. func deleteSchedule(ctx context.Context, id string) error {
  1295. log.Printf("[DEBUG] Should stop schedule %s!", id)
  1296. if value, exists := scheduledJobs[id]; exists {
  1297. // Stops the schedule properly
  1298. value.Lock()
  1299. } else {
  1300. // FIXME - allow it to kind of stop anyway?
  1301. if j, ok := cronJobs[id]; ok {
  1302. err := CronScheduler.RemoveByID(j)
  1303. if err != nil {
  1304. log.Printf("[ERROR] Failed to remove the scheduler %s", err)
  1305. return err
  1306. }
  1307. } else {
  1308. // Just stop and delete anyway if not in memory
  1309. deleteKeySchedule(ctx, id)
  1310. return errors.New("Can't find the schedule.")
  1311. }
  1312. }
  1313. err := deleteKeySchedule(ctx, id)
  1314. if err != nil {
  1315. log.Printf("[ERROR] Failed to stop schedule in db %s: %s", id, err)
  1316. return err
  1317. }
  1318. return nil
  1319. }
  1320. func scheduleWorkflow(resp http.ResponseWriter, request *http.Request) {
  1321. cors := shuffle.HandleCors(resp, request)
  1322. if cors {
  1323. return
  1324. }
  1325. user, err := shuffle.HandleApiAuthentication(resp, request)
  1326. if err != nil {
  1327. log.Printf("[WARNING] Api authentication failed in schedule workflow: %s", err)
  1328. resp.WriteHeader(401)
  1329. resp.Write([]byte(`{"success": false}`))
  1330. return
  1331. }
  1332. if user.Role == "org-reader" {
  1333. log.Printf("[WARNING] Org-reader doesn't have access to schedule workflow: %s (%s)", user.Username, user.Id)
  1334. resp.WriteHeader(401)
  1335. resp.Write([]byte(`{"success": false, "reason": "Read only user"}`))
  1336. return
  1337. }
  1338. location := strings.Split(request.URL.String(), "/")
  1339. var fileId string
  1340. if location[1] == "api" {
  1341. if len(location) <= 4 {
  1342. resp.WriteHeader(401)
  1343. resp.Write([]byte(`{"success": false}`))
  1344. return
  1345. }
  1346. fileId = location[4]
  1347. }
  1348. if len(fileId) != 36 {
  1349. resp.WriteHeader(401)
  1350. resp.Write([]byte(`{"success": false, "reason": "Workflow ID to start schedule is not valid"}`))
  1351. return
  1352. }
  1353. ctx := context.Background()
  1354. workflow, err := shuffle.GetWorkflow(ctx, fileId)
  1355. if err != nil {
  1356. log.Printf("[WARNING] Failed getting the workflow locally (schedule workflow): %s", err)
  1357. resp.WriteHeader(401)
  1358. resp.Write([]byte(`{"success": false}`))
  1359. return
  1360. }
  1361. if user.Id != workflow.Owner || len(user.Id) == 0 {
  1362. if workflow.OrgId == user.ActiveOrg.Id {
  1363. log.Printf("[INFO] User %s is deleting workflow %s as admin. Owner: %s", user.Username, workflow.ID, workflow.Owner)
  1364. } else {
  1365. log.Printf("[WARNING] Wrong user (%s) for workflow %s (schedule start). Owner: %s", user.Username, workflow.ID, workflow.Owner)
  1366. resp.WriteHeader(401)
  1367. resp.Write([]byte(`{"success": false}`))
  1368. return
  1369. }
  1370. }
  1371. if len(workflow.Actions) == 0 {
  1372. workflow.Actions = []shuffle.Action{}
  1373. }
  1374. if len(workflow.Branches) == 0 {
  1375. workflow.Branches = []shuffle.Branch{}
  1376. }
  1377. if len(workflow.Triggers) == 0 {
  1378. workflow.Triggers = []shuffle.Trigger{}
  1379. }
  1380. if len(workflow.Errors) == 0 {
  1381. workflow.Errors = []string{}
  1382. }
  1383. body, err := ioutil.ReadAll(request.Body)
  1384. if err != nil {
  1385. log.Printf("Failed hook unmarshaling: %s", err)
  1386. resp.WriteHeader(401)
  1387. resp.Write([]byte(`{"success": false}`))
  1388. return
  1389. }
  1390. var schedule shuffle.Schedule
  1391. err = json.Unmarshal(body, &schedule)
  1392. if err != nil {
  1393. log.Printf("Failed schedule POST unmarshaling: %s", err)
  1394. resp.WriteHeader(401)
  1395. resp.Write([]byte(`{"success": false}`))
  1396. return
  1397. }
  1398. // Finds the startnode for the specific schedule
  1399. startNode := ""
  1400. if schedule.Start != "" {
  1401. startNode = schedule.Start
  1402. } else {
  1403. for _, branch := range workflow.Branches {
  1404. if branch.SourceID == schedule.Id {
  1405. startNode = branch.DestinationID
  1406. }
  1407. }
  1408. if startNode == "" {
  1409. startNode = workflow.Start
  1410. }
  1411. }
  1412. //log.Printf("Startnode: %s", startNode)
  1413. if len(schedule.Id) != 36 {
  1414. log.Printf("ID length is not 36 for schedule: %s", err)
  1415. resp.WriteHeader(http.StatusInternalServerError)
  1416. resp.Write([]byte(`{"success": false, "reason": "Invalid data"}`))
  1417. return
  1418. }
  1419. if len(schedule.Name) == 0 {
  1420. log.Printf("Empty name.")
  1421. resp.WriteHeader(401)
  1422. resp.Write([]byte(`{"success": false, "reason": "Schedule name can't be empty"}`))
  1423. return
  1424. }
  1425. if len(schedule.Frequency) == 0 {
  1426. log.Printf("Empty frequency.")
  1427. resp.WriteHeader(401)
  1428. resp.Write([]byte(`{"success": false, "reason": "Frequency can't be empty"}`))
  1429. return
  1430. }
  1431. scheduleArg, err := json.Marshal(schedule.ExecutionArgument)
  1432. if err != nil {
  1433. log.Printf("Failed scheduleArg marshal: %s", err)
  1434. resp.WriteHeader(http.StatusInternalServerError)
  1435. resp.Write([]byte(`{"success": false}`))
  1436. return
  1437. }
  1438. // Clean up garbage. This might be wrong in some very specific use-cases
  1439. parsedBody := string(scheduleArg)
  1440. parsedBody = strings.Replace(parsedBody, "\\\"", "\"", -1)
  1441. if len(parsedBody) > 0 {
  1442. if string(parsedBody[0]) == `"` && string(parsedBody[len(parsedBody)-1]) == "\"" {
  1443. parsedBody = parsedBody[1 : len(parsedBody)-1]
  1444. }
  1445. }
  1446. if schedule.Environment == "cloud" {
  1447. log.Printf("[INFO] Should START a cloud schedule for workflow %s with schedule ID %s", workflow.ID, schedule.Id)
  1448. org, err := shuffle.GetOrg(ctx, user.ActiveOrg.Id)
  1449. if err != nil {
  1450. log.Printf("Failed finding org %s: %s", org.Id, err)
  1451. return
  1452. }
  1453. // 1 = scheduleId
  1454. // 2 = schedule (cron, frequency)
  1455. // 3 = workflowId
  1456. // 4 = execution argument
  1457. action := shuffle.CloudSyncJob{
  1458. Type: "schedule",
  1459. Action: "start",
  1460. OrgId: org.Id,
  1461. PrimaryItemId: schedule.Id,
  1462. SecondaryItem: schedule.Frequency,
  1463. ThirdItem: workflow.ID,
  1464. FourthItem: schedule.ExecutionArgument,
  1465. FifthItem: startNode,
  1466. }
  1467. timeNow := int64(time.Now().Unix())
  1468. newSchedule := shuffle.ScheduleOld{
  1469. Id: schedule.Id,
  1470. WorkflowId: workflow.ID,
  1471. StartNode: startNode,
  1472. Argument: string(schedule.ExecutionArgument),
  1473. WrappedArgument: parsedBody,
  1474. CreationTime: timeNow,
  1475. LastModificationtime: timeNow,
  1476. LastRuntime: timeNow,
  1477. Org: org.Id,
  1478. Frequency: schedule.Frequency,
  1479. Environment: "cloud",
  1480. }
  1481. err = shuffle.SetSchedule(ctx, newSchedule)
  1482. if err != nil {
  1483. log.Printf("[ERROR] Failed setting cloud schedule: %s", err)
  1484. resp.WriteHeader(400)
  1485. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  1486. return
  1487. }
  1488. //log.Printf("Starting Cloud schedule Action: %#v", action)
  1489. err = executeCloudAction(action, org.SyncConfig.Apikey)
  1490. if err != nil {
  1491. log.Printf("[WARNING] Failed cloud action START schedule: %s", err)
  1492. resp.WriteHeader(401)
  1493. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  1494. return
  1495. } else {
  1496. log.Printf("[INFO] Successfully set up cloud action schedule")
  1497. resp.WriteHeader(200)
  1498. resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Done"}`)))
  1499. return
  1500. }
  1501. }
  1502. //log.Printf("Schedulearg: %s", parsedBody)
  1503. err = createSchedule(
  1504. ctx,
  1505. schedule.Id,
  1506. workflow.ID,
  1507. schedule.Name,
  1508. startNode,
  1509. schedule.Frequency,
  1510. user.ActiveOrg.Id,
  1511. []byte(parsedBody),
  1512. )
  1513. // FIXME - real error message lol
  1514. if err != nil {
  1515. log.Printf("[ERROR] Failed creating schedule: %s", err)
  1516. resp.WriteHeader(400)
  1517. if schedule.Environment == "cloud" {
  1518. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Invalid argument. For cloud schedules, try cron */15 * * * *"}`)))
  1519. } else {
  1520. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Invalid argument. For onprem schedules, try 60 for 60 seconds"}`)))
  1521. }
  1522. return
  1523. }
  1524. //workflow.Schedules = append(workflow.Schedules, schedule)
  1525. err = shuffle.SetWorkflow(ctx, *workflow, workflow.ID)
  1526. if err != nil {
  1527. log.Printf("[ERROR] Failed setting workflow for schedule: %s", err)
  1528. resp.WriteHeader(400)
  1529. resp.Write([]byte(`{"success": false}`))
  1530. return
  1531. }
  1532. resp.WriteHeader(200)
  1533. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  1534. return
  1535. }
  1536. func setExampleresult(ctx context.Context, result shuffle.AppExecutionExample) error {
  1537. // FIXME: Reintroduce this for stats
  1538. //key := datastore.NameKey("example_result", result.ExampleId, nil)
  1539. //// New struct, to not add body, author etc
  1540. //if _, err := dbclient.Put(ctx, key, &result); err != nil {
  1541. // log.Printf("Error adding workflow: %s", err)
  1542. // return err
  1543. //}
  1544. return nil
  1545. }
  1546. func getWorkflowApps(resp http.ResponseWriter, request *http.Request) {
  1547. cors := shuffle.HandleCors(resp, request)
  1548. if cors {
  1549. return
  1550. }
  1551. ctx := context.Background()
  1552. user, userErr := shuffle.HandleApiAuthentication(resp, request)
  1553. if userErr != nil {
  1554. log.Printf("[WARNING] Api authentication failed in get all apps - this does NOT require auth in cloud.: %s", userErr)
  1555. resp.WriteHeader(401)
  1556. resp.Write([]byte(`{"success": false}`))
  1557. return
  1558. }
  1559. workflowapps, err := shuffle.GetAllWorkflowApps(ctx, 1000, 0)
  1560. if err != nil {
  1561. log.Printf("{WARNING] Failed getting apps (getworkflowapps): %s", err)
  1562. resp.WriteHeader(400)
  1563. resp.Write([]byte(`{"success": false}`))
  1564. return
  1565. }
  1566. newapps := workflowapps
  1567. if len(user.PrivateApps) > 0 {
  1568. found := false
  1569. for _, item := range user.PrivateApps {
  1570. for _, app := range newapps {
  1571. if item.ID == app.ID {
  1572. found = true
  1573. break
  1574. }
  1575. }
  1576. if !found {
  1577. newapps = append(newapps, item)
  1578. }
  1579. }
  1580. }
  1581. // Double unmarshal because of user apps
  1582. newbody, err := json.Marshal(newapps)
  1583. if err != nil {
  1584. log.Printf("[ERROR] Failed unmarshalling all newapps: %s", err)
  1585. resp.WriteHeader(401)
  1586. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflow apps"}`)))
  1587. return
  1588. }
  1589. resp.WriteHeader(200)
  1590. resp.Write(newbody)
  1591. }
  1592. func handleGetfile(resp http.ResponseWriter, request *http.Request) ([]byte, error) {
  1593. // Upload file here first
  1594. request.ParseMultipartForm(32 << 20)
  1595. file, _, err := request.FormFile("file")
  1596. if err != nil {
  1597. log.Printf("Error parsing: %s", err)
  1598. return []byte{}, err
  1599. }
  1600. defer file.Close()
  1601. buf := bytes.NewBuffer(nil)
  1602. if _, err := io.Copy(buf, file); err != nil {
  1603. return []byte{}, err
  1604. }
  1605. return buf.Bytes(), nil
  1606. }
  1607. // Basically a search for apps that aren't activated yet
  1608. func getSpecificApps(resp http.ResponseWriter, request *http.Request) {
  1609. cors := shuffle.HandleCors(resp, request)
  1610. if cors {
  1611. return
  1612. }
  1613. // Just need to be logged in
  1614. // FIXME - should have some permissions?
  1615. _, err := shuffle.HandleApiAuthentication(resp, request)
  1616. if err != nil {
  1617. log.Printf("Api authentication failed in set new app: %s", err)
  1618. resp.WriteHeader(401)
  1619. resp.Write([]byte(`{"success": false}`))
  1620. return
  1621. }
  1622. body, err := ioutil.ReadAll(request.Body)
  1623. if err != nil {
  1624. log.Printf("Error with body read: %s", err)
  1625. resp.WriteHeader(401)
  1626. resp.Write([]byte(`{"success": false}`))
  1627. return
  1628. }
  1629. type tmpStruct struct {
  1630. Search string `json:"search"`
  1631. }
  1632. var tmpBody tmpStruct
  1633. err = json.Unmarshal(body, &tmpBody)
  1634. if err != nil {
  1635. log.Printf("Error with unmarshal tmpBody: %s", err)
  1636. resp.WriteHeader(401)
  1637. resp.Write([]byte(`{"success": false}`))
  1638. return
  1639. }
  1640. // FIXME - continue the search here with github repos etc.
  1641. // Caching might be smart :D
  1642. ctx := context.Background()
  1643. workflowapps, err := shuffle.GetAllWorkflowApps(ctx, 1000, 0)
  1644. if err != nil {
  1645. log.Printf("Error: Failed getting workflowapps: %s", err)
  1646. resp.WriteHeader(401)
  1647. resp.Write([]byte(`{"success": false}`))
  1648. return
  1649. }
  1650. returnValues := []shuffle.WorkflowApp{}
  1651. search := strings.ToLower(tmpBody.Search)
  1652. for _, app := range workflowapps {
  1653. if !app.Activated && app.Generated {
  1654. // This might be heavy with A LOT
  1655. // Not too worried with todays tech tbh..
  1656. appName := strings.ToLower(app.Name)
  1657. appDesc := strings.ToLower(app.Description)
  1658. if strings.Contains(appName, search) || strings.Contains(appDesc, search) {
  1659. //log.Printf("Name: %s, Generated: %s, Activated: %s", app.Name, strconv.FormatBool(app.Generated), strconv.FormatBool(app.Activated))
  1660. returnValues = append(returnValues, app)
  1661. }
  1662. }
  1663. }
  1664. newbody, err := json.Marshal(returnValues)
  1665. if err != nil {
  1666. resp.WriteHeader(401)
  1667. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflow executions"}`)))
  1668. return
  1669. }
  1670. returnData := fmt.Sprintf(`{"success": true, "reason": %s}`, string(newbody))
  1671. resp.WriteHeader(200)
  1672. resp.Write([]byte(returnData))
  1673. }
  1674. func validateAppInput(resp http.ResponseWriter, request *http.Request) {
  1675. cors := shuffle.HandleCors(resp, request)
  1676. if cors {
  1677. return
  1678. }
  1679. // Just need to be logged in
  1680. // FIXME - should have some permissions?
  1681. user, err := shuffle.HandleApiAuthentication(resp, request)
  1682. if err != nil {
  1683. log.Printf("Api authentication failed in set new app: %s", err)
  1684. resp.WriteHeader(401)
  1685. resp.Write([]byte(`{"success": false}`))
  1686. return
  1687. }
  1688. if user.Role == "org-reader" {
  1689. log.Printf("[WARNING] Org-reader doesn't have access to delete apps: %s (%s)", user.Username, user.Id)
  1690. resp.WriteHeader(401)
  1691. resp.Write([]byte(`{"success": false, "reason": "Read only user"}`))
  1692. return
  1693. }
  1694. filebytes, err := handleGetfile(resp, request)
  1695. if err != nil {
  1696. resp.WriteHeader(401)
  1697. resp.Write([]byte(`{"success": false}`))
  1698. return
  1699. }
  1700. kind, err := filetype.Match(filebytes)
  1701. if err != nil {
  1702. log.Printf("Failed parsing filetype")
  1703. resp.WriteHeader(401)
  1704. resp.Write([]byte(`{"success": false}`))
  1705. return
  1706. }
  1707. //fmt.Printf("File type: %s. MIME: %s\n", kind.Extension, kind.MIME.Value)
  1708. if kind == filetype.Unknown {
  1709. log.Println("Unknown file type")
  1710. resp.WriteHeader(401)
  1711. resp.Write([]byte(`{"success": false}`))
  1712. return
  1713. }
  1714. if kind.MIME.Value != "application/zip" {
  1715. log.Println("Not zip, can't unzip")
  1716. resp.WriteHeader(401)
  1717. resp.Write([]byte(`{"success": false}`))
  1718. return
  1719. }
  1720. // FIXME - validate folderstructure, Dockerfile, python scripts, api.yaml, requirements.txt, src/
  1721. resp.WriteHeader(200)
  1722. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  1723. }
  1724. func handleSingleAppHotloadRequest(resp http.ResponseWriter, request *http.Request) {
  1725. cors := shuffle.HandleCors(resp, request)
  1726. if cors {
  1727. return
  1728. }
  1729. ctx := context.Background()
  1730. cacheKey := fmt.Sprintf("workflowapps-sorted-1000")
  1731. shuffle.DeleteCache(ctx, cacheKey)
  1732. cacheKey = fmt.Sprintf("workflowapps-sorted-500")
  1733. shuffle.DeleteCache(ctx, cacheKey)
  1734. cacheKey = fmt.Sprintf("workflowapps-sorted-0")
  1735. shuffle.DeleteCache(ctx, cacheKey)
  1736. // Just need to be logged in
  1737. // FIXME - should have some permissions?
  1738. user, err := shuffle.HandleApiAuthentication(resp, request)
  1739. if err != nil {
  1740. log.Printf("Api authentication failed in app hotload: %s", err)
  1741. resp.WriteHeader(401)
  1742. resp.Write([]byte(`{"success": false}`))
  1743. return
  1744. }
  1745. if user.Role != "admin" {
  1746. resp.WriteHeader(401)
  1747. resp.Write([]byte(`{"success": false, "reason": "Must be admin to hotload apps"}`))
  1748. return
  1749. }
  1750. location := os.Getenv("SHUFFLE_APP_HOTLOAD_FOLDER")
  1751. if len(location) == 0 {
  1752. location = "./shuffle-apps"
  1753. }
  1754. if len(location) == 0 {
  1755. resp.WriteHeader(500)
  1756. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "SHUFFLE_APP_HOTLOAD_FOLDER not specified in .env"}`)))
  1757. return
  1758. }
  1759. requestUrlFields := strings.Split(request.URL.String(), "/")
  1760. var appName string
  1761. if requestUrlFields[1] == "api" {
  1762. if len(requestUrlFields) <= 4 {
  1763. resp.WriteHeader(401)
  1764. resp.Write([]byte(`{"success": false}`))
  1765. return
  1766. }
  1767. appName = requestUrlFields[4]
  1768. if strings.Contains(appName, "?") {
  1769. appName = strings.Split(appName, "?")[0]
  1770. }
  1771. }
  1772. location = location + "/" + appName
  1773. log.Printf("[INFO] Starting hotloading from %s", location)
  1774. err = handleAppHotload(ctx, location, true)
  1775. if err != nil {
  1776. log.Printf("[WARNING] Failed app hotload: %s", err)
  1777. resp.WriteHeader(500)
  1778. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  1779. return
  1780. }
  1781. cacheKey = fmt.Sprintf("workflowapps-sorted-100")
  1782. shuffle.DeleteCache(ctx, cacheKey)
  1783. cacheKey = fmt.Sprintf("workflowapps-sorted-500")
  1784. shuffle.DeleteCache(ctx, cacheKey)
  1785. cacheKey = fmt.Sprintf("workflowapps-sorted-1000")
  1786. shuffle.DeleteCache(ctx, cacheKey)
  1787. resp.WriteHeader(200)
  1788. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  1789. }
  1790. func handleAppHotloadRequest(resp http.ResponseWriter, request *http.Request) {
  1791. cors := shuffle.HandleCors(resp, request)
  1792. if cors {
  1793. return
  1794. }
  1795. ctx := context.Background()
  1796. cacheKey := fmt.Sprintf("workflowapps-sorted-1000")
  1797. shuffle.DeleteCache(ctx, cacheKey)
  1798. cacheKey = fmt.Sprintf("workflowapps-sorted-500")
  1799. shuffle.DeleteCache(ctx, cacheKey)
  1800. cacheKey = fmt.Sprintf("workflowapps-sorted-0")
  1801. shuffle.DeleteCache(ctx, cacheKey)
  1802. // Just need to be logged in
  1803. // FIXME - should have some permissions?
  1804. user, err := shuffle.HandleApiAuthentication(resp, request)
  1805. if err != nil {
  1806. log.Printf("Api authentication failed in app hotload: %s", err)
  1807. resp.WriteHeader(401)
  1808. resp.Write([]byte(`{"success": false}`))
  1809. return
  1810. }
  1811. if user.Role != "admin" {
  1812. resp.WriteHeader(401)
  1813. resp.Write([]byte(`{"success": false, "reason": "Must be admin to hotload apps"}`))
  1814. return
  1815. }
  1816. location := os.Getenv("SHUFFLE_APP_HOTLOAD_FOLDER")
  1817. if len(location) == 0 {
  1818. location = "./shuffle-apps"
  1819. }
  1820. if len(location) == 0 {
  1821. resp.WriteHeader(500)
  1822. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "SHUFFLE_APP_HOTLOAD_FOLDER not specified in .env"}`)))
  1823. return
  1824. }
  1825. log.Printf("[INFO] Starting hotloading from %s", location)
  1826. err = handleAppHotload(ctx, location, true)
  1827. if err != nil {
  1828. log.Printf("[WARNING] Failed app hotload: %s", err)
  1829. resp.WriteHeader(500)
  1830. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  1831. return
  1832. }
  1833. cacheKey = fmt.Sprintf("workflowapps-sorted-100")
  1834. shuffle.DeleteCache(ctx, cacheKey)
  1835. cacheKey = fmt.Sprintf("workflowapps-sorted-500")
  1836. shuffle.DeleteCache(ctx, cacheKey)
  1837. cacheKey = fmt.Sprintf("workflowapps-sorted-1000")
  1838. shuffle.DeleteCache(ctx, cacheKey)
  1839. resp.WriteHeader(200)
  1840. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  1841. }
  1842. func iterateOpenApiGithub(fs billy.Filesystem, dir []os.FileInfo, extra string, onlyname string) error {
  1843. ctx := context.Background()
  1844. workflowapps, err := shuffle.GetAllWorkflowApps(ctx, 1000, 0)
  1845. appCounter := 0
  1846. if err != nil {
  1847. log.Printf("[WARNING] Failed to get existing generated apps for OpenAPI verification: %s", err)
  1848. }
  1849. for _, file := range dir {
  1850. if len(onlyname) > 0 && file.Name() != onlyname {
  1851. continue
  1852. }
  1853. // Folder?
  1854. switch mode := file.Mode(); {
  1855. case mode.IsDir():
  1856. tmpExtra := fmt.Sprintf("%s%s/", extra, file.Name())
  1857. //log.Printf("TMPEXTRA: %s", tmpExtra)
  1858. dir, err := fs.ReadDir(tmpExtra)
  1859. if err != nil {
  1860. log.Printf("Failed reading dir in openapi: %s", err)
  1861. continue
  1862. }
  1863. // Go routine? Hmm, this can be super quick I guess
  1864. err = iterateOpenApiGithub(fs, dir, tmpExtra, "")
  1865. if err != nil {
  1866. log.Printf("Failed recursion in openapi: %s", err)
  1867. continue
  1868. //break
  1869. }
  1870. case mode.IsRegular():
  1871. // Check the file
  1872. filename := file.Name()
  1873. filteredNames := []string{"FUNDING.yml"}
  1874. if strings.Contains(filename, "yaml") || strings.Contains(filename, "yml") {
  1875. contOuter := false
  1876. for _, name := range filteredNames {
  1877. if filename == name {
  1878. contOuter = true
  1879. break
  1880. }
  1881. }
  1882. if contOuter {
  1883. //log.Printf("Skipping %s", filename)
  1884. continue
  1885. }
  1886. //log.Printf("File: %s", filename)
  1887. //log.Printf("Found file: %s", filename)
  1888. //log.Printf("OpenAPI app: %s", filename)
  1889. tmpExtra := fmt.Sprintf("%s%s/", extra, file.Name())
  1890. fileReader, err := fs.Open(tmpExtra)
  1891. if err != nil {
  1892. continue
  1893. }
  1894. readFile, err := ioutil.ReadAll(fileReader)
  1895. if err != nil {
  1896. log.Printf("[WARNING] Filereader error yaml for %s: %s", filename, err)
  1897. continue
  1898. }
  1899. // 1. This parses OpenAPI v2 to v3 etc, for use.
  1900. parsedOpenApi, err := handleSwaggerValidation(readFile)
  1901. if err != nil {
  1902. log.Printf("[WARNING] Validation error for %s: %s", filename, err)
  1903. continue
  1904. }
  1905. // 2. With parsedOpenApi.ID:
  1906. //http://localhost:3000/apps/new?id=06b1376f77b0563a3b1747a3a1253e88
  1907. // 3. Load this as a "standby" app
  1908. // FIXME: This should be a function ROFL
  1909. swagger, err := openapi3.NewSwaggerLoader().LoadSwaggerFromData([]byte(parsedOpenApi.Body))
  1910. if err != nil {
  1911. log.Printf("[WARNING] Swagger validation error in loop (%s): %s. Continuing.", filename, err)
  1912. continue
  1913. }
  1914. if strings.Contains(swagger.Info.Title, " ") {
  1915. strings.Replace(swagger.Info.Title, " ", "", -1)
  1916. }
  1917. //log.Printf("Should generate yaml")
  1918. swagger, api, _, err := shuffle.GenerateYaml(swagger, parsedOpenApi.ID)
  1919. if err != nil {
  1920. log.Printf("[WARNING] Failed building and generating yaml in loop (2) (%s): %s. Continuing.", filename, err)
  1921. continue
  1922. }
  1923. // FIXME: Configure user?
  1924. api.Owner = ""
  1925. api.ID = parsedOpenApi.ID
  1926. api.IsValid = true
  1927. api.Generated = true
  1928. api.Activated = false
  1929. found := false
  1930. for _, app := range workflowapps {
  1931. if app.ID == api.ID {
  1932. found = true
  1933. break
  1934. } else if app.Name == api.Name && app.AppVersion == api.AppVersion {
  1935. found = true
  1936. break
  1937. }
  1938. }
  1939. if !found {
  1940. err = shuffle.SetWorkflowAppDatastore(ctx, api, api.ID)
  1941. if err != nil {
  1942. log.Printf("[WARNING] Failed setting workflowapp %s (%s) in loop: %s", api.Name, api.ID, err)
  1943. continue
  1944. } else {
  1945. appCounter += 1
  1946. log.Printf("[INFO] Added %s:%s to the database from OpenAPI repo", api.Name, api.AppVersion)
  1947. // Set OpenAPI datastore
  1948. err = shuffle.SetOpenApiDatastore(ctx, parsedOpenApi.ID, parsedOpenApi)
  1949. if err != nil {
  1950. log.Printf("Failed uploading openapi to datastore in loop: %s", err)
  1951. continue
  1952. }
  1953. cacheKey := fmt.Sprintf("workflowapps-sorted-100")
  1954. shuffle.DeleteCache(ctx, cacheKey)
  1955. cacheKey = fmt.Sprintf("workflowapps-sorted-500")
  1956. shuffle.DeleteCache(ctx, cacheKey)
  1957. cacheKey = fmt.Sprintf("workflowapps-sorted-1000")
  1958. shuffle.DeleteCache(ctx, cacheKey)
  1959. }
  1960. } else {
  1961. //log.Printf("Skipped upload of %s (%s)", api.Name, api.ID)
  1962. }
  1963. //return nil
  1964. }
  1965. }
  1966. }
  1967. if appCounter > 0 {
  1968. //log.Printf("Preloaded %d OpenApi apps in folder %s!", appCounter, extra)
  1969. }
  1970. return nil
  1971. }
  1972. func setNewWorkflowApp(resp http.ResponseWriter, request *http.Request) {
  1973. cors := shuffle.HandleCors(resp, request)
  1974. if cors {
  1975. return
  1976. }
  1977. // Just need to be logged in
  1978. user, err := shuffle.HandleApiAuthentication(resp, request)
  1979. if err != nil {
  1980. log.Printf("Api authentication failed in set new app: %s", err)
  1981. resp.WriteHeader(401)
  1982. resp.Write([]byte(`{"success": false}`))
  1983. return
  1984. }
  1985. if user.Role == "org-reader" {
  1986. log.Printf("[WARNING] Org-reader doesn't have access to set new workflowapp: %s (%s)", user.Username, user.Id)
  1987. resp.WriteHeader(401)
  1988. resp.Write([]byte(`{"success": false, "reason": "Read only user"}`))
  1989. return
  1990. }
  1991. body, err := ioutil.ReadAll(request.Body)
  1992. if err != nil {
  1993. log.Printf("Error with body read: %s", err)
  1994. resp.WriteHeader(401)
  1995. resp.Write([]byte(`{"success": false}`))
  1996. return
  1997. }
  1998. var workflowapp shuffle.WorkflowApp
  1999. err = json.Unmarshal(body, &workflowapp)
  2000. if err != nil {
  2001. log.Printf("Failed unmarshaling: %s", err)
  2002. resp.WriteHeader(401)
  2003. resp.Write([]byte(`{"success": false}`))
  2004. return
  2005. }
  2006. ctx := context.Background()
  2007. allapps, err := shuffle.GetAllWorkflowApps(ctx, 1000, 0)
  2008. if err != nil {
  2009. log.Printf("Failed getting apps to verify: %s", err)
  2010. resp.WriteHeader(401)
  2011. resp.Write([]byte(`{"success": false}`))
  2012. return
  2013. }
  2014. appfound := false
  2015. for _, app := range allapps {
  2016. if app.Name == workflowapp.Name && app.AppVersion == workflowapp.AppVersion {
  2017. log.Printf("App upload for %s:%s already exists.", app.Name, app.AppVersion)
  2018. appfound = true
  2019. break
  2020. }
  2021. }
  2022. if appfound {
  2023. log.Printf("App %s:%s already exists. Bump the version.", workflowapp.Name, workflowapp.AppVersion)
  2024. resp.WriteHeader(409)
  2025. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "App %s:%s already exists."}`, workflowapp.Name, workflowapp.AppVersion)))
  2026. return
  2027. }
  2028. err = shuffle.CheckWorkflowApp(workflowapp)
  2029. if err != nil {
  2030. log.Printf("%s for app %s:%s", err, workflowapp.Name, workflowapp.AppVersion)
  2031. resp.WriteHeader(401)
  2032. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s for app %s:%s"}`, err, workflowapp.Name, workflowapp.AppVersion)))
  2033. return
  2034. }
  2035. //if workflowapp.Environment == "" {
  2036. // workflowapp.Environment = baseEnvironment
  2037. //}
  2038. // Fixes (appends) authentication parameters if they're required
  2039. if workflowapp.Authentication.Required {
  2040. //log.Printf("[INFO] Checking authentication fields and appending for %s!", workflowapp.Name)
  2041. // FIXME:
  2042. // Might require reflection into the python code to append the fields as well
  2043. for index, action := range workflowapp.Actions {
  2044. if action.AuthNotRequired {
  2045. log.Printf("[WARNING] Skipping auth setup for: %s", action.Name)
  2046. continue
  2047. }
  2048. // 1. Check if authentication params exists at all
  2049. // 2. Check if they're present in the action
  2050. // 3. Add them IF they DONT exist
  2051. // 4. Fix python code with reflection (FIXME)
  2052. appendParams := []shuffle.WorkflowAppActionParameter{}
  2053. for _, fieldname := range workflowapp.Authentication.Parameters {
  2054. found := false
  2055. for index, param := range action.Parameters {
  2056. if param.Name == fieldname.Name {
  2057. found = true
  2058. action.Parameters[index].Configuration = true
  2059. //log.Printf("Set config to true for field %s!", param.Name)
  2060. break
  2061. }
  2062. }
  2063. if !found {
  2064. appendParams = append(appendParams, shuffle.WorkflowAppActionParameter{
  2065. Name: fieldname.Name,
  2066. Description: fieldname.Description,
  2067. Example: fieldname.Example,
  2068. Required: fieldname.Required,
  2069. Configuration: true,
  2070. Schema: fieldname.Schema,
  2071. })
  2072. }
  2073. }
  2074. if len(appendParams) > 0 {
  2075. //log.Printf("[AUTH] Appending %d params to the START of %s", len(appendParams), action.Name)
  2076. workflowapp.Actions[index].Parameters = append(appendParams, workflowapp.Actions[index].Parameters...)
  2077. }
  2078. }
  2079. }
  2080. workflowapp.ID = uuid.NewV4().String()
  2081. workflowapp.IsValid = true
  2082. workflowapp.Generated = false
  2083. workflowapp.Activated = true
  2084. if !shuffle.ArrayContains(workflowapp.Contributors, user.Id) {
  2085. workflowapp.Contributors = append(workflowapp.Contributors, user.Id)
  2086. }
  2087. shuffle.SetAppRevision(ctx, workflowapp)
  2088. err = shuffle.SetWorkflowAppDatastore(ctx, workflowapp, workflowapp.ID)
  2089. if err != nil {
  2090. log.Printf("[WARNING] Failed setting workflowapp: %s", err)
  2091. resp.WriteHeader(401)
  2092. resp.Write([]byte(`{"success": false}`))
  2093. return
  2094. } else {
  2095. log.Printf("[INFO] Added %s:%s to the database", workflowapp.Name, workflowapp.AppVersion)
  2096. }
  2097. cacheKey := fmt.Sprintf("workflowapps-sorted-100")
  2098. shuffle.DeleteCache(ctx, cacheKey)
  2099. cacheKey = fmt.Sprintf("workflowapps-sorted-500")
  2100. shuffle.DeleteCache(ctx, cacheKey)
  2101. cacheKey = fmt.Sprintf("workflowapps-sorted-1000")
  2102. shuffle.DeleteCache(ctx, cacheKey)
  2103. resp.WriteHeader(200)
  2104. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  2105. }
  2106. func handleUserInput(trigger shuffle.Trigger, organizationId string, workflowId string, referenceExecution string) error {
  2107. // E.g. check email
  2108. sms := ""
  2109. email := ""
  2110. subflow := ""
  2111. triggerType := ""
  2112. triggerInformation := ""
  2113. for _, item := range trigger.Parameters {
  2114. if item.Name == "alertinfo" {
  2115. triggerInformation = item.Value
  2116. } else if item.Name == "type" {
  2117. triggerType = item.Value
  2118. } else if item.Name == "email" {
  2119. email = item.Value
  2120. } else if item.Name == "sms" {
  2121. sms = item.Value
  2122. } else if item.Name == "subflow" {
  2123. subflow = item.Value
  2124. }
  2125. }
  2126. _ = subflow
  2127. if len(triggerType) == 0 {
  2128. log.Printf("[WARNING] No type specified for user input node")
  2129. //return errors.New("No type specified for user input node")
  2130. }
  2131. // FIXME: This is not the right time to send them, BUT it's well served for testing. Save -> send email / sms
  2132. ctx := context.Background()
  2133. startNode := trigger.ID
  2134. if strings.Contains(triggerType, "email") {
  2135. action := shuffle.CloudSyncJob{
  2136. Type: "user_input",
  2137. Action: "send_email",
  2138. OrgId: organizationId,
  2139. PrimaryItemId: workflowId,
  2140. SecondaryItem: startNode,
  2141. ThirdItem: triggerInformation,
  2142. FourthItem: email,
  2143. FifthItem: referenceExecution,
  2144. }
  2145. org, err := shuffle.GetOrg(ctx, organizationId)
  2146. if err != nil {
  2147. log.Printf("Failed email send to cloud (1): %s", err)
  2148. return err
  2149. }
  2150. err = executeCloudAction(action, org.SyncConfig.Apikey)
  2151. if err != nil {
  2152. log.Printf("Failed email send to cloud (2): %s", err)
  2153. return err
  2154. }
  2155. log.Printf("[INFO] Should send email to %s during execution.", email)
  2156. }
  2157. if strings.Contains(triggerType, "sms") {
  2158. action := shuffle.CloudSyncJob{
  2159. Type: "user_input",
  2160. Action: "send_sms",
  2161. OrgId: organizationId,
  2162. PrimaryItemId: workflowId,
  2163. SecondaryItem: startNode,
  2164. ThirdItem: triggerInformation,
  2165. FourthItem: sms,
  2166. FifthItem: referenceExecution,
  2167. }
  2168. org, err := shuffle.GetOrg(ctx, organizationId)
  2169. if err != nil {
  2170. log.Printf("Failed sms send to cloud (3): %s", err)
  2171. return err
  2172. }
  2173. err = executeCloudAction(action, org.SyncConfig.Apikey)
  2174. if err != nil {
  2175. log.Printf("Failed sms send to cloud (4): %s", err)
  2176. return err
  2177. }
  2178. log.Printf("[DEBUG] Should send SMS to %s during execution.", sms)
  2179. }
  2180. if strings.Contains(triggerType, "subflow") {
  2181. log.Printf("[DEBUG] Should run a subflow with the result for user input.")
  2182. }
  2183. return nil
  2184. }
  2185. func executeSingleAction(resp http.ResponseWriter, request *http.Request) {
  2186. cors := shuffle.HandleCors(resp, request)
  2187. if cors {
  2188. return
  2189. }
  2190. ctx := shuffle.GetContext(request)
  2191. user, err := shuffle.HandleApiAuthentication(resp, request)
  2192. if err != nil {
  2193. // Look for org_id query as app may be private
  2194. // No validation is done here, as it's just running the app
  2195. // to find a user
  2196. orgId := request.URL.Query().Get("org_id")
  2197. if len(orgId) > 0 {
  2198. user.ActiveOrg.Id = orgId
  2199. } else {
  2200. executionId := request.URL.Query().Get("execution_id")
  2201. authorization := request.URL.Query().Get("authorization")
  2202. if len(executionId) == 0 || len(authorization) == 0 {
  2203. log.Printf("[WARNING] Bad execution id/auth in single action validate (1): %#v, %#v. Continuing with the 'public' org id", executionId, authorization)
  2204. err := shuffle.ValidateRequestOverload(resp, request)
  2205. if err != nil {
  2206. log.Printf("[INFO] Request overload for IP %s in single action execution", shuffle.GetRequestIp(request))
  2207. resp.WriteHeader(429)
  2208. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Too many requests. Please try again in 30 seconds."}`)))
  2209. return
  2210. }
  2211. user.Username = shuffle.GetRequestIp(request)
  2212. user.ActiveOrg.Name = shuffle.GetRequestIp(request)
  2213. user.ActiveOrg.Id = "public"
  2214. } else {
  2215. // Find the execution
  2216. exec, err := shuffle.GetWorkflowExecution(ctx, executionId)
  2217. if err != nil {
  2218. log.Printf("[WARNING] Bad execution id in single action validate (2): %s", err)
  2219. resp.WriteHeader(401)
  2220. resp.Write([]byte(`{"success": false, "reason": "Bad execution mapping (1)"}`))
  2221. return
  2222. }
  2223. if exec.Authorization != authorization {
  2224. log.Printf("[WARNING] Bad execution auth in single action validate (3): %#v, %#v", exec.Authorization, authorization)
  2225. resp.WriteHeader(403)
  2226. resp.Write([]byte(`{"success": false, "reason": "Bad execution mapping (2)"}`))
  2227. return
  2228. }
  2229. //log.Printf("[INFO] Found org_id from execution: %#v. Executionorg: %#v", exec.OrgId, exec.ExecutionOrg)
  2230. user.ActiveOrg.Id = exec.OrgId
  2231. if len(user.ActiveOrg.Id) == 0 {
  2232. user.ActiveOrg.Id = exec.ExecutionOrg
  2233. }
  2234. user.Username = fmt.Sprintf("org %s", user.ActiveOrg.Id)
  2235. }
  2236. }
  2237. if len(user.ActiveOrg.Id) == 0 {
  2238. resp.WriteHeader(401)
  2239. resp.Write([]byte(`{"success": false, "reason": "No org_id found to map back to"}`))
  2240. return
  2241. }
  2242. }
  2243. location := strings.Split(request.URL.String(), "/")
  2244. var appId string
  2245. if location[1] == "api" {
  2246. if len(location) <= 4 {
  2247. resp.WriteHeader(400)
  2248. resp.Write([]byte(`{"success": false}`))
  2249. return
  2250. }
  2251. appId = location[4]
  2252. }
  2253. //log.Printf("[AUDIT] User Authentication failed in execute SINGLE action - CONTINUING ANYWAY: %s. Found OrgID: %#v", err, user.ActiveOrg.Id)
  2254. log.Printf("[AUDIT] User %s (%s) in org %s (%s) is running SINGLE App run for App ID '%s'", user.Username, user.Id, user.ActiveOrg.Name, user.ActiveOrg.Id, appId)
  2255. body, err := ioutil.ReadAll(request.Body)
  2256. if err != nil {
  2257. log.Printf("[INFO] Failed single execution POST body read: %s", err)
  2258. resp.WriteHeader(400)
  2259. resp.Write([]byte(`{"success": false}`))
  2260. return
  2261. }
  2262. // Look for the query parameter "validation=true" to find the correct action for the app to test
  2263. runValidationAction := false
  2264. query := request.URL.Query()
  2265. validation, ok := query["validation"]
  2266. if ok && len(validation) > 0 && validation[0] == "true" {
  2267. runValidationAction = true
  2268. }
  2269. shouldRerun := false
  2270. rerun, rerunOk := query["rerun"]
  2271. if rerunOk && len(rerun) > 0 && rerun[0] == "true" {
  2272. shouldRerun = true
  2273. }
  2274. decisionId := ""
  2275. decision, decisionOk := query["decision_id"]
  2276. if decisionOk && len(decision) > 0 {
  2277. decisionId = decision[0]
  2278. }
  2279. log.Printf("\n\nACTION TO RUN: %s. Body: %s. Source URL: %s\n\n", appId, string(body), request.URL.String())
  2280. workflowExecution, err := shuffle.PrepareSingleAction(ctx, user, appId, body, runValidationAction, decisionId)
  2281. if appId == "agent_starter" {
  2282. log.Printf("[INFO] Returning early for agent_starter single action execution: %s", workflowExecution.ExecutionId)
  2283. resp.WriteHeader(200)
  2284. resp.Write([]byte(fmt.Sprintf(`{"success": true, "execution_id": "%s", "authorization": "%s"}`, workflowExecution.ExecutionId, workflowExecution.Authorization)))
  2285. return
  2286. }
  2287. debugUrl := fmt.Sprintf("/workflows/%s?execution_id=%s", workflowExecution.Workflow.ID, workflowExecution.ExecutionId)
  2288. resp.Header().Add("X-Debug-Url", debugUrl)
  2289. if err != nil {
  2290. log.Printf("[INFO] Failed workflowrequest POST read in single action (4): %s", err)
  2291. returndata := shuffle.ResultChecker{
  2292. Success: false,
  2293. Reason: fmt.Sprintf("%s", err),
  2294. }
  2295. resp.WriteHeader(400)
  2296. respBytes, err := json.Marshal(returndata)
  2297. if err != nil {
  2298. resp.Write([]byte(`{"success": false}`))
  2299. return
  2300. }
  2301. resp.Write(respBytes)
  2302. return
  2303. }
  2304. workflowExecution.ProjectId = ""
  2305. workflowExecution.Locations = []string{""}
  2306. foundEnv := ""
  2307. params := []string{}
  2308. for _, action := range workflowExecution.Workflow.Actions {
  2309. for _, param := range action.Parameters {
  2310. params = append(params, param.Name)
  2311. }
  2312. if len(action.Environment) > 0 {
  2313. foundEnv = action.Environment
  2314. break
  2315. }
  2316. }
  2317. go shuffle.IncrementCache(ctx, workflowExecution.OrgId, "workflow_executions")
  2318. executionRequest := shuffle.ExecutionRequest{
  2319. Priority: 15,
  2320. ExecutionId: workflowExecution.ExecutionId,
  2321. WorkflowId: workflowExecution.Workflow.ID,
  2322. Authorization: workflowExecution.Authorization,
  2323. Environments: []string{foundEnv},
  2324. }
  2325. parsedEnv := strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(foundEnv, " ", "-"), "_", "-"))
  2326. log.Printf("[INFO] Adding new single-action job to env queue (4): %s", parsedEnv)
  2327. err = shuffle.SetWorkflowQueue(ctx, executionRequest, parsedEnv)
  2328. if err != nil {
  2329. log.Printf("[WARNING] Failed adding %s to db (single action queue): %s", parsedEnv, err)
  2330. }
  2331. if shouldRerun {
  2332. //log.Printf("[DEBUG] Returning single action execution ID for rerun: %s", workflowExecution.ExecutionId)
  2333. resp.WriteHeader(200)
  2334. resp.Write([]byte(fmt.Sprintf(`{"success": true, "execution_id": "%s", "authorization": "%s"}`, workflowExecution.ExecutionId, workflowExecution.Authorization)))
  2335. return
  2336. }
  2337. actionId := ""
  2338. if len(workflowExecution.Workflow.Actions) == 1 {
  2339. actionId = workflowExecution.Workflow.Actions[0].ID
  2340. }
  2341. returnBody := shuffle.HandleRetValidation(ctx, workflowExecution, 1, 15, actionId)
  2342. returnBytes, err := json.Marshal(returnBody)
  2343. if err != nil {
  2344. log.Printf("[ERROR] Failed to marshal retStruct in single execution: %s", err)
  2345. }
  2346. // Look for delete=true query, and if it exists, delete the execution
  2347. if request.URL.Query().Get("delete") == "true" {
  2348. err = shuffle.DeleteKey(ctx, "workflowexecution", workflowExecution.ExecutionId)
  2349. if err != nil {
  2350. log.Printf("[ERROR] Failed to delete execution: %s", err)
  2351. }
  2352. }
  2353. resp.WriteHeader(200)
  2354. resp.Write([]byte(returnBytes))
  2355. }
  2356. // Onlyname is used to
  2357. func IterateAppGithubFolders(ctx context.Context, fs billy.Filesystem, dir []os.FileInfo, extra string, onlyname string, forceUpdate, duringStartup bool) ([]shuffle.BuildLaterStruct, []shuffle.BuildLaterStruct, error) {
  2358. var err error
  2359. allapps := []shuffle.WorkflowApp{}
  2360. // These are slow apps to build with some funky mechanisms
  2361. reservedNames := []string{
  2362. "OWA",
  2363. "NLP",
  2364. "YARA",
  2365. "ATTACK-PREDICTOR",
  2366. }
  2367. startupNames := []string{
  2368. "shuffle-tools",
  2369. "http",
  2370. "email",
  2371. "shuffle-ai",
  2372. "shuffle-subflow",
  2373. "yara",
  2374. "sigma",
  2375. }
  2376. // It's here to prevent getting them in every iteration
  2377. buildLaterFirst := []shuffle.BuildLaterStruct{}
  2378. buildLaterList := []shuffle.BuildLaterStruct{}
  2379. for _, file := range dir {
  2380. if len(onlyname) > 0 && file.Name() != onlyname {
  2381. continue
  2382. }
  2383. //duringStartup
  2384. if duringStartup {
  2385. // Look for names: shuffle tools, http, email, shuffle ai
  2386. if shuffle.ArrayContains(startupNames, strings.ToLower(file.Name())) {
  2387. // Allowed to build during startup
  2388. //log.Printf("\n\n\nFOUND MATCHING APP: %s\n\n\n", file.Name())
  2389. } else {
  2390. //log.Printf("\n\n\nWRONG APP (2): %s\n\n\n", file.Name())
  2391. continue
  2392. }
  2393. }
  2394. // Folder?
  2395. switch mode := file.Mode(); {
  2396. case mode.IsDir():
  2397. // Specific folder for skipping
  2398. if file.Name() == "unsupported" {
  2399. continue
  2400. }
  2401. tmpExtra := fmt.Sprintf("%s%s/", extra, file.Name())
  2402. dir, err := fs.ReadDir(tmpExtra)
  2403. if err != nil {
  2404. log.Printf("Failed to read dir: %s", err)
  2405. continue
  2406. }
  2407. // Go routine? Hmm, this can be super quick I guess
  2408. buildFirst, buildLast, err := IterateAppGithubFolders(ctx, fs, dir, tmpExtra, "", forceUpdate, false)
  2409. for _, item := range buildFirst {
  2410. buildLaterFirst = append(buildLaterFirst, item)
  2411. }
  2412. for _, item := range buildLast {
  2413. buildLaterList = append(buildLaterList, item)
  2414. }
  2415. if err != nil {
  2416. log.Printf("[WARNING] Error reading folder: %s", err)
  2417. //buildFirst, buildLast, err := IterateAppGithubFolders(fs, dir, tmpExtra, "", forceUpdate, false)
  2418. if !forceUpdate {
  2419. continue
  2420. //return buildLaterFirst, buildLaterList, err
  2421. }
  2422. }
  2423. case mode.IsRegular():
  2424. // Check the file
  2425. filename := file.Name()
  2426. if filename == "Dockerfile" {
  2427. // Set up to make md5 and check if the app is new (api.yaml+src/app.py+Dockerfile)
  2428. // Check if Dockerfile, app.py or api.yaml has changed. Hash?
  2429. //log.Printf("Handle Dockerfile in location %s", extra)
  2430. // Try api.yaml and api.yml
  2431. fullPath := fmt.Sprintf("%s%s", extra, "api.yaml")
  2432. fileReader, err := fs.Open(fullPath)
  2433. if err != nil {
  2434. fullPath = fmt.Sprintf("%s%s", extra, "api.yml")
  2435. fileReader, err = fs.Open(fullPath)
  2436. if err != nil {
  2437. log.Printf("[INFO] Failed finding api.yaml/yml for file %s: %s", filename, err)
  2438. continue
  2439. }
  2440. }
  2441. //log.Printf("HANDLING DOCKER FILEREADER - SEARCH&REPLACE?")
  2442. appfileData, err := ioutil.ReadAll(fileReader)
  2443. if err != nil {
  2444. log.Printf("Failed reading %s: %s", fullPath, err)
  2445. continue
  2446. }
  2447. if len(appfileData) == 0 {
  2448. log.Printf("Failed reading %s - length is 0.", fullPath)
  2449. continue
  2450. }
  2451. // func md5sum(data []byte) string {
  2452. // Make hash
  2453. appPython := fmt.Sprintf("%s/src/app.py", extra)
  2454. appPythonReader, err := fs.Open(appPython)
  2455. if err != nil {
  2456. log.Printf("Failed to read python app %s", appPython)
  2457. continue
  2458. }
  2459. appPythonData, err := ioutil.ReadAll(appPythonReader)
  2460. if err != nil {
  2461. log.Printf("Failed reading appdata %s: %s", appPython, err)
  2462. continue
  2463. }
  2464. dockerFp := fmt.Sprintf("%s/Dockerfile", extra)
  2465. dockerfile, err := fs.Open(dockerFp)
  2466. if err != nil {
  2467. log.Printf("Failed to read dockerfil %s", appPython)
  2468. continue
  2469. }
  2470. dockerfileData, err := ioutil.ReadAll(dockerfile)
  2471. if err != nil {
  2472. log.Printf("Failed to read dockerfile")
  2473. continue
  2474. }
  2475. combined := []byte{}
  2476. combined = append(combined, appfileData...)
  2477. combined = append(combined, appPythonData...)
  2478. combined = append(combined, dockerfileData...)
  2479. md5 := md5sum(combined)
  2480. var workflowapp shuffle.WorkflowApp
  2481. err = gyaml.Unmarshal(appfileData, &workflowapp)
  2482. if err != nil {
  2483. log.Printf("[WARNING] Failed building workflowapp %s: %s", extra, err)
  2484. continue
  2485. //return buildLaterFirst, buildLaterList, errors.New(fmt.Sprintf("Failed building %s: %s", extra, err))
  2486. //continue
  2487. }
  2488. newName := workflowapp.Name
  2489. newName = strings.ReplaceAll(newName, " ", "-")
  2490. readmeNames := []string{"README.md", "README", "readme", "readme.md", "README.MD"}
  2491. for _, readmeName := range readmeNames {
  2492. readmePath := fmt.Sprintf("%s%s", extra, readmeName)
  2493. readmeInfo, err := fs.Open(readmePath)
  2494. if err != nil {
  2495. //log.Printf("[WARNING] Failed to read README path %s", readmePath)
  2496. continue
  2497. }
  2498. fileData, err := ioutil.ReadAll(readmeInfo)
  2499. if err != nil {
  2500. log.Printf("[WARNING] Failed to read readme file at %s", readmePath)
  2501. continue
  2502. } else {
  2503. workflowapp.Documentation = string(fileData)
  2504. //log.Printf("[INFO] Found %s (README) file of length %d for %s:%s", readmePath, len(workflowapp.Documentation), newName, workflowapp.AppVersion)
  2505. break
  2506. }
  2507. }
  2508. if len(workflowapp.Documentation) == 0 {
  2509. for _, readmeName := range readmeNames {
  2510. readmePath := fmt.Sprintf("%s../%s", extra, readmeName)
  2511. readmeInfo, err := fs.Open(readmePath)
  2512. if err != nil {
  2513. //log.Printf("[WARNING] Failed to read README path %s", readmePath)
  2514. continue
  2515. }
  2516. fileData, err := ioutil.ReadAll(readmeInfo)
  2517. if err != nil {
  2518. log.Printf("[WARNING] Failed to read readme file at %s", readmePath)
  2519. continue
  2520. } else {
  2521. workflowapp.Documentation = string(fileData)
  2522. //log.Printf("[INFO] Found %s (README) file of length %d for %s:%s", readmePath, len(workflowapp.Documentation), newName, workflowapp.AppVersion)
  2523. break
  2524. }
  2525. }
  2526. }
  2527. workflowapp.ReferenceInfo.GithubUrl = fmt.Sprintf("https://github.com/shuffle/shuffle-apps/tree/master/%s/%s", strings.ToLower(newName), workflowapp.AppVersion)
  2528. tags := []string{
  2529. fmt.Sprintf("%s:%s_%s", baseDockerName, strings.ToLower(newName), workflowapp.AppVersion),
  2530. }
  2531. if len(allapps) == 0 {
  2532. allapps, err = shuffle.GetAllWorkflowApps(ctx, 0, 0)
  2533. if err != nil {
  2534. log.Printf("[WARNING] Failed getting apps to verify: %s", err)
  2535. continue
  2536. }
  2537. }
  2538. // Make an option to override existing apps?
  2539. //Hash string `json:"hash" datastore:"hash" yaml:"hash"` // api.yaml+dockerfile+src/app.py for apps
  2540. removeApps := []string{}
  2541. skip := false
  2542. for _, app := range allapps {
  2543. if app.Name == workflowapp.Name && app.AppVersion == workflowapp.AppVersion {
  2544. // FIXME: Check if there's a new APP_SDK as well.
  2545. // Skip this check if app_sdk is new.
  2546. if app.Hash == md5 && app.Hash != "" && !forceUpdate {
  2547. skip = true
  2548. break
  2549. }
  2550. //log.Printf("Overriding app %s:%s as it exists but has different hash.", app.Name, app.AppVersion)
  2551. removeApps = append(removeApps, app.ID)
  2552. }
  2553. }
  2554. if skip && !forceUpdate {
  2555. continue
  2556. }
  2557. // Fixes (appends) authentication parameters if they're required
  2558. if workflowapp.Authentication.Required {
  2559. //log.Printf("[INFO] Checking authentication fields and appending for %s!", workflowapp.Name)
  2560. // FIXME:
  2561. // Might require reflection into the python code to append the fields as well
  2562. for index, action := range workflowapp.Actions {
  2563. if action.AuthNotRequired {
  2564. log.Printf("Skipping auth setup: %s", action.Name)
  2565. continue
  2566. }
  2567. // 1. Check if authentication params exists at all
  2568. // 2. Check if they're present in the action
  2569. // 3. Add them IF they DONT exist
  2570. // 4. Fix python code with reflection (FIXME)
  2571. appendParams := []shuffle.WorkflowAppActionParameter{}
  2572. for _, fieldname := range workflowapp.Authentication.Parameters {
  2573. found := false
  2574. for index, param := range action.Parameters {
  2575. if param.Name == fieldname.Name {
  2576. found = true
  2577. action.Parameters[index].Configuration = true
  2578. //log.Printf("Set config to true for field %s!", param.Name)
  2579. break
  2580. }
  2581. }
  2582. if !found {
  2583. appendParams = append(appendParams, shuffle.WorkflowAppActionParameter{
  2584. Name: fieldname.Name,
  2585. Description: fieldname.Description,
  2586. Example: fieldname.Example,
  2587. Required: fieldname.Required,
  2588. Configuration: true,
  2589. Schema: fieldname.Schema,
  2590. })
  2591. }
  2592. }
  2593. if len(appendParams) > 0 {
  2594. //log.Printf("[AUTH] Appending %d params to the START of %s", len(appendParams), action.Name)
  2595. workflowapp.Actions[index].Parameters = append(appendParams, workflowapp.Actions[index].Parameters...)
  2596. }
  2597. }
  2598. }
  2599. err = checkWorkflowApp(workflowapp)
  2600. if err != nil {
  2601. log.Printf("[DEBUG] %s for app %s:%s", err, workflowapp.Name, workflowapp.AppVersion)
  2602. continue
  2603. }
  2604. if len(removeApps) > 0 {
  2605. for _, item := range removeApps {
  2606. log.Printf("[WARNING] Removing duplicate app: %s", item)
  2607. err = shuffle.DeleteKey(ctx, "workflowapp", item)
  2608. if err != nil {
  2609. log.Printf("[ERROR] Failed deleting duplicate %s: %s", item, err)
  2610. }
  2611. }
  2612. }
  2613. workflowapp.ID = uuid.NewV4().String()
  2614. workflowapp.IsValid = true
  2615. workflowapp.Verified = true
  2616. workflowapp.Sharing = true
  2617. workflowapp.Downloaded = true
  2618. workflowapp.Hash = md5
  2619. workflowapp.Public = true
  2620. err = shuffle.SetWorkflowAppDatastore(ctx, workflowapp, workflowapp.ID)
  2621. if err != nil {
  2622. log.Printf("[WARNING] Failed setting workflowapp in intro: %s", err)
  2623. continue
  2624. }
  2625. /*
  2626. err = increaseStatisticsField(ctx, "total_apps_created", workflowapp.ID, 1, "")
  2627. if err != nil {
  2628. log.Printf("Failed to increase total apps created stats: %s", err)
  2629. }
  2630. err = increaseStatisticsField(ctx, "total_apps_loaded", workflowapp.ID, 1, "")
  2631. if err != nil {
  2632. log.Printf("Failed to increase total apps loaded stats: %s", err)
  2633. }
  2634. */
  2635. //log.Printf("Added %s:%s to the database", workflowapp.Name, workflowapp.AppVersion)
  2636. // ID can be used to e.g. set a build status.
  2637. buildLater := shuffle.BuildLaterStruct{
  2638. Tags: tags,
  2639. Extra: extra,
  2640. Id: workflowapp.ID,
  2641. }
  2642. reservedFound := false
  2643. for _, appname := range reservedNames {
  2644. if strings.ToUpper(workflowapp.Name) == strings.ToUpper(appname) {
  2645. buildLaterList = append(buildLaterList, buildLater)
  2646. reservedFound = true
  2647. break
  2648. }
  2649. }
  2650. /// Only upload if successful and no errors
  2651. if !reservedFound {
  2652. buildLaterFirst = append(buildLaterFirst, buildLater)
  2653. } else {
  2654. log.Printf("[WARNING] Skipping build of %s to later", workflowapp.Name)
  2655. }
  2656. }
  2657. }
  2658. }
  2659. if len(buildLaterFirst) == 0 && len(buildLaterList) == 0 {
  2660. return buildLaterFirst, buildLaterList, err
  2661. }
  2662. // This is getting silly
  2663. cacheKey := fmt.Sprintf("workflowapps-sorted-100")
  2664. shuffle.DeleteCache(ctx, cacheKey)
  2665. cacheKey = fmt.Sprintf("workflowapps-sorted-500")
  2666. shuffle.DeleteCache(ctx, cacheKey)
  2667. cacheKey = fmt.Sprintf("workflowapps-sorted-1000")
  2668. shuffle.DeleteCache(ctx, cacheKey)
  2669. newSortedList := []shuffle.BuildLaterStruct{}
  2670. initApps := []string{
  2671. "tools",
  2672. "http",
  2673. "email",
  2674. }
  2675. for _, buildLater := range buildLaterFirst {
  2676. found := false
  2677. for _, appname := range initApps {
  2678. for _, tag := range buildLater.Tags {
  2679. if strings.Contains(strings.ToLower(tag), appname) {
  2680. newSortedList = append(newSortedList, buildLater)
  2681. found = true
  2682. break
  2683. }
  2684. }
  2685. if found {
  2686. break
  2687. }
  2688. }
  2689. }
  2690. // Prepend newSortedList to buildLaterFirst
  2691. handledImages := []string{}
  2692. buildLaterFirst = append(newSortedList, buildLaterFirst...)
  2693. if len(extra) == 0 {
  2694. log.Printf("[INFO] Starting build of %d containers (FIRST)", len(buildLaterFirst))
  2695. for _, item := range buildLaterFirst {
  2696. if len(item.Tags) > 0 && shuffle.ArrayContains(handledImages, item.Tags[0]) {
  2697. continue
  2698. }
  2699. handledImages = append(handledImages, item.Tags[0])
  2700. err = buildImageMemory(fs, item.Tags, item.Extra, true)
  2701. if err != nil {
  2702. orgId := ""
  2703. log.Printf("[DEBUG] Failed image build memory. Creating notification with org %#v: %s", orgId, err)
  2704. if len(item.Tags) > 0 {
  2705. err = shuffle.CreateOrgNotification(
  2706. ctx,
  2707. fmt.Sprintf("App failed to build"),
  2708. fmt.Sprintf("The app %s with image %s failed to build. Check backend logs with docker! docker logs shuffle-backend", item.Tags[0], item.Extra),
  2709. fmt.Sprintf("/apps"),
  2710. orgId,
  2711. false,
  2712. "HIGH",
  2713. "APP_BUID",
  2714. )
  2715. }
  2716. } else {
  2717. if len(item.Tags) > 0 {
  2718. log.Printf("[INFO] Successfully built image %s", item.Tags[0])
  2719. } else {
  2720. log.Printf("[INFO] Successfully built Docker image")
  2721. }
  2722. }
  2723. }
  2724. if len(buildLaterList) > 0 {
  2725. log.Printf("[INFO] Starting build of %d skipped docker images", len(buildLaterList))
  2726. for _, item := range buildLaterList {
  2727. if len(item.Tags) > 0 && shuffle.ArrayContains(handledImages, item.Tags[0]) {
  2728. continue
  2729. }
  2730. handledImages = append(handledImages, item.Tags[0])
  2731. err = buildImageMemory(fs, item.Tags, item.Extra, true)
  2732. if err != nil {
  2733. log.Printf("[INFO] Failed image build memory: %s", err)
  2734. } else {
  2735. if len(item.Tags) > 0 {
  2736. log.Printf("[INFO] Successfully built image %s", item.Tags[0])
  2737. } else {
  2738. log.Printf("[INFO] Successfully built Docker image")
  2739. }
  2740. }
  2741. }
  2742. }
  2743. }
  2744. return buildLaterFirst, buildLaterList, err
  2745. }
  2746. func LoadSpecificApps(resp http.ResponseWriter, request *http.Request) {
  2747. cors := shuffle.HandleCors(resp, request)
  2748. if cors {
  2749. return
  2750. }
  2751. // Just need to be logged in
  2752. user, err := shuffle.HandleApiAuthentication(resp, request)
  2753. if err != nil {
  2754. log.Printf("[WARNING] Api authentication failed in load specific apps: %s", err)
  2755. resp.WriteHeader(401)
  2756. resp.Write([]byte(`{"success": false}`))
  2757. return
  2758. }
  2759. if user.Role != "admin" {
  2760. log.Printf("[WARNING] Not admin during app loading: %s (%s).", user.Username, user.Id)
  2761. resp.WriteHeader(401)
  2762. resp.Write([]byte(`{"success": false, "reason": "Not admin"}`))
  2763. return
  2764. }
  2765. body, err := ioutil.ReadAll(request.Body)
  2766. if err != nil {
  2767. log.Printf("Error with body read: %s", err)
  2768. resp.WriteHeader(401)
  2769. resp.Write([]byte(`{"success": false}`))
  2770. return
  2771. }
  2772. // Field1 & 2 can be a lot of things.
  2773. // Field1 = Username
  2774. // Field2 = Password
  2775. type tmpStruct struct {
  2776. URL string `json:"url"`
  2777. Branch string `json:"branch"`
  2778. Field1 string `json:"field_1"`
  2779. Field2 string `json:"field_2"`
  2780. ForceUpdate bool `json:"force_update"`
  2781. }
  2782. //log.Printf("Body: %s", string(body))
  2783. var tmpBody tmpStruct
  2784. err = json.Unmarshal(body, &tmpBody)
  2785. if err != nil {
  2786. log.Printf("[WARNING] Error with unmarshal app git clone: %s", err)
  2787. resp.WriteHeader(500)
  2788. resp.Write([]byte(`{"success": false}`))
  2789. return
  2790. }
  2791. fs := memfs.New()
  2792. ctx := context.Background()
  2793. if strings.Contains(tmpBody.URL, "github") || strings.Contains(tmpBody.URL, "gitlab") || strings.Contains(tmpBody.URL, "bitbucket") {
  2794. cloneOptions := &git.CloneOptions{
  2795. URL: tmpBody.URL,
  2796. }
  2797. if len(tmpBody.Branch) > 0 && tmpBody.Branch != "master" && tmpBody.Branch != "main" {
  2798. cloneOptions.ReferenceName = plumbing.ReferenceName(tmpBody.Branch)
  2799. }
  2800. // FIXME: Better auth.
  2801. if len(tmpBody.Field1) > 0 && len(tmpBody.Field2) > 0 {
  2802. cloneOptions.Auth = &http2.BasicAuth{
  2803. Username: tmpBody.Field1,
  2804. Password: tmpBody.Field2,
  2805. }
  2806. }
  2807. cloneOptions = shuffle.CheckGitProxy(cloneOptions)
  2808. storer := memory.NewStorage()
  2809. r, err := git.Clone(storer, fs, cloneOptions)
  2810. if err != nil {
  2811. log.Printf("[WARNING] Failed loading repo %s into memory (github apps 2): %s", tmpBody.URL, err)
  2812. resp.WriteHeader(500)
  2813. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
  2814. return
  2815. }
  2816. dir, err := fs.ReadDir("/")
  2817. if err != nil {
  2818. log.Printf("[WARNING] FAiled reading folder: %s", err)
  2819. }
  2820. _ = r
  2821. if tmpBody.ForceUpdate {
  2822. log.Printf("[AUDIT] Running app get with force update from user %s (%s) for %s!", user.Username, user.Id, tmpBody.URL)
  2823. } else {
  2824. log.Printf("[AUDIT] Updating apps with updates for user %s (%s) for %s (no force)", user.Username, user.Id, tmpBody.URL)
  2825. }
  2826. // As it's not even Docker
  2827. if tmpBody.ForceUpdate {
  2828. // dockercli, err := dockerclient.NewEnvClient()
  2829. dockercli, _, err := shuffle.GetDockerClient()
  2830. if err == nil {
  2831. appSdk := os.Getenv("SHUFFLE_APP_SDK_VERSION")
  2832. if len(appSdk) == 0 {
  2833. _, err := dockercli.ImagePull(ctx, "frikky/shuffle:app_sdk", image.PullOptions{})
  2834. if err != nil {
  2835. log.Printf("[WARNING] Failed to download new App SDK: %s", err)
  2836. }
  2837. } else {
  2838. _, err := dockercli.ImagePull(ctx, fmt.Sprintf("%s/%s/shuffle-app_sdk:%s", "ghcr.io", "frikky", appSdk), image.PullOptions{})
  2839. if err != nil {
  2840. log.Printf("[WARNING] Failed to download new App SDK %s: %s", err)
  2841. }
  2842. }
  2843. } else {
  2844. log.Printf("[WARNING] Failed to download apps with the new App SDK because of docker cli: %s", err)
  2845. }
  2846. }
  2847. IterateAppGithubFolders(ctx, fs, dir, "", "", tmpBody.ForceUpdate, false)
  2848. } else if strings.Contains(tmpBody.URL, "s3") {
  2849. //https://docs.aws.amazon.com/sdk-for-go/api/service/s3/
  2850. //sess := session.Must(session.NewSession())
  2851. //downloader := s3manager.NewDownloader(sess)
  2852. //// Write the contents of S3 Object to the file
  2853. //storer := memory.NewStorage()
  2854. //n, err := downloader.Download(storer, &s3.GetObjectInput{
  2855. // Bucket: aws.String(myBucket),
  2856. // Key: aws.String(myString),
  2857. //})
  2858. //if err != nil {
  2859. // return fmt.Errorf("failed to download file, %v", err)
  2860. //}
  2861. //fmt.Printf("file downloaded, %d bytes\n", n)
  2862. } else {
  2863. resp.WriteHeader(401)
  2864. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s is unsupported"}`, tmpBody.URL)))
  2865. return
  2866. }
  2867. cacheKey := fmt.Sprintf("workflowapps-sorted-100")
  2868. shuffle.DeleteCache(ctx, cacheKey)
  2869. cacheKey = fmt.Sprintf("workflowapps-sorted-500")
  2870. shuffle.DeleteCache(ctx, cacheKey)
  2871. cacheKey = fmt.Sprintf("workflowapps-sorted-1000")
  2872. shuffle.DeleteCache(ctx, cacheKey)
  2873. resp.WriteHeader(200)
  2874. resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
  2875. }
  2876. // Bad check for workflowapps :)
  2877. // FIXME - use tags and struct reflection
  2878. func checkWorkflowApp(workflowApp shuffle.WorkflowApp) error {
  2879. // Validate fields
  2880. if workflowApp.Name == "" {
  2881. return errors.New("App field name doesn't exist")
  2882. }
  2883. if workflowApp.Description == "" {
  2884. return errors.New("App field description doesn't exist")
  2885. }
  2886. if workflowApp.AppVersion == "" {
  2887. return errors.New("App field app_version doesn't exist")
  2888. }
  2889. if workflowApp.ContactInfo.Name == "" {
  2890. return errors.New("App field contact_info.name doesn't exist")
  2891. }
  2892. return nil
  2893. }
  2894. func checkUnfinishedExecution(resp http.ResponseWriter, request *http.Request) {
  2895. cors := shuffle.HandleCors(resp, request)
  2896. if cors {
  2897. return
  2898. }
  2899. location := strings.Split(request.URL.String(), "/")
  2900. var fileId string
  2901. if location[1] == "api" {
  2902. if len(location) <= 4 {
  2903. resp.WriteHeader(401)
  2904. resp.Write([]byte(`{"success": false}`))
  2905. return
  2906. }
  2907. fileId = location[4]
  2908. }
  2909. if len(fileId) != 36 {
  2910. resp.WriteHeader(401)
  2911. resp.Write([]byte(`{"success": false, "reason": "Workflow ID to abort is not valid"}`))
  2912. return
  2913. }
  2914. executionId := location[6]
  2915. if len(executionId) != 36 {
  2916. resp.WriteHeader(401)
  2917. resp.Write([]byte(`{"success": false, "reason": "ExecutionID not valid"}`))
  2918. return
  2919. }
  2920. ctx := shuffle.GetContext(request)
  2921. exec, err := shuffle.GetWorkflowExecution(ctx, executionId)
  2922. if err != nil {
  2923. log.Printf("[ERROR] Failed getting execution (rerun workflow - 1) %s: %s", executionId, err)
  2924. resp.WriteHeader(401)
  2925. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution ID %s because it doesn't exist (abort)."}`, executionId)))
  2926. return
  2927. }
  2928. apikey := request.Header.Get("Authorization")
  2929. parsedKey := ""
  2930. if strings.HasPrefix(apikey, "Bearer ") {
  2931. apikeyCheck := strings.Split(apikey, " ")
  2932. if len(apikeyCheck) == 2 {
  2933. parsedKey = apikeyCheck[1]
  2934. }
  2935. }
  2936. // ONLY allowed to run automatically with the same auth (july 2022)
  2937. if exec.Authorization != parsedKey {
  2938. user, err := shuffle.HandleApiAuthentication(resp, request)
  2939. if err != nil {
  2940. log.Printf("[ERROR][%s] Bad authorization key for execution (rerun workflow - 3): %s", executionId, err)
  2941. resp.WriteHeader(403)
  2942. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed because you're not authorized to see this workflow (3)."}`)))
  2943. return
  2944. }
  2945. // Check if user is in the correct org
  2946. if user.ActiveOrg.Id == exec.ExecutionOrg && user.Role != "org-reader" {
  2947. log.Printf("[AUDIT][%s] User %s (%s) is force continuing execution from org access", executionId, user.Username, user.Id)
  2948. } else if user.SupportAccess {
  2949. log.Printf("[AUDIT][%s] User %s (%s) is force continuing execution with support access", executionId, user.Username, user.Id)
  2950. } else {
  2951. log.Printf("[ERROR][%s] Bad authorization key for continue execution (rerun workflow - 2): %s", executionId, err)
  2952. resp.WriteHeader(403)
  2953. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed because you're not authorized to see this workflow (2)."}`)))
  2954. return
  2955. }
  2956. }
  2957. // Meant as a function that periodically checks whether previous executions have finished or not.
  2958. // Should probably be based on executedIds and finishedIds
  2959. // Schedule a check in the future instead?
  2960. // Auth vs execution check!
  2961. extraInputs := 0
  2962. for _, trigger := range exec.Workflow.Triggers {
  2963. if trigger.Name == "User Input" && trigger.AppName == "User Input" {
  2964. extraInputs += 1
  2965. //exec.Workflow.Actions = append(exec.Workflow.Actions, shuffle.Action{
  2966. // ID: trigger.ID,
  2967. // Label: trigger.Label,
  2968. // Name: trigger.Name,
  2969. //})
  2970. } else if trigger.Name == "Shuffle Workflow" && trigger.AppName == "Shuffle Workflow" {
  2971. extraInputs += 1
  2972. //exec.Workflow.Actions = append(exec.Workflow.Actions, shuffle.Action{
  2973. // ID: trigger.ID,
  2974. // Label: trigger.Label,
  2975. // Name: trigger.Name,
  2976. //})
  2977. }
  2978. }
  2979. if exec.Status != "ABORTED" && exec.Status != "FINISHED" && exec.Status != "FAILURE" {
  2980. log.Printf("[DEBUG][%s] Rechecking execution and its status to send to backend IF the status is EXECUTING (%s - %d/%d finished)", exec.ExecutionId, exec.Status, len(exec.Results), len(exec.Workflow.Actions)+extraInputs)
  2981. }
  2982. // Usually caused by issue during startup
  2983. if exec.Status == "" {
  2984. resp.WriteHeader(401)
  2985. resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "No status for the execution"}`)))
  2986. return
  2987. }
  2988. if exec.Status != "EXECUTING" {
  2989. resp.WriteHeader(200)
  2990. resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Already finished"}`)))
  2991. return
  2992. }
  2993. // Force it back in the queue to be executed
  2994. if len(exec.Workflow.Actions) == 0 {
  2995. resp.WriteHeader(200)
  2996. resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Not a cloud env workflow. Only rerunning cloud env."}`)))
  2997. return
  2998. }
  2999. log.Printf("[DEBUG][%s] Workflow: %s (%s)", exec.ExecutionId, exec.Workflow.Name, exec.Workflow.ID)
  3000. if exec.Workflow.ID == "" || exec.Workflow.Name == "" {
  3001. log.Printf("[ERROR][%s] No workflow ID found for execution", exec.ExecutionId)
  3002. shuffle.DeleteKey(ctx, "workflowexecution", exec.ExecutionId)
  3003. resp.WriteHeader(200)
  3004. resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "No workflow name / ID found. Can't run. Contact support@shuffler.io if this persists."}`)))
  3005. return
  3006. }
  3007. environment := exec.Workflow.Actions[0].Environment
  3008. log.Printf("[DEBUG][%s] Not a cloud env workflow. Re-adding job in queue for env %s.", exec.ExecutionId, environment)
  3009. parsedEnv := fmt.Sprintf("%s_%s", strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(environment, " ", "-"), "_", "-")), exec.ExecutionOrg)
  3010. log.Printf("[DEBUG][%s] Adding new run job to env (2): %s", exec.ExecutionId, parsedEnv)
  3011. executionRequest := shuffle.ExecutionRequest{
  3012. ExecutionId: exec.ExecutionId,
  3013. WorkflowId: exec.Workflow.ID,
  3014. Authorization: exec.Authorization,
  3015. Environments: []string{environment},
  3016. }
  3017. // Increase priority on reruns to catch up
  3018. executionRequest.Priority = 11
  3019. err = shuffle.SetWorkflowQueue(ctx, executionRequest, parsedEnv)
  3020. if err != nil {
  3021. log.Printf("[ERROR] Failed adding execution to db: %s", err)
  3022. }
  3023. resp.WriteHeader(200)
  3024. resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Reran workflow in %s"}`, parsedEnv)))
  3025. }