package noxy import ( "context" "encoding/json" "errors" "fmt" "io" "log" "net/http" "net/url" "path" "sort" "strings" "sync" "time" "github.com/dyatlov/go-opengraph/opengraph" nostr "github.com/nbd-wtf/go-nostr" xurls "mvdan.cc/xurls/v2" ) var ( ErrNotFound = errors.New("event or resource not found") ErrUnsupportedEventKind = errors.New("unsupported event kind") ErrUnsupportedMimeType = errors.New("unsupported link mime type") ErrUnsupportedRelay = errors.New("unsupported relay") ) // LinkMeta contains metadata info about a URL. // it is typically assembled from OGP (https://ogp.me) by Noxer.FetchLinkMeta. type LinkMeta struct { Type string // og:type Title string // og:title Description string // og:description ImageURLs []string // og:image:secure_url or og:image:url } // Noxer can proxy link preview info and data streams. // See FetchLinkMeta and StreamLinkData for details. // // while the only required field is Cache, a zero value of KnownRelays // makes Noxer refuse to proxy any URLs. type Noxer struct { // Cache is used to store both link preview meta info and // data streamed to clients. it must be non-nil for Noxer to be usable. Cache Cacher // Noxer refuses to work with web pages and data streams larger than this value. MaxFileSize int64 // defaults to 1Mb // how long to keep an open connection to a relay without any activity. // an activity is any cache-miss call to FetchLinkMeta or StreamLinkData. // connections to relays are used to verify whether a link is part of // an event contents. see aforementioned methods for more details. IdleRelayTimeout time.Duration // defaults to 1min // Noxer connects only to those relays hostnames of which are specified here. // in other words, slice elements are only hostname parts of relay URLs. // KnownRelays must be sorted in ascending order. KnownRelays []string // HTTPClient is used to make HTTP connections when fetching link preview // info and data streaming. when nil, http.DefaultClient is used. HTTPClient *http.Client // clients keeps track of nostr relay connections to clean them up // and remove idle after IdleRelayTimeout. clientsMu sync.Mutex clients map[string]*relayClient cleanupTimer *time.Timer // slurpers keep track of ongoing HTTP requests, both link preview // meta info and data streams. slurpersMu sync.Mutex slurpers map[string]chan struct{} } // relayClient wraps nostr.Relay with an additional timestamp // indicating last use of the relay to keep track of all active relay // connections and remove idle. // // lastUsed is updated every time Noxer.fetchNostrEvent is called. type relayClient struct { relay *nostr.Relay lastUsed time.Time } // FetchLinkMeta requests the web page at link URL, parses it as HTML and returns // metadata found in the contents. It refuses to parse remote responses with // content-type other than text/html. // // link URL must be found in content field of the nostr event posted to the // specified relay. FetchLinkMeta connects to the nostr relay at relayURL // and sends a filter'ed request with ids field set to eventID. // the received event contents are "grepped" for the value of link as is. // // relayURL's hostname must be an element of x.KnownRelays. // remote must respond with HTTP 200 OK to the link URL. // // successfully parsed link URLs are cached using Cacher.PutJSON. so, subsequent // calls should not hit the remote server again unless x.Cache fails. // concurrent requests are suspended until the context or first call is done. func (x *Noxer) FetchLinkMeta(ctx context.Context, eventID, relayURL, link string) (*LinkMeta, error) { if err := x.verifyEventLink(ctx, eventID, relayURL, link, verifyNoMeta); err != nil { return nil, fmt.Errorf("verifyEventLink: %w", err) } return x.slurpLinkMeta(ctx, link) } func (x *Noxer) slurpLinkMeta(ctx context.Context, link string) (*LinkMeta, error) { // use cache here instead of directly in FetchLinkMeta to avoid // hitting remotes in x.verifyEventLink as much as possible. cacheKey := MakeCacheKey(link, CacheKeyURLPreview) var meta LinkMeta cacheErr := x.Cache.GetJSON(ctx, cacheKey, &meta) if cacheErr == nil { return &meta, nil } log.Printf("cache.getjson %s(%s): %v", link, cacheKey, cacheErr) ds, err := x.detachedSlurpData(ctx, link) if err != nil { return nil, fmt.Errorf("detachedSlurpData: %w", err) } defer ds.Close() if mtype := ds.MimeType(); mtype != "text/html" { return nil, fmt.Errorf("%w: received %q, want text/html", ErrUnsupportedMimeType, mtype) } res, err := parseLinkMeta(ds) if err != nil { return nil, fmt.Errorf("parseLinkMeta: %w", err) } if err := x.Cache.PutJSON(ctx, cacheKey, res); err != nil { log.Printf("cache.putjson %s(%s): %v", link, cacheKey, err) } return res, nil } // StreamLinkData opens an HTTP connection to link and streams the response back. // while doing so, it also caches the reponse bytes using Cache.PutStream. so, // subsequent calls should not hit the remote link again unless x.Cache fails. // // link URL must be found in "content" field of the nostr event posted to the // specified relay. StreamLinkData connects to the nostr relay at relayURL // and sends a filter'ed request with ids field set to eventID. // for event kinds 1 (text note) and 42 (channel message), the event contents // are simply "grepped" for the value of link as is. // for event kinds 0 (set metadata), 40 (create channel) and 41 (set channel // metadata) the link is checked against "picture" field. // // additionally, link URL may be one of LinkMeta.ImageURLs as returned by // x.FetchLinkMeta to a call with the same eventID. // // relayURL's hostname must be an element of x.KnownRelays. // remote must respond with HTTP 200 OK to the link URL. // // callers must close DataStream. // concurrent requests are suspended until the context or first call is done. func (x *Noxer) StreamLinkData(ctx context.Context, eventID, relayURL, link string) (*DataStream, error) { if err := x.verifyEventLink(ctx, eventID, relayURL, link, verifyExpandMeta); err != nil { return nil, err } cacheKey := MakeCacheKey(link, CacheKeyData) ds, err := x.Cache.GetStream(ctx, cacheKey) if err != nil { log.Printf("cache.getstream %s(%s): %v", link, cacheKey, err) ds, err = x.detachedSlurpData(ctx, link) } return ds, err } // detachedSlurpData always finishes data streaming from remote url, event if // the returned DataStream is closed prematurely, to cache the bytes for subsequent calls. func (x *Noxer) detachedSlurpData(ctx context.Context, url string) (*DataStream, error) { // check whether there's an ongoing stream. if so, wait and use cache or fail. cacheKey := MakeCacheKey(url, CacheKeyData) cacheKeyStr := cacheKey.Path() x.slurpersMu.Lock() slurpCh, found := x.slurpers[cacheKeyStr] if found { // a previous call is already streaming. // wait 'till they're done, because the stream is non-seekable, // then get it from cache or fail. x.slurpersMu.Unlock() select { case <-ctx.Done(): return nil, ctx.Err() case <-slurpCh: return x.Cache.GetStream(ctx, cacheKey) } } else { // wouldn't need this branch if close(slurpCh) was done after x.slurpersMu.Lock() // in the goroutine below. // but it's so easy to miss in future code changes that i don't want to risk it: // not a big deal to check the cache one more time. // reconsider if performance here becomes a concern. ds, err := x.Cache.GetStream(ctx, cacheKey) if err == nil { x.slurpersMu.Unlock() return ds, nil } } // no other goroutine is streaming; do it now and make others wait on slurpCh. slurpCh = x.makeSlurperChan(cacheKeyStr) x.slurpersMu.Unlock() // assuming 1min is enough to download a file. // this may be too short for large values of x.MaxFileSize. // TODO: compute ctx based on x.MaxFileSize? ctx, cancelHTTP := context.WithTimeout(context.Background(), time.Minute) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { cancelHTTP() return nil, err } resp, err := x.httpClient().Do(req) if err != nil { cancelHTTP() return nil, err } if resp.StatusCode != http.StatusOK { cancelHTTP() if resp.StatusCode == http.StatusNotFound { return nil, ErrNotFound } return nil, fmt.Errorf("bad HTTP response %s: %s", url, resp.Status) } ctype := resp.Header.Get("Content-Type") if ctype == "" { // TODO: sniff using mime magic bytes? ctype = "application/octet-stream" } // rout is returned to the caller, wout is tee'ed from resp.Body. // if the caller closes rout, tee'ing to wout also stops. rout, wout := io.Pipe() go func() { defer func() { resp.Body.Close() wout.Close() cancelHTTP() close(slurpCh) x.slurpersMu.Lock() delete(x.slurpers, cacheKeyStr) x.slurpersMu.Unlock() }() // the std io.TeeReader wouldn't work since it reports errors on reads // from tee as soon as writes to wout fail which is the case if the caller // closes rout. tee := SinkTeeReader(HardLimitReader(resp.Body, x.maxFileSize()), wout) if err := x.Cache.PutStream(ctx, cacheKey, ctype, tee); err != nil { log.Printf("cache.putstream %s: %v", cacheKey, err) // TODO: don't close; io.copy(wout, resp.body) here on cache failures? } }() return &DataStream{ContentType: ctype, r: rout}, nil } // expandMeta arg values for verifyEventLink const ( verifyExpandMeta = true verifyNoMeta = false ) // verifyEventLink checks whether link URL is in a nostr event's content, // or one of OGP link preview URLs if expandMeta is true. func (x *Noxer) verifyEventLink(ctx context.Context, eventID, relayURL, link string, expandMeta bool) error { if !x.whitelistedRelay(relayURL) { return ErrUnsupportedRelay } eventURLs, err := x.fetchEventURLs(ctx, eventID, relayURL) if err != nil { return err } log.Printf("fetched event URLs: %q", eventURLs) for _, u := range eventURLs { if u == link { return nil } } if !expandMeta { return ErrNotFound } // link not found in the event text/json. // check URLs in OGP metadata for each suitable link found in the event. for _, urlStr := range eventURLs { u, err := url.Parse(urlStr) if err != nil { continue // invalid url } if ext := path.Ext(u.Path); ext != "" { if !strings.HasSuffix(ext, "html") && !strings.HasSuffix(ext, "htm") { continue // assume not an html page } } meta, err := x.slurpLinkMeta(ctx, urlStr) if err != nil { log.Printf("verifyEventLink slurpLinkMeta(%s): %v", u, err) continue } for _, imgURL := range meta.ImageURLs { if imgURL == link { return nil } } } return ErrNotFound } // fetchEventURLs returns all URLs found in a nostr event. // it assumes the relay URL is already checked to match x.KnownRelays. func (x *Noxer) fetchEventURLs(ctx context.Context, eventID, relayURL string) ([]string, error) { // check whether there's an ongoing fetch. if so, wait and use cache or fail. cacheKey := MakeCacheKey(eventID, CacheKeyEvent) cacheKeyStr := cacheKey.Path() x.slurpersMu.Lock() slurpCh, found := x.slurpers[cacheKeyStr] if found { // a previous call is already fetching. // wait 'till they're done, then get it from cache or fail. x.slurpersMu.Unlock() select { case <-ctx.Done(): return nil, ctx.Err() case <-slurpCh: var urls []string err := x.Cache.GetJSON(ctx, cacheKey, &urls) return urls, err } } else { // same reasoning as in detachedSlurpData. // wouldn't need this branch if close(slurpCh) was done after x.slurpersMu.Lock() // in the goroutine below. but it's too easy to miss in future code changes. // checking cache one more time here is most likely insignificant when compared to // opening a websocket to a nostr relay. var urls []string if err := x.Cache.GetJSON(ctx, cacheKey, &urls); err == nil { x.slurpersMu.Unlock() return urls, nil } } // no other goroutine is fetching; do it now and make others wait on slurpCh. slurpCh = x.makeSlurperChan(cacheKeyStr) x.slurpersMu.Unlock() defer func() { close(slurpCh) x.slurpersMu.Lock() delete(x.slurpers, cacheKeyStr) x.slurpersMu.Unlock() }() event, err := x.fetchNostrEvent(ctx, eventID, relayURL) if err != nil { return nil, err } var eventURLs []string switch event.Kind { default: return nil, ErrUnsupportedEventKind case nostr.KindTextNote, nostr.KindChannelMessage: eventURLs = extractAcceptableURLs(event.Content) case nostr.KindSetMetadata, nostr.KindChannelCreation, nostr.KindChannelMetadata: var p struct{ Picture string } if err := json.Unmarshal([]byte(event.Content), &p); err != nil { return nil, err } if validURL(p.Picture) { eventURLs = append(eventURLs, p.Picture) } } if err := x.Cache.PutJSON(ctx, cacheKey, eventURLs); err != nil { log.Printf("cache.putjson %s: %v", cacheKey, err) } return eventURLs, nil } // assuming relay is whitelisted func (x *Noxer) fetchNostrEvent(ctx context.Context, eventID, relayURL string) (*nostr.Event, error) { relay, err := x.relayConn(ctx, relayURL) if err != nil { return nil, err } var ( event *nostr.Event fetchErr error ) // assuming 10sec is more than enough for a simple filter'ed sub with a single // event ID. ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() done := make(chan struct{}) go func() { defer close(done) f := nostr.Filter{IDs: []string{eventID}, Limit: 1} sub := relay.Subscribe(nostr.Filters{f}) defer sub.Unsub() select { case e := <-sub.Events: // e.CheckSignature() is already done by the client event = &e case <-ctx.Done(): fetchErr = ctx.Err() } }() select { case <-done: return event, fetchErr case <-ctx.Done(): return nil, ctx.Err() } } // connect to a nostr relay at relayURL or reuse an existing conn. // it blocks all other callers. func (x *Noxer) relayConn(ctx context.Context, relayURL string) (*nostr.Relay, error) { // check existing conn and reuse if found relayURL = nostr.NormalizeURL(relayURL) x.clientsMu.Lock() defer x.clientsMu.Unlock() if cl, ok := x.clients[relayURL]; ok { // "touch" the last used to let cleanup timer know we aren't idling cl.lastUsed = time.Now() return cl.relay, nil } // none found. make a new conn. var ( relay *nostr.Relay connErr error ) // assuming 10sec is more than enough to connect to a websocket. connCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() done := make(chan struct{}) go func() { // TODO: send a patch upstream for a nostr.RelayConnectContext(ctx, url) relay, connErr = nostr.RelayConnect(relayURL) close(done) }() select { case <-connCtx.Done(): // unfortunately, this leaves the above goroutine hanging, and will keep // piling up for non-responsive relays. // can be solved with a nostr.RelayConnectContext. return nil, connCtx.Err() case <-done: if connErr != nil { return nil, connErr } } if x.clients == nil { x.clients = make(map[string]*relayClient) } x.clients[relayURL] = &relayClient{ relay: relay, lastUsed: time.Now(), } // a self-cleanup goroutine to delete ourselves if relay reports conn errors. go func() { err := <-relay.ConnectionError log.Printf("%s: closing due to: %v", relayURL, err) x.clientsMu.Lock() defer x.clientsMu.Unlock() relay.Close() delete(x.clients, relayURL) }() if x.cleanupTimer == nil { x.cleanupTimer = time.AfterFunc(x.idleRelayTimeout(), x.cleanupRelayConn) } return relay, nil } // close and delete nostr relay connections idling for more than x.idleRelayTimeout(). func (x *Noxer) cleanupRelayConn() { x.clientsMu.Lock() defer x.clientsMu.Unlock() for url, cl := range x.clients { if time.Since(cl.lastUsed) > x.idleRelayTimeout() { log.Printf("closing idle conn to %s", url) cl.relay.Close() delete(x.clients, url) } } if len(x.clients) > 0 { x.cleanupTimer = time.AfterFunc(time.Minute, x.cleanupRelayConn) } else { x.cleanupTimer = nil } } // assumes x.slurpersMu is handled by the caller. func (x *Noxer) makeSlurperChan(k string) chan struct{} { if x.slurpers == nil { x.slurpers = make(map[string]chan struct{}) } ch := make(chan struct{}) x.slurpers[k] = ch return ch } func (x *Noxer) httpClient() *http.Client { if x.HTTPClient == nil { return http.DefaultClient } return x.HTTPClient } func (x *Noxer) idleRelayTimeout() time.Duration { if x.IdleRelayTimeout == 0 { return time.Minute } return x.IdleRelayTimeout } func (x *Noxer) maxFileSize() int64 { if x.MaxFileSize == 0 { return 1 << 20 // 1Mb } return x.MaxFileSize } // whitelistedRelay reports whether a nostr relay at urlStr is in x.KnownRelays. // it expects x.KnownRelays to be sorted in lexical order. // // only hostname of urlStr is checked against x.KnownRelays. func (x *Noxer) whitelistedRelay(urlStr string) bool { u, err := url.Parse(urlStr) if err != nil { return false } host := u.Hostname() i := sort.SearchStrings(x.KnownRelays, host) return i < len(x.KnownRelays) && x.KnownRelays[i] == host } // TODO: use oEmbed if OGP fails? func parseLinkMeta(r io.Reader) (*LinkMeta, error) { og := opengraph.NewOpenGraph() if err := og.ProcessHTML(r); err != nil { return nil, err } if len(og.Images) == 0 { return nil, ErrNotFound } meta := &LinkMeta{ Type: og.Type, Title: og.Title, Description: og.Description, ImageURLs: make([]string, 0, len(og.Images)), } for _, img := range og.Images { u := img.SecureURL if u == "" { u = img.URL } if u == "" { continue } meta.ImageURLs = append(meta.ImageURLs, u) } return meta, nil } // TODO: patch to extract only host/ip; no emails and such var urlRegexp = xurls.Relaxed() func extractAcceptableURLs(text string) []string { var urls []string for _, a := range urlRegexp.FindAllString(text, -1) { if validURL(a) { urls = append(urls, a) } } return urls } func validURL(urlStr string) bool { if urlStr == "" { return false } u, err := url.Parse(urlStr) if err != nil { return false } if u.Hostname() == "" { return false } return u.Scheme == "" || u.Scheme == "http" || u.Scheme == "https" }