You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
noxy/noxy.go

618 lines
19 KiB
Go

package noxy
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"path"
"sort"
"strings"
"sync"
"time"
"github.com/dyatlov/go-opengraph/opengraph"
nostr "github.com/nbd-wtf/go-nostr"
xurls "mvdan.cc/xurls/v2"
)
var (
ErrNotFound = errors.New("event or resource not found")
ErrUnsupportedEventKind = errors.New("unsupported event kind")
ErrUnsupportedMimeType = errors.New("unsupported link mime type")
ErrUnsupportedRelay = errors.New("unsupported relay")
)
// LinkMeta contains metadata info about a URL.
// it is typically assembled from OGP (https://ogp.me) by Noxer.FetchLinkMeta.
type LinkMeta struct {
Type string // og:type
Title string // og:title
Description string // og:description
ImageURLs []string // og:image:secure_url or og:image:url
}
// Noxer can proxy link preview info and data streams.
// See FetchLinkMeta and StreamLinkData for details.
//
// while the only required field is Cache, a zero value of KnownRelays
// makes Noxer refuse to proxy any URLs.
type Noxer struct {
// Cache is used to store both link preview meta info and
// data streamed to clients. it must be non-nil for Noxer to be usable.
Cache Cacher
// Noxer refuses to work with web pages and data streams larger than this value.
MaxFileSize int64 // defaults to 1Mb
// how long to keep an open connection to a relay without any activity.
// an activity is any cache-miss call to FetchLinkMeta or StreamLinkData.
// connections to relays are used to verify whether a link is part of
// an event contents. see aforementioned methods for more details.
IdleRelayTimeout time.Duration // defaults to 1min
// Noxer connects only to those relays hostnames of which are specified here.
// in other words, slice elements are only hostname parts of relay URLs.
// KnownRelays must be sorted in ascending order.
KnownRelays []string
// HTTPClient is used to make HTTP connections when fetching link preview
// info and data streaming. when nil, http.DefaultClient is used.
HTTPClient *http.Client
// clients keeps track of nostr relay connections to clean them up
// and remove idle after IdleRelayTimeout.
clientsMu sync.Mutex
clients map[string]*relayClient
cleanupTimer *time.Timer
// slurpers keep track of ongoing HTTP requests, both link preview
// meta info and data streams.
slurpersMu sync.Mutex
slurpers map[string]chan struct{}
}
// relayClient wraps nostr.Relay with an additional timestamp
// indicating last use of the relay to keep track of all active relay
// connections and remove idle.
//
// lastUsed is updated every time Noxer.fetchNostrEvent is called.
type relayClient struct {
relay *nostr.Relay
lastUsed time.Time
}
// FetchLinkMeta requests the web page at link URL, parses it as HTML and returns
// metadata found in the contents. It refuses to parse remote responses with
// content-type other than text/html.
//
// link URL must be found in content field of the nostr event posted to the
// specified relay. FetchLinkMeta connects to the nostr relay at relayURL
// and sends a filter'ed request with ids field set to eventID.
// the received event contents are "grepped" for the value of link as is.
//
// relayURL's hostname must be an element of x.KnownRelays.
// remote must respond with HTTP 200 OK to the link URL.
//
// successfully parsed link URLs are cached using Cacher.PutJSON. so, subsequent
// calls should not hit the remote server again unless x.Cache fails.
// concurrent requests are suspended until the context or first call is done.
func (x *Noxer) FetchLinkMeta(ctx context.Context, eventID, relayURL, link string) (*LinkMeta, error) {
if err := x.verifyEventLink(ctx, eventID, relayURL, link, verifyNoMeta); err != nil {
return nil, fmt.Errorf("verifyEventLink: %w", err)
}
return x.slurpLinkMeta(ctx, link)
}
func (x *Noxer) slurpLinkMeta(ctx context.Context, link string) (*LinkMeta, error) {
// use cache here instead of directly in FetchLinkMeta to avoid
// hitting remotes in x.verifyEventLink as much as possible.
cacheKey := MakeCacheKey(link, CacheKeyURLPreview)
var meta LinkMeta
cacheErr := x.Cache.GetJSON(ctx, cacheKey, &meta)
if cacheErr == nil {
return &meta, nil
}
log.Printf("cache.getjson %s(%s): %v", link, cacheKey, cacheErr)
ds, err := x.detachedSlurpData(ctx, link)
if err != nil {
return nil, fmt.Errorf("detachedSlurpData: %w", err)
}
defer ds.Close()
if mtype := ds.MimeType(); mtype != "text/html" {
return nil, fmt.Errorf("%w: received %q, want text/html", ErrUnsupportedMimeType, mtype)
}
res, err := parseLinkMeta(ds)
if err != nil {
return nil, fmt.Errorf("parseLinkMeta: %w", err)
}
if err := x.Cache.PutJSON(ctx, cacheKey, res); err != nil {
log.Printf("cache.putjson %s(%s): %v", link, cacheKey, err)
}
return res, nil
}
// StreamLinkData opens an HTTP connection to link and streams the response back.
// while doing so, it also caches the reponse bytes using Cache.PutStream. so,
// subsequent calls should not hit the remote link again unless x.Cache fails.
//
// link URL must be found in "content" field of the nostr event posted to the
// specified relay. StreamLinkData connects to the nostr relay at relayURL
// and sends a filter'ed request with ids field set to eventID.
// for event kinds 1 (text note) and 42 (channel message), the event contents
// are simply "grepped" for the value of link as is.
// for event kinds 0 (set metadata), 40 (create channel) and 41 (set channel
// metadata) the link is checked against "picture" field.
//
// additionally, link URL may be one of LinkMeta.ImageURLs as returned by
// x.FetchLinkMeta to a call with the same eventID.
//
// relayURL's hostname must be an element of x.KnownRelays.
// remote must respond with HTTP 200 OK to the link URL.
//
// callers must close DataStream.
// concurrent requests are suspended until the context or first call is done.
func (x *Noxer) StreamLinkData(ctx context.Context, eventID, relayURL, link string) (*DataStream, error) {
if err := x.verifyEventLink(ctx, eventID, relayURL, link, verifyExpandMeta); err != nil {
return nil, err
}
cacheKey := MakeCacheKey(link, CacheKeyData)
ds, err := x.Cache.GetStream(ctx, cacheKey)
if err != nil {
log.Printf("cache.getstream %s(%s): %v", link, cacheKey, err)
ds, err = x.detachedSlurpData(ctx, link)
}
return ds, err
}
// detachedSlurpData always finishes data streaming from remote url, event if
// the returned DataStream is closed prematurely, to cache the bytes for subsequent calls.
func (x *Noxer) detachedSlurpData(ctx context.Context, url string) (*DataStream, error) {
// check whether there's an ongoing stream. if so, wait and use cache or fail.
cacheKey := MakeCacheKey(url, CacheKeyData)
cacheKeyStr := cacheKey.Path()
x.slurpersMu.Lock()
slurpCh, found := x.slurpers[cacheKeyStr]
if found {
// a previous call is already streaming.
// wait 'till they're done, because the stream is non-seekable,
// then get it from cache or fail.
x.slurpersMu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-slurpCh:
return x.Cache.GetStream(ctx, cacheKey)
}
} else {
// wouldn't need this branch if close(slurpCh) was done after x.slurpersMu.Lock()
// in the goroutine below.
// but it's so easy to miss in future code changes that i don't want to risk it:
// not a big deal to check the cache one more time.
// reconsider if performance here becomes a concern.
ds, err := x.Cache.GetStream(ctx, cacheKey)
if err == nil {
x.slurpersMu.Unlock()
return ds, nil
}
}
// no other goroutine is streaming; do it now and make others wait on slurpCh.
slurpCh = x.makeSlurperChan(cacheKeyStr)
x.slurpersMu.Unlock()
// assuming 1min is enough to download a file.
// this may be too short for large values of x.MaxFileSize.
// TODO: compute ctx based on x.MaxFileSize?
ctx, cancelHTTP := context.WithTimeout(context.Background(), time.Minute)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
cancelHTTP()
return nil, err
}
resp, err := x.httpClient().Do(req)
if err != nil {
cancelHTTP()
return nil, err
}
if resp.StatusCode != http.StatusOK {
cancelHTTP()
if resp.StatusCode == http.StatusNotFound {
return nil, ErrNotFound
}
return nil, fmt.Errorf("bad HTTP response %s: %s", url, resp.Status)
}
ctype := resp.Header.Get("Content-Type")
if ctype == "" {
// TODO: sniff using mime magic bytes?
ctype = "application/octet-stream"
}
// rout is returned to the caller, wout is tee'ed from resp.Body.
// if the caller closes rout, tee'ing to wout also stops.
rout, wout := io.Pipe()
go func() {
defer func() {
resp.Body.Close()
wout.Close()
cancelHTTP()
close(slurpCh)
x.slurpersMu.Lock()
delete(x.slurpers, cacheKeyStr)
x.slurpersMu.Unlock()
}()
// the std io.TeeReader wouldn't work since it reports errors on reads
// from tee as soon as writes to wout fail which is the case if the caller
// closes rout.
tee := SinkTeeReader(HardLimitReader(resp.Body, x.maxFileSize()), wout)
if err := x.Cache.PutStream(ctx, cacheKey, ctype, tee); err != nil {
log.Printf("cache.putstream %s: %v", cacheKey, err)
// TODO: don't close; io.copy(wout, resp.body) here on cache failures?
}
}()
return &DataStream{ContentType: ctype, r: rout}, nil
}
// expandMeta arg values for verifyEventLink
const (
verifyExpandMeta = true
verifyNoMeta = false
)
// verifyEventLink checks whether link URL is in a nostr event's content,
// or one of OGP link preview URLs if expandMeta is true.
func (x *Noxer) verifyEventLink(ctx context.Context, eventID, relayURL, link string, expandMeta bool) error {
if !x.whitelistedRelay(relayURL) {
return ErrUnsupportedRelay
}
eventURLs, err := x.fetchEventURLs(ctx, eventID, relayURL)
if err != nil {
return err
}
log.Printf("fetched event URLs: %q", eventURLs)
for _, u := range eventURLs {
if u == link {
return nil
}
}
if !expandMeta {
return ErrNotFound
}
// link not found in the event text/json.
// check URLs in OGP metadata for each suitable link found in the event.
for _, urlInEvent := range eventURLs {
// try only cache first. a client may have already requested /meta
// with this URL. if so, need no further parsing and network roundtrips.
var cachedMeta LinkMeta
if x.Cache.GetJSON(ctx, MakeCacheKey(urlInEvent, CacheKeyURLPreview), &cachedMeta) == nil {
if nonSortedSliceContains(cachedMeta.ImageURLs, link) {
return nil // ok; found
}
continue // move on to the next url in the event
}
// cached failed or miss; possibly fetch from remote and parse.
if !looksLikeHTMLPage(urlInEvent) {
continue
}
meta, err := x.slurpLinkMeta(ctx, urlInEvent)
if err != nil {
log.Printf("verifyEventLink slurpLinkMeta(%s): %v", urlInEvent, err)
continue
}
if nonSortedSliceContains(meta.ImageURLs, link) {
return nil // ok; found
}
}
return ErrNotFound
}
// fetchEventURLs returns all URLs found in a nostr event.
// it assumes the relay URL is already checked to match x.KnownRelays.
func (x *Noxer) fetchEventURLs(ctx context.Context, eventID, relayURL string) ([]string, error) {
// check whether there's an ongoing fetch. if so, wait and use cache or fail.
cacheKey := MakeCacheKey(eventID, CacheKeyEvent)
cacheKeyStr := cacheKey.Path()
x.slurpersMu.Lock()
slurpCh, found := x.slurpers[cacheKeyStr]
if found {
// a previous call is already fetching.
// wait 'till they're done, then get it from cache or fail.
x.slurpersMu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-slurpCh:
var urls []string
err := x.Cache.GetJSON(ctx, cacheKey, &urls)
return urls, err
}
} else {
// same reasoning as in detachedSlurpData.
// wouldn't need this branch if close(slurpCh) was done after x.slurpersMu.Lock()
// in the goroutine below. but it's too easy to miss in future code changes.
// checking cache one more time here is most likely insignificant when compared to
// opening a websocket to a nostr relay.
var urls []string
if err := x.Cache.GetJSON(ctx, cacheKey, &urls); err == nil {
x.slurpersMu.Unlock()
return urls, nil
}
}
// no other goroutine is fetching; do it now and make others wait on slurpCh.
slurpCh = x.makeSlurperChan(cacheKeyStr)
x.slurpersMu.Unlock()
defer func() {
close(slurpCh)
x.slurpersMu.Lock()
delete(x.slurpers, cacheKeyStr)
x.slurpersMu.Unlock()
}()
event, err := x.fetchNostrEvent(ctx, eventID, relayURL)
if err != nil {
return nil, err
}
var eventURLs []string
switch event.Kind {
default:
return nil, ErrUnsupportedEventKind
case nostr.KindTextNote, nostr.KindChannelMessage:
eventURLs = extractAcceptableURLs(event.Content)
case nostr.KindSetMetadata, nostr.KindChannelCreation, nostr.KindChannelMetadata:
var p struct{ Picture string }
if err := json.Unmarshal([]byte(event.Content), &p); err != nil {
return nil, err
}
if validURL(p.Picture) {
eventURLs = append(eventURLs, p.Picture)
}
}
if err := x.Cache.PutJSON(ctx, cacheKey, eventURLs); err != nil {
log.Printf("cache.putjson %s: %v", cacheKey, err)
}
return eventURLs, nil
}
// assuming relay is whitelisted
func (x *Noxer) fetchNostrEvent(ctx context.Context, eventID, relayURL string) (*nostr.Event, error) {
relay, err := x.relayConn(ctx, relayURL)
if err != nil {
return nil, err
}
var (
event *nostr.Event
fetchErr error
)
// assuming 10sec is more than enough for a simple filter'ed sub with a single
// event ID.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
defer close(done)
f := nostr.Filter{IDs: []string{eventID}, Limit: 1}
sub := relay.Subscribe(nostr.Filters{f})
defer sub.Unsub()
select {
case e := <-sub.Events:
// e.CheckSignature() is already done by the client
event = &e
case <-ctx.Done():
fetchErr = ctx.Err()
}
}()
select {
case <-done:
return event, fetchErr
case <-ctx.Done():
return nil, ctx.Err()
}
}
// connect to a nostr relay at relayURL or reuse an existing conn.
// it blocks all other callers.
func (x *Noxer) relayConn(ctx context.Context, relayURL string) (*nostr.Relay, error) {
// check existing conn and reuse if found
relayURL = nostr.NormalizeURL(relayURL)
x.clientsMu.Lock()
defer x.clientsMu.Unlock()
if cl, ok := x.clients[relayURL]; ok {
// "touch" the last used to let cleanup timer know we aren't idling
cl.lastUsed = time.Now()
return cl.relay, nil
}
// none found. make a new conn.
var (
relay *nostr.Relay
connErr error
)
// assuming 10sec is more than enough to connect to a websocket.
connCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
// TODO: send a patch upstream for a nostr.RelayConnectContext(ctx, url)
relay, connErr = nostr.RelayConnect(relayURL)
close(done)
}()
select {
case <-connCtx.Done():
// unfortunately, this leaves the above goroutine hanging, and will keep
// piling up for non-responsive relays.
// can be solved with a nostr.RelayConnectContext.
return nil, connCtx.Err()
case <-done:
if connErr != nil {
return nil, connErr
}
}
if x.clients == nil {
x.clients = make(map[string]*relayClient)
}
x.clients[relayURL] = &relayClient{
relay: relay,
lastUsed: time.Now(),
}
// a self-cleanup goroutine to delete ourselves if relay reports conn errors.
go func() {
err := <-relay.ConnectionError
log.Printf("%s: closing due to: %v", relayURL, err)
x.clientsMu.Lock()
defer x.clientsMu.Unlock()
relay.Close()
delete(x.clients, relayURL)
}()
if x.cleanupTimer == nil {
x.cleanupTimer = time.AfterFunc(x.idleRelayTimeout(), x.cleanupRelayConn)
}
return relay, nil
}
// close and delete nostr relay connections idling for more than x.idleRelayTimeout().
func (x *Noxer) cleanupRelayConn() {
x.clientsMu.Lock()
defer x.clientsMu.Unlock()
for url, cl := range x.clients {
if time.Since(cl.lastUsed) > x.idleRelayTimeout() {
log.Printf("closing idle conn to %s", url)
cl.relay.Close()
delete(x.clients, url)
}
}
if len(x.clients) > 0 {
x.cleanupTimer = time.AfterFunc(time.Minute, x.cleanupRelayConn)
} else {
x.cleanupTimer = nil
}
}
// assumes x.slurpersMu is handled by the caller.
func (x *Noxer) makeSlurperChan(k string) chan struct{} {
if x.slurpers == nil {
x.slurpers = make(map[string]chan struct{})
}
ch := make(chan struct{})
x.slurpers[k] = ch
return ch
}
func (x *Noxer) httpClient() *http.Client {
if x.HTTPClient == nil {
return http.DefaultClient
}
return x.HTTPClient
}
func (x *Noxer) idleRelayTimeout() time.Duration {
if x.IdleRelayTimeout == 0 {
return time.Minute
}
return x.IdleRelayTimeout
}
func (x *Noxer) maxFileSize() int64 {
if x.MaxFileSize == 0 {
return 1 << 20 // 1Mb
}
return x.MaxFileSize
}
// whitelistedRelay reports whether a nostr relay at urlStr is in x.KnownRelays.
// it expects x.KnownRelays to be sorted in lexical order.
//
// only hostname of urlStr is checked against x.KnownRelays.
func (x *Noxer) whitelistedRelay(urlStr string) bool {
u, err := url.Parse(urlStr)
if err != nil {
return false
}
host := u.Hostname()
i := sort.SearchStrings(x.KnownRelays, host)
return i < len(x.KnownRelays) && x.KnownRelays[i] == host
}
// TODO: use oEmbed if OGP fails?
func parseLinkMeta(r io.Reader) (*LinkMeta, error) {
og := opengraph.NewOpenGraph()
if err := og.ProcessHTML(r); err != nil {
return nil, err
}
if len(og.Images) == 0 {
return nil, ErrNotFound
}
meta := &LinkMeta{
Type: og.Type,
Title: og.Title,
Description: og.Description,
ImageURLs: make([]string, 0, len(og.Images)),
}
for _, img := range og.Images {
u := img.SecureURL
if u == "" {
u = img.URL
}
if u == "" {
continue
}
meta.ImageURLs = append(meta.ImageURLs, u)
}
return meta, nil
}
// TODO: patch to extract only host/ip; no emails and such
var urlRegexp = xurls.Relaxed()
func extractAcceptableURLs(text string) []string {
var urls []string
for _, a := range urlRegexp.FindAllString(text, -1) {
if validURL(a) {
urls = append(urls, a)
}
}
return urls
}
func validURL(urlStr string) bool {
if urlStr == "" {
return false
}
u, err := url.Parse(urlStr)
if err != nil {
return false
}
if u.Hostname() == "" {
return false
}
return u.Scheme == "" || u.Scheme == "http" || u.Scheme == "https"
}
// O(N) lookup of elem in a
func nonSortedSliceContains(a []string, elem string) bool {
for _, v := range a {
if v == elem {
return true
}
}
return false
}
// reports whether urlStr looks like a URL of an html webpage.
func looksLikeHTMLPage(urlStr string) bool {
u, err := url.Parse(urlStr)
if err != nil {
return false
}
ext := path.Ext(u.Path)
// any .xxxhtml is ok
return ext == "" || strings.HasSuffix(ext, "html") || strings.HasSuffix(ext, "htm")
}